In [1]:
from pydrake.all import (MathematicalProgram, Solve, MonomialBasis,
                         DiagramBuilder, Evaluate, LogVectorOutput, Simulator,
                         SymbolicVectorSystem, Variable, ToLatex, Polynomial,
                         VectorSystem, eq, ge, le, Formula, Expression, Evaluate,
                         LeafSystem, AbstractValue,
                         )

from pydrake.all import (PiecewisePolynomial, ModelInstanceIndex,
    RotationMatrix, RigidTransform, Rgba, Box, Sphere, BaseField,
    Evaluate, Fields, PointCloud, MeshcatAnimation)

from IPython.display import clear_output

import os
import copy
import time
import numpy as np
from typing import Tuple
import matplotlib.pyplot as plt
from IPython.display import Markdown, display
from dataclasses import dataclass
from dccm_quasistatic.utils.math_utils import (create_square_symmetric_matrix_from_lower_tri_array,
                                               get_n_lower_tri_from_matrix_dim,
                                               matrix_inverse)
from dccm_quasistatic.utils.sample_generator import (SampleGenerator, SampleGeneratorParams)
from dccm_quasistatic.controller.dccm_params import DCCMParams
from dccm_quasistatic.controller_synthesizer.dccm_synthesizer import DCCMSynthesizer
 
from qsim.parser import (
    QuasistaticParser,
    QuasistaticSystemBackend,
    GradientMode,
)

from qsim.simulator import ForwardDynamicsMode, InternalVisualizationType
from qsim.model_paths import models_dir, package_paths_dict

  import pydrake.solvers.mathematicalprogram as mp


In [2]:
def synthesize_controller(sample_generator_params: SampleGeneratorParams, dccm_params: DCCMParams):
    file_prefix = "saved_dccm_coeffs/circ_"
    base_filename = f"samples{sample_generator_params.n_samples}_lbw{sample_generator_params.log_barrier_weight}_deg{dccm_params.deg}_beta{str(dccm_params.beta).split('.')[1]}.npy"
    
    # Set up dynamical system
    package_paths_dict["dccm_quasistatic"] =  "/home/shaoyuan/Documents/Software/dccm_quasistatic"
    q_model_path = os.path.join("../../models", "q_sys", "box_pushing.yml")
    q_parser = QuasistaticParser(q_model_path)
    q_sim = q_parser.make_simulator_cpp()
    q_sim_py = q_parser.make_simulator_py(InternalVisualizationType.Cpp)
    
    sample_generator = SampleGenerator(sample_generator_params, q_sim=q_sim, q_sim_py=q_sim_py, parser=q_parser)

    if sample_generator_params.log_barrier_weight == 10:
        samples = sample_generator.generate_circular_traj_samples(b_r = 1.5, s_r_buff = 1.1, visualize=False)
    elif sample_generator_params.log_barrier_weight == 100:
        print("Using 100 log barrier weight")
        samples = sample_generator.generate_circular_traj_samples(b_r = 1.5, s_r_buff = 0.18, visualize=False)

    dccm_synth = DCCMSynthesizer(dccm_params)
    success, wijc, lijc = dccm_synth.calculate_dccm_from_samples(*samples)
    if success:
        np.save(f"{file_prefix}wijc_{base_filename}", wijc)
        np.save(f"{file_prefix}lijc_{base_filename}", lijc)
    return success

def evaluate_controller(sample_generator_params, dccm_params, traj_id):
    file_prefix = "saved_dccm_coeffs/circ_"
    base_filename = f"samples{sample_generator_params.n_samples}_lbw{sample_generator_params.log_barrier_weight}_deg{dccm_params.deg}_beta{str(dccm_params.beta).split('.')[1]}.npy"
    
    # Set up dynamical system
    package_paths_dict["dccm_quasistatic"] =  "/home/shaoyuan/Documents/Software/dccm_quasistatic"
    q_model_path = os.path.join("../../models", "q_sys", "box_pushing.yml")
    q_parser = QuasistaticParser(q_model_path)
    q_sim = q_parser.make_simulator_cpp()
    q_sim_py = q_parser.make_simulator_py(InternalVisualizationType.Cpp)



In [4]:
sample_generator_params = SampleGeneratorParams(
    log_barrier_weight=100,
    n_samples=100,
    workspace_radius=2,
    actuated_collision_geomtery_names=["hand::collision"]
)

dccm_params = DCCMParams(
    dim_x=5,
    dim_u=2,
    deg=4,
    beta=0.1,
    n_geodesic_segments=3,
)









INFO:drake:Meshcat listening for connections at http://localhost:7001


In [5]:


# Convex program so should not require initial guess
# initial_guess_base_filename = f"samples{2000}_lbw{sample_generator_params.log_barrier_weight}_wr{sample_generator_params.workspace_radius}_deg{dccm_params.deg}.npy"
# wijc_initial_guess = np.load(f"{file_prefix}wijc_{initial_guess_base_filename}")
# lijc_initial_guess = np.load(f"{file_prefix}lijc_{initial_guess_base_filename}")
# success = dccm_system.calculate_dccm_from_samples(*samples, wijc_initial_guess, lijc_initial_guess)

Start solving DCCM
Solver succeeded:  True  in  200.86884546279907  seconds
wijc:
 [[ 2.79169610e+00 -8.84300669e-01  6.46576343e+00 ... -2.05066958e+01
   1.36545255e+01  1.18893966e+02]
 [ 9.36572946e-03 -7.54964800e-02  2.95938545e-03 ... -9.95078794e-01
   2.32356547e+00  1.98730961e+00]
 [ 2.90441410e+00 -8.32517646e-01  6.49678331e+00 ... -1.89317333e+01
   1.74048297e+01  1.18363851e+02]
 ...
 [ 4.83434463e-03  6.78220583e-01 -9.44641219e-02 ...  8.66048870e+00
  -1.10413328e+01 -6.87870955e+00]
 [-1.33557681e-02 -7.41158602e-03 -1.70154432e-03 ... -6.32921069e-01
  -2.46170502e-01  3.52344569e-01]
 [-1.87636032e-01 -1.39876360e-01 -1.56370433e-01 ... -3.69697357e-01
   1.71030325e+00  1.86721771e+00]]

lijc:
 [[-8.87345059e-02 -1.40956617e-05 -1.44402369e-01 ...  2.65944217e-01
   2.08770204e-02  8.03874480e-01]
 [-2.78529429e-02  5.18019231e-03 -7.55203903e-03 ...  4.74691552e-02
  -1.30029664e-01 -8.06401751e-02]
 [ 4.91281383e-01  6.93822694e-02  3.53670387e-01 ... -2.160490

In [6]:
dccm_system = DCCMSystem(dccm_params)
# Load the DCCM from file
dccm_system._wijc = np.load(f"{file_prefix}wijc_{base_filename}")
dccm_system._lijc = np.load(f"{file_prefix}lijc_{base_filename}")

print(f"wijc:\n{dccm_system._wijc}")
print(f"lijc:\n{dccm_system._lijc}")

FileNotFoundError: [Errno 2] No such file or directory: 'saved_dccm_coeffs/circ_wijc_samples100_lbw100_wr2_deg4_beta1.npy'

In [None]:
sim_p = copy.deepcopy(q_sim.get_sim_params())
sim_p.h = 0.1
sim_p.unactuated_mass_scale = 10
# exact dynamics
# sim_p.gradient_mode = GradientMode.kNone
# sim_p.forward_mode = ForwardDynamicsMode.kQpMp
# Smoothed dynamics
sim_p.gradient_mode = GradientMode.kAB
sim_p.log_barrier_weight = sample_generator_params.log_barrier_weight
sim_p.forward_mode = ForwardDynamicsMode.kLogIcecream

meshcat = q_sim_py.meshcat

def draw_pose_of_box(name, pos, color):
    pose = RigidTransform(RotationMatrix().MakeXRotation(pos[-1]), np.concatenate(([0], pos[0:2])))
    
    meshcat.SetObject(name, Box(1,1,1), rgba=color)
    meshcat.SetTransform(name, pose)

def visualize_open_loop_traj(q0, q_desired):
    qs = [q0]
    q = q0
    for i, q_des in enumerate(q_desired):
        if i == len(q_desired) - 1:
            break
        u_desired = q_desired[i+1, -2:]
        b_des = q_des[:3]
        draw_pose_of_box("b_des", b_des, Rgba(0, 1, 0, 0.5))
        q_sim_py.update_mbp_positions_from_vector(q)
        q_sim_py.draw_current_configuration()
        time.sleep(0.1)
        q = q_sim.calc_dynamics(q, u_desired, sim_p)
        qs.append(q)
    return np.array(qs)
    
def make_desired_traj_achievable(q_desired):
    actual = [q_desired[0]]
    q = q_desired[0]
    for i, q_des in enumerate(q_desired):
        if i == len(q_desired) - 1:
            u_desired = q_desired[i, -2:]
        else:
            u_desired = q_desired[i+1, -2:]
        b_des = q_des[:3]
        q_sim_py.update_mbp_positions_from_vector(q)
        # q_sim_py.draw_current_configuration()
        # time.sleep(0.02)
        q = q_sim.calc_dynamics(q, u_desired, sim_p)
        actual.append(q)
    return np.array(actual)

# This trajectory of u_desired works open loop
# Most ideal trajectory for b_r = 1.2
def make_idealized_traj(b_r, s_r, theta_buff, n_steps):
    q_desired = []
    for i in range(n_steps):
        b_theta = np.pi * 2 * i / n_steps
        b_x = b_r * np.sin(b_theta)
        b_y = b_r * np.cos(b_theta)
        s_theta = b_theta - theta_buff
        s_x = s_r * np.sin(s_theta)
        s_y = s_r * np.cos(s_theta)
        q = np.array([b_x, b_y, -b_theta, s_x, s_y])
        q_desired.append(q)
        q_sim_py.update_mbp_positions_from_vector(q)
        # q_sim_py.draw_current_configuration()
        # time.sleep(0.03)
        # q_next = q_sim.calc_dynamics(q, q[-2:], sim_params)
        
        # q_sim_py.update_mbp_positions_from_vector(q_next)
        # q_sim_py.draw_current_configuration()
        # time.sleep(0.3)
    return np.array(q_desired)

if sample_generator_params.log_barrier_weight == 100:
    n_steps = 100
    b_r = 1.4
    s_r = b_r + 1.3
    theta_buff = 0.9
    q_desired_1 = make_idealized_traj(b_r, s_r, theta_buff, n_steps)
    q_desired_1 = make_desired_traj_achievable(np.array(q_desired_1))

    # visualize_open_loop_traj(q_desired_1[0], q_desired_1)

    b_r = 1.5
    s_r = b_r + 1.1
    theta_buff = 0.9
    q_desired_2 = make_idealized_traj(b_r, s_r, theta_buff, n_steps)
    q_desired_2 = make_desired_traj_achievable(np.array(q_desired_2))
    # visualize_open_loop_traj(q_desired_2[0], q_desired_2)

    # Splice the the two trajectories together
    break_idx = int(n_steps/3)
    # lower, higher, lower
    # q_desired = np.concatenate((q_desired_1[:break_idx], q_desired_2[break_idx:break_idx*2], q_desired_1[break_idx*2:]), axis=0)
    # higher, lower, higher
    # q_desired = np.concatenate((q_desired_2[:break_idx], q_desired_1[break_idx:break_idx*2], q_desired_2[break_idx*2:]), axis=0)

    break_idx = int(n_steps/2)
    # lower, higher # this one seems to correct itself not a good example
    # q_desired = np.concatenate((q_desired_1[:break_idx], q_desired_2[break_idx:]), axis=0)
    # higher, lower # this is a pretty good test case, see if the controller can achieve lower error than this open loop traj
    # traj A
    traj_id = "A"
    q_desired = np.concatenate((q_desired_2[:break_idx], q_desired_1[break_idx:]), axis=0)
    # q_desired = q_desired_2

    initial_offset = np.array([0.2, -0.1, 0.1, 0.1, 0.2])
    q = q_desired[0] + initial_offset

elif sample_generator_params.log_barrier_weight == 10:
    n_steps = 100
    b_r = 1.5
    s_r = b_r + 0.18
    theta_buff = 0.4
    q_desired = make_idealized_traj(b_r, s_r, theta_buff, n_steps)
    q = q_desired[0]

openloopqs = visualize_open_loop_traj(q, q_desired)
    


In [None]:
total_duration_s = 10

# q_desired = np.array(q_desired)
# initial_offset = np.array([0.2, -0.1, 0.1, 0.1, 0.2])
# q = q_desired[0] + initial_offset

q_sim_py.update_mbp_positions_from_vector(q)
q_sim_py.draw_current_configuration()

n_steps = int(total_duration_s / sim_p.h)
t_log = []
q_log = []
u_log = []
u_desired_log = []
geodesic_log = []
success_log = []
tracking_error_log = []

plt.rcParams.update({'font.size': 40})
linewidth = 5
def draw_plot(t, q, q_des, u, u_des, geodesic, tracking_error):
    clear_output(wait=True)
    fig, axs = plt.subplots(4, 1, figsize=(45, 32))

    axis = axs[0]
    axis.step(t, q, linestyle='solid', linewidth=linewidth)
    axis.step(t, q_des, linestyle='dashed', linewidth=linewidth)
    axis.legend([r'$b_x$', r'$b_y$', r'$b_{\theta}$', r'$s_x$', r'$s_y$',
                 r'$b^*_x$', r'$b^*_y$', r'$b^*_{\theta}$', r'$s^*_x$', r'$s^*_y$'],
                 ncol=2)
    axis.set_ylabel('state')
    axis.set_xlabel('t')

    axis = axs[1]
    axis.step(t, u, linestyle='solid', linewidth=linewidth)
    axis.step(t, u_des, linestyle='dashed', linewidth=linewidth)
    axis.legend([r'$u_{x}$',r'$u_{y}$', r'$u^*_{x}$',r'$u^*_{y}$'],
                ncol=2)
    axis.set_ylabel('u')
    axis.set_xlabel('t')

    axis = axs[2]
    axis.step(t, geodesic, linewidth=linewidth)
    axis.set_ylabel('geodesic length')
    axis.set_xlabel('t')

    axis = axs[3]
    axis.step(t, tracking_error, linewidth=linewidth)
    axis.set_ylabel('tracking error')
    axis.set_xlabel('t')
    plt.show()

for i in range(n_steps):
    t=i*sim_p.h
    t_log.append(t)
    q_log.append(q)
    if i == len(q_desired) - 1:
        break
    u_desired = q_desired[i+1, -2:]
    tracking_error = np.linalg.norm(q - q_desired[i])
    tracking_error_log.append(tracking_error)
    # Dummy data
    # u = u_desired
    # geodesic = 0
    # success = True
    u, geodesic, success = dccm_system.control_law(q,q_desired[i, :], u_desired, t)
    q = q_sim.calc_dynamics(q, u, sim_p)
    q_sim_py.update_mbp_positions_from_vector(q)
    q_sim_py.draw_current_configuration()
    b_des = q_desired[i+1, :3]
    draw_pose_of_box("b_des", b_des, Rgba(0, 1, 0, 0.1))
    draw_pose_of_box("open loop b", openloopqs[i+1, :3], Rgba(1, 0, 0, 0.1))
    
    u_desired_log.append(u_desired)
    u_log.append(u)
    geodesic_log.append(geodesic)
    success_log.append(success)
    draw_plot(t_log, q_log, q_desired[:i+1, :], u_log, u_desired_log, geodesic_log, tracking_error_log)
    print(f"success: {success_log}")


print(f"total tracking error: ", np.sum(tracking_error_log))



In [None]:
import zarr
results_file_name = f"results/circ_traj{traj_id}_ngeoseg_{dccm_params.n_geodesic_segments}_{base_filename}.zarr"
root = zarr.open_group(results_file_name, mode='w')

root["success"] = success_log
root["t"] = t_log
root["q_desired"] = q_desired
root["u_desired"] = u_desired_log
root["q"] = q_log
root["u"] = u_log
root["geodesic"] = geodesic_log
root["tracking_error"] = tracking_error_log


In [None]:
root = zarr.open_group(results_file_name, mode='a')
print(np.sum(root["tracking_error"]))
def get_arrays_from_root(root):
    success = root["success"]
    t = root["t"]
    q_desired = root["q_desired"]
    u_desired = root["u_desired"]
    q = root["q"]
    u = root["u"]
    geodesic = root["geodesic"]
    tracking_error = root["tracking_error"]
    return success, t, q_desired, u_desired, q, u, geodesic, tracking_error

