Implement a model to predict the future trajectory of a vehicle given a lidar cloud.
Choose your own representation for the trajectory prediction.

Keep it simple :)

You can use the code below as a starting point, feel free to make any modifications you need, or don't use it, it's up to you.

You don't have to use Colab, use the tools, frameworks or lanuages you are most comfortable with.

If you are working on your computer, download the ZIP file, it will be faster!

In [12]:
from pathlib import Path

import numpy as np
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

import torch.nn.functional as F
import spconv.pytorch as spconv
import pytorch_lightning as pl
import os

In [13]:
class SemanticKITTIDataset(Dataset):
    def __init__(
        self,
        data_path: str,
        lookahead: int = 30,
    ):
        self.data_path = Path(data_path)
        self.lookahead = lookahead

        self.poses = self._load_poses()
        self.lidar_files = sorted((self.data_path / "velodyne").glob("*.bin"))

    def _load_poses(self) -> np.ndarray:
        calib = self._parse_calibration(self.data_path / "calib.txt")
        return self._parse_poses(self.data_path / "poses.txt", calib)

    @staticmethod
    def _parse_calibration(filename: Path) -> dict[str, np.ndarray]:
        calib = {}
        for line in filename.read_text().splitlines():
            key, content = line.strip().split(":")
            values = [float(v) for v in content.strip().split()]
            pose = np.zeros((4, 4))
            pose[0, 0:4] = values[0:4]
            pose[1, 0:4] = values[4:8]
            pose[2, 0:4] = values[8:12]
            pose[3, 3] = 1.0
            calib[key] = pose
        return calib

    @staticmethod
    def _parse_poses(filename: Path, calibration: dict[str, np.ndarray]) -> np.ndarray:
        poses = []
        cab_tr = calibration["Tr"]
        tr_inv = np.linalg.inv(cab_tr)
        for line in filename.read_text().splitlines():
            values = [float(v) for v in line.strip().split()]
            pose = np.zeros((4, 4))
            pose[0, 0:4] = values[0:4]
            pose[1, 0:4] = values[4:8]
            pose[2, 0:4] = values[8:12]
            pose[3, 3] = 1.0
            poses.append(np.matmul(tr_inv, np.matmul(pose, cab_tr), dtype=np.float32))
        return np.array(poses, dtype=np.float32)

    @staticmethod
    def _load_lidar(lidar_file: Path) -> np.ndarray:
        scan = np.fromfile(lidar_file, dtype=np.float32).reshape((-1, 4))
        return scan[:, :3]

    def __len__(self) -> int:
        return len(self.lidar_files) - self.lookahead

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        lidar_data = self._load_lidar(self.lidar_files[idx])[:100000]

        # Convert to tensor
        lidar_tensor = torch.from_numpy(lidar_data).float()

        # Calculate relative pose transformation
        current_pose = self.poses[idx]
        target_pose = self.poses[idx + self.lookahead]

        # Compute relative transformation
        relative_pose = np.matmul(np.linalg.inv(current_pose), target_pose)

        # Extract translation and rotation
        translation = relative_pose[:3, 3]
        rotation = relative_pose[:3, :3]

        # Combine into target tensor
        target = torch.cat(
            [
                torch.from_numpy(translation).float(),
                torch.from_numpy(rotation.flatten()).float(),
            ]
        )
        return lidar_tensor, target

In [23]:

class PointFeatureEncoder(nn.Module):
    def __init__(self, input_dim=3, hidden_dim=128):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.SiLU(),  # More efficient than ReLU
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.LayerNorm(hidden_dim)
        )

    def forward(self, x):
        return self.encoder(x)

class SparseAttentionLayer(nn.Module):
    def __init__(self, dim, num_heads=8):
        super().__init__()
        self.num_heads = num_heads
        self.scale = dim ** -0.5

        self.query = nn.Linear(dim, dim)
        self.key = nn.Linear(dim, dim)
        self.value = nn.Linear(dim, dim)
        
        self.proj = nn.Linear(dim, dim)
        self.drop = nn.Dropout(0.1)

    def forward(self, x):
        B, N, C = x.shape
        q = self.query(x).reshape(B, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
        k = self.key(x).reshape(B, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)
        v = self.value(x).reshape(B, N, self.num_heads, C // self.num_heads).permute(0, 2, 1, 3)

        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.drop(attn)

        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        return x

class OptimizedTrajectoryPredictor(pl.LightningModule):
    def __init__(self, 
                 input_dim=3, 
                 hidden_dim=256, 
                 output_dim=12,
                 num_layers=3):
        super().__init__()
        
        # Automatic Mixed Precision
        self.automatic_optimization = False
        
        # Point Cloud Feature Encoding
        self.point_encoder = PointFeatureEncoder(input_dim, hidden_dim)
        
        # Sparse Convolutional Layers
        self.sparse_conv = spconv.SparseConv3d(
            hidden_dim, hidden_dim * 2, 
            kernel_size=3, 
            stride=1, 
            padding=1
        )
        
        # Multi-Head Attention Layers
        self.attention_layers = nn.ModuleList([
            SparseAttentionLayer(hidden_dim) 
            for _ in range(num_layers)
        ])
        
        # Trajectory Prediction Head
        self.trajectory_head = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.SiLU(),
            nn.Linear(hidden_dim // 2, output_dim)
        )
        
        # Loss function
        self.criterion = nn.MSELoss()
        
    def forward(self, x):
        # x: (B, N, 3) - Batch of point clouds
        
        # Point Feature Encoding
        features = self.point_encoder(x)
        
        # Sparse Convolution (if using sparse point clouds)
        # This is a simplified version - adjust based on your exact sparse tensor format
        sparse_features = spconv.SparseConvTensor(
            features, 
            features.indices, 
            features.spatial_shape, 
            features.batch_size
        )
        sparse_features = self.sparse_conv(sparse_features)
        
        # Multi-Head Attention Layers
        for attention_layer in self.attention_layers:
            features = attention_layer(features)
        
        # Global Pooling
        global_features = features.mean(dim=1)
        
        # Trajectory Prediction
        trajectory = self.trajectory_head(global_features)
        
        return trajectory
    
    def training_step(self, batch, batch_idx):
        lidar_points, target = batch
        
        # Forward pass with automatic mixed precision
        with torch.cuda.amp.autocast():
            prediction = self(lidar_points)
            loss = self.criterion(prediction, target)
        
        # Log metrics
        self.log('train_loss', loss, prog_bar=True)
        
        return loss
    
    def configure_optimizers(self):
        # Adaptive optimizer with learning rate scheduling
        optimizer = torch.optim.AdamW(
            self.parameters(), 
            lr=1e-3, 
            weight_decay=1e-4
        )
        
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer, 
            T_max=100,  # Total number of epochs
            eta_min=1e-5
        )
        
        return {
            'optimizer': optimizer,
            'lr_scheduler': scheduler
        }

# Additional Training Configuration
def train_model():
    # Lightning Trainer with optimizations
    trainer = pl.Trainer(
        accelerator='gpu',
        devices=1,  # Or multiple GPUs
        precision=16,  # 16-bit precision
        max_epochs=100,
        # gradient_clip_val=0.5,
        # accumulate_grad_batches=4,  # Simulate larger batch sizes
        strategy='auto' 
    )
    
    # Initialize model
    model = OptimizedTrajectoryPredictor()
    
    # Create dataset and dataloader
    dataset = SemanticKITTIDataset("/home/sandhu/learning/sensmore_test/SemanticKITTI_00")
    dataloader = DataLoader(
        dataset,
        batch_size=4,
        num_workers=os.cpu_count(),
        pin_memory=True,
        persistent_workers=True
    )
    
    # Train the model
    trainer.fit(model, dataloader)

In [24]:
# def train(model: nn.Module, train_loader: DataLoader, num_epochs: int):
#     device = "cpu"
#     if torch.cuda.is_available():
#         device = "cuda"
#     elif torch.backends.mps.is_available():
#         device = "mps"
#     device = torch.device(device)

#     model.to(device)

#     optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # Adjust to your needs
#     criterion = nn.MSELoss()  # Adjust to your needs
#     for epoch in range(num_epochs):
#         model.train()
#         total_loss = 0.0

#         for lidar_points, target in (bar := tqdm(train_loader)):
#             lidar_points = lidar_points.to(device)
#             target = target.to(device)

#             optimizer.zero_grad()

#             # Predict trajectory
#             prediction = model(lidar_points)

#             # Compute loss (e.g., MSE for translation and rotation)
#             translation_loss = criterion(prediction[:, :3], target[:, :3])
#             rotation_loss = criterion(prediction[:, 3:], target[:, 3:])

#             loss = translation_loss + rotation_loss

#             loss.backward()
#             optimizer.step()

#             total_loss += loss.item()
#             bar.set_description(
#                     f"Epoch {epoch+1:2}/{num_epochs} Epoch Loss: {total_loss/len(train_loader):.4f} Loss: {loss:.2f}"
#                 )

#     return model



In [25]:
def main():
    dataset = SemanticKITTIDataset("/home/sandhu/learning/sensmore_test/SemanticKITTI_00")
    train_loader = DataLoader(
        dataset,
        batch_size=2,
        shuffle=True,
        num_workers=2,  # Adjust this to your machine
    )
    
    # model = OptimizedTrajectoryPredictor()
    train_model()


if __name__ == "__main__":
    main()

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name             | Type                | Params | Mode 
-----------------------------------------------------------------
0 | point_encoder    | PointFeatureEncoder | 67.8 K | train
1 | sparse_conv      | SparseConv3d        | 3.5 M  | train
2 | attention_layers | ModuleList          | 789 K  | train
3 | trajectory_head  | Sequential          | 100 K  | train
4 | criterion        | MSELoss             | 0      | train
-----------------------------------------------------------------
4.5 M     Trainable params
0         Non-trainable params
4.5 M     Total params
17.988    Total estimated model params size (MB)
36        Modules in train mode
0         Modules in eval mode


Training: |                                   | 0/? [00:00<?, ?it/s]

  with torch.cuda.amp.autocast():


OutOfMemoryError: CUDA out of memory. Tried to allocate 392.00 MiB. GPU 0 has a total capacity of 3.72 GiB of which 71.56 MiB is free. Including non-PyTorch memory, this process has 3.62 GiB memory in use. Of the allocated memory 3.52 GiB is allocated by PyTorch, and 11.42 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)