add mutex to protect events_executor current entity collection #2187

Merged · 3 commits · May 18, 2023
@@ -253,6 +253,8 @@ class EventsExecutor : public rclcpp::Executor
typename CollectionType::EntitySharedPtr
retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);

// Check if the entity_id is in the collection
auto it = collection.find(entity_id);
if (it == collection.end()) {
@@ -274,9 +276,12 @@ class EventsExecutor : public rclcpp::Executor
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;

std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// Mutex to protect the current_entities_collection_
std::recursive_mutex collection_mutex_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;

/// Flag used to reduce the number of unnecessary waitable events
std::atomic<bool> notify_waitable_event_pushed_ {false};

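The locking scheme introduced by this change can be summarized in a small, self-contained sketch. This is only an illustration of the pattern, not rclcpp code: Collection, lookup() and rebuild() are made-up names standing in for the entities collection, retrieve_entity() and refresh_current_collection_from_callback_groups(). The point is that the reader and the writer of the shared collection now serialize on the same mutex, so a spinning thread can no longer observe the map while add_node()/remove_node() is rebuilding it.

#include <map>
#include <memory>
#include <mutex>
#include <utility>

// Stand-in for the executor's shared entity collection.
struct Collection
{
  std::map<int, std::shared_ptr<int>> entities;
};

std::recursive_mutex collection_mutex;  // plays the role of collection_mutex_
Collection current_collection;          // plays the role of current_entities_collection_

// Roughly the shape of retrieve_entity(): the lookup holds the mutex,
// so it cannot race with a concurrent rebuild of the collection.
std::shared_ptr<int> lookup(int entity_id)
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  auto it = current_collection.entities.find(entity_id);
  if (it == current_collection.entities.end()) {
    return nullptr;
  }
  return it->second;
}

// Roughly the shape of refresh_current_collection_from_callback_groups():
// the new state is prepared without the lock, then swapped in under it.
void rebuild(Collection new_collection)
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  current_collection = std::move(new_collection);
}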
@@ -386,11 +386,15 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
void
EventsExecutor::refresh_current_collection_from_callback_groups()
{
// Build the new collection
this->entities_collector_->update_collections();
auto callback_groups = this->entities_collector_->get_all_callback_groups();
rclcpp::executors::ExecutorEntitiesCollection new_collection;
rclcpp::executors::build_entities_collection(callback_groups, new_collection);

// Acquire lock before modifying the current collection
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);

// TODO(alsora): this may be implemented in a better way.
// We need the notify waitable to be included in the executor "current_collection"
// because we need to be able to retrieve events for it.
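A detail worth noting is the choice of std::recursive_mutex over std::mutex: a recursive mutex lets the thread that already owns the lock enter another function that locks the same mutex, instead of deadlocking. The snippet below only demonstrates that behavioural difference; guarded_refresh() and guarded_lookup() are hypothetical names, not functions from this pull request.

#include <mutex>

std::recursive_mutex collection_mutex;  // hypothetical stand-in for collection_mutex_

// Hypothetical helper that takes the lock on its own, as retrieve_entity() does.
int guarded_lookup()
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  return 42;
}

// Hypothetical caller that already holds the lock when it calls the helper.
// With a plain std::mutex the nested lock below would deadlock (formally,
// undefined behaviour); a recursive_mutex allows the owning thread to lock
// again and releases the mutex only when every lock_guard has been destroyed.
int guarded_refresh()
{
  std::lock_guard<std::recursive_mutex> lock(collection_mutex);
  return guarded_lookup();
}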
70 changes: 70 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -796,6 +796,76 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
}
}

// This test verifies the thread-safety of adding and removing a node
// while the executor is spinning and events are ready.
// This test does not contain expectations, but rather it verifies that
// we can run a "stressful routine" without crashing.
TYPED_TEST(TestExecutors, stressAddRemoveNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}

// Spawn some threads to do some heavy work
std::atomic<bool> should_cancel = false;
std::vector<std::thread> stress_threads;
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
stress_threads.emplace_back(
[&should_cancel, i]() {
// This is just some arbitrary heavy work
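// ('volatile' keeps the compiler from optimizing this busy loop away)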
volatile size_t total = 0;
for (size_t k = 0; k < 549528914167; k++) {
if (should_cancel) {
break;
}
total += k * (i + 42);
}
});
}

ExecutorType executor;

// A timer that is "always" ready (the timer callback doesn't do anything)
auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});

// This thread spins the executor until it's cancelled
std::thread spinner_thread([&]() {
executor.spin();
});

// This thread publishes data in a busy loop (the node has a subscription)
std::thread publisher_thread([&]() {
for (size_t i = 0; i < 10000; i++) {
this->publisher->publish(test_msgs::msg::Empty());
}
});

// This thread adds/removes the node that contains the entities in a busy loop
std::thread add_remove_thread([&]() {
for (size_t i = 0; i < 10000; i++) {
executor.add_node(this->node);
executor.remove_node(this->node);
}
});

// Wait for the threads that do real work to finish
publisher_thread.join();
add_remove_thread.join();

// The test routine is done: stop the stress threads and the executor, then join them
should_cancel = true;
for (auto & t : stress_threads) {
t.join();
}
executor.cancel();
spinner_thread.join();
}

// Check spin_until_future_complete with node base pointer (instantiates its own executor)
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
{