Skip to content

Commit

Permalink
add mutex to protect events_executor current entity collection and un…
Browse files Browse the repository at this point in the history
…it-test (ros2#2187)

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
  • Loading branch information
alsora authored and Alberto Soragna committed May 14, 2023
1 parent 9c82596 commit 697dc50
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
Expand Up @@ -274,9 +274,12 @@ class EventsExecutor : public rclcpp::Executor
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;

std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// Mutex to protect the current_entities_collection_
std::recursive_mutex collection_mutex_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;

/// Flag used to reduce the number of unnecessary waitable events
std::atomic<bool> notify_waitable_event_pushed_ {false};

Expand Down
Expand Up @@ -273,10 +273,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
switch (event.type) {
case ExecutorEventType::CLIENT_EVENT:
{
auto client = this->retrieve_entity(
static_cast<const rcl_client_t *>(event.entity_key),
current_entities_collection_->clients);

rclcpp::ClientBase::SharedPtr client;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
client = this->retrieve_entity(
static_cast<const rcl_client_t *>(event.entity_key),
current_entities_collection_->clients);
}
if (client) {
for (size_t i = 0; i < event.num_events; i++) {
execute_client(client);
Expand All @@ -287,9 +290,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::SUBSCRIPTION_EVENT:
{
auto subscription = this->retrieve_entity(
static_cast<const rcl_subscription_t *>(event.entity_key),
current_entities_collection_->subscriptions);
rclcpp::SubscriptionBase::SharedPtr subscription;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
subscription = this->retrieve_entity(
static_cast<const rcl_subscription_t *>(event.entity_key),
current_entities_collection_->subscriptions);
}
if (subscription) {
for (size_t i = 0; i < event.num_events; i++) {
execute_subscription(subscription);
Expand All @@ -299,10 +306,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::SERVICE_EVENT:
{
auto service = this->retrieve_entity(
static_cast<const rcl_service_t *>(event.entity_key),
current_entities_collection_->services);

rclcpp::ServiceBase::SharedPtr service;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
service = this->retrieve_entity(
static_cast<const rcl_service_t *>(event.entity_key),
current_entities_collection_->services);
}
if (service) {
for (size_t i = 0; i < event.num_events; i++) {
execute_service(service);
Expand All @@ -319,9 +329,13 @@ EventsExecutor::execute_event(const ExecutorEvent & event)
}
case ExecutorEventType::WAITABLE_EVENT:
{
auto waitable = this->retrieve_entity(
static_cast<const rclcpp::Waitable *>(event.entity_key),
current_entities_collection_->waitables);
rclcpp::Waitable::SharedPtr waitable;
{
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
waitable = this->retrieve_entity(
static_cast<const rclcpp::Waitable *>(event.entity_key),
current_entities_collection_->waitables);
}
if (waitable) {
for (size_t i = 0; i < event.num_events; i++) {
auto data = waitable->take_data_by_entity_id(event.waitable_data);
Expand Down Expand Up @@ -386,6 +400,7 @@ EventsExecutor::get_automatically_added_callback_groups_from_nodes()
void
EventsExecutor::refresh_current_collection_from_callback_groups()
{
// Build the new collection
this->entities_collector_->update_collections();
auto callback_groups = this->entities_collector_->get_all_callback_groups();
rclcpp::executors::ExecutorEntitiesCollection new_collection;
Expand All @@ -400,6 +415,9 @@ EventsExecutor::refresh_current_collection_from_callback_groups()
// To do it, we need to add the notify waitable as an entry in both the new and
// current collections such that it's neither added or removed.
this->add_notify_waitable_to_collection(new_collection.waitables);

// Acquire lock before modifying the current collection
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);
this->add_notify_waitable_to_collection(current_entities_collection_->waitables);

this->refresh_current_collection(new_collection);
Expand All @@ -409,6 +427,9 @@ void
EventsExecutor::refresh_current_collection(
const rclcpp::executors::ExecutorEntitiesCollection & new_collection)
{
// Acquire lock before modifying the current collection
std::lock_guard<std::recursive_mutex> lock(collection_mutex_);

current_entities_collection_->timers.update(
new_collection.timers,
[this](rclcpp::TimerBase::SharedPtr timer) {timers_manager_->add_timer(timer);},
Expand Down
54 changes: 54 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Expand Up @@ -796,6 +796,60 @@ TYPED_TEST(TestExecutors, testRaceConditionAddNode)
}
}

// This test verifies the thread-safety of adding and removing a node
// while the executor is spinning and events are ready.
// This test does not contain expectations, but rather it verifies that
// we can run a "stressful routine" without crashing.
TYPED_TEST(TestExecutors, stressAddRemoveNode)
{
using ExecutorType = TypeParam;
// rmw_connextdds doesn't support events-executor
if (
std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
{
GTEST_SKIP();
}

ExecutorType executor;

// A timer that is "always" ready (the timer callback doesn't do anything)
auto timer = this->node->create_wall_timer(std::chrono::nanoseconds(1), []() {});

// This thread spins the executor until it's cancelled
std::thread spinner_thread([&]() {
executor.spin();
});

// This thread publishes data in a busy loop (the node has a subscription)
std::thread publisher_thread1([&]() {
for (size_t i = 0; i < 100000; i++) {
this->publisher->publish(test_msgs::msg::Empty());
}
});
std::thread publisher_thread2([&]() {
for (size_t i = 0; i < 100000; i++) {
this->publisher->publish(test_msgs::msg::Empty());
}
});

// This thread adds/remove the node that contains the entities in a busy loop
std::thread add_remove_thread([&]() {
for (size_t i = 0; i < 100000; i++) {
executor.add_node(this->node);
executor.remove_node(this->node);
}
});

// Wait for the threads that do real work to finish
publisher_thread1.join();
publisher_thread2.join();
add_remove_thread.join();

executor.cancel();
spinner_thread.join();
}

// Check spin_until_future_complete with node base pointer (instantiates its own executor)
TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr)
{
Expand Down

0 comments on commit 697dc50

Please sign in to comment.