In [None]:
import numpy as np
from pydrake.all import (
    DirectCollocation,
    DiagramBuilder,
    Simulator,
    SnoptSolver,
    PiecewisePolynomial,
    LeafSystem_,
    TemplateSystem,
    BasicVector_,
    SolverOptions,
    CommonSolverOption,Variable,
)
import matplotlib.pyplot as plt
from robot_arm import ArmDynamics, ArmPlant_, Arm2DVisualizer

from IPython.display import HTML, display

In [None]:
@TemplateSystem.define("DynamicActuator_")
def DynamicActuator_(T):
    class Impl(LeafSystem_[T]):
        def _construct(self, arm_dynamics, motor_inertia, torque_constant, viscous_friction, resistance, inductance, converter=None):
            LeafSystem_[T].__init__(self, converter)
            self.arm_dynamics = arm_dynamics
            self.motor_inertia = motor_inertia 
            self.torque_constant = torque_constant
            self.back_emf_constant = torque_constant # assumes its equal to the torque constant
            self.viscous_friction = viscous_friction
            self.resistance = resistance
            self.inductance = inductance

            self.old_torques = np.zeros(2)
            self.V = self.DeclareVectorInputPort("voltage", BasicVector_[T](2))
            self.qv = self.DeclareVectorInputPort("qv", BasicVector_[T](6))
            #self.ddtheta = self.DeclareVectorInputPort("ddq", BasicVector_[T](1))
            #self.DeclareVectorOutputPort("torque", 1, self.CalcTorque, {self.xcdot_ticket()}) # didn't work
            self.DeclareVectorOutputPort("torque", 2, self.CalcTorque, {self.all_state_ticket()})

        def CalcTorque(self, context, output):
            V = self.V.Eval(context)
            qv = self.qv.Eval(context)
            
            dtheta = qv[(3,4)]
            dqv = self.arm_dynamics(qv, self.old_torques[0], self.old_torques[1])
            ddtheta = dqv[(3,4)]
            # V = R*i + L*i + k_b*omega
            current = (V - self.back_emf_constant * dtheta) / (self.resistance + self.inductance)

            # J*omega_dot = k_t*i - v*oemga + Tau_ext
            torque = self.motor_inertia * ddtheta - self.torque_constant * current + self.viscous_friction * dtheta
            
            self.old_torques = torque[:]
            
            output.SetFromVector(torque)

        def _construct_copy(
            self,
            other,
            converter=None,
        ):
            Impl._construct(self, other.arm_dynamics, other.motor_inertia, other.torque_constant, other.viscous_friction, other.resistance, other.inductance, converter=converter)

    return Impl

def plot_traj(x_traj, u_traj):
    t = np.linspace(x_traj.start_time(), x_traj.end_time(), int(1e4))

    x_log = np.array([x_traj.value(i) for i in t]).T
    u_log = np.array([u_traj.value(i) for i in t]).T

    velocities = []
    for q in x_log.T:
        velocities.append(arm_dynamics.EvalKeypoint(arm_dynamics.drE, q).T[0])
    velocities = np.array(velocities) 

    fig, ax = plt.subplots(2, 2, figsize=(15,12), facecolor='white')
    # joint angles
    ax[0,0].plot(t, x_log.T[:,0], label='q1')
    ax[0,0].plot(t, x_log.T[:,1], label='q2')
    ax[0,0].plot(t, x_log.T[:,2], label='q3')
    ax[0,0].set_title("Joint Positions")
    ax[0,0].set_xlabel("t (s)")
    ax[0,0].set_ylabel("theta (rad)")
    ax[0,0].legend()

    #joint velocities
    ax[0,1].plot(t, x_log.T[:,3], label='dq1')
    ax[0,1].plot(t, x_log.T[:,4], label='dq2')
    ax[0,1].plot(t, x_log.T[:,5], label='dq3')
    ax[0,1].set_title("Joint Velocities")
    ax[0,1].set_xlabel("t (s)")
    ax[0,1].set_ylabel("dtheta (rad/s)")
    ax[0,1].legend()

    ax[1,0].plot(t, u_log.T[:,0], label='tau1')
    ax[1,0].plot(t, u_log.T[:,1], label='tau2')
    ax[1,0].set_title("Motor Voltages")
    ax[1,0].set_xlabel("t (s)")
    ax[1,0].set_ylabel("Voltage (V)")
    ax[1,0].legend()

    ax[1,1].plot(t, velocities[:,0], label='dx')
    ax[1,1].plot(t, velocities[:,1], label='dy')
    #ax[1,1].plot(t, velocities[:,2], label='dz')
    #ax[1,1].plot(t, velocities, label='dy')
    ax[1,1].plot(t, np.sqrt(velocities[:,0]**2 +  velocities[:,2]**2), label='sqrt(dx**2+dy**2)')
    ax[1,1].set_title("Endeffector velocity")
    ax[1,1].set_xlabel("t (s)")
    ax[1,1].set_ylabel("speed (m/s)")
    ax[1,1].legend()

    return fig


In [None]:
arm_dynamics = ArmDynamics(0.2,0.1, 2*50, simplify=False)
ArmPlant = ArmPlant_[None]  # Default instantiation

In [None]:
def visualize_arm_trajectory(x_traj, frame_rate=1 / 60.0):
    builder = DiagramBuilder()
    plant = builder.AddSystem(ArmPlant(arm_dynamics))
    # Setup visualization
    visualizer = builder.AddSystem(Arm2DVisualizer(arm_dynamics, show=False))
    builder.Connect(plant.get_output_port(), visualizer.get_input_port())

    diagram = builder.Build()
    print("Finished Building")
    print(f"Frame rate is set to: {1/frame_rate} fps")

    # Set up a simulator to run this diagram
    simulator = Simulator(diagram)
    context = simulator.get_mutable_context()

    visualizer.start_recording()
    for t in np.hstack((np.arange(x_traj.start_time(),
                                x_traj.end_time(),
                                frame_rate),
                        x_traj.end_time())):
        context.SetTime(t)
        x = x_traj.value(t)
        context.SetContinuousState(x)
        diagram.Publish(context)

    ani = visualizer.get_recording_as_animation()
    display(HTML(ani.to_jshtml()))

# Trajectory Optimization

In [None]:
options = SolverOptions()
options.SetOption(CommonSolverOption.kPrintFileName, "./debugOutput.txt")
snopt= SnoptSolver().solver_id()
options.SetOption(snopt, 'Iterations Limits', 1e6) # 1e5
options.SetOption(snopt, 'Major Feasibility Tolerance', 1e-5) #1e-6
options.SetOption(snopt, 'Function Precision', 1e-6) 
options.SetOption(snopt, 'Major Optimality Tolerance', np.sqrt(1e-6)) #2e-6 # sqrt of function precision
options.SetOption(snopt, 'Major Iterations Limit', 1000) 
#options.SetOption(snopt, 'Superbasics limit', 2000) #209

In [None]:
J = 0.0035/18.75**2
k_t = 0.18
nu = 0.0005
R = 2.0
L = 0

In [None]:
builder = DiagramBuilder()
arm = builder.AddSystem(ArmPlant(arm_dynamics))
motors = builder.AddSystem(DynamicActuator_[None](arm_dynamics, J, k_t, nu, R, L))
builder.Connect(arm.get_output_port(), motors.GetInputPort('qv'))
builder.Connect(motors.get_output_port(), arm.get_input_port())
builder.ExportInput(motors.GetInputPort('voltage'), 'input')

diagram = builder.Build()
context = diagram.CreateDefaultContext()
diagram.Publish(context)

In [None]:
x_traj_list, u_traj_list = [], []
x_traj, u_traj = None, None
best_cost = np.inf
#duration_bounds = [0.1,3]
duration_bounds = [0.1,1]
print("Start Iterations")
#for N in [11, 21, 41, 81, 101, 121, 141, 161, 181, 201]:#, 321]:
#for N in [11,21,41,81]:
for N in [11,21,41]:
    arm_context = arm.CreateDefaultContext()
    print("-----------------------------------")
    print(f"min_dt: {duration_bounds[0]/(N-1)}")
    print(f"max_dt: {duration_bounds[1]/(N-1)}")
    print(f"N: {N}")
    dircol = DirectCollocation(arm,
                            arm_context,
                            num_time_samples=N,
                            minimum_timestep=duration_bounds[0]/N,
                            maximum_timestep=duration_bounds[1]/N)
 
    prog = dircol.prog()
    dircol.AddEqualTimeIntervalsConstraints()
    
    #actuation_limit = 2.0  # N*m.
    actuation_limit = 12.0  # V.
    u = dircol.input()

    for i in range(len(u)):
        dircol.AddConstraintToAllKnotPoints(-actuation_limit <= u[i])
        dircol.AddConstraintToAllKnotPoints(u[i] <= actuation_limit)
    
    # intial torque is 0
    ui = dircol.input(0)
    for i in range(len(ui)):
        dircol.AddConstraintToAllKnotPoints(ui[i] == 0)
        

    # joint position constraints 
    s = dircol.state()
    dircol.AddConstraintToAllKnotPoints(s[0] <= arm.PositionUpperLimit[0])
    dircol.AddConstraintToAllKnotPoints(s[1] <= arm.PositionUpperLimit[1])
    dircol.AddConstraintToAllKnotPoints(s[2] <= arm.PositionUpperLimit[2])
    dircol.AddConstraintToAllKnotPoints(s[0] >= arm.PositionLowerLimit[0])
    dircol.AddConstraintToAllKnotPoints(s[1] >= arm.PositionLowerLimit[1])
    dircol.AddConstraintToAllKnotPoints(s[2] >= arm.PositionLowerLimit[2])
    
        # joint velocity constraints 
    #max_rad_s = 250
    #dircol.AddConstraintToAllKnotPoints(s[3] <= max_rad_s)
    #dircol.AddConstraintToAllKnotPoints(s[4] <= max_rad_s)
    #dircol.AddConstraintToAllKnotPoints(s[5] <= max_rad_s)
    #dircol.AddConstraintToAllKnotPoints(s[3] >= -max_rad_s)
    #dircol.AddConstraintToAllKnotPoints(s[4] >= -max_rad_s)
    #dircol.AddConstraintToAllKnotPoints(s[5] >= -max_rad_s)
    
    #initial joint velocity is 0
    qi = dircol.initial_state()
    dircol.AddConstraintToAllKnotPoints(qi[3] == 0)
    dircol.AddConstraintToAllKnotPoints(qi[4] == 0)
    dircol.AddConstraintToAllKnotPoints(qi[5] == 0)

    # initial joint angle of the wrist joint is 0
    prog.AddConstraint(qi[2] == 0)
    
    qf = dircol.final_state()
    prog.AddCost(arm_dynamics.EvalCost, vars = qf) 
    #dircol.AddFinalCost(cost, qf)

    if not x_traj is None:
        dircol.SetInitialTrajectory(PiecewisePolynomial(), x_traj)
        #for n in range(N):
        #    prog.SetInitialGuess(dircol.state(n), initial_x_traj.value(duration_bounds[1]*n/(N-1)))
    else:
        initial_x_traj = PiecewisePolynomial.FirstOrderHold([0., duration_bounds[1]], np.hstack([np.array([-3.14,-1.57,0, 0, 0,0])[:,None],
                                                            np.array([3.14,1.57,1.57, 10,10,10])[:,None]]))
        dircol.SetInitialTrajectory(PiecewisePolynomial(), initial_x_traj)
    
        for n in range(N):
            prog.SetInitialGuess(dircol.state(n), initial_x_traj.value(duration_bounds[1]*n/(N-1)))
    

    solver = SnoptSolver()

    result = solver.Solve(prog, solver_options=options)
    if not result.is_success():
        print("failed")
        break
    
    print(f"success: {result.is_success()}")
    if result.is_success():
        x_traj_list.append(dircol.ReconstructStateTrajectory(result))
        u_traj_list.append(dircol.ReconstructInputTrajectory(result))
        
        visualize_arm_trajectory(x_traj_list[-1], frame_rate=1/460)
        plot_traj(x_traj_list[-1], u_traj_list[-1])
        plt.show()
        
        qf_new = x_traj_list[-1].value(x_traj_list[-1].end_time())
        new_cost = arm_dynamics.GetCost(qf_new)
        
        print(f'Old cost: {best_cost}, New cost: {new_cost}')
        if new_cost <= best_cost:
            print('Iteration was improving')
            x_traj = x_traj_list[-1]
            u_traj = u_traj_list[-1]

### Visualize traj opt results

In [None]:
visualize_arm_trajectory(x_traj, frame_rate=1/460.)

In [None]:
plot_traj(x_traj, u_traj)