In [1]:
import sys
import time
import numpy as np
from pathlib import Path
from tqdm import tqdm
from typing import Callable, Dict, List

# pytket imports
from pytket import Circuit, OpType
from pytket.passes import DecomposeBoxes, AutoRebase, FullPeepholeOptimise

# add path to your local tket2_rewriting repo
sys.path.append("../../../tket2-rewriting/tket2_rewriting")

# import your custom optimisers
from cx_gadget.cx_gadget import CXGadget
from zzphase_flip.zzphase_flip import ZZPhaseFlip
from three_qubit_squash.three_qubit_squash_gadget import Squash3Gadget
from label.label_gadget import LabelGadget
from KAK.kak_gadget import KAKGadget
from tket.circuit import Tk2Circuit
from tket.optimiser import BadgerOptimiser

from circuit_generation.cx_gadget_random import CXGadgetBenchmarkCircuit
from circuit_generation.cx_gadget_vanilla import CXGadgetVanillaBenchmarkCircuit
from circuit_generation.cx_many_rz import CXManyRzBenchmarkCircuit
from circuit_generation.gadget import (
    SquashGadgetTestCircuit,
    LabelFriendlyCircuit,
    SimpleLabelFriendlyCircuit,
    KAKFriendlyCircuit,
)

In [2]:
N_SAMPLES = 10
WIDTH = 8
DEPTH = 10
RNG = np.random.default_rng(42)

# List of circuit generators to benchmark
GENERATORS: Dict[str, Callable[[], Circuit]] = {
    "CXGadgetBenchmarkCircuit": lambda: CXGadgetBenchmarkCircuit(WIDTH, DEPTH, RNG),
    "CXGadgetVanillaBenchmarkCircuit": lambda: CXGadgetVanillaBenchmarkCircuit(WIDTH, DEPTH, RNG),
    "CXManyRzBenchmarkCircuit": lambda: CXManyRzBenchmarkCircuit(WIDTH, DEPTH, RNG),
    "SquashGadgetTestCircuit": SquashGadgetTestCircuit,
    "LabelFriendlyCircuit": lambda: LabelFriendlyCircuit(WIDTH, DEPTH, RNG),
    "SimpleLabelFriendlyCircuit": lambda: SimpleLabelFriendlyCircuit(WIDTH, DEPTH, RNG),
    "KAKFriendlyCircuit": lambda: KAKFriendlyCircuit(WIDTH, DEPTH, RNG),
}

In [3]:
GADGET_INSTANCES = [
    ZZPhaseFlip(),
    CXGadget(),
    Squash3Gadget(),
    # LabelGadget(),
    KAKGadget(),
]
GADGET_NAMES = ["ZZPhaseFlip", "CXGadget", "Squash3Gadget", "KAKGadget"]

OPTIMISER = BadgerOptimiser(GADGET_INSTANCES)

In [4]:
import time
def benchmark_generator(name, gen, n_samples, time_budget_s=None):
    start = time.monotonic()
    total_opt_time = total_peephole_time = 0.0
    orig_2q = opt_2q = peephole_2q = 0
    timed_out = False

    for i in range(n_samples):
        if time_budget_s and time.monotonic() - start > time_budget_s:
            timed_out = True
            break

        circ = gen()
        DecomposeBoxes().apply(circ)
        AutoRebase({OpType.CX, OpType.Rz, OpType.Rx}).apply(circ)

        tk2 = Tk2Circuit(circ)
        t0 = time.monotonic()
        opt_tk2 = OPTIMISER.optimise(tk2)
        total_opt_time += time.monotonic() - t0

        opt1 = opt_tk2.to_tket1()
        t1 = time.monotonic()
        FullPeepholeOptimise().apply(opt1)
        total_peephole_time += time.monotonic() - t1

        orig_2q += circ.n_2qb_gates()
        peephole_2q += opt1.n_2qb_gates()

    return {
        "name": name,
        "orig_2q": orig_2q,
        "peephole_2q": peephole_2q,
        "total_opt_time": total_opt_time,
        "total_peephole_time": total_peephole_time,
        "timed_out": timed_out,
        "samples_done": i + 1 if not timed_out else i,
    }


In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import time

GLOBAL_S = 290
PER_TASK_S = 120  # e.g., 2 min per generator

start = time.monotonic()
deadline = start + GLOBAL_S
results = []

with ThreadPoolExecutor() as ex:
    futs = {ex.submit(benchmark_generator, n, g, N_SAMPLES, PER_TASK_S): n
            for n, g in GENERATORS.items()}

    while futs and time.monotonic() < deadline:
        try:
            for fut in as_completed(list(futs), timeout=5):
                name = futs.pop(fut)
                try:
                    res = fut.result()  # already finished
                    results.append(res)
                    print(f"[OK] {name} ({res['samples_done']} samples"
                          f"{' TIMEOUT' if res['timed_out'] else ''})")
                except Exception as e:
                    print(f"[WARN] {name} failed: {e!r}")
        except TimeoutError:
            pass

# anything left is still running; we must not wait for them
for fut, name in list(futs.items()):
    fut.cancel()
    print(f"[SKIP] {name} still running; skipping to avoid cell timeout")


[OK] SquashGadgetTestCircuit (10 samples)




[OK] KAKFriendlyCircuit (9 samples TIMEOUT)
[OK] CXManyRzBenchmarkCircuit (1 samples TIMEOUT)



thread '<unnamed>' panicked at tket-py/src/protocol/circuit_replacer.rs:294:16:
index out of bounds: the len is 35 but the index is 35
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

thread '<unnamed>' panicked at tket-py/src/protocol/circuit_replacer.rs:294:16:
index out of bounds: the len is 30 but the index is 30


In [None]:
# Total runtime across all benchmarks
total_time = sum(r["total_opt_time"] + r["total_peephole_time"] for r in results)
print(f"\nTotal runtime for all benchmarks (optimiser + peephole): {total_time:.2f} s")

# Total 2-qubit gate reduction across all benchmarks
total_orig_2q = sum(r["orig_2q"] for r in results)
total_final_2q = sum(r["peephole_2q"] for r in results)
total_reduction = total_orig_2q - total_final_2q
total_pct_reduction = 100.0 * total_reduction / total_orig_2q if total_orig_2q > 0 else 0.0

print(f"Total 2Q gates: {total_orig_2q} → {total_final_2q} "
      f"({total_reduction} reduced, {total_pct_reduction:.2f}% reduction)")


Total runtime for all benchmarks (optimiser + peephole): 19.28 s
Total 2Q gates: 68 → 38 (30 reduced, 44.12% reduction)
