In [None]:
# prompt: load folder from google drive folder layered-ac-main

import os

# Location of the project folder inside the mounted Drive; edit the last path
# component if your folder is named differently.
folder_path = '/content/drive/My Drive/layered-ac-main'
from google.colab import drive
drive.mount('/content/drive/')

# Report what the folder contains, or say that it is missing.
if not os.path.exists(folder_path):
  print(f"Folder '{folder_path}' not found.")
else:
  print(f"Contents of '{folder_path}':")
  for entry in os.listdir(folder_path):
    print(entry)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

## Linear dynamics with LQR tracking controller

In [None]:
from env.dynamics.linear import LinearDynamics
from controller.ilqrtrackingctrl import ILQRtrackingcontroller
from numlp import NuMLP

In [None]:
# Prefer CUDA when PyTorch can see a GPU; otherwise keep everything on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Using device: {device}')

In [None]:
# Discrete-time double integrator, written block-wise:
#   the first two states advance by dt times the last two states,
#   the last two states advance by dt times the two control inputs.
dt = 0.1
A = np.block([[np.eye(2), dt * np.eye(2)],
              [np.zeros((2, 2)), np.eye(2)]])

B = np.vstack([np.zeros((2, 2)), dt * np.eye(2)])

dynamics = LinearDynamics(A, B)

In [None]:
# Cost weights handed to the tracking controller: Q weights only the first two
# state entries, Qf is ten times Q, and R puts a small weight on both inputs.
# NOTE(review): Q and Qf are integer arrays while R is float - confirm the
# controller casts them as needed.
Q = np.diag([1, 1, 0, 0])
Qf = 10 * Q
R = 0.1 * np.eye(2)
controller = ILQRtrackingcontroller(dynamics, Q, R, Qf, dt=dt, device=device)

In [None]:
# Function to generate random reference trajectories
def generate_reference_trajectories(batch_size, T, step_length=0.1, device=None):
    """Sample a batch of random-walk reference trajectories.

    Every trajectory starts at a position drawn uniformly from [0, 1) on each
    axis and then moves, at every step, by an independent uniform increment in
    [-step_length, step_length) per axis. The velocity entries are always zero.

    Args:
        batch_size: Number of trajectories to generate.
        T: Number of time steps per trajectory (0 yields an empty tensor).
        step_length: Bound on the per-axis position change between two steps.
        device: Torch device for the result. When omitted, the notebook-level
            ``device`` variable is used if it exists, otherwise torch's
            default device.

    Returns:
        r_traj: Tensor of shape (batch_size, T, Nx) with Nx = 4, laid out as
        [x, y, vx, vy].
    """
    if device is None:
        # Keep following the notebook's global `device`, but do not fail when
        # the function is used where no such global has been defined.
        device = globals().get("device")

    Nx = 4  # [x, y, vx, vy]
    r_traj = torch.zeros(batch_size, T, Nx, device=device)
    if T == 0:
        return r_traj

    # Random initial positions in [0, 1)
    start = torch.rand(batch_size, 2, device=device)
    r_traj[:, 0, :2] = start

    if T > 1:
        # Draw all increments in one call; their running sum is the random
        # walk, so no per-sample / per-step Python loop is needed.
        steps = (torch.rand(batch_size, T - 1, 2, device=device) - 0.5) * 2 * step_length
        r_traj[:, 1:, :2] = start.unsqueeze(1) + torch.cumsum(steps, dim=1)

    # Velocity columns are intentionally left at zero.
    return r_traj

In [None]:
def generate_training_data(batch_size, T, dynamics, controller):
    """Generate (reference, tracking-error) pairs by running the controller.

    Args:
        batch_size: Number of trajectories in the batch.
        T: Horizon length; references hold T + 1 states, controls hold T steps.
        dynamics: System model exposing the state size ``Nx`` and the input
            size ``Nu``.
        controller: Object whose ``solve(x0, u0, r_traj, u_ref_traj)`` returns
            a ``(controls, states)`` pair.

    Returns:
        r_traj: (batch_size, T + 1, Nx) reference trajectories.
        delta_r: ``r_traj - x_exac``, where ``x_exac`` is the state trajectory
            returned by the controller (assumed to have the same shape as
            ``r_traj`` - TODO confirm against the controller).
    """
    # Generate reference trajectories
    r_traj = generate_reference_trajectories(batch_size, T + 1)
    # Initial states x0 = zeros
    x0 = torch.zeros(batch_size, dynamics.Nx, device=device)

    # `Nu` used to be read from an undefined global; take the input size from
    # the dynamics object instead, next to `Nx`.
    # NOTE(review): assumes the dynamics class exposes `Nu` - confirm.
    Nu = dynamics.Nu

    # The control reference defaults to zero, and so does the initial guess.
    u_ref_traj = torch.zeros(batch_size, T, Nu, device=device)
    u0 = torch.zeros(batch_size, T, Nu, device=device)

    # Only the state trajectory is needed; the returned controls are discarded.
    _, x_exac = controller.solve(x0, u0, r_traj, u_ref_traj)

    # Compute delta_r = r_t - x_t
    delta_r = r_traj - x_exac

    return r_traj, delta_r

In [None]:
T = 20

# Network dimensions: both the input (flattened reference trajectory) and the
# output (flattened delta_r) cover T + 1 time steps of Nx values each.
hidden_size = 128  # tunable
input_size = output_size = dynamics.Nx * (T + 1)

# Build the network and move it to the selected device.
model = NuMLP(input_size, output_size, hidden_size=hidden_size)
model = model.to(device)

# Mean-squared-error loss, optimised with Adam.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Bundle the model, loss, optimizer, system and controller in the helper.
# NOTE(review): `NuMLPHelper` is not imported in any cell visible here -
# confirm which module provides it.
helper = NuMLPHelper(model=model, criterion=criterion, optimizer=optimizer,
                     dynamics=dynamics, controller=controller, T=T, device=device)

# Training schedule
num_epochs, batch_size = 50, 64

# Fit the model; checkpoint_interval=10 saves the model every 10 epochs
# according to the original author's note.
helper.train_model(num_epochs=num_epochs, batch_size=batch_size,
                   generate_training_data_func=generate_training_data,
                   checkpoint_interval=10)

# Evaluate on data produced by the same generator function.
helper.validate_model(batch_size=batch_size,
                      generate_training_data_func=generate_training_data)


## Unicycle with ? tracking controller

In [None]:
from env.dynamics.unicycle import UnicycleDynamics
from controller.lqtrackingController import LQTrackingController

In [None]:
# Set device to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Parameters
batch_size = 64
sequence_length = 20  # Time steps
dt = 0.1  # Time step size

def sample_reference_points(batch_size, sequence_length):
    """Sample a batch of 2-D random-walk reference paths.

    The start point of each path is drawn from a standard normal distribution
    per axis; every subsequent step adds an independent Gaussian increment
    scaled by the module-level ``dt``. The first returned point already
    includes one increment.

    Args:
        batch_size: Number of paths.
        sequence_length: Number of points per path.

    Returns:
        Tensor of shape (batch_size, sequence_length, 2) holding (x, y)
        references, allocated on the module-level ``device``.
    """
    # Random initial positions x and y
    x0 = torch.randn(batch_size, 1, device=device)
    y0 = torch.randn(batch_size, 1, device=device)

    # Gaussian increments scaled by dt. They are created on `device` like the
    # start points: adding CPU increments to CUDA start points raises a
    # device-mismatch error.
    dx = torch.randn(batch_size, sequence_length, device=device) * dt
    dy = torch.randn(batch_size, sequence_length, device=device) * dt

    # Cumulative sum to get positions over time
    x_refs = x0 + torch.cumsum(dx, dim=1)
    y_refs = y0 + torch.cumsum(dy, dim=1)

    # Stack x and y references to create the reference trajectory
    return torch.stack([x_refs, y_refs], dim=-1)  # Shape: (batch_size, sequence_length, 2)

In [None]:
def get_exact_trajectories(r_refs, x_0=None):
    """Roll out the closed-loop state trajectory for a batch of references.

    Uses the notebook-level ``dynamics``, ``controller`` and ``device``.

    Args:
        r_refs: Reference tensor of shape (batch_size, sequence_length, D).
        x_0: Optional initial state, shaped (1, Nx) or (batch_size, Nx).
            Defaults to the zero state. The default is resolved at call time,
            so it follows whichever ``dynamics`` is bound when the function
            runs rather than when it was defined.

    Returns:
        Tensor of shape (batch_size, sequence_length, Nx) with the states
        reached after each control step (the initial state is not included).
    """
    batch_size, sequence_length, _ = r_refs.shape

    # Initial state, one row per batch element
    if x_0 is None:
        x0 = torch.zeros(batch_size, dynamics.Nx).to(device)  # Shape: (batch_size, Nx)
    else:
        # clone() so later code never writes through an expanded view
        x0 = torch.as_tensor(x_0).to(device).expand(batch_size, -1).clone()

    # Concatenate x0 and r_refs along the time dimension.
    # NOTE(review): this requires r_refs' last dimension to equal dynamics.Nx;
    # references with only (x, y) columns will not concatenate unless Nx == 2.
    x0_r_refs = torch.cat([x0.unsqueeze(1), r_refs], dim=1)  # Shape: (batch_size, T + 1, Nx)

    # Flatten along the time dimension
    x0_r_refs_flat = x0_r_refs.reshape(batch_size, -1)  # Shape: (batch_size, Nx * (T + 1))

    # Get control inputs from the controller.
    # NOTE(review): assumes controller.control returns all T control steps
    # flattened per batch element - confirm against the controller class.
    u_trajs_flat = controller.control(x0_r_refs_flat)
    u_trajs = u_trajs_flat.reshape(batch_size, sequence_length, -1)  # Shape: (batch_size, sequence_length, Nu)

    # Simulate the dynamics step by step, collecting only the successor states
    states = []
    x_curr = x0
    for t in range(sequence_length):
        x_curr = dynamics.step(x_curr, u_trajs[:, t, :])  # Shape: (batch_size, Nx)
        states.append(x_curr.unsqueeze(1))

    return torch.cat(states, dim=1)  # Shape: (batch_size, sequence_length, Nx)


In [None]:
class NuNetwork(nn.Module):
    """Fully connected network: input -> 128 -> 64 -> output, ReLU in between."""

    def __init__(self, input_size, output_size):
        super().__init__()
        # Attribute names and creation order are kept as they were: they
        # determine the state_dict keys and the order parameters are initialised.
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        """Apply the three linear layers; no activation after the last one."""
        hidden = self.relu(self.fc1(x))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)


In [None]:
# Flatten the trajectories for the network
def prepare_data(x_trajs, r_refs):
    """Flatten batched trajectories into per-time-step samples.

    Both arguments are expected as (batch_size, sequence_length, Nx), with Nx
    read from the notebook-level ``dynamics``.

    Returns:
        x_inputs: (batch_size * sequence_length, Nx) flattened states.
        nu_targets: Same shape, the flattened references minus ``x_inputs``.
    """
    state_dim = dynamics.Nx
    x_inputs = x_trajs.reshape(-1, state_dim)
    flat_refs = r_refs.reshape(-1, state_dim)
    nu_targets = flat_refs - x_inputs
    return x_inputs, nu_targets


In [None]:
# Build the unicycle model with time step `dt`. This rebinds the notebook-level
# name `dynamics`, which the earlier cells bound to the linear model, so every
# function that reads the global `dynamics` uses the unicycle from here on.
# Assumes the dynamics classes are implemented and imported.
dynamics = UnicycleDynamics(dt=dt)


In [None]:
# Rebind the notebook-level `controller` for the unicycle section.
# NOTE(review): `UnicyclePDController` is not imported in any cell visible
# here (the import cell of this section brings in `LQTrackingController`) -
# confirm which controller is intended and import it.
controller = UnicyclePDController(dt=dt)

In [None]:
# The initial position is assumed to be (0, 0)

In [None]:
    # Initialize the helper class for the unicycle section.
    # `model`, `criterion` and `optimizer` are the objects created in the
    # earlier model-setup cell; they are not rebuilt here.
    # NOTE(review): `horizon` is not assigned in any visible cell, and the
    # earlier helper cell passes `T=T` instead - confirm the keyword expected
    # by NuMLPHelper. `NuMLPHelper` itself is also not imported in any visible cell.
    helper = NuMLPHelper(
        model=model,
        criterion=criterion,
        optimizer=optimizer,
        dynamics=dynamics,
        controller=controller,
        horizon=horizon,
        device=device
    )

    # Training parameters
    num_epochs = 50
    batch_size = 64

    # Train the model on batches produced by generate_training_data
    helper.train_model(
        num_epochs=num_epochs,
        batch_size=batch_size,
        generate_training_data_func=generate_training_data,
        checkpoint_interval=10  # Save model every 10 epochs
    )

    # Validate the model with the same data generator
    helper.validate_model(
        batch_size=batch_size,
        generate_training_data_func=generate_training_data
    )