In [None]:
from IPython.display import display, SVG
import numpy as np
import time
import os
import pydot
import sys

from pydrake.all import (
    Adder, AddMultibodyPlantSceneGraph, Demultiplexer, DiagramBuilder, 
    InverseDynamicsController, MakeMultibodyStateToWsgStateSystem, 
    MeshcatVisualizerCpp, MultibodyPlant,Parser, PassThrough, 
    SchunkWsgPositionController, StateInterpolatorWithDiscreteDerivative,
    Sphere, Cylinder, Box, RigidTransform, SpatialVelocity, Simulator,
    ConstantVectorSource, LeafSystem, BasicVector, Integrator, JacobianWrtVariable,
    MeshcatVisualizerParams, RotationMatrix, PiecewisePose, PiecewisePolynomial, 
    TrajectorySource, FindResourceOrThrow, PiecewiseQuaternionSlerp
)
from manipulation.meshcat_cpp_utils import StartMeshcat, AddMeshcatTriad
from manipulation.scenarios import AddIiwa, AddWsg, AddRgbdSensors, AddShape
from manipulation.utils import FindResource
from manipulation import running_as_notebook
from pydrake.examples.manipulation_station import ManipulationStation

if running_as_notebook and sys.platform == "linux" and os.getenv("DISPLAY") is None:
    from pyvirtualdisplay import Display
    virtual_display = Display(visible=0, size=(1400, 900))
    virtual_display.start()


In [None]:
# Start the visualizer.
meshcat = StartMeshcat()

Meshcat is now available at https://52fe2c12-5462-4de8-b3a8-e001ac600845.deepnoteproject.com


# Table of Contents

1. Introduction and overview (context and goals)
2. References Used
3. Planning and executing a grasp maneuver
4. Engaging a tumbling target
5. Experiments with initial target conditions
6. Adding random noise
7. Future improvements

# Introduction and overview

# References

# Environment setup

Below is my modified MakeManipulationStation that is initialized with a target cylinder.

In [None]:
def MakeManipulationStation(time_step=0.002):
    builder = DiagramBuilder()

    # Add (only) the iiwa, WSG, and cameras to the scene.
    plant, scene_graph = AddMultibodyPlantSceneGraph(
        builder, time_step=time_step)
    iiwa = AddIiwa(plant)
    wsg = AddWsg(plant, iiwa)
    # Parser(plant).AddModelFromFile(
    #     FindResource("models/camera_box.sdf"), "camera0")
    plant.mutable_gravity_field().set_gravity_vector([0, 0, 0])

    # Set up target cylinder
    mu = 0.9
    r = 0.03
    l = 0.15
    m = 1
    color = [1, 0, 0, 1]
    target = AddShape(plant, Cylinder(r, l), name="target", mass=m, mu=mu, color=color)

    # Add floor
    # ground = AddShape(plant, Box(10,10,2.0), name="ground", mu=mu)
    # plant.WeldFrames(plant.world_frame(), plant.GetFrameByName("ground"), RigidTransform(p=[0,0,-1.0]))

    plant.Finalize()

    num_iiwa_positions = plant.num_positions(iiwa)

    # I need a PassThrough system so that I can export the input port.
    iiwa_position = builder.AddSystem(PassThrough(num_iiwa_positions))
    builder.ExportInput(iiwa_position.get_input_port(), "iiwa_position")
    builder.ExportOutput(iiwa_position.get_output_port(), "iiwa_position_command")

    # Export the iiwa "state" outputs.
    demux = builder.AddSystem(Demultiplexer(
        2 * num_iiwa_positions, num_iiwa_positions))
    builder.Connect(plant.get_state_output_port(iiwa), demux.get_input_port())
    builder.ExportOutput(demux.get_output_port(0), "iiwa_position_measured")
    builder.ExportOutput(demux.get_output_port(1), "iiwa_velocity_estimated")
    builder.ExportOutput(plant.get_state_output_port(iiwa), "iiwa_state_estimated")

    # Export the target "state" outputs.
    demux = builder.AddSystem(Demultiplexer(
        13, 1))
    builder.Connect(plant.get_state_output_port(target), demux.get_input_port())
    builder.ExportOutput(demux.get_output_port(7), "target_angular_vel_x")
    builder.ExportOutput(demux.get_output_port(8), "target_angular_vel_y")
    builder.ExportOutput(demux.get_output_port(9), "target_angular_vel_z")
    # builder.ExportOutput(demux.get_output_port(0), "target_position_measured")
    # builder.ExportOutput(demux.get_output_port(1), "target_velocity_estimated")
    # builder.ExportOutput(plant.get_state_output_port(target), "target_state_estimated")

    # Make the plant for the iiwa controller to use.
    controller_plant = MultibodyPlant(time_step=time_step)
    controller_iiwa = AddIiwa(controller_plant)
    AddWsg(controller_plant, controller_iiwa, welded=True)
    controller_plant.Finalize()

    # Add the iiwa controller
    iiwa_controller = builder.AddSystem(
        InverseDynamicsController(
            controller_plant,
            kp=[2500]*num_iiwa_positions, #2500
            ki=[1]*num_iiwa_positions,    #1
            kd=[20]*num_iiwa_positions,   #20
            has_reference_acceleration=False))
    iiwa_controller.set_name("iiwa_controller")
    builder.Connect(
        plant.get_state_output_port(iiwa), iiwa_controller.get_input_port_estimated_state())

    # Add in the feed-forward torque
    adder = builder.AddSystem(Adder(2, num_iiwa_positions))
    builder.Connect(iiwa_controller.get_output_port_control(),
                    adder.get_input_port(0))
    # Use a PassThrough to make the port optional (it will provide zero values if not connected).
    torque_passthrough = builder.AddSystem(PassThrough([0]*num_iiwa_positions))
    builder.Connect(torque_passthrough.get_output_port(),
                    adder.get_input_port(1))
    builder.ExportInput(torque_passthrough.get_input_port(), 
                        "iiwa_feedforward_torque")
    builder.Connect(adder.get_output_port(),
                    plant.get_actuation_input_port(iiwa))

    # Add discrete derivative to command velocities.
    desired_state_from_position = builder.AddSystem(
        StateInterpolatorWithDiscreteDerivative(
            num_iiwa_positions, time_step, suppress_initial_transient=True))
    desired_state_from_position.set_name("desired_state_from_position")
    builder.Connect(desired_state_from_position.get_output_port(),      
                    iiwa_controller.get_input_port_desired_state())
    builder.Connect(iiwa_position.get_output_port(), 
                    desired_state_from_position.get_input_port())

    # Export commanded torques.
    #builder.ExportOutput(adder.get_output_port(), "iiwa_torque_commanded")
    #builder.ExportOutput(adder.get_output_port(), "iiwa_torque_measured")

    # Wsg controller.
    wsg_controller = builder.AddSystem(SchunkWsgPositionController())
    wsg_controller.set_name("wsg_controller")
    builder.Connect(
        wsg_controller.get_generalized_force_output_port(),             
        plant.get_actuation_input_port(wsg))
    builder.Connect(plant.get_state_output_port(wsg),
                    wsg_controller.get_state_input_port())
    builder.ExportInput(wsg_controller.get_desired_position_input_port(), 
                        "wsg_position")
    builder.ExportInput(wsg_controller.get_force_limit_input_port(),  
                        "wsg_force_limit")
    wsg_mbp_state_to_wsg_state = builder.AddSystem(
        MakeMultibodyStateToWsgStateSystem())
    builder.Connect(plant.get_state_output_port(wsg), 
                    wsg_mbp_state_to_wsg_state.get_input_port())
    builder.ExportOutput(wsg_mbp_state_to_wsg_state.get_output_port(), 
                         "wsg_state_measured")
    builder.ExportOutput(wsg_controller.get_grip_force_output_port(), 
                         "wsg_force_measured")

    # Cameras.
    # AddRgbdSensors(builder, plant, scene_graph)

    # Export "cheat" ports.
    builder.ExportOutput(scene_graph.get_query_output_port(), "geometry_query")
    builder.ExportOutput(plant.get_contact_results_output_port(), 
                         "contact_results")
    builder.ExportOutput(plant.get_state_output_port(), 
                         "plant_continuous_state")

    diagram = builder.Build()
    return diagram, plant

diagram, plant = MakeManipulationStation()

# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

For this project I will be using a modified version of the IIWAPainter class from the Robot Painter assignment. I call my version the IIWACollector.

In [None]:
class IIWACollector():
    def __init__(self, traj=None, traj_wsg=None, X_WT=RigidTransform(p=[1.0,0,1.0]), V_WT=SpatialVelocity(w=[0.0,0.0,0.0], v=[0.0,0.0,0.0])):
        builder = DiagramBuilder()
        # set up the system of manipulation station
        station_diagram, self.plant = MakeManipulationStation()
        self.station = builder.AddSystem(station_diagram)
        self.iiwa_model_instance = self.plant.GetModelInstanceByName("iiwa7")
        self.X_WT=X_WT
        self.V_WT=V_WT

        # optionally add trajectory source
        if traj is not None:
            traj_v_G = traj.get_position_trajectory().MakeDerivative()
            traj_w_G = traj.get_orientation_trajectory().MakeDerivative()
            v_G_source = builder.AddSystem(TrajectorySource(traj_v_G))
            w_G_source = builder.AddSystem(TrajectorySource(traj_w_G))
            self.controller = builder.AddSystem(PseudoInverseController(self.plant))
            builder.Connect(v_G_source.get_output_port(), self.controller.GetInputPort("v_G"))
            builder.Connect(w_G_source.get_output_port(), self.controller.GetInputPort("w_G"))
            self.integrator = builder.AddSystem(Integrator(7))
            builder.Connect(self.controller.get_output_port(), 
                        self.integrator.get_input_port())
            builder.Connect(self.integrator.get_output_port(),
                            self.station.GetInputPort("iiwa_position"))
            builder.Connect(self.station.GetOutputPort("iiwa_position_measured"),
                            self.controller.GetInputPort("iiwa_position"))
        
        if traj_wsg is not None:
            wsg_source = builder.AddSystem(TrajectorySource(traj_wsg))
            wsg_source.set_name("wsg_command")
            builder.Connect(wsg_source.get_output_port(), self.station.GetInputPort("wsg_position"))
        else:
            wsg_position = builder.AddSystem(ConstantVectorSource([0.1]))
            builder.Connect(wsg_position.get_output_port(), self.station.GetInputPort("wsg_position"))

        params = MeshcatVisualizerParams()
        params.delete_on_initialization_event = False
        self.visualizer = MeshcatVisualizerCpp.AddToBuilder(
            builder, self.station.GetOutputPort("geometry_query"), meshcat, params)

        self.diagram = builder.Build()
        self.gripper_frame = self.plant.GetFrameByName('body')
        self.world_frame = self.plant.world_frame()

        context = self.CreateDefaultContext()
        self.diagram.Publish(context)

    def visualize_frame(self, name, X_WF, length=0.15, radius=0.006):
        """
        visualize imaginary frame that are not attached to existing bodies
        
        Input: 
            name: the name of the frame (str)
            X_WF: a RigidTransform to from frame F to world.
        
        Frames whose names already exist will be overwritten by the new frame
        """
        AddMeshcatTriad(meshcat, "painter/" + name,
                        length=length, radius=radius, X_PT=X_WF)

    def CreateDefaultContext(self):
        context = self.diagram.CreateDefaultContext()
        plant_context = self.diagram.GetMutableSubsystemContext(self.plant,   context)
        station_context = self.diagram.GetMutableSubsystemContext(self.station, context)
        # station_context = self.station.GetMyContextFromRoot(self.simulator.get_mutable_context())
        
        # provide initial states
        q0 = np.array([ 1.40666193e-05,  1.56461165e-01, -3.82761069e-05, 
                       -1.32296976e+00, -6.29097287e-06,  1.61181157e+00, -2.66900985e-05])
        q0_2 = np.array([ 1.40666193e-05,  1.56461165e-01, -3.82761069e-05, 
                       -1.32296976e+00, -6.29097287e-06,  1.61181157e+00, 1])
        
        # set the joint positions of the kuka arm
        self.plant.SetPositions(plant_context, self.iiwa_model_instance, q0)

        # Set target's initial condition (pose and spatial velocity)
        target = self.plant.GetBodyByName('target')
        self.plant.SetFreeBodyPose(plant_context, target, X_WB=self.X_WT)
        self.plant.SetFreeBodySpatialVelocity(context=plant_context, body=target, V_WB=self.V_WT)

        if hasattr(self, 'integrator'):
            self.integrator.set_integral_value(
                self.integrator.GetMyMutableContextFromRoot(context), 
                self.plant.GetPositions(plant_context, self.iiwa_model_instance))

        return context
    
    def get_X_WG(self, context=None):

        if not context:
            context = self.CreateDefaultContext()
        plant_context = self.plant.GetMyMutableContextFromRoot(context)
        X_WG = self.plant.CalcRelativeTransform(
                    plant_context,
                    frame_A=self.world_frame,
                    frame_B=self.gripper_frame)
        return X_WG

    def simulate(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 20.0 if running_as_notebook else 0.01
        simulator.AdvanceTo(duration)

    def short_sim(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 5
        simulator.AdvanceTo(duration)

    def get_target_location(self):
        return self.X_WT.p, self.X_WT.R

    def get_diagram(self):
        return self.diagram

In [None]:
# meshcat.Delete()
collector = IIWACollector(X_WT=RigidTransform(p=[1.0,0,1.0]), V_WT=SpatialVelocity(w=[0.1,0.0,0.0], v=[0.01,0.0,0.0]))

diagram = collector.get_diagram()

# collector.simulate()
# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

# Planning and executing a grasp maneuver

First, we need to provide an inverse kinematics controller and a pose trajectory for the wsg gripper to be at pre-specified grasp pose. This grasp pose is determined by the initial location of the target, and since we are currently assuming that our target pose estimation is perfect then the grasp pose is easy to compute. Also, in this section we will mainly be focusing on grasping a stationary target.

Let's plug in the PseudoInverseController from the Robot Painter assignment as a start.

In [None]:
class PseudoInverseController(LeafSystem):
    """
    same controller seen in 'Robot Painter'
    """
    def __init__(self, plant):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa7")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()

        self.w_G_port = self.DeclareVectorInputPort("w_G", BasicVector(3))
        self.v_G_port = self.DeclareVectorInputPort("v_G", BasicVector(3))
        self.q_port = self.DeclareVectorInputPort("iiwa_position", BasicVector(7))
        self.DeclareVectorOutputPort("iiwa_velocity", BasicVector(7), 
                                     self.CalcOutput)
        self.iiwa_start = plant.GetJointByName("iiwa_joint_1").velocity_start()
        self.iiwa_end = plant.GetJointByName("iiwa_joint_7").velocity_start()

    def CalcOutput(self, context, output):
        w_G = self.w_G_port.Eval(context)
        v_G = self.v_G_port.Eval(context)
        V_G = np.hstack([w_G, v_G])
        q = self.q_port.Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q)
        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context, JacobianWrtVariable.kV, 
            self._G, [0,0,0], self._W, self._W)
        J_G = J_G[:,self.iiwa_start:self.iiwa_end+1] # Only iiwa terms.
        v = np.linalg.pinv(J_G).dot(V_G) #important
        output.SetFromVector(v)

To verify that my modified system is working, let's run a the circular trajectory example from the same Robot Painter assignment. Running the cell below should draw the circular trajectory of keyframes that we saw in the assignment.

In [None]:
X_WorldGripper_init = collector.get_X_WG()

# # define center and radius
radius = 0.1
p0 = [0.45, 0.0, 0.4]
R0 = RotationMatrix(np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]], dtype=np.float).T)
X_WorldCenter = RigidTransform(R0, p0)
num_key_frames = 10
"""
you may use different thetas as long as your trajectory starts
from the Start Frame above and your rotation is positive
in the world frame about +z axis
thetas = np.linspace(0, 2*np.pi, num_key_frames)
"""
thetas = np.linspace(0, 2*np.pi, num_key_frames)

def compose_circular_key_frames(thetas, X_WorldCenter, X_WorldGripper_init):
    """    
    returns: a list of RigidTransforms
    """
    ## this is an template, replace your code below
    key_frame_poses_in_world = [X_WorldGripper_init]
    for theta in thetas:
        p_next_frame = np.array(p0) + np.array([radius*np.cos(theta), radius*np.sin(theta), 0])
        R_next_frame = RotationMatrix.MakeYRotation(-np.pi/2) @ RotationMatrix.MakeZRotation(np.pi/2) @ RotationMatrix.MakeYRotation(-theta)
        next_pose = RigidTransform(R_next_frame, p_next_frame)
        key_frame_poses_in_world.append(next_pose)
        
    return key_frame_poses_in_world

# check key frames instead of interpolated trajectory
def visualize_key_frames(frame_poses):
    for i, pose in enumerate(frame_poses):
        collector.visualize_frame('frame_{}'.format(i), pose, length=0.05)

key_frame_poses = compose_circular_key_frames(thetas, X_WorldCenter, X_WorldGripper_init)  
times = np.linspace(0, 20, num_key_frames+1)
traj = PiecewisePose.MakeLinear(times, key_frame_poses)

# Initialize target pose and spatial velocity
p_target = np.array([0.55,0.55,0.8])
R_target = RotationMatrix()
w_target = [0.0,0.0,0.0]
v_target = [0.0,0.0,0.0]
X_WT=RigidTransform(R_target, p_target) 
V_WT=SpatialVelocity(w_target, v_target)
collector = IIWACollector(traj=traj, X_WT=X_WT, V_WT=V_WT)

visualize_key_frames(key_frame_poses)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  R0 = RotationMatrix(np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]], dtype=np.float).T)


Final sanity check! Let's execute the circular trajectory.

In [None]:
collector.simulate()

Given the initial pose of the target, let's visualize the desired gripper pre-grasp pose (in the world frame) using a keyframe. Running the cell below draws the keyframe in the meshcat visualizer. The following function 'visualize_frame' comes from the Robot Painter problem set.

In [None]:
# Re-initialize environment
# collector = IIWACollector(traj, X_WT, V_WT)

# Compute pre-grasp pose
p_pre_grasp = p_target + np.array([0.0, -0.15, 0.0])
R_pre_grasp = RotationMatrix.MakeYRotation(np.pi)
# R_pre_grasp = RotationMatrix(np.array([[0, 1, 0], [0, 0, -1], [-1, 0, 0]], dtype=np.float).T)
X_pre_grasp = RigidTransform(R_pre_grasp, p_pre_grasp)
# X_pre_grasp = X_WT

collector.visualize_frame('pre_grasp', X_pre_grasp)

Now that we've defined the pre-grasp pose, let's generate a new trajectory like in the circular example.

In [None]:
X_WorldGripper_init = collector.get_X_WG()
num_key_frames = 1

def compose_traj_key_frames(X_WorldGripper_init):
    return [X_WorldGripper_init, X_pre_grasp]

key_frame_poses = compose_traj_key_frames(X_WorldGripper_init)  
times = np.linspace(0, 20, num_key_frames+1)
# print(times)
traj = PiecewisePose.MakeLinear(times, key_frame_poses)

# Re-initialize environment
collector = IIWACollector(traj=traj, X_WT=X_WT, V_WT=V_WT)

In [None]:
collector.simulate()

In this next section, we'll try performing a more effective interpolation approach while also clearly defining the phases in the trajectory and executing the full thing.

The maneuver can  be broken down like this:
1) Initial - gripper is at initial pose X_G_init, target is at initial pose X_T_init
2) Pre-grasp - gripper is at pre-grasp pose (relative to the target) X_T_Gpregrasp, target is still at pose X_T_init
3) Grasp - gripper is at grasp pose (relative to the target) X_T_Ggrasp, target is still at pose X_T_init

First, we need to define these frames in order to later assign them timestamps. This cell below is modified from the Chapter 3 notebook on Pick and Place methods.

In [None]:
def make_gripper_frames(X_G, X_O, target_clearance=np.array([0.0, 0.08, 0.0]), R_GgraspTarget=RotationMatrix.MakeYRotation(np.pi).MakeZRotation(-np.pi/4).inverse()):
  """
  Takes a partial specification with X_G["initial"] and X_O["initial"] and X_0["goal"], and 
  returns a X_G and times with all of the pick and place frames populated.
  """
  # Define (again) the gripper pose relative to the object when in grasp.
  p_GgraspO = target_clearance
  R_GgraspO = R_GgraspTarget
  # R_GgraspO = RotationMatrix.MakeZRotation(np.pi/2) #.MakeZRotation(-np.pi/4).inverse()
  X_GgraspO = RigidTransform(R_GgraspO, p_GgraspO)
  X_OGgrasp = X_GgraspO.inverse()
  # pregrasp is negative y in the gripper frame (see the figure!).
  X_GgraspGpregrasp = RigidTransform(p=[0, -0.2, 0])

  X_G["pick"] = X_O["initial"].multiply(X_OGgrasp)
  X_G["prepick"] = X_G["pick"].multiply(X_GgraspGpregrasp)
  X_G["adjusted"] = X_G["prepick"]
  X_G["postpick"] = X_G["pick"]

  # Now let's set the timing
  times = {"initial": 0}
  X_GinitialGprepick = X_G["initial"].inverse().multiply(X_G["prepick"])
  times["prepick"] = times["initial"] + 15.0*np.linalg.norm(X_GinitialGprepick.translation())
  times["adjusted"] = times["prepick"] + 2.0
  # Allow some time for the gripper to close.
  times["pick_start"] = times["adjusted"] + 7.0
  times["pick_end"] = times["pick_start"] + 2.0
  times["postpick"] = times["pick_end"] + 2.0

  return X_G, times

# Re-initialize environment
p_target = np.array([0.5,0.5,0.8])
R_target = RotationMatrix()
w_target = [0.0,0.0,0.0]
v_target = [0.0,0.0,0.0]
X_WT=RigidTransform(R_target, p_target) 
V_WT=SpatialVelocity(w_target, v_target)

X_G = {"initial": X_WorldGripper_init}
X_O = {"initial": X_WT, "goal": X_WT}
X_G, times = make_gripper_frames(X_G, X_O)
# print(X_G)
# print(times)
print(f"Sanity check: The entire maneuver will take {times['postpick']} seconds to execute.")

Sanity check: The entire maneuver will take 19.16613355823499 seconds to execute.


Let's visualize the frames to make sure they are correct.

In [None]:
def visualize_gripper_frames(X_G, X_O):
    builder = DiagramBuilder()

    plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step = 0.0)
    parser = Parser(plant, scene_graph)
    gripper = FindResourceOrThrow(
        "drake/manipulation/models/wsg_50_description/sdf/schunk_wsg_50_no_tip.sdf")
    mu = 0.5
    r = 0.03
    l = 0.15
    m = 1
    color = [1, 0, 0, 1]
    AddShape(plant, Cylinder(r, l), name="target", mass=m, mu=mu, color=color)
    for key, pose in X_G.items():
      g = parser.AddModelFromFile(gripper, f"gripper_{key}")
      plant.WeldFrames(plant.world_frame(), plant.GetFrameByName("body", g), pose)
    plant.WeldFrames(plant.world_frame(), plant.GetFrameByName("target"), X_WT)

    plant.Finalize()

    meshcat.Delete()
    visualizer = MeshcatVisualizerCpp.AddToBuilder(
        builder, scene_graph, meshcat)

    diagram = builder.Build()
    context = diagram.CreateDefaultContext()
    diagram.Publish(context)

visualize_gripper_frames(X_G, X_O)

Next is constructing the position trajectory like we did before with PiecewisePose.MakeLinear, however now PiecwisePolynomial.

In [None]:
def make_gripper_position_trajectory(X_G, times):
    """
    Constructs a gripper position trajectory from the plan "sketch".
    """
    traj = PiecewisePolynomial.FirstOrderHold(
        [times["initial"], times["prepick"]], np.vstack([X_G["initial"].translation(), X_G["prepick"].translation()]).T)

    traj.AppendFirstOrderSegment(times["adjusted"], X_G["adjusted"].translation())
    traj.AppendFirstOrderSegment(times["pick_start"], X_G["pick"].translation())
    traj.AppendFirstOrderSegment(times["pick_end"], X_G["pick"].translation())
    traj.AppendFirstOrderSegment(times["postpick"], X_G["pick"].translation())

    return traj

traj_p_G = make_gripper_position_trajectory(X_G, times)

Similarly, we construct the orientation trajectory.

In [None]:
def make_gripper_orientation_trajectory(X_G, times):
    """
    Constructs a gripper position trajectory from the plan "sketch".
    """
    traj = PiecewiseQuaternionSlerp();
    traj.Append(times["initial"], X_G["initial"].rotation())
    traj.Append(times["prepick"], X_G["prepick"].rotation())
    traj.Append(times["adjusted"], X_G["adjusted"].rotation())
    traj.Append(times["pick_start"], X_G["pick"].rotation())
    traj.Append(times["pick_end"], X_G["pick"].rotation())
    traj.Append(times["postpick"], X_G["pick"].rotation())

    return traj

traj_R_G = make_gripper_orientation_trajectory(X_G, times)

Now we construct the gripper trajectory.

In [None]:
opened = np.array([0.107]);
closed = np.array([0.0]);

def make_wsg_command_trajectory(times):
    traj_wsg_command = PiecewisePolynomial.FirstOrderHold(
        [times["initial"], times["pick_start"]], np.hstack([[opened], [opened]]))
    traj_wsg_command.AppendFirstOrderSegment(times["pick_end"], closed)
    return traj_wsg_command

traj_wsg_command = make_wsg_command_trajectory(times)

Let's try executing the full trajectory now.

In [None]:
traj = PiecewisePose(traj_p_G, traj_R_G)

# meshcat.Delete()
collector = IIWACollector(traj, traj_wsg_command, X_WT, V_WT)
# diagram = collector.get_diagram()
# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

In [None]:
collector.simulate()

While there are still some kinks in the pick-and-place pipeline, including some deviation from the desired poses, the task of grasping a free floating object is achievable. Unfortunately, to make this work I needed to experiment with various initial positions for the target and with the interpolated frame intervals. My current best explanation for the difficulty in getting this to work robustly is that the robot is encountering singularities or saturated joint velocities, which could be addressed using an DiffIKQP formulation. Also gravity is turned off, so...

# Engaging with a tumbling target

Now we'll try catching the target again, but with a few new cases:
1) the target will have some pre-specified rotational velocity (about the target's x-axis)
2) the target will have some pre-specified linear velocity
3) the target will have some combination of initial rotational and linear velocity

Tackling case #1 will require some changes to the WSG controller, in particular there's a need to drive the gripper rotational velocity to match the target's in order to successfully grasp it. Because we still know the target's state perfectly, we know the rotational velocity at all times and now we just need to use that as an input to our new WSG controller. One last important detail, however, is that we also need to align the gripper's rotation to grasp the target on its round face, as opposed to grasping it from the ends.

In [None]:
class CollectorPlanner(LeafSystem):
    ''' Effectively acts as a custom TrajectorySource
    '''
    def __init__(self, plant, traj, poses, times, output_vel):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa7")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()
        
        assert traj.cols() == 1
        port = self.DeclareVectorOutputPort("value", traj.rows(), self.CalcOutput)
        port.disable_caching_by_default()
        self.w_x_port = self.DeclareVectorInputPort("target_rotation_rate_x", BasicVector(1))
        self.w_y_port = self.DeclareVectorInputPort("target_rotation_rate_y", BasicVector(1))
        self.w_z_port = self.DeclareVectorInputPort("target_rotation_rate_z", BasicVector(1))
        # Keep the trajectory as undeclared state
        self.traj = traj
        self.match_time = times['adjusted']
        self.updated = False
        self.w_cyl = None
        self.R_WG_adjusted = poses['adjusted'].rotation()

        # Flag to see whether outputting linear or angular velocities
        self.output_vel = output_vel

    def get_X_WG(self):

        X_WG = self._plant.CalcRelativeTransform(
                    self._plant_context,
                    frame_A=self._W,
                    frame_B=self._G)
        return X_WG

    def CalcOutput(self, context, output):
        # Note: you can add input ports and read their values here.
        if (context.get_time() >= times['pick_end']):
            output.SetFromVector(self.traj.value(context.get_time()))
        elif (context.get_time() >= self.match_time):
            target_ang_vel_x = self.w_x_port.Eval(context)
            target_ang_vel_y = self.w_y_port.Eval(context)
            target_ang_vel_z = self.w_z_port.Eval(context)
            target_ang_vel = np.array([target_ang_vel_x[0], target_ang_vel_y[0], target_ang_vel_z[0]])
            # print(target_ang_vel)

            # If outputting linear velocity
            if self.output_vel:
                lin_vel = np.array([0.0, 0.0, 0.0])
                # output.SetFromVector(lin_vel)
                output.SetFromVector(self.traj.value(context.get_time()))
            # Else if outputting angular velocity
            else:
                if self.w_cyl is None:
                    self.w_cyl = np.linalg.norm(target_ang_vel)
                ang_vel_y_G = (self.w_cyl * times['adjusted'] + self.w_cyl * (times['pick_start'] - times['adjusted'])) / (times['pick_start'] - times['adjusted'])
                ang_vel_G = np.array([0, ang_vel_y_G, 0.0])
                ang_vel = self.R_WG_adjusted.multiply(ang_vel_G)
                # print(ang_vel)
                output.SetFromVector(ang_vel)
        else:
            output.SetFromVector(self.traj.value(context.get_time()))


In [None]:
class IIWACollector2():
    def __init__(self, traj=None, poses=None, times=None, traj_wsg=None, X_WT=RigidTransform(p=[1.0,0,1.0]), V_WT=SpatialVelocity(w=[0.0,0.0,0.0], v=[0.0,0.0,0.0])):
        builder = DiagramBuilder()
        # set up the system of manipulation station
        station_diagram, self.plant = MakeManipulationStation()
        self.station = builder.AddSystem(station_diagram)
        self.iiwa_model_instance = self.plant.GetModelInstanceByName("iiwa7")
        self.X_WT=X_WT
        self.V_WT=V_WT

        # optionally add trajectory source
        if traj is not None:
            traj_v_G = traj.get_position_trajectory().MakeDerivative()
            traj_w_G = traj.get_orientation_trajectory().MakeDerivative()
            v_G_source = builder.AddSystem(CollectorPlanner(self.plant, traj_v_G, poses, times, True))
            w_G_source = builder.AddSystem(CollectorPlanner(self.plant, traj_w_G, poses, times, False))
            self.controller = builder.AddSystem(PseudoInverseController(self.plant))
            builder.Connect(v_G_source.get_output_port(), self.controller.GetInputPort("v_G"))
            builder.Connect(w_G_source.get_output_port(), self.controller.GetInputPort("w_G"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_x"), v_G_source.GetInputPort("target_rotation_rate_x"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_x"), w_G_source.GetInputPort("target_rotation_rate_x"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_y"), v_G_source.GetInputPort("target_rotation_rate_y"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_y"), w_G_source.GetInputPort("target_rotation_rate_y"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_z"), v_G_source.GetInputPort("target_rotation_rate_z"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_z"), w_G_source.GetInputPort("target_rotation_rate_z"))
            self.integrator = builder.AddSystem(Integrator(7))
            builder.Connect(self.controller.get_output_port(), 
                        self.integrator.get_input_port())
            builder.Connect(self.integrator.get_output_port(),
                            self.station.GetInputPort("iiwa_position"))
            builder.Connect(self.station.GetOutputPort("iiwa_position_measured"),
                            self.controller.GetInputPort("iiwa_position"))
        # handle gripper velocity matching
            
        if traj_wsg is not None:
            wsg_source = builder.AddSystem(TrajectorySource(traj_wsg))
            wsg_source.set_name("wsg_command")
            builder.Connect(wsg_source.get_output_port(), self.station.GetInputPort("wsg_position"))
        else:
            wsg_position = builder.AddSystem(ConstantVectorSource([0.1]))
            builder.Connect(wsg_position.get_output_port(), self.station.GetInputPort("wsg_position"))

        params = MeshcatVisualizerParams()
        params.delete_on_initialization_event = False
        self.visualizer = MeshcatVisualizerCpp.AddToBuilder(
            builder, self.station.GetOutputPort("geometry_query"), meshcat, params)

        self.diagram = builder.Build()
        self.gripper_frame = self.plant.GetFrameByName('body')
        self.world_frame = self.plant.world_frame()

        context = self.CreateDefaultContext()
        self.diagram.Publish(context)

    def visualize_frame(self, name, X_WF, length=0.15, radius=0.006):
        """
        visualize imaginary frame that are not attached to existing bodies
        
        Input: 
            name: the name of the frame (str)
            X_WF: a RigidTransform to from frame F to world.
        
        Frames whose names already exist will be overwritten by the new frame
        """
        AddMeshcatTriad(meshcat, "painter/" + name,
                        length=length, radius=radius, X_PT=X_WF)

    def CreateDefaultContext(self):
        context = self.diagram.CreateDefaultContext()
        plant_context = self.diagram.GetMutableSubsystemContext(self.plant,   context)
        station_context = self.diagram.GetMutableSubsystemContext(self.station, context)
        # station_context = self.station.GetMyContextFromRoot(self.simulator.get_mutable_context())
        
        # provide initial states
        q0 = np.array([ 1.40666193e-05,  1.56461165e-01, -3.82761069e-05, 
                       -1.32296976e+00, -6.29097287e-06,  1.61181157e+00, -2.66900985e-05])
        
        # set the joint positions of the kuka arm
        self.plant.SetPositions(plant_context, self.iiwa_model_instance, q0)

        # Set target's initial condition (pose and spatial velocity)
        target = self.plant.GetBodyByName('target')
        self.plant.SetFreeBodyPose(plant_context, target, X_WB=self.X_WT)
        self.plant.SetFreeBodySpatialVelocity(context=plant_context, body=target, V_WB=self.V_WT)

        if hasattr(self, 'integrator'):
            self.integrator.set_integral_value(
                self.integrator.GetMyMutableContextFromRoot(context), 
                self.plant.GetPositions(plant_context, self.iiwa_model_instance))

        return context
    
    def get_X_WG(self, context=None):

        if not context:
            context = self.CreateDefaultContext()
        plant_context = self.plant.GetMyMutableContextFromRoot(context)
        X_WG = self.plant.CalcRelativeTransform(
                    plant_context,
                    frame_A=self.world_frame,
                    frame_B=self.gripper_frame)
        return X_WG

    def simulate(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 20.0 if running_as_notebook else 0.01
        simulator.AdvanceTo(duration)

    def simulate_with_duration(self, tiempo):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = tiempo if running_as_notebook else 0.01
        simulator.AdvanceTo(duration)

    def short_sim(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 5
        simulator.AdvanceTo(duration)

    def get_target_location(self):
        return self.X_WT.p, self.X_WT.R

    def get_diagram(self):
        return self.diagram

In [None]:
# Re-initialize target and motion plan

# p_target = np.array([0.6,0.6,0.8])
p_target = np.array([0.55,0.55,0.8])
R_target = RotationMatrix()
w_target = [0.15,0.15,0.0]
v_target = [0.0,0.0,0.0]
X_WT=RigidTransform(R_target, p_target) 
V_WT=SpatialVelocity(w_target, v_target)

X_G = {"initial": X_WorldGripper_init}
X_O = {"initial": X_WT, "goal": X_WT}
X_G, times = make_gripper_frames(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]))
# print(X_G)
# print(times)
print(f"Sanity check: The entire maneuver will take {times['postpick']} seconds to execute.")
visualize_gripper_frames(X_G, X_O)
traj_p_G = make_gripper_position_trajectory(X_G, times)
traj_R_G = make_gripper_orientation_trajectory(X_G, times)
traj_wsg_command = make_wsg_command_trajectory(times)
traj = PiecewisePose(traj_p_G, traj_R_G)

Sanity check: The entire maneuver will take 19.384713688718133 seconds to execute.


In [None]:
# Re-initialize collector (now IIWACollector2)

meshcat.Delete()
collector2 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

# collector.short_sim()
diagram = collector2.get_diagram()
# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

Running the cell below demonstrates the iiwa gripper predicting a future target attitude and setting a gripper angular velocity in order to catch the target at that future point.

In [None]:
collector2.simulate()

Next we can momentarily forget about rotation and think about target translation. Now the target will have some initial linear velocity and iiwa must effectively intercept the target in its trajectory. Because the target is free-floating in empty space, we can reuse a lot of the previous infrastructure for the motion planning by precomputing the predicted state of the target at the intercept point. Although it's messy, I will modify make_gripper_frames below to server the initial linear velocity case. 

In [None]:
def make_gripper_frames_lin(X_G, X_O, target_clearance=np.array([0.0, 0.08, 0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=20, v_target=0.05):
  """
  Takes a partial specification with X_G["initial"] and X_O["initial"] and X_0["goal"], and 
  returns a X_G and times with all of the pick and place frames populated.
  """
  # Define (again) the gripper pose relative to the object when in grasp.
  p_GgraspO = target_clearance
  R_GgraspO = R_GgraspTarget
  # R_GgraspO = RotationMatrix.MakeZRotation(np.pi/2) #.MakeZRotation(-np.pi/4).inverse()
  X_GgraspO = RigidTransform(R_GgraspO, p_GgraspO)
  X_OGgrasp = X_GgraspO.inverse()
  # pregrasp is negative y in the gripper frame (see the figure!).
  X_GgraspGpregrasp = RigidTransform(p=[0, -0.2, 0])

  X_G["pick"] = X_O["initial"].multiply(X_OGgrasp)
  X_G["prepick"] = X_G["pick"].multiply(X_GgraspGpregrasp)
  X_G["adjusted"] = X_G["prepick"]
  X_G["postpick"] = X_G["pick"]

  # Now let's set the timing
  times = {"initial": 0}
  delay_time = 3.0
  X_GinitialGprepick = X_G["initial"].inverse().multiply(X_G["prepick"])
  times["prepick"] = times["initial"] + t_catch - delay_time # 15.0*np.linalg.norm(X_GinitialGprepick.translation())
  times["adjusted"] = times["prepick"] + 1e-5
  # Allow some time for the gripper to close.
  times["pick_start"] = times["adjusted"] + delay_time + 5.0 * v_target
  times["pick_end"] = times["pick_start"] + 0.5 # 2.0
  times["postpick"] = times["pick_end"] + 0.5 # 2.0

  return X_G, times

In [None]:
# Re-initialize target and motion plan

# p_target = np.array([0.6,0.6,0.8])
p_target = np.array([0.55,0.55,0.8])
R_target = RotationMatrix()
w_target = [0.0,0.0,0.0]
v_target = [0.0,-0.01,0.0]
X_WT_init=RigidTransform(R_target, p_target) 
V_WT_init=SpatialVelocity(w_target, v_target)

# Set time of target capture from start of trajectory 
t_catch = 10 # in seconds

# Predict where the target will be after t_catch
p_target_intercept = p_target + np.array(v_target) * t_catch 

X_WT_intercept = RigidTransform(R_target, p_target_intercept)

X_G = {"initial": X_WorldGripper_init}
X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}
X_G, times = make_gripper_frames_lin(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))
# print(times)

# print(X_G)
# print(times)
print(f"Sanity check: The entire maneuver will take {times['postpick']} seconds to execute.")
visualize_gripper_frames(X_G, X_O)
traj_p_G = make_gripper_position_trajectory(X_G, times)
traj_R_G = make_gripper_orientation_trajectory(X_G, times)
traj_wsg_command = make_wsg_command_trajectory(times)
traj = PiecewisePose(traj_p_G, traj_R_G)

Sanity check: The entire maneuver will take 11.05001 seconds to execute.


In [None]:
# Re-initialize collector (now IIWACollector2)

meshcat.Delete()
collector3 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT_init, V_WT=V_WT_init)

# collector.short_sim()
diagram = collector3.get_diagram()
# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

In [None]:
collector3.simulate_with_duration(t_catch + 2.0)

Now, let's combine the initial rotation and translation rates.

In [None]:
def make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0, 0.08, 0.0]), R_GgraspTarget=RotationMatrix.MakeYRotation(np.pi).MakeZRotation(-np.pi/4).inverse(), t_catch=20, v_target=0.05):
  """
  Takes a partial specification with X_G["initial"] and X_O["initial"] and X_0["goal"], and 
  returns a X_G and times with all of the pick and place frames populated.
  """
  # Define (again) the gripper pose relative to the object when in grasp.
  p_GgraspO = target_clearance
  R_GgraspO = R_GgraspTarget
  # R_GgraspO = RotationMatrix.MakeZRotation(np.pi/2) #.MakeZRotation(-np.pi/4).inverse()
  X_GgraspO = RigidTransform(R_GgraspO, p_GgraspO)
  X_OGgrasp = X_GgraspO.inverse()
  # pregrasp is negative y in the gripper frame (see the figure!).
  X_GgraspGpregrasp = RigidTransform(p=[0, -0.2, 0])

  X_G["pick"] = X_O["initial"].multiply(X_OGgrasp)
  X_G["prepick"] = X_G["pick"].multiply(X_GgraspGpregrasp)
  X_G["adjusted"] = X_G["prepick"]
  X_G["postpick"] = X_G["pick"]

  # Now let's set the timing
  times = {"initial": 0}
  delay_time = 3.0
  X_GinitialGprepick = X_G["initial"].inverse().multiply(X_G["prepick"])
  times["prepick"] = times["initial"] + t_catch - delay_time # 15.0*np.linalg.norm(X_GinitialGprepick.translation())
  times["adjusted"] = times["prepick"] + 1e-5
  # Allow some time for the gripper to close.
  times["pick_start"] = times["adjusted"] + delay_time + 5.0 * v_target
  times["pick_end"] = times["pick_start"] + 0.5 # 2.0
  times["postpick"] = times["pick_end"] + 0.5 # 2.0

  return X_G, times

In [None]:
# Re-initialize target and motion plan

# p_target = np.array([0.6,0.6,0.8])
p_target = np.array([0.55,0.55,0.8])
R_target = RotationMatrix()
w_target = [0.05,0.05,0.0]
v_target = [0.0,-0.01,0.0]
X_WT_init=RigidTransform(R_target, p_target) 
V_WT_init=SpatialVelocity(w_target, v_target)

# Set time of target capture from start of trajectory 
t_catch = 10 # in seconds, try 15 for a really extended catch!

# Predict where the target will be after t_catch
p_target_intercept = p_target + np.array(v_target) * t_catch 

X_WT_intercept = RigidTransform(R_target, p_target_intercept)

X_G = {"initial": X_WorldGripper_init}
X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}
X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))
# print(times)

# print(X_G)
# print(times)
print(f"Sanity check: The entire maneuver will take {times['postpick']} seconds to execute.")
visualize_gripper_frames(X_G, X_O)
traj_p_G = make_gripper_position_trajectory(X_G, times)
traj_R_G = make_gripper_orientation_trajectory(X_G, times)
traj_wsg_command = make_wsg_command_trajectory(times)
traj = PiecewisePose(traj_p_G, traj_R_G)

Sanity check: The entire maneuver will take 11.05001 seconds to execute.


In [None]:
# Re-initialize collector (now IIWACollector2)

meshcat.Delete()
collector4 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT_init, V_WT=V_WT_init)

# collector.short_sim()
diagram = collector4.get_diagram()
# display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))

In [None]:
collector4.simulate_with_duration(t_catch + 2.0)

# Experiments with target initial conditions

The main experiments that are conducted here are separated by case:
- Case 1: Catching a rotating target
- Case 2: Catching a translating target
- Case 3: Catching a rotating and translating target
- Case 4: Catching a rotating and translating target (fixed velocities) with varying initial target positions

Under Case 1, we can test the following initial angular velocities (in rad/s):
- omega_target = 0.01
- omega_target = 0.05
- omega_target = 0.1
- omega_target = 0.2
- omega_target = 0.5

Under Case 2, we can test the following initial linear velocities (in m/s):
- v_target = 0.01 
- v_target = 0.05
- v_target = 0.1
- v_target = 0.2
- v_target = 0.5

Under Case 3, we can test the following conditions (angular velocity: rad/s, linear velocity: m/s):
- 0.01, 0.01
- 0.05, 0.01
- 0.1, 0.01
- 0.1, 0.1
- 0.2, 0.2

Under Case 4, we can test the following conditions (position vector):
- 0.7,0.7,0.8
- 0.1,0.1,0.1
- -0.55,-0.55,0.8

In [None]:
def case_1_test(omega_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix()):
    w_target = [omega_target,omega_target,0.0]
    v_target = [0.0,0.0,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT, "goal": X_WT}
    X_G, new_times = make_gripper_frames(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]))

    traj_p_G = make_gripper_position_trajectory(X_G, new_times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, new_times)
    traj_wsg_command = make_wsg_command_trajectory(new_times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case1 = IIWACollector2(traj=traj, poses=X_G, times=new_times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case1.simulate()

case1_test_conditions = [0.01, 0.05, 0.1, 0.2, 0.5]
# for w in case1_test_conditions:
#     case_1_test(w)
#     time.sleep(2)

In [None]:
def case_2_test(v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [0.0, 0.0, 0.0]
    v_target = [0.0, v_target,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    # Predict where the target will be after t_catch
    p_target_intercept = p_target + np.array(v_target) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_lin(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case2 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case2.simulate()

case2_test_conditions = [0.01, 0.05, 0.1, 0.2, 0.5]
# for w in case2_test_conditions:
#     case_2_test(-w)
#     time.sleep(2)

# Got a failure with v_target = 0.5, try changing t_catch
# case_2_test(-0.5, t_catch=5)


In [None]:
def case_3_test(omega_target, v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [omega_target, omega_target, 0.0]
    v_target = [0.0, v_target,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    # Predict where the target will be after t_catch
    p_target_intercept = p_target + np.array(v_target) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case3 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case3.simulate()

case3_test_conditions = [(0.01, 0.01), (0.05, 0.01), (0.1, 0.01), (0.1, 0.05), (0.2, 0.2)]
# for w in case3_test_conditions:
#     case_3_test(w[0], -w[1])
#     time.sleep(2)

In [None]:
def case_4_test(omega_target, v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), R_GgraspObject=RotationMatrix().MakeZRotation(np.pi/2), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [omega_target, omega_target, 0.0]
    v_target = [v_target, 0.0,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    # Predict where the target will be after t_catch
    p_target_intercept = p_target + np.array(v_target) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=R_GgraspObject, t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case4 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case4.simulate()

case4_test_conditions = [[-0.3, 0.3, 0.8], [0.5, 0.5, 0.2], [0.55, 0.55, 0.8]]
case4_Rgrasps = [RotationMatrix() for i in range(len(case4_test_conditions))]
# for i,w in enumerate(case4_test_conditions):
#     case_4_test(0.1, -0.025, p_target=np.array(w), R_GgraspObject=case4_Rgrasps[i])
#     time.sleep(2)

# Adding random noise

$w\in\mathbb{R}^2$ is drawn from a Gaussian distribution, $w\sim\mathcal{N}(0,\sigma^2=0.25)$. 

Add noise to initial target pose estimate

In [None]:
def case_5_test(omega_target, v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [omega_target, omega_target, 0.0]
    v_target = [0.0, v_target,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    # Predict where the target will be after t_catch
    # Now add noisy kick
    w = np.random.normal(0.0, scale=np.sqrt(0.001), size=(3,))
    print(p_target)
    print(p_target + w)
    p_target_intercept = (p_target + w) + np.array(v_target) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case5 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case5.simulate()

case5_test_conditions = [(0.01, 0.01), (0.05, 0.01), (0.1, 0.01), (0.1, 0.05), (0.2, 0.2)]
for w in case5_test_conditions:
    case_5_test(w[0], -w[1])
    time.sleep(1)

[0.55 0.55 0.8 ]
[0.52715706 0.51705713 0.82435884]
[0.55 0.55 0.8 ]
[0.59628263 0.5269974  0.80450566]
[0.55 0.55 0.8 ]
[0.56876058 0.55889461 0.78358158]
[0.55 0.55 0.8 ]
[0.57381008 0.4927869  0.7886319 ]
[0.55 0.55 0.8 ]
[0.54286577 0.53522889 0.75482905]


Add noise to initial linear velocity estimate

In [None]:
def case_6_test(omega_target, v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [omega_target, omega_target, 0.0]
    v_target = [0.0, v_target,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    w = np.random.normal(0.0, scale=np.sqrt(0.001), size=(3,))
    w[0] = 0
    w[2] = 0

    # Predict where the target will be after t_catch
    # Now add noisy kick to linear velocity estimate as a 'model' for a continuous pose estimate with noise
    print(v_target)
    print(v_target + w)
    p_target_intercept = (p_target) + np.array(v_target + w) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target+w)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case6 = IIWACollector2(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case6.simulate()

case6_test_conditions = [(0.01, 0.01), (0.05, 0.01), (0.1, 0.01), (0.1, 0.05), (0.2, 0.2)]
for w in case6_test_conditions:
    case_6_test(w[0], -w[1])
    time.sleep(1)

[0.0, -0.01, 0.0]
[ 0.         -0.01522328  0.        ]
[0.0, -0.01, 0.0]
[ 0.         -0.01060131  0.        ]
[0.0, -0.01, 0.0]
[ 0.         -0.00554668  0.        ]
[0.0, -0.05, 0.0]
[ 0.         -0.04737083  0.        ]
[0.0, -0.2, 0.0]
[ 0.         -0.10738168  0.        ]


Add noise to initial angular velocity estimate. To do this I need to modify the CollectorPlanner a bit and then modify the IIWACollector class yet again. 

In [None]:
class NoisyCollectorPlanner(LeafSystem):
    ''' Effectively acts as a custom TrajectorySource
    '''
    def __init__(self, plant, traj, poses, times, output_vel):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa7")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()
        
        assert traj.cols() == 1
        port = self.DeclareVectorOutputPort("value", traj.rows(), self.CalcOutput)
        port.disable_caching_by_default()
        self.w_x_port = self.DeclareVectorInputPort("target_rotation_rate_x", BasicVector(1))
        self.w_y_port = self.DeclareVectorInputPort("target_rotation_rate_y", BasicVector(1))
        self.w_z_port = self.DeclareVectorInputPort("target_rotation_rate_z", BasicVector(1))
        # Keep the trajectory as undeclared state
        self.traj = traj
        self.match_time = times['adjusted']
        self.updated = False
        self.w_cyl = None
        self.R_WG_adjusted = poses['adjusted'].rotation()
        self.w = np.random.normal(0.0, scale=np.sqrt(0.001), size=(3,))
        self.w[2] = 0

        # Flag to see whether outputting linear or angular velocities
        self.output_vel = output_vel

    def get_X_WG(self):

        X_WG = self._plant.CalcRelativeTransform(
                    self._plant_context,
                    frame_A=self._W,
                    frame_B=self._G)
        return X_WG

    def CalcOutput(self, context, output):
        # Note: you can add input ports and read their values here.
        if (context.get_time() >= times['pick_end']):
            output.SetFromVector(self.traj.value(context.get_time()))
        elif (context.get_time() >= self.match_time):
            target_ang_vel_x = self.w_x_port.Eval(context)
            target_ang_vel_y = self.w_y_port.Eval(context)
            target_ang_vel_z = self.w_z_port.Eval(context)
            target_ang_vel = np.array([target_ang_vel_x[0], target_ang_vel_y[0], target_ang_vel_z[0]])
            # print(target_ang_vel)

            # If outputting linear velocity
            if self.output_vel:
                lin_vel = np.array([0.0, 0.0, 0.0])
                # output.SetFromVector(lin_vel)
                output.SetFromVector(self.traj.value(context.get_time()))
            # Else if outputting angular velocity
            else:
                if self.w_cyl is None:
                    self.w_cyl = np.linalg.norm(target_ang_vel + self.w)
                    print('Noisy rate measurement: ' + str(self.w_cyl))
                ang_vel_y_G = (self.w_cyl * times['adjusted'] + self.w_cyl * (times['pick_start'] - times['adjusted'])) / (times['pick_start'] - times['adjusted'])
                ang_vel_G = np.array([0, ang_vel_y_G, 0.0])
                ang_vel = self.R_WG_adjusted.multiply(ang_vel_G)
                # print(ang_vel)
                output.SetFromVector(ang_vel)
        else:
            output.SetFromVector(self.traj.value(context.get_time()))


In [None]:
class IIWACollector3():
    def __init__(self, traj=None, poses=None, times=None, traj_wsg=None, X_WT=RigidTransform(p=[1.0,0,1.0]), V_WT=SpatialVelocity(w=[0.0,0.0,0.0], v=[0.0,0.0,0.0])):
        builder = DiagramBuilder()
        # set up the system of manipulation station
        station_diagram, self.plant = MakeManipulationStation()
        self.station = builder.AddSystem(station_diagram)
        self.iiwa_model_instance = self.plant.GetModelInstanceByName("iiwa7")
        self.X_WT=X_WT
        self.V_WT=V_WT

        # optionally add trajectory source
        if traj is not None:
            traj_v_G = traj.get_position_trajectory().MakeDerivative()
            traj_w_G = traj.get_orientation_trajectory().MakeDerivative()
            v_G_source = builder.AddSystem(NoisyCollectorPlanner(self.plant, traj_v_G, poses, times, True))
            w_G_source = builder.AddSystem(NoisyCollectorPlanner(self.plant, traj_w_G, poses, times, False))
            self.controller = builder.AddSystem(PseudoInverseController(self.plant))
            builder.Connect(v_G_source.get_output_port(), self.controller.GetInputPort("v_G"))
            builder.Connect(w_G_source.get_output_port(), self.controller.GetInputPort("w_G"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_x"), v_G_source.GetInputPort("target_rotation_rate_x"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_x"), w_G_source.GetInputPort("target_rotation_rate_x"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_y"), v_G_source.GetInputPort("target_rotation_rate_y"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_y"), w_G_source.GetInputPort("target_rotation_rate_y"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_z"), v_G_source.GetInputPort("target_rotation_rate_z"))
            builder.Connect(self.station.GetOutputPort("target_angular_vel_z"), w_G_source.GetInputPort("target_rotation_rate_z"))
            self.integrator = builder.AddSystem(Integrator(7))
            builder.Connect(self.controller.get_output_port(), 
                        self.integrator.get_input_port())
            builder.Connect(self.integrator.get_output_port(),
                            self.station.GetInputPort("iiwa_position"))
            builder.Connect(self.station.GetOutputPort("iiwa_position_measured"),
                            self.controller.GetInputPort("iiwa_position"))
        # handle gripper velocity matching
            
        if traj_wsg is not None:
            wsg_source = builder.AddSystem(TrajectorySource(traj_wsg))
            wsg_source.set_name("wsg_command")
            builder.Connect(wsg_source.get_output_port(), self.station.GetInputPort("wsg_position"))
        else:
            wsg_position = builder.AddSystem(ConstantVectorSource([0.1]))
            builder.Connect(wsg_position.get_output_port(), self.station.GetInputPort("wsg_position"))

        params = MeshcatVisualizerParams()
        params.delete_on_initialization_event = False
        self.visualizer = MeshcatVisualizerCpp.AddToBuilder(
            builder, self.station.GetOutputPort("geometry_query"), meshcat, params)

        self.diagram = builder.Build()
        self.gripper_frame = self.plant.GetFrameByName('body')
        self.world_frame = self.plant.world_frame()

        context = self.CreateDefaultContext()
        self.diagram.Publish(context)

    def visualize_frame(self, name, X_WF, length=0.15, radius=0.006):
        """
        visualize imaginary frame that are not attached to existing bodies
        
        Input: 
            name: the name of the frame (str)
            X_WF: a RigidTransform to from frame F to world.
        
        Frames whose names already exist will be overwritten by the new frame
        """
        AddMeshcatTriad(meshcat, "painter/" + name,
                        length=length, radius=radius, X_PT=X_WF)

    def CreateDefaultContext(self):
        context = self.diagram.CreateDefaultContext()
        plant_context = self.diagram.GetMutableSubsystemContext(self.plant,   context)
        station_context = self.diagram.GetMutableSubsystemContext(self.station, context)
        # station_context = self.station.GetMyContextFromRoot(self.simulator.get_mutable_context())
        
        # provide initial states
        q0 = np.array([ 1.40666193e-05,  1.56461165e-01, -3.82761069e-05, 
                       -1.32296976e+00, -6.29097287e-06,  1.61181157e+00, -2.66900985e-05])
        
        # set the joint positions of the kuka arm
        self.plant.SetPositions(plant_context, self.iiwa_model_instance, q0)

        # Set target's initial condition (pose and spatial velocity)
        target = self.plant.GetBodyByName('target')
        self.plant.SetFreeBodyPose(plant_context, target, X_WB=self.X_WT)
        self.plant.SetFreeBodySpatialVelocity(context=plant_context, body=target, V_WB=self.V_WT)

        if hasattr(self, 'integrator'):
            self.integrator.set_integral_value(
                self.integrator.GetMyMutableContextFromRoot(context), 
                self.plant.GetPositions(plant_context, self.iiwa_model_instance))

        return context
    
    def get_X_WG(self, context=None):

        if not context:
            context = self.CreateDefaultContext()
        plant_context = self.plant.GetMyMutableContextFromRoot(context)
        X_WG = self.plant.CalcRelativeTransform(
                    plant_context,
                    frame_A=self.world_frame,
                    frame_B=self.gripper_frame)
        return X_WG

    def simulate(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 20.0 if running_as_notebook else 0.01
        simulator.AdvanceTo(duration)

    def simulate_with_duration(self, tiempo):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = tiempo if running_as_notebook else 0.01
        simulator.AdvanceTo(duration)

    def short_sim(self):
        context = self.CreateDefaultContext()
        simulator = Simulator(self.diagram, context)
        simulator.set_target_realtime_rate(1.0)

        duration = 5
        simulator.AdvanceTo(duration)

    def get_target_location(self):
        return self.X_WT.p, self.X_WT.R

    def get_diagram(self):
        return self.diagram

In [None]:
def case_7_test(omega_target, v_target, p_target=np.array([0.55,0.55,0.8]), R_target=RotationMatrix(), t_catch=10):
    R_target_test = RotationMatrix.MakeZRotation(np.pi/4)
    w_target = [omega_target, omega_target, 0.0]
    v_target = [0.0, v_target,0.0]
    X_WT=RigidTransform(R_target, p_target) 
    V_WT=SpatialVelocity(w_target, v_target)
    X_WT_test=RigidTransform(R_target_test, p_target)

    # Predict where the target will be after t_catch
    # Now add noisy kick to linear velocity estimate as a 'model' for a continuous pose estimate with noise
    p_target_intercept = (p_target) + np.array(v_target) * t_catch 

    X_WT_intercept = RigidTransform(R_target, p_target_intercept)

    X_G = {"initial": X_WorldGripper_init}
    X_O = {"initial": X_WT_intercept, "goal": X_WT_intercept}

    X_G, times = make_gripper_frames_combo(X_G, X_O, target_clearance=np.array([0.0,0.1,0.0]), R_GgraspTarget=RotationMatrix().MakeZRotation(np.pi/2), t_catch=t_catch, v_target=np.linalg.norm(np.array(v_target)))

    traj_p_G = make_gripper_position_trajectory(X_G, times)
    traj_R_G = make_gripper_orientation_trajectory(X_G, times)
    traj_wsg_command = make_wsg_command_trajectory(times)
    traj = PiecewisePose(traj_p_G, traj_R_G)

    meshcat.Delete()
    collector_case7 = IIWACollector3(traj=traj, poses=X_G, times=times, traj_wsg=traj_wsg_command, X_WT=X_WT, V_WT=V_WT)

    collector_case7.simulate()

case7_test_conditions = [(0.01, 0.01), (0.05, 0.01), (0.1, 0.01), (0.1, 0.05)]
for w in case7_test_conditions:
    case_7_test(w[0], -w[1])
    time.sleep(1)

Noisy rate measurement: 0.043001498610295646
Noisy rate measurement: 0.03197766153912803
Noisy rate measurement: 0.2118509389350274
Noisy rate measurement: 0.11819056674743751
Noisy rate measurement: 0.284494229279346


# Future Improvements

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=52fe2c12-5462-4de8-b3a8-e001ac600845' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>