Fix action server deadlock, refer to
ylh committed Jan 29, 2024
1 parent b354068 commit 7aab0ee
Showing 1 changed file with 134 additions and 69 deletions.
203 changes: 134 additions & 69 deletions rclcpp-eloquent/rclcpp_action/src/server.cpp
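At a glance, the change replaces the single reentrant_mutex_ with two narrower locks (action_server_reentrant_mutex_ for the rcl action server handle, unordered_map_mutex_ for the goal bookkeeping maps), turns the *_ready_ flags into std::atomic<bool>, and claims each flag with compare_exchange_strong so user callbacks run with no lock held. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, and the names MiniActionServer, take_request and handle_goal are placeholders, not rclcpp_action API.

// Minimal sketch of the locking pattern adopted in this commit (illustrative
// only; not rclcpp_action code). MiniActionServer, take_request and
// handle_goal are placeholder names.
#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>

class MiniActionServer
{
public:
  // Waitable side: only records readiness; no user code runs here.
  void mark_goal_request_ready() {goal_request_ready_.store(true);}

  // Executor side: take under the server lock, claim the ready flag, then
  // call the user callback with no lock held.
  void execute_goal_request()
  {
    std::string request;
    {
      std::lock_guard<std::recursive_mutex> lock(server_mutex_);
      request = take_request();
    }

    bool expected = true;
    if (!goal_request_ready_.compare_exchange_strong(expected, false)) {
      return;  // another thread already consumed this request
    }

    // The user callback may call publish_feedback()-style methods on other
    // threads that take server_mutex_ briefly, without deadlocking us.
    if (handle_goal(request)) {
      std::lock_guard<std::recursive_mutex> lock(map_mutex_);
      accepted_goals_[request] = true;
    }
  }

private:
  std::string take_request() {return "goal-1";}          // placeholder
  bool handle_goal(const std::string &) {return true;}   // placeholder user callback

  std::recursive_mutex server_mutex_;  // guards the underlying action server handle
  std::recursive_mutex map_mutex_;     // guards the bookkeeping maps
  std::atomic<bool> goal_request_ready_{false};
  std::unordered_map<std::string, bool> accepted_goals_;
};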
@@ -43,8 +43,8 @@ class ServerBaseImpl
{
}

// Lock everything except user callbacks
std::recursive_mutex reentrant_mutex_;
// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

std::shared_ptr<rcl_action_server_t> action_server_;

@@ -56,10 +56,13 @@
size_t num_services_ = 0;
size_t num_guard_conditions_ = 0;

bool goal_request_ready_ = false;
bool cancel_request_ready_ = false;
bool result_request_ready_ = false;
bool goal_expired_ = false;
std::atomic<bool> goal_request_ready_{false};
std::atomic<bool> cancel_request_ready_{false};
std::atomic<bool> result_request_ready_{false};
std::atomic<bool> goal_expired_{false};

// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;

// Results to be kept until the goal expires after reaching a terminal state
std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_;
@@ -88,12 +91,13 @@ ServerBase::ServerBase(
if (nullptr != ptr) {
rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node);
(void)ret;
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter");
if (RCL_RET_OK != ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter");
}
delete ptr;
}
delete ptr;
};

pimpl_->action_server_.reset(new rcl_action_server_t, deleter);
Expand Down Expand Up @@ -159,7 +163,7 @@ ServerBase::get_number_of_ready_guard_conditions()
bool
ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_wait_set_add_action_server(
wait_set, pimpl_->action_server_.get(), NULL);
return RCL_RET_OK == ret;
@@ -168,35 +172,47 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
bool
ServerBase::is_ready(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
rcl_ret_t ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&pimpl_->goal_request_ready_,
&pimpl_->cancel_request_ready_,
&pimpl_->result_request_ready_,
&pimpl_->goal_expired_);
bool goal_request_ready;
bool cancel_request_ready;
bool result_request_ready;
bool goal_expired;
rcl_ret_t ret;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&goal_request_ready,
&cancel_request_ready,
&result_request_ready,
&goal_expired);
}

pimpl_->goal_request_ready_ = goal_request_ready;
pimpl_->cancel_request_ready_ = cancel_request_ready;
pimpl_->result_request_ready_ = result_request_ready;
pimpl_->goal_expired_ = goal_expired;

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

return pimpl_->goal_request_ready_ ||
pimpl_->cancel_request_ready_ ||
pimpl_->result_request_ready_ ||
pimpl_->goal_expired_;
return pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_.load();
}

void
ServerBase::execute()
{
if (pimpl_->goal_request_ready_) {
if (pimpl_->goal_request_ready_.load()) {
execute_goal_request_received();
} else if (pimpl_->cancel_request_ready_) {
} else if (pimpl_->cancel_request_ready_.load()) {
execute_cancel_request_received();
} else if (pimpl_->result_request_ready_) {
} else if (pimpl_->result_request_ready_.load()) {
execute_result_request_received();
} else if (pimpl_->goal_expired_) {
} else if (pimpl_->goal_expired_.load()) {
execute_check_expired_goals();
} else {
throw std::runtime_error("Executing action server but nothing is ready");
@@ -210,15 +226,19 @@ ServerBase::execute_goal_request_received()
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header;

std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);

std::shared_ptr<void> message = create_goal_request();
ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(),
&request_header,
message.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(),
&request_header,
message.get());
}

pimpl_->goal_request_ready_ = false;
bool expected = true;
if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
@@ -235,10 +255,13 @@ ServerBase::execute_goal_request_received()
// Call user's callback, getting the user's response and a ros message to send back
auto response_pair = call_handle_goal_callback(uuid, message);

ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
@@ -254,15 +277,19 @@ ServerBase::execute_goal_request_received()
{
if (nullptr != ptr) {
rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr);
(void)fail_ret;
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter");
if (RCL_RET_OK != fail_ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter");
}
delete ptr;
}
};
rcl_action_goal_handle_t * rcl_handle;
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n");
}
@@ -271,7 +298,10 @@ ServerBase::execute_goal_request_received()
// Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle;

pimpl_->goal_handles_[uuid] = handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle;
}

if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing
@@ -297,13 +327,18 @@ ServerBase::execute_cancel_request_received()
// Initialize cancel request
auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();

std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(),
&request_header,
request.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(),
&request_header,
request.get());
}

pimpl_->cancel_request_ready_ = false;
bool expected = true;
if (!pimpl_->cancel_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
@@ -323,10 +358,13 @@ ServerBase::execute_cancel_request_received()
// Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();

ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
@@ -369,8 +407,11 @@ ServerBase::execute_cancel_request_received()
publish_status();
}

ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
@@ -383,12 +424,16 @@ ServerBase::execute_result_request_received()
// Get the result request message
rmw_request_id_t request_header;
std::shared_ptr<void> result_request = create_result_request();
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());

pimpl_->result_request_ready_ = false;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());
}

bool expected = true;
if (!pimpl_->result_request_ready_.compare_exchange_strong(expected, false)) {
return;
}
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
@@ -406,6 +451,10 @@ ServerBase::execute_result_request_received()
convert(uuid, &goal_info);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) {
// Goal does not exist
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
@@ -419,13 +468,15 @@ ServerBase::execute_result_request_received()

if (result_response) {
// Send the result now
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
} else {
// Store the request so it can be responded to later
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->result_requests_[uuid].push_back(request_header);
}
}
@@ -439,16 +490,19 @@ ServerBase::execute_check_expired_goals()

// Loop in case more than 1 goal expired
while (num_expired > 0u) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
rcl_ret_t ret;
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
} else if (num_expired) {
// A goal expired!
GoalUUID uuid;
convert(expired_goals[0], &uuid);
RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str());
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_.erase(uuid);
pimpl_->result_requests_.erase(uuid);
pimpl_->goal_handles_.erase(uuid);
@@ -461,6 +515,11 @@ ServerBase::publish_status()
{
rcl_ret_t ret;

// We need to hold the lock across this entire method because
// rcl_action_server_get_goal_handles() returns an internal pointer to the
// goal data.
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

// Get all goal handles known to C action server
rcl_action_goal_handle_t ** goal_handles = NULL;
size_t num_goals = 0;
@@ -516,20 +575,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
// Check that the goal exists
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}

if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

pimpl_->goal_results_[uuid] = result_msg;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg;
}

// if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) {
for (auto & request_header : iter->second) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get());
if (RCL_RET_OK != ret) {
@@ -542,7 +607,7 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
void
ServerBase::notify_goal_terminal_state()
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_notify_goal_done(pimpl_->action_server_.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
Expand All @@ -552,7 +617,7 @@ ServerBase::notify_goal_terminal_state()
void
ServerBase::publish_feedback(std::shared_ptr<void> feedback_msg)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback");
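For context on the deadlock named in the commit title, the sketch below shows one failure mode a single coarse reentrant mutex allows; this scenario is an assumption for illustration, not something stated on this page. A recursive mutex only permits re-locking from the same thread, so holding it across a user callback that waits on another thread which also needs that mutex stalls both threads.

// Illustrative only (assumed failure mode, not taken from the commit):
// holding one coarse lock across a user callback that waits on another
// thread which needs the same lock.
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>

std::recursive_mutex coarse_lock;   // stands in for the old reentrant_mutex_

int main()
{
  std::promise<void> feedback_sent;
  auto done = feedback_sent.get_future();
  std::thread feedback_thread;

  {
    // "Executor" thread: holds the coarse lock while running a user callback.
    std::lock_guard<std::recursive_mutex> lock(coarse_lock);

    // "User callback": starts feedback on another thread and waits for it.
    feedback_thread = std::thread([&feedback_sent]() {
        // publish_feedback()-style call: blocks, it needs the same lock.
        std::lock_guard<std::recursive_mutex> inner(coarse_lock);
        feedback_sent.set_value();
      });

    if (done.wait_for(std::chrono::seconds(1)) == std::future_status::timeout) {
      std::cout << "deadlocked: feedback thread is stuck on the coarse lock\n";
    }
  }  // lock released here, so the feedback thread can finally proceed

  feedback_thread.join();
}

With the change in this commit, the equivalent wait would not stall, because the user callback runs with no lock held and the feedback path only briefly takes the narrower action_server_reentrant_mutex_.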