Skip to content

Commit

Permalink
fix: Fixed race condition in action server between is_ready and take_…
Browse files Browse the repository at this point in the history
…data and execute

Some background information: is_ready and take_data are guaranteed to be
called in sequence without interruption from another thread.
while execute is running, another thread may also call is_ready.

The problem was, that goal_request_ready_, cancel_request_ready_, result_request_ready_
and goal_expired_ were accessed and written from is_ready and execute.

This commit fixed this by only using the mentioned variables in is_ready and take_data.
execute now only accesses the given pointer and works on this.

Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jul 31, 2023
1 parent 22a954e commit 147238c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 68 deletions.
13 changes: 10 additions & 3 deletions rclcpp_action/include/rclcpp_action/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_map>
#include <utility>

#include "action_msgs/srv/cancel_goal.hpp"
#include "rcl/event_callback.h"
#include "rcl_action/action_server.h"
#include "rosidl_runtime_c/action_type_support_struct.h"
Expand Down Expand Up @@ -279,19 +280,25 @@ class ServerBase : public rclcpp::Waitable
/// \internal
RCLCPP_ACTION_PUBLIC
void
execute_goal_request_received(std::shared_ptr<void> & data);
execute_goal_request_received(
rcl_ret_t ret, rcl_action_goal_info_t goal_info, rmw_request_id_t request_header,
std::shared_ptr<void> message);

/// Handle a request to cancel goals on the server
/// \internal
RCLCPP_ACTION_PUBLIC
void
execute_cancel_request_received(std::shared_ptr<void> & data);
execute_cancel_request_received(
rcl_ret_t ret, std::shared_ptr<action_msgs::srv::CancelGoal::Request> request,
rmw_request_id_t request_header);

/// Handle a request to get the result of an action
/// \internal
RCLCPP_ACTION_PUBLIC
void
execute_result_request_received(std::shared_ptr<void> & data);
execute_result_request_received(
rcl_ret_t ret, std::shared_ptr<void> result_request,
rmw_request_id_t request_header);

/// Handle a timeout indicating a completed goal should be forgotten by the server
/// \internal
Expand Down
173 changes: 108 additions & 65 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "rcpputils/scope_exit.hpp"

#include "action_msgs/msg/goal_status_array.hpp"
#include "action_msgs/srv/cancel_goal.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp_action/server.hpp"

Expand Down Expand Up @@ -65,6 +64,11 @@ class ServerBaseImpl
std::atomic<bool> result_request_ready_{false};
std::atomic<bool> goal_expired_{false};

// variable used to check, that the sequence of
// is_ready and take_data is never interrupted by
// another thread.
std::atomic<bool> threadExlusive{false};

// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;

Expand All @@ -78,6 +82,30 @@ class ServerBaseImpl
rclcpp::Logger logger_;
};
} // namespace rclcpp_action
struct ServerBaseData
{
using GoalRequestData = std::tuple<rcl_ret_t, const rcl_action_goal_info_t, rmw_request_id_t,
std::shared_ptr<void>>;

using CancelRequestData = std::tuple<rcl_ret_t,
std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>;

using ResultRequestData = std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>;

using GoalExpiredData = struct Empty {};

std::variant<GoalRequestData, CancelRequestData, ResultRequestData, GoalExpiredData> data;

explicit ServerBaseData(GoalRequestData && dataIn)
: data(std::move(dataIn)) {}
explicit ServerBaseData(CancelRequestData && dataIn)
: data(std::move(dataIn)) {}
explicit ServerBaseData(ResultRequestData && dataIn)
: data(std::move(dataIn)) {}
explicit ServerBaseData(GoalExpiredData && dataIn)
: data(std::move(dataIn)) {}
};

ServerBase::ServerBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
Expand Down Expand Up @@ -178,6 +206,11 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
bool
ServerBase::is_ready(rcl_wait_set_t * wait_set)
{
if (pimpl_->threadExlusive.exchange(true)) {
throw std::runtime_error(
"ServerBase::Internal error, is_ready called, before take_data was called");
}

bool goal_request_ready;
bool cancel_request_ready;
bool result_request_ready;
Expand All @@ -203,16 +236,31 @@ ServerBase::is_ready(rcl_wait_set_t * wait_set)
rclcpp::exceptions::throw_from_rcl_error(ret);
}

return pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_.load();

bool isReady = pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_.load();

if (!isReady) {
if (!pimpl_->threadExlusive.exchange(false)) {
throw std::runtime_error("ServerBase::Internal error, is_ready reentrant called");
}
}
return isReady;
}

std::shared_ptr<void>
ServerBase::take_data()
{
if (pimpl_->goal_request_ready_.load()) {
auto checkCallSequence = [this]() {
if (!pimpl_->threadExlusive.exchange(false)) {
throw std::runtime_error(
"ServerBase::Internal error, take_data called, before is_ready was called");
}
};

if (pimpl_->goal_request_ready_.exchange(false)) {
rcl_ret_t ret;
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header;
Expand All @@ -225,13 +273,12 @@ ServerBase::take_data()
&request_header,
message.get());

checkCallSequence();
return std::static_pointer_cast<void>(
std::make_shared
<std::tuple<rcl_ret_t, rcl_action_goal_info_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret,
goal_info,
request_header, message));
} else if (pimpl_->cancel_request_ready_.load()) {
std::make_shared<ServerBaseData>(
ServerBaseData::GoalRequestData(
ret, goal_info, request_header, message)));
} else if (pimpl_->cancel_request_ready_.exchange(false)) {
rcl_ret_t ret;
rmw_request_id_t request_header;

Expand All @@ -244,11 +291,13 @@ ServerBase::take_data()
&request_header,
request.get());

checkCallSequence();
return std::static_pointer_cast<void>(
std::make_shared
<std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>>(ret, request, request_header));
} else if (pimpl_->result_request_ready_.load()) {
std::make_shared<ServerBaseData>(
ServerBaseData::CancelRequestData(
ret, request,
request_header)));
} else if (pimpl_->result_request_ready_.exchange(false)) {
rcl_ret_t ret;
// Get the result request message
rmw_request_id_t request_header;
Expand All @@ -257,11 +306,15 @@ ServerBase::take_data()
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());

checkCallSequence();
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(
ret, result_request, request_header));
std::make_shared<ServerBaseData>(
ServerBaseData::ResultRequestData(
ret, result_request, request_header)));
} else if (pimpl_->goal_expired_.load()) {
return nullptr;
checkCallSequence();
return std::static_pointer_cast<void>(
std::make_shared<ServerBaseData>(ServerBaseData::GoalExpiredData()));
} else {
throw std::runtime_error("Taking data from action server but nothing is ready");
}
Expand All @@ -270,6 +323,11 @@ ServerBase::take_data()
std::shared_ptr<void>
ServerBase::take_data_by_entity_id(size_t id)
{
if (pimpl_->threadExlusive.exchange(true)) {
throw std::runtime_error(
"ServerBase::Internal error, is_ready called, before take_data was called");
}

// Mark as ready the entity from which we want to take data
switch (static_cast<EntityType>(id)) {
case EntityType::GoalService:
Expand All @@ -287,31 +345,36 @@ ServerBase::take_data_by_entity_id(size_t id)
}

void
ServerBase::execute(std::shared_ptr<void> & data)
ServerBase::execute(std::shared_ptr<void> & dataIn)
{
if (!data && !pimpl_->goal_expired_.load()) {
throw std::runtime_error("'data' is empty");
}

if (pimpl_->goal_request_ready_.load()) {
execute_goal_request_received(data);
} else if (pimpl_->cancel_request_ready_.load()) {
execute_cancel_request_received(data);
} else if (pimpl_->result_request_ready_.load()) {
execute_result_request_received(data);
} else if (pimpl_->goal_expired_.load()) {
execute_check_expired_goals();
} else {
throw std::runtime_error("Executing action server but nothing is ready");
}
std::shared_ptr<ServerBaseData> dataPtr = std::static_pointer_cast<ServerBaseData>(dataIn);

std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ServerBaseData::GoalRequestData>) {
execute_goal_request_received(
std::get<0>(data), std::get<1>(data), std::get<2>(data),
std::get<3>(data));
}
if constexpr (std::is_same_v<T, ServerBaseData::CancelRequestData>) {
execute_cancel_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data));
}
if constexpr (std::is_same_v<T, ServerBaseData::ResultRequestData>) {
execute_result_request_received(std::get<0>(data), std::get<1>(data), std::get<2>(data));
}
if constexpr (std::is_same_v<T, ServerBaseData::GoalExpiredData>) {
execute_check_expired_goals();
}
},
dataPtr->data);
}

void
ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
ServerBase::execute_goal_request_received(
rcl_ret_t ret, rcl_action_goal_info_t goal_info, rmw_request_id_t request_header,
const std::shared_ptr<void> message)
{
auto shared_ptr = std::static_pointer_cast
<std::tuple<rcl_ret_t, rcl_action_goal_info_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
rcl_ret_t ret = std::get<0>(*shared_ptr);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -320,14 +383,6 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
rcl_action_goal_info_t goal_info = std::get<1>(*shared_ptr);
rmw_request_id_t request_header = std::get<2>(*shared_ptr);
std::shared_ptr<void> message = std::get<3>(*shared_ptr);

bool expected = true;
if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

GoalUUID uuid = get_goal_id_from_goal_request(message.get());
convert(uuid, &goal_info);
Expand Down Expand Up @@ -396,16 +451,13 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
// Tell user to start executing action
call_goal_accepted_callback(handle, uuid, message);
}
data.reset();
}

void
ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
ServerBase::execute_cancel_request_received(
rcl_ret_t ret, std::shared_ptr<action_msgs::srv::CancelGoal::Request> request,
rmw_request_id_t request_header)
{
auto shared_ptr = std::static_pointer_cast
<std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -414,9 +466,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
auto request = std::get<1>(*shared_ptr);
auto request_header = std::get<2>(*shared_ptr);
pimpl_->cancel_request_ready_ = false;

// Convert c++ message to C message
rcl_action_cancel_request_t cancel_request = rcl_action_get_zero_initialized_cancel_request();
Expand Down Expand Up @@ -486,15 +535,13 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
data.reset();
}

void
ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
ServerBase::execute_result_request_received(
rcl_ret_t ret, std::shared_ptr<void> result_request,
rmw_request_id_t request_header)
{
auto shared_ptr = std::static_pointer_cast
<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -503,10 +550,7 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
} else if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
auto result_request = std::get<1>(*shared_ptr);
auto request_header = std::get<2>(*shared_ptr);

pimpl_->result_request_ready_ = false;
std::shared_ptr<void> result_response;

// check if the goal exists
Expand Down Expand Up @@ -542,7 +586,6 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
rclcpp::exceptions::throw_from_rcl_error(rcl_ret);
}
}
data.reset();
}

void
Expand Down
3 changes: 3 additions & 0 deletions rclcpp_action/test/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,9 @@ TEST_F(TestGoalRequestServer, is_ready_rcl_error) {
EXPECT_NO_THROW(action_server_->add_to_wait_set(&wait_set));

EXPECT_TRUE(action_server_->is_ready(&wait_set));

EXPECT_NO_THROW(action_server_->take_data());

auto mock = mocking_utils::patch_and_return(
"lib:rclcpp_action", rcl_action_server_wait_set_get_entities_ready, RCL_RET_ERROR);
EXPECT_THROW(action_server_->is_ready(&wait_set), rclcpp::exceptions::RCLError);
Expand Down

0 comments on commit 147238c

Please sign in to comment.