diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp index 2365b67697..dd5b1ebe63 100644 --- a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp @@ -274,9 +274,12 @@ class EventsExecutor : public rclcpp::Executor rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_; std::shared_ptr entities_collector_; - std::shared_ptr current_entities_collection_; std::shared_ptr notify_waitable_; + /// Mutex to protect the current_entities_collection_ + std::recursive_mutex collection_mutex_; + std::shared_ptr current_entities_collection_; + /// Flag used to reduce the number of unnecessary waitable events std::atomic notify_waitable_event_pushed_ {false}; diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index 33e9cb67bd..64b07c0814 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event) switch (event.type) { case ExecutorEventType::CLIENT_EVENT: { - auto client = this->retrieve_entity( - static_cast(event.entity_key), - current_entities_collection_->clients); - + rclcpp::ClientBase::SharedPtr client; + { + std::lock_guard lock(collection_mutex_); + client = this->retrieve_entity( + static_cast(event.entity_key), + current_entities_collection_->clients); + } if (client) { for (size_t i = 0; i < event.num_events; i++) { execute_client(client); @@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event) } case ExecutorEventType::SUBSCRIPTION_EVENT: { - auto subscription = this->retrieve_entity( - static_cast(event.entity_key), - current_entities_collection_->subscriptions); + rclcpp::SubscriptionBase::SharedPtr subscription; + { + std::lock_guard lock(collection_mutex_); + subscription = this->retrieve_entity( + static_cast(event.entity_key), + current_entities_collection_->subscriptions); + } if (subscription) { for (size_t i = 0; i < event.num_events; i++) { execute_subscription(subscription); @@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event) } case ExecutorEventType::SERVICE_EVENT: { - auto service = this->retrieve_entity( - static_cast(event.entity_key), - current_entities_collection_->services); - + rclcpp::ServiceBase::SharedPtr service; + { + std::lock_guard lock(collection_mutex_); + service = this->retrieve_entity( + static_cast(event.entity_key), + current_entities_collection_->services); + } if (service) { for (size_t i = 0; i < event.num_events; i++) { execute_service(service); @@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event) } case ExecutorEventType::WAITABLE_EVENT: { - auto waitable = this->retrieve_entity( - static_cast(event.entity_key), - current_entities_collection_->waitables); + rclcpp::Waitable::SharedPtr waitable; + { + std::lock_guard lock(collection_mutex_); + waitable = this->retrieve_entity( + static_cast(event.entity_key), + current_entities_collection_->waitables); + } if (waitable) { for (size_t i = 0; i < event.num_events; i++) { auto data = waitable->take_data_by_entity_id(event.waitable_data); @@ -386,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes() void EventsExecutor::refresh_current_collection_from_callback_groups() { + // Build the new collection this->entities_collector_->update_collections(); auto callback_groups = this->entities_collector_->get_all_callback_groups(); rclcpp::executors::ExecutorEntitiesCollection new_collection; @@ -400,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups() // To do it, we need to add the notify waitable as an entry in both the new and // current collections such that it's neither added or removed. this->add_notify_waitable_to_collection(new_collection.waitables); + + // Acquire lock before modifying the current collection + std::lock_guard lock(collection_mutex_); this->add_notify_waitable_to_collection(current_entities_collection_->waitables); this->refresh_current_collection(new_collection); @@ -409,6 +427,9 @@ void EventsExecutor::refresh_current_collection( const rclcpp::executors::ExecutorEntitiesCollection & new_collection) { + // Acquire lock before modifying the current collection + std::lock_guard lock(collection_mutex_); + current_entities_collection_->timers.update( new_collection.timers, [this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);}, diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 232baaace3..e9aad61685 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -796,6 +796,60 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode) } } +// This test verifies the thread-safety of adding and removing a node +// while the executor is spinning and events are ready. +// This test does not contain expectations, but rather it verifies that +// we can run a "stressful routine" without crashing. +TYPED_TEST(TestExecutors, stressAddRemoveNode) +{ + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType executor; + + // A timer that is "always" ready (the timer callback doesn't do anything) + auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {}); + + // This thread spins the executor until it's cancelled + std::thread spinner_thread([&]() { + executor.spin(); + }); + + // This thread publishes data in a busy loop (the node has a subscription) + std::thread publisher_thread1([&]() { + for (size_t i = 0; i < 100000; i++) { + this->publisher->publish(test_msgs::msg::Empty()); + } + }); + std::thread publisher_thread2([&]() { + for (size_t i = 0; i < 100000; i++) { + this->publisher->publish(test_msgs::msg::Empty()); + } + }); + + // This thread adds/remove the node that contains the entities in a busy loop + std::thread add_remove_thread([&]() { + for (size_t i = 0; i < 100000; i++) { + executor.add_node(this->node); + executor.remove_node(this->node); + } + }); + + // Wait for the threads that do real work to finish + publisher_thread1.join(); + publisher_thread2.join(); + add_remove_thread.join(); + + executor.cancel(); + spinner_thread.join(); +} + // Check spin_until_future_complete with node base pointer (instantiates its own executor) TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) {