# Waveform benchmarking
Reduced version of the script of the blog post [Randomized benchmarking in seconds](https://www.zhinst.com/blogs/randomized-benchmarking-seconds) to benchmark the compiler

It DOESN'T shows best practices to perform Randomized Benchmarking, it only benchmark certain aspects of the AWG compiler. Please refer to the original blog post and code for optimal Randomized Benchmarking strategies. 

Select the ziPython version, then reload the kernel (limitation of Jupyter)

In [None]:
#%pip install zhinst==22.2.26577
%pip install zhinst==22.2.29711 

## Import and dataserver setup

In [None]:
## Import of required modules
import zhinst.toolkit
import zhinst.ziPython

import time
import numpy as np
import json
from scipy.linalg import expm
import random
import os, os.path, tempfile
import textwrap

In [None]:
## Define the parameters of your instrument and dataserver
# The script uses a HDAWG to compare CSV and placeholder waveforms, and a SHFSG
# to compare different compiler versions

# dataserver IP address - may be localhost or any IP running the LabOne dataserver
# dataserver_host = 'localhost'
dataserver_host = 'localhost'

# HDAWG device name
# dev_hd = 'devYYYY'
dev_hd = 'dev8000'

# SHFSG device name
# dev_sg = 'devYYYYY'
dev_sg = 'dev12000'

In [None]:
#to save time, it's possible to skip the calculation of the recovery gate and put a dummy one
#just for benchmarking of the instruments, not for operation
real_rb = False

In [None]:
#connect to the devices
session = zhinst.toolkit.Session(dataserver_host)
devicehd = session.connect_device(dev_hd)
devicesg = session.connect_device(dev_sg)

# Randomized Benchmarking definitions

## Clifford gates

#### amplitudes (in units of max amplitude) and length (in s) for basic gates 

In [None]:
pi_amplitude = 1.0
pi_length = 240e-9
pi2_amplitude = 0.5
pi2_length = 240e-9

#### define an envelope function for single qubit gates - here: Gaussian with width = length / 3

In [None]:
def pulse_envelope(amplitude, length, phase, sigma=1/3, sample_rate=2.4e9, tol=15):
    #sigma = 1/3
    # ensure waveform length is integer multiple of 16
    samples = round(sample_rate * length / 16) * 16
    x = np.linspace(-1, 1, samples)
    # output is complex, where phase later determines the gate rotation axis
    y = amplitude * np.exp(-x**2 / sigma**2 + 1j * np.deg2rad(phase))
    
    return y.round(tol)

#### build the clifford gates out of the elementary single qubit gates

In [None]:
# all elements of the Clifford group, according to defintion in arXiv:1410.2338
clifford_params = [
    ['I'],
    ['Y/2', 'X/2'],
    ['-X/2', '-Y/2'],
    ['X'],
    ['-Y/2', '-X/2'],
    ['X/2', '-Y/2'],
    ['Y'],
    ['-Y/2', 'X/2'],
    ['X/2', 'Y/2'],
    ['X', 'Y'],
    ['Y/2', '-X/2'],
    ['-X/2', 'Y/2'],
    ['Y/2', 'X'],
    ['-X/2'],
    ['X/2', '-Y/2', '-X/2'],
    ['-Y/2'],
    ['X/2'],
    ['X/2', 'Y/2', 'X/2'],
    ['-Y/2', 'X'],
    ['X/2', 'Y'],
    ['X/2', '-Y/2', 'X/2'],
    ['Y/2'],
    ['-X/2', 'Y'],
    ['X/2', 'Y/2', '-X/2']
]

clifford_len = len(clifford_params)

# parameters of basic single qubit pulses
pulses_params = {
    'I': {'amplitude':0.0, 'length': pi_length, 'phase': 0.0},
    'X': {'amplitude':pi_amplitude, 'length': pi_length, 'phase': 0.0},
    'Y': {'amplitude':pi_amplitude, 'length': pi_length, 'phase': 90.0},
    'X/2': {'amplitude':pi2_amplitude, 'length': pi2_length, 'phase': 0.0},
    'Y/2': {'amplitude':pi2_amplitude, 'length': pi2_length, 'phase': 90.0},
    '-X/2': {'amplitude':pi2_amplitude, 'length': pi2_length, 'phase': 0.0-180.0},
    '-Y/2': {'amplitude':pi2_amplitude, 'length': pi2_length, 'phase': 90.0-180.0},
}

# calculate complex waveforms for single qubit elementary pulses
pulses_waves = {pulse_type: pulse_envelope(**pulse_param) for (pulse_type, pulse_param) in pulses_params.items()}
# calculate complex waveforms for each of the Clifford gates
clifford_waves = [np.concatenate([pulses_waves[i] for i in clifford_gate]) for clifford_gate in clifford_params]

# divide real and complex part of waveforms into I and Q channel outputs
clifford_waves_real = [(np.real(wave), np.imag(wave)) for wave in clifford_waves]

#create zhinst-toolkit Waveforms 
wfm_clifford = zhinst.toolkit.Waveforms()
for i,wfm in enumerate(clifford_waves):
    wfm_clifford[i] = wfm

#### basic definitions to manipulate Clifford gates - needed for recovery gate calculation

In [None]:
def pauli(ind = 'x'):
    """pauli matrices
    """
    if ind =='x':
        res = np.array([[0,1], [1,0]])
    if ind =='y':
        res = np.array([[0,-1j], [1j,0]])
    if ind =='z':
        res = np.array([[1,0], [0,-1]])
        
    return res

def rot_matrix(angle=np.pi, axis='x'):
    """general definition of rotation unitary for a single qubit
    """
    return expm(-1j * angle / 2 * pauli(axis))

def mult_gates(gates, use_linalg=False, tol=20):
    """multiply a variable number of gates / matrices - recursive definition faster for simple 2x2 matrices
    """
    if len(gates) > 1:
        if use_linalg:
            res = np.linalg.multi_dot(gates)
        else:
            res = np.matmul(gates[0], mult_gates(gates[1:], use_linalg=False, tol=tol))
    elif len(gates) == 1:
        res = gates[0]
    
    return res.round(tol)

# generate matrix representation of all Clifford gates from elementary gates
elem_gates = {'I': np.array([[1,0],[0,1]]),
              'X': rot_matrix(np.pi, 'x'),
              'Y': rot_matrix(np.pi, 'y'),
              'X/2': rot_matrix(np.pi / 2, 'x'),
              'Y/2': rot_matrix(np.pi / 2, 'y'),
              '-X/2': rot_matrix(-np.pi / 2, 'x'),
              '-Y/2': rot_matrix(-np.pi / 2, 'y')}

clifford_matrices = [[elem_gates[gate] for gate in gates] for gates in clifford_params]
clifford_gates = [mult_gates(matrices) for matrices in clifford_matrices]

def glob_phase(phase, dim=2):
    """global phase operator for dimensionality dim
    """
    return np.exp(1j * phase) * np.identity(dim)

def match_up_to_phase(target, gates, dim=2, verbose=False):
    """finds the element of the list gates that best matches the target gate up to a global phase of integer multiples of pi
    """
    # set of global phase operators for integer multiples of pi
    glob_phases = [glob_phase(0, dim), glob_phase(np.pi, dim)]
    # gates up to global phases
    gates_2 = [[mult_gates([gate1, gate2]) for gate2 in glob_phases] for gate1 in gates]
    # index of gate that is closest to target up to global phase (using frobenius norm)
    match_index = np.argmin([np.amin([np.linalg.norm(target - gate) for gate in gates]) for gates in gates_2])
    
    return match_index

#### function to calculate the last gate in the sequence - recovery gate which leads back to initial state (up to global phase)

In [None]:
def calculate_inverse_clifford(clifford_list, gates=clifford_gates):
    """Calculates the final recovery clifford gate

    Parameters:
    clifford_list: list
        a list containing the indices of the clifford sequence to be inverted
    gate: list
        a list containing the gates to compare the recovery gate to
    """
    #return a dummy if only benchmarking of the system
    if not real_rb:
        return 0
    
    # matrix representation of Clifford sequence
    seq_gate = mult_gates([gates[gate] for gate in clifford_list])
    # recovery gate - inverse of full sequence
    rec_gate = np.linalg.inv(seq_gate)
    # index of recovery gate (up to global phase)
    recovery = int(match_up_to_phase(rec_gate, clifford_gates))
    
    return recovery

## Compiler Utilities
A series of utility functions, to compile sequences and upload them.

Not advised for operational use, only for benchmarking to clearly separate each step.

Please use the functions provied by zhinst.toolkit in real life.

In [None]:
def create_awg_mod(awg):
    """Create an AWG moduel
    
    Args:
        awg (AWG): The AWG node of the instrument 
    
    Returns:
        AwgModule: the AWG module
    """
    
    awg_mod = awg._session.modules.create_awg_module()
    raw_awg = awg_mod.raw_module
    awg_mod.device(awg._serial)
    awg_mod.index(awg._index)
    awg_mod.compiler.upload(False)
    raw_awg.execute()
    return awg_mod


def get_fn(awg_mod, fn_base="awg_default.elf", absolute=True):
    """Get the ELF filename

    Args:
        awg_mod (AwgModule): The AWG Module
        fn_base (str, optional): The 'base' filename
        absolute (bool, optional): If True, return the full absolute path. Otherwise 
        only the reduce name to be passed to the AWG module. Defaults to True.

    Returns:
        str: the ELF filename
    """

    if absolute:
        base_dir = awg_mod.directory()
        device = awg_mod.device().serial
        index = awg_mod.index()
        return os.path.normpath(
            os.path.join(base_dir, "awg/elf", f"{device:s}_{index:d}_{fn_base:s}")
        )
    else:
        return fn_base

def compile_sequences(awg_mod, seqc, index=0):
    """Compile a sequence

    Args:
        awg_mod (AwgModule): The AWG Module needed for the compilation
        seqc (str): The sequence
        index (int, optional): The sequence number. It will be used for the filename
    """
    
    fn = get_fn(awg_mod, fn_base=f"awg_{index:d}.elf", absolute=False)
    awg_mod.elf.file(fn)
    awg_mod.compiler.sourcestring(seqc)
    while True:
        status = awg_mod.compiler.status()
        if status == 0:
            while awg_mod.compiler.sourcestring() != '':
                time.sleep(0.001)
            break
        elif status == 1:
            raise RuntimeError("Compilation Error: " + awg_mod.compiler.statusstring())
            break
        elif status == 2:
            raise RuntimeWarning("Compilation warning: " + awg_mod.compiler.statusstring())
            break

        time.sleep(0.001)

def upload_sequence(awg, awg_mod, index=0):
    """Upload a sequence to the device

    Args:
        awg_mod (AwgModule): The AWG Module used for the compilation
        awg (AWG): The AWG node of the instrument 

        index (int, optional): The sequence number. It will be used for the filename
    """
    fn = get_fn(awg_mod, fn_base=f"awg_{index:d}.elf", absolute=True)
    with open(fn, "rb") as f:
        data = np.frombuffer(f.read(), dtype=np.uint32)
        awg.elf.data(data)


## Benchmark 1: Compare wavewform definition type
Use a RB traditional method: define the whole sequence as a flat waveform, uploaded at each RB step.

We compare CSV textual waveforms against directly injected waveforms defined in the sequence as placeholder.

#### generate full two-channel waveform from Clifford definition - including digital modulation, if required

In [None]:
def generate_wave(clifford_sequence, freq=10e6, iq_modulation=True, pad_zero=None, sample_rate=2.4e9):
    # wave_real - zero for Y-pulse, non-zero for X-pulse - negative amplitude flips by pi
    wave_real = np.concatenate([clifford_waves_real[i][0] for i in clifford_sequence])
    # wave_imag - zero for X-pulse, non-zero for Y-pulse - negative amplitude flips by pi
    wave_imag = np.concatenate([clifford_waves_real[i][1] for i in clifford_sequence])
    
    if pad_zero is not None:
        pad_len = pad_zero - wave_real.size
        if pad_len > 0:
            wave_real = np.pad(wave_real, (0, pad_len), 'constant', constant_values=0)
            wave_imag = np.pad(wave_imag, (0, pad_len), 'constant', constant_values=0)
    
    wave_len = wave_real.size
        
    if iq_modulation:
        wave_time = wave_len / sample_rate
        wave_sin = np.sin(2 * np.pi * freq * np.linspace(0, wave_time, wave_len))
        wave_cos = np.cos(2 * np.pi * freq * np.linspace(0, wave_time, wave_len))
        # X-pulse: sin on I, cos on Q / Y-pulse: -cos on I, sin on Q
        wave_I = wave_sin * wave_real + wave_cos * wave_imag
        wave_Q = -wave_cos * wave_real + wave_sin * wave_imag
    else:
        wave_I = wave_real
        wave_Q = wave_imag
        
    return [(wave_I, wave_Q)]    

#### define the seqC program as string, with length of waveform as parameter

In [None]:
# define the seqC program as string
def rb_program_flat_wfm(wave_len, num_Averages, csv=False):
    if csv:
        wfm_decl = textwrap.dedent(f"""\
        //Waveform definition - two-channel waveform, from CSV
        wave wI = "wI";
        wave wQ = "wQ";
        """)
    else:
        wfm_decl = textwrap.dedent(f"""\
        //Waveform definition - two-channel waveform, with placeholders
        wave wI = placeholder({wave_len:d});
        wave wQ = placeholder({wave_len:d});
        assignWaveIndex(wI, wQ, 0);
        """)
        
    res = wfm_decl + textwrap.dedent(f"""\

    // send a trigger at start of sequence
    setTrigger(3);
    wait(5);
    setTrigger(0);
    
    repeat ({num_Averages}) {{
        //execute random sequence from single waveform
        playWave(1,2, wI, 1,2, wQ);
    }}
    """)

    return res

#### pre-generate and upload full waveform including modulation - variable waveform length, requiring recompilation at every step

In [None]:
def bench_flat_wfm(n,k,awg,csv):
    """Run a benchmark for the compiler and upload speed.
    Uses a RB sequence expressed as flat waveform

    Args:
        n (int): number of different sequence RB lengths (exponential)
        k (int): number of different random sequences per length
        awg (AWG): The AWG node
        csv (bool): If the waveform is saved as CSV (True) or as directly injectd into a placeholder (False)
        
    Returns:
        np.array: Compile time, per iterations
        np.array: Upload time, per iterations
    """
    
    # include IQ_modulation?
    iq_modulation = True
    freq = 10e6

    # number of averages / repetitions
    num_Averages = 2**0
    
    #Create an AWG module and use a temporary directory
    with tempfile.TemporaryDirectory() as filedir:
        awg_mod = create_awg_mod(awg)
        awg_mod.directory(filedir)
        
        #Create sub-directories for waves
        os.mkdir(os.path.join(filedir, "awg/"))
        os.mkdir(os.path.join(filedir, "awg/waves/"))

        start = time.perf_counter_ns()
        compile_time = []
        upload_time = []

        i = 0
        # iterate over sequence lengths
        for len_exp in range(1,n+1):
            # define sequence length = 2^n
            M = 2**len_exp

            # iterate over different random sequences
            for rand_i in range(k):
                # Generate a RB sequence as a sequence of random Clifford indices
                gates_M1 = [random.randrange(0,24) for i in range(M)]
                # find recovery gate
                gate_M = calculate_inverse_clifford(gates_M1)
                # full sequence
                gates_M = gates_M1 + [gate_M]

                #Generate the waveforms
                wave_all = generate_wave(gates_M, freq=freq, iq_modulation=iq_modulation)
                wave_len = wave_all[0][0].size

                #Generate the sequence
                seqc = rb_program_flat_wfm(wave_len, num_Averages, csv=csv)

                compile_start = time.perf_counter_ns()

                # if csv wfm are used, save them on disk
                # otherwise calculate the raw vector
                if csv:
                    np.savetxt(os.path.join(filedir, "awg/waves/wI.csv"), wave_all[0][0])
                    np.savetxt(os.path.join(filedir, "awg/waves/wQ.csv"), wave_all[0][1])
                else:
                    wfm = zhinst.toolkit.Waveforms()
                    wfm[0] = wave_all[0]
                    wfm_raw = wfm.get_raw_vector(0)
                

                #compile the sequence
                i += 1
                compile_sequences(awg_mod, seqc, i)
                compile_stop = time.perf_counter_ns()

                #Upload the compiled sequence and associated waveforms
                with awg.root.set_transaction():
                    upload_sequence(awg, awg_mod,i)

                    #upload separatly the waveform, if not csv
                    if not csv:
                        awg.waveform.waves[0](wfm_raw)

                upload_stop = time.perf_counter_ns()

                upload_time.append(upload_stop - compile_stop)
                compile_time.append(compile_stop - compile_start)

    tot_time = time.perf_counter_ns() - start

    upload_time = np.array(upload_time) * 1e-9
    compile_time = np.array(compile_time) * 1e-9

    print(f'Total time: {tot_time*1e-9:.3f}s')
    print(f'Per iteration time (average): {tot_time/(k*n)*1e-9:.3f}s')
    
    return compile_time, upload_time

In [None]:
#benchmark parameters
n = 14
k = 10

In [None]:
#Placeholders
compile_time_placeholder, upload_time_placeholder = bench_flat_wfm(n=n,k=k,awg=devicehd.awgs[0], csv=False)

Total time: 74.152s
Per iteration time (average): 0.530s


In [None]:
#CSV
compile_time_csv, upload_time_csv = bench_flat_wfm(n=n,k=k,awg=devicehd.awgs[0], csv=True)

Total time: 1926.003s
Per iteration time (average): 13.757s


In [None]:
#save results for plot
np.savetxt('params.txt', [n,k], fmt = '%d')
np.savetxt('compile_time_csv.txt', compile_time_csv)
np.savetxt('compile_time_placeholder.txt', compile_time_placeholder)
np.savetxt('upload_time_csv.txt', upload_time_csv)
np.savetxt('upload_time_placeholder.txt', upload_time_placeholder)

## Benchmark 2: Compare compiler versions
Benchmark to compare different compiler versions. We define a RB sequence where each Clifford gate is defined as a command table entry, and the sequence is defined as list of calls to executeTableEntry.

We compare the AWG compiler version 22.02 patch-0 (22.2.26577) and patch-1 (22.2.29711).

Please note that the notebook should be restarted to load a new library version.

In [None]:
# define the seqC program as string
def rb_program_ct_list(gates, num_Averages):
    # Waveform definition - allocating the waveforms for each clifford
    waveforms_def = str()
    for i,wave in enumerate(clifford_waves):
        wave_len = len(wave)
        waveforms_def += f"assignWaveIndex(placeholder({wave_len:d}), placeholder({wave_len:d}), {i:d});\n"

    # define the seqC program as string
    hd_rb_program = textwrap.dedent(f"""\
    //Waveforms definition
    """)
    
    hd_rb_program += waveforms_def
    
    hd_rb_program += textwrap.dedent(f"""
    //Loop
    repeat ({num_Averages:d}) {{
    """)
    
    for gate in gates:
        hd_rb_program += f"\texecuteTableEntry({gate:d});\n"

    hd_rb_program += "}"
    
    return hd_rb_program


#Simple command table with all the cliffords
def get_ct_cliff(awg):
    ct_schema = awg.commandtable.load_validation_schema()
    ct = zhinst.toolkit.CommandTable(ct_schema)
    for i in range(24):
        ct.table[i].waveform.index = i
    return ct

In [None]:
def bench_elements(n,k,awg):
    """Run a benchmark for the compiler and upload speed.
    Uses a RB sequence expressed as serie of command table executions

    Args:
        n (int): number of different sequence RB lengths (exponential)
        k (int): number of different random sequences per length
        awg (AWG): The AWG node
        
    Returns:
        np.array: Compile time, per iterations
        np.array: Upload time, per iterations
    """
    
    # number of averages / repetitions
    num_Averages = 2**0    
    
    #get the command table
    ct = get_ct_cliff(awg)
    cts = np.frombuffer(json.dumps(ct.as_dict()).encode('utf-8'), dtype=np.uint32)
    
    #Create an AWG module and use a temporary directory
    with tempfile.TemporaryDirectory() as filedir:
        awg_mod = create_awg_mod(awg)
        awg_mod.directory(filedir)

        start = time.perf_counter_ns()
        compile_time = []
        upload_time = []

        i = 0
        # iterate over sequence lengths
        for len_exp in range(1,n+1):
            # define sequence length = 2^n
            M = 2**len_exp

            # iterate over different random sequences
            for rand_i in range(k):
                # Generate a RB sequence as a sequence of random Clifford indices
                gates_M1 = [random.randrange(0,24) for i in range(M)]
                # find recovery gate
                gate_M = calculate_inverse_clifford(gates_M1)
                # full sequence
                gates_M = gates_M1 + [gate_M]

                #Generate the sequence
                seqc = rb_program_ct_list(gates_M, num_Averages)

                compile_start = time.perf_counter_ns()

                #compile the sequence
                i += 1
                compile_sequences(awg_mod, seqc, i)
                compile_stop = time.perf_counter_ns()

                #Upload the compiled sequence, associated waveforms and the command tale
                with awg.root.set_transaction():
                    upload_sequence(awg, awg_mod, i)

                    #upload the waveforms
                    for i in wfm_clifford:
                        awg.waveform.waves[i](wfm_clifford.get_raw_vector(i))
                        
                    #Upload the CT
                    awg.commandtable.data(cts)

                upload_stop = time.perf_counter_ns()

                upload_time.append(upload_stop - compile_stop)
                compile_time.append(compile_stop - compile_start)

    tot_time = time.perf_counter_ns() - start

    upload_time = np.array(upload_time) * 1e-9
    compile_time = np.array(compile_time) * 1e-9


    print(f'Total time: {tot_time*1e-9:.3f}s')
    print(f'Per iteration time: {tot_time/(k*n)*1e-9:.3f}s')
    
    return compile_time, upload_time

In [None]:
compile_time_el, upload_time_el = bench_elements(n,k,devicesg.sgchannels[0].awg)

Total time: 9.998s
Per iteration time: 0.071s


In [None]:
ziPython_ver = zhinst.ziPython.__version__.replace('.','_')
np.savetxt(f'compile_time_el_{ziPython_ver:s}.txt', compile_time_el)
np.savetxt(f'upload_time_el_{ziPython_ver:s}.txt', upload_time_el)