Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass AnyExecutable objects as reference to avoid memory allocation #463

Merged
merged 2 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ namespace executor

struct AnyExecutable
{
RCLCPP_SMART_PTR_DEFINITIONS(AnyExecutable)

RCLCPP_PUBLIC
AnyExecutable();

Expand Down
14 changes: 8 additions & 6 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class Executor
*/
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable::SharedPtr any_exec);
execute_any_executable(AnyExecutable & any_exec);

RCLCPP_PUBLIC
static void
Expand Down Expand Up @@ -325,15 +325,17 @@ class Executor

RCLCPP_PUBLIC
void
get_next_timer(AnyExecutable::SharedPtr any_exec);
get_next_timer(AnyExecutable & any_exec);

RCLCPP_PUBLIC
AnyExecutable::SharedPtr
get_next_ready_executable();
bool
get_next_ready_executable(AnyExecutable & any_executable);

RCLCPP_PUBLIC
AnyExecutable::SharedPtr
get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
bool
get_next_executable(
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;
Expand Down
10 changes: 3 additions & 7 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,23 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;

/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;

virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;

virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;

virtual void
get_next_subscription(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;

virtual void
get_next_service(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;

virtual void
get_next_client(
rclcpp::executor::AnyExecutable::SharedPtr any_exec,
rclcpp::executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes) = 0;

virtual rcl_allocator_t
Expand Down
39 changes: 13 additions & 26 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
public:
RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>)

using ExecAllocTraits = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
using ExecAlloc = typename ExecAllocTraits::allocator_type;
using ExecDeleter = allocator::Deleter<ExecAlloc, executor::AnyExecutable>;
using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
using VoidAlloc = typename VoidAllocTraits::allocator_type;

explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
{
executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get());
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
}

AllocatorMemoryStrategy()
{
executable_allocator_ = std::make_shared<ExecAlloc>();
allocator_ = std::make_shared<VoidAlloc>();
}

Expand Down Expand Up @@ -235,16 +230,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
return true;
}

/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return std::allocate_shared<executor::AnyExecutable>(*executable_allocator_.get());
}

virtual void
get_next_subscription(
executor::AnyExecutable::SharedPtr any_exec,
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = subscription_handles_.begin();
Expand Down Expand Up @@ -272,12 +260,12 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
any_exec.subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
any_exec.subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
subscription_handles_.erase(it);
return;
}
Expand All @@ -288,7 +276,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy

virtual void
get_next_service(
executor::AnyExecutable::SharedPtr any_exec,
executor::AnyExecutable & any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = service_handles_.begin();
Expand All @@ -310,9 +298,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.service = service;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
service_handles_.erase(it);
return;
}
Expand All @@ -322,7 +310,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes)
get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
Expand All @@ -343,9 +331,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node_base = get_node_by_group(group, weak_nodes);
any_exec.client = client;
any_exec.callback_group = group;
any_exec.node_base = get_node_by_group(group, weak_nodes);
client_handles_.erase(it);
return;
}
Expand Down Expand Up @@ -396,7 +384,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;

std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;
};

Expand Down
108 changes: 58 additions & 50 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,13 @@ Executor::spin_some()
throw std::runtime_error("spin_some() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
AnyExecutable::SharedPtr any_exec;
while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) {
execute_any_executable(any_exec);
while (spinning.load()) {
AnyExecutable any_exec;
if (get_next_executable(any_exec, std::chrono::milliseconds::zero())) {
execute_any_executable(any_exec);
} else {
break;
}
}
}

Expand All @@ -218,9 +222,11 @@ Executor::spin_once(std::chrono::nanoseconds timeout)
if (spinning.exchange(true)) {
throw std::runtime_error("spin_once() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
auto any_exec = get_next_executable(timeout);
if (any_exec) {
RCLCPP_SCOPE_EXIT({
this->spinning.store(false);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, is this a bug fix or style fix? It looks like the macro already adds {} around its arguments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, there's another use of the scope exit macro on line 208 that could have {} added to it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove this diff and maybe make a separate pr for this change then, since its growing in scope :)

AnyExecutable any_exec;
if (get_next_executable(any_exec, timeout)) {
execute_any_executable(any_exec);
}
}
Expand All @@ -244,28 +250,28 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
}

void
Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec)
Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!any_exec || !spinning.load()) {
if (!spinning.load()) {
return;
}
if (any_exec->timer) {
execute_timer(any_exec->timer);
if (any_exec.timer) {
execute_timer(any_exec.timer);
}
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
if (any_exec.subscription) {
execute_subscription(any_exec.subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
if (any_exec.subscription_intra_process) {
execute_intra_process_subscription(any_exec.subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
if (any_exec.service) {
execute_service(any_exec.service);
}
if (any_exec->client) {
execute_client(any_exec->client);
if (any_exec.client) {
execute_client(any_exec.client);
}
// Reset the callback_group, regardless of type
any_exec->callback_group->can_be_taken_from().store(true);
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
Expand Down Expand Up @@ -508,7 +514,7 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
}

void
Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
Executor::get_next_timer(AnyExecutable & any_exec)
{
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
Expand All @@ -523,8 +529,8 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
for (auto & timer_ref : group->get_timer_ptrs()) {
auto timer = timer_ref.lock();
if (timer && timer->is_ready()) {
any_exec->timer = timer;
any_exec->callback_group = group;
any_exec.timer = timer;
any_exec.callback_group = group;
node = get_node_by_group(group);
return;
}
Expand All @@ -533,67 +539,69 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
}
}

AnyExecutable::SharedPtr
Executor::get_next_ready_executable()
bool
Executor::get_next_ready_executable(AnyExecutable & any_executable)
{
auto any_exec = memory_strategy_->instantiate_next_executable();
// Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec);
if (any_exec->timer) {
return any_exec;
get_next_timer(any_executable);
if (any_executable.timer) {
return true;
}
// Check the subscriptions to see if there are any that are ready
memory_strategy_->get_next_subscription(any_exec, weak_nodes_);
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
memory_strategy_->get_next_subscription(any_executable, weak_nodes_);
if (any_executable.subscription || any_executable.subscription_intra_process) {
return true;
}
// Check the services to see if there are any that are ready
memory_strategy_->get_next_service(any_exec, weak_nodes_);
if (any_exec->service) {
return any_exec;
memory_strategy_->get_next_service(any_executable, weak_nodes_);
if (any_executable.service) {
return true;
}
// Check the clients to see if there are any that are ready
memory_strategy_->get_next_client(any_exec, weak_nodes_);
if (any_exec->client) {
return any_exec;
memory_strategy_->get_next_client(any_executable, weak_nodes_);
if (any_executable.client) {
return true;
}
// If there is no ready executable, return a null ptr
return nullptr;
return false;
}

AnyExecutable::SharedPtr
Executor::get_next_executable(std::chrono::nanoseconds timeout)
bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
auto any_exec = get_next_ready_executable();
success = get_next_ready_executable(any_executable);
// If there are none
if (!any_exec) {
if (!success) {
// Wait for subscriptions or timers to work on
wait_for_work(timeout);
if (!spinning.load()) {
return nullptr;
return false;
}
// Try again
any_exec = get_next_ready_executable();
success = get_next_ready_executable(any_executable);
}
// At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (any_exec) {
if (success) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
if (any_exec->callback_group && any_exec->callback_group->type() == \
callback_group::CallbackGroupType::MutuallyExclusive)
using callback_group::CallbackGroupType;
if (
any_executable.callback_group &&
any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_exec->callback_group->can_be_taken_from().load());
assert(any_executable.callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group
// This is reset to true either when the any_exec is executed or when the
// any_exec is destructued
any_exec->callback_group->can_be_taken_from().store(false);
any_executable.callback_group->can_be_taken_from().store(false);
}
}
return any_exec;
return success;
}

std::ostream &
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ void
MultiThreadedExecutor::run(size_t)
{
while (rclcpp::ok() && spinning.load()) {
executor::AnyExecutable::SharedPtr any_exec;
executor::AnyExecutable any_exec;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
if (!rclcpp::ok() || !spinning.load()) {
return;
}
any_exec = get_next_executable();
if (!get_next_executable(any_exec)) {
continue;
}
}
execute_any_executable(any_exec);
}
Expand Down
Loading