In [1]:
import torch
import torch.nn as nn
import math
import numpy as np
from torch.utils.data import Dataset, DataLoader

In [None]:
def collate_fn(batch):
    if len(batch[0]) == 3:  # Training data with future
        pasts, masks, futures = zip(*batch)
        past = torch.stack(pasts)
        mask = torch.stack(masks)
        future = torch.stack(futures)
        return past, mask, future
    else:  # Test data without future
        pasts, masks = zip(*batch)
        past = torch.stack(pasts)
        mask = torch.stack(masks)
        return past, mask
    
class TrajectoryDataset(Dataset):
    def __init__(self, input_path=None, data=None, T_past=50, T_future=60, is_test=False):
        if data is not None:
            self.data = data
        else:
            npz = np.load(input_path)
            self.data = npz['data']
        self.T_past = T_past
        self.T_future = T_future
        self.is_test = is_test
        
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        scene = self.data[idx]  #(num_agents, T, 6)
        
        # past trajectory of all agents
        past = scene[:, :self.T_past, :]
        
        # create mask for valid agents, essentially checking if the agent has any past trajectory
        # no padding in the first two dimensions
        mask = np.sum(np.abs(past[..., :2]), axis=(1, 2)) > 0
        
        # for training data, also extract future trajectory of ego vehicle
        if not self.is_test and scene.shape[1] >= self.T_past + self.T_future:
            future = scene[0, self.T_past:self.T_past+self.T_future, :2]  # Ego vehicle future (x,y)
            return torch.tensor(past, dtype=torch.float32), torch.tensor(mask, dtype=torch.bool), torch.tensor(future, dtype=torch.float32)
        
        # for test data, only return past
        return torch.tensor(past, dtype=torch.float32), torch.tensor(mask, dtype=torch.bool)

In [7]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pos_enc = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-math.log(10000.0) / d_model))
        pos_enc[:, 0::2] = torch.sin(position * div_term)
        pos_enc[:, 1::2] = torch.cos(position * div_term)
        pos_enc = pos_enc.unsqueeze(1)
        self.register_buffer('pos_enc', pos_enc)

    def forward(self, x):
        seq_len = x.size(0)
        return x + self.pos_enc[:seq_len]

In [8]:
class TrajectoryTransformer(nn.Module):
    def __init__(self,
                 feature_dim=6,
                 d_model=128,
                 nhead=8,
                 num_layers_temporal=2,
                 num_layers_social=2,
                 dim_feedforward=256,
                 T_past=50,
                 T_future=60,
                 dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.T_past = T_past
        self.T_future = T_future

        self.input_embed = nn.Linear(feature_dim, d_model)
        self.time_pos_enc = PositionalEncoding(d_model, max_len=T_past)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout)
        self.temporal_encoder = nn.TransformerEncoder(
            encoder_layer, num_layers=num_layers_temporal)

        social_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward, dropout=dropout)
        self.social_encoder = nn.TransformerEncoder(
            social_layer, num_layers=num_layers_social)

        self.mlp = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, dim_feedforward // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward // 2, 2 * T_future)
        )

    def forward(self, past, agent_mask):
        B, N, T, F = past.shape
        # temporal encoding per agent
        x = past.view(B * N, T, F).permute(1, 0, 2)
        x = self.input_embed(x) * math.sqrt(self.d_model)
        x = self.time_pos_enc(x)
        x = self.temporal_encoder(x)
        agent_feats = x[-1].view(B, N, self.d_model)

        # social encoding across agents
        scene = agent_feats.permute(1, 0, 2)
        scene = self.social_encoder(scene, src_key_padding_mask=~agent_mask)
        ego_embed = scene[0]  # (B, d_model)

        # Directly regress entire future trajectory
        out = self.mlp(ego_embed)  # (B, 2 * T_future)
        preds = out.view(B, self.T_future, 2)
        return preds


In [9]:
def train(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0.0
    criterion = nn.MSELoss()
    
    for batch in dataloader:
        past, mask, future = [x.to(device) for x in batch]
        
        optimizer.zero_grad()
        pred = model(past, mask)
        
        # Calculate loss against ground truth future
        loss = criterion(pred, future)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item() * past.size(0)
        
    return total_loss / len(dataloader.dataset)


def evaluate(model, val_loader, device):
    model.eval()
    total_loss = 0.0
    criterion = nn.MSELoss()
    
    with torch.no_grad():
        for batch in val_loader:
            past, mask, future = [x.to(device) for x in batch]
            pred = model(past, mask)
            loss = criterion(pred, future)
            total_loss += loss.item() * past.size(0)
            
    return total_loss / len(val_loader.dataset)


def predict(model, test_loader, device):
    model.eval()
    all_preds = []
    
    with torch.no_grad():
        for batch in test_loader:
            past, mask = [x.to(device) for x in batch]
            pred = model(past, mask)
            all_preds.append(pred.cpu().numpy())
            
    return np.concatenate(all_preds, axis=0)

In [None]:
train_input = 'data/train.npz'
test_input = 'data/test_input.npz'
output_csv = 'predictions.csv'

batch_size = 32
lr = 5e-4
epochs = 20
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')



Epoch 1/20, Loss: 6375388.7720
Epoch 2/20, Loss: 2951867.6044


ValueError: not enough values to unpack (expected 3, got 2)

In [None]:
full_data = np.load(train_input)['data']

# Split into train and eval (7:3)
num_samples = len(full_data)
num_train = int(0.7 * num_samples)
perm = np.random.permutation(num_samples)
train_idx = perm[:num_train]
eval_idx = perm[num_train:]

train_data = full_data[train_idx]
eval_data = full_data[eval_idx]

train_ds = TrajectoryDataset(data=train_data)
eval_ds = TrajectoryDataset(data=eval_data)

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
eval_loader = DataLoader(eval_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

test_ds = TrajectoryDataset(test_input)
test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

model = TrajectoryTransformer().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

In [None]:
for epoch in range(1, 2+1):
    loss = train(model, train_loader, optimizer, device)
    print(f"Epoch {epoch}/{epochs}, Loss: {loss:.4f}")

In [None]:
test_preds = evaluate(model, test_loader, device)
B, T, D = test_preds.shape
flat = test_preds.reshape(B * T, D)
np.savetxt(output_csv, flat, delimiter=',')
print(f"Saved predictions to {output_csv}")