In [20]:
import os
import functools
import time
from typing import Dict, List, Tuple, Union

# Plotting imports.
import matplotlib.pyplot as plt
plt.rcParams.update({"font.family": "serif", "font.size": 15})
%matplotlib inline

import networkx as nx
import numpy as np
import pandas as pd

# Qiskit imports.
import qiskit
from qiskit_experiments.library import StandardRB
import qiskit.ignis.verification.randomized_benchmarking as rb

from qiskit.transpiler.passes import RemoveBarriers, RemoveFinalMeasurements
from qiskit.circuit import Gate

# Mitiq imports.
from mitiq.interface import convert_to_mitiq, convert_from_mitiq
from mitiq import benchmarks, pec, zne

import cirq



In [2]:
# ---- Backend selection ------------------------------------------------------
# When True, run on a noisy simulator rather than a hardware backend.
use_noisy_simulator: bool = True

# ---- Circuit generation -----------------------------------------------------
# Smallest circuit depth to use.
min_depth: int = 1
# Seed for random circuit generation.
seed: int = 1

# ---- Noise scaling ----------------------------------------------------------
# Smallest noise scale factor.
min_scale_factor: int = 1
# Largest noise scale factor.
max_scale_factor: int = 3
# Increment between consecutive noise scale factors.
step_scale_factor: int = 1

# ---- Sampling budget --------------------------------------------------------
# Total shot budget:
#   * PEC: shots per circuit = shots / num_samples
#   * ZNE: shots per circuit = shots / len(scale_factors)
shots: int = 10_000
# Number of sampled circuits used by PEC.
num_samples: int = 100

# ---- Output -----------------------------------------------------------------
# Print verbose progress information.
verbose: bool = True
# Interval (seconds) between queue updates when running on a hardware device.
verbose_update_time: int = 30

# Persist results (cnot_counts, oneq_counts, true_values, noisy_values, zne_values, etc.).
save_data: bool = True

In [3]:
# Kind of benchmark circuit; "rb" and "mirror" are the supported values.
circuit_type: str = "rb"

# Logical qubits taking part in the experiment: 0, 1, ..., num_qubits - 1.
num_qubits = 3
qubits = list(range(num_qubits))

# Pair up neighbouring qubits (chain connectivity assumed): (0, 1), (2, 3), ...
rb_pattern = [list(pair) for pair in zip(qubits[0:-1:2], qubits[1::2])]
if len(qubits) % 2:
    # An odd qubit count leaves the last qubit unpaired; it gets its own entry.
    rb_pattern += [[qubits[-1]]]

print("Qubit indeces:", qubits)
print("RB pattern:", rb_pattern)

# Largest circuit depth to use.
max_depth: int = 20

# Spacing between consecutive depths in the depth list.
step_depth: int = 2

# Number of circuit instances (trials) averaged at each depth.
trials: int = 4

Qubit indeces: [0, 1, 2]
RB pattern: [[0, 1], [2]]


In [4]:
# Physical-qubit orderings: logical qubit i is laid out on physical qubit ordering[i]
# (the resulting list is passed to the transpiler as `initial_layout`).
# NOTE(review): for 5 qubits the last Lima entry (2) is presumably not coupled to
# qubit 4 on the device, so the full ordering is not a chain -- confirm.
ibm_lima_ordering = [0, 1, 3, 4, 2]

# This name was referenced below without ever being defined, so any request for
# more than 5 qubits died with a NameError. The list is a path through the
# 27-qubit heavy-hex coupling map.
# NOTE(review): confirm against `backend.configuration().coupling_map` for the
# Kolkata backend actually used.
ibm_kolkata_ordering = [
    0, 1, 4, 7, 10, 12, 15, 18, 21, 23, 24, 25, 22, 19, 16, 14, 11, 8, 5, 3, 2,
]

if len(qubits) <= len(ibm_lima_ordering):
    # Assume ibmq-lima device. Take the first n physical qubits of the ordering.
    physical_ibm_qubits = ibm_lima_ordering[:len(qubits)]
elif len(qubits) <= len(ibm_kolkata_ordering):
    # Assume ibmq-kolkata device. Take the first n physical qubits of the ordering.
    physical_ibm_qubits = ibm_kolkata_ordering[:len(qubits)]
else:
    raise ValueError(f"Number of qubits {len(qubits)} too large.")

print("IBM physical qubits:", physical_ibm_qubits)

IBM physical qubits: [0, 1, 3]


In [5]:
def make_rotation_barrier(circuit, delta):
    """Build a layer of small x, y and z rotations on every qubit of ``circuit``.

    A random sign is drawn once per axis with ``np.random.choice``, so every
    qubit receives the same three angles, each equal to ``+delta`` or ``-delta``.

    Args:
        circuit: Circuit providing the qubits via ``circuit.all_qubits()``.
        delta: Magnitude of each rotation angle.

    Returns:
        A new ``cirq.Circuit`` holding ``rx``, ``ry`` and ``rz`` (in that order)
        for each qubit.
    """
    # Signs are drawn in x, y, z order.
    signs = [np.random.choice([1.0, -1.0]) for _ in range(3)]
    angle_x, angle_y, angle_z = (sign * delta for sign in signs)
    rotations = (cirq.rx(angle_x), cirq.ry(angle_y), cirq.rz(angle_z))

    layer = cirq.Circuit()
    for target in circuit.all_qubits():
        # Operations are appended one at a time, qubit by qubit.
        for rotation in rotations:
            layer.append(rotation(target))
    return layer

def fold_with_rotation_barriers(circuit, scale_factor, delta):
    """Globally fold ``circuit`` and separate the folded pieces with rotation barriers.

    The circuit is converted with ``convert_to_mitiq``, folded, and converted
    back with ``convert_from_mitiq``. Folding proceeds in two stages:

    1. ``(scale_factor - 1) // 2`` whole-circuit folds, each appending
       ``barrier, inverse(circuit), barrier, circuit``.
    2. A partial fold of the last operations of the circuit to realise the
       remaining fractional scale, again with a barrier before each piece.

    Every barrier comes from ``make_rotation_barrier(base_circuit, delta)``.

    Args:
        circuit: Circuit in any form accepted by ``convert_to_mitiq``.
        scale_factor: Target noise scale factor; must be >= 1.
        delta: Rotation-angle magnitude forwarded to ``make_rotation_barrier``.

    Returns:
        The folded circuit, converted back to the type of the input circuit.

    Raises:
        ValueError: If ``scale_factor`` is smaller than 1.
    """
    # Without this check a scale factor below 1 gives a negative fold count and a
    # spurious fractional remainder from divmod, so the circuit is silently folded.
    if scale_factor < 1:
        raise ValueError(
            f"The scale factor must be a real number >= 1, got {scale_factor}."
        )

    base_circuit, circ_type = convert_to_mitiq(circuit)

    ####### Adapted from Mitiq ############
    folded = base_circuit.copy()
    # Determine the number of global folds and the final fractional scale
    num_global_folds, fraction_scale = divmod(scale_factor - 1, 2)
    # Do the global folds
    for _ in range(int(num_global_folds)):
        folded += make_rotation_barrier(base_circuit, delta)
        folded += cirq.Circuit(cirq.inverse(base_circuit))
        folded += make_rotation_barrier(base_circuit, delta)
        folded += cirq.Circuit(base_circuit)

    # Fold remaining gates until the scale is reached
    operations = list(base_circuit.all_operations())
    num_to_fold = int(round(fraction_scale * len(operations) / 2))

    if num_to_fold > 0:

        # Create the inverse of the final partial circuit by walking the moments
        # (and the operations inside each moment) from last to first until
        # num_to_fold operations have been collected.
        inverse_partial = cirq.Circuit()
        num_partial = 0
        for moment in base_circuit[::-1]:
            new_moment = cirq.Moment()
            for op in moment.operations[::-1]:
                new_moment = new_moment.with_operation(cirq.inverse(op))
                num_partial += 1
                if num_partial == num_to_fold:
                    break
            inverse_partial.append(new_moment)
            if num_partial == num_to_fold:
                break
        # Append partially folded circuit
        folded += make_rotation_barrier(base_circuit, delta)
        folded += inverse_partial
        folded += make_rotation_barrier(base_circuit, delta)
        folded += cirq.inverse(inverse_partial)
    #########

    folded_converted = convert_from_mitiq(folded, circ_type)
    return folded_converted

In [6]:
# Error mitigation technique to use; supported values are "zne" and "pec".
mitigation_type: str = "zne"

# Magnitude of the rotation angles used for rotation barriers.
delta = 0.0001

# Function used to scale the noise of a circuit.
fold_method = zne.scaling.fold_global

# Depths and noise scale factors to sweep, both built from the min/max/step settings.
depths = [*range(min_depth, max_depth + 1, step_depth)]
scale_factors = (*range(min_scale_factor, max_scale_factor + 1, step_scale_factor),)

print(f"Depths: {depths}")
print(f"Scale factors: {scale_factors}")

Depths: [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
Scale factors: (1, 2, 3)


In [7]:
computer = nx.Graph()

# Assume chain-like connectivity
computer.add_edges_from([(qa, qb) for qa, qb in zip(qubits[:-1], qubits[1:])])

# Add reversed edges to computer graph.
# This is important to represent CNOT gates with target and control reversed.
computer = nx.to_directed(computer)

# Get the backend to run on and a simulated backend for comparing results.
if use_noisy_simulator:
    from qiskit.providers.fake_provider import FakeLima, FakeKolkataV2

    # Up to 5 qubits fit on the Lima fake backend; anything larger uses Kolkata.
    if len(qubits) <= 5:
        noisy_backend = FakeLima()
    else:
        noisy_backend = FakeKolkataV2()
else:
    # Specify the hardware device. `backend` used to be read here without ever
    # being assigned (NameError on the hardware path); pick the device with the
    # same qubit-count rule as the simulator branch above.
    # NOTE(review): confirm these backend names are available to the loaded account.
    backend = "ibmq_lima" if len(qubits) <= 5 else "ibmq_kolkata"

    provider = qiskit.IBMQ.load_account()
    noisy_backend = provider.get_backend(backend)
    print(f"View {backend} job status at https://quantum-computing.ibm.com/jobs")

# Check if RB-pattern is consistent with device topology.
# Only two-qubit entries are checked against the connectivity graph.
for edge in rb_pattern:
    if len(edge) == 2:
        if edge not in computer.edges:
            raise ValueError("The option rb_pattern is not consistent with the device topology.")

# Set ideal backends for simulator-selected options.
ideal_backend = qiskit.Aer.get_backend("aer_simulator")


print("Computer connectivity (used only for mirror circuits):", computer.edges)
print("Noisy backend", noisy_backend)
print("Ideal backend", ideal_backend)

Computer connectivity (used only for mirror circuits): [(0, 1), (1, 0), (1, 2), (2, 1)]
Noisy backend fake_lima
Ideal backend aer_simulator


In [12]:
def named_qubit_to_line_qubit(named_qubit: cirq.NamedQubit) -> cirq.LineQubit:
    """Map a named qubit such as ``q_3`` to ``cirq.LineQubit(3)``.

    The qubit's ``name`` is split on "_" and the all-digit pieces are parsed as
    integers; exactly one such piece must be present.

    Args:
        named_qubit: Object exposing a ``name`` string (the parameter was
            previously annotated as ``str``, which has no ``name`` attribute).

    Returns:
        The ``cirq.LineQubit`` whose index is the number found in the name.

    Raises:
        RuntimeError: If the name contains no numeric piece or more than one.
    """
    digits = [int(s) for s in named_qubit.name.split("_") if s.isdigit()]
    if len(digits) != 1:
        raise RuntimeError("Failed to identify qubit number.")
    return cirq.LineQubit(digits[0])

In [21]:
def get_circuit_rb(depth: int, seed: int) -> Union[Tuple[cirq.Circuit, str], Tuple[qiskit.QuantumCircuit, str]]:
    """Generate one randomized-benchmarking circuit and its expected bitstring.

    Args:
        depth: Sequence length, passed as the only entry of ``length_vector``.
        seed: Random seed forwarded as ``rand_seed``.

    Returns:
        A pair ``(circuit, bitstring)``: the RB circuit with barriers and final
        measurements removed, round-tripped through Mitiq's conversion to a
        Qiskit circuit, and the all-zeros bitstring of length ``len(qubits)``.
    """
    rb_output = rb.randomized_benchmarking_seq(
        length_vector=[depth],
        rb_pattern=rb_pattern,
        group_gates="0",
        rand_seed=seed,
    )
    # Presumably indexed as [circuit lists][seed index][length index]; only one
    # length is requested -- confirm against the Ignis API.
    raw_circuit = rb_output[0][0][0]

    # Strip barriers first, then the final measurements.
    without_barriers = RemoveBarriers()(raw_circuit)
    bare_circuit = RemoveFinalMeasurements()(without_barriers)

    # Convert to Mitiq's internal representation and back to Qiskit.
    mitiq_circuit = convert_to_mitiq(bare_circuit)[0]
    qiskit_circuit = convert_from_mitiq(mitiq_circuit, "qiskit")

    expected_bitstring = "0" * len(qubits)
    return qiskit_circuit, expected_bitstring

In [14]:
def get_circuit_mirror(depth: int, seed: int) -> Union[Tuple[cirq.Circuit, str], Tuple[qiskit.QuantumCircuit, str]]:
    """Generate a mirror circuit on ``computer`` and its expected bitstring.

    Args:
        depth: Number of layers (``nlayers``) of the mirror circuit.
        seed: Seed passed to the Mitiq mirror-circuit generator.

    Returns:
        A pair ``(circuit, bitstring)`` where ``circuit`` is returned in "qiskit"
        form and ``bitstring`` is the generator's correct bitstring, reversed
        and joined into a string.
    """
    mirror_circuit, correct_bits = benchmarks.generate_mirror_circuit(
        nlayers=depth,
        two_qubit_gate_prob=1.0,
        connectivity_graph=computer,
        two_qubit_gate_name="CNOT",
        seed=seed,
        return_type="qiskit",
    )

    # Reverse the bit order before joining (presumably to match Qiskit's
    # bit ordering in measurement counts -- confirm).
    reversed_bits = correct_bits[::-1]
    bitstring = "".join(str(bit) for bit in reversed_bits)
    return mirror_circuit, bitstring

In [15]:
def get_cnot_error(edge: Tuple[int, int] = None) -> float:
    """Look up and print the "cx" gate error of ``edge`` on ``noisy_backend``.

    The value is read from ``noisy_backend.properties()``; no hardcoded fallback
    is used.

    Args:
        edge: Qubit pair passed as ``qubits`` to ``gate_error``.

    Returns:
        The gate error reported by the backend properties for that pair.
    """
    backend_properties = noisy_backend.properties()
    error_prob = backend_properties.gate_error("cx", qubits=edge)

    print(f"cnot_error_prob for edge {edge}: {error_prob}")
    return error_prob

def get_cnot_representation(edge: Tuple[int, int]) -> pec.OperationRepresentation:
    """Build the PEC representation of a CNOT acting on ``edge``.

    Args:
        edge: ``(control, target)`` indices; the qubits are named ``q_<index>``.

    Returns:
        The representation produced by
        ``pec.represent_operation_with_local_depolarizing_noise`` for a
        single-CNOT circuit, with noise level ``1 - sqrt(1 - cnot_error)``.
    """
    control_qubit = cirq.NamedQubit(f"q_{str(edge[0])}")
    target_qubit = cirq.NamedQubit(f"q_{str(edge[1])}")
    cnot_circuit = cirq.Circuit(cirq.CNOT(control_qubit, target_qubit))

    # Convert the reported CNOT error into the noise level used for the representation.
    cnot_error = get_cnot_error(edge)
    rep_exact_prob = 1 - np.sqrt(1 - cnot_error)

    return pec.represent_operation_with_local_depolarizing_noise(
        cnot_circuit,
        noise_level=rep_exact_prob,
    )


def get_representations(computer: nx.Graph) -> List[pec.OperationRepresentation]:
    """Return one CNOT representation per edge of ``computer``, in edge order."""
    representations = []
    for edge in computer.edges:
        representations.append(get_cnot_representation(edge))
    return representations

In [16]:
def get_num_cnot_count(circuit: Union[cirq.Circuit, qiskit.QuantumCircuit]) -> int:
    """Return the number of "cx" gates reported by ``circuit.count_ops()``.

    Args:
        circuit: Circuit exposing ``count_ops()``.
            NOTE(review): only Qiskit circuits appear to provide this method --
            confirm whether Cirq circuits are ever passed in.

    Returns:
        The "cx" count, or 0 when the circuit has no "cx" entry. Without the
        default, ``None`` was returned and broke arithmetic in callers.
    """
    return circuit.count_ops().get("cx", 0)

def get_avg_cnot_count(circuits: Union[List[cirq.Circuit], List[qiskit.QuantumCircuit]]) -> float:
    """Return the mean number of "cx" gates over ``circuits``.

    Args:
        circuits: Circuits exposing ``count_ops()``.

    Returns:
        The ``np.average`` of the per-circuit "cx" counts. A circuit without any
        "cx" gate contributes 0; previously it contributed ``None``, which made
        the average fail.
    """
    return np.average([c.count_ops().get("cx", 0) for c in circuits])
    

def get_oneq_count(circuit: Union[cirq.Circuit, qiskit.QuantumCircuit]) -> int:
    """Return ``len(circuit)`` minus the number of CNOT gates.

    NOTE(review): ``len(circuit)`` counts every instruction, so the result
    presumably also includes non-gate instructions such as measurements --
    confirm this is the intended "one-qubit gate" count.

    Args:
        circuit: Circuit supporting ``len()`` and ``count_ops()``.

    Returns:
        The number of non-CNOT instructions in the circuit.
    """
    # `or 0` covers a missing CNOT count (None), which made the subtraction fail.
    return len(circuit) - (get_num_cnot_count(circuit) or 0)

### Dataset generation

In [109]:
import json

from blackwater.data.generators.exp_val import ExpValueEntry
from blackwater.data.utils import (
    circuit_to_graph_data_json,
    generate_random_pauli_sum_op,
    get_backend_properties_v1,
    create_estimator_meas_data,
    encode_pauli_sum_op,
)

from tqdm.notebook import tqdm_notebook

In [110]:
# Backend properties of the noisy backend, extracted once with the blackwater
# helper and later passed to `circuit_to_graph_data_json` as `properties`.
properties = get_backend_properties_v1(noisy_backend)


In [111]:
dataset = []

# Number of outer generation rounds; each round covers every depth and trial.
num_rounds = 20

# Create the output directory up front so a missing directory does not surface
# only after a full round of simulation.
output_dir = "./data/rb_uf"
os.makedirs(output_dir, exist_ok=True)

# NOTE(review): this loop generates mirror circuits although `circuit_type` is
# "rb" and the output directory is named "rb_uf" -- confirm which is intended.
for r in tqdm_notebook(range(num_rounds)):
    for depth in depths:
        for trial in range(trials):
            # Distinct seed for every (round, depth, trial) combination.
            local_seed = 10**8 * r + 10**6 * depth + 10**3 * seed + trial

            circuit, correct_bitstring = get_circuit_mirror(depth, local_seed)
            circuit.measure_all()

            transpiled = qiskit.transpile(
                circuit,
                backend=noisy_backend,
                initial_layout=physical_ibm_qubits,
                optimization_level=0, # Otherwise RB circuits are simplified to empty circuits.
            )

            # Fraction of shots that returned the expected bitstring, with noise.
            noisy_job_result = noisy_backend.run(
                transpiled,
                init_qubits=True,
                shots=shots,
            ).result()
            noisy_value = noisy_job_result.get_counts().get(correct_bitstring, 0.0) / shots

            # Same quantity for the same transpiled circuit on the ideal simulator.
            ideal_job_result = ideal_backend.run(
                transpiled,
                init_qubits=True,
                shots=shots,
            ).result()
            ideal_value = ideal_job_result.get_counts().get(correct_bitstring, 0.0) / shots

            graph_data = circuit_to_graph_data_json(
                circuit=transpiled,
                properties=properties,
                use_qubit_features=True,
                use_gate_features=True,
            )

            entry = ExpValueEntry(
                circuit_graph=graph_data,
                observable=[],
                ideal_exp_value=ideal_value,
                noisy_exp_value=noisy_value,
                circuit_depth=circuit.depth(),
            )

            dataset.append(entry.to_dict())

    # Flush accumulated entries on every even round, and also on the final round:
    # with an even number of rounds the last (odd) round was otherwise never
    # written to disk.
    if r % 2 == 0 or r == num_rounds - 1:
        with open(f"{output_dir}/{r}.json", "w") as f:
            json.dump(dataset, f)

        dataset = []

# Number of entries not yet written to disk; 0 after a complete run.
len(dataset)

  0%|          | 0/20 [00:00<?, ?it/s]

KeyboardInterrupt: 