Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wjwwood] Updated "Data race fixes" #2500

Merged
merged 13 commits into from
Apr 12, 2024
Merged
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,15 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// This function triggers a recollect of all entities that are registered to the executor.
/**
* Calling this function is thread safe.
*
* \param[in] notify if true will execute a trigger that will wake up a waiting executor
*/
void
trigger_entity_recollect(bool notify);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/wait_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class WaitResult final

if (this->kind() == WaitResultKind::Ready) {
auto & wait_set = this->get_wait_set();
auto rcl_wait_set = wait_set.get_rcl_wait_set();
auto & rcl_wait_set = wait_set.get_rcl_wait_set();
while (next_waitable_index_ < wait_set.size_of_waitables()) {
auto cur_waitable = wait_set.waitables(next_waitable_index_++);
if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) {
Expand Down
125 changes: 59 additions & 66 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <cassert>
#include <chrono>
#include <iterator>
#include <memory>
Expand Down Expand Up @@ -72,13 +73,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
}
});

notify_waitable_->set_on_ready_callback(
[this](auto, auto) {
this->entities_need_rebuild_.store(true);
});

notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

wait_set_.add_waitable(notify_waitable_);
}

Executor::~Executor()
Expand Down Expand Up @@ -122,6 +120,20 @@ Executor::~Executor()
}
}

void Executor::trigger_entity_recollect(bool notify)
{
this->entities_need_rebuild_.store(true);

if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
interrupt_guard_condition_->trigger();
}
}

std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
Expand Down Expand Up @@ -152,19 +164,12 @@ Executor::add_callback_group(
(void) node_ptr;
this->collector_.add_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
}

Expand All @@ -173,19 +178,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
{
this->collector_.add_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
}

Expand All @@ -196,18 +194,12 @@ Executor::remove_callback_group(
{
this->collector_.remove_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}

Expand All @@ -222,19 +214,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
{
this->collector_.remove_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
}
}

Expand Down Expand Up @@ -379,6 +364,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
return;
}

assert(
(void("cannot execute an AnyExecutable without a valid callback group"),
any_exec.callback_group));

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
Expand All @@ -403,9 +392,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}

// Reset the callback_group, regardless of type
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Taker, typename Handler>
Expand Down Expand Up @@ -642,7 +629,6 @@ Executor::collect_entities()
// In the case that an entity already has an expired weak pointer
// before being removed from the waitset, additionally prune the waitset.
this->wait_set_.prune_deleted_entities();
this->entities_need_rebuild_.store(false);
}

void
Expand All @@ -655,7 +641,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

{
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
}
Expand All @@ -664,6 +650,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
} else {
if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
}

Expand All @@ -689,7 +682,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
if (entity_iter != current_collection_.timers.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
current_timer_index++;
continue;
}
Expand Down Expand Up @@ -719,7 +712,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
subscription->get_subscription_handle().get());
if (entity_iter != current_collection_.subscriptions.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.subscription = subscription;
Expand All @@ -735,7 +728,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
if (entity_iter != current_collection_.services.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.service = service;
Expand All @@ -751,7 +744,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
if (entity_iter != current_collection_.clients.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.client = client;
Expand All @@ -767,7 +760,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.waitables.find(waitable.get());
if (entity_iter != current_collection_.waitables.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.waitable = waitable;
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/src/rclcpp/executors/executor_entities_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
if (!entity->call()) {
Expand All @@ -176,7 +176,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -196,7 +196,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -216,7 +216,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -236,7 +236,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entry.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
{
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
Expand All @@ -119,6 +119,13 @@ StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
"rclcpp",
"empty wait set received in wait(). This should never happen.");
return {};
} else {
if (wait_result.kind() == WaitResultKind::Ready && current_notify_waitable_) {
auto & rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
return wait_result;
}
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ if(TARGET test_executors)
target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS})
endif()

ament_add_gtest(
test_executors_callback_group_behavior
executors/test_executors_callback_group_behavior.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors)
target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME})
endif()

ament_add_gtest(
test_executors_intraprocess
executors/test_executors_intraprocess.cpp
Expand Down
Loading