In [17]:
import numpy as np
import pickle

import torch
from torch import nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'

In [None]:
data_path = 'parsed_data_no_timestamps.pkl'
with open(data_path, "rb") as file:
  data = pickle.load(file)
print(f"Shape of full dataset: {data.shape}")

X_train = data[:int(0.8*len(data)), :80, :]
X_val = data[int(0.8*len(data)): , :80, :]

y_train = data[:int(0.8*len(data)), 80:, :]
y_val = data[int(0.8*len(data)): , 80:, :]

print(X_train.shape, X_val.shape, y_train.shape, y_val.shape)

In [19]:
import torch
import torch.nn as nn

class TrajectoryLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_features=2, output_timesteps=10, num_layers=1):  # Set default n to 10
        super(TrajectoryLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_features * output_timesteps) 
        self.output_features = output_features
        self.output_timesteps = output_timesteps

    def forward(self, x):
        # x: (batch_size, seq_length, input_size)
        _, (hn, _) = self.lstm(x)
        # hn: (num_layers, batch_size, hidden_size)
        out = self.fc(hn[-1])
        return out.view(-1, self.output_timesteps, self.output_features)  # Reshape to (batch_size, 10, input_size)

In [None]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from loss_functions import get_loss_function, TrajectoryLoss

# Initialize lists to store loss history
train_losses = []
val_losses = []

def train_model(model, train_loader, val_loader, num_epochs=10, initial_lr=0.001, T_max=10, loss_fn=None):
    optimizer = optim.Adam(model.parameters(), lr=initial_lr)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=T_max)

    best_val_loss = float('inf')  # Initialize the best validation loss
    model_save_path = "best_model.pth"  # Save path for the best model

    for epoch in range(num_epochs):
        # Training loop
        model.train()
        epoch_train_loss = 0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to("cuda"), batch_y.to("cuda")

            optimizer.zero_grad()
            predictions = model(batch_x)
            loss = loss_fn(predictions, batch_y, batch_x[:, 0, :2])
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            epoch_train_loss += loss.item()

        scheduler.step()

        avg_train_loss = epoch_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_train_loss:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}')

        # Validation loop
        model.eval()
        epoch_val_loss = 0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to("cuda"), batch_y.to("cuda")
                predictions = model(batch_x)
                loss = loss_fn(predictions, batch_y, batch_x[:, 0, :2])
                epoch_val_loss += loss.item()

            avg_val_loss = epoch_val_loss / len(val_loader)
            val_losses.append(avg_val_loss)
            print(f'Validation Loss: {avg_val_loss:.4f}')

            # Save the model if it has the best validation loss
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), model_save_path)
                print(f"Saved best model with validation loss: {best_val_loss:.4f}")

    print(f"Training complete. Best validation loss: {best_val_loss:.4f}. Model saved to {model_save_path}.")


train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.float32))
# Mess with batch size here
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


input_size = X_train.shape[2]
hidden_size = 64
model = TrajectoryLSTM(input_size, hidden_size, output_features=2, output_timesteps=10, num_layers=4).to("cuda")

loss_weights = { # Update weights of different loss terms 
    "position_loss": 2.0,
    "velocity_loss": 0.5,
    "smoothness_loss": 0.2,
    "terminal_loss": 2.0,
    "delta_loss": 1.0
}
# Customize loss function here (Everything is True/False except time_weighting_scheme)
loss_fn = get_loss_function(
    "trajectory_loss",
    use_velocity_loss=True,
    use_smoothness_loss=True,
    use_terminal_loss=True,
    time_weighting_scheme='Exponential',  #None, Linear, Exponential
    use_delta_loss=True,
    prediction_horizon=10, # change to match predicted timesteps (used for time)
    loss_weights=loss_weights # defaults to ALL = 1.0 if not included
) # Add more lines as new loss functions are added

# mess with epochs, initial_lr and T_max here
train_model(model, train_loader, val_loader, num_epochs=100, initial_lr=1e-2, T_max=100, loss_fn=loss_fn)


In [None]:
def plot_loss_curves(train_losses, val_losses):
    plt.figure(figsize=(8, 4))
    plt.plot(train_losses, label='Training Loss', color='blue')
    plt.plot(val_losses, label='Validation Loss', color='orange')
    plt.title('Training and Validation Loss Curves')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid()
    plt.show()

plot_loss_curves(train_losses, val_losses)

In [None]:
from metrics import (
    compute_predicted_positions,
    compute_mean_ade,
    compute_fde,
    compute_directional_accuracy,
    compute_linear_wade,
    compute_exponential_wade
) # add new imports here as new metrics created

x = torch.tensor(X_val, dtype=torch.float32).to("cuda")
y = torch.tensor(y_val, dtype=torch.float32).to("cuda")
# Compute metrics
with torch.no_grad():
    pred_position = compute_predicted_positions(model, x, y)

    mean_ade = compute_mean_ade(pred_position, y)
    fde = compute_fde(pred_position, y)
    directional_accuracy = compute_directional_accuracy(pred_position, y)
    l_wade = compute_linear_wade(pred_position, y)
    alpha = 0.5 #Smaller alpha -> more uniform weights, larger alpha -> more emphasis on earlier timesteps (defaults to 0.1)
    e_wade = compute_exponential_wade(pred_position, y, alpha=alpha) 

print(f"Mean Average Displacement Error (MADE): {mean_ade}")
print(f"Final Displacement Error (FDE): {fde}")
print(f"Directional Accuracy: {directional_accuracy}")
print(f"Linear Weighted Average Displacement Error (LWADE): {l_wade}")
print(f"Exponential Weighted Average Displacement Error (EWADE) with alpha={alpha}: {e_wade}")

In [23]:
# Choose a random example from the validation dataset
example_index = 400 # np.random.randint(0, len(X_val))
past_traj = X_val[example_index]
future_traj = y_val[example_index]

# Get the predicted future trajectory
with torch.no_grad():
  model.eval()
  X = torch.tensor(past_traj[np.newaxis, :, :], dtype=torch.float32).to("cuda")
  predicted_vels = model(X)

  predicted_vels = predicted_vels[0]
  cum_disp = torch.cumsum(predicted_vels * 0.1, dim=1)
  future_pred = cum_disp + X[0, -1, :2]
  future_pred = future_pred.cpu().numpy()

# print(past_traj[:, :2])

In [None]:
# Plot the past trajectory, actual future trajectory, and predicted future trajectory
plt.figure(figsize=(6, 6))
plt.plot(past_traj[:, 0], past_traj[:, 1], label='Past Trajectory', color='blue')
plt.plot(future_traj[:, 0], future_traj[:, 1], label='Actual Future Trajectory', color='red')
plt.plot(future_pred[:, 0], future_pred[:, 1], label='Predicted Future Trajectory', color='green')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.title(f"Predicted vs. Actual Future Trajectory (Example {example_index})")
plt.axis('equal')
plt.legend()
plt.grid()
plt.show()