Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in events-executor #2177

Merged
merged 6 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ class EventsExecutor : public rclcpp::Executor
std::function<void(size_t, int)>
create_waitable_callback(const rclcpp::Waitable * waitable_id);

/// Utility to add the notify waitable to an entities collection
void
add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection);

/// Searches for the provided entity_id in the collection and returns the entity if valid
template<typename CollectionType>
typename CollectionType::EntitySharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ EventsExecutor::EventsExecutor(
timers_manager_ =
std::make_shared<rclcpp::experimental::TimersManager>(context_, timer_on_ready_cb);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();

notify_waitable_ = std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
// This callback is invoked when:
Expand All @@ -61,6 +64,10 @@ EventsExecutor::EventsExecutor(
this->refresh_current_collection_from_callback_groups();
});

// Make sure that the notify waitable is immediately added to the collection
// to avoid missing events
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix: we immediately add the notify waitable to the current collection in the executor's constructor


notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

Expand All @@ -87,9 +94,6 @@ EventsExecutor::EventsExecutor(

this->entities_collector_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollector>(notify_waitable_);

this->current_entities_collection_ =
std::make_shared<rclcpp::executors::ExecutorEntitiesCollection>();
}

EventsExecutor::~EventsExecutor()
Expand Down Expand Up @@ -395,18 +399,8 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
// retrieved in the "standard" way.
// To do it, we need to add the notify waitable as an entry in both the new and
// current collections such that it's neither added or removed.
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
new_collection.waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});

this->current_entities_collection_->waitables.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
this->add_notify_waitable_to_collection(new_collection.waitables);
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

this->refresh_current_collection(new_collection);
}
Expand Down Expand Up @@ -486,3 +480,16 @@ EventsExecutor::create_waitable_callback(const rclcpp::Waitable * entity_key)
};
return callback;
}

void
EventsExecutor::add_notify_waitable_to_collection(
rclcpp::executors::ExecutorEntitiesCollection::WaitableCollection & collection)
{
// The notify waitable is not associated to any group, so use an invalid one
rclcpp::CallbackGroup::WeakPtr weak_group_ptr;
collection.insert(
{
this->notify_waitable_.get(),
{this->notify_waitable_, weak_group_ptr}
});
}
85 changes: 76 additions & 9 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
#include <gtest/gtest.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "rcl/error_handling.h"
#include "rcl/time.h"
Expand All @@ -43,18 +45,10 @@ template<typename T>
class TestExecutors : public ::testing::Test
{
public:
static void SetUpTestCase()
void SetUp()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
Expand All @@ -75,6 +69,8 @@ class TestExecutors : public ::testing::Test
publisher.reset();
subscription.reset();
node.reset();

rclcpp::shutdown();
}

rclcpp::Node::SharedPtr node;
Expand Down Expand Up @@ -729,6 +725,77 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
spinner.join();
}

// This test verifies that the add_node operation is robust wrt race conditions.
// It's mostly meant to prevent regressions in the events-executor, but the operation should be
// thread-safe in all executor implementations.
// The initial implementation of the events-executor contained a bug where the executor
// would end up in an inconsistent state and stop processing interrupt/shutdown notifications.
// Manually adding a node to the executor results in a) producing a notify waitable event
// and b) refreshing the executor collections.
// The inconsistent state would happen if the event was processed before the collections were
// finished to be refreshed: the executor would pick up the event but be unable to process it.
// This would leave the `notify_waitable_event_pushed_` flag to true, preventing additional
// notify waitable events to be pushed.
// The behavior is observable only under heavy load, so this test spawns several worker
// threads. Due to the nature of the bug, this test may still succeed even if the
// bug is present. However repeated runs will show its flakiness nature and indicate
// an eventual regression.
TYPED_TEST(TestExecutors, testRaceConditionAddNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}

// Spawn some threads to do some heavy work
std::atomic<bool> should_cancel = false;
std::vector<std::thread> stress_threads;
for (size_t i = 0; i < 5 * std::thread::hardware_concurrency(); i++) {
stress_threads.emplace_back(
[&should_cancel, i]() {
// This is just some arbitrary heavy work
volatile size_t total = 0;
for (size_t k = 0; k < 549528914167; k++) {
if (should_cancel) {
break;
}
total += k * (i + 42);
}
});
}

// Create an executor
auto executor = std::make_shared<ExecutorType>();
// Start spinning
auto executor_thread = std::thread(
[executor]() {
executor->spin();
});
// Add a node to the executor
executor->add_node(this->node);

// Cancel the executor (make sure that it's already spinning first)
while (!executor->is_spinning() && rclcpp::ok()) {
continue;
}
executor->cancel();

// Try to join the thread after cancelling the executor
// This is the "test". We want to make sure that we can still cancel the executor
// regardless of the presence of race conditions
executor_thread.join();

// The test is now completed: we can join the stress threads
should_cancel = true;
for (auto & t : stress_threads) {
t.join();
}
}

// Check spin_until_future_complete with node base pointer (instantiates its own executor)
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
{
Expand Down