From 0b58d56ca7c5bfd7b7201bd0b7a05b4699cad2d3 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Thu, 29 Aug 2019 15:09:39 -0700 Subject: [PATCH] add mutex in add/remove_node and wait_for_work to protect concurrent use/change of memory_strategy_ (#837) Signed-off-by: Dirk Thomas Signed-off-by: Zachary Michaels --- rclcpp/include/rclcpp/executor.hpp | 4 ++ rclcpp/src/rclcpp/executor.cpp | 72 ++++++++++++++++-------------- 2 files changed, 43 insertions(+), 33 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 8471754f30..b062915da8 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -355,6 +356,9 @@ class Executor /// Wait set for managing entities that the rmw layer waits on. rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); + // Mutex to protect the subsequent memory_strategy_. + std::mutex memory_strategy_mutex_; + /// The memory strategy: an interface for handling user-defined memory allocation strategies. memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 2a47b71bbd..25ef632ae5 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -140,6 +140,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt } } // Add the node's notify condition to the guard condition handles + std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); } @@ -178,6 +179,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node } } } + std::unique_lock lock(memory_strategy_mutex_); memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } @@ -423,43 +425,47 @@ Executor::execute_client( void Executor::wait_for_work(std::chrono::nanoseconds timeout) { - // Collect the subscriptions and timers to be waited on - memory_strategy_->clear_handles(); - bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); - - // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_nodes) { - auto node_it = weak_nodes_.begin(); - auto gc_it = guard_conditions_.begin(); - while (node_it != weak_nodes_.end()) { - if (node_it->expired()) { - node_it = weak_nodes_.erase(node_it); - memory_strategy_->remove_guard_condition(*gc_it); - gc_it = guard_conditions_.erase(gc_it); - } else { - ++node_it; - ++gc_it; + { + std::unique_lock lock(memory_strategy_mutex_); + + // Collect the subscriptions and timers to be waited on + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) { + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + memory_strategy_->remove_guard_condition(*gc_it); + gc_it = guard_conditions_.erase(gc_it); + } else { + ++node_it; + ++gc_it; + } } } - } - // clear wait set - if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { - throw std::runtime_error("Couldn't clear wait set"); - } + // clear wait set + if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } - // The size of waitables are accounted for in size of the other entities - rcl_ret_t ret = rcl_wait_set_resize( - &wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - if (RCL_RET_OK != ret) { - throw std::runtime_error( - std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); - } + // The size of waitables are accounted for in size of the other entities + rcl_ret_t ret = rcl_wait_set_resize( + &wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) { + throw std::runtime_error( + std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); + } - if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); + if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { + throw std::runtime_error("Couldn't fill wait set"); + } } rcl_ret_t status = rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count());