From c6d7db176e0ea51d2a74b3db279fdb7c348c8349 Mon Sep 17 00:00:00 2001 From: Guillaume Autran Date: Wed, 24 Jul 2019 15:41:34 -0400 Subject: [PATCH] Crash in callback group pointer vector iterator 1. Memory leak in callback group weak reference vectors There is leak in the callback group weak reference vectors that occurs when a weak reference becomes invalid (subscription is dropped, service deleted, etc). The now obsolete weak pointer reference is never deleted from the callback group pointer vector causing the leak. The resolution of this problem is implemented by scanning and deleting expired weak pointer at the time of insertion of a new weak pointer into the callback group vectors. This approach is the lowest computational cost to purging obsolete weak pointers. 2. Crash in iterator for callback group pointer vectors This problem exists because a reference to the callback group pointer vector is provided as a return value to facilitate loop iterator. This is a significant crash root cause with a multithreaded executor where a thread is able to add new subscription to the callback group. The crash is caused by a concurrent modification of the weak pointer vector while another thread is iterating over that same vector resulting in a crash. 3. Mutually exclusive callback group hangs The root cause for this issue is due to a combination between the multithreaded executor and the mutually exclusive callback group used for all the ROS topics. When the executor collects all the references to the subscriptions, timers and services, it skips the mutually exclusive callback_groups which are currently locked (ie: being processed by another thread). This cause the resulting waitset to only contain the guard pointers. If there is no activity on those guards, the thread will wait for work forever in the get_next_executable and block all other threads. The resolution is to simply add a timeout for the multithreaded get_next_executable call ensuring the calling thread will eventually return Testing: These changes where implemented and tested using a test application which creates / publish / deletes thousands of topics (up to 100,000) from a separate standalone thread while the ROS2 layer is receiving data on the topics deleted. The muiltithreaded was setup to contain 10 separate executor threads on a single mutually exclusive callback group containing thousands of topics. issue: #813 Signed-off-by: Guillaume Autran --- rclcpp/include/rclcpp/callback_group.hpp | 68 ++++++++++----- .../executors/multi_threaded_executor.hpp | 5 +- .../strategies/allocator_memory_strategy.hpp | 41 ++++----- rclcpp/src/rclcpp/callback_group.cpp | 64 +++++++------- rclcpp/src/rclcpp/executor.cpp | 28 ++++--- .../executors/multi_threaded_executor.cpp | 9 +- rclcpp/src/rclcpp/memory_strategy.cpp | 84 ++++++++++--------- 7 files changed, 164 insertions(+), 135 deletions(-) diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 57319b2c23..a1b7a1229f 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -62,25 +62,40 @@ class CallbackGroup RCLCPP_PUBLIC explicit CallbackGroup(CallbackGroupType group_type); - RCLCPP_PUBLIC - const std::vector & - get_subscription_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_timer_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_service_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_client_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_waitable_ptrs() const; + template + rclcpp::SubscriptionBase::SharedPtr + find_subscription_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, subscription_ptrs_); + } + + template + rclcpp::TimerBase::SharedPtr + find_timer_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, timer_ptrs_); + } + + template + rclcpp::ServiceBase::SharedPtr + find_service_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, service_ptrs_); + } + + template + rclcpp::ClientBase::SharedPtr + find_client_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, client_ptrs_); + } + + template + rclcpp::Waitable::SharedPtr + find_waitable_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, waitable_ptrs_); + } RCLCPP_PUBLIC std::atomic_bool & @@ -130,6 +145,21 @@ class CallbackGroup std::vector client_ptrs_; std::vector waitable_ptrs_; std::atomic_bool can_be_taken_from_; + +private: + template + typename TypeT::SharedPtr _find_ptrs_if_impl( + Function func, const std::vector & vect_ptrs) const + { + std::lock_guard lock(mutex_); + for (auto & weak_ptr : vect_ptrs) { + auto ref_ptr = weak_ptr.lock(); + if (ref_ptr && func(ref_ptr)) { + return ref_ptr; + } + } + return typename TypeT::SharedPtr(); + } }; } // namespace callback_group diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 72df5dd82a..316c016a45 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#include #include #include #include @@ -53,7 +54,8 @@ class MultiThreadedExecutor : public executor::Executor MultiThreadedExecutor( const executor::ExecutorArgs & args = executor::ExecutorArgs(), size_t number_of_threads = 0, - bool yield_before_execute = false); + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -77,6 +79,7 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; + std::chrono::nanoseconds next_exec_timeout_; std::set scheduled_timers_; }; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 61d5b74097..b70d849347 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -164,40 +164,31 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { subscription_handles_.push_back(subscription->get_subscription_handle()); if (subscription->get_intra_process_subscription_handle()) { subscription_handles_.push_back( subscription->get_intra_process_subscription_handle()); } - } - } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service) { + return false; + }); + group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) { service_handles_.push_back(service->get_service_handle()); - } - } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client) { + return false; + }); + group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) { client_handles_.push_back(client->get_client_handle()); - } - } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto timer = weak_timer.lock(); - if (timer) { + return false; + }); + group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) { timer_handles_.push_back(timer->get_timer_handle()); - } - } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto waitable = weak_waitable.lock(); - if (waitable) { + return false; + }); + group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) { waitable_handles_.push_back(waitable); - } - } + return false; + }); } } return has_invalid_weak_nodes; diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b812afc516..e9d0e80c48 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type) : type_(group_type), can_be_taken_from_(true) {} -const std::vector & -CallbackGroup::get_subscription_ptrs() const -{ - std::lock_guard lock(mutex_); - return subscription_ptrs_; -} - -const std::vector & -CallbackGroup::get_timer_ptrs() const -{ - std::lock_guard lock(mutex_); - return timer_ptrs_; -} - -const std::vector & -CallbackGroup::get_service_ptrs() const -{ - std::lock_guard lock(mutex_); - return service_ptrs_; -} - -const std::vector & -CallbackGroup::get_client_ptrs() const -{ - std::lock_guard lock(mutex_); - return client_ptrs_; -} - -const std::vector & -CallbackGroup::get_waitable_ptrs() const -{ - std::lock_guard lock(mutex_); - return waitable_ptrs_; -} std::atomic_bool & CallbackGroup::can_be_taken_from() @@ -76,6 +42,12 @@ CallbackGroup::add_subscription( { std::lock_guard lock(mutex_); subscription_ptrs_.push_back(subscription_ptr); + subscription_ptrs_.erase( + std::remove_if( + subscription_ptrs_.begin(), + subscription_ptrs_.end(), + [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}), + subscription_ptrs_.end()); } void @@ -83,6 +55,12 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr) { std::lock_guard lock(mutex_); timer_ptrs_.push_back(timer_ptr); + timer_ptrs_.erase( + std::remove_if( + timer_ptrs_.begin(), + timer_ptrs_.end(), + [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}), + timer_ptrs_.end()); } void @@ -90,6 +68,12 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr) { std::lock_guard lock(mutex_); service_ptrs_.push_back(service_ptr); + service_ptrs_.erase( + std::remove_if( + service_ptrs_.begin(), + service_ptrs_.end(), + [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}), + service_ptrs_.end()); } void @@ -97,6 +81,12 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr) { std::lock_guard lock(mutex_); client_ptrs_.push_back(client_ptr); + client_ptrs_.erase( + std::remove_if( + client_ptrs_.begin(), + client_ptrs_.end(), + [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}), + client_ptrs_.end()); } void @@ -104,6 +94,12 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) { std::lock_guard lock(mutex_); waitable_ptrs_.push_back(waitable_ptr); + waitable_ptrs_.erase( + std::remove_if( + waitable_ptrs_.begin(), + waitable_ptrs_.end(), + [](rclcpp::Waitable::WeakPtr x) {return x.expired();}), + waitable_ptrs_.end()); } void diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 341b8457d2..4820b7f095 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) if (!group) { continue; } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto t = weak_timer.lock(); - if (t == timer) { - return group; - } + auto timer_ref = group->find_timer_ptrs_if( + [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { + return timer_ptr == timer; + }); + if (timer_ref) { + return group; } } } @@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec) if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & timer_ref : group->get_timer_ptrs()) { - auto timer = timer_ref.lock(); - if (timer && timer->is_ready()) { - any_exec.timer = timer; - any_exec.callback_group = group; - any_exec.node_base = node; - return; - } + auto timer_ref = group->find_timer_ptrs_if( + [](const rclcpp::TimerBase::SharedPtr & timer) -> bool { + return timer->is_ready(); + }); + if (timer_ref) { + any_exec.timer = timer_ref; + any_exec.callback_group = group; + any_exec.node_base = node; + return; } } } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 24c5c79b2c..cc34d09d87 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::executor::ExecutorArgs & args, size_t number_of_threads, - bool yield_before_execute) -: executor::Executor(args), yield_before_execute_(yield_before_execute) + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: executor::Executor(args), + yield_before_execute_(yield_before_execute), + next_exec_timeout_(next_exec_timeout) { number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency(); if (number_of_threads_ == 0) { @@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t) if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } - if (!get_next_executable(any_exec)) { + if (!get_next_executable(any_exec, next_exec_timeout_)) { continue; } if (any_exec.timer) { diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index 01b6226d3d..0a886e4a39 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -32,16 +32,14 @@ MemoryStrategy::get_subscription_by_handle( if (!group) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle() == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() == subscriber_handle) { - return subscription; - } - } + auto match_subscription = group->find_subscription_ptrs_if( + [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr & subscription) -> bool { + return + (subscription->get_subscription_handle() == subscriber_handle) || + (subscription->get_intra_process_subscription_handle() == subscriber_handle); + }); + if (match_subscription) { + return match_subscription; } } } @@ -63,11 +61,12 @@ MemoryStrategy::get_service_by_handle( if (!group) { continue; } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service && service->get_service_handle() == service_handle) { - return service; - } + auto service_ref = group->find_service_ptrs_if( + [&service_handle](const rclcpp::ServiceBase::SharedPtr & service) -> bool { + return service->get_service_handle() == service_handle; + }); + if (service_ref) { + return service_ref; } } } @@ -89,11 +88,12 @@ MemoryStrategy::get_client_by_handle( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client && client->get_client_handle() == client_handle) { - return client; - } + auto client_ref = group->find_client_ptrs_if( + [&client_handle](const rclcpp::ClientBase::SharedPtr & client) -> bool { + return client->get_client_handle() == client_handle; + }); + if (client_ref) { + return client_ref; } } } @@ -138,11 +138,12 @@ MemoryStrategy::get_group_by_subscription( if (!group) { continue; } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } + auto match_sub = group->find_subscription_ptrs_if( + [&subscription](const rclcpp::SubscriptionBase::SharedPtr & sub) -> bool { + return sub == subscription; + }); + if (match_sub) { + return group; } } } @@ -164,11 +165,12 @@ MemoryStrategy::get_group_by_service( if (!group) { continue; } - for (auto & weak_serv : group->get_service_ptrs()) { - auto serv = weak_serv.lock(); - if (serv && serv == service) { - return group; - } + auto service_ref = group->find_service_ptrs_if( + [&service](const rclcpp::ServiceBase::SharedPtr & serv) -> bool { + return serv == service; + }); + if (service_ref) { + return group; } } } @@ -190,11 +192,12 @@ MemoryStrategy::get_group_by_client( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto cli = weak_client.lock(); - if (cli && cli == client) { - return group; - } + auto client_ref = group->find_client_ptrs_if( + [&client](const rclcpp::ClientBase::SharedPtr & cli) -> bool { + return cli == client; + }); + if (client_ref) { + return group; } } } @@ -216,11 +219,12 @@ MemoryStrategy::get_group_by_waitable( if (!group) { continue; } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto group_waitable = weak_waitable.lock(); - if (group_waitable && group_waitable == waitable) { - return group; - } + auto waitable_ref = group->find_waitable_ptrs_if( + [&waitable](const rclcpp::Waitable::SharedPtr & group_waitable) -> bool { + return group_waitable == waitable; + }); + if (waitable_ref) { + return group; } } }