In [1]:
import roslibpy
import roslibpy.actionlib
from tqdm import tqdm
import time

client = roslibpy.Ros(host='192.168.1.7', port=9090) # Change host to the IP of the robot
client.run()

In [None]:
topics = client.get_topics()
for topic in tqdm(topics):
    topic_type = client.get_topic_type(topic=topic)
    if "stat" in topic.lower():
        print(topic)
    # if 'command' in str(topic_type).lower():
    #     print(topic, topic_type)

In [None]:
my_topic = "/robot/robot_state"
topic_type = client.get_topic_type(topic=my_topic)
print(topic_type)
message_info = client.get_message_details(message_type=topic_type)
print(message_info)

In [None]:
# Sanity check to see if we are connected
print('Verifying the ROS target is connected?', client.is_connected)
# # 构建消息的 Header 字段
# header = {
#     'seq': 0,
#     'stamp': roslibpy.Time.now(),
#     'frame_id': 'base_link'  # 假设坐标系为 base_link
# }
#
# # 构建 JointJog 消息
# joint_names = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']  # 假设有 6 个关节
# displacements = [0.1, -0.1, 0.05, 0.0, -0.05, 0.0]  # 各关节的位移量，单位为弧度
# velocities = [0.05, 0.05, 0.05, 0.05, 0.05, 0.05]  # 各关节的速度，单位为弧度/秒
# duration = 2.0  # 持续时间，单位为秒
#
# # 构建控制消息
# joint_jog_message = {
#     'header': header,
#     'joint_names': joint_names,
#     'displacements': displacements,
#     'velocities': velocities,
#     'duration': duration
# }
#
# # 创建 Topic 实例，发布到 /default_move_group/joint_jog 话题
# joint_jog_topic = roslibpy.Topic(client, '/default_move_group/joint_jog', 'control_msgs/JointJog')
#
# # 发布控制消息
# joint_jog_topic.publish(roslibpy.Message(joint_jog_message))

def on_joint_states(message):
    print(message)

# joint_states_client = roslibpy.Topic(client, '/robot/joint_states', 'sensor_msgs/JointState')
joint_states_client = listener = roslibpy.Topic(
    client, my_topic, topic_type
)
joint_states_client.subscribe(on_joint_states)

try:
    while True:
        time.sleep(0.02)
except KeyboardInterrupt:
    print("Exiting...")


In [None]:
services = client.get_services()
for service in tqdm(services):
    if "default_move_group" in service.lower():
        print(service)
        service_type = client.get_service_type(service)
        print('service_type:', service_type)

# publisher = roslibpy.Topic(
#     client, "/devices/robotiqd/grip/goal",
#     "gripper_msgs/GripperBasicCommandActionGoal"
# )
# publisher.publish(roslibpy.Message({
#     "goal": {
#         "action": 2  # 0:GRIP, 1:RELEASE, 2:TOGGLE
#     }
# }))

In [None]:
def on_action(message):
    print(message)
actions = client.get_action_servers(on_action)
print(actions)

In [None]:
"""
Home TCP Pose:
Ori(Euler):
    Roll(X): 180
    Pitch(Y): 0
    Yaw(Z): 0
Ori(Quat):
    x: 1
    y: 0
    z: 0
    w: 0
Coords:
    x: 0.
    y: 0.4
    z: 0.4567
"""

# Ori
timeout = 30
goal_info = []
motion_goal_settings = {  # Dictionary in case of simple motion
    'pose': {
        'position': {'x': 0,
                     'y': 0.3,
                     'z': 0.5},
        'orientation': {'x': -1,
                        'y': 0,
                        'z': 0,
                        'w': 0},
    },
    'frame_id': 'world',  # Default frame
    'max_velocity': {'linear': 0.03,  ## Default velocities
                     'angular': 0.03},
    'max_joint_velocity': 0.03,
    'max_joint_acceleration': 0.03,
    'blend': {'linear': 0.0,
              'angular': 0.0
              }
}
goal_info.append(motion_goal_settings)

service = roslibpy.Service(client, '/robot/switch_controller', 'inovo_driver/SwitchControllerGroup')
request = roslibpy.ServiceRequest({'name': 'trajectory'})
result = service.call(request)

action_client = roslibpy.actionlib.ActionClient(
            client, '/default_move_group/move',
            'commander_msgs/MotionAction')

message_ = {
    'motion_sequence': goal_info}  ## Creating a dictionary that looks the same as the simple motion

goal = roslibpy.actionlib.Goal(action_client, roslibpy.Message(message_))
# goal.on('feedback', lambda f: print(f))
goal.on("feedback", lambda f: print(f))
# goal.on('feedback', lambda f: print(f))
goal.on("feedback", lambda f: print(f))
## Start the goal - this is where the robot will start moving!
goal.send()

result = goal.wait(timeout)
# Clean up the action client
action_client.dispose()

In [None]:
"""
Joint State:
Pos(1-6):
    0, 0, 1.57, 3.14, -1.57, 0
"""
joint_names = ['j1', 'j2', 'j3', 'j4', 'j5', 'j6']
trajectory_goal_settings = {  # Dictionary in case of trajectory ### WIP ###
    'positions': [0, 0, 1.57, 0, 1.57, 3.14],
    'velocities': [0.01] * 6,
    'time_from_start': {'secs': 40}  # Default time
}
trajectory_goals = [trajectory_goal_settings]

service = roslibpy.Service(client, '/robot/switch_controller', 'inovo_driver/SwitchControllerGroup')
request = roslibpy.ServiceRequest({'name': 'trajectory'})
result = service.call(request)
# print('Service response: {}'.format(result))

# Setting up the client for the trajectory action
action_client = roslibpy.actionlib.ActionClient(client, '/robot/joint_trajectory_controller/follow_joint_trajectory', 'control_msgs/FollowJointTrajectoryAction')

message_ = {
    'trajectory': {  # Compiling the message of different dictionaries and arrays to be sent to the server
        'joint_names': joint_names,
        'points': trajectory_goals
    }}

goal = roslibpy.actionlib.Goal(action_client, roslibpy.Message(message_))

goal.on('feedback', lambda f: print(f))
goal.send()
print("[DEBUG] waiting...")
result = goal.wait(timeout)
print("[DEBUG] finished waiting")
action_client.dispose()

In [2]:
# Get all action services
# 获取所有服务
services = client.get_services()

# 打印所有服务
print("Available Services:")
for service in services:
    print(service)

Available Services:
/rosmon/get_loggers
/rosmon/set_logger_level
/beckhoff_io/beckhoff_io_driver/get_loggers
/beckhoff_io/beckhoff_io_driver/set_logger_level
/beckhoff_io/io_info
/beckhoff_io/digital_write
/beckhoff_io/analog_write
/rosmon/start_stop
/joystick_profile_manager/get_loggers
/joystick_profile_manager/set_logger_level
/robot/inovo_driver/get_loggers
/robot/inovo_driver/set_logger_level
/robot/enable
/robot/disable
/robot/switch_to_passive_mode
/robot/refresh_node_ids
/robot/reset
/robot/switch_controller
/robot/arm_info
/commander/get_loggers
/commander/set_logger_level
/psu/estop/reset
/psu/safe_stop/reset
/psu/safe_stop/trip
/psu/enable
/psu/disable
/psu/reset_fault
/psu/get_fw_version
/pendant_joystick/profile/set_parameters
/default_move_group/set_parameters
/default_move_group/hidden_meshes_srv
/default_move_group/get_joint_angles
/default_move_group/get_tcp
/default_move_group/ik
/default_move_group/fk
/sequence/prompt
/sequence/upload
/sequence/download
/sequence/sta