In [1]:
%load_ext autoreload
%autoreload 2

import sys, os
from pydrake.all import StartMeshcat

# if you're not launching jupyter from the repo root, fix the path:
repo_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if repo_root not in sys.path:
    sys.path.insert(0, repo_root)

meshcat = StartMeshcat()
print("Meshcat:", meshcat.web_url())

INFO:drake:Meshcat listening for connections at http://localhost:7001


Meshcat: http://localhost:7001


In [57]:
import os
import numpy as np
from pydrake.all import StartMeshcat
from pydrake.multibody.tree import RevoluteJoint
from rocky.build import build_robot_diagram_two
from rocky.ilqr import ILQRController, CostParams
from rocky.dynamics import ArmParams, forward_kinematics

rng = np.random.default_rng()


# --- URDF ---
urdf = "../rocky.urdf"

# --- Build ---
bundle = build_robot_diagram_two(
    urdf_path=urdf,
    time_step=1e-3,
    gravity_vec=(0., 0., 0.),
    meshcat=meshcat,
)

plant, ctx, sim = bundle.plant, bundle.plant_context, bundle.simulator
ally, enemy = bundle.ally, bundle.enemy

W = plant.world_frame()
enemy_target_frame = plant.GetFrameByName("enemy_target")
ally_wall_frame = plant.GetFrameByName("wall", model_instance=ally)
ally_glove_frame = plant.GetFrameByName("glove", model_instance=ally)
enemy_wall_frame = plant.GetFrameByName("wall", model_instance=enemy)
enemy_elbow_frame = plant.GetFrameByName("elbow_marker", model_instance=enemy)
enemy_glove_frame = plant.GetFrameByName("glove", model_instance=enemy)


# --- Joint helpers ---
def get_revolute_joints_for_instance(plant, instance):
    joints = [plant.get_joint(j) for j in plant.GetJointIndices(instance)]
    return [j for j in joints if isinstance(j, RevoluteJoint)]


def set_angles_for_instance(plant, ctx, instance, angles_rad):
    revs = get_revolute_joints_for_instance(plant, instance)
    for j, q in zip(revs, angles_rad):
        j.set_angle(ctx, float(q))


def set_zero_torques(plant, ctx):
    plant.get_actuation_input_port().FixValue(ctx, np.zeros(plant.num_actuators()))


def get_ally_state(plant, ctx, ally):
    q = plant.GetPositions(ctx, ally)[:2]
    dq = plant.GetVelocities(ctx, ally)[:2]
    return np.hstack([q, dq])


def get_ally_shoulder_in_world(plant, ctx, ally_wall_frame):
    X_W_wall = plant.CalcRelativeTransform(ctx, W, ally_wall_frame)
    shoulder_in_wall = np.array([0.0, 0.0, 1.0])   # joint origin in wall frame
    return X_W_wall.translation() + shoulder_in_wall


def get_target_pos_2d(plant, ctx):
    p_W_shoulder = get_ally_shoulder_in_world(plant, ctx, ally_wall_frame)
    p_W_T = plant.CalcRelativeTransform(ctx, W, enemy_target_frame).translation()
    rel = p_W_T - p_W_shoulder              # [dx, dy, dz] in world
    return np.array([rel[0], rel[2]])       # model x,z  (no sign flip)


def get_enemy_pos_2d(plant, ctx):
    p_W_shoulder = get_ally_shoulder_in_world(plant, ctx, ally_wall_frame)
    p_W_EE = plant.CalcRelativeTransform(ctx, W, enemy_glove_frame).translation()
    rel = p_W_EE - p_W_shoulder
    return np.array([rel[0], rel[2]])       # model x,z  (no sign flip)


def get_ally_pos_2d_from_model(x, params):
    return forward_kinematics(x[:2], params)


def apply_ally_torque(plant, ctx, ally, u2):
    tau = np.zeros(plant.num_actuators())
    idx = list(plant.GetJointActuatorIndices(ally))
    tau[idx[0]] = u2[0]
    tau[idx[1]] = u2[1]
    plant.get_actuation_input_port().FixValue(ctx, tau)


def randomize_enemy_configuration(
    clearance_buffer=0.02,
    clearance_target=None,
    clearance_ally=None,
    max_tries=200,
):
    """Sample random enemy joint angles that keep the glove away from target/ally."""
    glove_radius = 0.05
    target_radius = 0.03
    clearance_target = clearance_target or (glove_radius + target_radius + clearance_buffer)
    clearance_ally = clearance_ally or (2 * glove_radius + clearance_buffer)

    joints = get_revolute_joints_for_instance(plant, enemy)
    limits = []
    for j in joints:
        lo = j.position_lower_limit()
        hi = j.position_upper_limit()
        if not np.isfinite(lo):
            lo = -np.pi
        if not np.isfinite(hi):
            hi = np.pi
        limits.append((lo, hi))

    for attempt in range(1, max_tries + 1):
        candidate = np.array([rng.uniform(lo, hi) for lo, hi in limits])
        set_angles_for_instance(plant, ctx, enemy, candidate)

        p_enemy = plant.CalcRelativeTransform(ctx, W, enemy_glove_frame).translation()
        p_target = plant.CalcRelativeTransform(ctx, W, enemy_target_frame).translation()
        p_ally = plant.CalcRelativeTransform(ctx, W, ally_glove_frame).translation()
        p_elbow = plant.CalcRelativeTransform(ctx, W, enemy_elbow_frame).translation()
        wall_x = plant.CalcRelativeTransform(ctx, W, enemy_wall_frame).translation()[0]

        if not (p_elbow[0] < wall_x and p_enemy[0] < wall_x):
            continue
        if np.linalg.norm(p_enemy - p_target) < clearance_target:
            continue
        if np.linalg.norm(p_enemy - p_ally) < clearance_ally:
            continue
        print(f"Enemy randomized in {attempt} tries.")
        return candidate

    raise RuntimeError("Failed to sample a collision-free enemy configuration")


params = ArmParams()
cost = CostParams(
    Q=np.diag([2.0, 2.0, 0.1, 0.1]),
    R=0.01 * np.eye(2),
    Qf=np.diag([50.0, 50.0, 1.0, 1.0]),
    target_weight=425.0,
    r_min=0.0, # Turning off Exponential Cost at a certain distance
    avoid_weight=50.0,
    avoid_sigma=0.08,
)

ctrl = ILQRController(
    horizon=60,
    dt=0.02,
    params=params,
    cost=cost,
    verbose=False,
)

print(ctrl)

# --- Initial angles ---

# Ally starts from a fixed configuration for repeatability
ally_angles = np.deg2rad([20.0, -90.0])
set_angles_for_instance(plant, ctx, ally, ally_angles)

# Enemy starts from a random, collision-free configuration on every run
enemy_angles = randomize_enemy_configuration()
print("Enemy start angles (deg):", np.rad2deg(enemy_angles))

sim.AdvanceTo(0.0)
print("Done.")


ILQRController(
  horizon=60, dt=0.02, max_iter=60, tol=0.001, reg=0.0001, verbose=False
  params: l1=0.5, l2=0.5, m1=0.6, m2=0.5, I1=0.002, I2=0.002, damping=[0.100 0.100], gravity=0, torque_limit=[50.000 50.000]
  cost: Q=[[2.000 0.000 0.000 0.000]
 [0.000 2.000 0.000 0.000]
 [0.000 0.000 0.100 0.000]
 [0.000 0.000 0.000 0.100]], R=[[0.010 0.000]
 [0.000 0.010]], Qf=[[50.000  0.000  0.000  0.000]
 [ 0.000 50.000  0.000  0.000]
 [ 0.000  0.000  1.000  0.000]
 [ 0.000  0.000  0.000  1.000]], target_w=425.0, avoid_w=50.0, avoid_sigma=0.08, x_goal=[0.000 0.000 0.000 0.000]
  alphas=[1.0, 0.5, 0.25, 0.1, 0.05]
)
Enemy randomized in 1 tries.
Enemy start angles (deg): [ 37.43200126 -88.12516512]
Done.


In [58]:
import time

t_final = 10.0
dt_ctrl = ctrl.dt
base_avoid_weight = cost.avoid_weight
avoid_decay_time = 5.0  # seconds to significantly reduce avoidance weight
min_avoid_scale = 0.1
t = sim.get_context().get_time()
u_traj_prev = None
dt_mpc = 0.02  # CHANGED: 0.05 is too large for stable feedback on this arm
horizon = 40   # Adjusted horizon to match new dt (40 * 0.02 = 0.8s lookahead)
step = 0

print("Starting MPC loop...")

def has_reached_target(ally_pos, target_pos, x, pos_tol=0.1, vel_tol=0.1):
    pos_err = np.linalg.norm(ally_pos - target_pos)
    return (pos_err < pos_tol)

while t < t_final:
    # 1) Current state
    x_current = get_ally_state(plant, ctx, ally)

    # 2) Positions
    target_pos_2d = get_target_pos_2d(plant, ctx)
    enemy_pos_2d  = get_enemy_pos_2d(plant, ctx)
    ally_pos_2d   = get_ally_pos_2d_from_model(x_current, params)
    
    # Check for NaN in state (Simulation exploded)
    if np.any(np.isnan(x_current)):
        print("ERROR: Simulation state is NaN. Physics exploded.")
        break

    if has_reached_target(ally_pos_2d, target_pos_2d, x_current):
        print(f"\n*** Target reached at t={t:.2f}s ***")
        print(f"ally EE pos: {ally_pos_2d}, target: {target_pos_2d}")
        print("Holding position with zero torque.")
        apply_ally_torque(plant, ctx, ally, np.zeros(2))
        sim.AdvanceTo(t + dt_ctrl)
        t = sim.get_context().get_time()
        step += 1
        break

    # 4) MPC Step
    decay = np.exp(-t / avoid_decay_time)
    avoid_scale = max(min_avoid_scale, decay)
    ctrl.cost.avoid_weight = base_avoid_weight * avoid_scale
    x_pred, result = ctrl.step_mpc(
        x_current,
        target_pos=target_pos_2d,
        enemy_pos=enemy_pos_2d,
        prev_u_trj=u_traj_prev,
    )

    if np.isfinite(result.cost) and result.converged:
        u_traj_prev = result.u_trj
        u0 = result.u_trj[0]
    else:
        print(f"[Warning] iLQR failed (cost: {result.cost}, conv: {result.converged}). Braking.")
        # Damping torque to stop robot if controller fails
        u0 = params.clip_u(-2.0 * x_current[2:])
        u_traj_prev = None # Reset warm start

    # 5) Debug prints
    if step % 5 == 0: # reduce spam
        print(f"t={t:.3f} | x={np.array2string(x_current, precision=2)} | cost={result.cost:.1f} | avoid_w={ctrl.cost.avoid_weight:.1f} | u0={np.array2string(u0, precision=2)}")

    # 6) Apply
    apply_ally_torque(plant, ctx, ally, u0)
    
    # Advance exactly by the controller dt
    sim.AdvanceTo(t + dt_mpc)
    t = sim.get_context().get_time()
    step += 1

print("MPC run complete.")


Starting MPC loop...
t=0.000 | x=[ 0.35 -1.57  0.    0.  ] | cost=6432.9 | avoid_w=50.0 | u0=[13.26 -5.34]
t=0.100 | x=[ 1.01 -1.97 10.49 10.1 ] | cost=712.0 | avoid_w=49.0 | u0=[-24.21   3.69]
t=0.200 | x=[ 1.41 -1.53  1.63  2.21] | cost=26890953396464963083567104.0 | avoid_w=48.0 | u0=[-3.26 -4.42]
t=0.300 | x=[ 1.3  -1.17 -3.86  6.1 ] | cost=8646.4 | avoid_w=47.1 | u0=[ 0.2  -0.56]
t=0.400 | x=[ 1.12 -1.01  0.9   0.13] | cost=7708.3 | avoid_w=46.2 | u0=[1.62 0.73]
t=0.500 | x=[ 1.11 -0.87 -1.28  2.26] | cost=1539.2 | avoid_w=45.2 | u0=[-2.52 -0.37]
t=0.600 | x=[ 0.95 -0.62 -1.51  2.44] | cost=476.8 | avoid_w=44.3 | u0=[0.69 0.41]
t=0.700 | x=[ 0.82 -0.39 -1.21  2.19] | cost=412.2 | avoid_w=43.5 | u0=[-0.09  0.22]

*** Target reached at t=0.78s ***
ally EE pos: [ 0.79415612 -0.4947249 ], target: [ 0.8 -0.4]
Holding position with zero torque.
MPC run complete.
