<a href="https://colab.research.google.com/github/ttntbn/Deep-Learning/blob/main/helmet_frcnn_clean_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Helmet Detection using Faster R-CNN — Clean Notebook
A cleaned-up, minimal yet complete notebook for training Faster R-CNN on a helmet dataset annotated in Pascal VOC XML format.

**What you get**
- Simple hyperparam block
- Clean dataset loader (VOC-XML)
- Robust DataLoader (Drive-friendly)
- Warmup LR + optional schedulers
- MNIST-style epoch-loss plot (in-cell, real-time)
- Optional Comet logging (toggle)
- Best checkpoint + per-epoch checkpoints


In [None]:

# === (Optional) Enable Comet: set `ENABLE_COMET=True` and export COMET_API_KEY if you want logging ===
import os

ENABLE_COMET = True  # set False to disable
# SECURITY: the API key is a secret and must not be stored in the notebook.
# It is read from the environment instead (e.g. export COMET_API_KEY before
# starting the kernel, or set os.environ from a private/secret cell).
# NOTE(review): any key that was ever committed in this notebook should be
# treated as leaked and rotated in the Comet account settings.
COMET_API_KEY   = os.environ.get("COMET_API_KEY", "")   # required if ENABLE_COMET=True
COMET_WORKSPACE = "boonyapon-boontub-0272"      # e.g., "boonyapon-boontub-0272"
COMET_PROJECT   = "helmet-fasterrcnn"

# `experiment` stays None whenever Comet is disabled or fails to initialise,
# so later cells can simply test `experiment is not None`.
experiment = None

if ENABLE_COMET:
    try:
        if not COMET_API_KEY:
            raise RuntimeError("COMET_API_KEY is not set in the environment")

        # Comet should be imported before torch for full auto logging;
        # we do our own explicit logging so this is mostly informational.
        try:
            from comet_ml import Experiment
        except ImportError:
            # Install on demand only when the package is actually missing.
            import subprocess
            import sys
            subprocess.run([sys.executable, "-m", "pip", "install", "-q", "comet-ml"], check=False)
            from comet_ml import Experiment

        os.environ["COMET_WORKSPACE"]    = COMET_WORKSPACE
        os.environ["COMET_PROJECT_NAME"] = COMET_PROJECT
        experiment = Experiment(
            api_key=COMET_API_KEY,
            workspace=COMET_WORKSPACE,
            project_name=COMET_PROJECT,
            auto_metric_logging=False,
            auto_param_logging=False,
            auto_output_logging="simple",
        )
        experiment.set_name("fasterrcnn-helmet-run")
        print("Comet enabled.")
    except Exception as e:
        # Logging is optional: any failure here downgrades to "no Comet"
        # instead of stopping the notebook.
        print("[WARN] Comet init failed -> turning off. Reason:", e)
        ENABLE_COMET = False
        experiment = None


[1;38;5;39mCOMET INFO:[0m Experiment is live on comet.com https://www.comet.com/boonyapon-boontub-0272/helmet-fasterrcnn/824a53f5c7db4579af4f3c6499051ef3



Comet enabled.


In [None]:

# ==== Imports & setup ====
import os, time, random, xml.etree.ElementTree as ET
from pathlib import Path
from typing import List, Dict, Tuple

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torchvision
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True  # tolerate truncated images

import matplotlib.pyplot as plt
from IPython.display import display

# Turn on cuDNN benchmark mode (intended to speed up convolutions on CUDA)
torch.backends.cudnn.benchmark = True

# Reproducibility: seed Python, NumPy and PyTorch with the same value
SEED = 1337
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when one is visible, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    # Seed the CUDA generators as well
    torch.cuda.manual_seed_all(SEED)
print("Device:", device)


Device: cuda


In [None]:

# ==== Hyperparameters ====

# --- Optimisation ---
EPOCHS = 15
BATCH_SIZE = 8
LR = 5e-4
WEIGHT_DECAY = 1e-4
MOMENTUM = 0.9
WARMUP_STEPS = 500  # number of optimizer steps over which the LR ramps up

# --- LR schedule (stepped once per epoch) ---
LR_SCHEDULER = "multistep"  # one of: "none", "multistep", "cosine"
MILESTONES = [10, 13]       # used by "multistep"
GAMMA = 0.1                 # used by "multistep"
COSINE_TMAX = EPOCHS        # used by "cosine"
ETA_MIN = 1e-6              # used by "cosine"

# --- Data augmentation ---
H_FLIP_PROB = 0.5
RAND_RESIZE = (640, 1024)  # min, max shorter side
COLOR_JITTER = True

# --- Dataset layout (point DATA_ROOT at your dataset) ---
DATA_ROOT = Path("/content/drive/MyDrive/helmet_dataset")  # change if needed
TRAIN_IMG, TRAIN_ANN = DATA_ROOT / "train/images", DATA_ROOT / "train/annotations"
VAL_IMG, VAL_ANN = DATA_ROOT / "val/images", DATA_ROOT / "val/annotations"

# --- Where checkpoints are written (created if missing) ---
OUTPUT_DIR = Path("./outputs_frcnn_clean")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


In [None]:

# ==== Utilities ====

class Averager:
    """Accumulates weighted scalar values and exposes their running mean."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear the accumulated sum and sample count."""
        self.total = 0.0
        self.count = 0

    def update(self, v, n: int = 1):
        """Add ``v`` (converted to float) with weight ``n``."""
        self.total += float(v) * n
        self.count += n

    @property
    def value(self):
        """Accumulated sum divided by the count, with the divisor floored at 1."""
        divisor = self.count if self.count > 1 else 1
        return self.total / divisor

def set_lr(opt, lr):
    """Write ``lr`` into every parameter group of ``opt``."""
    for group in opt.param_groups:
        group['lr'] = lr

def get_warmup_lr(base_lr, step, warmup_steps):
    """Return the linearly warmed-up learning rate for ``step``.

    The result is ``base_lr * step / warmup_steps``, with the scale factor
    capped at 1.0. A non-positive ``warmup_steps`` disables warmup and returns
    ``base_lr`` unchanged.
    """
    if warmup_steps <= 0:
        return base_lr
    progress = step / float(warmup_steps)
    scale = progress if progress < 1.0 else 1.0
    return base_lr * scale

def collate_fn(batch):
    """Turn a batch of ``(image, target)`` pairs into two parallel lists.

    Samples are kept in lists rather than being stacked.
    """
    images, targets = zip(*batch)
    return list(images), list(targets)

def make_transforms(train: bool):
    """Build the image transform pipeline.

    Always starts with ``ToTensor``. When ``train`` is true, a random
    horizontal flip (if ``H_FLIP_PROB > 0``) and colour jitter (if
    ``COLOR_JITTER``) are appended, in that order.

    NOTE(review): these transforms operate on the image alone. Any geometric
    step such as the flip needs the bounding boxes mirrored by whoever applies
    the pipeline.
    """
    pipeline = [T.ToTensor()]
    if train and H_FLIP_PROB > 0:
        pipeline.append(T.RandomHorizontalFlip(p=H_FLIP_PROB))
    if train and COLOR_JITTER:
        pipeline.append(T.ColorJitter(0.2, 0.2, 0.2, 0.1))
    return T.Compose(pipeline)


In [None]:

# ==== Dataset (VOC XML) ====

# Class names, indexed by label id. Index 0 is reserved for the background,
# so real object classes get labels >= 1 (for torchvision Faster R-CNN you
# pass labels >= 1).
CLASSES = ["__background__", "helmet"]  # adjust if you have multiple classes

def _read_voc_xml(xml_path: Path, class_to_idx: Dict[str,int]):
    boxes, labels = [], []
    root = ET.parse(str(xml_path)).getroot()
    for obj in root.findall("object"):
        name = obj.find("name").text
        if name not in class_to_idx:
            continue
        bnd = obj.find("bndbox")
        xmin = float(bnd.find("xmin").text)
        ymin = float(bnd.find("ymin").text)
        xmax = float(bnd.find("xmax").text)
        ymax = float(bnd.find("ymax").text)
        boxes.append([xmin, ymin, xmax, ymax])
        labels.append(class_to_idx[name])
    return boxes, labels

class HelmetVOCDataset(Dataset):
    """Detection dataset pairing images with same-stem Pascal VOC XML files.

    Each sample is ``(image, target)``. ``target`` is a dict with ``boxes``
    (float32, N x 4, ``[xmin, ymin, xmax, ymax]``), ``labels`` (int64, N) and
    ``image_id`` (the sample index). Images whose XML cannot be parsed yield
    an empty target and a warning.

    Args:
        img_dir: Directory containing ``.jpg`` / ``.jpeg`` / ``.png`` images.
        ann_dir: Directory containing ``<image stem>.xml`` annotations.
        classes: Class names; the list index is used as the label.
        transforms: Optional image transform (typically a ``T.Compose``).
            Any ``T.RandomHorizontalFlip`` step inside it is applied to the
            image AND its boxes together, so annotations stay aligned.

    Raises:
        AssertionError: If either directory does not exist.
        RuntimeError: If no image has a matching annotation file.
    """

    def __init__(self, img_dir: Path, ann_dir: Path, classes, transforms=None):
        self.img_dir = Path(img_dir)
        self.ann_dir = Path(ann_dir)
        self.transforms = transforms
        self.classes = classes
        self.class_to_idx = {c:i for i,c in enumerate(self.classes)}
        assert self.img_dir.is_dir() and self.ann_dir.is_dir(), "Image/Annotation dirs not found"

        exts = {".jpg",".jpeg",".png"}
        items = []
        # Sorted so sample indices are stable from run to run.
        for p in sorted(self.img_dir.iterdir()):
            if p.suffix.lower() in exts and p.is_file():
                xml = self.ann_dir / (p.stem + ".xml")
                if xml.exists():
                    items.append((p, xml))
        if not items:
            raise RuntimeError(f"No image-xml pairs under {self.img_dir} & {self.ann_dir}")
        self.items = items

    def __len__(self): return len(self.items)

    def _apply_transforms(self, img, boxes):
        """Run ``self.transforms`` on ``img``, mirroring ``boxes`` on flips.

        The transform pipeline only ever sees the image, so a random
        horizontal flip inside it would leave the boxes pointing at the wrong
        side. The pipeline is therefore walked step by step, and each
        ``T.RandomHorizontalFlip`` is carried out here on both the image and
        the boxes. All other steps are applied to the image unchanged.

        NOTE(review): other geometric transforms (resize, crop, ...) are not
        handled; none are used in this notebook.
        """
        if isinstance(self.transforms, T.Compose):
            steps = self.transforms.transforms
        else:
            steps = [self.transforms]

        for step in steps:
            if not isinstance(step, T.RandomHorizontalFlip):
                img = step(img)
                continue
            # Same draw torchvision's RandomHorizontalFlip performs internally.
            if torch.rand(1).item() < step.p:
                # Tensors are (..., H, W); PIL images report size as (W, H).
                width = img.shape[-1] if isinstance(img, torch.Tensor) else img.size[0]
                img = T.functional.hflip(img)
                mirrored = boxes.clone()
                mirrored[:, 0] = width - boxes[:, 2]
                mirrored[:, 2] = width - boxes[:, 0]
                boxes = mirrored
        return img, boxes

    def __getitem__(self, idx):
        img_path, xml_path = self.items[idx]
        # Context manager releases the file handle once the pixels are copied.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")
        try:
            boxes, labels = _read_voc_xml(xml_path, self.class_to_idx)
        except ET.ParseError as e:
            boxes, labels = [], []
            print("[WARN] Bad XML, empty target:", xml_path, e)

        # Keep explicit (0, 4) / (0,) shapes for images without objects.
        boxes  = torch.as_tensor(boxes, dtype=torch.float32) if boxes else torch.zeros((0,4), dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)  if labels else torch.zeros((0,), dtype=torch.int64)

        if self.transforms is not None:
            img, boxes = self._apply_transforms(img, boxes)

        target = {"boxes": boxes, "labels": labels, "image_id": torch.tensor([idx])}
        return img, target


In [None]:

# ==== Build datasets & loaders ====
# Fail early if any of the four dataset directories is missing
assert all(p.exists() for p in (TRAIN_IMG, TRAIN_ANN, VAL_IMG, VAL_ANN)), "Check DATA_ROOT paths"

train_ds = HelmetVOCDataset(
    img_dir=TRAIN_IMG,
    ann_dir=TRAIN_ANN,
    classes=CLASSES,
    transforms=make_transforms(train=True),
)
val_ds = HelmetVOCDataset(
    img_dir=VAL_IMG,
    ann_dir=VAL_ANN,
    classes=CLASSES,
    transforms=make_transforms(train=False),
)

# Start with num_workers=0 to reveal dataset errors clearly (Drive-friendly).
# Training batches are shuffled; validation runs one image at a time, in order.
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_ds,
    batch_size=1,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn,
)

print("train/val sizes:", len(train_ds), len(val_ds))


AssertionError: Check DATA_ROOT paths

In [None]:

# ==== Model: Faster R-CNN (ResNet50-FPN) ====
import torchvision

# Load the detector with its default weights, then replace the box predictor
# so that it outputs one score per entry in CLASSES.
num_classes = len(CLASSES)
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
    in_features, num_classes
)
model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# Optimizer: SGD over the parameters that require gradients
optimizer = torch.optim.SGD(
    [param for param in model.parameters() if param.requires_grad],
    lr=LR,
    momentum=MOMENTUM,
    weight_decay=WEIGHT_DECAY,
)

# Scheduler: stays None unless LR_SCHEDULER names a supported schedule
scheduler = None
if LR_SCHEDULER == "cosine":
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=COSINE_TMAX, eta_min=ETA_MIN)
elif LR_SCHEDULER == "multistep":
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=MILESTONES, gamma=GAMMA)

# Comet params: record the run configuration once, if logging is active
if ENABLE_COMET and globals().get("experiment") is not None:
    experiment.log_parameters(dict(
        EPOCHS=EPOCHS,
        BATCH_SIZE=BATCH_SIZE,
        LR=LR,
        WEIGHT_DECAY=WEIGHT_DECAY,
        MOMENTUM=MOMENTUM,
        WARMUP_STEPS=WARMUP_STEPS,
        LR_SCHEDULER=LR_SCHEDULER,
        MILESTONES=MILESTONES,
        GAMMA=GAMMA,
        COSINE_TMAX=COSINE_TMAX,
        ETA_MIN=ETA_MIN,
        RAND_RESIZE=str(RAND_RESIZE),
        H_FLIP_PROB=H_FLIP_PROB,
        COLOR_JITTER=COLOR_JITTER,
        SEED=SEED,
    ))


In [None]:

# ==== Validate / Train ====
# Same expression as in the setup cell; recomputed here, presumably so this
# cell can be run on its own. validate() and train_one_epoch() read it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class Averager:
    """Weighted running mean of scalars (same definition as in the Utilities cell)."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the accumulated sum and the sample count."""
        self.total, self.count = 0.0, 0

    def update(self, v, n: int = 1):
        """Record value ``v`` with weight ``n``."""
        weighted = float(v) * n
        self.total += weighted
        self.count += n

    @property
    def value(self):
        """Accumulated sum over count; the divisor is never smaller than 1."""
        return self.total / (self.count if self.count > 1 else 1)

@torch.no_grad()
def validate(loader, model):
    """Return the mean summed detection loss over ``loader``.

    Gradients are disabled by the decorator. The model is switched to
    ``train()`` because that is the mode in which it returns the loss dict,
    and it is left in that mode on return.
    """
    model.train()  # detection losses computed in train()
    tracker = Averager()
    for images, targets in loader:
        images = [image.to(device) for image in images]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]
        batch_losses = model(images, targets)
        tracker.update(sum(batch_losses.values()).item())
    return tracker.value

# Number of optimizer steps taken so far across all epochs. Incremented by
# train_one_epoch(), which uses it for LR warmup and as the Comet step index.
global_step = 0

def set_lr(opt, lr):
    """Assign ``lr`` as the learning rate of each param group in ``opt``."""
    for param_group in opt.param_groups:
        param_group.update(lr=lr)

def get_warmup_lr(base_lr, step, warmup_steps):
    """Linear warmup: scale ``base_lr`` by ``step / warmup_steps``, capped at 1.

    With ``warmup_steps <= 0`` there is no warmup and ``base_lr`` is returned.
    """
    if warmup_steps > 0:
        return base_lr * min(1.0, step / float(warmup_steps))
    return base_lr

def train_one_epoch(loader, model, epoch: int):
    """Train ``model`` for one pass over ``loader`` and return the mean loss.

    Uses the module-level ``optimizer``, ``device`` and ``global_step``.
    ``global_step`` is incremented once per batch.

    Learning rate: the linear warmup only drives the LR while
    ``global_step <= WARMUP_STEPS``. After that the LR is left alone, so the
    per-epoch scheduler (MultiStepLR / cosine) controls it. Setting the LR on
    every step, as a plain ``set_lr`` call would, resets it to ``LR`` and
    undoes each scheduler decay.

    NOTE(review): if warmup spans several epochs, scheduler steps taken during
    warmup are overwritten by the warmup value; with the cosine schedule this
    shifts the curve slightly. Keep WARMUP_STEPS within the first epoch to
    avoid that.

    Args:
        loader: Iterable of ``(images, targets)`` batches.
        model: Detection model that returns a dict of losses in train mode.
        epoch: Epoch number, used for logging only.

    Returns:
        Mean of the per-batch summed losses for this epoch.
    """
    global global_step
    model.train()
    loss_hist = Averager()

    num_batches = len(loader)
    print(f"[train] epoch {epoch} | batches: {num_batches}, batch_size: {BATCH_SIZE}")

    t_epoch0 = time.time()
    for step, (images, targets) in enumerate(loader, start=1):
        t0 = time.time()

        images  = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Warmup LR: ramp up during the warmup window only. `<=` makes the
        # final warmup step land exactly on LR before handing over.
        if global_step <= WARMUP_STEPS:
            set_lr(optimizer, get_warmup_lr(LR, global_step, WARMUP_STEPS))

        # Forward/backward
        loss_dict = model(images, targets)
        losses    = sum(loss for loss in loss_dict.values())
        optimizer.zero_grad()
        losses.backward()
        # Clip to keep an occasional exploding batch from derailing training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=10.0)
        optimizer.step()

        loss_val = float(losses.item())
        loss_hist.update(loss_val)
        global_step += 1

        # Comet (step-level)
        if ENABLE_COMET and ('experiment' in globals()) and (experiment is not None):
            experiment.log_metric("train/step_loss", loss_val, step=global_step)
            experiment.log_metric("lr", optimizer.param_groups[0]['lr'], step=global_step)

        # Friendly log on steps 1, 11, 21, ...
        if step % 10 == 1:
            step_time = time.time() - t0
            avg_step_time = (time.time() - t_epoch0) / step
            eta = avg_step_time * (num_batches - step)
            mem = (torch.cuda.memory_allocated()/1e9) if torch.cuda.is_available() else 0.0
            print(f"  [step {step:4d}/{num_batches}] "
                  f"loss={loss_val:.3f}  step:{step_time:.2f}s  ETA:{eta/60:.1f}m  "
                  f"lr={optimizer.param_groups[0]['lr']:.6f}  gpu~{mem:.2f}GB")

    return loss_hist.value


In [None]:

# ==== MNIST-style Epoch Loss Plot ====
# Per-epoch loss histories; the main loop appends to these
train_epoch_loss = []
val_epoch_loss = []

# One figure with an (initially empty) line per history
fig_ep, ax_ep = plt.subplots(figsize=(7, 4))
tr_line = ax_ep.plot([], [], '-o', label='train loss')[0]
va_line = ax_ep.plot([], [], '-s', label='val loss')[0]
ax_ep.set_xlabel('Epoch')
ax_ep.set_ylabel('Loss')
ax_ep.set_title('Loss per Epoch')
ax_ep.grid(True)
ax_ep.legend()

# Keep the display handle so the same output can be refreshed later
ep_handle = display(fig_ep, display_id=True)

def update_epoch_plot():
    """Redraw the epoch-loss figure from the recorded loss histories.

    Reads ``train_epoch_loss`` / ``val_epoch_loss`` and updates the existing
    lines, axis limits and displayed figure in place. Non-finite losses
    (NaN / inf) are ignored when choosing the y-limits, because matplotlib
    rejects non-finite axis limits.
    """
    # Each line gets its own x-range so unequal history lengths still plot.
    tr_line.set_data(np.arange(1, len(train_epoch_loss) + 1), train_epoch_loss)
    va_line.set_data(np.arange(1, len(val_epoch_loss) + 1), val_epoch_loss)
    ax_ep.set_xlim(1, max(5, len(train_epoch_loss) + 0.5))

    finite = [float(v) for v in (*train_epoch_loss, *val_epoch_loss) if np.isfinite(v)]
    if finite:
        y0, y1 = min(finite), max(finite)
        if y1 == y0:
            # Flat history: widen the window so the line is visible.
            y1 += 1.0
            y0 -= 1.0
        m = 0.1*(y1 - y0)
        ax_ep.set_ylim(y0 - m, y1 + m)

    fig_ep.canvas.draw()
    try:
        ep_handle.update(fig_ep)
    except Exception:
        # No usable display handle: fall back to emitting a fresh figure.
        display(fig_ep)


In [None]:

# ==== Save helpers + Main loop ====
def save_best_model(val_loss, best_val, epoch):
    """Save a "best" checkpoint when ``val_loss`` improves on the best so far.

    Args:
        val_loss: Validation loss of the epoch that just finished.
        best_val: One-element list holding the best loss so far; updated in
            place when ``val_loss`` is lower.
        epoch: Epoch number stored in the checkpoint and used as Comet step.

    On improvement, writes model / optimizer / scheduler state to
    ``OUTPUT_DIR / "best_model.pth"`` and, when Comet is active, uploads the
    file as an asset. An upload failure is reported but does not interrupt
    training.
    """
    if val_loss < best_val[0]:
        best_val[0] = val_loss
        path = OUTPUT_DIR / "best_model.pth"
        torch.save({
            "epoch": epoch,
            "model": model.state_dict(),
            "optimizer": optimizer.state_dict(),
            "scheduler": scheduler.state_dict() if scheduler else None,
            "best_val_loss": best_val[0],
        }, path)
        print(f">>> Saved BEST (val_loss={val_loss:.4f}) at epoch {epoch}")
        if ENABLE_COMET and ('experiment' in globals()) and (experiment is not None):
            try:
                experiment.log_asset(str(path), step=epoch)
            except Exception as e:
                # Best-effort upload: the checkpoint is already on disk.
                print("[WARN] Comet asset upload failed:", e)

def save_epoch_model(epoch):
    """Write the checkpoint for ``epoch`` to ``OUTPUT_DIR / f"epoch_{epoch}.pth"``.

    Stores model, optimizer and (if any) scheduler state. When Comet is
    active the file is also uploaded as an asset. An upload failure is
    reported but does not interrupt training.
    """
    path = OUTPUT_DIR / f"epoch_{epoch}.pth"
    torch.save({
        "epoch": epoch,
        "model": model.state_dict(),
        "optimizer": optimizer.state_dict(),
        "scheduler": scheduler.state_dict() if scheduler else None,
    }, path)
    if ENABLE_COMET and ('experiment' in globals()) and (experiment is not None):
        try:
            experiment.log_asset(str(path), step=epoch)
        except Exception as e:
            # Best-effort upload: the checkpoint is already on disk.
            print("[WARN] Comet asset upload failed:", e)

start_epoch = 1
NUM_EPOCHS = EPOCHS
best_val = [float("inf")]  # one-element list so save_best_model can update it in place

for ep in range(start_epoch, NUM_EPOCHS + 1):
    print(f"\nEPOCH {ep} of {NUM_EPOCHS}")
    t0 = time.time()

    # One training pass, then one validation pass; time both together
    train_mean = train_one_epoch(train_loader, model, epoch=ep)
    val_mean = validate(val_loader, model)
    elapsed = time.time() - t0

    # Record & plot
    train_epoch_loss.append(train_mean)
    val_epoch_loss.append(val_mean)
    update_epoch_plot()

    # Scheduler step (per-epoch)
    if scheduler is not None:
        scheduler.step()

    # Comet (epoch-level)
    if ENABLE_COMET and globals().get("experiment") is not None:
        experiment.log_metrics(
            {"epoch/train_loss": train_mean, "epoch/val_loss": val_mean},
            step=ep,
            epoch=ep,
        )

    print(f"Epoch #{ep:02d} | train={train_mean:.3f}  val={val_mean:.3f}  time={elapsed:.1f}s")

    # Save: best-so-far first, then the per-epoch checkpoint
    save_best_model(val_mean, best_val, ep)
    save_epoch_model(ep)

# Close the Comet experiment once every epoch has run
if ENABLE_COMET and globals().get("experiment") is not None:
    experiment.end()
