In [None]:
import numpy as np
from xarm.wrapper import XArmAPI


root_path = "/home/yigit/projects/pemp/comparisons/pydmps/"

ROBOT_IP        = '192.168.1.225'
TRAJ_PATH       = root_path +'r_d_dmp_full.npy'  # 1x400x7 or 400x7
SERVO_RATE_HZ   = 100.0
TOTAL_TIME_S    = 8.0                # duration to play the whole trajectory (tune)
RAMP_TIME_S     = 1.0                # gentle ramp from current -> first waypoint

def load_traj(path):
    T = np.load(path)
    if T.ndim == 3 and T.shape[0] == 1:
        T = T[0]
    if T.shape[-1] != 7:
        raise ValueError(f'Expected last dim = 7, got {T.shape}')
    return T  # shape (N,7) in radians

def cubic_upsample(W, rate_hz, total_time_s):
    """Upsample a (N,7) waypoints array to fixed dt using cubic interpolation."""
    N = len(W)
    tq = np.linspace(0.0, 1.0, N)
    M = max(2, int(total_time_s * rate_hz))
    t  = np.linspace(0.0, 1.0, M)
    # per-joint cubic (fall back to linear if N<4)
    if N >= 4:
        from scipy.interpolate import CubicSpline  # optional; if missing, we do linear
        out = np.empty((M, 7), dtype=float)
        for j in range(7):
            out[:, j] = CubicSpline(tq, W[:, j], bc_type='clamped')(t)
        return out
    else:
        # linear fallback
        return np.stack([np.interp(t, tq, W[:, j]) for j in range(7)], axis=1)

def rad_clamp(q):
    """Basic clamp to [-pi, pi] for wrist roll etc. Remove if your limits differ."""
    q = np.asarray(q)
    q = (q + np.pi) % (2*np.pi) - np.pi
    return q

def move_home(arm):
    # Move to home position
    arm.set_position(x=470, y=0, z=425, roll=180, pitch=0, yaw=0, speed=50, is_radian=False)

def move_center(arm):
    # Move to center position
    arm.set_position(x=470, y=0, z=310, roll=180, pitch=0, yaw=0, speed=50, is_radian=False)

def wrap_up(arm):
    move_center(arm)
    move_home(arm)
    arm.disconnect()

SDK_VERSION: 1.17.0


In [None]:
arm = XArmAPI(ROBOT_IP, is_radian=True)  # use radians end-to-end
arm.motion_enable(True)
arm.set_mode(0); arm.set_state(0)

W = load_traj(TRAJ_PATH)              # (N,7) in radians
W_up = cubic_upsample(W, SERVO_RATE_HZ, TOTAL_TIME_S)

curr = np.array(arm.angles)
goal0 = np.array(W_up[0])
ramp_M = max(2, int(RAMP_TIME_S * SERVO_RATE_HZ))
ramp = np.stack([np.linspace(curr[j], goal0[j], ramp_M) for j in range(7)], axis=1)

In [None]:
import time


def stream_segment(seg):
    dt = 1.0 / SERVO_RATE_HZ
    t0 = time.time()
    for k in range(len(seg)):
        q = seg[k].tolist()  # 7 elems, radians
        arm.set_servo_angle(angle=q, is_radian=True, speed=100, wait=False)

        t_next = t0 + (k+1)*dt
        while time.time() < t_next:
            time.sleep(0.0005)

move_home(arm)
# time.sleep(4.0)
# try:
#     # 1) gentle ramp
#     stream_segment(ramp)
#     # 2) play the upsampled trajectory
stream_segment(W_up)
# finally:
#     # return to planner mode if you want to do other moves
#     # arm.set_mode(0); arm.set_state(0)
#     # wrap_up(arm)
#     asdad=1