In [43]:
import numpy as np
from enum import Enum
from pydrake.all import (StartMeshcat, MeshcatVisualizer, DiagramBuilder,
                        Simulator, LeafSystem, RigidTransform, RotationMatrix,
                        PiecewisePose, AbstractValue, Integrator, JacobianWrtVariable,
                        ConstantVectorSource,)
from manipulation.scenarios import MakeManipulationStation

# For diagram viz
from IPython.display import SVG
import pydot

In [44]:
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7003


In [147]:
class PseudoInverseController(LeafSystem):
    """
    Ingests a desired iiwa spatial velocity time sequence (differentiated trajectory)
    and outputs a time sequence of joint velocity commands
    """
    def __init__(self, plant):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()

        self.V_G_port = self.DeclareVectorInputPort("V_WG", 6)
        self.q_port = self.DeclareVectorInputPort("iiwa_position", 7)
        self.DeclareVectorOutputPort("iiwa_velocity", 7, self.CalcOutput)
        self.iiwa_start = plant.GetJointByName("iiwa_joint_1").velocity_start()
        self.iiwa_end = plant.GetJointByName("iiwa_joint_7").velocity_start()

    def CalcOutput(self, context, output):
        V_G = self.V_G_port.Eval(context)
        q = self.q_port.Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q)
        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context, JacobianWrtVariable.kV,
            self._G, [0,0,0], self._W, self._W)
        J_G = J_G[:,self.iiwa_start:self.iiwa_end+1] # Only iiwa terms.
        v = np.linalg.pinv(J_G).dot(V_G)
        output.SetFromVector(v)

class ClosedLoopPseudoInverseController(LeafSystem):
    """
    Ingests a desired iiwa spatial trajectory and outputs joint velocity commands
    """
    def __init__(self, plant, dt=0.05):
        LeafSystem.__init__(self)
        self._plant = plant
        self._plant_context = plant.CreateDefaultContext()
        self._iiwa = plant.GetModelInstanceByName("iiwa")
        self._G = plant.GetBodyByName("body").body_frame()
        self._W = plant.world_frame()
        self.dt = dt # Look ahead time for trajectory follower

        traj_dtype = AbstractValue.Make(PiecewisePose())
        self.T_G_port = self.DeclareAbstractInputPort("T_WG", traj_dtype)
        self.q_port = self.DeclareVectorInputPort("iiwa_position", 7)
        self.DeclareVectorOutputPort("iiwa_velocity", 7, self.CalcOutput)
        self.iiwa_start = plant.GetJointByName("iiwa_joint_1").velocity_start()
        self.iiwa_end = plant.GetJointByName("iiwa_joint_7").velocity_start()

    def CalcOutput(self, context, output):
        
        T_G = self.T_G_port.Eval(context)

        # TODO: Change to .empty() or something like that
        if T_G.get_number_of_segments() == 0:
            output.SetFromVector(np.zeros(7))
            return
        
        # Update internal plant context to match the current state of the plant
        q = self.q_port.Eval(context)
        self._plant.SetPositions(self._plant_context, self._iiwa, q)
        # Forward kinematics to get current pose of the gripper
        X_G = self._plant.CalcRelativeTransform(
            self._plant_context,
            frame_A=self._W,
            frame_B=self._G)
        
        # Evaluate instantaneous trajectory goal
        time = context.get_time()
        time_next = time + self.dt
        X_D_array = T_G.value(time_next) # np array
        X_D = RigidTransform(X_D_array) # convert to RigidTransform
        T_D_mini = PiecewisePose.MakeLinear([time, time_next], [X_G, X_D]) # Mini trajectory to next desired pose
        V_D = T_D_mini.MakeDerivative().value(time) # Instantaneously Desired Velocity

        # Calculate Jacobian
        J_G = self._plant.CalcJacobianSpatialVelocity(
            self._plant_context, JacobianWrtVariable.kV,
            self._G, [0,0,0], self._W, self._W)
        J_G = J_G[:,self.iiwa_start:self.iiwa_end+1] # Only iiwa terms.

        # Calculate joint velocities
        v = np.linalg.pinv(J_G).dot(V_D)
        output.SetFromVector(v)


In [148]:
import os
import pathlib
path = os.getcwd()
MODEL_YAML = pathlib.Path(path + "/../models/recycling.dmd.yaml").as_uri()

class PiecewisePoseSource(LeafSystem):
    def __init__(self, traj):
        LeafSystem.__init__(self)
        self.traj = traj
        self.DeclareAbstractOutputPort("T_WG", lambda: AbstractValue.Make(PiecewisePose()), self.CalcOutput)

    def CalcOutput(self, context, output):
        output.set_value(self.traj)

class IIWA:
    def __init__(self, q0, traj):
        
        # Setup diagram builder components
        builder = DiagramBuilder()
        self.station = MakeManipulationStation(
            filename=MODEL_YAML,
            package_xmls=["../package.xml"])
        builder.AddSystem(self.station)
        self.plant = self.station.GetSubsystemByName("plant")
        self.visualizer = MeshcatVisualizer.AddToBuilder(
            builder, self.station.GetOutputPort("query_object"), meshcat) 

        # # Open loop controller
        # # In case of initializing empty station
        # if traj is not None:
        #     # Connect trajectories to pseudo-inverse controller
        #     traj_V_G = traj.MakeDerivative()
        #     V_G_source = builder.AddSystem(TrajectorySource(traj_V_G))
        #     V_G_source.set_name("V_WG")
        #     self.controller = builder.AddSystem(PseudoInverseController(self.plant))
        #     self.controller.set_name("PseudoInverseController")
        #     builder.Connect(V_G_source.get_output_port(), self.controller.GetInputPort("V_WG"))

        #     # Integrate controller velocity commands to get joint angles
        #     self.integrator = builder.AddSystem(Integrator(7))
        #     self.integrator.set_name("integrator")
        #     builder.Connect(self.controller.get_output_port(),
        #                     self.integrator.get_input_port())
        #     builder.Connect(self.integrator.get_output_port(),
        #                     self.station.GetInputPort("iiwa_position"))
        #     builder.Connect(self.station.GetOutputPort("iiwa_position_measured"),
        #                     self.controller.GetInputPort("iiwa_position"))

        # Closed loop controller
        if traj is not None:
            # Connect trajectories to pseudo-inverse controller
            T_G_source = builder.AddSystem(PiecewisePoseSource(traj))
            self.controller = builder.AddSystem(ClosedLoopPseudoInverseController(self.plant))
            self.controller.set_name("ClosedLoopPseudoInverseController")
            builder.Connect(T_G_source.get_output_port(), self.controller.GetInputPort("T_WG"))

            # Integrate controller velocity commands to get joint angles
            self.integrator = builder.AddSystem(Integrator(7))
            self.integrator.set_name("integrator")
            builder.Connect(self.controller.get_output_port(),
                            self.integrator.get_input_port())
            builder.Connect(self.integrator.get_output_port(),
                            self.station.GetInputPort("iiwa_position"))
            builder.Connect(self.station.GetOutputPort("iiwa_position_measured"),
                            self.controller.GetInputPort("iiwa_position"))
            
        # Fix station input ports for gripper controller to no motion
        wsg_position = builder.AddSystem(ConstantVectorSource([0.1]))
        builder.Connect(wsg_position.get_output_port(),
                        self.station.GetInputPort("wsg_position"))

        # Build
        self.diagram = builder.Build()
        
        # House frames
        self.gripper_frame = self.plant.GetFrameByName('body')
        self.world_frame = self.plant.world_frame()
        
        # Develop context
        self.context = self.diagram.CreateDefaultContext()
        station_context = self.diagram.GetMutableSubsystemContext(self.station, self.context)
        plant_context = self.diagram.GetMutableSubsystemContext(self.plant, self.context)
        
        # Set initial joint positions
        iiwa = self.plant.GetModelInstanceByName("iiwa")
        self.plant.SetPositions(plant_context, iiwa, q0)
        self.plant.SetVelocities(plant_context, iiwa, np.zeros(7))
        wsg = self.plant.GetModelInstanceByName("wsg")
        self.plant.SetPositions(plant_context, wsg, [-0.05, 0.05])
        self.plant.SetVelocities(plant_context, wsg, [0, 0])  
        
        if hasattr(self, 'integrator'):
            self.integrator.set_integral_value(
                self.integrator.GetMyMutableContextFromRoot(self.context), q0)
        
    def Get_X_WG(self):
        
        plant_context = self.diagram.GetMutableSubsystemContext(self.plant, self.context)
        X_WG = self.plant.CalcRelativeTransform(
                    plant_context,
                    frame_A=self.world_frame,
                    frame_B=self.gripper_frame)
        return X_WG
    
    def Simulate(self, t):
        
        self.simulator = Simulator(self.diagram, self.context)
        self.simulator.set_target_realtime_rate(1.0)
        self.visualizer.StartRecording()
        self.simulator.AdvanceTo(t)
        self.visualizer.PublishRecording()
        
    def ShowDiagram(self):
        
        SVG(pydot.graph_from_dot_data(self.diagram.GetGraphvizString())[0].create_svg())
        

In [None]:
meshcat.Delete()

q0 = np.array([ -1.57,  1.56461165e-01, -3.82761069e-05,
                       -1.32296976e+00, -6.29097287e-06,  1.61181157e+00, -2.66900985e-05])

temp_iiwa = IIWA(q0, None)
X_init = temp_iiwa.Get_X_WG()

R = RotationMatrix(np.array([[0, 0, -1],
                             [1, 0, 0],
                             [0, -1, 0]]))
X_recycle = RigidTransform(R, [0.38, 0.05, 0.6])
X_waste = RigidTransform(R, [0.38, -0.15, 0.6])
X_organic = RigidTransform(R, [0.38, -0.35, 0.6])

traj = PiecewisePose.MakeLinear([0, 2], [X_init, X_waste])
traj = PiecewisePose()

iiwa = IIWA(q0, traj)

In [None]:
iiwa.Simulate(10)