# Imports

In [4]:
import os
import mpld3
import numpy as np
import pydot
from IPython.display import SVG
from matplotlib import pyplot as plt
from pydrake.all import (
    AddMultibodyPlantSceneGraph,
    AngleAxis,
    DiagramBuilder,
    Integrator,
    JacobianWrtVariable,
    LeafSystem,
    MeshcatVisualizer,
    MultibodyPlant,
    MultibodyPositionToGeometryPose,
    Parser,
    PiecewisePolynomial,
    PiecewisePose,
    Quaternion,
    Rgba,
    RigidTransform,
    RotationMatrix,
    SceneGraph,
    Simulator,
    StartMeshcat,
    TrajectorySource,
)
from pydrake.common import temp_directory

from manipulation import running_as_notebook
from manipulation.station import MakeHardwareStation, load_scenario

In [19]:
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7004


## Table SDF

In [20]:
temp_dir = temp_directory()
table_top_sdf_file = os.path.join(temp_dir, "table_top.sdf")

table_dims = (1, 1, 0.015)
table_dims_str = " ".join([str(dim) for dim in table_dims])

table_pose = [0, 0.5, 0]
table_pose_str = " ".join([str(dim) for dim in table_pose])

table_top_sdf = f"""<?xml version="1.0"?>
<sdf version="1.7">

  <model name="table_top">
    <link name="table_top_link">
      <inertial>
        <mass>18.70</mass>
        <inertia>
          <ixx>0.79</ixx>
          <ixy>0</ixy>
          <ixz>0</ixz>
          <iyy>0.53</iyy>
          <iyz>0</iyz>
          <izz>1.2</izz>
        </inertia>
      </inertial>
    <visual name="bottom">
        <pose>{table_pose_str} 0 0 0</pose>
        <geometry>
          <box>
            <size>{table_dims_str}</size>
          </box>
        </geometry>
        <material>
          <diffuse>0.9 0.9 0.9 1.0</diffuse>
        </material>
      </visual>
      <collision name="bottom">
        <pose>{table_pose_str} 0 0 0</pose>
        <geometry>
          <box>
            <size>{table_dims_str}</size>
          </box>
        </geometry>
        <drake:proximity_properties>
          <drake:compliant_hydroelastic/>
          <drake:hydroelastic_modulus>1.0e6</drake:hydroelastic_modulus>
        </drake:proximity_properties>
      </collision>
    </link>
    <frame name="table_top_center">
      <pose relative_to="table_top_link">0 0 0 0 0 0</pose>
    </frame>
  </model>
</sdf>

"""

with open(table_top_sdf_file, "w") as f:
    f.write(table_top_sdf)

## Simulation Directive
Using the table top sdf from above, along with an iiwa arm welded to a Shunck gripper and a foam brick

In [21]:
p_origin = np.array(table_pose)
deltax = np.array([0.15, 0, 0])
deltay = np.array([0, 0.15, 0])
p_O = {
    'topRight': p_origin + deltax + deltay,
    'topLeft': p_origin - deltax + deltay,
    'botRight': p_origin + deltax - deltay,
    'botLeft': p_origin - deltax - deltay
}

def generateDirective(initPoseName1: str, initPoseName2: str, sim_test: bool = False) -> str:
    poseNames = ['topRight', 'topLeft', 'botRight', 'botLeft']
    assert initPoseName1 in poseNames, f"initPoseName1 must be one of {poseNames}"
    assert initPoseName2 in poseNames, f"initPoseName2 must be one of {poseNames}"
    brick1_position = p_O[initPoseName1].tolist()
    brick2_position = p_O[initPoseName2].tolist()
    model_directives = f"""
    directives:
    - add_frame:
        name: iiwa_frame
        X_PF:
          base_frame: world
          rotation: !Rpy {{ deg: [0.0, 0.0, 180.0 ]}}
          translation: [0.0, 0, 0.0]
    
    - add_model:
        name: iiwa
        file: package://drake/manipulation/models/iiwa_description/iiwa7/iiwa7_no_collision.sdf
        default_joint_positions:
            iiwa_joint_1: [-1.57]
            iiwa_joint_2: [0.1]
            iiwa_joint_3: [0]
            iiwa_joint_4: [-1.2]
            iiwa_joint_5: [0]
            iiwa_joint_6: [1.6]
            iiwa_joint_7: [0]
    - add_weld:
        parent: iiwa_frame
        child: iiwa::iiwa_link_0
    - add_model:
        name: wsg
        file: package://drake/manipulation/models/wsg_50_description/sdf/schunk_wsg_50_with_tip.sdf
    - add_weld:
        parent: iiwa::iiwa_link_7
        child: wsg::body
        X_PC:
            translation: [0, 0, 0.09]
            rotation: !Rpy {{ deg: [90, 0, 90]}}
    - add_model:
        name: table_top
        file: file://{table_top_sdf_file}
    - add_weld:
        parent: world
        child: table_top::table_top_center
    - add_model:
        name: foam_brick1
        file: package://manipulation/hydro/061_foam_brick.sdf
        default_free_body_pose:
            base_link:
                translation: {brick1_position}
                rotation: !Rpy {{ deg: [0, 0, 0] }} 
    - add_model:
        name: foam_brick2
        file: package://manipulation/hydro/061_foam_brick.sdf
        default_free_body_pose:
            base_link:
                translation: {brick2_position}
                rotation: !Rpy {{ deg: [0, 0, 0] }} 
    """
    if not sim_test:
        model_directives += """model_drivers:
            iiwa: !IiwaDriver
                hand_model_name: wsg
            wsg: !SchunkWsgDriver {}
        """
    return model_directives

Check to see that all parts of project loaded in properly

In [22]:
meshcat.Delete()
builder = DiagramBuilder()

model_directives = generateDirective('botLeft', 'topRight', True)
scenario = load_scenario(data=model_directives)
station = MakeHardwareStation(scenario, meshcat)

simulator = Simulator(station)
meshcat.StartRecording()
simulator.AdvanceTo(1.0 if running_as_notebook else 0.1)
meshcat.PublishRecording()


## TAMP Pose Generation
Use TAMP framework to get desired actions. Convert actions to poses and times.

In [23]:
from utils import *
from bfs import *

In [24]:
location_names = ['topRight', 'topLeft', 'botRight', 'botLeft']
locations = [Pose(name, p_O[name]) for name in location_names]

In [25]:
def MakeGripperFrames(X_WG, X_WO):
    """
    Takes a partial specification with X_G["initial"] and X_O["initial"] and
    X_0["goal"], and returns a X_G and times with all of the pick and place
    frames populated.
    """
    # Define (again) the gripper pose relative to the object when in grasp.
    p_GgraspO = [0, 0.12, 0]
    R_GgraspO = RotationMatrix.MakeXRotation(
        np.pi / 2.0
    ) @ RotationMatrix.MakeZRotation(np.pi / 2.0)
    X_GgraspO = RigidTransform(R_GgraspO, p_GgraspO)
    X_OGgrasp = X_GgraspO.inverse()
    # pregrasp is negative y in the gripper frame (see the figure!).
    X_GgraspGpregrasp = RigidTransform([0, -0.2, 0])

    X_WG["pick"] = X_WO["initial"] @ X_OGgrasp
    X_WG["prepick"] = X_WG["pick"] @ X_GgraspGpregrasp
    X_WG["place"] = X_WO["goal"] @ X_OGgrasp
    X_WG["preplace"] = X_WG["place"] @ X_GgraspGpregrasp

    # I'll interpolate a halfway orientation by converting to axis angle and halving the angle.
    X_GprepickGpreplace = X_WG["prepick"].inverse() @ X_WG["preplace"]
    angle_axis = X_GprepickGpreplace.rotation().ToAngleAxis()
    X_GprepickGclearance = RigidTransform(
        AngleAxis(angle=angle_axis.angle() / 2.0, axis=angle_axis.axis()),
        X_GprepickGpreplace.translation() / 2.0 + np.array([0, -0.3, 0]),
    )
    X_WG["clearance"] = X_WG["prepick"] @ X_GprepickGclearance

    # Now let's set the timing
    times = {"initial": 0}
    X_GinitialGprepick = X_G["initial"].inverse() @ X_WG["prepick"]
    times["prepick"] = times["initial"] + 10.0 * np.linalg.norm(
        X_GinitialGprepick.translation()
    )
    # Allow some time for the gripper to close.
    times["pick_start"] = times["prepick"] + 2.0
    times["pick_end"] = times["pick_start"] + 2.0
    X_WG["pick_start"] = X_WG["pick"]
    X_WG["pick_end"] = X_WG["pick"]
    times["postpick"] = times["pick_end"] + 2.0
    X_WG["postpick"] = X_WG["prepick"]
    time_to_from_clearance = 10.0 * np.linalg.norm(
        X_GprepickGclearance.translation()
    )
    times["clearance"] = times["postpick"] + time_to_from_clearance
    times["preplace"] = times["clearance"] + time_to_from_clearance
    times["place_start"] = times["preplace"] + 2.0
    times["place_end"] = times["place_start"] + 2.0
    X_WG["place_start"] = X_WG["place"]
    X_WG["place_end"] = X_WG["place"]
    times["postplace"] = times["place_end"] + 2.0
    X_WG["postplace"] = X_WG["preplace"]

    return X_WG, times


X_G = {
    "initial": RigidTransform(
        RotationMatrix.MakeXRotation(-np.pi / 2.0), [0, -0.25, 0.25]
    )
}
X_O = {
    "initial": RigidTransform(
        RotationMatrix.MakeZRotation(np.pi / 2.0), [-0.2, -0.75, 0.025]
    ),
    "goal": RigidTransform(
        RotationMatrix.MakeZRotation(np.pi), [0.75, 0, 0.025]
    ),
}
X_G, times = MakeGripperFrames(X_G, X_O)
print(
    f"Sanity check: The entire maneuver will take {times['postplace']} seconds to execute."
)
print(len(X_G))
print(X_G.keys())
print(times.keys())
print(times)
print(len(times))

Sanity check: The entire maneuver will take 30.97757392617375 seconds to execute.
12
dict_keys(['initial', 'pick', 'prepick', 'place', 'preplace', 'clearance', 'pick_start', 'pick_end', 'postpick', 'place_start', 'place_end', 'postplace'])
dict_keys(['initial', 'prepick', 'pick_start', 'pick_end', 'postpick', 'clearance', 'preplace', 'place_start', 'place_end', 'postplace'])
{'initial': 0, 'prepick': 5.468317840067455, 'pick_start': 7.468317840067455, 'pick_end': 9.468317840067455, 'postpick': 11.468317840067455, 'clearance': 18.2229458831206, 'preplace': 24.97757392617375, 'place_start': 26.97757392617375, 'place_end': 28.97757392617375, 'postplace': 30.97757392617375}
10


In [26]:
def MakeGripperPoseTrajectory(X_G, times):
    """
    Constructs a gripper position trajectory from the plan "sketch".
    """

    sample_times = []
    poses = []
    for name in [
        "initial",
        "prepick",
        "pick_start",
        "pick_end",
        "postpick",
        "clearance",
        "preplace",
        "place_start",
        "place_end",
        "postplace",
    ]:
        sample_times.append(times[name])
        poses.append(X_G[name])

    print(len(sample_times))
    print(len(poses))
    # print(sample_times)
    # print(poses)
    return PiecewisePose.MakeLinear(sample_times, poses)


traj_X_G = MakeGripperPoseTrajectory(X_G, times)

10
10


In [86]:
def MakeGripperPoseTrajectoryMulti(X_Gs, timess):
    """
    Constructs a gripper position trajectory from the plan "sketch".
    """

    sample_times = []
    poses = []
    for X_G, times in zip(X_Gs, timess):
        start_time = sample_times[-1] + 0.1 if sample_times else 0
        for name in [
            "initial",
            "prepick",
            "pick_start",
            "pick_end",
            "postpick",
            "clearance",
            "preplace",
            "place_start",
            "place_end",
            "postplace",
        ]:
            sample_times.append(times[name] + start_time)
            poses.append(X_G[name])

    end_time = max(sample_times)

    return PiecewisePose.MakeLinear(sample_times, poses), end_time

In [87]:
def MakeGripperCommandTrajectory(times):
    opened = np.array([0.107])
    closed = np.array([0.0])

    traj_wsg_command = PiecewisePolynomial.FirstOrderHold(
        [times["initial"], times["pick_start"]],
        np.hstack([[opened], [opened]]),
    )
    traj_wsg_command.AppendFirstOrderSegment(times["pick_end"], closed)
    traj_wsg_command.AppendFirstOrderSegment(times["place_start"], closed)
    traj_wsg_command.AppendFirstOrderSegment(times["place_end"], opened)
    traj_wsg_command.AppendFirstOrderSegment(times["postplace"], opened)
    return traj_wsg_command


traj_wsg_command = MakeGripperCommandTrajectory(times)

In [88]:
class GripperTrajectoriesToPosition(LeafSystem):
    def __init__(self, plant, traj_p_G, traj_R_G, traj_wsg_command):
        LeafSystem.__init__(self)
        self.plant = plant
        self.gripper_body = plant.GetBodyByName("body")
        self.left_finger_joint = plant.GetJointByName(
            "left_finger_sliding_joint"
        )
        self.right_finger_joint = plant.GetJointByName(
            "right_finger_sliding_joint"
        )
        self.traj_p_G = traj_p_G
        self.traj_R_G = traj_R_G
        self.traj_wsg_command = traj_wsg_command
        self.plant_context = plant.CreateDefaultContext()

        self.DeclareVectorOutputPort(
            "position", plant.num_positions(), self.CalcPositionOutput
        )

    def CalcPositionOutput(self, context, output):
        t = context.get_time()
        X_G = RigidTransform(
            Quaternion(self.traj_R_G.value(t)), self.traj_p_G.value(t)
        )
        self.plant.SetFreeBodyPose(self.plant_context, self.gripper_body, X_G)
        wsg = self.traj_wsg_command.value(t)[0, 0]
        self.left_finger_joint.set_translation(self.plant_context, -wsg / 2.0)
        self.right_finger_joint.set_translation(self.plant_context, wsg / 2.0)
        output.SetFromVector(self.plant.GetPositions(self.plant_context))

In [89]:
class PseudoInverseController(LeafSystem):
    def __init__(self, plant):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()

        self.V_G_port = self.DeclareVectorInputPort("V_WG", 6)
        self.q_port = self.DeclareVectorInputPort("iiwa.position", 7)
        self.DeclareVectorOutputPort("iiwa.velocity", 7, self.CalcOutput)
        self.iiwa_start = plant.GetJointByName("iiwa_joint_1").velocity_start()
        self.iiwa_end = plant.GetJointByName("iiwa_joint_7").velocity_start()

    def CalcOutput(self, context, output):
        V_G = self.V_G_port.Eval(context)
        q = self.q_port.Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q)
        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context,
            JacobianWrtVariable.kV,
            self._G,
            [0, 0, 0],
            self._W,
            self._W,
        )
        J_G = J_G[:, self.iiwa_start : self.iiwa_end + 1]  # Only iiwa terms.
        v = np.linalg.pinv(J_G).dot(V_G)
        output.SetFromVector(v)

Initialize Object (foam brick), Environment, and States 

In [90]:
import random

env = Environment(locations)
initPose1 = random.choice(locations)
while True:
    initPose2 = random.choice(locations)
    if initPose2 != initPose1:
        break
block1 = Object(pose=initPose1)
block2 = Object(pose=initPose2)
initObjects = [block1, block2]
goalObjects = [Object(pose=initPose2, objId=block1.id)]
initState = State(objects=initObjects, env=env)
goalState = State(objects=goalObjects, env=env, isGoal=True)

In [91]:
states, actions = bfs(initState, goalState)
print(states)
print(actions)
print(len(actions))

[State(objects={400: Object(id=400, pose=Pose(name=topLeft, position=[-0.15  0.65  0.  ])), 560: Object(id=560, pose=Pose(name=botRight, position=[0.15 0.35 0.  ]))}, prevAction=None, holding=None, State(objects={400: Object(id=400, pose=Pose(name=topLeft, position=[-0.15  0.65  0.  ])), 560: Object(id=560, pose=Pose(name=botRight, position=[0.15 0.35 0.  ]))}, prevAction=Action(actionType=ActionType.MOVE_PICK, effects=Effects(actionType=ActionType.MOVE_PICK, desiredPose=None, obj=Object(id=560, pose=Pose(name=botRight, position=[0.15 0.35 0.  ])))), holding=Object(id=560, pose=Pose(name=botRight, position=[0.15 0.35 0.  ])), State(objects={400: Object(id=400, pose=Pose(name=topLeft, position=[-0.15  0.65  0.  ])), 560: Object(id=560, pose=Pose(name=topRight, position=[0.15 0.65 0.  ]))}, prevAction=Action(actionType=ActionType.MOVE_PLACE, effects=Effects(actionType=ActionType.MOVE_PLACE, desiredPose=Pose(name=topRight, position=[0.15 0.65 0.  ]), obj=Object(id=560, pose=Pose(name=botR

In [92]:
def actionsToObjectPoses(actionPair: Tuple[Action]) -> dict[str, RigidTransform]:
    assert(len(actionPair) == 2)
    pick, place = actionPair
    assert(pick.actionType == ActionType.MOVE_PICK)
    assert(place.actionType == ActionType.MOVE_PLACE)

    X_O = {
        "initial": RigidTransform(RotationMatrix(), p_O[pick.effects.obj.pose.name]),
        "goal": RigidTransform(RotationMatrix(), p_O[place.effects.desiredPose.name])
    }
    
    return X_O

In [93]:
def MakeGripperPoseTrajectoryFullStack(actions: List[Action], X_G_init: RigidTransform) -> Tuple[PiecewisePose, float]:
    X_Os = []
    for i in range(len(actions)):
        if i % 2 == 0: continue
        X_Os.append(actionsToObjectPoses(actions[i-1:i+1]))
    
    X_Gs, timess = [], []
    for X_O in X_Os:
        X_G, times = MakeGripperFrames(X_G_init, X_O)
        X_Gs.append(X_G)
        timess.append(times)
    
    traj, end_time = MakeGripperPoseTrajectoryMulti(X_Gs, timess)
    return traj, end_time

In [94]:
meshcat.Delete()
builder = DiagramBuilder()
model_directives = generateDirective(initPoseName1=initPose1.name, initPoseName2=initPose2.name)
scenario = load_scenario(data=model_directives)
station = builder.AddSystem(MakeHardwareStation(scenario, meshcat=meshcat))
plant = station.GetSubsystemByName("plant")
# print(plant.GetBodyByName("body"))
# print(plant.GetBodyByName("base_link"))
# plant.SetDefaultFreeBodyPose(plant.GetBodyByName("base_link"), X_O["initial"])

# Find the initial pose of the gripper and object (as set in the default Context)
temp_context = station.CreateDefaultContext()
temp_plant_context = plant.GetMyContextFromRoot(temp_context)
# X_G = {
#     "initial": plant.EvalBodyPoseInWorld(
#         temp_plant_context, plant.GetBodyByName("body")
#     )
# }
# X_O["initial"] = plant.EvalBodyPoseInWorld(
#     temp_plant_context, plant.GetBodyByName("base_link")
# )
# X_G, times = MakeGripperFrames(X_G, X_O)
# print(
#     f"Sanity check: The entire maneuver will take {times['postplace']} seconds to execute."
# )
# print(times)

In [95]:
# Make the trajectories
# traj = MakeGripperPoseTrajectory(X_G, times)
traj, end_time = MakeGripperPoseTrajectoryFullStack(actions, X_G)
print(end_time)
traj_V_G = traj.MakeDerivative()

V_G_source = builder.AddSystem(TrajectorySource(traj_V_G))
V_G_source.set_name("v_WG")
controller = builder.AddSystem(PseudoInverseController(plant))
controller.set_name("PseudoInverseController")
builder.Connect(V_G_source.get_output_port(), controller.GetInputPort("V_WG"))

integrator = builder.AddSystem(Integrator(7))
integrator.set_name("integrator")
builder.Connect(controller.get_output_port(), integrator.get_input_port())
builder.Connect(
    integrator.get_output_port(), station.GetInputPort("iiwa.position")
)
builder.Connect(
    station.GetOutputPort("iiwa.position_measured"),
    controller.GetInputPort("iiwa.position"),
)

traj_wsg_command = MakeGripperCommandTrajectory(times)
wsg_source = builder.AddSystem(TrajectorySource(traj_wsg_command))
wsg_source.set_name("wsg.command")
builder.Connect(
    wsg_source.get_output_port(), station.GetInputPort("wsg.position")
)

diagram = builder.Build()
diagram.set_name("pick_and_place")

simulator = Simulator(diagram)
context = simulator.get_mutable_context()
station_context = station.GetMyContextFromRoot(context)
integrator.set_integral_value(
    integrator.GetMyContextFromRoot(context),
    plant.GetPositions(
        plant.GetMyContextFromRoot(context),
        plant.GetModelInstanceByName("iiwa"),
    ),
)

diagram.ForcedPublish(context)
meshcat.StartRecording(set_visualizations_while_recording=False)
simulator.AdvanceTo(end_time if running_as_notebook else 0.1)
meshcat.PublishRecording()

53.531775922873756
