# Robomimic Rapidly-exploring Ramdom Trees (RRT) - Instructional Notebook

This notebook is an implementation of **Rapidly-Exploring Random Trees (RRT)** within a `robomimic` simulation environment. The implementation is built on top of [robomimic tutorial](https://colab.research.google.com/drive/1b62r_km9pP40fKF0cBdpdTO2P_2eIbC6?usp=sharing).

---

## Objectives

- Understand the basics of motion planning with RRT.
- Integrate RRT into `robomimic`.
- Visualize planned trajectories and identify potential issues.
- Modify or extend the implementation as an exercise.

---

## Table of Contents

1. Environment Setup
2. Download Demonstration Dataset
3. Implement RRT Algorithm
4. Path Execution (Rollout)
5. Path Visualization

---

**Note**: Please run each cell in order. Some cells are left intentionally editable for hands-on experimentation.


### 1. Environment Setup

In [1]:
WS_DIR = "/content/"
%cd $WS_DIR

# Clone the repo and install the basic requirements
!git clone --branch v0.4 https://github.com/ARISE-Initiative/robomimic
!pip install -e robomimic/ > /dev/null

import sys
import os
sys.path.append('./robomimic/')

/content
Cloning into 'robomimic'...
remote: Enumerating objects: 3540, done.[K
remote: Counting objects: 100% (1625/1625), done.[K
remote: Compressing objects: 100% (362/362), done.[K
remote: Total 3540 (delta 1424), reused 1263 (delta 1263), pack-reused 1915 (from 4)[K
Receiving objects: 100% (3540/3540), 62.09 MiB | 15.69 MiB/s, done.
Resolving deltas: 100% (2457/2457), done.


In [2]:
# install all system dependencies for mujoco-py
!sudo DEBIAN_FRONTEND=noninteractive apt install curl git libgl1-mesa-dev libgl1-mesa-glx libglew-dev \
         libosmesa6-dev software-properties-common net-tools unzip vim \
         virtualenv wget xpra xserver-xorg-dev libglfw3-dev patchelf > /dev/null

#install mujoco-py
!pip install mujoco > /dev/null

#install robosuite
!pip install robosuite > /dev/null



Extracting templates from packages: 100%


## 2. Download Demonstration Dataset

In this step, we download the **robomimic demonstration dataset** for the `"can"` task.  
This dataset contains low-dimensional proprioceptive and object state information recorded from expert demonstrations.

We will later use this dataset not only to inspect robot and object configurations,  
but also to **extract statistical information** such as:

- Workspace bounds for RRT node sampling
- Candidate goal positions

These statistics are essential for initializing and constraining the RRT planner.

> **Note:**  
> - In Colab, rendering with MuJoCo requires `osmesa` or `egl`.  
> - This code sets `MUJOCO_GL` to `'osmesa'` by default for compatibility.


In [3]:
# import all utility functions

import os
import json
import h5py
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import robomimic
import robomimic.utils.obs_utils as ObsUtils
import robomimic.utils.torch_utils as TorchUtils
import robomimic.utils.test_utils as TestUtils
import robomimic.utils.file_utils as FileUtils
import robomimic.utils.train_utils as TrainUtils
from robomimic.utils.dataset import SequenceDataset

from robomimic.config import config_factory
from robomimic.algo import algo_factory

# for rendering mujoco in colab, you need turn on egl
os.environ['MUJOCO_GL'] = 'osmesa' # if gpu possible, use 'egl', if not, use 'osmesa'

# the dataset registry can be found at robomimic/__init__.py
from robomimic import DATASET_REGISTRY, HF_REPO_ID

# set download folder and make it
download_folder = WS_DIR + "robomimic_data/"
os.makedirs(download_folder, exist_ok=True)

# download the dataset
task = "can"
dataset_type = "ph"
hdf5_type = "low_dim"
FileUtils.download_file_from_hf(
    repo_id=HF_REPO_ID,
    filename=DATASET_REGISTRY[task][dataset_type][hdf5_type]["url"],
    download_dir=download_folder,
)

# enforce that the dataset exists
dataset_path = os.path.join(download_folder, "low_dim_v15.hdf5")
assert os.path.exists(dataset_path)

    No private macro file found!
    It is recommended to use a private macro file
    To setup, run: python /content/./robomimic/robomimic/scripts/setup_macros.py
)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


low_dim_v15.hdf5:   0%|          | 0.00/46.9M [00:00<?, ?B/s]

Visualize Playback Trajectory

- Replays 5 demonstrations episode from the dataset.

In [4]:
!python robomimic/robomimic/scripts/playback_dataset.py --dataset robomimic_data/low_dim_v15.hdf5 --video_path demo_playback_dataset.mp4 --n 5

    No private macro file found!
    It is recommended to use a private macro file
    To setup, run: python /content/robomimic/robomimic/scripts/setup_macros.py
)[0m


using obs modality: low_dim with keys: ['robot0_eef_pos']
using obs modality: rgb with keys: []
Created environment with name PickPlaceCan
Action size is 7
Playing back episode: demo_0
Playing back episode: demo_1
Playing back episode: demo_2
Playing back episode: demo_3
Playing back episode: demo_4


In [5]:
# visualize rollout video

from IPython.display import HTML
from base64 import b64encode

mp4 = open("demo_playback_dataset.mp4", "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=400 controls>
      <source src="{data_url}" type="video/mp4">
</video>
""")

Here, we analyze the demonstration dataset to extract key statistics needed for RRT planning:

- **End-effector workspace bounds**:  
  For each episode, we compute the minimum and maximum positions of the robot's end-effector (`robot0_eef_pos`).  
  This helps define the **sampling bounds** for RRT node generation.

- **Goal positions**:  
  We extract the final object position in each trajectory.  
  This approximates where the object is located at the end of a successful demo.

- **Final end-effector positions**:  
  The last position of the robotâ€™s gripper in each demo is stored to estimate where the robot typically ends up.

These statistics are then aggregated across the dataset to compute:

- The **average goal position** (used as a reference during planning)
- The **average final end-effector position** (helpful for trajectory alignment)
- The **overall 3D bounds** for RRT sampling space

> These values are critical for initializing the planner in a task-aware manner rather than relying on hard-coded or arbitrarily chosen limits.


In [13]:
f = h5py.File(dataset_path, "r")
dataset = f["data"]

min_x, min_y, min_z, max_x, max_y, max_z = None, None, None, None, None, None
goal_pos_list = []
final_pos_list = []
for i in dataset.keys():
    i_min_x, i_min_y, i_min_z = np.min(dataset[i]["obs/robot0_eef_pos"], axis=0)
    i_max_x, i_max_y, i_max_z = np.max(dataset[i]["obs/robot0_eef_pos"], axis=0)

    goal_pos_list.append(dataset[i]["obs/object"][-1, 7:10])
    final_pos_list.append(dataset[i]["obs/robot0_eef_pos"][-1])

    min_x = i_min_x if min_x is None or i_min_x < min_x else min_x
    min_y = i_min_y if min_y is None or i_min_y < min_y else min_y
    min_z = i_min_z if min_z is None or i_min_z < min_z else min_z
    max_x = i_max_x if max_x is None or i_max_x > max_x else max_x
    max_y = i_max_y if max_y is None or i_max_y > max_y else max_y
    max_z = i_max_z if max_z is None or i_max_z > max_z else max_z

avg_goal_pos = np.mean(goal_pos_list, axis=0)
avg_final_pos = np.mean(final_pos_list, axis=0)
bounds = [
    (min_x, max_x),
    (min_y, max_y),
    (min_z, max_z)
]

## 3. Implement RRT Algorithm

We now implement the **Rapidly-Exploring Random Tree (RRT)** algorithm to perform motion planning in 3D space.

### Key Concepts

- **Node class**:  
  Represents a point in space along with a pointer to its parent node (for path backtracking).

- **Tree expansion**:
  1. Randomly sample a point within the workspace bounds.
  2. Find the nearest node already in the tree.
  3. "Steer" from the nearest node toward the sample, by a fixed step size.
  4. If the new node is not in collision, add it to the tree.

- **Collision checking**:  
  The planner avoids invalid positions using a simple geometric collision checker for spheres and boxes.

- **Termination**:  
  If a newly added node is within `goal_threshold` distance from the goal, the planner stops and reconstructs the path by backtracking through node parents.

### Adjustable Parameters

- `step_size`: how far to extend toward a sample at each step.
- `goal_threshold`: minimum distance to consider the goal "reached".
- `max_iter`: maximum number of iterations before giving up.
- `obstacles`: list of obstacles (spheres or boxes) defined by center and size.

---

> âœ… The `rrt` function returns a list of 3D points forming a valid path from start to goal, or `None` if the goal is unreachable.


In [37]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

class Node:
    def __init__(self, pos):
        self.pos = np.array(pos)
        self.parent = None

class RRT:
    def __init__(self):
        self.tree = []
        self.path = []

    def distance(self, n1, n2):
        return np.linalg.norm(n1.pos - n2.pos)

    def sample_free(self, bounds):
        return np.array([np.random.uniform(low, high) for low, high in bounds])

    def steer(self, from_node, to_pos, step_size):
        direction = to_pos - from_node.pos
        dist = np.linalg.norm(direction)
        if dist < step_size:
            new_pos = to_pos
        else:
            new_pos = from_node.pos + (direction / dist) * step_size
        new_node = Node(new_pos)
        new_node.parent = from_node
        return new_node

    def collision_checker(self, pos, obstacles):
        if obstacles is None:
            return False

        for obstacle in obstacles:
            center = obstacle["center"]
            size = obstacle["size"]
            shape = obstacle.get("shape", "sphere")

            if shape == "sphere":
                if np.linalg.norm(pos - center) < size:
                    return True
            elif shape == "box":
                lower = center - size
                upper = center + size
                if np.all(pos >= lower) and np.all(pos <= upper):
                    return True
        return False

    def rrt_search(self, start_pos, goal_pos, bounds, obstacles=None, step_size=0.005, max_iter=5000, goal_threshold=0.0001, goal_bias=False):
        start = Node(start_pos)
        goal = Node(goal_pos)
        self.tree.append(start)

        for i in range(max_iter):
            sample = self.sample_free(bounds)

            # goal bias implementation (10%)
            if goal_bias and np.random.rand() < 0.1:
                sample = goal.pos

            nearest = min(self.tree, key=lambda n: np.linalg.norm(n.pos - sample))
            new_node = self.steer(nearest, sample, step_size)

            if self.collision_checker(new_node.pos, obstacles):
                continue

            self.tree.append(new_node)

            if np.linalg.norm(new_node.pos - goal.pos) < goal_threshold:
                goal.parent = new_node
                self.tree.append(goal)
                print(f"Goal reached in {i} iterations.")
                break

        if goal.parent is None:
            print("Goal not reachable.")
            return None

        # backtrack path
        node = goal
        while node:
            self.path.append(node.pos)
            node = node.parent
        self.path.reverse()
        return self.path

In [38]:
class RRT_star(RRT):
    def __init__(self, rewire_radius=0.005):
        super().__init__()
        self.rewire_radius = rewire_radius

    def cost(self, node):
        total_cost = 0.0
        current = node
        while current.parent is not None:
            total_cost += self.distance(current, current.parent)
            current = current.parent
        return total_cost

    def get_nearby_nodes(self, new_node):
        nearby_nodes = []
        for node in self.tree:
            if self.distance(new_node, node) <= self.rewire_radius:
                nearby_nodes.append(node)
        return nearby_nodes

    def rrt_search(self, start_pos, goal_pos, bounds, obstacles=None, step_size=0.005, max_iter=5000, goal_threshold=0.0001):
        start = Node(start_pos)
        goal = Node(goal_pos)
        self.tree.append(start)

        for i in range(max_iter):
            sample = self.sample_free(bounds)

            # goal bias implementation (10%)
            if goal_bias and np.random.rand() < 0.1:
                sample = goal.pos

            nearest = min(self.tree, key=lambda n: np.linalg.norm(n.pos - sample))
            new_node = self.steer(nearest, sample, step_size)

            if self.collision_checker(new_node.pos, obstacles):
                continue

            # Choose parent among nearby nodes
            nearby_nodes = self.get_nearby_nodes(new_node)
            min_cost = self.cost(nearest) + self.distance(nearest, new_node)
            best_parent = nearest

            for node in nearby_nodes:
                if not self.collision_checker(self.steer(node, new_node.pos, step_size).pos, obstacles):
                    c = self.cost(node) + self.distance(node, new_node)
                    if c < min_cost:
                        min_cost = c
                        best_parent = node

            new_node.parent = best_parent
            self.tree.append(new_node)

            # Rewire
            for node in nearby_nodes:
                if not self.collision_checker(self.steer(new_node, node.pos, step_size).pos, obstacles):
                    c = self.cost(new_node) + self.distance(new_node, node)
                    if c < self.cost(node):
                        node.parent = new_node

            if self.distance(new_node, goal) < goal_threshold:
                goal.parent = new_node
                self.tree.append(goal)
                print(f"Goal reached in {i} iterations.")
                break

        if goal.parent is None:
            print("Goal not reachable.")
            return None

        # backtrack path
        node = goal
        while node:
            self.path.append(node.pos)
            node = node.parent
        self.path.reverse()
        return self.path

## 4. Path Execution (Rollout)

Now that we've implemented RRT, we will **execute actions in the robomimic**.

### Environment Setup

We first create a robomimic-compatible environment from metadata in the dataset.  
This ensures that the simulation settings (robot type, object type, etc.) exactly match the demonstration data.

In [39]:
from robosuite.environments.manipulation.pick_place import PickPlace
from robosuite.utils.mjcf_utils import CustomMaterial
from robosuite.models.objects import (
    BoxObject,
    BreadObject,
    BreadVisualObject,
    CanObject,
    CanVisualObject,
    CerealObject,
    CerealVisualObject,
    MilkObject,
    MilkVisualObject,
)
from robosuite.utils.placement_samplers import SequentialCompositeSampler, UniformRandomSampler

from robosuite.environments.base import register_env

class PickPlaceWithObstacles(PickPlace):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _construct_objects(self):
        """
        Function that can be overriden by subclasses to load different objects.
        """
        self.objects = []

        obj = CanObject(name="Can")
        self.objects.append(obj)

        # initialize obstacle
        tex_attrib = {
            "type": "cube",
        }
        mat_attrib = {
            "texrepeat": "1 1",
            "specular": "0.4",
            "shininess": "0.1",
        }
        redwood = CustomMaterial(
            texture="WoodRed",
            tex_name="redwood",
            mat_name="redwood_mat",
            tex_attrib=tex_attrib,
            mat_attrib=mat_attrib,
        )

        cube = BoxObject(
            name="cube",
            size_min=[0.2, 0.14, 0.04],
            size_max=[0.2, 0.14, 0.04],
            rgba=[1, 0, 0, 1],
            material=redwood,
        )
        self.objects.append(cube)

    def _get_placement_initializer(self):
        """
        Helper function for defining placement initializer and object sampling bounds.
        """
        self.placement_initializer = SequentialCompositeSampler(name="ObjectSampler")

        # can sample anywhere in bin
        bin_x_half = self.model.mujoco_arena.table_full_size[0] / 2 - 0.15
        bin_y_half = self.model.mujoco_arena.table_full_size[1] / 2 - 0.15

        # each object should just be sampled in the bounds of the bin (with some tolerance)
        self.placement_initializer.append_sampler(
            sampler=UniformRandomSampler(
                name="CollisionObjectSampler",
                mujoco_objects=self.objects[0],
                x_range=[-bin_x_half, bin_x_half],
                y_range=[-bin_y_half, bin_y_half],
                rotation=self.z_rotation,
                rotation_axis="z",
                ensure_object_boundary_in_range=True,
                ensure_valid_placement=True,
                reference_pos=self.bin1_pos,
                z_offset=self.z_offset,
            )
        )

        bin_x_low = self.bin2_pos[0]
        bin_y_low = self.bin2_pos[1] - self.bin_size[1] / 2
        bin_x_high = bin_x_low + self.bin_size[0] / 2
        bin_y_high = bin_y_low + self.bin_size[1] / 2
        bin_center = np.array(
            [
                (bin_x_low + bin_x_high) / 2.0,
                (bin_y_low + bin_y_high) / 2.0,
            ]
        )

        # placement is relative to object bin, so compute difference and send to placement initializer
        rel_center = bin_center - self.bin1_pos[:2]

        # each obstacles should just be sampled in the bounds of the bin (with some tolerance)
        self.placement_initializer.append_sampler(
            sampler=UniformRandomSampler(
                name="ObstacleObjectSampler",
                mujoco_objects=self.objects[1],
                x_range=[0, 0],
                y_range=[rel_center[1]-0.02, rel_center[1]-0.02],
                rotation=0.0,
                rotation_axis="z",
                ensure_object_boundary_in_range=False,
                ensure_valid_placement=True,
                reference_pos=self.bin1_pos,
                z_offset=0.11 +self.bin2_pos[2] - self.bin1_pos[2],
            )
        )

        # each visual object should just be at the center of each target bin
        index = 0
        for vis_obj in self.visual_objects:

            # get center of target bin
            bin_x_low = self.bin2_pos[0]
            bin_y_low = self.bin2_pos[1]
            if index == 0 or index == 2:
                bin_x_low -= self.bin_size[0] / 2
            if index < 2:
                bin_y_low -= self.bin_size[1] / 2
            bin_x_high = bin_x_low + self.bin_size[0] / 2
            bin_y_high = bin_y_low + self.bin_size[1] / 2
            bin_center = np.array(
                [
                    (bin_x_low + bin_x_high) / 2.0,
                    (bin_y_low + bin_y_high) / 2.0,
                ]
            )

            # placement is relative to object bin, so compute difference and send to placement initializer
            rel_center = bin_center - self.bin1_pos[:2]

            self.placement_initializer.append_sampler(
                sampler=UniformRandomSampler(
                    name=f"{vis_obj.name}ObjectSampler",
                    mujoco_objects=vis_obj,
                    x_range=[rel_center[0], rel_center[0]],
                    y_range=[rel_center[1], rel_center[1]],
                    rotation=0.0,
                    rotation_axis="z",
                    ensure_object_boundary_in_range=False,
                    ensure_valid_placement=False,
                    reference_pos=self.bin1_pos,
                    z_offset=self.bin2_pos[2] - self.bin1_pos[2],
                )
            )
            index += 1

register_env(PickPlaceWithObstacles)

In [40]:
import robomimic.utils.env_utils as EnvUtils

dummy_spec = dict(
    obs=dict(
        low_dim=["robot0_eef_pos", "robot0_eef_quat", "robot0_gripper_qpos", "object"],
        rgb=[],
    ),
)
ObsUtils.initialize_obs_utils_with_obs_specs(obs_modality_specs=dummy_spec)

env_meta = FileUtils.get_env_metadata_from_dataset(dataset_path)

env = EnvUtils.create_env_from_metadata(
    env_meta=env_meta,
    env_name="PickPlaceWithObstacles",
    render=False,
    render_offscreen=True,
    use_image_obs=False,
)



using obs modality: low_dim with keys: ['object', 'robot0_gripper_qpos', 'robot0_eef_quat', 'robot0_eef_pos']
using obs modality: rgb with keys: []
Created environment with name PickPlaceWithObstacles
Action size is 7


### Rollout Function

We define a `rollout` function that:
- Takes in the environment and a sequence of predicted actions
- Executes the actions in the simulation, one step at a time
- Records frames for visualization
- Tracks success metrics such as task completion and total reward


In [41]:
def rollout(env, pred_actions, video_writer, obstacles=[]):
    results = {}

    video_frames = []
    video_count = 0  # video frame counter

    rews = []
    success = None # success metrics

    for ac in pred_actions:
        # play action
        ob_dict, r, done, _ = env.step(ac)

        # compute reward
        rews.append(r)

        cur_success_metrics = env.is_success()

        if success is None:
            success = deepcopy(cur_success_metrics)
        else:
            for k in success:
                success[k] = success[k] | cur_success_metrics[k]

        # visualization
        frame = env.render(mode="rgb_array", height=512, width=512)

        video_frames.append(frame)
        video_count += 1

        # break if done
        if done:
            end_step = video_count
            break

    for frame in video_frames:
        video_writer.append_data(frame)

    end_step = video_count
    total_reward = np.sum(rews[:end_step + 1])

    results["Return"] = total_reward
    results["Horizon"] = end_step + 1
    results["Success_Rate"] = float(success["task"])

    # log additional success metrics
    for k in success:
        if k != "task":
            results["{}_Success_Rate".format(k)] = float(success[k])

    return ob_dict, results

###Action Execution Steps

We execute **three RRT rollouts**, corresponding to stages in a typical manipulation task:

1. **Reach the Object**
   - Plan a path from the robot's current end-effector position to the object.
   - Use RRT to generate the trajectory and convert it to a list of actions.
   - Execute the actions with gripper open (`-1`).

2. **Grasp and Transport**
   - Plan a new path from the grasp point to a transport goal position.
   - Include a virtual **obstacle** to simulate occlusion or constrained lifting paths.
   - Execute with gripper closed (`1`).

3. **Arrive and Release**
   - Execute with gripper open (`-1`) to release.
   
   ---

> ðŸ“Œ Note:  
> All actions are formatted as `[x, y, z, 0, 0, 0, gripper]` where `gripper` is `-1` (open) or `1` (close).


In [42]:
def rollout_phase(phase, ob_dict, video_writer):
    start = ob_dict["robot0_eef_pos"]
    obstacles = None
    gripper = -1
    if phase == "reach":
        goal = ob_dict["object"][7:10] + np.array([0.0, 0.0, 0.03])
    elif phase == "grasp":
        goal = avg_goal_pos + np.array([0.0, 0.0, 0.2])
        obstacles = [
            {"center": np.array([0, 0.1375, 0.85]), "size": np.array([0.2, 0.15, 0.25]), "shape": "box"},
        ]
        gripper = 1
    elif phase == "final":
        goal = avg_final_pos

    rrt = RRT()
    pred_xyz = rrt.rrt_search(start, goal, bounds, obstacles=obstacles, step_size=0.005, goal_threshold=0.00001)
    if pred_xyz is None:
        raise Exception("Goal not reachable.")
    diff_xyz = np.diff(pred_xyz, axis=0) * action_modifier
    pred_actions = np.array([np.array([x, y, z, 0, 0, 0, gripper]) for x, y, z in diff_xyz])
    ob_dict, results = rollout(env, pred_actions, video_writer)

    return ob_dict, results

In [43]:
from robomimic.algo import RolloutPolicy
from robomimic.utils.train_utils import run_rollout
import imageio
from copy import deepcopy

# create a video writer
video_path = "rollout.mp4"
video_writer = imageio.get_writer(video_path, fps=20)

action_modifier = 80

env.reset()
ob_dict, _, _, _ = env.step(np.zeros(7))

ob_dict, results = rollout_phase("reach", ob_dict, video_writer)
ob_dict, results = rollout_phase("grasp", ob_dict, video_writer)
ob_dict, results = rollout_phase("final", ob_dict, video_writer)

video_writer.close()

Goal reached in 344 iterations.
Goal reached in 1292 iterations.
Goal reached in 77 iterations.


## 5. Path Visualization

### Video Logging

- A video of the entire trajectory is saved as `rollout.mp4`.

In [44]:
# visualize rollout video

from IPython.display import HTML
from base64 import b64encode

mp4 = open(video_path, "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML(f"""
<video width=400 controls>
      <source src="{data_url}" type="video/mp4">
</video>
""")