# CommLab Final Project: AQS Implementation with QOTP

## Import packages

In [161]:
from qiskit import *
import numpy as np
from qiskit.tools.visualization import circuit_drawer
from QKD import QKD

In [162]:
qasm_sim = Aer.get_backend('qasm_simulator')
def get_measurements(qc, num_shots):
    """Execute ``qc`` on the module-level ``qasm_sim`` backend.

    Args:
        qc: circuit handed to ``execute``.
        num_shots: value forwarded as ``shots``.

    Returns:
        Whatever ``job.result()`` returns for the submitted job. The job is
        submitted with ``memory=True``.
    """
    return execute(qc, qasm_sim, shots=num_shots, memory=True).result()

## Initializing phase

In [163]:
def bin_to_int_key(bin_key):
    """Transform a binary key into the integer (index-based) key, e.g. K_AT.

    The result lists the positions of all falsy bits in ascending order,
    followed by the positions of all truthy bits in ascending order, so it is
    always a permutation of ``range(len(bin_key))``.

    Args:
        bin_key: iterable of bits. Bits are tested by truthiness, so they must
            be numeric/boolean values; the characters ``"0"`` and ``"1"`` would
            both count as truthy.

    Returns:
        list[int]: zero-bit positions followed by one-bit positions.
    """
    zero = []
    one = []
    for position, bit in enumerate(bin_key):
        # Route each position to the bucket that matches its bit value.
        (one if bit else zero).append(position)
    return zero + one

def generate_message(n):
    """Build a circuit holding ``n`` randomly initialised message qubits.

    Args:
        n: number of message qubits.

    Returns:
        A ``QuantumCircuit`` over quantum register ``p`` and classical
        register ``p_c`` (both of size ``n``). Every qubit of ``p`` is
        initialised to its own freshly drawn random single-qubit state.
        No measurement is added here.
    """
    p_reg = QuantumRegister(n, name="p")
    p_creg = ClassicalRegister(n, name="p_c")
    circuit = QuantumCircuit(p_reg, p_creg)
    for qubit in p_reg:
        state = quantum_info.random_statevector(dims=2, seed=None)
        circuit.initialize(state, qubit)
    return circuit

# Key material is obtained from the imported QKD helper.
def get_shared_key(n):
    """Produce the Alice-Trent and Bob-Trent keys.

    ``QKD`` is called twice, first with ``n`` and then with ``2*n + 1``
    (presumably returning a bit sequence of that length; confirm against the
    QKD module). Both outputs are converted with ``bin_to_int_key``.

    Args:
        n: size passed to the first ``QKD`` call.

    Returns:
        tuple: ``(K_AT, K_BT, r_BT)``, i.e. the converted Alice-Trent key, the
        converted Bob-Trent key and the unconverted Bob-Trent ``QKD`` output.
    """
    alice_trent_bits = QKD(n)
    bob_trent_bits = QKD(2*n + 1)
    return (
        bin_to_int_key(alice_trent_bits),
        bin_to_int_key(bob_trent_bits),
        bob_trent_bits,
    )

def create_bell_states(n):
    """Return an instruction that entangles ``n`` qubit pairs.

    The instruction acts on ``2*n`` qubits: the first ``n`` are the "A"
    halves and the last ``n`` the "B" halves. Pair ``i`` receives H on its A
    qubit followed by a controlled-X from A to B; on qubits starting in |00>
    this yields (|00> + |11>)/sqrt(2).

    Args:
        n: number of pairs.

    Returns:
        The circuit converted to an instruction labelled ``"bell_state"``.
    """
    A_pos = QuantumRegister(n)
    B_pos = QuantumRegister(n)
    qc = QuantumCircuit(A_pos, B_pos)

    for a_qubit, b_qubit in zip(A_pos, B_pos):
        qc.h(a_qubit)
        # Use ``cx``: the ``cnot`` alias is deprecated in Qiskit and emits a
        # DeprecationWarning every time it is called.
        qc.cx(a_qubit, b_qubit)
    return qc.to_instruction(label="bell_state")

In [164]:
# n: number of message qubits; shots: repetitions used for most simulator runs.
n = 4
shots = 100

# Three registers hold the same random message (see the initialisation loop
# below): P stays the reference copy, P_enc is the copy that gets encrypted,
# and for_M_A (declared further down) is consumed by Alice's Bell measurement.
P = QuantumRegister(n, name="p_original")
P_enc = QuantumRegister(n, name="p_enc")
# Bell pairs (M_A_1[i], M_A_2[i]) prepared in the signing phase.
M_A_1 = QuantumRegister(n, name="m_a_1")
M_A_2 = QuantumRegister(n, name="m_a_2")
# Two further sets of pairs prepared in the verification phase: the *_T set is
# later Bell-measured on Bob's side, the *_sig set becomes part of S.
M_A_1_T = QuantumRegister(n, name="m_a_1_t")
M_A_2_T = QuantumRegister(n, name="m_a_2_t")
M_A_1_sig = QuantumRegister(n, name="m_a_1_sig")
M_A_2_sig = QuantumRegister(n, name="m_a_2_sig")
# Bell pairs between Alice (A_bell) and Bob (B_bell).
A_bell = QuantumRegister(n, name="alice_bell")
B_bell = QuantumRegister(n, name="bob_bell")
for_M_A = QuantumRegister(n, name="m_a_mea")
# Two flag qubits; verify[0] is flipped when Trent's comparison succeeds.
verify = QuantumRegister(2, name="verify")
verify_measure = ClassicalRegister(2, name="verify_measure")
# One ancilla qubit plus one classical bit per swap-test comparison.
# cswap_4 / cswap_measure_4 are not used in the visible part of the notebook.
cswap_measure = ClassicalRegister(1, name="cswap_measure")
cswap = QuantumRegister(1, name="cswap")
cswap_measure_2 = ClassicalRegister(1, name="cswap_measure_2")
cswap_2 = QuantumRegister(1, name="cswap_2")
cswap_measure_3 = ClassicalRegister(1, name="cswap_measure_3")
cswap_3 = QuantumRegister(1, name="cswap_3")
cswap_measure_4 = ClassicalRegister(1, name="cswap_measure_4")
cswap_4 = QuantumRegister(1, name="cswap_4")

# Do not reorder the classical registers: later cells read results from the
# counts keys by fixed offsets from the right (key[-2:] for verify_measure,
# key[-4] / keys[-6] / keys[-8] for cswap_measure, _2 and _3), which depends on
# verify_measure being added first, followed by the cswap_measure registers.
qc = QuantumCircuit(P, P_enc, M_A_1, M_A_2, M_A_1_T, M_A_2_T, M_A_1_sig, M_A_2_sig, 
                    A_bell, B_bell, for_M_A, verify, verify_measure,
                    cswap, cswap_2, cswap_3, cswap_4, 
                    cswap_measure, cswap_measure_2, cswap_measure_3, cswap_measure_4)

# Single-bit classical registers for Alice's Bell measurement: two per
# message qubit, attached to the circuit in index order.
get_M_A = [ClassicalRegister(1, 'mac_' + str(m)) for m in range(2*n)]
for creg in get_M_A:
    qc.add_register(creg)

# Single-bit classical registers for the later measurements of the
# M_A_1 / M_A_2 pairs and of their *_T counterparts.
measure_M_A_1 = [ClassicalRegister(1, 'm_a_1_measure' + str(m)) for m in range(n)]
measure_M_A_2 = [ClassicalRegister(1, 'm_a_2_measure' + str(m)) for m in range(n)]
measure_M_A_1_T = [ClassicalRegister(1, 'm_a_1_t_measure' + str(m)) for m in range(n)]
measure_M_A_2_T = [ClassicalRegister(1, 'm_a_2_t_measure' + str(m)) for m in range(n)]

# Attach them group by group; this order fixes their position in the
# measurement outcome strings.
for creg_group in (measure_M_A_1, measure_M_A_2, measure_M_A_1_T, measure_M_A_2_T):
    for creg in creg_group:
        qc.add_register(creg)

# Step 1 : prepare the same random single-qubit state three times per message
# position (reference copy, copy to encrypt, copy for the Bell measurement)
for i in range(n):
    random_state = quantum_info.random_statevector(dims=2, seed=None)
    for target in (P[i], P_enc[i], for_M_A[i]):
        qc.initialize(random_state, target)

# keys generation (the raw Bob-Trent output is not needed here)
K_AT, K_BT, _ = get_shared_key(20*n)

# Step 2 : generate bell states between Alice, Bob
# Alice's halves come first, Bob's halves second, as create_bell_states expects.
AB_bell = [*A_bell, *B_bell]
qc.append(create_bell_states(n), AB_bell)

  return func(*args, **kwargs)


<qiskit.circuit.instructionset.InstructionSet at 0x135f63280>

## Signing phase

In [165]:
def qotp(n, KAT_2n, type):
    """Build a quantum-one-time-pad instruction acting on ``n`` qubits.

    Qubit ``i`` receives an X gate when ``KAT_2n[2*i]`` is truthy, followed by
    a Z gate when ``KAT_2n[2*i + 1]`` is truthy.

    Args:
        n: number of qubits the pad acts on.
        KAT_2n: key with at least ``2*n`` entries, read by truthiness.
        type: suffix for the instruction label; it does not change the gates.

    Returns:
        The pad as an instruction labelled ``"qotp_" + type``.

    NOTE(review): the notebook passes slices of K_AT / K_BT, which
    ``bin_to_int_key`` builds as lists of positions, so every non-zero
    position is truthy here. Confirm that this is the intended key usage.
    """
    pad = QuantumCircuit(QuantumRegister(n))
    for qubit in range(n):
        if KAT_2n[2*qubit]:
            pad.x(qubit)
        if KAT_2n[2*qubit + 1]:
            pad.z(qubit)
    label = "qotp_" + type
    return pad.to_instruction(label=label)
    
def secret_qubit(K, type):
    """Build the key-dependent single-qubit transformation used on the message.

    One qubit is allocated per key entry. An entry equal to 0 places an X gate
    on its qubit; any other entry places a Z gate.

    Args:
        K: key; its length sets the number of qubits.
        type: suffix for the instruction label; it does not change the gates.

    Returns:
        The circuit as an instruction labelled ``"secret_qubit_" + type``.
    """
    reg = QuantumRegister(len(K))
    circuit = QuantumCircuit(reg)
    for qubit, key_entry in zip(reg, K):
        apply_gate = circuit.x if key_entry == 0 else circuit.z
        apply_gate(qubit)
    return circuit.to_instruction(label="secret_qubit_" + type)

def chained_cnot_dec(n, K):
    """Build the chained controlled-X instruction used for decryption.

    Qubits are visited from index ``n - 1`` down to ``0``. For each index
    ``i`` a controlled-X with control ``i`` and target ``K[i]`` is added,
    unless ``K[i] == i`` (a gate cannot use the same qubit twice).

    Args:
        n: number of qubits.
        K: key; ``K[i]`` is the target qubit index for control ``i``.

    Returns:
        The circuit as an instruction labelled ``"chained_cnot_dec"``.
    """
    q = QuantumRegister(n)
    qc = QuantumCircuit(q)
    # The leftover debugging ``print()`` that emitted an empty line on every
    # call has been removed.
    for i in reversed(range(n)):
        if i != K[i]:
            # ``cx`` replaces the deprecated ``cnot`` alias.
            qc.cx(i, K[i])
    return qc.to_instruction(label="chained_cnot_dec")

def bell_measurement(n):
    """Return the basis change that precedes a Bell measurement on ``n`` pairs.

    The instruction acts on ``2*n`` qubits: the first ``n`` form ``q1`` and
    the last ``n`` form ``q2``. For every pair it applies a controlled-X from
    ``q1[i]`` to ``q2[i]`` and then H on ``q1[i]``. No measurement is added
    here; callers measure the qubits themselves afterwards.

    Args:
        n: number of pairs.

    Returns:
        The circuit as an instruction labelled ``"bell_measurement"``.
    """
    q1 = QuantumRegister(n)
    q2 = QuantumRegister(n)
    qc = QuantumCircuit(q1, q2)
    for control, target in zip(q1, q2):
        # ``cx`` replaces the deprecated ``cnot`` alias.
        qc.cx(control, target)
        qc.h(control)
    return qc.to_instruction(label="bell_measurement")

In [166]:
# Step 1 : transform the 2nd copy of the message (P_enc) with the first n
# entries of K_AT
qc.append(secret_qubit(K_AT[:n], "enc"), P_enc)

# Step 3 : generate one type of bell state on (M_A_1, M_A_2); it is turned
# into the other bell states later by conditional corrections
bell_m = [*M_A_1, *M_A_2]
qc.append(create_bell_states(n), bell_m)

# bell_m now holds Alice's bell halves followed by the third message copy
bell_m = [*A_bell, *for_M_A]

# basis change for the bell measurement on Alice's bell halves and the message
qc.append(bell_measurement(n), bell_m)

<qiskit.circuit.instructionset.InstructionSet at 0x134f90d00>

In [167]:
# Read out Alice's Bell measurement and steer the (M_A_1, M_A_2) pairs with
# the outcomes: a Z on M_A_1[i] for the A_bell bit, an X on M_A_2[i] for the
# message-copy bit.
for i in range(n):
    z_bit = get_M_A[2*i]
    x_bit = get_M_A[2*i + 1]
    qc.measure(A_bell[i], z_bit)
    qc.measure(for_M_A[i], x_bit)
    qc.z(M_A_1[i]).c_if(z_bit, 1)
    qc.x(M_A_2[i]).c_if(x_bit, 1)

In [168]:
# Signature S: the encrypted message copy followed by both halves of the
# steered pairs (3*n qubits), padded with the first 6*n entries of K_AT.
S = list(P_enc) + list(M_A_1) + list(M_A_2)
qc.append(qotp(3*n, K_AT[:6*n], "enc"), S)

<qiskit.circuit.instructionset.InstructionSet at 0x135acfd30>

## Verification Phase

In [169]:
def cswap_comparison(n):
    """Build the controlled-swap comparison of two ``n``-qubit registers.

    The instruction acts on ``2*n + 1`` qubits: the two registers to compare
    followed by one ancilla. For every pair ``i`` it applies H on the ancilla,
    a controlled swap of the pair controlled by the ancilla, and H on the
    ancilla again. The caller measures the ancilla; an outcome of 0 is read
    as "same".

    NOTE(review): the H gates wrap each controlled swap individually on the
    shared ancilla instead of enclosing all swaps once. Confirm this is the
    intended comparison.

    Args:
        n: number of qubit pairs to compare.

    Returns:
        The circuit as an instruction labelled ``"swap test"``.
    """
    first = QuantumRegister(n)
    second = QuantumRegister(n)
    ancilla_reg = QuantumRegister(1)
    circuit = QuantumCircuit(first, second, ancilla_reg)
    ancilla = ancilla_reg[0]
    for left, right in zip(first, second):
        circuit.h(ancilla)
        circuit.cswap(ancilla, left, right)
        circuit.h(ancilla)
    return circuit.to_instruction(label="swap test")

In [170]:
# step 1 : Bob receives S, P (from Alice) --> YB = EKB(S, P)
# The pad covers the 3*n qubits of S plus the n qubits of P, hence 4*n qubits
# and the first 8*n entries of K_BT.
qc.append(qotp(4*n, K_BT[:8*n], "enc"), [*S, *P])

# step 2 : Trent : decrypts YB --> (P, S) --> (P, P_enc, M_A_1, M_A_2)
# The same pads are applied again to undo Bob's and then Alice's encryption.
qc.append(qotp(4*n, K_BT[:8*n], "dec"), [*S, *P])
qc.append(qotp(3*n, K_AT[:6*n], "dec"), S)

# checks if P_enc calculated = P_enc sent
# P is transformed with the same key slice as P_enc was, then both registers
# are compared with the swap test; the ancilla lands in cswap_measure.
qc.append(secret_qubit(K_AT[:n], "enc"), P)
qc.append(cswap_comparison(n), [*P, *P_enc, *cswap])
qc.measure(cswap, cswap_measure)

# Simulate the circuit built so far and show the outcome histogram.
result = get_measurements(qc, shots)
counts = result.get_counts()
print(counts)

{'0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 1 0 0 0 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 1 0 1 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 0 1 1 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 1 1 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 1 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 1 0 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 0 0 1 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 0 0 0 0 0 0 0 00': 1, '0 0 0 0 0 0 0 0 0 0 0 0

In [171]:
# The comparison passes only if the character at offset -4 of every outcome
# string (where cswap_measure is read) is "0".
identical = all(key[-4] == "0" for key in counts)
print(identical)

True


In [172]:
# Record Trent's verdict in the circuit: verify[0] is flipped only when the
# check above found the swap-test bit at 0 in every outcome.
if identical : 
    qc.x(verify[0])

In [173]:
# step 3 : Trent recovers P
# Applying the same key-dependent gates a second time undoes the earlier
# transformation of P (up to a global phase).
qc.append(secret_qubit(K_AT[:n], "dec"), P)

# measures Alice's bell states and replicate them
# Two fresh sets of pairs are prepared: the *_T set and the *_sig set.
qc.append(create_bell_states(n), [*M_A_1_T, *M_A_2_T])
qc.append(create_bell_states(n), [*M_A_1_sig, *M_A_2_sig])

qc.append(bell_measurement(n), [*M_A_1, *M_A_2])

# Each measured bit of (M_A_1, M_A_2) conditions the same correction on both
# fresh sets: Z on the first half, X on the second half.
for i in range(n):
    qc.measure(M_A_1[i], measure_M_A_1[i])
    qc.measure(M_A_2[i], measure_M_A_2[i])
    qc.z(M_A_1_T[i]).c_if(measure_M_A_1[i], 1)
    qc.x(M_A_2_T[i]).c_if(measure_M_A_2[i], 1)
    qc.z(M_A_1_sig[i]).c_if(measure_M_A_1[i], 1)
    qc.x(M_A_2_sig[i]).c_if(measure_M_A_2[i], 1)
# sends YTB = EKB(M_A_1_T, M_A_2_T, S, P, r) size = 6N+2
# From here on S refers to the *_sig pairs instead of (M_A_1, M_A_2).
# The payload is n + n + 3n + n + 2 = 6n + 2 qubits, with `verify` as the last two.
S = [*P_enc, *M_A_1_sig, *M_A_2_sig]
qc.append(qotp(3*n, K_AT[:6*n], "enc"), S)
qc.append(qotp(6*n+2, K_BT[:12*n+4], "enc"), [*M_A_1_T, *M_A_2_T, *S, *P, *verify])

# step 4 : Bob decrypts and checks verify bits
qc.append(qotp(6*n+2, K_BT[:12*n+4], "dec"), [*M_A_1_T, *M_A_2_T, *S, *P, *verify])
qc.measure(verify, verify_measure)
qc.barrier()
# NOTE(review): this run uses a hard-coded 10 shots rather than `shots`.
result = get_measurements(qc, 10)
counts = result.get_counts()
print(counts)

{'0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 0 1 0 1 0 1 1 0 0 0 1 1 0 1 1 0 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 0 0 0 1 1 1 0 1 0 1 0 1 0 0 1 1 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 0 1 0 0 1 1 0 1 0 1 1 1 0 0 0 1 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 1 1 0 0 1 0 1 0 1 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 1 0 1 0 1 0 1 0 1 1 0 0 1 1 0 0 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 0 1 1 0 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 1 1 0 1 0 1 0 0 1 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 0 1 1 1 1 1 0 0 0 1 1 1 1 0 1 0 0 0 0 0 01': 1, '0 0 0 0 0 0 0 0 1 1 0 1 1 1 0 0 1 1 1 1 0 0 1 0 0 0 0 0 01': 1}


In [174]:
# check if verifiable: the last two characters of every outcome string
# (where verify_measure is read) must be '01'
identical = all(key[-2:] == '01' for key in counts)
print(identical)

True


In [175]:
if identical:
    # Bob has M_A_1_T, M_A_2_T, S, P, verify
    # He Bell-measures the *_T pairs and uses the outcomes to correct his
    # halves of the Alice-Bob bell pairs.
    qc.append(bell_measurement(n), [*M_A_1_T, *M_A_2_T])
    for i in range(n):
        z_bit = measure_M_A_1_T[i]
        x_bit = measure_M_A_2_T[i]
        qc.measure(M_A_1_T[i], z_bit)
        qc.measure(M_A_2_T[i], x_bit)
        qc.z(B_bell[i]).c_if(z_bit, 1)
        qc.x(B_bell[i]).c_if(x_bit, 1)
    qc.barrier()
    # Compare the corrected B_bell qubits with P; the ancilla is stored in
    # cswap_measure_2.
    qc.append(cswap_comparison(n), [*B_bell, *P, *cswap_2])
    qc.measure(cswap_2, cswap_measure_2)
else:
    print("Signature is forged, protocol terminates")

In [176]:
# Simulate the whole circuit again, now including Bob's comparison.
result = get_measurements(qc, shots)
counts = result.get_counts()
# print(counts)

In [177]:
# Bob's comparison passes only if the character at offset -6 of every outcome
# string (where cswap_measure_2 is read) is '0'.
identical = all(keys[-6] == '0' for keys in counts)
print(identical)

True


In [178]:
# Report Bob's decision on the signature.
print("Accept Signature" if identical else "Reject signature")

Accept Signature


In [179]:
# Render the full circuit; fold=-1 asks the drawer not to wrap it across lines.
qc.draw(fold=-1)

## Dispute Resolution : Alice's Disavowal

In [180]:
# Trent requires Bob to provide P, S
# Trent checks if P_enc = encrypt_KAT(P)
# S = [*P_enc, *M_A_1_sig, *M_A_2_sig]
# Remove the K_AT pad from S, then transform P with the same key slice that
# was used for P_enc so the two registers can be compared.
qc.append(qotp(3*n, K_AT[:6*n], "dec"), S)
qc.append(secret_qubit(K_AT[:n], "enc"), P)

# Swap test between P and P_enc; the ancilla is stored in cswap_measure_3.
qc.append(cswap_comparison(n), [*P, *P_enc, *cswap_3])
qc.measure(cswap_3, cswap_measure_3)
# The complete circuit is simulated again from the start.
result = get_measurements(qc, shots)
counts = result.get_counts()
# print(counts)
# restore P
# The same gates are appended once more so that P and S return to the form
# they had before this check.
qc.append(secret_qubit(K_AT[:n], "dec"), P)
qc.append(qotp(3*n, K_AT[:6*n], "enc"), S)

<qiskit.circuit.instructionset.InstructionSet at 0x134facd60>

In [181]:
# Trent's check passes only if the character at offset -8 of every outcome
# string (where cswap_measure_3 is read) is '0'.
identical = all(keys[-8] == '0' for keys in counts)
print(
    "Message and signature verified, Alice's disavowel prevented"
    if identical
    else "Message and signature forged, Bob's forgery prevented"
)
qc.draw(fold=-1)

Message and signature verified, Alice's disavowel prevented


## Bob : Known Message Attack

In [182]:
# Bob now has a valid signature, message pair (S, P)
# step 1 : draw a random value in {0, 1, 2, 3} per position and apply the
# matching Pauli gates to both P[i] and P_enc[i]:
#   0 -> X, 1 -> Z, 2 -> X then Z, 3 -> nothing
rdstr = []
r = np.random.randint(4, size=n)
pauli_for_choice = {0: ("x",), 1: ("z",), 2: ("x", "z")}
for i in range(n):
    gate_names = pauli_for_choice.get(int(r[i]), ())
    for target in (P[i], P_enc[i]):
        for gate_name in gate_names:
            getattr(qc, gate_name)(target)

In [183]:
# step 2 : Trent's dispute resolve
# Same procedure as the earlier dispute cell, now run on the pair that Bob
# modified: strip the K_AT pad from S and transform P for the comparison.
qc.append(qotp(3*n, K_AT[:6*n], "dec"), S)
qc.append(secret_qubit(K_AT[:n], "enc"), P)

# NOTE(review): cswap_3 / cswap_measure_3 were already used and measured in
# the earlier dispute cell and the ancilla is not reset before being reused.
qc.append(cswap_comparison(n), [*P, *P_enc, *cswap_3])
qc.measure(cswap_3, cswap_measure_3)
# The complete circuit is simulated again from the start.
result = get_measurements(qc, shots)
counts = result.get_counts()
# print(counts)
# restore P
qc.append(secret_qubit(K_AT[:n], "dec"), P)
qc.append(qotp(3*n, K_AT[:6*n], "enc"), S)

<qiskit.circuit.instructionset.InstructionSet at 0x134f5d100>

In [184]:
# The forged pair goes undetected only if the character at offset -8 of every
# outcome string (where cswap_measure_3 is read) is '0'.
identical = all(keys[-8] == '0' for keys in counts)

print("Bob's forgery succeeded" if identical else "Bob's forgery failed")

Bob's forgery succeeded
