Skip to content

Commit

Permalink
Fix Callback execution in MGI (backport #1305) (#1461)
Browse files Browse the repository at this point in the history
The initial implementation with the private node allowed for concurrent spinning of the same node, producing runtime exceptions. This change removes the need for a private node by letting MGI manage its own CallbackGroup and Executor thread.

(cherry picked from commit 042186a)

Co-authored-by: Henning Kayser <henningkayser@picknik.ai>
  • Loading branch information
mergify[bot] and henningkayser committed Aug 10, 2022
1 parent bb82b64 commit a3955c5
Showing 1 changed file with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,13 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
const std::shared_ptr<tf2_ros::Buffer>& tf_buffer, const rclcpp::Duration& wait_for_servers)
: opt_(opt), node_(node), tf_buffer_(tf_buffer)
{
pnode_ = std::make_shared<rclcpp::Node>("move_group_interface_node");
// We have no control on how the passed node is getting executed. To make sure MGI is functional, we're creating
// our own callback group which is managed in a separate callback thread
callback_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive,
false /* don't spin with node executor */);
callback_executor_.add_callback_group(callback_group_, node->get_node_base_interface());
callback_thread_ = std::thread([this]() { callback_executor_.spin(); });

robot_model_ = opt.robot_model_ ? opt.robot_model_ : getSharedRobotModel(node_, opt.robot_description_);
if (!getRobotModel())
{
Expand Down Expand Up @@ -145,19 +151,19 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
end_effector_link_ = joint_model_group_->getLinkModelNames().back();
pose_reference_frame_ = getRobotModel()->getModelFrame();
// Append the slash between two topic components
trajectory_event_publisher_ = pnode_->create_publisher<std_msgs::msg::String>(
trajectory_event_publisher_ = node_->create_publisher<std_msgs::msg::String>(
rclcpp::names::append(opt_.move_group_namespace_,
trajectory_execution_manager::TrajectoryExecutionManager::EXECUTION_EVENT_TOPIC),
1);
attached_object_publisher_ = pnode_->create_publisher<moveit_msgs::msg::AttachedCollisionObject>(
attached_object_publisher_ = node_->create_publisher<moveit_msgs::msg::AttachedCollisionObject>(
rclcpp::names::append(opt_.move_group_namespace_,
planning_scene_monitor::PlanningSceneMonitor::DEFAULT_ATTACHED_COLLISION_OBJECT_TOPIC),
1);

current_state_monitor_ = getSharedStateMonitor(node_, robot_model_, tf_buffer_);

move_action_client_ = rclcpp_action::create_client<moveit_msgs::action::MoveGroup>(
pnode_, rclcpp::names::append(opt_.move_group_namespace_, move_group::MOVE_ACTION));
node_, rclcpp::names::append(opt_.move_group_namespace_, move_group::MOVE_ACTION), callback_group_);
move_action_client_->wait_for_action_server(wait_for_servers.to_chrono<std::chrono::duration<double>>());
// TODO(JafarAbdi): Enable once moveit_ros_manipulation is ported
// pick_action_client_ = rclcpp_action::create_client<moveit_msgs::action::Pickup>(
Expand All @@ -168,18 +174,22 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
// node_, move_group::PLACE_ACTION);
// place_action_client_->wait_for_action_server(std::chrono::nanoseconds(timeout_for_servers.nanoseconds()));
execute_action_client_ = rclcpp_action::create_client<moveit_msgs::action::ExecuteTrajectory>(
pnode_, rclcpp::names::append(opt_.move_group_namespace_, move_group::EXECUTE_ACTION_NAME));
node_, rclcpp::names::append(opt_.move_group_namespace_, move_group::EXECUTE_ACTION_NAME), callback_group_);
execute_action_client_->wait_for_action_server(wait_for_servers.to_chrono<std::chrono::duration<double>>());

query_service_ = pnode_->create_client<moveit_msgs::srv::QueryPlannerInterfaces>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::QUERY_PLANNERS_SERVICE_NAME));
get_params_service_ = pnode_->create_client<moveit_msgs::srv::GetPlannerParams>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::GET_PLANNER_PARAMS_SERVICE_NAME));
set_params_service_ = pnode_->create_client<moveit_msgs::srv::SetPlannerParams>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::SET_PLANNER_PARAMS_SERVICE_NAME));
query_service_ = node_->create_client<moveit_msgs::srv::QueryPlannerInterfaces>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::QUERY_PLANNERS_SERVICE_NAME),
rmw_qos_profile_services_default, callback_group_);
get_params_service_ = node_->create_client<moveit_msgs::srv::GetPlannerParams>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::GET_PLANNER_PARAMS_SERVICE_NAME),
rmw_qos_profile_services_default, callback_group_);
set_params_service_ = node_->create_client<moveit_msgs::srv::SetPlannerParams>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::SET_PLANNER_PARAMS_SERVICE_NAME),
rmw_qos_profile_services_default, callback_group_);

cartesian_path_service_ = pnode_->create_client<moveit_msgs::srv::GetCartesianPath>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::CARTESIAN_PATH_SERVICE_NAME));
cartesian_path_service_ = node_->create_client<moveit_msgs::srv::GetCartesianPath>(
rclcpp::names::append(opt_.move_group_namespace_, move_group::CARTESIAN_PATH_SERVICE_NAME),
rmw_qos_profile_services_default, callback_group_);

// plan_grasps_service_ = pnode_->create_client<moveit_msgs::srv::GraspPlanning>(GRASP_PLANNING_SERVICE_NAME);

Expand All @@ -190,6 +200,11 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
{
if (constraints_init_thread_)
constraints_init_thread_->join();

callback_executor_.cancel();

if (callback_thread_.joinable())
callback_thread_.join();
}

const std::shared_ptr<tf2_ros::Buffer>& getTF() const
Expand Down Expand Up @@ -222,8 +237,7 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
auto req = std::make_shared<moveit_msgs::srv::QueryPlannerInterfaces::Request>();
auto future_response = query_service_->async_send_request(req);

// wait until future is done
if (rclcpp::spin_until_future_complete(pnode_, future_response) == rclcpp::FutureReturnCode::SUCCESS)
if (future_response.valid())
{
const auto& response = future_response.get();
if (!response->planner_interfaces.empty())
Expand All @@ -239,7 +253,7 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
{
auto req = std::make_shared<moveit_msgs::srv::QueryPlannerInterfaces::Request>();
auto future_response = query_service_->async_send_request(req);
if (rclcpp::spin_until_future_complete(pnode_, future_response) == rclcpp::FutureReturnCode::SUCCESS)
if (future_response.valid())
{
const auto& response = future_response.get();
if (!response->planner_interfaces.empty())
Expand All @@ -259,10 +273,10 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
req->group = group;
std::map<std::string, std::string> result;

auto res = get_params_service_->async_send_request(req);
if (rclcpp::spin_until_future_complete(pnode_, res) == rclcpp::FutureReturnCode::SUCCESS)
auto future_response = get_params_service_->async_send_request(req);
if (future_response.valid())
{
response = res.get();
response = future_response.get();
for (unsigned int i = 0, end = response->params.keys.size(); i < end; ++i)
result[response->params.keys[i]] = response->params.values[i];
}
Expand Down Expand Up @@ -796,7 +810,6 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
// wait until send_goal_opts.result_callback is called
while (!done)
{
rclcpp::spin_some(pnode_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

Expand Down Expand Up @@ -876,7 +889,6 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
// wait until send_goal_opts.result_callback is called
while (!done)
{
rclcpp::spin_some(pnode_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

Expand Down Expand Up @@ -945,7 +957,6 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
// wait until send_goal_opts.result_callback is called
while (!done)
{
rclcpp::spin_some(pnode_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

Expand Down Expand Up @@ -979,10 +990,10 @@ class MoveGroupInterface::MoveGroupInterfaceImpl
req->avoid_collisions = avoid_collisions;
req->link_name = getEndEffectorLink();

auto res = cartesian_path_service_->async_send_request(req);
if (rclcpp::spin_until_future_complete(pnode_, res) == rclcpp::FutureReturnCode::SUCCESS)
auto future_response = cartesian_path_service_->async_send_request(req);
if (future_response.valid())
{
response = res.get();
response = future_response.get();
error_code = response->error_code;
if (response->error_code.val == moveit_msgs::msg::MoveItErrorCodes::SUCCESS)
{
Expand Down Expand Up @@ -1360,7 +1371,9 @@ class MoveGroupInterface::MoveGroupInterfaceImpl

Options opt_;
rclcpp::Node::SharedPtr node_;
rclcpp::Node::SharedPtr pnode_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_executor_;
std::thread callback_thread_;
std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
moveit::core::RobotModelConstPtr robot_model_;
planning_scene_monitor::CurrentStateMonitorPtr current_state_monitor_;
Expand Down

0 comments on commit a3955c5

Please sign in to comment.