In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [None]:
class TrajectoryDataset(Dataset):
    """Paired (past, future) trajectory tensors loaded from a ``.npz`` archive.

    The archive must contain the arrays ``'past'`` and ``'future'``; both are
    converted to ``float32`` tensors and indexed along their first axis.

    Args:
        npz_file: Path (or file-like object) accepted by ``numpy.load``.

    Raises:
        KeyError: If ``'past'`` or ``'future'`` is missing from the archive.
        ValueError: If the two arrays do not hold the same number of rows.
    """

    def __init__(self, npz_file):
        # NpzFile keeps the underlying file open until it is closed, so read
        # both arrays inside a context manager instead of leaking the handle.
        with np.load(npz_file) as data:
            self.past = torch.tensor(data['past'], dtype=torch.float32)
            self.future = torch.tensor(data['future'], dtype=torch.float32)

        # __len__ is driven by `past`; a mismatch would otherwise surface as an
        # IndexError mid-epoch (or silently ignore trailing `future` rows).
        if len(self.past) != len(self.future):
            raise ValueError(
                f"'past' has {len(self.past)} rows but 'future' has "
                f"{len(self.future)}; they must be paired one-to-one."
            )

    def __len__(self):
        """Number of (past, future) pairs."""
        return len(self.past)

    def __getitem__(self, idx):
        """Return the ``(past, future)`` tensors stored at position ``idx``."""
        return self.past[idx], self.future[idx]

# Mini-batch size used by the loader below.
BATCH_SIZE = 64

# NOTE(review): the archive lives under ".../val/" although it is bound to
# `train_dataset` — confirm this is the split you intend to train on.
train_dataset = TrajectoryDataset(
    "/content/drive/MyDrive/ECE271B project/val/processed_val_pit.npz"
)
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Held-out split is currently disabled, so no `test_loader` exists:
# test_dataset = TrajectoryDataset("/content/drive/MyDrive/ECE271B project/test_obs/processed_test_pit.npz")
# test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

class PositionalEncoding(nn.Module):
    """Learned additive position embedding.

    Holds one trainable ``d_model``-sized vector for each of ``max_len``
    positions (initialised from a standard normal) and adds the first
    ``x.shape[1]`` of them to the input.

    Args:
        d_model: Size of the last dimension of the inputs.
        max_len: Number of positions for which an embedding is stored.
    """

    def __init__(self, d_model, max_len=50):
        super().__init__()
        initial_table = torch.randn(1, max_len, d_model)
        # Leading singleton axis lets the table broadcast over the batch.
        self.pe = nn.Parameter(initial_table)

    def forward(self, x):
        """Return ``x`` plus the embeddings of its first ``x.shape[1]`` positions."""
        steps = x.shape[1]
        position_slice = self.pe[:, :steps, :]
        return x + position_slice

In [None]:
class MonteCarloTransformer(nn.Module):
    """Transformer encoder/decoder that emits several stochastic trajectory samples.

    The past trajectory is projected, position-encoded, layer-normalised and
    encoded once. For each of ``num_samples`` draws, Gaussian noise scaled by
    ``noise_std`` is passed through ``latent_fc`` and added to the encoder
    memory; the perturbed memory is used as both target and memory of the
    decoder, and the result is projected to ``output_dim``. Noise is drawn on
    every forward call, in training and in eval mode alike.

    Because the decoder target is the perturbed memory itself, each predicted
    sample has exactly as many time steps as the input sequence.

    Args:
        input_dim: Feature size of each past time step.
        output_dim: Feature size of each predicted time step.
        hidden_dim: Transformer model width.
        num_layers: Number of encoder layers and of decoder layers.
        num_heads: Attention heads per layer.
        num_samples: Number of stochastic futures produced per input (>= 1).
        dropout: Dropout probability inside the transformer layers.
        noise_std: Standard deviation of the Gaussian noise fed to
            ``latent_fc`` (previously hard-coded to 0.1).

    Raises:
        ValueError: If ``num_samples`` is smaller than 1.
    """

    def __init__(self, input_dim=3, output_dim=2, hidden_dim=256, num_layers=6, num_heads=8, num_samples=10, dropout=0.2, noise_std=0.1):
        super().__init__()

        if num_samples < 1:
            # torch.stack on an empty list would otherwise fail inside forward().
            raise ValueError(f"num_samples must be >= 1, got {num_samples}")

        self.num_samples = num_samples
        self.noise_std = noise_std

        self.input_proj = nn.Linear(input_dim, hidden_dim)
        self.pos_encoding = PositionalEncoding(hidden_dim)

        # NOTE(review): nn.TransformerEncoder / nn.TransformerDecoder deep-copy
        # the layer they are given, so `encoder_layer` and `decoder_layer` below
        # are extra registered submodules whose parameters never take part in
        # forward(). They are kept as attributes only so existing state_dict
        # keys stay loadable; drop them if no checkpoints depend on the keys.
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads,
                                                        dim_feedforward=hidden_dim * 4, dropout=dropout, activation="gelu", batch_first=True)
        self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)

        self.decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=num_heads,
                                                        dim_feedforward=hidden_dim * 4, dropout=dropout, activation="gelu", batch_first=True)
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)

        self.latent_fc = nn.Linear(hidden_dim, hidden_dim)
        self.output_proj = nn.Linear(hidden_dim, output_dim)

        self.layer_norm = nn.LayerNorm(hidden_dim)
        # Not applied anywhere in forward(); retained for attribute compatibility.
        self.dropout = nn.Dropout(dropout)

    def forward(self, past_trajectory):
        """Sample ``num_samples`` futures for a batch of past trajectories.

        Args:
            past_trajectory: Tensor of shape ``(batch, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch, num_samples, seq_len, output_dim)``.

        Raises:
            ValueError: If ``past_trajectory`` is not 3-dimensional.
        """
        if past_trajectory.dim() != 3:
            raise ValueError(
                "past_trajectory must have shape (batch, seq_len, input_dim), "
                f"got {tuple(past_trajectory.shape)}"
            )

        embedded = self.input_proj(past_trajectory)
        embedded = self.pos_encoding(embedded)
        embedded = self.layer_norm(embedded)

        # Encode once; every sample perturbs the same memory.
        memory = self.encoder(embedded)

        future_preds = []
        for _ in range(self.num_samples):
            noise = torch.randn_like(memory) * self.noise_std
            # latent_fc has a bias, so even zero noise shifts the memory by a
            # learned constant offset.
            sampled_memory = memory + self.latent_fc(noise)

            decoded_future = self.decoder(sampled_memory, sampled_memory)
            future_preds.append(self.output_proj(decoded_future))

        return torch.stack(future_preds, dim=1)  # (batch, num_samples, seq_len, output_dim)

In [None]:
def _trajectory_losses(predicted_futures, future):
    """Return batch-mean ``(minADE, minFDE, NLL)`` for sampled futures.

    Args:
        predicted_futures: ``(batch, num_samples, seq_len, output_dim)`` tensor.
        future: ``(batch, seq_len, output_dim)`` ground-truth tensor.
    """
    # Euclidean error of every sample at every step: (batch, num_samples, seq_len)
    distances = torch.norm(predicted_futures - future.unsqueeze(1), dim=-1)

    # Best sample per trajectory by mean error / by final-step error.
    min_ade = distances.mean(dim=2).min(dim=1)[0].mean()
    min_fde = distances[:, :, -1].min(dim=1)[0].mean()

    # Treat exp(-summed distance) as an unnormalised per-sample likelihood.
    # The negative log of their sum is the quantity to MINIMISE; it shrinks as
    # samples approach the ground truth.
    nll = -torch.logsumexp(-distances.sum(dim=2), dim=1).mean()
    return min_ade, min_fde, nll


def train_model(model, train_loader, num_epochs=50, lr=0.0005, alpha=1.0, beta=1.0, gamma=0.1):
    """Train ``model`` with a weighted sum of minADE, minFDE and an NLL term.

    Args:
        model: Module mapping a ``(batch, seq_len, input_dim)`` batch to
            ``(batch, num_samples, seq_len, output_dim)`` predictions.
        train_loader: Iterable of ``(past, future)`` tensor batches.
        num_epochs: Number of passes over ``train_loader``.
        lr: AdamW learning rate (weight decay is fixed at 1e-4).
        alpha: Weight of the minADE term.
        beta: Weight of the minFDE term.
        gamma: Weight of the negative log-likelihood term.

    Raises:
        ValueError: If ``train_loader`` yields no batches.

    Prints one line per epoch with the epoch-averaged total loss and the
    epoch-averaged individual terms.
    """
    model.train()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    # Follow the model's own placement instead of relying on a notebook-level
    # `device` global that may not exist yet when this function is called.
    device = next(model.parameters()).device

    for epoch in range(num_epochs):
        # Running sums of [total, minADE, minFDE, NLL]; kept on-device so the
        # host only synchronises once per epoch.
        running = torch.zeros(4, device=device)
        num_batches = 0

        for past, future in train_loader:
            past, future = past.to(device), future.to(device)

            optimizer.zero_grad()
            predicted_futures = model(past)  # (batch, num_samples, seq_len, output_dim)

            minADE_loss, minFDE_loss, likelihood_loss = _trajectory_losses(predicted_futures, future)

            # --- Total Loss ---
            loss = alpha * minADE_loss + beta * minFDE_loss + gamma * likelihood_loss
            loss.backward()
            optimizer.step()

            running += torch.stack((loss, minADE_loss, minFDE_loss, likelihood_loss)).detach().to(running.dtype)
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader yielded no batches; nothing to train on.")

        mean_loss, mean_ade, mean_fde, mean_nll = (running / num_batches).tolist()
        print(f"Epoch [{epoch+1}/{num_epochs}] | Total Loss: {mean_loss:.4f} "
              f"| minADE: {mean_ade:.4f} | minFDE: {mean_fde:.4f} | NLL: {mean_nll:.4f}")

In [None]:
def evaluate_model(model, test_loader):
    """Report minADE / minFDE of ``model`` over ``test_loader``.

    The model is switched to eval mode (and left there). For each trajectory
    the best of the predicted samples is taken, by mean per-step error
    (minADE) and by final-step error (minFDE); both are then averaged over all
    trajectories, so a smaller final batch does not skew the result.

    Args:
        model: Module mapping a ``(batch, seq_len, input_dim)`` batch to
            ``(batch, num_samples, seq_len, output_dim)`` predictions.
        test_loader: Iterable of ``(past, future)`` tensor batches.

    Returns:
        Tuple ``(avg_minADE, avg_minFDE)`` of Python floats.

    Raises:
        ValueError: If ``test_loader`` yields no trajectories.
    """
    model.eval()

    # Use the model's own placement instead of a notebook-level `device`
    # global; fall back to CPU for parameter-free modules.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    total_minADE, total_minFDE = 0.0, 0.0
    num_trajectories = 0

    with torch.no_grad():
        for past, future in test_loader:
            past, future = past.to(device), future.to(device)
            predicted_futures = model(past)

            # Per-step error of every sample: (batch, num_samples, seq_len)
            distances = torch.norm(predicted_futures - future.unsqueeze(1), dim=-1)

            # --- minADE --- (summed here, divided by the trajectory count below)
            total_minADE += distances.mean(dim=2).min(dim=1)[0].sum().item()

            # --- minFDE --- (the last step of `distances` is the final displacement)
            total_minFDE += distances[:, :, -1].min(dim=1)[0].sum().item()

            num_trajectories += past.shape[0]

    if num_trajectories == 0:
        raise ValueError("test_loader yielded no trajectories; nothing to evaluate.")

    avg_minADE = total_minADE / num_trajectories
    avg_minFDE = total_minFDE / num_trajectories
    print(f"Evaluation Results -> minADE: {avg_minADE:.4f}, minFDE: {avg_minFDE:.4f}")
    return avg_minADE, avg_minFDE

# Pick the GPU when one is available and build the model there.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MonteCarloTransformer().to(device)

train_model(model, train_loader, num_epochs=50, lr=0.0005, alpha=1.0, beta=1.0, gamma=0.1)

# The only `test_loader` definition in this notebook is commented out, so an
# unconditional evaluate_model(model, test_loader) raises NameError on a fresh
# kernel. Evaluate only when a loader has actually been defined.
eval_loader = globals().get("test_loader")
if eval_loader is None:
    print("test_loader is not defined; skipping evaluation.")
else:
    evaluate_model(model, eval_loader)