# 🎯 MOUAADNET-ULTRA: Human Detection Training

**Author:** MOUAAD IDOUFKIR  
**Platform:** Lightning AI / Google Colab

---

In [None]:
# ============================================
# CELL 1: Check GPU & Install Dependencies
# ============================================
# Print the NVIDIA driver/GPU status for this runtime.
!nvidia-smi
# Quietly install the third-party packages imported in the later cells.
!pip install -q torch torchvision tqdm pycocotools opencv-python

In [None]:
# ============================================
# CELL 2: Clone Repository
# ============================================
import os

# Works on both Lightning AI and Colab
# The existence of /teamspace is used as the marker for a Lightning AI studio;
# anything else is treated as Colab.
if os.path.exists('/teamspace'):
    # Lightning AI
    WORK_DIR = '/teamspace/studios/this_studio'
else:
    # Colab
    WORK_DIR = '/content'

# Every path used by the later cells is derived from WORK_DIR.
REPO_DIR = f'{WORK_DIR}/MouaadNet-Ultra'
DATA_DIR = f'{WORK_DIR}/coco'
CKPT_DIR = f'{WORK_DIR}/checkpoints'

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(CKPT_DIR, exist_ok=True)

# Clone on the first run; on later runs only pull new commits.
if not os.path.exists(REPO_DIR):
    !git clone https://github.com/mouuuuaad/MouaadNet-Ultra.git {REPO_DIR}
else:
    !cd {REPO_DIR} && git pull

# The rest of the notebook runs with the repository as the working directory.
os.chdir(REPO_DIR)
print(f"\n‚úÖ Working: {os.getcwd()}")
print(f"üìÅ Data: {DATA_DIR}")
print(f"üíæ Checkpoints: {CKPT_DIR}")

In [None]:
# ============================================
# CELL 3: Download COCO (Only Once)
# ============================================
import os

def download_if_missing(url, dest):
    """Download ``url`` to ``dest`` with wget unless ``dest`` already exists.

    Only the existence of ``dest`` is checked, not its size or integrity.
    NOTE(review): a partial file left behind by an interrupted download would
    therefore be treated as complete -- confirm this is acceptable.
    """
    if not os.path.exists(dest):
        # Last URL path component, used only for the progress message.
        name = url.split('/')[-1]
        print(f"üì• Downloading {name}...")
        !wget -q --show-progress {url} -O {dest}
    else:
        print(f"‚úÖ Already exists: {dest}")

# Check what exists
# An extracted directory is taken to mean "already downloaded"; only its
# existence is tested, not its contents.
has_train = os.path.exists(f'{DATA_DIR}/train2017')
has_val = os.path.exists(f'{DATA_DIR}/val2017')
has_anno = os.path.exists(f'{DATA_DIR}/annotations')

# Each missing part is downloaded, unzipped inside DATA_DIR, and the archive
# is deleted afterwards.
if not has_train:
    download_if_missing('http://images.cocodataset.org/zips/train2017.zip', f'{DATA_DIR}/train2017.zip')
    !cd {DATA_DIR} && unzip -q train2017.zip && rm train2017.zip
    
if not has_val:
    download_if_missing('http://images.cocodataset.org/zips/val2017.zip', f'{DATA_DIR}/val2017.zip')
    !cd {DATA_DIR} && unzip -q val2017.zip && rm val2017.zip

if not has_anno:
    download_if_missing('http://images.cocodataset.org/annotations/annotations_trainval2017.zip', f'{DATA_DIR}/annotations.zip')
    !cd {DATA_DIR} && unzip -q annotations.zip && rm annotations.zip

print("\n‚úÖ COCO Dataset Ready!")
# List the dataset directory as a quick visual check.
!ls {DATA_DIR}

In [None]:
# ============================================
# CELL 4: Imports
# ============================================
import os
import sys
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from pycocotools.coco import COCO
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

# Add repo to path
# (presumably so the `mouaadnet_ultra` package imported in a later cell
# resolves from the checkout -- verify against the repository layout).
sys.path.insert(0, REPO_DIR)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"üñ•Ô∏è Device: {device}")
print(f"üî• PyTorch: {torch.__version__}")

In [None]:
# ============================================
# CELL 5: Dataset Class
# ============================================

def gaussian2d(shape, sigma):
    """Build an unnormalised 2D Gaussian of the given (rows, cols) shape.

    Values are exp(-(x^2 + y^2) / (2 * sigma^2)) with offsets measured from
    the array centre; entries smaller than 1e-7 are set to zero.
    """
    half_rows, half_cols = ((dim - 1) / 2 for dim in shape)
    rows, cols = np.ogrid[-half_rows:half_rows + 1, -half_cols:half_cols + 1]
    squared_dist = cols * cols + rows * rows
    kernel = np.exp(-squared_dist / (2 * sigma * sigma))
    # Drop the negligible tail so the kernel has compact support.
    kernel[kernel < 1e-7] = 0
    return kernel


def draw_gaussian(heatmap, cx, cy, radius):
    """Stamp a Gaussian peak onto ``heatmap`` around (cx, cy), in place.

    The kernel is cropped where it would cross the heatmap border and merged
    with an element-wise maximum, so existing higher values are kept.
    """
    size = 2 * radius + 1
    kernel = gaussian2d((size, size), sigma=size / 6)

    rows, cols = heatmap.shape
    # How far the kernel may reach in each direction before leaving the map.
    reach_left, reach_right = min(cx, radius), min(cols - cx, radius + 1)
    reach_up, reach_down = min(cy, radius), min(rows - cy, radius + 1)

    target = heatmap[cy - reach_up:cy + reach_down, cx - reach_left:cx + reach_right]
    patch = kernel[radius - reach_up:radius + reach_down, radius - reach_left:radius + reach_right]

    # Nothing to draw when either crop is empty.
    if target.size == 0 or patch.size == 0:
        return
    np.maximum(target, patch, out=target)


class COCOPersonDataset(Dataset):
    """COCO person-detection dataset that yields CenterNet-style targets.

    Images are letterboxed (aspect-preserving resize, then padding with the
    value 114) to ``img_size`` x ``img_size`` and normalised with the
    mean/std given in ``__init__``. Targets live on a grid of
    ``img_size // stride`` cells per side.

    Each item is a dict with:
        'image':   tensor (3, img_size, img_size)
        'heatmap': tensor (1, out_size, out_size), one Gaussian peak per person
        'wh':      tensor (2, out_size, out_size), box width/height divided by
                   ``img_size``, written at each person's centre cell
        'mask':    tensor (out_size, out_size), 1 where 'wh' carries a target
    """

    def __init__(self, data_dir, split='train', img_size=416, stride=4):
        """Load the COCO index for ``split`` and keep images containing persons.

        Args:
            data_dir: Directory holding ``annotations/`` and ``{split}2017/``.
            split: Split name used in the annotation and image paths.
            img_size: Side length of the square network input.
            stride: Downsampling factor between input and target grid.
        """
        self.data_dir = data_dir
        self.split = split
        self.img_size = img_size
        self.stride = stride
        self.out_size = img_size // stride

        # Load COCO
        anno_path = f'{data_dir}/annotations/instances_{split}2017.json'
        self.coco = COCO(anno_path)

        # Get images with persons (category_id=1)
        self.img_ids = self.coco.getImgIds(catIds=[1])
        print(f"‚úÖ {split}: {len(self.img_ids)} images")

        self.normalize = transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )

    def __len__(self):
        return len(self.img_ids)

    def _read_image(self, img_id):
        """Read the image for ``img_id`` as BGR; cv2 returns None on failure."""
        img_info = self.coco.loadImgs(img_id)[0]
        img_path = f"{self.data_dir}/{self.split}2017/{img_info['file_name']}"
        return cv2.imread(img_path)

    def _load_readable(self, idx):
        """Return ``(img_id, rgb_image)`` for ``idx`` or the next readable image.

        The first lookup indexes ``img_ids`` directly so an out-of-range
        ``idx`` still raises IndexError. Unreadable files are skipped by
        stepping forward (wrapping around); the walk is bounded by the
        dataset length so a fully broken dataset fails loudly instead of
        recursing or looping forever.

        Raises:
            RuntimeError: If no image in the dataset can be read.
        """
        img_id = self.img_ids[idx]
        img = self._read_image(img_id)

        total = len(self)
        attempts = 1
        while img is None:
            if attempts >= total:
                raise RuntimeError(
                    f"No readable image found in {self.data_dir}/{self.split}2017"
                )
            idx = (idx + 1) % total
            img_id = self.img_ids[idx]
            img = self._read_image(img_id)
            attempts += 1

        return img_id, cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    def _build_targets(self, anns, scale, pad_left, pad_top):
        """Rasterise person boxes into heatmap, size map and regression mask.

        ``scale``, ``pad_left`` and ``pad_top`` are the letterbox parameters
        that map original-image pixels into the padded network input.
        """
        heatmap = np.zeros((self.out_size, self.out_size), dtype=np.float32)
        wh_map = np.zeros((2, self.out_size, self.out_size), dtype=np.float32)
        reg_mask = np.zeros((self.out_size, self.out_size), dtype=np.float32)

        for ann in anns:
            x, y, w, h = ann['bbox']
            # Ignore boxes narrower or shorter than one pixel.
            if w < 1 or h < 1:
                continue

            # Scale to input size
            x = x * scale + pad_left
            y = y * scale + pad_top
            w = w * scale
            h = h * scale

            # Center in output space
            cx = (x + w / 2) / self.stride
            cy = (y + h / 2) / self.stride

            # Centres that fall outside the target grid are dropped.
            if not (0 <= cx < self.out_size and 0 <= cy < self.out_size):
                continue

            cx_int = int(cx)
            cy_int = int(cy)

            # Radius based on object size
            radius = max(1, int(min(w, h) / self.stride / 3))
            draw_gaussian(heatmap, cx_int, cy_int, radius)

            # Width/height (normalized)
            wh_map[0, cy_int, cx_int] = w / self.img_size
            wh_map[1, cy_int, cx_int] = h / self.img_size
            reg_mask[cy_int, cx_int] = 1

        return heatmap, wh_map, reg_mask

    def __getitem__(self, idx):
        """Return the training sample dict for ``idx`` (see class docstring)."""
        img_id, img = self._load_readable(idx)
        h0, w0 = img.shape[:2]

        # Resize maintaining aspect ratio. Clamp to at least one pixel so an
        # extreme aspect ratio cannot ask cv2.resize for an empty image.
        scale = min(self.img_size / h0, self.img_size / w0)
        h1 = max(1, int(h0 * scale))
        w1 = max(1, int(w0 * scale))
        img = cv2.resize(img, (w1, h1))

        # Pad to square
        pad_top = (self.img_size - h1) // 2
        pad_left = (self.img_size - w1) // 2

        canvas = np.full((self.img_size, self.img_size, 3), 114, dtype=np.uint8)
        canvas[pad_top:pad_top+h1, pad_left:pad_left+w1] = img

        # To tensor
        img_t = torch.from_numpy(canvas).permute(2, 0, 1).float() / 255.0
        img_t = self.normalize(img_t)

        # Get person annotations (crowd regions excluded)
        ann_ids = self.coco.getAnnIds(imgIds=img_id, catIds=[1], iscrowd=False)
        anns = self.coco.loadAnns(ann_ids)

        heatmap, wh_map, reg_mask = self._build_targets(anns, scale, pad_left, pad_top)

        return {
            'image': img_t,
            'heatmap': torch.from_numpy(heatmap[None]),
            'wh': torch.from_numpy(wh_map),
            'mask': torch.from_numpy(reg_mask),
        }

print("‚úÖ Dataset class defined")

In [None]:
# ============================================
# CELL 6: Create DataLoaders
# ============================================
BATCH_SIZE = 16
IMG_SIZE = 416

# Person-only COCO splits, both letterboxed to IMG_SIZE.
train_ds = COCOPersonDataset(DATA_DIR, split='train', img_size=IMG_SIZE)
val_ds = COCOPersonDataset(DATA_DIR, split='val', img_size=IMG_SIZE)

# Training batches are shuffled and the ragged last batch is dropped;
# validation keeps dataset order and every sample.
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
    drop_last=True,
)
val_loader = DataLoader(
    val_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

print(f"\nüìä Train: {len(train_loader)} batches")
print(f"üìä Val: {len(val_loader)} batches")

In [None]:
# ============================================
# CELL 7: Visualize Data
# ============================================
batch = next(iter(train_loader))

# Top row: input images; bottom row: their target heatmaps.
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for i, (img_ax, hm_ax) in enumerate(zip(axes[0], axes[1])):
    # Image: undo the mean/std normalisation before display.
    img = batch['image'][i].permute(1, 2, 0).numpy()
    img = np.clip(img * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406], 0, 1)
    img_ax.imshow(img)
    img_ax.set_title('Input')
    img_ax.axis('off')

    # Heatmap
    hm = batch['heatmap'][i, 0].numpy()
    hm_ax.imshow(hm, cmap='hot')
    hm_ax.set_title(f'Heatmap (max={hm.max():.2f})')
    hm_ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# ============================================
# CELL 8: Load Model
# ============================================
from mouaadnet_ultra.model import MouaadNetUltra

# Build the network with its default configuration and move it to `device`.
model = MouaadNetUltra()
model = model.to(device)
print(f"‚úÖ Model loaded")
print(f"   Parameters: {sum(param.numel() for param in model.parameters()):,}")

In [None]:
# ============================================
# CELL 9: Loss Function
# ============================================

class CenterNetLoss(nn.Module):
    """CenterNet-style detection loss.

    Combines a penalty-reduced focal loss on the centre heatmap with a masked
    L1 loss on the size map: ``total = hm + 0.1 * wh``.
    """

    def __init__(self):
        super().__init__()

    def forward(self, pred_hm, pred_wh, gt_hm, gt_wh, mask):
        """Compute the loss terms.

        Args:
            pred_hm: Heatmap logits (sigmoid is applied here).
            pred_wh: Predicted size map, same shape as ``gt_wh``.
            gt_hm: Target heatmap; cells equal to 1 are positives.
            gt_wh: Target size map.
            mask: Per-cell weights selecting where the size loss applies;
                a channel axis is inserted at dim 1 before broadcasting.

        Returns:
            Dict with scalar tensors ``'total'``, ``'hm'`` and ``'wh'``.
        """
        # Work in float32. Under autocast the predictions can be float16,
        # where 1 - 1e-4 rounds to exactly 1.0, so the clamp below would not
        # keep log(1 - p) finite and the whole loss would become inf/NaN.
        pred_hm = pred_hm.float()
        pred_wh = pred_wh.float()

        # Focal Loss for heatmap
        pred_hm = torch.clamp(torch.sigmoid(pred_hm), 1e-4, 1 - 1e-4)

        pos_mask = gt_hm.eq(1).float()
        neg_mask = gt_hm.lt(1).float()

        pos_loss = -torch.log(pred_hm) * torch.pow(1 - pred_hm, 2) * pos_mask
        # (1 - gt)^4 down-weights negatives that sit close to a true centre.
        neg_loss = -torch.log(1 - pred_hm) * torch.pow(pred_hm, 2) * torch.pow(1 - gt_hm, 4) * neg_mask

        # Normalise by the number of positives (at least 1 to avoid 0-division).
        num_pos = pos_mask.sum().clamp(min=1)
        hm_loss = (pos_loss.sum() + neg_loss.sum()) / num_pos

        # L1 Loss for size regression
        mask = mask.unsqueeze(1)
        wh_loss = F.l1_loss(pred_wh * mask, gt_wh * mask, reduction='sum')
        wh_loss = wh_loss / (mask.sum() + 1e-4)

        total = hm_loss + 0.1 * wh_loss

        return {'total': total, 'hm': hm_loss, 'wh': wh_loss}

# Loss instance passed to the training and validation functions.
criterion = CenterNetLoss()
print("‚úÖ Loss function defined")

In [None]:
# ============================================
# CELL 10: Training Setup
# ============================================
EPOCHS = 30
LR = 1e-3

# AdamW with a cosine learning-rate schedule spanning all epochs, plus a
# gradient scaler for mixed-precision training.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=LR, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
scaler = torch.amp.GradScaler('cuda')

print(f"‚úÖ Training setup complete")
print(f"   Epochs: {EPOCHS}")
print(f"   Learning Rate: {LR}")

In [None]:
# ============================================
# CELL 11: Training Functions
# ============================================

def train_one_epoch(model, loader, optimizer, criterion, scaler):
    """Run one mixed-precision training epoch and return the mean batch loss.

    Batches are moved to the module-level ``device``. A batch whose loss is
    not finite is skipped (no backward pass, no optimizer step) and is left
    out of the returned mean, so skipped batches no longer drag it down.

    Args:
        model: Network returning a dict with ``'heatmaps'`` and ``'sizes'``
            lists; the first entry of each is used.
        loader: Iterable of batch dicts with 'image', 'heatmap', 'wh', 'mask'.
        optimizer: Optimizer stepped through ``scaler``.
        criterion: Callable returning a dict that contains ``'total'``.
        scaler: Gradient scaler used for the scaled backward pass and step.

    Returns:
        Mean of the finite batch losses, or 0.0 if no batch contributed.
    """
    model.train()
    running = 0.0
    counted = 0

    pbar = tqdm(loader, desc='Train')
    for batch in pbar:
        imgs = batch['image'].to(device)
        gt_hm = batch['heatmap'].to(device)
        gt_wh = batch['wh'].to(device)
        mask = batch['mask'].to(device)

        optimizer.zero_grad()

        with torch.amp.autocast('cuda'):
            out = model(imgs)
            pred_hm = out['heatmaps'][0]
            pred_wh = out['sizes'][0]
            loss_dict = criterion(pred_hm, pred_wh, gt_hm, gt_wh, mask)
            loss = loss_dict['total']

        # Read the scalar once; it is reused for the running sum and the bar.
        loss_value = loss.item()

        if torch.isfinite(loss):
            scaler.scale(loss).backward()
            # Unscale first so the clipping threshold applies to true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
            scaler.step(optimizer)
            scaler.update()
            running += loss_value
            counted += 1

        pbar.set_postfix(loss=f"{loss_value:.4f}")

    # Average only over batches that actually contributed; also guards
    # against an empty loader.
    return running / counted if counted else 0.0


@torch.no_grad()
def validate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` and return the mean total loss per batch.

    Runs in eval mode without gradient tracking; batches are moved to the
    module-level ``device``.
    """
    model.eval()
    loss_sum = 0

    for sample in tqdm(loader, desc='Val'):
        imgs, gt_hm, gt_wh, mask = (
            sample[key].to(device) for key in ('image', 'heatmap', 'wh', 'mask')
        )

        preds = model(imgs)
        losses = criterion(preds['heatmaps'][0], preds['sizes'][0], gt_hm, gt_wh, mask)
        loss_sum += losses['total'].item()

    return loss_sum / len(loader)

print("‚úÖ Training functions ready")

In [None]:
# ============================================
# CELL 12: TRAIN!
# ============================================
best_loss = float('inf')
history = {'train': [], 'val': []}

print("=" * 50)
print("üöÄ Starting Training")
print("=" * 50)

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    # One pass over the training set, one over the validation set, then
    # advance the learning-rate schedule.
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, scaler)
    val_loss = validate(model, val_loader, criterion)
    scheduler.step()

    for split_name, value in (('train', train_loss), ('val', val_loss)):
        history[split_name].append(value)

    print(f"Train: {train_loss:.4f} | Val: {val_loss:.4f}")

    # Save
    # Only a strict improvement of the validation loss writes a checkpoint.
    if not val_loss < best_loss:
        continue

    best_loss = val_loss
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'loss': best_loss,
    }
    torch.save(checkpoint, f'{CKPT_DIR}/best.pt')
    print("‚≠ê Saved best!")

print(f"\n‚úÖ Done! Best loss: {best_loss:.4f}")

In [None]:
# ============================================
# CELL 13: Plot Results
# ============================================
# Plot the per-epoch training and validation losses and save the figure
# next to the checkpoints.
plt.figure(figsize=(10, 4))
for split_name, label in (('train', 'Train'), ('val', 'Val')):
    plt.plot(history[split_name], label=label)
plt.title('Training Curves')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.savefig(f'{CKPT_DIR}/curves.png')
plt.show()

In [None]:
# ============================================
# CELL 14: Test Detection
# ============================================
model.eval()
batch = next(iter(val_loader))

# Forward only the first four validation images; gradients are not needed.
with torch.no_grad():
    out = model(batch['image'][:4].to(device))

fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for i, (img_ax, hm_ax) in enumerate(zip(axes[0], axes[1])):
    # Top row: input image with the mean/std normalisation undone.
    img = batch['image'][i].permute(1, 2, 0).numpy()
    img = img * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]
    img_ax.imshow(np.clip(img, 0, 1))
    img_ax.axis('off')

    # Bottom row: sigmoid of the model's first heatmap output.
    hm = torch.sigmoid(out['heatmaps'][0][i, 0]).cpu().numpy()
    hm_ax.imshow(hm, cmap='hot')
    hm_ax.set_title(f'max={hm.max():.2f}')
    hm_ax.axis('off')

plt.suptitle('Detection Results')
plt.tight_layout()
plt.show()

In [None]:
# ============================================
# CELL 15: Export
# ============================================
# Restore the best checkpoint. map_location='cpu' keeps the load independent
# of the device the checkpoint was saved from; the model is exported from the
# CPU below anyway.
ckpt = torch.load(f'{CKPT_DIR}/best.pt', map_location='cpu')
model.load_state_dict(ckpt['model_state_dict'])
model.eval()
model.cpu()

# ONNX
# The dummy input follows IMG_SIZE so the exported graph matches the
# resolution used for training instead of a separately hard-coded value.
torch.onnx.export(
    model,
    torch.randn(1, 3, IMG_SIZE, IMG_SIZE),
    f'{CKPT_DIR}/detection.onnx',
    input_names=['image'],
    opset_version=11
)

print(f"‚úÖ Exported!")
print(f"   PyTorch: {CKPT_DIR}/best.pt")
print(f"   ONNX: {CKPT_DIR}/detection.onnx")
print(f"\nüì• Download and test with:")
print(f"   python examples/webcam_demo.py --weights best.pt")

In [None]:
# ============================================
# CELL 16: Download (Colab Only)
# ============================================
# Best-effort browser download of the artefacts. Outside Colab the import
# fails, and a download can also fail; both cases fall back to printing
# where the files are. Catching Exception (not a bare except) lets
# KeyboardInterrupt and SystemExit propagate.
try:
    from google.colab import files
    files.download(f'{CKPT_DIR}/best.pt')
    files.download(f'{CKPT_DIR}/detection.onnx')
    print("üéâ Downloaded!")
except Exception:
    print(f"üìÅ Files saved to: {CKPT_DIR}")
    print("   Copy them manually or use Lightning AI file browser")