# Testing a pure pytorch MPC design

In [None]:
import numpy as np
import torch as th
π = float(th.pi)
from torch.nn import Module, Parameter
import torch.nn.functional as F
from tqdm import tqdm

import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator, FuncFormatter
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, display
# set default matplotlib params 
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['axes.grid'] = True
plt.rcParams['grid.alpha'] = 0.2
plt.style.use('dark_background')

In [None]:
# define model dynamics, single pendulum, input = torque
DEFAULT_PAR = [9.81, 1.0, 1.0, 0.05]  # g, l, m, b
def pendulum_dynamics(x, u, par=DEFAULT_PAR):
    # x = [theta, theta_dot]
    # u = [tau] (torque)
    g, l, m, b = par
    x_shape = x.shape # save original shape
    x = x.view(-1, 2)  # ensure x is of shape (n_par, 2)
    u = u.view(-1, 1)  # ensure u is of shape (n_par, 1)
    dtheta = x[:, 1]
    ddtheta = (u[:, 0] - m * g * l * th.sin(x[:, 0] + π) - b * x[:, 1]) / (m * l**2)
    return th.stack([dtheta, ddtheta], dim=1).view(x_shape)

def rk4_step(func, x, u, dt, par=DEFAULT_PAR):
    k1 = func(x, u, par=par)
    k2 = func(x + 0.5 * dt * k1, u, par=par)
    k3 = func(x + 0.5 * dt * k2, u, par=par)
    k4 = func(x + dt * k3, u, par=par)
    return x + (dt / 6) * (k1 + 2 * k2 + 2 * k3 + k4)

def euler_step(func, x, u, dt, par=DEFAULT_PAR):
    k1 = func(x, u, par=par)
    return x + dt * k1

def anim(xs):
    fig, ax = plt.subplots()
    line, = ax.plot([], [], 'o-')
    ax.set_xlim(-1.5, 1.5)
    ax.set_ylim(-1.5, 1.5)
    ax.set_aspect('equal')

    def update(frame):
        x = xs[frame]
        line.set_data([0, th.cos(x[0]+π/2)], [0, th.sin(x[0]+π/2)])
        return line,

    ani = FuncAnimation(fig, update, frames=len(xs), blit=True, interval=3*100/6)
    plt.close(fig)
    return display(HTML(ani.to_jshtml()))

def pi_formatter(x, pos):
    frac = x / np.pi
    if np.isclose(frac, 0):
        return "0"
    elif np.isclose(frac, 0.5):
        return r"$\frac{\pi}{2}$"
    elif np.isclose(frac, 1):
        return r"$\pi$"
    elif np.isclose(frac, -0.5):
        return r"$-\frac{\pi}{2}$"
    elif np.isclose(frac, -1):
        return r"$-\pi$"
    else:
        return r"${:.2f}\pi$".format(frac)
    
def my_plot(xs):
    # plot trajectory in phase space
    plt.figure(figsize=(8, 4))
    plt.plot(xs[:, 0].numpy(), xs[:, 1].numpy())

    plt.gca().xaxis.set_major_locator(MultipleLocator(π/8))
    plt.gca().xaxis.set_major_formatter(FuncFormatter(pi_formatter))
    plt.xlabel('Theta (rad)')
    plt.ylabel('Omega (rad/s)')
    plt.title('Phase Space Trajectory')

    plt.tight_layout()
    plt.show()

    anim(xs[::10])

In [None]:
## Create a simulation
x0 = th.tensor([0.5, 0.0])  # initial state (theta, omega)
dt = 0.01  # time step
T = 10.0  # total time

num_steps = int(T / dt)
xs = th.zeros((num_steps + 1, 2))
xs[0] = x0
for i in range(num_steps):
    u = th.tensor([0.0])  # constant torque
    x = rk4_step(pendulum_dynamics, xs[i], u, dt)
    xs[i + 1] = x

# my_plot(xs)s

In [None]:
# create n_par parallel simulations
n_par = 100
x0s = th.tensor([0.5, 0.0]) + th.randn(n_par, 2) * 0.5

xs = th.zeros((n_par, num_steps + 1, 2))
xs[:, 0] = x0s
for i in range(num_steps):
    u = th.zeros((n_par, 1))  # constant torque
    x = rk4_step(pendulum_dynamics, xs[:, i], u, dt)
    xs[:, i + 1] = x

#plot all trajectories
plt.figure()
for i in range(n_par):
    plt.plot(xs[i, :, 0].numpy(), xs[i, :, 1].numpy(), alpha=0.5)
plt.gca().xaxis.set_major_locator(MultipleLocator(π/8))
plt.gca().xaxis.set_major_formatter(FuncFormatter(pi_formatter))
plt.xlabel('Theta (rad)')
plt.ylabel('Omega (rad/s)')
plt.title('Phase Space Trajectories (Multiple Initial States)')
plt.tight_layout()
plt.show()

In [None]:
def soft_constraint(x, min_val, max_val, alpha=1.0):
    return F.relu(min_val - x) * th.exp(alpha * (min_val - x)) + F.relu(x - max_val) * th.exp(alpha * (x - max_val))

# # plot the soft constrain function
# MIN_X, MAX_X = -4, 6
# x = th.linspace(2*MIN_X, 2*MAX_X, 1000)
# for α in [0.1, 0.5, 1.0, 2.0, 5.0]:
#     y = soft_constraint(x, MIN_X, MAX_X, alpha=α)
#     plt.plot(x.numpy(), y.numpy(), label=f'α={α}')
# plt.axvline(MIN_X, color='red', linestyle='--', label='MIN_X')
# plt.axvline(MAX_X, color='green', linestyle='--', label='MAX_X')
# plt.legend()
# plt.title('Soft Constraint Function')
# plt.xlabel('x')
# plt.ylabel('Penalty')
# plt.ylim(-1, 10)
# plt.grid()
# plt.show()

In [None]:
# ES_LOSS = 5e-3 # minimum loss for early stopping
ES_LOSS = 5e-4 # minimum loss for early stopping

def calc_optimal_u(x0, ug=None, H=0.3, n_iter=20, dt=0.02, u_clip=18.0, verbose=False, return_states=False): #u_clip 8.0     
    nH = int(H / dt)  # number of time steps in horizon

    # weight last states more
    exp_decay_weight = th.ones(nH)
    # exp_decay_weight = th.logspace(-2, 0, nH)
    # exp_decay_weight = th.linspace(0.1, 1.0, nH)
    # exp_decay_weight = th.linspace(0.1, 1.0, nH)**(1/2)

    if ug is None: # no initial guess provided
        ug = th.zeros(nH, 1)
    else: assert ug.shape == (nH, 1), f'ug.shape should be ({nH}, 1), but got {ug.shape}'
    us = Parameter(ug.clone(), requires_grad=True)

    # ug = th.randn(nH, 1)*0.75*u_clip
    us = Parameter(ug.clone(), requires_grad=True)

    # lrs = 0.5 * th.logspace(0, -2, n_iter) # decaying lr
    lrs = .1 * th.ones(n_iter)  # constant lr
    # optimizer = th.optim.Adam([us], lr=lrs[0])
    optimizer = th.optim.AdamW([us], lr=lrs[0], weight_decay=1e-2)

    for i in range(n_iter):  # optimization loop
        optimizer.zero_grad()
        for pg in optimizer.param_groups: pg['lr'] = lrs[i]  # update learning rate

        # Initialize xs as a list to append states
        # states = [x0]
        x = x0
        state_loss = 0.0

        for t in range(nH):
            u = us[t]
            x = euler_step(pendulum_dynamics, x, u, dt)
            state_loss += (x[0]**2 + 1e-2 * x[1]**2)*exp_decay_weight[t]  # accumulate state loss

        terminal_loss = 10.0 * (x[0]**2 + 1e-2 * x[1]**2)  # terminal cost
        control_continuity_loss = 1e-3 * th.sum(th.diff(exp_decay_weight * us, dim=0)**2) 
        control_soft_constraint = soft_constraint(us, -u_clip, u_clip).sum() 
        control_value_loss = 1e-4 * F.l1_loss(us, th.zeros_like(us))
        if verbose: print(f'state: {state_loss.item()}, constr: {control_soft_constraint.item()}, continuity: {control_continuity_loss.item()}, term: {terminal_loss.item()}')

        # loss = terminal_loss + state_loss + control_soft_constraint + control_continuity_loss
        # loss = terminal_loss + control_soft_constraint + control_continuity_loss
        # loss = state_loss + control_continuity_loss
        loss = state_loss + control_continuity_loss + control_soft_constraint 
        loss = state_loss + control_continuity_loss + control_value_loss
        # loss = terminal_loss + control_continuity_loss + control_soft_constraint
        # loss = state_loss
        loss.backward()

        # print the gradients
        if verbose: print(f'us.grad: {us.grad.view(-1)}')

        optimizer.step()

        if verbose: print(f'Iter: {i+1}/{n_iter}, Loss: {loss.item()}')
        if loss.item() < ES_LOSS: break # early stopping
    # if terminal_loss.item() > 1.0: print(f'Warning: cannot see a feasible trajectory, terminal loss: {terminal_loss.item()}')
    if return_states: 
        xs = th.zeros((nH+1, 2))
        xs[0] = x0
        for t in range(nH):
            xs[t+1] = euler_step(pendulum_dynamics, xs[t], us[t].detach(), dt)
        return us.detach(), loss.item(), xs[:-1]
    else: return us.detach(), loss.item()


# test
u, _, xs = calc_optimal_u(th.tensor([0.5, 0.0]), H=0.3, verbose=True, return_states=True)
print(u[0])

# plt.figure()
# plt.subplot(2,1,1)
# plt.plot(u.numpy())
# plt.title('Control Inputs')
# plt.subplot(2,1,2)
# plt.plot(xs.numpy())
# plt.title('State Trajectory')

In [None]:
## Test Adam optimizer on input
dt = 0.001  # sim time step [s]
dtc = 0.05  # control model sim time step
cf = 100 # control frequency: control step every cf sim steps
T = 5.0  # total time [s]

# U_CLIP = 18.0 #easy
U_CLIP = 6.0 #hard

# H = 3.0 # horizon [s]
# H = 2.5 # horizon [s]
H = 2.0 # horizon [s]
# H = 1.2 # horizon [s]
# H = 0.8 # horizon [s]
# H = 0.5 # horizon [s]
# H = 0.3 # horizon [s]

# x0 = th.tensor([0.5, 0.0])  # easy
x0 = th.tensor([π-0.4, 0.0])  # hard

n_sim_steps = int(T / dt)


us0, _, xs0 = calc_optimal_u(x0, H=H, dt=dtc, n_iter=200, u_clip=U_CLIP, return_states=True)  # initial guess for the first step
print(f'Initial optimal input: {us0.view(-1)}')
u_prev = us0.clone()


# simulation with optimal control inputs at every step
xs = th.zeros((n_sim_steps + 1, 2))
xs[0] = x0
us, losses = th.zeros(n_sim_steps, int(H / dtc)), th.zeros(n_sim_steps)
for t in tqdm(range(n_sim_steps)):
    if t % cf == 0:  # control step
        u, l = calc_optimal_u(xs[t], ug=u_prev, H=H, dt=dtc, u_clip=U_CLIP)
    xs[t + 1] = rk4_step(pendulum_dynamics, xs[t], u[0], dt) + th.randn(2)*1e-3
    us[t], losses[t] = u.view(-1), l
    u_prev = u

In [None]:
#plot xs and u on a 3 raws plot
plt.figure(figsize=(12, 8))
t1 = np.arange(n_sim_steps) * dt
t2 = np.arange(n_sim_steps) * dtc

print(f'xs: {xs.shape}, us: {us.shape}, losses: {losses.shape}')

plt.subplot(3, 1, 1)
plt.plot(t1, xs[:-1, 0].numpy(), label='Theta (rad)')
plt.plot(t1, xs[:-1, 1].numpy(), label='Omega (rad/s)')
plt.plot(t2[:len(xs0)], xs0[:,0].numpy(), '--', label='Initial Theta (rad)')
plt.plot(t2[:len(xs0)], xs0[:,1].numpy(), '--', label='Initial Omega (rad/s)')

plt.ylabel('States')
plt.legend()

plt.subplot(3, 1, 2)
plt.plot(t1, us[:,0].numpy(), label='Control Inputs (Torque)')
plt.plot(t2[:len(us0)], us0.numpy(), '--', label='Initial Control Inputs (Torque)')
plt.ylabel('Control Inputs')
plt.legend()

plt.subplot(3, 1, 3)
plt.plot(t1, losses.numpy(), label='Loss')
plt.axhline(ES_LOSS, color='red', linestyle='--', label='ES_LOSS')
plt.ylabel('Loss')
plt.yscale('log')
plt.legend()

plt.tight_layout()
plt.show()

my_plot(xs[::10])