<a href="https://colab.research.google.com/github/pidipidi/cs577_RLI/blob/master/assignment_1/problem_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem 3 - Real-world robot demonstrations using a Baxter robot

This notebook is an implementation of **Dynamic Movement Primitives (DMPs)** within a `robomimic` simulation environment. The implementation is built on top of the [robomimic tutorial](https://colab.research.google.com/drive/1b62r_km9pP40fKF0cBdpdTO2P_2eIbC6?usp=sharing).

## Environment Setup

In [None]:
WS_DIR = "/content/"
%cd $WS_DIR

# Clone the repo and install the basic requirements
!git clone --branch v0.4 https://github.com/ARISE-Initiative/robomimic
!pip install -e robomimic/ > /dev/null

import sys
import os
sys.path.append('./robomimic/')

In [None]:
# install all system dependencies for mujoco-py
!sudo DEBIAN_FRONTEND=noninteractive apt install curl git libgl1-mesa-dev libgl1-mesa-glx libglew-dev \
         libosmesa6-dev software-properties-common net-tools unzip vim \
         virtualenv wget xpra xserver-xorg-dev libglfw3-dev patchelf > /dev/null

#install mujoco-py
!pip install mujoco==3.3.0 > /dev/null

#install robosuite
!pip install robosuite > /dev/null

In [None]:
import mujoco

mujoco.__version__

In [None]:
# import all utility functions
import os
import json
import h5py
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import robomimic
import robomimic.utils.obs_utils as ObsUtils
import robomimic.utils.torch_utils as TorchUtils
import robomimic.utils.test_utils as TestUtils
import robomimic.utils.file_utils as FileUtils
import robomimic.utils.train_utils as TrainUtils
from robomimic.utils.dataset import SequenceDataset

from robomimic.config import config_factory
from robomimic.algo import algo_factory

# for rendering mujoco in colab, you need turn on egl
os.environ['MUJOCO_GL'] = 'osmesa' # if gpu possible, use 'egl', if not, use 'osmesa'

# the dataset registry can be found at robomimic/__init__.py
from robomimic import DATASET_REGISTRY, HF_REPO_ID

# set download folder and make it
download_folder = WS_DIR + "robomimic_data/"
os.makedirs(download_folder, exist_ok=True)

# download the dataset
task = "lift"
dataset_type = "ph"
hdf5_type = "low_dim"
FileUtils.download_file_from_hf(
    repo_id=HF_REPO_ID,
    filename=DATASET_REGISTRY[task][dataset_type][hdf5_type]["url"],
    download_dir=download_folder,
)

# enforce that the dataset exists
dataset_path = os.path.join(download_folder, "low_dim_v15.hdf5")
assert os.path.exists(dataset_path)

In [None]:
import robomimic.utils.env_utils as EnvUtils
from robosuite import load_composite_controller_config, load_part_controller_config, make
from robosuite.environments.robot_env import RobotEnv
from robosuite.models.tasks import Task
from robosuite.models.arenas.empty_arena import EmptyArena
import matplotlib.pyplot as plt

class Free(RobotEnv):
    def __init__(self, robots, controller_configs):
        super().__init__(robots=robots, controller_configs=controller_configs)

    @property
    def _visualizations(self):
        """
        Visualization keywords for this environment

        Returns:
            set: All components that can be individually visualized for this environment
        """
        vis_set = super()._visualizations
        vis_set.add("grippers")
        return vis_set

    def reward(self, action):
        return 0

    def _load_model(self):
        super()._load_model()

        self.robots[0].robot_model.set_base_xpos([-1, 0, 0])

        mujoco_arena = EmptyArena()
        mujoco_arena.set_origin([0, 0, 0])

        self.model = Task(
            mujoco_arena = mujoco_arena,
            mujoco_robots = [robot.robot_model for robot in self.robots],
        )

    def _check_robot_configuration(self, robots):
        pass

custom_part_config1 = load_part_controller_config(default_controller = "JOINT_POSITION")
custom_part_config1["input_type"] = "absolute"
custom_part_config1["input_ref_frame"] = "base"
custom_part_config1["gripper"] = {'type':'GRIP'}
custom_part_config1['control_freq']=20

custom_part_config2 = load_part_controller_config(default_controller = "JOINT_POSITION")
custom_part_config2["input_type"] = "absolute"
custom_part_config2["input_ref_frame"] = "base"
custom_part_config2["gripper"] = {'type':'GRIP'}
custom_part_config2['control_freq']=20

controller_config = load_composite_controller_config(controller="BASIC")
controller_config['body_parts'] = {'right': custom_part_config1, 'left': custom_part_config2}
print(controller_config.keys())
controller_config['control_freq']=20

dummy_spec = dict(
    obs=dict(
        low_dim=[],
        rgb=[],
    ),
)
ObsUtils.initialize_obs_utils_with_obs_specs(obs_modality_specs=dummy_spec)

env = Free(robots=["Baxter"], controller_configs=controller_config)

### Download Demonstration Dataset

We download a pouring motion demo from the MIME dataset. (you can download other demo. if you want.)

In [None]:
def extract_data():
    """
    Extract a pouring motion demo from the MIME dataset. (you can download other demo. if you want.)
    """
    cur_dir = os.getcwd().split('/')[-1]
    assert cur_dir=='content', "Run the program on the assignment_1 folder. Current directory is {}".format(cur_dir)

    if os.path.isdir('dataset') is False:
        os.mkdir('dataset')
    os.chdir( 'dataset' )

    url = 'https://www.dropbox.com/sh/wmyek0jhrpm0hmh/AADAO2L1qN5BwOBthyMG82ima/4315Aug02?dl=0.zip'
    if os.path.isfile(url.split('/')[-1]) is False:
        os.system('wget '+url)

    if os.path.isfile('joint_angles.txt') is False:
        os.system('unzip 4315Aug02?dl=0.zip')

    print (os.getcwd())

    data = []
    for line in open('joint_angles.txt', 'r'):
        data.append( json.loads(line))

    # Get left/right arm trajectories
    joint_names = ['s0', 's1', 'e0', 'e1', 'w0', 'w1', 'w2']

    l_arm_traj = []
    r_arm_traj = []
    for d in data:
        v = []
        for name in joint_names:
            v.append(d['left_'+name])
        l_arm_traj.append(v)

        v = []
        for name in joint_names:
            v.append(d['right_'+name])
        r_arm_traj.append(v)

    os.chdir( '..' )
    return np.swapaxes(r_arm_traj, 0, 1)

We also provide an auxiliary visualization function to compare demonstrated and reproduced trajectories.

In [None]:
def plot_traj(trajs, trajs_demo=None):
    """
    """

    fig = plt.figure()

    for i, traj in enumerate(trajs):
        fig.add_subplot(len(trajs), 1, i+1)
        plt.plot(traj, label=str(i))

        if trajs_demo is not None:
            if len(np.shape(trajs_demo))==2:
                plt.plot(trajs_demo[i], 'r-', label=str(i))
            else:
                plt.plot(trajs_demo[0][i], 'r-', label=str(i))

    plt.show()

## A. Train improved DMPs with Joint-Space Demonstration.

In this sub-problem, we will re-use the improved Dynamic Movement Primitives (DMPs) to learn and reproduce a robot demonstration trajectory in joint space.

### A.1 Improved DMP Formulation

Now, let’s formulate the improved DMP equations:

- **Canonical System:**  
  <div align="center">
  $
  \tau \dot{s} = -\alpha s
  $
  </div>

- **Nonlinear Function:**  
  <div align="center">
  $
  f(s) = \frac{\sum_i w_i \, \phi_i(s)\, s}{\sum_i \phi_i(s)}
  $
  </div>  

- **Transformation System:**  
  <div align="center">
  $
  \tau \dot{v} = K(g - x) - Dv + K(g - x_0)s + Kf, \quad \tau \dot{x} = v
  $
  </div>
  
Based on these equations, please complete the code blocks below to implement the improved DMP formulation. Note that you can copy the code for the improved DMP's transformation system and DMP class code.

In [None]:
class OriginalFormulation(object):

    def __init__(self, K=50., D=None):
        self.K = K
        if D is None:
            D = 2.0 * np.sqrt(self.K)
        self.D = D

    def acceleration(self, x, dx, start, goal, tau, f, s):
        ''' return acceleration '''
        #------------------------------------------------------------
        # Place your code here
        return
        #------------------------------------------------------------

    def fs(self, x, dx, ddx, start, goal, tau, s):
        ''' return nonlinear function value '''
        #------------------------------------------------------------
        # Place your code here
        return
        #------------------------------------------------------------

class ImprovedFormulation(object):

    def __init__(self, K=50., D=None):
        self.K = K
        if D is None:
            D = self.K/4.
        self.D = D

    def acceleration(self, x, dx, start, goal, tau, f, s):
        ''' return acceleration '''
        #------------------------------------------------------------
        # Place your code here
        return
        #------------------------------------------------------------

    def fs(self, x, dx, ddx, start, goal, tau, s):
        ''' return nonlinear function value '''
        #------------------------------------------------------------
        # Place your code here
        return
        #------------------------------------------------------------

In [None]:
class DMPs_discrete(object):

    def __init__(self, dims, bfs, dt=.01, tau=1., alpha=14, enable_improved=False, **kwargs):
        '''
        dims int: number of dynamic motor primitives
        bfs int: number of basis functions per DMP
        dt float: timestep for simulation
        tau float: scales the timestep
                   increase tau to make the system execute faster
        alpha float: canonical system parameter
        '''

        self.dmps = dims
        self.bfs  = bfs
        self.dt   = dt
        self.tau  = tau
        self.alpha = alpha/2.0
        self.alpha_x = alpha

        self.prep_centers_and_variances()

        if enable_improved is False:
            self.formulation = OriginalFormulation()
        else:
            self.formulation = ImprovedFormulation()

        return


    def prep_centers_and_variances(self):
        '''
        Set the centre of the Gaussian basis functions be spaced evenly
        throughout run time.
        '''
        self.c = np.zeros(self.bfs)
        self.h = np.zeros(self.bfs)

        t = np.linspace(0,1,self.bfs) *0.5

        # From DMP matlab code
        self.c = np.exp(-self.alpha_x*t)
        self.D = (np.diff(self.c)*0.55)**2
        self.D = np.append(self.D, self.D[-1])
        self.D = 1/self.D
        self.h = np.ones(self.bfs)*0.5


    def gen_psi(self, x):
        '''
        Generates the activity of the basis functions for a given state of the
        canonical system.

        x float: the current state of the canonical system
        '''
        if isinstance(x, np.ndarray):
            x = x[:,None]
        return np.exp(-self.h * (x - self.c)**2 * self.D)


    def gen_phase(self, n_steps, tau=None):
        """
        Generate phase for open loop movements.

        n_steps int: number of steps
        """
        if tau is None: tau = self.tau
        return np.exp(-self.alpha/tau * np.linspace(0, 1, n_steps))


    def learn(self, y_des):
        """
        Encode a set of weights from the input trajectories.

        y_des list/array: the desired trajectories of each DMP
                          should be shaped [dmps, run_time]
        """
        # Set variables
        n_samples, dims, n_steps = np.shape(y_des)
        self.n_steps = n_steps
        assert dims==self.dmps, "wrong dimensions"

        # Get start and goal
        self.y0   = np.mean(y_des[:,:,0], axis=0)
        self.goal = np.mean(y_des[:,:,-1], axis=0)

        # Calculate yd_des, ydd_des
        yd_des = np.diff(y_des) / self.dt
        yd_des = np.concatenate((np.zeros((n_samples, self.dmps, 1)), yd_des), axis=2)

        ydd_des = np.diff(yd_des) / self.dt
        ydd_des = np.concatenate((np.zeros((n_samples, self.dmps, 1)), ydd_des), axis=2)

        # Get a canonical system
        x_track = self.gen_phase(n_steps)

        #------------------------------------------------------------
        # Place your code here

        # Calculate f
        f_des =




        # Calculate weights
        psi_track =

        x_track   =
        psi_track =
        f_des     =
        f_des     =

        self.w =





        #------------------------------------------------------------

        # set up tracking vectors
        y_track   = np.zeros((self.dmps, n_steps))
        yd_track  = np.zeros((self.dmps, n_steps))
        ydd_track = np.zeros((self.dmps, n_steps))

        y   = self.y0.copy()
        yd  = np.zeros(self.dmps)
        ydd = np.zeros(self.dmps)

        #------------------------------------------------------------
        # Place your code here
        x_track   =
        psi_track =

        f =



        # Recover the demonstration using the learned weights (for confirmation)
        for t in range(n_steps):




            # record timestep
            y_track[:,t] =
            yd_track[:,t] =
            ydd_track[:,t] =
        #------------------------------------------------------------

        return y_track, yd_track, ydd_track


    def plan(self, y0=None, goal=None, **kwargs):
        '''
        Run the DMP system within a specific period.

        y0   list/array: start position
        goal list/array: goal position
        tau  float:      scales the timestep
                         increase tau to make the system execute faster
        '''

        if y0 is None: y0 = self.y0
        if goal is None: goal = self.goal
        n_steps = int(self.n_steps/self.tau)

        # set up tracking vectors
        y_track   = np.zeros((self.dmps, n_steps))
        yd_track  = np.zeros((self.dmps, n_steps))
        ydd_track = np.zeros((self.dmps, n_steps))
        x_track   = self.gen_phase(n_steps, self.tau)

        #------------------------------------------------------------
        # Place your code here
        y   =
        yd  =
        ydd =

        for t in range(n_steps):









            # record timestep
            y_track[:,t] =
            yd_track[:,t] =
            ydd_track[:,t] =
        #------------------------------------------------------------

        return y_track, yd_track, ydd_track


    def plot_traj(self, trajs_demo, trajs_gen, axis_num=0):
        """Plot trajectories over an axis """

        fig = plt.figure()
        plt.title('Trajectory (X) - Demo (Td) and generated (Tg)')
        for i in range(len(trajs_demo)):
            plt.plot(trajs_demo[i,axis_num,:], 'r--', label='Td')
        for i in range(len(trajs_gen)):
            plt.plot(trajs_gen[i,axis_num,:],'g-', label='Tg')

        plt.legend()
        plt.show()

    def plot_basis(self):
        """Plot basis functions """
        fig = plt.figure()

        x = self.gen_phase(200)

        for idx in range(len(self.c)):
            psi = self.gen_psi(x)
            plt.plot(x, psi)

        plt.show()

    def plot_f(self, f, f_des=None):
        """Plot nonlinear functions """

        fig = plt.figure()
        plt.plot(f)
        if f_des is not None:
            plt.plot(f_des, '--')
        plt.show()

    def plot_canonical_sys(self):
        """Plot the phase change in the canonical system """
        x = self.gen_phase(200)

        fig = plt.figure()
        plt.plot(x)
        plt.show()

### A.2 Training and Reproduction of Joint-Space Demonstration

We train the improved DMPs with the selected joint-space demonstration trajectory. Please, plot the reproduced trajectories with the demonstrated trajectory with the same goal and start.

In [None]:
def problem_3a1(enable_plot=True):
    traj = extract_data()

    dims      = len(traj)
    bfs       = 30
    tau       = 1.
    freq      = 100
    duration  = 1.
    dt        = 1./freq

    traj_demo = np.expand_dims(traj, axis=0)

    # Learn via DMP original/improved
    dmp = DMPs_discrete(dims=dims, bfs=bfs, tau=tau, dt=dt,
                            enable_improved=True)
    traj, _, _ = dmp.learn(traj_demo)
    traj, _, _ = dmp.plan()

    if enable_plot: plot_traj(traj, traj_demo)
    return traj


Now, let's confirm if our DMP could reproduce the demonstrated trajectory.

In [None]:
r_traj_data = problem_3a1(enable_plot=True)

###  A.3 Reproduction on the Baxter robot

Let's enable the simulated Baxter robot to track the reproduced trajectory.

In [None]:
import imageio
from copy import deepcopy
import numpy as np
import os

os.environ["MUJOCO_GL"] = 'osmesa'

# create a video writer
video_path = "rollout_A.mp4"
video_writer = imageio.get_writer(video_path, fps=20)

ob_dict = env.reset()
init_joint_pos = ob_dict["robot0_joint_pos"]

# initialize the starting configuration
for i in range(200):
  action = np.concatenate([r_traj_data[:,0], init_joint_pos[7:], np.zeros(2)], axis=-1)
  ob_dict, _, _, _ = env.step(action)


#------------------------------------------------------------
# Place your code here

# follow the reproduced trajectory
traj = []
traj_des = []
for i in range(1,len(r_traj_data[0]),5): # 20Hz
    action =
    ob_dict, _, _, _ = env.step(action)

    cur_joint_pos = ob_dict["robot0_joint_pos"]
    traj.append(cur_joint_pos[:7])
    traj_des.append(r_traj_data[:,i])

    frame = env.sim.render(height=512, width=512, camera_name="frontview")[::-1]
    video_writer.append_data(frame)

#------------------------------------------------------------

video_writer.close()

Plot the desired and simulated robot trajectories.

In [None]:
plot_traj(np.array(traj).T, np.array(traj_des).T)

Visulize the simulated robot's rollout video.

In [None]:
# visualize rollout video
from IPython.display import HTML
from base64 import b64encode

mp4 = open(video_path, "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=400 controls>
      <source src="{data_url}" type="video/mp4">
</video>
""")

## B. Train improved DMPs with Cartesian-Space Demonstration.

Next, in this sub-problem, we train the improved DMP using a Cartesian-space demonstration and reproduce the motion using a Baxter robot.

### B.1 Robot Kinematics Class Construction

We define an auxiliary class to perform forward and inverse kinematics.

In [None]:
from robosuite.utils.transform_utils import mat2quat, quat2mat  # robosuite: xyzw
import mujoco as mj

class robot_kinematics(object):
    def __init__(self, env, arm='right'):
        self.env = env
        self.arm = arm
        self.robot = env.robots[0]

        if arm == 'right':
            self.qpos_idx = self.robot._ref_joint_pos_indexes[:7]
        else:
            self.qpos_idx = self.robot._ref_joint_pos_indexes[7:]

        self.eef_sid = self.robot.eef_site_id[arm] if isinstance(self.robot.eef_site_id, dict) else self.robot.eef_site_id
        self.ndof = len(self.qpos_idx)

        # Find the base body id
        first_qi = int(self.qpos_idx[0])
        jids = np.where(env.sim.model.jnt_qposadr == first_qi)[0]
        first_jid = int(jids[0])
        self.base_body_id = int(env.sim.model.jnt_bodyid[first_jid])

        # Find the joint limits
        self.joint_low, self.joint_high = self._joint_limits_from_qpos_idx()

        m = self.env.sim.model
        base_name = m.body_id2name(self.base_body_id)
        eef_name  = m.site_id2name(self.eef_sid)
        print(f"[{self.arm}] base_body={base_name}, eef_site={eef_name}")

    @staticmethod
    def _quat_mul_xyzw(q1, q2):
        x1,y1,z1,w1 = q1; x2,y2,z2,w2 = q2
        return np.array([
            w1*x2 + x1*w2 + y1*z2 - z1*y2,
            w1*y2 - x1*z2 + y1*w2 + z1*x2,
            w1*z2 + x1*y2 - y1*x2 + z1*w2,
            w1*w2 - x1*x2 - y1*y2 - z1*z2
        ])

    @staticmethod
    def _quat_inv_xyzw(q):
        x,y,z,w = q
        return np.array([-x, -y, -z, w])  # 단위쿼터니언 가정

    @staticmethod
    def _rotvec_from_quats_xyzw(q_now, q_tgt):
        """q_err = q_tgt * inv(q_now) → axis-angle (회전벡터)"""
        q_err = robot_kinematics._quat_mul_xyzw(q_tgt, robot_kinematics._quat_inv_xyzw(q_now))
        x,y,z,w = q_err
        v = np.array([x,y,z], dtype=float)
        n = np.linalg.norm(v)
        if n < 1e-12:
            return np.zeros(3)
        axis = v / n
        angle = 2.0 * np.arctan2(n, w)
        return axis * angle  # 3D 회전벡터

    def get_arm_base_body_id(self, arm: str) -> int:
        """ Find a base body id """
        m = self.env.sim.model
        # 전역 joint name -> id 매핑
        name_to_jid = {name: i for i, name in enumerate(m.joint_names)}
        # 해당 팔의 관절명 목록에서 첫 관절의 body를 베이스로 사용
        first_jn = self.robot.joint_names[arm][0]
        jid = name_to_jid[first_jn]
        return int(m.jnt_bodyid[jid])

    def _joint_limits_from_qpos_idx(self):
        """각 qpos 인덱스에 대응하는 hinge/slide joint의 범위를 모아 반환"""
        d, m = self.env.sim.data, self.env.sim.model

        lows, highs = [], []
        for qi in self.qpos_idx:
            jids = np.where(m.jnt_qposadr == int(qi))[0]
            assert len(jids) > 0, "qpos→joint 매핑 실패"
            jid = int(jids[0])
            lo, hi = m.jnt_range[jid]
            if hi > lo:  # 유한 범위
                lows.append(lo); highs.append(hi)
            else:       # 무한(자유)처럼 정의 → 넓게 허용
                lows.append(-1e9); highs.append(1e9)
        return np.asarray(lows), np.asarray(highs)

    def fk_request(self, q):
        """ Foward Kinematics """
        d, m = self.env.sim.data, self.env.sim.model
        d.qpos[self.qpos_idx] = q
        d.qvel[...] = 0.0
        self.env.sim.forward()

        # World to EE
        p_w = d.site_xpos[self.eef_sid].copy()
        R_w = d.site_xmat[self.eef_sid].reshape(3,3).copy()
        q_w = mat2quat(R_w)  # (x,y,z,w)
        return p_w, q_w

        # World to Base
        #p_bw = d.xpos[self.base_body_id].copy()
        #R_bw = d.xmat[self.base_body_id].reshape(3,3).copy()

        # Base to EE
        #R_rel = R_bw.T @ R_w
        #p_rel = R_bw.T @ (p_w - p_bw)
        #q_rel = mat2quat(R_rel)

        #return p_rel, q_rel

    def ik_request(
        self,
        target_pos,                  # (3,) in world frame
        target_quat=None,            # (4,) xyzw in world frame; if None -> position-only IK
        q_init=None,               # 초기 관절값(ndof,) 없으면 현재 값을 사용
        max_iters=200,
        pos_tol=1e-4,
        ori_tol=1e-3,                # radians
        pos_w=1.0,
        ori_w=1.0,
        damping=1e-3,
        step_size=1.0,
        enforce_limits=True,
    ):
        """ Inverse Kinematics """
        d, m = self.env.sim.data, self.env.sim.model

        # initial configuration for jacobian
        if q_init is None:
            q_arm = d.qpos[self.qpos_idx].copy()
        else:
            q_arm = np.asarray(q_init, dtype=float).copy()
            assert q_arm.shape == (self.ndof,)


        # 조인트 제한
        if enforce_limits:
            joint_low, joint_high = self.joint_low, self.joint_high
        else:
            joint_low = np.full(self.ndof, -1e9); joint_high = np.full(self.ndof, 1e9)

        # 자코비안 버퍼 (MuJoCo 요구 스펙: float64, C-order, (3, nv))
        Jp = np.zeros((3, m.nv), dtype=np.float64, order="C")
        Jr = np.zeros((3, m.nv), dtype=np.float64, order="C")

        # robosuite 래퍼면 원본으로 언랩
        if hasattr(m, "_model"): mjm = m._model
        else: mjm = m
        if hasattr(d, "_data"):  mjd = d._data
        else: mjd = d

        for _ in range(max_iters):
            # 시뮬레이터 상태 반영
            d.qpos[self.qpos_idx] = q_arm
            d.qvel[self.qpos_idx] = 0.0
            self.env.sim.forward()

            # 현재 EEF 포즈
            p_now = d.site_xpos[self.eef_sid].copy()
            R_now = d.site_xmat[self.eef_sid].reshape(3,3).copy()
            q_now = mat2quat(R_now)  # xyzw

            # 오차
            dp = target_pos - p_now
            if target_quat is None:
                e = pos_w * dp                                # (3,)
                mj.mj_jacSite(mjm, mjd, Jp, None, self.eef_sid)
                J = pos_w * Jp[:, self.qpos_idx]                   # (3, ndof)
            else:
                drot = robot_kinematics._rotvec_from_quats_xyzw(q_now, target_quat)   # (3,)
                e = np.hstack([pos_w * dp, ori_w * drot])            # (6,)
                mj.mj_jacSite(mjm, mjd, Jp, Jr, self.eef_sid)
                J = np.vstack([pos_w * Jp[:, self.qpos_idx], ori_w * Jr[:, self.qpos_idx]])  # (6, ndof)

            # 수렴 체크
            pos_err = np.linalg.norm(dp)
            ori_err = 0.0 if target_quat is None else np.linalg.norm(drot)
            if (pos_err < pos_tol) and (target_quat is None or ori_err < ori_tol):
                return q_arm.copy(), True

            # DLS: dq = (J^T J + λ^2 I)^-1 J^T e
            JT = J.T
            H  = JT @ J + (damping**2) * np.eye(self.ndof)
            g  = JT @ e
            dq = np.linalg.solve(H, g)

            # 업데이트 + 제한
            q_arm = np.clip(q_arm + step_size * dq, joint_low, joint_high)

        return q_arm.copy(), False

### B.2 Training and Reproduction of Cartesian-Space Demonstration

We train the improved DMPs with the selected joint-space demonstration trajectory. Please, plot the reproduced trajectories with the demonstrated trajectory with the same goal and start.

In [None]:
def problem_3bc(env, arm='right', goal=None, video_path="rollout.mp4"):
    """
    Check if you want better orientation-based DMP:
    Ude et al. Orientation in cartesian space dynamic movement primitives. In IEEE International Conference on Robotics and Automation (ICRA), 2014.
    """
    dims      = 7
    bfs       = 30
    tau       = 1.
    freq      = 100
    duration  = 1.
    dt        = 1./freq
    sample_scale = 20
    r_traj_data  = extract_data()

    # FK, IK solvers for Baxter
    rk = robot_kinematics(env, arm=arm)

    #------------------------------------------------------------
    # Place your code here
    # make a list of x,y,z,qx,qy,qz,qw




    pose_list =
    pose_traj_demo =
    #------------------------------------------------------------

    # Learn via DMP original/improved
    dmp = DMPs_discrete(dims=dims, bfs=bfs, tau=tau, dt=dt,
                            enable_improved=True)
    _, _, _ = dmp.learn( pose_traj_demo ) #[:,:6,:] )

    # setting a goal
    pose_traj, _, _ = dmp.plan(goal=goal)

    # normalize the quaternions
    pose_traj[3:] /= np.sum( pose_traj[3:]**2, axis=0)

    # conver the pos+quaternion trajectory to pose list
    pose_list = []
    for i in range(len(pose_traj[0])):
        # reduce the number of samples (OPTION)
        if i%sample_scale==0:
            pose_list.append( pose_traj[:,i] )

    # Command Current Joint Positions first
    # create a video writer
    video_writer = imageio.get_writer(video_path, fps=20)

    ob_dict = env.reset()

    # initialize the starting configuration
    env.sim.data.qpos[rk.qpos_idx] = r_traj_data[:,0]
    env.sim.data.qvel[rk.qpos_idx] = 0.0
    env.sim.forward()

    #------------------------------------------------------------
    # Place your code here

    # follow the reproduced trajectory
    init_joint_pos = ob_dict["robot0_joint_pos"].copy()
    traj = []
    traj_des = []

    for i in range(1,len(pose_traj[0]),20): # 5Hz
        q_ret

        action =
        ob_dict, _, _, _ = env.step(action)

        cur_joint_pos = ob_dict["robot0_joint_pos"][:7]
        traj.append(cur_joint_pos)
        traj_des.append(q_ret)

        frame = env.sim.render(height=512, width=512, camera_name="frontview")[::-1]
        video_writer.append_data(frame)
    #------------------------------------------------------------

    video_writer.close()

    return pose_traj, pose_traj_demo

In [None]:
video_path = "rollout_B.mp4"

pose_traj, pose_traj_demo = problem_3bc(env, video_path=video_path)

### B.3 Reproduction on the Baxter robot

Let's run the simulated Baxter robot to track the reproduced trajectory.

In [None]:
# visualize rollout video
mp4 = open(video_path, "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=400 controls>
      <source src="{data_url}" type="video/mp4">
</video>
""")

You can now check if the robot was able to track the reproduced trajectory.

In [None]:
plot_traj(pose_traj, pose_traj_demo)

## C. Adapt the learned DMPs with different goals.

### C.1 Training and Reproduction of Cartesian-Space Demonstration

We provide a new goal to show the adapation performance of the improved DMPs.

In [None]:
video_path = "rollout_C.mp4"

rk = robot_kinematics(env, arm='right')
pos, quat = rk.fk_request(r_traj_data[:,-1])
pos[1] += 0.1
goal = np.concatenate([pos, quat])
print("New goal:", goal)

pose_traj, pose_traj_demo = problem_3bc(env, goal=goal, video_path=video_path)

### C.2 Reproduction on the Baxter robot

Let's run the simulated Baxter robot to track the reproduced trajectory.

In [None]:
# visualize rollout video
mp4 = open(video_path, "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=400 controls>
      <source src="{data_url}" type="video/mp4">
</video>
""")

You can now check if the robot was able to track the reproduced trajectory.

In [None]:
plot_traj(pose_traj, pose_traj_demo)

Now, you can try the same thing for the original formulation of DMPs below.