In [None]:
import os
from functools import partial

from IPython.display import display, clear_output, SVG
import mcubes
import numpy as np
import pydot

import pydrake
from pydrake.all import (
    Adder, AddMultibodyPlantSceneGraph, Box, Cylinder, DiagramBuilder, 
    InverseKinematics, MeshcatVisualizerCpp, MeshcatVisualizerParams,
    Parser, FindResourceOrThrow, PassThrough,
    RigidTransform, RollPitchYaw, RotationMatrix, Simulator, Solve, Sphere,
    ConnectPlanarSceneGraphVisualizer, Demultiplexer, MultibodyPlant, PiecewiseQuaternionSlerp,
    InverseDynamicsController, StateInterpolatorWithDiscreteDerivative, TrajectorySource,
    LeafSystem, BasicVector, JacobianWrtVariable, RollPitchYaw, VectorLogSink, SpatialInertia, UnitInertia, Box, CoulombFriction,
    MathematicalProgram, Solve, eq, le, ge
)
from pydrake.examples.manipulation_station import ManipulationStation
from manipulation.meshcat_cpp_utils import (
    StartMeshcat, MeshcatPoseSliders, AddMeshcatTriad
)
from manipulation.scenarios import (
    AddIiwa, AddPlanarIiwa, AddTwoLinkIiwa, AddWsg, AddShape
)
from manipulation.utils import FindResource
from manipulation import running_as_notebook
from pydrake.trajectories import PiecewisePolynomial
from pydrake.multibody import inverse_kinematics
from pydrake.systems.controllers import PidController
import time
# from scipy.spatial.distance import cdist

In [None]:
# Start the visualizer.
meshcat = StartMeshcat()

In [None]:
# Function to setup and solve TrajOpt problem to generate wiping locations on table

def create_optimized_wipe_traj(dirt_locs, start_pos, end_pos, n):
    prog = MathematicalProgram()

    # Create optimization variables for the x and y positions that we will command the 
    # arm to
    x_locs = prog.NewContinuousVariables(n, "x")
    y_locs = prog.NewContinuousVariables(n, "y")

    # Add a squared-distance cost to the points to incentivize the agent to not take a massively 
    # deviating trajectory
    prog.AddCost(x_locs.dot(x_locs) + y_locs.dot(y_locs))

    def vec_min(x):
        min_distance = x[0]
        for i in range(1, n):
            # min_distance = pydrake.math.min(min_distance, x[i])
            min_distance = pydrake.symbolic.min(min_distance, x[i])

        return min_distance


    def min_distance_constraint(d, x_locs, y_locs):
        distance_constraint = vec_min([((d[0]-x_locs[i])**2 + (d[1]-y_locs[i])**2) for i in range(n)])
        # distance_constraint = vec_min([(abs(d[0]-x_locs[i]) + abs(d[1]-y_locs[i])) for i in range(n)])
        # print(distance_constraint)
        return distance_constraint

    # for d in dirt_locs:
    #     prog.AddConstraint(min_distance_constraint(d, x_locs, y_locs) <= 0.1)


    # near_dirt_constraint = le(min_distance_constraint(dirt_locs[1], x_locs, y_locs), 0.02)
    near_dirt_constraint = min_distance_constraint(dirt_locs[1], x_locs, y_locs) <= 0.02
    prog.AddConstraint(near_dirt_constraint)

    # Constrain the solution such that the first and last positions match start_pos and end_pos
    # prog.AddConstraint(x_locs[0] == start_pos[0])
    # prog.AddConstraint(y_locs[0] == start_pos[1])
    # prog.AddConstraint(x_locs[-1] == end_pos[0])
    # prog.AddConstraint(y_locs[-1] == end_pos[1])
    prog.AddConstraint(ge(x_locs, np.zeros(n)))
    prog.AddConstraint(le(x_locs, np.ones(n) * 2.0))
    prog.AddConstraint(ge(y_locs, np.ones(n) * -0.7))
    prog.AddConstraint(le(y_locs, np.ones(n) * 0.7))

    prog.SetInitialGuess(x_locs, np.array([0.8 for i in range(n)]))
    prog.SetInitialGuess(y_locs, np.array([0.1 for i in range(n)]))

    print(prog)
    result = Solve(prog)

    # 6. Get the solution
    if (result.is_success()): 
        sol = result.GetSolution()
        x_sol = sol[0:n]
        y_sol = sol[n:]
        print(f"X Solution: {x_sol}")
        print(f"Y Solution: {y_sol}")
    else:
        print("Failed to Solve :(")

In [None]:
dirt_locs = np.array([[0.64, 0.05]])

In [None]:
# Our controller definition

def compute_ctrl(p_xyz_now, v_xyz_now, rpy_now, px_des, py_des, fz_des, roll_des, pitch_des, yaw_des, pure_pos_control_mode):
  """Compute control action given current position and velocities, as well as 
  desired x-direction position p_des(t) / desired z-direction force f_des. 
  You may set theta_des yourself, though we recommend regulating it to zero. 
  Input:
    - p_xyz_now: np.array (dim 3), position of the EE. [px, py, pz] 
    - v_xyz_now: np.array (dim 6), velocity of the EE. [vroll, vpitch, vyaw, vx, vy, vz] 
    - rpy_now: np.array (dim 3), roll, pitch, yaw of the EE
    - px_des: float, desired x position
    - py_des: float, desired y position
    - fz_des: float, desired z force to exert
    - roll_des: float, desired roll in radians
    - pitch_des: float, desired pitch in radians
    - yaw_des: float, desired yaw in radians
    - pure_pos_control_mode: bool, true if we're in the pure position control setting of our manipulator (i.e, we're not in contact yet), 
      and false otherwise
  Output:
    - u: np.array (dim 6), spatial torques to send to the manipulator. [tau_x, tau_y, tau_z, fx, fy, fz]
    - reached_des: bool, True if the EE currently has reached (epsilon near) the desired x,y position. False otherwise 
  """
  u = np.zeros(6)
  reached_des = False
  epsilon = 0.055

  if not pure_pos_control_mode:
    # D roll
    kproll = 0.01
    kdroll = 2.0*np.sqrt(kproll)  # should really be kproll*I on inside
    # u[0] = kproll * (roll_des - rpy_now[0]) - kdroll*v_xyz_now[0]
    u[0] = -kdroll*v_xyz_now[0]

    # D pitch
    kppitch = 0.12
    kdpitch = 2.0*np.sqrt(kppitch)  # should really be kppitch*I on inside
    # u[1] = kppitch * (pitch_des - rpy_now[1]) - kdpitch*v_xyz_now[1]
    u[1] = - kdpitch*v_xyz_now[1]

    # D yaw
    kpyaw = 0.06
    kdyaw = 2.0*np.sqrt(kpyaw)  # should really be kpyaw*I on inside
    # u[2] = kpyaw * (yaw_des - rpy_now[2]) - kdyaw*v_xyz_now[2]
    u[2] = - kdyaw*v_xyz_now[2]

    # PD X
    kpx = 6.0
    kdx = 2.0*np.sqrt(kpx)  # should really be kpx*I on inside
    u[3] = kpx*(px_des - p_xyz_now[0]) - kdx*v_xyz_now[3]

    # PD Y
    kpy = 4.0
    kdy = 2.0*np.sqrt(kpy)  # should really be kpy*I on inside
    u[4] = kpy*(py_des - p_xyz_now[1]) - kdy*v_xyz_now[4]

    # PD Force Controller Z
    kpz = 0.8
    kdz = 0
    u[5] = -kpz * fz_des - kdz*v_xyz_now[5]

    if abs(px_des - p_xyz_now[0]) <= epsilon and abs(py_des - p_xyz_now[1]) <= epsilon:
      reached_des = True

  return u, reached_des

class TorqueController(LeafSystem):
  """Wrapper System for Commanding Pure Torques to planar iiwa.
    @param plant MultibodyPlant of the simulated plant.
    @param iiwa 
  """
  def __init__(self, plant, iiwa):
    LeafSystem.__init__(self)

    self._plant = plant 
    self._plant_context = plant.CreateDefaultContext() 
    self._iiwa = iiwa
    self._G = plant.GetFrameByName("iiwa_link_7", iiwa)
    self._W = plant.world_frame()
    self._ctrl_fun = compute_ctrl
    self._vx = -0.08125
    self._ik = inverse_kinematics.InverseKinematics(plant)
    self._curr_position_control_timesteps = 0
    self._des_position_control_timesteps = 16000
    self._curr_des_xypos_idx = 0

    self.DeclareVectorInputPort("iiwa_position_measured", BasicVector(7))
    self.DeclareVectorInputPort("iiwa_velocity_measured", BasicVector(7))
    # self.DeclareVectorInputPort("iiwa_torque_external", BasicVector(7)) # This was added in to do force control on z

    self.DeclareVectorOutputPort("iiwa_position_command", BasicVector(7),
                                 self.CalcPositionOutput)
    self.DeclareVectorOutputPort("iiwa_torque_cmd", BasicVector(7),
                                 self.CalcTorqueOutput)
    self.DeclareVectorOutputPort("iiwa_torque_log", BasicVector(7),
                                 self.CalcTorqueOutputLog)

  def CalcPositionOutput(self, context, output):
    """Set q_d = q_now. This ensures the iiwa goes into pure torque mode in sim by setting the 
    position control torques in InverseDynamicsController to zero. 
    NOTE(terry-suh): Do not use this method on hardware or deploy this notebook on hardware. 
    We can only simulate pure torque control mode for iiwa on sim. 
    """
    if self._curr_position_control_timesteps < self._des_position_control_timesteps:
      # TEMPORARY HACK: Initialize the pose to be on the table
      q_d = np.array([ 0.08779705,  0.68502306,  0.00395995, -1.60894861, -0.00333842,  0.84763008, 0.09307069]) * (self._curr_position_control_timesteps/self._des_position_control_timesteps)
      print(q_d)
      self._curr_position_control_timesteps += 1
    else:
      # Set the joint poses to their current values, to simulate torque control
      q_d = self.get_input_port(0).Eval(context)
    
    # print(f"q_d = {q_d}")
    output.SetFromVector(q_d)

  def CalcTorqueOutput(self, context, output):    
    # Hard-coded position and force profiles. Can be connected from Trajectory class. 
    des_xypos_list = [[0.8, 0.1], [0.55, 0.0]]
    fz_des = 1.50 #1.25
    # The desired roll, pitch and yaw are hardcoded to keep the gripper 'upright' w.r.t
    # the table so wiping can be accomplished correctly.
    roll_des = 3.13926408 
    pitch_des = 0.03596836
    yaw_des = -3.14142808

    # Read inputs 
    q_now = self.get_input_port(0).Eval(context)
    v_now = self.get_input_port(1).Eval(context)

    self._plant.SetPositions(self._plant_context, self._iiwa, q_now)

    # 1. Retrieve the cartesian location of the gripper (i.e, the last iiwa link)
    X_now = self._plant.CalcRelativeTransform(self._plant_context, self._W, self._G)
    rpy_now = RollPitchYaw(X_now.rotation()).vector()
    p_xyz_now = X_now.translation()
    print("Position:", p_xyz_now)

    # 2. Compute the Jacobian of the gripper frame w.r.t the robot arm base frame and use 
    # this to derive the cartesian velocity vector of the gripper frame
    J_G = self._plant.CalcJacobianSpatialVelocity(
        self._plant_context, JacobianWrtVariable.kQDot, 
        self._G, [0,0,0], self._W, self._W)
    v_xyz_now = J_G.dot(v_now)

    # 2. Apply ctrl_fun given the position and velocity of the robot to compute the 
    # EE-torque in Cartesian Space
    if self._curr_position_control_timesteps < self._des_position_control_timesteps: # If we're in pure pos control mode, output all 0's for torque
      F_xyz, reached_des = self._ctrl_fun(p_xyz_now, v_xyz_now, rpy_now, des_xypos_list[self._curr_des_xypos_idx][0], des_xypos_list[self._curr_des_xypos_idx][1], fz_des, roll_des, pitch_des, yaw_des, True)
    else: # Else, output the actual torque control command
      F_xyz, reached_des = self._ctrl_fun(p_xyz_now, v_xyz_now, rpy_now, des_xypos_list[self._curr_des_xypos_idx][0], des_xypos_list[self._curr_des_xypos_idx][1], fz_des, roll_des, pitch_des, yaw_des, False)
      if reached_des:
        if self._curr_des_xypos_idx < len(des_xypos_list) - 1:
          self._curr_des_xypos_idx += 1
        else:
          self._curr_des_xypos_idx = 0
        # print("Moving on to next wiping position!")

    # 3. Convert this torque back to joint space to apply a differential torque command!
    tau_cmd = J_G.T.dot(F_xyz)
    #print("Force:", F_xyz)
    output.SetFromVector(tau_cmd)
    return tau_cmd

  def CalcTorqueOutputLog(self, context, output):
    tau = self.CalcTorqueOutput(context, output)
    # print("tau:", tau)
    return tau

#####################################

In [None]:
# Setup wiping bot simulation and run controller

def setup_wiping_bot(time_step=0.0002):
    builder = DiagramBuilder()

    # Setting up environment
    plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=time_step)
    parser = Parser(plant, scene_graph)
    iiwa = AddIiwa(plant)

    sponge_shape = Box(0.08, 0.065, 0.065)
    sponge_instance = AddShape(plant, sponge_shape, "sponge_link", mass=0.2, mu=0.1, color=[1.0, 1.0, 0.0, 1.0])
    
    X_7B = RigidTransform(RotationMatrix(), [0, 0, 0.068])
    plant.WeldFrames(plant.GetFrameByName("iiwa_link_7", iiwa), plant.GetFrameByName("sponge_link", sponge_instance), X_7B)


    # Task Dependent Objects #

   
    plates_to_wipe = {}

    dirt_shape = pydrake.geometry.Box(0.005, 0.005, 0.005)

    number_dirt_patches = 1
    number_dirt_per_patch = 10

    dirt = {}
    dirt["num_patches"] = number_dirt_patches
    dirt["num_per_patch"] = number_dirt_per_patch

    for i in range(number_dirt_patches):
        plates_to_wipe[str(i)] = AddShape(plant, pydrake.geometry.Cylinder(0.1, 0.01), "plate_" + str(i), mass=0.5, mu=0.1, color=[1,1,1,1])
        for j in range(number_dirt_per_patch):
            dirt[str(i) + "_" + str(j)] = AddShape(plant, dirt_shape, "dirt_" + str(i) + "_" + str(j), mass=0.01, mu=0.1, color=[0,0,0,1])

    print(dirt)


    ###
    table_base = AddShape(plant, pydrake.geometry.Box(0.80, 0.80, 0.15), "table_to_wipe_base", mass=15, mu=0.1, color=[0.59,0.29,0.0,1.0])
    X_0B = RigidTransform(RotationMatrix(), [0.65, 0, 0.070])
    plant.WeldFrames(plant.world_frame(), plant.GetFrameByName("table_to_wipe_base", table_base), X_0B)
    plant.Finalize()

    # NOTE: Code taken from notebook setup by Prof. Russ Tedrake 
    # (https://deepnote.com/project/Manipulation-Station-p5S24ob_QPipeARm6RkGqQ/%2Fmanipulation_station.ipynb/#00003-c02ff458-1ca6-4059-9c48-a96ad5962c94)
    num_iiwa_positions = plant.num_positions(iiwa)
    iiwa_position = builder.AddSystem(PassThrough(num_iiwa_positions))
    builder.ExportOutput(iiwa_position.get_output_port(), "iiwa_position_command")

    # Export the iiwa "state" outputs.
    demux = builder.AddSystem(Demultiplexer(
        2 * num_iiwa_positions, num_iiwa_positions))
    builder.Connect(plant.get_state_output_port(iiwa), demux.get_input_port())
    builder.ExportOutput(demux.get_output_port(0), "iiwa_position_measured")
    builder.ExportOutput(demux.get_output_port(1), "iiwa_velocity_estimated")
    builder.ExportOutput(plant.get_state_output_port(iiwa), "iiwa_state_estimated")

    # Make the plant for the iiwa controller to use.
    controller_plant = MultibodyPlant(time_step=time_step)
    controller_iiwa = AddIiwa(controller_plant)
    controller_plant.Finalize()

    # Add the iiwa controller
    iiwa_controller = builder.AddSystem(
        InverseDynamicsController(
            controller_plant,
            kp=[100]*num_iiwa_positions,
            ki=[1]*num_iiwa_positions,
            kd=[20]*num_iiwa_positions,
            has_reference_acceleration=False))
    iiwa_controller.set_name("iiwa_controller")
    builder.Connect(
        plant.get_state_output_port(iiwa), iiwa_controller.get_input_port_estimated_state())

    # Add in the feed-forward torque
    adder = builder.AddSystem(Adder(2, num_iiwa_positions))
    builder.Connect(iiwa_controller.get_output_port_control(),
                    adder.get_input_port(0))
    # Use a PassThrough to make the port optional (it will provide zero values if not connected).
    torque_passthrough = builder.AddSystem(PassThrough([0]*num_iiwa_positions))
    builder.Connect(torque_passthrough.get_output_port(),
                    adder.get_input_port(1))
    # builder.ExportInput(torque_passthrough.get_input_port(), 
    #                     "iiwa_feedforward_torque")
    builder.Connect(adder.get_output_port(),
                    plant.get_actuation_input_port(iiwa))

    # Add discrete derivative to command velocities.
    desired_state_from_position = builder.AddSystem(
        StateInterpolatorWithDiscreteDerivative(
            num_iiwa_positions, time_step, suppress_initial_transient=False))
    desired_state_from_position.set_name("desired_state_from_position")
    builder.Connect(desired_state_from_position.get_output_port(),      
                    iiwa_controller.get_input_port_desired_state())
    builder.Connect(iiwa_position.get_output_port(), 
                    desired_state_from_position.get_input_port())

    # Export "cheat" ports.
    builder.ExportOutput(scene_graph.get_query_output_port(), "geometry_query")
    builder.ExportOutput(plant.get_contact_results_output_port(), 
                         "contact_results")
    builder.ExportOutput(plant.get_state_output_port(), 
                         "plant_continuous_state")

    #########################################################################################################################

    # Add the TorqueController for the iiwa
    logger = builder.AddSystem(VectorLogSink(7))
    hfp_controller = builder.AddSystem(TorqueController(controller_plant, controller_iiwa))
    hfp_controller.set_name("hfp_controller")
    builder.Connect(hfp_controller.get_output_port(0),
                    iiwa_position.get_input_port()) # This is the iiwa position input port
    builder.Connect(hfp_controller.get_output_port(1),
                    torque_passthrough.get_input_port()) # This is the iiwa feefroward torque port
    builder.Connect(hfp_controller.get_output_port(2),
                    logger.get_input_port(0))
    
    builder.Connect(demux.get_output_port(0), # This is the iiwa position measured port
                    hfp_controller.get_input_port(0))
    builder.Connect(demux.get_output_port(1), # This is the iiwa velocity estimated port
                    hfp_controller.get_input_port(1))

    visualizer = MeshcatVisualizerCpp.AddToBuilder(
        builder, 
        scene_graph, 
        meshcat,
        MeshcatVisualizerParams(delete_prefix_initialization_event=False))

    diagram = builder.Build()

    return diagram, plant, plates_to_wipe, dirt

    
wiping_bot_diagram, plant, plates_to_wipe, dirt = setup_wiping_bot()
context = wiping_bot_diagram.CreateDefaultContext()
wiping_bot_diagram.Publish(context)
simulator = Simulator(wiping_bot_diagram)

# Task Dependent Object Position Settings #

for i in range(dirt["num_patches"]):
    x_center = dirt_locs[i][0]
    y_center = dirt_locs[i][1]
    plant.SetPositions(plant.GetMyContextFromRoot(simulator.get_mutable_context()), 
        plates_to_wipe[str(i)],
        np.array([0.707, 0, 0, 0.707, x_center, y_center, 0.17]))

for i in range(dirt["num_patches"]):
    x_center = dirt_locs[i][0]
    y_center = dirt_locs[i][1]
    for j in range(dirt["num_per_patch"]):
        xy_pos = np.array([x_center, y_center]) + np.random.rand(2)*0.01 - 0.005
        dirt_location = np.array([0.707, 0, 0, 0.707, xy_pos[0], xy_pos[1], 0.25])
        plant.SetPositions(plant.GetMyContextFromRoot(simulator.get_mutable_context()), 
                                dirt[str(i) + "_" + str(j)],
                                dirt_location)

###
print(dirt_locs)

simulator.set_target_realtime_rate(0)
simulator.AdvanceTo(50.0)


[ 0.08125617  0.63398884  0.00366493 -1.48908194 -0.00308971  0.78448164
  0.08613692]
[ 0.08126166  0.63403166  0.00366518 -1.4891825  -0.00308992  0.78453462
  0.08614274]
Position: [0.58497136 0.04871193 0.36773386]
[ 0.08126714  0.63407447  0.00366543 -1.48928306 -0.00309013  0.78458759
  0.08614856]
Position: [0.58497136 0.04871193 0.36773386]
[ 0.08127263  0.63411728  0.00366568 -1.48938362 -0.00309033  0.78464057
  0.08615437]
[ 0.08127812  0.6341601   0.00366592 -1.48948418 -0.00309054  0.78469355
  0.08616019]
[ 0.08128361  0.63420291  0.00366617 -1.48958473 -0.00309075  0.78474652
  0.08616601]
Position: [0.58484907 0.04871533 0.3674905 ]
[ 0.08128909  0.63424573  0.00366642 -1.48968529 -0.00309096  0.7847995
  0.08617183]
[ 0.08129458  0.63428854  0.00366667 -1.48978585 -0.00309117  0.78485248
  0.08617764]
[ 0.08130007  0.63433135  0.00366691 -1.48988641 -0.00309138  0.78490545
  0.08618346]
Position: [0.58484907 0.04871533 0.3674905 ]
[ 0.08130556  0.63437417  0.00366716 -

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=e1f7a4de-0698-4427-bcc0-40d4d181e518' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>