In [31]:
# UI and debugging
from termcolor import colored

# binary
import struct

# Protocol
from qiskit_aer import AerSimulator
from qiskit import QuantumCircuit, transpile
from qiskit import QuantumRegister, ClassicalRegister
from qiskit.quantum_info import Pauli
from math import ceil
import numpy as np
import random
import math
import copy


# Visualization
from qiskit.visualization import plot_circuit_layout, circuit_drawer, plot_bloch_multivector
from pylatexenc.latexencode import unicode_to_latex
from qiskit.quantum_info import Statevector

In [32]:
# Prints debug message (if _dbg_ == mode)
def DBG(msg, mode):
    global _dbg_
    if _dbg_ >= mode:
        print("\n" + colored("[DBG]", "black", "on_yellow", attrs=["bold"]) + "\t" + msg)


# Like DBG, but without icon
def LST(msg, mode):
    global _dbg_
    if _dbg_ >= mode:
        print("\n\t" +  msg)
            

# Prints error message if is_error == True; print success message otherwise
def ERR(msg_err, msg_suc, is_error):
    global _dbg_
    if _dbg_:
        if is_error:
            print("\n" + colored("[ERR]", "black", "on_red", attrs=["bold", "blink"]) + "\t" +  msg_err)
            #raise Exception("User Defined Error")
        else:
            print("\n" + colored("[ "+u'\u2713'+" ]", "black", "on_green", attrs=["bold", "blink"]) + "\t" +  msg_suc)


# New task
def NEW(msg):
    print("\n\n\n" + colored("[NEW]", "black", "on_cyan", attrs=["bold"]) + "\t" + msg)


# Used for uninitialized variables
def NIL(var="variable"):  # var is name of unitialized variable
    ERR(var + " is unitialized! Exiting...", "", True) 

In [33]:
# Step 1.1 (compute numbers of EPR pairs needed for message, eavesdrop checks, and total)
def get_EPR_counts(message, pe, m, m_msg, m_err1, m_err2):
    # first eavesdrop pairs
    for m in range(len(message)*2):
        m_err1 = ceil((1-pe)/2*m)
        if m - m_err1 >= len(message) and (m - m_err1)%2 == 0:
            m_msg = m
            break
    m_msg = m - m_err1
    # second eavesdrop pairs
    for m in range((len(message))*2):
        m_err2 = ceil((1-pe)/2*m)
        if m - (m_err1+m_err2) >= len(message) and (m - m_err2)%2 == 0:
            m_msg = m
            break
    m_msg = m - m_err1 - m_err2
    # return counts
    DBG("m      = " + str(m) + "\n\tm_msg  = " + str(m_msg) + "\n\tm_err1 = " + str(m_err1) + "\n\tm_err2 = " + str(m_err2), 2)
    return m, m_msg, m_err1, m_err2
    

# Step 1.2 (create EPR pairs)
def create_EPR_pairs(m):
    EPR_pairs = []
    for i in range(m):
        label_A          = r"""A_"""+str(i)
        label_B          = r"""B_"""+str(i)
        q_a              = QuantumRegister(1, name=label_A)
        q_b              = QuantumRegister(1, name=label_B)
        c                = ClassicalRegister(2)
        EPR_pair_circuit = QuantumCircuit(q_a, q_b, c)
        EPR_pair_circuit.x(q_b[0])
        EPR_pair_circuit.h(q_a[0])
        EPR_pair_circuit.z(q_a[0])
        EPR_pair_circuit.z(q_b[0])
        EPR_pair_circuit.cx(q_a[0], q_b[0])
        EPR_pairs.append(EPR_pair_circuit)
    DBG(str(len(EPR_pairs)) + " EPR pairs generated", 1)
    return EPR_pairs


# Step 1.3 (choose message and error indices)
def choose_indices(m, m_msg, m_err1, m_err2, indices_msg, indices_err1, indices_err2):
    for i in range(m):
        indices_msg.append(i)
    indices_err1 = random.sample(indices_msg, m_err1)
    for i in indices_err1:
        indices_msg.remove(i)
    indices_err2 = random.sample(indices_msg, m_err2)
    for i in indices_err2:
        indices_msg.remove(i)
    return indices_msg, indices_err1, indices_err2


# Step 2
def detect_eavesdropping(simulator, _circuits, indices_err1, bases_err1, pe, threshold):
    # Get Bob's basis selections
    for i in indices_err1:
        bases_err1[i] = random.choice(['X', 'Y', 'Z'])
    # Get Alice and Bob's results
    bob_results   = _detection_results(simulator, _circuits, indices_err1, bases_err1, 1)  # 1 is Bob's qubit
    alice_results = _detection_results(simulator, _circuits, indices_err1, bases_err1, 0)  # 0 is Alice's qubit
    # Count errors
    error_count = 0
    DBG("Eavesdrop detection circuits:", 3)
    for i in indices_err1:
        LST("\n" + str(_circuits[i]) + "\n", 3)
        if alice_results[i] == bob_results[i]:
            error_count += 1
    DBG("bob_results       = " + str(bob_results) + "\n\talice_results     = " + str(alice_results), 2)
    DBG("error_count       = "+str(error_count) + "\n\tlen(indices_err1) = "+str(len(indices_err1)) + "\n\tthreshold         = "+str(threshold),2)
    is_error = (error_count / len(indices_err1)) >= threshold
    ERR("Eavesdropping (1) detected! Restarting QISAC protocol...", "No eavesdropping detected. Continuing...", is_error)
    return _circuits, indices_msg, indices_err1, bases_err1, is_error


# Step 3:
def encode(circuits, message, phase_data,m_msg, m_err2, indices_msg, indices_err2, bases_err2):
    # Check length restrictions and abord or add padding as needed
    if len(message) > len(circuits) - m_err2:
        ERR("message / phase_data lengths cannot exceed available qubits! Exiting...", "", True)
    elif len(message) < m_msg:
        for i in range(m_msg - len(message)):
            message    = "".join([message, "0"])     # [NOTE] we assume message cannot end with a 0 
            phase_data = "".join([phase_data, "0"])  # [NOTE] we assume phase_data cannot end with a 0 
        DBG("Padded message    = " + message + "\n\tPadded phase_data = " + phase_data, 1)
    circuits, bases_err2 = _init_2nd_check(circuits, indices_err2)
    circuits             = _encode_data(circuits, message, phase_data, indices_msg)    
    return circuits, message, phase_data, indices_msg, indices_err2, bases_err2


# Step 4:
def detect_eavesdropping_2(simulator, _circuits, m_msg, m_err2, indices_err2, bases_err2, pe, threshold):
    # Get Alice and Bob's results
    bob_results   = _detection_results(simulator, _circuits, indices_err2, bases_err2, 1)  # 1 is Bob's qubit
    alice_results = _detection_results(simulator, _circuits, indices_err2, bases_err2, 0)  # 0 is Alice's qubit
    # Count errors
    error_count = 0
    DBG("Eavesdrop detection circuits:", 3)
    for i in indices_err2:
        LST("\n" + str(_circuits[i]) + "\n", 3)
        if alice_results[i] == bob_results[i]:
            error_count += 1
    DBG("bob_results       = " + str(bob_results) + "\n\talice_results     = " + str(alice_results), 2)
    DBG("error_count       = "+str(error_count) + "\n\tlen(indices_err2) = "+str(len(indices_err2)) + "\n\tthreshold         = "+str(threshold),2)
    is_error = (error_count / len(indices_err2)) >= threshold
    ERR("Eavesdropping (2) detected! Restarting QISAC protocol...", "No eavesdropping detected. Continuing...", is_error) 
    return _circuits, is_error


# Step 5:
def measure_msg_pairs(simulator, circuits, indices_msg, bases_msg, po):
    results = {}
    for i in indices_msg:
        if random.random() <= po:
            circuits[i].pauli('XX', [0,1])  # [TODO] Is this correct?
            bases_msg[i] = 'O_1'
        else:
            circuits[i].pauli('YX', [0,1])  # [TODO] Is this correct?
            bases_msg[i] = 'O_2'
        # Get results
        circuits[i].measure_all()
        circuits[i] = transpile(circuits[i], simulator)
        job         = simulator.run(circuits[i], shots=1)
        result      = job.result().get_counts()
        result_str  = list(result.keys())[0]
        results[i]  = result_str  # [TODO] Confirm this is correct
    return results, bases_msg

In [None]:
# Helper function for Step 2
def _detection_results(simulator, circuits, indices, bases, qubit):
    results = {}
    for i in indices:
        # Prepare circuits at indices
        if   bases[i] == 'X':
            circuits[i].h(qubit)
        elif bases[i] == 'Y':
            circuits[i].sdg(qubit)
            circuits[i].h(qubit)
        # Get results
        circuits[i].measure(qubit, qubit)
        circuits[i] = transpile(circuits[i], simulator)
        job         = simulator.run(circuits[i], shots=1)
        result      = job.result().get_counts()
        result_str  = list(result.keys())[0]
        results[i]  = result_str[qubit]  # [TODO] Confirm this is correct
    return results


# Helper function for Step 3
def _init_2nd_check(circuits, indices_err2):
    bases = {}
    for i in indices_err2:
        bases[i] = random.choice(["I", "Z", "X", "iY"])
        if   bases[i] == "Z":
            circuits[i].z(0)
        elif bases[i] == "X":
            circuits[i].x(0)
        elif bases[i] == "iY":
            circuits[i].ry(np.pi/2, 0)  # [TODO] Is this the correct operator?
    DBG("bases          =  " + str(bases), 2)
    return circuits, bases

# Helper function for Step 3
def _encode_data(circuits, message, phase_data, indices_msg):
    # Encode message
    DBG("Encoding message", 1)
    j = 0
    for i in indices_msg:
        if message[j] == "1":
            circuits[i].x(0)
        j += 1
    # Encode phase_data
    for i in indices_msg:
        circuits[i].rz(math.radians(int(phase_data)), 0) # [TODO] Is this the correct operator?
    DBG("Encoded message/parameter and 2nd eavesdrop check circuits:", 3)
    for c in circuits:
        LST("\n" + str(c), 3)
    return circuits

def _load_sensor_data(file_path):

    adc1_values = []

    with open(file_path, 'r') as file:
        for line in file:
            if 'ADC1 =' in line:
                parts = line.strip().split()
                adc1_values.append(float(parts[2]))

    arr = np.array(adc1_values, dtype=np.float32)
    return arr


# source: https://stackoverflow.com/questions/16444726/binary-representation-of-float-in-python-bits-not-hex
def bin2float(b):
    ''' Convert 32-bit binary string to a float.

    Attributes:
        :b: Binary string to transform.
    '''
    h = int(b, 2).to_bytes(4, byteorder="big")
    return struct.unpack('>f', h)[0]


def float2bin(f):
    ''' Convert float to 32-bit binary string.

    Attributes:
        :f: Float number to transform.
    '''
    [d] = struct.unpack(">I", struct.pack(">f", f))
    return f'{d:032b}'



In [51]:
# Global variable declarations and initializations
_dbg_        = 0                    # If _dbg_, then print additional debug statements
m            = None                 # Number of EPR pairs
m_msg        = None                 # Number of EPR pairs allocated to message / phase accumulation
m_err1       = None                 # Number of EPR pairs allocated to first eavesdropping check
m_err2       = None                 # Number of EPR pairs allocated to second eavesdropping check
indices_msg  = []                   # Indices from circuits allocated to message / phase accumulation
indices_err1 = []                   # Indices from circuits allocated to first eavesdropping check (won't be meaningful after Step 2)
indices_err2 = []                   # Indices from circuits allocated to second eavesdropping check
pe           = 0.8                  # Proportion of EPR pairs NOT used for 1st error detection
po           = 0.5                  # Proportion of EPR pairs measured on observable 1 (see step 5)
threshold    = 0.7                  # Threshold for first eavesdropping detection
simulator    = AerSimulator()       # Create simulator
circuits     = None                 # Used to store list of EPR pairs
is_error     = True                 # Used to indicate if eavesdropping was detected (see Step 2)
message      = ""                   # Message to be communicated
phase_data   = ""                   # Phase data to be accumulated
N            = 1                    # Number of evolution iterations of phase accumulation
bases_err1   = {}                   # Bases selected by Bob for 1st eavesdropping check
bases_err2   = {}                   # Bases selected by Alice for 2nd eavesdropping check
bases_msg    = {}
#O_1          = Pauli('XX')          # Obersable 1 (see Step 5)
#O_2          = Pauli('YX')          # Obersable 2 (see Step 5)
file_path = "/home/matt/OneDrive/School/Year 4/COMP 5900E/Project/data/10min1.txt"

def run_qisac_once(message):
    global _dbg_
    global m, m_msg, m_err1, m_err2
    global indices_msg, indices_err1, indices_err2
    global pe, po, threshold, simulator, circuits
    global is_error, phase_data, N
    global bases_err1, bases_err2, bases_msg
    # global O_1, O_2

    is_error = True

    while is_error:
        # message      = message  # multiply sensor data by 10^d to remove fractional part
        phase_data   = "00000000000000000000000000000000"  
        indices_msg  = []
        indices_err1 = []
        indices_err2 = []

        print("\n" + colored("[STARTING QISAC PROTOCOL]", "black", "on_magenta", attrs=["bold"]))
        
        NEW("(1) Prepare list of EPR pairs and send one qubit from each to Bob")
        m, m_msg, m_err1, m_err2                = get_EPR_counts(message, pe, m, m_msg, m_err1, m_err2)
        circuits                                = create_EPR_pairs(m)  # Create list of EPR pair circuits
        indices_msg, indices_err1, indices_err2 = choose_indices(m, m_msg, m_err1, m_err2, indices_msg, indices_err1, indices_err2)
        DBG("indices_msg  = " + str(indices_msg) + "\n\tindices_err1 = " + str(indices_err1) + "\n\tindices_err2 = " + str(indices_err2), 2)

        NEW("(2) Check for eavesdropping")
        # add extra param to detect_eavesdropping() as either True or False to debug
        circuits, indices_msg, indices_err1, bases_err1, is_error = detect_eavesdropping(simulator, circuits, indices_err1, bases_err1, pe, threshold)  

        if is_error:
            print("\n")  # UI formatting
            continue
        
        NEW("(3) Encode message and parameter estimate")
        circuits, message, phase_data, indices_msg, indices_err2, bases_err2 = encode(circuits, message, phase_data, m_msg, m_err2, indices_msg, indices_err2, bases_err2)

        NEW("(4) Transmit remaining qubits and check again for eavesdropping")
        circuits, is_error = detect_eavesdropping_2(simulator, circuits, m, m_err2, indices_err2, bases_err2, pe, threshold)

        if is_error:
            print("\n")  # UI formatting
            continue

    NEW("(5) Perform measurements")
    results, bases_msg = measure_msg_pairs(simulator, circuits, indices_msg, bases_msg, po)
    DBG("Measurement results:", 1)
    msg_check = ""
    for q,v in results.items():
        if v[0] == v[1]:
            msg_check = msg_check + str(1)
        else:
            msg_check = msg_check + str(0)
    DBG("msg_check = " + msg_check + "\n\tmessage   = " + message, 1)
    success = (msg_check == message)
    ERR("Decoded message does not match sent message!", "Decoded message correctly!", msg_check != message)
    return success



In [52]:
sensor_values = _load_sensor_data(file_path)

In [None]:
results = []
count = 0

for value in sensor_values:
    bin_value = float2bin32(value)
    result = run_qisac_once(bin_value)
    results.append(result)
    # break
    count += 1 # for testing
    if count == 10:
        break

print("\n")
print("Total successful:", sum(results))
print("Total failed:", len(results) - sum(results))




[1m[45m[30m[STARTING QISAC PROTOCOL][0m



[1m[46m[30m[NEW][0m	(1) Prepare list of EPR pairs and send one qubit from each to Bob



[1m[46m[30m[NEW][0m	(2) Check for eavesdropping



[1m[45m[30m[STARTING QISAC PROTOCOL][0m



[1m[46m[30m[NEW][0m	(1) Prepare list of EPR pairs and send one qubit from each to Bob



[1m[46m[30m[NEW][0m	(2) Check for eavesdropping



[1m[46m[30m[NEW][0m	(3) Encode message and parameter estimate



[1m[46m[30m[NEW][0m	(4) Transmit remaining qubits and check again for eavesdropping



[1m[46m[30m[NEW][0m	(5) Perform measurements

[1m[45m[30m[STARTING QISAC PROTOCOL][0m



[1m[46m[30m[NEW][0m	(1) Prepare list of EPR pairs and send one qubit from each to Bob



[1m[46m[30m[NEW][0m	(2) Check for eavesdropping



[1m[46m[30m[NEW][0m	(3) Encode message and parameter estimate



[1m[46m[30m[NEW][0m	(4) Transmit remaining qubits and check again for eavesdropping



[1m[46m[30m[NEW][0m	(5) Perform measurem