<a href="https://colab.research.google.com/github/ldmcgo26/deep_learning/blob/main/DeepLearning_MS2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Assignment Description

Please upload the following materials to the GitHub repository of your project work and submit the URL again:

- Python code that downloads, prepares, and loads the data (this was the task of Milestone 1; now you only have to adapt it to the other parts of your code),
- Python code for the baseline model,
- Python code that trains a deep learning model,
- Python code that evaluates the results on a (separate) test set,
- Updated README.MD file with instructions on how to run the code.

Please add as many comments to your code as needed to make it easy to understand.

At this stage, it is not required to have good (or even reasonable) results; the only requirement is to have the data loading-preparation-training-evaluation pipeline ready.

## Our Model Architecture

### 1. Transfer Learning:
- Load pre-trained ResNet-50 or -101 model with FPN (better suited for drone images)
- Freeze or allow fine-tuning
- Remove original detection layers (head)

### 2. Region Proposal Network (RPN):
- Use built-in RPN layers to generate regions
- Anchor box tuning as needed (e.g. smaller sizes)

### 3. Region of Interest (ROI) Head:
- Customize ROI detection box head layers with dropout regularization
- Add final box predictor, specified for VisDrone data

### 4. Misc. Training Optimizations:
- Data augmentation
- Optimizers (SGD with momentum, L2 regularization)
- Learning rate scheduling
- Early stopping

In [5]:
"""""""""""""""""""""""""""""""""

Description:

Running the following scripts will collect our VisDrone data and load it into
the structure outlined below, primarily utilizing gdown:

VisDrone/
├── VisDrone2019-DET-test-dev/
│   ├── annotations/
│   └── images/
├── VisDrone2019-DET-train/
│   ├── annotations/
│   └── images/
├── VisDrone2019-DET-val/
│   ├── annotations/
│   └── images/
├── VisDrone2019-DET-test-dev.zip
├── VisDrone2019-DET-train.zip
└── VisDrone2019-DET-val.zip

We then perform several high-level checks of paths and contents to confirm the
expected structure above is present in the environment.


"""""""""""""""""""""""""""""""""

In [20]:
# Imports (for version, see requirements.txt)
# !pip install torchmetrics
import os
import torch
import cv2
import glob
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from sklearn.metrics import precision_score, recall_score, f1_score
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from torchvision.ops import box_iou, MultiScaleRoIAlign
import torch.nn as nn
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
import torch.optim as optim
import time

In [30]:
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cpu


In [22]:
def get_image_and_annotation_paths(image_dir, annotation_dir):
    # Use image / annotation paths to map to actual training / testing data
    image_paths = sorted(glob.glob(os.path.join(image_dir, '*.jpg')))
    annotation_paths = [
        os.path.join(annotation_dir, os.path.basename(p).replace('.jpg', '.txt'))
        for p in image_paths
    ]
    return image_paths, annotation_paths

# Update paths for your train/val/test sets with the correct relative paths
train_image_paths, train_annotation_paths = get_image_and_annotation_paths(
    '../data/VisDrone2019-DET-train/images',
    '../data/VisDrone2019-DET-train/annotations'
)

val_image_paths, val_annotation_paths = get_image_and_annotation_paths(
    '../data/VisDrone2019-DET-val/images',
    '../data/VisDrone2019-DET-val/annotations'
)

test_image_paths, test_annotation_paths = get_image_and_annotation_paths(
    '../data/VisDrone2019-DET-test-dev/images',
    '../data/VisDrone2019-DET-test-dev/annotations'
)

In [10]:
# Function to check if a directory exists and count files
def check_directory(path, file_extension=None):
    if not os.path.exists(path):
        print(f"❌ Path does not exist: {path}")
        return False

    if file_extension:
        files = [f for f in os.listdir(path) if f.endswith(file_extension)]
        print(f"✅ Path exists: {path} (contains {len(files)} {file_extension} files)")
    else:
        files = os.listdir(path)
        print(f"✅ Path exists: {path} (contains {len(files)} files)")

    return True

# Check all directories
print("Checking training directories:")
train_img_dir = '../data/VisDrone2019-DET-train/images'
train_ann_dir = '../data/VisDrone2019-DET-train/annotations'
check_directory(train_img_dir, '.jpg')
check_directory(train_ann_dir, '.txt')

print("\nChecking validation directories:")
val_img_dir = '../data/VisDrone2019-DET-val/images'
val_ann_dir = '../data/VisDrone2019-DET-val/annotations'
check_directory(val_img_dir, '.jpg')
check_directory(val_ann_dir, '.txt')

print("\nChecking test directories:")
test_img_dir = '../data/VisDrone2019-DET-test-dev/images'
test_ann_dir = '../data/VisDrone2019-DET-test-dev/annotations'
check_directory(test_img_dir, '.jpg')
check_directory(test_ann_dir, '.txt')

# Check if the paths match by comparing file counts
print("\nVerifying image and annotation counts match:")
if len(train_image_paths) == len(train_annotation_paths):
    print(f"Training: {len(train_image_paths)} images and annotations match")
else:
    print(f"X Training: {len(train_image_paths)} images but {len(train_annotation_paths)} annotations")

if len(val_image_paths) == len(val_annotation_paths):
    print(f"Validation: {len(val_image_paths)} images and annotations match")
else:
    print(f"X Validation: {len(val_image_paths)} images but {len(val_annotation_paths)} annotations")

if len(test_image_paths) == len(test_annotation_paths):
    print(f"Testing: {len(test_image_paths)} images and annotations match")
else:
    print(f"X Testing: {len(test_image_paths)} images but {len(test_annotation_paths)} annotations")

# Try to access a sample file from each directory (if they exist)
print("\nTrying to access sample files:")
if train_image_paths and train_annotation_paths:
    print(f"Sample training image path: {train_image_paths[0]}")
    print(f"Sample training annotation path: {train_annotation_paths[0]}")

    # Check if the sample files exist
    if os.path.exists(train_image_paths[0]) and os.path.exists(train_annotation_paths[0]):
        print("Sample training files exist")
    else:
        print("X Sample training files don't exist")

Checking training directories:
✅ Path exists: ../data/VisDrone2019-DET-train/images (contains 6471 .jpg files)
✅ Path exists: ../data/VisDrone2019-DET-train/annotations (contains 6471 .txt files)

Checking validation directories:
✅ Path exists: ../data/VisDrone2019-DET-val/images (contains 548 .jpg files)
✅ Path exists: ../data/VisDrone2019-DET-val/annotations (contains 548 .txt files)

Checking test directories:
✅ Path exists: ../data/VisDrone2019-DET-test-dev/images (contains 1610 .jpg files)
✅ Path exists: ../data/VisDrone2019-DET-test-dev/annotations (contains 1610 .txt files)

Verifying image and annotation counts match:
Training: 6471 images and annotations match
Validation: 548 images and annotations match
Testing: 1610 images and annotations match

Trying to access sample files:
Sample training image path: ../data/VisDrone2019-DET-train/images/0000002_00005_d_0000014.jpg
Sample training annotation path: ../data/VisDrone2019-DET-train/annotations/0000002_00005_d_0000014.txt
Samp

In [23]:
"""""""""""""""""""""""""""""""""

Description:

The next two blocks of code define a Dataset class, and then use that class
to create 3 instances, one each for the training, validation, and testing splits.

We then use those objects to create our Dataloaders, such that our data is ready
for use in training our model.

"""""""""""""""""""""""""""""""""

In [44]:
class VisDroneDataset(Dataset):
    def __init__(self, image_paths, annotation_paths, resize_to=(640, 640), transforms=None, device='mps'):

        self.image_paths = image_paths
        self.annotation_paths = annotation_paths

        self.resize_to = resize_to
        self.transforms = transforms
        self.device = device

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load image
        img_path = self.image_paths[idx]
        img = cv2.imread(img_path)
        # Error check to skip faulty image data
        if img is None:
            print(f"Failed to load image: {img_path}, skipping.")
            return self.__getitem__((idx + 1) % len(self))

        # Record the original size so boxes can be rescaled to match the resized image
        orig_h, orig_w = img.shape[:2]
        new_w, new_h = self.resize_to  # cv2.resize expects (width, height)
        scale_x, scale_y = new_w / orig_w, new_h / orig_h

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, self.resize_to)
        img = torch.tensor(img / 255.0, dtype=torch.float32).permute(2, 0, 1)

        # Load annotation
        ann_path = self.annotation_paths[idx]
        boxes = []
        labels = []

        try:
            with open(ann_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        # Fields: x, y, w, h, score, class id, truncation, occlusion.
                        # Only the first six are needed, which also tolerates a trailing comma.
                        vals = list(map(int, line.split(',')[:6]))
                        x, y, w, h, cls_id = vals[0], vals[1], vals[2], vals[3], vals[5]
                    except (ValueError, IndexError):
                        print(f"Skipping annotation in: {ann_path}")
                        continue

                    # Skip invalid boxes
                    if w <= 0 or h <= 0:
                        continue

                    # Skip invalid labels (0 marks ignored regions)
                    if cls_id <= 0:
                        continue

                    # Convert (x, y, w, h) to corner format and rescale to the resized image
                    boxes.append([x * scale_x, y * scale_y,
                                  (x + w) * scale_x, (y + h) * scale_y])
                    labels.append(cls_id)
        except FileNotFoundError:
            print(f"Missing annotation file: {ann_path}, skipping.")
            return self.__getitem__((idx + 1) % len(self))

        # Skip samples with no valid annotations
        if len(boxes) == 0:
            print(f"No valid boxes in {img_path}, skipping.")
            return self.__getitem__((idx + 1) % len(self))

        boxes = torch.tensor(boxes, dtype=torch.float32)
        labels = torch.tensor(labels, dtype=torch.int64)

        target = {'boxes': boxes, 'labels': labels}
        return img, target
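
The annotation handling above can be illustrated without any image data. The sketch below is a minimal, dependency-free version of the per-line parsing, assuming the comma-separated field order used in the class (`x, y, w, h, score, class id, ...`). The helper name `parse_visdrone_line` and the sample numbers are hypothetical and only serve the illustration. Because the images are resized, the box coordinates need the same scale factors to stay aligned with the pixels.

```python
def parse_visdrone_line(line, scale_x=1.0, scale_y=1.0):
    """Parse one annotation line into ([x1, y1, x2, y2], class_id).

    Returns None for lines that should be skipped (degenerate box or
    ignored class). Hypothetical helper, not part of the notebook.
    """
    # Only the first six fields are needed, so a trailing comma is harmless.
    fields = line.strip().split(',')[:6]
    if len(fields) < 6:
        return None
    x, y, w, h, _score, cls_id = (int(v) for v in fields)
    if w <= 0 or h <= 0 or cls_id <= 0:
        return None
    # Convert (x, y, w, h) to corner format and rescale to the resized image.
    box = [x * scale_x, y * scale_y, (x + w) * scale_x, (y + h) * scale_y]
    return box, cls_id


# Made-up example: a 1000x500 image resized to 512x512.
sx, sy = 512 / 1000, 512 / 500
parsed = parse_visdrone_line("100,50,200,100,1,4,0,0,", sx, sy)
print(parsed)
```

After resizing, the horizontal and vertical scale factors differ whenever the original image is not square, which is why `x` and `y` are scaled separately.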

In [45]:
# Use the following lines when training on the full set
# train_dataset = VisDroneDataset(train_image_paths, train_annotation_paths, resize_to=(512, 512))
# val_dataset = VisDroneDataset(val_image_paths, val_annotation_paths, resize_to=(512, 512))
# test_dataset = VisDroneDataset(test_image_paths, test_annotation_paths, resize_to=(512, 512))

# For now, using these lines to limit the number of samples trained on, for exploration

full_train_dataset = VisDroneDataset(train_image_paths, train_annotation_paths, resize_to=(512, 512), device='cpu')
full_val_dataset = VisDroneDataset(val_image_paths, val_annotation_paths, resize_to=(512, 512), device='cpu')
full_test_dataset = VisDroneDataset(test_image_paths, test_annotation_paths, resize_to=(512, 512), device='cpu')

def first_valid_indices(dataset, limit=200):
    """Return the indices of the first `limit` samples that load with at least one box."""
    indices = []
    for i in range(len(dataset)):
        try:
            img, target = dataset[i]
        except Exception:
            continue
        if len(target['boxes']) > 0:
            indices.append(i)
        # Break early for sample testing
        if len(indices) >= limit:
            break
    return indices

# Use only the first 200 valid entries of each split
train_dataset = Subset(full_train_dataset, first_valid_indices(full_train_dataset))
val_dataset = Subset(full_val_dataset, first_valid_indices(full_val_dataset))
test_dataset = Subset(full_test_dataset, first_valid_indices(full_test_dataset))

# Create dataloaders for each subset

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True,
                                   collate_fn=lambda x: tuple(zip(*x)))
valid_loader = DataLoader(val_dataset, batch_size=4, shuffle=False,
                                    collate_fn=lambda x: tuple(zip(*x)))
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False,
                                    collate_fn=lambda x: tuple(zip(*x)))
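
Detection batches cannot be stacked into a single tensor because each image has a different number of boxes, which is why the loaders above use a custom `collate_fn`. The sketch below shows what `tuple(zip(*x))` does, using plain strings and dicts as stand-ins for image tensors and targets. The function name `collate_detection` is hypothetical.

```python
def collate_detection(batch):
    """Transpose [(img, target), ...] into ((img, ...), (target, ...))."""
    return tuple(zip(*batch))


# Stand-in batch: strings instead of image tensors, lists instead of tensors.
batch = [
    ("img_a", {"boxes": [[0, 0, 10, 10]], "labels": [4]}),
    ("img_b", {"boxes": [[5, 5, 20, 20], [1, 1, 3, 3]], "labels": [1, 2]}),
]
images, targets = collate_detection(batch)
print(images)
print(len(targets[1]["boxes"]))
```

A named function like this could replace the lambdas if the loaders are ever given `num_workers > 0`, since lambdas may fail to pickle on platforms that spawn worker processes.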

In [41]:
# VisDrone label map (object_category IDs as defined by the VisDrone-DET annotation format)
label_map = {
    0: 'Ignored',
    1: 'Pedestrian',
    2: 'People',
    3: 'Bicycle',
    4: 'Car',
    5: 'Van',
    6: 'Truck',
    7: 'Tricycle',
    8: 'Awning-tricycle',
    9: 'Bus',
    10: 'Motor',
    11: 'Other'
}

In [46]:
# BASELINE MODEL:

# Load a pre-trained Faster R-CNN model. The RPN proposal limits are passed as
# keyword arguments (forwarded to FasterRCNN) to reduce the number of region
# proposals and improve speed; assigning them as attributes on model.rpn after
# construction has no effect.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(
    pretrained=True,
    rpn_pre_nms_top_n_train=1000,
    rpn_post_nms_top_n_train=300,
    rpn_pre_nms_top_n_test=500,
    rpn_post_nms_top_n_test=100
)

# Replace the classifier with one that matches our dataset.
# label_map already reserves index 0 (ignored regions, never used as a training
# label), which torchvision treats as the background class, so no extra class
# is needed.
num_classes = len(label_map)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Move model to the Apple GPU (MPS) if available, otherwise fall back to CPU
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
model.to(device)

# Optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.001, momentum=0.9, weight_decay=0.0005)

# Learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
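
To make the effect of `StepLR(step_size=3, gamma=0.1)` concrete, the sketch below computes the learning rate in effect for each epoch with plain arithmetic. It assumes the scheduler is stepped once per epoch, as in the training loop of this notebook. The function name `step_lr` is hypothetical.

```python
def step_lr(base_lr, epoch, step_size=3, gamma=0.1):
    """Learning rate in effect during `epoch` (0-based) under a step schedule."""
    return base_lr * gamma ** (epoch // step_size)


# Five epochs starting from the baseline learning rate of 0.001.
schedule = [step_lr(0.001, e) for e in range(5)]
for epoch, lr in enumerate(schedule, start=1):
    print(f"epoch {epoch}: lr = {lr:.6f}")
```

With only five epochs, the rate drops once (after the third epoch), so the last two epochs train at a tenth of the initial rate.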

In [35]:
print(torch.mps.is_available())

True


In [None]:
# Training the baseline model
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0

    for images, targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        # Forward pass: in training mode the model returns a dict of losses
        loss_dict = model(images, targets)
        loss = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"Epoch {epoch+1} loss: {epoch_loss:.4f}")
    lr_scheduler.step()

# Run inference on one validation batch (used for the visualization below)
model.eval()
with torch.no_grad():
    for images, targets in valid_loader:
        images = [img.to(device) for img in images]
        outputs = model(images)
        break

Epoch 1/5:  20%|██        | 10/50 [01:30<06:08,  9.21s/it]

In [None]:
# Function to visualize the output predictions made by the model
def show_prediction(img_tensor, pred, label_map=None, score_thresh=0.5):
    img = img_tensor.permute(1, 2, 0).cpu().numpy()  # [H, W, C]
    fig, ax = plt.subplots(1, figsize=(10, 10))
    ax.imshow(img)

    boxes = pred['boxes'].cpu()
    labels = pred['labels'].cpu()
    scores = pred['scores'].cpu()

    # Print prediction info
    print("Prediction Summary:")
    print(f"  Total predictions: {len(boxes)}")
    print(f"  Scores: {scores}")
    print(f"  Boxes: {boxes}")
    print(f"  Labels: {labels}")

    has_detections = False

    # Visualize each bounding box using patches package and plt

    for box, label, score in zip(boxes, labels, scores):
        if score < score_thresh:
            continue
        has_detections = True
        # Convert tensors to plain Python numbers for matplotlib and the dict lookup
        x1, y1, x2, y2 = box.tolist()
        width, height = x2 - x1, y2 - y1
        rect = patches.Rectangle((x1, y1), width, height, linewidth=2, edgecolor='red', facecolor='none')
        ax.add_patch(rect)
        label_id = int(label)
        label_str = f"{label_id}"
        if label_map and label_id in label_map:
            label_str = label_map[label_id]
        ax.text(x1, y1 - 5, f"{label_str} ({score:.2f})", color='yellow', fontsize=12)

    if not has_detections:
        print(f"No predictions above threshold ({score_thresh})")

    plt.axis('off')
    plt.show()

In [None]:
show_prediction(images[0], outputs[0], label_map=label_map, score_thresh=0.05)

In [None]:
# Model Evaluation:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

iou_threshold = 0.5
confidence_threshold = 0.5

# Initialize mAP metric from torchmetrics
map_metric = MeanAveragePrecision()

all_preds = []
all_gts = []

# Set the model to evaluation mode
model.eval()

with torch.no_grad():
    for images, targets in tqdm(test_loader, desc="Evaluating"):
        # Move data to device
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass
        preds = model(images)

        # Update mAP metric
        map_metric.update(preds, targets)

        # Collect predictions and ground truth labels for precision/recall/F1
        for pred, tgt in zip(preds, targets):
            pred_boxes = pred['boxes']
            pred_labels = pred['labels']
            pred_scores = pred['scores']

            gt_boxes = tgt['boxes']
            gt_labels = tgt['labels']

            # Filter predictions based on confidence score
            valid_preds = pred_scores > confidence_threshold
            pred_boxes = pred_boxes[valid_preds]
            pred_labels = pred_labels[valid_preds]

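            # Match each prediction to the ground-truth box it overlaps most (highest IoU)
            ious = box_iou(pred_boxes, gt_boxes)
            matched_gt_indices = torch.argmax(ious, dim=1)

            # Only keep predictions whose best match exceeds the IoU threshold.
            # Unmatched predictions and missed ground-truth boxes are not recorded, so the
            # precision/recall/F1 below measure classification quality on localized objects only.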
            for i, pred_label in enumerate(pred_labels):
                if ious[i, matched_gt_indices[i]] > iou_threshold:
                    all_preds.append(pred_label.item())
                    all_gts.append(gt_labels[matched_gt_indices[i]].item())

# Compute mAP
map_metrics = map_metric.compute()
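
The matching step in the evaluation loop relies on intersection over union (IoU). The sketch below reimplements the same idea in plain Python for boxes in `[x1, y1, x2, y2]` format, so the arithmetic can be checked by hand. The names `iou` and `match_predictions` and the sample boxes are hypothetical; the notebook itself uses `torchvision.ops.box_iou`.

```python
def iou(box_a, box_b):
    """Intersection over union of two [x1, y1, x2, y2] boxes."""
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter
    return inter / union if union > 0 else 0.0


def match_predictions(pred_boxes, pred_labels, gt_boxes, gt_labels, iou_threshold=0.5):
    """Return (pred_label, gt_label) pairs for predictions whose best IoU beats the threshold."""
    pairs = []
    for p_box, p_label in zip(pred_boxes, pred_labels):
        overlaps = [iou(p_box, g_box) for g_box in gt_boxes]
        if not overlaps:
            continue
        best = max(range(len(overlaps)), key=overlaps.__getitem__)
        if overlaps[best] > iou_threshold:
            pairs.append((p_label, gt_labels[best]))
    return pairs


# Made-up example: two ground-truth boxes and three predictions.
gt_boxes, gt_labels = [[0, 0, 10, 10], [20, 20, 30, 30]], [4, 1]
pred_boxes = [[1, 1, 10, 10], [21, 20, 30, 30], [50, 50, 60, 60]]
pred_labels = [4, 2, 4]
pairs = match_predictions(pred_boxes, pred_labels, gt_boxes, gt_labels)
print(pairs)
```

The third prediction overlaps nothing and is dropped, which mirrors how the loop above ignores unmatched predictions.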

In [None]:
print(f"mAP (0.50:0.95): {map_metrics['map']:.8f}")
print(f"mAP@0.50:        {map_metrics['map_50']:.8f}")
print(f"mAP@0.75:        {map_metrics['map_75']:.8f}")
print(f"mAP (small):     {map_metrics['map_small']:.8f}")
print(f"mAP (medium):    {map_metrics['map_medium']:.8f}")
print(f"mAP (large):     {map_metrics['map_large']:.8f}")

# Compute Precision, Recall, and F1 score for each class
classes = list(range(1, 12))  # VisDrone labels 1-11 (0 is ignored/background)
precision = precision_score(all_gts, all_preds, average=None, labels=classes)
recall = recall_score(all_gts, all_preds, average=None, labels=classes)
f1 = f1_score(all_gts, all_preds, average=None, labels=classes)

# Print out precision, recall, and F1 per class
for cls, p, r, f in zip(classes, precision, recall, f1):
    print(f"Class {cls:2d} → Precision: {p:.8f}, Recall: {r:.8f}, F1: {f:.8f}")
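
For reference, the per-class scores returned by scikit-learn with `average=None` can be reproduced by counting true positives, false positives, and false negatives over the matched label pairs. The sketch below does this in plain Python on made-up labels. The function name `per_class_prf` is hypothetical.

```python
def per_class_prf(gt_labels, pred_labels, classes):
    """Return {class: (precision, recall, f1)} from paired label lists."""
    results = {}
    pairs = list(zip(gt_labels, pred_labels))
    for cls in classes:
        tp = sum(1 for g, p in pairs if g == cls and p == cls)
        fp = sum(1 for g, p in pairs if g != cls and p == cls)
        fn = sum(1 for g, p in pairs if g == cls and p != cls)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        results[cls] = (precision, recall, f1)
    return results


# Made-up matched pairs: one object of class 4 was predicted as class 1.
gts = [4, 4, 1, 1, 4]
preds = [4, 1, 1, 1, 4]
scores = per_class_prf(gts, preds, classes=[1, 4])
for cls, (p, r, f) in scores.items():
    print(f"Class {cls}: precision={p:.3f}, recall={r:.3f}, f1={f:.3f}")
```

Because only IoU-matched predictions enter these lists, the resulting numbers describe label confusion among localized objects and are not a substitute for mAP.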

In [None]:
# Custom model MLP head with dropout regularization
class TwoMLPHeadWithDropout(nn.Module):
    """A two-layer MLP head with dropout for ROI features."""
    def __init__(self, in_channels, representation_size=1024, dropout_prob=0.5):
        super().__init__()
        self.fc6 = nn.Linear(in_channels, representation_size)
        self.relu6 = nn.ReLU(inplace=True)
        self.dropout6 = nn.Dropout(p=dropout_prob)
        self.fc7 = nn.Linear(representation_size, representation_size)
        self.relu7 = nn.ReLU(inplace=True)
        self.dropout7 = nn.Dropout(p=dropout_prob)

    def forward(self, x):
        # x is [N, C, H, W] → flatten
        x = x.flatten(start_dim=1)
        x = self.fc6(x)
        x = self.relu6(x)
        x = self.dropout6(x)
        x = self.fc7(x)
        x = self.relu7(x)
        x = self.dropout7(x)  # Apply dropout after second layer too
        return x

In [None]:
# Custom Faster RCNN model with pretrained backbone, custom RPN, ROI head
def get_custom_model(
    num_classes,
    backbone_name='resnet50',  # optionally 'resnet101'
    trainable_backbone_layers=3,
    dropout_prob=0.5,
    # manually set smaller anchor sizes
    anchor_sizes=((16,), (32,), (64,), (128,), (256,)),
    aspect_ratios=((0.5, 1.0, 2.0),) * 5
):

    # 1) Backbone + FPN (Feature Pyramid Network)
    backbone = resnet_fpn_backbone(
        backbone_name,
        pretrained=True,
        trainable_layers=trainable_backbone_layers
    )

    # 2) RPN with custom anchor generator (tuned for drone imagery)
    rpn_anchor_generator = AnchorGenerator(
        sizes=anchor_sizes,
        aspect_ratios=aspect_ratios
    )

    # 3) ROI Align for feature extraction
    roi_pooler = MultiScaleRoIAlign(
        featmap_names=['0', '1', '2', '3'],
        output_size=7,
        sampling_ratio=2
    )

    # 4) Build the Faster R-CNN model
    model = FasterRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=rpn_anchor_generator,
        box_roi_pool=roi_pooler,
        # Tune RPN parameters
        rpn_pre_nms_top_n_train=2000,
        rpn_post_nms_top_n_train=1000,
        rpn_pre_nms_top_n_test=1000,
        rpn_post_nms_top_n_test=500,
        rpn_nms_thresh=0.7,
        rpn_fg_iou_thresh=0.7,
        rpn_bg_iou_thresh=0.3,
        # ROI parameters
        box_score_thresh=0.05,
        box_nms_thresh=0.5,
        box_detections_per_img=100,
        box_fg_iou_thresh=0.5,
        box_bg_iou_thresh=0.5
    )

    # 5) Replace box_head with our custom dropout-regularized MLP
    in_channels = model.roi_heads.box_head.fc6.in_features
    model.roi_heads.box_head = TwoMLPHeadWithDropout(
        in_channels,
        representation_size=1024,
        dropout_prob=dropout_prob
    )

    # 6) Replace the box predictor for our specific number of classes
    feat_in = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(feat_in, num_classes)

    return model
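
To get a feel for the anchor sizes chosen above, the sketch below computes the approximate width and height of each anchor for a given size and set of aspect ratios. It assumes the convention that the aspect ratio is height divided by width and that each anchor keeps an area of roughly `size ** 2`, which is how torchvision's `AnchorGenerator` is documented to behave. The function name `anchor_shapes` is hypothetical.

```python
import math


def anchor_shapes(size, aspect_ratios=(0.5, 1.0, 2.0)):
    """Return (width, height) per aspect ratio, with ratio = height / width."""
    shapes = []
    for ratio in aspect_ratios:
        height = size * math.sqrt(ratio)
        width = size / math.sqrt(ratio)
        shapes.append((round(width, 1), round(height, 1)))
    return shapes


# One anchor size per FPN level, matching the defaults of get_custom_model.
for size in (16, 32, 64, 128, 256):
    print(size, anchor_shapes(size))
```

The smallest level produces anchors of roughly 11 to 23 pixels per side, which is the motivation for starting at 16 when objects in drone imagery are small.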

In [None]:
# Training function with SGD, learning rate scheduling, and early stopping
def train_model(model, train_loader, valid_loader, num_epochs=10, lr=0.005, device='mps'):

    model.to(device)

    # Optimizer: SGD with momentum and L2 regularization
    optimizer = optim.SGD(
        [p for p in model.parameters() if p.requires_grad],
        lr=lr,
        momentum=0.9,
        weight_decay=0.0005  # L2 regularization
    )

    # Learning rate scheduler
    lr_scheduler = optim.lr_scheduler.StepLR(
        optimizer,
        step_size=3,
        gamma=0.1
    )

    # Early stopping parameters
    patience = 3
    best_val_loss = float('inf')
    early_stop_counter = 0

    # Track loss history
    train_loss_history = []
    val_loss_history = []

    # Training loop
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        epoch_loss = 0
        start_time = time.time()

        for images, targets in train_loader:
            # Move data to device
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            # Backward pass and optimization
            losses.backward()
            optimizer.step()

            # Accumulate loss
            epoch_loss += losses.item()

        # Calculate average training loss
        avg_train_loss = epoch_loss / len(train_loader)
        train_loss_history.append(avg_train_loss)

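        # Validation phase
        # torchvision detection models only return losses in train mode (in eval
        # mode they return predictions), so stay in train mode and rely on
        # torch.no_grad() below to avoid any weight updates. Dropout remains
        # active here, so the validation loss is slightly noisy.
        model.train()
        val_loss = 0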

        with torch.no_grad():
            for images, targets in valid_loader:
                images = [image.to(device) for image in images]
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())
                val_loss += losses.item()

        # Calculate average validation loss
        avg_val_loss = val_loss / len(valid_loader)
        val_loss_history.append(avg_val_loss)

        # Update learning rate
        lr_scheduler.step()

        # Print epoch summary
        time_elapsed = time.time() - start_time
        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train Loss: {avg_train_loss:.4f}, '
              f'Val Loss: {avg_val_loss:.4f}, '
              f'Time: {time_elapsed:.2f}s')

        # Early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            early_stop_counter = 0
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print(f'Early stopping at epoch {epoch+1}')
                break

    # Load best model
    model.load_state_dict(torch.load('best_model.pth', map_location=device))
    return model, train_loss_history, val_loss_history
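
The early stopping rule inside `train_model` can be traced on a list of validation losses without training anything. The sketch below is a minimal replay of that logic with `patience=3`, returning the epoch at which training would stop and the epoch whose weights would be kept. The function name `early_stopping_epoch` and the loss values are hypothetical.

```python
def early_stopping_epoch(val_losses, patience=3):
    """Return (stop_epoch, best_epoch), both 0-based, for a sequence of validation losses."""
    best_loss = float("inf")
    best_epoch = None
    counter = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            # Improvement: remember this epoch and reset the counter.
            best_loss, best_epoch, counter = loss, epoch, 0
        else:
            counter += 1
            if counter >= patience:
                return epoch, best_epoch
    # Patience never ran out: training ends at the last epoch.
    return len(val_losses) - 1, best_epoch


# Made-up losses: the minimum is at index 1, followed by three worse epochs.
stop_epoch, best_epoch = early_stopping_epoch([1.0, 0.9, 0.95, 0.97, 0.99, 0.5])
print(stop_epoch, best_epoch)
```

In this trace the later improvement at the final position is never reached, which illustrates the trade-off of a small patience value.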

In [None]:
# Initialize and train the model
def initialize_and_train_model():
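    # Set device (Apple GPU via MPS if available, otherwise CPU)
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

    # Number of classes: VisDrone labels 1-11 plus background at index 0,
    # which label_map already reserves
    num_classes = len(label_map)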

    # Initialize model
    model = get_custom_model(num_classes=num_classes)

    # Train the model
    trained_model, train_losses, val_losses = train_model(
        model=model,
        train_loader=train_loader,
        valid_loader=valid_loader,
        num_epochs=10,
        lr=0.001,
        device=device
    )

    return trained_model, train_losses, val_losses

In [None]:
# Run the training
custom_model, train_loss_history, val_loss_history = initialize_and_train_model()

# Plot training and validation loss
plt.figure(figsize=(10, 5))
plt.plot(train_loss_history, label='Training Loss')
plt.plot(val_loss_history, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.grid(True)
plt.show()

# Evaluate the model on test set
custom_model.eval()
map_metric = MeanAveragePrecision()

with torch.no_grad():
    for images, targets in tqdm(test_loader, desc="Evaluating custom model"):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        predictions = custom_model(images)
        map_metric.update(predictions, targets)

# Compute and print mAP metrics
map_results = map_metric.compute()
print("Custom Model Results:")
print(f"mAP (0.50:0.95): {map_results['map']:.8f}")
print(f"mAP@0.50:        {map_results['map_50']:.8f}")
print(f"mAP@0.75:        {map_results['map_75']:.8f}")
print(f"mAP (small):     {map_results['map_small']:.8f}")
print(f"mAP (medium):    {map_results['map_medium']:.8f}")
print(f"mAP (large):     {map_results['map_large']:.8f}")