In [None]:
import sys
sys.path.append("..")

from 
import numpy as np
import time
import matplotlib.pyplot as plt
from time import perf_counter
from scipy.integrate import odeint
import os
from pathlib import Path

plt.style.use(["science", "grid"])

In [None]:
import tensorflow as tf

# Inverted pendulum
Try MPPI controller with neural network dynamics model on an inverted pendulum.

In [None]:
from systems.dynamical_systems import Pendulum

In [None]:
# Get neural network model loaded
model_path = Path(os.getcwd()).parent / "neural_networks" / "pendulum" / "models"
availible_models = [x for x in model_path.iterdir() if x.is_dir()]

for (i, model_name) in enumerate(availible_models):
    print("[{}]  {}".format(i, model_name))

idx = int(input("Select which model number to load: "))
nn_model = tf.keras.models.load_model(availible_models[idx])

In [None]:
dt = 1 / 10

pend_env = Pendulum(m=1, l=1, b=0.1, dt=1 / 10)

def nn_next_state(nn_input):
    states, controls = nn_input[:, :pend_env.nx], nn_input[:, pend_env.nx:]
    x_ddots = nn_model(nn_input)
    
    x_dots = states[:, 1::2]
    
    next_states = states
    next_states[:, ::2] += x_dots * dt + 1/2 * x_ddots * (dt ** 2)
    next_states[:, 1::2] += x_ddots * dt
    
    return next_states

## Neural Network vs. Ground Truth (Natural Dynamics)
Test the neural network dynamics model with the natural dynamics of the system before using it in the controller

In [None]:
# Test the neural network dynamics on the system's natural dynamics

initial_state = np.array([np.radians(80), 0])

nn_current_state = initial_state
truth_current_state = initial_state
action = np.array([0])

simulation_length = 5
n_steps = int((1 / dt) * simulation_length)

# Simulate the pendulum
nn_states = np.empty((n_steps, 2))
truth_states = np.empty((n_steps, 2))

for i in range(n_steps):
    nn_states[i] = nn_current_state
    truth_states[i] = truth_current_state
    
    nn_input = np.hstack((nn_current_state, action)).reshape(1, -1)
    nn_current_state = nn_next_state(nn_input)[0]
    
    truth_current_state = pend_env.simulator(truth_current_state, action)

In [None]:
# Plot the simulated response

time = np.linspace(0, simulation_length, n_steps)

fig, ax = plt.subplots(1, 1, figsize=(6, 3))
fig.suptitle("Inverted Pendulum Simulated Response")

ax.plot(time, nn_states[:, 0], label="NN theta")
ax.plot(time, truth_states[:, 0], label="True theta")
# ax.plot(time, truth_states[:, 1], label="True omega")


ax.set(xlabel="Time (s)", ylabel="Theta (rad)")
# ax.set_ylim([-np.pi-0.3, np.pi+0.3])
ax.legend()

fig.savefig("nn_vs_true_theta_iv.png")

## MPPI Swing Up Task
Test the MPPI controller on a swing-up task using the neural network dynamics model for rollouts.

In [None]:
simulation_length = 15  # s
n_steps = int((1 / dt) * simulation_length)

# Create lists to store control sequence and state sequences for MPPI runs
controls, states = np.empty((n_steps, inv_pend_env.nu)), np.empty((n_steps, inv_pend_env.nx))

# Begin simulation
current_state = inv_pend_env.initial_state
start_time = perf_counter()

for i in range(0, n_steps):
    action = inv_pend_env.controller.step(current_state)
    current_state = inv_pend_env.simulator(current_state, action, measurement_noise=False)
    current_state[0] %= (2 * np.pi)
    
    states[i] = current_state
    controls[i] = action

print("Elapsed time: {:.5f} s".format(perf_counter() - start_time))

In [None]:
# Plot the results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

fig.suptitle("MPPI Inverted Pendulum Swing Up Task (NN Dynamics)")

time = np.linspace(0, simulation_length, n_steps)

ax1.set_title("Pendulum Angle vs. Time")
ax1.plot(time, states[:, 0], label="Theta")
ax1.plot(time, np.repeat(inv_pend_env.desired_state[0], n_steps), label="Desired theta")
ax1.set(xlabel="Time (s)", ylabel="Theta (rad)")
# ax1.set_ylim([-np.pi-0.3, np.pi+0.3])
ax1.legend()

ax2.set_title("Input Torque vs. Time")
ax2.plot(time, controls[:, 0], label="Input torque")
ax2.set(xlabel="Time (s)", ylabel="Torque (Nm)")

fig.savefig("swingup_task.png")

## Online Learning Example
Let's change the inertial properties of the inverted pendulum system and continuously train the dynamics model on newly collected data.

In [None]:
dt = 1 / 10

inv_pend_env = InvertedPendulum(
    initial_state=np.array([0, 0]),
    m=0.5,     # kg
    l=0.5,     # m
    b=0.1,   # ?
    g=-9.81, # m / s^2
    dt=dt    # s
)

inv_pend_env.desired_state = np.array([np.radians(180), 0])

In [None]:
# Set up simulation parameters
simulation_length = 24  # s
n_steps = int((1 / dt) * simulation_length)

# Create lists to store control sequence and state sequences for MPPI runs
controls, states = np.empty((n_steps, inv_pend_env.nu)), np.empty((n_steps, inv_pend_env.nx))

# Set up online learning parameters
online_lr = 0.01
retrain_iters = 1

nn_model = tf.keras.models.load_model(availible_models[idx])

def train_online(x, y):
    nn_model.fit(
        x, y, verbose=0,
        epochs=1, batch_size=len(x)
    )

def init_new_data():
    new_examples = np.empty((retrain_iters, inv_pend_env.nx + inv_pend_env.nu))
    new_labels = np.empty((retrain_iters, inv_pend_env.nx // 2))
    
    return new_examples, new_labels

def label_from_states(prev_state, curr_state):
    acceleration = (curr_state[inv_pend_env.nx // 2:] - prev_state[inv_pend_env.nx // 2:]) / inv_pend_env.dt
    return acceleration

new_examples, new_labels = init_new_data()
tf.keras.backend.set_value(nn_model.optimizer.learning_rate, online_lr)

# Begin simulation
current_state = inv_pend_env.initial_state
start_time = perf_counter()

for i in range(0, n_steps):
    prev_state = current_state
    action = inv_pend_env.controller.step(current_state)
    current_state = inv_pend_env.simulator(current_state, action, measurement_noise=False)
    current_state[0] %= (2 * np.pi)
    
    states[i] = current_state
    controls[i] = action
    
    new_examples[i % retrain_iters] = np.hstack((prev_state, action))
    new_labels[i % retrain_iters] = label_from_states(prev_state, current_state)
    
    # Retrain if current time step is a retrain step
    if i % retrain_iters == 0 and i > 0:
        train_online(new_examples, new_labels)
        new_examples, new_labels = init_new_data()
        

print("Elapsed time: {:.5f} s".format(perf_counter() - start_time))

In [None]:
# Plot the results
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

fig.suptitle("MPPI Inverted Pendulum Swing Up Task (Online Learning NN Dynamics)")

time = np.linspace(0, simulation_length, n_steps)

ax1.set_title("Pendulum Angle vs. Time")
ax1.plot(time, states[:, 0], label="Theta")
ax1.plot(time, np.repeat(inv_pend_env.desired_state[0], n_steps), label="Desired theta")
ax1.set(xlabel="Time (s)", ylabel="Theta (rad)")
ax1.legend()

ax2.set_title("Input Torque vs. Time")
ax2.plot(time, controls[:, 0], label="Input torque")
ax2.set(xlabel="Time (s)", ylabel="Torque (Nm)")

fig.savefig("swingup_task_oll.png")