In [1]:
from threading import Thread
from math import pi

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node

from pymoveit2 import MoveIt2
from pymoveit2.robots import ur

from geometry_msgs.msg import Pose, Quaternion, Point
import rclpy.wait_for_message   
from ur_msgs.srv import SetIO
from ur_commander.srv import VisualizePoses

rclpy.init()

# Create node for this example
node = Node("notebook_example")
callback_group = ReentrantCallbackGroup()


def display_poses(poses: list[Pose], frame_id: str = "ur_base_link") -> None:
    client = node.create_client(VisualizePoses, "/visualize_poses")
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info("service not available, waiting again...")
    client.call(VisualizePoses.Request(poses=poses, frame_id=frame_id))


moveit2 = MoveIt2(
    node=node,
    joint_names=ur.joint_names(),
    base_link_name=ur.base_link_name(),
    end_effector_name=ur.end_effector_name(),
    group_name=ur.MOVE_GROUP_ARM,
    callback_group=callback_group,
)

# Spin the node in background thread(s) and wait a bit for initialization
executor = rclpy.executors.MultiThreadedExecutor(2)
executor.add_node(node)
executor_thread = Thread(target=executor.spin, daemon=True, args=())
executor_thread.start()
node.create_rate(1.0).sleep()

# Scale down velocity and acceleration of joints (percentage of maximum)
moveit2.max_velocity = 0.2
moveit2.max_acceleration = 0.1
synchronous = True
cancel_after_secs = 0.0
cartesian = False
cartesian_max_step = 0.0025
cartesian_fraction_threshold = 0.0
cartesian_jump_threshold = 0.0
cartesian_avoid_collisions = False
moveit2.planner_id = "PTP"

# Add collision objects
moveit2.add_collision_box(
    id="table",
    size=[2.0, 1.0, 0.05],
    position=[0.0, 0.0, -0.025],
    quat_xyzw=[0.0, 0.0, 0.0, 0.0],
)

# Add collision objects
moveit2.add_collision_box(
    id="table",
    size=[2.0, 1.0, 0.05],
    position=[0.0, 0.0, -0.025],
    quat_xyzw=[0.0, 0.0, 0.0, 0.0],
)

**Define Joint Position**   

Forward kinematics, a trajectory will be planned

In [4]:
home_position = [0.0, -pi / 2, pi / 2, 0.0, 0.0, 0.0]
node.get_logger().info("Moving to home position")
traj = moveit2.move_to_configuration(home_position)
if traj is None:
    node.get_logger().error("Failed to move to home position")

[INFO] [1733412876.666953828] [notebook_example]: Moving to home position


In [5]:
moveit2.execute(traj)
success = moveit2.wait_until_executed()
if not success:
    node.get_logger().error("Failed to execute trajectory")
else:
    node.get_logger().info("Moved to home position")

[INFO] [1733412880.865178472] [notebook_example]: Moved to home position


**Define a Target Pose**    

Display the pose

In [6]:
target = Pose(
    position=Point(x=0.6, y=0.0, z=0.5),
    orientation=Quaternion(x=0.0, y=0.7071067811865475, z=0.0, w=0.7071067811865476),
)

display_poses([target])

Plan a trajectory

In [7]:
position = [target.position.x, target.position.y, target.position.z]
quat_xyzw = [target.orientation.x, target.orientation.y, target.orientation.z, target.orientation.w]


node.get_logger().info(f"Moving to {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}")
traj = moveit2.plan(
    position=position,
    quat_xyzw=quat_xyzw,
    cartesian=False,
    cartesian_fraction_threshold=cartesian_fraction_threshold,
)

[INFO] [1733412887.419219961] [notebook_example]: Moving to {position: [0.6, 0.0, 0.5], quat_xyzw: [0.0, 0.7071067811865475, 0.0, 0.7071067811865476]}


Execute the trajectory

In [8]:
moveit2.execute(traj)
success = moveit2.wait_until_executed()
if not success:
    node.get_logger().error("Failed to execute trajectory")

[WARN] [1733412900.855418126] [notebook_example]: Action 'execute_trajectory' was unsuccessful: 6.
[ERROR] [1733412900.880574080] [notebook_example]: Failed to execute trajectory


In [None]:
target0 = Pose(
    position=Point(x=0.6, y=-0.3, z=0.5),
    orientation=Quaternion(x=0.0, y=0.7071067811865475, z=0.0, w=0.7071067811865476),
)
target1 = Pose(
    position=Point(x=0.6, y=0.3, z=0.5),
    orientation=Quaternion(x=0.5, y=0.5, z=0.5, w=0.5),
)

display_poses([target0, target1])

In [None]:
targets = [target0, target1]
for i, target_pose in enumerate(targets):

    position = [target_pose.position.x, target_pose.position.y, target_pose.position.z]
    quat_xyzw = [
        target_pose.orientation.x,
        target_pose.orientation.y,
        target_pose.orientation.z,
        target_pose.orientation.w,
    ]

    node.get_logger().info(f"Moving to target {i} {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}")
    traj = moveit2.move_to_pose(
        position=position,
        quat_xyzw=quat_xyzw,
        cartesian=cartesian,
        cartesian_max_step=cartesian_max_step,
        cartesian_fraction_threshold=cartesian_fraction_threshold,
    )
    if traj is not None:
        node.get_logger().info(f"Move to target {i}")
        moveit2.execute(traj)
        success = moveit2.wait_until_executed()

In [7]:
target_poses = [
    Pose(
        position=Point(x=0.6, y=0.1, z=0.8),
        orientation=Quaternion(x=0.0, y=0.7071067811865475, z=0.0, w=0.7071067811865476),
    ),
    Pose(
        position=Point(x=0.6, y=0.3, z=0.5),
        orientation=Quaternion(x=0.5, y=0.5, z=0.5, w=0.5),
    ),
    Pose(
        position=Point(x=0.6, y=-0.3, z=0.5),
        orientation=Quaternion(x=0.0, y=0.7071067811865475, z=0.0, w=0.7071067811865476),
    ),
]

display_poses(target_poses)

In [8]:
traj = moveit2.move_sequence_async(
    poses=target_poses,
    blend_radius=[0.1, 0.1, 0.0],
    pipeline_id="pilz_industrial_motion_planner",
    planner_ids=["PTP", "PTP", "PTP"],
)

[INFO] [1733412119.469049730] [notebook_example]: Sending plan request to action server...
[INFO] [1733412119.492267030] [notebook_example]: Planning successful. Returning trajectory.


Exception in thread Thread-4 (spin):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/huanyu/.local/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 294, in spin
    self.spin_once()
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 794, in spin_once
    self._spin_once_impl(timeout_sec)
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 775, in _spin_once_impl
    handler, entity, node = self.wait_for_ready_callbacks(
  File "/opt/ros/humble/local/lib/python3.10/dist-packages/rclpy/executors.py", line 711, in wait_for_ready_callbacks
    return next(self._cb_iter)
  File "/opt/ros/h

In [9]:
if traj is not None:
    moveit2.execute(traj)