## Project Template (Deepnote)

You can use this notebook as a starting point for your class project and/or the extra exercise for graduate students in Deepnote. It comes set up with the dependencies we use in class, so you don't have to install these yourself.

**Notes:**
- To use this in your own workspace, click “Duplicate” for the project in Deepnote.
- Class dependencies, namely `drake` and the `manipulation` package, are already preinstalled in the Deepnote image for this course, as well as a bunch of other dependencies that these packages depend on or that we use in class
- To see the full list of installed dependencies, as well as version numbers etc, see this file: [pyproject.toml](https://github.com/RussTedrake/manipulation/blob/master/pyproject.toml).
  - **Note**: we install all the extra dependencies, including the `dev` dependencies, into the Docker image that this deepnote project runs on.


### (Advanced Users) Installing additional dependencies

Here we show you how to install additional packages into this Deepnote project. We will use the package [gcsopt](https://pypi.org/project/gcsopt/) as an example. To install additional dependencies, you have two choices:
- **Recommended** (automatically installs your dependencies every time you boot the Deepnote machine):
  1. Add the dependency to the list of dependencies in `reqirements.txt` on the left.  
      - **Note**: Make sure to NOT remove the text that is already in the file. You should add your dependencies on new lines AFTER `manipulation=year.month.day`. For our example, the file contents will look like:
          ```
          --extra-index-url https://drake-packages.csail.mit.edu/whl/nightly
          manipulation==2025.9.23
          gcsopt
          ```
  2. Restart the Deepnote machine (or run the `Init` notebook)
- **Alternatively:** Run `!pip install gcsopt` within a cell in the Deepnote notebook. You will have to do this everytime you restart your Deepnote machine.

### (Advanced Users) Updating the `manipulation` version
We continually add new features to the `manipulation` package, which is automatically updated in Deepnote. However, after you first click "Duplicate", you will stop receiving these updates. Follow these steps to update your version to the latest version:
1. Find the latest version number for `manipulation` on the [PyPi package page](https://pypi.org/project/manipulation/#history).
2. Update the version number for `manipulation=year.month.day` in the `requirements.txt` file on the left
3. Restart the Deepnote machine (or run the `Init` notebook)

The changes will persist every time you start your Deepnote machine.

Good luck!

---


## Placeholder code: iiwa Simulation

As a starting point, the code below sets up a simple simulation with a single iiwa, visualized in Meshcat.

### Setup and imports
Let us first get our imports out of the way:

In [None]:
from pydrake.all import (
    DiagramBuilder,
    Simulator,
    StartMeshcat,
    BasicVector,
    LeafSystem,
    InverseKinematics,
    SnoptSolver,
    RotationMatrix,
    RigidTransform,
    Solve,
    LogVectorOutput,
    PiecewisePolynomial,
    TrajectorySource,
    ImageWriter,
    PixelType
)

from manipulation.station import (
    LoadScenario,
    MakeHardwareStation,
)

from manipulation import running_as_notebook

import matplotlib.pyplot as plt
import numpy as np

: 

### Meshcat Visualization

As always, let's start Meshcat for our 3D visualization!

In [3]:
# Start meshcat for visualization
meshcat = StartMeshcat()
print("Click the link above to open Meshcat in your browser!")

INFO:drake:Meshcat listening for connections at http://localhost:7000


Click the link above to open Meshcat in your browser!


In [4]:
scenario_string = """directives:
# add robot
- add_model:
    name: iiwa
    file: package://drake_models/iiwa_description/urdf/iiwa14_primitive_collision.urdf
    default_joint_positions:
      iiwa_joint_1: [-1.57]
      iiwa_joint_2: [0.1]
      iiwa_joint_3: [0]
      iiwa_joint_4: [-1.2]
      iiwa_joint_5: [0]
      iiwa_joint_6: [1.6]
      iiwa_joint_7: [0]
- add_weld:
    parent: world
    child: iiwa::iiwa_link_0

# add gripper
- add_model:
    name: wsg
    file: package://manipulation/hydro/schunk_wsg_50_with_tip.sdf
- add_weld:
    parent: iiwa::iiwa_link_7
    child: wsg::body
    X_PC:
        translation: [0, 0, 0.09]
        rotation: !Rpy { deg: [90, 0, 90]}

# add camera mounted to world 
- add_frame:
    name: camera0_origin
    X_PF:
        base_frame: world
        rotation: !Rpy { deg: [270, 0.0, 90.0]}
        translation: [1.2, -0.5, 0.5]
- add_model:
    name: camera0
    file: package://manipulation/camera_box.sdf
- add_weld:
    parent: camera0_origin
    child: camera0::base

# add camera mounted to robot wrist
- add_frame:
    name: camera_wrist
    X_PF:
      base_frame: iiwa::iiwa_link_7
      translation: [-0.05, 0, 0.1]   # 10 cm ahead of wrist
      rotation: !Rpy {deg: [0, 00, -90]}
- add_model:
    name: camera1
    file: package://manipulation/camera_box.sdf
- add_weld:
    parent: iiwa::camera_wrist
    child: camera1::base

# add cabinet
- add_model:
    name: cabinet
    file: package://drake_models/manipulation_station/cupboard.sdf
- add_frame:
    name: cabinet_origin
    X_PF:
      base_frame: world
      translation: [0, -1.1, 0.4]    # x, y, z in meters
      rotation: !Rpy { deg: [0, 0, 90]}  # roll, pitch, yaw
- add_weld:
    parent: cabinet_origin
    child: cabinet::cupboard_body

cameras:
  camera0:
    name: camera0
    depth: True
    X_PB:
      base_frame: camera0::base
  camera1:
    name: camera1
    depth: True
    X_PB:
      base_frame: camera1::base

model_drivers:
  iiwa: !IiwaDriver
    control_mode: position_only 
    hand_model_name: wsg
  wsg: !SchunkWsgDriver {}
"""

# scenario = LoadScenario(data=scenario_string)
# station = MakeHardwareStation(scenario, meshcat=meshcat)
# builder = DiagramBuilder()
# builder.AddSystem(station)
# diagram = builder.Build()
# context = diagram.CreateDefaultContext()

# simulator = Simulator(diagram)
# simulator.set_target_realtime_rate(1.0)
# simulator.AdvanceTo(0.0)

In [None]:
# run the following cell to visualize the rgb outputs of each of the cameras
cameras = ["camera0", "camera1"]
context = diagram.CreateDefaultContext()

station_context = diagram.GetSubsystemContext(station, context)

fig, axes = plt.subplots(
    1, len(cameras), figsize=(5 * len(cameras), 4), constrained_layout=True
)
# cam = cameras[0]
# ax = axes
for ax, cam in zip(axes, cameras):
    img = station.GetOutputPort(f"{cam}.rgb_image").Eval(station_context)
    arr = np.array(img.data, copy=False).reshape(img.height(), img.width(), -1)
    im = ax.imshow(arr)
    ax.set_title(f"{cam} rgb image")
    ax.axis("off")

if running_as_notebook:
    plt.show()

NameError: name 'diagram' is not defined

# Teleopping

In [6]:
class TeleopController(LeafSystem):
    """
    Buttons for end-effector pose to joint positions via ik
    """
    def __init__(self, meshcat, plant):
        LeafSystem.__init__(self)
        
        # Movement params
        self.STEP_SIZE_M = 0.01  
        self.STEP_SIZE_RAD = 0.087 # ~5 degrees
        
        
        self.meshcat = meshcat
        self.plant = plant
        
        # getting the necessary frames
        self.plant_context = plant.CreateDefaultContext()
        self.W_frame = plant.world_frame()

        wsg_instance = self.plant.GetModelInstanceByName("wsg")
        wsg_link = self.plant.GetBodyByName("body", wsg_instance)
        self.WG_frame = wsg_link.body_frame()
        
        self.nq = 7 
        
        # Configuration for IK
        self.q_initial = np.array([-1.57, 0.2, 0, -1.2, 0, 1.6, 0])
        self.iiwa = self.plant.GetModelInstanceByName("iiwa")
        first_iiwa_joint = self.plant.GetJointByName("iiwa_joint_1", self.iiwa)

        self.iiwa_q_start_index = first_iiwa_joint.position_start()
        
        # Initialize positions
        plant.SetPositions(self.plant_context, self.iiwa, self.q_initial)
        
        # initialize blank desired states
        self.X_WG_desired = plant.CalcRelativeTransform(
            self.plant_context, self.W_frame, self.WG_frame
        )
        self.R_WG_desired = self.X_WG_desired.rotation()
        self.q_desired = self.q_initial
        self.wsg_position_desired = 0.1

        # Register output ports for IIWA and WSG commands
        self.DeclareVectorOutputPort(
            "iiwa_position_command", 
            BasicVector(self.nq), 
            self.DoCalcIiwaOutput
        )
        self.DeclareVectorOutputPort(
            "wsg_position_command", 
            BasicVector(1), 
            lambda context, output: output.SetFromVector([self.wsg_position_desired])
        )

        # TODO: Convert buttons on meshcat simulation to keybindings
        
        self.button_map = {
            "Move +X (Forward)": np.array([self.STEP_SIZE_M, 0, 0]),
            "Move -X (Backward)": np.array([-self.STEP_SIZE_M, 0, 0]),
            "Move +Y (Right)": np.array([0, self.STEP_SIZE_M, 0]),
            "Move -Y (Left)": np.array([0, -self.STEP_SIZE_M, 0]),
            "Move +Z (Up)": np.array([0, 0, self.STEP_SIZE_M]),
            "Move -Z (Down)": np.array([0, 0, -self.STEP_SIZE_M]),
            "Open Gripper (O)": "OPEN",
            "Close Gripper (C)": "CLOSE",
            
            # Roll (Around X-axis of the World Frame)
            "Roll +X": "ROT_X_P",
            "Roll -X": "ROT_X_N",
            # Pitch (Around Y-axis of the World Frame)
            "Pitch +Y": "ROT_Y_P",
            "Pitch -Y": "ROT_Y_N",
            # Yaw (Around Z-axis of the World Frame)
            "Yaw +Z": "ROT_Z_P",
            "Yaw -Z": "ROT_Z_N",
        }

        
        self.last_click_counts = {}
        
        print("\n--- Teleoperation Controls (Meshcat Buttons) ---")
        for button_name in self.button_map.keys():
            meshcat.AddButton(button_name)
            self.last_click_counts[button_name] = 0
            print(f"Button added: {button_name}")
        
        meshcat.AddButton("Stop Simulation") 


    def step_control(self):
        """
        checks for update events
        """
        
        # only one direction/input at a time
        moved = False
        
        for button_name, action in self.button_map.items():
            current_clicks = self.meshcat.GetButtonClicks(button_name)
            
            if current_clicks > self.last_click_counts[button_name]:
                self.last_click_counts[button_name] = current_clicks
                
                if isinstance(action, np.ndarray):
                    # if its translational
                    self.X_WG_desired.set_translation(
                        self.X_WG_desired.translation() + action
                    )
                    
                    print(f"Action: {button_name} -> New Position: {self.X_WG_desired.translation()}")
                    moved = True
                elif action.startswith("ROT"):
                    # if rotational
                        
                    if action == "ROT_X_P":
                        delta_R = RotationMatrix.MakeXRotation(self.STEP_SIZE_RAD)
                    elif action == "ROT_X_N":
                        delta_R = RotationMatrix.MakeXRotation(-self.STEP_SIZE_RAD)
                    elif action == "ROT_Y_P":
                        delta_R = RotationMatrix.MakeYRotation(self.STEP_SIZE_RAD)
                    elif action == "ROT_Y_N":
                        delta_R = RotationMatrix.MakeYRotation(-self.STEP_SIZE_RAD)
                    elif action == "ROT_Z_P":
                        delta_R = RotationMatrix.MakeZRotation(self.STEP_SIZE_RAD)
                    elif action == "ROT_Z_N":
                        delta_R = RotationMatrix.MakeZRotation(-self.STEP_SIZE_RAD)
                    
                    self.R_WG_desired = delta_R.multiply(self.R_WG_desired)
                    moved = True
                    print(f"Action: {button_name} -> Rotated {action[-1]} by {np.degrees(self.STEP_SIZE_RAD):.1f}°")
                elif action == "OPEN":
                    self.wsg_position_desired = 0.1
                    print(f"Action: {button_name} -> Gripper fully OPEN")
                elif action == "CLOSE":
                    self.wsg_position_desired = 0.0
                    print(f"Action: {button_name} -> Gripper fully CLOSED")

        # recalculate ik if there was a movement
        if moved:
            self.X_WG_desired = RigidTransform(self.R_WG_desired, self.X_WG_desired.translation())
            self._calculate_ik()
            
    def _calculate_ik(self):
        """
        Finds the joint positions (q_desired) that best match the desired
        end-effector pose (X_WG_desired), using the last valid solution as a seed.
        """
        print("finding IK solution...")
        ik = InverseKinematics(self.plant, self.plant_context)
        
        # define decision variables for only the IIWA joints
        q_iiwa = ik.q()[self.iiwa_q_start_index : self.iiwa_q_start_index + self.nq]
        
        p_WG_W = self.X_WG_desired.translation()
        ik.AddPositionConstraint(
            self.WG_frame, [0, 0, 0], self.W_frame, p_WG_W, p_WG_W
        )
        
        R_WG_W = self.X_WG_desired.rotation()
        ik.AddOrientationConstraint(
            self.WG_frame, R_WG_W, self.W_frame, RotationMatrix(), 0.05
        )
        
        prog = ik.prog()
        q_current = self.q_desired
        
        # cost minimizes distance b/w current and future state
        prog.AddQuadraticErrorCost(
            np.identity(self.nq), 
            q_current, 
            q_iiwa
        )
        
        solver = SnoptSolver()
        
        # set initial guess
        q_full_initial_guess = self.plant.GetPositions(self.plant_context)
        
        result = solver.Solve(prog, initial_guess=q_full_initial_guess)

        if result.is_success():
            # update positions
            q_full_result = result.GetSolution(ik.q())
            q_iiwa_result = q_full_result[self.iiwa_q_start_index : self.iiwa_q_start_index + self.nq]
            
            self.q_desired = q_iiwa_result
            self.plant.SetPositions(self.plant_context, self.iiwa, self.q_desired)
        else:
            print(f"IK Warning: Failed to find joint solution. Check if target is reachable.")
            
    def DoCalcIiwaOutput(self, context, output):
        """
        Outputs the latest calculated joint position command.
        """
        output.SetFromVector(self.q_desired)

Run simulation to collect teleoperation data and save it to files.

In [None]:
target_dt = 0.05  # 20 Hz

scenario = LoadScenario(data=scenario_string)
station = MakeHardwareStation(scenario, meshcat=meshcat)
builder = DiagramBuilder()
builder.AddSystem(station)
image_writer = builder.AddSystem(ImageWriter())
plant = station.GetSubsystemByName("plant") 

camera_world = station.GetSubsystemByName("rgbd_sensor_camera0")
camera_wrist = station.GetSubsystemByName("rgbd_sensor_camera1")


# add controller
controller = builder.AddSystem(TeleopController(meshcat, plant))

# add logger
logger_joints = LogVectorOutput(controller.GetOutputPort("iiwa_position_command"), builder)
logger_gripper = LogVectorOutput(controller.GetOutputPort("wsg_position_command"), builder)

# Connect controller outputs to station inputs
builder.Connect(
    controller.GetOutputPort("iiwa_position_command"),
    station.GetInputPort("iiwa.position")
)
builder.Connect(
    controller.GetOutputPort("wsg_position_command"),
    station.GetInputPort("wsg.position")
)

diagram = builder.Build()
context = diagram.CreateDefaultContext()
station_context = diagram.GetSubsystemContext(station, context)

simulator = Simulator(diagram)
simulator.set_target_realtime_rate(1.0)

print("\n--- Starting Cartesian Teleop Demo ---")
print("Please focus the Meshcat viewer window to enable keyboard input.")

image_data_camera0 = []
image_data_camera1 = []
image_timestamps = []

# Run until the user clicks the "Stop Simulation" button in Meshcat
while meshcat.GetButtonClicks('Stop Simulation') == 0:
    controller.step_control()
    
    simulator.AdvanceTo(simulator.get_context().get_time() + target_dt)

    sim_context = simulator.get_context()
    station_context = diagram.GetSubsystemContext(station, sim_context)

    # Grab the current images from the output ports
    img0 = station.GetOutputPort("camera0.rgb_image").Eval(station_context)  # ImageRgba8U object
    img1 = station.GetOutputPort("camera1.rgb_image").Eval(station_context)  # ImageRgba8U object

    # Convert to numpy arrays for easier handling
    np_img0 = np.array(np.copy(img0.data)).reshape(img0.height(), img0.width(), -1)  # RGBA
    np_img1 = np.array(np.copy(img1.data)).reshape(img1.height(), img1.width(), -1)

    image_data_camera0.append(np_img0)
    image_data_camera1.append(np_img1)
    image_timestamps.append(sim_context.get_time())

print("Simulation stopped by user.")

np.savez(
    "teleop_data/image_timestamps.npz",
    time=np.array(image_timestamps)
)




--- Teleoperation Controls (Meshcat Buttons) ---
Button added: Move +X (Forward)
Button added: Move -X (Backward)
Button added: Move +Y (Right)
Button added: Move -Y (Left)
Button added: Move +Z (Up)
Button added: Move -Z (Down)
Button added: Open Gripper (O)
Button added: Close Gripper (C)
Button added: Roll +X
Button added: Roll -X
Button added: Pitch +Y
Button added: Pitch -Y
Button added: Yaw +Z
Button added: Yaw -Z

--- Starting Cartesian Teleop Demo ---
Please focus the Meshcat viewer window to enable keyboard input.
Action: Move +X (Forward) -> New Position: [ 0.01039956 -0.50175237  0.6703261 ]
finding IK solution...
Action: Move -X (Backward) -> New Position: [ 3.99558944e-04 -5.01752373e-01  6.70326103e-01]
finding IK solution...
Action: Move +Y (Right) -> New Position: [ 3.99558944e-04 -4.91752373e-01  6.70326103e-01]
finding IK solution...
Action: Move -Y (Left) -> New Position: [ 3.99558944e-04 -5.01752373e-01  6.70326103e-01]
finding IK solution...
Action: Move -Y (Left)

: 

Save images to teleop_data folder if we want. Otherwise, images are stored in image_data_camera0 and image_data_camera1.

In [None]:
from imageio import imwrite

for i, img in enumerate(image_data_camera0):
    imwrite(f"teleop_data/camera_0/frame_{i:04d}.png", img[:, :, :3])

for i, img in enumerate(image_data_camera1):
    imwrite(f"teleop_data/camera_1/frame_{i:04d}.png", img[:, :, :3])

Downsample collected data to 100Hz and save.

In [None]:
# Save original joint data to teleop_joint_data.npz
joint_log_data = logger_joints.FindLog(simulator.get_context())
joint_q_log = joint_log_data.data()        # shape (nq, N)
joint_t_log = joint_log_data.sample_times()  # shape (N,)
np.savez("teleop_data/teleop_joint_data.npz", time=joint_t_log, q=joint_q_log)
print(joint_q_log)
print(joint_t_log)

# Save original gripper data to teleop_gripper_data.npz
gripper_log_data = logger_gripper.FindLog(simulator.get_context())
gripper_q_log = gripper_log_data.data()        # shape (1, N)
gripper_t_log = gripper_log_data.sample_times()  # shape (N,)
np.savez("teleop_data/teleop_gripper_data.npz", time=gripper_t_log, q=gripper_q_log)
print(gripper_q_log)
print(gripper_t_log)

joint_t_uniform = np.arange(joint_t_log[0], joint_t_log[-1], target_dt)
joint_q_uniform = np.empty((joint_q_log.shape[0], len(joint_t_uniform)))

for i in range(joint_q_log.shape[0]):
    joint_q_uniform[i, :] = np.interp(joint_t_uniform, joint_t_log, joint_q_log[i, :])

np.savez("teleop_data/downsampled_teleop_joint_data.npz", time=joint_t_uniform, q=joint_q_uniform)

print(joint_t_uniform)
print(joint_q_uniform)

gripper_t_uniform = np.arange(gripper_t_log[0], gripper_t_log[-1], target_dt)
gripper_q_uniform = np.empty((gripper_q_log.shape[0], len(gripper_t_uniform)))

for i in range(gripper_q_log.shape[0]):
    gripper_q_uniform[i, :] = np.interp(gripper_t_uniform, gripper_t_log, gripper_q_log[i, :])

np.savez("teleop_data/downsampled_teleop_gripper_data.npz", time=gripper_t_uniform, q=gripper_q_uniform)

print(gripper_t_uniform)
print(gripper_q_uniform)

[[-1.57       -1.57       -1.57       ...  0.76386041  0.76386041
   0.76386041]
 [ 0.2         0.2         0.2        ... -1.71019599 -1.71019599
  -1.71019599]
 [ 0.          0.          0.         ... -1.54232117 -1.54232117
  -1.54232117]
 ...
 [ 0.          0.          0.         ... -0.40084424 -0.40084424
  -0.40084424]
 [ 1.6         1.6         1.6        ... -0.86692366 -0.86692366
  -0.86692366]
 [ 0.          0.          0.         ... -1.43165516 -1.43165516
  -1.43165516]]
[0.0000e+00 1.0000e-04 6.0000e-04 ... 4.9098e+01 4.9099e+01 4.9100e+01]
[[0.1 0.1 0.1 ... 0.  0.  0. ]]
[0.0000e+00 1.0000e-04 6.0000e-04 ... 4.9098e+01 4.9099e+01 4.9100e+01]
[0.000e+00 1.000e-02 2.000e-02 ... 4.907e+01 4.908e+01 4.909e+01]
[[-1.57       -1.57       -1.57       ...  0.76386041  0.76386041
   0.76386041]
 [ 0.2         0.2         0.2        ... -1.71019599 -1.71019599
  -1.71019599]
 [ 0.          0.          0.         ... -1.54232117 -1.54232117
  -1.54232117]
 ...
 [ 0.          0. 

Replay collected joint and gripper data. (Need to restart kernel, run first three cells to import dependencies and define scenario_string and meshcat, then run this cell.)

In [6]:
joint_data = np.load("downsampled_teleop_joint_data.npz")
joint_t_uniform = joint_data["time"]
joint_q_uniform = joint_data["q"]

gripper_data = np.load("downsampled_teleop_gripper_data.npz")
gripper_t_uniform = gripper_data["time"]
gripper_q_uniform = gripper_data["q"]

scenario = LoadScenario(data=scenario_string)
station = MakeHardwareStation(scenario, meshcat=meshcat)
plant = station.GetSubsystemByName("plant") 

traj_iiwa = PiecewisePolynomial.FirstOrderHold(joint_t_uniform, joint_q_uniform)
traj_wsg = PiecewisePolynomial.FirstOrderHold(gripper_t_uniform, gripper_q_uniform)

builder = DiagramBuilder()
builder.AddSystem(station)  # your station system already created

# Create trajectory sources
iiwa_source = builder.AddSystem(TrajectorySource(traj_iiwa))
wsg_source = builder.AddSystem(TrajectorySource(traj_wsg))

# Connect to the plant
builder.Connect(iiwa_source.get_output_port(0),
                station.GetInputPort("iiwa.position"))
builder.Connect(wsg_source.get_output_port(0),
                station.GetInputPort("wsg.position"))

diagram = builder.Build()

simulator = Simulator(diagram)
simulator.set_target_realtime_rate(1.0)

# Run for the duration of the trajectory
simulator.AdvanceTo(joint_t_uniform[-1])

print("Replay finished!")



Replay finished!


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=cf4fb7c2-18e7-48b8-93e7-63f0209e538b' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>