In [168]:

import rtde_control
import rtde_receive
import threading
import time


ip_ginger = "192.168.1.101"
ip_fred = "192.168.1.102"

# --------------URcontroller ---------------
ginger_rtde_c = rtde_control.RTDEControlInterface(ip_ginger)
ginger_rtde_r = rtde_receive.RTDEReceiveInterface(ip_ginger)


# --------------URreceive ---------------
fred_rtde_c = rtde_control.RTDEControlInterface(ip_fred)
fred_rtde_r = rtde_receive.RTDEReceiveInterface(ip_fred)

# from scripts.RobotGripperModule import SchunkGripper

# ginger_gripper_ip = "192.168.1.252"
# ginger_gripper = SchunkGripper(ginger_gripper_ip)

# fred_gripper_ip = "192.168.1.253"
# fred_gripper = SchunkGripper(fred_gripper_ip)
# -------------------------------
# 分步骤测试
# -------------------------------
START = 0
END = 50











In [169]:

def parse_targets(json_list):
    parsed = []
    for step in json_list:
        # 找出带编号的 pose 字段名（如 Tcp_TargetPose_0）
        pose_key = [k for k in step if k.startswith("Tcp_TargetPose_")][0]
        speed_key = [k for k in step if k.startswith("TargetPose_") and "_Speed" in k][0]
        accel_key = [k for k in step if k.startswith("TargetPose_") and "_Acceleration" in k][0]

        pose_dict = step[pose_key]
        pose = [
            pose_dict["x"],
            pose_dict["y"],
            pose_dict["z"],
            pose_dict["Rx"],
            pose_dict["Ry"],
            pose_dict["Rz"]
        ]

        parsed.append({
            "pose": pose,
            "speed": step[speed_key],
            "accel": step[accel_key],
            "motion": "MoveL" if step.get("MotionType", "").lower().startswith("lin") else "MoveJ",
            "gripper": step.get("GripperCmd", "None")
        })
    return parsed


In [170]:

def move_robot_sequence(rtde_c, targets,start=START, end=END):
    
    if end is None or end > len(targets):
        end = len(targets)
        
    for i in range(start, end):
        step = targets[i]
        pose = step["pose"]
        speed = 0.2
        accel = 0.2
        motion = step["motion"]
        gripper = step["gripper"]
        print(f"[STEP {i}] Motion: {motion}, Gripper: {gripper}")
        print(f"         Pose: {pose} | Speed: {speed}, Accel: {accel}")

        if gripper == "Grasp":
            print("[Gripper] 执行抓取指令")
            # rtde_c.gripper_close()  # 如果有夹爪指令接口的话
        elif gripper == "Release":
            print("[Gripper] 执行释放指令")
            # rtde_c.gripper_open()

        if motion == "MoveL":
            rtde_c.moveL(pose, speed, accel)
        elif motion == "MoveJ":
            rtde_c.moveJ(pose, speed, accel)
        else:
            print(f"[警告] 未知运动类型: {motion}")

        time.sleep(0.05)  # 可调整等待时间

#move_robot_sequence(rtde_c, targets, start=START , end=END)

In [171]:

def initialize_robot(rtde_r,rtde_c):
    actual_q = rtde_r.getActualQ()
    actual_x = rtde_r.getActualTCPPose()

    print(f"机器人当前关节角度: {actual_q}")
    actual_q[0] = actual_q[0] - 0.6  # Move the second joint of the robot to the first target
    rtde_c.moveJ(actual_q, 0.5, 0.5)
    time.sleep(2)  # 等待机器人移动完成
    pass

def move_both_robots_individual_targets(ginger_rtde_c, ginger_targets, fred_rtde_c, fred_targets):
    
    # Create threads for parallel execution
    ginger_thread = threading.Thread(target=move_robot_sequence, args=(ginger_rtde_c, ginger_targets),kwargs={"start":START, "end":END})
    fred_thread = threading.Thread(target=move_robot_sequence, args=(fred_rtde_c, fred_targets),kwargs={"start":START, "end":END})
    # Start both movements
    ginger_thread.start()
    fred_thread.start()
  # Wait for both to finish
    ginger_thread.join()
    fred_thread.join()


In [172]:
# -------------------------------
# 示例读取和执行
# -------------------------------
import json
import os
data_dir = r"C:\Users\dg-admin\Desktop\DDU_Seminar\SS25_Robotic Collaboration\data"

ginger_path = os.path.join(data_dir, 'pose_data_ginger.json')
fred_path = os.path.join(data_dir, 'pose_data_fred.json')

with open(ginger_path, 'r') as f:
    ginger_raw = json.load(f)
with open(fred_path, 'r') as f:
   fred_raw = json.load(f)

ginger_targets = parse_targets(ginger_raw)
fred_targets = parse_targets(fred_raw)


In [173]:
move_both_robots_individual_targets(ginger_rtde_c, ginger_targets, fred_rtde_c, fred_targets)

[STEP 0] Motion: MoveL, Gripper: None
         Pose: [0.4125348700833919, -0.5863731465760209, 0.5808363094478846, 1.1215442908731643, 1.3366044379016528, 1.3366044379016255] | Speed: 0.2, Accel: 0.2
[STEP 0] Motion: MoveL, Gripper: None
         Pose: [-0.292809299374278, -0.41098848713242114, 0.5808363094478847, 1.2866970891644942, -1.0796670529547583, -1.0796670529548151] | Speed: 0.2, Accel: 0.2
[STEP 1] Motion: MoveL, Gripper: Grasp[STEP 1] Motion: MoveL, Gripper: Grasp
         Pose: [0.4125348700833919, -0.5863731465760209, 0.4808363094478846, 1.1215442908731643, 1.3366044379016528, 1.3366044379016255] | Speed: 0.2, Accel: 0.2
[Gripper] 执行抓取指令

         Pose: [-0.292809299374278, -0.41098848713242114, 0.4808363094478847, 1.2866970891644942, -1.0796670529547583, -1.0796670529548151] | Speed: 0.2, Accel: 0.2
[Gripper] 执行抓取指令
[STEP 2] Motion: MoveL, Gripper: Grasp[STEP 2] Motion: MoveL, Gripper: Grasp
         Pose: [-0.3922748824285111, -0.4285269530767815, 0.4808363094478848, 1.2

In [174]:
ginger_rtde_c.disconnect()
fred_rtde_c.disconnect()

ginger_gripper.grip() 
fred_gripper.grip()