# Implementation: Static Pictures

**Needed Installs**

In [None]:
!pip install -U transformers pillow
!pip install -U accelerate

**Core Code for Person Recognition: Model Usage + Pipeline**

In [None]:
import torch
from PIL import Image
from transformers import AutoImageProcessor, RTDetrForObjectDetection

MODEL_ID = "PekingU/rtdetr_r50vd"

# Build the processor and the detector a single time at import/cell-run time
# so later calls (e.g. one per video frame) reuse the same objects.
processor = AutoImageProcessor.from_pretrained(MODEL_ID)
model = RTDetrForObjectDetection.from_pretrained(MODEL_ID).eval()

# Resolve the class id whose label reads "person" (case-insensitive) from the
# label map shipped in the model config; the first hit is used.
id2label = model.config.id2label
PERSON_ID = [
    class_id for class_id, class_name in id2label.items()
    if class_name.lower() == "person"
][0]

In [None]:
from torchvision.ops import nms

@torch.no_grad()
def detect_people(
    image,
    threshold: float = 0.3,
    min_box_area: int = 700,
    ar_min: float = 1.05,
    ar_max: float = 6.5,
    nms_iou: float = 0.60,
    device: str | torch.device | None = None,
    score_gate: float = 0.50,
):
    """
    RT-DETR people detection with post-processing.

    Pipeline:
      1) model inference + score threshold
      2) keep only the "person" class
      3) geometry filter (area AND aspect ratio), applied only to detections
         whose score is below ``score_gate``
      4) NMS dedupe

    Args:
      image: PIL image; converted to RGB when its mode is something else.
      threshold: minimum score handed to the processor's post-processing.
      min_box_area: minimum box area (px^2) required of low-confidence boxes.
      ar_min, ar_max: allowed height/width ratio range for low-confidence boxes.
      nms_iou: IoU threshold passed to ``torchvision.ops.nms``.
      device: torch device (or device string). ``None`` picks CUDA when
        available, otherwise CPU.
      score_gate: detections scoring at least this value skip the geometry
        filter entirely.

    Returns:
      dict with keys ``count``, ``boxes``, ``scores``, ``labels`` (tensors moved
      to CPU) and ``debug`` (per-stage counts plus the parameters used).
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    dev = torch.device(device)

    model.to(dev)
    model.eval()

    if image.mode != "RGB":
        image = image.convert("RGB")

    # --- 1) Inference ---
    inputs = processor(images=image, return_tensors="pt").to(dev)
    outputs = model(**inputs)

    # PIL reports (width, height); the post-processor wants (height, width).
    target_sizes = torch.tensor([image.size[::-1]], device=dev)
    det = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=threshold
    )[0]

    debug = {
        "raw_total": int(det["boxes"].shape[0]),
        "raw_thresh": float(threshold),
    }

    # --- 2) Person-only ---
    person_mask = det["labels"] == PERSON_ID
    boxes = det["boxes"][person_mask]
    scores = det["scores"][person_mask]
    labels = det["labels"][person_mask]

    debug["person_after_thresh"] = int(boxes.shape[0])

    # Early exit: nothing to filter or dedupe.
    if boxes.numel() == 0:
        return {
            "count": 0,
            "boxes": boxes.detach().cpu(),
            "scores": scores.detach().cpu(),
            "labels": labels.detach().cpu(),
            "debug": debug,
        }

    # --- 3) Geometry filter ---
    # Boxes are treated as [x1, y1, x2, y2]; clamp guards against inverted boxes.
    w = (boxes[:, 2] - boxes[:, 0]).clamp(min=0)
    h = (boxes[:, 3] - boxes[:, 1]).clamp(min=0)
    area = w * h
    ar = h / (w + 1e-6)  # epsilon avoids division by zero for zero-width boxes

    # High-confidence detections are trusted as-is; only low-confidence ones
    # must additionally look person-like (large enough AND plausible ratio).
    high_conf = scores >= float(score_gate)
    geom_ok = (area >= float(min_box_area)) & (ar >= float(ar_min)) & (ar <= float(ar_max))
    keep = high_conf | geom_ok

    boxes = boxes[keep]
    scores = scores[keep]
    labels = labels[keep]

    debug["person_after_geom"] = int(boxes.shape[0])
    debug["geom_params"] = {
        "min_box_area": min_box_area,
        "ar_min": ar_min,
        "ar_max": ar_max,
        "score_gate": score_gate,
        # The description mirrors the mask above (area AND aspect ratio).
        "rule": "keep if high_conf OR (area>=min_area AND person-like AR)"
    }

    # --- 4) NMS dedupe ---
    keep_idx = nms(boxes, scores, float(nms_iou))
    boxes = boxes[keep_idx]
    scores = scores[keep_idx]
    labels = labels[keep_idx]

    debug["person_after_nms"] = int(boxes.shape[0])
    debug["nms_iou"] = nms_iou

    return {
        "count": int(boxes.shape[0]),
        "boxes": boxes.detach().cpu(),
        "scores": scores.detach().cpu(),
        "labels": labels.detach().cpu(),
        "debug": debug,
    }

In [None]:
def count_people(image: Image.Image, **kwargs) -> int:
    """Run ``detect_people`` on *image* and hand back just its ``count`` entry.

    All keyword arguments are forwarded to ``detect_people`` untouched.
    """
    detection = detect_people(image, **kwargs)
    return detection["count"]

**Visualization**

In [None]:
from PIL import ImageDraw, ImageFont

def _get_default_font(size=16):
    """Return DejaVuSans at *size*, or PIL's built-in font if it cannot be loaded."""
    # The TrueType file may be missing on the host (e.g. Colab), so any
    # loading failure falls back to the font bundled with PIL.
    try:
        font = ImageFont.truetype("DejaVuSans.ttf", size)
    except Exception:
        font = ImageFont.load_default()
    return font

In [None]:
def draw_detections(
    image,
    boxes,
    scores=None,
    labels=None,
    id2label=None,
    max_detections=None,
    score_format="{:.2f}",
    box_width=3,
    font_size=16,
):
    """
    Return a copy of *image* with one rectangle (and optional caption) per box.

    Args:
        image: PIL.Image; it is copied, never modified.
        boxes: iterable of 4-value boxes read as [x1, y1, x2, y2].
        scores: optional per-box scores, rendered through ``score_format``.
        labels: optional per-box class ids; shown only when ``id2label`` is
            given too (ids missing from the map are printed as numbers).
        id2label: optional dict mapping class id -> display name.
        max_detections: when set, only the first K boxes/scores/labels are used.
        score_format: format string applied to each score.
        box_width: outline width of the rectangles.
        font_size: caption font size; also used to place the caption above the box.
    Returns:
        The annotated PIL.Image copy.
    """
    canvas = image.copy()
    pen = ImageDraw.Draw(canvas)
    caption_font = _get_default_font(font_size)

    if max_detections is not None:
        boxes = boxes[:max_detections]
        scores = scores if scores is None else scores[:max_detections]
        labels = labels if labels is None else labels[:max_detections]

    show_names = labels is not None and id2label is not None
    show_scores = scores is not None

    for idx, box in enumerate(boxes):
        x1, y1, x2, y2 = (float(coord) for coord in box)
        pen.rectangle([x1, y1, x2, y2], width=box_width)

        pieces = []
        if show_names:
            pieces.append(str(id2label.get(int(labels[idx]), int(labels[idx]))))
        if show_scores:
            pieces.append(score_format.format(float(scores[idx])))

        if not pieces:
            continue

        caption = " ".join(pieces)
        # Caption sits just above the box, clamped so it never leaves the top edge.
        left = x1
        top = max(0.0, y1 - font_size - 4)
        text_w, text_h = pen.textbbox((0, 0), caption, font=caption_font)[2:]
        # White backdrop keeps the black text readable on any image content.
        pen.rectangle([left, top, left + text_w + 6, top + text_h + 4], fill="white")
        pen.text((left + 3, top + 2), caption, font=caption_font, fill="black")

    return canvas

In [None]:
def visualize_people(image, detection_output, max_detections=50):
    """
    Render the boxes, scores and labels stored in *detection_output* onto a
    copy of *image*, naming classes via the loaded model's ``id2label`` map.
    """
    det = detection_output
    return draw_detections(
        image=image,
        boxes=det["boxes"],
        scores=det["scores"],
        labels=det["labels"],
        id2label=model.config.id2label,
        max_detections=max_detections,
        box_width=3,
        font_size=16,
    )

**Folder Evaluation Function: MAE + Exact Match**

In [None]:
import re
from pathlib import Path
from PIL import Image
import numpy as np

def evaluate_image_folder_metrics(
    folder_path: str,
    threshold: float = 0.4,
    nms_iou: float = 0.60,
    min_box_area: int = 700,
    ar_min: float = 1.05,
    ar_max: float = 6.5,
    device=None,
    extensions=(".jpg", ".jpeg", ".png", ".bmp", ".webp"),
):
    """
    Evaluate a folder of images with filenames: <id>_<gtcount>.<ext>

    Every regular file whose suffix is in ``extensions`` and whose stem ends in
    ``_<digits>`` is run through ``count_people``; files with other names are
    skipped silently.

    Returns: (mae, exact_match_rate, n_eval)

    Raises:
        FileNotFoundError: ``folder_path`` does not exist.
        ValueError: no file in the folder matches the naming scheme.
    """
    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    pattern = re.compile(r"^(?P<id>.+)_(?P<gt>\d+)$")
    # is_file() keeps sub-directories that happen to carry an image suffix
    # from being handed to the image loader.
    image_paths = sorted(
        p for p in folder.iterdir()
        if p.is_file() and p.suffix.lower() in extensions
    )

    gts, preds = [], []

    for p in image_paths:
        m = pattern.match(p.stem)
        if not m:
            continue
        gt = int(m.group("gt"))

        # Context manager closes the underlying file handle deterministically;
        # convert() returns an independent in-memory image.
        with Image.open(p) as raw:
            img = raw.convert("RGB")
        pred = count_people(
            img,
            threshold=threshold,
            nms_iou=nms_iou,
            min_box_area=min_box_area,
            ar_min=ar_min,
            ar_max=ar_max,
            device=device,
        )

        gts.append(gt)
        preds.append(int(pred))

    if not gts:
        raise ValueError(f"No valid images found in {folder_path} with '<id>_<gt>' naming.")

    gts = np.array(gts, dtype=float)
    preds = np.array(preds, dtype=float)

    abs_err = np.abs(preds - gts)
    mae = float(abs_err.mean())
    exact_match = float((abs_err == 0).mean())
    return mae, exact_match, int(len(gts))


**Sweep + Plot: Threshold**

In [None]:
import matplotlib.pyplot as plt

def sweep_and_plot_threshold(
    folder_path: str,
    thresholds=(0.2, 0.3, 0.4, 0.5, 0.6),
    fixed_nms_iou=0.60,
    fixed_min_box_area=700,
    ar_min=1.05,
    ar_max=6.5,
    device=None,
):
    """
    Evaluate the folder once per score threshold, print and plot MAE and
    exact-match rate, and report the threshold with the lowest MAE.

    Args:
        folder_path: folder passed to ``evaluate_image_folder_metrics``.
        thresholds: any iterable of thresholds to try (must not be empty).
        fixed_nms_iou, fixed_min_box_area, ar_min, ar_max, device: held
            constant and forwarded to the evaluation.

    Returns:
        dict with ``thresholds`` (list), ``mae`` and ``exact_match`` (lists
        aligned with ``thresholds``).

    Raises:
        ValueError: ``thresholds`` is empty.
    """
    # Materialize once: the values are looped over, plotted and indexed below,
    # which a one-shot iterable (generator) would not survive.
    thresholds = list(thresholds)
    if not thresholds:
        raise ValueError("thresholds must contain at least one value")

    maes, ems = [], []
    for th in thresholds:
        mae, em, n = evaluate_image_folder_metrics(
            folder_path,
            threshold=th,
            nms_iou=fixed_nms_iou,
            min_box_area=fixed_min_box_area,
            ar_min=ar_min,
            ar_max=ar_max,
            device=device,
        )
        maes.append(mae)
        ems.append(em)
        print(f"threshold={th:.2f} | MAE={mae:.3f} | ExactMatch={em*100:.1f}% (N={n})")

    # Plot MAE
    plt.figure()
    plt.plot(thresholds, maes, marker="o")
    plt.xlabel("threshold")
    plt.ylabel("MAE")
    plt.title("MAE vs threshold")
    plt.show()

    # Plot Exact Match
    plt.figure()
    plt.plot(thresholds, [e*100 for e in ems], marker="o")
    plt.xlabel("threshold")
    plt.ylabel("Exact Match Rate (%)")
    plt.title("Exact Match Rate vs threshold")
    plt.show()

    best_idx = int(np.argmin(maes))
    print(f"Best (by lowest MAE): threshold={thresholds[best_idx]:.2f} | MAE={maes[best_idx]:.3f} | ExactMatch={ems[best_idx]*100:.1f}%")
    return {"thresholds": thresholds, "mae": maes, "exact_match": ems}

**Sweep + Plot: nms_iou**

In [None]:
def sweep_and_plot_nms_iou(
    folder_path: str,
    nms_ious=(0.3, 0.4, 0.5, 0.6, 0.7),
    fixed_threshold=0.4,
    fixed_min_box_area=700,
    ar_min=1.05,
    ar_max=6.5,
    device=None,
):
    """
    Evaluate the folder once per NMS IoU value, print and plot MAE and
    exact-match rate, and report the IoU with the lowest MAE.

    Args:
        folder_path: folder passed to ``evaluate_image_folder_metrics``.
        nms_ious: any iterable of IoU thresholds to try (must not be empty).
        fixed_threshold, fixed_min_box_area, ar_min, ar_max, device: held
            constant and forwarded to the evaluation.

    Returns:
        dict with ``nms_ious`` (list), ``mae`` and ``exact_match`` (lists
        aligned with ``nms_ious``).

    Raises:
        ValueError: ``nms_ious`` is empty.
    """
    # Materialize once: the values are looped over, plotted and indexed below,
    # which a one-shot iterable (generator) would not survive.
    nms_ious = list(nms_ious)
    if not nms_ious:
        raise ValueError("nms_ious must contain at least one value")

    maes, ems = [], []
    for iou in nms_ious:
        mae, em, n = evaluate_image_folder_metrics(
            folder_path,
            threshold=fixed_threshold,
            nms_iou=iou,
            min_box_area=fixed_min_box_area,
            ar_min=ar_min,
            ar_max=ar_max,
            device=device,
        )
        maes.append(mae)
        ems.append(em)
        print(f"nms_iou={iou:.2f} | MAE={mae:.3f} | ExactMatch={em*100:.1f}% (N={n})")

    # Plot MAE
    plt.figure()
    plt.plot(nms_ious, maes, marker="o")
    plt.xlabel("nms_iou")
    plt.ylabel("MAE")
    plt.title("MAE vs nms_iou")
    plt.show()

    # Plot Exact Match
    plt.figure()
    plt.plot(nms_ious, [e*100 for e in ems], marker="o")
    plt.xlabel("nms_iou")
    plt.ylabel("Exact Match Rate (%)")
    plt.title("Exact Match Rate vs nms_iou")
    plt.show()

    best_idx = int(np.argmin(maes))
    print(f"Best (by lowest MAE): nms_iou={nms_ious[best_idx]:.2f} | MAE={maes[best_idx]:.3f} | ExactMatch={ems[best_idx]*100:.1f}%")
    return {"nms_ious": nms_ious, "mae": maes, "exact_match": ems}

**Sweep + Plot: Min Box Area**

In [None]:
def sweep_and_plot_min_box_area(
    folder_path: str,
    min_areas=(0, 200, 400, 700, 1000, 1500),
    fixed_threshold=0.4,
    fixed_nms_iou=0.60,
    ar_min=1.05,
    ar_max=6.5,
    device=None,
):
    """
    Evaluate the folder once per minimum box area, print and plot MAE and
    exact-match rate, and report the area with the lowest MAE.

    Args:
        folder_path: folder passed to ``evaluate_image_folder_metrics``.
        min_areas: any iterable of areas (px^2) to try; each is cast to int
            before evaluation (must not be empty).
        fixed_threshold, fixed_nms_iou, ar_min, ar_max, device: held constant
            and forwarded to the evaluation.

    Returns:
        dict with ``min_areas`` (list), ``mae`` and ``exact_match`` (lists
        aligned with ``min_areas``).

    Raises:
        ValueError: ``min_areas`` is empty.
    """
    # Materialize once: the values are looped over, plotted and indexed below,
    # which a one-shot iterable (generator) would not survive.
    min_areas = list(min_areas)
    if not min_areas:
        raise ValueError("min_areas must contain at least one value")

    maes, ems = [], []
    for a in min_areas:
        mae, em, n = evaluate_image_folder_metrics(
            folder_path,
            threshold=fixed_threshold,
            nms_iou=fixed_nms_iou,
            min_box_area=int(a),
            ar_min=ar_min,
            ar_max=ar_max,
            device=device,
        )
        maes.append(mae)
        ems.append(em)
        print(f"min_box_area={int(a)} | MAE={mae:.3f} | ExactMatch={em*100:.1f}% (N={n})")

    plt.figure()
    plt.plot(min_areas, maes, marker="o")
    plt.xlabel("min_box_area (px^2)")
    plt.ylabel("MAE")
    plt.title("MAE vs min_box_area")
    plt.show()

    plt.figure()
    plt.plot(min_areas, [e*100 for e in ems], marker="o")
    plt.xlabel("min_box_area (px^2)")
    plt.ylabel("Exact Match Rate (%)")
    plt.title("Exact Match Rate vs min_box_area")
    plt.show()

    best_idx = int(np.argmin(maes))
    print(f"Best (by lowest MAE): min_box_area={min_areas[best_idx]} | MAE={maes[best_idx]:.3f} | ExactMatch={ems[best_idx]*100:.1f}%")
    return {"min_areas": min_areas, "mae": maes, "exact_match": ems}

# Testing: Static Pictures

**Test Cells: Initial Check**

In [None]:
import requests
from PIL import Image

# Load a single sample picture from disk and force it to 3-channel RGB.
img = Image.open(r"/content/05_8.jpg").convert("RGB")

# One detection pass with hand-picked settings; `out` is the result dict
# (count / boxes / scores / labels / debug) reused by the next cell.
out = detect_people(img, threshold=0.4, min_box_area=300, nms_iou=0.6)
print("People:", out["count"])
# out.keys(), out["boxes"][:2], out["scores"][:2]


In [None]:
# Draw the detections from the previous cell (`img`, `out`) onto a copy of the image.
annotated = visualize_people(img, out)

import matplotlib.pyplot as plt
# Show the annotated image without axes; the title carries the count and
# the per-stage debug dictionary returned by detect_people.
plt.figure(figsize=(8,8))
plt.imshow(annotated)
plt.axis("off")
plt.title(f"People count: {out['count']} | {out['debug']}")
plt.show()


**Helpers for Testing**

In [None]:
import re
import os
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# ---------- Helpers: parsing + evaluation ----------

def _list_images(folder_path, extensions=(".jpg",".jpeg",".png",".bmp",".webp")):
    folder = Path(folder_path)
    paths = sorted([p for p in folder.iterdir() if p.suffix.lower() in extensions])
    if len(paths) == 0:
        raise ValueError(f"No images found in: {folder_path}")
    return paths

def _parse_gt_from_name(path: Path):
    """
    Expected: <id>_<gt>.<ext>, e.g. 0001_6.jpg
    Returns gt (int) or None if filename doesn't match.
    """
    m = re.match(r"^(?P<id>.+)_(?P<gt>\d+)$", path.stem)
    if not m:
        return None
    return int(m.group("gt"))

def evaluate_folder_detailed(
    folder_path: str,
    threshold: float,
    nms_iou: float,
    min_box_area: int,
    ar_min: float = 1.05,
    ar_max: float = 6.5,
    device=None,
    verbose=False,
):
    """
    Run ``detect_people`` on every image in a folder and compare the predicted
    count with the ground truth parsed from each file name (<id>_<gt>.<ext>).

    Args:
      folder_path: folder listed via ``_list_images``.
      threshold, nms_iou, min_box_area, ar_min, ar_max, device: forwarded to
        ``detect_people``.
      verbose: when True, print one line per evaluated image.

    Returns:
      summary dict: MAE, ExactMatch, file counts (total / evaluated / skipped
        because of a non-matching name) and the params used
      df: per-image results (file, path, gt, pred, abs_error), sorted by
        abs_error descending, then file name ascending

    Raises:
      ValueError: no image name matched the '<id>_<gt>.<ext>' format.
    """
    paths = _list_images(folder_path)
    rows = []
    skipped = 0

    for p in paths:
        gt = _parse_gt_from_name(p)
        if gt is None:
            skipped += 1
            continue

        # Context manager closes the file handle deterministically; convert()
        # returns an independent in-memory image.
        with Image.open(p) as raw:
            img = raw.convert("RGB")
        out = detect_people(
            img,
            threshold=threshold,
            nms_iou=nms_iou,
            min_box_area=min_box_area,
            ar_min=ar_min,
            ar_max=ar_max,
            device=device,
        )
        pred = int(out["count"])
        abs_err = abs(pred - gt)

        rows.append({
            "file": p.name,
            "path": str(p),
            "gt": gt,
            "pred": pred,
            "abs_error": abs_err,
        })

        if verbose:
            print(f"{p.name}: GT={gt}, Pred={pred}, |err|={abs_err}")

    if not rows:
        raise ValueError("No valid images matched '<id>_<gt>.<ext>' naming format.")

    # Worst predictions first, so row 0 is the hardest image.
    df = pd.DataFrame(rows).sort_values(["abs_error", "file"], ascending=[False, True]).reset_index(drop=True)
    mae = float(df["abs_error"].mean())
    exact = float((df["abs_error"] == 0).mean())

    summary = {
        "N_total_files": len(paths),
        "N_evaluated": int(len(df)),
        "N_skipped_bad_name": int(skipped),
        "MAE": mae,
        "ExactMatch": exact,
        "params": {
            "threshold": float(threshold),
            "nms_iou": float(nms_iou),
            "min_box_area": int(min_box_area),
            "ar_min": float(ar_min),
            "ar_max": float(ar_max),
        }
    }
    return summary, df

# ---------- Plotting helpers ----------

import os

def _plot_curve(x, y, xlabel, ylabel, title, save_path=None, dpi=200):
    """
    Plot y against x as a marked line, optionally save it, then show and close it.

    Args:
        x, y: iterables of equal length to plot.
        xlabel, ylabel, title: axis labels and figure title.
        save_path: when given, the figure is written there (parent directory
            is created if the path has one).
        dpi: resolution used when saving.
    """
    plt.figure()
    plt.plot(list(x), list(y), marker="o")
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.grid(True, alpha=0.3)

    if save_path is not None:
        # A bare filename has an empty dirname, and os.makedirs("") raises;
        # only create a directory when there is one to create.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        plt.tight_layout()
        plt.savefig(save_path, dpi=dpi, bbox_inches="tight")
    plt.show()
    plt.close()


def _pick_best_worst(results_df, metric="MAE"):
    """
    results_df has columns: param_value, MAE, ExactMatch
    Returns best_row, worst_row by min/max metric.
    """
    best = results_df.loc[results_df[metric].idxmin()].to_dict()
    worst = results_df.loc[results_df[metric].idxmax()].to_dict()
    return best, worst

# ---------- Sweeps (sequential) ----------
# We do sequential sweeps because that's fast and interpretable:
# 1) sweep threshold with fixed nms_iou/min_box_area
# 2) sweep nms_iou using best threshold (fixed min_box_area)
# 3) sweep min_box_area using best threshold + best nms_iou

def run_all_sweeps(
    folder_path: str,
    thresholds=(0.2, 0.3, 0.4, 0.5, 0.6),
    nms_ious=(0.3, 0.4, 0.5, 0.6, 0.7),
    min_areas=(0, 200, 400, 700, 1000, 1500),
    base_threshold=0.4,
    base_nms_iou=0.6,
    base_min_box_area=700,
    ar_min=1.05,
    ar_max=6.5,
    device=None,
    plots_dir="/content/eval_plots",
):
    """
    Run three one-parameter sweeps in sequence and save a MAE plot and an
    exact-match plot for each.

    Stages:
      1) threshold sweep with base_nms_iou / base_min_box_area fixed
      2) nms_iou sweep using the best threshold from stage 1
      3) min_box_area sweep using the best threshold and best nms_iou

    "Best"/"worst" always mean lowest/highest MAE within a single sweep, so
    ``best_params`` and ``worst_params`` are per-stage picks stitched together,
    not the result of a joint grid search.

    NOTE(review): ``base_threshold`` is accepted but never read in this body.

    Returns:
      dict with the three sweep DataFrames (``df_threshold``, ``df_nms``,
      ``df_area``), ``best_params``, ``worst_params`` and ``plots_dir``.
    """
    os.makedirs(plots_dir, exist_ok=True)

    # ---- 1) Threshold sweep ----
    th_rows = []
    for th in thresholds:
        s, _ = evaluate_folder_detailed(
            folder_path, threshold=th, nms_iou=base_nms_iou, min_box_area=base_min_box_area,
            ar_min=ar_min, ar_max=ar_max, device=device, verbose=False
        )
        th_rows.append({"param": "threshold", "value": th, "MAE": s["MAE"], "ExactMatch": s["ExactMatch"]})
        print(f"[TH] th={th:.2f} | MAE={s['MAE']:.3f} | Exact={s['ExactMatch']*100:.1f}%")

    df_th = pd.DataFrame(th_rows)
    _plot_curve(
        df_th["value"], df_th["MAE"],
        "threshold", "MAE", "MAE vs threshold",
        save_path=os.path.join(plots_dir, "MAE_vs_threshold.png")
    )
    _plot_curve(
        df_th["value"], df_th["ExactMatch"]*100,
        "threshold", "Exact Match Rate (%)", "Exact Match vs threshold",
        save_path=os.path.join(plots_dir, "ExactMatch_vs_threshold.png")
    )

    best_th, worst_th = _pick_best_worst(df_th, metric="MAE")
    best_threshold = float(best_th["value"])
    worst_threshold = float(worst_th["value"])

    print("\nBest threshold (by MAE):", best_th)
    print("Worst threshold (by MAE):", worst_th)

    # ---- 2) NMS sweep (use best threshold) ----
    iou_rows = []
    for iou in nms_ious:
        s, _ = evaluate_folder_detailed(
            folder_path, threshold=best_threshold, nms_iou=iou, min_box_area=base_min_box_area,
            ar_min=ar_min, ar_max=ar_max, device=device, verbose=False
        )
        iou_rows.append({"param": "nms_iou", "value": iou, "MAE": s["MAE"], "ExactMatch": s["ExactMatch"]})
        print(f"[NMS] iou={iou:.2f} | MAE={s['MAE']:.3f} | Exact={s['ExactMatch']*100:.1f}%")

    df_iou = pd.DataFrame(iou_rows)
    # Plot file names embed the threshold chosen in stage 1.
    _plot_curve(
        df_iou["value"], df_iou["MAE"],
        "nms_iou", "MAE", f"MAE vs nms_iou (th={best_threshold:.2f})",
        save_path=os.path.join(plots_dir, f"MAE_vs_nms_iou_th{best_threshold:.2f}.png")
    )
    _plot_curve(
        df_iou["value"], df_iou["ExactMatch"]*100,
        "nms_iou", "Exact Match Rate (%)", f"Exact Match vs nms_iou (th={best_threshold:.2f})",
        save_path=os.path.join(plots_dir, f"ExactMatch_vs_nms_iou_th{best_threshold:.2f}.png")
    )

    best_iou, worst_iou = _pick_best_worst(df_iou, metric="MAE")
    best_nms_iou = float(best_iou["value"])
    worst_nms_iou = float(worst_iou["value"])

    print("\nBest nms_iou (by MAE):", best_iou)
    print("Worst nms_iou (by MAE):", worst_iou)

    # ---- 3) min_box_area sweep (use best threshold + best nms) ----
    area_rows = []
    for a in min_areas:
        s, _ = evaluate_folder_detailed(
            folder_path, threshold=best_threshold, nms_iou=best_nms_iou, min_box_area=int(a),
            ar_min=ar_min, ar_max=ar_max, device=device, verbose=False
        )
        area_rows.append({"param": "min_box_area", "value": int(a), "MAE": s["MAE"], "ExactMatch": s["ExactMatch"]})
        print(f"[AREA] area={int(a)} | MAE={s['MAE']:.3f} | Exact={s['ExactMatch']*100:.1f}%")

    df_area = pd.DataFrame(area_rows)
    # Plot file names embed both values chosen in stages 1 and 2.
    _plot_curve(
        df_area["value"], df_area["MAE"],
        "min_box_area (px^2)", "MAE",
        f"MAE vs min_box_area (th={best_threshold:.2f}, nms={best_nms_iou:.2f})",
        save_path=os.path.join(plots_dir, f"MAE_vs_min_box_area_th{best_threshold:.2f}_nms{best_nms_iou:.2f}.png")
    )
    _plot_curve(
        df_area["value"], df_area["ExactMatch"]*100,
        "min_box_area (px^2)", "Exact Match Rate (%)",
        f"Exact Match vs min_box_area (th={best_threshold:.2f}, nms={best_nms_iou:.2f})",
        save_path=os.path.join(plots_dir, f"ExactMatch_vs_min_box_area_th{best_threshold:.2f}_nms{best_nms_iou:.2f}.png")
    )

    best_area, worst_area = _pick_best_worst(df_area, metric="MAE")
    best_min_area = int(best_area["value"])
    worst_min_area = int(worst_area["value"])

    print("\nBest min_box_area (by MAE):", best_area)
    print("Worst min_box_area (by MAE):", worst_area)

    # Each entry is the per-stage winner/loser; keys match the keyword
    # arguments of detect_people / evaluate_folder_detailed.
    best_params = {"threshold": best_threshold, "nms_iou": best_nms_iou, "min_box_area": best_min_area, "ar_min": ar_min, "ar_max": ar_max}
    worst_params = {"threshold": float(worst_threshold), "nms_iou": float(worst_nms_iou), "min_box_area": int(worst_min_area), "ar_min": ar_min, "ar_max": ar_max}

    print(f"\nSaved plots to: {plots_dir}")

    return {
        "df_threshold": df_th,
        "df_nms": df_iou,
        "df_area": df_area,
        "best_params": best_params,
        "worst_params": worst_params,
        "plots_dir": plots_dir
    }

# ---------- Visualization artifacts ----------
def save_comparison_images(
    worst_df: pd.DataFrame,
    folder_path: str,
    worst_params: dict,
    best_params: dict,
    device=None,
    out_dir="/content/eval_outputs",
):
    """
    Find the worst-recognition image in ``worst_df`` (row with the largest
    ``abs_error``; the first such row on ties) and save two annotated copies
    of it: one detected with ``worst_params`` and one with ``best_params``.

    Args:
        worst_df: per-image results with columns ``path``, ``file``, ``gt``
            and ``abs_error``; it does not need to be pre-sorted.
        folder_path: accepted for interface compatibility; not read here
            (the image location comes from the ``path`` column).
        worst_params, best_params: keyword arguments for ``detect_people``.
        device: forwarded to ``detect_people``.
        out_dir: directory for the two PNG files (created if missing).

    Returns:
        dict describing the chosen image, both predictions, both saved paths
        and both debug dictionaries.

    Raises:
        ValueError: ``worst_df`` has no rows.
    """
    os.makedirs(out_dir, exist_ok=True)

    if len(worst_df) == 0:
        raise ValueError("worst_df is empty; there is no image to compare.")

    # Pick the worst example by value instead of trusting the caller's sort
    # order; argmax returns the first maximum, so a frame already sorted by
    # abs_error descending still yields row 0.
    worst_pos = int(worst_df["abs_error"].to_numpy().argmax())
    worst_row = worst_df.iloc[worst_pos].to_dict()
    img_path = worst_row["path"]
    filename = worst_row["file"]
    gt = int(worst_row["gt"])

    # Context manager closes the file handle; convert() returns an
    # independent in-memory image.
    with Image.open(img_path) as raw:
        img = raw.convert("RGB")

    # Worst params detection + viz
    out_w = detect_people(img, device=device, **worst_params)
    vis_w = visualize_people(img, out_w)
    pred_w = int(out_w["count"])
    out_w_path = os.path.join(out_dir, f"WORSTPARAMS_{Path(filename).stem}_GT{gt}_P{pred_w}.png")
    vis_w.save(out_w_path)

    # Best params detection + viz
    out_b = detect_people(img, device=device, **best_params)
    vis_b = visualize_people(img, out_b)
    pred_b = int(out_b["count"])
    out_b_path = os.path.join(out_dir, f"BESTPARAMS_{Path(filename).stem}_GT{gt}_P{pred_b}.png")
    vis_b.save(out_b_path)

    print("Worst-recognition image:", filename, "| GT=", gt)
    print("Saved:")
    print(" -", out_w_path)
    print(" -", out_b_path)

    return {
        "worst_image_file": filename,
        "worst_image_path": img_path,
        "gt": gt,
        "pred_worst": pred_w,
        "pred_best": pred_b,
        "saved_worst_viz": out_w_path,
        "saved_best_viz": out_b_path,
        "debug_worst": out_w["debug"],
        "debug_best": out_b["debug"],
    }

**Usage on Picture Folder (20)**

* Sweep threshold, NMS IoU, min_box_area (with graphs)

* Find and save best and worst parameter sets (by MAE)

* For best + worst: print overall MAE + Exact Match on the whole folder

* Find the worst-recognition image under the worst parameters (max abs error),
then produce and save two annotated images of that same file:
  1. with worst parameters
  2. with best parameters

In [None]:
import os
from IPython.display import display
from PIL import Image
import pandas as pd
import torch

# =========================
# CONFIG
# =========================
# Input folder; file names are expected to follow <id>_<gt>.<ext>.
folder_path = "/content/20_pictures"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Separate output folders for sweep plots, annotated images and CSV tables.
plots_dir = "/content/eval_plots"
outputs_dir = "/content/eval_outputs"
tables_dir = "/content/eval_tables"

os.makedirs(plots_dir, exist_ok=True)
os.makedirs(outputs_dir, exist_ok=True)
os.makedirs(tables_dir, exist_ok=True)

print("Folder:", folder_path)
print("Device:", device)
print("Plots dir:", plots_dir)
print("Outputs dir:", outputs_dir)
print("Tables dir:", tables_dir)

# =========================
# 1) Sweeps (saves plots automatically)
# =========================
sweep_res = run_all_sweeps(
    folder_path=folder_path,
    thresholds=(0.2, 0.3, 0.4, 0.5, 0.6),
    nms_ious=(0.3, 0.4, 0.5, 0.6, 0.7),
    min_areas=(0, 200, 400, 700, 1000, 1500),
    base_threshold=0.4,
    base_nms_iou=0.60,
    base_min_box_area=700,
    device=device,
    plots_dir=plots_dir,  # <-- IMPORTANT so graphs are saved
)

# Parameter dicts assembled from the per-sweep best / worst values.
best_params = sweep_res["best_params"]
worst_params = sweep_res["worst_params"]

print("\n=== BEST PARAMS (sequential best-by-MAE) ===")
print(best_params)

print("\n=== WORST PARAMS (sequential worst-by-MAE) ===")
print(worst_params)

# =========================
# 2) Evaluate best/worst on whole folder + save per-image tables
# =========================
# The param dicts are unpacked straight into the evaluation's keyword arguments.
best_summary, best_df = evaluate_folder_detailed(folder_path, device=device, **best_params)
worst_summary, worst_df = evaluate_folder_detailed(folder_path, device=device, **worst_params)

best_csv = os.path.join(tables_dir, "per_image_results_BEST.csv")
worst_csv = os.path.join(tables_dir, "per_image_results_WORST.csv")
best_df.to_csv(best_csv, index=False)
worst_df.to_csv(worst_csv, index=False)

print("\n=== Overall performance on whole folder ===")
print(f"BEST  -> MAE={best_summary['MAE']:.3f} | ExactMatch={best_summary['ExactMatch']*100:.1f}% | N={best_summary['N_evaluated']}")
print(f"WORST -> MAE={worst_summary['MAE']:.3f} | ExactMatch={worst_summary['ExactMatch']*100:.1f}% | N={worst_summary['N_evaluated']}")
print("\nSaved per-image CSVs:")
print(" -", best_csv)
print(" -", worst_csv)

# =========================
# 3) Visualize the worst-recognition image (according to WORST params)
# =========================
# The same image is annotated twice (worst vs best params) for a side-by-side look.
artifact = save_comparison_images(
    worst_df=worst_df,
    folder_path=folder_path,
    worst_params=worst_params,
    best_params=best_params,
    device=device,
    out_dir=outputs_dir,
)

print("\nSaved visualizations:")
print(" -", artifact["saved_worst_viz"])
print(" -", artifact["saved_best_viz"])

# Show inline
display(Image.open(artifact["saved_worst_viz"]))
display(Image.open(artifact["saved_best_viz"]))

# =========================
# 4) Quick directory listing (optional)
# =========================
print("\nFiles created:")
print("Plots:", os.listdir(plots_dir))
print("Outputs:", os.listdir(outputs_dir))
print("Tables:", os.listdir(tables_dir))


# Implementation: Video

In [None]:
!pip -q install opencv-python

import cv2
import numpy as np
from PIL import Image

In [None]:
def bgr_to_pil(frame_bgr):
    """Convert an OpenCV BGR frame into a PIL image with RGB channel order."""
    return Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))

In [None]:
def run_detection_on_video(
    video_path,
    every_n_frames=3,
    threshold=0.4,
    min_box_area=300,
    nms_iou = 0.5,
    device=None,
    max_frames=None,
):
    """
    Run ``detect_people`` on every N-th frame of a video.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        every_n_frames: sampling stride; frame 0, N, 2N, ... are processed.
        threshold, min_box_area, nms_iou, device: forwarded to ``detect_people``.
        max_frames: stop once this many frames have been read (None = no limit).

    Returns:
        (detections_per_frame, fps) where ``detections_per_frame`` is a list of
        dicts ``{"frame_idx", "boxes" (N,4) ndarray, "scores" (N,) ndarray}``
        and ``fps`` is the value reported by the capture.

    Raises:
        ValueError: ``every_n_frames`` is smaller than 1.
        OSError: the video could not be opened.
    """
    if every_n_frames < 1:
        raise ValueError(f"every_n_frames must be >= 1, got {every_n_frames}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        # Fail loudly instead of returning an empty result that looks like
        # "no people in the video".
        raise OSError(f"Could not open video: {video_path}")

    detections_per_frame = []  # list of dicts: {frame_idx, boxes, scores}
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_idx = 0

        # Checking the limit before reading avoids decoding one frame that
        # would be thrown away anyway.
        while max_frames is None or frame_idx < max_frames:
            ok, frame = cap.read()
            if not ok:
                break

            if frame_idx % every_n_frames == 0:
                pil_img = bgr_to_pil(frame)
                out = detect_people(
                    pil_img,
                    threshold=threshold,
                    min_box_area=min_box_area,
                    nms_iou=nms_iou,
                    device=device,
                )
                detections_per_frame.append({
                    "frame_idx": frame_idx,
                    "boxes": out["boxes"].numpy(),   # (N,4)
                    "scores": out["scores"].numpy()  # (N,)
                })

            frame_idx += 1
    finally:
        # Release the capture even when detection raises mid-video.
        cap.release()

    return detections_per_frame, fps

In [None]:
import numpy as np

def iou_xyxy(a, b):
    """Intersection-over-union of two boxes given as [x1, y1, x2, y2].

    Negative extents are clamped to zero and a tiny epsilon in the
    denominator keeps two zero-area boxes from dividing by zero.
    """
    # Corners of the overlap rectangle.
    left, top = max(a[0], b[0]), max(a[1], b[1])
    right, bottom = min(a[2], b[2]), min(a[3], b[3])

    overlap_w = max(0.0, right - left)
    overlap_h = max(0.0, bottom - top)
    inter = overlap_w * overlap_h

    area_a = max(0.0, a[2] - a[0]) * max(0.0, a[3] - a[1])
    area_b = max(0.0, b[2] - b[0]) * max(0.0, b[3] - b[1])

    return inter / (area_a + area_b - inter + 1e-9)

In [None]:
def greedy_match_iou(tracks_boxes, det_boxes, iou_thresh):
    """
    Pair track boxes with detection boxes greedily by descending IoU.

    Each track and each detection is used at most once, and pairs whose IoU is
    below ``iou_thresh`` are never matched. Returns a list of
    (track_idx, det_idx) tuples; empty when either input is empty.
    """
    n_tracks, n_dets = len(tracks_boxes), len(det_boxes)
    if n_tracks == 0 or n_dets == 0:
        return []

    # Pairwise IoU table, shape [T, D], stored as float32.
    iou_table = np.zeros((n_tracks, n_dets), dtype=np.float32)
    for ti, t_box in enumerate(tracks_boxes):
        for di, d_box in enumerate(det_boxes):
            iou_table[ti, di] = iou_xyxy(t_box, d_box)

    # Every (track, detection, IoU) candidate, best overlap first.
    candidates = sorted(
        ((ti, di, iou_table[ti, di]) for ti in range(n_tracks) for di in range(n_dets)),
        key=lambda cand: cand[2],
        reverse=True,
    )

    pairs = []
    taken_tracks, taken_dets = set(), set()
    for ti, di, overlap in candidates:
        if overlap < iou_thresh:
            # Sorted descending: nothing after this point can qualify.
            break
        if ti in taken_tracks or di in taken_dets:
            continue
        taken_tracks.add(ti)
        taken_dets.add(di)
        pairs.append((ti, di))

    return pairs

In [None]:
class Track:
    """State of one tracked box: identity, smoothed box, last score and match bookkeeping."""

    def __init__(self, track_id, box, score, frame_idx):
        """Start a track from its first detection (one hit, age zero)."""
        self.id = track_id
        self.box = box.astype(np.float32)
        self.score = float(score)
        self.last_frame = int(frame_idx)

        # hits: how many detections have been assigned to this track so far.
        # age: sampled frames elapsed since the last assignment.
        self.hits, self.age = 1, 0

    def update(self, box, score, frame_idx, smooth=0.7):
        """Fold a newly matched detection into the track.

        The stored box becomes ``smooth * old + (1 - smooth) * new`` to damp
        jitter; the score and frame are overwritten, the hit counter grows by
        one and the age is reset.
        """
        incoming = box.astype(np.float32)
        fresh_weight = 1 - smooth
        self.box = smooth * self.box + fresh_weight * incoming
        self.score = float(score)
        self.last_frame = int(frame_idx)
        self.hits = self.hits + 1
        self.age = 0

In [None]:
def track_unique_people(
    detections_per_frame,
    iou_match_thresh=0.35,
    max_age=3,
    min_hits=2,
    smooth=0.7,
):
    """
    Count unique people across sampled frames with a greedy IoU tracker.

    Args:
      detections_per_frame: list of dicts, one per sampled frame:
        {"frame_idx": int, "boxes": (N,4) np array, "scores": (N,) np array}
      iou_match_thresh: minimum IoU for a detection to continue a track.
      max_age: a track is dropped once it has gone more than this many
        sampled frames without a match.
      min_hits: number of detections a track needs before it is counted as a
        confirmed person (1 confirms a track on its first detection).
      smooth: box smoothing factor forwarded to ``Track.update``.

    Returns:
      total_unique_confirmed: int
      tracks_history: list with one summary dict per sampled frame
        (frame_idx, num_dets, num_tracks_alive, num_confirmed_total)
    """
    tracks = []
    next_id = 1
    confirmed_ids = set()
    history = []

    for item in detections_per_frame:
        frame_idx = item["frame_idx"]
        det_boxes = item["boxes"]
        det_scores = item["scores"]

        # Age all tracks (a match below resets the age to 0).
        for tr in tracks:
            tr.age += 1

        matches = greedy_match_iou([tr.box for tr in tracks], det_boxes, iou_match_thresh)

        matched_dets = set()

        # Update matched tracks
        for t_idx, d_idx in matches:
            track = tracks[t_idx]
            track.update(det_boxes[d_idx], det_scores[d_idx], frame_idx, smooth=smooth)
            matched_dets.add(d_idx)

            if track.hits >= min_hits:
                confirmed_ids.add(track.id)

        # Create new tracks for unmatched detections
        for d_idx in range(len(det_boxes)):
            if d_idx in matched_dets:
                continue
            tr = Track(next_id, det_boxes[d_idx], det_scores[d_idx], frame_idx)
            tracks.append(tr)
            # A brand-new track already has one hit; without this check
            # min_hits <= 1 would behave exactly like min_hits == 2.
            if tr.hits >= min_hits:
                confirmed_ids.add(tr.id)
            next_id += 1

        # Remove dead tracks
        tracks = [tr for tr in tracks if tr.age <= max_age]

        # Debug snapshot (optional)
        history.append({
            "frame_idx": frame_idx,
            "num_dets": int(len(det_boxes)),
            "num_tracks_alive": int(len(tracks)),
            "num_confirmed_total": int(len(confirmed_ids)),
        })

    return len(confirmed_ids), history

In [None]:
import cv2

def draw_tracks_on_frame(frame_bgr, tracks, color=(0, 255, 0)):
    """
    Return a copy of the frame with one rectangle and one text label per track.

    frame_bgr: OpenCV image (left untouched; drawing happens on a copy)
    tracks: iterable of objects exposing .box (x1, y1, x2, y2), .id and .hits
    color: colour tuple used for both the rectangle and the label text
    """
    canvas = frame_bgr.copy()
    for track in tracks:
        # Drawing calls below receive integer pixel coordinates
        left, top, right, bottom = (int(v) for v in track.box)
        cv2.rectangle(canvas, (left, top), (right, bottom), color, 2)

        # Label sits 8 px above the box, clamped so y never goes negative
        text_origin = (left, max(0, top - 8))
        cv2.putText(
            canvas,
            f"ID {track.id} | hits {track.hits}",
            text_origin,
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            color,
            2,
            cv2.LINE_AA,
        )
    return canvas


In [None]:
def track_unique_people_with_snapshots(
    detections_per_frame,
    iou_match_thresh=0.35,
    max_age=3,
    min_hits=2,
    smooth=0.7,
):
    """
    Greedy IoU tracking that also records, per processed frame, the tracks
    still alive so they can be drawn later.

    Args:
      detections_per_frame: list of dicts with keys "frame_idx", "boxes"
        and "scores" (one dict per sampled frame).
      iou_match_thresh: threshold forwarded to greedy_match_iou.
      max_age: a track is dropped once its age exceeds this value; age grows
        by one per processed item.
      min_hits: a matched track is counted as confirmed once
        track.hits >= min_hits.
      smooth: forwarded to Track.update as its smoothing factor.

    Returns:
      (total_unique_confirmed, snapshots):
        total_unique_confirmed: int, number of distinct confirmed track ids.
        snapshots: dict mapping frame_idx -> list of
          {"id": ..., "box": copy of the track box, "hits": ...}
          for every track alive after that frame was processed.
    """
    tracks = []
    next_id = 1
    confirmed_ids = set()

    # frame_idx -> list of {"id", "box", "hits"} dicts for drawing
    snapshots = {}

    for item in detections_per_frame:
        frame_idx = item["frame_idx"]
        det_boxes = item["boxes"]
        det_scores = item["scores"]

        # Age every track first; matched ones are expected to be reset by update()
        for tr in tracks:
            tr.age += 1

        matches = greedy_match_iou(
            [tr.box for tr in tracks], det_boxes, iou_match_thresh
        )

        matched_dets = set()
        for t_idx, d_idx in matches:
            track = tracks[t_idx]
            track.update(det_boxes[d_idx], det_scores[d_idx], frame_idx, smooth=smooth)
            matched_dets.add(d_idx)
            if track.hits >= min_hits:
                confirmed_ids.add(track.id)

        # Unmatched detections start new tracks
        for d_idx, box in enumerate(det_boxes):
            if d_idx in matched_dets:
                continue
            tracks.append(Track(next_id, box, det_scores[d_idx], frame_idx))
            next_id += 1

        tracks = [tr for tr in tracks if tr.age <= max_age]

        # snapshot for this sampled frame (boxes copied so later updates
        # to the live track do not alter what was recorded)
        snapshots[frame_idx] = [
            {"id": tr.id, "box": tr.box.copy(), "hits": tr.hits}
            for tr in tracks
        ]

    return len(confirmed_ids), snapshots


In [None]:
def render_tracked_video(
    input_video_path,
    output_video_path,
    detections_per_frame,
    every_n_frames=3,
    iou_match_thresh=0.35,
    max_age=3,
    min_hits=2,
    smooth=0.7,
):
    """
    Re-run tracking on the given detections and write an annotated copy of
    the input video (boxes, track ids and a running unique-people counter).

    Args:
      input_video_path: path of the video to read.
      output_video_path: path of the mp4 ("mp4v" fourcc) to write.
      detections_per_frame: per-sampled-frame detections, forwarded to
        track_unique_people_with_snapshots.
      every_n_frames: accepted for interface compatibility; not used here
        (snapshots are looked up by their recorded frame_idx).
      iou_match_thresh, max_age, min_hits, smooth: tracker parameters,
        forwarded unchanged.

    Returns:
      (unique_count, output_video_path), where unique_count is the final
      number of confirmed tracks reported by the tracker.

    Notes:
      Frames between two sampled frames reuse the most recent snapshot.
      The overlay counter is derived from the snapshots: a track id is
      counted from the first snapshot in which its hits >= min_hits.
    """
    # Run tracking once to get snapshots
    unique_count, snapshots = track_unique_people_with_snapshots(
        detections_per_frame,
        iou_match_thresh=iou_match_thresh,
        max_age=max_age,
        min_hits=min_hits,
        smooth=smooth,
    )
    print("Unique people (confirmed):", unique_count)

    class _DrawTrack:
        """Attribute view of a snapshot dict, as read by draw_tracks_on_frame."""

        def __init__(self, snap):
            self.id = snap["id"]
            self.box = snap["box"]
            self.hits = snap["hits"]

    cap = cv2.VideoCapture(input_video_path)
    writer = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_video_path, fourcc, fps, (w, h))

        frame_idx = 0
        alive = []                 # drawable tracks from the latest snapshot
        confirmed_so_far = set()   # ids seen with hits >= min_hits up to now

        while True:
            ok, frame = cap.read()
            if not ok:
                break

            # Only sampled frames have a snapshot; rebuild the drawables
            # there and keep reusing them until the next sampled frame.
            snap = snapshots.get(frame_idx)
            if snap is not None:
                alive = [_DrawTrack(s) for s in snap]
                confirmed_so_far.update(
                    s["id"] for s in snap if s["hits"] >= min_hits
                )

            annotated = draw_tracks_on_frame(frame, alive)

            # Running count up to this frame. Capped at the tracker's final
            # total so the overlay can never exceed the returned value.
            shown = min(len(confirmed_so_far), unique_count)
            cv2.putText(annotated, f"Unique people (so far): {shown}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9,
                        (255, 255, 255), 2, cv2.LINE_AA)

            writer.write(annotated)
            frame_idx += 1
    finally:
        # Release the handles even if reading/drawing/writing raised, so
        # the output file is finalized and the input is not left open.
        cap.release()
        if writer is not None:
            writer.release()

    return unique_count, output_video_path

# Testing: Video

**Test Cells: Initial Check**

In [None]:
# Smoke test: run detection on one video, sampling every 3rd frame and
# stopping early via max_frames to keep the check quick.
detections, fps = run_detection_on_video(
    "/content/elevator_film_1.mp4",
    every_n_frames=3,
    device="cuda" if torch.cuda.is_available() else "cpu",
    max_frames=300  # optional limit while testing
)
print("frames with detections:", len(detections), "fps:", fps)

# The detection list can be empty, so only peek at the first entry when
# there is one instead of failing with an IndexError.
if detections:
    print("example:", detections[0]["frame_idx"], detections[0]["boxes"].shape)
else:
    print("example: <no detection entries returned>")

In [None]:
# Run the IoU tracker over the `detections` list produced by the detection
# cell and report how many distinct tracks were confirmed.
unique_count, hist = track_unique_people(
    detections,
    iou_match_thresh=0.35,
    max_age=3,     # in sampled frames (with every_n_frames=3, this is ~9 real frames)
    min_hits=2,
    smooth=0.7
)

# `hist` holds one debug summary per processed frame; show the last five.
print("Unique people in video:", unique_count)
print("Last history:", hist[-5:])


In [None]:
# Write an annotated copy of the test video. render_tracked_video re-runs
# tracking internally on `detections`, so the tracker parameters are passed
# again here and should match the ones used in the tracking cell.
unique_count, out_path = render_tracked_video(
    input_video_path="/content/elevator_film_1.mp4",
    output_video_path="/content/tracked_output_1.mp4",
    detections_per_frame=detections,
    every_n_frames=3,
    iou_match_thresh=0.35,
    max_age=3,
    min_hits=2,
    smooth=0.7
)

print("Saved:", out_path)

**Usage on Video Folder (5)**

In [None]:
import os
from pathlib import Path
import pandas as pd

VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v"}

def batch_track_videos(
    input_folder: str,
    output_folder: str = None,
    *,
    every_n_frames: int = 3,
    threshold: float = 0.4,
    min_box_area: int = 300,
    nms_iou: float = 0.5,
    device: str = None,
    max_frames=None,   # keep compatible with older python
    # tracker params
    iou_match_thresh: float = 0.35,
    max_age: int = 3,
    min_hits: int = 2,
    smooth: float = 0.7,
    # options
    overwrite: bool = False,
    save_csv: bool = True,
):
    """
    Run detection, tracking and rendering for every video file in a folder.

    Args:
      input_folder: folder scanned (non-recursively) for files whose suffix
        is in VIDEO_EXTS.
      output_folder: where "tracked_<stem>.mp4" files and summary.csv are
        written; defaults to "<input_folder>/tracked_outputs".
      every_n_frames, threshold, min_box_area, nms_iou, device, max_frames:
        forwarded to run_detection_on_video. When device is None, "cuda" is
        used if torch is available and reports CUDA, otherwise "cpu".
      iou_match_thresh, max_age, min_hits, smooth: tracker parameters,
        forwarded to track_unique_people and render_tracked_video.
      overwrite: when False, videos whose output file already exists are
        skipped and recorded with status "skipped_exists".
      save_csv: when True, the summary is also written to
        "<output_folder>/summary.csv".

    Returns:
      pandas DataFrame with one row per video and the columns video,
      input_path, output_path, fps, frames_with_detections, unique_people,
      status ("ok", "skipped_exists", "no_detections" or "error: <Type>").

    Raises:
      FileNotFoundError: if input_folder is not an existing directory or
        contains no video files. Nothing is created on disk in that case.
    """
    input_folder = Path(input_folder)

    # Validate the input BEFORE creating anything: creating the output
    # folder first would silently create a mistyped input folder as well
    # (the default output lives inside it and mkdir uses parents=True).
    if not input_folder.is_dir():
        raise FileNotFoundError(f"Input folder not found: {input_folder}")

    video_paths = sorted(
        p for p in input_folder.iterdir()
        if p.is_file() and p.suffix.lower() in VIDEO_EXTS
    )
    if not video_paths:
        raise FileNotFoundError(f"No videos found in: {input_folder}")

    if output_folder is None:
        output_folder = input_folder / "tracked_outputs"
    output_folder = Path(output_folder)
    output_folder.mkdir(parents=True, exist_ok=True)

    if device is None:
        device = "cuda" if ("torch" in globals() and torch.cuda.is_available()) else "cpu"

    def _row(vid_path, status, output_path=None, fps=None, frames=None, unique=None):
        # Single place that defines the summary schema (and column order).
        return {
            "video": vid_path.name,
            "input_path": str(vid_path),
            "output_path": output_path,
            "fps": fps,
            "frames_with_detections": frames,
            "unique_people": unique,
            "status": status,
        }

    results = []

    for idx, vid_path in enumerate(video_paths, start=1):
        out_path = output_folder / f"tracked_{vid_path.stem}.mp4"

        print(f"\n[{idx}/{len(video_paths)}] Processing: {vid_path.name}")
        print(f"  params: every_n_frames={every_n_frames}, thr={threshold}, area>={min_box_area}, nms_iou={nms_iou}")

        if out_path.exists() and not overwrite:
            print(f"  -> Skipping (already exists): {out_path.name}")
            results.append(_row(vid_path, "skipped_exists", output_path=str(out_path)))
            continue

        # Batch boundary: one failing video must not abort the whole run,
        # so any error is reported and recorded in the summary instead.
        try:
            # 1) detection
            detections, fps = run_detection_on_video(
                str(vid_path),
                every_n_frames=every_n_frames,
                threshold=threshold,
                min_box_area=min_box_area,
                nms_iou=nms_iou,
                device=device,
                max_frames=max_frames,
            )

            frames_with_det = len(detections)
            print(f"  detections frames: {frames_with_det}, fps: {fps}")

            if frames_with_det == 0:
                print("  -> No sampled frames processed or no detections list. Skipping render.")
                results.append(
                    _row(vid_path, "no_detections", fps=fps, frames=0, unique=0)
                )
                continue

            # 2) tracking stats (printed for progress; the summary row uses
            #    the count returned by the render step below)
            unique_count, _ = track_unique_people(
                detections,
                iou_match_thresh=iou_match_thresh,
                max_age=max_age,
                min_hits=min_hits,
                smooth=smooth
            )
            print(f"  unique people (confirmed): {unique_count}")

            # 3) render tracked video
            unique_count_render, saved_path = render_tracked_video(
                input_video_path=str(vid_path),
                output_video_path=str(out_path),
                detections_per_frame=detections,
                every_n_frames=every_n_frames,
                iou_match_thresh=iou_match_thresh,
                max_age=max_age,
                min_hits=min_hits,
                smooth=smooth
            )

            results.append(
                _row(
                    vid_path,
                    "ok",
                    output_path=str(saved_path),
                    fps=fps,
                    frames=frames_with_det,
                    unique=int(unique_count_render),
                )
            )

            print(f"  saved: {saved_path}")

        except Exception as e:
            print(f"  !! ERROR on {vid_path.name}: {type(e).__name__}: {e}")
            results.append(_row(vid_path, f"error: {type(e).__name__}"))

    df_summary = pd.DataFrame(results)

    if save_csv:
        csv_path = output_folder / "summary.csv"
        df_summary.to_csv(csv_path, index=False)
        print("\nSaved summary:", csv_path)

    return df_summary

In [None]:
# Batch run: process every video in input_folder and write the annotated
# videos plus summary.csv into output_folder. Both paths are defined once
# so the call and the final message cannot drift apart.
input_folder = "/content/videos"
output_folder = "/content/tracked_outputs"
summary = batch_track_videos(
    input_folder=input_folder,
    output_folder=output_folder,
    every_n_frames=3,
    threshold=0.4,
    min_box_area=300,
    nms_iou=0.5,
    device="cuda" if torch.cuda.is_available() else "cpu",
    max_frames=None,
    overwrite=True
)

display(summary)
print("\nSaved outputs in:", output_folder)