Skip to content

Commit

Permalink
fix(rclcpp_action): Fixed race condition in Client
Browse files Browse the repository at this point in the history
Some background information: is_ready and take_data are guaranteed to be called in sequence without interruption from another thread. while execute is running, another thread may also call is_ready.

The problem was, that is_goal_response_ready, is_result_response_ready, is_cancel_response_ready, is_feedback_ready and is_status_ready were accessed and written from is_ready and execute.

This commit fixed this by only using the mentioned variables in is_ready and take_data. execute now only accesses the given pointer and works on this.

Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jul 31, 2023
1 parent 147238c commit 91fb1dd
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 71 deletions.
191 changes: 120 additions & 71 deletions rclcpp_action/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,62 @@ class ClientBaseImpl
std::default_random_engine, 8, unsigned int> random_bytes_generator;
};

struct ClientBaseData
{
struct FeedbackReadyData
{
FeedbackReadyData(rcl_ret_t retIn, std::shared_ptr<void> msg)
: ret(retIn), feedback_message(msg) {}
rcl_ret_t ret;
std::shared_ptr<void> feedback_message;
};
struct StatusReadyData
{
StatusReadyData(rcl_ret_t retIn, std::shared_ptr<void> msg)
: ret(retIn), status_message(msg) {}
rcl_ret_t ret;
std::shared_ptr<void> status_message;
};
struct GoalResponseData
{
GoalResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
: ret(retIn), response_header(header), goal_response(response) {}
rcl_ret_t ret;
rmw_request_id_t response_header;
std::shared_ptr<void> goal_response;
};
struct CancelResponseData
{
CancelResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
: ret(retIn), response_header(header), cancel_response(response) {}
rcl_ret_t ret;
rmw_request_id_t response_header;
std::shared_ptr<void> cancel_response;
};
struct ResultResponseData
{
ResultResponseData(rcl_ret_t retIn, rmw_request_id_t header, std::shared_ptr<void> response)
: ret(retIn), response_header(header), result_response(response) {}
rcl_ret_t ret;
rmw_request_id_t response_header;
std::shared_ptr<void> result_response;
};

std::variant<FeedbackReadyData, StatusReadyData, GoalResponseData, CancelResponseData,
ResultResponseData> data;

explicit ClientBaseData(FeedbackReadyData && dataIn)
: data(std::move(dataIn)) {}
explicit ClientBaseData(StatusReadyData && dataIn)
: data(std::move(dataIn)) {}
explicit ClientBaseData(GoalResponseData && dataIn)
: data(std::move(dataIn)) {}
explicit ClientBaseData(CancelResponseData && dataIn)
: data(std::move(dataIn)) {}
explicit ClientBaseData(ResultResponseData && dataIn)
: data(std::move(dataIn)) {}
};

ClientBase::ClientBase(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
Expand Down Expand Up @@ -551,43 +607,53 @@ std::shared_ptr<void>
ClientBase::take_data()
{
if (pimpl_->is_feedback_ready) {
pimpl_->is_feedback_ready = false;
std::shared_ptr<void> feedback_message = this->create_feedback_message();
rcl_ret_t ret = rcl_action_take_feedback(
pimpl_->client_handle.get(), feedback_message.get());
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
ret, feedback_message));
std::make_shared<ClientBaseData>(
ClientBaseData::FeedbackReadyData(
ret, feedback_message)));
} else if (pimpl_->is_status_ready) {
pimpl_->is_status_ready = false;
std::shared_ptr<void> status_message = this->create_status_message();
rcl_ret_t ret = rcl_action_take_status(
pimpl_->client_handle.get(), status_message.get());
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(
ret, status_message));
std::make_shared<ClientBaseData>(
ClientBaseData::StatusReadyData(
ret, status_message)));
} else if (pimpl_->is_goal_response_ready) {
pimpl_->is_goal_response_ready = false;
rmw_request_id_t response_header;
std::shared_ptr<void> goal_response = this->create_goal_response();
rcl_ret_t ret = rcl_action_take_goal_response(
pimpl_->client_handle.get(), &response_header, goal_response.get());
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, goal_response));
std::make_shared<ClientBaseData>(
ClientBaseData::GoalResponseData(
ret, response_header, goal_response)));
} else if (pimpl_->is_result_response_ready) {
pimpl_->is_result_response_ready = false;
rmw_request_id_t response_header;
std::shared_ptr<void> result_response = this->create_result_response();
rcl_ret_t ret = rcl_action_take_result_response(
pimpl_->client_handle.get(), &response_header, result_response.get());
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, result_response));
std::make_shared<ClientBaseData>(
ClientBaseData::ResultResponseData(
ret, response_header, result_response)));
} else if (pimpl_->is_cancel_response_ready) {
pimpl_->is_cancel_response_ready = false;
rmw_request_id_t response_header;
std::shared_ptr<void> cancel_response = this->create_cancel_response();
rcl_ret_t ret = rcl_action_take_cancel_response(
pimpl_->client_handle.get(), &response_header, cancel_response.get());
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(
ret, response_header, cancel_response));
std::make_shared<ClientBaseData>(
ClientBaseData::CancelResponseData(
ret, response_header, cancel_response)));
} else {
throw std::runtime_error("Taking data from action client but nothing is ready");
}
Expand Down Expand Up @@ -619,71 +685,54 @@ ClientBase::take_data_by_entity_id(size_t id)
}

void
ClientBase::execute(std::shared_ptr<void> & data)
ClientBase::execute(std::shared_ptr<void> & dataIn)
{
if (!data) {
if (!dataIn) {
throw std::runtime_error("'data' is empty");
}

if (pimpl_->is_feedback_ready) {
auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_feedback_ready = false;
if (RCL_RET_OK == ret) {
auto feedback_message = std::get<1>(*shared_ptr);
this->handle_feedback_message(feedback_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking feedback");
}
} else if (pimpl_->is_status_ready) {
auto shared_ptr = std::static_pointer_cast<std::tuple<rcl_ret_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_status_ready = false;
if (RCL_RET_OK == ret) {
auto status_message = std::get<1>(*shared_ptr);
this->handle_status_message(status_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking status");
}
} else if (pimpl_->is_goal_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_goal_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto goal_response = std::get<2>(*shared_ptr);
this->handle_goal_response(response_header, goal_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking goal response");
}
} else if (pimpl_->is_result_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_result_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto result_response = std::get<2>(*shared_ptr);
this->handle_result_response(response_header, result_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking result response");
}
} else if (pimpl_->is_cancel_response_ready) {
auto shared_ptr = std::static_pointer_cast<
std::tuple<rcl_ret_t, rmw_request_id_t, std::shared_ptr<void>>>(data);
auto ret = std::get<0>(*shared_ptr);
pimpl_->is_cancel_response_ready = false;
if (RCL_RET_OK == ret) {
auto response_header = std::get<1>(*shared_ptr);
auto cancel_response = std::get<2>(*shared_ptr);
this->handle_cancel_response(response_header, cancel_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error taking cancel response");
}
} else {
throw std::runtime_error("Executing action client but nothing is ready");
}
std::shared_ptr<ClientBaseData> dataPtr = std::static_pointer_cast<ClientBaseData>(dataIn);


std::visit(
[&](auto && data) -> void {
using T = std::decay_t<decltype(data)>;
if constexpr (std::is_same_v<T, ClientBaseData::FeedbackReadyData>) {
if (RCL_RET_OK == data.ret) {
this->handle_feedback_message(data.feedback_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking feedback");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::StatusReadyData>) {
if (RCL_RET_OK == data.ret) {
this->handle_status_message(data.status_message);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking status");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::GoalResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_goal_response(data.response_header, data.goal_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking goal response");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::ResultResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_result_response(data.response_header, data.result_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking result response");
}
}
if constexpr (std::is_same_v<T, ClientBaseData::CancelResponseData>) {
if (RCL_RET_OK == data.ret) {
this->handle_cancel_response(data.response_header, data.cancel_response);
} else if (RCL_RET_ACTION_CLIENT_TAKE_FAILED != data.ret) {
rclcpp::exceptions::throw_from_rcl_error(data.ret, "error taking cancel response");
}
}
}, dataPtr->data);
}

} // namespace rclcpp_action
4 changes: 4 additions & 0 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ ServerBase::take_data_by_entity_id(size_t id)
void
ServerBase::execute(std::shared_ptr<void> & dataIn)
{
if (!dataIn) {
throw std::runtime_error("ServerBase::execute: give data pointer was null");
}

std::shared_ptr<ServerBaseData> dataPtr = std::static_pointer_cast<ServerBaseData>(dataIn);

std::visit(
Expand Down

0 comments on commit 91fb1dd

Please sign in to comment.