# Drift Detection at Runtime (IBM Runtime local mode + devqubit)

This notebook is a **minimal, end-to-end** example of *runtime drift detection* for quantum workloads.

We:
1. Build a small **calibration-sensitive benchmark circuit**.
2. Establish a **baseline** run (your “known-good” reference).
3. Execute several **candidate** runs that emulate production conditions.
4. Use **devqubit** to:
   - capture artifacts (including a device snapshot/fingerprints),
   - compute a distribution distance (**TVD**),
   - enforce a simple **policy** (CI/CD-style verification),
   - generate a **JUnit** report for pipelines.

> Notes  
> - We use **IBM Runtime local testing mode** (`FakeManilaV2`) so the notebook runs without IBM credentials.  
> - In production you would point the same code at a real backend/session.


In [None]:
from __future__ import annotations

from importlib.metadata import entry_points


def has_adapter(name: str) -> bool:
    eps = entry_points().select(group="devqubit.adapters")
    return any(ep.name == name for ep in eps)


# Fail fast, with install instructions, when the Qiskit Runtime adapter is missing.
if has_adapter("qiskit-runtime"):
    print("Qiskit Runtime adapter available!")
else:
    raise ImportError(
        "devqubit Qiskit Runtime adapter is not installed.\n"
        "Install with: pip install 'devqubit[qiskit-runtime]'"
    )

In [None]:
# Probe for the optional qiskit-aer package; the flag gates the Aer imports
# and the Aer-based drift scenario further down.
try:
    import qiskit_aer

    _ = qiskit_aer.__version__
except ImportError:
    _AER_AVAILABLE = False
else:
    _AER_AVAILABLE = True

# --- Imports ---
from dataclasses import dataclass
from pathlib import Path
from typing import List, Mapping, Tuple
import random
import shutil

import numpy as np

from qiskit import QuantumCircuit
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_ibm_runtime import SamplerV2
from qiskit_ibm_runtime.fake_provider import FakeManilaV2

if _AER_AVAILABLE:
    from qiskit_aer import AerSimulator
    from qiskit_aer.noise import NoiseModel, depolarizing_error

from devqubit import (
    track,
    wrap_backend,
    create_registry,
    create_store,
    diff,
    verify,
)

from devqubit.compare import VerifyPolicy
from devqubit.ci import write_junit

In [None]:
"""Configuration and local workspace."""

@dataclass(frozen=True)
class DriftDemoConfig:
    """Configuration for the drift demo.

    A frozen (immutable) dataclass holding every tunable value the notebook
    reads: project/workspace names, circuit and transpilation settings, shot
    budgets, the TVD threshold, and the noise strengths of the optional
    Aer-based drift scenario.
    """

    # Project name passed to track() and to the registry baseline/list calls.
    project_name: str = "production_monitoring"
    # Directory holding the demo's store and registry; it is deleted and
    # recreated when the workspace is set up.
    workspace_dir: str = ".devqubit_drift_demo"

    # Seed used for the Python/NumPy RNGs and for the Aer simulator.
    seed: int = 42
    # Number of qubits in the benchmark circuit.
    n_qubits: int = 3
    # Optimization level handed to generate_preset_pass_manager().
    optimization_level: int = 2

    # Default shot budget of the baseline sampler.
    shots_baseline: int = 4096
    # Shot budgets of the low_shots / stable / high_shots scenarios, read by
    # index 0 / 1 / 2; the optional drift scenario reuses index 1.
    shots_candidates: Tuple[int, ...] = (256, 4096, 8192)
    # k used for the "concentration_topk" metric.
    top_k: int = 3

    # TVD threshold used both for the printed drift status and in VerifyPolicy.
    tvd_max: float = 0.03

    # Depolarizing-error parameters for one- and two-qubit instructions in the
    # Aer noise model of the drift scenario.
    drift_depol_1q: float = 0.01
    drift_depol_2q: float = 0.03


# Single configuration instance read by all cells below.
CFG = DriftDemoConfig()


def seed_everything(seed: int) -> None:
    """Seed the global random number generators with *seed*.

    The standard-library ``random`` module is seeded first, then NumPy's
    global generator, both with the same value.
    """
    for seed_rng in (random.seed, np.random.seed):
        seed_rng(seed)


# Seed the Python and NumPy global RNGs before anything else runs.
seed_everything(CFG.seed)


# --- Fresh workspace ---
# Start from an empty directory: anything left over from a previous run is
# deleted before the directory is recreated.
WORKSPACE = Path(CFG.workspace_dir)
if WORKSPACE.exists():
    shutil.rmtree(WORKSPACE)
WORKSPACE.mkdir(parents=True)

# The object store lives in the "objects" subdirectory; the registry is rooted
# at the workspace itself.
# NOTE(review): WORKSPACE is a relative path with the default config, so these
# file:// URLs are relative too — confirm devqubit resolves them against the CWD.
store = create_store(f"file://{WORKSPACE}/objects")
registry = create_registry(f"file://{WORKSPACE}")


# Local testing backend (no credentials required)
fake_backend = FakeManilaV2()
print(f"Backend snapshot: {fake_backend.name} ({fake_backend.num_qubits} qubits)")
print(f"Workspace: {WORKSPACE.resolve()}")

## 1) Benchmark circuit

We want a circuit that is *sensitive* to calibration quality (single-qubit rotations, entangling gates, and some depth).  
The output distribution becomes our “health signal” over time.


In [None]:
def create_benchmark_circuit(n_qubits: int) -> QuantumCircuit:
    """Build the measured ``calibration_benchmark`` circuit on *n_qubits* qubits.

    Gates are appended in four layers: a Hadamard on every qubit, a
    nearest-neighbour CX chain, an RZ(pi/4) followed by RX(pi/3) on every
    qubit, and a second CX chain. All qubits are measured at the end.
    """
    all_qubits = range(n_qubits)
    neighbour_pairs = [(q, q + 1) for q in range(n_qubits - 1)]

    bench = QuantumCircuit(n_qubits, name="calibration_benchmark")

    # Layer 1: put every qubit into superposition.
    for qubit in all_qubits:
        bench.h(qubit)

    # Layer 2: entangle neighbours along the chain.
    for control, target in neighbour_pairs:
        bench.cx(control, target)

    # Layer 3: single-qubit rotations (gate-error sensitive).
    rz_angle = np.pi / 4
    rx_angle = np.pi / 3
    for qubit in all_qubits:
        bench.rz(rz_angle, qubit)
        bench.rx(rx_angle, qubit)

    # Layer 4: repeat the entangling chain to add depth.
    for control, target in neighbour_pairs:
        bench.cx(control, target)

    bench.measure_all()
    return bench


def concentration_topk(counts: Mapping[str, int], k: int) -> float:
    """Return the share of all shots that fell on the *k* most frequent outcomes.

    A histogram whose counts sum to zero (including an empty one) yields 0.0.
    """
    shots = sum(counts.values())
    if shots == 0:
        return 0.0

    ranked = list(counts.values())
    ranked.sort(reverse=True)
    return float(sum(ranked[:k])) / float(shots)


def shannon_entropy_bits(counts: Mapping[str, int], eps: float = 1e-12) -> float:
    """Compute the Shannon entropy, in bits, of a counts histogram.

    Counts are normalized to probabilities ``p`` and the result is
    ``-sum(p * log2(p))``, with zero-probability outcomes contributing 0.

    Args:
        counts: Mapping from outcome label to number of occurrences.
        eps: Floor applied to each probability inside the logarithm, so that
            a zero count never reaches ``log2(0)``.

    Returns:
        The entropy as a non-negative float; 0.0 when the counts sum to zero.
    """
    total = sum(counts.values())
    if total == 0:
        return 0.0

    probs = np.array(list(counts.values()), dtype=float) / float(total)
    # Floor the log argument at eps instead of adding eps to it. Adding eps
    # biases every term and makes a deterministic histogram (p == 1) come out
    # slightly negative, which is not a valid entropy.
    log_probs = np.log2(np.maximum(probs, eps))
    # max() also turns the -0.0 of a deterministic histogram into 0.0.
    return max(0.0, float(-np.sum(probs * log_probs)))


def make_sampler(mode, default_shots: int) -> SamplerV2:
    """Build a ``SamplerV2`` for *mode* with *default_shots* as its default shot count.

    The shot budget is set through ``options.default_shots`` after
    construction and is coerced to ``int`` first.
    """
    primitive = SamplerV2(mode=mode)
    shots = int(default_shots)
    primitive.options.default_shots = shots
    return primitive


def make_drift_backend(
    depol_1q: float,
    depol_2q: float,
    seed: int,
):
    """Build an Aer simulator with extra depolarizing noise, or None without Aer.

    Args:
        depol_1q: Depolarizing parameter applied to the one-qubit instructions
            ``rz``, ``sx``, ``x`` and ``id`` on all qubits.
        depol_2q: Depolarizing parameter applied to ``cx`` on all qubits.
        seed: Value passed to the simulator as ``seed_simulator``.

    Returns:
        An ``AerSimulator`` carrying the noise model, or ``None`` when
        qiskit-aer is not installed.
    """
    if not _AER_AVAILABLE:
        return None

    # One row per error channel: (parameter, qubits acted on, instruction names).
    # The names are the typical IBM basis instructions after transpilation.
    noise_spec = (
        (depol_1q, 1, ["rz", "sx", "x", "id"]),
        (depol_2q, 2, ["cx"]),
    )

    drift_noise = NoiseModel()
    for parameter, width, gate_names in noise_spec:
        drift_noise.add_all_qubit_quantum_error(
            depolarizing_error(parameter, width),
            gate_names,
        )

    return AerSimulator(noise_model=drift_noise, seed_simulator=seed)


def run_and_log_benchmark(
    *,
    project: str,
    store,
    registry,
    sampler: SamplerV2,
    circuit_isa: QuantumCircuit,
    params: Mapping[str, object],
    tags: Mapping[str, str],
) -> str:
    """Run *circuit_isa* once inside a tracked devqubit run and return its run id.

    The sampler is wrapped with ``wrap_backend`` so the execution is tied to
    the run. *params* and *tags* are logged before execution. Afterwards the
    counts read from the ``meas`` register of the first result are summarized
    into three metrics: ``concentration_topk`` (with ``k=CFG.top_k``),
    ``unique_outcomes`` and ``entropy_bits``.
    """
    with track(project=project, store=store, registry=registry) as run:
        tracked_sampler = wrap_backend(run, sampler)

        run.log_params(dict(params))
        run.set_tags(dict(tags))

        # A single circuit is submitted, so only the first result entry is read.
        primitive_result = tracked_sampler.run([circuit_isa]).result()
        counts = primitive_result[0].data.meas.get_counts()

        metrics = {
            "concentration_topk": concentration_topk(counts, k=CFG.top_k),
            "unique_outcomes": int(len(counts)),
            "entropy_bits": shannon_entropy_bits(counts),
        }
        run.log_metrics(metrics)

        return run.run_id

In [None]:
# Build + transpile circuit once
# `circuit_isa` is the circuit that the baseline and every candidate run execute.
circuit = create_benchmark_circuit(CFG.n_qubits)
print(circuit.draw())

# Preset pass manager targeting the fake backend at the configured
# optimization level.
pm = generate_preset_pass_manager(
    backend=fake_backend,
    optimization_level=CFG.optimization_level,
)
circuit_isa = pm.run(circuit)

print("\nTranspiled (ISA) circuit:")
# fold=500 is passed to draw(); presumably to keep the text drawing on one row.
print(circuit_isa.draw(fold=500))

## 2) Establish a baseline (reference)

A baseline is the “known-good” distribution we compare against later.  
We store it in the devqubit registry as the project baseline.


In [None]:
# Sampler bound to the fake backend, using the baseline shot budget.
baseline_sampler = make_sampler(
    mode=fake_backend,
    default_shots=CFG.shots_baseline,
)

# Execute the benchmark once and log it as a devqubit run tagged role=baseline.
baseline_id = run_and_log_benchmark(
    project=CFG.project_name,
    store=store,
    registry=registry,
    sampler=baseline_sampler,
    circuit_isa=circuit_isa,
    params={
        "circuit_name": "calibration_benchmark",
        "n_qubits": CFG.n_qubits,
        "shots": CFG.shots_baseline,
        "optimization_level": CFG.optimization_level,
        "backend": fake_backend.name,
    },
    tags={"role": "baseline", "scenario": "baseline"},
)

# Record that run as the project's baseline in the registry.
registry.set_baseline(CFG.project_name, baseline_id)
print(f"Baseline run set: {baseline_id}")

## 3) Inspect the captured device snapshot / fingerprints

devqubit stores a snapshot that helps you answer:  
**“Did the hardware configuration/calibration change between runs?”**


In [None]:
# Load the baseline record and print what was captured about the device.
baseline_rec = registry.load(baseline_id)

print("Device Snapshot")
print("=" * 60)

device_snap = baseline_rec.record.get("device_snapshot", {})
if not device_snap:
    print("(No device snapshot found — depends on backend/mode.)")
else:
    print(f"Backend:     {device_snap.get('backend_name')}")
    print(f"Captured at: {device_snap.get('captured_at')}")
    print(f"Qubits:      {device_snap.get('num_qubits')}")

    cal = device_snap.get("calibration", {})
    if cal:
        print("\nCalibration summary (median):")
        for key in ("t1_us", "t2_us", "readout_error"):
            # Only report the calibration entries that are actually present.
            if key not in cal:
                continue
            print(f"  {key:>13s}: {cal[key].get('median', 'N/A')}")

print("\nFingerprints:")
# Each fingerprint is shortened to its first 48 characters for display.
for key, value in baseline_rec.fingerprints.items():
    print(f"  {key}: {value[:48]}...")

## 4) Candidate runs

We run the *same* transpiled circuit under different conditions:

- **shot variability** (common in practice when budgets change),
- optionally, a **worse calibration** scenario (if `qiskit-aer` is installed).

All candidates are tagged as `role=candidate`.


In [None]:
from dataclasses import dataclass


@dataclass(frozen=True)
class Scenario:
    """A production scenario to execute.

    Frozen record describing one candidate run. Instances in this notebook are
    built positionally, so the field order is part of the interface.
    """

    # Short identifier; logged as the run's "scenario" tag.
    name: str
    # Human-readable summary; logged as the "scenario_description" parameter.
    description: str
    # Default shot budget given to the sampler for this scenario.
    shots: int
    # Execution target handed to make_sampler() as `mode` (a backend object here).
    mode: object
    # Backend label logged as the "backend" parameter.
    backend_name: str


# Shot-budget scenarios: all three run on the fake backend and differ only in
# name, description and shot count.
scenarios: List[Scenario] = [
    Scenario(name, description, shots, fake_backend, fake_backend.name)
    for name, description, shots in (
        ("low_shots", "Reduced shot budget", CFG.shots_candidates[0]),
        ("stable", "Normal operation", CFG.shots_candidates[1]),
        ("high_shots", "Higher precision run", CFG.shots_candidates[2]),
    )
]

# Optional: add a more realistic drift scenario using Aer noise model tweaks
drift_backend = make_drift_backend(
    depol_1q=CFG.drift_depol_1q,
    depol_2q=CFG.drift_depol_2q,
    seed=CFG.seed,
)

if drift_backend is None:
    print("Aer-based drift scenario skipped (qiskit-aer not installed).")
else:
    # Same shot budget as the "stable" scenario, so only the noise differs.
    scenarios.append(
        Scenario(
            name="calibration_drift",
            description="Extra depolarizing noise (Aer-based drift)",
            shots=CFG.shots_candidates[1],
            mode=drift_backend,
            backend_name="aer_simulator",
        )
    )
    print("Added Aer-based drift scenario.")


# Run ids of the candidate runs, in execution order; later cells compare each
# of them against the baseline.
production_run_ids: List[str] = []

print("\nExecuting scenarios...")
print("-" * 60)

for sc in scenarios:
    # A fresh sampler per scenario, bound to that scenario's target and shot budget.
    sampler = make_sampler(mode=sc.mode, default_shots=sc.shots)

    # Same transpiled circuit as the baseline; only params and tags differ.
    run_id = run_and_log_benchmark(
        project=CFG.project_name,
        store=store,
        registry=registry,
        sampler=sampler,
        circuit_isa=circuit_isa,
        params={
            "circuit_name": "calibration_benchmark",
            "n_qubits": CFG.n_qubits,
            "shots": sc.shots,
            "optimization_level": CFG.optimization_level,
            "backend": sc.backend_name,
            "scenario_description": sc.description,
        },
        tags={"role": "candidate", "scenario": sc.name},
    )

    production_run_ids.append(run_id)
    print(f"{sc.name:>18s} -> run_id={run_id}")

## 5) Drift analysis (baseline vs candidates)

We compare output distributions using **Total Variation Distance (TVD)**, which for two discrete distributions $P$ and $Q$ is defined as:

$\mathrm{TVD}(P, Q) = \frac{1}{2} \sum_x |P(x) - Q(x)|$

devqubit computes TVD for you in `diff()` and `verify()`.


In [None]:
print("Drift Analysis")
print("=" * 60)

# One line per candidate: its TVD against the baseline and whether that TVD is
# within CFG.tvd_max.
for run_id in production_run_ids:
    comp = diff(baseline_id, run_id, registry=registry, store=store)

    sc = registry.load(run_id).tags.get("scenario", "?")
    tvd = comp.tvd

    if tvd is not None:
        if tvd <= CFG.tvd_max:
            status = "✓ OK"
        else:
            status = "! DRIFT"
        print(f"{sc:>18s}: TVD={tvd:.4f}  {status}")
    else:
        print(f"{sc:>18s}: TVD not available")

## 6) CI/CD-style verification (policy)

In pipelines you usually want an **automated pass/fail**.

Our policy:
- the program must match (same circuit),
- parameters may differ (candidates use different shot counts),
- fingerprints may differ (backend snapshot changes are part of the story),
- TVD must stay within the threshold (`tvd_max`).


In [None]:
# Policy applied to every baseline-vs-candidate comparison below.
production_policy = VerifyPolicy(
    tvd_max=CFG.tvd_max,
    fingerprint_must_match=False,  # hardware snapshots can change
    program_must_match=True,  # same circuit is required
    params_must_match=False,  # allow different shot counts
)

print("CI/CD Verification")
print("=" * 60)

baseline_rec = registry.load(baseline_id)

for run_id in production_run_ids:
    candidate = registry.load(run_id)
    sc = candidate.tags.get("scenario", "?")

    result = verify(
        baseline_rec,
        candidate,
        store_baseline=store,
        store_candidate=store,
        policy=production_policy,
    )

    if result.ok:
        status = "✓ PASSED"
    else:
        status = "✗ FAILED"
    print(f"\n{sc}: {status}")

    # Failure reasons are listed only for candidates that did not pass.
    if result.ok:
        continue
    for failure in result.failures:
        print(f"  - {failure}")

## 7) Export a JUnit report

JUnit XML is a common interchange format for CI systems.


In [None]:
# Generate a JUnit report for the *last* scenario (as an example).
# The last entry of production_run_ids is the most recently executed candidate.
last_candidate = registry.load(production_run_ids[-1])

# Re-run the verification for that candidate under the same policy.
result = verify(
    baseline_rec,
    last_candidate,
    store_baseline=store,
    store_candidate=store,
    policy=production_policy,
)

# The report is written inside the demo workspace.
junit_path = WORKSPACE / "test-results.xml"
write_junit(result, junit_path)

print(f"JUnit report written to: {junit_path}")
print("\n--- test-results.xml ---")
# Echo the XML so the report can be inspected directly in the notebook output.
print(junit_path.read_text())

## 8) Debugging: full comparison details

When something fails, you usually want a human-readable diff.


In [None]:
# Compare the baseline against the last candidate run and print the
# comparison object returned by diff().
comparison = diff(
    baseline_id,
    production_run_ids[-1],
    registry=registry,
    store=store,
)

print(comparison)

## 9) Simple monitoring dashboard

This cell prints a tiny text dashboard: it pulls all of the project's runs from the registry and verifies each candidate against the baseline.


In [None]:
all_runs = registry.list_runs(project=CFG.project_name)
baseline_info = registry.get_baseline(CFG.project_name)

print("Production Monitoring Dashboard")
print("=" * 80)
print(
    f"Baseline: {baseline_info['run_id'][:16]}... (set: {baseline_info['set_at'][:10]})\n"
)

print(
    f"{'Run ID':<20} {'Role':<10} {'Scenario':<18} {'Conc(topK)':>10} {'Entropy':>10} {'Status':>8}"
)
print("-" * 80)

# One table row per run; only runs tagged role=candidate are verified against
# the baseline, every other run shows "-" in the status column.
for run_info in all_runs:
    rec = registry.load(run_info["run_id"])
    role = rec.tags.get("role", "N/A")
    scenario = rec.tags.get("scenario", "-")

    conc = float(rec.metrics.get("concentration_topk", 0.0))
    ent = float(rec.metrics.get("entropy_bits", 0.0))

    status = "-"
    if role == "candidate":
        v = verify(
            baseline_rec,
            rec,
            store_baseline=store,
            store_candidate=store,
            policy=production_policy,
        )
        status = "✓" if v.ok else "✗"

    print(
        f"{rec.run_id[:18]}.. {role:<10} {scenario:<18} {conc:>10.2%} {ent:>10.3f} {status:>8}"
    )

In [None]:
# Optional cleanup (keeps reruns tidy).
# If you'd like to inspect the registry/artifacts on disk, comment this out.
# This removes the whole workspace directory, including the store, the
# registry and the JUnit report written into it.
shutil.rmtree(WORKSPACE)
print(f"Workspace cleaned up: {WORKSPACE}")