# Hybrid pipeline — Hardware-ready notebook

This notebook is a **complete, hardware-ready** implementation of the hybrid pipeline for spectral reconstruction and eigenvalue extraction using Qiskit Runtime. It includes: 

- Job batching for runtime submission (EstimatorV2 / SamplerV2 via `qiskit_ibm_runtime.Session`)
- Readout calibration and assignment-matrix correction
- Zero-noise extrapolation (ZNE) scaffolding with **global** and **local** gate-folding helpers and polynomial extrapolation
- Helpers to submit Hadamard-test / Trotterized controlled-evolution circuits and QPE circuits via Runtime
- Orchestration that collects results, applies readout correction and ZNE extrapolation, and returns calibrated overlaps / expectation values suitable for SBQD / FFT / QPE

**Warning & setup**: this notebook will run real hardware jobs if `RUN_MODE='hardware'`. Configure your IBM token with `QiskitRuntimeService.save_account(...)` before running hardware cells. Test everything in `RUN_MODE='local'` first.


## Install required packages (Colab / QBraid)
Run this cell if packages are missing. On Colab this may take a minute.

In [None]:
!pip install --quiet qiskit qiskit-aer qiskit-ibm-runtime torch scipy numpy matplotlib nbformat

In [None]:
# Imports
import os, math, time, json
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from scipy.linalg import expm, eig, fractional_matrix_power
from scipy.signal import find_peaks

from qiskit import QuantumCircuit
from qiskit_aer import AerSimulator
from qiskit.quantum_info import Operator, SparsePauliOp
from qiskit.primitives import Estimator, Sampler
from qiskit.extensions import UnitaryGate
from qiskit.algorithms.optimizers import COBYLA

# runtime imports
try:
    from qiskit_ibm_runtime import QiskitRuntimeService, EstimatorV2, SamplerV2, Session
    RUNTIME_AVAILABLE = True
except Exception:
    QiskitRuntimeService = None
    EstimatorV2 = None
    SamplerV2 = None
    Session = None
    RUNTIME_AVAILABLE = False

print('Imports OK; runtime available =', RUNTIME_AVAILABLE)

In [None]:
# Config
# Execution target checked by the helpers below: 'local' uses simulators,
# 'hardware' goes through the IBM Runtime service.
RUN_MODE = 'local'   # 'local' or 'hardware'
SAVE_DIR = 'results'
# Create the output directory up front (no error if it already exists).
os.makedirs(SAVE_DIR, exist_ok=True)

# Size of the truncated Hamiltonian and the smallest qubit register able to hold it.
N_TRUNC = 8
N_QUBITS = int(np.ceil(np.log2(N_TRUNC)))
DIM = 2 ** N_QUBITS

# Surrogate-model active-sampling settings (consumed by run_kan_active).
KAN_INITIAL = 20
KAN_ITERS = 4
KAN_CAND_PER_ITER = 8
KAN_EPOCHS = 150
KAN_M_UNITS = max(40, 6 * (2 * N_QUBITS))
# VQE refinement: number of seeds refined and COBYLA iteration budget per seed.
VQE_REFINES = 3
VQE_REFINED_ITERS = 120
# Maximum number of Krylov vectors generated per refined seed.
KRYLOV_DIM = 5

# Time grid: N_TIME samples with spacing DELTA_T = T_MAX / N_TIME.
T_MAX = 6.0
N_TIME = 256
DELTA_T = T_MAX / N_TIME

# QPE settings (not referenced by the code visible in this notebook export).
QPE_COUNT_QUBITS = 6
QPE_SHOTS = 1024

# Default backend name for Runtime submissions.
RUNTIME_BACKEND = 'ibm_perth'
TROTTER_STEPS = 4

# Default noise-scaling (fold) factors used by measure_Ct_with_zne.
ZNE_LEVELS = [1,3,5]

# Seed NumPy's global RNG so random parameter draws are reproducible.
SEED = 123
np.random.seed(SEED)

print('Config set: N_QUBITS=', N_QUBITS, 'DIM=', DIM)

## Configure IBM Quantum Runtime (one-time)
Uncomment and run the `save_account` line once to store your credentials, then run the rest of the cell to connect to the Runtime service and select the backend. **Do not share your token.**

In [None]:
# from qiskit_ibm_runtime import QiskitRuntimeService
# QiskitRuntimeService.save_account(channel='ibm_cloud', token='YOUR_IBM_TOKEN')

# Connect to the IBM Runtime service only when hardware execution is requested;
# in local mode both handles are set to None.
if RUN_MODE == 'hardware':
    if not RUNTIME_AVAILABLE:
        raise RuntimeError('qiskit_ibm_runtime not available; install package and configure token')
    # Relies on credentials previously stored with QiskitRuntimeService.save_account(...).
    service = QiskitRuntimeService()
    backend = service.backend(RUNTIME_BACKEND)
    # `name` is a method on legacy backends but a plain string attribute on
    # BackendV2-style backends; calling it unconditionally raises TypeError there.
    _backend_name = backend.name() if callable(backend.name) else backend.name
    print('Using backend:', _backend_name)
else:
    service = None
    backend = None
    print('Running in local simulation mode')


## Build truncated Hamiltonian (Berry–Keating style) and pad to qubit dimension

In [None]:
def ladder_ops(N):
    """Return the truncated lowering/raising matrix pair ``(a, a_dagger)``.

    ``a`` is an ``N x N`` complex matrix that is zero except on the first
    superdiagonal, where ``a[n, n+1] = sqrt(n+1)``. The second element of the
    returned tuple is the conjugate transpose of ``a``.
    """
    lowering = np.zeros((N, N), dtype=complex)
    rows = np.arange(N - 1)
    # Fill the whole superdiagonal in one vectorised assignment.
    lowering[rows, rows + 1] = np.sqrt(rows + 1)
    return lowering, lowering.conj().T

def build_H_small(N_trunc):
    """Build the ``N_trunc x N_trunc`` symmetrised ``(xp + px) / 2`` matrix.

    ``x`` and ``p`` are formed from the truncated ladder operators returned by
    ``ladder_ops``. The result is explicitly projected onto its Hermitian part
    before being returned.
    """
    annihilate, create = ladder_ops(N_trunc)
    root_two = np.sqrt(2)
    position = (annihilate + create) / root_two
    momentum = 1j * (create - annihilate) / root_two
    symmetric_product = 0.5 * (position @ momentum + momentum @ position)
    # Average with the adjoint so the returned matrix is Hermitian.
    return 0.5 * (symmetric_product + symmetric_product.conj().T)

# N_TRUNC x N_TRUNC truncated operator.
H_small = build_H_small(N_TRUNC)
# Embed it in the top-left corner of a DIM x DIM zero matrix so its size
# matches the N_QUBITS-qubit register.
H_pad = np.zeros((DIM, DIM), dtype=complex)
H_pad[:N_TRUNC, :N_TRUNC] = H_small
H_op = Operator(H_pad)

# Classical reference spectrum of the truncated matrix (eigenvectors discarded),
# reduced to real values when the imaginary parts are negligible and sorted ascending.
cref_vals, _ = eig(H_small)
cref_vals = np.real_if_close(cref_vals)
cref_vals.sort()
print('Classical lowest eigenvalues (preview):', np.round(cref_vals[:min(8,len(cref_vals))],6))

## Ansatz & energy evaluation (Estimator / local simulator)

In [None]:
# Two rotation angles (RY and RZ) per qubit.
n_params = 2 * N_QUBITS

def ansatz_circuit(theta):
    """Return the variational circuit for the parameter vector ``theta``.

    The first ``N_QUBITS`` entries of ``theta`` are RY angles and the next
    ``N_QUBITS`` entries are RZ angles, applied qubit by qubit. A chain of
    CNOTs between neighbouring qubits ``(i, i+1)`` follows the rotations.
    """
    circuit = QuantumCircuit(N_QUBITS)
    for qubit in range(N_QUBITS):
        circuit.ry(float(theta[qubit]), qubit)
        circuit.rz(float(theta[N_QUBITS + qubit]), qubit)
    for control, target in zip(range(N_QUBITS - 1), range(1, N_QUBITS)):
        circuit.cx(control, target)
    return circuit

if RUN_MODE == 'local':
    # Aer statevector simulator, used directly by the helpers that need amplitudes.
    aer_sim = AerSimulator(method='statevector')
    # The reference qiskit.primitives.Estimator evaluates expectation values
    # itself and has no backend argument, so it must not be handed `aer_sim`.
    estimator_local = Estimator()
else:
    estimator_local = None

def eval_energy_theta(theta, use_runtime=False, session=None):
    """Return the real part of the expectation value of ``H_op`` for ``theta``.

    Args:
        theta: Parameter vector passed to ``ansatz_circuit``.
        use_runtime: When true, evaluate through ``EstimatorV2`` bound to
            ``session`` with 1024 shots; otherwise use ``estimator_local``.
        session: Runtime session handed to ``EstimatorV2``.

    Returns:
        The estimated energy as a Python float.
    """
    circuit = ansatz_circuit(theta)
    if use_runtime:
        # NOTE(review): this uses the V1-style call (separate circuit and observable
        # lists, `shots=` keyword, `.values` on the result) with EstimatorV2 --
        # confirm against the installed qiskit-ibm-runtime version.
        job = EstimatorV2(session=session).run([circuit], [H_op], shots=1024)
    else:
        job = estimator_local.run([circuit], [H_op])
    return float(job.result().values[0].real)

print('Ansatz ready; eval_energy_theta available')

## KAN surrogate (PyTorch) — active sampling helpers

In [None]:
class KANSurrogate(nn.Module):
    """Small surrogate network mapping a parameter vector to a scalar.

    The input goes through a linear layer with ``m_units`` outputs. Each output
    is then passed through its own bank of ``uni_hidden`` tanh units, which is
    averaged and scaled by a per-unit weight. A final linear layer reduces the
    ``m_units`` results to a single value.
    """

    def __init__(self, in_dim, m_units=80, uni_hidden=12):
        super().__init__()
        # Parameters are created in a fixed order so seeded initialisation is reproducible.
        self.inner = nn.Linear(in_dim, m_units)
        self.uni_w = nn.Parameter(0.01 * torch.randn(m_units, uni_hidden))
        self.uni_b = nn.Parameter(torch.zeros(m_units, uni_hidden))
        self.uni_out = nn.Parameter(0.01 * torch.randn(m_units))
        self.outer = nn.Linear(m_units, 1)

    def forward(self, x):
        """Evaluate the surrogate; the trailing size-1 output axis is squeezed away."""
        projected = self.inner(x)
        # Broadcast every projected value against its own bank of tanh units.
        activations = torch.tanh(projected[..., None] * self.uni_w[None] + self.uni_b[None])
        per_unit = activations.mean(dim=-1) * self.uni_out[None]
        return self.outer(per_unit).squeeze(-1)

def train_kan(model, X, y, epochs=100, lr=1e-3):
    """Fit ``model`` to ``(X, y)`` in place by full-batch Adam on the mean squared error.

    Args:
        model: Torch module called as ``model(X_tensor)``.
        X: Inputs, converted to a float32 tensor.
        y: Targets, converted to a float32 tensor.
        epochs: Number of optimisation steps (one full-batch step each).
        lr: Adam learning rate.

    Returns:
        None. The model is left in training mode.
    """
    model.train()
    inputs = torch.tensor(X, dtype=torch.float32)
    targets = torch.tensor(y, dtype=torch.float32)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    completed = 0
    while completed < epochs:
        optimizer.zero_grad()
        residual = model(inputs) - targets
        mse = residual.pow(2).mean()
        mse.backward()
        optimizer.step()
        completed += 1

def find_minima_kan(model, restarts=8, steps=80, lr=0.2):
    """Search for low-output inputs of ``model`` by multi-start gradient descent.

    Each restart draws a point uniformly from ``[-pi, pi)`` per coordinate and
    runs ``steps`` Adam updates on the input itself (not the model weights).
    After every update the point is clamped to ``[-4*pi, 4*pi]``.

    Returns:
        A list with one flat NumPy array of length ``n_params`` per restart.
    """
    model.eval()
    limit = 4 * np.pi
    minima = []
    for _ in range(restarts):
        start = np.random.uniform(-np.pi, np.pi, size=(1, n_params))
        point = torch.tensor(start, dtype=torch.float32, requires_grad=True)
        descent = torch.optim.Adam([point], lr=lr)
        for _ in range(steps):
            descent.zero_grad()
            model(point).backward()
            descent.step()
            # Keep the iterate bounded without recording the clamp in autograd.
            with torch.no_grad():
                point.clamp_(-limit, limit)
        minima.append(point.detach().cpu().numpy().reshape(-1))
    return minima

print('KAN helpers ready')

In [None]:
def run_kan_active():
    """Run the surrogate-driven active-sampling loop.

    Starts from ``KAN_INITIAL`` random parameter vectors evaluated with
    ``eval_energy_theta``. Each of the ``KAN_ITERS`` rounds then retrains the
    surrogate on all data collected so far, evaluates the surrogate's candidate
    minima, and adds two uniformly random exploratory samples.

    Returns:
        ``(Theta, Evals, kan)``: the sampled parameters (one row per sample),
        their energies, and the trained surrogate.
    """
    def _draw_theta():
        return np.random.uniform(-np.pi, np.pi, size=(n_params,))

    print('\nCollecting initial samples...')
    initial_thetas, initial_energies = [], []
    for sample_no in range(KAN_INITIAL):
        point = _draw_theta()
        energy = eval_energy_theta(point)
        initial_thetas.append(point)
        initial_energies.append(energy)
        if sample_no % 5 == 0:
            print(f'  sample {sample_no} energy {energy:.6f}')
    Theta = np.array(initial_thetas)
    Evals = np.array(initial_energies)

    kan = KANSurrogate(n_params, m_units=KAN_M_UNITS).to('cpu')
    for round_no in range(1, KAN_ITERS + 1):
        print(f'\nKAN iter {round_no}/{KAN_ITERS}, dataset size {len(Theta)}')
        train_kan(kan, Theta, Evals, epochs=KAN_EPOCHS)
        proposals = find_minima_kan(kan, restarts=KAN_CAND_PER_ITER)
        for cand_no, cand in enumerate(proposals):
            energy = eval_energy_theta(cand)
            Theta = np.vstack([Theta, cand])
            Evals = np.hstack([Evals, energy])
            print(f'  cand {cand_no} energy {energy:.6f}')
        # Two purely random samples per round keep some exploration in the dataset.
        for _ in range(2):
            point = _draw_theta()
            energy = eval_energy_theta(point)
            Theta = np.vstack([Theta, point])
            Evals = np.hstack([Evals, energy])
            print(f'  exploratory sample energy {energy:.6f}')

    print('KAN best energy', Evals[int(np.argmin(Evals))])
    return Theta, Evals, kan

print('KAN active function defined')

In [None]:
def run_vqe_refine(Theta, Evals):
    """Refine the best sampled parameter vectors with short COBYLA runs.

    Args:
        Theta: Array of sampled parameter vectors, one per row.
        Evals: Energies matching the rows of ``Theta``.

    Returns:
        ``(refined, refined_vals)``: for the ``VQE_REFINES`` lowest-energy
        seeds, the optimised parameter vectors and their final energies.
    """
    print('\nRunning short VQE (COBYLA)')
    seed_idxs = np.argsort(Evals)[:VQE_REFINES]
    refined = []
    refined_vals = []
    for idx in seed_idxs:
        cobyla = COBYLA(maxiter=VQE_REFINED_ITERS)
        # Use the `minimize(fun, x0)` interface; the legacy
        # `optimize(num_vars, objective_function, initial_point)` call is not
        # available in current qiskit optimizers.
        result = cobyla.minimize(fun=eval_energy_theta, x0=np.asarray(Theta[idx], dtype=float))
        val = float(result.fun)
        refined.append(result.x)
        refined_vals.append(val)
        print(f'  refined seed {idx} val {val:.6f}')
    return refined, refined_vals

print('VQE refine ready')

In [None]:
def build_krylov_blocks(refined_thetas, rank_tol=1e-10):
    """Build Krylov blocks from refined ansatz states and solve the projected problem.

    For every parameter vector the ansatz state is simulated, restricted to the
    first ``N_TRUNC`` amplitudes and normalised. Up to ``KRYLOV_DIM`` Krylov
    vectors are then generated with ``H_small`` using Gram-Schmidt. The overlap
    matrix ``S`` and projected Hamiltonian ``Hproj`` of all collected vectors
    are diagonalised after canonical orthogonalisation.

    Args:
        refined_thetas: Iterable of ansatz parameter vectors.
        rank_tol: Relative threshold on the eigenvalues of ``S``; directions
            below ``rank_tol * max_eigenvalue`` are treated as linearly
            dependent and discarded.

    Returns:
        ``(krylov_states, S, Hproj, eigvals_k, eigvecs_k, S_inv_sqrt)``.
        ``eigvals_k`` is ascending and column ``m`` of ``eigvecs_k`` belongs to
        ``eigvals_k[m]``. ``S_inv_sqrt`` is the (pseudo-)inverse square root of
        ``S`` restricted to the retained directions. If ``S`` is rank deficient,
        only as many eigenpairs as the numerical rank are returned.

    Raises:
        NotImplementedError: Outside local mode.
        ValueError: If no Krylov vectors were produced.
    """
    print('\nBuilding Krylov blocks')
    if RUN_MODE != 'local':
        raise NotImplementedError('Hardware mode: implement overlap measurements for Krylov basis')

    krylov_states = []
    for theta in refined_thetas:
        qc = ansatz_circuit(theta)
        # Aer only returns a statevector if the circuit contains a save instruction.
        qc.save_statevector()
        sv = np.asarray(aer_sim.run(qc).result().get_statevector(qc))
        v0 = sv[:N_TRUNC]
        v0 = v0 / np.linalg.norm(v0)
        block = [v0]
        for _ in range(1, KRYLOV_DIM):
            v = H_small @ block[-1]
            # Modified Gram-Schmidt against the vectors already in this block.
            for prev in block:
                v = v - np.vdot(prev, v) * prev
            normv = np.linalg.norm(v)
            if normv < 1e-12:
                break
            block.append(v / normv)
        for vec in block:
            emb = np.zeros(DIM, dtype=complex)
            emb[:N_TRUNC] = vec
            emb /= np.linalg.norm(emb)
            krylov_states.append(emb)

    K = len(krylov_states)
    print('Total krylov vectors', K)
    if K == 0:
        raise ValueError('No Krylov vectors were generated; refined_thetas is empty')

    basis = np.array(krylov_states, dtype=complex).reshape(K, DIM)[:, :N_TRUNC]
    # S[i, j] = <k_i|k_j>, Hproj[i, j] = <k_i|H_small|k_j>.
    S = basis.conj() @ basis.T
    Hproj = basis.conj() @ H_small @ basis.T

    # Blocks from different seeds overlap, and K can exceed N_TRUNC, so S is in
    # general singular. Canonical orthogonalisation drops the dependent
    # directions instead of inverting a singular matrix.
    s_vals, s_vecs = np.linalg.eigh(0.5 * (S + S.conj().T))
    keep = s_vals > rank_tol * s_vals.max()
    kept_vecs = s_vecs[:, keep]
    whitening = kept_vecs / np.sqrt(s_vals[keep])
    S_inv_sqrt = whitening @ kept_vecs.conj().T

    H_red = whitening.conj().T @ Hproj @ whitening
    H_red = 0.5 * (H_red + H_red.conj().T)
    # eigh returns ascending eigenvalues with matching eigenvector columns, so
    # values and vectors stay aligned.
    eigvals_k, red_vecs = np.linalg.eigh(H_red)
    # Express the eigenvectors in the same K-dimensional coordinates as
    # S_inv_sqrt @ Hproj @ S_inv_sqrt.
    eigvecs_k = kept_vecs @ red_vecs
    print('Krylov eigenvalues preview', np.round(eigvals_k[:min(12,len(eigvals_k))],6))
    return krylov_states, S, Hproj, eigvals_k, eigvecs_k, S_inv_sqrt

print('Krylov builder ready')

In [None]:
def compute_Ct_via_statevector(psi0, U_step, n_time=N_TIME):
    """Return the autocorrelation samples ``C[n] = <psi0| U_step^n |psi0>``.

    Args:
        psi0: Initial state vector (NumPy array).
        U_step: Matrix applied once per time step.
        n_time: Number of samples, starting at ``n = 0``.

    Returns:
        Complex NumPy array of length ``n_time``.
    """
    samples = []
    state = psi0.copy()
    for _ in range(n_time):
        samples.append(np.vdot(psi0, state))
        state = U_step @ state
    return np.array(samples, dtype=complex)

print('FFT helper ready')

In [None]:
def build_readout_calibration_circuits(n_qubits):
    """Return one calibration circuit per computational basis state.

    Circuit number ``idx`` applies an X gate to every qubit ``q`` whose bit
    ``q`` (least-significant first) is set in ``idx``. It then measures all
    qubits into the classical bits with the same indices.
    """
    wires = range(n_qubits)
    circuits = []
    for state in range(2 ** n_qubits):
        qc = QuantumCircuit(n_qubits, n_qubits)
        for q in wires:
            if (state >> q) & 1:
                qc.x(q)
        qc.measure(wires, wires)
        circuits.append(qc)
    return circuits

def compute_assignment_matrix(counts_list, n_qubits, shots):
    """Build the readout assignment matrix from calibration counts.

    Args:
        counts_list: One counts mapping (bitstring -> count) per prepared
            basis state, in preparation order.
        n_qubits: Number of measured qubits; the matrix is ``2**n_qubits`` square.
        shots: Number of shots per calibration circuit, used for normalisation.

    Returns:
        Float array ``A`` where ``A[i, j]`` is the observed frequency of
        outcome ``j`` when state ``i`` was prepared.
    """
    dim = 2**n_qubits
    A = np.zeros((dim, dim), dtype=float)
    for i, counts in enumerate(counts_list):
        for bitstr, c in counts.items():
            # Count keys may contain spaces separating classical registers;
            # strip them before parsing the binary outcome.
            j = int(bitstr.replace(' ', ''), 2)
            # Accumulate so keys that normalise to the same outcome are summed
            # instead of overwriting each other.
            A[i, j] += c / shots
    return A

print('Readout calibration builders ready')

In [None]:
def global_fold_circuit(circ, fold_factor=3):
    """Return ``circ`` globally folded as ``U (U† U)^k`` with ``fold_factor = 2k + 1``.

    Args:
        circ: Circuit to fold; it must be invertible for ``fold_factor > 1``.
        fold_factor: Odd positive noise-scaling factor.

    Returns:
        A new circuit on the same registers that implements the same unitary
        with ``fold_factor`` times as many layers.

    Raises:
        ValueError: If ``fold_factor`` is not an odd positive integer. An even
            factor would end on ``U†`` and no longer implement ``circ``.
    """
    if fold_factor < 1 or fold_factor % 2 == 0:
        raise ValueError(f'fold_factor must be an odd positive integer, got {fold_factor}')
    if fold_factor == 1:
        return circ.copy()
    # Reuse the original registers; num_qubits / num_clbits are plain ints and
    # cannot be star-unpacked.
    circ_f = QuantumCircuit(*circ.qregs, *circ.cregs)
    circ_inv = circ.inverse()
    for k in range(fold_factor):
        circ_f.compose(circ if k % 2 == 0 else circ_inv, inplace=True)
    return circ_f

def local_fold_circuit(circ, fold_factor=3):
    """Return ``circ`` with every gate ``G`` locally folded to ``G (G† G)^k``.

    ``fold_factor = 2k + 1`` is the noise-scaling factor. Folding with inverse
    pairs keeps the circuit's unitary unchanged; simply repeating ``G`` would
    change it for any gate that is not an involution.

    Args:
        circ: Circuit to fold.
        fold_factor: Odd positive noise-scaling factor.

    Returns:
        A new circuit on the same registers. Measure, reset, barrier, delay
        and initialize instructions are copied once and never folded.

    Raises:
        ValueError: If ``fold_factor`` is not an odd positive integer.
    """
    if fold_factor < 1 or fold_factor % 2 == 0:
        raise ValueError(f'fold_factor must be an odd positive integer, got {fold_factor}')
    if fold_factor == 1:
        return circ.copy()
    not_foldable = ('measure', 'reset', 'barrier', 'delay', 'initialize')
    n_pairs = (fold_factor - 1) // 2
    # Reuse the original registers so the qubit/clbit arguments stay valid;
    # num_qubits / num_clbits are plain ints and cannot be star-unpacked.
    circ_f = QuantumCircuit(*circ.qregs, *circ.cregs)
    # Carry the global phase over: it matters once the circuit is controlled.
    circ_f.global_phase = circ.global_phase
    for inst, qargs, cargs in circ.data:
        circ_f.append(inst, qargs, cargs)
        if inst.name in not_foldable:
            continue
        inst_inv = inst.inverse()
        for _ in range(n_pairs):
            circ_f.append(inst_inv, qargs, cargs)
            circ_f.append(inst, qargs, cargs)
    return circ_f

print('Folding helpers ready')

In [None]:
def richardson_extrapolation(scalings, values, method='linear'):
    """Extrapolate measured values to zero noise with a polynomial fit.

    Args:
        scalings: Noise-scaling factors (for example fold factors).
        values: Measured value at each scaling factor.
        method: ``'linear'`` fits a degree-1 polynomial; any other value fits
            a degree-2 polynomial.

    Returns:
        The fitted polynomial evaluated at scaling ``0``.

    Raises:
        ValueError: If the inputs are empty, not one-dimensional, or differ in length.
    """
    x = np.asarray(scalings, dtype=float)
    y = np.asarray(values, dtype=float)
    if x.ndim != 1 or x.shape != y.shape:
        raise ValueError('scalings and values must be 1-D sequences of equal length')
    if x.size == 0:
        raise ValueError('at least one (scaling, value) pair is required')
    requested_degree = 1 if method == 'linear' else 2
    # A degree-d fit needs d + 1 distinct abscissae; lower the degree rather
    # than return a rank-deficient fit.
    degree = min(requested_degree, np.unique(x).size - 1)
    coefs = np.polyfit(x, y, degree)
    return np.polyval(coefs, 0.0)

print('ZNE extrapolation helper ready')

In [None]:
def pauli_decompose(H_mat):
    """Decompose ``H_mat`` into Pauli strings on ``N_QUBITS`` qubits.

    Each coefficient is ``Tr(P† H) / 2**N_QUBITS``, computed by enumerating
    all ``4**N_QUBITS`` Pauli labels.

    Args:
        H_mat: Square matrix of dimension ``2**N_QUBITS``.

    Returns:
        List of ``(label, coefficient)`` pairs for the terms whose coefficient
        magnitude exceeds ``1e-12``. Only the real part of each coefficient is
        returned, which is exact when ``H_mat`` is Hermitian.
    """
    basis = ['I','X','Y','Z']
    dim = 2 ** N_QUBITS
    pauli_terms = []
    for idx in range(4 ** N_QUBITS):
        # Read idx as a base-4 number; the most significant digit becomes the
        # leftmost character of the label.
        label = ''
        tmp = idx
        for _ in range(N_QUBITS):
            label = basis[tmp % 4] + label
            tmp //= 4
        mat = SparsePauliOp.from_list([(label, 1.0)]).to_matrix()
        c = np.trace(mat.conj().T @ H_mat) / dim
        if abs(c) > 1e-12:
            # Take the real part explicitly instead of relying on an implicit
            # complex-to-float cast, which emits a ComplexWarning.
            pauli_terms.append((label, float(np.real(c))))
    return pauli_terms

def pauli_term_to_exp_circ(label, angle):
    """Return a circuit implementing ``exp(-i * angle * P)`` for the Pauli string ``label``.

    The rightmost character of ``label`` acts on qubit 0. X and Y factors are
    rotated into the Z basis, and the parity of the non-identity qubits is
    accumulated on one of them with CNOTs. ``rz(2 * angle)`` is applied there
    and the basis changes are undone.

    Args:
        label: Pauli string over ``I``, ``X``, ``Y``, ``Z`` of length ``N_QUBITS``.
        angle: Rotation angle multiplying the Pauli string.

    Returns:
        A ``QuantumCircuit`` on ``N_QUBITS`` qubits.
    """
    qc = QuantumCircuit(N_QUBITS)
    paulis = list(enumerate(reversed(label)))
    # Only qubits carrying a non-identity Pauli take part in the parity
    # computation; including identity qubits would implement a different operator.
    support = [q for q, p in paulis if p != 'I']
    if not support:
        # exp(-i * angle * I) is a pure phase; keep it as the circuit's global
        # phase so a controlled version can still account for it.
        qc.global_phase = -angle
        return qc

    for q, p in paulis:
        if p == 'X':
            qc.h(q)
        elif p == 'Y':
            qc.sdg(q)
            qc.h(q)

    target = support[0]
    others = support[1:]
    for q in others:
        qc.cx(q, target)
    qc.rz(2*angle, target)
    for q in reversed(others):
        qc.cx(q, target)

    for q, p in paulis:
        if p == 'X':
            qc.h(q)
        elif p == 'Y':
            qc.h(q)
            qc.s(q)
    return qc

def trotterized_controlled_evolution(pauli_terms, t, r_steps):
    """Build a first-order Trotter circuit for controlled time evolution.

    Qubit 0 is the control and qubits ``1..N_QUBITS`` hold the system. Each of
    the ``r_steps`` slices appends, for every ``(label, coeff)`` term, the
    singly-controlled version of ``pauli_term_to_exp_circ(label, coeff * t / r_steps)``.

    Args:
        pauli_terms: Iterable of ``(label, coeff)`` pairs.
        t: Total evolution time.
        r_steps: Number of Trotter slices.

    Returns:
        A ``QuantumCircuit`` on ``1 + N_QUBITS`` qubits.
    """
    width = 1 + N_QUBITS
    evolution = QuantumCircuit(width)
    slice_time = t / r_steps
    # Control first, then the system qubits in order.
    wires = [0] + [q for q in range(1, width)]
    for _ in range(r_steps):
        for label, coeff in pauli_terms:
            term_circuit = pauli_term_to_exp_circ(label, coeff * slice_time)
            controlled_term = term_circuit.to_gate().control(1)
            evolution.append(controlled_term, wires)
    return evolution

print('Pauli decomp & trotter builder ready')

In [None]:
def build_hadamard_test(prep_qc: QuantumCircuit, unitary_circ: QuantumCircuit, measure_imag=False):
    """Assemble a Hadamard-test circuit with the ancilla on qubit 0.

    The ancilla is put in superposition (with an extra S-dagger when
    ``measure_imag`` is set) and ``prep_qc`` prepares the system on qubits
    ``1..N_QUBITS``. ``unitary_circ`` is then applied controlled on the
    ancilla, and the ancilla is rotated back and measured into classical bit 0.

    Args:
        prep_qc: State-preparation circuit on ``N_QUBITS`` qubits.
        unitary_circ: Circuit converted to a gate and controlled by the ancilla.
        measure_imag: Insert the S-dagger phase on the ancilla when true.

    Returns:
        A ``QuantumCircuit`` with ``1 + N_QUBITS`` qubits and one classical bit.
    """
    ancilla = 0
    system = list(range(1, 1 + N_QUBITS))
    test = QuantumCircuit(1 + N_QUBITS, 1)
    test.h(ancilla)
    if measure_imag:
        test.sdg(ancilla)
    test.compose(prep_qc, qubits=system, inplace=True)
    controlled_u = unitary_circ.to_gate().control(1)
    test.append(controlled_u, [ancilla] + system)
    test.h(ancilla)
    test.measure(ancilla, 0)
    return test

print('Hadamard test builder ready')

In [None]:
def submit_estimator_batch(circuits, observables=None, shots=1024, backend_name=RUNTIME_BACKEND, max_batch=20):
    """Evaluate expectation values for ``circuits`` in batches of ``max_batch``.

    Args:
        circuits: Circuits to evaluate.
        observables: ``None`` to use ``H_op`` for every circuit, a list/tuple
            with one observable per circuit, or a single observable applied to
            every circuit.
        shots: Shot count forwarded to the runtime estimator (hardware mode only).
        backend_name: Backend used for the runtime session (hardware mode only).
        max_batch: Maximum number of circuits per ``run`` call.

    Returns:
        List of expectation values in circuit order.

    Raises:
        RuntimeError: In hardware mode when the runtime package is unavailable.
    """
    def _observables_for(start, count):
        # One resolution rule for both execution paths, so local and hardware
        # runs evaluate the same observables.
        if observables is None:
            return [H_op] * count
        if isinstance(observables, (list, tuple)):
            return list(observables[start:start + count])
        return [observables] * count

    results = []
    if RUN_MODE == 'local':
        for i in range(0, len(circuits), max_batch):
            batch = circuits[i:i+max_batch]
            res = estimator_local.run(batch, _observables_for(i, len(batch))).result()
            results.extend(res.values)
        return results
    if not RUNTIME_AVAILABLE:
        raise RuntimeError('Runtime not available')
    with Session(service=service, backend=backend_name) as session:
        # NOTE(review): the calls below use the V1-style estimator interface
        # (`run(circuits, observables, shots=...)`, `.values`) on EstimatorV2 --
        # confirm against the installed qiskit-ibm-runtime version.
        est = EstimatorV2(session=session)
        for i in range(0, len(circuits), max_batch):
            batch = circuits[i:i+max_batch]
            run_res = est.run(batch, _observables_for(i, len(batch)), shots=shots).result()
            results.extend(run_res.values)
    return results

print('Batch submit helper ready')

In [None]:
def measure_Ct_with_zne(prep_circ, base_unitary, t_steps_list, shots=2000, fold_levels=ZNE_LEVELS, backend_name=RUNTIME_BACKEND):
    """Estimate ``<psi| U^n |psi>`` by Hadamard tests with zero-noise extrapolation.

    For every step count in ``t_steps_list`` the power ``U^n`` is built, locally
    folded at each level in ``fold_levels`` and evaluated with real and
    imaginary Hadamard tests. The results are then linearly extrapolated to
    zero noise.

    Args:
        prep_circ: State-preparation circuit on ``N_QUBITS`` qubits.
        base_unitary: Circuit for a single evolution step.
        t_steps_list: Iterable of integer powers ``n``.
        shots: Shots per submitted circuit.
        fold_levels: Noise-scaling factors passed to ``local_fold_circuit``.
        backend_name: Runtime backend name (hardware mode only).

    Returns:
        Dict mapping each step count to
        ``{'raw': (scalings, re_vals, im_vals), 'extrapolated': (re0, im0)}``.
    """
    if RUN_MODE == 'hardware':
        cal_circs = build_readout_calibration_circuits(N_QUBITS + 1)
        with Session(service=service, backend=backend_name) as session:
            sampler = SamplerV2(session=session)
            counts_res = sampler.run(cal_circs, shots=shots).result()
            # NOTE(review): `circuits_counts` could not be confirmed as an attribute
            # of the sampler result -- verify against the installed runtime version.
            counts_list = [c for c in counts_res.circuits_counts]
        # NOTE(review): the assignment matrix is computed but not yet applied to
        # the estimates below.
        A = compute_assignment_matrix(counts_list, N_QUBITS + 1, shots)
    else:
        A = None

    # The Hadamard test encodes Re/Im of the overlap in <Z> of the ancilla
    # (qubit 0, i.e. the rightmost label character). The default observable
    # H_op acts on N_QUBITS qubits only and measures the wrong quantity.
    ancilla_z = SparsePauliOp('I' * N_QUBITS + 'Z')

    results = {}
    for tsteps in t_steps_list:
        # U^tsteps does not depend on the fold level, so build it once per step count.
        U_pow = QuantumCircuit(base_unitary.num_qubits)
        for _ in range(tsteps):
            U_pow.compose(base_unitary, inplace=True)

        re_vals = []
        im_vals = []
        scalings = []
        for fold in fold_levels:
            U_fold = local_fold_circuit(U_pow, fold)
            had_re = build_hadamard_test(prep_circ, U_fold, measure_imag=False)
            had_im = build_hadamard_test(prep_circ, U_fold, measure_imag=True)
            # NOTE(review): these circuits end with an ancilla measurement --
            # confirm the estimator in use accepts circuits containing measurements.
            res = submit_estimator_batch([had_re, had_im], observables=[ancilla_z, ancilla_z], shots=shots, backend_name=backend_name, max_batch=10)
            re_vals.append(float(res[0]))
            im_vals.append(float(res[1]))
            scalings.append(fold)
        re0 = richardson_extrapolation(scalings, re_vals, method='linear')
        im0 = richardson_extrapolation(scalings, im_vals, method='linear')
        results[tsteps] = {'raw': (scalings, re_vals, im_vals), 'extrapolated': (re0, im0)}
    return results

print('Hadamard + ZNE orchestration ready')

In [None]:
def measure_overlap_matrix_hardware(prep_circuits, shots=2000, backend_name=RUNTIME_BACKEND):
    """Return the overlap matrix ``S[i, j] = <psi_i|psi_j>`` of the prepared states.

    In local mode each overlap is read from the ``|0...0>`` amplitude of the
    simulated circuit ``U_i† U_j``. Only the upper triangle is simulated and
    the lower triangle is filled by Hermitian symmetry.

    Args:
        prep_circuits: State-preparation circuits; circuit ``k`` prepares ``|psi_k>``.
        shots: Unused in local mode.
        backend_name: Unused in local mode.

    Returns:
        Complex ``K x K`` array, with ``K = len(prep_circuits)``.

    Raises:
        NotImplementedError: Outside local mode.
    """
    if RUN_MODE != 'local':
        raise NotImplementedError('Hardware overlap measurement: implement SWAP-test or tomography batching per device')
    K = len(prep_circuits)
    S = np.zeros((K,K), dtype=complex)
    inverses = [circ.inverse() for circ in prep_circuits]
    for i in range(K):
        for j in range(i, K):
            # Apply U_j first, then U_i†: the |0...0> amplitude is
            # <0|U_i† U_j|0> = <psi_i|psi_j>.
            qc = prep_circuits[j].copy()
            qc.compose(inverses[i], inplace=True)
            # Aer only returns a statevector if the circuit contains a save instruction.
            qc.save_statevector()
            sv = np.asarray(aer_sim.run(qc).result().get_statevector(qc))
            amp0 = sv[0]
            S[i,j] = amp0
            if j != i:
                S[j,i] = np.conj(amp0)
    return S

print('Overlap measurement skeleton ready')

In [None]:
def build_qpe_circuit(count_qubits, system_statevector, controlled_unitary_gate):
    """Assemble a phase-estimation circuit.

    Counting qubits occupy indices ``0..count_qubits-1`` and the system register
    the next ``N_QUBITS`` indices. Counting qubit ``j`` controls
    ``controlled_unitary_gate`` raised to the power ``2**(count_qubits-1-j)``.
    ``qft_dagger`` is then applied and the counting register is measured.

    Args:
        count_qubits: Number of counting qubits (and classical bits).
        system_statevector: Amplitudes used to initialise the system register.
        controlled_unitary_gate: Gate that is raised to a power and then
            controlled by a counting qubit.

    Returns:
        The assembled ``QuantumCircuit``.
    """
    total_qubits = count_qubits + N_QUBITS
    system = list(range(count_qubits, total_qubits))
    qc = QuantumCircuit(total_qubits, count_qubits)
    qc.initialize(system_statevector, system)
    for counting in range(count_qubits):
        qc.h(counting)
    for counting in range(count_qubits):
        exponent = 2 ** (count_qubits - 1 - counting)
        # NOTE(review): despite its name, the gate passed in is controlled again
        # here -- presumably callers pass the plain (uncontrolled) unitary; confirm.
        powered = controlled_unitary_gate.power(exponent).control(1)
        qc.append(powered, [counting] + system)
    # NOTE(review): qft_dagger is not defined in the visible part of this
    # notebook -- it must be provided elsewhere before this function is called.
    qft_dagger(qc, count_qubits)
    qc.measure(range(count_qubits), range(count_qubits))
    return qc

print('QPE builder ready')

## How to run & publish

1. Run the notebook **locally** with `RUN_MODE='local'` to test everything (no hardware costs).
2. Configure your IBM token as described earlier.
3. Set `RUN_MODE='hardware'` and `RUNTIME_BACKEND` to a backend you have access to.
4. Run the readout calibration and a small Hadamard-test flow first to validate the setup.

### Publish as a Gist / open in Colab
- Download this notebook (.ipynb) and upload it as a Gist at gist.github.com or use `gh gist create hybrid_hardware_notebook.ipynb --public`.
- Open in Colab via `File → Open notebook → GitHub` and paste the Gist URL, or use `https://colab.research.google.com/gist/<user>/<gist-id>/hybrid_hardware_notebook.ipynb`.
