In [1]:
import sys
import os
import numpy as np
import numpy.linalg as la 
import torch
from tqdm import tqdm

from ControlRF import GPController, ADPKernel, ADPRandomFeatures, ADKernel, ADRandomFeatures, VanillaKernel, VanillaRandomFeatures
from ControlRF.plots import *
from ControlRF.util import data_gen

from core.controllers import QPController, LQRController, FBLinController
from core.dynamics import AffineQuadCLF
from core.systems import InvertedPendulum, DoubleInvertedPendulum



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
l_1 = 0.7
l_2 = 0.7
system = DoubleInvertedPendulum(0.25,0.25,l_1,l_2) 
system_est = DoubleInvertedPendulum(0.5,0.5,0.5,0.5)

Q , R = 10*np.identity(4), np.identity(2)
lyap_est = AffineQuadCLF.build_care(system_est, Q, R) 
alpha = min(la.eigvalsh(Q)) / max(la.eigvalsh(lyap_est.P))

lqr = LQRController.build(system_est, Q, R)
fb_lin = FBLinController(system_est, lqr)
# fb_lin_data = system.simulate(x_0, fb_lin, ts) #1

qp_controller = QPController.build_care(system_est, Q, R)
qp_controller.add_regularizer(fb_lin, 25)
qp_controller.add_static_cost(np.identity(2))
qp_controller.add_stability_constraint(lyap_est, comp=lambda r: alpha * r, slacked=True, coeff = 1e5)
#plot_simulation_dip(system, controller, 'qp_controller', x_0, T=100, num_steps=1000)

In [3]:
def create_data(T, num_steps):  

    initial_x0s = np.mgrid[.1:np.pi:1, -1:1:.4, 0:np.pi:1, -1:1:.4].reshape(4, -1).T
    for i,x_0 in enumerate(initial_x0s):
        if i==0:
            xs, ys, zs = data_gen(system, qp_controller, lyap_est, torch.from_numpy(x_0),  T, num_steps)
        else:
            x, y, z = data_gen(system, qp_controller, lyap_est, torch.from_numpy(x_0),  T, num_steps)
            xs = np.concatenate((xs,x))
            ys = np.concatenate((ys,y))
            zs = np.concatenate((zs,z))

    x, y, z = data_gen(system, qp_controller, lyap_est, torch.FloatTensor([0.1,0,0,0]), 100, 1000)
    xs = np.concatenate((xs,x))
    ys = np.concatenate((ys,y))
    zs = np.concatenate((zs,z))

    np.savez(f'data_{T}_{num_steps}', xs=xs, ys=ys, zs=zs)

In [4]:
#create_data(1, 10)

In [5]:
# data = np.load('data,pi,1,sim:10,100.npz')
data = np.load('data_1_10.npz')
xs = data['xs']
ys = data['ys']
zs = data['zs']
print(xs.shape)

(4599, 4)


In [6]:
ad_rf = ADRandomFeatures(xs, ys, zs, rf_d=1500)
adp_rf = ADPRandomFeatures(xs, ys, zs, rf_d=1500)
ad_kernel = ADKernel(xs, ys, zs)
adp_kernel = ADPKernel(xs, ys, zs)
gps = [ad_kernel, ad_rf, adp_kernel, adp_rf]
controllers = []
for gp in gps:
    print(gp.__name__)
    gp_controller = GPController(system_est, gp)
    gp_controller.add_regularizer(fb_lin, 25)
    gp_controller.add_static_cost(np.identity(2))
    gp_controller.add_stability_constraint(lyap_est, comp=alpha, slacked=True, beta = 1, coeff=1e5)
    controllers.append(gp_controller)
    print(f'training time for {gp.__name__}_gp is: {gp.training_time}')

ad_kernel
training time for ad_kernel_gp is: 7.698430762000001
ad_rf
training time for ad_rf_gp is: 0.9313169340000016
adp_kernel
training time for adp_kernel_gp is: 5.216278757999998
adp_rf
training time for adp_rf_gp is: 4.726491765999999


In [7]:
def plot():
    plot_simulation_dip(system, qp_controller, 'qp_controller', x_0, T=100, num_steps=1000)
    for gp,controller in tqdm(zip(gps,controllers)):
        plot_simulation_dip(system, controller, f'{gp.__name__}_controller', x_0, T=100, num_steps=1000)
        plot_closed_loop_errorbar(system, lyap_est, gp_controller, gp, x_0, cut_off=180) #cut_off=180 is good as well
    plot_pred_errorbar(xs[300:350], ys[300:350], zs[300:350], gps)
    plot_pred_errorbar(xs[950:], ys[950:], zs[950:], gps)

    plot_all_closed_loop_errorbars(system, lyap_est, controllers, gps, x_0, cut_off=180)


In [8]:
%matplotlib inline
import matplotlib.pyplot as plt
import networkx as nx
from matplotlib import animation
from matplotlib.patches import Rectangle
from matplotlib.animation import FuncAnimation, PillowWriter 
plt.style.use('seaborn-whitegrid')

def render(system, controller, controller_name, x_0,
					T=20, num_steps=200):

    xs, us, ts = simulate(system, controller, x_0, T, num_steps)
    dt = T/num_steps

    x_solution = np.zeros(len(xs))
    a_solution = xs[:, 0]
    b_solution = xs[:, 2]

    skip_frames = 5

    x_solution = x_solution[::skip_frames]
    a_solution = a_solution[::skip_frames]
    b_solution = b_solution[::skip_frames]

    frames = len(x_solution)

    j1_x = l_1 * np.sin(a_solution) + x_solution
    j1_y = l_1 * np.cos(a_solution)

    j2_x = l_2 * np.sin(b_solution) + j1_x
    j2_y = l_2 * np.cos(b_solution) + j1_y

    fig = plt.figure()
    ax = fig.add_subplot(111, autoscale_on=False, xlim=(-1, 1), ylim=(-1, 1))
    ax.set_aspect('equal')
    ax.grid()

    patch = ax.add_patch(Rectangle((0, 0), 0, 0, linewidth=1, edgecolor='k', facecolor='r'))

    line, = ax.plot([], [], 'o-', lw=2)
    time_template = 'time: %.1f s'
    time_text = ax.text(0.05, 0.9, '', transform=ax.transAxes)

    cart_width = 0.15
    cart_height = 0.1


    def init():
        line.set_data([], [])
        time_text.set_text('')
        patch.set_xy((-cart_width / 2, -cart_height / 2))
        patch.set_width(cart_width)
        patch.set_height(cart_height)
        return line, time_text


    def animate(i):
        thisx = [x_solution[i], j1_x[i], j2_x[i]]
        thisy = [0, j1_y[i], j2_y[i]]

        line.set_data(thisx, thisy)
        now = i * skip_frames * dt
        time_text.set_text(time_template % now)

        patch.set_x(x_solution[i] - cart_width / 2)
        return line, time_text, patch


    ani = animation.FuncAnimation(fig, animate, frames=frames,
                                  interval=1, blit=True, init_func=init, repeat=False)
    plt.close(fig)
    return ani

  plt.style.use('seaborn-whitegrid')


In [9]:
x_0 = np.array([-1,-.2,.1,.6])

In [10]:
def animate():
    ani = render(system, qp_controller, 'qp_controller', x_0, T=100, num_steps=1000)
    %time ani.save('dip_qp_2.gif', writer=animation.PillowWriter(fps=24))
    for gp,controller in tqdm(zip(gps,controllers)):
        ani = render(system, controller, f'{gp.__name__}_controller', x_0, T=100, num_steps=1000)
        %time ani.save(f'dip_{gp.__name__}_2.gif', writer=animation.PillowWriter(fps=24))

In [11]:
#animate()

CPU times: user 15.9 s, sys: 707 ms, total: 16.6 s
Wall time: 16.8 s


1it [01:36, 96.19s/it]

CPU times: user 14.5 s, sys: 410 ms, total: 14.9 s
Wall time: 14.9 s


2it [02:47, 81.54s/it]

CPU times: user 14.3 s, sys: 365 ms, total: 14.6 s
Wall time: 14.5 s


3it [04:23, 88.02s/it]

CPU times: user 17 s, sys: 851 ms, total: 17.9 s
Wall time: 19.2 s


3it [16:18, 326.19s/it]


KeyboardInterrupt: 

In [None]:
# ani = render(system, qp_controller, 'qp_controller', x_0, T=100, num_steps=1000)
# %time ani.save(f'dip_{gp.__name__}.gif', writer=animation.PillowWriter(fps=24))