In [1]:
import os

from pydrake.geometry import StartMeshcat
from pydrake.multibody import inverse_kinematics
from manipulation.station import MakeHardwareStation, load_scenario
from manipulation.scenarios import AddIiwaDifferentialIK, ExtractBodyPose, MakeManipulationStation
from manipulation.meshcat_utils import AddMeshcatTriad, PublishPositionTrajectory
from pydrake.all import (
    AddMultibodyPlantSceneGraph,
    Simulator,
    InverseKinematics,
    ModelInstanceIndex,
    MultibodyPlant,
    MeshcatVisualizer,
    MeshcatVisualizerParams,
    RotationMatrix,
    Solve,
    Trajectory,
    TrajectorySource,
    KinematicTrajectoryOptimization,
    BsplineTrajectory,
    PositionConstraint,
    OrientationConstraint,
    Role,
    MinimumDistanceLowerBoundConstraint
)
from pydrake.visualization import MeshcatPoseSliders
from manipulation.meshcat_utils import WsgButton

from utils import *

In [2]:
meshcat = StartMeshcat()

INFO:drake:Meshcat listening for connections at http://localhost:7001


In [3]:
meshcat.Delete()

# Visualize the tree
visualize_model(meshcat)

# Load scenario data
iiwa_x, iiwa_y, iiwa_z = [0.3, -0.5, 0.2]
iiwa_R, iiwa_P, iiwa_Y = [0, 0, 90]
q_default = np.array([-0.2, 0.79, 0.32, -1.76, -0.36, 0.64, -0.73])
scenario_data = make_scenario_data(tree_model_name, iiwa_R, iiwa_P, iiwa_Y, iiwa_x, iiwa_y, iiwa_z, q_default)

In [22]:
meshcat.Delete()
builder = DiagramBuilder()

station = builder.AddSystem(MakeManipulationStation(model_directives=scenario_data, time_step=0.001))
visualizer = MeshcatVisualizer.AddToBuilder(
    builder,
    station.GetOutputPort("query_object"),
    meshcat,
    MeshcatVisualizerParams(delete_on_initialization_event=False),
)

initial_run = 3

# From default joint positions above. Should remove redunduancy by just calling SetDefaultPositions on the plant

initial_trajectory = PiecewisePolynomial.FirstOrderHold([0., initial_run], np.array([q_default, q_default]).T)
q_traj_system = builder.AddSystem(TrajectorySource(initial_trajectory))
builder.Connect(
    q_traj_system.get_output_port(), station.GetInputPort("iiwa_position")
)
initial_g_traj = PiecewisePolynomial.ZeroOrderHold([0., initial_run], np.array([[0.1], [0.1]]).T)
g_traj_system = builder.AddSystem(TrajectorySource(initial_g_traj))
builder.Connect(
    g_traj_system.get_output_port(), station.GetInputPort("wsg_position")
)

plant = station.GetSubsystemByName("plant")

diagram = builder.Build()

diagram_context = diagram.CreateDefaultContext()
station_context = station.GetMyContextFromRoot(diagram_context)
plant_context = plant.GetMyContextFromRoot(diagram_context)

simulator = Simulator(diagram, diagram_context)
#simulator.AdvanceTo(10)

port = station.GetInputPort("iiwa_position")
port.Eval(station_context)

q_traj_system_context = q_traj_system.GetMyContextFromRoot(diagram_context)

meshcat.StartRecording()
simulator.AdvanceTo(initial_run)
#meshcat.PublishRecording()

#port = q_traj_system.GetOutputPort("y0")
#print(port.Eval(q_traj_system_context))

iiwa_model = plant.GetModelInstanceByName("iiwa")
q0 = plant.GetPositions(plant_context, iiwa_model)

def get_apple_body_names():
    tree_index = plant.GetModelInstanceByName(tree_model_name)
    apple_body_names = []
    for i in plant.GetBodyIndices(tree_index):
        body_name = plant.get_body(i).name()
        if body_name.startswith('apple_'):
            apple_body_names.append(body_name)
    return apple_body_names

name = ""

def find_closest_apples_to_iiwa():
    iiwa_body = plant.GetBodyByName("left_finger")
    X_WI = plant.EvalBodyPoseInWorld(plant_context, iiwa_body)

    apple_body_names = get_apple_body_names()
    min_distance = np.inf
    apples = []
    for apple_body_name in apple_body_names:
        apple_body = plant.GetBodyByName(apple_body_name)
        X_WA = plant.EvalBodyPoseInWorld(plant_context, apple_body)
        X_IA = X_WI.inverse() @ X_WA
        distance = np.linalg.norm(X_IA.translation())
        apples.append((apple_body_name, X_IA, distance))
        '''
        if distance < min_distance:
            min_distance = distance
            best_apple = apple_body
            name = apple_body_name
    return best_apple
        '''
    apples = sorted(apples, key=lambda apple: apple[2])
    return apples

apples = find_closest_apples_to_iiwa()
#best_apple_name = apples[1][0]
#best_apple_body = plant.GetBodyByName(best_apple_name)

#X_WA = plant.EvalBodyPoseInWorld(plant_context, best_apple_body)

#AddMeshcatTriad(meshcat, "apple", length=0.25, radius=0.01, X_PT=X_WA)
#iiwa_body = plant.GetBodyByName("left_finger")
#X_WI = plant.EvalBodyPoseInWorld(plant_context, iiwa_body)

#frame = plant.GetFrameByName("wsg_on_iiwa")
#X_WI = frame.CalcPoseInWorld(plant_context)

#AddMeshcatTriad(meshcat, "iiwa", length=0.25, radius=0.01, X_PT=X_WI)

In [23]:
def is_pose_feasible(X_WTarget, q_home = None):

    ITERS = 20

    if q_home is None:
        print("using default q_home")
        q_home = q_default
    
    plant, _, _ = CreateIiwaControllerPlant(q_home, iiwa_R, iiwa_P, iiwa_Y, iiwa_x, iiwa_y, iiwa_z)
    q_lower_limit = plant.GetPositionLowerLimits()
    q_upper_limit = plant.GetPositionUpperLimits()
    
    wsg = plant.GetModelInstanceByName("Schunk_Gripper")
    gripper_frame = plant.GetFrameByName("body", wsg)

    q_guess = q_home

    for i in range(ITERS):

        
        
        plant_context = plant.CreateDefaultContext()

        ik = InverseKinematics(plant, plant_context)
        ik.AddPositionConstraint(
            gripper_frame,
            [0, 0, 0],
            plant.world_frame(),
            X_WTarget.translation(),
            X_WTarget.translation(),
        )
        ik.AddOrientationConstraint(
            gripper_frame,
            RotationMatrix(),
            plant.world_frame(),
            X_WTarget.rotation(),
            0.0,
        )
        prog = ik.get_mutable_prog()
        q = ik.q()
        prog.AddQuadraticErrorCost(np.identity(len(q)), q_home, q)
        prog.SetInitialGuess(q, q_guess)
        result = Solve(ik.prog())
        if result.is_success():
            print(f"\tFound feasible end q in {i} iterations")
            return True, result.GetSolution()

        q_guess = np.random.uniform(q_lower_limit, q_upper_limit)

    print(f"\tFailed to fine feasible end q")
    return False, None


In [24]:
def find_trajectory(q_start, X_WTarget, q_home = None, control_points = 10):

    if q_home is None:
        q_home = q_default

    feasible, q_end = is_pose_feasible(X_WTarget, q_home)
    if not feasible:
        return False, None

    visualize = False

    q_knots = []
    local_plant, visualizer, diagram = CreateIiwaControllerPlant(q_home, iiwa_R, iiwa_P, iiwa_Y, iiwa_x, iiwa_y, iiwa_z)

    context = diagram.CreateDefaultContext()
    local_plant_context = local_plant.GetMyContextFromRoot(context)

    world_frame = local_plant.world_frame()

    wsg = local_plant.GetModelInstanceByName("Schunk_Gripper")

    num_q = local_plant.num_positions()
    #q0 = local_plant.GetPositions(local_plant_context)
    gripper_frame = local_plant.GetFrameByName("body", wsg)

    trajopt = KinematicTrajectoryOptimization(local_plant.num_positions(), control_points)
    prog = trajopt.get_mutable_prog()

    # Todo should I attempt better or different guesses
    
    q_guess = np.linspace(q_start.squeeze(), q_end, trajopt.num_control_points()).T
    path_guess = BsplineTrajectory(trajopt.basis(), q_guess)
    trajopt.SetInitialGuess(path_guess)

    # Uncomment this to see the initial guess:
    # PublishPositionTrajectory(path_guess, context, plant, visualizer)

    
    trajopt.AddDurationCost(1.0)
    trajopt.AddPathLengthCost(1.0)
    trajopt.AddPositionBounds(
        local_plant.GetPositionLowerLimits(), local_plant.GetPositionUpperLimits()
    )
    trajopt.AddVelocityBounds(
        local_plant.GetVelocityLowerLimits()/5, local_plant.GetVelocityUpperLimits()/5
    )
    
    trajopt.AddDurationConstraint(1, 10)

    # start constraint
    trajopt.AddPathPositionConstraint(lb=q_start, ub=q_start, s=0)

    # goal constraint
    goal_position_constraint = PositionConstraint(
        local_plant,
        local_plant.world_frame(),
        X_WTarget.translation(),
        X_WTarget.translation(),
        gripper_frame,
        [0, 0.1, 0],
        local_plant_context,
    )
    trajopt.AddPathPositionConstraint(goal_position_constraint, 1)
    goal_orientation_constraint = OrientationConstraint(
        local_plant,
        local_plant.world_frame(),
        X_WTarget.rotation(),
        gripper_frame,
        RotationMatrix(),
        np.pi/180 * 1,
        local_plant_context,
    )
    trajopt.AddPathPositionConstraint(goal_orientation_constraint, 1)

    prog.AddQuadraticErrorCost(
        np.eye(num_q), q_home, trajopt.control_points()[:, -1]
    )

    # start and end with zero velocity
    trajopt.AddPathVelocityConstraint(
        np.zeros((num_q, 1)), np.zeros((num_q, 1)), 0
    )
    trajopt.AddPathVelocityConstraint(
        np.zeros((num_q, 1)), np.zeros((num_q, 1)), 1
    )

    # collision constraints
    '''
    collision_constraint = MinimumDistanceLowerBoundConstraint(
        plant, 0.001, plant_context, None, 0.01
    )
    evaluate_at_s = np.linspace(0, 1, 50)
    for s in evaluate_at_s:
        trajopt.AddPathPositionConstraint(collision_constraint, s)
    '''

    result = Solve(prog)

    trajectory = trajopt.ReconstructTrajectory(result)
    
    if (visualize):
        PublishPositionTrajectory(
            trajectory, context, local_plant, visualizer
        )
    
    if not result.is_success():
        print("\tTrajectory optimization failed")
        print("\t" + result.get_solver_id().name())
        return False, trajectory
    else:
        print("\tTrajectory optimization success")
        return True, trajectory

    
    
    

In [25]:
def trajectories_to_pluck_apple(plant, plant_context, apple_name):

    apple_body = plant.GetBodyByName(apple_name)
    X_WA = plant.EvalBodyPoseInWorld(plant_context, apple_body)
    
    def convertApplePoseToGrasp(X_WA):
        #X_AG = RigidTransform(RotationMatrix.MakeXRotation(-np.pi/2), [0, 0, 0.03])
        X_AG = RigidTransform(RotationMatrix.MakeXRotation(-np.pi/2), [0, 0, 0.03])
        X_WG = X_WA @ X_AG
        #rotation = RigidTransform(RotationMatrix.MakeYRotation(np.pi/4 * 7))
        #X_WG = X_WG @ rotation
        return X_WG
    
    def convertGraspPoseToPreGrasp(X_WGrasp):
        X_GPregrasp = RigidTransform([0, -0.1, 0])
        X_WPregrasp = X_WGrasp @ X_GPregrasp
        return X_WPregrasp
    
    def returnPose():
        X_WReturn = RigidTransform(RotationMatrix.MakeXRotation(-np.pi/2), [0.2, 0, 0.4])
        rotation = RigidTransform(RotationMatrix.MakeYRotation(np.pi/2))
        X_WReturn = X_WReturn @ rotation
        return X_WReturn
    
    X_WGrasp = convertApplePoseToGrasp(X_WA)
    X_WPregrasp = convertGraspPoseToPreGrasp(X_WGrasp)
    X_WPluck = X_WPregrasp
    X_WReturn = returnPose()
    
    #AddMeshcatTriad(meshcat, "grasp", length=0.25, radius=0.01, X_PT=X_WGrasp)
    AddMeshcatTriad(meshcat, "return", length=0.25, radius=0.01, X_PT=X_WReturn)
    
    iiwa_model = plant.GetModelInstanceByName("iiwa")
    q0 = plant.GetPositions(plant_context, iiwa_model)

    # Pregrasp

    overall_success = False
    while True: 
    
        print("Pre")
        success, traj_pregrasp = find_trajectory(q_default, X_WPregrasp)
        if not success:
            break
        q_pregrasp = traj_pregrasp.value(traj_pregrasp.end_time())
        
        # Grasp
        print("Grasp")
        success, traj_grasp = find_trajectory(q_pregrasp, X_WGrasp, q_pregrasp, 4)
        if not success:
            break
        q_grasp = traj_grasp.value(traj_grasp.end_time())
        AddMeshcatTriad(meshcat, "grasp", length=0.25, radius=0.01, X_PT=X_WGrasp)
        
        # Hold
        traj_hold = PiecewisePolynomial.FirstOrderHold([0., 0.5], np.array([q_grasp.squeeze(), q_grasp.squeeze()]).T)
        q_hold = traj_hold.value(traj_hold.end_time())
        
        # Pluck
        print("Pluck")
        success, traj_pluck = find_trajectory(q_hold, X_WPluck, q_hold, 4)
        if not success:
            break
        q_pluck = traj_pluck.value(traj_pluck.end_time())
        
        # Return
        print("Return")
        success, traj_return = find_trajectory(q_pluck, X_WReturn)
        if not success:
            break
        q_return = traj_return.value(traj_return.end_time())
        AddMeshcatTriad(meshcat, "return", length=0.25, radius=0.01, X_PT=X_WReturn)
    
        # Drop
        traj_drop = PiecewisePolynomial.FirstOrderHold([0., 1.], np.array([q_return.squeeze(), q_return.squeeze()]).T)

        overall_success = True
        print("Found overall success\n")
        break

    if not overall_success:
        print("Abandoning apple\n")
        return False, None, None
    
    trajectories = [traj_pregrasp, traj_grasp, traj_hold, traj_pluck, traj_return, traj_drop]
    grasp_states = [False, False, True, True, True, False]

    combined_trajectory, intervals = concatenate_trajectories(initial_trajectory, trajectories)
    combined_gripper_trajectory = generate_gripper_trajectories(initial_g_traj, grasp_states, intervals)

    return True, combined_trajectory, combined_gripper_trajectory


#best_apple_name = apples[7][0]
#best_apple_name = apples[5][0]
sucess, combined_trajectory, combined_gripper_trajector = False, None, None
for apple in apples[0:5]:
    apple_name = apple[0]
    success, combined_trajectory, combined_gripper_trajectory = trajectories_to_pluck_apple(plant, plant_context, apple_name)
    if success:
        break


Pre
	Found feasible end q in 0 iterations
	Trajectory optimization success
Grasp
	Found feasible end q in 0 iterations
	Trajectory optimization success
Pluck
	Found feasible end q in 0 iterations
	Trajectory optimization success
Return
	Found feasible end q in 0 iterations
	Trajectory optimization success
Found overall success

[3.0, 4.024543707115298]
[4.024543707115298, 5.024543707115298]
[5.024543707115298, 5.524543707115298]
[5.524543707115298, 6.524543707115298]
[6.524543707115298, 7.524543707115298]
[7.524543707115298, 8.5245437071153]


In [26]:
end_t = combined_trajectory.end_time()
q_traj_system.UpdateTrajectory(combined_trajectory)
g_traj_system.UpdateTrajectory(combined_gripper_trajectory)
#meshcat.StartRecording()
simulator.AdvanceTo(end_t)
meshcat.PublishRecording()

In [9]:
def create_q_knots(q0, X_WA, visualize = False):
    """Convert end-effector pose list to joint position list using series of
    InverseKinematics problems. Note that q is 9-dimensional because the last 2 dimensions
    contain gripper joints, but these should not matter to the constraints.
    @param: pose_lst (python list): post_lst[i] contains keyframe X_WG at index i.
    @return: q_knots (python_list): q_knots[i] contains IK solution that will give f(q_knots[i]) \approx pose_lst[i].
    """
    q_knots = []
    plant, visualizer, diagram = CreateIiwaControllerPlant(visualize)
    context = diagram.CreateDefaultContext()
    plant_context = plant.GetMyContextFromRoot(context)
    world_frame = plant.world_frame()
    gripper_frame = plant.GetFrameByName("body")
    q_nominal = np.array(
        [0.0, 0.6, 0.0, -1.75, 0.0, 1.0, 0.0]
    )  # nominal joint angles for joint-centering.

    def AddOrientationConstraint(ik, R_WG, bounds):
        """Add orientation constraint to the ik problem. Implements an inequality
        constraint where the axis-angle difference between f_R(q) and R_WG must be
        within bounds. Can be translated to:
        ik.prog().AddBoundingBoxConstraint(angle_diff(f_R(q), R_WG), -bounds, bounds)
        """
        ik.AddOrientationConstraint(
            frameAbar=world_frame,
            R_AbarA=R_WG,
            frameBbar=gripper_frame,
            R_BbarB=RotationMatrix(),
            theta_bound=bounds,
        )

    def AddPositionConstraint(ik, p_WG_lower, p_WG_upper):
        """Add position constraint to the ik problem. Implements an inequality
        constraint where f_p(q) must lie between p_WG_lower and p_WG_upper. Can be
        translated to
        ik.prog().AddBoundingBoxConstraint(f_p(q), p_WG_lower, p_WG_upper)
        """
        ik.AddPositionConstraint(
            frameA=world_frame,
            frameB=gripper_frame,
            p_BQ=np.zeros(3),
            p_AQ_lower=p_WG_lower,
            p_AQ_upper=p_WG_upper,
        )

    pose_lst = [X_WA]
    q_nominal = q0
    for i in range(len(pose_lst)):
        ik = inverse_kinematics.InverseKinematics(plant)
        q_variables = ik.q()  # Get variables for MathematicalProgram
        prog = ik.prog()  # Get MathematicalProgram

        #### Modify here ###############################

        # Initial Guess
        if i == 0:
            prog.SetInitialGuess(q_variables, q_nominal)
        else:
            prog.SetInitialGuess(q_variables, q_knots[-1])
   
        # L2 Squared Cost
        prog.AddQuadraticErrorCost(np.eye(len(q_variables)), q_nominal, q_variables)

        X_WG = pose_lst[i]

        # Position Constraint
        p_WG = X_WG.translation()
        d = 0.01
        p_WG_lower = p_WG - d
        p_WG_upper = p_WG + d
        AddPositionConstraint(ik, p_WG_lower, p_WG_upper)

        # Rotation Constraint
        R_WG = X_WG.rotation()
        AddOrientationConstraint(ik, R_WG, np.pi/180)

        ################################################

        result = Solve(prog)

        assert result.is_success()
        solution = result.GetSolution(q_variables)
        q_knots.append(solution)

    q_knots = np.array(q_knots)
    print(q_knots.shape)
    q_knots = np.vstack([q0, q_knots])
    trajectory = PiecewisePolynomial.FirstOrderHold([0, 2], q_knots[:, 0:7].T)
    if (visualize):  
        PublishPositionTrajectory(
            trajectory, context, plant, visualizer
        )
    
    return trajectory

#gripper_frame = self.plant.GetFrameByName("body")
gripper_frame = plant.GetFrameByName("iiwa_link_7")
world_frame = plant.world_frame()

iiwa_model = plant.GetModelInstanceByName("iiwa")
q0 = plant.GetPositions(plant_context, iiwa_model)
new_traj = create_q_knots(q0, X_WA, True)


NameError: name 'X_WA' is not defined

In [None]:
# Graveyard

iiwa_plant = plant.GetModelInstanceByName("iiwa")
q0 = plant.GetPositions(plant_context)
n = plant.num_model_instances()
for i in range(n):
    idx = ModelInstanceIndex(i)
    model_name = plant.GetModelInstanceName(idx)
    print(model_name)

#######


#differential_ik.GetInputPort("X_WE_desired").FixValue(differential_context, X_WA)
print(X_WA)

####

simulator.set_target_realtime_rate(1)
meshcat.AddButton("Stop Simulation", "Escape")
print("Press Escape to stop the simulation")
while meshcat.GetButtonClicks("Stop Simulation") < 1:
    print("Simulating")
    simulator.AdvanceTo(simulator.get_context().get_time() + 2)
meshcat.DeleteButton("Stop Simulation")

#####

meshcat.StartRecording()
simulator.AdvanceTo(4.0)
meshcat.PublishRecording()

###
plant_robot.GetJointByName("iiwa_joint_7").set_position_limits(
            [-np.inf], [np.inf]
)
###

# Old Hardware Station Approach
'''
scenario = load_scenario(data=scenario_data)
station = builder.AddSystem(MakeHardwareStation(scenario, meshcat))
controller_plant = station.GetSubsystemByName(
        "iiwa.controller"
).get_multibody_plant_for_control()
differential_ik = AddIiwaDifferentialIK(
    builder,
    controller_plant,
    frame=controller_plant.GetFrameByName("iiwa_link_7"), # should be "body" for wsg i think
)
# Nothing is connected to controller.get_input_port(0)
builder.Connect(
    differential_ik.get_output_port(),
    station.GetInputPort("iiwa.position"),
)
builder.Connect(
    station.GetOutputPort("iiwa.state_estimated"),
    differential_ik.GetInputPort("robot_state"),
)
'''

###
'''
# Teleop
meshcat.DeleteAddedControls()
teleop = builder.AddSystem(
    MeshcatPoseSliders(
        meshcat,
        lower_limit=[0, -np.pi, -np.pi, -0.6, -1, 0],
        upper_limit=[2 * np.pi, np.pi, np.pi, 0.8, 1, 1.1],       
    )
)
builder.Connect(
    teleop.get_output_port(), differential_ik.GetInputPort("X_WE_desired")
)
plant = station.GetSubsystemByName("plant")
ee_pose = builder.AddSystem(
    ExtractBodyPose(
        station.GetOutputPort("body_poses"),
        plant.GetBodyByName("iiwa_link_7").index(),
    )
)
builder.Connect(
    station.GetOutputPort("body_poses"), ee_pose.get_input_port()
)
builder.Connect(ee_pose.get_output_port(), teleop.get_input_port())

wsg_teleop = builder.AddSystem(WsgButton(meshcat))
builder.Connect(
    wsg_teleop.get_output_port(0), station.GetInputPort("wsg.position")
)
'''

#station.GetInputPort("wsg_position").FixValue(plant_context, [0.1, 0.1])

#differential_context = differential_ik.GetMyContextFromRoot(diagram_context)
#fixed_value = differential_ik.GetInputPort("X_WE_desired").FixValue(differential_context, RigidTransform([0, 0, 0]))

#plant.get_actuation_input_port().FixValue(plant_context, np.zeros(9))

####

gripper_frame = plant.GetFrameByName("iiwa_link_7")
world_frame = plant.world_frame()

def get_X_WG():
    X_WG = plant.CalcRelativeTransform(
        plant_context, frame_A=world_frame, frame_B=gripper_frame
    )
    return X_WG