## Imports and Data Loading

In [1]:
import torch
from torch.utils.data import DataLoader

# Load the tensors you just saved
processed_data_path = '../processed/eth_test_sequences.pt'
samples = torch.load(processed_data_path)

# Quick check to ensure the hand-off worked
print(f"Successfully loaded {len(samples)} sequences for training.")

Successfully loaded 364 sequences for training.


  samples = torch.load(processed_data_path)


## Dataset Definition

In [4]:
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Redefine the class so this notebook understands the data structure
class PedestrianDataset(Dataset):
    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        obs, target = self.samples[idx]
        return torch.tensor(obs, dtype=torch.float32), torch.tensor(target, dtype=torch.float32)

# 2. Re-wrap the 'samples' you successfully loaded earlier
dataset = PedestrianDataset(samples)

# 3. NOW run the loader
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

print(f"✅ train_loader is ready with {len(train_loader)} batches.")

✅ train_loader is ready with 12 batches.


## The LSTM Model Architecture

In [2]:
import torch.nn as nn

class LSTMEncoder(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, num_layers=1):
        super(LSTMEncoder, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
    def forward(self, x):
        # x shape: (batch, seq_len, input_size)
        _, (hidden, cell) = self.lstm(x)
        return hidden, cell

class LSTMDecoder(nn.Module):
    def __init__(self, output_size=2, hidden_size=64, num_layers=1):
        super(LSTMDecoder, self).__init__()
        self.lstm = nn.LSTM(output_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden, cell):
        # Predict one step at a time
        output, (hidden, cell) = self.lstm(x, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell

class TrajectoryPredictor(nn.Module):
    def __init__(self, input_size=2, hidden_size=64, output_len=12):
        super(TrajectoryPredictor, self).__init__()
        self.encoder = LSTMEncoder(input_size, hidden_size)
        self.decoder = LSTMDecoder(input_size, hidden_size)
        self.output_len = output_len
        
    def forward(self, obs_seq):
        batch_size = obs_seq.size(0)
        hidden, cell = self.encoder(obs_seq)
        
        # Start decoding from the last observed position
        decoder_input = obs_seq[:, -1:, :] 
        outputs = []
        
        for _ in range(self.output_len):
            prediction, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs.append(prediction)
            decoder_input = prediction # Feed the prediction back as next input
            
        return torch.cat(outputs, dim=1)

## Training Setup

In [5]:
# Initialize model, loss, and optimizer
model = TrajectoryPredictor(input_size=2, hidden_size=128, output_len=12)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Create DataLoader for batching
from torch.utils.data import DataLoader
# Assuming you wrapped 'samples' in your PedestrianDataset class again
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

## The Training Loop

In [6]:
# Number of epochs determines how many times the model sees the entire dataset
num_epochs = 50 
print("Starting Training...")

for epoch in range(num_epochs):
    model.train() # Set model to training mode
    total_loss = 0
    
    for obs, target in train_loader:
        # 1. Clear previous gradients
        optimizer.zero_grad()
        
        # 2. Forward pass: model makes a prediction [8, 2] -> [12, 2]
        prediction = model(obs)
        
        # 3. Calculate Loss (MSE): how far is the guess from reality?
        loss = criterion(prediction, target)
        
        # 4. Backward pass: calculate how to adjust weights
        loss.backward()
        
        # 5. Step: update the weights
        optimizer.step()
        
        total_loss += loss.item()
    
    # Print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        avg_loss = total_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

print("Training Complete!")

Starting Training...
Epoch [10/50], Loss: 4.6120
Epoch [20/50], Loss: 0.9244
Epoch [30/50], Loss: 0.8138
Epoch [40/50], Loss: 0.5191
Epoch [50/50], Loss: 0.4973
Training Complete!


## Final Evaluation

In [7]:
def evaluate_metrics(model, loader):
    model.eval()
    ade_total = 0
    fde_total = 0
    total_samples = 0
    
    with torch.no_grad():
        for obs, target in loader:
            prediction = model(obs)
            
            # Calculate Euclidean distance at each step
            # dist shape: (batch, 12)
            dist = torch.norm(prediction - target, dim=2)
            
            # ADE: Mean distance over the 12-step horizon
            ade_total += torch.sum(torch.mean(dist, dim=1)).item()
            
            # FDE: Distance at the final (12th) step
            fde_total += torch.sum(dist[:, -1]).item()
            
            total_samples += obs.size(0)
            
    return ade_total / total_samples, fde_total / total_samples

ade, fde = evaluate_metrics(model, train_loader)
print(f"Final Baseline Results:")
print(f"ADE: {ade:.4f} meters")
print(f"FDE: {fde:.4f} meters")

Final Baseline Results:
ADE: 0.7464 meters
FDE: 1.0921 meters
