diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 5b3206b131..f607414789 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -410,19 +410,36 @@ class TestWaitable : public rclcpp::Waitable add_to_wait_set(rcl_wait_set_t * wait_set) override { rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); + if (retrigger_guard_condition && num_unprocessed_triggers > 0) { + gc_.trigger(); + } } void trigger() { + num_unprocessed_triggers++; gc_.trigger(); } + void trigger_and_hold_execute() + { + hold_execute = true; + + trigger(); + } + + void release_execute() + { + hold_execute = false; + cv.notify_one(); + } + bool is_ready(rcl_wait_set_t * wait_set) override { + is_ready_called_before_take_data = true; for (size_t i = 0; i < wait_set->size_of_guard_conditions; ++i) { if (&gc_.get_rcl_guard_condition() == wait_set->guard_conditions[i]) { - is_ready_called_before_take_data = true; return true; } } @@ -438,6 +455,9 @@ class TestWaitable : public rclcpp::Waitable } is_ready_called_before_take_data = false; + + num_unprocessed_triggers--; + return nullptr; } @@ -445,7 +465,7 @@ class TestWaitable : public rclcpp::Waitable take_data_by_entity_id(size_t id) override { (void) id; - return nullptr; + return take_data(); } void @@ -462,6 +482,12 @@ class TestWaitable : public rclcpp::Waitable throw; } } + + if(hold_execute) + { + std::unique_lock lk(cv_m); + cv.wait(lk); + } } void @@ -496,11 +522,22 @@ class TestWaitable : public rclcpp::Waitable return execute_promise_.get_future(); } + void enable_retriggering(bool enabled) + { + retrigger_guard_condition = enabled; + } + private: bool is_ready_called_before_take_data = false; + bool retrigger_guard_condition = true; std::promise execute_promise_; std::mutex execute_promise_mutex_; + std::atomic num_unprocessed_triggers = 0; + std::atomic hold_execute = false; size_t count_ = 0; + std::condition_variable cv; + std::mutex cv_m; + rclcpp::GuardCondition gc_; }; @@ -547,136 +584,209 @@ TYPED_TEST(TestExecutors, spinAll) spinner.join(); } -TEST(TestExecutorsOnlyNode, double_take_data) +TEST(TestExecutors, double_take_data) { rclcpp::init(0, nullptr); + rclcpp::executors::MultiThreadedExecutor executor; + const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info(); std::stringstream test_name; test_name << test_info->test_case_name() << "_" << test_info->name(); rclcpp::Node::SharedPtr node = std::make_shared("node", test_name.str()); - class MyExecutor : public rclcpp::executors::SingleThreadedExecutor - { -public: - /** - * This is a copy of Executor::get_next_executable with a callback, to test - * for a special race condition - */ - bool get_next_executable_with_callback( - rclcpp::AnyExecutable & any_executable, - std::chrono::nanoseconds timeout, - std::function inbetween) - { - bool success = false; - // Check to see if there are any subscriptions or timers needing service - // TODO(wjwwood): improve run to run efficiency of this function - success = get_next_ready_executable(any_executable); - // If there are none - if (!success) { - - inbetween(); - - // Wait for subscriptions or timers to work on - wait_for_work(timeout); - if (!spinning.load()) { - return false; - } - // Try again - success = get_next_ready_executable(any_executable); - } - return success; - } + auto waitable_interfaces = node->get_node_waitables_interface(); - void spin_once_with_callback( - std::chrono::nanoseconds timeout, - std::function inbetween) - { - rclcpp::AnyExecutable any_exec; - if (get_next_executable_with_callback(any_exec, timeout, inbetween)) { - execute_any_executable(any_exec); - } - } + auto first_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); - }; + auto third_cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); - MyExecutor executor; + // these waitable have one job, to make the MemoryStrategy::collect_enties method take + // a long time, in order to force our race condition + std::vector> stuffing_waitables; + std::vector> stuffing_cbgs; + for (size_t i = 0; i < 50; i++) { + auto cbg = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, + true); + + stuffing_cbgs.push_back(cbg); + + for (int j = 0; j < 200; j++) { + auto waitable = std::make_shared(); + stuffing_waitables.push_back(waitable); + waitable_interfaces->add_waitable(waitable, cbg); + } + } + + // this is the callback group were wo introduce the double take auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, true); + std::vector> waitables; - auto waitable_interfaces = node->get_node_waitables_interface(); + auto w3 = std::make_shared(); + waitable_interfaces->add_waitable(w3, third_cbg); + + // First group of waitables, that gets processed. + // We use the shared count of these waitables and the callback group, + // to estimate when MemoryStrategy::collect_enties is called in the spinner thread + std::vector> first_waitables; + auto non_triggered_in_first_cbg = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg, first_cbg); + + auto non_triggered_in_first_cbg2 = std::make_shared(); + waitable_interfaces->add_waitable(non_triggered_in_first_cbg2, first_cbg); + - for (int i = 0; i < 3; i++) { + auto cbg_start = std::make_shared(); + waitable_interfaces->add_waitable(cbg_start, callback_group); + + // These waitables will get triggered while cbg_start is beeing executed + for (int i = 0; i < 20; i++) { auto waitable = std::make_shared(); waitables.push_back(waitable); waitable_interfaces->add_waitable(waitable, callback_group); } + + // used to detect if all triggers were processed + auto cbg_end = std::make_shared(); + waitable_interfaces->add_waitable(cbg_end, callback_group); + executor.add_node(node); - for (auto & waitable : waitables) { - waitable->trigger(); + // ref count if not reference by the internals of the executor + auto min_ref_cnt = non_triggered_in_first_cbg.use_count(); + auto cbg_min_ref_cnt = first_cbg.use_count(); + + for (auto & w : waitables) { + EXPECT_EQ(w->get_count(), 0); } - // a node has some default subscribers, that need to get executed first, therefore the loop - for (int i = 0; i < 10; i++) { - executor.spin_once(std::chrono::milliseconds(10)); - if (waitables.front()->get_count() > 0) { - // stop execution, after the first waitable has been executed - break; + std::atomic_bool exception = false; + + std::thread t([&executor, &exception]() { + try { + executor.spin(); + } catch (const std::exception & e) { + exception = true; + } catch (...) { + exception = true; + } + }); + + size_t start_count = cbg_start->get_count(); + cbg_start->trigger_and_hold_execute(); + + //wait until the first waitable is executed and blocks the callback_group + while (cbg_start->get_count() == start_count) { + std::this_thread::sleep_for(1ms); + } + + for (auto & w : waitables) { + w->trigger(); + } + + // trigger w3 to make sure, the MemoryStrategy clears its internal list of ready entities + { + auto cnt = w3->get_count(); + w3->trigger(); + while (w3->get_count() == cnt) { + std::this_thread::sleep_for(1ms); } } - EXPECT_EQ(waitables.front()->get_count(), 1); + // observe the use counts of non_triggered_in_first_cbg, non_triggered_in_first_cbg2 and first_cbg + // in order to fugure out if MemoryStrategy::collect_enties is beeing calles + while (true) { + w3->trigger(); + bool restart = false; - // block the callback group, this is something that may happen during multi threaded execution - // This removes my_waitable2 from the list of ready events, and triggers a call to wait_for_work - callback_group->can_be_taken_from().exchange(false); + // There should be no reference to our waitiable + while (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + } - bool no_ready_executable = false; + // wait for the callback group to be taken + while (true) { + // node and callback group ptrs are referenced + if (cbg_min_ref_cnt != first_cbg.use_count()) { + break; + } - //now there should be no ready events now, - executor.spin_once_with_callback( - std::chrono::milliseconds(10), [&]() { - no_ready_executable = true; - }); + // is we got more references to the waitable, while the group pointer is not referenced, this is the wrong spot + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() || + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } + } - EXPECT_TRUE(no_ready_executable); + if (restart) { + continue; + } - //rearm, so that rmw_wait will push a second entry into the queue - for (auto & waitable : waitables) { - waitable->trigger(); - } + // callback group pointer is referenced + while (true) { + // trigger criteria, both pointers were collected + if (min_ref_cnt != non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + break; + } - no_ready_executable = false; + // invalid, second pointer is referenced, but not first one + if (min_ref_cnt == non_triggered_in_first_cbg.use_count() && + min_ref_cnt != non_triggered_in_first_cbg2.use_count()) + { + restart = true; + break; + } - while (!no_ready_executable) { - executor.spin_once_with_callback( - std::chrono::milliseconds(10), [&]() { - //unblock the callback group - callback_group->can_be_taken_from().exchange(true); + // group or node pointer was released, while waitable poitner were not taken. + if (cbg_min_ref_cnt == first_cbg.use_count()) { + restart = true; + break; + } - no_ready_executable = true; + } + if (restart) { + continue; + } - }); + break; } - EXPECT_TRUE(no_ready_executable); - // now we process all events from get_next_ready_executable - EXPECT_NO_THROW( - for (int i = 0; i < 10; i++) { - executor.spin_once(std::chrono::milliseconds(1)); + //we unblock the callback_group now, this should force the race condition + cbg_start->release_execute(); + + std::this_thread::yield(); + std::this_thread::sleep_for(10ms); + + size_t end_count = cbg_end->get_count(); + cbg_end->trigger(); + + // wait for all triggers to be executed, or for an exception to occur + while (end_count == cbg_end->get_count() && !exception) { + std::this_thread::sleep_for(10ms); } - ); - node.reset(); + EXPECT_FALSE(exception); + node.reset(); rclcpp::shutdown(); -} + t.join(); +} TYPED_TEST(TestExecutorsOnlyNode, missing_event) { @@ -732,6 +842,11 @@ TYPED_TEST(TestExecutorsOnlyNode, missing_event) executor.spin_until_future_complete(my_waitable2_execute_future, max_spin_duration); } + EXPECT_EQ(1u, my_waitable->get_count()); + EXPECT_EQ(1u, my_waitable2->get_count()); + //now the second waitable should get processed + executor.spin_once(std::chrono::milliseconds(10)); + EXPECT_EQ(1u, my_waitable->get_count()); EXPECT_EQ(1u, my_waitable2->get_count()); }