In [20]:
#Imports, paths to (augmented) detector dataset, seeds, hyperparameters.
import json
import os
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
from torch.utils.data import DataLoader
from torchvision import models

# Reproducibility: seed the Python, NumPy and PyTorch generators with one value.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Device preference order: CUDA, then Apple MPS (when this torch build exposes
# the backend), then CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
else:
    DEVICE = torch.device("cpu")
print("Using device:", DEVICE)

# Paths (aligned with augmentation notebook).
# The default is the original author's local checkout; set the CARDD_DATA_ROOT
# environment variable to point the notebook at the dataset on another machine.
BASE_DATA_ROOT = Path(
    os.environ.get(
        "CARDD_DATA_ROOT",
        "/Users/stephenmacris/Documents/School/CS5100/Project/CarDD_release/CarDD_COCO",
    )
)

DETECTOR_ROOT = BASE_DATA_ROOT / "detector"
AUG_ROOT = BASE_DATA_ROOT / "augmented" / "detector"

# Training reads the augmented split; validation and test read the original one.
TRAIN_IMG_DIR = AUG_ROOT / "train" / "images"
TRAIN_ANN = AUG_ROOT / "train" / "annotations" / "annotations.json"

VAL_IMG_DIR = DETECTOR_ROOT / "val" / "images"
VAL_ANN = DETECTOR_ROOT / "val" / "annotations" / "annotations.json"

TEST_IMG_DIR = DETECTOR_ROOT / "test" / "images"
TEST_ANN = DETECTOR_ROOT / "test" / "annotations" / "annotations.json"

with open(TRAIN_ANN, "r", encoding="utf-8") as f:
    train_coco = json.load(f)

categories = train_coco.get("categories", [])
category_id_to_name = {c["id"]: c["name"] for c in categories}
# The dataset feeds raw COCO category ids to the detector as labels, so the head
# needs an output for every id up to the largest one, plus background at index 0.
# Sizing by max id (instead of the category count) stays correct if ids have gaps.
NUM_CLASSES = max(category_id_to_name, default=0) + 1
print("Classes:", category_id_to_name)

# Hyperparameters
NUM_EPOCHS = 10  # number of passes over train_loader in the training loop
BATCH_SIZE = 4  # batch size shared by the train/val/test DataLoaders
LEARNING_RATE = 5e-4  # initial learning rate handed to the SGD optimizer
WEIGHT_DECAY = 1e-4  # weight_decay handed to the SGD optimizer
CHECKPOINT_PATH = "fasterrcnn_cardd_best.pt"  # file the best state_dict is saved to and reloaded from


Using device: mps
Classes: {1: 'dent', 2: 'scratch', 3: 'crack', 4: 'glass shatter', 5: 'lamp broken', 6: 'tire flat'}


In [None]:
#Dataset/Dataloader: custom Dataset to read images + bboxes/labels from COCO, apply train/val transforms, 
#collate function for variable targets.
from collections import defaultdict
from PIL import Image
import torchvision.transforms.functional as F
from torch.utils.data import Dataset

class Compose:
    """Chain several ``(image, target) -> (image, target)`` transforms into one callable."""

    def __init__(self, transforms):
        # Kept as given; the steps are applied in iteration order.
        self.transforms = transforms

    def __call__(self, image, target):
        """Feed the pair through every transform and return the final pair."""
        current_image, current_target = image, target
        for step in self.transforms:
            current_image, current_target = step(current_image, current_target)
        return current_image, current_target

class ToTensor:
    """Convert the image with ``F.to_tensor``; the target is passed through unchanged."""

    def __call__(self, image, target):
        tensor_image = F.to_tensor(image)
        return tensor_image, target

class RandomHorizontalFlip:
    """Mirror the image left-to-right with probability ``p`` and flip its boxes to match.

    The image must expose ``.size`` and ``.transpose`` (a PIL image), so this
    transform has to run before ``ToTensor``. Boxes are read from
    ``target["boxes"]`` as ``[x1, y1, x2, y2]`` rows.
    """

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, image, target):
        """Return ``(image, target)``, flipped together or left untouched.

        ``target["boxes"]`` is replaced by a new tensor when a flip happens; the
        original tensor is not modified.
        """
        if random.random() >= self.p:
            return image, target

        width, _ = image.size
        image = image.transpose(Image.FLIP_LEFT_RIGHT)

        boxes = target.get("boxes")
        # Images without annotations carry an empty boxes tensor; indexing its
        # columns would raise, and there is nothing to flip anyway.
        if boxes is not None and boxes.numel() > 0:
            flipped = boxes.clone()
            # New x1 is mirrored old x2 and vice versa, so x1 <= x2 still holds.
            flipped[:, [0, 2]] = width - boxes[:, [2, 0]]
            target["boxes"] = flipped
        return image, target

def get_transform(train=True):
    """Build the transform pipeline for one split.

    Training adds a 50% horizontal flip before tensor conversion; every other
    split only converts the image to a tensor.
    """
    steps = [RandomHorizontalFlip(0.5)] if train else []
    steps.append(ToTensor())
    return Compose(steps)

class CocoDetectionDataset(Dataset):
    """Detection dataset backed by an image folder and a COCO-format annotation file.

    Each item is ``(image, target)``. ``target`` is a dict with:
      - ``boxes``: float32 tensor of shape (N, 4) in ``[x1, y1, x2, y2]`` order
      - ``labels``: int64 tensor of raw COCO ``category_id`` values
      - ``image_id``: one-element tensor holding the COCO image id
      - ``area``: float32 tensor (annotation ``area``, or ``w * h`` if absent)
      - ``iscrowd``: int64 tensor (0 when the annotation has no ``iscrowd``)
    """

    def __init__(self, images_dir: Path, ann_path: Path, transforms=None):
        """Index the annotation file by image id.

        Args:
            images_dir: directory that the annotation ``file_name`` entries are relative to.
            ann_path: path of the COCO JSON file.
            transforms: optional callable ``(image, target) -> (image, target)``.
        """
        with open(ann_path, "r", encoding="utf-8") as f:
            coco = json.load(f)
        self.images_dir = Path(images_dir)
        self.transforms = transforms

        self.id_to_image = {img["id"]: img for img in coco.get("images", [])}
        self.anns_by_image = defaultdict(list)
        for ann in coco.get("annotations", []):
            self.anns_by_image[ann["image_id"]].append(ann)
        # Every listed image is kept, including ones without annotations.
        self.image_ids = list(self.id_to_image.keys())

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and assemble its detection target."""
        image_id = self.image_ids[idx]
        img_info = self.id_to_image[image_id]
        img_path = self.images_dir / img_info["file_name"]
        image = Image.open(img_path).convert("RGB")

        boxes = []
        labels = []
        areas = []
        iscrowd = []

        for ann in self.anns_by_image.get(image_id, []):
            x, y, w, h = ann["bbox"]
            # torchvision detection models reject boxes without positive width
            # and height during training, so drop degenerate annotations here.
            if w <= 0 or h <= 0:
                continue
            boxes.append([x, y, x + w, y + h])  # COCO xywh -> xyxy corners
            labels.append(ann["category_id"])
            areas.append(ann.get("area", w * h))
            iscrowd.append(ann.get("iscrowd", 0))

        # reshape keeps the (N, 4) layout even when N == 0; an empty list would
        # otherwise become a shape-(0,) tensor that breaks column indexing.
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(labels, dtype=torch.int64)
        areas = torch.tensor(areas, dtype=torch.float32)
        iscrowd = torch.tensor(iscrowd, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([image_id]),
            "area": areas,
            "iscrowd": iscrowd,
        }

        if self.transforms:
            image, target = self.transforms(image, target)

        return image, target

def collate_fn(batch):
    """Regroup a batch of ``(image, target)`` samples into ``(images, targets)``.

    Samples are transposed rather than stacked, so each image keeps its own
    target with however many boxes it has.
    """
    columns = zip(*batch)
    return tuple(columns)

# Datasets: only the training split gets the random-flip augmentation.
train_dataset = CocoDetectionDataset(TRAIN_IMG_DIR, TRAIN_ANN, transforms=get_transform(train=True))
val_dataset = CocoDetectionDataset(VAL_IMG_DIR, VAL_ANN, transforms=get_transform(train=False))

# The test split is optional; it is only built when its annotation file exists.
test_dataset = (
    CocoDetectionDataset(TEST_IMG_DIR, TEST_ANN, transforms=get_transform(train=False))
    if TEST_ANN.exists()
    else None
)

# Loader settings shared by every split. num_workers=0 keeps loading in the
# notebook process.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=0, collate_fn=collate_fn)

train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs) if test_dataset else None

print(f"Train batches: {len(train_loader)}, Val batches: {len(val_loader)}")


Train batches: 2767, Val batches: 203


In [18]:
#Model definition: choose detector (e.g., torchvision fasterrcnn_resnet50_fpn or YOLOv5/YOLOv8 if available); 
#adapt num_classes to the damage categories.
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Start from the pretrained detector when its weights can be loaded, then replace
# the detection head to match NUM_CLASSES.
# The weights are fetched inside fasterrcnn_resnet50_fpn(), not when the enum
# member is looked up, so the constructor call is what needs the fallback.
weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
try:
    model = fasterrcnn_resnet50_fpn(weights=weights)
    print("Using pretrained ResNet-50 FPN weights; replacing head for NUM_CLASSES")
except (OSError, RuntimeError) as exc:
    # Typical cause: no network access and no cached weights file.
    print("Pretrained weights unavailable; initializing backbone randomly and replacing head")
    print("  reason:", exc)
    weights = None
    # weights_backbone=None is required too: with weights=None alone, torchvision
    # still tries to download ImageNet weights for the backbone.
    model = fasterrcnn_resnet50_fpn(weights=None, weights_backbone=None)

# Replace the classification head (classifier + bbox regressor) to match our class count
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, NUM_CLASSES)

model.to(DEVICE)
print(model)


Using pretrained ResNet-50 FPN weights; replacing head for NUM_CLASSES
FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
 

In [19]:
#Training loop: optimizer/scheduler setup, epoch loop with loss logging, checkpointing best model by val mAP.
from torchvision.ops import box_iou

def _match_image_detections(records, output, target, iou_thresholds, score_thresh):
    """Match one image's detections to its ground truth and append the outcome to ``records``.

    For every class seen in the image, ``records[cls_id]`` gains the number of
    ground-truth boxes, the detection scores, and for each detection one
    hit/miss flag per IoU threshold.
    """
    # Matching runs on CPU so the per-detection loop does not force a device
    # synchronisation on every comparison.
    gt_boxes = target["boxes"].detach().cpu().reshape(-1, 4)
    gt_labels = target["labels"].detach().cpu().reshape(-1)

    scores = output["scores"].detach().cpu()
    keep = scores >= score_thresh
    scores = scores[keep]
    pred_boxes = output["boxes"].detach().cpu()[keep]
    pred_labels = output["labels"].detach().cpu()[keep]

    # Greedy matching must visit detections from most to least confident.
    order = torch.argsort(scores, descending=True)
    scores, pred_boxes, pred_labels = scores[order], pred_boxes[order], pred_labels[order]

    for cls_id in set(gt_labels.tolist()) | set(pred_labels.tolist()):
        cls_gt = gt_boxes[gt_labels == cls_id]
        cls_sel = pred_labels == cls_id
        cls_pred = pred_boxes[cls_sel]
        cls_scores = scores[cls_sel]

        rec = records[cls_id]
        rec["num_gt"] += int(cls_gt.shape[0])

        num_pred = int(cls_pred.shape[0])
        if num_pred == 0:
            continue

        hits = [[False] * len(iou_thresholds) for _ in range(num_pred)]
        if cls_gt.shape[0] > 0:
            # One IoU matrix per class and image, reused for every threshold.
            ious = box_iou(cls_pred, cls_gt)
            for t_idx, thr in enumerate(iou_thresholds):
                taken = torch.zeros(cls_gt.shape[0], dtype=torch.bool)
                for p_idx in range(num_pred):
                    # Only ground truths not yet claimed by a stronger detection
                    # are candidates; a duplicate therefore counts as a miss.
                    candidates = ious[p_idx].masked_fill(taken, -1.0)
                    best_iou, best_gt = candidates.max(0)
                    if best_iou.item() >= thr:
                        taken[best_gt] = True
                        hits[p_idx][t_idx] = True

        rec["scores"].extend(cls_scores.tolist())
        rec["hits"].extend(hits)


def _average_precision(scores, hits, num_gt):
    """Area under the precision-recall curve (all-point interpolation) for one class.

    ``scores`` and ``hits`` are parallel lists over every detection of the class;
    ``num_gt`` (> 0) is the number of ground-truth boxes of that class.
    """
    if not scores:
        return 0.0

    order = np.argsort(-np.asarray(scores, dtype=np.float64), kind="stable")
    tp = np.asarray(hits, dtype=np.float64)[order]
    cum_tp = np.cumsum(tp)
    cum_fp = np.cumsum(1.0 - tp)
    recall = cum_tp / num_gt
    precision = cum_tp / (cum_tp + cum_fp)

    # Pad the curve, then make precision monotonically non-increasing in recall.
    recall = np.concatenate(([0.0], recall, [1.0]))
    precision = np.concatenate(([0.0], precision, [0.0]))
    precision = np.maximum.accumulate(precision[::-1])[::-1]

    steps = np.nonzero(recall[1:] != recall[:-1])[0]
    return float(np.sum((recall[steps + 1] - recall[steps]) * precision[steps + 1]))


def evaluate_map(model, data_loader, device, iou_thresholds=None, score_thresh=0.05):
    """Compute detection mAP for ``model`` over ``data_loader``.

    Detections with score below ``score_thresh`` are discarded. Per class and IoU
    threshold, detections are matched greedily (highest score first) to
    still-unmatched ground-truth boxes of the same class, and average precision
    is the area under the resulting precision-recall curve. Classes with no
    ground-truth box in the data are left out of the means.

    Args:
        model: detector that, in eval mode, maps a list of image tensors to a
            list of dicts with ``boxes``, ``labels`` and ``scores``.
        data_loader: iterable of ``(images, targets)`` batches; each target has
            ``boxes`` (xyxy) and ``labels``.
        device: device the images are moved to before the forward pass.
        iou_thresholds: IoU thresholds to evaluate; defaults to 0.50:0.05:0.95.
        score_thresh: minimum detection score to keep.

    Returns:
        ``(map50, map5095, per_class_map50)``: mean AP at IoU 0.5 (0.0 if 0.5 is
        not among the thresholds), mean AP averaged over all thresholds, and a
        ``{class_id: AP at IoU 0.5}`` dict.

    Note:
        The model is left in eval mode.
    """
    if iou_thresholds is None:
        iou_thresholds = [0.5] + [round(x, 2) for x in np.arange(0.55, 0.96, 0.05)]
    iou_thresholds = [float(t) for t in iou_thresholds]

    records = defaultdict(lambda: {"scores": [], "hits": [], "num_gt": 0})

    model.eval()
    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            outputs = model(images)
            for output, target in zip(outputs, targets):
                _match_image_detections(records, output, target, iou_thresholds, score_thresh)

    # AP is undefined for a class that never occurs in the ground truth.
    evaluated = {cls_id: rec for cls_id, rec in records.items() if rec["num_gt"] > 0}

    map_per_t = []
    map50 = 0.0
    per_class_map50 = {}

    for t_idx, thr in enumerate(iou_thresholds):
        ap_by_cls = {
            cls_id: _average_precision(
                rec["scores"], [flags[t_idx] for flags in rec["hits"]], rec["num_gt"]
            )
            for cls_id, rec in evaluated.items()
        }
        score_t = float(np.mean(list(ap_by_cls.values()))) if ap_by_cls else 0.0
        map_per_t.append(score_t)
        if abs(thr - 0.5) < 1e-9:
            map50 = score_t
            per_class_map50 = ap_by_cls

    map5095 = float(np.mean(map_per_t)) if map_per_t else 0.0
    return map50, map5095, per_class_map50

# SGD with momentum; the learning rate is multiplied by 0.1 every 5 epochs.
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=WEIGHT_DECAY)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Best validation score so far; a checkpoint is written only when it is beaten.
best_map50 = 0.0

for epoch in range(NUM_EPOCHS):
    # evaluate_map() leaves the model in eval mode, so switch back every epoch.
    model.train()
    epoch_loss = 0.0

    for images, targets in train_loader:
        images = [img.to(DEVICE) for img in images]
        targets = [
            {k: v.to(DEVICE) if torch.is_tensor(v) else v for k, v in t.items()}
            for t in targets
        ]

        # In train mode the detector returns a dict of loss terms; optimise their sum.
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        loss_value = losses.item()
        # Stop before a NaN/inf loss is back-propagated into the weights.
        if not np.isfinite(loss_value):
            raise RuntimeError(
                f"Non-finite loss {loss_value} in epoch {epoch+1}; loss terms: "
                f"{ {k: float(v) for k, v in loss_dict.items()} }"
            )

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        epoch_loss += loss_value

    lr_scheduler.step()
    avg_loss = epoch_loss / max(len(train_loader), 1)

    val_map50, val_map5095, _ = evaluate_map(model, val_loader, DEVICE)

    if val_map50 > best_map50:
        best_map50 = val_map50
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        # \u2192 is a right arrow; the escape keeps the source ASCII-only.
        print(f"Epoch {epoch+1}: new best mAP@0.5={val_map50:.4f} \u2192 saved {CHECKPOINT_PATH}")

    print(
        f"Epoch {epoch+1}/{NUM_EPOCHS} | loss={avg_loss:.4f} | val mAP@0.5={val_map50:.4f} | val mAP@[0.5:0.95]={val_map5095:.4f}"
    )


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'CocoDetectionDataset' on <module '__main__' (built-in)>


KeyboardInterrupt: 

In [None]:
#Evaluation: compute mAP@0.5 and mAP@[0.5:0.95] on val/test; print per-class AP.
# Load best checkpoint if available
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
checkpoint_file = Path(CHECKPOINT_PATH)
if not checkpoint_file.exists():
    print("No checkpoint found; evaluating current model state.")
else:
    state = torch.load(CHECKPOINT_PATH, map_location=DEVICE)
    model.load_state_dict(state)
    print(f"Loaded checkpoint: {CHECKPOINT_PATH}")


def _print_per_class(per_class_scores):
    """Print one indented line per class id, using its name when it is known."""
    for cls_id, score in sorted(per_class_scores.items()):
        name = category_id_to_name.get(cls_id, str(cls_id))
        print(f"  {name}: {score:.4f}")


# Validation split is always evaluated.
val_map50, val_map5095, val_per_class = evaluate_map(model, val_loader, DEVICE)
print(f"Validation mAP@0.5: {val_map50:.4f} | mAP@[0.5:0.95]: {val_map5095:.4f}")
_print_per_class(val_per_class)

# Test split is evaluated only when a test loader was built.
if test_loader is None:
    print("Test set not found; skipping test evaluation.")
else:
    test_map50, test_map5095, test_per_class = evaluate_map(model, test_loader, DEVICE)
    print(f"\nTest mAP@0.5: {test_map50:.4f} | mAP@[0.5:0.95]: {test_map5095:.4f}")
    _print_per_class(test_per_class)


In [None]:
#Inference demo: run the trained detector on a few test images (falling back to val images) and visualize predicted boxes/scores for qualitative review
from PIL import ImageDraw

# Path.glob yields files in filesystem order, so sort before slicing to get the
# same three images on every run and machine.
sample_images = sorted(TEST_IMG_DIR.glob("*.jpg"))[:3] if TEST_IMG_DIR.exists() else []
if not sample_images:
    # No test images available: fall back to the validation folder.
    sample_images = sorted(VAL_IMG_DIR.glob("*.jpg"))[:3]

# Only detections at or above this score are drawn.
score_threshold = 0.5
model.eval()

for img_path in sample_images:
    img = Image.open(img_path).convert("RGB")
    img_tensor = F.to_tensor(img).to(DEVICE)

    with torch.no_grad():
        output = model([img_tensor])[0]

    keep = output["scores"] >= score_threshold
    boxes = output["boxes"][keep].cpu()
    labels = output["labels"][keep].cpu()
    scores = output["scores"][keep].cpu()

    # Draw on a copy so the loaded image stays untouched.
    vis = img.copy()
    draw = ImageDraw.Draw(vis)
    for box, label, score in zip(boxes, labels, scores):
        x1, y1, x2, y2 = box.tolist()
        cls_name = category_id_to_name.get(int(label.item()), str(int(label.item())))
        draw.rectangle([x1, y1, x2, y2], outline="red", width=3)
        draw.text((x1 + 2, y1 + 2), f"{cls_name}: {score:.2f}", fill="yellow")

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.set_title(img_path.name)
    ax.imshow(vis)
    ax.axis("off")
    plt.show()
    # Release the figure so repeated runs do not accumulate open figures.
    plt.close(fig)
