In [None]:
import os
import sys
import argparse
import pickle
import random
from copy import deepcopy
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.optimize import curve_fit
from qiskit_experiments.library import StandardRB, InterleavedRB

from qiskit import (
    QuantumCircuit, 
    QuantumRegister, 
    ClassicalRegister, 
    pulse) 
# This is where we access all of our Pulse features!
from qiskit.circuit import Parameter, Gate
from qiskit.circuit.library import XGate
from qiskit.pulse import Delay,Play
# This Pulse module helps us build sampled pulses for common pulse shapes
from qiskit.pulse import library as pulse_lib
# from qiskit.providers.ibmq.managed import IBMQJobManager
# from qiskit_ibm_provider import IBMProvider
from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2 as Sampler
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager

# Put the kernel's working directory at the front of sys.path so that the
# project-local imports that follow (utils.run_jobs, common.pulse_types) resolve
# relative to the directory the notebook is run from.
current_dir = os.getcwd()
# Alternative when this code is run as a script rather than a notebook:
# current_dir = os.path.dirname(__file__)
package_path = os.path.abspath(current_dir)
sys.path.insert(0, package_path)

from utils.run_jobs import run_jobs
import common.pulse_types as pt


In [None]:
# Pulse-shape registry keyed by the short `pulse_type` string.
# Every entry is a two-element list that add_circ indexes with `remove_bg`:
# position 0 holds the plain shape, position 1 the "Lifted" variant where
# common.pulse_types offers one (otherwise the same class appears twice).
# NOTE(review): add_circ also branches on "drag", "ipN" and "fcq", which have no
# entry here and would raise KeyError — confirm whether they should be added.
pulse_dict = dict(
    gauss=[pt.Gaussian, pt.LiftedGaussian],
    lor=[pt.Lorentzian, pt.LiftedLorentzian],
    lor2=[pt.Lorentzian2, pt.LiftedLorentzian2],
    lor3=[pt.Lorentzian3, pt.LiftedLorentzian3],
    sq=[pt.Constant, pt.Constant],
    sech=[pt.Sech, pt.LiftedSech],
    sech2=[pt.Sech2, pt.LiftedSech2],
    sin=[pt.Sine, pt.Sine],
    sin2=[pt.Sine2, pt.Sine2],
    sin3=[pt.Sine3, pt.Sine3],
    sin4=[pt.Sine4, pt.Sine4],
    sin5=[pt.Sine5, pt.Sine5],
    demkov=[pt.Demkov, pt.LiftedDemkov],
)

In [None]:
def make_all_dirs(path):
    """Create the directory `path` together with any missing parent directories.

    Directories that already exist are left untouched, so the call can be
    repeated safely. An empty `path` is a no-op.

    Args:
        path: Directory to create; may be absolute or relative to the current
            working directory.

    Raises:
        OSError: If a component of `path` exists but is not a directory, or the
            directory cannot be created.
    """
    if not path:
        return
    # os.makedirs creates every missing ancestor itself, which also covers
    # relative paths whose top-level folder does not exist yet.
    os.makedirs(path, exist_ok=True)

def get_calib_params(
    backend, qubit,
    pulse_type, 
    sigma, duration,
    remove_bg
):
    """Look up stored fit parameters of an earlier calibration.

    Reads ``calibrations/<backend>/<qubit>/actual_params.csv`` below the current
    working directory and selects the row whose ``pulse_type``, ``duration``,
    ``sigma`` and ``rb`` columns equal the given arguments. When several rows
    match, the candidates are printed and the user is asked for the position
    (0-based) of the row to use.

    Args:
        backend: Backend name used as a directory component.
        qubit: Qubit identifier; converted with ``str`` for the directory name.
        pulse_type: Value compared against the ``pulse_type`` column.
        sigma: Value compared against the ``sigma`` column.
        duration: Value compared against the ``duration`` column.
        remove_bg: Value compared against the ``rb`` column.

    Returns:
        Tuple ``(l, p, x0)`` read from the selected row.

    Raises:
        FileNotFoundError: The calibration CSV does not exist.
        ValueError: No row matches, or the typed selection is not an integer.
        IndexError: The typed selection is outside the matching rows.
    """
    calib_dir = os.path.join(os.getcwd(), "calibrations", backend, str(qubit))
    params_file = os.path.join(calib_dir, "actual_params.csv")
    print(params_file)
    if not os.path.isfile(params_file):
        # Fail with a clear message instead of hitting an unbound `param_df` below.
        raise FileNotFoundError(f"Calibration file not found: {params_file}")
    param_df = pd.read_csv(params_file)

    matches = param_df[
        (param_df["pulse_type"] == pulse_type)
        & (param_df["duration"] == duration)
        & (param_df["sigma"] == sigma)
        & (param_df["rb"] == remove_bg)
    ]
    if matches.shape[0] < 1:
        raise ValueError("No entry found!")

    idx = 0
    if matches.shape[0] > 1:
        # Show the candidates with the positions the prompt refers to.
        print(matches.reset_index(drop=True))
        # input() returns a string, while .iloc needs an integer position.
        idx = int(input("More than one identical calibrations found! "
                        "Which do you want to use? "))
    ser = matches.iloc[idx]

    return ser.at["l"], ser.at["p"], ser.at["x0"]

def initialize_backend(backend, qubit=0):
    """Connect to an IBM backend and collect the objects the notebook works with.

    Args:
        backend: Short backend name; ``"ibm_"`` is prepended to form the name
            requested from ``QiskitRuntimeService``.
        qubit: Index of the qubit whose drive channel is returned. Defaults to
            0, which is the channel this function always returned before the
            parameter existed.

    Returns:
        Tuple ``(backend, pm, drive_chan, num_qubits, q_freq)``:
        the backend object, a preset pass manager built with
        ``optimization_level=0`` for it, ``pulse.DriveChannel(qubit)``, the
        backend's ``n_qubits`` and the list of ``qubit_freq_est`` values for
        every qubit.
    """
    backend_full_name = "ibm_" + backend
    drive_chan = pulse.DriveChannel(qubit)

    # Keep the short name for the log line; the backend object gets its own name.
    backend_name = backend
    backend_obj = QiskitRuntimeService(channel="ibm_quantum").backend(backend_full_name)
    print(f"Using {backend_name} backend.")
    pm = generate_preset_pass_manager(optimization_level=0, backend=backend_obj)
    backend_defaults = backend_obj.defaults()
    backend_config = backend_obj.configuration()

    num_qubits = backend_config.n_qubits
    q_freq = [backend_defaults.qubit_freq_est[q] for q in range(num_qubits)]

    return backend_obj, pm, drive_chan, num_qubits, q_freq

def fit_function(
    x_values, 
    y_values, 
    function, 
    init_params,
    bounds
):
    """Fit `function` to the data with scipy's ``curve_fit``.

    Args:
        x_values: Independent-variable samples.
        y_values: Measured values at `x_values`.
        function: Model ``f(x, *params)``.
        init_params: Starting point handed to ``curve_fit`` as ``p0``.
        bounds: Parameter bounds forwarded to ``curve_fit``.

    Returns:
        Tuple ``(fitparams, y_fit)``: the optimised parameters and the model
        evaluated at `x_values` with them.
    """
    best_params, _covariance = curve_fit(
        function,
        x_values,
        y_values,
        p0=init_params,
        bounds=bounds,
        maxfev=100000,
    )
    return best_params, function(x_values, *best_params)

def linear_func(x, a, b):
    """Evaluate the straight line ``a * x + b`` (slope `a`, offset `b`)."""
    scaled = a * x
    return scaled + b

In [None]:
def add_circ(amp, duration, sigma, qubit=0):
    """Build a circuit whose single custom gate plays one calibration pulse.

    The pulse class is ``pulse_dict[pulse_type][remove_bg]``. The schedule sets
    the drive frequency, plays the pulse on the drive channel of `qubit`, and is
    attached as the calibration of a one-qubit gate named ``"rabi"``, which is
    then measured into classical bit 0.

    Reads the notebook-level names ``backend``, ``rough_qubit_frequency``,
    ``pulse_type``, ``remove_bg``, ``pulse_dict``, ``N`` and ``beta``.
    NOTE(review): ``rough_qubit_frequency`` is not assigned in the cells shown
    with this function — it must exist before the first call.

    Args:
        amp: Pulse amplitude.
        duration: Pulse duration, passed to the pulse class unchanged.
        sigma: Width parameter; only passed to shapes that take one.
        qubit: Index of the driven and measured qubit.

    Returns:
        A ``QuantumCircuit`` with ``qubit + 1`` qubits and one classical bit.

    Raises:
        KeyError: `pulse_type` has no entry in ``pulse_dict``.
    """
    # Keyword arguments every shape takes, extended per pulse family below.
    pulse_kwargs = {"duration": duration, "amp": amp, "name": pulse_type}
    if pulse_type == "sq" or "sin" in pulse_type:
        pass  # these shapes only take duration / amp / name
    elif pulse_type == "drag":
        pulse_kwargs.update(beta=1, sigma=sigma)
    elif pulse_type == "ipN":
        pulse_kwargs.update(N=N)
    elif pulse_type == "fcq":
        pulse_kwargs.update(beta=beta)
    else:
        # gauss, lor*, and every remaining shape are parameterised by sigma
        pulse_kwargs.update(sigma=sigma)

    # Drive the same qubit the calibration is attached to; a shared channel
    # built for a different qubit would send the pulse to the wrong drive line.
    qubit_drive_chan = pulse.DriveChannel(qubit)
    with pulse.build(backend=backend, default_alignment='sequential', name="calibrate_area") as sched:
        pulse.set_frequency(rough_qubit_frequency, qubit_drive_chan)
        pulse_played = pulse_dict[pulse_type][remove_bg](**pulse_kwargs)
        pulse.play(pulse_played, qubit_drive_chan)

    pi_gate = Gate("rabi", 1, [])
    base_circ = QuantumCircuit(qubit+1, 1)
    base_circ.append(pi_gate, [qubit])
    base_circ.measure(qubit, 0)
    base_circ.add_calibration(pi_gate, (qubit,), sched, [])
    return base_circ


In [None]:
# Experiment configuration, one setting per line.
# NOTE(review): later cells also read `sigma`, `duration`, `amplitudes`,
# `num_shots_per_exp` and `fit_crop_parameter`, none of which is assigned here.
pulse_type = "sq"              # key into pulse_dict
qubit = 46                     # qubit passed to get_calib_params / add_circ
num_T = 8
T_min = 100
T_max = 100000
num_amp = 100
amp_min = 0
amp_max = 0.4
remove_bg = 0                  # index into each pulse_dict entry
max_experiments_per_job = 100
num_shots = 256
num_exp = 100
backend = "sherbrooke"         # short name; initialize_backend prepends "ibm_"
l = 50                         # l, p, x0 are overwritten by get_calib_params
p = 0.5
x0 = 0
N = 1
beta = 0
save = 1                       # truthy: figures and fit parameters are written

In [None]:
# `backend` starts out as the short backend name and is rebound to the backend
# object further down. Remember the name separately so that re-running this
# cell (e.g. after a failed job submission) does not feed the backend object
# back into get_calib_params / initialize_backend, which both expect a string.
if isinstance(backend, str):
    backend_name = backend

# Starting values for the fit, taken from the stored calibration table.
l, p, x0 = get_calib_params(
    backend_name, qubit,
    pulse_type, 
    sigma, duration,
    remove_bg
)
backend, pm, drive_chan, num_qubits, q_freq = initialize_backend(backend_name)

# One circuit per amplitude, run through the pass manager before submission.
circs = [add_circ(a, duration, sigma, qubit=qubit) for a in amplitudes]
pm_circs = pm.run(circs)
values, job_ids = run_jobs(pm_circs, backend, duration, num_shots_per_exp=num_shots_per_exp, pm=pm)


In [None]:
# Raw sweep: real part of the measured values against the drive amplitude.
plt.figure(3)
plt.scatter(amplitudes, np.real(values), color="black")
plt.xlabel("Amplitude [a.u.]")
plt.ylabel("Transition Probability")
plt.title("Rabi Calibration Curve")
if save:
    # File name: HHMMSS time stamp followed by the pulse settings.
    raw_sweep_png = date.strftime("%H%M%S") + f"_{pulse_type}_dur_{duration}_s_{int(sigma)}_N_{N}_beta_{beta}_areacal.png"
    plt.savefig(os.path.join(save_dir, raw_sweep_png))


In [None]:
# Persist the raw sweep as a 2-row array: amplitudes on top, measured values below.
datapoints = np.vstack([amplitudes, np.real(values)])
raw_sweep_pkl = os.path.join(data_folder, f"area_calibration_{date.strftime('%H%M%S')}.pkl")
with open(raw_sweep_pkl, "wb") as f:
    pickle.dump(datapoints, f)


In [None]:
## fit curve
def fit_function(x_values, y_values, function, init_params, bounds=None):
    """Bounded ``curve_fit`` that reports failure with a sentinel instead of raising.

    This definition replaces the earlier ``fit_function`` of the notebook. The
    optional `bounds` argument keeps calls written for that earlier signature
    (bounds as fifth argument) working.

    Args:
        x_values: Independent-variable samples.
        y_values: Measured values at `x_values`.
        function: Model ``f(x, *params)``.
        init_params: Starting point for the optimiser.
        bounds: ``(lower, upper)`` parameter bounds. When omitted, the bounds
            of the five-parameter Rabi model ``(A, l, p, x0, B)`` are used.

    Returns:
        ``(fitparams, y_fit)`` on success. If ``curve_fit`` raises
        ``ValueError`` (for example, a starting point outside the bounds), the
        pair ``(100000, 100000)`` is returned so that callers computing an
        error from the second element see a very large value.
    """
    if bounds is None:
        bounds = (
            [-0.6, 1, 0, -0.025, 0.4],
            [-0.40, 1e4, 100, 0.025, 0.6],
        )
    try:
        fitparams, _covariance = curve_fit(
            function,
            x_values,
            y_values,
            init_params,
            maxfev=100000,
            bounds=bounds,
        )
    except ValueError:
        return 100000, 100000
    y_fit = function(x_values, *fitparams)

    return fitparams, y_fit

def mae_function(x_values, y_values, function, init_params):
    """Total absolute deviation between the fitted curve and `y_values`.

    Runs ``fit_function`` and sums ``|y_fit - y_values|`` over all points. Note
    that the residuals are summed, not averaged, despite the function's name.
    """
    _, fitted = fit_function(x_values, y_values, function, init_params)
    residuals = np.abs(fitted - y_values)
    return np.sum(residuals)

def rabi_model(x, A, l, p, x0, B):
    """Fit model: ``A * cos(l * (1 - exp(-p * (x - x0)))) + B``."""
    return A * (np.cos(l * (1 - np.exp(- p * (x - x0))))) + B


max_l = 10000  # NOTE(review): not referenced by the cells shown here
mae_threshold = 0.6

# Only the first `fit_crop_parameter` points take part in the fit.
fit_amplitudes = amplitudes[: fit_crop_parameter]
fit_values = np.real(values[: fit_crop_parameter])

# Candidate starting values for l: the squares 1, 4, ..., 1024 that are not
# below the current l. The first candidate whose fit error drops under the
# threshold becomes the starting value of the final fit.
ls = np.arange(1, 33)**2
ls = ls[ls >= l]
for current_l in ls:
    if mae_function(
        fit_amplitudes,
        fit_values,
        rabi_model,
        [-0.47273362, current_l, p, x0, 0.47747625]
    ) < mae_threshold:
        l = current_l
        break

# NOTE(review): the scan above starts x0 from the stored value, the final fit
# starts it from 0 — confirm this difference is intended.
rabi_fit_params = fit_function(
    fit_amplitudes,
    fit_values,
    rabi_model,
    [-0.47273362, l, p, 0, 0.47747625]
)[0]

# fit_function signals failure with a scalar sentinel; stop here with a clear
# message instead of failing later when the parameters are unpacked.
if np.ndim(rabi_fit_params) == 0:
    raise RuntimeError(
        "Final Rabi fit failed: curve_fit rejected the input "
        "(starting values outside the fit bounds, or invalid data)."
    )

print(rabi_fit_params)

In [None]:
# Unpack the fitted model parameters (this rebinds the notebook-level l, p, x0).
A, l, p, x0, B = rabi_fit_params

# Amplitudes at which the fitted phase l * (1 - exp(-p * (amp - x0))) equals
# pi and pi / 2 respectively.
pi_amp = -np.log(1 - np.pi / l) / p + x0
half_amp = -np.log(1 - np.pi / (2 * l)) / p + x0

# Dense amplitude grid and the fitted curve on it, used for plotting.
detailed_amps = np.arange(amplitudes[0], amplitudes[-1], amplitudes[-1] / 2000)
extended_y_fit = A * (np.cos(l * (1 - np.exp(- p * (detailed_amps - x0))))) + B

# Calibration record: every value wrapped in a one-element list so the dict
# converts to a single-row DataFrame; "job_id" is left as a bare None.
calibration_record = {
    "date": current_date,
    "time": time,
    "pulse_type": pulse_type,
    "A": A,
    "l": l,
    "p": p,
    "x0": x0,
    "B": B,
    "pi_amp": pi_amp,
    "half_amp": half_amp,
    "drive_freq": center_frequency_Hz,
    "duration": duration,
    "sigma": sigma,
    "rb": int(remove_bg),
    "N": N,
    "beta": beta,
}
param_dict = {column: [value] for column, value in calibration_record.items()}
param_dict["job_id"] = None
print(param_dict)

if save:
    fit_params_pkl = os.path.join(data_folder, f"fit_params_area_cal_{date.strftime('%H%M%S')}.pkl")
    with open(fit_params_pkl, "wb") as f:
        pickle.dump(param_dict, f)

    # Add the new row to the per-qubit calibration table, creating it if needed.
    new_entry = pd.DataFrame(param_dict)
    params_file = os.path.join(calib_dir, "actual_params.csv")
    if not os.path.isfile(params_file):
        new_entry.to_csv(params_file, index=False)
    else:
        param_df = pd.read_csv(params_file)
        param_df = add_entry_and_remove_duplicates(param_df, new_entry)
        param_df.to_csv(params_file, index=False)

In [None]:
# Measured sweep (black points) with the fitted Rabi curve (red line) on top.
plt.figure(4)
plt.scatter(amplitudes, np.real(values), color="black")
plt.plot(detailed_amps, extended_y_fit, color="red")
plt.xlim([min(amplitudes), max(amplitudes)])
plt.xlabel("Amplitude [a.u.]")
plt.ylabel("Transition Probability")
plt.title("Fitted Rabi Calibration Curve")
if save:
    # File name: HHMMSS time stamp followed by the pulse settings.
    fitted_sweep_png = date.strftime("%H%M%S") + f"_{pulse_type}_N_{N}_beta_{beta}_pi_amp_sweep_fitted.png"
    plt.savefig(os.path.join(save_dir, fitted_sweep_png))

plt.show()