Skip to content

Commit

Permalink
Use CallbackGroup in PlanningSceneMonitor
Browse files Browse the repository at this point in the history
  • Loading branch information
henningkayser committed Jun 2, 2022
1 parent cea5764 commit d6b9ed2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,10 @@ class MOVEIT_PLANNING_SCENE_MONITOR_EXPORT PlanningSceneMonitor : private boost:

std::shared_ptr<rclcpp::Node> node_;

// TODO: (anasarrak) callbacks on ROS2?
// https://answers.ros.org/question/300874/how-do-you-use-callbackgroups-as-a-replacement-for-callbackqueues-in-ros2/
// ros::CallbackQueue queue_;
std::shared_ptr<rclcpp::Node> pnode_;
std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> private_executor_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor private_executor_;
std::thread private_executor_thread_;
rclcpp::SubscriptionOptions subscription_options_;

std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,21 @@ PlanningSceneMonitor::PlanningSceneMonitor(const rclcpp::Node::SharedPtr& node,
const std::string& name)
: monitor_name_(name)
, node_(node)
, private_executor_(std::make_shared<rclcpp::executors::SingleThreadedExecutor>())
, tf_buffer_(std::make_shared<tf2_ros::Buffer>(node->get_clock()))
, dt_state_update_(0.0)
, shape_transform_cache_lookup_wait_time_(0, 0)
, rm_loader_(rm_loader)
{
std::vector<std::string> new_args = rclcpp::NodeOptions().arguments();
new_args.push_back("--ros-args");
new_args.push_back("-r");
// Add random ID to prevent warnings about multiple publishers within the same node
new_args.push_back(std::string("__node:=") + node_->get_name() + "_private_" +
std::to_string(reinterpret_cast<std::size_t>(this)));
new_args.push_back("--");
pnode_ = std::make_shared<rclcpp::Node>("_", node_->get_namespace(), rclcpp::NodeOptions().arguments(new_args));
// We have no control on how the passed node is getting executed. To make sure MGI is functional, we're creating
// our own callback group which is managed in a separate callback thread
callback_group_ = node_->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive,
false /* don't spin with node executor */);
private_executor_.add_callback_group(callback_group_, node_->get_node_base_interface());
subscription_options_.callback_group = callback_group_;
private_executor_thread_ = std::thread([this]() { private_executor_.spin(); });

tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);
tf_buffer_->setUsingDedicatedThread(true);
// use same callback queue as root_nh_
// root_nh_.setCallbackQueue(&queue_);
// nh_.setCallbackQueue(&queue_);
// spinner_.reset(new ros::AsyncSpinner(1, &queue_));
// spinner_->start();
initialize(scene);

use_sim_time_ = node->get_parameter("use_sim_time").as_bool();
Expand All @@ -138,10 +132,11 @@ PlanningSceneMonitor::~PlanningSceneMonitor()
stopWorldGeometryMonitor();
stopSceneMonitor();

private_executor_->cancel();
if (private_executor_.is_spinning())
private_executor_.cancel();

if (private_executor_thread_.joinable())
private_executor_thread_.join();
private_executor_.reset();

current_state_monitor_.reset();
scene_const_.reset();
Expand Down Expand Up @@ -223,32 +218,35 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc
state_update_pending_ = false;
// Period for 0.1 sec
using std::chrono::nanoseconds;
state_update_timer_ = pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); });
private_executor_->add_node(pnode_);

// start executor on a different thread now
private_executor_thread_ = std::thread([this]() { private_executor_->spin(); });
state_update_timer_ = node_->create_wall_timer(
dt_state_update_, [this]() { return stateUpdateTimerCallback(); }, callback_group_);

auto declare_parameter = [this](const std::string& param_name, auto default_val,
const std::string& description) -> auto
auto get_parameter = [this](const std::string& param_name, auto default_val, const std::string& description) -> auto
{
rcl_interfaces::msg::ParameterDescriptor desc;
desc.set__description(description);
return pnode_->declare_parameter(param_name, default_val, desc);
if (node_->has_parameter(param_name))
{
return node_->get_parameter_or(param_name, default_val);
}
else
{
return node_->declare_parameter(param_name, default_val, desc);
}
};

try
{
// Set up publishing parameters
bool publish_planning_scene =
declare_parameter("publish_planning_scene", false, "Set to True to publish Planning Scenes");
bool publish_geometry_updates = declare_parameter("publish_geometry_updates", false,
"Set to True to publish geometry updates of the planning scene");
get_parameter("publish_planning_scene", false, "Set to True to publish Planning Scenes");
bool publish_geometry_updates = get_parameter("publish_geometry_updates", false,
"Set to True to publish geometry updates of the planning scene");
bool publish_state_updates =
declare_parameter("publish_state_updates", false, "Set to True to publish state updates of the planning scene");
bool publish_transform_updates = declare_parameter(
"publish_transforms_updates", false, "Set to True to publish transform updates of the planning scene");
double publish_planning_scene_hz = declare_parameter(
get_parameter("publish_state_updates", false, "Set to True to publish state updates of the planning scene");
bool publish_transform_updates = get_parameter("publish_transforms_updates", false,
"Set to True to publish transform updates of the planning scene");
double publish_planning_scene_hz = get_parameter(
"publish_planning_scene_hz", 4.0, "Set the maximum frequency at which planning scene updates are published");
updatePublishSettings(publish_geometry_updates, publish_state_updates, publish_transform_updates,
publish_planning_scene, publish_planning_scene_hz);
Expand All @@ -268,11 +266,11 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc
bool publish_planning_scene = false, publish_geometry_updates = false, publish_state_updates = false,
publish_transform_updates = false;
double publish_planning_scene_hz = 4.0;
bool declared_params_valid = pnode_->get_parameter("publish_planning_scene", publish_planning_scene) &&
pnode_->get_parameter("publish_geometry_updates", publish_geometry_updates) &&
pnode_->get_parameter("publish_state_updates", publish_state_updates) &&
pnode_->get_parameter("publish_transforms_updates", publish_transform_updates) &&
pnode_->get_parameter("publish_planning_scene_hz", publish_planning_scene_hz);
bool declared_params_valid = node_->get_parameter("publish_planning_scene", publish_planning_scene) &&
node_->get_parameter("publish_geometry_updates", publish_geometry_updates) &&
node_->get_parameter("publish_state_updates", publish_state_updates) &&
node_->get_parameter("publish_transforms_updates", publish_transform_updates) &&
node_->get_parameter("publish_planning_scene_hz", publish_planning_scene_hz);

if (!declared_params_valid)
{
Expand All @@ -287,11 +285,10 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc
const auto& type = parameter.get_type();

// Only allow already declared parameters with same value type
if (!pnode_->has_parameter(name) || pnode_->get_parameter(name).get_type() != type)
if (!node_->has_parameter(name) || node_->get_parameter(name).get_type() != type)
{
RCLCPP_ERROR(LOGGER, "Invalid parameter in PlanningSceneMonitor parameter callback");
result.successful = false;
return result;
RCLCPP_WARN_STREAM(LOGGER, "Invalid parameter in PlanningSceneMonitor parameter callback: " << name.c_str());
continue;
}

// Update parameter values
Expand All @@ -313,7 +310,7 @@ void PlanningSceneMonitor::initialize(const planning_scene::PlanningScenePtr& sc
return result;
};

callback_handler_ = pnode_->add_on_set_parameters_callback(psm_parameter_set_callback);
callback_handler_ = node_->add_on_set_parameters_callback(psm_parameter_set_callback);
}

void PlanningSceneMonitor::monitorDiffs(bool flag)
Expand Down Expand Up @@ -385,7 +382,7 @@ void PlanningSceneMonitor::startPublishingPlanningScene(SceneUpdateType update_t
publish_update_types_ = update_type;
if (!publish_planning_scene_ && scene_)
{
planning_scene_publisher_ = pnode_->create_publisher<moveit_msgs::msg::PlanningScene>(planning_scene_topic, 100);
planning_scene_publisher_ = node_->create_publisher<moveit_msgs::msg::PlanningScene>(planning_scene_topic, 100);
RCLCPP_INFO(LOGGER, "Publishing maintained planning scene on '%s'", planning_scene_topic.c_str());
monitorDiffs(true);
publish_planning_scene_ = std::make_unique<boost::thread>([this] { scenePublishingThread(); });
Expand Down Expand Up @@ -543,7 +540,8 @@ bool PlanningSceneMonitor::requestPlanningSceneState(const std::string& service_
throw std::runtime_error("requestPlanningSceneState() to self-provided service: " + service_name);
}
// use global namespace for service
auto client = pnode_->create_client<moveit_msgs::srv::GetPlanningScene>(service_name);
auto client = node_->create_client<moveit_msgs::srv::GetPlanningScene>(service_name, rmw_qos_profile_services_default,
callback_group_);
auto srv = std::make_shared<moveit_msgs::srv::GetPlanningScene::Request>();
srv->components.components = srv->components.SCENE_SETTINGS | srv->components.ROBOT_STATE |
srv->components.ROBOT_STATE_ATTACHED_OBJECTS | srv->components.WORLD_OBJECT_NAMES |
Expand Down Expand Up @@ -583,11 +581,13 @@ bool PlanningSceneMonitor::requestPlanningSceneState(const std::string& service_
void PlanningSceneMonitor::providePlanningSceneService(const std::string& service_name)
{
// Load the service
get_scene_service_ = pnode_->create_service<moveit_msgs::srv::GetPlanningScene>(
service_name, [this](moveit_msgs::srv::GetPlanningScene::Request::SharedPtr req,
moveit_msgs::srv::GetPlanningScene::Response::SharedPtr res) {
get_scene_service_ = node_->create_service<moveit_msgs::srv::GetPlanningScene>(
service_name,
[this](moveit_msgs::srv::GetPlanningScene::Request::SharedPtr req,
moveit_msgs::srv::GetPlanningScene::Response::SharedPtr res) {
return getPlanningSceneServiceCallback(req, res);
});
},
rmw_qos_profile_services_default, callback_group_);
}

void PlanningSceneMonitor::getPlanningSceneServiceCallback(moveit_msgs::srv::GetPlanningScene::Request::SharedPtr req,
Expand Down Expand Up @@ -1095,9 +1095,10 @@ void PlanningSceneMonitor::startSceneMonitor(const std::string& scene_topic)
// listen for planning scene updates; these messages include transforms, so no need for filters
if (!scene_topic.empty())
{
planning_scene_subscriber_ = pnode_->create_subscription<moveit_msgs::msg::PlanningScene>(
planning_scene_subscriber_ = node_->create_subscription<moveit_msgs::msg::PlanningScene>(
scene_topic, 100,
[this](const moveit_msgs::msg::PlanningScene::SharedPtr scene) { return newPlanningSceneCallback(scene); });
[this](const moveit_msgs::msg::PlanningScene::SharedPtr scene) { return newPlanningSceneCallback(scene); },
subscription_options_);
RCLCPP_INFO(LOGGER, "Listening to '%s'", planning_scene_subscriber_->get_topic_name());
}
}
Expand Down Expand Up @@ -1182,17 +1183,19 @@ void PlanningSceneMonitor::startWorldGeometryMonitor(const std::string& collisio
// Listen to the /collision_objects topic to detect requests to add/remove/update collision objects to/from the world
if (!collision_objects_topic.empty())
{
collision_object_subscriber_ = pnode_->create_subscription<moveit_msgs::msg::CollisionObject>(
collision_object_subscriber_ = node_->create_subscription<moveit_msgs::msg::CollisionObject>(
collision_objects_topic, 1024,
[this](moveit_msgs::msg::CollisionObject::SharedPtr obj) { return collisionObjectCallback(obj); });
[this](moveit_msgs::msg::CollisionObject::SharedPtr obj) { return collisionObjectCallback(obj); },
subscription_options_);
RCLCPP_INFO(LOGGER, "Listening to '%s'", collision_objects_topic.c_str());
}

if (!planning_scene_world_topic.empty())
{
planning_scene_world_subscriber_ = pnode_->create_subscription<moveit_msgs::msg::PlanningSceneWorld>(
planning_scene_world_subscriber_ = node_->create_subscription<moveit_msgs::msg::PlanningSceneWorld>(
planning_scene_world_topic, 1,
[this](moveit_msgs::msg::PlanningSceneWorld::SharedPtr world) { return newPlanningSceneWorldCallback(world); });
[this](moveit_msgs::msg::PlanningSceneWorld::SharedPtr world) { return newPlanningSceneWorldCallback(world); },
subscription_options_);
RCLCPP_INFO(LOGGER, "Listening to '%s' for planning scene world geometry", planning_scene_world_topic.c_str());
}

Expand Down Expand Up @@ -1241,8 +1244,7 @@ void PlanningSceneMonitor::startStateMonitor(const std::string& joint_states_top
{
if (!current_state_monitor_)
{
current_state_monitor_ =
std::make_shared<CurrentStateMonitor>(pnode_, getRobotModel(), tf_buffer_, use_sim_time_);
current_state_monitor_ = std::make_shared<CurrentStateMonitor>(node_, getRobotModel(), tf_buffer_, use_sim_time_);
}
current_state_monitor_->addUpdateCallback(
[this](const sensor_msgs::msg::JointState::ConstSharedPtr& joint_state) { return onStateUpdate(joint_state); });
Expand All @@ -1253,16 +1255,17 @@ void PlanningSceneMonitor::startStateMonitor(const std::string& joint_states_top
if (dt_state_update_.count() > 0)
// ROS original: state_update_timer_.start();
// TODO: re-enable WallTimer start()
state_update_timer_ =
pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); });
state_update_timer_ = node_->create_wall_timer(
dt_state_update_, [this]() { return stateUpdateTimerCallback(); }, callback_group_);
}

if (!attached_objects_topic.empty())
{
// using regular message filter as there's no header
attached_collision_object_subscriber_ = pnode_->create_subscription<moveit_msgs::msg::AttachedCollisionObject>(
attached_collision_object_subscriber_ = node_->create_subscription<moveit_msgs::msg::AttachedCollisionObject>(
attached_objects_topic, 1024,
[this](moveit_msgs::msg::AttachedCollisionObject::SharedPtr obj) { return attachObjectCallback(obj); });
[this](moveit_msgs::msg::AttachedCollisionObject::SharedPtr obj) { return attachObjectCallback(obj); },
subscription_options_);
RCLCPP_INFO(LOGGER, "Listening to '%s' for attached collision objects",
attached_collision_object_subscriber_->get_topic_name());
}
Expand Down Expand Up @@ -1376,7 +1379,8 @@ void PlanningSceneMonitor::setStateUpdateFrequency(double hz)
dt_state_update_ = std::chrono::duration<double>(1.0 / hz);
// ROS original: state_update_timer_.start();
// TODO: re-enable WallTimer start()
state_update_timer_ = pnode_->create_wall_timer(dt_state_update_, [this]() { return stateUpdateTimerCallback(); });
state_update_timer_ = node_->create_wall_timer(
dt_state_update_, [this]() { return stateUpdateTimerCallback(); }, callback_group_);
}
else
{
Expand Down

0 comments on commit d6b9ed2

Please sign in to comment.