diff --git a/rclcpp-eloquent/rclcpp_action/src/server.cpp b/rclcpp-eloquent/rclcpp_action/src/server.cpp index df21d34..8c545b1 100644 --- a/rclcpp-eloquent/rclcpp_action/src/server.cpp +++ b/rclcpp-eloquent/rclcpp_action/src/server.cpp @@ -43,8 +43,8 @@ class ServerBaseImpl { } - // Lock everything except user callbacks - std::recursive_mutex reentrant_mutex_; + // Lock for action_server_ + std::recursive_mutex action_server_reentrant_mutex_; std::shared_ptr action_server_; @@ -56,10 +56,13 @@ class ServerBaseImpl size_t num_services_ = 0; size_t num_guard_conditions_ = 0; - bool goal_request_ready_ = false; - bool cancel_request_ready_ = false; - bool result_request_ready_ = false; - bool goal_expired_ = false; + std::atomic goal_request_ready_{false}; + std::atomic cancel_request_ready_{false}; + std::atomic result_request_ready_{false}; + std::atomic goal_expired_{false}; + + // Lock for unordered_maps + std::recursive_mutex unordered_map_mutex_; // Results to be kept until the goal expires after reaching a terminal state std::unordered_map> goal_results_; @@ -88,12 +91,13 @@ ServerBase::ServerBase( if (nullptr != ptr) { rcl_node_t * rcl_node = node_base->get_rcl_node_handle(); rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node); - (void)ret; - RCLCPP_DEBUG( - rclcpp::get_logger("rclcpp_action"), - "failed to fini rcl_action_server_t in deleter"); + if (RCL_RET_OK != ret) { + RCLCPP_DEBUG( + rclcpp::get_logger("rclcpp_action"), + "failed to fini rcl_action_server_t in deleter"); + } + delete ptr; } - delete ptr; }; pimpl_->action_server_.reset(new rcl_action_server_t, deleter); @@ -159,7 +163,7 @@ ServerBase::get_number_of_ready_guard_conditions() bool ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set) { - std::lock_guard lock(pimpl_->reentrant_mutex_); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); rcl_ret_t ret = rcl_action_wait_set_add_action_server( wait_set, pimpl_->action_server_.get(), NULL); return RCL_RET_OK == ret; @@ -168,35 +172,47 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set) bool ServerBase::is_ready(rcl_wait_set_t * wait_set) { - std::lock_guard lock(pimpl_->reentrant_mutex_); - rcl_ret_t ret = rcl_action_server_wait_set_get_entities_ready( - wait_set, - pimpl_->action_server_.get(), - &pimpl_->goal_request_ready_, - &pimpl_->cancel_request_ready_, - &pimpl_->result_request_ready_, - &pimpl_->goal_expired_); + bool goal_request_ready; + bool cancel_request_ready; + bool result_request_ready; + bool goal_expired; + rcl_ret_t ret; + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_server_wait_set_get_entities_ready( + wait_set, + pimpl_->action_server_.get(), + &goal_request_ready, + &cancel_request_ready, + &result_request_ready, + &goal_expired); + } + + pimpl_->goal_request_ready_ = goal_request_ready; + pimpl_->cancel_request_ready_ = cancel_request_ready; + pimpl_->result_request_ready_ = result_request_ready; + pimpl_->goal_expired_ = goal_expired; if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } - return pimpl_->goal_request_ready_ || - pimpl_->cancel_request_ready_ || - pimpl_->result_request_ready_ || - pimpl_->goal_expired_; + return pimpl_->goal_request_ready_.load() || + pimpl_->cancel_request_ready_.load() || + pimpl_->result_request_ready_.load() || + pimpl_->goal_expired_.load(); } void ServerBase::execute() { - if (pimpl_->goal_request_ready_) { + if (pimpl_->goal_request_ready_.load()) { execute_goal_request_received(); - } else if (pimpl_->cancel_request_ready_) { + } else if (pimpl_->cancel_request_ready_.load()) { execute_cancel_request_received(); - } else if (pimpl_->result_request_ready_) { + } else if (pimpl_->result_request_ready_.load()) { execute_result_request_received(); - } else if (pimpl_->goal_expired_) { + } else if (pimpl_->goal_expired_.load()) { execute_check_expired_goals(); } else { throw std::runtime_error("Executing action server but nothing is ready"); @@ -210,15 +226,19 @@ ServerBase::execute_goal_request_received() rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info(); rmw_request_id_t request_header; - std::lock_guard lock(pimpl_->reentrant_mutex_); - std::shared_ptr message = create_goal_request(); - ret = rcl_action_take_goal_request( - pimpl_->action_server_.get(), - &request_header, - message.get()); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_goal_request( + pimpl_->action_server_.get(), + &request_header, + message.get()); + } - pimpl_->goal_request_ready_ = false; + bool expected = true; + if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) { + return; + } if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. @@ -235,10 +255,13 @@ ServerBase::execute_goal_request_received() // Call user's callback, getting the user's response and a ros message to send back auto response_pair = call_handle_goal_callback(uuid, message); - ret = rcl_action_send_goal_response( - pimpl_->action_server_.get(), - &request_header, - response_pair.second.get()); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_send_goal_response( + pimpl_->action_server_.get(), + &request_header, + response_pair.second.get()); + } if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); @@ -254,15 +277,19 @@ ServerBase::execute_goal_request_received() { if (nullptr != ptr) { rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr); - (void)fail_ret; - RCLCPP_DEBUG( - rclcpp::get_logger("rclcpp_action"), - "failed to fini rcl_action_goal_handle_t in deleter"); + if (RCL_RET_OK != fail_ret) { + RCLCPP_DEBUG( + rclcpp::get_logger("rclcpp_action"), + "failed to fini rcl_action_goal_handle_t in deleter"); + } delete ptr; } }; rcl_action_goal_handle_t * rcl_handle; - rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info); + } if (!rcl_handle) { throw std::runtime_error("Failed to accept new goal\n"); } @@ -271,7 +298,10 @@ ServerBase::execute_goal_request_received() // Copy out goal handle since action server storage disappears when it is fini'd *handle = *rcl_handle; - pimpl_->goal_handles_[uuid] = handle; + { + std::lock_guard lock(pimpl_->unordered_map_mutex_); + pimpl_->goal_handles_[uuid] = handle; + } if (GoalResponse::ACCEPT_AND_EXECUTE == status) { // Change status to executing @@ -297,13 +327,18 @@ ServerBase::execute_cancel_request_received() // Initialize cancel request auto request = std::make_shared(); - std::lock_guard lock(pimpl_->reentrant_mutex_); - ret = rcl_action_take_cancel_request( - pimpl_->action_server_.get(), - &request_header, - request.get()); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_cancel_request( + pimpl_->action_server_.get(), + &request_header, + request.get()); + } - pimpl_->cancel_request_ready_ = false; + bool expected = true; + if (!pimpl_->cancel_request_ready_.compare_exchange_strong(expected, false)) { + return; + } if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. @@ -323,10 +358,13 @@ ServerBase::execute_cancel_request_received() // Get a list of goal info that should be attempted to be cancelled rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response(); - ret = rcl_action_process_cancel_request( - pimpl_->action_server_.get(), - &cancel_request, - &cancel_response); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_process_cancel_request( + pimpl_->action_server_.get(), + &cancel_request, + &cancel_response); + } if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } @@ -369,8 +407,11 @@ ServerBase::execute_cancel_request_received() publish_status(); } - ret = rcl_action_send_cancel_response( - pimpl_->action_server_.get(), &request_header, response.get()); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_send_cancel_response( + pimpl_->action_server_.get(), &request_header, response.get()); + } if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } @@ -383,12 +424,16 @@ ServerBase::execute_result_request_received() // Get the result request message rmw_request_id_t request_header; std::shared_ptr result_request = create_result_request(); - std::lock_guard lock(pimpl_->reentrant_mutex_); - ret = rcl_action_take_result_request( - pimpl_->action_server_.get(), &request_header, result_request.get()); - - pimpl_->result_request_ready_ = false; + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_take_result_request( + pimpl_->action_server_.get(), &request_header, result_request.get()); + } + bool expected = true; + if (!pimpl_->result_request_ready_.compare_exchange_strong(expected, false)) { + return; + } if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) { // Ignore take failure because connext fails if it receives a sample without valid data. // This happens when a client shuts down and connext receives a sample saying the client is @@ -406,6 +451,10 @@ ServerBase::execute_result_request_received() convert(uuid, &goal_info); bool goal_exists; goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); + } if (!goal_exists) { // Goal does not exists result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN); @@ -419,6 +468,7 @@ ServerBase::execute_result_request_received() if (result_response) { // Send the result now + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); ret = rcl_action_send_result_response( pimpl_->action_server_.get(), &request_header, result_response.get()); if (RCL_RET_OK != ret) { @@ -426,6 +476,7 @@ ServerBase::execute_result_request_received() } } else { // Store the request so it can be responded to later + std::lock_guard lock(pimpl_->unordered_map_mutex_); pimpl_->result_requests_[uuid].push_back(request_header); } } @@ -439,9 +490,11 @@ ServerBase::execute_check_expired_goals() // Loop in case more than 1 goal expired while (num_expired > 0u) { - std::lock_guard lock(pimpl_->reentrant_mutex_); rcl_ret_t ret; - ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired); + } if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); } else if (num_expired) { @@ -449,6 +502,7 @@ ServerBase::execute_check_expired_goals() GoalUUID uuid; convert(expired_goals[0], &uuid); RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str()); + std::lock_guard lock(pimpl_->unordered_map_mutex_); pimpl_->goal_results_.erase(uuid); pimpl_->result_requests_.erase(uuid); pimpl_->goal_handles_.erase(uuid); @@ -461,6 +515,11 @@ ServerBase::publish_status() { rcl_ret_t ret; + // We need to hold the lock across this entire method because + // rcl_action_server_get_goal_handles() returns an internal pointer to the + // goal data. + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + // Get all goal handles known to C action server rcl_action_goal_handle_t ** goal_handles = NULL; size_t num_goals = 0; @@ -516,20 +575,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr result_m // Check that the goal exists rcl_action_goal_info_t goal_info; convert(uuid, &goal_info); - std::lock_guard lock(pimpl_->reentrant_mutex_); bool goal_exists; - goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); + { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); + goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info); + } if (!goal_exists) { throw std::runtime_error("Asked to publish result for goal that does not exist"); } - pimpl_->goal_results_[uuid] = result_msg; + { + std::lock_guard lock(pimpl_->unordered_map_mutex_); + pimpl_->goal_results_[uuid] = result_msg; + } // if there are clients who already asked for the result, send it to them auto iter = pimpl_->result_requests_.find(uuid); if (iter != pimpl_->result_requests_.end()) { for (auto & request_header : iter->second) { + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); rcl_ret_t ret = rcl_action_send_result_response( pimpl_->action_server_.get(), &request_header, result_msg.get()); if (RCL_RET_OK != ret) { @@ -542,7 +607,7 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr result_m void ServerBase::notify_goal_terminal_state() { - std::lock_guard lock(pimpl_->reentrant_mutex_); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); rcl_ret_t ret = rcl_action_notify_goal_done(pimpl_->action_server_.get()); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); @@ -552,7 +617,7 @@ ServerBase::notify_goal_terminal_state() void ServerBase::publish_feedback(std::shared_ptr feedback_msg) { - std::lock_guard lock(pimpl_->reentrant_mutex_); + std::lock_guard lock(pimpl_->action_server_reentrant_mutex_); rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get()); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback");