In [115]:
pip install ultralytics flwr huggingface_hub opencv-python pillow torchvision opacus imageio

Note: you may need to restart the kernel to use updated packages.


In [116]:
# Index-aligned class names: 81 entries in total. The first 80 appear to be the
# standard COCO detection labels (TODO confirm against the training YAMLs); the
# final entry (index 80) is the additional "face" class.
YOLO81_NAMES = [
    # 0-9
    "person", "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat", "traffic light",
    # 10-19
    "fire hydrant", "stop sign", "parking meter", "bench", "bird",
    "cat", "dog", "horse", "sheep", "cow",
    # 20-29
    "elephant", "bear", "zebra", "giraffe", "backpack",
    "umbrella", "handbag", "tie", "suitcase", "frisbee",
    # 30-39
    "skis", "snowboard", "sports ball", "kite", "baseball bat",
    "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
    # 40-49
    "wine glass", "cup", "fork", "knife", "spoon",
    "bowl", "banana", "apple", "sandwich", "orange",
    # 50-59
    "broccoli", "carrot", "hot dog", "pizza", "donut",
    "cake", "chair", "couch", "potted plant", "bed",
    # 60-69
    "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven",
    # 70-79
    "toaster", "sink", "refrigerator", "book", "clock",
    "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
    # 80
    "face",
]

In [117]:
import argparse
import copy
import random
import time
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import torch
from ultralytics import YOLO
import imageio.v3 as iio  # noqa: F401


In [118]:
# -----------------------------
# Utils
# -----------------------------
def set_seed(seed: int = 1337):
    """Seed every RNG this notebook relies on with the same value.

    Covers Python's ``random``, NumPy's legacy global generator, PyTorch's CPU
    generator and the generators of all CUDA devices, in that order.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)


@torch.no_grad()
def copy_state_dict(sd: Dict[str, torch.Tensor], device: torch.device) -> Dict[str, torch.Tensor]:
    """Return an independent copy of ``sd`` with every tensor placed on ``device``.

    Each tensor is detached and cloned first, so the result shares no storage
    (and no autograd history) with the input.
    """
    snapshot = {}
    for name, tensor in sd.items():
        snapshot[name] = tensor.detach().clone().to(device)
    return snapshot


def state_dict_difference(new: Dict[str, torch.Tensor],
                          old: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    """Return the per-key update ``new - old``, keyed and shaped like ``old``.

    For every key of ``old``:

    * floating-point tensors with a same-shaped counterpart in ``new`` yield
      ``new[k] - old[k]``, computed in ``old``'s dtype and on ``old``'s device;
    * everything else (non-float tensors, keys absent from ``new``, or tensors
      whose shape differs) yields zeros shaped like ``old[k]``.

    The zero fallback keeps the function tolerant of models whose tensors do not
    line up exactly (e.g. differently sized heads): an incompatible tensor simply
    contributes no update instead of raising or silently broadcasting.

    Keys that exist only in ``new`` are ignored.
    """
    out = {}
    for k, vold in old.items():
        vnew = new.get(k)
        comparable = (
            vnew is not None
            and torch.is_floating_point(vold)
            and vnew.shape == vold.shape
        )
        if comparable:
            # Align device and dtype in one step before subtracting.
            out[k] = vnew.to(device=vold.device, dtype=vold.dtype) - vold
        else:
            out[k] = torch.zeros_like(vold)
    return out


def state_dict_add(base: Dict[str, torch.Tensor],
                   delta: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    """Apply ``delta`` on top of ``base`` and return the resulting mapping.

    Floating-point entries become ``base[k] + delta[k]`` (delta cast to the base
    dtype, result kept on the base device). Non-float entries are carried over
    from ``base`` as the very same tensor object. ``delta`` must contain every
    key of ``base``.
    """
    merged = {}
    for name in base:
        current, step = base[name], delta[name]
        if not torch.is_floating_point(current):
            merged[name] = current
            continue
        updated = current + step.to(current.dtype)
        merged[name] = updated.to(current.device)
    return merged


def average_state_dicts(sds: List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:
    """Element-wise mean of several state-dict-like mappings.

    The key set and the dtype of each entry are taken from ``sds[0]``. Keys whose
    tensor in ``sds[0]`` is floating point are averaged across all mappings (each
    contribution is cast to that dtype first); all other keys map to zeros shaped
    like the tensor in ``sds[0]``.

    Raises:
        ValueError: if ``sds`` is empty, since there is nothing to average.
    """
    if not sds:
        raise ValueError("average_state_dicts() needs at least one state dict")

    reference = sds[0]
    out = {}
    for k, t0 in reference.items():
        if torch.is_floating_point(t0):
            stacked = torch.stack([sd[k].to(t0.dtype) for sd in sds], dim=0)
            out[k] = stacked.mean(dim=0).to(t0.dtype)
        else:
            out[k] = torch.zeros_like(t0)
    return out


def l2_norm_of_delta(delta: Dict[str, torch.Tensor]) -> float:
    """Global L2 norm over all tensors in ``delta``, treated as one flat vector.

    Each tensor is cast to float32 before squaring; the per-tensor sums are
    accumulated as Python floats and the square root of the total is returned.
    An empty mapping yields ``0.0``.
    """
    total_sq = sum(
        (float(torch.sum(tensor.float() ** 2)) for tensor in delta.values()),
        0.0,
    )
    return float(np.sqrt(total_sq))


def clip_and_add_noise(delta: Dict[str, torch.Tensor], clip_norm: float, noise_multiplier: float) -> Dict[str, torch.Tensor]:
    """Client-level DP: clip the update to L2 <= ``clip_norm``, then add Gaussian noise.

    Only floating-point entries take part: they are jointly rescaled by
    ``min(1, clip_norm / ||delta||_2)`` and, when ``noise_multiplier * clip_norm``
    is positive, each receives independent noise drawn from
    ``N(0, (noise_multiplier * clip_norm)^2)``.

    Non-float entries (integer or boolean tensors) are returned unchanged and are
    excluded from the norm. Multiplying them by the Python-float scale would
    promote them to a floating dtype and then noise them, which changes their
    dtype and is meaningless for counters.

    Args:
        delta: mapping of tensor name to update tensor.
        clip_norm: maximum L2 norm of the float part of the update.
        noise_multiplier: noise standard deviation expressed in units of ``clip_norm``.

    Returns:
        A new mapping with the same keys as ``delta``.
    """
    float_part = {k: v for k, v in delta.items() if torch.is_floating_point(v)}

    # Small epsilon avoids a division by zero for an all-zero update.
    norm = l2_norm_of_delta(float_part) + 1e-12
    scale = min(1.0, clip_norm / norm)
    sigma = noise_multiplier * clip_norm

    noised = {}
    for k, v in delta.items():
        if k not in float_part:
            noised[k] = v  # pass integer/bool entries through untouched
            continue
        clipped = v * scale
        if sigma > 0.0:
            noised[k] = clipped + torch.randn_like(clipped) * sigma
        else:
            noised[k] = clipped
    return noised


def load_yolo_for_pretrained(weights_path: str, device: torch.device):
    """
    Build an Ultralytics ``YOLO`` wrapper from ``weights_path`` and move its
    underlying module to ``device``.

    ``weights_path`` is handed to ``YOLO(...)`` as-is, so it may be a local .pt
    file or an official checkpoint name such as 'yolov8n.pt' (which Ultralytics
    is expected to download on demand). No ``nc``/``names`` are forced here; the
    detection head is left for the Ultralytics training pipeline to adapt to the
    dataset YAML.
    """
    yolo = YOLO(weights_path)
    yolo.model.to(device)
    return yolo


def ensure_weights(weights_path: str) -> str:
    """
    Resolve the weights argument to something ``YOLO(...)`` can load.

    Resolution order:

    1. An existing local path is returned unchanged.
    2. A bare ``*.pt`` file name with no directory component (e.g. 'yolov8s.pt')
       that is not on disk is also returned unchanged, so that Ultralytics can
       resolve it by name. Without this, a requested official model would be
       silently replaced by 'yolov8n.pt'.
    3. Anything else (empty/None, or a path with directories that does not
       exist) falls back to the official 'yolov8n.pt'.
    """
    if weights_path:
        candidate = Path(weights_path)
        if candidate.exists():
            return weights_path
        is_bare_checkpoint_name = candidate.name == weights_path and candidate.suffix == ".pt"
        if is_bare_checkpoint_name:
            print(f"'{weights_path}' not found locally; passing the name to Ultralytics (auto-download).")
            return weights_path
    # Default to official YOLOv8n if a path isn't provided/found
    print("No local weights found; using official 'yolov8n.pt' (auto-download).")
    return "yolov8n.pt"

def align_model_head_to_yaml(model: "YOLO", data_yaml: str, device: torch.device, imgsz: int = 640):
    """
    Run ``model.train`` with ``epochs=0`` against ``data_yaml`` so that the
    Ultralytics training pipeline performs its model/data setup, then move the
    resulting module to ``device``.

    The intent is to get a detection head whose 'nc' matches the dataset YAML
    without doing any real training.

    NOTE(review): this relies on Ultralytics treating ``epochs=0`` as "set up
    only". Some trainer versions may substitute a default epoch count when the
    value is falsy, which would turn this into a full training run -- confirm
    against the installed version.
    """
    ultralytics_device = 0 if device.type == "cuda" else "cpu"
    setup_kwargs = dict(
        data=data_yaml,
        epochs=0,  # requested as "no training"; see the review note above
        imgsz=imgsz,
        device=ultralytics_device,
        verbose=False,
    )
    model.train(**setup_kwargs)
    model.model.to(device)

In [119]:
# -----------------------------
# Federated Client
# -----------------------------
class FLClient:
    """One federated participant: a private Ultralytics YOLO model plus its dataset YAML.

    A client receives the global parameters, trains locally for ``local_epochs``
    epochs, and reports the parameter delta (optionally clipped and noised for
    client-level DP). It can also validate its current weights on its own data.
    """

    # Number of leading top-level modules ("model.0." .. "model.9.") treated as
    # the backbone when ``freeze_backbone`` is set (YOLOv8n indexing heuristic).
    _BACKBONE_LAYERS = 10

    def __init__(
        self,
        client_id: int,
        device: torch.device,
        weights_path: str,
        data_yaml: str,
        workdir: Path,
        imgsz: int = 640,
        local_epochs: int = 1,
        batch: int = 16,
        dp_enabled: bool = False,
        dp_clip: float = 1.0,
        dp_noise_mult: float = 0.5,
        freeze_backbone: bool = False,
    ):
        """Load the client's model and create its run directory ``workdir/client_<id>``.

        Args:
            client_id: identifier used in log lines and in the run directory name.
            device: torch device the model parameters are kept on.
            weights_path: checkpoint path or name passed to ``load_yolo_for_pretrained``.
            data_yaml: dataset YAML used for local training and validation.
            workdir: parent directory for this client's training outputs.
            imgsz: image size forwarded to train/val.
            local_epochs: epochs trained per federated round.
            batch: batch size forwarded to train.
            dp_enabled: whether to clip and noise the reported update.
            dp_clip: L2 clip norm for the update when DP is enabled.
            dp_noise_mult: noise multiplier for the update when DP is enabled.
            freeze_backbone: whether to freeze the first ``_BACKBONE_LAYERS`` modules.
        """
        self.client_id = client_id
        self.device = device
        self.weights_path = weights_path
        self.imgsz = imgsz
        self.local_epochs = local_epochs
        self.batch = batch
        self.dp_enabled = dp_enabled
        self.dp_clip = dp_clip
        self.dp_noise_mult = dp_noise_mult
        self.data_yaml = data_yaml
        self.freeze_backbone = freeze_backbone

        # Local YOLO instance per client (isolated optimizer/EMA/augment state)
        self.model = load_yolo_for_pretrained(weights_path, device)
        self.run_dir = workdir / f"client_{client_id}"
        self.run_dir.mkdir(parents=True, exist_ok=True)

        print(
            "client", self.client_id,
            "task=", getattr(self.model.model, "task", None),
            "nc=", getattr(self.model.model, "nc", None),
            "names_len=", len(getattr(self.model.model, "names", []))
        )

    def _ultralytics_device(self):
        """Translate ``self.device`` into the ``device=`` value given to Ultralytics calls.

        CUDA devices map to their index (0 when the device carries no explicit
        index); every other device type maps to ``"cpu"``.
        """
        if self.device.type != "cuda":
            return "cpu"
        return self.device.index if self.device.index is not None else 0

    def _freeze_some_layers(self):
        """Set ``requires_grad=False`` on parameters of the first ``_BACKBONE_LAYERS`` modules.

        Does nothing unless ``freeze_backbone`` is enabled.
        """
        if not self.freeze_backbone:
            return
        prefixes = tuple(f"model.{i}." for i in range(self._BACKBONE_LAYERS))
        for name, param in self.model.model.named_parameters():
            if name.startswith(prefixes):
                param.requires_grad = False

    def get_state_dict(self) -> Dict[str, torch.Tensor]:
        """Return a detached copy of the current model state dict on ``self.device``."""
        return copy_state_dict(self.model.model.state_dict(), self.device)

    def set_state_dict(self, sd: Dict[str, torch.Tensor]):
        """Load ``sd`` into the local model, tolerating tensors that do not fit.

        ``load_state_dict(strict=False)`` only forgives missing/unexpected keys;
        a tensor whose shape differs (e.g. a detection head built for another
        class count) still raises. Such entries, and keys the local model does
        not have, are therefore dropped before loading.
        """
        own = self.model.model.state_dict()
        compatible = {
            k: v for k, v in sd.items()
            if k in own and own[k].shape == v.shape
        }
        skipped = len(sd) - len(compatible)
        if skipped:
            print(f"[client {self.client_id}] skipped {skipped} tensor(s) without a shape-compatible counterpart")
        self.model.model.load_state_dict(compatible, strict=False)

    def local_train_and_get_update(self, global_sd: Dict[str, torch.Tensor]) -> Tuple[Dict[str, torch.Tensor], dict]:
        """Train locally starting from ``global_sd`` and return ``(delta, metrics)``.

        ``delta`` is ``state_dict_difference(trained, global_sd)``, passed through
        ``clip_and_add_noise`` when DP is enabled. ``metrics`` is a small
        best-effort dict (``train_epochs``, ``train_batches``).
        """
        # Load broadcaster's global params
        self.set_state_dict(global_sd)

        # Optional freezing on the current module.
        # NOTE(review): the Ultralytics trainer is understood to configure
        # requires_grad itself from its `freeze` argument, so the same choice is
        # also forwarded to train() below -- confirm against the installed version.
        self._freeze_some_layers()

        self.model.overrides = getattr(self.model, "overrides", {}) or {}
        self.model.overrides["epochs"] = int(self.local_epochs)
        self.model.overrides["resume"] = False
        # Train locally (Ultralytics is expected to adjust the head to self.data_yaml's nc)
        results = self.model.train(
            data=self.data_yaml,
            epochs=self.local_epochs,
            imgsz=self.imgsz,
            batch=self.batch,
            device=self._ultralytics_device(),
            project=str(self.run_dir),
            name="round_train",
            exist_ok=True,
            verbose=False,
            freeze=self._BACKBONE_LAYERS if self.freeze_backbone else None,
        )

        # Compute client update (delta = new - old)
        new_sd = self.get_state_dict()
        delta = state_dict_difference(new_sd, global_sd)

        # Optionally apply client-level DP
        if self.dp_enabled:
            delta = clip_and_add_noise(delta, clip_norm=self.dp_clip, noise_multiplier=self.dp_noise_mult)

        # Gather some metrics (best-effort; Ultralytics APIs vary, so every
        # lookup has a None default instead of relying on a blanket try/except).
        trainer = getattr(results, "trainer", None)
        metrics = {
            "train_epochs": self.local_epochs,
            "train_batches": getattr(trainer, "nb", None) if trainer else None,
        }

        return delta, metrics

    @torch.no_grad()
    def local_val_map(self, data_yaml: str = None) -> dict:
        """Validate the current weights and return precision/recall/mAP as floats.

        Uses ``data_yaml`` when given, otherwise the client's own dataset YAML.
        Metrics missing from the Ultralytics results dict are reported as 0.0.
        """
        res = self.model.val(
            data=data_yaml or self.data_yaml,
            imgsz=self.imgsz,
            device=self._ultralytics_device(),
            split="val",
            verbose=False,
        )
        d = getattr(res, "results_dict", {}) or {}
        return {
            "precision": float(d.get("metrics/precision(B)", 0.0)),
            "recall":    float(d.get("metrics/recall(B)",    0.0)),
            "map50":     float(d.get("metrics/mAP50(B)",     0.0)),
            "map50_95":  float(d.get("metrics/mAP50-95(B)",  0.0)),
        }


In [120]:
# -----------------------------
# Federated Trainer
# -----------------------------
class FederatedTrainer:
    """Server side of FedAvg: holds the global weights and drives the rounds.

    Each round collects one update per client, averages the updates, adds the
    mean to the global state and evaluates the new global weights on every
    client's validation data.
    """

    def __init__(self, clients: List["FLClient"], device: torch.device):
        """Create the global model from the first client's weights and dataset YAML.

        Raises:
            ValueError: if ``clients`` is empty (the global model is derived from
                ``clients[0]``, and there would be nothing to aggregate).
        """
        if not clients:
            raise ValueError("FederatedTrainer requires at least one client")
        self.clients = clients
        self.device = device
        # Initialize global state from client 0's architecture
        self.global_model = load_yolo_for_pretrained(clients[0].weights_path, device)
        align_model_head_to_yaml(self.global_model, clients[0].data_yaml, device, imgsz=clients[0].imgsz)

        self.global_sd = copy_state_dict(self.global_model.model.state_dict(), device)

    def broadcast(self):
        """Push the current global state to every client."""
        for c in self.clients:
            c.set_state_dict(self.global_sd)

    def aggregate(self, deltas: List[Dict[str, torch.Tensor]]):
        """FedAvg step: add the mean of ``deltas`` to the global state.

        Raises:
            ValueError: if ``deltas`` is empty.
        """
        if not deltas:
            raise ValueError("aggregate() needs at least one client update")
        mean_delta = average_state_dicts(deltas)
        self.global_sd = state_dict_add(self.global_sd, mean_delta)
        # HEAD tolerances
        self.global_model.model.load_state_dict(self.global_sd, strict=False)

    @torch.no_grad()
    def evaluate_global_on_clients(self) -> dict:
        """Load the global state into each client, validate, and average the metrics.

        Returns a dict with ``avg_precision``, ``avg_recall``, ``avg_map50`` and
        ``avg_map50_95`` (each 0.0 when there are no clients).
        """
        Ps, Rs, maps_50, maps_50_95 = [], [], [], []
        for c in self.clients:
            c.set_state_dict(self.global_sd)
            m = c.local_val_map()
            Ps.append(m["precision"])
            Rs.append(m["recall"])
            maps_50.append(m["map50"])
            maps_50_95.append(m["map50_95"])
        return {
            "avg_precision": float(np.mean(Ps)) if Ps else 0.0,
            "avg_recall":    float(np.mean(Rs)) if Rs else 0.0,
            "avg_map50":     float(np.mean(maps_50)) if maps_50 else 0.0,
            "avg_map50_95":  float(np.mean(maps_50_95)) if maps_50_95 else 0.0,
        }

    def run(self, rounds: int = 5) -> List[dict]:
        """Run ``rounds`` federated rounds, save the final weights, return the history.

        Each history entry holds the averaged validation metrics plus ``round``,
        ``seconds`` and ``client_metrics`` (the per-client dicts returned by
        ``local_train_and_get_update``, in client order).

        After the last round the raw state dict is written to
        ``federated_best_sd.pt`` and, best-effort, a ``{"model": module}``
        checkpoint to ``federated_best_full.pt`` in the working directory.
        """
        history = []
        for r in range(1, rounds + 1):
            t0 = time.time()

            print(f"\n=== Federated Round {r}/{rounds} ===")

            deltas = []
            client_metrics = []
            for c in self.clients:
                delta, m = c.local_train_and_get_update(self.global_sd)
                deltas.append(delta)
                client_metrics.append(m)

            # Aggregate
            self.aggregate(deltas)

            # Eval global
            metrics = self.evaluate_global_on_clients()
            metrics["round"] = r
            metrics["seconds"] = round(time.time() - t0, 2)
            # Keep the per-client training info instead of discarding it.
            metrics["client_metrics"] = client_metrics
            history.append(metrics)
            print(f"[Round {r}] avg_mAP50={metrics['avg_map50']:.4f}, "
                  f"avg_mAP50-95={metrics['avg_map50_95']:.4f}, time={metrics['seconds']}s")

        # Save final global weights (both raw sd and full Ultralytics checkpoint)
        sd_path = Path("federated_best_sd.pt")
        torch.save(self.global_sd, sd_path)

        full_ckpt_path = Path("federated_best_full.pt")
        try:
            model_cpu = copy.deepcopy(self.global_model.model).float().cpu()
            model_cpu.load_state_dict(self.global_sd, strict=False)
            torch.save({"model": model_cpu}, full_ckpt_path)
            print(f"\n[Done] Saved raw state_dict -> {sd_path.resolve()}")
            print(f"[Done] Saved full Ultralytics checkpoint -> {full_ckpt_path.resolve()}")
        except Exception as e:
            # Deliberately best-effort: the raw state dict is already on disk.
            print(f"[Warn] Could not save full Ultralytics checkpoint: {e}. "
                  f"Saved state_dict only -> {sd_path.resolve()}")

        return history



In [121]:
@torch.no_grad()
def censor_image(input_path: str, output_path: str, weights_path: str, conf: float = 0.25,
                 device: torch.device = torch.device("cpu")):
    """Black out every detected box in an image and write the result.

    Args:
        input_path: image to read.
        output_path: where the censored image is written.
        weights_path: full Ultralytics checkpoint, or a raw state dict that is
            loaded into a 'yolov8n.pt' base architecture as a fallback.
        conf: detection confidence threshold passed to ``predict``.
        device: torch device for inference.

    Raises:
        FileNotFoundError: if ``input_path`` cannot be read as an image.
    """
    import cv2
    import imageio.v3 as iio  # local import to ensure availability even if top import is removed

    # Fail fast on an unreadable image before any (possibly slow) model loading.
    img = cv2.imread(input_path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {input_path}")

    # Try to load as full Ultralytics checkpoint first
    try:
        model = YOLO(weights_path)
    except Exception:
        # Fallback: treat the file as a raw state_dict (or a dict that embeds a
        # module under "model") and load it into a base architecture.
        # NOTE: torch.load unpickles the file; only use it on trusted weights.
        loaded = torch.load(weights_path, map_location="cpu")
        embedded = loaded.get("model") if isinstance(loaded, dict) else None
        sd = embedded.state_dict() if hasattr(embedded, "state_dict") else loaded

        model = YOLO("yolov8n.pt")
        own = model.model.state_dict()
        # strict=False does not forgive shape mismatches, so drop them up front.
        usable = {k: v for k, v in sd.items() if k in own and own[k].shape == v.shape}
        if len(usable) < len(sd):
            print(f"[Censor] Warning: {len(sd) - len(usable)} tensor(s) did not fit the base model and were skipped")
        model.model.load_state_dict(usable, strict=False)
    model.model.to(device)

    res = model.predict(
        source=input_path,
        conf=conf,
        verbose=False,
        device=0 if device.type == "cuda" else "cpu"
    )[0]

    height, width = img.shape[:2]
    boxes = res.boxes.xyxy.detach().cpu().numpy()
    for (x1, y1, x2, y2) in boxes:
        # Round outwards so the whole box is covered. Slice ends are exclusive,
        # so the upper clamp is the full width/height (not size - 1).
        left, top = max(0, int(np.floor(x1))), max(0, int(np.floor(y1)))
        right, bottom = min(width, int(np.ceil(x2))), min(height, int(np.ceil(y2)))
        img[top:bottom, left:right] = 0  # solid black

    # cv2.imread returns BGR; imageio writes arrays as RGB, so convert first
    # to avoid swapped red/blue channels in the output file.
    iio.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    print(f"[Censor] Wrote: {output_path}")

In [122]:
def parse_args(argv=None):
    """Parse command-line options for the federated run.

    ``argv`` is forwarded to argparse (``None`` means ``sys.argv[1:]``).
    Unrecognised arguments are ignored rather than rejected, so the function is
    safe to call inside a notebook kernel that injects its own flags.
    """
    parser = argparse.ArgumentParser(
        description="Federated YOLOv8 (official weights) with optional client-level DP",
        allow_abbrev=False,
    )
    add = parser.add_argument

    # --- data / model ---
    add(
        "--weights",
        type=str,
        default="yolov8n.pt",
        help="Path or name of Ultralytics YOLOv8 checkpoint (e.g., yolov8n.pt, yolov8s.pt).",
    )
    add(
        "--client-yamls",
        type=str,
        nargs="+",
        default=[],
        help="Client dataset YAMLs; if empty, auto-discovers ./data/faces_clients_small/client_*.yaml",
    )
    add("--imgsz", type=int, default=640)
    add("--batch", type=int, default=16)

    # --- federated learning ---
    add("--rounds", type=int, default=10, help="Federated rounds (epoch cycles).")
    add("--local-epochs", type=int, default=1, help="Local epochs per client per round.")

    # --- differential privacy ---
    add(
        "--dp",
        action="store_true",
        help="Enable client-level DP (clip + Gaussian noise on client update).",
    )
    add("--dp-clip", type=float, default=1.0, help="Client update L2 clip norm.")
    add(
        "--dp-noise-mult",
        type=float,
        default=0.5,
        help="Noise multiplier; noise std = sigma * clip_norm.",
    )

    # --- misc ---
    add("--device", type=str, default="auto", choices=["auto", "cpu", "cuda"])
    add("--seed", type=int, default=1337)
    add("--workdir", type=str, default="./fl_runs", help="Directory for client training outputs.")
    add("--freeze-backbone", action="store_true")

    # --- optional post-training demo ---
    add(
        "--demo-image",
        type=str,
        default="",
        help="If provided, runs censoring on this image using final weights.",
    )
    add("--demo-output", type=str, default="output_censored.jpg")
    add("--demo-conf", type=float, default=0.25)

    # Notebook-safe: drop unknown args such as "-f <kernel.json>"
    known, _unknown = parser.parse_known_args(argv)
    return known


In [123]:
def main(argv=None):
    """End-to-end driver: parse options, build clients, run FedAvg, optionally censor a demo image.

    Args:
        argv: argument list forwarded to ``parse_args`` (``None`` uses ``sys.argv``).

    Returns:
        The per-round history list produced by ``FederatedTrainer.run``.

    Raises:
        RuntimeError: if no client YAMLs were given and none could be discovered.
    """
    args = parse_args(argv)

    # Single source of truth for the auto-discovery location, so the log line and
    # the error message below cannot drift apart.
    default_client_dir = "./data/faces_clients_small"  # adjust if needed

    # Auto-discover client yamls if not provided
    if not args.client_yamls:
        client_root = Path(default_client_dir)
        args.client_yamls = sorted(str(p) for p in client_root.glob("client_*.yaml"))
        print(f"[INFO] Auto-discovered {len(args.client_yamls)} client yamls under {client_root}")
        for y in args.client_yamls:
            print("   ", y)

    # Fail before any side effects (seeding, directory creation, model loading).
    client_yamls = args.client_yamls
    if not client_yamls:
        raise RuntimeError(
            f"No client YAMLs found. Set --client-yamls or place client_*.yaml under {default_client_dir}"
        )

    set_seed(args.seed)

    # Device
    if args.device == "auto":
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    else:
        device = torch.device(args.device)

    # Weights
    weights_path = ensure_weights(args.weights)

    # Build clients
    workdir = Path(args.workdir)
    workdir.mkdir(parents=True, exist_ok=True)
    clients = [
        FLClient(
            client_id=i,
            device=device,
            weights_path=weights_path,
            data_yaml=yaml_path,
            workdir=workdir,
            imgsz=args.imgsz,
            local_epochs=args.local_epochs,
            batch=args.batch,
            dp_enabled=args.dp,
            dp_clip=args.dp_clip,
            dp_noise_mult=args.dp_noise_mult,
            freeze_backbone=args.freeze_backbone,
        )
        for i, yaml_path in enumerate(client_yamls)
    ]

    # Federated training (FedAvg)
    trainer = FederatedTrainer(clients, device)
    history = trainer.run(rounds=args.rounds)

    # Optional demo: censor faces with final aggregated weights
    if args.demo_image:
        # Prefer full Ultralytics checkpoint for predict/val
        final_weights = "federated_best_full.pt"
        if not Path(final_weights).exists():
            # Fallback to raw sd if full ckpt isn't present
            final_weights = "federated_best_sd.pt"
        censor_image(
            input_path=args.demo_image,
            output_path=args.demo_output,
            weights_path=final_weights,
            conf=args.demo_conf,
            device=device,
        )

    # Summary
    print("\nRound\tavg_mAP50\tavg_mAP50_95")
    for h in history:
        print(f"{h['round']}\t{h['avg_map50']:.4f}\t\t{h['avg_map50_95']:.4f}")

    return history


# Entry point: when this code runs as the main module (which includes executing
# the cell in a notebook kernel), start the full pipeline. No argv is passed, so
# main() forwards argv=None to parse_args().
if __name__ == "__main__":
    main()

[INFO] Auto-discovered 1 client yamls under data/faces_clients_small
    data/faces_clients_small/client_00.yaml
client 0 task= detect nc= 80 names_len= 80
Ultralytics 8.3.189 🚀 Python-3.13.7 torch-2.8.0 CPU (Apple M1 Pro)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=data/faces_clients_small/client_00.yaml, degrees=0.0, deterministic=True, device=cpu, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=0, erasing=0.4, exist_ok=False, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=640, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.01, lrf=0.01, mask_ratio=4, max_det=300, mixup=0.0, mode=train, model=yolov8n.pt, momentum=0.937, mosaic=1.0, multi_scale=Fal

KeyboardInterrupt: 