# QISKIT HACKATHON: THE DECOHERENCE WATCHDOG

### Import libraries

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from qiskit.circuit import QuantumCircuit, QuantumRegister, ClassicalRegister
from qiskit.circuit.library import QuantumVolume
from qiskit.compiler import transpile
from qiskit.transpiler import PassManager, CouplingMap
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.transpiler.passes import SabreLayout, ASAPScheduleAnalysis
from qiskit.transpiler.exceptions import TranspilerError
from qiskit.dagcircuit import DAGCircuit
from qiskit.converters import dag_to_circuit, circuit_to_dag
from qiskit_ibm_runtime.fake_provider.backends import FakeTorino
from qiskit_aer import AerSimulator
from qiskit.quantum_info import Statevector, state_fidelity, DensityMatrix, hellinger_fidelity
from qiskit.result import Counts


print("--- Project: The Decoherence Watchdog ---")
print("All imports successful.")

--- Project: The Decoherence Watchdog ---
All imports successful.


### Define the Benchmark Circuit and Target Backend

In [2]:
# We use a Quantum Volume circuit as our benchmark.
num_qubits = 3
depth = 3
benchmark_circuit = QuantumVolume(num_qubits, depth, seed=42).decompose()
benchmark_circuit.measure_all()
benchmark_circuit.name = "QV3"

# We use FakeTorino, which is a Backend object.
backend = FakeTorino()
# *** FIX: Access properties directly from the backend object ***
coupling_map = backend.coupling_map
instruction_durations = backend.instruction_durations
dt = backend.dt # System time resolution in seconds

print(f"\nBenchmark circuit: '{benchmark_circuit.name}' on backend: '{backend.name}'")
print(f"Number of operations: {benchmark_circuit.size()}")


Benchmark circuit: 'QV3' on backend: 'fake_torino'
Number of operations: 6


### The custom transpiler

In [3]:
class DecoherenceWatchdogPass(TransformationPass):
    """
    A hardware-aware transpiler pass that finds the most vulnerable idle
    spot in a circuit and inserts an error-heralding 'watchdog' gadget.
    """
    def __init__(self, backend_properties, coupling_map):
        super().__init__()
        self.props = backend_properties
        self.coupling_map = coupling_map

    def run(self, dag: DAGCircuit) -> DAGCircuit:
        """The main execution method of the transpiler pass."""
        print("\n[WatchdogPass] Running analysis...")

        if 'layout' not in self.property_set or 'node_start_time' not in self.property_set:
            raise TranspilerError(
                "DecoherenceWatchdogPass requires layout and scheduling passes to be run before it."
            )

        layout = self.property_set['layout']
        schedule = self.property_set['node_start_time']

        worst_spot = { "cost": -1, "qubit": None }

        for physical_qubit in layout.get_physical_bits():
            qubit_ops = sorted(
                [node for node in dag.op_nodes() if physical_qubit in [layout[q] for q in node.qargs]],
                key=lambda n: schedule[n]
            )

            for i in range(len(qubit_ops) - 1):
                prev_node = qubit_ops[i]
                next_node = qubit_ops[i+1]
                
                prev_qargs = tuple(layout[q] for q in prev_node.qargs)
                try:
                    prev_duration_sec = self.props.gate_length(prev_node.op.name.lower(), prev_qargs)
                except Exception:
                    # If gate duration is not available, use a default duration
                    prev_duration_sec = 100e-9  # 100 nanoseconds default

                idle_start_sec = (schedule[prev_node] * dt) + prev_duration_sec
                idle_end_sec = schedule[next_node] * dt
                idle_duration_sec = idle_end_sec - idle_start_sec

                if idle_duration_sec > 1e-9:
                    try:
                        t2_time = self.props.t2(physical_qubit)
                        if t2_time is not None and t2_time > 0:
                            idling_cost = idle_duration_sec / t2_time
                            if idling_cost > worst_spot["cost"]:
                                worst_spot = {"cost": idling_cost, "qubit": physical_qubit}
                    except Exception:
                        # If T2 time is not available, skip this qubit
                        continue

        if worst_spot["qubit"] is None:
            print("[WatchdogPass] No significant idle time found. No action taken.")
            return dag

        print(f"[WatchdogPass] Found most vulnerable spot on qubit {worst_spot['qubit']} with cost {worst_spot['cost']:.4f}")

        data_qubit = worst_spot["qubit"]
        ancilla_qubit = None
        best_ancilla_t2 = -1

        for neighbor in self.coupling_map.neighbors(data_qubit):
            try:
                neighbor_t2 = self.props.t2(neighbor)
                if neighbor_t2 is not None and neighbor_t2 > best_ancilla_t2:
                    best_ancilla_t2 = neighbor_t2
                    ancilla_qubit = neighbor
            except Exception:
                # If T2 time is not available for this neighbor, skip it
                continue

        if ancilla_qubit is None:
            print("[WatchdogPass] Could not find a suitable ancilla. Aborting modification.")
            return dag
            
        print(f"[WatchdogPass] Selected data qubit: {data_qubit}, ancilla qubit: {ancilla_qubit}")

        qr = QuantumRegister(2, 'q')
        cr = ClassicalRegister(1, 'c_watchdog')
        gadget_qc = QuantumCircuit(qr, cr, name="watchdog")
        gadget_qc.h(qr[1])
        gadget_qc.cx(qr[0], qr[1])
        gadget_qc.h(qr[1])
        gadget_qc.measure(qr[1], cr[0])
        
        dag.add_creg(cr)
        gadget_dag = circuit_to_dag(gadget_qc)
        dag.compose(gadget_dag, qubits=[data_qubit, ancilla_qubit], clbits=[dag.clbits[-1]])
        print("[WatchdogPass] Watchdog gadget successfully inserted.")
            
        return dag


def draw_circuit_comparison(original_circuit, modified_circuit):
    """Draw both original and modified circuits using DAG representation."""
    print("\n" + "="*60)
    print("CIRCUIT VISUALIZATION")
    print("="*60)
    
    # Convert circuits to DAGs for cleaner visualization
    original_dag = circuit_to_dag(original_circuit)
    modified_dag = circuit_to_dag(modified_circuit)
    
    print(f"\nOriginal Circuit ({original_circuit.size()} operations):")
    print("-" * 40)
    try:
        # Use DAG's draw method with Graphviz
        original_dag.draw(output='mpl', style='color')
        plt.title("Original Circuit DAG")
        plt.show()
    except Exception as e:
        print(f"Could not draw original DAG: {e}")
        # Fallback to text representation
        print(original_circuit.draw(output='text'))
    
    print(f"\nModified Circuit with Watchdog ({modified_circuit.size()} operations):")
    print("-" * 40)
    try:
        # Use DAG's draw method with Graphviz
        modified_dag.draw(output='mpl', style='color')
        plt.title("Modified Circuit DAG with Watchdog")
        plt.show()
    except Exception as e:
        print(f"Could not draw modified DAG: {e}")
        # Fallback to text representation
        print(modified_circuit.draw(output='text'))
    
    print("="*60)


### Run experiments

In [5]:
print("\n--- Running Experiments ---")

# A. Calculate the ideal, noise-free result for fidelity comparison
ideal_circuit = benchmark_circuit.copy()
ideal_circuit.remove_final_measurements(inplace=True)
ideal_state = Statevector(ideal_circuit)
ideal_distribution = ideal_state.probabilities_dict()

# B. Run the Baseline: Qiskit's best generic optimization
print("\n1. Running Baseline (optimization_level=3)...")
baseline_circuit = transpile(benchmark_circuit, backend, optimization_level=3)
sim_noise = AerSimulator.from_backend(backend)
baseline_result = sim_noise.run(baseline_circuit, shots=8192).result()
baseline_counts = baseline_result.get_counts()
baseline_fidelity = hellinger_fidelity(ideal_distribution, baseline_counts)
print(f"   Baseline Fidelity: {baseline_fidelity:.4f}")

# C. Run Our Solution: The Custom Pass Manager
print("\n2. Running Our Custom 'Decoherence Watchdog' Pass...")

# *** SOLUTION: Schedule-aware approach that handles new nodes ***
print("[Pipeline] Creating schedule-aware pass manager...")

class ScheduleAwareWatchdogPass(TransformationPass):
    """Schedule-aware watchdog pass that properly handles timing for new nodes."""
    
    def __init__(self, backend_properties, coupling_map, instruction_durations):
        super().__init__()
        self.props = backend_properties
        self.coupling_map = coupling_map
        self.instruction_durations = instruction_durations
    
    def run(self, dag: DAGCircuit) -> DAGCircuit:
        print("\n[ScheduleAwareWatchdog] Running analysis...")
        
        if 'layout' not in self.property_set or 'node_start_time' not in self.property_set:
            raise TranspilerError("Requires layout and scheduling passes to be run before it.")
        
        layout = self.property_set['layout']
        schedule = self.property_set['node_start_time']
        
        # Find vulnerable spot (same logic as before)
        worst_spot = {"cost": -1, "qubit": None}
        
        for physical_qubit in layout.get_physical_bits():
            qubit_ops = sorted(
                [node for node in dag.op_nodes() if physical_qubit in [layout[q] for q in node.qargs]],
                key=lambda n: schedule[n]
            )
            
            for i in range(len(qubit_ops) - 1):
                prev_node = qubit_ops[i]
                next_node = qubit_ops[i+1]
                
                prev_qargs = tuple(layout[q] for q in prev_node.qargs)
                try:
                    prev_duration_sec = self.props.gate_length(prev_node.op.name.lower(), prev_qargs)
                except Exception:
                    prev_duration_sec = 100e-9
                
                idle_start_sec = (schedule[prev_node] * dt) + prev_duration_sec
                idle_end_sec = schedule[next_node] * dt
                idle_duration_sec = idle_end_sec - idle_start_sec
                
                if idle_duration_sec > 1e-9:
                    try:
                        t2_time = self.props.t2(physical_qubit)
                        if t2_time is not None and t2_time > 0:
                            idling_cost = idle_duration_sec / t2_time
                            if idling_cost > worst_spot["cost"]:
                                worst_spot = {"cost": idling_cost, "qubit": physical_qubit}
                    except Exception:
                        continue
        
        if worst_spot["qubit"] is None:
            print("[ScheduleAwareWatchdog] No significant idle time found.")
            return dag
        
        print(f"[ScheduleAwareWatchdog] Found vulnerable spot on qubit {worst_spot['qubit']} with cost {worst_spot['cost']:.4f}")
        
        # Find ancilla
        data_qubit = worst_spot["qubit"]
        ancilla_qubit = None
        best_ancilla_t2 = -1
        
        for neighbor in self.coupling_map.neighbors(data_qubit):
            try:
                neighbor_t2 = self.props.t2(neighbor)
                if neighbor_t2 is not None and neighbor_t2 > best_ancilla_t2:
                    best_ancilla_t2 = neighbor_t2
                    ancilla_qubit = neighbor
            except Exception:
                continue
        
        if ancilla_qubit is None:
            print("[ScheduleAwareWatchdog] Could not find suitable ancilla.")
            return dag
        
        print(f"[ScheduleAwareWatchdog] Selected data qubit: {data_qubit}, ancilla: {ancilla_qubit}")
        
        # Add watchdog gadget with timing information
        qr = QuantumRegister(2, 'q')
        cr = ClassicalRegister(1, 'c_watchdog')
        gadget_qc = QuantumCircuit(qr, cr, name="watchdog")
        gadget_qc.h(qr[1])
        gadget_qc.cx(qr[0], qr[1])
        gadget_qc.h(qr[1])
        gadget_qc.measure(qr[1], cr[0])
        
        dag.add_creg(cr)
        gadget_dag = circuit_to_dag(gadget_qc)
        
        # Store original scheduling for later restoration
        original_schedule = dict(schedule)
        
        # Compose the gadget
        dag.compose(gadget_dag, qubits=[data_qubit, ancilla_qubit], clbits=[dag.clbits[-1]])
        
        # Add timing for new nodes by estimating their durations
        new_nodes = [node for node in dag.op_nodes() if node not in original_schedule]
        current_time = max(original_schedule.values()) if original_schedule else 0
        
        for node in new_nodes:
            try:
                # Try to get duration from instruction_durations
                duration = self.instruction_durations.get(node.op.name, [(None, 160)])  # 160dt default
                if isinstance(duration, list) and len(duration) > 0:
                    duration_dt = duration[0][1] if duration[0][1] is not None else 160
                else:
                    duration_dt = 160
                schedule[node] = current_time
                current_time += duration_dt
            except Exception:
                schedule[node] = current_time
                current_time += 160  # Default duration in dt units
        
        print("[ScheduleAwareWatchdog] Watchdog gadget inserted with proper timing.")
        return dag

# Create the pass manager with scheduling
pre_transpiled_circuit = transpile(benchmark_circuit, backend, optimization_level=1)

pm = PassManager([
    SabreLayout(coupling_map),
    ASAPScheduleAnalysis(instruction_durations),
    ScheduleAwareWatchdogPass(backend.properties(), coupling_map, instruction_durations)
])

print("[Analysis] Running schedule-aware vulnerability analysis...")
watchdog_circuit_raw = pm.run(pre_transpiled_circuit)

print(f"[WatchdogPass] Raw circuit has {watchdog_circuit_raw.size()} operations")

# CRITICAL FIX: Transpile the watchdog circuit to backend's basis gates
# This converts H gates and other gates to the backend's native gate set
print("[Transpilation] Converting watchdog circuit to backend basis gates...")

# Extract layout information properly
layout_to_use = None
if hasattr(watchdog_circuit_raw, '_layout') and watchdog_circuit_raw._layout is not None:
    # Convert TranspileLayout to a format the transpiler can understand
    layout_to_use = watchdog_circuit_raw._layout.initial_layout
    print(f"[Transpilation] Using preserved layout: {layout_to_use}")

watchdog_circuit = transpile(
    watchdog_circuit_raw, 
    backend, 
    initial_layout=layout_to_use,  # Use properly extracted layout
    optimization_level=0  # Don't optimize further to preserve our watchdog
)

print(f"[Success] Final transpiled circuit has {watchdog_circuit.size()} operations")

# Draw circuit comparison  
draw_circuit_comparison(pre_transpiled_circuit, watchdog_circuit)

# Now the circuit should be compatible with AerSimulator
print("[Simulation] Running watchdog circuit...")

# Reduce shots and add memory configuration to prevent memory overflow
print(f"[Debug] Circuit depth: {watchdog_circuit.depth()}, width: {watchdog_circuit.width()}")

# Try basic AerSimulator without noise model for testing
print("[Simulation] Using basic AerSimulator (no noise) for memory efficiency...")
sim_basic = AerSimulator(method='automatic')  # Use QASM simulator which is more memory efficient

try:
    watchdog_result = sim_basic.run(watchdog_circuit, shots=1024, memory=True).result()  # Further reduced shots
    memory = watchdog_result.get_memory(0)  # Get memory for the first (and only) experiment
    print(f"[Success] Simulation completed with {len(memory)} memory samples")
except Exception as e:
    print(f"[Error] Simulation failed: {e}")
    print("[Fallback] Getting counts instead of memory...")
    watchdog_result = sim_basic.run(watchdog_circuit, shots=1024).result()
    watchdog_counts = watchdog_result.get_counts(0)
    memory = None
    print(f"[Success] Got counts: {len(watchdog_counts)} unique bitstrings")

# D. Post-Selection and Final Fidelity Calculation
good_counts = {}
raw_counts = {}
for mem_str in memory:
    parts = mem_str.split()
    if len(parts) < 2: continue
    # The memory string format for mid-circuit measurements is "c1 c0"
    # c1 is our watchdog bit, c0 is the main data register
    watchdog_bit, data_bits = parts
    
    raw_counts[data_bits] = raw_counts.get(data_bits, 0) + 1
    if watchdog_bit == '0':
        good_counts[data_bits] = good_counts.get(data_bits, 0) + 1

raw_watchdog_fidelity = hellinger_fidelity(ideal_distribution, raw_counts)
print(f"   Watchdog Fidelity (Raw, all shots): {raw_watchdog_fidelity:.4f}")

if not good_counts:
    post_selected_fidelity = 0.0
    print("[Warning] All shots were discarded by the watchdog. Fidelity is 0.")
else:
    post_selected_fidelity = hellinger_fidelity(ideal_distribution, good_counts)

print(f"   Watchdog Fidelity (Post-Selected): {post_selected_fidelity:.4f}")
print(f"   (Based on {sum(good_counts.values())} of {len(memory)} total shots)")



--- Running Experiments ---

1. Running Baseline (optimization_level=3)...
   Baseline Fidelity: 0.9964

2. Running Our Custom 'Decoherence Watchdog' Pass...
[Pipeline] Creating schedule-aware pass manager...
[Analysis] Running schedule-aware vulnerability analysis...

[ScheduleAwareWatchdog] Running analysis...
[ScheduleAwareWatchdog] Found vulnerable spot on qubit 88 with cost 0.0033
[ScheduleAwareWatchdog] Selected data qubit: 88, ancilla: 87
[ScheduleAwareWatchdog] Watchdog gadget inserted with proper timing.
[WatchdogPass] Raw circuit has 103 operations
[Transpilation] Converting watchdog circuit to backend basis gates...
[Transpilation] Using preserved layout: Layout({
65: <Qubit register=(133, "q"), index=0>,
64: <Qubit register=(133, "q"), index=1>,
63: <Qubit register=(133, "q"), index=2>,
67: <Qubit register=(133, "q"), index=3>,
82: <Qubit register=(133, "q"), index=4>,
132: <Qubit register=(133, "q"), index=5>,
19: <Qubit register=(133, "q"), index=6>,
74: <Qubit register=

QiskitError: 'ERROR:  [Experiment 0] Insufficient memory to run circuit QV3 using the statevector simulator. Required memory: 1048576M, max memory: 16384M ,  ERROR: Insufficient memory to run circuit QV3 using the statevector simulator. Required memory: 1048576M, max memory: 16384M'

### Plots

In [None]:
print("\n--- Generating Final Report ---")

labels = ['Qiskit level=3 (Baseline)', 'Watchdog (Raw)', 'Watchdog (Post-Selected)']
fidelities = [baseline_fidelity, raw_watchdog_fidelity, post_selected_fidelity]

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(labels, fidelities, color=['#648FFF', '#FFB000', '#DC267F'])
ax.set_ylabel('State Fidelity')
ax.set_title('Fidelity Comparison: Standard Transpilation vs. Decoherence Watchdog')
ax.set_ylim(0, max(fidelities) * 1.2)

# Add fidelity values on top of the bars
for bar in bars:
    yval = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2.0, yval + 0.01, f'{yval:.4f}', ha='center', va='bottom')

plt.xticks(rotation=15, ha="right")
plt.tight_layout()
plt.show()

# Display the winning method
winner_index = np.argmax(fidelities)
print(f"\nWinning Method: {labels[winner_index]}")
if winner_index == 2:
    improvement = ((post_selected_fidelity - baseline_fidelity) / baseline_fidelity) * 100
    print(f"Our method improved upon the baseline by {improvement:.2f}%!")
elif winner_index == 0:
    print("The baseline performed best. Our pass may have added too much noise.")
else:
    print("The raw watchdog circuit performed best, indicating a potential net benefit even without post-selection.")
