diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index a87a2d0b1a..8471754f30 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -364,7 +364,8 @@ class Executor private: RCLCPP_DISABLE_COPY(Executor) - std::vector weak_nodes_; + std::list weak_nodes_; + std::list guard_conditions_; }; } // namespace executor diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 70db12237a..9b367ec529 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -15,8 +15,8 @@ #ifndef RCLCPP__MEMORY_STRATEGY_HPP_ #define RCLCPP__MEMORY_STRATEGY_HPP_ +#include #include -#include #include "rcl/allocator.h" #include "rcl/wait.h" @@ -42,11 +42,11 @@ class RCLCPP_PUBLIC MemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy) - using WeakNodeVector = std::vector; + using WeakNodeList = std::list; virtual ~MemoryStrategy() = default; - virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; + virtual bool collect_entities(const WeakNodeList & weak_nodes) = 0; virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_services() const = 0; @@ -67,22 +67,22 @@ class RCLCPP_PUBLIC MemoryStrategy virtual void get_next_subscription( rclcpp::executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) = 0; + const WeakNodeList & weak_nodes) = 0; virtual void get_next_service( rclcpp::executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) = 0; + const WeakNodeList & weak_nodes) = 0; virtual void get_next_client( rclcpp::executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) = 0; + const WeakNodeList & weak_nodes) = 0; virtual void get_next_waitable( rclcpp::executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) = 0; + const WeakNodeList & weak_nodes) = 0; virtual rcl_allocator_t get_allocator() = 0; @@ -90,42 +90,42 @@ class RCLCPP_PUBLIC MemoryStrategy static rclcpp::SubscriptionBase::SharedPtr get_subscription_by_handle( std::shared_ptr subscriber_handle, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::ServiceBase::SharedPtr get_service_by_handle( std::shared_ptr service_handle, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::ClientBase::SharedPtr get_client_by_handle( std::shared_ptr client_handle, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group( rclcpp::callback_group::CallbackGroup::SharedPtr group, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription( rclcpp::SubscriptionBase::SharedPtr subscription, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_service( rclcpp::ServiceBase::SharedPtr service, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_client( rclcpp::ClientBase::SharedPtr client, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_waitable( rclcpp::Waitable::SharedPtr waitable, - const WeakNodeVector & weak_nodes); + const WeakNodeList & weak_nodes); }; } // namespace memory_strategy diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 8bf54656bc..61d5b74097 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -150,7 +150,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy ); } - bool collect_entities(const WeakNodeVector & weak_nodes) + bool collect_entities(const WeakNodeList & weak_nodes) { bool has_invalid_weak_nodes = false; for (auto & weak_node : weak_nodes) { @@ -265,7 +265,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy virtual void get_next_subscription( executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { auto it = subscription_handles_.begin(); while (it != subscription_handles_.end()) { @@ -309,7 +309,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy virtual void get_next_service( executor::AnyExecutable & any_exec, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { auto it = service_handles_.begin(); while (it != service_handles_.end()) { @@ -342,7 +342,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } virtual void - get_next_client(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) + get_next_client(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes) { auto it = client_handles_.begin(); while (it != client_handles_.end()) { @@ -375,7 +375,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } virtual void - get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeVector & weak_nodes) + get_next_waitable(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes) { auto it = waitable_handles_.begin(); while (it != waitable_handles_.end()) { diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 0caa728af2..2a47b71bbd 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -91,6 +91,10 @@ Executor::~Executor() } } weak_nodes_.clear(); + for (auto & guard_condition : guard_conditions_) { + memory_strategy_->remove_guard_condition(guard_condition); + } + guard_conditions_.clear(); // Finalize the wait set. if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { @@ -128,6 +132,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt } } weak_nodes_.push_back(node_ptr); + guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); if (notify) { // Interrupt waiting to handle new node if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { @@ -148,17 +153,21 @@ void Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { bool node_removed = false; - weak_nodes_.erase( - std::remove_if( - weak_nodes_.begin(), weak_nodes_.end(), - [&](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr & i) - { - bool matched = (i.lock() == node_ptr); - node_removed |= matched; - return matched; + { + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + node_it = weak_nodes_.erase(node_it); + gc_it = guard_conditions_.erase(gc_it); + node_removed = true; + } else { + ++node_it; + ++gc_it; } - ) - ); + } + } std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); has_executor.store(false); if (notify) { @@ -420,15 +429,18 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) { - weak_nodes_.erase( - remove_if( - weak_nodes_.begin(), weak_nodes_.end(), - [](rclcpp::node_interfaces::NodeBaseInterface::WeakPtr i) - { - return i.expired(); - } - ) - ); + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + memory_strategy_->remove_guard_condition(*gc_it); + gc_it = guard_conditions_.erase(gc_it); + } else { + ++node_it; + ++gc_it; + } + } } // clear wait set if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index f77c534657..01b6226d3d 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -20,7 +20,7 @@ using rclcpp::memory_strategy::MemoryStrategy; rclcpp::SubscriptionBase::SharedPtr MemoryStrategy::get_subscription_by_handle( std::shared_ptr subscriber_handle, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -51,7 +51,7 @@ MemoryStrategy::get_subscription_by_handle( rclcpp::ServiceBase::SharedPtr MemoryStrategy::get_service_by_handle( std::shared_ptr service_handle, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -77,7 +77,7 @@ MemoryStrategy::get_service_by_handle( rclcpp::ClientBase::SharedPtr MemoryStrategy::get_client_by_handle( std::shared_ptr client_handle, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -103,7 +103,7 @@ MemoryStrategy::get_client_by_handle( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr MemoryStrategy::get_node_by_group( rclcpp::callback_group::CallbackGroup::SharedPtr group, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { if (!group) { return nullptr; @@ -126,7 +126,7 @@ MemoryStrategy::get_node_by_group( rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_subscription( rclcpp::SubscriptionBase::SharedPtr subscription, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -152,7 +152,7 @@ MemoryStrategy::get_group_by_subscription( rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_service( rclcpp::ServiceBase::SharedPtr service, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -178,7 +178,7 @@ MemoryStrategy::get_group_by_service( rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_client( rclcpp::ClientBase::SharedPtr client, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -204,7 +204,7 @@ MemoryStrategy::get_group_by_client( rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_waitable( rclcpp::Waitable::SharedPtr waitable, - const WeakNodeVector & weak_nodes) + const WeakNodeList & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); diff --git a/rclcpp/test/test_executor.cpp b/rclcpp/test/test_executor.cpp index 40e378b38e..95371415fc 100644 --- a/rclcpp/test/test_executor.cpp +++ b/rclcpp/test/test_executor.cpp @@ -60,3 +60,10 @@ TEST_F(TestExecutors, detachOnDestruction) { EXPECT_NO_THROW(executor.add_node(node)); } } + +// Make sure that the executor can automatically remove expired nodes correctly +TEST_F(TestExecutors, addTemporaryNode) { + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(std::make_shared("temporary_node")); + EXPECT_NO_THROW(executor.spin_some()); +} diff --git a/rclcpp/test/test_find_weak_nodes.cpp b/rclcpp/test/test_find_weak_nodes.cpp index bced853d60..f4e58e8d19 100644 --- a/rclcpp/test/test_find_weak_nodes.cpp +++ b/rclcpp/test/test_find_weak_nodes.cpp @@ -36,15 +36,15 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) { rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); auto existing_node = rclcpp::Node::make_shared("existing_node"); auto dead_node = rclcpp::Node::make_shared("dead_node"); - rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes; + rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes; weak_nodes.push_back(existing_node->get_node_base_interface()); weak_nodes.push_back(dead_node->get_node_base_interface()); // AND // Delete dead_node, creating a dangling pointer in weak_nodes dead_node.reset(); - ASSERT_FALSE(weak_nodes[0].expired()); - ASSERT_TRUE(weak_nodes[1].expired()); + ASSERT_FALSE(weak_nodes.front().expired()); + ASSERT_TRUE(weak_nodes.back().expired()); // WHEN bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes); @@ -64,11 +64,11 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) { rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); auto existing_node1 = rclcpp::Node::make_shared("existing_node1"); auto existing_node2 = rclcpp::Node::make_shared("existing_node2"); - rclcpp::memory_strategy::MemoryStrategy::WeakNodeVector weak_nodes; + rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes; weak_nodes.push_back(existing_node1->get_node_base_interface()); weak_nodes.push_back(existing_node2->get_node_base_interface()); - ASSERT_FALSE(weak_nodes[0].expired()); - ASSERT_FALSE(weak_nodes[1].expired()); + ASSERT_FALSE(weak_nodes.front().expired()); + ASSERT_FALSE(weak_nodes.back().expired()); // WHEN bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes);