Skip to content

Commit

Permalink
add mutex in add/remove_node and wait_for_work to protect concurrent …
Browse files Browse the repository at this point in the history
…use/change of memory_strategy_ (#837) (#857)

Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
Signed-off-by: Zachary Michaels <zmichaels11@gmail.com>
  • Loading branch information
zmichaels11 authored and tfoote committed Sep 20, 2019
1 parent b4629bf commit 6e76503
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 33 deletions.
4 changes: 4 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -355,6 +356,9 @@ class Executor
/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

// Mutex to protect the subsequent memory_strategy_.
std::mutex memory_strategy_mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

Expand Down
72 changes: 39 additions & 33 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
}
}
// Add the node's notify condition to the guard condition handles
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
}

Expand Down Expand Up @@ -178,6 +179,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
}
}
}
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}

Expand Down Expand Up @@ -423,43 +425,47 @@ Executor::execute_client(
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
// Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);

// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
{
std::unique_lock<std::mutex> lock(memory_strategy_mutex_);

// Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);

// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
auto node_it = weak_nodes_.begin();
auto gc_it = guard_conditions_.begin();
while (node_it != weak_nodes_.end()) {
if (node_it->expired()) {
node_it = weak_nodes_.erase(node_it);
memory_strategy_->remove_guard_condition(*gc_it);
gc_it = guard_conditions_.erase(gc_it);
} else {
++node_it;
++gc_it;
}
}
}
}
// clear wait set
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}
// clear wait set
if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear wait set");
}

// The size of waitables are accounted for in size of the other entities
rcl_ret_t ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw std::runtime_error(
std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str);
}
// The size of waitables are accounted for in size of the other entities
rcl_ret_t ret = rcl_wait_set_resize(
&wait_set_, memory_strategy_->number_of_ready_subscriptions(),
memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(),
memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(),
memory_strategy_->number_of_ready_events());
if (RCL_RET_OK != ret) {
throw std::runtime_error(
std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str);
}

if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) {
throw std::runtime_error("Couldn't fill wait set");
}
}
rcl_ret_t status =
rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
Expand Down

0 comments on commit 6e76503

Please sign in to comment.