In [58]:
import yaml
import torch
from torch.utils.data import Dataset
from tqdm.notebook import tqdm
import numpy as np


def load_config(config_path: str) -> dict:
    """
    Parse a YAML configuration file.

    Args:
        config_path (str): Path of the YAML file to read.

    Returns:
        dict: The object produced by ``yaml.safe_load`` for the file content
        (a dictionary when the top level of the document is a mapping).
    """
    with open(config_path, "r") as stream:
        return yaml.safe_load(stream)


# Notebook-wide settings, read from a path relative to the working directory.
# Later cells read the keys "batchsize_train", "shuffle_train",
# "num_workers_train_loader", "batchsize_val", "shuffle_val",
# "num_workers_val_loader", "lr" and "epochs" from this dictionary.
config = load_config("config/competition.yaml")

Data loading and model definition

In [59]:
import torch
from torch import nn
import torch.nn.functional as F
import random


class NeuralNetwork(nn.Module):
    """
    Convolutional regression network predicting a fixed number of targets per image.

    Four 3x3 convolutions (64, 128, 256 and 512 channels), each followed by a ReLU
    and a 2x2 max-pool, feed two fully connected layers. The output has shape
    [batch, num_targets, 12].
    """

    # Every max-pool in ``forward`` halves height and width; four of them divide by 16.
    _DOWNSAMPLING = 16

    def __init__(self, input_channels=3, num_targets=6, input_size=(480, 640)):
        """
        Custom regression model that outputs 12 values per target.
        (Presumably 4 corners x 3 values per corner - confirm against the dataset.)

        - input_channels: Number of input channels in the image RGB = 3
        - num_targets: Maximum number of targets per image.
        - input_size: (height, width) of the input images. It sizes the first fully
          connected layer; the default (480, 640) reproduces the previously
          hard-coded 512 * 30 * 40 input features.

        Raises:
            ValueError: If ``input_size`` is too small to leave at least one pixel
                after the four pooling stages.
        """
        super().__init__()
        self.num_targets = num_targets

        height, width = (int(side) for side in input_size)
        # Max-pooling with kernel 2 / stride 2 and no padding floor-halves each side,
        # so four stages are equivalent to a floor division by 16.
        feat_h = height // self._DOWNSAMPLING
        feat_w = width // self._DOWNSAMPLING
        if feat_h < 1 or feat_w < 1:
            raise ValueError(
                f"input_size={(height, width)} is too small: both sides must be at "
                f"least {self._DOWNSAMPLING} pixels."
            )
        self.input_size = (height, width)

        # Backbone Convolutional Layers (padding=1 keeps the spatial size unchanged)
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)

        # Fully Connected Layers for Regression
        self.fc1 = nn.Linear(512 * feat_h * feat_w, 1024)
        self.fc2 = nn.Linear(1024, self.num_targets * 12)  # 12 values per target

    def forward(self, x):
        """
        Args:
            x: Image batch of shape [batch, input_channels, H, W] whose spatial size
               matches the ``input_size`` given at construction.

        Returns:
            Tensor of shape [batch, num_targets, 12].

        Raises:
            RuntimeError: If the spatial size of ``x`` does not produce the number of
                features the first fully connected layer was built for.
        """
        # Feature extraction: conv -> ReLU -> 2x2 max-pool, four times.
        # Spatial size goes H, W -> H/2 -> H/4 -> H/8 -> H/16.
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = F.max_pool2d(F.relu(conv(x)), kernel_size=2, stride=2)

        # Flatten -> [batch, 512 * (H/16) * (W/16)]; unlike ``view`` this also accepts
        # non-contiguous activations.
        x = torch.flatten(x, start_dim=1)
        if x.size(1) != self.fc1.in_features:
            raise RuntimeError(
                f"Expected {self.fc1.in_features} flattened features for input_size="
                f"{self.input_size}, got {x.size(1)}; construct the model with the "
                "input_size of your images."
            )

        # Regression layers
        x = F.relu(self.fc1(x))  # -> [batch, 1024]
        x = self.fc2(x)  # -> [batch, num_targets * 12]

        # Reshape to [batch, num_targets, 12]
        return x.view(x.size(0), self.num_targets, 12)

In [60]:
import torch

# Use the GPU when CUDA is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Instantiate the network with its default arguments and move it to that device.
model = NeuralNetwork().to(device)

print(f"Device: {device}")

Device: cuda


Defining data loaders and datasets

In [61]:
from torch.utils.data import DataLoader, random_split, Dataset
from dataloading import DroneDataset

# Dataset location. An optional "data_path" entry in the config overrides the
# original absolute path, which only exists inside the course dev container.
data_path = config.get(
    "data_path", "/workspaces/AE4353-Y24/competition/data/Autonomous"
)
dataset = DroneDataset(data_path)

# Split the dataset into training (80%) and validation (20%) sets.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the same samples end up in the validation set on every run;
# otherwise validation numbers of different runs are not comparable.
# The optional "split_seed" config entry selects a different split.
split_generator = torch.Generator().manual_seed(config.get("split_seed", 0))
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# Create training set loader
train_loader = DataLoader(
    train_dataset,
    batch_size=config["batchsize_train"],
    shuffle=config["shuffle_train"],
    num_workers=config["num_workers_train_loader"],
)

# Create validation set loader
val_loader = DataLoader(
    val_dataset,
    batch_size=config["batchsize_val"],
    shuffle=config["shuffle_val"],
    num_workers=config["num_workers_val_loader"],
)

# Optimizer over all model parameters; the learning rate comes from the config.
optimizer = torch.optim.RMSprop(model.parameters(), lr=config["lr"])

Loss function

In [79]:
import torch
from tqdm import tqdm
from scipy.optimize import linear_sum_assignment
import torch
import torch.nn.functional as F
from tqdm import tqdm


def soft_matching_loss(gt_gates, pred_gates, p_max=10.0):
    """
    Differentiable soft matching loss between ground truth and predicted gates.

    For every image a gate-to-gate distance matrix is built: entry (i, j) is the
    mean Euclidean distance between corresponding corners of ground-truth gate i
    and predicted gate j, using only the first two of the three values stored per
    corner. A softmax over the predicted gates turns the negated distances into
    soft assignment weights. The loss is the assignment-weighted distance plus a
    penalty that grows when a ground-truth gate has no dominant match.

    Matching is done independently per image and does not depend on the order in
    which gates are listed. If the two inputs hold a different number of gates,
    the shorter one is padded with all-zero gates.

    Args:
        gt_gates: Ground truth gates, shape (B, N, 12).
        pred_gates: Predicted gates, shape (B, M, 12).
        p_max: Maximum penalty for unmatched gates.

    Returns:
        loss: Scalar loss tensor (zero when there are no images or no gates).
    """
    batch_size = gt_gates.size(0)
    num_gt = gt_gates.size(1)
    num_pred = pred_gates.size(1)

    # Labels may arrive in another dtype than the network output (e.g. float64).
    gt_gates = gt_gates.to(dtype=pred_gates.dtype)

    # Pad the shorter side along the gate dimension so both hold the same count.
    # The pad tuple is (last dim left, last dim right, gate dim before, gate dim after).
    if num_gt > num_pred:
        pred_gates = F.pad(pred_gates, (0, 0, 0, num_gt - num_pred), value=0)
    elif num_gt < num_pred:
        gt_gates = F.pad(gt_gates, (0, 0, 0, num_pred - num_gt), value=0)
    num_gates = max(num_gt, num_pred)

    # Nothing to match: return a zero that is still connected to the graph so
    # callers can invoke backward() without special-casing.
    if batch_size == 0 or num_gates == 0:
        return pred_gates.sum() * 0.0

    # Split each gate into 4 corners of 3 values and keep the first two values.
    gt_xy = gt_gates.reshape(batch_size, num_gates, 4, 3)[..., :2]  # (B, G, 4, 2)
    pred_xy = pred_gates.reshape(batch_size, num_gates, 4, 3)[..., :2]  # (B, G, 4, 2)

    # Broadcast every ground-truth gate against every predicted gate of the same
    # image: (B, G, 1, 4, 2) - (B, 1, G, 4, 2) -> (B, G, G, 4, 2).
    corner_offsets = gt_xy.unsqueeze(2) - pred_xy.unsqueeze(1)
    # Euclidean distance per corner, averaged over the 4 corners -> (B, G, G).
    distances = corner_offsets.norm(dim=-1).mean(dim=-1)

    # Soft assignment of each ground-truth gate over the predicted gates.
    scores = F.softmax(-distances, dim=-1)

    # Expected distance under the soft assignment, averaged over all gt gates.
    matching_loss = (scores * distances).sum(dim=-1).mean()

    # A confident match has a best score close to 1 and is barely penalised.
    unmatched_penalty = p_max * (1 - scores.max(dim=-1)[0]).mean()

    return matching_loss + unmatched_penalty

Evaluate

In [80]:
def evaluate(dataloader, model, p_max=10.0):
    """
    Evaluate a model on a dataloader with the soft matching loss.

    The batches are moved to the device of the model's parameters (CPU if the
    model has no parameters), so the function does not depend on a notebook-level
    ``device`` variable. The model is switched to evaluation mode and left in it.

    Args:
        dataloader (torch.utils.data.DataLoader): Yields ``(inputs, targets)`` batches.
        model (torch.nn.Module): The model to be evaluated.
        p_max (float): Maximum penalty passed on to ``soft_matching_loss``.

    Returns:
        tuple: ``(mean_loss, (mean, median, per_batch_losses), predictions)`` where
        ``mean_loss`` is the sum of the batch losses divided by ``len(dataloader)``
        (a 0-dim tensor), ``mean`` and ``median`` are Python floats computed from
        the 1-D CPU tensor ``per_batch_losses``, and ``predictions`` is a CPU tensor
        with the model outputs of all batches concatenated along dimension 0.

    Raises:
        ZeroDivisionError: If the dataloader contains no batches.
    """
    model.eval()  # Set the model to evaluation mode

    first_param = next(model.parameters(), None)
    model_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )
    # Results are returned on the CPU in the default dtype, matching what building
    # the tensors from Python numbers used to produce.
    out_dtype = torch.get_default_dtype()

    total_loss = 0.0
    batch_losses = []
    batch_predictions = []

    with torch.no_grad():  # No need to calculate gradients for evaluation
        for inputs, targets_gt in tqdm(dataloader, desc="Eval", leave=False):
            inputs = inputs.to(model_device)
            targets_gt = targets_gt.to(model_device)

            # Forward pass to get predicted gates
            pred_gates = model(inputs)

            # The matching loss doubles as the evaluation metric.
            batch_loss = soft_matching_loss(targets_gt, pred_gates, p_max)

            total_loss += batch_loss
            batch_losses.append(batch_loss)
            # Move predictions off the device right away so they do not pile up there.
            batch_predictions.append(pred_gates.to(device="cpu", dtype=out_dtype))

    # Evaluated first so an empty dataloader fails with ZeroDivisionError as before.
    mean_loss = total_loss / len(dataloader)

    # Keep everything as tensors instead of round-tripping through Python lists.
    metrics = torch.stack(batch_losses).to(device="cpu", dtype=out_dtype)
    all_predictions = torch.cat(batch_predictions, dim=0)

    return (
        mean_loss,
        (metrics.mean().item(), metrics.median().item(), metrics),
        all_predictions,
    )

Training Loop

In [85]:
from tqdm.notebook import tqdm


def train_epoch(train_loader, val_loader, model, optimizer, p_max=10.0):
    """
    Train the model for one epoch and evaluate it on both data loaders.

    Each training batch is moved to the device of the model's parameters (CPU if
    the model has no parameters), the soft matching loss is back-propagated and
    the optimizer takes one step. After the pass over the training data the model
    is evaluated on the training and validation loaders with the same ``p_max``.
    Because ``evaluate`` switches the model to evaluation mode, the model is in
    evaluation mode when this function returns.

    Args:
        train_loader (torch.utils.data.DataLoader): Data loader for the training set.
        val_loader (torch.utils.data.DataLoader): Data loader for the validation set.
        model (torch.nn.Module): The model to be trained.
        optimizer (torch.optim.Optimizer): The optimizer used for training.
        p_max (float): Maximum penalty for unmatched gates, used for the training
            loss and for both evaluations.

    Returns:
        tuple: ``(train_loss, train_performance, val_performance, val_pred)`` as
        produced by ``evaluate``: the mean training loss, the
        ``(mean, median, per_batch_losses)`` tuples for the training and the
        validation set, and the validation predictions.
    """
    model.train()

    first_param = next(model.parameters(), None)
    model_device = (
        first_param.device if first_param is not None else torch.device("cpu")
    )

    for inputs, targets_gt in tqdm(train_loader, desc="Training", leave=False):
        inputs = inputs.to(model_device)
        targets_gt = targets_gt.to(model_device)

        optimizer.zero_grad()

        # Forward pass to get predicted gates
        pred_gates = model(inputs)
        # Matching loss between ground truth and predicted gates
        loss = soft_matching_loss(targets_gt, pred_gates, p_max)

        loss.backward()
        optimizer.step()

    # Evaluate with the same penalty that was used for training; otherwise the
    # reported numbers silently fall back to the default of ``evaluate``.
    train_loss, train_performance, _ = evaluate(train_loader, model, p_max=p_max)
    _, val_performance, val_pred = evaluate(val_loader, model, p_max=p_max)

    return train_loss, train_performance, val_performance, val_pred

In [86]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard events of this run are written below runs/<exp_name>.
exp_name = "First Run"
writer = SummaryWriter(f"runs/{exp_name}", comment="")

try:
    for epoch in tqdm(range(config["epochs"]), desc="Epochs"):
        (
            train_loss,
            (train_error_mean, train_error_median, train_error_tensor),
            (val_err_mean, val_err_median, val_error_tensor),
            val_pred,
        ) = train_epoch(train_loader, val_loader, model, optimizer)
        writer.add_scalar("Loss/train", train_loss, epoch)
        writer.add_scalar("Error/train/mean", train_error_mean, epoch)
        writer.add_scalar("Error/train/median", train_error_median, epoch)
        writer.add_scalar("Error/val/mean", val_err_mean, epoch)
        writer.add_scalar("Error/val/median", val_err_median, epoch)
        # Each histogram gets the tensor that belongs to its tag; the validation
        # histogram used to be filled with the training errors.
        writer.add_histogram("Error/train", train_error_tensor, epoch)
        writer.add_histogram("Error/val", val_error_tensor, epoch)
        writer.add_histogram("Pred/val", val_pred, epoch)
finally:
    # Push pending events to disk even when training is interrupted
    # (e.g. by stopping the cell), while keeping the writer usable afterwards.
    writer.flush()

Epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Training:   0%|          | 0/641 [00:00<?, ?it/s]

KeyboardInterrupt: 