
# ManiSkill Mini‑Project — Pose Estimation (P1) & Imitation Learning (P2)

This is a **student-facing** assignment notebook. Write your code between each pair of markers:

```
# YOUR CODE HERE
# CODE ENDS HERE
```

**Recommended platform:** Google Colab with GPU (or any CUDA-enabled machine). CPU-only will be slow.

> You will see **Reading** callouts with suggested references for each section.



## 0) Environment Setup (Colab)

If you're on **Google Colab**, run the cell below (uncomment the `pip` commands first). Restart the runtime if prompted after install.

**Reading:** Gymnasium vectorized envs overview; ManiSkill setup notes.


In [None]:

# Uncomment when running on Colab.
# !pip install --upgrade pip
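# `--index-url` replaces PyPI, so install the PyTorch wheels separately from ManiSkill/Gymnasium.
# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# !pip install mani_skill==3.* gymnasium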
# !pip install mplib transforms3d h5py tqdm imageio[ffmpeg]

import torch, sys
print("Python:", sys.version)
print("CUDA available:", torch.cuda.is_available())



---
# Part 1 — RGB → Pose (PickCube)

**Goal:** Collect RGB frames and cube poses from `PickCube-v1`, build a dataset, implement a small CNN regressor (position + quaternion), train it, and then use predictions in a simple plan‑execute routine.

**Readings**
- ManiSkill observation structure & `obs_mode` (how RGB & state are organized).
- Gymnasium vectorized envs (why batching speeds things up).
- (Background) Unit quaternions for rotation representations and why normalization matters: https://stackoverflow.com/questions/8919086/why-are-quaternions-used-for-rotations and https://en.wikipedia.org/wiki/Quaternions_and_spatial_rotation
- Quickstart notes on task options: https://maniskill.readthedocs.io/en/latest/user_guide/getting_started/quickstart.html




## 1.0) Smoke Test — Create Env & Inspect Observations

Create a small vectorized `PickCube-v1` environment, reset once, and print the top-level keys in the observation as well as the base camera keys.

**Deliverable:** print statements confirming the keys and close the env.


In [None]:

import mani_skill
import gymnasium as gym

# YOUR CODE HERE ---------------------------------------------------------------
# 1) Create env with a small num_envs (e.g., 4)
# 2) Reset once to get an observation dict
# 3) Print: top-level obs keys and base camera subkeys
# 4) Close the env
# CODE ENDS HERE ---------------------------------------------------------------



## 1.1) Data Collection

Collect a dataset of **RGB frames** and **cube poses** from `PickCube-v1`.

- Aim for ~**4096** samples total (use a **vectorized** env; adjust lower if memory-limited).
- Per reset:
  - read RGB from `obs["sensor_data"]["base_camera"]["rgb"]` with shape `[B, H, W, 3]` (uint8),
  - read cube pose from `env.unwrapped.cube.pose.raw_pose` with shape `[B, 7]` (xyz position + quaternion in `w, x, y, z` order).
- Accumulate batches until you reach your target, then concatenate to:
  - `images: torch.Tensor [N, H, W, 3]` (on CPU)
  - `poses:  torch.Tensor [N, 7]` (on CPU)

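**Tip:** Simulator tensors may be views into buffers that get overwritten on the next `reset`/`step`, and `.cpu()` does not copy a tensor that is already on the CPU. Store `.clone().cpu()` copies in your buffers rather than references.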
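
The accumulate-then-concatenate pattern can be sketched without the simulator. This is a minimal illustration, not the required solution: `collect_batches` and `fake_reset` are hypothetical names, and `fake_reset` only stands in for "reset the env, then read the RGB and `raw_pose` tensors". It deliberately reuses one buffer to mimic a source that overwrites its outputs in place.

```python
import torch

def collect_batches(sample_fn, batch_size, total):
    """Accumulate CPU copies of (rgb, pose) batches until `total` samples exist."""
    img_buf, pose_buf, count = [], [], 0
    while count < total:
        rgb, pose = sample_fn(batch_size)
        # clone() first: .cpu() alone returns the same tensor if it is already on CPU
        img_buf.append(rgb.detach().clone().cpu())
        pose_buf.append(pose.detach().clone().cpu())
        count += rgb.shape[0]
    # Trim the overshoot from the last batch
    return torch.cat(img_buf)[:total], torch.cat(pose_buf)[:total]

# Hypothetical stand-in for the environment: one shared buffer, overwritten per call
_rgb = torch.zeros(8, 16, 16, 3, dtype=torch.uint8)
_pose = torch.zeros(8, 7)

def fake_reset(batch_size):
    _rgb.copy_(torch.randint(0, 256, (batch_size, 16, 16, 3), dtype=torch.uint8))
    _pose.copy_(torch.randn(batch_size, 7))
    return _rgb, _pose

images, poses = collect_batches(fake_reset, batch_size=8, total=20)
print(images.shape, images.dtype)
print(poses.shape, poses.dtype)
```

Without the `clone()` call, every entry in the buffers would alias the same memory and all stored batches would end up identical.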

**Reading**
- ManiSkill observation & wrappers overview (how obs are structured): https://maniskill.readthedocs.io/en/latest/user_guide/concepts/observation.html.
- Gymnasium vector envs API (batched `reset`/`step` semantics): https://gymnasium.farama.org/api/vector/.


In [None]:

import gymnasium as gym
import torch

num_envs = 128
total_samples = 4096

# YOUR CODE HERE ---------------------------------------------------------------
# 1) Create a vectorized PickCube env with obs_mode="rgb" and render_mode="rgb_array"
# 2) Loop: reset, extract batch RGB and cube raw pose, append CPU copies to buffers
# 3) Stop when collected >= total_samples
# 4) Concatenate buffers into `images` and `poses` tensors
images = None
poses  = None
# CODE ENDS HERE ---------------------------------------------------------------

# Basic checks
assert images is not None and poses is not None, "You must populate images and poses."
assert images.shape[0] == poses.shape[0], "Counts should match."
print("images:", images.shape, images.dtype)
print("poses:",  poses.shape,  poses.dtype)



## 1.2) Dataset Class

Implement a PyTorch `Dataset` that returns `(img, pose)` where:
- `img`: `[3, H, W]` in `[0,1]` (`float32`), and
- `pose`: `[7]` (`float32`).
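
As a rough sketch of the image conversion described above (the helper name `to_chw_float` is hypothetical, and the random image is only a placeholder for one collected frame):

```python
import torch

def to_chw_float(img_hwc_uint8: torch.Tensor) -> torch.Tensor:
    """Convert one [H, W, 3] uint8 image to [3, H, W] float32 in [0, 1]."""
    return img_hwc_uint8.permute(2, 0, 1).to(torch.float32) / 255.0

# Placeholder frame standing in for images[idx]
img = torch.randint(0, 256, (64, 48, 3), dtype=torch.uint8)
out = to_chw_float(img)
print(out.shape, out.dtype, float(out.min()), float(out.max()))
```

Doing the conversion per item inside `__getitem__` keeps the stored dataset in compact `uint8` form, which uses a quarter of the memory of a pre-converted `float32` copy.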

**Reading**
- PyTorch: Writing custom `Dataset` / `DataLoader` and transforms (normalization, channel order): https://docs.pytorch.org/tutorials/beginner/basics/data_tutorial.html and https://docs.pytorch.org/tutorials/beginner/data_loading_tutorial.html


In [None]:

from torch.utils.data import Dataset
import torch

class RGBPoseEstimationDataset(Dataset):
    def __init__(self, images, poses):
        # YOUR CODE HERE -------------------------------------------------------
        # Store references to images and poses
        pass  # placeholder so the cell parses; replace with your code
        # CODE ENDS HERE -------------------------------------------------------

    def __len__(self):
        # YOUR CODE HERE -------------------------------------------------------
        return 0
        # CODE ENDS HERE -------------------------------------------------------

    def __getitem__(self, idx):
        # YOUR CODE HERE -------------------------------------------------------
        # 1) Fetch one HxWx3 uint8 image and one 7-dim pose
        # 2) Convert image to [3,H,W] float32 in [0,1]
        # 3) Convert pose to float32
        # 4) Return (image_tensor, pose_tensor)
        # CODE ENDS HERE -------------------------------------------------------
        raise NotImplementedError

# Instantiate the dataset
# YOUR CODE HERE ---------------------------------------------------------------
dataset = None
# CODE ENDS HERE ---------------------------------------------------------------
print("Dataset length:", len(dataset))



## 1.3) Pose Estimator

Design a small CNN that encodes the image to a feature vector, then predicts:
- **position** (3) and
- **orientation quaternion** (4), re‑normalized to **unit norm** before returning.
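
The re-normalization step might look like the following sketch. The helper name `normalize_quat` is an assumption for illustration; `torch.nn.functional.normalize` is one of several equivalent ways to divide by the L2 norm.

```python
import torch
import torch.nn.functional as F

def normalize_quat(q: torch.Tensor, eps: float = 1e-8) -> torch.Tensor:
    """Scale each quaternion in a [B, 4] batch to unit length."""
    return F.normalize(q, p=2, dim=-1, eps=eps)

# Two unnormalized example outputs from a hypothetical quaternion head
raw = torch.tensor([[2.0, 0.0, 0.0, 0.0],
                    [1.0, 1.0, 1.0, 1.0]])
unit = normalize_quat(raw)
print(unit)
print(unit.norm(dim=-1))
```

The `eps` guard avoids a division by zero if the head ever outputs an all-zero vector.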

**Reading**
- Why unit quaternions for 3D orientation; consequences of not normalizing.
- Split-head design for pose regression: https://arxiv.org/pdf/1505.07427
- Multi-task learning & hard parameter sharing (shared trunk + per-task heads): https://www.ruder.io/multi-task/ and https://arxiv.org/pdf/1706.05098


In [None]:

import torch.nn as nn
import torch

class PoseEstimator(nn.Module):
    def __init__(self):
        super().__init__()
        # YOUR CODE HERE -------------------------------------------------------
        # Define: encoder + two heads (position, quaternion)
        # Keep the network small and fast.
        # CODE ENDS HERE -------------------------------------------------------

    def forward(self, x):
        # YOUR CODE HERE -------------------------------------------------------
        # 1) Encode
        # 2) Predict pos(3) and quat(4)
        # 3) Renormalize quaternion to unit length
        # 4) Concatenate [pos, quat] and return
        # CODE ENDS HERE -------------------------------------------------------
        raise NotImplementedError



## 1.4) Train Pose Regressor

- Use `DataLoader` to batch your dataset.
- Loss: **MSE** for positions and **MSE** for quaternions; combine via a weighted sum.
- Track and plot training loss over epochs.
- Save your best checkpoint if you like.
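
One possible shape for the weighted loss is sketched below. The function name `pose_loss` and the weights `w_pos` / `w_quat` are hypothetical, and the right weighting is something to tune for your own data.

```python
import torch
import torch.nn.functional as F

def pose_loss(pred, target, w_pos=1.0, w_quat=1.0):
    """Weighted sum of position MSE and quaternion MSE for [B, 7] pose batches."""
    pos_loss = F.mse_loss(pred[:, :3], target[:, :3])
    quat_loss = F.mse_loss(pred[:, 3:], target[:, 3:])
    total = w_pos * pos_loss + w_quat * quat_loss
    # Return the parts as well so they can be logged separately
    return total, pos_loss, quat_loss

# Toy batch: every component is off by exactly 1, so both MSE terms equal 1
pred = torch.zeros(2, 7)
target = torch.ones(2, 7)
total, pos_l, quat_l = pose_loss(pred, target, w_pos=1.0, w_quat=0.5)
print(float(total), float(pos_l), float(quat_l))
```

Keep in mind that `q` and `-q` describe the same rotation, while plain MSE treats them as different targets. If your labels flip sign between samples, a sign-invariant variant may train more smoothly.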

**Reading**
- PyTorch `DataLoader` basics (batching, shuffling): https://docs.pytorch.org/tutorials/beginner/basics/data_tutorial.html and https://docs.pytorch.org/docs/stable/data.html


In [None]:

from torch.utils.data import DataLoader
import torch.optim as optim
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PoseEstimator().to(device)

# YOUR CODE HERE ---------------------------------------------------------------
# 1) Create DataLoader
# 2) Define optimizer & regression losses
# 3) Training loop: forward, compute weighted losses, backward, step
# 4) Track `training_losses` for plotting
training_losses = []
# CODE ENDS HERE ---------------------------------------------------------------

plt.figure()
plt.plot(training_losses)
plt.title("Pose Training Loss"); plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.grid(True); plt.show()



## 1.5) Written — Representation & Transforms (P1.1)

In 4–8 sentences, explain your image and pose representations and any transforms you apply. Justify each design choice (e.g., scaling, channel order, quaternion normalization).



## 1.6) Plan & Execute with Predicted Pose

Use your trained pose model to pick up the cube and move it toward the goal position.

We provide environment setup and a `Planner`. Implement a function that:
1) builds a normalized image batch from the observation,  
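2) predicts the pose in the same layout as your training labels (`[x,y,z,qw,qx,qy,qz]` if you trained on `raw_pose`, which stores the quaternion scalar-first; SciPy's `Rotation.from_quat` expects scalar-last `[qx,qy,qz,qw]` by default),  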
3) computes a feasible end‑effector orientation, and  
4) executes a simple approach → grasp → move routine.
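
Quaternion ordering is an easy place to slip in step 3, so here is a small NumPy sketch. It assumes the predicted quaternion is scalar-first (`w, x, y, z`, as in ManiSkill's `raw_pose`). All three helper names are hypothetical, and folding the yaw by 90 degrees is one optional design choice that exploits the cube's symmetry, not a required step.

```python
import numpy as np

def wxyz_to_xyzw(q):
    """Reorder a scalar-first quaternion [w, x, y, z] to scalar-last [x, y, z, w]."""
    q = np.asarray(q, dtype=np.float64)
    return np.concatenate([q[..., 1:], q[..., :1]], axis=-1)

def yaw_from_wxyz(q):
    """Rotation about the world z-axis (radians) of a unit quaternion [w, x, y, z]."""
    w, x, y, z = np.asarray(q, dtype=np.float64)
    return np.arctan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))

def fold_cube_yaw(yaw):
    """Map yaw into [-pi/4, pi/4): a cube looks the same every 90 degrees about z."""
    return (yaw + np.pi / 4) % (np.pi / 2) - np.pi / 4

# Example: a cube rotated 100 degrees about z
half = np.deg2rad(100.0) / 2.0
q_wxyz = np.array([np.cos(half), 0.0, 0.0, np.sin(half)])

yaw = yaw_from_wxyz(q_wxyz)
print(np.rad2deg(yaw))                 # approximately 100
print(np.rad2deg(fold_cube_yaw(yaw)))  # approximately 10
print(wxyz_to_xyzw([1.0, 0.0, 0.0, 0.0]))
```

A top-down grasp orientation could then be built from the folded yaw, for example with `euler2quat(np.pi, 0, yaw)` from `transforms3d`, which returns a scalar-first quaternion.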

**Reading**
- ManiSkill recording wrapper for videos: https://maniskill.readthedocs.io/en/latest/user_guide/wrappers/record.html
- (Background) SciPy `Rotation` utilities for quaternion math: https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.transform.Rotation.html


In [None]:

# === Setup (provided; do not modify) ===
import mani_skill.envs
import gymnasium as gym
import numpy as np
from mani_skill.utils.wrappers.record import RecordEpisode
import mplib
from transforms3d.euler import euler2quat, quat2euler

def create_env():
    env = gym.make("PickCube-v1", num_envs=1, obs_mode="rgb",
                   control_mode="pd_joint_pos", render_mode="rgb_array",
                   reward_mode="none", human_render_camera_configs=dict(shader_pack="default"))
    env = RecordEpisode(env, output_dir="pick_cube_mp", video_fps=20, info_on_video=False, save_trajectory=False)
    env.reset(seed=42)
    return env

env = create_env()
robot = env.unwrapped.agent.robot
link_names  = [link.get_name() for link in robot.get_links()]
joint_names = [joint.get_name() for joint in robot.get_active_joints()]
planner = mplib.Planner(
    urdf=env.unwrapped.agent.urdf_path,
    srdf=env.unwrapped.agent.urdf_path.replace(".urdf", ".srdf"),
    user_link_names=link_names,
    user_joint_names=joint_names,
    move_group="panda_hand_tcp",
    joint_vel_limits=np.ones(7)*0.8,
    joint_acc_limits=np.ones(7)*0.8,
)
planner.set_base_pose(np.concatenate([robot.pose.sp.p, robot.pose.sp.q]))


In [None]:

from scipy.spatial.transform import Rotation as R
import torch, numpy as np

def pick_cube_mp_solution(env, obs, goal_pos, model, device=None):
    # YOUR CODE HERE -----------------------------------------------------------
    # Implement a minimal routine:
    # - Build input batch from observation
    # - Predict pose
    # - Convert quaternions to a target EE orientation
    # - Plan & execute a short sequence with the planner and gripper control
    # Return anything useful if desired.
    # CODE ENDS HERE -----------------------------------------------------------
    raise NotImplementedError


In [None]:

# === Evaluation scaffold ===
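device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PoseEstimator().to(device)
# The line above creates a randomly initialized model. Load your trained weights
# before evaluating (or delete that line to reuse the `model` trained in 1.4).
# model.load_state_dict(torch.load("best_pose_model.pth", map_location=device))
model.eval()  # disable training-time behavior such as dropout / batch-norm updates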

EPISODES = 3
successes = 0
for i in range(EPISODES):
    obs, _ = env.reset(seed=i)
    goal_pos = obs["extra"]["goal_pos"].cpu().numpy()[0]
    try:
        pick_cube_mp_solution(env, obs, goal_pos, model, device=device)
    except NotImplementedError:
        print("Implement pick_cube_mp_solution first.")
        break
    # Go through `unwrapped` so this does not depend on wrapper attribute forwarding
    successes += int(env.unwrapped.get_info()["success"].item())

print("Success rate:", successes/max(1, EPISODES))



---
# Part 2 — Imitation Learning (PushCube)

You will:
1) Inspect a demonstration dataset (`.h5` + `.json`),  
2) Build a `TrajectoryDataset` mapping **state → action**,  
3) Implement an `Actor` MLP and train via **behavior cloning**,  
4) Evaluate the policy in the environment (don’t rely on training loss).

**Reading**
- (Basics) What behavior cloning is and how it relates to supervised learning: https://rail.eecs.berkeley.edu/deeprlcourse/deeprlcourse/static/slides/lec-2.pdf
- (Additional slides) CMU 10-403 lecture on behavior cloning: https://www.andrew.cmu.edu/course/10-403/slides/S19_lecture2_behaviorcloning.pdf


## 2.0) Load & Inspect Demo File

Load the `.h5` file and matching `.json` metadata. Print field names and shapes for one trajectory to understand the data layout.


In [None]:

import h5py, json
from mani_skill.utils.io_utils import load_json

dataset_file = "demos/PushCube-v1/motionplanning/trajectory.state.pd_joint_delta_pos.physx_cpu.h5"
data = h5py.File(dataset_file, "r")
json_path = dataset_file.replace(".h5", ".json")
json_data = load_json(json_path)

def load_h5_data(group):
    out = {}
    for k in group.keys():
        if isinstance(group[k], h5py.Dataset):
            out[k] = group[k][:]
        else:
            out[k] = load_h5_data(group[k])
    return out

# Preview one trajectory
for k in data.keys():
    print("Trajectory key:", k)
    traj = load_h5_data(data[k])
    for kk, vv in traj.items():
        if not isinstance(vv, dict):
            print("-", kk, getattr(vv, "shape", None))
    break



## 2.1) Eval Helpers (Provided)


In [None]:

import gymnasium as gym
from mani_skill.vector.wrappers.gymnasium import ManiSkillVectorEnv
from mani_skill.utils.wrappers.record import RecordEpisode
from collections import defaultdict
import torch

def create_eval_envs(json_data, num_envs=16, max_episode_steps=100, video_path=None):
    env_info = json_data["env_info"]
    env_id = env_info["env_id"]
    env_kwargs = dict(env_info["env_kwargs"])
    env_kwargs.pop("num_envs", None)
    env_kwargs.pop("sim_backend", None)
    eval_envs = gym.make(env_id, num_envs=num_envs, reconfiguration_freq=1, max_episode_steps=max_episode_steps, **env_kwargs)
    if video_path is not None:
        eval_envs = RecordEpisode(eval_envs, output_dir=video_path, save_trajectory=False, max_steps_per_video=max_episode_steps)
    eval_envs = ManiSkillVectorEnv(eval_envs, ignore_terminations=True, record_metrics=True)
    return eval_envs

def eval_policy(eval_envs, actor, verbose=1):
    obs, _ = eval_envs.reset()
    eval_metrics = defaultdict(list)
    while True:
        with torch.no_grad():  # rollouts do not need gradients
            action = actor(obs)
        obs, rew, terminated, truncated, info = eval_envs.step(action)
        if truncated.any():
            for k, v in info["final_info"]["episode"].items():
                eval_metrics[k].append(v.float())
            break
    if verbose:
        for k in eval_metrics.keys():
            print(f"{k}_mean:", torch.mean(torch.stack(eval_metrics[k])).item())
    return eval_metrics



## 2.2) Build `TrajectoryDataset`

Create a dataset that flattens all trajectories into pairs of `(obs_t, action_t)`.
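
The flat index can be sketched on an in-memory stand-in for the `.h5` file. The names `build_flat_index` and `fake`, and the array sizes, are hypothetical. The sketch also assumes each trajectory stores one more observation than actions, which you should confirm against the shapes printed in 2.0.

```python
import numpy as np

def build_flat_index(trajectories):
    """Return a list of (traj_key, t) pairs, one per recorded action."""
    index = []
    for key in sorted(trajectories.keys()):
        num_steps = len(trajectories[key]["actions"])
        index.extend((key, t) for t in range(num_steps))
    return index

# Hypothetical stand-in for the opened demo file: T+1 observations, T actions
fake = {
    "traj_0": {"obs": np.zeros((6, 25), dtype=np.float32),
               "actions": np.zeros((5, 8), dtype=np.float32)},
    "traj_1": {"obs": np.zeros((4, 25), dtype=np.float32),
               "actions": np.zeros((3, 8), dtype=np.float32)},
}

index = build_flat_index(fake)
key, t = index[6]                  # global sample 6 falls in the second trajectory
obs_t = fake[key]["obs"][t]        # observation at time t
act_t = fake[key]["actions"][t]    # action taken at time t
print(len(index), key, t, obs_t.shape, act_t.shape)
```

Indexing by the number of actions rather than observations means the final observation of each trajectory, which has no paired action, is never sampled.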

**Reading**
- PyTorch custom `Dataset` patterns (index mapping across multiple sequences): https://docs.pytorch.org/tutorials/beginner/data_loading_tutorial.html


In [None]:

from torch.utils.data import Dataset
import torch

class TrajectoryDataset(Dataset):
    def __init__(self, data, json_data):
        self.data = data
        self.json_data = json_data
        # YOUR CODE HERE -------------------------------------------------------
        # Build a flat index: list of (traj_key, t)
        # CODE ENDS HERE -------------------------------------------------------

    def __len__(self):
        # YOUR CODE HERE -------------------------------------------------------
        return 0
        # CODE ENDS HERE -------------------------------------------------------

    def __getitem__(self, idx):
        # YOUR CODE HERE -------------------------------------------------------
        # 1) Use index map to select a (traj_key, t)
        # 2) Read observation vector and action vector at time t
        # 3) Return dict with float32 tensors: {'obs': ..., 'action': ...}
        # CODE ENDS HERE -------------------------------------------------------
        raise NotImplementedError

# Instantiate
# YOUR CODE HERE ---------------------------------------------------------------
il_dataset = None
# CODE ENDS HERE ---------------------------------------------------------------

print("IL dataset length:", len(il_dataset))



## 2.3) Actor Network (MLP)

Implement a small MLP mapping observation vectors → action vectors.
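
For orientation, a compact MLP can be assembled as below. The helper `make_mlp`, the hidden width, and the dimensions 25 and 8 are placeholders chosen for illustration; take the real sizes from your observation and action tensors.

```python
import torch
import torch.nn as nn

def make_mlp(input_dim, output_dim, hidden_dim=256, num_hidden=2):
    """Stack `num_hidden` Linear+ReLU blocks followed by a linear output layer."""
    layers, dim = [], input_dim
    for _ in range(num_hidden):
        layers += [nn.Linear(dim, hidden_dim), nn.ReLU()]
        dim = hidden_dim
    layers.append(nn.Linear(dim, output_dim))  # no activation on the action output
    return nn.Sequential(*layers)

net = make_mlp(input_dim=25, output_dim=8)
out = net(torch.zeros(16, 25))  # batch of 16 placeholder observations
print(out.shape)
```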

**Reading**
- Behavior cloning (conceptually supervised learning on state→action).


In [None]:

import torch.nn as nn
import torch

class Actor(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        # YOUR CODE HERE -------------------------------------------------------
        # Define a compact MLP
        # CODE ENDS HERE -------------------------------------------------------

    def forward(self, x):
        # YOUR CODE HERE -------------------------------------------------------
        # CODE ENDS HERE -------------------------------------------------------
        raise NotImplementedError



## 2.4) Sanity Check Shapes

Create eval envs, sample one batch, and verify the actor returns a tensor with the correct action shape.


In [None]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
eval_envs = create_eval_envs(json_data, num_envs=16)
sample_obs, _ = eval_envs.reset()
sample_act = eval_envs.action_space.sample()

# YOUR CODE HERE ---------------------------------------------------------------
# Instantiate actor with correct input/output dims and run a forward pass
# CODE ENDS HERE ---------------------------------------------------------------

eval_envs.close()



## 2.5) Train Behavior Cloning (BC)

Train an `Actor` by minimizing MSE between predicted and expert actions. Evaluate periodically by **running the policy in the environment**, not just by loss.

**Things to think about**
- Why evaluation in‑env is necessary vs. training loss alone.
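
The supervised part of behavior cloning reduces to an ordinary regression loop. The sketch below runs on synthetic data only: the linear "expert", the dimensions, and the hyperparameters are all assumptions made for illustration. It omits the in-environment evaluation that the real loop should run periodically.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(0)

# Synthetic "expert": actions are a fixed linear function of observations
obs = torch.randn(512, 10)
expert_W = torch.randn(10, 4)
actions = obs @ expert_W

loader = DataLoader(TensorDataset(obs, actions), batch_size=64, shuffle=True)
policy = nn.Sequential(nn.Linear(10, 64), nn.ReLU(), nn.Linear(64, 4))
optimizer = torch.optim.Adam(policy.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

epoch_losses = []
for epoch in range(20):
    running = 0.0
    for obs_b, act_b in loader:
        loss = loss_fn(policy(obs_b), act_b)  # match the expert action
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running += loss.item() * obs_b.shape[0]
    epoch_losses.append(running / len(loader.dataset))  # per-sample mean

print(epoch_losses[0], epoch_losses[-1])
```

In the notebook, a call to `eval_policy` every few epochs would sit inside the outer loop, with the success rate tracked alongside `epoch_losses`.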


In [None]:

from torch.utils.data import DataLoader
import torch.optim as optim
from tqdm.notebook import tqdm
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
eval_envs = create_eval_envs(json_data, num_envs=16, max_episode_steps=100)
sample_obs, _ = eval_envs.reset(seed=0)
sample_act = eval_envs.action_space.sample()

# YOUR CODE HERE ---------------------------------------------------------------
# 1) Instantiate Actor and move to device
# 2) Create DataLoader over il_dataset
# 3) Train for several epochs; periodically call eval_policy and track success rate
# 4) Optionally save the best model
# CODE ENDS HERE ---------------------------------------------------------------

eval_envs.close()



## 2.6) Final Evaluation (with Video)

Load your best checkpoint (optional) and evaluate several rollouts while recording to disk using the provided recording wrapper.


In [None]:

import numpy as np
from IPython.display import Video

# YOUR CODE HERE ---------------------------------------------------------------
# 1) Re-create eval envs with video_path, reset with a fixed seed
# 2) Re-instantiate Actor and optionally load best weights
# 3) Run several evals, compute mean success rate
# 4) Display one saved video if present
# CODE ENDS HERE ---------------------------------------------------------------



## 2.7) Written — Why not just training loss? (P2.2)

In 1–2 sentences, explain why minimizing MSE on the dataset is **not** sufficient to evaluate policy performance in sequential decision-making settings.



## 2.8) Written — Sense–Plan–Act vs End‑to‑End IL (P2.3)

Write 4–8 sentences comparing training difficulty and data requirements for **sense–plan–act** vs **end‑to‑end imitation learning**.
