Skip to content

Commit

Permalink
Fix race condition in events-executor (ros2#2177) (ros2#2191)
Browse files Browse the repository at this point in the history
The initial implementation of the events-executor contained a bug where the executor
would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
Manually adding a node to the executor results in a) producing a notify waitable event
and b) refreshing the executor collections.
The inconsistent state would happen if the event was processed before the collections were
finished to be refreshed: the executor would pick up the event but be unable to process it.
This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
notify waitable events to be pushed.
The behavior is observable only under heavy load.

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
(cherry picked from commit 6e1fea1)

Co-authored-by: Alberto Soragna <alberto.soragna@gmail.com>
Signed-off-by: Ignacio Vizzo <ignacio@dexory.com>
  • Loading branch information
2 people authored and nachovizzo committed Oct 3, 2023
1 parent 7538e40 commit 060c1b3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ class EventsExecutor : public rclcpp::Executor
std::function<void(size_t, int)>
create_waitable_callback(const rclcpp::Waitable * waitable_id);

/// Utility to add the notify waitable to an entities collection
void
add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);

/// Searches for the provided entity_id in the collection and returns the entity if valid
template<typename CollectionType>
typename CollectionType::EntitySharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
timers_manager_ =
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();

notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
// This callback is invoked when:
Expand All @@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
this->refresh_current_collection_from_callback_groups();
});

// Make sure that the notify waitable is immediately added to the collection
// to avoid missing events
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

Expand All @@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(

this->entities_collector_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
}

EventsExecutor::~EventsExecutor()
Expand Down Expand Up @@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
// retrieved in the "standard" way.
// To do it, we need to add the notify waitable as an entry in both the new and
// current collections such that it's neither added or removed.
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
new_collection.waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});

this->current_entities_collection_->waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
this->add_notify_waitable_to_collection(new_collection.waitables);
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

this->refresh_current_collection(new_collection);
}
Expand Down Expand Up @@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
};
return callback;
}

void
EventsExecutor::add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
{
// The notify waitable is not associated to any group, so use an invalid one
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
collection.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
}
85 changes: 76 additions & 9 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
#include <gtest/gtest.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "rcl/error_handling.h"
#include "rcl/time.h"
Expand All @@ -43,18 +45,10 @@ template<typename T>
class TestExecutors : public ::testing::Test
{
public:
static void SetUpTestCase()
void SetUp()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
Expand All @@ -75,6 +69,8 @@ class TestExecutors : public ::testing::Test
publisher.reset();
subscription.reset();
node.reset();

rclcpp::shutdown();
}

rclcpp::Node::SharedPtr node;
Expand Down Expand Up @@ -729,6 +725,77 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
spinner.join();
}

// This test verifies that the add_node operation is robust wrt race conditions.
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
// thread-safe in all executor implementations.
// The initial implementation of the events-executor contained a bug where the executor
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
// Manually adding a node to the executor results in a) producing a notify waitable event
// and b) refreshing the executor collections.
// The inconsistent state would happen if the event was processed before the collections were
// finished to be refreshed: the executor would pick up the event but be unable to process it.
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
// notify waitable events to be pushed.
// The behavior is observable only under heavy load, so this test spawns several worker
// threads. Due to the nature of the bug, this test may still succeed even if the
// bug is present. However repeated runs will show its flakiness nature and indicate
// an eventual regression.
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}

// Spawn some threads to do some heavy work
std::atomic<bool> should_cancel = false;
std::vector<std::thread> stress_threads;
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
stress_threads.emplace_back(
[&should_cancel, i]() {
// This is just some arbitrary heavy work
volatile size_t total = 0;
for (size_t k = 0; k < 549528914167; k++) {
if (should_cancel) {
break;
}
total += k * (i + 42);
}
});
}

// Create an executor
auto executor = std::make_shared<ExecutorType>();
// Start spinning
auto executor_thread = std::thread(
[executor]() {
executor->spin();
});
// Add a node to the executor
executor->add_node(this->node);

// Cancel the executor (make sure that it's already spinning first)
while (!executor->is_spinning() && rclcpp::ok()) {
continue;
}
executor->cancel();

// Try to join the thread after cancelling the executor
// This is the "test". We want to make sure that we can still cancel the executor
// regardless of the presence of race conditions
executor_thread.join();

// The test is now completed: we can join the stress threads
should_cancel = true;
for (auto & t : stress_threads) {
t.join();
}
}

// Check spin_until_future_complete with node base pointer (instantiates its own executor)
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
{
Expand Down

0 comments on commit 060c1b3

Please sign in to comment.