In [16]:
!git clone https://github.com/Damowerko/ese2000-dynamical-systems.git
import sys
sys.path.append('./ese2000-dynamical-systems/')

fatal: destination path 'ese2000-dynamical-systems' already exists and is not an empty directory.


In [17]:
from pathlib import Path
import matplotlib.style
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import torch
from tqdm.notebook import trange
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributions as distributions
import torch.nn.init as init

from ese2000_dynamical.config import Config
from ese2000_dynamical.track import load_track, Track
from ese2000_dynamical.simulator import Simulator, dynamics_ca

# matplotlib settings
matplotlib.style.use("seaborn-v0_8-colorblind")
plt.rcParams["figure.dpi"] = 150

data_path = Path("./ese2000-dynamical-systems/data")
figure_path = Path("figures")

device = "cpu"
# if torch.cuda.is_available():
#     device = "cuda"
# elif torch.backends.mps.is_available():
#     device = "mps"

In [18]:
sim = Simulator()

In [19]:
x_expert = np.load(data_path / "states.npy")
u_expert = np.load(data_path / "inputs.npy")
track = load_track(data_path / "track.npz")

# Choose a trajectory
x_trajectory = torch.tensor(x_expert[5]).float().to(device)
p = x_trajectory[:, :2]
v = x_trajectory[:, 2:]
a = torch.tensor(u_expert[5]).float().to(device)

# Load the pre-trained model from Lab 5A
# We've given you a parameterization but you can also save the model from your previous lab and load it here.
A = torch.Tensor(np.load("./weights/A.npy")).to(device)
B = torch.Tensor(np.load("./weights/B.npy")).to(device)
timesteps = x_trajectory.shape[0]

In [20]:
def plot_vs_expert(x, x_label: str, x_expert, track: Track):
    """
    Plot a given trajectory and expert trajectory on the same plot.

    Args:
        x: The trajectory to plot.
        x_label: The label for the trajectory.
        x_expert: The expert trajectory.
        track: The track to plot.
    """

    plt.figure()
    track.plot()
    plt.grid(True)
    plt.xlabel("x (m)")
    plt.ylabel("y (m)")
    plt.plot(x_expert[:, 0], x_expert[:, 1], "--", label="Expert")
    plt.plot(x[:, 0], x[:, 1], "-", label=x_label)
    plt.legend(loc="upper right", framealpha=1.0)

In [21]:
def reward_function(state, target):
    return torch.sum(- 0.5 * (state - target) ** 2)

In [22]:
def rollout(A, B, x_trajectory, model_policy, use_simulator=False):
    predicted_trajectory = []
    actions = []
    rewards = []
    total_reward = 0

    for t in range(1, timesteps):
        # using x_trajectory[0] compute trajectory using policy
        # compute q for each state, acceleration pair
        # (subtract from real q estimation)**2 * 0.5
        # sum all to get loss
        if t == 1:
            state = x_trajectory[0]
        else:
            state = predicted_trajectory[-1]
        action = model_policy(state)
        actions.append(action)
        if use_simulator:
            next_state = sim.step(state.detach(), action.detach()).float()
        else:
            next_state = A @ state + B @ action
        predicted_trajectory.append(next_state)
        rewards.append(reward_function(predicted_trajectory[-1], x_trajectory[-1]))
        total_reward += rewards[-1]
    predicted_trajectory = torch.stack(predicted_trajectory)
    actions = torch.stack(actions)
    rewards = torch.stack(rewards)
    

    return predicted_trajectory, actions, rewards

In [23]:
class QHat(nn.Module):
    def __init__(self):
        super(QHat, self).__init__()
        self.fc1 = nn.Linear(6, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)
        init.uniform_(self.fc1.weight, a=-0.5, b=0.5)
        init.uniform_(self.fc2.weight, a=-0.5, b=0.5)
        init.uniform_(self.fc3.weight, a=-0.5, b=0.5)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        q_value = self.fc3(x)
        return q_value

In [24]:
class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)
        init.uniform_(self.fc1.weight, a=-0.05, b=0.05)
        init.uniform_(self.fc2.weight, a=-0.05, b=0.05)
        init.uniform_(self.fc3.weight, a=-0.05, b=0.05)

    def forward(self, state):
        x = F.tanh(self.fc1(state))
        x = F.tanh(self.fc2(x))
        x = 5 * F.tanh(self.fc3(x))
        return x


In [38]:
model_Q = QHat().to(device)
optimizer_Q = torch.optim.Adam(model_Q.parameters(), lr=1e-3)
model_policy = Policy().to(device)
optimizer_policy = torch.optim.Adam(model_policy.parameters(), lr=0.001)

In [None]:
torch.set_printoptions(precision=3)
epochs = 100
gamma = 0.5
q_train_iterations = 500
policy_train_iterations = 500
compute_Q_loss = nn.MSELoss()

for epoch in range(epochs):
    # 1) Sample trajectory
    model_policy.eval()
    with torch.no_grad():
        predicted_trajectory, actions, rewards = rollout(A, B, x_trajectory, model_policy)
        total_reward = torch.sum(rewards)

    # 2) Update Q model until residual is small
    for i in range(q_train_iterations):

        optimizer_Q.zero_grad()

        Q_hat = model_Q(predicted_trajectory, actions).squeeze()

        with torch.no_grad():
            Q = rewards + gamma * Q_hat

        loss_Q = compute_Q_loss(Q, Q_hat)
        loss_Q.backward()
        
        optimizer_Q.step()

    model_Q.eval()
    model_policy.train()
    for i in range(policy_train_iterations):
        optimizer_policy.zero_grad()

        predicted_trajectory, actions, rewards = rollout(A, B, x_trajectory, model_policy)

        Q = model_Q(predicted_trajectory, actions)

        loss_policy = -1 * torch.sum(Q)
        loss_policy.backward()
        optimizer_policy.step()

    print(f'Iteration {epoch+1}\t Total Reward: {total_reward}\t Policy Loss: {loss_policy}\t Q Loss: {loss_Q}')

Iteration 1	 Total Reward: -135859.59375	 Policy Loss: -7891.02685546875	 Q Loss: 616133.125


In [None]:
model_policy.eval()
predicted_trajectory, actions, rewards = rollout(None, None, x_trajectory, model_policy, use_simulator=True)

print("Total Reward: ", torch.sum(rewards))

In [None]:
plot_vs_expert(predicted_trajectory.detach().cpu().numpy(), 'RL', x_trajectory.detach().cpu().numpy() , track)