From 05ef9b5c45b9c497c2e9fe4132aef35e48cd30c1 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Mon, 16 Apr 2018 17:20:41 -0500 Subject: [PATCH 1/2] pass AnyExecutable objects as reference to avoid memory allocation --- rclcpp/include/rclcpp/any_executable.hpp | 2 - rclcpp/include/rclcpp/executor.hpp | 14 ++- rclcpp/include/rclcpp/memory_strategy.hpp | 10 +- .../strategies/allocator_memory_strategy.hpp | 39 +++---- rclcpp/src/rclcpp/executor.cpp | 108 ++++++++++-------- .../executors/multi_threaded_executor.cpp | 6 +- .../executors/single_threaded_executor.cpp | 7 +- 7 files changed, 91 insertions(+), 95 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 4820b4b8d4..3f810134ee 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -33,8 +33,6 @@ namespace executor struct AnyExecutable { - RCLCPP_SMART_PTR_DEFINITIONS(AnyExecutable) - RCLCPP_PUBLIC AnyExecutable(); diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3059a549e8..657529f404 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -287,7 +287,7 @@ class Executor */ RCLCPP_PUBLIC void - execute_any_executable(AnyExecutable::SharedPtr any_exec); + execute_any_executable(AnyExecutable & any_exec); RCLCPP_PUBLIC static void @@ -325,15 +325,17 @@ class Executor RCLCPP_PUBLIC void - get_next_timer(AnyExecutable::SharedPtr any_exec); + get_next_timer(AnyExecutable & any_exec); RCLCPP_PUBLIC - AnyExecutable::SharedPtr - get_next_ready_executable(); + bool + get_next_ready_executable(AnyExecutable & any_executable); RCLCPP_PUBLIC - AnyExecutable::SharedPtr - get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + bool + get_next_executable( + AnyExecutable & any_executable, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index da1b160888..aa88913692 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -55,27 +55,23 @@ class RCLCPP_PUBLIC MemoryStrategy virtual void clear_handles() = 0; virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0; - /// Provide a newly initialized AnyExecutable object. - // \return Shared pointer to the fresh executable. - virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; - virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0; virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0; virtual void get_next_subscription( - rclcpp::executor::AnyExecutable::SharedPtr any_exec, + rclcpp::executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) = 0; virtual void get_next_service( - rclcpp::executor::AnyExecutable::SharedPtr any_exec, + rclcpp::executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) = 0; virtual void get_next_client( - rclcpp::executor::AnyExecutable::SharedPtr any_exec, + rclcpp::executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) = 0; virtual rcl_allocator_t diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index ef820d6c0b..1b01681251 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -48,21 +48,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy public: RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy) - using ExecAllocTraits = allocator::AllocRebind; - using ExecAlloc = typename ExecAllocTraits::allocator_type; - using ExecDeleter = allocator::Deleter; using VoidAllocTraits = typename allocator::AllocRebind; using VoidAlloc = typename VoidAllocTraits::allocator_type; explicit AllocatorMemoryStrategy(std::shared_ptr allocator) { - executable_allocator_ = std::make_shared(*allocator.get()); allocator_ = std::make_shared(*allocator.get()); } AllocatorMemoryStrategy() { - executable_allocator_ = std::make_shared(); allocator_ = std::make_shared(); } @@ -235,16 +230,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy return true; } - /// Provide a newly initialized AnyExecutable object. - // \return Shared pointer to the fresh executable. - executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - return std::allocate_shared(*executable_allocator_.get()); - } - virtual void get_next_subscription( - executor::AnyExecutable::SharedPtr any_exec, + executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) { auto it = subscription_handles_.begin(); @@ -272,12 +260,12 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } // Otherwise it is safe to set and return the any_exec if (is_intra_process) { - any_exec->subscription_intra_process = subscription; + any_exec.subscription_intra_process = subscription; } else { - any_exec->subscription = subscription; + any_exec.subscription = subscription; } - any_exec->callback_group = group; - any_exec->node_base = get_node_by_group(group, weak_nodes); + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); subscription_handles_.erase(it); return; } @@ -288,7 +276,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy virtual void get_next_service( - executor::AnyExecutable::SharedPtr any_exec, + executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) { auto it = service_handles_.begin(); @@ -310,9 +298,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy continue; } // Otherwise it is safe to set and return the any_exec - any_exec->service = service; - any_exec->callback_group = group; - any_exec->node_base = get_node_by_group(group, weak_nodes); + any_exec.service = service; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); service_handles_.erase(it); return; } @@ -322,7 +310,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } virtual void - get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) + get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) { auto it = client_handles_.begin(); while (it != client_handles_.end()) { @@ -343,9 +331,9 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy continue; } // Otherwise it is safe to set and return the any_exec - any_exec->client = client; - any_exec->callback_group = group; - any_exec->node_base = get_node_by_group(group, weak_nodes); + any_exec.client = client; + any_exec.callback_group = group; + any_exec.node_base = get_node_by_group(group, weak_nodes); client_handles_.erase(it); return; } @@ -396,7 +384,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy VectorRebind> client_handles_; VectorRebind> timer_handles_; - std::shared_ptr executable_allocator_; std::shared_ptr allocator_; }; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index ae2a34d4c8..61da7d0f4e 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -206,9 +206,13 @@ Executor::spin_some() throw std::runtime_error("spin_some() called while already spinning"); } RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); - AnyExecutable::SharedPtr any_exec; - while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) { - execute_any_executable(any_exec); + while (spinning.load()) { + AnyExecutable any_exec; + if (get_next_executable(any_exec, std::chrono::milliseconds::zero())) { + execute_any_executable(any_exec); + } else { + break; + } } } @@ -218,9 +222,11 @@ Executor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true)) { throw std::runtime_error("spin_once() called while already spinning"); } - RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); - auto any_exec = get_next_executable(timeout); - if (any_exec) { + RCLCPP_SCOPE_EXIT({ + this->spinning.store(false); + }); + AnyExecutable any_exec; + if (get_next_executable(any_exec, timeout)) { execute_any_executable(any_exec); } } @@ -244,28 +250,28 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr } void -Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) +Executor::execute_any_executable(AnyExecutable & any_exec) { - if (!any_exec || !spinning.load()) { + if (!spinning.load()) { return; } - if (any_exec->timer) { - execute_timer(any_exec->timer); + if (any_exec.timer) { + execute_timer(any_exec.timer); } - if (any_exec->subscription) { - execute_subscription(any_exec->subscription); + if (any_exec.subscription) { + execute_subscription(any_exec.subscription); } - if (any_exec->subscription_intra_process) { - execute_intra_process_subscription(any_exec->subscription_intra_process); + if (any_exec.subscription_intra_process) { + execute_intra_process_subscription(any_exec.subscription_intra_process); } - if (any_exec->service) { - execute_service(any_exec->service); + if (any_exec.service) { + execute_service(any_exec.service); } - if (any_exec->client) { - execute_client(any_exec->client); + if (any_exec.client) { + execute_client(any_exec.client); } // Reset the callback_group, regardless of type - any_exec->callback_group->can_be_taken_from().store(true); + any_exec.callback_group->can_be_taken_from().store(true); // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { @@ -508,7 +514,7 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) } void -Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) +Executor::get_next_timer(AnyExecutable & any_exec) { for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -523,8 +529,8 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) for (auto & timer_ref : group->get_timer_ptrs()) { auto timer = timer_ref.lock(); if (timer && timer->is_ready()) { - any_exec->timer = timer; - any_exec->callback_group = group; + any_exec.timer = timer; + any_exec.callback_group = group; node = get_node_by_group(group); return; } @@ -533,67 +539,69 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) } } -AnyExecutable::SharedPtr -Executor::get_next_ready_executable() +bool +Executor::get_next_ready_executable(AnyExecutable & any_executable) { - auto any_exec = memory_strategy_->instantiate_next_executable(); // Check the timers to see if there are any that are ready, if so return - get_next_timer(any_exec); - if (any_exec->timer) { - return any_exec; + get_next_timer(any_executable); + if (any_executable.timer) { + return true; } // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_exec, weak_nodes_); - if (any_exec->subscription || any_exec->subscription_intra_process) { - return any_exec; + memory_strategy_->get_next_subscription(any_executable, weak_nodes_); + if (any_executable.subscription || any_executable.subscription_intra_process) { + return true; } // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_exec, weak_nodes_); - if (any_exec->service) { - return any_exec; + memory_strategy_->get_next_service(any_executable, weak_nodes_); + if (any_executable.service) { + return true; } // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_exec, weak_nodes_); - if (any_exec->client) { - return any_exec; + memory_strategy_->get_next_client(any_executable, weak_nodes_); + if (any_executable.client) { + return true; } // If there is no ready executable, return a null ptr - return nullptr; + return false; } -AnyExecutable::SharedPtr -Executor::get_next_executable(std::chrono::nanoseconds timeout) +bool +Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout) { + bool success = false; // Check to see if there are any subscriptions or timers needing service // TODO(wjwwood): improve run to run efficiency of this function - auto any_exec = get_next_ready_executable(); + success = get_next_ready_executable(any_executable); // If there are none - if (!any_exec) { + if (!success) { // Wait for subscriptions or timers to work on wait_for_work(timeout); if (!spinning.load()) { - return nullptr; + return false; } // Try again - any_exec = get_next_ready_executable(); + success = get_next_ready_executable(any_executable); } // At this point any_exec should be valid with either a valid subscription // or a valid timer, or it should be a null shared_ptr - if (any_exec) { + if (success) { // If it is valid, check to see if the group is mutually exclusive or // not, then mark it accordingly - if (any_exec->callback_group && any_exec->callback_group->type() == \ - callback_group::CallbackGroupType::MutuallyExclusive) + using callback_group::CallbackGroupType; + if ( + any_executable.callback_group && + any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise - assert(any_exec->callback_group->can_be_taken_from().load()); + assert(any_executable.callback_group->can_be_taken_from().load()); // Set to false to indicate something is being run from this group // This is reset to true either when the any_exec is executed or when the // any_exec is destructued - any_exec->callback_group->can_be_taken_from().store(false); + any_executable.callback_group->can_be_taken_from().store(false); } } - return any_exec; + return success; } std::ostream & diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index d129f6104d..4752bb8bb8 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -69,13 +69,15 @@ void MultiThreadedExecutor::run(size_t) { while (rclcpp::ok() && spinning.load()) { - executor::AnyExecutable::SharedPtr any_exec; + executor::AnyExecutable any_exec; { std::lock_guard wait_lock(wait_mutex_); if (!rclcpp::ok() || !spinning.load()) { return; } - any_exec = get_next_executable(); + if (!get_next_executable(any_exec)) { + continue; + } } execute_any_executable(any_exec); } diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 291560d13c..0a2b697b09 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include "rclcpp/executors/single_threaded_executor.hpp" +#include "rclcpp/any_executable.hpp" #include "rclcpp/scope_exit.hpp" using rclcpp::executors::SingleThreadedExecutor; @@ -30,7 +31,9 @@ SingleThreadedExecutor::spin() } RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); while (rclcpp::ok() && spinning.load()) { - auto any_exec = get_next_executable(); - execute_any_executable(any_exec); + rclcpp::executor::AnyExecutable any_executable; + if (get_next_executable(any_executable)) { + execute_any_executable(any_executable); + } } } From be2d76e6183de0dae3325905fb09ccc1d22cb074 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Tue, 17 Apr 2018 21:53:15 -0500 Subject: [PATCH 2/2] remove style change --- rclcpp/src/rclcpp/executor.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 61da7d0f4e..5cd79e9f1b 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -222,9 +222,7 @@ Executor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true)) { throw std::runtime_error("spin_once() called while already spinning"); } - RCLCPP_SCOPE_EXIT({ - this->spinning.store(false); - }); + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); AnyExecutable any_exec; if (get_next_executable(any_exec, timeout)) { execute_any_executable(any_exec);