# 1/ Env Setup
Load necessary libraries to run this notebook. <br>
All libraries are cited in ```requirements.txt```. <br>
Documentation: https://docs.pytorch.org/vision/main/models/generated/torchvision.models.detection.retinanet_resnet50_fpn_v2.html

## 1.1/ Import dependencies
Load libraries:

In [None]:
import sys  
import os

current_dir = os.getcwd() # path to the current working directory (notebook location)
project_root = os.path.abspath(os.path.join(current_dir, '../..')) # path to project root

if project_root not in sys.path: # add project root to sys.path
    sys.path.insert(0, project_root)
print(f"Project root added to sys.path: {project_root}")

Project root added to sys.path: /Users/litani/Documents/myCode/steel-defects


In [3]:
from pathlib import Path
import torch 
import torchvision
import cv2
from torchmetrics.detection import MeanAveragePrecision
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.models.detection.retinanet import RetinaNetClassificationHead
import albumentations as A # note that albumentationsx was installed and but you still albumentations
from albumentations.pytorch import ToTensorV2
import numpy as np
from src.utils.parse_xml import parse_xml

## 1.2/ Set reproducibility
Device and seed:

In [4]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
torch.manual_seed(42)
np.random.seed(42)

# 2/ Configuration Management
Define:
- image path
- model hyperparameters
- hardware


In [6]:
class Config:
    # Paths
    DATA_ROOT = project_root / "data" / "raw"
    TRAIN_IMG = DATA_ROOT / "train_images"
    TRAIN_ANN = DATA_ROOT / "train_annotations"
    VAL_IMG = DATA_ROOT / "valid_images"
    VAL_ANN = DATA_ROOT / "valid_annotations"

    # Model parameters
    NUM_CLASSES = 7 # 6 defects + 1 background
    BACKBONE_PRETRAINED = True 
    
    # Training hyperparameters
    BATCH_SIZE = 16  # no mention of batch size in the paper
    NUM_EPOCHS = 5 # 24 epochs based on paper. Reduced for quicker testing
    LEARNING_RATE = 0.0025 # 0.0025 based on paper
    MOMENTUM = 0.9 # 0.9 based on paper
    WEIGHT_DECAY = 0.0005 # double check this value <<<<<<<

    # Hardware
    DEVICE = device
    NUM_WORKERS = 8
    PIN_MEMORY = True if torch.cuda.is_available() else False

config = Config()

TypeError: unsupported operand type(s) for /: 'str' and 'str'

In [44]:
config

<__main__.Config at 0x376bdda90>

# 3/ Dataset Class
- Load images and annotations into PyTorch format. 
- This is necessary since RetineNet excepts a dictionary format. The latter requires XML parsing.

In [None]:
class SteelDefectDataset(torch.utils.data.Dataset): # inherits from PyTorch Dataset class, making it compatible w/ DataLoader for batch and // processing
    def __init__(self, img_dir, ann_dir, transforms = None): # takes 3 inputs: image directory, annotation directory, and optional transforms (default no augmentation)
        self.img_dir = Path(img_dir)
        self.ann_dir = Path(ann_dir)
        self.transforms = transforms

        # Logic of loading a dataset
        self.images = sorted(list(self.img_dir.glob("*.jpg"))) # find all .jpg files, sort them in order
        self.class_map = { 
            "crazing" : 1,
            "inclusion" : 2,
            "patches" : 3,
            "pitted_surface" : 4,
            "rolled-in_scale" : 5,
            "scratches" : 6,
        }                                                  # mapping defect names (categorical) to integers (numerical)
    def __len__(self): # runs total number of images
        return len(self.images) 
    
    def __getitem__(self, idx):
        # Load image
        img_path = self.images[idx]
        image = cv2.imread(str(img_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # convert BGR, i.e. OpenCV format, to RGB, i.e. PyTorch/RetinaNet format

        # Parse annotations using parse_xml function
        xml_path = self.ann_dir / img_path.name.replace(".jpg", ".xml")
        boxes_data = parse_xml(str(xml_path))

        # Convert XML to lists: [x1, y1, x2, y2], labels = integers
        boxes = []
        labels = []
        for box in boxes_data:
            boxes.append([box["xmin"], box["ymin"], box["xmax"], box["ymax"]])
            labels.append(self.class_map[box["label"]])

        # Apply transforms before converting to tensors
        if self.transforms:
            transformed = self.transforms(image = image, bboxes = boxes, labels = labels)
            image = transformed["image"]
            target = {
                "boxes": torch.as_tensor(transformed["bboxes"], dtype = torch.float32),
                "labels": torch.as_tensor(transformed["labels"], dtype = torch.int64),
                "image_id": torch.as_tensor([idx])
            }
        else:
            target = {
                "boxes": torch.as_tensor(boxes, dtype = torch.float32),
                "labels": torch.as_tensor(labels, dtype = torch.int64),
                "image_id": torch.as_tensor([idx])
            }
        
        return image, target

# 4/ Data Augmentation
- We have 1800 images, resorting to image augmentation is mandatory to avoid overfitting. 
- Geometric transformation, simplist form, will be applied as a quick fix:
    - Horizental/Vertical flips
    - Rotate by 90
    - Others: brightness, contrast, adding random noise
- **NB:** OpenCV stores images as [Height in pixels, Width in pixels, RGB] while PyTorch expects [channel, height, width]

In [None]:
def get_train_transforms():
    return A.Compose([
        A.HorizontalFlip(p = .5), # probability of 50% to flip the image left-right >> defects visible from any orientation
        A.VerticalFlip(p = .5), # flip top-bottom >> surfaces viewed from any angle
        A.RandomRotate90(p = .5), # rotate 90, 180 or 270 degrees >> increase orientation variety
        A.RandomBrightnessContrast(p = .3), # adjust brightness/contrast >> image quality
        A.GaussNoise(p = .2), # add random noise
        A.Normalize(mean = (0,0,0), std = (1,1,1)),
        ToTensorV2() # converts NumPy array to PyTorch tensor, divides pix by 255 ([0,1] range), permutes format from [H,W,C] to [C,H,W].
    ], bbox_params = A.BboxParams(format = "pascal_voc", label_fields = ["labels"])) # XML annotation has a Pascal Voc format

def get_val_transforms(): # for validation on clean images, converting only to tensor format, no augmentation.
    return A.Compose([
        A.Normalize(mean = (0,0,0), std = (1,1,1)),
        ToTensorV2()
    ], bbox_params = A.BboxParams(format = "pascal_voc", label_fields = ["labels"]))

# 5/ Model Initilization
- Apply transfer learning where pretrained RetineNet is loaded the changes are applied based on the dataset

In [47]:
def create_model(num_classes, pretrained = True):
    # Load pretrained RetineNet w/ ResNet50 backbone,
    model = retinanet_resnet50_fpn_v2(weights = "DEFAULT" if pretrained else None)  # DEFAULT loads ImageNet pretrained weights for transfer learning
    
    # Replace head so that model learns defect-specific patterns
    num_anchors = model.head.classification_head.num_anchors # default is 9 anchors per location >> 3 scales x 3 aspect ratios
    model.head.classification_head = RetinaNetClassificationHead(
        in_channels = 256,          # Input: 256 features from FPN   
        num_anchors = num_anchors,  # Process: 9 anchors per location
        num_classes = num_classes   # Output: 7 classes scores per anchor
    )
    return model

model = create_model(config.NUM_CLASSES).to(device) # Create model instance and move to device CPU/GPU, config.NUM_CLASSES = 7 includes background

Downloading: "https://download.pytorch.org/models/retinanet_resnet50_fpn_v2_coco-5905b1c5.pth" to /Users/litani/.cache/torch/hub/checkpoints/retinanet_resnet50_fpn_v2_coco-5905b1c5.pth


100.0%


# 6/ Data Loaders
- collate_func is a fucntion that works on the collation process of RetinaNet since images have variable bbox counts.

In [48]:
# Create train dataset with augmentations and val dataset without augmentations, only format conversion
train_dataset = SteelDefectDataset(
    config.TRAIN_IMG,
    config.TRAIN_ANN,
    transforms = get_train_transforms()
)

val_dataset = SteelDefectDataset(
    config.VAL_IMG,
    config.VAL_ANN,
    transforms = get_val_transforms()
)

# Create data loaders
def collate_func(batch):
    return tuple(zip(*batch)) # transpose batch to a list of tensors instead of stack of images into a single tensor 
                                #[(img1, target1), (img2, target2)] >> ([img1, img2],[target1, target2])

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = config.BATCH_SIZE,
    shuffle = True, # shuffle training data for better generalization
    num_workers = config.NUM_WORKERS,
    pin_memory = config.PIN_MEMORY, # only useful if using GPU
    collate_fn = collate_func 
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size = config.BATCH_SIZE,
    shuffle = False, # validation data should be consistent
    num_workers = config.NUM_WORKERS,
    pin_memory = config.PIN_MEMORY,
    collate_fn = collate_func
)

  self._set_keys()


# 7/ Training Loop
- Quick and dirty: use ADAM as an optimizer for an initial model training, won't be launching/tracking experiments in the beginning
- this is standard supervised learning using RetinaNet loss

In [None]:
# Optimizer
optimizer = torch.optim.SGD(
    model.parameters(),
    lr = config.LEARNING_RATE,
    momentum = config.MOMENTUM,
    weight_decay = config.WEIGHT_DECAY # >>> double check this value <<<
)

# Scheduler
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size = 5, 
    gamma = 0.8   # reduce to 80% every 5 epochs
)

# Checkpoint directory
Path("models").mkdir(exist_ok = True)

# Training functions
def train_one_epoch(model, loader, optimizer, device):
    model.train()
    total_loss = 0

    for images, targets in loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k,v in t.items()} for t in targets]

        # Forward pass: RetinaNet returns loss dictionary
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    return total_loss / len(loader)

# Training Loop
for epoch in range(config.NUM_EPOCHS):
    train_loss = train_one_epoch(model, train_loader, optimizer, device)
    scheduler.step()

    print(f">>>Epoch {epoch +1}/{config.NUM_EPOCHS} | Loss: {train_loss: .5f} <<<")
    
    # Save checkpoint every 5 epochs
    if (epoch + 1) % 5 == 0:
        torch.save(model.state_dict(), f"models/retinanet_epoch_{epoch+1}.pth")

Traceback (most recent call last):
Traceback (most recent call last):
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=97, pipe_handle=111)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/opt/homebrew/Cellar/python@3.13/3.13.4/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode = _main(fd, parent_sentinel)
  File [35m"/opt/homebrew/Cellar/python@3.13/3.13.4/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m132[0m, in [35m_main[0m
    self = reduction.pickle.load(from_parent)
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=97, pipe_handle=118)[0m
          

RuntimeError: DataLoader worker (pid(s) 42765) exited unexpectedly

Traceback (most recent call last):
  File [35m"<string>"[0m, line [35m1[0m, in [35m<module>[0m
    from multiprocessing.spawn import spawn_main; [31mspawn_main[0m[1;31m(tracker_fd=97, pipe_handle=162)[0m
                                                  [31m~~~~~~~~~~[0m[1;31m^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^[0m
  File [35m"/opt/homebrew/Cellar/python@3.13/3.13.4/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m122[0m, in [35mspawn_main[0m
    exitcode = _main(fd, parent_sentinel)
  File [35m"/opt/homebrew/Cellar/python@3.13/3.13.4/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/spawn.py"[0m, line [35m132[0m, in [35m_main[0m
    self = reduction.pickle.load(from_parent)
[1;35mAttributeError[0m: [35mCan't get attribute 'SteelDefectDataset' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>[0m


# 8/ Validation and Metrics
- First, evaluate model perf without retraining
- Aim for .5-.6

In [None]:

# Evaluation function
@torch.no_grad()
def evaluate(model, loader, device):
    model.eval()
    metric = MeanAveragePrecision(iou_thresholds = [.5])  # Initialize Mean Average Precision metric

    all_predictions = []
    all_targets = []

    for images, targets in loader:
        images = [img.to(device) for img in images]
        predictions = model(images)

        # Set format for predictions for torchmetrics
        preds = [
            {
                "boxes": pred["boxes"],
                "scores": pred["scores"],
                "labels": pred["labels"]
            }
            for pred in predictions
        ]

        #Set format for targets for torchmetrics
        targs = [
            {
                "boxes": t["boxes"].to(device),
                "labels": t["labels"].to(device)
            }
            for t in targets
        ]
        
        metric.update(preds, targs)

    results = metric.compute()
    return results

# Run validation
results = evaluate(model, val_loader, device)
print(f"mAP at .5: {results["map_50"]: .4f}")