In [1]:
import torch
import torch.nn as nn


class MLPPlanner(nn.Module):
    def __init__(
        self,
        n_track: int = 10,
        n_waypoints: int = 3,
    ):
        """
        Args:
            n_track (int): number of points in each side of the track
            n_waypoints (int): number of waypoints to predict
        """
        super().__init__()

        self.n_track = n_track
        self.n_waypoints = n_waypoints

        self.model = nn.Sequential(
            nn.Linear(2 * n_track * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, n_waypoints * 2),
        )


    def forward(
        self,
        track_left: torch.Tensor,
        track_right: torch.Tensor,
        **kwargs,
    ) -> torch.Tensor:
        """
        Predicts waypoints from the left and right boundaries of the track.

        During test time, your model will be called with
        model(track_left=..., track_right=...), so keep the function signature as is.

        Args:
            track_left (torch.Tensor): shape (b, n_track, 2)
            track_right (torch.Tensor): shape (b, n_track, 2)

        Returns:
            torch.Tensor: future waypoints with shape (b, n_waypoints, 2)
        """
        b = track_left.shape[0]
        x = torch.cat([track_left, track_right], dim=1)
        x = x.flatten(start_dim=1)
        x = self.model(x)
        return x.view(b, self.n_waypoints, 2)

left = torch.randn(3, 10, 2).to("cuda")
right = torch.randn(3, 10, 2).to("cuda")
net = MLPPlanner().to("cuda")
net(left, right).shape

torch.Size([3, 3, 2])

In [8]:
left = torch.randn(3, 10, 2).to("cuda")
right = torch.randn(3, 10, 2).to("cuda")
x = torch.cat([left, right], dim=1)
x = x.flatten(start_dim=1)
x.shape

torch.Size([3, 40])

In [None]:
class TransformerPlanner(nn.Module):
    def __init__(
        self,
        n_track: int = 10,
        n_waypoints: int = 3,
        d_model: int = 64,
        n_head: int = 4,
        num_layers: int = 2,
    ):
        super().__init__()

        self.n_track = n_track
        self.n_waypoints = n_waypoints

        self.query_embed = nn.Embedding(n_waypoints, d_model)

        self.input_proj = nn.Linear(2, d_model)
        
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=n_head, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # 4. An output head to project from d_model to 2D waypoints
        self.output_head = nn.Linear(d_model, 2)

    def forward(
        self,
        track_left: torch.Tensor,
        track_right: torch.Tensor,
        **kwargs,
    ) -> torch.Tensor:
        """
        Predicts waypoints from the left and right boundaries of the track.

        During test time, your model will be called with
        model(track_left=..., track_right=...), so keep the function signature as is.

        Args:
            track_left (torch.Tensor): shape (b, n_track, 2)
            track_right (torch.Tensor): shape (b, n_track, 2)

        Returns:
            torch.Tensor: future waypoints with shape (b, n_waypoints, 2)
        """
        b = track_left.shape[0]

        # (n_waypoints, d_model) -> (1, n_waypoints, d_model) -> (b, n_waypoints, d_model)
        # create embedding for possible waypoint positions
        queries = self.query_embed.weight.unsqueeze(0).repeat(b, 1, 1)
        # print(queries.shape)

        track_points = torch.cat([track_left, track_right], dim=1)  # (b, 2 * n_track, 2)
        # print(track_points.shape)
        memory = self.input_proj(track_points)  # (b, 2 * n_track, d_model)
        # print(memory.shape)

        output = self.decoder(tgt=queries, memory=memory) # (b, n_waypoints, d_model)
        # print(output.shape)

        # 4. Get final waypoints
        waypoints = self.output_head(output) # (b, n_waypoints, 2)

        return waypoints

net = TransformerPlanner().to("cuda")
net(left, right).shape

torch.Size([3, 3, 64])


torch.Size([3, 3, 2])

In [None]:
INPUT_MEAN = [0.2788, 0.2657, 0.2629]
INPUT_STD = [0.2064, 0.1944, 0.2252]
class CNNPlanner(torch.nn.Module):
    def __init__(
        self,
        n_waypoints: int = 3,
    ):
        super().__init__()

        self.n_waypoints = n_waypoints

        self.register_buffer("input_mean", torch.as_tensor(INPUT_MEAN), persistent=False)
        self.register_buffer("input_std", torch.as_tensor(INPUT_STD), persistent=False)

        num_channels = 32

        self.blocks = nn.Sequential(
            nn.Conv2d(3, num_channels, kernel_size = 3, stride = 2, padding = 1, bias = False),
            nn.BatchNorm2d(num_channels),
            nn.ReLU(),
            nn.Conv2d(num_channels, num_channels * 2, kernel_size=3, stride = 2, padding = 1, bias=False), # 32x64
            nn.BatchNorm2d(num_channels * 2),
            nn.ReLU(),
            nn.Conv2d(num_channels * 2, num_channels * 4, kernel_size=3, stride = 2, padding = 1, bias=False), # 64x128
            nn.BatchNorm2d(num_channels * 4),
            nn.ReLU(),
            nn.Conv2d(num_channels * 4, num_channels * 8, kernel_size=3, stride = 2, padding = 1, bias=False), # 64x128
            nn.BatchNorm2d(num_channels * 8),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1)),
        )

        self.outputs = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, self.n_waypoints * 2)
        )

    def forward(self, image: torch.Tensor, **kwargs) -> torch.Tensor:
        """
        Args:
            image (torch.FloatTensor): shape (b, 3, h, w) and vals in [0, 1]

        Returns:
            torch.FloatTensor: future waypoints with shape (b, n, 2)
        """
        b = image.shape[0]
        x = (image - self.input_mean[None, :, None, None]) / self.input_std[None, :, None, None]

        features = self.blocks(x)
        # print(features.shape)

        output = self.outputs(features)
        
        return output.view(b, self.n_waypoints, 2)

        raise NotImplementedError

x = torch.randn(1, 3, 96, 128).to("cuda")
net = CNNPlanner().to("cuda")
net(x).shape

torch.Size([1, 256, 1, 1])


torch.Size([1, 3, 2])

In [None]:
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np

from homework.models import MLPPlanner, TransformerPlanner, CNNPlanner, load_model, save_model
from homework.metrics import PlannerMetric
from homework.datasets.road_dataset import load_data

exp_dir: str = "logs",
model_name: str = "mlp_planner",
num_epoch: int = 1,
lr: float = 1e-3,
batch_size: int = 128,
seed: int = 2024,
num_workers = 0,

device = torch.device("cuda")

# torch.manual_seed(seed)
# np.random.seed(seed)

# Use 'state_only' for MLP and Transformer planners, 'default' for CNN planner
transform_pipeline = "default" if model_name == "cnn_planner" else "state_only"

train_loader = load_data(
    dataset_path= "drive_data/train",
    transform_pipeline = transform_pipeline,
    num_workers = num_workers,
    batch_size = batch_size,
    shuffle = True,
)
# val_loader = load_data(
#     dataset_path = f"drive_data/val",
#     transform_pipeline = transform_pipeline,
#     num_workers = num_workers,
#     batch_size = batch_size,
# )

model = load_model(model_name, **kwargs)
model = model.to(device)
model.train()
criterion = nn.MSELoss(reduction='none') 
optimizer = optim.AdamW(model.parameters(), lr=lr)
metric = PlannerMetric()

best_val_error = float('inf')

# Training Loop
for epoch in range(num_epoch):
    train_loss = 0.0
    pbar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epoch} [Train]")
            
    for batch in pbar:
        # print(batch["waypoints_mask"])
        for k, v in batch.items():
            print(k)
        return
        # batch = {k: v.to(device) for k, v in batch.items()}


        # optimizer.zero_grad()
        # predictions = model(batch["left_track"], batch["right_track"])

Loaded 8000 samples from 16 episodes


TypeError: '<' not supported between instances of 'tuple' and 'int'