# CommaAI calibration challenge

The goal of this project is to predict the orientation of a camera from a video.
We predict the pitch and yaw of the camera in radians at every frame of the footage.


In [None]:
import os
import sys
import glob

import numpy as np
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as tdata
import torchvision

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
HEIGHT = 874
WIDTH = 1164
CHUNK_LENGTH = 30
np.random.seed(127)


# Data exploration

In [None]:
def indices_to_windows(idx: np.ndarray) -> list:
    """Group ascending indices into half-open ``(start, end)`` windows.

    A new window begins wherever two neighbouring indices differ by more
    than one; ``end`` is the last index of the run plus one.

    e.g. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] -> [(0, 10)]
    e.g. [0, 1, 2, 3, 6, 7, 8, 9, 10] -> [(0, 4), (6, 11)]
    e.g. [0, 1, 9, 10, 11, 12, 13, 17, 18, 19] -> [(0, 2), (9, 14), (17, 20)]
    """
    if len(idx) == 0:
        return []

    # positions p at which idx[p + 1] - idx[p] > 1, i.e. the last element of each run
    breaks = np.argwhere(np.diff(idx) > 1).ravel()

    # every run starts at position 0 or right after a break,
    # and stops at a break or at the final element
    first_positions = [0, *(b + 1 for b in breaks)]
    last_positions = [*breaks, -1]

    return [
        (idx[first], idx[last] + 1)
        for first, last in zip(first_positions, last_positions)
    ]


def nan_windows(arr: np.ndarray) -> list:
    """Return the ``(start, end)`` row windows whose first column is NaN.

    Only column 0 of ``arr`` is inspected. An array without any NaN in
    that column yields an empty list.
    """
    first_col_is_nan = np.isnan(arr[:, 0])
    if not first_col_is_nan.any():
        return []
    return indices_to_windows(np.flatnonzero(first_col_is_nan))


def non_nan_windows(arr: np.ndarray) -> list:
    """Return the ``(start, end)`` row windows whose first column is not NaN.

    Only column 0 of ``arr`` is inspected. An array whose first column is
    entirely NaN yields an empty list.
    """
    first_col_is_valid = ~np.isnan(arr[:, 0])
    if not first_col_is_valid.any():
        return []
    return indices_to_windows(np.flatnonzero(first_col_is_valid))


# print(indices_to_windows([0, 1, 2, 3, 6, 7, 8, 9, 10]))
# print(indices_to_windows([0, 1, 9, 10, 11, 12, 13, 17, 18, 19]))


In [None]:
# Decode the five labeled clips (seconds 0 to 60 of each) and read the
# orientation labels stored next to them; both lists are indexed by video id.
videos, orientations = [], []
for i in range(5):
    clip_path = f"labeled/{i}.mp4"
    label_path = f"labeled/{i}.txt"
    vframes, _, _ = torchvision.io.read_video(
        clip_path, start_pts=0, end_pts=60, pts_unit="sec"
    )
    videos.append(vframes)
    orientations.append(np.loadtxt(label_path))


In [None]:
# Show the frame-tensor shape next to the label-array shape for every video.
for clip_frames, clip_labels in zip(videos, orientations):
    print(clip_frames.shape, clip_labels.shape)


In [None]:
# Cut every labeled video into as many back-to-back CHUNK_LENGTH-frame clips as
# fit inside each contiguous run of frames with non-NaN orientation labels.
# Clip number k of video v is written to labeled_chunks/{v}_{k}.mp4 and its
# labels to labeled_chunks/{v}_{k}.txt, where k counts the clips of video v
# across all of its windows.
if not os.path.exists("labeled_chunks"):
    os.mkdir("labeled_chunks")
for video_id in range(5):
    print(f"processing video {video_id}")
    vframes = videos[video_id]
    labels = orientations[video_id]
    chunk_id = 0  # running clip counter for this video
    for start, end in non_nan_windows(labels):
        print(f"processing window {start},{end}")
        if end - start < CHUNK_LENGTH:
            print(f"skipping window {start},{end} because it's too short")
            continue

        # Walk the window one whole clip at a time; trailing frames that
        # cannot fill a complete clip are left out.
        for chunk_start in range(start, end - CHUNK_LENGTH + 1, CHUNK_LENGTH):
            chunk_end = chunk_start + CHUNK_LENGTH
            chunk = vframes[chunk_start:chunk_end]
            chunk_orientations = labels[chunk_start:chunk_end]
            if np.isnan(chunk_orientations).any():
                raise ValueError("chunk has nan orientation data")

            torchvision.io.write_video(
                f"labeled_chunks/{video_id}_{chunk_id}.mp4",
                chunk,
                fps=20,
                video_codec="libx264",
            )
            np.savetxt(f"labeled_chunks/{video_id}_{chunk_id}.txt", chunk_orientations)
            print(
                f"saved chunk {video_id}_{chunk_id} on window {chunk_start},{chunk_end}"
            )
            chunk_id += 1


# Data

In [None]:
class VideoDataset(tdata.Dataset):
    """Dataset of fixed-length video clips, loaded eagerly onto one device.

    Every clip named in ``filenames`` is decoded in ``__init__`` and copied
    into a single pre-allocated tensor of shape
    ``(len(filenames), CHUNK_LENGTH, HEIGHT, WIDTH, 3)``; the matching
    ``.txt`` labels go into a ``(len(filenames), CHUNK_LENGTH, 2)`` tensor.
    Both tensors are created with ``torch.zeros`` (default dtype) on
    ``device``, so memory use grows linearly with the number of clips.

    NOTE(review): frames are stored channels-last, whereas ``ConvLSTM.forward``
    in this file unpacks ``(batch, channels, seq_len, height, width)``; a
    permute is presumably needed before feeding the model — confirm.

    Args:
        data_dir: directory joined in front of every entry of ``filenames``.
        filenames: ``.mp4`` file names; ``None`` means an empty dataset.
        device: device on which the frame and label tensors are allocated.

    Raises:
        ValueError: if a clip decodes to fewer than ``CHUNK_LENGTH`` frames.
    """

    data_dir: str
    batch_size: int  # declared here but never assigned by this class
    filenames: list
    data: torch.Tensor
    labels: torch.Tensor

    def __init__(
        self, data_dir="labeled_chunks", filenames: list = None, device=DEVICE
    ) -> None:
        super().__init__()
        self.data_dir = data_dir
        # A ``[]`` default would be one list object shared by every instance;
        # copy so later changes to the caller's list cannot desynchronise
        # ``filenames`` from the tensors built below.
        self.filenames = [] if filenames is None else list(filenames)

        self.data = torch.zeros(
            (len(self.filenames), CHUNK_LENGTH, HEIGHT, WIDTH, 3), device=device
        )
        self.labels = torch.zeros((len(self.filenames), CHUNK_LENGTH, 2), device=device)
        for i, filename in enumerate(self.filenames):
            video_path = os.path.join(self.data_dir, filename)
            # With pts_unit="pts" the start/end bounds are presentation
            # timestamps in the stream's time base, not frame counts, so
            # ``end_pts=CHUNK_LENGTH`` is not guaranteed to cover CHUNK_LENGTH
            # frames. Each file holds one clip, so decode all of it.
            frames, _, _ = torchvision.io.read_video(video_path, pts_unit="pts")
            if frames.shape[0] < CHUNK_LENGTH:
                # Fail loudly: a single decoded frame would otherwise be
                # broadcast silently over the whole clip slot.
                raise ValueError(
                    f"{video_path} has {frames.shape[0]} frames, "
                    f"expected at least {CHUNK_LENGTH}"
                )
            self.data[i] = frames[:CHUNK_LENGTH].to(device=device)

            label_path = os.path.splitext(video_path)[0] + ".txt"
            self.labels[i] = torch.from_numpy(np.loadtxt(label_path)).to(device=device)

    def __len__(self) -> int:
        """Return the number of clips."""
        return len(self.filenames)

    def __getitem__(self, index: int) -> tuple:
        """Return the ``(frames, labels)`` pair of clip ``index``."""
        return self.data[index], self.labels[index]


In [None]:
class DataModule(pl.LightningDataModule):
    """Lightning data module over the clips stored in ``data_dir``.

    ``setup`` takes at most the first ten ``.mp4`` files (in sorted name
    order), shuffles them with NumPy's global RNG, splits them 80/10/rest
    into train/val/test and loads every split fully into CPU tensors.

    Args:
        data_dir: directory holding ``<name>.mp4`` clips and ``<name>.txt`` labels.
        batch_size: batch size used by all three dataloaders.
    """

    def __init__(self, data_dir="labeled_chunks", batch_size: int = 3) -> None:
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

    def _load_split(self, filenames: list) -> tuple:
        """Decode the clips in ``filenames`` into one frames and one labels tensor.

        Returns:
            ``(data, labels)`` with shapes
            ``(len(filenames), CHUNK_LENGTH, HEIGHT, WIDTH, 3)`` and
            ``(len(filenames), CHUNK_LENGTH, 2)``.

        Raises:
            ValueError: if a clip decodes to fewer than ``CHUNK_LENGTH`` frames.
        """
        data = torch.zeros((len(filenames), CHUNK_LENGTH, HEIGHT, WIDTH, 3))
        labels = torch.zeros((len(filenames), CHUNK_LENGTH, 2))
        for i, filename in enumerate(filenames):
            video_path = os.path.join(self.data_dir, filename)
            # With pts_unit="pts" the start/end bounds are presentation
            # timestamps in the stream's time base, not frame counts, so
            # ``end_pts=CHUNK_LENGTH`` is not guaranteed to cover CHUNK_LENGTH
            # frames. Each file holds one clip, so decode all of it.
            frames, _, _ = torchvision.io.read_video(video_path, pts_unit="pts")
            if frames.shape[0] < CHUNK_LENGTH:
                # Fail loudly: a single decoded frame would otherwise be
                # broadcast silently over the whole clip slot.
                raise ValueError(
                    f"{video_path} has {frames.shape[0]} frames, "
                    f"expected at least {CHUNK_LENGTH}"
                )
            data[i] = frames[:CHUNK_LENGTH]
            label_path = os.path.splitext(video_path)[0] + ".txt"
            labels[i] = torch.from_numpy(np.loadtxt(label_path))
        return data, labels

    def setup(self, stage: str) -> None:
        """Pick the clip files, split them and load every split into memory.

        Raises:
            ValueError: if ``data_dir`` contains no ``.mp4`` file.
        """
        # os.listdir order is arbitrary; sorting makes both the "first ten"
        # selection and the seeded shuffle below reproducible.
        filenames = [
            filename
            for filename in sorted(os.listdir(self.data_dir))
            if filename.endswith(".mp4")
        ][:10]
        if len(filenames) == 0:
            raise ValueError("no files")
        print(filenames)
        np.random.shuffle(filenames)
        num_train = int(len(filenames) * 0.8)
        num_val = int(len(filenames) * 0.1)
        self.train_filenames = filenames[:num_train]
        self.val_filenames = filenames[num_train : num_train + num_val]
        # whatever is left after the train and val slices becomes the test split
        self.test_filenames = filenames[num_train + num_val :]

        self.train_data, self.train_labels = self._load_split(self.train_filenames)
        self.val_data, self.val_labels = self._load_split(self.val_filenames)
        self.test_data, self.test_labels = self._load_split(self.test_filenames)

    def train_dataloader(self) -> tdata.DataLoader:
        """Return a loader over the training split."""
        return tdata.DataLoader(
            tdata.TensorDataset(self.train_data, self.train_labels),
            batch_size=self.batch_size,
            pin_memory=True,
        )

    def val_dataloader(self) -> tdata.DataLoader:
        """Return a loader over the validation split."""
        return tdata.DataLoader(
            tdata.TensorDataset(self.val_data, self.val_labels),
            batch_size=self.batch_size,
            pin_memory=True,
        )

    def test_dataloader(self) -> tdata.DataLoader:
        """Return a loader over the test split."""
        return tdata.DataLoader(
            tdata.TensorDataset(self.test_data, self.test_labels),
            batch_size=self.batch_size,
            pin_memory=True,
        )


# RNN Model

In [None]:
# Original ConvLSTM cell as proposed by Shi et al.
class ConvLSTMCell(nn.Module):
    """Single ConvLSTM step with peephole connections.

    One convolution over the concatenated input and previous hidden state
    produces the pre-activations of the input, forget, cell and output
    gates; the three ``W_c*`` parameters are multiplied element-wise with
    the cell state inside the gates.

    Args:
        in_channels: channels of the input frame ``X``.
        out_channels: channels of the hidden and cell states.
        kernel_size: kernel size of the gate convolution.
        padding: padding of the gate convolution.
        activation: ``"tanh"`` or ``"relu"``.
        frame_size: ``(height, width)`` of the states; fixes the shape of
            the peephole weights.

    Raises:
        ValueError: if ``activation`` is neither ``"tanh"`` nor ``"relu"``.
    """

    def __init__(
        self, in_channels, out_channels, kernel_size, padding, activation, frame_size
    ):

        super().__init__()

        if activation == "tanh":
            self.activation = torch.tanh
        elif activation == "relu":
            self.activation = torch.relu
        else:
            # Without this branch an unknown name only surfaces later, as an
            # AttributeError on the first forward pass.
            raise ValueError(
                f"unsupported activation {activation!r}; expected 'tanh' or 'relu'"
            )

        # Idea adapted from https://github.com/ndrplz/ConvLSTM_pytorch
        self.conv = nn.Conv2d(
            in_channels=in_channels + out_channels,
            out_channels=4 * out_channels,
            kernel_size=kernel_size,
            padding=padding,
        )

        # Peephole weights for the Hadamard products. ``torch.Tensor(shape)``
        # returns uninitialised memory (possibly NaN/inf), so start from zeros:
        # the peepholes contribute nothing at first and are learned from there.
        self.W_ci = nn.Parameter(torch.zeros(out_channels, *frame_size))
        self.W_co = nn.Parameter(torch.zeros(out_channels, *frame_size))
        self.W_cf = nn.Parameter(torch.zeros(out_channels, *frame_size))

    def forward(self, X, H_prev, C_prev):
        """Advance the cell by one time step and return the new ``(H, C)``."""

        # Idea adapted from https://github.com/ndrplz/ConvLSTM_pytorch
        conv_output = self.conv(torch.cat([X, H_prev], dim=1))

        # The convolution emits 4 * out_channels maps: one group per gate.
        i_conv, f_conv, C_conv, o_conv = torch.chunk(conv_output, chunks=4, dim=1)

        input_gate = torch.sigmoid(i_conv + self.W_ci * C_prev)
        forget_gate = torch.sigmoid(f_conv + self.W_cf * C_prev)

        # Current Cell output
        C = forget_gate * C_prev + input_gate * self.activation(C_conv)

        # The output gate peeks at the updated cell state, not the previous one.
        output_gate = torch.sigmoid(o_conv + self.W_co * C)

        # Current Hidden State
        H = output_gate * self.activation(C)

        return H, C


class ConvLSTM(nn.Module):
    """Unroll one ``ConvLSTMCell`` over the time axis of a frame sequence.

    Args:
        in_channels: channels of each input frame.
        out_channels: channels of the hidden state, and thus of the output.
        kernel_size, padding, activation, frame_size: passed unchanged to
            ``ConvLSTMCell``.
    """

    def __init__(
        self, in_channels, out_channels, kernel_size, padding, activation, frame_size
    ):
        super().__init__()

        self.out_channels = out_channels

        # We will unroll this over time steps
        self.convLSTMcell = ConvLSTMCell(
            in_channels, out_channels, kernel_size, padding, activation, frame_size
        )

    def forward(self, X):
        """Return the hidden state of every time step.

        Args:
            X: frame sequence of shape
                ``(batch_size, num_channels, seq_len, height, width)``.

        Returns:
            Tensor of shape ``(batch_size, out_channels, seq_len, height, width)``
            on the same device as ``X``.
        """
        batch_size, _, seq_len, height, width = X.size()

        # Allocate on the input's device rather than on a global one, so the
        # layer works wherever the module and its inputs have been placed.
        device = X.device

        # Initialize output
        output = torch.zeros(
            batch_size, self.out_channels, seq_len, height, width, device=device
        )

        # Initialize Hidden State
        H = torch.zeros(batch_size, self.out_channels, height, width, device=device)

        # Initialize Cell Input
        C = torch.zeros(batch_size, self.out_channels, height, width, device=device)

        # Unroll over time steps
        for time_step in range(seq_len):
            H, C = self.convLSTMcell(X[:, :, time_step], H, C)

            output[:, :, time_step] = H

        return output

In [None]:
class OrientationEstimator(nn.Module):
    """Stack of ConvLSTM + BatchNorm3d layers followed by a 2-channel convolution.

    Args:
        num_kernels: channels of every ConvLSTM layer.
        kernel_size: kernel size of the ConvLSTM and final convolutions.
        padding: padding of the ConvLSTM and final convolutions.
        activation: activation name passed to the ConvLSTM layers.
        frame_size: ``(height, width)`` of the input frames.
        num_layers: number of ConvLSTM layers; the first one is always built.

    NOTE(review): ``forward`` returns a 2-channel map computed from the last
    time step, while the training helpers in this file compare the model
    output with per-frame (pitch, yaw) labels via ``F.mse_loss``; a pooling or
    regression head is presumably still needed — confirm.
    """

    sequential: nn.Sequential
    conv: nn.Conv2d

    def __init__(
        self,
        num_kernels: int = 64,
        kernel_size: tuple = (3, 3),
        padding: tuple = (1, 1),
        activation: str = "relu",
        frame_size: tuple = (HEIGHT, WIDTH),
        num_layers: int = 1,
    ):
        super().__init__()

        self.sequential = nn.Sequential()

        # Add First layer (Different in_channels than the rest)
        self.sequential.add_module(
            "convlstm1",
            ConvLSTM(
                in_channels=3,
                out_channels=num_kernels,
                kernel_size=kernel_size,
                padding=padding,
                activation=activation,
                frame_size=frame_size,
            ),
        )

        self.sequential.add_module(
            "batchnorm1", nn.BatchNorm3d(num_features=num_kernels)
        )

        # Add rest of the layers
        for layer_idx in range(2, num_layers + 1):
            self.sequential.add_module(
                f"convlstm{layer_idx}",
                ConvLSTM(
                    in_channels=num_kernels,
                    out_channels=num_kernels,
                    kernel_size=kernel_size,
                    padding=padding,
                    activation=activation,
                    frame_size=frame_size,
                ),
            )

            self.sequential.add_module(
                f"batchnorm{layer_idx}", nn.BatchNorm3d(num_features=num_kernels)
            )

        # Add final Convolutional Layer to predict camera orientation
        self.conv = nn.Conv2d(
            in_channels=num_kernels,
            out_channels=2,
            kernel_size=kernel_size,
            padding=padding,
        )

    def forward(self, X):
        """Run the ConvLSTM stack and convolve the features of the last time step."""
        # Forward propagation through all the layers
        output = self.sequential(X)

        # Return only the last output frame (the per-call debug print that
        # used to sit here was removed: it wrote to stdout on every batch).
        return self.conv(output[:, :, -1])


In [None]:
class Seq2Seq(pl.LightningModule):
    """ConvLSTM stack that maps a frame sequence to one sigmoid-activated frame.

    The network is ``num_layers`` (at least one) ConvLSTM + BatchNorm3d pairs
    followed by a convolution back to ``num_channels`` channels, applied to
    the features of the last time step. Training and validation both log the
    MSE between that output and the batch target.

    NOTE(review): the output is frame-shaped; confirm that the targets
    delivered by the data module have a matching shape.
    """

    def __init__(
        self,
        num_channels: int = 3,
        num_kernels: int = 64,
        kernel_size: tuple = (3, 3),
        padding: tuple = (1, 1),
        activation: str = "relu",
        frame_size: tuple = (HEIGHT, WIDTH),
        num_layers: int = 1,
    ):
        super().__init__()

        self.sequential = nn.Sequential()

        # Layer 1 is always present and is the only one fed with raw
        # num_channels input; every later layer consumes num_kernels channels.
        for layer_idx in range(1, max(num_layers, 1) + 1):
            layer_inputs = num_channels if layer_idx == 1 else num_kernels
            recurrent = ConvLSTM(
                in_channels=layer_inputs,
                out_channels=num_kernels,
                kernel_size=kernel_size,
                padding=padding,
                activation=activation,
                frame_size=frame_size,
            )
            self.sequential.add_module(f"convlstm{layer_idx}", recurrent)
            self.sequential.add_module(
                f"batchnorm{layer_idx}", nn.BatchNorm3d(num_features=num_kernels)
            )

        # Projection from the hidden features back to an output frame
        self.conv = nn.Conv2d(
            in_channels=num_kernels,
            out_channels=num_channels,
            kernel_size=kernel_size,
            padding=padding,
        )

    def forward(self, X):
        """Return the sigmoid of the convolved last-time-step features."""
        features = self.sequential(X)
        last_step = features[:, :, -1]
        return torch.sigmoid(self.conv(last_step))

    def training_step(self, batch, batch_idx):
        """Compute, log (as ``train_loss``) and return the MSE of one batch."""
        inputs, target = batch
        loss = F.mse_loss(self(inputs), target)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute, log (as ``val_loss``) and return the MSE of one batch."""
        inputs, target = batch
        loss = F.mse_loss(self(inputs), target)
        self.log("val_loss", loss)
        return loss

    def configure_optimizers(self):
        """Use Adam with a learning rate of 1e-3 over all parameters."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)


# Training

## Lightning trainer

In [None]:
# Fit the Lightning Seq2Seq model for two epochs on a single GPU.
trainer = pl.Trainer(
    accelerator="gpu",
    devices=1,
    max_epochs=2,
)
model = Seq2Seq()
# DATA_DIR was never defined in this notebook (NameError); point the data
# module at the directory the chunking cell writes its clips to.
data_module = DataModule(
    data_dir="labeled_chunks"
)
trainer.fit(model, data_module)


## Classical training

In [None]:
def train_epoch(
    model: nn.Module,
    loader: tdata.DataLoader,
    optimizer: torch.optim.Optimizer,
):
    """Run one optimisation pass over ``loader`` with an MSE objective.

    Args:
        model: network to train; it is put in training mode.
        loader: iterable of ``(X, y)`` batches.
        optimizer: optimizer already bound to ``model``'s parameters.

    Returns:
        The loss of the last batch as a Python float (not the epoch mean).

    Raises:
        ValueError: if ``loader`` yields no batch.
    """
    model.train()

    # Send batches to wherever the model's weights live; fall back to the
    # global DEVICE only for a model without parameters.
    first_param = next(model.parameters(), None)
    device = DEVICE if first_param is None else first_param.device

    last_loss = None
    for batch_idx, (X, y) in enumerate(loader):
        X = X.to(device)
        y = y.to(device)
        y_hat = model(X)
        loss = F.mse_loss(y_hat, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        last_loss = loss.item()
        # "\r" keeps the progress report on a single console line
        sys.stdout.write(f"Batch {batch_idx}/{len(loader)} Loss: {last_loss}\r")

    if last_loss is None:
        # Previously this surfaced as an UnboundLocalError on ``loss``.
        raise ValueError("loader yielded no batches")

    return last_loss


def get_mse(gt, test):
    """Return the mean over columns of the per-column mean squared error.

    NaNs in ``test`` are replaced through ``np.nan_to_num`` before the
    comparison; NaNs in ``gt`` are skipped by the per-column ``nanmean``.
    """
    predictions = np.nan_to_num(test)
    squared_error = np.square(gt - predictions)
    per_column = np.nanmean(squared_error, axis=0)
    return np.mean(per_column)


def eval_epoch(model: nn.Module, loader: tdata.DataLoader):
    """Evaluate ``model`` on every batch of ``loader``.

    Args:
        model: network to evaluate; it is put in eval mode.
        loader: iterable of ``(X, y)`` batches.

    Returns:
        A pair ``(loss_mses, err)``: ``loss_mses`` is the list of per-batch
        ``F.mse_loss`` values, and ``err`` is 100 times the mean ``get_mse``
        of the predictions divided by the mean ``get_mse`` of an all-zero
        prediction.
    """
    model.eval()

    # Send batches to wherever the model's weights live; fall back to the
    # global DEVICE only for a model without parameters.
    first_param = next(model.parameters(), None)
    device = DEVICE if first_param is None else first_param.device

    zero_mses = []
    mses = []
    loss_mses = []
    # No gradients are needed here; skipping autograd avoids building and
    # keeping a graph for every validation batch.
    with torch.no_grad():
        for X, y in loader:
            X = X.to(device)
            y = y.to(device)
            y_hat = model(X)
            loss_mses.append(F.mse_loss(y_hat, y).item())
            y = y.cpu().numpy()
            y_hat = y_hat.cpu().numpy()
            zero_mses.append(get_mse(y, np.zeros_like(y)))
            mses.append(get_mse(y, y_hat))
    return loss_mses, 100 * np.mean(mses) / np.mean(zero_mses)


def train(
    model_name: str,
    model: nn.Module,
    train_loader: tdata.DataLoader,
    val_loader: tdata.DataLoader,
    optimizer: torch.optim.Optimizer,
    num_epochs: int = 1,
):
    """Train ``model``, log one CSV row per epoch and checkpoint the best epoch.

    The log goes to ``{model_name}.log``. The weights are saved to
    ``{model_name}.pth`` each time the validation error drops below the best
    value seen so far, which starts at 100.

    Args:
        model_name: stem of the log and checkpoint file names.
        model: network to train.
        train_loader: batches passed to ``train_epoch``.
        val_loader: batches passed to ``eval_epoch``.
        optimizer: optimizer bound to ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        ``model`` after the last epoch (not reloaded from the checkpoint).
    """
    best_val_err = 100
    with open(f"{model_name}.log", "w") as f:
        # Each record needs its own line terminator, otherwise the header and
        # every epoch end up glued together on a single line.
        f.write("epoch,train_loss,val_loss,val_err\n")
        for epoch in range(num_epochs):
            train_loss = train_epoch(model, train_loader, optimizer)
            val_loss, val_err = eval_epoch(model, val_loader)
            print(
                f"\nEpoch {epoch}/{num_epochs} Train Loss: {train_loss}, Val loss: {val_loss}, Val err: {val_err}"
            )
            # eval_epoch returns one loss per batch; a list repr contains
            # commas and would break the CSV columns, so log the mean.
            mean_val_loss = float(np.mean(val_loss)) if len(val_loss) else float("nan")
            f.write(f"{epoch},{train_loss},{mean_val_loss},{val_err}\n")
            if val_err < best_val_err:
                best_val_err = val_err
                torch.save(model.state_dict(), f"{model_name}.pth")
                print("Saved model at epoch {} with val err {}".format(epoch, val_err))

        return model


In [None]:
prefix = "/kaggle/input/commaai-calibration-challenge-dataset/labeled_chunks"
# get filenames of all mp4 files; glob returns them in arbitrary order, so sort
# to make the seeded shuffle below select the same clips on every run
filenames = sorted(glob.glob(os.path.join(prefix, "**", "*.mp4"), recursive=True))
if not filenames:
    # explicit check instead of ``assert``, which is skipped under ``python -O``
    raise FileNotFoundError(f"no .mp4 files found under {prefix}")
# shuffle filenames and split them into train and validation
indices = np.arange(len(filenames))
np.random.shuffle(indices)
# full 80/20 split, disabled in favour of the small fixed subset below:
# train_indices = indices[: int(0.8 * len(indices))]
# val_indices = indices[int(0.8 * len(indices)) :]
train_indices = indices[:20]
val_indices = indices[20:25]
train_filenames = [filenames[i] for i in train_indices]
val_filenames = [filenames[i] for i in val_indices]
# create train and validation datasets
# (glob already returns paths that include ``prefix``; since ``prefix`` is
# absolute, the os.path.join inside VideoDataset leaves them unchanged)
train_dataset = VideoDataset(
    data_dir=prefix,
    filenames=train_filenames,
    device="cuda:1"
)
val_dataset = VideoDataset(
    data_dir=prefix,
    filenames=val_filenames,
    device="cuda:1"
)
# create train and validation dataloaders
# pin_memory must stay off: the datasets already hold CUDA tensors, and only
# CPU tensors can be copied into pinned host memory.
train_loader = tdata.DataLoader(
    train_dataset,
    batch_size=3,
    shuffle=False,
    pin_memory=False,
)
val_loader = tdata.DataLoader(
    val_dataset,
    batch_size=3,
    shuffle=False,
    pin_memory=False,
)

In [None]:
def get_tensor_size(tensor):
    """Return the size of ``tensor``'s elements in gigabytes (1e9 bytes)."""
    total_bytes = tensor.nelement() * tensor.element_size()
    return total_bytes / 1e9

In [None]:
# create model and optimizer
# The model has to live on DEVICE: train_epoch/eval_epoch move every batch
# there and ConvLSTM allocates its states there, so CPU weights would clash.
# The optimizer is built afterwards so it tracks the moved parameters.
model = OrientationEstimator().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# train model
train(
    "orientation_estimator",
    model,
    train_loader,
    val_loader,
    optimizer,
    num_epochs=1,
)


In [None]:
# memory taken by a tensor's elements, expressed in GB (1e9 bytes)
def get_tensor_size(tensor):
    num_elements = tensor.nelement()
    bytes_per_element = tensor.element_size()
    return num_elements * bytes_per_element / 1e9
