# FedForge Convergence and Telemetry Analysis (I06)

This notebook reads generated simulation/profile artifacts and monitor telemetry exports,
then produces summary tables and analysis outputs under `artifacts/analysis/i06`.

In [None]:
from __future__ import annotations

import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path

import pandas as pd


def _repo_root() -> Path:
    """Locate the repository root by searching upward from the working directory.

    A directory qualifies as the root when it contains both an ``artifacts``
    and a ``src`` entry. The working directory is checked first, then each of
    its ancestors in turn, so the nearest qualifying directory wins and the
    notebook can be launched from any depth inside the repository.

    Returns:
        The nearest directory (working directory or ancestor) that qualifies.

    Raises:
        RuntimeError: If neither the working directory nor any ancestor
            contains both ``artifacts`` and ``src``.
    """
    cwd = Path.cwd()
    # Nearest-first order keeps the previous results for the "cwd" and
    # "cwd.parent" cases while also supporting deeper launch directories.
    for candidate in (cwd, *cwd.parents):
        if (candidate / "artifacts").exists() and (candidate / "src").exists():
            return candidate
    raise RuntimeError("Could not resolve repository root from current working directory")


# Resolve the repository layout once; the constants below are shared by later cells.
REPO_ROOT = _repo_root()

# Every table and summary produced by this notebook is written under this directory.
ANALYSIS_DIR = REPO_ROOT.joinpath("artifacts", "analysis", "i06")
ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)

# Inputs: one reference_metrics.json per sub-directory of artifacts/profiles,
# the server's per-round JSONL log, and the monitor's SQLite database.
PROFILE_REFERENCE_PATHS = sorted(
    REPO_ROOT.joinpath("artifacts", "profiles").glob("*/reference_metrics.json")
)
SERVER_ROUND_METRICS_PATH = REPO_ROOT.joinpath("artifacts", "server", "round_metrics.jsonl")
MONITOR_DB_PATH = REPO_ROOT.joinpath("artifacts", "monitor", "monitor.db")

# Fail fast with a specific message when a required input is absent.
if not PROFILE_REFERENCE_PATHS:
    raise RuntimeError("No profile reference metrics found under artifacts/profiles")
if not SERVER_ROUND_METRICS_PATH.exists():
    raise RuntimeError(f"Missing server round metrics file: {SERVER_ROUND_METRICS_PATH}")
if not MONITOR_DB_PATH.exists():
    raise RuntimeError(f"Missing monitor SQLite database: {MONITOR_DB_PATH}")

# Echo the resolved configuration, one "key=value" entry per line.
print(
    f"repo_root={REPO_ROOT}",
    f"analysis_dir={ANALYSIS_DIR}",
    f"profile_reference_files={len(PROFILE_REFERENCE_PATHS)}",
    f"server_round_metrics={SERVER_ROUND_METRICS_PATH}",
    f"monitor_db={MONITOR_DB_PATH}",
    sep="\n",
)

In [None]:
# Parse every profile reference report and tag it with the file it was read from.
profile_reports: list[dict[str, object]] = [
    {**json.loads(path.read_text(encoding="utf-8")), "reference_path": str(path)}
    for path in PROFILE_REFERENCE_PATHS
]

# Fields copied verbatim from each report into the overview table, in column order.
overview_fields = (
    "profile_id",
    "dataset_id",
    "resolved_device",
    "baseline_rounds",
    "baseline_clients",
    "repeat_runs",
    "repeatable",
    "max_loss_delta_vs_run_1",
    "reference_path",
)
profile_overview_rows: list[dict[str, object]] = [
    {field: report[field] for field in overview_fields} for report in profile_reports
]

# One row per profile, ordered by profile id; printed and persisted as CSV.
profile_overview_df = pd.DataFrame(profile_overview_rows)
profile_overview_df = profile_overview_df.sort_values("profile_id")
print("\n[Profile Overview]")
print(profile_overview_df.to_string(index=False))
profile_overview_df.to_csv(ANALYSIS_DIR / "profile_overview.csv", index=False)

In [None]:
# Flatten each run's per-round aggregate eval loss into a long-format table:
# one row per (profile, run, round). Rounds are numbered from 1 in list order.
convergence_columns = ["profile_id", "dataset_id", "run_index", "round", "aggregate_eval_loss"]
convergence_rows: list[dict[str, object]] = []

for report in profile_reports:
    for run in report["runs"]:
        run_index = int(run["run_index"])
        losses = run["aggregate_eval_loss_by_round"]
        for round_idx, loss in enumerate(losses, start=1):
            convergence_rows.append(
                {
                    "profile_id": report["profile_id"],
                    "dataset_id": report["dataset_id"],
                    "run_index": run_index,
                    "round": round_idx,
                    "aggregate_eval_loss": float(loss),
                }
            )

# Passing explicit columns keeps the frame well-formed (and sortable) even when
# no run reported any loss values; without them an empty frame has no columns
# and sort_values raises KeyError.
convergence_df = pd.DataFrame(convergence_rows, columns=convergence_columns).sort_values(
    ["profile_id", "run_index", "round"]
)
print("\n[Convergence Points]")
print(convergence_df.to_string(index=False))
convergence_df.to_csv(ANALYSIS_DIR / "convergence_points.csv", index=False)

# Last recorded round per (profile, dataset, run). Sorting on every key column
# (rather than on "round" alone) puts the highest round last within each group
# and leaves the resulting table ordered by profile and run, matching the other
# tables, instead of by round with an unspecified order among equal rounds.
final_loss_df = (
    convergence_df.sort_values(["profile_id", "dataset_id", "run_index", "round"])
    .groupby(["profile_id", "dataset_id", "run_index"], as_index=False)
    .tail(1)
    .rename(columns={"aggregate_eval_loss": "final_aggregate_eval_loss"})
    [["profile_id", "dataset_id", "run_index", "round", "final_aggregate_eval_loss"]]
)

print("\n[Final Loss per Run]")
print(final_loss_df.to_string(index=False))
final_loss_df.to_csv(ANALYSIS_DIR / "final_loss_by_run.csv", index=False)

In [None]:
# Per-client train/eval metrics, read from the summary file of run 1 of each profile.
client_columns = [
    "profile_id",
    "dataset_id",
    "round",
    "client_id",
    "eval_loss",
    "eval_accuracy",
    "train_loss",
    "train_runtime",
]
client_metric_columns = ["eval_loss", "eval_accuracy", "train_loss", "train_runtime"]
# A metric that was not reported is recorded as NaN rather than 0.0, so it is
# excluded from the per-profile means below instead of dragging them toward zero
# (e.g. a client that appears in the eval metrics but has no train metrics).
missing_metric = float("nan")
client_rows: list[dict[str, object]] = []

for report in profile_reports:
    run_one = next((run for run in report["runs"] if int(run["run_index"]) == 1), None)
    if run_one is None:
        # Profiles without a run indexed 1 contribute no client rows.
        continue

    # summary_path may be stored relative to the repository root.
    summary_path = Path(str(run_one["summary_path"]))
    if not summary_path.is_absolute():
        summary_path = REPO_ROOT / summary_path
    summary_payload = json.loads(summary_path.read_text(encoding="utf-8"))

    for round_payload in summary_payload["round_summaries"]:
        round_number = int(round_payload["round"])
        train_metrics = round_payload["client_train_metrics"]
        eval_metrics = round_payload["client_eval_metrics"]
        # Rows are driven by the clients present in the eval metrics; train
        # metrics are joined by client id when available.
        for client_id, client_eval in eval_metrics.items():
            client_train = train_metrics.get(client_id, {})
            client_rows.append(
                {
                    "profile_id": report["profile_id"],
                    "dataset_id": report["dataset_id"],
                    "round": round_number,
                    "client_id": client_id,
                    "eval_loss": float(client_eval.get("eval_loss", missing_metric)),
                    "eval_accuracy": float(client_eval.get("eval_accuracy", missing_metric)),
                    "train_loss": float(client_train.get("train_loss", missing_metric)),
                    "train_runtime": float(client_train.get("train_runtime", missing_metric)),
                }
            )

# Explicit columns and float dtypes keep the table sortable and aggregatable
# even when no run_01 summary produced any rows.
client_df = (
    pd.DataFrame(client_rows, columns=client_columns)
    .astype({column: "float64" for column in client_metric_columns})
    .sort_values(["profile_id", "round", "client_id"])
)
print("\n[Per-Client Metrics from run_01 summaries]")
print(client_df.to_string(index=False))
client_df.to_csv(ANALYSIS_DIR / "client_metrics.csv", index=False)

# Per-profile means across all rounds and clients; NaN (unreported) values are skipped.
client_agg_df = (
    client_df.groupby(["profile_id", "dataset_id"], as_index=False)
    .agg(
        mean_eval_loss=("eval_loss", "mean"),
        mean_eval_accuracy=("eval_accuracy", "mean"),
        mean_train_loss=("train_loss", "mean"),
    )
)

print("\n[Per-Profile Client Aggregates]")
print(client_agg_df.to_string(index=False))
client_agg_df.to_csv(ANALYSIS_DIR / "client_metrics_aggregated.csv", index=False)

In [None]:
# Server-side per-round aggregates, read from a JSON Lines file (one JSON object per line).
server_round_columns = [
    "round",
    "num_clients",
    "num_examples",
    "eval_loss",
    "eval_accuracy",
    "train_loss",
    "train_runtime",
    "ts",
]
server_round_rows: list[dict[str, object]] = []

# Iterate the file object rather than str.splitlines(): splitlines() also breaks
# on characters such as U+2028/U+2029, which may legally appear unescaped inside
# a JSON string and would cut a record in two.
with SERVER_ROUND_METRICS_PATH.open(encoding="utf-8") as metrics_file:
    for line in metrics_file:
        if not line.strip():
            continue
        payload = json.loads(line)
        metrics = payload.get("aggregated_metrics", {})
        # NOTE(review): absent fields fall back to 0 / 0.0 / "" here; confirm that
        # a zero placeholder (rather than a missing value) is what consumers expect.
        server_round_rows.append(
            {
                "round": int(payload.get("round", 0)),
                "num_clients": int(payload.get("num_clients", 0)),
                "num_examples": int(payload.get("num_examples", 0)),
                "eval_loss": float(metrics.get("eval_loss", 0.0)),
                "eval_accuracy": float(metrics.get("eval_accuracy", 0.0)),
                "train_loss": float(metrics.get("train_loss", 0.0)),
                "train_runtime": float(metrics.get("train_runtime", 0.0)),
                "ts": payload.get("ts", ""),
            }
        )

# Explicit columns keep the frame sortable when the file exists but holds no
# records; without them an empty frame has no "round" column and sort_values
# raises KeyError.
server_round_df = pd.DataFrame(server_round_rows, columns=server_round_columns).sort_values("round")
print("\n[Server Round Aggregates]")
print(server_round_df.to_string(index=False))
server_round_df.to_csv(ANALYSIS_DIR / "server_round_metrics.csv", index=False)

In [None]:
# Pull monitor telemetry from SQLite: event counts per run/type, average latency
# per role/type, and the recorded state of each run.
#
# Using a sqlite3 connection as a context manager only commits or rolls back the
# open transaction; it does not close the connection. These queries are read-only,
# so the connection is closed explicitly in a finally block instead, which
# releases the database file handle even if a query fails.
connection = sqlite3.connect(MONITOR_DB_PATH)
try:
    event_counts_df = pd.read_sql_query(
        """
        SELECT run_id, event_type, COUNT(*) AS count
        FROM events
        GROUP BY run_id, event_type
        ORDER BY run_id, event_type
        """,
        connection,
    )
    latency_df = pd.read_sql_query(
        """
        SELECT role, event_type, AVG(latency_ms) AS avg_latency_ms, COUNT(latency_ms) AS samples
        FROM events
        WHERE latency_ms IS NOT NULL
        GROUP BY role, event_type
        ORDER BY role, event_type
        """,
        connection,
    )
    run_state_df = pd.read_sql_query(
        "SELECT run_id, state, updated_at FROM run_state ORDER BY run_id",
        connection,
    )
finally:
    connection.close()

# Only the first 30 rows of the two event tables are printed; the CSVs hold everything.
print("\n[Monitor Event Counts]")
print(event_counts_df.head(30).to_string(index=False))
print("\n[Monitor Event Latencies]")
print(latency_df.head(30).to_string(index=False))
print("\n[Monitor Run States]")
print(run_state_df.to_string(index=False))

event_counts_df.to_csv(ANALYSIS_DIR / "monitor_event_counts.csv", index=False)
latency_df.to_csv(ANALYSIS_DIR / "monitor_event_latencies.csv", index=False)
run_state_df.to_csv(ANALYSIS_DIR / "monitor_run_states.csv", index=False)

In [None]:
# Headline figures drawn from the tables built in the earlier cells. The monitor
# entries start at their "no events" values and are filled in below when the
# event-count table has rows.
analysis_summary = {
    "generated_at_utc": datetime.now(tz=UTC).isoformat(),
    "profile_count": int(profile_overview_df.shape[0]),
    "datasets": sorted(profile_overview_df["dataset_id"].unique().tolist()),
    "all_profiles_repeatable": bool(profile_overview_df["repeatable"].all()),
    "max_profile_loss_delta": float(profile_overview_df["max_loss_delta_vs_run_1"].max()),
    "server_round_count": int(server_round_df.shape[0]),
    "monitor_total_events": 0,
    "monitor_run_ids": [],
}
if not event_counts_df.empty:
    analysis_summary["monitor_total_events"] = int(event_counts_df["count"].sum())
    analysis_summary["monitor_run_ids"] = sorted(event_counts_df["run_id"].unique().tolist())

# Serialize once; the same text is written to disk and echoed to the cell output.
summary_text = json.dumps(analysis_summary, indent=2, sort_keys=True)
summary_path = ANALYSIS_DIR / "analysis_summary.json"
summary_path.write_text(summary_text, encoding="utf-8")

print("\n[Analysis Summary]")
print(summary_text)
print(f"summary_path={summary_path}")