In [9]:
import os
import sys
import time
import json
import sqlite3
import copy
from pathlib import Path

import numpy as np
import opt_einsum as oe


def find_project_root(start: Path) -> Path:
    """Return the closest directory, from ``start`` upward, that contains ``my_implementation``.

    ``start`` itself is checked first, followed by each of its parents in
    nearest-first order.

    Raises:
        RuntimeError: if neither ``start`` nor any parent contains an entry
            named ``my_implementation``.
    """
    lineage = (start, *start.parents)
    root = next(
        (folder for folder in lineage if folder.joinpath("my_implementation").exists()),
        None,
    )
    if root is None:
        raise RuntimeError("Could not locate project root")
    return root


PROJECT_ROOT = find_project_root(Path().resolve())
# Make the project root and its "Quantum" subdirectory importable. Each entry
# is pushed to the front of sys.path only if it is not already listed, so
# "Quantum" ends up ahead of the root when both are newly added.
for _search_dir in (PROJECT_ROOT, PROJECT_ROOT / "Quantum"):
    if str(_search_dir) not in sys.path:
        sys.path.insert(0, str(_search_dir))

import InfiniQuantumSim.TLtensor as tlt  # noqa: E402

from my_implementation.src import db  # noqa: E402
from my_implementation.src.simulator import run_circuit  # noqa: E402
from my_implementation.src.state_manager import fetch_state  # noqa: E402
from my_implementation.src.frontend import circuit_dict_to_gates  # noqa: E402

# SQL schema file handed to db.initialize_schema, and the directory that
# receives per-circuit checkpoints and the JSON summary of this notebook.
SCHEMA_PATH = PROJECT_ROOT.joinpath("my_implementation", "sql", "schema.sql")
CHECKPOINT_DIR = PROJECT_ROOT.joinpath("checkpoints", "benchmark_notebook")
# Environment overrides for the reference comparison. VERIFY_ALL is on unless
# BENCHMARK_VERIFY_ALL is exactly "0"; MAX_VERIFY_QUBITS only matters when
# VERIFY_ALL is off.
# NOTE(review): with the defaults every size is verified against the
# reference implementation - confirm that is affordable for the larger runs.
MAX_VERIFY_QUBITS = int(os.getenv("BENCHMARK_PARITY_MAX_QUBITS", "64"))
VERIFY_ALL = os.getenv("BENCHMARK_VERIFY_ALL", "1") != "0"



In [10]:
# Recover the un-patched class first. Without this, re-running the cell would
# subclass the previously installed SerialRandomGreedy and stack one wrapper
# per execution.
_BASE_RANDOM_GREEDY = getattr(oe.RandomGreedy, "_unpatched_base", oe.RandomGreedy)


class SerialRandomGreedy(_BASE_RANDOM_GREEDY):
    """``opt_einsum.RandomGreedy`` variant that is always built with ``parallel=False``."""

    # Kept on the class so a later re-execution of this cell can find the
    # original base again.
    _unpatched_base = _BASE_RANDOM_GREEDY

    def __init__(self, *args, **kwargs):
        # Override any caller-supplied value; all other arguments pass through.
        kwargs["parallel"] = False
        super().__init__(*args, **kwargs)


# Replace the attribute on the opt_einsum module so that later lookups of
# ``oe.RandomGreedy`` get the serial variant.
# NOTE(review): code that bound the class by name before this cell ran
# (``from opt_einsum import RandomGreedy``) is not affected - confirm the
# reference implementation resolves it through the module attribute.
oe.RandomGreedy = SerialRandomGreedy



In [11]:
def prepare_circuit(generator, n_qubits):
    """Build a private copy of the circuit dict produced by ``generator(n_qubits)``.

    For ``tlt.generate_qpe_circuit`` only, every ``"CU"`` gate whose
    ``"qubits"`` entry is missing or empty is given ``[k, last]``, where ``k``
    counts such gates in list order starting at 0 and ``last`` is
    ``number_of_qubits - 1``. Circuits from other generators are returned as
    an unmodified deep copy.
    """
    prepared = copy.deepcopy(generator(n_qubits))
    if generator is not tlt.generate_qpe_circuit:
        return prepared

    last_qubit = prepared["number_of_qubits"] - 1
    next_control = 0
    for entry in prepared["gates"]:
        # Leave non-CU gates and CU gates that already name their qubits alone.
        if entry["gate"] != "CU" or entry.get("qubits"):
            continue
        entry["qubits"] = [next_control, last_qubit]
        next_control += 1
    return prepared



In [12]:
def fetch_state_dict(connection, version, eps=1e-12):
    """Load a stored state as ``{index: complex amplitude}``, dropping near-zero entries.

    Rows come from ``fetch_state(connection, version)`` and are unpacked as
    ``(index, real, imag)``. A row is kept when the magnitude of its real or
    imaginary part exceeds ``eps``.
    """
    amplitudes = {}
    for index, re_part, im_part in fetch_state(connection, version):
        if abs(re_part) > eps or abs(im_part) > eps:
            amplitudes[index] = complex(re_part, im_part)
    return amplitudes


def run_my_impl(circuit_dict, checkpoint_subdir):
    """Run ``circuit_dict`` on a fresh in-memory SQLite database and return its final state.

    Args:
        circuit_dict: circuit description passed through to ``run_circuit``.
        checkpoint_subdir: ``Path`` of the checkpoint directory; created
            (including parents) if it does not exist and passed to
            ``run_circuit`` as a string.

    Returns:
        ``(version, state)`` where ``version`` is whatever ``run_circuit``
        returned and ``state`` is the sparse ``{index: amplitude}`` dict read
        back with ``fetch_state_dict``.
    """
    connection = sqlite3.connect(":memory:")
    try:
        db.initialize_schema(connection, SCHEMA_PATH)
        checkpoint_subdir.mkdir(parents=True, exist_ok=True)
        version = run_circuit(connection, circuit_dict, checkpoint_dir=str(checkpoint_subdir))
        state = fetch_state_dict(connection, version)
    finally:
        # Release the database even when the simulation fails part-way, so a
        # failing run does not leave the connection open in the kernel.
        connection.close()
    return version, state


def run_reference_impl(circuit_dict, eps=1e-12):
    """Simulate ``circuit_dict`` with the reference ``tlt.QuantumCircuit`` and return a sparse state.

    The result of ``run(contr_method="np")`` is flattened to one dimension and
    only entries whose magnitude exceeds ``eps`` are kept.

    Returns:
        ``{flat index: amplitude}`` in ascending index order.
    """
    qc = tlt.QuantumCircuit(circuit_dict=circuit_dict)
    # NOTE(review): assumes the "np" contraction returns a NumPy-compatible
    # array - confirm against tlt.QuantumCircuit.run.
    dense = np.asarray(qc.run(contr_method="np").reshape(-1))
    # Filter with array operations rather than a Python-level loop over every
    # element of the flattened vector.
    keep = np.flatnonzero(np.abs(dense) > eps)
    return dict(zip(keep.tolist(), dense[keep]))


def max_diff_sparse(state_a, state_b):
    """Return the largest absolute amplitude difference between two sparse states.

    Both arguments map an index to an amplitude; an index missing from one
    mapping is treated as amplitude ``0.0``. Returns ``0.0`` when both
    mappings are empty.
    """
    all_indices = set(state_a).union(state_b)
    deviations = (
        abs(state_a.get(index, 0.0) - state_b.get(index, 0.0)) for index in all_indices
    )
    return max(deviations, default=0.0)



In [13]:
# Benchmark plan: circuit label -> (generator function, qubit counts to sweep).
# Each range excludes its upper bound.
BENCHMARK_CIRCUITS = dict(
    GHZ=(tlt.generate_ghz_circuit, range(5, 35)),
    W=(tlt.generate_w_circuit, range(5, 35)),
    QFT=(tlt.generate_qft_circuit, range(5, 25)),
    QPE=(tlt.generate_qpe_circuit, range(5, 19)),
)

# Filled by the benchmark loop: circuit label -> list of per-size records.
results = dict()



In [None]:
# Sweep every configured circuit family over its qubit range, timing this
# implementation and optionally comparing it with the reference implementation.
for circuit_name, (generator, qubit_range) in BENCHMARK_CIRCUITS.items():
    print(f"## {circuit_name}")
    # Register the list before the sweep starts: records appended so far stay
    # reachable through `results` if the run is interrupted or a size fails.
    records = []
    results[circuit_name] = records
    for n in qubit_range:
        circuit = prepare_circuit(generator, n)
        n_qubits = circuit["number_of_qubits"]
        # Ensure frontend can materialize gate objects even if we skip verification later.
        _n_check, _gates = circuit_dict_to_gates(circuit)
        assert _n_check == n_qubits

        # The timed region covers everything run_my_impl does: database
        # setup, the simulation itself and reading the state back.
        start = time.perf_counter()
        version, state = run_my_impl(circuit, CHECKPOINT_DIR / circuit_name)
        elapsed = time.perf_counter() - start

        record = {
            "n_qubits": n_qubits,
            "logical_depth": len(circuit["gates"]),
            "state_version": version,
            "non_zero_states": len(state),
            "time_seconds": elapsed,
            # Stays None when the reference comparison is skipped.
            "max_diff_vs_reference": None,
        }

        if VERIFY_ALL or n_qubits <= MAX_VERIFY_QUBITS:
            ref_state = run_reference_impl(circuit)
            record["max_diff_vs_reference"] = float(max_diff_sparse(state, ref_state))

        records.append(record)
        print(
            f"{circuit_name} n={n_qubits}: time={elapsed:.3f}s, "
            f"non-zero={record['non_zero_states']}, diff={record['max_diff_vs_reference']}"
        )



## GHZ
GHZ n=5: time=0.002s, non-zero=2, diff=0.0
GHZ n=6: time=0.002s, non-zero=2, diff=0.0
GHZ n=7: time=0.002s, non-zero=2, diff=0.0
GHZ n=8: time=0.003s, non-zero=2, diff=0.0
GHZ n=9: time=0.005s, non-zero=2, diff=0.0
GHZ n=10: time=0.008s, non-zero=2, diff=0.0
GHZ n=11: time=0.012s, non-zero=2, diff=0.0
GHZ n=12: time=0.021s, non-zero=2, diff=0.0
GHZ n=13: time=0.041s, non-zero=2, diff=0.0
GHZ n=14: time=0.080s, non-zero=2, diff=0.0
GHZ n=15: time=0.166s, non-zero=2, diff=0.0
GHZ n=16: time=0.333s, non-zero=2, diff=0.0
GHZ n=17: time=0.730s, non-zero=2, diff=0.0
GHZ n=18: time=1.430s, non-zero=2, diff=0.0
GHZ n=19: time=2.721s, non-zero=2, diff=0.0
GHZ n=20: time=5.491s, non-zero=2, diff=0.0
GHZ n=21: time=12.245s, non-zero=2, diff=0.0
GHZ n=22: time=22.959s, non-zero=2, diff=0.0
GHZ n=23: time=47.756s, non-zero=2, diff=0.0
GHZ n=24: time=102.145s, non-zero=2, diff=0.0
GHZ n=25: time=209.603s, non-zero=2, diff=0.0
GHZ n=26: time=460.113s, non-zero=2, diff=0.0
GHZ n=27: time=1037.6

In [None]:
# Persist the collected records as a timestamped JSON file inside CHECKPOINT_DIR.
timestamp = time.strftime("%Y%m%d-%H%M%S")
output_path = CHECKPOINT_DIR.joinpath(f"results_{timestamp}.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
serialized_results = json.dumps(results, indent=2)
output_path.write_text(serialized_results)
print(f"Saved benchmark summary to {output_path}")

