# Notebook 03 · Federated Simulation with Flower (CPU, CUDA, OpenVINO)

This notebook simulates a federated learning setting using Flower, training a CIFAR-10 model across multiple hardware scenarios:

- A: CPU-only clients
- B: Heterogeneous CPU + NVIDIA CUDA
- C: Scenario B + post-fit OpenVINO inference (CPU/GPU/NPU)
- D: Non-IID (Dirichlet α=0.1) + Scenario B

It records detailed per-round metrics (time, parameter sizes, energy via NVML when available, and optional OpenVINO inference metrics) to `metrics/03_flower_rounds.csv`. Use Notebook 04 to visualize results.

Notes:
- CUDA/NVML energy is recorded only when an NVIDIA GPU and `pynvml` are available; otherwise energy shows as N/D.
- OpenVINO IRs must exist under `models_saved/openvino_ir/` (e.g., `cnn_cifar10.xml`). If not found, OV metrics are skipped.
- This notebook is compatible with the Flower simulation API available in this repo; deprecation warnings are silenced for clarity.

In [1]:
# Environment and imports
import os
import platform
import socket
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset

# Flower (version-agnostic imports for simulation)
try:
    from flwr.server.strategy import FedAvg
    from flwr.simulation import start_simulation as _run_sim
    from flwr.server import ServerConfig
except Exception:
    # Fallbacks for other versions
    from flwr.server.strategy import FedAvg  # type: ignore

    try:
        from flwr.simulation import run_simulation as _run_sim  # type: ignore
    except Exception:
        from flwr.simulation.app import start_simulation as _run_sim  # type: ignore
    try:
        from flwr.server import ServerConfig  # type: ignore
    except Exception:
        ServerConfig = None  # type: ignore

from flwr.client import NumPyClient

# Project utilities
sys.path.append(os.path.abspath(os.path.join('..')))
from utils.data_utils import DataLoaderFactory
from utils.energy import GpuEnergyMeterNVML
from utils.infer_openvino import benchmark_numpy
from models import make_model
from utils.logging_utils import get_logger

# Logging
logger = get_logger("nb03")
logger.info("Python %s | Torch %s", platform.python_version(), torch.__version__)
try:
    import flwr as fl

    logger.info("Flower %s", getattr(fl, "__version__", "unknown"))
except Exception:
    logger.info("Flower version unknown")
try:
    import openvino as ov

    logger.info("OpenVINO %s", getattr(ov, "__version__", "unknown"))
except Exception:
    logger.info("OpenVINO not installed or version unknown")

# Silence deprecation and noisy logs
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
for _n in ["flwr", "ray", "grpc"]:
    try:
        import logging as _logging

        _logging.getLogger(_n).setLevel(_logging.ERROR)
    except Exception:
        pass

# Reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

DEVICE_NAME = "cuda" if torch.cuda.is_available() else "cpu"
logger.info("Detected device: %s", DEVICE_NAME)

# Paths
ROOT = os.path.abspath(os.path.join(os.path.dirname("."), ".."))
METRICS_DIR = os.path.join(ROOT, "metrics")
METRICS_CSV = os.path.join(METRICS_DIR, "03_flower_rounds.csv")
os.makedirs(METRICS_DIR, exist_ok=True)


2025-08-15 16:12:08 | INFO | nb03 | Python 3.11.9 | Torch 2.8.0+cu129
2025-08-15 16:12:08 | INFO | nb03 | Flower 1.20.0
2025-08-15 16:12:08 | INFO | nb03 | OpenVINO 2025.2.0-19140-c01cd93e24d-releases/2025/2
2025-08-15 16:12:10 | INFO | nb03 | Detected device: cuda


In [2]:
# Configuration
CFG = {
    "dataset": "cifar10",
    "model": "cnn",  # options: cnn, mlp, mobilenetv3, efficientnet_lite0
    "num_classes": 10,
    "batch_size": 128,
    "num_workers": 2,
    "epochs_per_round": 1,
    "rounds": 3,
    "clients": 4,
    "use_cuda_client": True,
    "dirichlet_alpha": 0.1,
    "ov_ir_name": "cnn_cifar10.xml",  # change if using a different model
    "bandwidth_mb_s": 100.0,  # for comm time estimation only (plots in nb04)
}

SCENARIOS = [
    {"id": "A", "name": "CPU-only", "use_cuda": False, "non_iid": False, "ov_bench": False},
    {"id": "B", "name": "CPU+CUDA", "use_cuda": True, "non_iid": False, "ov_bench": False},
    {"id": "C", "name": "CPU+CUDA+OV", "use_cuda": True, "non_iid": False, "ov_bench": True},
    {"id": "D", "name": "NonIID(α=0.1)+B", "use_cuda": True, "non_iid": True, "ov_bench": False},
]

CSV_COLUMNS = [
    "timestamp", "scenario", "round", "role", "cid", "device",
    "t_train_s", "t_eval_s", "t_agg_s", "params_bytes", "bytes_up", "bytes_down",
    "loss", "acc", "energy_j",
    # Optional OpenVINO metrics
    "ov_cpu_lat_ms", "ov_cpu_thr_ips", "ov_gpu_lat_ms", "ov_gpu_thr_ips", "ov_npu_lat_ms", "ov_npu_thr_ips",
    # Environment info
    "torch", "flwr", "openvino", "os", "hostname"
]

# Write CSV header if missing
if not os.path.exists(METRICS_CSV):
    import csv

    with open(METRICS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(CSV_COLUMNS)


# Utility: sizes in bytes for parameters (float32)
def _params_to_bytes(model: nn.Module) -> int:
    return int(sum(p.numel() for p in model.parameters()) * 4)


# Thread limiting (per-process)
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
torch.set_num_threads(int(os.environ["OMP_NUM_THREADS"]))


In [3]:
# Data loading and partitioning


def load_cifar10_loaders(batch_size: int, num_workers: int) -> Tuple[DataLoader, DataLoader]:
    return DataLoaderFactory.get_cifar10_dataloaders(
        batch_size=batch_size,
        shuffle_train=True,
        shuffle_test=False,
        num_workers=num_workers,
        data_augmentation=True,
        download=True,
    )


def partition_indices_iid(n_samples: int, n_clients: int) -> List[np.ndarray]:
    idx = np.random.permutation(n_samples)
    return np.array_split(idx, n_clients)


def partition_indices_dirichlet(labels: np.ndarray, n_clients: int, alpha: float) -> List[np.ndarray]:
    n_classes = int(labels.max()) + 1
    class_indices = [np.nonzero(labels == c)[0] for c in range(n_classes)]
    for c in range(n_classes):
        np.random.shuffle(class_indices[c])
    client_indices: List[List[int]] = [[] for _ in range(n_clients)]
    for c in range(n_classes):
        counts = np.random.dirichlet(alpha=[alpha] * n_clients)
        counts = (counts / counts.sum() * len(class_indices[c])).astype(int)
        # fix under/overflow
        while counts.sum() < len(class_indices[c]):
            counts[np.argmax(counts)] += 1
        while counts.sum() > len(class_indices[c]):
            counts[np.argmax(counts)] -= 1
        start = 0
        for i, cnt in enumerate(counts):
            client_indices[i].extend(class_indices[c][start:start + cnt].tolist())
            start += cnt
    return [np.array(sorted(idx)) for idx in client_indices]


# Prepare base datasets once
train_loader_full, test_loader = load_cifar10_loaders(CFG["batch_size"], CFG["num_workers"])
train_dataset = train_loader_full.dataset  # type: ignore[attr-defined]


def make_client_loaders(n_clients: int, non_iid: bool = False, alpha: float = 0.1) -> List[DataLoader]:
    y = np.array(train_dataset.targets)  # type: ignore[attr-defined]
    if non_iid:
        parts = partition_indices_dirichlet(y, n_clients, alpha)
    else:
        parts = partition_indices_iid(len(train_dataset), n_clients)
    loaders = []
    for idx in parts:
        subset = Subset(train_dataset, indices=idx.tolist())
        loaders.append(DataLoader(subset, batch_size=CFG["batch_size"], shuffle=True, num_workers=CFG["num_workers"]))
    return loaders


INFO:utils.data_utils:Dataset CIFAR-10: using cached data at C:\Users\padul\OneDrive\Universidad\Doctorado\Desarrollo\federated-lab-multihw\data


In [4]:
# Flower client
@dataclass
class ClientContext:
    cid: str
    device: torch.device
    loader: DataLoader
    model_name: str
    num_classes: int
    use_nvml: bool
    ov_bench: bool
    scenario_id: str


def train_one_epoch(model: nn.Module, loader: DataLoader, device: torch.device, epochs: int = 1) -> Tuple[float, float]:
    model.train()
    opt = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
    loss_meter = 0.0
    n = 0
    for _ in range(epochs):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            opt.zero_grad(set_to_none=True)
            out = model(x)
            loss = F.cross_entropy(out, y)
            loss.backward()
            opt.step()
            b = x.size(0)
            loss_meter += float(loss.item() * b)
            n += int(b)
    return (loss_meter / max(n, 1), 0.0)


def evaluate(model: nn.Module, loader: DataLoader, device: torch.device) -> Tuple[float, float]:
    model.eval()
    loss_meter = 0.0
    correct = 0
    n = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = F.cross_entropy(out, y)
            pred = out.argmax(1)
            b = x.size(0)
            loss_meter += float(loss.item() * b)
            correct += int((pred == y).sum())
            n += int(b)
    return float(loss_meter / max(n, 1)), float(correct / max(n, 1))


class CifarClient(NumPyClient):
    def __init__(self, ctx: ClientContext, global_test: DataLoader):
        self.ctx = ctx
        self.device = ctx.device
        in_ch = 3
        self.model = make_model(ctx.model_name, num_classes=ctx.num_classes, in_ch=in_ch)
        self.test_loader = global_test
        self.params_bytes = _params_to_bytes(self.model)
        self.energy_meter = GpuEnergyMeterNVML(0) if (self.device.type == "cuda" and ctx.use_nvml) else None

    def get_parameters(self, config: Dict[str, str] | None = None):
        return [p.cpu().detach().numpy() for p in self.model.state_dict().values()]

    def set_parameters(self, parameters: List[np.ndarray]):
        state_dict = self.model.state_dict()
        new_state = {}
        for (k, v), arr in zip(state_dict.items(), parameters):
            new_state[k] = torch.tensor(arr, dtype=v.dtype)
        self.model.load_state_dict(new_state, strict=True)

    def fit(self, parameters, config):
        round_num = int(config.get("round", 0)) if isinstance(config, dict) else 0
        self.set_parameters(parameters)
        self.model.to(self.device)

        # Train with optional NVML energy
        def _work():
            train_one_epoch(self.model, self.ctx.loader, self.device, epochs=CFG["epochs_per_round"])

        if self.energy_meter is not None:
            energy_j, dt_ms = self.energy_meter.measure(_work)
            t_train_s = dt_ms / 1000.0
        else:
            t0 = time.perf_counter();
            _work();
            t_train_s = time.perf_counter() - t0;
            energy_j = -1.0
        # Eval on global test (CPU to keep comparable)
        dev_eval = torch.device("cpu")
        self.model.to(dev_eval)
        t0 = time.perf_counter();
        loss, acc = evaluate(self.model, self.test_loader, dev_eval);
        t_eval_s = time.perf_counter() - t0
        # Optional OpenVINO benchmarking (post-fit)
        ov_cpu_lat = ov_cpu_thr = ov_gpu_lat = ov_gpu_thr = ov_npu_lat = ov_npu_thr = -1.0
        if self.ctx.ov_bench:
            try:
                import numpy as _np
                x_small, y_small = next(iter(self.test_loader))
                x_small = x_small[:32].numpy()
                y_small = y_small[:32].numpy()
                ir_path = os.path.join(ROOT, "models_saved", "openvino_ir", CFG["ov_ir_name"])
                if os.path.exists(ir_path):
                    m = benchmark_numpy(ir_path, x_small, device="CPU", runs=50, warmup=10, y_true=y_small)
                    ov_cpu_lat, ov_cpu_thr = m.get("lat_ms_mean", -1.0), m.get("thr_ips", -1.0)
                    for dev in ("GPU", "NPU"):
                        try:
                            m2 = benchmark_numpy(ir_path, x_small if dev != "NPU" else x_small[:1], device=dev, runs=50,
                                                 warmup=10, y_true=y_small if dev != "NPU" else y_small[:1])
                            if dev == "GPU":
                                ov_gpu_lat, ov_gpu_thr = m2.get("lat_ms_mean", -1.0), m2.get("thr_ips", -1.0)
                            else:
                                ov_npu_lat, ov_npu_thr = m2.get("lat_ms_mean", -1.0), m2.get("thr_ips", -1.0)
                        except Exception:
                            pass
            except Exception:
                pass
        # Communication sizes (approx)
        bytes_up = self.params_bytes
        bytes_down = self.params_bytes
        # Append CSV row (client)
        import csv
        from datetime import datetime, timezone
        with open(METRICS_CSV, "a", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            w.writerow([
                datetime.now(timezone.utc).isoformat(),
                self.ctx.scenario_id,
                round_num,
                "client",
                self.ctx.cid,
                self.device.type,
                round(t_train_s, 6),
                round(t_eval_s, 6),
                -1.0,  # t_agg_s filled by server after aggregation
                self.params_bytes,
                bytes_up,
                bytes_down,
                round(loss, 6),
                round(acc, 6),
                round(energy_j, 6),
                round(ov_cpu_lat, 6), round(ov_cpu_thr, 6),
                round(ov_gpu_lat, 6), round(ov_gpu_thr, 6),
                round(ov_npu_lat, 6), round(ov_npu_thr, 6),
                torch.__version__,
                getattr(fl, "__version__", "unknown") if 'fl' in globals() else "unknown",
                getattr(ov, "__version__", "unknown") if 'ov' in globals() else "unknown",
                platform.platform(), socket.gethostname(),
            ])
        return self.get_parameters({}), len(self.ctx.loader.dataset), {}

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        self.model.to(torch.device("cpu"))
        loss, acc = evaluate(self.model, self.test_loader, torch.device("cpu"))
        return float(loss), len(self.test_loader.dataset), {"acc": float(acc)}


In [5]:
# Strategy with aggregation timing and server eval logging
class TimedFedAvg(FedAvg):
    def __init__(self, scenario_id: str, test_loader: DataLoader, **kwargs):
        super().__init__(**kwargs)
        self.scenario_id = scenario_id
        self.test_loader = test_loader

    def aggregate_fit(self, rnd, results, failures):
        t0 = time.perf_counter()
        agg_params, agg_metrics = super().aggregate_fit(rnd, results, failures)
        t_agg_s = time.perf_counter() - t0
        # Patch last client rows for this round with t_agg_s (best effort)
        try:
            import pandas as pd
            if os.path.exists(METRICS_CSV):
                df = pd.read_csv(METRICS_CSV)
                mask = (df['scenario'] == self.scenario_id) & (df['round'] == rnd) & (df['role'] == 'client')
                df.loc[mask, 't_agg_s'] = float(t_agg_s)
                df.to_csv(METRICS_CSV, index=False)
        except Exception:
            pass
        return agg_params, agg_metrics


def make_evaluate_fn(model_name: str, num_classes: int, test_loader: DataLoader):
    def evaluate_fn(server_round: int, parameters, config):
        # Build a fresh model on CPU for evaluation consistency
        model = make_model(model_name, num_classes=num_classes, in_ch=3)
        state_dict = model.state_dict()
        for (k, v), arr in zip(state_dict.items(), parameters):
            state_dict[k] = torch.tensor(arr, dtype=v.dtype)
        model.load_state_dict(state_dict, strict=True)
        loss, acc = evaluate(model, test_loader, torch.device('cpu'))
        # Append CSV row (server)
        import csv
        from datetime import datetime, timezone
        with open(METRICS_CSV, "a", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            w.writerow([
                datetime.now(timezone.utc).isoformat(),
                config.get('scenario', 'unknown') if isinstance(config, dict) else 'unknown',
                server_round,
                "server",
                "-",
                "cpu",
                0.0, 0.0, 0.0,
                _params_to_bytes(model), 0, 0,
                round(loss, 6), round(acc, 6), -1.0,
                -1.0, -1.0, -1.0, -1.0, -1.0, -1.0,
                torch.__version__,
                getattr(fl, "__version__", "unknown") if 'fl' in globals() else "unknown",
                getattr(ov, "__version__", "unknown") if 'ov' in globals() else "unknown",
                platform.platform(), socket.gethostname(),
            ])
        return float(loss), {"acc": float(acc)}

    return evaluate_fn


In [6]:
# Run simulation across scenarios

def run_scenario(scn: Dict):
    n_clients = CFG["clients"]
    loaders = make_client_loaders(n_clients, non_iid=scn["non_iid"], alpha=CFG["dirichlet_alpha"])
    test_loader_local = test_loader

    def client_fn(cid: str):
        i = int(cid)
        use_cuda = scn["use_cuda"] and CFG["use_cuda_client"] and torch.cuda.is_available() and i == 0
        device = torch.device("cuda" if use_cuda else "cpu")
        ctx = ClientContext(
            cid=cid,
            device=device,
            loader=loaders[i],
            model_name=CFG["model"],
            num_classes=CFG["num_classes"],
            use_nvml=True,
            ov_bench=scn["ov_bench"],
            scenario_id=scn["id"],
        )
        return CifarClient(ctx, test_loader_local)

    # Evaluation function and strategy
    eval_fn = make_evaluate_fn(CFG["model"], CFG["num_classes"], test_loader_local)

    def on_fit_config_fn(server_round: int):
        return {"round": server_round, "scenario": scn["id"]}

    def on_evaluate_config_fn(server_round: int):
        return {"round": server_round, "scenario": scn["id"]}

    strategy = TimedFedAvg(
        scenario_id=scn["id"],
        test_loader=test_loader_local,
        fraction_fit=1.0,
        fraction_evaluate=0.0,
        min_fit_clients=n_clients,
        min_available_clients=n_clients,
        min_evaluate_clients=0,
        on_fit_config_fn=on_fit_config_fn,
        evaluate_fn=eval_fn,
        on_evaluate_config_fn=on_evaluate_config_fn,
    )

    # Client resources
    client_resources = {"num_cpus": 1, "num_gpus": 0.0}
    if scn["use_cuda"] and torch.cuda.is_available():
        client_resources = {"num_cpus": 1, "num_gpus": 1.0 / n_clients}

    # Server config (version-agnostic)
    cfg_obj = ServerConfig(num_rounds=CFG["rounds"]) if ServerConfig else {"num_rounds": CFG["rounds"]}

    # Run
    logger.info("Running scenario %s: %s", scn["id"], scn["name"])
    _run_sim(
        client_fn=client_fn,
        num_clients=n_clients,
        config=cfg_obj,
        strategy=strategy,
        client_resources=client_resources,
    )


for scn in SCENARIOS:
    run_scenario(scn)


2025-08-15 16:12:11 | INFO | nb03 | Running scenario A: CPU-only
2025-08-15 16:12:14,711	INFO worker.py:1771 -- Started a local Ray instance.
[36m(ClientAppActor pid=5840)[0m 
[36m(ClientAppActor pid=5840)[0m             This is a deprecated feature. It will be removed
[36m(ClientAppActor pid=5840)[0m             entirely in future versions of Flower.
[36m(ClientAppActor pid=5840)[0m         
[36m(ClientAppActor pid=5840)[0m 
[36m(ClientAppActor pid=5840)[0m             This is a deprecated feature. It will be removed
[36m(ClientAppActor pid=5840)[0m             entirely in future versions of Flower.
[36m(ClientAppActor pid=5840)[0m         
[36m(ClientAppActor pid=17372)[0m 
[36m(ClientAppActor pid=17372)[0m             This is a deprecated feature. It will be removed
[36m(ClientAppActor pid=17372)[0m             entirely in future versions of Flower.
[36m(ClientAppActor pid=17372)[0m         
[36m(ClientAppActor pid=2112)[0m 
[36m(ClientAppActor pid=2112)[

In [7]:
# Preview CSV (optional)
try:
    import pandas as pd

    if os.path.exists(METRICS_CSV):
        df_prev = pd.read_csv(METRICS_CSV)
        display(df_prev.head())
except Exception as e:
    print("CSV preview failed:", e)


Unnamed: 0,timestamp,scenario,round,role,cid,device,t_train_s,t_eval_s,t_agg_s,params_bytes,...,ov_cpu_thr_ips,ov_gpu_lat_ms,ov_gpu_thr_ips,ov_npu_lat_ms,ov_npu_thr_ips,torch,flwr,openvino,os,hostname
0,2025-08-15T14:12:44.952672,unknown,0,server,-,cpu,0.0,0.0,0.0,376488,...,-1.0,-1.0,-1.0,-1.0,-1.0,2.8.0+cu129,1.20.0,2025.2.0-19140-c01cd93e24d-releases/2025/2,Windows-10-10.0.26100-SP0,MSIdeJosePadial
1,2025-08-15T14:13:34.225062,A,0,client,3,cpu,30.500085,17.931266,-1.0,376488,...,-1.0,-1.0,-1.0,-1.0,-1.0,2.8.0+cu129,1.20.0,2025.2.0-19140-c01cd93e24d-releases/2025/2,Windows-10-10.0.26100-SP0,MSIdeJosePadial
2,2025-08-15T14:13:37.353973,A,0,client,1,cpu,29.803939,17.57187,-1.0,376488,...,-1.0,-1.0,-1.0,-1.0,-1.0,2.8.0+cu129,1.20.0,2025.2.0-19140-c01cd93e24d-releases/2025/2,Windows-10-10.0.26100-SP0,MSIdeJosePadial
3,2025-08-15T14:13:38.888442,A,0,client,0,cpu,30.182808,17.781224,-1.0,376488,...,-1.0,-1.0,-1.0,-1.0,-1.0,2.8.0+cu129,1.20.0,2025.2.0-19140-c01cd93e24d-releases/2025/2,Windows-10-10.0.26100-SP0,MSIdeJosePadial
4,2025-08-15T14:13:39.004973,A,0,client,2,cpu,29.220233,17.926943,-1.0,376488,...,-1.0,-1.0,-1.0,-1.0,-1.0,2.8.0+cu129,1.20.0,2025.2.0-19140-c01cd93e24d-releases/2025/2,Windows-10-10.0.26100-SP0,MSIdeJosePadial
