From 6dd9155b4f6937fcf08d581e76fb9f79feeed798 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 6 Jan 2021 12:16:31 -0300 Subject: [PATCH 1/6] Allow to add/remove nodes thread safely in rclcpp::Executor Signed-off-by: Ivan Santiago Paunovic --- rclcpp/include/rclcpp/executor.hpp | 24 +++++++++++++++--------- rclcpp/src/rclcpp/executor.cpp | 17 ++++++++++++++--- rclcpp/test/rclcpp/test_executor.cpp | 1 + 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7d93595735..ef9d2be2fb 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -476,7 +476,7 @@ class Executor rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true); + bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); /// Remove a callback group from the executor. /** @@ -487,7 +487,7 @@ class Executor remove_callback_group_from_map( rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, - bool notify = true); + bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_); RCLCPP_PUBLIC bool @@ -532,10 +532,11 @@ class Executor rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); // Mutex to protect the subsequent memory_strategy_. - std::mutex memory_strategy_mutex_; + mutable std::mutex mutex_; /// The memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + memory_strategy::MemoryStrategy::SharedPtr + memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_); /// The context associated with this executor. std::shared_ptr context_; @@ -552,19 +553,24 @@ class Executor WeakNodesToGuardConditionsMap; /// maps nodes to guard conditions - WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + WeakNodesToGuardConditionsMap + weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps callback groups associated to nodes - WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + WeakCallbackGroupsToNodesMap + weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps callback groups to nodes associated with executor - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + WeakCallbackGroupsToNodesMap + weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// maps all callback groups to nodes - WeakCallbackGroupsToNodesMap weak_groups_to_nodes_; + WeakCallbackGroupsToNodesMap + weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); /// nodes that are associated with the executor - std::list weak_nodes_; + std::list + weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); }; namespace executor diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 3b02705b1b..e9e371c241 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -144,6 +144,7 @@ std::vector Executor::get_all_callback_groups() { std::vector groups; + std::lock_guard guard{mutex_}; for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { groups.push_back(group_node_ptr.first); } @@ -157,6 +158,7 @@ std::vector Executor::get_manually_added_callback_groups() { std::vector groups; + std::lock_guard guard{mutex_}; for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { groups.push_back(group_node_ptr.first); } @@ -167,6 +169,7 @@ std::vector Executor::get_automatically_added_callback_groups_from_nodes() { std::vector groups; + std::lock_guard guard{mutex_}; for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { groups.push_back(group_node_ptr.first); } @@ -176,6 +179,7 @@ Executor::get_automatically_added_callback_groups_from_nodes() void Executor::add_callback_groups_from_nodes_associated_to_executor() { + std::lock_guard guard{mutex_}; for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node) { @@ -233,7 +237,6 @@ Executor::add_callback_group_to_map( } } // Add the node's notify condition to the guard condition handles - std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); } } @@ -244,6 +247,7 @@ Executor::add_callback_group( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { + std::lock_guard guard{mutex_}; this->add_callback_group_to_map( group_ptr, node_ptr, @@ -259,6 +263,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt if (has_executor.exchange(true)) { throw std::runtime_error("Node has already been added to an executor."); } + std::lock_guard guard{mutex_}; for (auto & weak_group : node_ptr->get_callback_groups()) { auto group_ptr = weak_group.lock(); if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && @@ -307,7 +312,6 @@ Executor::remove_callback_group_from_map( throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove"); } } - std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } } @@ -317,6 +321,7 @@ Executor::remove_callback_group( rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) { + std::lock_guard guard{mutex_}; this->remove_callback_group_from_map( group_ptr, weak_groups_associated_with_executor_to_nodes_, @@ -336,6 +341,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node throw std::runtime_error("Node needs to be associated with an executor."); } + std::lock_guard guard{mutex_}; bool found_node = false; auto node_it = weak_nodes_.begin(); while (node_it != weak_nodes_.end()) { @@ -489,6 +495,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr if (memory_strategy == nullptr) { throw std::runtime_error("Received NULL memory strategy in executor."); } + std::lock_guard guard{mutex_}; memory_strategy_ = memory_strategy; } @@ -662,7 +669,7 @@ void Executor::wait_for_work(std::chrono::nanoseconds timeout) { { - std::unique_lock lock(memory_strategy_mutex_); + std::lock_guard guard(mutex_); // Check weak_nodes_ to find any callback group that is not owned // by an executor and add it to the list of callbackgroups for @@ -730,6 +737,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rcl_ret_t status = rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); + if (status == RCL_RET_WAIT_SET_EMPTY) { RCUTILS_LOG_WARN_NAMED( "rclcpp", @@ -741,6 +749,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // check the null handles in the wait set and remove them from the handles in memory strategy // for callback-based entities + std::lock_guard guard(mutex_); memory_strategy_->remove_null_handles(&wait_set_); } @@ -764,6 +773,7 @@ Executor::get_node_by_group( rclcpp::CallbackGroup::SharedPtr Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) { + std::lock_guard guard{mutex_}; for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { auto group = pair.first.lock(); if (!group) { @@ -807,6 +817,7 @@ Executor::get_next_ready_executable_from_map( rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) { bool success = false; + std::lock_guard guard{mutex_}; // Check the timers to see if there are any that are ready memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); if (any_executable.timer) { diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index ee108fe1ec..019580d8d4 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -54,6 +54,7 @@ class DummyExecutor : public rclcpp::Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( rclcpp::CallbackGroup::SharedPtr group) { + std::lock_guard guard_{mutex_}; // only to make happy the TSA return get_node_by_group(weak_groups_to_nodes_, group); } From bf6fba04f87aba4e0fceaf93583b2ff6f1ca2678 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 6 Jan 2021 13:11:53 -0300 Subject: [PATCH 2/6] fixes Signed-off-by: Ivan Santiago Paunovic --- rclcpp/include/rclcpp/executor.hpp | 2 +- rclcpp/src/rclcpp/executor.cpp | 1 - rclcpp/test/rclcpp/test_executor.cpp | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index ef9d2be2fb..b99787cc68 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -518,7 +518,7 @@ class Executor */ RCLCPP_PUBLIC virtual void - add_callback_groups_from_nodes_associated_to_executor(); + add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_); /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index e9e371c241..8e9870ce5a 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -179,7 +179,6 @@ Executor::get_automatically_added_callback_groups_from_nodes() void Executor::add_callback_groups_from_nodes_associated_to_executor() { - std::lock_guard guard{mutex_}; for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node) { diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index 019580d8d4..c6d976816b 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -54,7 +54,7 @@ class DummyExecutor : public rclcpp::Executor rclcpp::node_interfaces::NodeBaseInterface::SharedPtr local_get_node_by_group( rclcpp::CallbackGroup::SharedPtr group) { - std::lock_guard guard_{mutex_}; // only to make happy the TSA + std::lock_guard guard_{mutex_}; // only to make the TSA happy return get_node_by_group(weak_groups_to_nodes_, group); } From ba30c0dd07c9d8a6ae3114fb5e84bd3d7c932e5d Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 6 Jan 2021 13:49:18 -0300 Subject: [PATCH 3/6] Add test Signed-off-by: Ivan Santiago Paunovic --- rclcpp/test/rclcpp/test_executor.cpp | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index c6d976816b..eec6d6d2e8 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -21,6 +21,7 @@ #include "rclcpp/executor.hpp" #include "rclcpp/memory_strategy.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/strategies/allocator_memory_strategy.hpp" #include "../mocking_utils/patch.hpp" @@ -88,6 +89,35 @@ MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, !=) MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, <) MOCKING_UTILS_BOOL_OPERATOR_RETURNS_FALSE(rcutils_allocator_t, >) +TEST_F(TestExecutor, add_remove_node_thread_safe) { + using namespace std::chrono_literals; + + // Create an Executor + rclcpp::executors::SingleThreadedExecutor executor; + + auto future = std::async(std::launch::async, [&executor] {executor.spin();}); + + // Add and remove nodes repeatedly + // Test that this does not cause a segfault + size_t num_nodes = 100; + for (size_t i = 0; i < num_nodes; ++i) { + std::ostringstream name; + name << "node_" << i; + auto node = std::make_shared(name.str()); + executor.add_node(node); + // Sleeping here helps exaggerate the issue + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + executor.remove_node(node); + } + std::future_status future_status = std::future_status::timeout; + do { + executor.cancel(); + future_status = future.wait_for(1s); + } while (future_status == std::future_status::timeout); + EXPECT_EQ(future_status, std::future_status::ready); + future.get(); +} + TEST_F(TestExecutor, constructor_bad_guard_condition_init) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_guard_condition_init, RCL_RET_ERROR); From 44524078585283875a7a7cf434f3f601e5f98af6 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Wed, 6 Jan 2021 17:01:34 -0300 Subject: [PATCH 4/6] Delete extra new line Signed-off-by: Ivan Santiago Paunovic --- rclcpp/src/rclcpp/executor.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 8e9870ce5a..3f63193400 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -736,7 +736,6 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rcl_ret_t status = rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); - if (status == RCL_RET_WAIT_SET_EMPTY) { RCUTILS_LOG_WARN_NAMED( "rclcpp", From 1972f0f3ec56d3fcb687ced4463f82a66e070e99 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Fri, 8 Jan 2021 18:42:47 -0300 Subject: [PATCH 5/6] Pass std::map by reference, don't make copies Signed-off-by: Ivan Santiago Paunovic --- rclcpp/include/rclcpp/executor.hpp | 6 +++--- rclcpp/src/rclcpp/executor.cpp | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index b99787cc68..946aa18851 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -448,7 +448,7 @@ class Executor RCLCPP_PUBLIC rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group( - WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group); /// Return true if the node has been added to this executor. @@ -460,7 +460,7 @@ class Executor bool has_node( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr @@ -497,7 +497,7 @@ class Executor bool get_next_ready_executable_from_map( AnyExecutable & any_executable, - WeakCallbackGroupsToNodesMap weak_groups_to_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); RCLCPP_PUBLIC bool diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 3f63193400..768f7dd95a 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -753,7 +753,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rclcpp::node_interfaces::NodeBaseInterface::SharedPtr Executor::get_node_by_group( - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group) { if (!group) { @@ -812,7 +812,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) bool Executor::get_next_ready_executable_from_map( AnyExecutable & any_executable, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { bool success = false; std::lock_guard guard{mutex_}; @@ -902,7 +902,7 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos bool Executor::has_node( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const { return std::find_if( weak_groups_to_nodes.begin(), From f286348db9f54e2c3312f64be141f81fa72aca14 Mon Sep 17 00:00:00 2001 From: Ivan Santiago Paunovic Date: Mon, 11 Jan 2021 17:02:38 -0300 Subject: [PATCH 6/6] linters Signed-off-by: Ivan Santiago Paunovic --- rclcpp/src/rclcpp/executor.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 768f7dd95a..d33df58ee2 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -753,7 +753,8 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rclcpp::node_interfaces::NodeBaseInterface::SharedPtr Executor::get_node_by_group( - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group) { if (!group) { @@ -812,7 +813,8 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) bool Executor::get_next_ready_executable_from_map( AnyExecutable & any_executable, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) { bool success = false; std::lock_guard guard{mutex_}; @@ -902,7 +904,8 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos bool Executor::has_node( const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, - const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) const { return std::find_if( weak_groups_to_nodes.begin(),