In [None]:
# PID Swingup controller and inverted pendulum mujoco environments have been provided by Xian Li

#%%capture
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import casadi as ca
from IPython.display import clear_output, display
import time

import matplotlib.ticker as ticker
plt.rcParams.update(
    {
        "text.usetex": True,
        "font.family": "serif",
        "font.sans-serif": "Times",
        "font.size": 10,
    }
)


from select_deepc.deepc_controller import *

PAPER_COL_WIDTH_IN = (6.75 - 0.25) / 2
PAPER_CONTENT_WIDTH_IN = 6.75
PAPER_COL_WIDTH_CM = PAPER_COL_WIDTH_IN * 2.54
PAPER_CONTENT_WIDTH_CM = PAPER_CONTENT_WIDTH_IN * 2.5

color_iso = "salmon"
color_l1 = "chocolate"
color_rand = "orchid"

In [None]:
import sys
from os import path
sys.path.append(path.join(path.dirname(path.abspath(""))))
sys.path.append(path.join(path.dirname(path.abspath("")), "deepc"))
from deepc.deepc_controller import *

In [None]:
gym.envs.register(
    "InvertedPendulum-v4-swingup",
    entry_point="inverted_pendulum_v4_swingup:InvertedPendulumSwingupEnv",
    max_episode_steps=1000,
    reward_threshold=950.0,
)

In [None]:
# Perform the swingu up with a feedback controller, to gather data
np.random.seed(4)

# whether to show the animation, faster to run without
show_animation = False
# whether to show the predictions, cleaner plots without
show_predictions = 0

# Parameters for the feedback controller
ksu = 1.05
kcw = 1.5
kvw = 5
kem = 1e-13
eta = 1.0

m = 4.78
M = 10.47
L = 0.3
g = 9.81
Vd = 2 * m * g * L

xmax = 0.7
vmax = 10

# PID controller parameters
Kp_p = -3.2
Ki_p = -0.0
Kd_p = 0.5

Kp_c = 0.2
Ki_c = -0.005
Kd_c = 0.5

integral_p = 0.0
integral_c = 0.0

stabilise = 0
excitation = 0.02
numSteps = 20000
y = np.zeros(numSteps)
x = np.zeros((4, numSteps))

u = np.random.normal(0, 1, numSteps)

reset_time = []
failed = []
seed = 3

refpos = 0

if show_animation:
    env = gym.make("InvertedPendulum-v4-swingup", render_mode="human")
else:
    env = gym.make("InvertedPendulum-v4-swingup")
obs, info = env.reset(seed=seed)

# TODO: directly gather TrajectoryData
for k in range(numSteps):
        obs += np.random.normal(0, 0.00001, (np.size(obs)))
        angle = obs[1]  # Angle of the pendulum
        dangle = obs[3]
        pos = obs[0]
        speed = obs[2]
        error_p = -angle 
        error_c = refpos - pos  
        integral_c += error_c
                
        angle = obs[1] - np.pi
        V = 0.5 * m * L**2 * dangle**2 + m * g * L * (1 - np.cos(angle))
        angle = obs[1]
        
        # if the pendulum is close to the upright position, switch to the stabilising controller
        if np.abs(obs[1]) < 0.2:
            if stabilise == 0:
                refpos = pos
                integral_c = 0
            stabilise = 1

        if not stabilise:
            # the swing up controller
            u[k] = -ksu * np.sign(dangle * np.cos(angle)) + kcw * np.sign(pos) * np.log(1 - np.abs(pos)/(xmax*2)) + \
                    kem * (np.exp(np.abs(V - eta * Vd)) - 1) * np.sign(V - Vd) * np.sign(dangle * np.cos(angle)) + np.random.normal(0, excitation)
            if k > 200:
                u[k] += np.random.normal(0, 0.1)
        else:
            # the stabilising controller
            u[k] = Kp_p * error_p + Ki_p * integral_p + Kd_p * dangle \
                + Kp_c * error_c + Ki_c * integral_c + Kd_c * speed + np.random.normal(0, 0.05)
            
            if (k % 20) == 0:
                u[k] += np.random.normal(0,1)
                
        x[:,k] = obs
        

        # Clip the control input to the action space limits
        u[k] = np.clip(u[k], env.action_space.low[0], env.action_space.high[0])
        obs, reward, done, truncated, info = env.step([u[k]])
        
        if show_animation:
            time.sleep(0.05)
            
        if done or truncated or (k + 1)  % 100 == 0:
            seed += 1
            if np.abs(obs[1]) > 0.1:
                failed += [k]
            obs, info = env.reset(seed = seed)
            reset_time += [k]
            stabilise = 0
            integral_c = 0


env.close()

y = x
y_ref = y[:, 0:reset_time[0] + 1]
u_ref = u[0:reset_time[0] + 1]

In [None]:
ref = np.vstack((u_ref.T, y_ref))
swingup_dataset = TrajectoryDataSet(
    [], "swingup_data"
)
last_t=-1
for t in reset_time:
    # if t in failed:
    #     last_t = t
    #     continue

    # Extract sections for y and u
    y_section = y[:, last_t+1:t+1].T
    y_section_featurized = np.array(
        [
            y_section[:,0],
            np.sin(y_section[:,1]),
            np.cos(y_section[:,1]),
            y_section[:,2],
            y_section[:,3],
        ]
    ).T
    u_section = u[last_t+1:t+1].reshape(-1,1)
    if y_section.size == 0:
        continue
    last_t = t
    numSteps_section = len(u_section)
    swingup_dataset.dataset.append(
        TrajectoryData(y_section_featurized.copy(), u_section.copy())
    )

def setup_DeePC(trajectory_data: TrajectoryDataSet) -> DeePCControllerArgs:
    """"Set up the Data Driven Controller

    NOTE: Paramters in here can be adapted to tune the desired performance
    """
    p = 5 # sensor state size
    m = 1 # input size
    n = 10 # "estimated" system size
    T_past = 2
    T_fut = 30
    T_hankel= (m + 1) * (T_past + T_fut) + n - 1

    # x y w1 w2
    Q = np.diag([5, 0, 10000, 0.1, 0.1])
    R = np.diag([1])
    g_regularization_1 = 50000.0
    g_regularization_pi =  0.0
    slack_cost = 5000000
    controller_costs = DeePCCost(Q, R, slack_cost, g_regularization_1, g_regularization_pi)

    A_u = None
    b_u = None
    A_y = None
    b_y = None

    A_y = np.array(
        [
            [1, 0, 0, 0, 0],
            [-1, 0, 0, 0, 0],
        ]
    )

    b_y = np.array(
        [
            1,
            1,
        ]
    )

    A_u = np.array(
        [
            [1],
            [-1],
        ]
    )

    b_u = np.array(
        [
            3,
            3,
        ]
    )

    controller_constraints = DeePCConstraints(A_u, b_u, A_y, b_y)

    return DeePCControllerArgs(
        trajectory_data,
        DeePCDims(
            T_past,
            T_fut,
            p,
            m,
        ),
        T_hankel,
        controller_costs,
        controller_constraints,
        [0],
        False,
    )


def selector_cb(adj_H_u, adj_H_y):
    return 1*adj_H_u, np.tile([1, 1, 1, 1, 1], int(adj_H_y.shape[0]/5)).reshape(-1,1) * adj_H_y

controller_args = setup_DeePC(swingup_dataset)
select_deepc = SelectDeePC(
    controller_args,
    LkSelector(
        1,
        controller_args.deepc_dims,
        custom_callback=selector_cb,
        forgetting_factor=0.8
    ),
    num_hankel_cols=controller_args.deepc_dims.T_fut*controller_args.deepc_dims.m + 10,
    n_iter=1,
    debug=False,
)
deepc = DataDrivenPredictiveController.create_from_data(controller_args, True)

def run_experiment(controller, name, accumulator):
    env = gym.make("InvertedPendulum-v4-swingup", render_mode="rgb_array")
    env = gym.wrappers.RecordVideo(env=env, video_folder='video', name_prefix=name, step_trigger=lambda x: True)

    obs, _ = env.reset(seed=3)
    obs = np.array([obs[0], np.sin(obs[1]), np.cos(obs[1]), obs[2], obs[3]])

    traj = []
    preds = []
    g_vals = []

    T_sim = 150
    for i in range(T_sim):
        ref = [0, 0, 1, 0, 0]
        action = controller.compute_action(obs, ref)
        accumulator.update_cost(obs, ref, action)
        try:
            g_vals.append(controller._deepc._g.value)
        except:
            pass
        obs, _, _, _, _ = env.step(action)
        obs = np.array([obs[0], np.sin(obs[1]), np.cos(obs[1]), obs[2], obs[3]])
        if i in [0, 15, 22, 32, 120]:
            preds.append(controller.get_planned_state_trajectory())
        traj.append(obs)

    env.close()

    traj = np.array(traj)
    return traj, preds, g_vals, accumulator.cost


select_deepc = SelectDeePC(
    controller_args,
    LkSelector(
        1,
        controller_args.deepc_dims,
        custom_callback=selector_cb,
        forgetting_factor=0.8
    ),
    num_hankel_cols=controller_args.deepc_dims.T_fut*controller_args.deepc_dims.m + 10,
    n_iter=1,
    debug=False,
)
traj_select_deepc, preds_select_deepc, g_vals, cost = run_experiment(
    select_deepc, "select-deepc", DeePCCostAccumulator(controller_args.controller_costs)
)
traj_deepc, preds_deepc, _, _ = run_experiment(deepc, "deepc", DeePCCostAccumulator(controller_args.controller_costs))
plt.plot(traj_select_deepc[:, 2])
plt.plot(traj_deepc[:, 2])

for idx, pred in enumerate(preds_select_deepc):
    plt.plot(np.arange(10*idx-2, 10*idx+pred.shape[0]-2, 1), pred[:,2], "--")
for idx, pred in enumerate(preds_deepc):
    plt.plot(np.arange(10*idx-2, 10*idx+pred.shape[0]-2, 1), pred[:,2], "--")

In [None]:
angle = np.arctan2(traj_select_deepc[:,1], traj_select_deepc[:,2])
angle[angle < -1] += 2*np.pi
angle_plot = angle
plt.plot(angle_plot)
plt.plot([0]*angle_plot.size, "black", linestyle="--")

In [None]:
fig, axs = plt.subplots(1, 1, figsize=(3, 2.5))

blue = plt.rcParams["axes.prop_cycle"].by_key()["color"][0]
orange = plt.rcParams["axes.prop_cycle"].by_key()["color"][1]

def generate_pole_end_pos(traj):
    pos_xs = traj[:,0].reshape(-1,1)
    pole_begin = np.concatenate((pos_xs, np.zeros_like(pos_xs)), axis=1)
    # traj slice is (N,)
    # (N, 2, 2)
    sins = np.atleast_2d(traj[:,1]).T
    coss = np.atleast_2d(traj[:,2]).T
    rot_matrices = np.concatenate(
        (
            np.concatenate((coss, -sins), axis=1).reshape(-1, 1, 2),
            np.concatenate((sins, coss), axis= 1).reshape(-1, 1, 2),
        ),
        axis=1,
    )
    rot_matrices = rot_matrices.transpose([0,2,1])
    pole_end_loc = np.array([[0, -1],[1, 0]]) @ rot_matrices @ np.array([0.6, 0])

    pole_end = pole_begin + pole_end_loc
    return pole_begin, pole_end

pole_begin, pole_end = generate_pole_end_pos(traj_select_deepc)
pole_b_d, pole_e_d = generate_pole_end_pos(traj_deepc)
axs.plot(
    np.linspace(-1,1),
    np.zeros(50),
    c="black", 
    linewidth=2,
)
vis_idcs = [0, 15, 22, 32, 120]

for idx, i in enumerate(vis_idcs):
    axs.plot(
    [pole_begin[i,0]-0.03, pole_begin[i,0]+0.03],
    [pole_begin[i,1], pole_begin[i,1]],
    color="black",
    linewidth=7,
    )
    axs.plot(
        [pole_begin[i,0], pole_end[i,0]],
        [pole_begin[i,1], pole_end[i,1]],
        linewidth=6,
        color="black",
    )
    axs.plot(
        [pole_begin[i,0], pole_end[i,0]],
        [pole_begin[i,1], pole_end[i,1]],
        linewidth=5,
        color="grey",
    )

    if idx == 0:
        continue

    _, pole_end_traj = generate_pole_end_pos(preds_select_deepc[idx])
    axs.plot(
        pole_end_traj[:,0],
        pole_end_traj[:,1],
        c=blue,
        linestyle="--",
        zorder=500,
        alpha=0.5
    )


axs.tick_params(
    axis='both',
    which='both',      # both major and minor ticks are affected
    bottom=False,      # ticks along the bottom edge are off
    top=False,         # ticks along the top edge are off
    left=False,
    right=False,
    labelbottom=False, # labels along the bottom edge are off)
    labelleft=False,
)

axs.plot(pole_end[:,0], pole_end[:,1], c=blue, linestyle="-", alpha=1, zorder=50, label="Select-DeePC")
axs.plot(pole_e_d[:,0], pole_e_d[:,1], c=orange, linestyle="-", alpha=1, label="Full Data DeePC")

# axs.set_box_aspect(1)
axs.set_ylim(-0.75,0.75)
axs.set_aspect("equal")
axs.set_xlabel("$x$")
axs.set_ylabel("$y$")
axs.set_title("Closed Loop Trajectory")
fig.legend(ncols=2, bbox_to_anchor=(0.89, 0.07), fontsize="small", handlelength=1.25)
fig.tight_layout()
fig.savefig("figures/inverted_pendulum/inv-pendulum-cl.pdf", bbox_inches='tight')

In [None]:
def generate_g_val_animation():
    import matplotlib.animation as animation

    g_vals_cleaned = []
    for g in g_vals:
        if g is not None:
            g_vals_cleaned.append(g)
    fig, ax = plt.subplots()

    (line2,) = ax.plot(g_vals_cleaned[0], label=f"g values")
    ax.set(ylim=[-15, 15])
    ax.legend()

    def update(frame):
        # for each frame, update the data stored on each artist.

        # update the line plot:
        line2.set_ydata(g_vals_cleaned[frame])
        return line2

    ani = animation.FuncAnimation(fig=fig, func=update, frames=40, interval=30)
    ani.save("g_vals.gif")
    plt.show()

def generate_g_val_plot():
    num_g_nonzero = []
    for g_vec in g_vals:
        # due to solver failure
        if g_vec is None:
            continue
        num_g_nonzero.append(np.sum(np.abs(g_vec) > 1e-1))

    fig, ax = plt.subplots(1, 1, figsize=(PAPER_COL_WIDTH_IN, 1.5))
    ax.plot(num_g_nonzero)
    ax.set_title("Number of Linearly Combined Trajectories")
    ax.set_ylabel("\# Nonzero Elements")
    ax.set_xlabel("Simulation Timestep")
    fig.savefig("figures/inverted_pendulum/num_nonzero_elem.pdf", bbox_inches="tight")

generate_g_val_plot()

---

In [None]:
import itertools

# Sweep parameters
costs = []
for lambda_1, lambda_pi in tqdm.tqdm(itertools.product(np.logspace(3, 6, base=10, num=5), np.logspace(-5, 6, base=10, num=5)), total=25):
    print(f"running for l1: {lambda_1}, lpi: {lambda_pi}")
    controller_args.controller_costs.regularizer_cost_g_1 = lambda_1
    controller_args.controller_costs.regularizer_cost_g_pi = lambda_pi
    select_deepc = SelectDeePC(
        controller_args,
        LkSelector(
            1,
            controller_args.deepc_dims,
            custom_callback=selector_cb,
            forgetting_factor=0.8
        ),
        num_hankel_cols=controller_args.deepc_dims.T_fut*controller_args.deepc_dims.m + 30,
        n_iter=1,
        debug=False,
    )
    traj_select_deepc, preds_select_deepc, g_vals, cost = run_experiment(
        select_deepc,
        f"select-deepc-sweep-{int(lambda_1)}-{int(lambda_pi)}",
        DeePCCostAccumulator(controller_args.controller_costs)
    )
    print(cost)
    costs.append(cost)

costs_val = [val["cost"] for val in costs]
costs_val = np.array(costs_val)
# costs_val = np.log(costs_val)
fig, ax = plt.subplots(1, 1)
y, x = np.meshgrid(
    np.logspace(-5, 6, base=10, num=5), np.logspace(3, 6, base=10, num=5), indexing="ij"
)
im = ax.pcolormesh(
    x,
    y,
    costs_val.reshape(5, 5),
    # shading="gouraud",
)
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlim(1e3, 1e6)
ax.set_ylim(1e-5, 1e6)
plt.colorbar(im, label=r"cost")
plt.ylabel(r"$\lambda_\Pi$")
plt.xlabel(r"$\lambda_1$")