# SynQc Run Bundle Analysis

This notebook loads an **exported SynQc run bundle** (JSON or CSV) and produces three analyses:

1. **KPI trends over time**
2. **Target comparisons**
3. **“Fidelity vs latency” tradeoff**

It’s intentionally **schema-tolerant**: it will flatten nested JSON (e.g. `kpis.fidelity`, `metrics.latency_ms`) and try to infer the timestamp and target columns.

## What is a “run bundle” here?

A “bundle” can be any of the following:

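- A JSON file that is a **list of runs**, e.g. `[{...}, {...}]`
- A JSON file that is an **object with a list** under one of the keys `runs`, `experiments`, `items`, or `data` (any other top-level keys are kept as bundle metadata)
- A JSON file that is a **single run object** (treated as a one-run bundle)
- A **CSV file** with one row per run
- A **directory** containing `*.json` / `*.csv` files (searched recursively and concatenated)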

Each *run* ideally includes:

- a timestamp (e.g. `created_at`, `timestamp`, `completed_at`)
- a target identifier (e.g. `hardware_target`, `target`, `backend`)
- KPI values (e.g. `fidelity`, `latency_ms`)

> If your exports use different names, the notebook prints the detected columns so you can override the auto-detection in the config cell.
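
As a rough illustration of the flattening step, the sketch below builds a small in-memory bundle and normalizes it. The bundle contents (`exported_by`, `sim_local`, `qpu_a`, and the values) are made up for the example and are not a real SynQc export.

```python
import pandas as pd

# Hypothetical bundle: an object wrapping a list of runs under "runs".
bundle = {
    "exported_by": "example",
    "runs": [
        {
            "created_at": "2024-01-01T10:00:00Z",
            "hardware_target": "sim_local",
            "kpis": {"fidelity": 0.97, "latency_ms": 12.5},
        },
        {
            "created_at": "2024-01-01T11:00:00Z",
            "hardware_target": "qpu_a",
            "kpis": {"fidelity": 0.91, "latency_ms": 240.0},
        },
    ],
}

# Nested dicts become dotted column names, with one row per run.
flat = pd.json_normalize(bundle["runs"], sep=".")
print(sorted(flat.columns))
# ['created_at', 'hardware_target', 'kpis.fidelity', 'kpis.latency_ms']
```

The dotted names (`kpis.fidelity`, `kpis.latency_ms`) are what the column auto-detection later searches for.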


In [None]:
from __future__ import annotations

from pathlib import Path
import json
import math
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

# Optional: nicer inline resolution
plt.rcParams["figure.dpi"] = 130


## Configuration

Set `BUNDLE_PATH` to either:

- a **JSON/CSV file**, or
- a **directory** containing multiple JSON/CSV files.

If this notebook lives in `notebooks/`, a good convention is to store exports in `exports/` at repo root.


In [None]:
# Change this as needed:
# - If the notebook is in repo root, 'exports/' is typically correct.
# - If the notebook is in 'notebooks/', '../exports/' is typically correct.
_default_exports_dir = Path("exports") if Path("exports").exists() else Path("../exports")

BUNDLE_PATH = _default_exports_dir  # file or directory

# Optional overrides (leave as None to auto-detect):
TIME_COL_OVERRIDE: Optional[str] = None
TARGET_COL_OVERRIDE: Optional[str] = None
FIDELITY_COL_OVERRIDE: Optional[str] = None
LATENCY_COL_OVERRIDE: Optional[str] = None

# Time aggregation for trend plots (set to None to plot raw points).
# The rule is a pandas offset alias: "D" (day), "W" (week); for hour/month-end use
# "h"/"ME" on pandas >= 2.2, or "H"/"M" on older versions.
TREND_RESAMPLE_RULE = "D"
TREND_AGG = "mean"         # "mean", "median", etc.


## Load & normalize bundle

The functions below:

- read JSON/CSV (or a directory of files),
- extract a list of runs,
- flatten nested structures into a tabular dataframe,
- infer likely time/target/KPI columns,
- create *canonical* columns: `timestamp_canonical` and `hardware_target_canonical` (always), plus `kpi_fidelity` and `kpi_latency_ms` when a source column is found.
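
The coercion step can be sketched on a toy frame. This is a minimal illustration, assuming string-valued columns like those a CSV or JSON export might contain; the column names and values are invented for the example.

```python
import pandas as pd

# Toy frame with one unparsable timestamp and one non-numeric KPI value.
toy = pd.DataFrame({
    "created_at": ["2024-01-01T10:00:00Z", "not a date"],
    "kpis.fidelity": ["0.97", "n/a"],
})

# Unparsable timestamps become NaT. utc=True yields a tz-aware series,
# and tz_convert(None) then drops the timezone (values stay in UTC).
ts = pd.to_datetime(toy["created_at"], errors="coerce", utc=True).dt.tz_convert(None)

# Non-numeric strings become NaN instead of raising.
fid = pd.to_numeric(toy["kpis.fidelity"], errors="coerce")

print(ts.isna().tolist())  # [False, True]
print(fid.tolist())        # [0.97, nan]
```

Rows with `NaT` or `NaN` are kept in the dataframe; the plot helpers drop them per plot, so a bad timestamp only removes a run from the time-based charts.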


In [None]:
def _read_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8"))

def _read_csv(path: Path) -> pd.DataFrame:
    return pd.read_csv(path)

def _iter_input_files(path: Path) -> List[Path]:
    if path.is_file():
        return [path]
    if path.is_dir():
        files = sorted([p for p in path.rglob("*") if p.suffix.lower() in {".json", ".csv"}])
        if not files:
            raise FileNotFoundError(f"No .json/.csv files found under directory: {path.resolve()}")
        return files
    raise FileNotFoundError(f"Bundle path not found: {path.resolve()}")

def _extract_runs(obj: Any) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return (runs, bundle_meta)."""
    meta: Dict[str, Any] = {}
    if isinstance(obj, list):
        # list of runs
        runs = obj
    elif isinstance(obj, dict):
        # object wrapping list of runs
        for key in ("runs", "experiments", "items", "data"):
            if key in obj and isinstance(obj[key], list):
                runs = obj[key]
                meta = {k: v for k, v in obj.items() if k != key}
                break
        else:
            # maybe a single run
            runs = [obj]
    else:
        raise TypeError(f"Unsupported JSON top-level type: {type(obj)}")
    # Filter to dicts
    runs = [r for r in runs if isinstance(r, dict)]
    return runs, meta

def load_run_bundle(path: Path) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Load a run bundle from JSON/CSV file(s) and return (df, meta)."""
    files = _iter_input_files(path)
    all_frames: List[pd.DataFrame] = []
    meta: Dict[str, Any] = {"source_files": [str(p) for p in files]}

    for p in files:
        if p.suffix.lower() == ".json":
            obj = _read_json(p)
            runs, bundle_meta = _extract_runs(obj)
            meta.setdefault("bundle_meta", []).append({"file": str(p), "meta": bundle_meta})
            df_part = pd.json_normalize(runs, sep=".")
            df_part["_source_file"] = str(p)
            all_frames.append(df_part)
        elif p.suffix.lower() == ".csv":
            df_part = _read_csv(p)
            df_part["_source_file"] = str(p)
            all_frames.append(df_part)
        else:
            continue

    if not all_frames:
        raise ValueError("No usable data found.")

    df = pd.concat(all_frames, ignore_index=True, sort=False)
    return df, meta

def pick_first_existing(columns: Iterable[str], candidates: Iterable[str]) -> Optional[str]:
    cols = set(columns)
    for c in candidates:
        if c in cols:
            return c
    return None

def infer_time_column(df: pd.DataFrame) -> Optional[str]:
    candidates = [
        "timestamp", "time", "datetime", "date",
        "created_at", "started_at", "completed_at", "finished_at",
        "createdAt", "startedAt", "completedAt",
        "server_time", "serverTime",
    ]
    # Also match common nested paths
    nested_candidates = [
        "meta.timestamp", "meta.created_at", "meta.completed_at",
        "run.timestamp", "run.created_at", "run.completed_at",
    ]
    col = pick_first_existing(df.columns, candidates + nested_candidates)
    if col:
        return col

    # Fallback heuristic: any column whose name ends in 'time' or '_at'
    heur = [c for c in df.columns if re.search(r"(time|_at)$", c, re.IGNORECASE)]
    return heur[0] if heur else None

def infer_target_column(df: pd.DataFrame) -> Optional[str]:
    candidates = [
        "hardware_target", "target", "backend", "provider", "device",
        "hardware.target", "hardware.backend", "hardware.name",
        "target.name", "target_id", "backend_name",
        "provider_target", "qpu", "qpu_name",
    ]
    col = pick_first_existing(df.columns, candidates)
    if col:
        return col

    # Heuristic: name contains 'target', 'backend', or 'qpu'
    heur = [c for c in df.columns if re.search(r"(target|backend|qpu)", c, re.IGNORECASE)]
    return heur[0] if heur else None

def infer_kpi_columns(df: pd.DataFrame) -> Dict[str, Optional[str]]:
    fidelity_candidates = [
        "kpi_fidelity", "fidelity",
        "kpis.fidelity", "kpi.fidelity", "metrics.fidelity", "results.fidelity",
        "kpis.fidelity_score", "kpis.fidelityScore",
    ]
    latency_candidates = [
        "kpi_latency_ms", "latency_ms", "latencyMs",
        "kpis.latency_ms", "kpis.latencyMs", "kpi.latency_ms",
        "metrics.latency_ms", "results.latency_ms",
        "duration_ms", "runtime_ms", "execution_time_ms",
        "latency", "duration", "runtime",
    ]
    return {
        "fidelity": pick_first_existing(df.columns, fidelity_candidates),
        "latency": pick_first_existing(df.columns, latency_candidates),
    }

def coerce_datetime(series: pd.Series) -> pd.Series:
    # pandas can parse many formats; errors become NaT
    dt = pd.to_datetime(series, errors="coerce", utc=True)
    # drop timezone for easier plotting
    try:
        dt = dt.dt.tz_convert(None)
    except Exception:
        pass
    return dt

def coerce_numeric(series: pd.Series) -> pd.Series:
    return pd.to_numeric(series, errors="coerce")

def prepare_dataframe(df_raw: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    df = df_raw.copy()
    info: Dict[str, Any] = {}

    # Decide time/target/KPI columns (allow override)
    time_col = TIME_COL_OVERRIDE or infer_time_column(df)
    target_col = TARGET_COL_OVERRIDE or infer_target_column(df)
    kpis = infer_kpi_columns(df)
    fidelity_col = FIDELITY_COL_OVERRIDE or kpis["fidelity"]
    latency_col = LATENCY_COL_OVERRIDE or kpis["latency"]

    info["time_col"] = time_col
    info["target_col"] = target_col
    info["fidelity_col"] = fidelity_col
    info["latency_col"] = latency_col

    # Coerce time
    if time_col and time_col in df.columns:
        df[time_col] = coerce_datetime(df[time_col])

    # Coerce likely numeric KPI columns
    for c in [fidelity_col, latency_col]:
        if c and c in df.columns:
            df[c] = coerce_numeric(df[c])

    # Create canonical KPI columns if we found something
    if fidelity_col and fidelity_col in df.columns:
        df["kpi_fidelity"] = df[fidelity_col]
    if latency_col and latency_col in df.columns:
        # If the chosen latency column ends in a seconds unit token
        # (e.g. "latency_s", "duration_sec", "runtimeSeconds"), convert -> ms.
        # Names ending in "ms" or "milliseconds" are not matched.
        if re.search(r"(?i:(?:^|[^a-z])(?:s|secs?|seconds))$|[a-z]Seconds$", latency_col):
            df["kpi_latency_ms"] = df[latency_col] * 1000.0
            info["latency_units"] = "s (converted to ms)"
        else:
            df["kpi_latency_ms"] = df[latency_col]
            info["latency_units"] = "ms (assumed)"

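    # Canonical target column (always created; missing values become "unknown"
    # rather than the string "nan")
    if target_col and target_col in df.columns:
        tgt = df[target_col]
        df["hardware_target_canonical"] = tgt.astype(object).where(tgt.notna(), "unknown").astype(str)
    else:
        df["hardware_target_canonical"] = "unknown"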

    # Canonical time column name (optional)
    if time_col and time_col in df.columns:
        df["timestamp_canonical"] = df[time_col]
    else:
        df["timestamp_canonical"] = pd.NaT

    # Sort by time if possible
    if df["timestamp_canonical"].notna().any():
        df = df.sort_values("timestamp_canonical")

    return df, info


In [None]:
df_raw, bundle_meta = load_run_bundle(BUNDLE_PATH)
df, detected = prepare_dataframe(df_raw)

print("Loaded rows:", len(df))
print("Detected columns:", json.dumps(detected, indent=2))
print("Unique targets:", df["hardware_target_canonical"].nunique())
df.head(10)


## Inspect KPI columns

Below we list numeric columns to help you decide which KPIs you want to trend/compare.

If the auto-detection didn’t find your fidelity/latency columns, you can set `FIDELITY_COL_OVERRIDE` and/or `LATENCY_COL_OVERRIDE` in the config cell and re-run.
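
If the numeric list is long, a small search helper can narrow it down. The sketch below is illustrative only: `find_columns` is a hypothetical helper that is not defined elsewhere in this notebook, and the column names are invented.

```python
import re
import pandas as pd

def find_columns(frame: pd.DataFrame, pattern: str) -> list[str]:
    """Return column names matching a case-insensitive regex (hypothetical helper)."""
    return [c for c in frame.columns if re.search(pattern, str(c), re.IGNORECASE)]

# Toy frame standing in for the loaded bundle (column names are made up).
toy = pd.DataFrame(columns=["results.state_fidelity", "timing.total_ms", "shots"])

print(find_columns(toy, r"fidel"))        # ['results.state_fidelity']
print(find_columns(toy, r"ms$|latency"))  # ['timing.total_ms']
```

With matches like these, you would set `FIDELITY_COL_OVERRIDE = "results.state_fidelity"` and `LATENCY_COL_OVERRIDE = "timing.total_ms"` in the config cell and re-run from there.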


In [None]:
numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()
numeric_cols


In [None]:
# Quick summary (time range, counts per target)
if df["timestamp_canonical"].notna().any():
    print("Time range:",
          df["timestamp_canonical"].min(),
          "→",
          df["timestamp_canonical"].max())
else:
    print("No parsable timestamps found (timestamp_canonical is all NaT).")

display(df["hardware_target_canonical"].value_counts().head(20))
df.describe(include="all").T.head(30)


## Plot helpers

We keep the plotting functions small and matplotlib-only (no seaborn), so they work in minimal environments.


In [None]:
def plot_kpi_trend_over_time(
    df: pd.DataFrame,
    kpi_col: str,
    time_col: str = "timestamp_canonical",
    target_col: str = "hardware_target_canonical",
    resample_rule: Optional[str] = "D",
    agg: str = "mean",
    title: Optional[str] = None,
):
    data = df[[time_col, target_col, kpi_col]].dropna()
    if data.empty:
        print(f"No data to plot for {kpi_col}.")
        return

    # Ensure time is datetime
    data = data.copy()
    data[time_col] = pd.to_datetime(data[time_col], errors="coerce")
    data = data.dropna(subset=[time_col])
    if data.empty:
        print(f"No valid timestamps to plot for {kpi_col}.")
        return

    plt.figure()

    if resample_rule:
        data = data.set_index(time_col)
        for tgt, g in data.groupby(target_col):
            series = getattr(g[kpi_col].resample(resample_rule), agg)()
            if series.dropna().empty:
                continue
            plt.plot(series.index, series.values, label=str(tgt))
        plt.xlabel("Time")
        plt.ylabel(kpi_col)
        plt.title(title or f"{kpi_col} trend over time ({agg}, resample={resample_rule})")
        plt.legend(loc="best")
        plt.xticks(rotation=30, ha="right")
        plt.tight_layout()
        plt.show()
    else:
        # Plot raw points
        for tgt, g in data.groupby(target_col):
            plt.plot(g[time_col], g[kpi_col], marker="o", linestyle="-", label=str(tgt))
        plt.xlabel("Time")
        plt.ylabel(kpi_col)
        plt.title(title or f"{kpi_col} trend over time (raw)")
        plt.legend(loc="best")
        plt.xticks(rotation=30, ha="right")
        plt.tight_layout()
        plt.show()

def plot_target_comparison(
    df: pd.DataFrame,
    kpi_col: str,
    target_col: str = "hardware_target_canonical",
    agg: str = "mean",
    title: Optional[str] = None,
    top_n: Optional[int] = 20,
):
    data = df[[target_col, kpi_col]].dropna()
    if data.empty:
        print(f"No data to compare for {kpi_col}.")
        return

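    # De-duplicate so agg="count" or agg="std" does not pass the same function name twice.
    stats = list(dict.fromkeys([agg, "count", "std"]))
    grouped = data.groupby(target_col)[kpi_col].agg(stats).sort_values(agg, ascending=False)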
    if top_n:
        grouped = grouped.head(top_n)

    plt.figure()
    plt.bar(grouped.index.astype(str), grouped[agg].values)
    plt.xlabel("Target")
    plt.ylabel(f"{agg}({kpi_col})")
    plt.title(title or f"Target comparison: {kpi_col} ({agg})")
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    plt.show()

    return grouped

def plot_fidelity_vs_latency(
    df: pd.DataFrame,
    fidelity_col: str = "kpi_fidelity",
    latency_col: str = "kpi_latency_ms",
    target_col: str = "hardware_target_canonical",
    title: str = "Fidelity vs latency",
):
    data = df[[target_col, fidelity_col, latency_col]].dropna()
    if data.empty:
        print("No data to plot fidelity vs latency.")
        return

    plt.figure()
    for tgt, g in data.groupby(target_col):
        plt.scatter(g[latency_col], g[fidelity_col], label=str(tgt), alpha=0.8)
    plt.xlabel(latency_col)
    plt.ylabel(fidelity_col)
    plt.title(title)
    plt.legend(loc="best")
    plt.tight_layout()
    plt.show()

def compute_pareto_frontier_latency_min_fidelity_max(
    df: pd.DataFrame,
    fidelity_col: str = "kpi_fidelity",
    latency_col: str = "kpi_latency_ms",
) -> pd.DataFrame:
    """Pareto frontier where we want *lower latency* and *higher fidelity*.

    Returns points on the frontier sorted by latency ascending.
    """
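    # Sort by latency; on ties, highest fidelity first so dominated ties are skipped.
    data = df[[latency_col, fidelity_col]].dropna().sort_values(
        [latency_col, fidelity_col], ascending=[True, False]
    )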
    if data.empty:
        return data

    best_fid = -np.inf
    frontier_rows = []
    for _, row in data.iterrows():
        fid = row[fidelity_col]
        lat = row[latency_col]
        if fid > best_fid:
            frontier_rows.append((lat, fid))
            best_fid = fid

    frontier = pd.DataFrame(frontier_rows, columns=[latency_col, fidelity_col])
    return frontier


## 1) KPI trends over time

These plots show how KPIs evolve over time, optionally resampled to a regular cadence (daily by default).

If you have many targets, consider narrowing to a subset first.
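
To make the resampling concrete, here is a minimal sketch on toy data that mirrors the per-target loop in the trend helper. The targets and values are invented for the example.

```python
import pandas as pd

toy = pd.DataFrame({
    "timestamp_canonical": pd.to_datetime([
        "2024-01-01 09:00", "2024-01-01 15:00", "2024-01-02 09:00",
        "2024-01-01 10:00", "2024-01-02 10:00",
    ]),
    "hardware_target_canonical": ["qpu_a", "qpu_a", "qpu_a", "sim", "sim"],
    "kpi_fidelity": [0.90, 0.94, 0.96, 0.99, 0.98],
})

# One daily-mean series per target: runs on the same day are averaged.
indexed = toy.set_index("timestamp_canonical")
daily = {
    tgt: g["kpi_fidelity"].resample("D").mean()
    for tgt, g in indexed.groupby("hardware_target_canonical")
}

print(daily["qpu_a"])
```

For `qpu_a`, the two runs on 2024-01-01 collapse into one point at their mean (0.92). Days without any runs for a target show up as `NaN` in the resampled series, which matplotlib draws as a gap in the line.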


In [None]:
# Filter to a subset of targets (optional)
# example: keep only the 5 most common targets
top_targets = df["hardware_target_canonical"].value_counts().head(5).index.tolist()
df_trend = df[df["hardware_target_canonical"].isin(top_targets)].copy()

# Plot fidelity trend (if available)
if "kpi_fidelity" in df_trend.columns and df_trend["kpi_fidelity"].notna().any():
    plot_kpi_trend_over_time(
        df_trend,
        kpi_col="kpi_fidelity",
        resample_rule=TREND_RESAMPLE_RULE,
        agg=TREND_AGG,
        title="KPI trend: Fidelity",
    )
else:
    print("kpi_fidelity not available. Set FIDELITY_COL_OVERRIDE in the config cell and re-run.")

# Plot latency trend (if available)
if "kpi_latency_ms" in df_trend.columns and df_trend["kpi_latency_ms"].notna().any():
    plot_kpi_trend_over_time(
        df_trend,
        kpi_col="kpi_latency_ms",
        resample_rule=TREND_RESAMPLE_RULE,
        agg=TREND_AGG,
        title="KPI trend: Latency (ms)",
    )
else:
    print("kpi_latency_ms not available. Set LATENCY_COL_OVERRIDE in the config cell and re-run.")


## 2) Target comparisons

These charts compare KPI aggregates by target (mean by default).

You’ll also get a small table back with `mean/count/std` to use in reports.
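
The table is a plain groupby aggregation. A minimal sketch on toy data (targets and values invented for the example):

```python
import pandas as pd

toy = pd.DataFrame({
    "hardware_target_canonical": ["qpu_a", "qpu_a", "sim", "sim", "sim"],
    "kpi_latency_ms": [200.0, 240.0, 10.0, 12.0, 14.0],
})

# One row per target, sorted by the mean in descending order.
table = (
    toy.groupby("hardware_target_canonical")["kpi_latency_ms"]
       .agg(["mean", "count", "std"])
       .sort_values("mean", ascending=False)
)
print(table)
```

Two things to keep in mind when reading the result: `std` is the sample standard deviation, so a target with a single run gets `NaN`; and the descending sort puts the *highest* mean first, which for latency means the slowest target leads the chart.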


In [None]:
comparison_tables = {}

if "kpi_fidelity" in df.columns and df["kpi_fidelity"].notna().any():
    comparison_tables["fidelity"] = plot_target_comparison(
        df, kpi_col="kpi_fidelity", agg="mean",
        title="Target comparison: Mean fidelity",
        top_n=20,
    )
else:
    print("Skipping fidelity comparison (kpi_fidelity missing).")

if "kpi_latency_ms" in df.columns and df["kpi_latency_ms"].notna().any():
    comparison_tables["latency_ms"] = plot_target_comparison(
        df, kpi_col="kpi_latency_ms", agg="mean",
        title="Target comparison: Mean latency (ms)",
        top_n=20,
    )
else:
    print("Skipping latency comparison (kpi_latency_ms missing).")

comparison_tables


## 3) Fidelity vs latency tradeoff

A scatter plot with one point per run, colored by target.  
Below it, we compute a **Pareto frontier** (lower latency, higher fidelity) across all runs: the runs that no other run beats on both latency and fidelity.
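
As a small worked example of the frontier idea, the sketch below uses a vectorized variant (sort, then running maximum) on toy data. It is an illustration under these assumptions, not necessarily the exact code path used by the helper in this notebook.

```python
import pandas as pd

toy = pd.DataFrame({
    "kpi_latency_ms": [10.0, 10.0, 20.0, 30.0, 40.0],
    "kpi_fidelity":   [0.80, 0.85, 0.83, 0.95, 0.90],
})

# Sort by latency ascending; within equal latency, best fidelity first.
ordered = toy.sort_values(
    ["kpi_latency_ms", "kpi_fidelity"], ascending=[True, False]
)

# A run is on the frontier if its fidelity beats every run that sorts before it
# (i.e. every run that is at least as fast).
best_so_far = ordered["kpi_fidelity"].cummax().shift(fill_value=float("-inf"))
frontier = ordered[ordered["kpi_fidelity"] > best_so_far].reset_index(drop=True)

print(frontier)
# Frontier rows: (10.0 ms, 0.85) and (30.0 ms, 0.95)
```

The 20 ms run is dropped because a faster run already reaches 0.85, and the 40 ms run is dropped because the 30 ms run is both faster and more accurate.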


In [None]:
if ("kpi_fidelity" in df.columns and df["kpi_fidelity"].notna().any()
    and "kpi_latency_ms" in df.columns and df["kpi_latency_ms"].notna().any()):

    plot_fidelity_vs_latency(df)

    frontier = compute_pareto_frontier_latency_min_fidelity_max(df)
    if not frontier.empty:
        plt.figure()
        plt.plot(frontier["kpi_latency_ms"], frontier["kpi_fidelity"], marker="o")
        plt.xlabel("kpi_latency_ms")
        plt.ylabel("kpi_fidelity")
        plt.title("Pareto frontier (lower latency, higher fidelity)")
        plt.tight_layout()
        plt.show()

        display(frontier.head(20))
else:
    print("Need both kpi_fidelity and kpi_latency_ms to plot tradeoff. Override columns in config and re-run.")


## Export a cleaned dataset (optional)

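The cleaned CSV is handy to commit under `exports/` or attach to an issue/PR. Note that it is written into the same directory that `BUNDLE_PATH` defaults to: if `BUNDLE_PATH` still points at that directory, the next run will load `run_bundle_cleaned.csv` as additional input and duplicate rows. Point `BUNDLE_PATH` at a specific file, or move the cleaned file elsewhere, before re-running.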


In [None]:
# Pick exports directory relative to where this notebook lives
exports_dir = Path("exports") if Path("exports").exists() else Path("../exports")
exports_dir.mkdir(parents=True, exist_ok=True)

clean_path = exports_dir / "run_bundle_cleaned.csv"
df.to_csv(clean_path, index=False)
print("Wrote:", clean_path.resolve())


## Optional: generate a bundle from a running SynQc backend

If you have the SynQc backend running locally (default `http://localhost:8001`), you can pull recent experiments
and write them to `exports/run_bundle_from_api.json`.

This is useful if you haven’t implemented a dedicated “export bundle” feature yet.


In [None]:
# Uncomment to use:
# API_BASE = "http://localhost:8001"
# 
# try:
#     import requests
# except ImportError:
#     # If you're in a clean environment:
#     !pip -q install requests
#     import requests
# 
# resp = requests.get(f"{API_BASE}/experiments/recent", timeout=20)
# resp.raise_for_status()  # fail early on HTTP errors instead of parsing an error body
# recent = resp.json()
# # Tolerate multiple response shapes: a bare list, or a dict wrapping one.
# runs = recent
# if isinstance(recent, dict):
#     runs = next(
#         (recent[k] for k in ("experiments", "runs", "items", "data") if isinstance(recent.get(k), list)),
#         [recent],  # fall back to treating the object as a single run
#     )
# 
# bundle = {
#     "generated_at": pd.Timestamp.now(tz="UTC").isoformat(),
#     "source": f"{API_BASE}/experiments/recent",
#     "runs": runs if isinstance(runs, list) else [runs],
# }
# 
# exports_dir = Path("exports") if Path("exports").exists() else Path("../exports")
# exports_dir.mkdir(parents=True, exist_ok=True)
# out_path = exports_dir / "run_bundle_from_api.json"
# out_path.write_text(json.dumps(bundle, indent=2), encoding="utf-8")
# print("Wrote:", out_path.resolve())
