# Cost estimator calibration

This notebook gathers runtime measurements for QuASAr backends and conversion paths, fits the
parameters used by :class:`quasar.cost_estimator.CostEstimator`, and reports calibration metrics.
It exports the fitted model as JSON together with per-sample predictions for traceability.


## Requirements

Install the simulation dependencies listed in `requirements.txt`. The notebook also expects
`pandas` and `scikit-learn`-style numerical tools (NumPy and Matplotlib are already required by
the project).


In [None]:
from __future__ import annotations

import itertools
import json
import math
import time
from dataclasses import dataclass
from pathlib import Path
import sys
from typing import Dict, Iterable, List, Optional

import numpy as np
try:
    import pandas as pd
except ImportError:  # pragma: no cover - optional dependency
    pd = None
import matplotlib.pyplot as plt

REPO_ROOT = Path.cwd()
if (REPO_ROOT / 'scripts').exists() and str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from qiskit.quantum_info import Clifford

from scripts.calibration.common import (
    build_clifford_tail,
    collect_metrics,
    split_at_first_nonclifford,
    count_ops,
)
from quasar.backends.sv import StatevectorBackend
from quasar.backends.dd import DecisionDiagramBackend, ddsim_available
from quasar.backends.tableau import TableauBackend, stim_available
from quasar.conversion.tab2sv import tableau_to_statevector
from quasar.cost_estimator import CostEstimator, CostParams

plt.rcParams.update({'figure.figsize': (7.5, 4.5), 'axes.grid': True})


In [None]:
@dataclass
class SampleSpec:
    n: int
    depth_cliff: int
    tail_layers: int
    angle_scale: float
    seed: int


def time_call(func, *args, **kwargs):
    start = time.perf_counter()
    error = None
    value = None
    try:
        value = func(*args, **kwargs)
        success = value is not None
    except Exception as exc:  # pragma: no cover - runtime dependency guard
        error = repr(exc)
        success = False
    elapsed = time.perf_counter() - start
    return elapsed, success, value, error


def generate_specs(
    *,
    qubits: Iterable[int],
    samples_per_setting: int,
    depth_cliff: int,
    tail_layers: int,
    angle_scale: float,
    seed: int,
) -> List[SampleSpec]:
    specs: List[SampleSpec] = []
    for index, n in enumerate(qubits):
        base_seed = seed + 31 * index
        for offset in range(samples_per_setting):
            specs.append(
                SampleSpec(
                    n=int(n),
                    depth_cliff=int(depth_cliff),
                    tail_layers=int(tail_layers),
                    angle_scale=float(angle_scale),
                    seed=base_seed + 17 * offset,
                )
            )
    return specs


def collect_calibration_samples(specs: Iterable[SampleSpec]) -> List[Dict[str, object]]:
    tableau = TableauBackend() if stim_available() else None
    sv_backend = StatevectorBackend()
    dd_backend = DecisionDiagramBackend() if ddsim_available() else None

    records: List[Dict[str, object]] = []
    for sample_id, spec in enumerate(specs):
        circuit = build_clifford_tail(
            n=spec.n,
            depth_cliff=spec.depth_cliff,
            tail_layers=spec.tail_layers,
            angle_scale=spec.angle_scale,
            seed=spec.seed,
        )
        metrics_full = collect_metrics(circuit)
        one_full, two_full = count_ops(circuit.data)
        base = {
            'sample_id': sample_id,
            'num_qubits': int(metrics_full.get('num_qubits', spec.n)),
            'num_gates': int(metrics_full.get('num_gates', one_full + two_full)),
            'one_qubit_gates': int(one_full),
            'two_qubit_gates': int(two_full),
            'rotation_count': int(metrics_full.get('rotation_count', 0)),
            'sparsity': float(metrics_full.get('sparsity', 0.0) or 0.0),
        }

        elapsed, success, _, error = time_call(sv_backend.run, circuit, want_statevector=True)
        records.append({**base, 'backend': 'sv', 'component': 'full', 'elapsed_s': elapsed, 'success': success, 'error': error})

        if dd_backend is not None:
            elapsed, success, _, error = time_call(dd_backend.run, circuit, want_statevector=False)
            records.append({**base, 'backend': 'dd', 'component': 'full', 'elapsed_s': elapsed, 'success': success, 'error': error})

        split = split_at_first_nonclifford(circuit)
        if split is None:
            continue

        prefix_metrics = collect_metrics(split.prefix)
        tail_metrics = collect_metrics(split.tail)
        one_pre, two_pre = count_ops(split.prefix.data)
        one_tail, two_tail = count_ops(split.tail.data)

        prefix_base = {
            'sample_id': sample_id,
            'num_qubits': int(prefix_metrics.get('num_qubits', spec.n)),
            'num_gates': int(prefix_metrics.get('num_gates', one_pre + two_pre)),
            'one_qubit_gates': int(one_pre),
            'two_qubit_gates': int(two_pre),
            'rotation_count': int(prefix_metrics.get('rotation_count', 0)),
            'sparsity': float(prefix_metrics.get('sparsity', 0.0) or 0.0),
        }
        tail_base = {
            'sample_id': sample_id,
            'num_qubits': int(tail_metrics.get('num_qubits', spec.n)),
            'num_gates': int(tail_metrics.get('num_gates', one_tail + two_tail)),
            'one_qubit_gates': int(one_tail),
            'two_qubit_gates': int(two_tail),
            'rotation_count': int(tail_metrics.get('rotation_count', 0)),
            'sparsity': float(tail_metrics.get('sparsity', 0.0) or 0.0),
        }

        prefix_state = None
        if tableau is not None:
            elapsed, success, prefix_state, error = time_call(tableau.run, split.prefix, want_statevector=True)
            records.append({**prefix_base, 'backend': 'tableau', 'component': 'prefix', 'elapsed_s': elapsed, 'success': success, 'error': error})
        if prefix_state is None:
            try:
                prefix_state = tableau_to_statevector(Clifford(split.prefix))
            except Exception as exc:  # pragma: no cover - runtime dependency guard
                prefix_state = None
                records.append({**prefix_base, 'backend': 'conversion', 'component': 'conversion', 'elapsed_s': None, 'success': False, 'error': repr(exc)})
        else:
            elapsed = 0.0
            records.append({**prefix_base, 'backend': 'conversion', 'component': 'conversion', 'elapsed_s': elapsed, 'success': True, 'error': None})

        if prefix_state is not None:
            elapsed, success, _, error = time_call(sv_backend.run, split.tail, initial_state=prefix_state, want_statevector=True)
            records.append({**tail_base, 'backend': 'sv', 'component': 'tail', 'elapsed_s': elapsed, 'success': success, 'error': error})
            if dd_backend is not None:
                elapsed, success, _, error = time_call(dd_backend.run, split.tail, initial_state=None, want_statevector=False)
                records.append({**tail_base, 'backend': 'dd', 'component': 'tail', 'elapsed_s': elapsed, 'success': success, 'error': error})

    return records


In [None]:
specs = generate_specs(qubits=[4, 6, 8], samples_per_setting=3, depth_cliff=120, tail_layers=12, angle_scale=0.3, seed=11)
records = collect_calibration_samples(specs)
len(records)


In [None]:
if pd is not None:
    calibration_df = pd.DataFrame(records)
    display(calibration_df.head())
else:
    calibration_df = records
    calibration_df[:3]


In [None]:
def compute_regression_metrics(actual: np.ndarray, predicted: np.ndarray) -> Dict[str, float]:
    mask = actual > 0
    filtered_actual = actual[mask]
    filtered_pred = predicted[mask]
    if filtered_actual.size == 0:
        return {'mape': float('nan'), 'r2': float('nan')}
    mape = float(np.mean(np.abs((filtered_actual - filtered_pred) / filtered_actual)))
    sse = float(np.sum((filtered_actual - filtered_pred) ** 2))
    sst = float(np.sum((filtered_actual - np.mean(filtered_actual)) ** 2))
    r2 = float(1.0 - sse / sst) if sst > 0 else float('nan')
    return {'mape': mape, 'r2': r2}


def fit_statevector(df) -> Dict[str, float]:
    subset = df[(df['backend'] == 'sv') & df['success']]
    amps = 2 ** subset['num_qubits']
    amp_ops_one = amps * subset['one_qubit_gates']
    amp_ops_two = amps * subset['two_qubit_gates']
    X = np.column_stack([amp_ops_one, amp_ops_two])
    y = subset['elapsed_s'].to_numpy(dtype=float)
    coeffs, *_ = np.linalg.lstsq(X, y, rcond=None)
    scale_one, scale_two = coeffs
    predictions = X @ coeffs
    metrics = compute_regression_metrics(y, predictions)
    factor = float(scale_two / scale_one) if scale_one else float('nan')
    return {
        'time_scale': float(scale_one),
        'sv_twoq_factor': factor,
        'mape': metrics['mape'],
        'r2': metrics['r2'],
        'num_samples': int(len(subset)),
        'predictions': predictions,
        'targets': y,
        'index': subset.index.to_numpy(),
    }


def fit_tableau(df) -> Dict[str, float]:
    subset = df[(df['backend'] == 'tableau') & df['success']]
    gate_weight = subset['one_qubit_gates'] + subset['two_qubit_gates']
    X = gate_weight.to_numpy(dtype=float)[:, None]
    y = subset['elapsed_s'].to_numpy(dtype=float)
    coeffs, *_ = np.linalg.lstsq(X, y, rcond=None)
    unit_cost = coeffs[0] if coeffs.size else float('nan')
    predictions = (X.flatten() * unit_cost)
    metrics = compute_regression_metrics(y, predictions)
    return {
        'tableau_prefix_unit_cost': float(unit_cost),
        'mape': metrics['mape'],
        'r2': metrics['r2'],
        'num_samples': int(len(subset)),
        'predictions': predictions,
        'targets': y,
        'index': subset.index.to_numpy(),
    }


def fit_conversion(df) -> Dict[str, float]:
    subset = df[(df['backend'] == 'conversion') & df['success']]
    amps = 2 ** subset['num_qubits']
    X = amps.to_numpy(dtype=float)[:, None]
    y = subset['elapsed_s'].to_numpy(dtype=float)
    coeffs, *_ = np.linalg.lstsq(X, y, rcond=None)
    factor = coeffs[0] if coeffs.size else float('nan')
    predictions = (X.flatten() * factor)
    metrics = compute_regression_metrics(y, predictions)
    return {
        'conv_amp_ops_factor': float(factor),
        'mape': metrics['mape'],
        'r2': metrics['r2'],
        'num_samples': int(len(subset)),
        'predictions': predictions,
        'targets': y,
        'index': subset.index.to_numpy(),
    }


def fit_decision_diagram(df) -> Dict[str, float]:
    subset = df[(df['backend'] == 'dd') & df['success']]
    if subset.empty:
        return {
            'dd_gate_node_factor': float('nan'),
            'dd_frontier_weight': float('nan'),
            'dd_rotation_weight': float('nan'),
            'dd_twoq_weight': float('nan'),
            'dd_sparsity_discount': float('nan'),
            'dd_base_cost': float('nan'),
            'mape': float('nan'),
            'r2': float('nan'),
            'num_samples': 0,
            'predictions': np.array([]),
            'targets': np.array([]),
            'index': np.array([], dtype=int),
        }
    frontier = subset['num_qubits'].to_numpy(dtype=float)
    num_gates = subset['num_gates'].to_numpy(dtype=float)
    base_nodes = frontier * np.maximum(1.0, np.log2(frontier + 1.0))
    gate_factor = np.maximum(1.0, np.log2(num_gates + 1.0))
    M = num_gates * base_nodes * gate_factor
    mask = M > 0
    M = M[mask]
    if M.size == 0:
        return {
            'dd_gate_node_factor': float('nan'),
            'dd_frontier_weight': float('nan'),
            'dd_rotation_weight': float('nan'),
            'dd_twoq_weight': float('nan'),
            'dd_sparsity_discount': float('nan'),
            'dd_base_cost': float('nan'),
            'mape': float('nan'),
            'r2': float('nan'),
            'num_samples': 0,
            'predictions': np.array([]),
            'targets': np.array([]),
            'index': np.array([], dtype=int),
        }
    log_frontier = np.log2(frontier[mask] + 1.0)
    rot_density = subset['rotation_count'].to_numpy(dtype=float)[mask] / np.maximum(1.0, num_gates[mask])
    twoq_density = subset['two_qubit_gates'].to_numpy(dtype=float)[mask] / np.maximum(1.0, num_gates[mask])
    sparsity = np.clip(subset['sparsity'].to_numpy(dtype=float)[mask], 0.0, 1.0)
    X = np.column_stack([
        M,
        M * log_frontier,
        M * rot_density,
        M * twoq_density,
        -M * sparsity,
        np.ones_like(M),
    ])
    y = subset['elapsed_s'].to_numpy(dtype=float)[mask]
    coeffs, *_ = np.linalg.lstsq(X, y, rcond=None)
    alpha, alpha_wf, alpha_wr, alpha_wt, alpha_ws, base = coeffs
    predictions = X @ coeffs
    metrics = compute_regression_metrics(y, predictions)
    gate_node_factor = float(alpha)
    frontier_weight = float(alpha_wf / alpha) if alpha else float('nan')
    rotation_weight = float(alpha_wr / alpha) if alpha else float('nan')
    twoq_weight = float(alpha_wt / alpha) if alpha else float('nan')
    sparsity_discount = float(alpha_ws / alpha) if alpha else float('nan')
    return {
        'dd_gate_node_factor': gate_node_factor,
        'dd_frontier_weight': frontier_weight,
        'dd_rotation_weight': rotation_weight,
        'dd_twoq_weight': twoq_weight,
        'dd_sparsity_discount': sparsity_discount,
        'dd_base_cost': float(base),
        'mape': metrics['mape'],
        'r2': metrics['r2'],
        'num_samples': int(M.size),
        'predictions': predictions,
        'targets': y,
        'index': subset.index.to_numpy()[mask],
    }


In [None]:
if pd is None:
    raise RuntimeError('Install pandas to continue the calibration workflow.')

sv_fit = fit_statevector(calibration_df)
tableau_fit = fit_tableau(calibration_df)
conversion_fit = fit_conversion(calibration_df)
dd_fit = fit_decision_diagram(calibration_df)

model = {
    'statevector': {key: sv_fit[key] for key in ['time_scale', 'sv_twoq_factor', 'mape', 'r2', 'num_samples']},
    'tableau': {key: tableau_fit[key] for key in ['tableau_prefix_unit_cost', 'mape', 'r2', 'num_samples']},
    'conversion': {key: conversion_fit[key] for key in ['conv_amp_ops_factor', 'mape', 'r2', 'num_samples']},
    'decision_diagram': {
        key: dd_fit[key]
        for key in [
            'dd_gate_node_factor',
            'dd_frontier_weight',
            'dd_rotation_weight',
            'dd_twoq_weight',
            'dd_sparsity_discount',
            'dd_base_cost',
            'mape',
            'r2',
            'num_samples',
        ]
    },
}
model


In [None]:
pred_df = calibration_df.copy()
pred_df['predicted_s'] = np.nan
pred_df.loc[sv_fit['index'], 'predicted_s'] = sv_fit['predictions']
pred_df.loc[tableau_fit['index'], 'predicted_s'] = tableau_fit['predictions']
pred_df.loc[conversion_fit['index'], 'predicted_s'] = conversion_fit['predictions']
pred_df.loc[dd_fit['index'], 'predicted_s'] = dd_fit['predictions']
pred_df


In [None]:
results_dir = Path('final_results')
results_dir.mkdir(parents=True, exist_ok=True)
model_path = results_dir / 'cost_estimator_calibration.json'
pred_csv = results_dir / 'estimator_calibration_predictions.csv'
with model_path.open('w', encoding='utf-8') as handle:
    json.dump(model, handle, indent=2)
pred_df.to_csv(pred_csv, index=False)
model_path, pred_csv


In [None]:
fig, axes = plt.subplots(1, 3, figsize=(14, 4.5), sharey=True)
for ax, (backend, label) in zip(axes, [('sv', 'Statevector'), ('tableau', 'Tableau prefix'), ('conversion', 'Tableau→SV')]):
    subset = pred_df[pred_df['backend'] == backend]
    ax.scatter(subset['elapsed_s'], subset['predicted_s'])
    ax.set_title(label)
    ax.set_xlabel('Measured (s)')
    ax.set_ylabel('Predicted (s)')
    ax.plot([0, subset['elapsed_s'].max()], [0, subset['elapsed_s'].max()], 'k--', linewidth=1)
fig.tight_layout()
plot_path = Path('plots') / 'estimator_calibration_fit.png'
plot_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(plot_path, dpi=150)
plot_path
