# SSDLite 训练笔记（seed）

## 0. Imports & 配置

In [1]:
import math, time, json
from pathlib import Path
from typing import Dict, List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.amp import GradScaler, autocast
from torch.utils.data import DataLoader

import numpy as np

# === 你工程里的模块 ===
from sparrow.models.ssdlite import SSDLite                      # 模型骨架/头/neck  :contentReference[oaicite:4]{index=4}
from sparrow.datasets.coco_dets import create_dets_dataloader   # COCO det dataloader  :contentReference[oaicite:5]{index=5}
from sparrow.loss.ssdlite_loss import  (
    SSDLoss, pack_targets_for_ssd, generate_ssd_anchors,
)                                                               # 损失/锚框/编码  :contentReference[oaicite:6]{index=6}


# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Training configuration (adjust as needed) ---
IMG_SIZE      = 320
BATCH_SIZE    = 64
EPOCHS        = 10
NUM_WORKERS   = 8
PIN_MEMORY    = True
DATA_ROOT     = "./data/coco2017_ssdlite"   # dataset root; expected to contain images/{train2017,val2017} and annotations/*.json
SAVE_DIR      = "./output/ssdlite"
BACKBONE      = "mobilenet_v2"
WIDTH_MULT    = 1.0               # should match the chosen pretrained weights / compute budget
NUM_CLASSES   = 81                # background included: per the notes at the end of this file, 0 = background and 1..80 = COCO classes
RATIOS        = (1.0, 2.0, 0.5)
SCALES        = (1.0, 1.26)
STRIDES       = (8, 16, 32)       # stated to match the default SSDLite configuration -- TODO confirm against the model

# Optimizer and training details
LR            = 3e-3
WD            = 5e-4
WARMUP_EPOCHS = 3                 # NOTE(review): not referenced by any code visible in this notebook -- confirm whether warmup is intended
AMP           = True              # request mixed precision
NEG_POS_RATIO = 3                 # negative:positive ratio passed to the loss (hard negative mining)

# Make sure the checkpoint directory exists before training starts.
Path(SAVE_DIR).mkdir(parents=True, exist_ok=True)

## 1. DataLoader（训练/验证）

In [2]:
# Augmentation switches handed to the training dataloader.
aug_cfg = {
    "use_color_aug": True,
    "use_flip": True,
    "use_rotate": True,
    "rotate_deg": 15.0,
    "use_scale": True,
    "scale_range": (0.75, 1.25),
    "min_box_size": 2.0,
}

# Training loader: receives the full augmentation config above.
train_loader: DataLoader = create_dets_dataloader(
    dataset_root=DATA_ROOT,
    img_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    aug_cfg=aug_cfg,
    is_train=True,
)

# Validation loader: only the minimum-box-size filter is configured.
val_loader: DataLoader = create_dets_dataloader(
    dataset_root=DATA_ROOT,
    img_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY,
    aug_cfg={"min_box_size": 2.0},
    is_train=False,
)

## 2. 构建模型 & 优化器/损失

In [3]:
# Build the detector and place it on the target device.
# NOTE(review): the original notes say the model emits three levels [P3, P4, P5],
# each producing [B, H*W*A, C] class logits and [B, H*W*A, 4] box regressions
# -- confirm against sparrow.models.ssdlite.
model = SSDLite(
    num_classes=NUM_CLASSES,
    backbone=BACKBONE,
    width_mult=WIDTH_MULT,
    anchor_ratios=RATIOS,
    anchor_scales=SCALES,
    anchor_strides=STRIDES,
)
model = model.to(DEVICE)

# SGD with momentum; the cosine schedule spans the whole run and is stepped once per epoch.
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=LR,
    momentum=0.9,
    weight_decay=WD,
)
lr_sched = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
scaler = GradScaler(enabled=AMP)

# Detection loss; the original notes describe it as background-inclusive
# cross-entropy + SmoothL1 with hard negative mining -- confirm against SSDLoss.
criterion = SSDLoss(
    num_classes=NUM_CLASSES,
    alpha=1.0,
    neg_pos_ratio=NEG_POS_RATIO,
    reg_type="smoothl1",
)

## 3. 预先计算 anchors（按实际特征图尺寸）

> 我们用一个 dummy batch 前向，拿到 P3/P4/P5 的 `H,W`，再用 **与训练一致的 strides/ratios/scales** 生成**归一化 (cx,cy,w,h)** anchors；这与损失里的编码/解码严格对齐（variances=(0.1,0.2)）。

In [4]:
@torch.no_grad()
def infer_feat_shapes_and_make_anchors(model, img_size: int):
    """Probe the model with a dummy image and build anchors matching its heads.

    A zero image of size ``img_size`` x ``img_size`` is pushed through the
    model.  For every entry of ``out["cls_logits"]`` the anchor count (dim 1)
    is divided by the number of anchors per cell, ``len(RATIOS) * len(SCALES)``,
    and the resulting cell count is assumed to come from a square grid.

    Args:
        model: detector whose forward pass returns a dict with a
            ``"cls_logits"`` sequence of 3-D tensors.
        img_size: side length of the square dummy input.

    Returns:
        ``(feat_shapes, anchors)`` where ``feat_shapes`` is a list of
        ``(H, W)`` tuples and ``anchors`` is whatever ``generate_ssd_anchors``
        returns, moved to ``DEVICE``.

    Raises:
        ValueError: if a level's anchor count is not a multiple of the anchors
            per cell, or its cell count is not a square grid.

    Note:
        The model is switched to eval mode and left there.
    """
    import warnings

    model.eval()
    dummy = torch.zeros(1, 3, img_size, img_size, device=DEVICE)
    out = model(dummy)

    # Each grid cell carries one anchor per (ratio, scale) combination.
    A = len(RATIOS) * len(SCALES)
    feat_shapes = []
    for level, cls in enumerate(out["cls_logits"]):
        _, NA, _ = cls.shape
        HW, leftover = divmod(NA, A)
        if HW == 0 or leftover:
            raise ValueError(
                f"level {level}: {NA} anchors is not a positive multiple of "
                f"{A} anchors per cell"
            )
        # Only the flattened length is available, so assume a square grid
        # and verify the assumption instead of silently flooring.
        H = int(round(math.sqrt(HW)))
        W = HW // H
        if H * W != HW:
            raise ValueError(
                f"level {level}: {HW} cells cannot be arranged as a square "
                f"feature map"
            )
        if level < len(STRIDES):
            expected = img_size / STRIDES[level]
            if not (math.floor(expected) <= H <= math.ceil(expected)):
                # NOTE(review): how generate_ssd_anchors uses `strides` is not
                # visible here; a mismatch may misplace anchor centres.
                warnings.warn(
                    f"level {level}: feature map is {H}x{W} but stride "
                    f"{STRIDES[level]} at img_size={img_size} implies about "
                    f"{expected:g}; check STRIDES against the model"
                )
        feat_shapes.append((H, W))

    anchors = generate_ssd_anchors(
        img_size=img_size, feat_shapes=feat_shapes, strides=list(STRIDES),
        ratios=RATIOS, scales=SCALES
    ).to(DEVICE)
    A_total = anchors.shape[0]
    print("feat_shapes:", feat_shapes, " anchors:", A_total)
    return feat_shapes, anchors

feat_shapes, anchors_cxcywh = infer_feat_shapes_and_make_anchors(model, IMG_SIZE)

feat_shapes: [(80, 80), (40, 40), (20, 20)]  anchors: 50400


## 4. 训练循环（与损失接口完全对齐）

In [None]:
# Mixed precision is used only when it is requested (AMP) and the run is on CUDA.
use_amp = AMP and (DEVICE == 'cuda')

def to_device_packed(packed, device):
    """Recursively move every tensor inside ``packed`` to ``device``.

    Tensors are moved with ``non_blocking=True``.  Lists, tuples (including
    namedtuples) and dicts are rebuilt with their elements moved; dicts come
    back as plain ``dict``.  Any other object is returned unchanged.

    Args:
        packed: a tensor, a (possibly nested) list/tuple/dict of tensors, or
            any other object.
        device: target passed straight to ``Tensor.to``.

    Returns:
        A structure shaped like ``packed`` whose tensors live on ``device``.
    """
    def _move(x):
        if torch.is_tensor(x):
            return x.to(device, non_blocking=True)
        if isinstance(x, dict):
            return {k: _move(v) for k, v in x.items()}
        if isinstance(x, tuple) and hasattr(x, "_fields"):
            # namedtuple constructors take one positional argument per field,
            # so the moved items have to be unpacked rather than passed as a
            # single iterable.
            return type(x)(*(_move(t) for t in x))
        if isinstance(x, (list, tuple)):
            return type(x)(_move(t) for t in x)
        return x
    return _move(packed)

# Imported once here instead of on every training iteration.
from contextlib import nullcontext

# NOTE(review): WARMUP_EPOCHS is defined in the configuration, but no warmup
# is applied by this loop -- confirm whether that is intended.
best_val = float("inf")
for epoch in range(1, EPOCHS+1):
    model.train()
    meter = {"loss": 0.0, "cls": 0.0, "reg": 0.0, "n": 0}
    t0 = time.time()

    for imgs, targets_list, _ in train_loader:
        imgs = imgs.to(DEVICE, non_blocking=True)

        # 1) Pack the targets, then move the whole structure to the training
        #    device so that it sits next to the logits and anchors.
        packed = pack_targets_for_ssd(targets_list, img_size=IMG_SIZE)
        packed = to_device_packed(packed, DEVICE)

        optimizer.zero_grad(set_to_none=True)
        # 2) autocast only when AMP is requested and we are on CUDA;
        #    otherwise a no-op context.
        ctx = autocast(device_type='cuda', enabled=True) if use_amp else nullcontext()

        with ctx:
            out = model(imgs)
            cls_logits = torch.cat(out["cls_logits"], dim=1)  # [B, A_total, C]
            bbox_regs  = torch.cat(out["bbox_regs"],  dim=1)  # [B, A_total, 4]
            assert cls_logits.shape[1] == anchors_cxcywh.shape[0], "anchors 与输出长度不一致"

            # 3) The loss takes flattened predictions, anchors and packed targets.
            loss, m = criterion(cls_logits, bbox_regs, anchors_cxcywh, packed)

        if use_amp:
            # Scaled backward/step so small gradients survive reduced precision.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        # float() accepts Python numbers and 0-dim tensors alike, so the
        # running sums stay plain floats whatever the criterion returns.
        meter["loss"] += float(loss.detach().cpu())
        meter["cls"]  += float(m["loss_cls"])
        meter["reg"]  += float(m["loss_reg"])
        meter["n"]    += 1

    # Read the learning rate before stepping the scheduler: after step() the
    # optimizer already holds the next epoch's rate.
    epoch_lr = optimizer.param_groups[0]['lr']
    lr_sched.step()
    dt = time.time() - t0
    tr_log = {k: v / max(1, meter["n"]) for k, v in meter.items() if k != "n"}
    print(f"[Epoch {epoch:03d}] train: {tr_log}  time={dt:.1f}s  lr={epoch_lr:.3e}")

    # ---------------- validation ----------------
    model.eval()
    with torch.no_grad():
        vm = {"loss": 0.0, "cls": 0.0, "reg": 0.0, "n": 0}
        for imgs, targets_list, _ in val_loader:
            imgs = imgs.to(DEVICE, non_blocking=True)
            packed = pack_targets_for_ssd(targets_list, img_size=IMG_SIZE)
            packed = to_device_packed(packed, DEVICE)

            out = model(imgs)
            cls_logits = torch.cat(out["cls_logits"], dim=1)
            bbox_regs  = torch.cat(out["bbox_regs"],  dim=1)
            loss, m = criterion(cls_logits, bbox_regs, anchors_cxcywh, packed)

            vm["loss"] += float(loss.detach().cpu())
            vm["cls"]  += float(m["loss_cls"])
            vm["reg"]  += float(m["loss_reg"])
            vm["n"]    += 1

        val_log = {k: v / max(1, vm["n"]) for k, v in vm.items() if k != "n"}
    print(f"[Epoch {epoch:03d}] valid: {val_log}")

    # Keep only the checkpoint with the lowest mean validation loss so far.
    if val_log["loss"] < best_val:
        best_val = val_log["loss"]
        ckpt = {"model_state": model.state_dict(), "epoch": epoch, "best_score": best_val}
        torch.save(ckpt, f"{SAVE_DIR}/best.pt")
        print(f"  ✓ saved best to {SAVE_DIR}/best.pt")

## 5. 简易推理/可视化（验证 anchor/解码逻辑）

> 解码要与训练**同一套** variances 与 anchor 定义，直接复用 `decode_deltas_to_xyxy`；运行在 **letterbox 坐标**，再按比例映射回原图。

In [None]:
import cv2
from sparrow.loss.ssdlite_loss import decode_deltas_to_xyxy  # 与训练同源  :contentReference[oaicite:19]{index=19}

def nms_np(boxes, scores, iou=0.5, max_det=100):
    """Greedy non-maximum suppression on NumPy arrays.

    Args:
        boxes: 2-D array indexed as ``boxes[i, 0..3]`` = x1, y1, x2, y2.
        scores: 1-D array with one score per box.
        iou: candidates whose IoU with a kept box is >= this value are dropped.
        max_det: upper bound on the number of kept boxes.

    Returns:
        ``int64`` array of kept indices into ``boxes``, in descending score
        order.
    """
    remaining = scores.argsort()[::-1]
    selected = []
    while remaining.size > 0 and len(selected) < max_det:
        best, rest = remaining[0], remaining[1:]
        selected.append(best)
        if rest.size == 0:
            break

        # Intersection rectangle between the kept box and every candidate.
        left   = np.maximum(boxes[best, 0], boxes[rest, 0])
        top    = np.maximum(boxes[best, 1], boxes[rest, 1])
        right  = np.minimum(boxes[best, 2], boxes[rest, 2])
        bottom = np.minimum(boxes[best, 3], boxes[rest, 3])
        inter_w = np.clip(right - left, 0, None)
        inter_h = np.clip(bottom - top, 0, None)
        inter = inter_w * inter_h

        best_area = (boxes[best, 2] - boxes[best, 0]) * (boxes[best, 3] - boxes[best, 1])
        rest_area = (boxes[rest, 2] - boxes[rest, 0]) * (boxes[rest, 3] - boxes[rest, 1])
        # The small epsilon keeps the division defined for degenerate boxes.
        overlap = inter / (best_area + rest_area - inter + 1e-6)

        remaining = rest[overlap < iou]
    return np.array(selected, dtype=np.int64)

@torch.no_grad()
def infer_one(img_bgr, model, conf_thr=0.35, person_idx=1):
    """Detect a single class in one BGR image and return boxes in image pixels.

    The image is letterboxed onto an ``IMG_SIZE`` x ``IMG_SIZE`` canvas filled
    with 114, converted to RGB, scaled to [0, 1] and run through ``model``.
    Class probabilities come from a softmax over the logits; boxes come from
    ``decode_deltas_to_xyxy`` with the module-level ``anchors_cxcywh``.
    Detections of class ``person_idx`` scoring at least ``conf_thr`` are
    passed through ``nms_np`` and mapped back from letterbox coordinates to
    the original image.

    Args:
        img_bgr: H x W x 3 uint8 image in BGR order.
        model: detector returning ``"cls_logits"`` and ``"bbox_regs"`` lists.
        conf_thr: minimum class probability for a detection to be kept.
        person_idx: index of the class of interest in the softmax output.

    Returns:
        ``(boxes, scores)``: ``boxes`` is a float32 array of shape ``(N, 4)``
        holding x1, y1, x2, y2 in original-image pixels (``(0, 4)`` when
        nothing is detected); ``scores`` holds the matching probabilities.

    Note:
        The model's train/eval mode is not changed here; the caller is
        expected to have put it in eval mode.
    """
    # Letterbox to IMG_SIZE: aspect-preserving resize, centred on a grey canvas.
    h0, w0 = img_bgr.shape[:2]
    scale = min(IMG_SIZE/h0, IMG_SIZE/w0)
    nh, nw = int(round(h0*scale)), int(round(w0*scale))
    canvas = np.full((IMG_SIZE, IMG_SIZE, 3), 114, dtype=np.uint8)
    top, left = (IMG_SIZE-nh)//2, (IMG_SIZE-nw)//2
    canvas[top:top+nh, left:left+nw] = cv2.resize(img_bgr, (nw, nh))
    rgb = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB)

    inp = torch.from_numpy(rgb.transpose(2,0,1)).float().unsqueeze(0)/255.0
    inp = inp.to(DEVICE)

    out = model(inp)
    cls = torch.cat(out["cls_logits"], dim=1)  # [1, A, C]
    reg = torch.cat(out["bbox_regs"],  dim=1)  # [1, A, 4]

    probs = torch.softmax(cls[0], dim=-1)      # softmax over all classes
    deltas = reg[0]
    # The result is multiplied by IMG_SIZE below, i.e. it is treated as
    # normalised xyxy -- assumes decode_deltas_to_xyxy returns that; confirm.
    xyxy_norm = decode_deltas_to_xyxy(deltas, anchors_cxcywh)

    # Threshold on the class of interest; the mask is applied on the NumPy
    # side so no torch tensor is indexed with a NumPy boolean array.
    scores = probs[:, person_idx].detach().cpu().numpy()
    mask = scores >= conf_thr
    boxes_lbox = xyxy_norm.detach().cpu().numpy()[mask] * IMG_SIZE  # letterbox pixels
    scores = scores[mask]
    if boxes_lbox.shape[0] > 0:
        keep = nms_np(boxes_lbox, scores, iou=0.5, max_det=100)
        boxes_lbox, scores = boxes_lbox[keep], scores[keep]

    # Undo the letterbox: remove the padding offset, then the resize factor.
    # Vectorised so that an empty result keeps the (0, 4) shape.
    offset = np.array([left, top, left, top], dtype=np.float64)
    boxes = (boxes_lbox.reshape(-1, 4).astype(np.float64) - offset) / scale
    return boxes.astype(np.float32), scores

# Visualisation: run the detector on one image and draw the detections.
img_path = "./india_road.png"
img = cv2.imread(img_path)
if img is None:
    # cv2.imread signals an unreadable or missing file by returning None.
    raise FileNotFoundError(f"could not read image: {img_path}")
boxes, scores = infer_one(img, model, conf_thr=0.35, person_idx=1)
vis = img.copy()
for b,s in zip(boxes.astype(int), scores):
    cv2.rectangle(vis, (b[0],b[1]), (b[2],b[3]), (0,200,255), 2)
    cv2.putText(vis, f"{s:.2f}", (b[0], max(0,b[1]-4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,200,255), 1, cv2.LINE_AA)
# NOTE(review): `vis` is neither displayed nor saved in the visible cell.

## 为什么这套 note 能“稳”

* **类别定义**：数据集把 COCO 类别映射为 **0..C-1 连续索引**（人常为 0，C=80）；损失里**背景=0**，前景标签在内部被平移为 **1..C**，因此**模型 num_classes 必须 = C+1 = 81**，并用 **softmax**。这一点在 `SSDLoss` 的文档与实现里写得很清楚（编码/解码 variances = 0.1/0.2；OHEM 负样本挖掘也依赖背景通道的 CE）。
* **锚框/解码一致**：训练/推理均使用 `generate_ssd_anchors` 的**归一化 cxcywh**，配合同一个 `decode_deltas_to_xyxy`，不会再出现“anchor/stride/variance 不一致”的偏差。
* **特征尺寸对齐**：先跑一次前向，自动推断 `[P3,P4,P5]` 的 `(H,W)`，结合 `A=len(ratios)*len(scales)` 与 `STRIDES=(8,16,32)` 生成 anchors，长度与 head 展平维度严格一致（我们也加了 assert）。
* **DataLoader 与增广**：与你工程现有的增广/letterbox 完全一致，验证集关闭大部分增广，避免评测偏差。