# 1: Import All Neccesary Modules


In [226]:
import numpy as np
import pandas as pd
import os
from pathlib import Path
import random
import h5py
import hdf5plugin
from PIL import Image
from tqdm import tqdm
import torch
import torchvision.transforms as T
import torchvision.transforms.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
import torch.distributed as dist
from torch.nn.utils.rnn import pad_sequence
import torchvision
from torchvision.models.detection.keypoint_rcnn import KeypointRCNNPredictor
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import hdf5plugin
import torch
import torch.nn as nn
import torch.functional as F
import torchvision.models as models
import lightning as L 
from glob import glob
import sys



# 2: Create Data Handler Class
The data handler class pulls all the data from the Dataset Directory and organizes it into a trainable dataset



In [227]:
from torch import float32

class DroneRaceDataset(Dataset):
    def __init__(self, h5_files, transform=None, max_corners=40):
        self.h5_files = h5_files
        self.images = []
        self.targets = []
        self.max_corners = max_corners
        self.transforms = transform
        
        # Load all images and targets from multiple h5 files
        for h5_file in self.h5_files:
            with h5py.File(h5_file, 'r') as h5f:
                print("Gathering Images")
                images = h5f["images"]
                self.images = [None] * len(images)
                for idx in tqdm(range(len(images))):
                    self.images[idx] = (images[idx])
                print("Images Loaded. Length of Images: ", len(self.images))
                self.targets = [h5f[f"targets/{i:05d}"][()] for i in range(len(self.images))]
                print("Targets Loaded. Length of Targets: ", len(self.targets))
                    

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        
        image = self.images[idx] 
        image = image.transpose(1, 2, 0).astype('uint8')

        target_data = self.targets[idx]
        # Convert image to PIL Image
        image = Image.fromarray(image)

        # Ensure the image is in RGB mode
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Get original image size
        width, height = image.size

        # Apply transforms
        if self.transforms:
            image = self.transforms(image)

        # Prepare the target dictionary
        target = {}
        boxes = []
        labels = []
        keypoints = []

        for gate in target_data:
            gate = gate.reshape(-1, 3)  # Shape: (4, 3)
            visibility = gate[:, 2]
            xy = gate[:, :2]  # Assuming coordinates are normalized between 0 and 1
            # Scale coordinates to image size
            xy[:, 0] *= width
            xy[:, 1] *= height

            # Get visible keypoints
            valid_indices = visibility > 0
            if not valid_indices.any():
                continue  # Skip gates with no visible keypoints

            # Bounding box
            x_coords = xy[valid_indices, 0]
            y_coords = xy[valid_indices, 1]
            xmin = x_coords.min()
            ymin = y_coords.min()
            xmax = x_coords.max()
            ymax = y_coords.max()

            # Check for zero-area boxes
            if xmin >= xmax or ymin >= ymax:
                # Optionally, expand the box slightly
                xmin = max(xmin - 1, 0)
                xmax = min(xmax + 1, width)
                ymin = max(ymin - 1, 0)
                ymax = min(ymax + 1, height)
                # Check for and skip any invalid boxes
                if xmin >= xmax or ymin >= ymax:
                    continue  

            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(1) 

            # Keypoints
            kp = np.zeros((4, 3), dtype=np.float32)
            kp[:, :2] = xy
            kp[:, 2] = visibility
            keypoints.append(kp)

        if len(boxes) == 0:
            # If no valid gates are present, create a dummy target
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
            keypoints = torch.zeros((0, 4, 3), dtype=torch.float32)
        else:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)
            keypoints = torch.as_tensor(np.array(keypoints), dtype=torch.float32)

        #Form target dictionary for RCNN_FPN(see pytorch documentation)
        target['boxes'] = boxes
        target['labels'] = labels
        target['keypoints'] = keypoints
        target['image_id'] = torch.tensor([idx])
        target['area'] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        target['iscrowd'] = torch.zeros((len(boxes),), dtype=torch.int64)

        return image, target




# 3: Create Model
The model runs on a resnet50 backbone 



In [228]:

# Create DataLoaders
def collate_fn(batch):
    return tuple(zip(*batch))


# 4: Create Ligtning Class
Integrate model into Pytorch Lightning for Faster Testing

In [229]:
from torchvision.models.detection import keypointrcnn_resnet50_fpn

class LitCornerDetector(L.LightningModule):
    def __init__(self, keypoints=4, learning_rate=1e-3, im_to_print=4):
        super(LitCornerDetector, self).__init__()
        # Use keypointRCNN, since it is a large model that takes care of bounding box calcs 
        self.model = keypointrcnn_resnet50_fpn(weights=None, weights_backbone=None, num_classes=2, num_keypoints=keypoints)
        self.learning_rate = learning_rate
        self.im_to_print=4

    def forward(self, images, targets):
        if targets is not None:
            return self.model(images, targets) 
        else:
            return self.model(images)
    
    def training_step(self, batch, batch_idx):
        images, targets = batch
        images = list(img.to(self.device) for img in images)
        targtets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]

        loss_dict = self.model(images, targtets)
        losses = sum(loss for loss in loss_dict.values())

        self.log("train_loss", losses)
        
        return losses

    def validation_step(self, batch, batch_idx):
        images, targets = batch  
        
        images = list(img.to(self.device) for img in images)
        targtets = [{k: v.to(self.device) for k, v in t.items()} for t in targets]

        loss_dict = self.model(images, targtets)
        losses = sum(loss for loss in loss_dict.values())

        self.log("val_loss", losses)
        return losses

    def on_train_epoch_end(self):
        # Add specific logging here
        self.print_outputs(self.dataset, self.device, self.im_to_print)
        pass

    def print_outputs(self, dataset, device, num_images):
        self.model.eval()  # Set the model to evaluation mode
        
        with torch.no_grad():
            indices = np.random.choice(len(dataset), num_images, replace=False)  # Choose random indices
            
            for idx in indices:
                # Get the validation data
                image, target = dataset[idx]
                image_input = image.to(device)
                output = self.model([image_input])[0]
                
                # Convert image to numpy array
                image_np = image.cpu().permute(1, 2, 0).numpy()
                image_np = np.clip(image_np, 0, 1)
 
                plt.figure(figsize=(8, 8))
                plt.imshow(image_np)
                ax = plt.gca()

                # Ground truth: plot only visible corners
                for box, keypoints in zip(target['boxes'], target['keypoints']):
                    box = box.numpy()
                    keypoints = keypoints.numpy()
                    xmin, ymin, xmax, ymax = box
                    rect = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, linewidth=2, edgecolor='g', facecolor='none')
                    ax.add_patch(rect)
                    plt.plot(keypoints[:, 0], keypoints[:, 1], 'go')

                # Predicted: plot only predicted visible corners
                boxes = output['boxes'].cpu().numpy()
                scores = output['scores'].cpu().numpy()
                keypoints = output['keypoints'].cpu().numpy()
                for box, score, kp in zip(boxes, scores, keypoints):
                    if score < 0.5:  # Skip low-confidence predictions
                        continue
                    xmin, ymin, xmax, ymax = box
                    rect = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, linewidth=2, edgecolor='r', facecolor='none', linestyle='--')
                    ax.add_patch(rect)
                    plt.plot(kp[:, 0], kp[:, 1], 'rx')  # Predicted keypoints
                
                plt.title(f"Sample Index: {idx}")
                plt.axis('off')
                plt.show()

    def configure_optimizers(self):
        # Configure optimizer and scheduler
        optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.2, patience=2)
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}


# 5: Load in Dataset and then Sanity Check with Visbility

In [232]:
from torchvision.transforms import Compose, ToTensor

max_epochs = 4
batch_size = 100 

#h5_files_full = glob(os.path.join("../Datasets", "autonomous_flight-??a-ellipse.h5"))
h5_files_test = glob(os.path.join("../Datasets", "LittletonBlue.h5"))

transform = Compose([ToTensor()])
# Initialize the dataset and dataloader
full_dataset = DroneRaceDataset(h5_files_test, transform=transform)

# Define the split ratio
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

# Split the dataset
train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [train_size, val_size])



Gathering Images


100%|██████████| 142/142 [00:02<00:00, 48.37it/s]

Images Loaded. Length of Images:  142
Targets Loaded. Length of Targets:  142





# 5.2 Visualize the Image and Truth Data

# Run the Training Loop

In [233]:
from lightning.pytorch.loggers import TensorBoardLogger

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

modelLight = LitCornerDetector()

logger = TensorBoardLogger("runs", name="MI", version="ex", log_graph=True)
trainer = L.Trainer(logger=logger, limit_train_batches=1.0, max_epochs=max_epochs, log_every_n_steps=5)
trainer.fit(modelLight, train_dataloaders=train_loader, val_dataloaders=val_loader)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
`Trainer(limit_train_batches=1.0)` was configured so 100% of the batches per epoch will be used..
/opt/anaconda3/envs/delft_dev/lib/python3.12/site-packages/lightning/pytorch/callbacks/model_checkpoint.py:654: Checkpoint directory runs/MI/ex/checkpoints exists and is not empty.

  | Name  | Type         | Params | Mode 
-----------------------------------------------
0 | model | KeypointRCNN | 59.1 M | train
-----------------------------------------------
59.1 M    Trainable params
0         Non-trainable params
59.1 M    Total params
236.335   Total estimated model params size (MB)
209       Modules in train mode
0         Modules in eval mode
/opt/anaconda3/envs/delft_dev/lib/python3.12/site-packages/lightning/pytorch/loggers/tensorboard.py:191: Could not log computational graph to TensorBoard: The `model.example_input_array` attribute is not set or `input_array` was not

Sanity Checking DataLoader 0:   0%|          | 0/1 [00:00<?, ?it/s]

/opt/anaconda3/envs/delft_dev/lib/python3.12/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:424: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.
Error: command buffer exited with error status.
	The Metal Performance Shaders operations encoded on it may not have completed.
	Error: 
	(null)
	Internal Error (0000000e:Internal Error)
	<AGXG14XFamilyCommandBuffer: 0x4bdcf41d0>
    label = <none> 
    device = <AGXG14CDevice: 0x17b378c00>
        name = Apple M2 Max 
    commandQueue = <AGXG14XFamilyCommandQueue: 0x17b42da00>
        label = <none> 
        device = <AGXG14CDevice: 0x17b378c00>
            name = Apple M2 Max 
    retainedReferences = 1
Error: command buffer exited with error status.
	The Metal Performance Shaders operations encoded on it may not have completed.
	Error: 
	(null)
	Internal Error (0000000e

NotImplementedError: The operator 'aten::upsample_bicubic2d.out' is not currently implemented for the MPS device. If you want this op to be added in priority during the prototype phase of this feature, please comment on https://github.com/pytorch/pytorch/issues/77764. As a temporary fix, you can set the environment variable `PYTORCH_ENABLE_MPS_FALLBACK=1` to use the CPU as a fallback for this op. WARNING: this will be slower than running natively on MPS.