# Catching a Falling Object

In this notebook, we will learn how to catch an object that is falling with a Kuka iiwa-7. This task is difficult because of the timing components. We will calculate the expected position of the object at a certain point in time based on gravity, and then control the robot to catch it.

**Learning Objectives:**
1. Implement a system that is able to accurately predict the falling location of an object.
2. Implement control (force/position control) to catch a falling object without it hitting the ground.

**What you'll build:** A complete system with Kuka iiwa-7 that is able to predict the falling position of an object and catch it.

---


## Project Template (Deepnote)

You can use this notebook as a starting point for your class project and/or the extra exercise for graduate students in Deepnote. It comes set up with the dependencies we use in class, so you don't have to install these yourself.

**Notes:**
- To use this in your own workspace, click “Duplicate” for the project in Deepnote.
- Class dependencies, namely `drake` and the `manipulation` package, are already preinstalled in the Deepnote image for this course, as well as a bunch of other dependencies that these packages depend on or that we use in class
- To see the full list of installed dependencies, as well as version numbers etc, see this file: [pyproject.toml](https://github.com/RussTedrake/manipulation/blob/master/pyproject.toml).
  - **Note**: we install all the extra dependencies, including the `dev` dependencies, into the Docker image that this deepnote project runs on.


In [89]:
import os
import time
from pathlib import Path
from textwrap import dedent

import numpy as np
from pydrake.all import (
    AbstractValue,
    BasicVector,
    Context,
    DiagramBuilder,
    InverseKinematics,
    LeafSystem,
    MultibodyPlant,
    PiecewisePolynomial,
    RigidTransform,
    RotationMatrix,
    Simulator,
    Solve,
    StartMeshcat,
    TrajectorySource,
)

from manipulation import running_as_notebook
from manipulation.letter_generation import create_sdf_asset_from_letter
from manipulation.meshcat_utils import AddMeshcatTriad
from manipulation.station import LoadScenario, MakeHardwareStation

if running_as_notebook:
    import mpld3
    mpld3.enable_notebook()


## Step 2: Set up Meshcat Visualization

As always, let's start Meshcat for our 3D visualization!

In [90]:
# Start meshcat for visualization
meshcat = StartMeshcat()
print("Click the link above to open Meshcat in your browser!")

INFO:drake:Meshcat listening for connections at http://localhost:7004


Click the link above to open Meshcat in your browser!


In [91]:
# Create directories for scenarios and assets
outdir = "scenarios"
assets_dir = "assets"
os.makedirs(outdir, exist_ok=True)
os.makedirs(assets_dir, exist_ok=True)

your_initial = 'A'

In [92]:
# Create a simple ground plane SDF
ground_sdf = """<?xml version="1.0"?>
<sdf version="1.7">
  <model name="ground_plane">
    <static>true</static>
    <link name="ground_link">
      <collision name="ground_collision">
        <geometry>
          <box>
            <size>10 10 0.1</size>
          </box>
        </geometry>
        <surface>
          <friction>
            <ode>
              <mu>1.0</mu>
              <mu2>1.0</mu2>
            </ode>
          </friction>
        </surface>
      </collision>
      <visual name="ground_visual">
        <geometry>
          <box>
            <size>10 10 0.1</size>
          </box>
        </geometry>
        <material>
          <ambient>0.5 0.5 0.5 1</ambient>
          <diffuse>0.5 0.5 0.5 1</diffuse>
        </material>
      </visual>
    </link>
  </model>
</sdf>"""

# Save the ground SDF
ground_sdf_path = os.path.join(assets_dir, "ground_plane.sdf")
with open(ground_sdf_path, "w") as f:
    f.write(ground_sdf)

print(f"Created ground plane SDF at {ground_sdf_path}")


Created ground plane SDF at assets/ground_plane.sdf


In [93]:
# Create a simple box SDF for the falling object
# Box dimensions: 0.1m x 0.1m x 0.05m (width x depth x height)
box_sdf = """<?xml version="1.0"?>
<sdf version="1.7">
  <model name="falling_box">
    <link name="box_link">
      <inertial>
        <mass>0.1</mass>
        <inertia>
          <ixx>0.0001</ixx>
          <iyy>0.0001</iyy>
          <izz>0.0001</izz>
        </inertia>
      </inertial>
      <collision name="box_collision">
        <geometry>
          <box>
            <size>0.1 0.1 0.05</size>
          </box>
        </geometry>
      </collision>
      <visual name="box_visual">
        <geometry>
          <box>
            <size>0.1 0.1 0.05</size>
          </box>
        </geometry>
        <material>
          <ambient>0.8 0.2 0.2 1</ambient>
          <diffuse>0.8 0.2 0.2 1</diffuse>
        </material>
      </visual>
    </link>
  </model>
</sdf>"""

# Save the box SDF
box_sdf_path = os.path.join(assets_dir, "falling_box.sdf")
with open(box_sdf_path, "w") as f:
    f.write(box_sdf)

print(f"Created box SDF at {box_sdf_path}")
print("Box dimensions: 0.1m x 0.1m x 0.05m (width x depth x height)")
print("Box thickness (height): 0.05m")

Created box SDF at assets/falling_box.sdf
Box dimensions: 0.1m x 0.1m x 0.05m (width x depth x height)
Box thickness (height): 0.05m


## Step 4: Set up Environment with Kuka IIWA 7 and Falling Object

We'll create a scenario YAML file with the Kuka iiwa 7 and the falling box. The starting position of the box is a variable that can be changed.

In [94]:
# Variable: Starting position of the falling object (can be changed)
# Format: [x, y, z] in meters relative to world frame
falling_object_start_position = [0.3, 0.0, 4.0]  # 30cm in front, 1m high

def file_uri(path: str) -> str:
    """Return a file:// URI for an absolute or relative path."""
    abs_path = path if os.path.isabs(path) else os.path.abspath(path)
    return f"file://{abs_path}"

box_uri = file_uri(box_sdf_path)
ground_uri = file_uri(ground_sdf_path)

# Create scenario YAML
# Step 4: Simple environment with just Kuka iiwa 7, gripper, falling box, and ground
scenario_yaml = f"""directives:
- add_model:
    name: iiwa
    file: package://drake_models/iiwa_description/sdf/iiwa7_no_collision.sdf
    default_joint_positions:
        iiwa_joint_1: [0]
        iiwa_joint_2: [0.1]
        iiwa_joint_3: [0]
        iiwa_joint_4: [-1.2]
        iiwa_joint_5: [0]
        iiwa_joint_6: [1.6]
        iiwa_joint_7: [0]
- add_weld:
    parent: world
    child: iiwa::iiwa_link_0

- add_model:
    name: wsg
    file: package://manipulation/hydro/schunk_wsg_50_with_tip.sdf
- add_weld:
    parent: iiwa::iiwa_link_7
    child: wsg::body
    X_PC:
        translation: [0, 0, 0.09]
        rotation: !Rpy {{ deg: [90, 0, 90]}}

- add_model:
    name: ground_plane
    file: {ground_uri}
- add_weld:
    parent: world
    child: ground_plane::ground_link
    X_PC:
        translation: [0.0, 0.0, -0.05]
        rotation: !Rpy {{ deg: [0, 0, 0] }}

- add_model:
    name: falling_box
    file: {box_uri}
    default_free_body_pose:
        box_link:
            translation: {falling_object_start_position}
            rotation: !Rpy {{ deg: [0, 0, 0] }}

model_drivers:
    iiwa: !IiwaDriver
      control_mode: position_only
      hand_model_name: wsg
    wsg: !SchunkWsgDriver {{}}
"""

# Write scenario file
scenario_file = os.path.join(outdir, "catching_scenario.yaml")
with open(scenario_file, "w") as f:
    f.write(dedent(scenario_yaml).lstrip())

print(f"Created scenario file at {scenario_file}")
print(f"Falling object starting position: {falling_object_start_position}")

Created scenario file at scenarios/catching_scenario.yaml
Falling object starting position: [0.3, 0.0, 4.0]


In [95]:
# Load the catching scenario
scenario = LoadScenario(filename=scenario_file)

# Build the station
builder = DiagramBuilder()
station = builder.AddSystem(MakeHardwareStation(scenario, meshcat=meshcat))

# Get the plant for later use
plant = station.GetSubsystemByName("plant")

# Build the diagram
diagram = builder.Build()

# Create a context and publish initial state
context = diagram.CreateDefaultContext()
diagram.ForcedPublish(context)

print("Station created and visualized in Meshcat!")
print("You should see the robot arm and the red falling box in Meshcat.")

RuntimeError: AddJoint(): This LinkJointGraph already has joint 'sdformat_model_static_world_welds_to_ground_link' (model instance 4) connecting link 'world' to link 'ground_link'. Therefore adding joint 'world_welds_to_ground_link' (model instance 4) connecting link 'world' to link 'ground_link' is not allowed.

#  Load a ball into the system

Choose one of the following balls that you will attempt to catch as it falls down

## Seth's TO-DO List for this assignment


1 - Figure out how to load everything into meshcat. This is relatively simple
2 - Figure out how to set up the simulator so that it's just falling down

# Compute a potential velocity of the items

For now, we will assume that we have perfect knowledge of the items locations

In [96]:
# --- Drop this AFTER your ManipulationStationSim class definition ---

import numpy as np
from pydrake.all import Simulator

def _fix_if_exists(station, context_station, port_name, value):
    if station.HasInputPort(port_name):
        station.GetInputPort(port_name).FixValue(context_station, np.asarray(value))
        return True
    return False

def _station_required_inputs(station):
    """Handy for debugging: list available input ports."""
    return [p.get_name() for p in station.GetInputPorts()]

# Attach as methods to your class (monkey-patch style), or paste inside the class.
def attach_default_inputs(self, iiwa_q=None, wsg_q=None):
    """
    Feed zeros (or provided values) to any station inputs that must be connected.
    Call this after choose_sim(), once contexts exist.
    """
    if iiwa_q is None:
        # use whatever you already set; fallback to zeros
        iiwa_q = getattr(self, "q0", None)
        if iiwa_q is None:
            iiwa_q = np.zeros(7)

    if wsg_q is None:
        wsg_q = [self.gripper_setpoint]  # command (opening) in meters if station expects it

    # Always (re)fix the known position inputs you already used:
    _fix_if_exists(self.station, self.context_station, "iiwa.position", iiwa_q)
    _fix_if_exists(self.station, self.context_station, "wsg.position", wsg_q)

    # Common additional inputs many station configs require:
    _fix_if_exists(self.station, self.context_station, "iiwa.velocity", np.zeros(7))
    _fix_if_exists(self.station, self.context_station, "iiwa.acceleration", np.zeros(7))
    _fix_if_exists(self.station, self.context_station, "iiwa.feedforward_torque", np.zeros(7))
    _fix_if_exists(self.station, self.context_station, "wsg.force_limit", [40.0])  # N, pick a reasonable default

    # Some templates expose alternative names; cover those if present:
    _fix_if_exists(self.station, self.context_station, "iiwa.position_command", iiwa_q)
    _fix_if_exists(self.station, self.context_station, "wsg.state", [0.0])  # if required by specific setups

def print_inputs(self):
    print("Station input ports:", _station_required_inputs(self.station))

def simulate(self, T=5.0, realtime_rate=1.0, record_meshcat=True):
    """
    Runs the diagram forward in time with whatever constant inputs you've fixed.
    """
    # Make sure inputs are fixed before sim
    self.attach_default_inputs()

    sim = Simulator(self.diagram, self.context_diagram)
    if realtime_rate is not None:
        sim.set_target_realtime_rate(realtime_rate)

    try:
        if record_meshcat:
            meshcat.StartRecording()
    except NameError:
        # meshcat not defined in this scope; ignore recording
        record_meshcat = False

    time_final = 10
    sim.Initialize()
    sim.AdvanceTo(time_final)

    if record_meshcat:
        meshcat.StopRecording()
        meshcat.PublishRecording()

    return sim

# Bind the helpers onto your class
ManipulationStationSim.attach_default_inputs = attach_default_inputs
ManipulationStationSim.print_inputs = print_inputs
ManipulationStationSim.simulate = simulate


# Add Cameras

We now would like to load our cameras into the simulation. They have been placed 

In [97]:
def DepthCameraDemoSystem():
    builder = DiagramBuilder()

    scenario = LoadScenario(data=scenario_string)
    station = builder.AddSystem(MakeHardwareStation(scenario, meshcat))

    # Export the camera outputs
    builder.ExportOutput(station.GetOutputPort("camera0.rgb_image"), "rgb_image")
    builder.ExportOutput(station.GetOutputPort("camera0.depth_image"), "depth_image")

    to_point_cloud = AddPointClouds(
        scenario=scenario, station=station, builder=builder, meshcat=meshcat
    )

    # Add a box for the camera in the environment.
    plant = station.GetSubsystemByName("plant")
    camera_instance = plant.GetModelInstanceByName("camera")
    scene_graph = station.GetSubsystemByName("scene_graph")
    AddMultibodyTriad(
        plant.GetFrameByName("base", camera_instance),
        scene_graph,
        length=0.1,
        radius=0.005,
    )

    # Export the point cloud output.
    builder.ExportOutput(
        to_point_cloud["camera0"].point_cloud_output_port(), "point_cloud"
    )

    diagram = builder.Build()
    diagram.set_name("depth_camera_demo_system")
    return diagram


def plot_camera_images():
    system = DepthCameraDemoSystem()

    # Evaluate the camera output ports to get the images.
    context = system.CreateDefaultContext()
    system.ForcedPublish(context)
    color_image = system.GetOutputPort("rgb_image").Eval(context)
    depth_image = system.GetOutputPort("depth_image").Eval(context)

    # Plot the two images.
    plt.subplot(121)
    plt.imshow(color_image.data)
    plt.title("Color image")
    plt.subplot(122)
    plt.imshow(np.squeeze(depth_image.data))
    plt.title("Depth image")
    # mpld3.display()
    plt.show()


plot_camera_images()

NameError: name 'scenario_string' is not defined

In [88]:
# Set up inputs for the station
station_context = diagram.GetSubsystemContext(station, context)

# Fix the robot position (keep it in initial pose for now)
# We'll add control later
q_iiwa_initial = np.array([0, 0.1, 0, -1.2, 0, 1.6, 0])
gripper_width = 0.15  # Open gripper

# Fix the inputs
station.GetInputPort("iiwa.position").FixValue(station_context, q_iiwa_initial)
station.GetInputPort("wsg.position").FixValue(station_context, [gripper_width])

# Create simulator
simulator = Simulator(diagram, context)

# Set realtime rate for visualization
simulator.set_target_realtime_rate(1.0)

# Start recording for Meshcat playback
meshcat.StartRecording()

# Initialize and run simulation
simulator.Initialize()
print("Running simulation for 5 seconds...")
simulator.AdvanceTo(4.0)

# Stop recording and publish
meshcat.StopRecording()
meshcat.PublishRecording()

print("Simulation complete! Check Meshcat to see the falling box.")
print("The box should be falling under gravity. The robot is currently stationary.")


Running simulation for 5 seconds...
Simulation complete! Check Meshcat to see the falling box.
The box should be falling under gravity. The robot is currently stationary.
