In [3]:
import numpy as np
import matplotlib.pyplot as plt
from qiskit_ibm_provider import IBMProvider
from qiskit.circuit import Gate, Parameter, QuantumCircuit
from qiskit import pulse, schedule
from qiskit.tools.jupyter import *
from qiskit_ibm_provider.job import job_monitor
from qiskit.pulse import Gaussian
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import train_test_split

import time
import os
import csv

import warnings
warnings.filterwarnings('ignore')

IBMProvider.save_account('d006ccf7ada69abcbd5a6c4f8421f29a2de677261f2cc1c914751b05aabd43990a31692cb75cfa1fa694c6ed58eacbaeba7d0049c9d9035a54dfa355be0b8de1', overwrite=True)

provider = IBMProvider()
backend = provider.get_backend('ibm_brisbane')


In [5]:
backend_defaults = backend.defaults()
backend_properties = backend.properties()
backend_config = backend.configuration()

In [6]:
qubit = 0
f01 = backend.properties().qubits[qubit][2].value * 1e9
anhar = backend.properties().qubits[qubit][3].value * 1e9
f12 = f01 + anhar

qubit = 0
pi = np.pi
cos = np.cos
sin = np.sin
exp = np.exp
NUM_SHOTS = 2048
scale_factor = 1e-14


In [7]:
#some useful function
def get_job_data(job, average):
    """Retrieve data from a job that has already run.
    Args:
        job (Job): The job whose data you want.
        average (bool): If True, gets the data assuming data is an average.
                        If False, gets the data assuming it is for single shots.
    Return:
        list: List containing job result data.
    """
    job_results = job.result()  # timeout parameter set to 120 s
    result_data = []
    for i in range(len(job_results.results)):
        if average:  # get avg data
            result_data.append(np.real(job_results.get_memory(i)[qubit] * scale_factor))
        else:  # get single data
            result_data.append(job_results.get_memory(i)[:, qubit] * scale_factor)
    return result_data

def fit_function(x_values, y_values, function, init_params):
    fitparams, conv = curve_fit(function, x_values, y_values, init_params)
    y_fit = function(x_values, *fitparams)
    
    return fitparams, y_fit

def reshape_complex_vec(vec):
    """Take in complex vector vec and return 2d array w/ real, imag entries. This is needed for the learning.
    Args:
        vec (list): complex vector of data
    Returns:
        list: vector w/ entries given by (real(vec], imag(vec))
    """
    length = len(vec)
    vec_reshaped = np.zeros((length, 2))
    for i in range(len(vec)):
        vec_reshaped[i] = [np.real(vec[i]), np.imag(vec[i])]
    return vec_reshaped

def discriminate(data) :
    zero_data = data[0]
    one_data = data[1]
    two_data = data[2]
    zero_data_reshaped = reshape_complex_vec(zero_data)
    one_data_reshaped = reshape_complex_vec(one_data)
    two_data_reshaped = reshape_complex_vec(two_data)
    IQ_012_data = np.concatenate((zero_data_reshaped, one_data_reshaped, two_data_reshaped))
    state_012 = np.zeros(NUM_SHOTS)  
    state_012 = np.concatenate((state_012, np.ones(NUM_SHOTS)))
    state_012 = np.concatenate((state_012, 2 * np.ones(NUM_SHOTS)))
    IQ_012_train, IQ_012_test, state_012_train, state_012_test = train_test_split(IQ_012_data, state_012, test_size=0.3)
    LDA_012 = LinearDiscriminantAnalysis()
    LDA_012.fit(IQ_012_train, state_012_train)
    return LDA_012

def count(data, discriminator):
    sched_data = []
    for i in range(len(data)):
        sched_data.append(reshape_complex_vec(data[i]))
    discrim_data = []
    for j in range(len(sched_data)):
        discrim_data.append(discriminator.predict(sched_data[j]))
        #print('predicting', j, end='\r')
    final_result = []
    for k in range(len(discrim_data)):
        result = {'0': 0, '1': 0, '2': 0}
        for l in range(len(discrim_data[k])):
            if discrim_data[k][l] == 0.0:
                result['0'] += 1
            elif discrim_data[k][l] == 1.0:
                result['1'] += 1
            elif discrim_data[k][l] == 2.0:
                result['2'] += 1
            else:
                print('Unexpected behavior')
        final_result.append(result)
    return final_result

def IQ_012_plot(x_min, x_max, y_min, y_max, data):
    """Helper function for plotting IQ plane for 0, 1, 2. Limits of plot given
    as arguments."""
    # zero data plotted in blue
    zero_data = data[0]
    one_data = data[1]
    two_data = data[2]
    alpha = 1
    size = 10
    plt.scatter(np.real(zero_data), np.imag(zero_data),
                s=size, cmap='viridis', c='blue', alpha=alpha, label=r'$|0\rangle$')
    # one data plotted in red
    plt.scatter(np.real(one_data), np.imag(one_data),
                s=size, cmap='viridis', c='red', alpha=alpha, label=r'$|1\rangle$')
    # two data plotted in green
    plt.scatter(np.real(two_data), np.imag(two_data),
                s=size, cmap='viridis', c='green', alpha=alpha, label=r'$|2\rangle$')

    # Plot a large dot for the average result of the 0, 1 and 2 states.
    mean_zero = np.mean(zero_data) # takes mean of both real and imaginary parts
    mean_one = np.mean(one_data)
    mean_two = np.mean(two_data)
    mean_alpha = 1
    mean_size = 100
    plt.scatter(np.real(mean_zero), np.imag(mean_zero),
                s=mean_size, cmap='viridis', c='black',alpha=mean_alpha)
    plt.scatter(np.real(mean_one), np.imag(mean_one),
                s=mean_size, cmap='viridis', c='black',alpha=mean_alpha)
    plt.scatter(np.real(mean_two), np.imag(mean_two),
                s=mean_size, cmap='viridis', c='black',alpha=mean_alpha)

    plt.xlim(x_min, x_max)
    plt.ylim(y_min,y_max)
    plt.legend(fontsize=15)
    plt.grid()
    plt.ylabel('I [a.u.]', fontsize=15)
    plt.xlabel('Q [a.u.]', fontsize=15)
    plt.title("0-1-2 discrimination", fontsize=15)

#mitigate
from cvxopt import matrix, solvers
def mitigated_population(p, C):
    P = matrix(np.transpose(C).dot(C))
    q = matrix(-np.transpose(p).dot(C))
    G = matrix([[-1.0,0.0,0.0],[0.0,-1.0,0.0],[0.0,0.0,-1.0]])
    h = matrix([0.0, 0.0, 0.0])
    A = matrix([1.0, 1.0, 1.0], (1,3))
    b = matrix(1.0)
    solvers.options['show_progress'] = False
    sol=solvers.qp(P, q, G, h, A, b)
    # return np.asarray(sol['x'])
    return np.array([e for e in sol['x']])

def get_population(data, discriminator):
    drag_values = count(data, discriminator)
    return np.asarray([[val['0']/NUM_SHOTS, val['1']/NUM_SHOTS, val['2']/NUM_SHOTS] for val in drag_values])

def get_mitigated_population(data, discriminator, confusion_matrix):
    population = get_population(data, discriminator)
    mitigated = [mitigated_population(pop, confusion_matrix) for pop in population]
    return np.asarray(mitigated)

from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score

#loss function
def loss_func(param, theta, phi, p_exp, N_data = 97):
  p_model = [population_model(theta[i], phi[i], *param, order) for i in range(N_data)]
  return mean_squared_error(p_model, population, squared = False)


To avoide time-consuming, we will use x pulse default of ibm, copy amplitude of this to our xpi pulse

In [8]:
calibrations = backend_defaults.instruction_schedule_map
calibrations.get('x', qubit), calibrations.get('sx', qubit)

(Schedule((0, Play(Drag(duration=120, sigma=30, beta=-0.002246983963901737, amp=0.19778947048625103, angle=0.0, name='Xp_d0'), DriveChannel(0), name='Xp_d0')), name="x"),
 Schedule((0, Play(Drag(duration=120, sigma=30, beta=-0.08003473960250763, amp=0.09881675138820192, angle=0.0002430674771130061, name='X90p_d0'), DriveChannel(0), name='X90p_d0')), name="sx"))

In [20]:
dur01 = 120
sig01 = dur01/4
amp01 = 0.19778947048625103

In [21]:
amp12 = 0.23743549524170637
dur12 = 160
sig12 = dur12/4

In [22]:
#define a pulse rotating theta around n-axis in subspace 0-1 and 1-2

def r01(theta, phi):
    with pulse.build(backend=backend, default_alignment='sequential', name=r'$R^{(01)}$') as r01_pulse:
        drive_chan = pulse.drive_channel(qubit)
        pulse.set_frequency(f01, drive_chan)
        with pulse.phase_offset(-phi, drive_chan):
            pulse.play(pulse.Gaussian(duration=dur01,
                                      amp=amp01*theta/np.pi,
                                      sigma=sig01,
                                      name=r'$X_{\pi/2}^{(01)}$'), drive_chan)

    return r01_pulse

def r12(theta, phi):
    with pulse.build(backend=backend, default_alignment='sequential',
                     name=r'$R^{(12)}$') as r12_pulse:
        drive_chan = pulse.drive_channel(qubit)
        pulse.set_frequency(f12, drive_chan)
        with pulse.phase_offset(-phi, drive_chan):
            pulse.play(pulse.Gaussian(duration=dur12,
                                      amp=amp12*theta/np.pi,
                                      sigma=sig12,
                                      name=r'$X_{\pi/2}^{(12)}$'), drive_chan)

    return r12_pulse


def phase_tracking(order):
    exp_circs = []
    params = []
    for i in range(97):
        phi = np.random.default_rng().uniform(0, pi, len(order))
        theta = np.random.default_rng().uniform(0, pi, len(order))
        
        params.append(np.concatenate((theta, phi)))
        
        qc = QuantumCircuit(1,1)
        qc.append(X90_01, [0])
        qc.append(X90_12, [0])
        qc.add_calibration(X90_01, (0,), r01(pi/2, 0), [])
        qc.add_calibration(X90_12, (0,), r12(pi/2, 0), [])
        
        i = 0
        
        for level in order:
            if level == '1':
                name = r"$R^{01}$" + fr"$(\theta_{i+1}$" + ',' + fr'$\phi_{i+1})$'
                R01 = Gate(name, 1, [])
                qc.append(R01, [0])
                qc.add_calibration(R01, (0,), r01(theta[i], phi[i]), [])
            elif level == '2':
                name = r"$R^{12}$" + fr"$(\theta_{i+1}$" + ',' + fr'$\phi_{i+1})$'
                R12 = Gate(name, 1, [])
                qc.append(R12, [0])
                qc.add_calibration(R12, (0,), r12(theta[i], phi[i]), [])
            else:
                print('Out of level')
                break
            i = i + 1
        qc.measure(0,0)
        exp_circs.append(qc)
    exp_circs = discr_circ + exp_circs
    
    return exp_circs, params

# Create the three circuits
Xpi01 = Gate("$X_{\pi}^{01}$", 1, [])
Xpi12 = Gate("$X_{\pi}^{12}$", 1, [])
X90_01 = Gate(r'$X_{\pi/2}^{(01)}$', 1, [])
X90_12 = Gate(r'$X_{\pi/2}^{(12)}$', 1, [])
# 0 state
zero = QuantumCircuit(1, 1)
zero.measure(0, 0)

# 1 state
one = QuantumCircuit(1, 1)
one.x(0)
one.measure(0, 0)

# 2 state 
two = QuantumCircuit(1, 1)
two.x(0)
two.append(Xpi12, [0])
two.measure(0, 0)
two.add_calibration(Xpi12, (0,), r12(pi, 0), [])

discr_circ = [zero, one, two]

Generalize phase tracking protocol

In [23]:
import random
def create_order(length):
    order = ''
    for i in range(length):
        order = order + str(random.randint(1,2))
    return order

def create_orders(length):
    orders = set()
    for i in range(20):
        orders.add(create_order(length))
    return orders
#orders = []
#for i in range(6, 10, 1):
    #orders += random.sample(list(create_orders(i)), k=2)

When we yse function phase_tracking, we take a string "1121221..." as input, which perform "1" as applying a Rx01 gate, and "2" as applying a Rx12 gate with rotation angle and axis angle randomized uniformly. These angles are saved as parameters to file follow synctax "order_params_date_backend", and these will helpful for you when handle data

In [24]:
order = "1221112"

In [25]:
NUM_SHOTS

2048

In [26]:
#for order in orders:
exp_circs, params = phase_tracking(order)
tg = time.localtime(time.time())
params_file = order + '_params_' + str(tg.tm_year) + '_' + str(tg.tm_mon) + '_' + str(tg.tm_mday) + '_' + backend.name + f'_{dur12}' + '.csv'
np.savetxt(params_file, params, delimiter=',')
job = backend.run(exp_circs, meas_level=1, meas_return='single', shots=NUM_SHOTS)
print(job.job_id())


cnaef24yzmv0008w9jp0


In [27]:
job = provider.backend.retrieve_job('cnaef24yzmv0008w9jp0')
job_monitor(job)

Job Status: job has successfully run
