In [1]:
import mediapy as media
import mujoco
import numpy as np
import pathlib
from mujoco_mpc import agent as agent_lib
import predictive_sampling



In [2]:
model_path = (
    "/home/oscar/mujoco_mpc/build/mjpc/tasks/minicrane/task.xml"
    ##"/home/oscar/Downloads/brg_crane.xml"
)

In [3]:
model = mujoco.MjModel.from_xml_path(str(model_path))
data = mujoco.MjData(model)

In [6]:
mujoco.mj_resetData(model, data)
mujoco.mj_forward(model, data)

In [6]:
agent = agent_lib.Agent(
                server_binary_path=pathlib.Path(agent_lib.__file__).parent
                / "mjpc"
                / "ui_agent_server",
                task_id="Minicrane", 
                model=model)

I0000 00:00:1730573701.511186   17597 ui_agent_server.cc:57] Server listening on [::]:58943


In [5]:
agent = agent_lib.Agent(
                task_id="Minicrane", 
                model=model)

I0000 00:00:1730581310.393954  109911 agent_server.cc:55] Server listening on [::]:52303


In [8]:
mujoco.mj_kinematics(model, data)

In [7]:
data.joint("payload-joint")

<_MjDataJointViews
  cdof: array([[-0.00580863,  0.1098525 ,  0.        ,  0.        , -1.        ,
         0.        ],
       [-0.02890187,  0.        ,  0.1098525 ,  0.        , -1.        ,
         0.        ],
       [ 0.01208713,  0.        ,  0.2746025 ,  0.        ,  0.        ,
         0.        ],
       [ 1.        ,  0.        ,  0.        ,  0.        ,  0.        ,
         0.        ],
       [ 0.        ,  1.        ,  0.        ,  0.        ,  0.        ,
         0.        ],
       [ 0.        ,  0.        ,  1.        ,  1.        ,  0.        ,
         0.        ]])
  cdof_dot: array([[ 0.,  0.,  0.,  0.,  0., -0.],
       [ 0.,  0.,  0.,  0.,  0., -0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.]])
  id: 3
  name: 'payload-joint'
  qLDiagInv: array([5.00000000e+01, 5.00000000e+01, 5.00000000e+01, 5.76923077e+04,
       5.76923077e+04, 7.50000000e+05])

In [9]:
agent.get_state().time


0.0

In [10]:
agent.get_task_parameters()

{'GoalZ': 1.0, 'GoalY': -0.7, 'ROSTime': 0.0, 'GoalX': 0.0}

In [11]:

data.body('payload')

<_MjDataBodyViews
  cacc: array([0., 0., 0., 0., 0., 0.])
  cfrc_ext: array([0., 0., 0., 0., 0., 0.])
  cfrc_int: array([0., 0., 0., 0., 0., 0.])
  cinert: array([1.73333333e-05, 1.73333333e-05, 1.33333333e-06, 0.00000000e+00,
       0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
       0.00000000e+00, 2.00000000e-02])
  crb: array([1.73333333e-05, 1.73333333e-05, 1.33333333e-06, 0.00000000e+00,
       0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
       0.00000000e+00, 2.00000000e-02])
  cvel: array([0., 0., 0., 0., 0., 0.])
  id: 6
  name: 'payload'
  subtree_angmom: array([0., 0., 0.])
  subtree_com: array([0.71 , 0.024, 1.112])
  subtree_linvel: array([0., 0., 0.])
  xfrc_applied: array([0., 0., 0., 0., 0., 0.])
  ximat: array([1., 0., 0., 0., 1., 0., 0., 0., 1.])
  xipos: array([0.71 , 0.024, 1.112])
  xmat: array([1., 0., 0., 0., 1., 0., 0., 0., 1.])
  xpos: array([0.71 , 0.024, 1.112])
  xquat: array([1., 0., 0., 0.])
>

In [22]:
viewer = mujoco.viewer.launch_passive(model, data)

In [None]:
data.userdata

# try reward:
Distance: 77.5
control = 90.5
Orientation: 52.5
Angular Acceleartion: 43
Linear Velocity: 43

GoalX: 0.68 GoalY: 0.1 GoalZ: 0.4

Horizon: 1.31
Tiemstep:0.00709
Implicit Fast

Rollout: 40
Spline: Linear
Spline Pts: 7

In [None]:
w0 = 430 # Distance cost weight
w1 = 0   # Control Input cost weight
w2 = 20.5 # Orientation of Payload cost weight
w3 = 100 # Angular Acceleartion of Payload cost weight
w4 = 50 # Linear Velocity of payload cost weight

In [None]:
data.sensor('angle_acc')

In [None]:
def reward(model: mujoco.MjModel, data: mujoco.MjData) -> float:
    # residual[0]: distance between payload and target
    payload_pos = data.sensor('p_pos').data
    target_pos = data.mocap_pos[0, :3]
    r0 = -np.linalg.norm(payload_pos - target_pos)

    # residual[1]: control input
    r1 = -np.dot(data.ctrl, data.ctrl)

    # residual[2]: orientation of payload
    payload_quat = data.sensor('p_quat').data
    normal_quat = np.asarray([0., 0., 0., 1.])
    subquat = np.zeros(3)
    mujoco.mju_subQuat(subquat, normal_quat, payload_quat)
    r2 = -np.dot(subquat, subquat)

    # residual[3]: angular acceleration
    payload_acc = data.sensor('angle_acc').data
    r3 = -np.dot(payload_acc, payload_acc)

    #return 100*r0 + 10*r1 + 30 * r2 + 30*r3
    return w0*r0 + 0*r1 + w2*r2 + w3*r3
    

In [None]:
horizon = 0.625
splinestep = 0.1
planstep = 0.01
nimprove = 40
nsample = 40
noise_scale = 0.1
interp = "linear"
planner = predictive_sampling.Planner(
    model,
    reward,
    horizon,
    splinestep,
    planstep,
    nsample,
    noise_scale,
    nimprove,
    interp=interp,
)

In [None]:
mujoco.mj_resetData(model, data)
steps = 2000

In [None]:
data.mocap_pos[0, :3] = np.array([0.6, 0.1, 0.4]) # set a closer goal

In [None]:
data.mocap_pos[0, :3] = np.array([0.4, 0.4, 0.3])

In [None]:
qpos = [data.qpos]
qvel = [data.qvel]
act = [data.act]
ctrl = []
rewards = []

In [None]:
# frames
frames = []
FPS = 1.0 / model.opt.timestep

In [None]:
VERBOSE = False

In [None]:
renderer = mujoco.Renderer(model)

In [None]:
from mujoco import viewer
import time

start_time = time.time()
with viewer.launch_passive(model, data) as viewer:
    start = time.time()
    while viewer.is_running() and time.time() - start_time < 60:
        #mujoco.mj_forward(model, data)
        step_start = time.time()

        ##mujoco.mj_step(model, data)
        ## run predictive sampling
        # improve policy
        planner.improve_policy(
              data.qpos, data.qvel, data.act, data.time, data.mocap_pos, data.mocap_quat
        )
        # get action from policy
        data.ctrl = planner.action_from_policy(data.time)
        # reward
        rewards.append(reward(model, data))
        
        print(" action: ", data.ctrl)
        
        mujoco.mj_step(model, data)

        
        viewer.sync()
        
        #time_until_next_step = model.opt.timestep - (time.time() - step_start)
        #if time_until_next_step > 0:
        #    time.sleep(time_until_next_step)



In [None]:
from mujoco_mpc import agent as agent_lib
import mujoco_mpc as mjpc

In [None]:
data.mocap_pos[0][0]

In [None]:
for _ in range(steps):
  ## predictive sampling

  # improve policy
  planner.improve_policy(
      data.qpos, data.qvel, data.act, data.time, data.mocap_pos, data.mocap_quat
  )

  # get action from policy
  data.ctrl = planner.action_from_policy(data.time)

  # reward
  rewards.append(reward(model, data))

  if VERBOSE:
    print("time  : ", data.time)
    print(" qpos  : ", data.qpos)
    print(" qvel  : ", data.qvel)
    print(" act   : ", data.act)
    print(" action: ", data.ctrl)
    print(" reward: ", rewards[-1])
  else:
    print(" action: ", data.ctrl)
      
  # step
  mujoco.mj_step(model, data)

  # history
  qpos.append(data.qpos)
  qvel.append(data.qvel)
  act.append(data.act)
  ctrl.append(ctrl)

  # render and save frames
  renderer.update_scene(data)
  pixels = renderer.render()
  frames.append(pixels)

In [None]:
print("\nfinal qpos: ", qpos[-1])
print("goal state : ", data.mocap_pos[0, 0:3])
print("state error: ", np.linalg.norm(qpos[-1][0:2] - data.mocap_pos[0, 0:2]))
# %%
#media.show_video(frames, fps=FPS)

In [None]:
media.show_video(frames, fps=FPS)

In [None]:
renderer.close()