Chess Playing Robot

Enter a game as a series of moves and the robot will play out the game on simulation. 

In [50]:
import logging
import os
from copy import copy
from enum import Enum

import numpy as np

from pydrake.common import temp_directory
from pydrake.geometry import StartMeshcat
from pydrake.systems.analysis import Simulator
from pydrake.visualization import ModelVisualizer
from pydrake.all import (
    AbstractValue,
    AddMultibodyPlantSceneGraph,
    Concatenate,
    DiagramBuilder,
    InputPortIndex,
    LeafSystem,
    MeshcatVisualizer,
    Parser,
    PiecewisePolynomial,
    ConstantVectorSource,
    PiecewisePose,
    PointCloud,
    PortSwitch,
    RandomGenerator,
    RigidTransform,
    RollPitchYaw,
    Simulator,
    StartMeshcat,
    UniformlyRandomRotationMatrix,
)

from manipulation import ConfigureParser, FindResource, running_as_notebook
from manipulation.clutter import GenerateAntipodalGraspCandidate
from manipulation.meshcat_utils import AddMeshcatTriad
from manipulation.pick import (
    MakeGripperCommandTrajectory,
    MakeGripperFrames,
    MakeGripperPoseTrajectory,
)
from manipulation.scenarios import AddIiwaDifferentialIK, ycb, AddMultibodyTriad
from manipulation.station import (
    AddPointClouds,
    MakeHardwareStation,
    add_directives,
    load_scenario,
)


class NoDiffIKWarnings(logging.Filter):
    def filter(self, record):
        return not record.getMessage().startswith("Differential IK")


logging.getLogger("drake").addFilter(NoDiffIKWarnings())

In [52]:
# Start the visualizer.
meshcat = StartMeshcat()


INFO:drake:Meshcat listening for connections at http://localhost:7005


Create the Scenario and the Environment

In [71]:
temp_dir = temp_directory()
table_top_sdf_file = os.path.join(temp_dir, "table_top.sdf")
table_top_sdf = """<?xml version="1.0"?>
<sdf version="1.7">

  <model name="table_top">
    <link name="table_top_link">
      <inertial>
        <mass>18.70</mass>
        <inertia>
          <ixx>0.79</ixx>
          <ixy>0</ixy>
          <ixz>0</ixz>
          <iyy>0.53</iyy>
          <iyz>0</iyz>
          <izz>1.2</izz>
        </inertia>
      </inertial>
    <visual name="bottom">
        <pose>0.0 0.0 0.445 0 0 0</pose>
        <geometry>
          <box>
            <size>0.49 0.63 0.015</size>
          </box>
        </geometry>
        <material>
          <diffuse>0.9 0.9 0.9 1.0</diffuse>
        </material>
      </visual>
      <collision name="bottom">
        <pose>0.0 0.0 0.445 0 0 0</pose>
        <geometry>
          <box>
            <size>0.49 0.63 0.015</size>
          </box>
        </geometry>
        <drake:proximity_properties>
          <drake:compliant_hydroelastic/>
          <drake:hydroelastic_modulus>1.0e6</drake:hydroelastic_modulus>
        </drake:proximity_properties>
      </collision>
    </link>
    <frame name="table_top_center">
      <pose relative_to="table_top_link">0 0 0.47 0 0 0</pose>
    </frame>
  </model>
</sdf>

"""

with open(table_top_sdf_file, "w") as f:
    f.write(table_top_sdf)

cwd = os.getcwd()
your_model_filename = cwd+"/Board.sdf"

def convert_to_coords(pos):
    x = ord(pos[0]) - ord('a')
    y = int(pos[1])-1
    return "" + str(.6*.125*(y-3.5)) + ", "+ str(.6*.125*(x-3.5)) 

def get_starting_pose(pos):
    # They are flipped buy oh well
    x = ord(pos[0]) - ord('a')
    y = int(pos[1])-1
    return RigidTransform([.6*.125*(y-3.5), .6*.125*(x-3.5), 0])

def add_piece(piece_id, name,  pos):
    return f"""
- add_model:
    name: {piece_id}
    file: file://{cwd}/{name}.sdf
    default_free_body_pose:
        {name}:
            translation: [{convert_to_coords(pos)}, .75]
            rotation: !Rpy {{ deg: [0, 0, 0] }}"""

scenario = None
def create_scene():
    global scenario
    scenario_data = f"""
directives:
- add_model:
    name: table_top
    file: file://{table_top_sdf_file}
- add_weld:
    parent: world
    child: table_top::table_top_center
"""
    if your_model_filename:
        scenario_data += f"""
- add_model:
    name: board
    file: file://{your_model_filename}
    default_free_body_pose:
        Board:
            translation: [0, 0, .5]
            rotation: !Rpy {{ deg: [90, 0, 0] }}    
"""
    scenario_data += add_piece('p1', 'BlackKnight',  'b8')
    scenario_data += add_piece('p2', 'BlackBishop',  'c8')
    scenario_data += add_piece('p3', 'BlackKnight',  'g8')
    scenario_data += add_piece('p4', 'BlackBishop',  'f8')
    scenario_data += add_piece('p5', 'BlackQueen',  'd8')
    scenario_data += add_piece('p6', 'BlackRook',  'a8')
    scenario_data += add_piece('p7', 'BlackRook',  'h8')
    scenario_data += add_piece('p8', 'BlackKing',  'e8') 
    scenario_data += add_piece('p9', 'WhiteKnight',  'b1')
    scenario_data += add_piece('p10', 'WhiteBishop',  'c1')
    scenario_data += add_piece('p11', 'WhiteKnight',  'g1')
    scenario_data += add_piece('p12', 'WhiteBishop',  'f1')
    scenario_data += add_piece('p13', 'WhiteQueen',  'd1')
    scenario_data += add_piece('p14', 'WhiteRook',  'a1')
    scenario_data += add_piece('p15', 'WhiteRook',  'h1')
    scenario_data += add_piece('p16', 'WhiteKing',  'e1')
    for i in range(8): 
       scenario_data += add_piece(f"p{17+i}", 'WhitePawn',  f"{chr(97+i)}2")
       scenario_data += add_piece(f"p{25+i}", 'BlackPawn',  f"{chr(97+i)}7")
    
    scenario_data += """
- add_model:
    name: iiwa
    file: package://drake/manipulation/models/iiwa_description/sdf/iiwa14_no_collision.sdf
    default_joint_positions:
        iiwa_joint_1: [1.57]
        iiwa_joint_2: [0.1]
        iiwa_joint_3: [0]
        iiwa_joint_4: [-1.2]
        iiwa_joint_5: [0]
        iiwa_joint_6: [ 1.6]
        iiwa_joint_7: [0]
- add_weld:
    parent: world
    child: iiwa::iiwa_link_0
    X_PC:
        translation: [0, -.55, 0]

- add_model:
    name: gripper
    file: package://drake/manipulation/models/wsg_50_description/sdf/schunk_wsg_50_no_tip.sdf
- add_weld:
    parent: iiwa_link_7
    child: gripper::body
    X_PC:
        translation: [0, 0, 0.09]
        rotation: !Rpy { deg: [90, 0, 0] }
- add_model:
    name: camera
    file: package://manipulation/camera_box.sdf
- add_weld:
    parent: gripper::body
    child: camera::base
    X_PC:
        translation: [0, 0.03, 0.04]
        # Point slightly down towards camera
        # RollPitchYaw(0, -0.2, 0.2) @ RollPitchYaw(-np.pi/2, 0, np.pi/2)
        rotation: !Rpy { deg: [-90, 0, 0] }
model_drivers:
    iiwa: !IiwaDriver
        hand_model_name: gripper
    gripper: !SchunkWsgDriver {}
cameras:
    main_camera:
        name: camera0
        depth: True
        X_PB:
            base_frame: camera::base
"""
    with open('board.yaml', 'w') as f:
        f.write(scenario_data)
        
    scenario = load_scenario(data=scenario_data)
    return scenario

Now the Chess Robot Control Part

In [74]:
rng = np.random.default_rng(135)  # this is for python
generator = RandomGenerator(rng.integers(0, 1000))  # this is for c++

class GraspSelector(LeafSystem):
    def __init__(self, plant, camera_body_index):
        LeafSystem.__init__(self)
        model_point_cloud = AbstractValue.Make(PointCloud(0))
        #input ports from camera and the current pose of the robot
        self.DeclareAbstractInputPort("cloud_W", model_point_cloud)
        self.DeclareAbstractInputPort("body_poses", AbstractValue.Make([RigidTransform()]))
        #output the rigid transform result of running the select grasp function
        port = self.DeclareAbstractOutputPort(
            "grasp_selection",
            lambda: AbstractValue.Make((np.inf, RigidTransform())),
            self.SelectGrasp)
        port.disable_caching_by_default()

    def SelectGrasp(self, context, output):
        body_poses = self.get_input_port(1).Eval(context)
        cloud = self.get_input_port(0).Eval(context)
        #now need to figure out how to crop the cloud 
        


class PlannerState(Enum):
    WAIT_FOR_OBJECTS_TO_SETTLE = 1
    MOVING_TO_LOCATION = 2
    PICK_UP_PIECE = 3
    PLACE_PIECE = 4
    GO_HOME = 5

class Planner(LeafSystem):
    def __init__(self, plant):
        LeafSystem.__init__(self)
        self._gripper_body_index = plant.GetBodyByName("body").index()
        self.DeclareAbstractInputPort(
            "body_poses", AbstractValue.Make([RigidTransform()])
        )
        self._piece_grasp_index = self.DeclareAbstractInputPort(
            "piece_grasp", AbstractValue.Make((np.inf, RigidTransform()))
        ).get_index()
        self._wsg_state_index = self.DeclareVectorInputPort(
            "wsg_state", 2
        ).get_index()

        self._mode_index = self.DeclareAbstractState(
            AbstractValue.Make(PlannerState.WAIT_FOR_OBJECTS_TO_SETTLE)
        )
        self._traj_X_G_index = self.DeclareAbstractState(
            AbstractValue.Make(PiecewisePose())
        )
        self._traj_wsg_index = self.DeclareAbstractState(
            AbstractValue.Make(PiecewisePolynomial())
        )
        self._times_index = self.DeclareAbstractState(
            AbstractValue.Make({"initial": 0.0})
        )
        self._attempts_index = self.DeclareDiscreteState(1)

        self.DeclareAbstractOutputPort(
            "X_WG",
            lambda: AbstractValue.Make(RigidTransform()),
            self.CalcGripperPose,
        )
        self.DeclareVectorOutputPort("wsg_position", 1, self.CalcWsgPosition)

        # For GoHome mode.
        num_positions = 7
        self._iiwa_position_index = self.DeclareVectorInputPort(
            "iiwa_position", num_positions
        ).get_index()
        self.DeclareAbstractOutputPort(
            "control_mode",
            lambda: AbstractValue.Make(InputPortIndex(0)),
            self.CalcControlMode,
        )
        self.DeclareAbstractOutputPort(
            "reset_diff_ik",
            lambda: AbstractValue.Make(False),
            self.CalcDiffIKReset,
        )
        self._q0_index = self.DeclareDiscreteState(num_positions)  # for q0
        self._traj_q_index = self.DeclareAbstractState(
            AbstractValue.Make(PiecewisePolynomial())
        )
        self.DeclareVectorOutputPort(
            "iiwa_position_command", num_positions, self.CalcIiwaPosition
        )

        self.DeclarePeriodicUnrestrictedUpdateEvent(0.1, 0.0, self.Update)

    def Update(self, context, state):
        mode = context.get_abstract_state(int(self._mode_index)).get_value()

        current_time = context.get_time()
        times = context.get_abstract_state(int(self._times_index)).get_value()

        if mode == PlannerState.WAIT_FOR_OBJECTS_TO_SETTLE:
            if context.get_time() - times["initial"] > 1.0:
                self.Plan(context, state)
            return

        # If we are between pick and place and the gripper is closed, then
        # we've missed or dropped the object.  Time to replan.

        #only update a new plan when the current trajectory is 
        traj_X_G = context.get_abstract_state(
            int(self._traj_X_G_index)
        ).get_value()
        if not traj_X_G.is_time_in_range(current_time):
            self.Plan(context, state)
            return

    def Plan(self, context, state):
        mode = copy(
            state.get_mutable_abstract_state(int(self._mode_index)).get_value()
        )

        X_G = {
            "initial": self.get_input_port(0).Eval(context)[
                int(self._gripper_body_index)
            ]
        }
        """
        MOVING_TO_LOCATION = 2
        PICK_UP_PIECE = 3
        PLACE_PIECE = 4
        GO_HOME = 5
        """

        X_G["pick"] = RigidTransform(
                    RollPitchYaw(-np.pi / 2, 0, 0),
                    [0,0,0],
                )

        match mode:

            case PlannerState.WAIT_FOR_OBJECTS_TO_SETTLE:
                state.get_mutable_abstract_state(int(self._mode_index)).set_value(PlannerState.MOVING_TO_LOCATION)
                X_G["place"] = RigidTransform(
                    RollPitchYaw(-np.pi / 2, 0, 0),
                    [0,0.1,0],
                )
            case PlannerState.MOVING_TO_LOCATION:
                X_G["place"] = RigidTransform(
                    RollPitchYaw(-np.pi / 2, 0, 0),
                    [0,0.1,0],
                )
            case PlannerState.PICK_UP_PIECE:
                X_G["place"] = RigidTransform(
                    RollPitchYaw(-np.pi / 2, 0, 0),
                    [rng.uniform(-0.25, 0.15), rng.uniform(-0.6, -0.4), 0.3],
                )
            case PlannerState.PLACE_PIECE:
                X_G["place"] = RigidTransform(
                    RollPitchYaw(-np.pi / 2, 0, 0),
                    [rng.uniform(-0.25, 0.15), rng.uniform(-0.6, -0.4), 0.3],
                )
        
        X_G, times = MakeGripperFrames(X_G, t0=context.get_time())
        print(
            f"Planned {times['postplace'] - times['initial']} second trajectory in mode {mode} at time {context.get_time()}."
        )
        state.get_mutable_abstract_state(int(self._times_index)).set_value(
            times
        )

        traj_X_G = MakeGripperPoseTrajectory(X_G, times)
        traj_wsg_command = MakeGripperCommandTrajectory(times)

        state.get_mutable_abstract_state(int(self._traj_X_G_index)).set_value(
            traj_X_G
        )
        state.get_mutable_abstract_state(int(self._traj_wsg_index)).set_value(
            traj_wsg_command
        )


    def start_time(self, context):
        return (
            context.get_abstract_state(int(self._traj_X_G_index))
            .get_value()
            .start_time()
        )

    def end_time(self, context):
        return (
            context.get_abstract_state(int(self._traj_X_G_index))
            .get_value()
            .end_time()
        )

    def CalcGripperPose(self, context, output):
        context.get_abstract_state(int(self._mode_index)).get_value()

        traj_X_G = context.get_abstract_state(
            int(self._traj_X_G_index)
        ).get_value()
        if traj_X_G.get_number_of_segments() > 0 and traj_X_G.is_time_in_range(
            context.get_time()
        ):
            # Evaluate the trajectory at the current time, and write it to the
            # output port.
            output.set_value(
                context.get_abstract_state(int(self._traj_X_G_index))
                .get_value()
                .GetPose(context.get_time())
            )
            return

        # Command the current position (note: this is not particularly good if the velocity is non-zero)
        output.set_value(
            self.get_input_port(0).Eval(context)[int(self._gripper_body_index)]
        )

    def CalcWsgPosition(self, context, output):
        mode = context.get_abstract_state(int(self._mode_index)).get_value()
        opened = np.array([0.107])
        np.array([0.0])

        if mode == PlannerState.GO_HOME:
            # Command the open position
            output.SetFromVector([opened])
            return

        traj_wsg = context.get_abstract_state(
            int(self._traj_wsg_index)
        ).get_value()
        if traj_wsg.get_number_of_segments() > 0 and traj_wsg.is_time_in_range(
            context.get_time()
        ):
            # Evaluate the trajectory at the current time, and write it to the
            # output port.
            output.SetFromVector(traj_wsg.value(context.get_time()))
            return

        # Command the open position
        output.SetFromVector([opened])

    def CalcControlMode(self, context, output):
        output.set_value(InputPortIndex(1))  # Diff IK

    def CalcDiffIKReset(self, context, output):
        output.set_value(False)

    def Initialize(self, context, discrete_state):
        discrete_state.set_value(
            int(self._q0_index),
            self.get_input_port(int(self._iiwa_position_index)).Eval(context),
        )

    def CalcIiwaPosition(self, context, output):
        traj_q = context.get_mutable_abstract_state(
            int(self._traj_q_index)
        ).get_value()

        output.SetFromVector(traj_q.value(context.get_time()))



def play_chess():
    meshcat.Delete()
    builder = DiagramBuilder()

    scenario = create_scene()

    station = builder.AddSystem(MakeHardwareStation(scenario, meshcat))
    to_point_cloud = AddPointClouds(
        scenario=scenario, station=station, builder=builder
    )
    plant = station.GetSubsystemByName("plant")

    ##now need to add the GraspSelector and the Planner  
    piece_selector = builder.AddSystem(
        GraspSelector(
            plant, 
            camera_body_index = plant.GetBodyIndices(plant.GetModelInstanceByName("camera"))[0]
        )
    )
    #grasp selector looks at the input from camera and from the robot position
    builder.Connect(
        to_point_cloud["camera0"].get_output_port(),
        piece_selector.get_input_port(0),
    )
    builder.Connect(
        station.GetOutputPort("body_poses"),
        piece_selector.GetInputPort("body_poses"),
    )

    #now create the planner
    planner = builder.AddSystem(Planner(plant))
    builder.Connect(
        station.GetOutputPort("body_poses"), 
        planner.GetInputPort("body_poses")
    )
    builder.Connect(
        piece_selector.get_output_port(),
        planner.GetInputPort("piece_grasp"),
    )
    builder.Connect(
        station.GetOutputPort("gripper.state_measured"),
        planner.GetInputPort("wsg_state"),
    )
    builder.Connect(
        station.GetOutputPort("iiwa.position_measured"),
        planner.GetInputPort("iiwa_position"),
    )

    robot = station.GetSubsystemByName(
        "iiwa.controller"
    ).get_multibody_plant_for_control()

    # Set up differential inverse kinematics.
    diff_ik = AddIiwaDifferentialIK(builder, robot)
    builder.Connect(planner.GetOutputPort("X_WG"), diff_ik.get_input_port(0))
    builder.Connect(
        station.GetOutputPort("iiwa.state_estimated"),
        diff_ik.GetInputPort("robot_state"),
    )
    builder.Connect(
        planner.GetOutputPort("reset_diff_ik"),
        diff_ik.GetInputPort("use_robot_state"),
    )

    builder.Connect(
        planner.GetOutputPort("wsg_position"),
        station.GetInputPort("gripper.position"),
    )

    # The DiffIK and the direct position-control modes go through a PortSwitch
    switch = builder.AddSystem(PortSwitch(7))
    builder.Connect(
        diff_ik.get_output_port(), switch.DeclareInputPort("diff_ik")
    )
    builder.Connect(
        planner.GetOutputPort("iiwa_position_command"),
        switch.DeclareInputPort("position"),
    )
    builder.Connect(
        switch.get_output_port(), station.GetInputPort("iiwa.position")
    )
    builder.Connect(
        planner.GetOutputPort("control_mode"),
        switch.get_port_selector_input_port(),
    )

    visualizer = MeshcatVisualizer.AddToBuilder(
        builder, station.GetOutputPort("query_object"), meshcat
    )
    diagram = builder.Build()

    simulator = Simulator(diagram)
    context = simulator.get_context()

    simulator.AdvanceTo(0.1)
    meshcat.Flush()  # Wait for the large object meshes to get to meshcat.

    if running_as_notebook:
        simulator.set_target_realtime_rate(1.0)
        meshcat.AddButton("Stop Simulation", "Escape")
        print("Press Escape to stop the simulation")
        while meshcat.GetButtonClicks("Stop Simulation") < 1:
            simulator.AdvanceTo(simulator.get_context().get_time() + 2.0)
        meshcat.DeleteButton("Stop Simulation")



play_chess()

    

ERROR: In vtkPNGReader.cxx, line 133
vtkPNGReader (0x563074506a70): Unknown file type! Not a PNG file!

ERROR: In vtkPNGReader.cxx, line 272
vtkPNGReader (0x563074506a70): Invalid file header: not a PNG file

ERROR: In vtkPNGReader.cxx, line 133
vtkPNGReader (0x563074506a70): Unknown file type! Not a PNG file!

ERROR: In vtkPNGReader.cxx, line 396
vtkPNGReader (0x563074506a70): Invalid file header: not a PNG file

ERROR: In vtkPNGReader.cxx, line 133
vtkPNGReader (0x563073a6b2b0): Unknown file type! Not a PNG file!

ERROR: In vtkPNGReader.cxx, line 272
vtkPNGReader (0x563073a6b2b0): Invalid file header: not a PNG file

ERROR: In vtkPNGReader.cxx, line 133
vtkPNGReader (0x563073a6b2b0): Unknown file type! Not a PNG file!

ERROR: In vtkPNGReader.cxx, line 396
vtkPNGReader (0x563073a6b2b0): Invalid file header: not a PNG file

LCM detected that large packets are being received, but the kernel UDP
receive buffer is very small.  The possibility of dropping packets due to
insufficient buffer

Press Escape to stop the simulation
Planned 21.191879714073714 second trajectory in mode PlannerState.WAIT_FOR_OBJECTS_TO_SETTLE at time 1.1.


KeyboardInterrupt: 

new plan, 

make a camera thing that is a system

and make a trajecotry creator thing system

that is it, the traj system just feeds the robot moves to do 