diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index fb5ba2a63f..5f8443d781 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -93,7 +93,8 @@ class RCLCPP_PUBLIC MemoryStrategy virtual void get_next_waitable( rclcpp::AnyExecutable & any_exec, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + rcl_wait_set_t * wait_set) = 0; virtual rcl_allocator_t get_allocator() = 0; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 88698179d4..12cf4a4be7 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -122,6 +122,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy for (size_t i = 0; i < waitable_handles_.size(); ++i) { if (!waitable_handles_[i]->is_ready(wait_set)) { waitable_handles_[i].reset(); + } else { + waitable_triggered_handles_.emplace_back(waitable_handles_[i]); } } @@ -390,10 +392,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy void get_next_waitable( rclcpp::AnyExecutable & any_exec, - const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + rcl_wait_set_t * wait_set) override { - auto it = waitable_handles_.begin(); - while (it != waitable_handles_.end()) { + auto it = waitable_triggered_handles_.begin(); + while (it != waitable_triggered_handles_.end()) { std::shared_ptr & waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced @@ -401,7 +404,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group) { // Group was not found, meaning the waitable is not valid... // Remove it from the ready list and continue looking - it = waitable_handles_.erase(it); + it = waitable_triggered_handles_.erase(it); continue; } if (!group->can_be_taken_from().load()) { @@ -410,15 +413,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy ++it; continue; } + if (!waitable->is_ready(wait_set)) { + it = waitable_triggered_handles_.erase(it); + continue; + } // Otherwise it is safe to set and return the any_exec any_exec.waitable = waitable; any_exec.callback_group = group; any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); - waitable_handles_.erase(it); + waitable_triggered_handles_.erase(it); return; } // Else, the waitable is no longer valid, remove it and continue - it = waitable_handles_.erase(it); + it = waitable_triggered_handles_.erase(it); } } @@ -499,6 +506,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy VectorRebind> timer_handles_; VectorRebind> waitable_handles_; + VectorRebind> waitable_triggered_handles_; + std::shared_ptr allocator_; }; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 32b895c1e3..fe90c8d2a4 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -884,7 +884,7 @@ Executor::get_next_ready_executable_from_map( } if (!success) { // Check the waitables to see if there are any that are ready - memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); + memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes, &wait_set_); if (any_executable.waitable) { any_executable.data = any_executable.waitable->take_data(); success = true; diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index 1779d5d88e..2a42b67421 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -63,6 +63,60 @@ class TestWaitable : public rclcpp::Waitable } }; +static bool test_waitable_result2 = false; + +/** + * Mock Waitable class, with a globally setable boolean result. + */ +class TestWaitable2 : public rclcpp::Waitable +{ +public: + explicit TestWaitable2(rcl_publisher_t * pub_ptr) + : pub_ptr_(pub_ptr), + pub_event_(rcl_get_zero_initialized_event()) + { + EXPECT_EQ( + rcl_publisher_event_init(&pub_event_, pub_ptr_, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), + RCL_RET_OK); + } + + ~TestWaitable2() + { + EXPECT_EQ(rcl_event_fini(&pub_event_), RCL_RET_OK); + } + + void add_to_wait_set(rcl_wait_set_t * wait_set) override + { + EXPECT_EQ(rcl_wait_set_add_event(wait_set, &pub_event_, &wait_set_event_index_), RCL_RET_OK); + } + + bool is_ready(rcl_wait_set_t *) override + { + return test_waitable_result2; + } + + std::shared_ptr + take_data() override + { + return nullptr; + } + + void execute(std::shared_ptr & data) override + { + (void) data; + } + + size_t get_number_of_ready_events() override + { + return 1; + } + +private: + rcl_publisher_t * pub_ptr_; + rcl_event_t pub_event_; + size_t wait_set_event_index_; +}; + struct RclWaitSetSizes { size_t size_of_subscriptions = 0; @@ -657,20 +711,129 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) { } TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { - auto node1 = std::make_shared("waitable_node", "ns"); - auto node2 = std::make_shared("waitable_node2", "ns"); - rclcpp::Waitable::SharedPtr waitable1 = std::make_shared(); - rclcpp::Waitable::SharedPtr waitable2 = std::make_shared(); - node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); - node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); - auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr); return result; }; - EXPECT_TRUE(TestGetNextEntity(node1, node2, get_next_entity)); + { + auto node1 = std::make_shared( + "waitable_node", "ns", + rclcpp::NodeOptions() + .start_parameter_event_publisher(false) + .start_parameter_services(false)); + + rclcpp::PublisherOptions pub_options; + pub_options.use_default_callbacks = false; + + auto pub1 = node1->create_publisher( + "test_topic_1", rclcpp::QoS(10), pub_options); + + auto waitable1 = + std::make_shared(pub1->get_publisher_handle().get()); + node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); + + auto basic_node = create_node_with_disabled_callback_groups("basic_node"); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + basic_node->for_each_callback_group( + [basic_node, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_nodes.insert( + std::pair( + group_ptr, + basic_node->get_node_base_interface())); + }); + node1->for_each_callback_group( + [node1, &weak_groups_to_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_nodes.insert( + std::pair( + group_ptr, + node1->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + ASSERT_EQ( + rcl_wait_set_init( + &wait_set, + allocator_memory_strategy()->number_of_ready_subscriptions(), + allocator_memory_strategy()->number_of_guard_conditions(), + allocator_memory_strategy()->number_of_ready_timers(), + allocator_memory_strategy()->number_of_ready_clients(), + allocator_memory_strategy()->number_of_ready_services(), + allocator_memory_strategy()->number_of_ready_events(), + rclcpp::contexts::get_global_default_context()->get_rcl_context().get(), + allocator_memory_strategy()->get_allocator()), + RCL_RET_OK); + + ASSERT_TRUE(allocator_memory_strategy()->add_handles_to_wait_set(&wait_set)); + + ASSERT_EQ( + rcl_wait( + &wait_set, + std::chrono::duration_cast(std::chrono::milliseconds(100)) + .count()), + RCL_RET_OK); + test_waitable_result2 = true; + allocator_memory_strategy()->remove_null_handles(&wait_set); + + rclcpp::AnyExecutable result = get_next_entity(weak_groups_to_nodes); + EXPECT_EQ(result.node_base, node1->get_node_base_interface()); + test_waitable_result2 = false; + + EXPECT_EQ(rcl_wait_set_fini(&wait_set), RCL_RET_OK); + } + + { + auto node2 = std::make_shared( + "waitable_node2", "ns", + rclcpp::NodeOptions() + .start_parameter_services(false) + .start_parameter_event_publisher(false)); + + rclcpp::PublisherOptions pub_options; + pub_options.use_default_callbacks = false; + + auto pub2 = node2->create_publisher( + "test_topic_2", rclcpp::QoS(10), pub_options); + + auto waitable2 = + std::make_shared(pub2->get_publisher_handle().get()); + node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); + + auto basic_node2 = std::make_shared( + "basic_node2", "ns", + rclcpp::NodeOptions() + .start_parameter_services(false) + .start_parameter_event_publisher(false)); + WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; + basic_node2->for_each_callback_group( + [basic_node2, &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_uncollected_nodes.insert( + std::pair( + group_ptr, + basic_node2->get_node_base_interface())); + }); + node2->for_each_callback_group( + [node2, + &weak_groups_to_uncollected_nodes](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + weak_groups_to_uncollected_nodes.insert( + std::pair( + group_ptr, + node2->get_node_base_interface())); + }); + + rclcpp::AnyExecutable failed_result = get_next_entity(weak_groups_to_uncollected_nodes); + EXPECT_EQ(failed_result.node_base, nullptr); + } } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) { @@ -735,11 +898,13 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_mutually_exclusive) { callback_group->can_be_taken_from() = false; rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr); return result; }; + test_waitable_result = true; EXPECT_TRUE(TestGetNextEntityMutuallyExclusive(node, get_next_entity)); + test_waitable_result = false; } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_out_of_scope) { @@ -902,6 +1067,6 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_out_of_scope) { EXPECT_EQ(1u, allocator_memory_strategy()->number_of_waitables()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes, nullptr); EXPECT_EQ(nullptr, result.node_base); }