Skip to content

Commit

Permalink
Fix race condition in events-executor (ros2#2177)
Browse files Browse the repository at this point in the history
The initial implementation of the events-executor contained a bug where the executor
would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
Manually adding a node to the executor results in a) producing a notify waitable event
and b) refreshing the executor collections.
The inconsistent state would occur if the event was processed before the collections had
finished being refreshed: the executor would pick up the event but be unable to process it.
This would leave the `notify_waitable_event_pushed_` flag set to true, preventing additional
notify waitable events from being pushed.
The behavior is observable only under heavy load.

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
  • Loading branch information
alsora committed May 1, 2023
1 parent c396b68 commit 3670611
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ class EventsExecutor : public rclcpp::Executor
std::function<void(size_t, int)>
create_waitable_callback(const rclcpp::Waitable * waitable_id);

/// Utility to add the notify waitable to an entities collection
void
add_notify_waitable_to_collection(ExecutorEntitiesCollection::WaitableCollection & collection);

/// Searches for the provided entity_id in the collection and returns the entity if valid
template<typename CollectionType>
typename CollectionType::EntitySharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
timers_manager_ =
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();

notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
// This callback is invoked when:
Expand All @@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
this->refresh_current_collection_from_callback_groups();
});

// Make sure that the notify waitable is immediately added to the collection
// to avoid missing events
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

Expand All @@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(

this->entities_collector_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
}

EventsExecutor::~EventsExecutor()
Expand Down Expand Up @@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
// retrieved in the "standard" way.
// To do it, we need to add the notify waitable as an entry in both the new and
// current collections such that it's neither added or removed.
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
new_collection.waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});

this->current_entities_collection_->waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
this->add_notify_waitable_to_collection(new_collection.waitables);
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

this->refresh_current_collection(new_collection);
}
Expand Down Expand Up @@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
};
return callback;
}

void
EventsExecutor::add_notify_waitable_to_collection(
  ExecutorEntitiesCollection::WaitableCollection & collection)
{
  // The notify waitable doesn't belong to any callback group, so we pair it
  // with a default-constructed (i.e. expired) weak group pointer.
  rclcpp::CallbackGroup::WeakPtr no_group;
  collection.insert(
    {this->notify_waitable_.get(), {this->notify_waitable_, no_group}});
}
68 changes: 68 additions & 0 deletions rclcpp/test/rclcpp/executors/test_events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,74 @@ TEST_F(TestEventsExecutor, destroy_entities)
spinner.join();
}

// This test verifies that the add_node operation is robust wrt race conditions.
// The initial implementation of the events-executor contained a bug where the executor
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
// Manually adding a node to the executor results in a) producing a notify waitable event
// and b) refreshing the executor collections.
// The inconsistent state would occur if the event was processed before the collections had
// finished being refreshed: the executor would pick up the event but be unable to process it.
// This would leave the `notify_waitable_event_pushed_` flag set to true, preventing additional
// notify waitable events from being pushed.
// The behavior is observable only under heavy load, so this test spawns several worker
// threads. Due to the nature of the bug, this test may still succeed even if the
// bug is present. However, repeated runs would expose its flaky behavior and indicate
// an eventual regression.
TEST_F(TestEventsExecutor, testRaceConditionAddNode)
{
  // rmw_connextdds doesn't support events-executor
  if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) {
    GTEST_SKIP();
  }

  // Spawn some threads to do some heavy work.
  // std::thread::hardware_concurrency() may return 0 when the value is not
  // computable; fall back to a small fixed count so the test still produces load.
  const size_t hw_threads = std::thread::hardware_concurrency();
  const size_t num_stress_threads = 5 * (hw_threads != 0 ? hw_threads : 4);
  std::atomic<bool> should_cancel = false;
  std::vector<std::thread> stress_threads;
  stress_threads.reserve(num_stress_threads);
  for (size_t i = 0; i < num_stress_threads; i++) {
    stress_threads.emplace_back([&should_cancel, i]() {
        // This is just some arbitrary heavy work.
        // NOTE: the bound exceeds 2^32, so on 32-bit size_t the counter relies on
        // `should_cancel` (set below before joining) to terminate the loop.
        size_t total = 0;
        for (size_t k = 0; k < 549528914167; k++) {
          if (should_cancel) {
            break;
          }
          total += k * (i + 42);
        }
        // Do something with the total to avoid the thread's work being optimized away
        std::cout << "The dummy total is: " << total << std::endl;
      });
  }

  // Create an executor
  auto executor = std::make_shared<EventsExecutor>();
  // Start spinning
  auto executor_thread = std::thread([executor]() {
      executor->spin();
    });
  // Add a node to the executor
  auto node_options = ros2_test::unit_test_node_options();
  auto node = std::make_shared<rclcpp::Node>("my_node", node_options);
  executor->add_node(node->get_node_base_interface());

  // Cancel the executor (make sure that it's already spinning first)
  while (!executor->is_spinning() && rclcpp::ok()) {
    continue;
  }
  executor->cancel();

  // Try to join the thread after cancelling the executor
  // This is the "test". We want to make sure that we can still cancel the executor
  // regardless of the presence of race conditions
  executor_thread.join();

  // The test is now completed: we can join the stress threads
  should_cancel = true;
  for (auto & t : stress_threads) {
    t.join();
  }
}

// Testing construction of a subscriptions with QoS event callback functions.
std::string * g_pub_log_msg;
std::string * g_sub_log_msg;
Expand Down

0 comments on commit 3670611

Please sign in to comment.