In this notebook, we show how to generate a FFMPC-acados solver using the interface of our FFMPC class

In [None]:
from ffmpc import FFMPC, FFMPCOptions
import numpy as np
import casadi as ca
from acados_template import AcadosModel
from trunk.ODE_chains.eom_system_16_casadi import eom_16 # our ODE
from trunk.ODE_chains.ODE_utils import get_ode_params_dict, params_dict_to_list
import matplotlib.pyplot as plt
from trunk.old.Trajectory_finding_script import select_periodic_trajectory

# create subclass of FFMPC that overwrites required functions with example specific details
class TrunkFFMPC(FFMPC):
    def __init__(self, Phi_t, Phi_dot_t, options: FFMPCOptions):
        super().__init__(options) 

        # save trajectory functions 
        self.Phi_t = Phi_t
        self.Phi_dot_t = Phi_dot_t

    def get_full_model(self):
        n_q = 16 # here, we consider 16 link system
        q = ca.MX.sym('q', n_q)  # generalized coodinates of trunk like system
        q_dot = ca.MX.sym('q_dot', n_q)  
        x = ca.vertcat(q, q_dot) # state
        u = ca.MX.sym('u', 2)  # Control input are the joint torques in the lower and upper half

        # transform inputs into section whise torques applied to hinge joints, this is required as our ODE takes in joint torques at the hinges and not u
        u_expanded = ca.vertcat( # expand u to actuate all joints
            ca.repmat(u[0], n_q // 2, 1),  # Repeat u[0] n_half times
            ca.repmat(u[1], n_q // 2, 1)  # Repeat u[1] n_half times
        )

        xdot = ca.MX.sym('xdot', 2*n_q)
        q_ddot = xdot[n_q:]

        # Create the AcadosModel 
        model = AcadosModel()
        model.name = "trunk_like_model"
        model.x = x  # Define the state
        model.q = q  # Also store the generalized coordinates (part of the state) 
        model.q_dot = q_dot
        model.u = u  # Define the control input
        model.xdot = xdot

        f_impl_expr_q_dot = xdot[:n_q] - q_dot
        params_list = params_dict_to_list(get_ode_params_dict()[f"{n_q}"])
        f_impl_expr_q_ddot = eom_16(q, q_dot, q_ddot, u_expanded, params_list) # contains the ODE for the 16-link 2D trunk
        model.f_impl_expr = ca.vertcat(f_impl_expr_q_dot, f_impl_expr_q_ddot)  # Implicit dynamics 

        return model
          
    def get_reduced_model(self):
        # state
        x = ca.MX.sym('x')       
        y = ca.MX.sym('y')       
        vx = ca.MX.sym('vx')     
        vy = ca.MX.sym('vy')     

        # controls
        ax = ca.MX.sym('ax')     
        ay = ca.MX.sym('ay') 

        # Combine states and controls into vectors
        states = ca.vertcat(x, y, vx, vy)
        controls = ca.vertcat(ax, ay)

        # dx/dt = [vx, vy, ax, ay]
        f_expl = ca.vertcat(vx, vy, ax, ay)

        # Create the ACADOS model
        model = AcadosModel()
        model.name = "point_mass_model"
        model.xdot = ca.MX.sym('xdot', 4)   

        # Assign states, controls, and dynamics
        model.x = states          # State vector
        model.ee_pos = ca.vertcat(x, y)
        model.u = controls        # Control vector
        model.f_expl_expr = f_expl  # Explicit dynamics
        model.f_impl_expr = model.xdot - f_expl  # Implicit dynamics (if needed)

        return model

    def get_transition_model(self):
        model = AcadosModel()
        model.name = "transition_model"
        n_q = 16
        q = ca.MX.sym('q', n_q)  # generalized coodinates
        q_dot = ca.MX.sym('q_dot', n_q)  # generalized coodinates
        model.x = ca.vertcat(q, q_dot) # state
        model.u = ca.SX.sym('u', 0, 0) # transition model has no inputs
        
        # transition map maps from q, q_dot of trunk-like system to x-y postiion and velocities of endeffector (state of the point-mass model
        x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot = self.forward_kinematics_ee_casadi(q, q_dot)  
        model.disc_dyn_expr = ca.vertcat(x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot)
        return model

    # this is a helper for the transition model and the cost in the trunk problem
    def forward_kinematics_ee_casadi(self, q, q_dot):
        l = get_ode_params_dict()[f"{16}"]["l"] # note that l in params dict corresponds to half length
        x_pos_ee, y_pos_ee, q_tot = 0, 0, 0
        for i in range(16):
            q_tot += q[i]
            x_pos_ee += 2*l*ca.sin(q_tot)
            y_pos_ee -= 2*l*ca.cos(q_tot)
        x_ee_dot = ca.jacobian(x_pos_ee, q) @ q_dot # Chain rule: dx/dq * q_dot
        y_ee_dot = ca.jacobian(y_pos_ee, q) @ q_dot  # Chain rule: dy/dq * q_dot    
        return x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot 

    def cost_expression_full(self, model):
        x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot = self.forward_kinematics_ee_casadi(model.q, model.q_dot)
        cost_y_expr = ca.vertcat(x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot, model.u)
        cost_y_expr_e = ca.vertcat(x_pos_ee, y_pos_ee, x_ee_dot, y_ee_dot)  # Terminal cost remains only on ee_pos, ee_pos_dot
        return cost_y_expr, cost_y_expr_e
    
    def cost_expression_reduced(self, model):
        cost_y_expr = ca.vertcat(model.x, model.u)
        cost_y_expr_e = model.x
        return cost_y_expr, cost_y_expr_e

    # if you want to overwrite solver options, do it here. Some examples:
    def overwrite_default_solver_options(self, ocp):
        ocp.solver_options.qp_solver = "PARTIAL_CONDENSING_HPIPM"
        ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
        ocp.solver_options.nlp_solver_type = "SQP"
        ocp.solver_options.nlp_solver_max_iter = 150
        ocp.solver_options.nlp_solver_tol_comp = 1e-7
        ocp.solver_options.nlp_solver_tol_eq = 1e-7
        ocp.solver_options.nlp_solver_tol_ineq = 1e-7
        ocp.solver_options.nlp_solver_tol_stat = 1e-7 
        return ocp

    def set_constraints_in_ocp_full(self, ocp):
        # Here ee-x position is constrained
        x_pos_ee, _, _, _ = self.forward_kinematics_ee_casadi(ocp.model.q, ocp.model.q_dot)
        ocp.model.con_h_expr = x_pos_ee
        ocp.model.con_h_expr_e = x_pos_ee
        ocp.constraints.lh = np.array([-1e15])   
        ocp.constraints.lh_e = np.array([-1e15]) 
        ocp.constraints.uh = np.array(0.2)  
        ocp.constraints.uh_e = np.array(0.2)
        return ocp

    def set_constraints_in_ocp_reduced(self, ocp):
        ocp.model.con_h_expr = ocp.model.x[0]
        ocp.model.con_h_expr_e = ocp.model.x[0]
        ocp.constraints.lh = np.array([-1e15])   
        ocp.constraints.lh_e = np.array([-1e15]) 
        ocp.constraints.uh = np.array(0.2)  
        ocp.constraints.uh_e = np.array(0.2)
        return ocp

    def get_y_ref(self, t, terminal=False, phase="full"):
        theta_ref = np.array(self.Phi_t(t)).squeeze()
        theta_dot_ref = np.array(self.Phi_dot_t(t)).squeeze()
        y_ref = [theta_ref[0], theta_ref[1], theta_dot_ref[0], theta_dot_ref[1]]
        if not terminal:
            y_ref += [0, 0]
        return np.array(y_ref)
    
    # If you want to set different solver options
    def overwrite_default_solver_options(self, ocp):
        ocp.solver_options.nlp_solver_max_iter = 150
        ocp.solver_options.levenberg_marquardt = 0.0
        ocp.solver_options.nlp_solver_tol_comp = 1e-7
        ocp.solver_options.nlp_solver_tol_eq = 1e-7
        ocp.solver_options.nlp_solver_tol_ineq = 1e-7
        ocp.solver_options.nlp_solver_tol_stat = 1e-7  
        return ocp

    def plot_closed_loop_trajectory(self, x_traj, u_traj):

        # evaluate trajectory for one period for plotting
        T = 1 / 1.0 # Here, trajectory frequency is 1Hz
        time_steps = np.linspace(0, T, 100)  
        trajectory = np.array([self.Phi_t(t_val) for t_val in time_steps])

        # Extract x and z coordinates
        trajectory_x = trajectory[:, 0]  # x-coordinates
        trajectory_z = trajectory[:, 1]  # z-coordinates

        # Compute end-effector positions based on x-trajectory
        l = get_ode_params_dict()[f"{16}"]["l"] # note that l in params dict corresponds to half length
        x_ee_positions = []
        y_ee_positions = []
        for x in x_traj:
            q = x[:16]
            x_pos_ee, y_pos_ee, q_tot = 0, 0, 0
            for i in range(16):
                q_tot += q[i]
                x_pos_ee += 2*l*np.sin(q_tot)
                y_pos_ee -= 2*l*np.cos(q_tot)
            x_ee_positions.append(np.array(x_pos_ee))
            y_ee_positions.append(np.array(y_pos_ee))
        x_ee_positions = np.array(x_ee_positions)
        y_ee_positions = np.array(y_ee_positions)

        # Plot the development of the end-effector in the x-z plane
        plt.plot(x_ee_positions, y_ee_positions , color='red', label='Closed-loop Trajectory')
        plt.plot([0.2, 0.2], [-0.5, -0.25], color='black', linestyle='--', linewidth=2.0, label='Constraint')

        # Add an ellipsoid with parameters a and b around the origin
        plt.plot(trajectory_x, trajectory_z, '--', color='green', label='Desired Trajectory')

        plt.xlabel('x-position [m]')
        plt.ylabel('z-position [m]')
        plt.axis('equal')  # Make the axes equal
        plt.xlim([-0.3, 0.3])  # Set the range from -1.0 to 2.0 on the x-axis
        plt.legend(loc="upper left")
        plt.grid(color='lightgray', linestyle='-', linewidth=0.5)
        plt.show()

In [None]:
# load a saved trajectory for the trunk:
trajectory_name = "trajectory_oval" # "trajectory_new" if new trajectory is created with the trajectory finding script
frequency = 1.0 # frequency with which the reference trajectory is considered

# load the considered trajectory
Phi_t_func, Phi_dot_t_func = select_periodic_trajectory(n=16, frequency=frequency, trajectory_name=trajectory_name, plot=False)

First, create the FFMPC

In [None]:
# specify desired options
ffmpc_trunk_options = FFMPCOptions()
ffmpc_trunk_options.N = 25
ffmpc_trunk_options.dt_initial = 0.005
ffmpc_trunk_options.T = ffmpc_trunk_options.dt_initial*40
ffmpc_trunk_options.switch_stage_k_bar = 12
ffmpc_trunk_options.Q_full = np.diag([4.0, 4.0, 0.05, 0.05])
ffmpc_trunk_options.R_full = np.diag([0.015, 0.015])
ffmpc_trunk_options.Q_red = np.diag([4.0, 4.0, 0.05, 0.05]) 
ffmpc_trunk_options.R_red = np.diag([0.00001, 0.00001])
ffmpc_trunk_options.system_name = "trunk-like" 
ffmpc_trunk_options.overwrite_y_ref = True

# create FFMPC objects, this intiates to creation of a multi-phase acados solver
ffmpc_trunk = TrunkFFMPC(Phi_t_func, Phi_dot_t_func, ffmpc_trunk_options)

In [None]:
# simulate ffmpc in closed loop 
dt_sim = 0.0025 # slightly smaller simulation frequency for higher resolution
sim_duration = 10.0 
x0 = np.zeros((2 * 16,))
_, _, stage_costs_ffmpc, solve_times_ffmpc = ffmpc_trunk.simulate_closed_loop(x0, dt_sim, sim_duration, plot=True) 

Next, create full resolution baseline

In [None]:
from copy import deepcopy
# options corresponding to a full resolution baseline
baseline_options = deepcopy(ffmpc_trunk_options)
baseline_options.N = 40
baseline_options.T = ffmpc_trunk_options.dt_initial*40
baseline_options.switch_stage_k_bar = baseline_options.N + 1 # no model switching

# create FFMPC objects, this intiates to creation of a multi-phase acados solver
full_resolution_mpc_trunk = TrunkFFMPC(Phi_t_func, Phi_dot_t_func, baseline_options)


In [None]:
# simulate baseline in closed loop 
_, _, stage_costs_baseline, solve_times_baseline = full_resolution_mpc_trunk.simulate_closed_loop(x0, dt_sim, sim_duration, plot=True) 

In [None]:
# compare ffmpc with baseline
speed_improvement_factor = np.mean(solve_times_baseline) / np.mean(solve_times_ffmpc)
closed_loop_cost_increase_in_percent = 100*(np.mean(stage_costs_ffmpc) - np.mean(stage_costs_baseline)) / np.mean(stage_costs_baseline)

print(f"FFMPC improved solve time by a factor of {speed_improvement_factor}, while increasing the closed loop costs by {closed_loop_cost_increase_in_percent} %")

Next, let's do a sweep over switching indices for the given step size schedule 

In [None]:
ffmpc_trunk.switching_stage_sweep(
    x0, dt_sim, sim_duration,
    build_fn=lambda opts: TrunkFFMPC(Phi_t_func, Phi_dot_t_func, opts)
)