In [None]:
import numpy as np
import anthropic
from matplotlib import pyplot as plt
from pydrake.all import (
    AddMultibodyPlantSceneGraph,
    AngleAxis,
    DiagramBuilder,
    Integrator,
    JacobianWrtVariable,
    LeafSystem,
    MeshcatVisualizer,
    MultibodyPlant,
    MultibodyPositionToGeometryPose,
    Parser,
    PiecewisePolynomial,
    PiecewisePose,
    Quaternion,
    Rgba,
    RigidTransform,
    RotationMatrix,
    SceneGraph,
    Simulator,
    StartMeshcat,
    TrajectorySource,
)

from manipulation import running_as_notebook
from manipulation.station import MakeHardwareStation, load_scenario
from manipulation.utils import RenderDiagram

In [None]:
def MakeGripperFrames(X_WG, X_WO):

    p_GgraspO = [0, 0.12, 0]
    R_GgraspO = RotationMatrix.MakeXRotation(
        np.pi / 2.0
    ) @ RotationMatrix.MakeZRotation(np.pi / 2.0)
    X_GgraspO = RigidTransform(R_GgraspO, p_GgraspO)
    X_OGgrasp = X_GgraspO.inverse()
    X_GgraspGpregrasp = RigidTransform([0, -0.25, 0])

    X_WG["pick"] = X_WO["initial"] @ X_OGgrasp
    X_WG["prepick"] = X_WG["pick"] @ X_GgraspGpregrasp
    X_WG["place"] = X_WO["goal"] @ X_OGgrasp
    X_WG["preplace"] = X_WG["place"] @ X_GgraspGpregrasp

    X_GprepickGpreplace = X_WG["prepick"].inverse() @ X_WG["preplace"]
    angle_axis = X_GprepickGpreplace.rotation().ToAngleAxis()
    X_GprepickGclearance = RigidTransform(
        AngleAxis(angle=angle_axis.angle() / 2.0, axis=angle_axis.axis()),
        X_GprepickGpreplace.translation() / 2.0 + np.array([0, -0.1, 0]),
    )
    X_WG["clearance"] = X_WG["prepick"] @ X_GprepickGclearance

    times = {"initial": 0}
    X_GinitialGprepick = X_WG["initial"].inverse() @ X_WG["prepick"]
    times["prepick"] = times["initial"] + 5 * np.linalg.norm(
        X_GinitialGprepick.translation()
    )
    times["pick_start"] = times["prepick"] + 2.0
    times["pick_end"] = times["pick_start"] + 2.0
    X_WG["pick_start"] = X_WG["pick"]
    X_WG["pick_end"] = X_WG["pick"]
    times["postpick"] = times["pick_end"] + 2.0
    X_WG["postpick"] = X_WG["prepick"]
    time_to_from_clearance = 5 * np.linalg.norm(X_GprepickGclearance.translation())
    times["clearance"] = times["postpick"] + time_to_from_clearance
    times["preplace"] = times["clearance"] + time_to_from_clearance
    times["place_start"] = times["preplace"] + 2.0
    times["place_end"] = times["place_start"] + 2.0
    X_WG["place_start"] = X_WG["place"]
    X_WG["place_end"] = X_WG["place"]
    times["postplace"] = times["place_end"] + 2.0
    X_WG["postplace"] = X_WG["preplace"]

    return X_WG, times

In [None]:
def MakeGripperPoseTrajectory(X_G, times):
    sample_times = []
    poses = []
    for name in [
        "initial",
        "prepick",
        "pick_start",
        "pick_end",
        "postpick",
        "clearance",
        "preplace",
        "place_start",
        "place_end",
        "postplace",
    ]:
        sample_times.append(times[name])
        poses.append(X_G[name])

    return PiecewisePose.MakeLinear(sample_times, poses)

In [None]:
def MakeGripperCommandTrajectory(times):
    opened = np.array([0.107])
    closed = np.array([0.0])

    traj_wsg_command = PiecewisePolynomial.FirstOrderHold(
        [times["initial"], times["pick_start"]],
        np.hstack([[opened], [opened]]),
    )
    traj_wsg_command.AppendFirstOrderSegment(times["pick_end"], closed)
    traj_wsg_command.AppendFirstOrderSegment(times["place_start"], closed)
    traj_wsg_command.AppendFirstOrderSegment(times["place_end"], opened)
    traj_wsg_command.AppendFirstOrderSegment(times["postplace"], opened)
    return traj_wsg_command

In [None]:
class GripperTrajectoriesToPosition(LeafSystem):
    def __init__(self, plant, traj_p_G, traj_R_G, traj_wsg_command):
        LeafSystem.__init__(self)
        self.plant = plant
        self.gripper_body = plant.GetBodyByName("body")
        self.left_finger_joint = plant.GetJointByName("left_finger_sliding_joint")
        self.right_finger_joint = plant.GetJointByName("right_finger_sliding_joint")
        self.traj_p_G = traj_p_G
        self.traj_R_G = traj_R_G
        self.traj_wsg_command = traj_wsg_command
        self.plant_context = plant.CreateDefaultContext()

        self.DeclareVectorOutputPort(
            "position", plant.num_positions(), self.CalcPositionOutput
        )

    def CalcPositionOutput(self, context, output):
        t = context.get_time()
        X_G = RigidTransform(Quaternion(self.traj_R_G.value(t)), self.traj_p_G.value(t))
        self.plant.SetFreeBodyPose(self.plant_context, self.gripper_body, X_G)
        wsg = self.traj_wsg_command.value(t)[0, 0]
        self.left_finger_joint.set_translation(self.plant_context, -wsg / 2.0)
        self.right_finger_joint.set_translation(self.plant_context, wsg / 2.0)
        output.SetFromVector(self.plant.GetPositions(self.plant_context))


def visualize_pick_and_place_trajectory(traj_p_G, traj_R_G, traj_wsg_command, X_O):
    builder = DiagramBuilder()

    scene_graph = builder.AddSystem(SceneGraph())
    plant = MultibodyPlant(time_step=0.0)
    plant.RegisterAsSourceForSceneGraph(scene_graph)
    parser = Parser(plant, scene_graph)
    parser.SetAutoRenaming(True)
    gripper = parser.AddModelsFromUrl(
        "package://drake/manipulation/models/wsg_50_description/sdf/schunk_wsg_50_no_tip.sdf"
    )[0]
    brick = "package://drake/examples/manipulation_station/models/061_foam_brick.sdf"
    for key, pose in X_O.items():
        o = parser.AddModelsFromUrl(brick)[0]
        plant.WeldFrames(
            plant.world_frame(), plant.GetFrameByName("base_link", o), pose
        )
    plant.Finalize()

    to_pose = builder.AddSystem(MultibodyPositionToGeometryPose(plant))
    builder.Connect(
        to_pose.get_output_port(),
        scene_graph.get_source_pose_port(plant.get_source_id()),
    )

    traj_to_position = builder.AddSystem(
        GripperTrajectoriesToPosition(plant, traj_p_G, traj_R_G, traj_wsg_command)
    )
    builder.Connect(traj_to_position.get_output_port(), to_pose.get_input_port())

    meshcat.Delete()
    MeshcatVisualizer.AddToBuilder(builder, scene_graph, meshcat)

    diagram = builder.Build()

    simulator = Simulator(diagram)
    meshcat.StartRecording(set_visualizations_while_recording=False)
    simulator.AdvanceTo(traj_p_G.end_time() if running_as_notebook else 0.1)
    meshcat.PublishRecording()

In [None]:
class PseudoInverseController(LeafSystem):
    def __init__(self, plant):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()
        
        self.V_G_port = self.DeclareVectorInputPort("V_WG", 6)
        self.q_port = self.DeclareVectorInputPort("iiwa.position", 7)
        self.DeclareVectorOutputPort("iiwa.velocity", 7, self.CalcOutput)
        self.iiwa_start = plant.GetJointByName("iiwa_joint_1").velocity_start()
        self.iiwa_end = plant.GetJointByName("iiwa_joint_7").velocity_start()


    def CalcOutput(self, context, output):
        V_G = self.V_G_port.Eval(context)
        q = self.q_port.Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q)
        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context,
            JacobianWrtVariable.kV,
            self._G,
            [0, 0, 0],
            self._W,
            self._W,
        )
        J_G = J_G[:, self.iiwa_start : self.iiwa_end + 1]  # Only iiwa terms.
        v = np.linalg.pinv(J_G).dot(V_G)
        output.SetFromVector(v)

In [None]:
scenario_data = """
directives:
- add_directives:
    file: package://manipulation/clutter.dmd.yaml
- add_model:
    name: black_cube
    file: file:///work/black_cube.sdf
- add_model:
    name: green_cube
    file: file:///work/green_cube.sdf
- add_model:
    name: orange_cube
    file: file:///work/orange_cube.sdf
- add_model:
    name: red_cube
    file: file:///work/red_cube.sdf
- add_model:
    name: yellow_cube
    file: file:///work/yellow_cube.sdf
- add_model:
    name: white_cube
    file: file:///work/white_cube.sdf
- add_model:
    name: blue_cube
    file: file:///work/blue_cube.sdf
- add_model:
    name: purple_cube
    file: file:///work/purple_cube.sdf
model_drivers:
    iiwa: !IiwaDriver
      hand_model_name: wsg
    wsg: !SchunkWsgDriver {}
"""



In [None]:
X_A = []
X_B = []

x = 0.15
for i in range(6):
    X_A.append(RigidTransform(
    RotationMatrix.MakeZRotation(0), [x, -0.5, 0]))
    X_B.append(RigidTransform(
    RotationMatrix.MakeZRotation(np.pi / 2), [0.5, x + 0.083, 0]))
    x -= 0.083

Bin_A = []
Bin_B = []
colors = ["purple", "green", "white", "black", "red","blue", "orange", "yellow"]



In [None]:
def pick_and_place(initial, goal):

    builder = DiagramBuilder()
    scenario = load_scenario(data=scenario_data)
    station = builder.AddSystem(MakeHardwareStation(scenario, meshcat=meshcat))
    plant = station.GetSubsystemByName("plant")

    i = 0
    for body_index in plant.GetFloatingBaseBodies():
        color = colors[i]
        if (Bin_A.__contains__(color)):
            plant.SetDefaultFreeBodyPose(plant.get_body(body_index), X_A[Bin_A.index(color)])
        elif (Bin_B.__contains__(color)):
            plant.SetDefaultFreeBodyPose(plant.get_body(body_index), X_B[Bin_B.index(color)])
        i += 1

    temp_context = station.CreateDefaultContext()
    temp_plant_context = plant.GetMyContextFromRoot(temp_context)

    X_G = {
        "initial": plant.EvalBodyPoseInWorld(
            temp_plant_context, plant.GetBodyByName("body")
        )
    }
    X_O = {
        "initial": initial
        ,
        "goal": goal
    }

    X_G, times = MakeGripperFrames(X_G, X_O)
    
    traj = MakeGripperPoseTrajectory(X_G, times)
    traj_V_G = traj.MakeDerivative()

    V_G_source = builder.AddSystem(TrajectorySource(traj_V_G))
    V_G_source.set_name("v_WG")
    controller = builder.AddSystem(PseudoInverseController(plant))
    controller.set_name("PseudoInverseController")
    builder.Connect(V_G_source.get_output_port(), controller.GetInputPort("V_WG"))
    
    integrator = builder.AddSystem(Integrator(7))
    integrator.set_name("integrator")
    builder.Connect(controller.get_output_port(), integrator.get_input_port())
    builder.Connect(integrator.get_output_port(), station.GetInputPort("iiwa.position"))
    builder.Connect(
        station.GetOutputPort("iiwa.position_measured"),
        controller.GetInputPort("iiwa.position"),
    )

    traj_wsg_command = MakeGripperCommandTrajectory(times)
    wsg_source = builder.AddSystem(TrajectorySource(traj_wsg_command))
    wsg_source.set_name("wsg.command")
    builder.Connect(wsg_source.get_output_port(), station.GetInputPort("wsg.position"))
    
    diagram = builder.Build()
    diagram.set_name("pick_and_place")

    simulator = Simulator(diagram)
    context = simulator.get_mutable_context()
    station_context = station.GetMyContextFromRoot(context)

    integrator.set_integral_value(
        integrator.GetMyContextFromRoot(context),
        plant.GetPositions(
            plant.GetMyContextFromRoot(context),
            plant.GetModelInstanceByName("iiwa"),
        ),
    )

    traj_p_G = traj.get_position_trajectory()

    diagram.ForcedPublish(context)
    meshcat.StartRecording(set_visualizations_while_recording=True)
    simulator.AdvanceTo(traj_p_G.end_time() if running_as_notebook else 0.1)

    
    
    if (X_A.__contains__(initial)):
        initialColor = Bin_A[X_A.index(initial)]
        Bin_A[X_A.index(initial)] = "empty"
    if (X_B.__contains__(initial)):
        initialColor = Bin_B[X_B.index(initial)]
        Bin_B[X_B.index(initial)] = "empty"
    if (X_A.__contains__(goal)):
        Bin_A[X_A.index(goal)] = initialColor if initialColor != "empty" else Bin_A[X_A.index(goal)]
    if (X_B.__contains__(goal)):
        Bin_B[X_B.index(goal)] = initialColor if initialColor != "empty" else Bin_B[X_B.index(goal)]

In [None]:
Bin_A = ["red", "black", "purple", "blue", "empty", "empty"]
Bin_B = ["green", "empty", "orange", "empty", "empty", "empty"]
goal = "Ensure that both bins contain at least one primary color and one secondary color"

In [None]:
client = anthropic.Anthropic(
        api_key="key",
    )
message = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.0,
        system = open('abstractcontext.txt', 'r').read() + open('abstractcontextexamples.txt', 'r').read(),
        messages=[
            {
                "role": "user",
                "content": [
                        {"type": "text", "text": "Bin A: " + ", ".join(Bin_A)},
                        {"type": "text", "text": "\nBin B: " + ", ".join(Bin_B)},
                        {"type": "text", "text": "Goal: " + ", ".join(goal)},
                ],
            }
        ]
    )
print(message.content[0].text)
concreteinput = message.content[0].text
message = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1000,
        temperature=0.0,
        system = open('concretecontext.txt', 'r').read() + open('concretecontextexamples.txt', 'r').read(),
        messages=[
            {
                "role": "user",
                "content": [
                        {"type": "text", "text": "Initital:\nBin A: " + ", ".join(Bin_A)},
                        {"type": "text", "text": "\nBin B: " + ", ".join(Bin_B)},
                        {"type": "text", "text": "\nGoal:\n" + concreteinput},
                ],
            }
        ]
    )
print(message.content[0].text)

Bin A: red, black, purple, empty, empty, empty
Bin B: green, blue, orange, empty, empty, empty
pick_and_place(X_A[3], X_B[1])


In [None]:
# Start the visualizer.
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at https://f5e4adfe-fc47-4fa8-aac7-6d9c9a6cf41c.deepnoteproject.com/7000/
Installing NginX server for MeshCat on Deepnote...


In [None]:
exec(message.content[0].text)

INFO:drake:PackageMap: Downloading https://github.com/RobotLocomotion/models/archive/fe5326c5ffc36fda12c58883d22d29dc86009d65.tar.gz


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=f5e4adfe-fc47-4fa8-aac7-6d9c9a6cf41c' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>