Mitiq tutorial: https://mitiq.readthedocs.io/en/stable/examples/ibmq-backends.html

In [2]:
import time, random
import numpy as np
from qiskit.circuit import QuantumCircuit
from qiskit.quantum_info import random_clifford, Pauli, Statevector
import matplotlib.pyplot as plt
from copy import deepcopy

np.set_printoptions(precision=6, edgeitems=10, linewidth=150, suppress=True)

In [3]:
import qiskit
import itertools
from qiskit import *
from qiskit.quantum_info import Clifford, random_clifford
from qiskit.synthesis import synth_clifford_full
from qiskit.quantum_info import hellinger_fidelity as hf

from utils.pauli_checks import ChecksFinder, add_pauli_checks, add_meas_pauli_checks, add_linear_meas_pauli_checks,  search_for_pauli_list
from utils.pauli_checks import gen_initial_layout, gen_final_layout, complete_postprocess, filter_results

from utils.pauli_checks import convert_to_PCS_circ # new util

from utils.utils import norm_dict, total_counts
# from utils.vqe_utils import evaluation

In [4]:
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_aer import QasmSimulator

from mitiq import zne
from mitiq.interface.mitiq_qiskit.qiskit_utils import initialized_depolarizing_noise

#### Backend settings

In [6]:
# When True (and an IBM account is saved), the backend cell picks a real device;
# when False, the noisy Aer simulator is used.
USE_REAL_HARDWARE = False

In [7]:
from qiskit_ibm_runtime.fake_provider import *
from qiskit_aer import AerSimulator
import qiskit_aer.noise as noise
from itertools import combinations

# Fake backend noise model
# fake_backend = FakeCairo()
# noise_model = noise.NoiseModel.from_backend(fake_backend)

# Custom noise model: depolarizing errors attached to every qubit.
prob_1 = 0.002  # depolarizing parameter for 1-qubit gates
prob_2 = 0.02  # depolarizing parameter for 2-qubit gates

error_1 = noise.depolarizing_error(prob_1, 1)
error_2 = noise.depolarizing_error(prob_2, 2)

noise_model = noise.NoiseModel()
noise_model.add_all_qubit_quantum_error(error_1, ['u1', 'u2', 'u3', 'sx', 'x'])
# noise_model.add_all_qubit_quantum_error(error_1, ['x'])
# Both 2-qubit gates must be in the single `instructions` list. The third positional
# parameter of add_all_qubit_quantum_error is `warnings`, so passing ['cz'] separately
# only set that flag and left cz gates noiseless.
noise_model.add_all_qubit_quantum_error(error_2, ['cx', 'cz'])

In [10]:
# Choose the execution backend. The USE_REAL_HARDWARE flag is tested first so that
# simulator-only runs never need to look up saved IBM accounts.
if USE_REAL_HARDWARE and QiskitRuntimeService.saved_accounts():
    service = QiskitRuntimeService()
    backend = service.least_busy(operational=True, simulator=False)
    noise_model = False  # the custom noise model above is dropped on this path
else:
    # Simulate the circuit with noise
    backend = AerSimulator(noise_model=noise_model)

#### Generate circuit

In [16]:
# # Optimized parameters

# parameters = [0.0944527, 0.04799566, -0.0590973, -0.05908328, 0.04114604, 0.02695483, 0.02604318, 0.03485649]

# def construct_qcc_circuit(entanglers: list, truncation=None):
#     '''This function defines the QCC ansatz circuit for VQE. Here we construct exponential blocks using
#     entanglers from QMF state as a proof of principle demonstration.
    
#     Args:
#         entanglers: list storing Pauli words for construction of qcc_circuit.
#         backend: statevector, qasm simulator or a real backend.
#         truncation: a threshold number to truncate the blocks. Default: None.
#     Returns:
#         qcc_circuit
#     '''
#     if truncation != None:
#         if len(entanglers) > truncation:
#             num_blocks = truncation
#         else:
#             num_blocks = len(entanglers)
#     else:
#         num_blocks = len(entanglers)

#     # p = ParameterVector('p', num_blocks)
#     p = [0.0944527, 0.04799566, -0.0590973, -0.05908328, 0.04114604, 0.02695483, 0.02604318, 0.03485649]
    
#     num_qubits = len(entanglers[0])
#     qcc_circuit = QuantumCircuit(num_qubits)
#     for i in range(num_blocks):
#         circuit = QuantumCircuit(num_qubits)
#         key = entanglers[i]
#         coupler_map = []
#         # We first construct coupler_map according to the key.
#         for j in range(num_qubits):
#             if key[num_qubits-1-j] != 'I':
#                 coupler_map.append(j)
                
#         # Then we construct the circuit.
#         if len(coupler_map) == 1:
#             # there is no CNOT gate.
#             c = coupler_map[0]
#             if key[num_qubits-1-c] == 'X':
#                 circuit.h(c)
#                 circuit.rz(p[i], c)
#                 circuit.h(c)
#             elif key[num_qubits-1-c] == 'Y':
#                 circuit.rx(-np.pi/2, c)
#                 circuit.rz(p[i], c)
#                 circuit.rx(np.pi/2, c)
                
#             qcc_circuit += circuit
#         else:
#             # Here we would need CNOT gate.
#             for j in coupler_map:
#                 if key[num_qubits-1-j] == 'X':
#                     circuit.h(j)
#                 elif key[num_qubits-1-j] == 'Y':
#                     circuit.rx(-np.pi/2, j)
                    
#             for j in range(len(coupler_map) - 1):
#                 circuit.cx(coupler_map[j], coupler_map[j+1])
                
#             param_gate = QuantumCircuit(num_qubits)
#             param_gate.rz(p[i], coupler_map[-1])
            
#             #qcc_circuit += circuit + param_gate + circuit.inverse()
#             qcc_circuit.compose(circuit, inplace=True)
#             qcc_circuit.compose(param_gate, inplace=True)
#             qcc_circuit.compose(circuit.inverse(), inplace=True)
    
#     # Would setting the optimization level to 3 give the best Qiskit optimization?
#     # trans_circuit = transpile(qcc_circuit, backend=backend, optimization_level=3)
    
#     return qcc_circuit

# def hf_circ(num_qubits):
#     hf_circuit = QuantumCircuit(num_qubits)
#     hf_circuit.x(0)
#     hf_circuit.x(3)
        
#     entanglers = ['XXXXXY', 'XXXIYI', 'IXIXXY', 'IXIIYI', 'IXXIXY', 'XXIXYI', 'IIIXIY', 'XIYIII']

#     parameterized_circuit = hf_circuit.compose(construct_qcc_circuit(entanglers))
    
#     return parameterized_circuit


# num_qubits = 6
# circuit = hf_circ(num_qubits)

# circuit.draw("mpl", fold=-1)

In [18]:
# import supermarq

# num_qubits = 6

# qiskit_test = supermarq.benchmarks.mermin_bell.MerminBell(num_qubits)
# # qiskit_test = supermarq.benchmarks.hamiltonian_simulation.HamiltonianSimulation(num_qubits, total_time=4)
# circuit = qiskit_test.qiskit_circuit()

# # # basis_gates = ["x","sx","rz","cx","rx"]
# transpiled_circ = qiskit.transpile(qiskit_test.qiskit_circuit(), basis_gates = ['x', 'y', 'rz', 's', 'h', 'cx'])
# circuit = transpiled_circ.copy()
# circuit.remove_final_measurements()
# circuit.draw("mpl", fold=-1)

In [20]:
# def hydrogen_trial_circuit(num_qubits):
#     qc = QuantumCircuit(num_qubits)
#     # prepare the Hartree-Fock state
#     qc.x(0)
#     qc.x(1)
    
#     qc.rx(np.pi/2, 0)
#     qc.h(1)
#     qc.h(2)
#     qc.h(3)
    
#     qc.cx(0,1)
#     qc.cx(1,2)
#     qc.cx(2,3)
    
#     qc.rz(1.0, 3)
    
#     qc.cx(2,3)
#     qc.cx(1,2)
#     qc.cx(0,1)
    
#     qc.rx(-np.pi/2, 0)
#     qc.h(1)
#     qc.h(2)
#     qc.h(3)
    
#     return qc

# num_qubits = 4
# circuit = hydrogen_trial_circuit(num_qubits)
# # circuit.measure_all()
# print(circuit)

In [22]:
# def ghz_mirror_circuit(num_qubits, p):
#     qc = QuantumCircuit(num_qubits)

#     for _ in range(p):
#         qc.h(0)
#         for i in range(1, num_qubits):
#             qc.cx(0, i)
#         for i in range(num_qubits-1, 0, -1):
#             qc.cx(0, i)
#         qc.h(0)
#         # for i in range(num_qubits):
#         #     qc.x(i)
#         #     qc.x(i)
#         #     qc.x(i)
#         #     qc.x(i)
    
#     # qc.h(0)
#     # for i in range(1, num_qubits):
#     #     qc.cx(0, i)
#     # for i in range(num_qubits-1, 0, -1):
#     #     qc.cx(0, i)
#     # qc.h(0)

#     # for i in range(num_qubits):
#     #     qc.x(i)
#     #     qc.x(i)
#     #     qc.x(i)
#     #     qc.x(i)

#     # for i in range(num_qubits):
#     #     qc.x(i)
#     #     qc.x(i)
#     #     qc.x(i)
#     #     qc.x(i)

#     return qc


# p = 1
# num_qubits = 4
# circuit = ghz_mirror_circuit(num_qubits, p)
# print(circuit)

In [None]:
# from qiskit.quantum_info import random_clifford

# num_qubits = 4

# clifford_obj = random_clifford(num_qubits)
# circuit = clifford_obj.to_circuit()
# circuit.draw("mpl", fold=-1)

#### Set observable(s)

In [None]:
# NOTE(review): `num_qubits` (and `circuit`, used in the following cells) are only
# assigned inside the circuit-generation cells above, which are all commented out in
# this view. Enable one of them before running from a fresh kernel.
# Observable to estimate, written as a Pauli label with one character per qubit.
pauli_string = 'Z'*num_qubits
# pauli_string = 'ZIZI'
# pauli_string = 'X'*num_qubits
# pauli_string = 'XXXXXX'
# pauli_string = 'X'*num_qubits
# pauli_string = 'IIII'
#### Ideal result

In [None]:
# Noiseless reference value: <psi| P |psi> from the exact statevector of `circuit`.
psi = Statevector(circuit)
# print(psi)
operator = Pauli(pauli_string)
# Dense evaluation: conjugated state (row) @ Pauli matrix @ state (column).
expect = np.array(psi).T.conj() @ operator.to_matrix() @ np.array(psi)
print(expect)

#### Run ZNE

In [None]:
def apply_measurement_basis(circuit, pauli_string):
    """Append basis-change gates so a Z-basis measurement reads out ``pauli_string``.

    The string is read with Qiskit's Pauli-label ordering: the rightmost character
    acts on qubit 0. This matches how the notebook builds the ideal operator
    (``Pauli(pauli_string)``) and how ``compute_expectation_value`` pairs label
    characters with measured bitstring characters. Applying character ``i`` to qubit
    ``i`` instead would rotate the wrong qubits for any non-palindromic X/Y string.

    Args:
        circuit: Circuit modified in place; only its ``h`` and ``sdg`` methods are used.
        pauli_string: Sequence of 'I', 'X', 'Y', 'Z' characters. 'I' and 'Z' (and
            any other character) add no gate.

    Returns:
        None. Gates are appended to ``circuit``.
    """
    # reversed(): the last character of the label belongs to qubit 0.
    for qubit, pauli in enumerate(reversed(pauli_string)):
        if pauli == 'X':
            # H maps the X eigenbasis onto the computational basis.
            circuit.h(qubit)
        elif pauli == 'Y':
            # Sdg followed by H maps the Y eigenbasis onto the computational basis.
            circuit.sdg(qubit)
            circuit.h(qubit)

def compute_expectation_value(counts, pauli_string):
    """Compute the expectation value of a Pauli string from measurement counts.

    Character ``i`` of ``pauli_string`` is paired with character ``i`` of every
    bitstring key. Each non-identity position contributes +1 for '0' and -1 for any
    other character; the per-bitstring product is averaged, weighted by its count.
    X and Y positions are treated like Z because the circuit is expected to have
    been rotated into the Z basis before measurement.

    Args:
        counts: Mapping from measured bitstring to number of occurrences.
        pauli_string: Sequence of 'I', 'X', 'Y', 'Z' characters.

    Returns:
        The weighted average eigenvalue, as a float in [-1, 1].

    Raises:
        ValueError: If ``pauli_string`` contains a character other than I/X/Y/Z.
        ZeroDivisionError: If ``counts`` holds no shots (e.g. post-selection
            discarded everything), so no average can be formed.
    """
    # Validate once up front instead of re-checking every character per bitstring.
    for pauli in pauli_string:
        if pauli not in ('I', 'X', 'Y', 'Z'):
            raise ValueError(f"Invalid Pauli operator: {pauli}")

    # Positions that actually contribute to the parity (identity positions are skipped).
    active_positions = [i for i, pauli in enumerate(pauli_string) if pauli != 'I']

    total_shots = sum(counts.values())
    if total_shots == 0:
        raise ZeroDivisionError(
            "Cannot compute an expectation value: counts contain no shots"
        )

    expectation = 0
    for bitstring, count in counts.items():
        # Reverse bitstring if needed to match Qiskit's little-endian convention
        # bitstring = bitstring[::-1]
        flips = sum(1 for i in active_positions if bitstring[i] != '0')
        # An odd number of non-'0' outcomes gives eigenvalue -1, even gives +1.
        expectation += -count if flips % 2 else count

    return expectation / total_shots


# def ibmq_executor(circuit: QuantumCircuit, shots: int = 10_000):
def ibmq_executor(circuit: QuantumCircuit, pauli_string: str, shots: int = 10_000):
    """Run ``circuit`` and return the measured expectation value of ``pauli_string``.

    Execution uses the notebook-level ``backend`` variable; it is not a parameter.

    Args:
        circuit: Circuit to run. It is copied, so the caller's object is not modified.
        pauli_string: String representing the Pauli operators to be measured (e.g., 'XYZI').
        shots: Number of times to execute the circuit.

    Returns:
        The expectation value computed from the measured counts.
    """
    # Build the measured variant: barrier, basis rotations, then measure every qubit.
    to_measure = circuit.copy()
    to_measure.barrier()
    apply_measurement_basis(to_measure, pauli_string)
    to_measure.measure_all()

    # optimization_level=0: preserve gate structure for simulation accuracy.
    transpiled = transpile(to_measure, backend=backend, optimization_level=0)

    # Execute and reduce the counts to a single expectation value.
    run_result = backend.run(transpiled, shots=shots).result()
    return compute_expectation_value(run_result.get_counts(), pauli_string)
    

In [None]:
# Baseline: expectation value from the executor with no error mitigation applied.
unmitigated = ibmq_executor(circuit, pauli_string)

In [None]:
from functools import partial

# NOTE: linear_factory is constructed but not passed to execute_with_zne below
# (the call using it is commented out), so ZNE runs with Mitiq's default settings.
linear_factory = zne.inference.LinearFactory(scale_factors=[1.0, 1.5, 2.0, 2.5, 3.0])
# Bind the observable so the executor can be called with the circuit alone.
zne_executor = partial(ibmq_executor, pauli_string=pauli_string)
# mitigated = zne.execute_with_zne(circuit, zne_executor, factory=linear_factory)

mitigated = zne.execute_with_zne(circuit, zne_executor) # default

In [None]:
# Side-by-side report of the raw and ZNE-mitigated estimates.
print(f"Unmitigated result {unmitigated:.3f}")
print(f"Mitigated result {mitigated:.3f}")

#### Run PCE

In [None]:
# num_checks = 2*num_qubits
# num_checks = num_qubits
# Largest check count for which a Pauli-check circuit is built.
num_checks = num_qubits//2


# circs_list[k] / signs_list[k] hold the circuit and sign data returned for check_id = k + 1.
circs_list = []
signs_list = []
for check_id in range(1, num_checks + 1):
    print(check_id)
    # presumably check_id is the number of checks to insert — confirm in utils.pauli_checks
    sign, circ = convert_to_PCS_circ(circuit, num_qubits, check_id, only_Z=True)
    circs_list.append(circ)
    signs_list.append(sign)

print(signs_list)

In [None]:
# Draw the last circuit built above (the one for check_id == num_checks).
circs_list[-1].draw("mpl", fold=-1)

#### Compute Expectation

In [None]:
# def filter_counts(counts, num_checks):
#     filtered_counts = {}
#     for state, count in counts.items():
#         if all(bit == '0' for bit in state[:num_checks]):
#             remaining_state = state[num_checks:]
#             if remaining_state in filtered_counts:
#                 filtered_counts[remaining_state] += count
#             else:
#                 filtered_counts[remaining_state] = count

#     return filtered_counts

def filter_counts(no_checks, sign_list_in, in_counts):
    """Keep only the counts whose check bits show the error-free outcome.

    The first ``no_checks`` characters of every key are treated as check bits. The
    expected error-free pattern is built from the signs in reverse order: "+1" maps
    to '0' and anything else to '1', which adjusts for minus signs. Matching keys
    are returned with the check bits stripped.

    Args:
        no_checks: Number of leading characters in each key that are check bits.
        sign_list_in: Sequence of sign strings such as "+1" / "-1"; not modified.
            ``None`` (the default passed through by ``ibmq_executor_pcs``) is
            treated as "all checks expected to read 0" instead of crashing.
        in_counts: Mapping from measured bitstring to count.

    Returns:
        dict mapping the remaining (non-check) bitstring to its count.
    """
    if sign_list_in is None:
        err_free_checks = "0" * no_checks
    else:
        # Build from a reversed copy, so the caller's list is left untouched
        # without needing a deepcopy.
        err_free_checks = "".join(
            "0" if sign == "+1" else "1" for sign in list(sign_list_in)[::-1]
        )

    return {
        key[no_checks:]: count
        for key, count in in_counts.items()
        if key[:no_checks] == err_free_checks
    }

def ibmq_executor_pcs(circuit: QuantumCircuit, pauli_string: str, num_qubits, shots: int = 10_000, signs = None):
    """Run a Pauli-check circuit and return the post-selected expectation value.

    Execution uses the notebook-level ``backend`` variable; it is not a parameter.

    Args:
        circuit: Circuit to run, including its check qubits. It is copied before
            measurement gates are added.
        pauli_string: Pauli observable to measure.
        num_qubits: Number of non-check qubits; ``circuit.num_qubits - num_qubits``
            is used as the number of check bits to filter on.
        shots: Number of times to execute the circuit.
        signs: Sign data forwarded to ``filter_counts`` to select error-free outcomes.

    Returns:
        The expectation value computed from the counts that pass the checks.
    """
    # Add basis rotations and measurements to a copy of the circuit.
    with_measurement = circuit.copy()
    apply_measurement_basis(with_measurement, pauli_string)
    with_measurement.measure_all()

    # optimization_level=0: preserve gate structure for simulation accuracy.
    transpiled = transpile(with_measurement, backend=backend, optimization_level=0)

    # Run the circuit and collect raw counts.
    raw_counts = backend.run(transpiled, shots=shots).result().get_counts()

    # Every qubit beyond the first num_qubits is counted as a check qubit.
    n_check_qubits = circuit.num_qubits - num_qubits
    kept_counts = filter_counts(n_check_qubits, signs, raw_counts)

    # Expectation value over the shots that survived the filter.
    return compute_expectation_value(kept_counts, pauli_string)

#### Run PCS for each # of checks

In [None]:
# One post-selected expectation value per check circuit; index i uses circs_list[i].
expectation_values = []
for i in range(num_checks):
    print(i)
    pcs_circ = circs_list[i]
    signs = signs_list[i]
    expectation_value = ibmq_executor_pcs(pcs_circ, pauli_string, num_qubits = num_qubits, signs=signs)
    expectation_values.append(expectation_value)

In [None]:
# Values measured for 1..num_checks checks, in that order.
print(expectation_values)

#### Extrapolate checks

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Fit a straight line through (number of checks, expectation value) and evaluate it
# at larger check counts than were actually run.
num_checks_to_fit = num_checks
check_numbers = range(1, num_checks_to_fit+1)

# extrapolation_layers = range(num_checks_to_fit+1, 2*num_checks_to_fit+1)
extrapolation_layers = range(num_checks_to_fit+1, 2*num_qubits+1) # extrapolate to 2n

# Fit a polynomial of degree 1 (linear fit) to the check_numbers and expectation_values
polynomial_coefficients = np.polyfit(check_numbers, expectation_values[:num_checks_to_fit], 1)
polynomial = np.poly1d(polynomial_coefficients)

extrapolated_values = polynomial(list(extrapolation_layers))

# Dense x grid, used only to draw the fitted line across measured and extrapolated points.
x_values = np.linspace(min(check_numbers), max(extrapolation_layers), 400)
y_values = polynomial(x_values)

# Plotting
plt.figure(figsize=(8, 5))
plt.scatter(check_numbers, expectation_values[:num_checks_to_fit], color='blue', label='Actual Data')
plt.plot(x_values, y_values, 'r--', label='Polynomial Fit')
plt.scatter(list(extrapolation_layers), extrapolated_values, color='green', label='Extrapolated Values')
plt.xlabel('Number of Checks')
plt.ylabel('Expectation Value')
plt.legend()
plt.grid(True)
plt.show()

# Text report of each extrapolated point.
for layer, value in zip(extrapolation_layers, extrapolated_values):
    print(f"Extrapolated expectation value at check number {layer}: {value:.4f}")

In [None]:
# ZNE estimate from the earlier cell, printed for comparison with the extrapolation.
print(f"ZNE result {mitigated:.3f}")

# Tests

In [None]:
import random

# def generate_random_pauli_strings(num_qubits, num_observables):
#     paulis = ['I', 'X', 'Y', 'Z']
#     return [
#         ''.join(random.choice(paulis) for _ in range(num_qubits))
#         for _ in range(num_observables)
#     

def generate_random_pauli_strings(num_qubits, num_observables):
    """Draw random I/Z Pauli strings and return the distinct ones.

    Exactly ``num_observables`` strings are drawn with the ``random`` module, then
    duplicates are removed. The result can therefore hold fewer than
    ``num_observables`` entries (at most ``2**num_qubits`` distinct strings exist).
    Distinct strings are returned in the order they were first drawn, so seeding
    ``random`` reproduces the same list. A plain ``set`` would give an order that
    depends on string hashing.

    Args:
        num_qubits: Length of every generated string.
        num_observables: Number of draws (an upper bound on the result length).

    Returns:
        list of unique strings over the alphabet {'I', 'Z'}.
    """
    # paulis = ['I', 'X', 'Y', 'Z']
    paulis = ['I', 'Z']

    # A fixed number of draws (rather than looping until enough unique strings
    # exist) guarantees termination when num_observables > 2**num_qubits.
    draws = (
        ''.join(random.choice(paulis) for _ in range(num_qubits))
        for _ in range(num_observables)
    )
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(draws))


In [None]:
# Number of random draws; the generator de-duplicates, so pauli_strings may end up
# shorter than num_observables.
num_observables = 100
pauli_strings = generate_random_pauli_strings(num_qubits, num_observables)
# pauli_strings = ['XXXXXX', 'YYYYYY', 'XYXYXY', 'YXYXYX', 'YYYXXX', 'XXXYYY', 'ZZZZZZ', 'ZZIIII', 'IIZZII', 'IIIIZZ',
#          'XZXZXZ', 'ZXZXZX', 'ZZZXXX', 'XXXZZZ', 'IIIIXX', 'XXIIII', 'XXIIXX']

In [None]:
# Noiseless reference values, one per entry of pauli_strings (same order).
psi = Statevector(circuit)

ideal_expectations = []
for string in pauli_strings:
    operator = Pauli(string)
    # <psi| P |psi> via dense matrices.
    expect = np.array(psi).T.conj() @ operator.to_matrix() @ np.array(psi)
    # print("expectation for ", string, ": ", expect)
    ideal_expectations.append(expect)

In [None]:
# Absolute error of the ZNE estimate for every observable, then their mean.
# NOTE(review): the loop variable rebinds the notebook-level `pauli_string`, and
# `partial` comes from an import in an earlier cell.
zne_abs_errors = []
for i, pauli_string in enumerate(pauli_strings):
    # unmitigated = ibmq_executor(circuit)
    zne_executor = partial(ibmq_executor, pauli_string=pauli_string)
    zne_exp = zne.execute_with_zne(circuit, zne_executor)
    print("zne mitigated exp for ", pauli_string, ": ", zne_exp)
    print("ideal expectation = ", ideal_expectations[i])
    abs_error = np.abs(ideal_expectations[i] - zne_exp)
    zne_abs_errors.append(abs_error)
zne_avg_error = np.mean(zne_abs_errors)
print(f"Average absolute error for ZNE: {zne_avg_error:.5f}")

In [None]:
# Absolute error of the linearly extrapolated Pauli-check estimate for every
# observable, then their mean.
pce_abs_errors = []
for i, pauli_string in enumerate(pauli_strings):
    # Report progress against the real list length: pauli_strings is de-duplicated
    # and can be shorter than num_observables.
    print("i =", i+1, "out of", len(pauli_strings))
    # Compute expectation for each number of layers
    expectation_values = []
    for j in range(num_checks):
        # print(j)
        pcs_circ = circs_list[j]
        signs = signs_list[j]
        expectation_value = ibmq_executor_pcs(pcs_circ, pauli_string, num_qubits = num_qubits, signs=signs)
        expectation_values.append(expectation_value)

    print("expected values from implemented checks: ", expectation_values)
    # Compute extrapolated expectation from a degree-1 fit over the measured check counts
    check_numbers = list(range(1, num_checks + 1))
    polynomial_coefficients = np.polyfit(check_numbers, expectation_values, 1)
    polynomial = np.poly1d(polynomial_coefficients)
    # target_checks = 2 * num_qubits # Extrapolate to a specific target number of checks (e.g., 2n)
    target_checks = num_qubits
    pce_exp = polynomial(target_checks)

    print("pce exp for ", pauli_string, ": ", pce_exp)
    print("ideal expectation = ", ideal_expectations[i])
    abs_error = np.abs(ideal_expectations[i] - pce_exp)
    print("abs error = ", abs_error)
    pce_abs_errors.append(abs_error)

pce_avg_error = np.mean(pce_abs_errors)
print(f"Average absolute error for PCE: {pce_avg_error:.5f}")

In [None]:
# Final comparison of the two mean absolute errors.
print(f"Average absolute error for ZNE: {zne_avg_error:.5f}")
print(f"Average absolute error for PCE: {pce_avg_error:.5f}")

In [None]:
import os
import csv

def save_avg_errors(circ_folder, filename, zne_avg_error, pce_avg_error):
    """Write the two average errors to ``data_PCE_vs_ZNE/<circ_folder>/<filename>`` as CSV.

    The directory is created if needed and an existing file is overwritten. The
    file contains a header row followed by one row per method ("ZNE", "PCE").

    Args:
        circ_folder: Sub-folder name under ``data_PCE_vs_ZNE``.
        filename: Name of the CSV file to write.
        zne_avg_error: Value stored in the "ZNE" row.
        pce_avg_error: Value stored in the "PCE" row.
    """
    target_dir = os.path.join("data_PCE_vs_ZNE", circ_folder)
    os.makedirs(target_dir, exist_ok=True)  # make sure the sub-folder is there

    table = [
        ["Method", "Average Absolute Error"],
        ["ZNE", zne_avg_error],
        ["PCE", pce_avg_error],
    ]
    # newline='' lets the csv module control line endings itself.
    with open(os.path.join(target_dir, filename), mode='w', newline='') as handle:
        csv.writer(handle).writerows(table)

# Persist this run's averages. The file name encodes the qubit count, the requested
# number of observables and a hard-coded sample count (10_000, the executors' default shots).
# NOTE(review): the folder name is set by hand — confirm it matches the circuit actually used.
circ_folder = "mermin-bell"
file_name = f"avg_errors_n={num_qubits}_num_obs={num_observables}_num_samp={10_000}.csv"
save_avg_errors(circ_folder, file_name, zne_avg_error, pce_avg_error)


# Plots

In [None]:
import os
import csv
import re
from collections import defaultdict

def load_avg_errors(data_dir, num_observables, num_samples):
    """Collect saved average errors from ``data_dir``, keyed by qubit count.

    Only files named exactly
    ``avg_errors_n=<digits>_num_obs=<num_observables>_num_samp=<num_samples>.csv``
    are read. Each file is expected to have a header row followed by
    ``method,error`` rows.

    Args:
        data_dir: Directory to scan.
        num_observables: Value that must appear in the ``num_obs=`` part of the name.
        num_samples: Value that must appear in the ``num_samp=`` part of the name.

    Returns:
        defaultdict mapping qubit count (int) to ``{METHOD: error}``, with method
        names upper-cased and errors converted to float.

    Raises:
        ValueError: If a non-empty data row does not have exactly two fields or
            its error field is not a number.
    """
    data = defaultdict(dict)  # {num_qubits: {"ZNE": ..., "PCE": ...}}

    pattern = re.compile(
        rf"avg_errors_n=(\d+)_num_obs={num_observables}_num_samp={num_samples}\.csv"
        # rf"avg_errors_n=4_p=(\d+)_num_obs={num_observables}_num_samp={num_samples}\.csv"
    )

    # sorted() makes the processing order independent of the filesystem.
    for filename in sorted(os.listdir(data_dir)):
        # fullmatch, not match: match() only anchors the start, so names with
        # trailing text such as "...csv.bak" used to be loaded as well.
        match = pattern.fullmatch(filename)
        if match is None:
            continue

        num_qubits = int(match.group(1))
        filepath = os.path.join(data_dir, filename)

        with open(filepath, "r", newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # skip header; tolerate a completely empty file
            for row in reader:
                if not row:
                    continue  # ignore blank lines instead of failing to unpack
                method, error = row
                data[num_qubits][method.upper()] = float(error)

    return data

In [None]:
import matplotlib.pyplot as plt
import os

def plot_avg_errors_by_qubit(data, save_path=None):
    """Draw a grouped bar chart of ZNE vs. PCE average error per qubit count.

    The title reads the notebook-level variables ``num_samples`` and
    ``num_observables``; they must be defined before this function is called.

    Args:
        data: Mapping ``{num_qubits: {"ZNE": error, "PCE": error}}``. A method
            missing for some qubit count is plotted as NaN (no bar) rather than
            passing ``None`` to matplotlib.
        save_path: Optional output path. When given, its parent directory (if any)
            is created and the figure is saved there before being shown.
    """
    num_qubits = sorted(data.keys())
    missing = float("nan")
    zne_errors = [data[n].get("ZNE", missing) for n in num_qubits]
    pce_errors = [data[n].get("PCE", missing) for n in num_qubits]

    x = range(len(num_qubits))
    width = 0.35

    fig, ax = plt.subplots()
    # Shift the two series half a bar width left/right so they sit side by side.
    ax.bar([i - width/2 for i in x], zne_errors, width, label='ZNE')
    ax.bar([i + width/2 for i in x], pce_errors, width, label='PCE')

    ax.set_xlabel("Number of Qubits")
    ax.set_ylabel("Average Absolute Error")
    ax.set_title(f"Comparison to ZNE with default settings. \n # samples = {num_samples}, # observables = {num_observables}")
    ax.set_xticks(x)
    ax.set_xticklabels(num_qubits)
    ax.legend()

    plt.tight_layout()

    if save_path:
        # dirname is '' for a bare file name, and os.makedirs('') raises, so only
        # create the parent when there is one.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        plt.savefig(save_path)
        print(f"Plot saved to {save_path}")

    plt.show()

In [None]:
# Load every saved result matching these settings and plot them.
data_dir = "data_PCE_vs_ZNE/mermin-bell"
num_observables = 100
num_samples = 10000

data = load_avg_errors(data_dir, num_observables, num_samples)
print(data)
# NOTE(review): save_path is the data directory itself, with no file name or
# extension — confirm where the figure is expected to be written.
plot_avg_errors_by_qubit(data, save_path=data_dir)