diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 132b8150fe..0fa95fd18d 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -550,6 +550,15 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// This function triggers a recollect of all entities that are registered to the executor. + /** + * Calling this function is thread safe. + * + * \param[in] notify if true will execute a trigger that will wake up a waiting executor + */ + void + trigger_entity_recollect(bool notify); + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/include/rclcpp/wait_result.hpp b/rclcpp/include/rclcpp/wait_result.hpp index 74d3d2183c..3384a7846a 100644 --- a/rclcpp/include/rclcpp/wait_result.hpp +++ b/rclcpp/include/rclcpp/wait_result.hpp @@ -274,7 +274,7 @@ class WaitResult final if (this->kind() == WaitResultKind::Ready) { auto & wait_set = this->get_wait_set(); - auto rcl_wait_set = wait_set.get_rcl_wait_set(); + auto & rcl_wait_set = wait_set.get_rcl_wait_set(); while (next_waitable_index_ < wait_set.size_of_waitables()) { auto cur_waitable = wait_set.waitables(next_waitable_index_++); if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) { diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 0b78e12912..9e9a737b09 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -72,13 +73,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - notify_waitable_->set_on_ready_callback( - [this](auto, auto) { - this->entities_need_rebuild_.store(true); - }); - notify_waitable_->add_guard_condition(interrupt_guard_condition_); notify_waitable_->add_guard_condition(shutdown_guard_condition_); + + wait_set_.add_waitable(notify_waitable_); } Executor::~Executor() @@ -122,6 +120,20 @@ Executor::~Executor() } } +void Executor::trigger_entity_recollect(bool notify) +{ + this->entities_need_rebuild_.store(true); + + if (!spinning.load() && entities_need_rebuild_.exchange(false)) { + std::lock_guard guard(mutex_); + this->collect_entities(); + } + + if (notify) { + interrupt_guard_condition_->trigger(); + } +} + std::vector Executor::get_all_callback_groups() { @@ -152,19 +164,12 @@ Executor::add_callback_group( (void) node_ptr; this->collector_.add_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } @@ -173,19 +178,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt { this->collector_.add_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on node add: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on node add: ") + ex.what()); } } @@ -196,18 +194,12 @@ Executor::remove_callback_group( { this->collector_.remove_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group remove: ") + ex.what()); } } @@ -222,19 +214,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node { this->collector_.remove_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on node remove: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on node remove: ") + ex.what()); } } @@ -379,6 +364,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec) return; } + assert( + (void("cannot execute an AnyExecutable without a valid callback group"), + any_exec.callback_group)); + if (any_exec.timer) { TRACETOOLS_TRACEPOINT( rclcpp_executor_execute, @@ -403,9 +392,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec) } // Reset the callback_group, regardless of type - if (any_exec.callback_group) { - any_exec.callback_group->can_be_taken_from().store(true); - } + any_exec.callback_group->can_be_taken_from().store(true); } template @@ -642,7 +629,6 @@ Executor::collect_entities() // In the case that an entity already has an expired weak pointer // before being removed from the waitset, additionally prune the waitset. this->wait_set_.prune_deleted_entities(); - this->entities_need_rebuild_.store(false); } void @@ -655,7 +641,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) { std::lock_guard guard(mutex_); - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } } @@ -664,6 +650,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in wait(). This should never happen."); + } else { + if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) { + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + current_notify_waitable_->execute(current_notify_waitable_->take_data()); + } + } } } @@ -689,7 +682,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get()); if (entity_iter != current_collection_.timers.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { current_timer_index++; continue; } @@ -719,7 +712,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) subscription->get_subscription_handle().get()); if (entity_iter != current_collection_.subscriptions.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.subscription = subscription; @@ -735,7 +728,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.services.find(service->get_service_handle().get()); if (entity_iter != current_collection_.services.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.service = service; @@ -751,7 +744,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.clients.find(client->get_client_handle().get()); if (entity_iter != current_collection_.clients.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.client = client; @@ -767,7 +760,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.waitables.find(waitable.get()); if (entity_iter != current_collection_.waitables.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.waitable = waitable; diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 765002b2ef..68ac56b656 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -153,7 +153,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } if (!entity->call()) { @@ -176,7 +176,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -196,7 +196,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -216,7 +216,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -236,7 +236,7 @@ ready_executables( continue; } auto group_info = group_cache(entry.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 2d837103e5..d517ccafd0 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) std::optional> StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) { - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout)); @@ -119,6 +119,13 @@ StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) "rclcpp", "empty wait set received in wait(). This should never happen."); return {}; + } else { + if (wait_result.kind() == WaitResultKind::Ready && current_notify_waitable_) { + auto & rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set(); + if (current_notify_waitable_->is_ready(rcl_wait_set)) { + current_notify_waitable_->execute(current_notify_waitable_->take_data()); + } + } } return wait_result; } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d52c94b1d2..ea29f7a288 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -473,6 +473,15 @@ if(TARGET test_executors) target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS}) endif() +ament_add_gtest( + test_executors_callback_group_behavior + executors/test_executors_callback_group_behavior.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" + TIMEOUT 180) +if(TARGET test_executors) + target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME}) +endif() + ament_add_gtest( test_executors_intraprocess executors/test_executors_intraprocess.cpp diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp new file mode 100644 index 0000000000..49391cd838 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -0,0 +1,156 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This test checks that when callback groups go out of scope, that their associated executable + * entities should not be returned as valid executables. + * + * The test makes use of a bit of executor internals, but is meant to prevent regressions of behavior. + * Ref: https://github.com/ros2/rclcpp/issues/2474 + */ + +#include + +#include +#include + +#include +#include +#include + +std::chrono::milliseconds g_timer_period {1}; + +class CustomExecutor : public rclcpp::Executor +{ +public: + explicit CustomExecutor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()) + : rclcpp::Executor(options) + {} + + ~CustomExecutor() override = default; + + void spin() override {} + + void collect() + { + this->collect_entities(); + } + + void wait() + { + this->wait_for_work(g_timer_period * 10); + } + + size_t collected_timers() const + { + return this->current_collection_.timers.size(); + } + + rclcpp::AnyExecutable next() + { + rclcpp::AnyExecutable ret; + this->get_next_ready_executable(ret); + return ret; + } +}; + + +TEST(TestCallbackGroup, valid_callback_group) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + std::promise promise; + std::future future = promise.get_future(); + auto timer_callback = [&promise]() { + promise.set_value(); + }; + + // Add the callback group to the executor + auto executor = CustomExecutor(); + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto timer = node->create_wall_timer(g_timer_period, timer_callback, cbg); + executor.add_callback_group(cbg, node->get_node_base_interface()); + + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + executor.spin_until_future_complete(future, std::chrono::seconds(10))); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + auto next_executable = executor.next(); + EXPECT_EQ(timer, next_executable.timer); + EXPECT_EQ(cbg, next_executable.callback_group); + EXPECT_NE(nullptr, next_executable.data); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + + rclcpp::shutdown(); +} + +TEST(TestCallbackGroup, invalid_callback_group) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + std::promise promise; + std::future future = promise.get_future(); + auto timer_callback = [&promise]() { + promise.set_value(); + }; + + // Add the callback group to the executor + auto executor = CustomExecutor(); + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); + auto timer = node->create_wall_timer(g_timer_period, timer_callback, cbg); + executor.add_callback_group(cbg, node->get_node_base_interface()); + + ASSERT_EQ( + rclcpp::FutureReturnCode::SUCCESS, + executor.spin_until_future_complete(future, std::chrono::seconds(10))); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + + cbg.reset(); + + // Since the callback group has been reset, this should not be allowed to + // be a valid executable (timer and cbg should both be nullptr). + // In the regression, timer == next_executable.timer whil + // next_executable.callback_group == nullptr, which was incorrect. + auto next_executable = executor.next(); + EXPECT_EQ(nullptr, next_executable.timer); + EXPECT_EQ(nullptr, next_executable.callback_group); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + EXPECT_EQ(nullptr, next_executable.data); + + rclcpp::shutdown(); +}