In [1]:
# -*- coding: utf-8 -*-
"""
Created on 2023/04/11
Revised on 2023/05/30
 
@author: mjofre - Marc Jofre
e-mail: marc.jofre@upc.edu
Technical University of Catalonia - Universitat Politècnica de Catalunya (UPC)
"""
###########################################################################
# Seminar on Quantum Technologies for Cybersecurity: Networking and Systems
# Class
# Session 1 - Introduction to Quantum Technologies for Cybersecurity

import os, sys, time
import numpy as np
import math
import matplotlib.pyplot as plt
from google.colab import files
import matplotlib.style
import matplotlib as mpl
#print(plt.style.available)
mpl.style.use('default')

try:  
  import qiskit
except:
  print("installing qiskit...")
  !pip install qiskit --quiet
  print("installed qiskit.")
  import qiskit


installing qiskit...
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.1/5.1 MB[0m [31m19.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m51.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m241.5/241.5 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.7/129.7 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.5/37.5 MB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m

In [None]:
#############################################################################
# Shor's algorithm - Explicit description
# Is a quantum algorithm that can be used to factor large integers into their prime factors in polynomial time.
# It works using quantum Fourier transforms and the period finding.
# https://qiskit.org/textbook/ch-algorithms/shor.html

from qiskit import Aer, QuantumCircuit, execute
import math, random, signal, time
def timeout_handler(num, stack):
  print("Received SIGALRM")
  raise Exception("FUBAR")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(25)

# Function to find the greatest common divisor
def gcd(a,b):
  while b!=0:
    a,b=b,a%b
  return a

# Function to find a random integer coprime to n
def find_coprime(n):
  while True:
    coprime=random.randint(2,n-1)
    if gcd(coprime,n)==1:
      return coprime

# Shor's algorithm function
def shor_algorithm(n):
  # Step 1: Choose a random numer 'a' coprime to 'n'
  a=find_coprime(n)

  # Step 2: Construct the quantum circuit
  quantum_register_size=math.ceil(math.log2(n))
  classical_register_size=quantum_register_size*2

  quantum_register = QuantumCircuit(quantum_register_size+1,classical_register_size)

  # Step 2a: Apply Hadamard gate to the quantum register
  for i in range(quantum_register_size):
    quantum_register.h(i)
  """
  # Step 2b: Apply modular exponentiation to the quantum register
  for i in range(quantum_register_size):
    quantum_register.x(quantum_register_size) # Apply X gate: It is equivalent to the classical NOT gate in classical computing, i.e., it flips the state of a qubit from |0⟩ to |1⟩ and vice versa. 
    for j in range(2**i):
      quantum_register.swap(i+j,quantum_register_size+i+j)
    for j in range(2**i):
      quantum_register.cx(quantum_register_size+i+j,i)
    quantum_register.x(quantum_register_size)
  """
  """
  # Step 2c: Apply inverse Fourier transform to the quantum register
  for i in range(quantum_register_size):
    quantum_register.h(i)
    quantum_register.p(-math.pi/float(2**i),i) # previously u1
    for j in range(i+1,quantum_register_size):
      quantum_register.crz(-math.pi/float(2**(j-i)),i,j) # previously cu1
    quantum_register.barrier()
  """
  # Step 2d: Measure the quantum register and store the result in the classical register
  for i in range(quantum_register_size):
    quantum_register.measure(i,i)
  
  # Step 3: Execute the quantum circuit on a simulator
  simulator=Aer.get_backend('qasm_simulator')
  result=execute(quantum_register,simulator,shots=1).result()
  measured_registers=result.get_counts()

  # Step 4: Extract the period 'r' from the measured registers
  for measured_register in measured_registers:
    measured_register=measured_register[::-1]
    x=int(measured_register[:quantum_register_size],2)
    y=int(measured_register[quantum_register_size:],2)
    r=gcd((a**x-1)%n,n)
    if r!=1 and r!=n:
      return r
  
  # Step 5: If no period 'r' is found, repeat the algorithm with a different 'a'
  return shor_algorithm(n)

# Example usage 1 works
n=int(11*17)#int(11*17)#int(()*())#15
print('Number to factorize in primes: '+str(n))
factor1=shor_algorithm(n)
factor2=n//factor1
print(f"The factors of {n} are {factor1} and {factor2}.")

# Example usage 2, not capable
# https://en.wikipedia.org/wiki/Largest_known_prime_number#:~:text=The%20largest%20known%20prime%20number,Search%20(GIMPS)%20in%202018.
LargestPrimeNumber=8191#(2**82589933 - 1)
SecondLargestPrimeNumber=131071#(2**77232917 - 1)
n=int(LargestPrimeNumber*SecondLargestPrimeNumber)
print('Number to factorize in primes: '+str(n))
time.sleep(1)
try:
  factor1=shor_algorithm(n)
  factor2=n//factor1
  print(f"The factors of {n} are {factor1} and {factor2}.")
except Exception as ex:
  print("We're gonna need a quantum computer!")
finally:
  signal.alarm(0)


Number to factorize in primes: 187
The factors of 187 are 11 and 17.
Number to factorize in primes: 1073602561
Received SIGALRM
We're gonna need a quantum computer!


# Quantum Key Distribution

In [2]:
try:
  import qunetsim
  # https://pypi.org/project/qunetsim/
  # https://tqsd.github.io/QuNetSim/
  # https://github.com/tqsd/QuNetSim
except ImportError:
  print("installing qunetsim...")
  !pip install qunetsim==0.1.2.post3 --quiet 
  print("installed qunetsim.")
  import qunetsim
# Note: seems that functions get_qubit, has to be changed by get_data_qubit
# Also, some name definitions instead of "", should be ''
# Also, some times exit() is place. This has to be removed, since it kills the runtime

installing qunetsim...
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.5/60.5 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for eqsn (setup.py) ... [?25l[?25hdone
installed qunetsim.


In [3]:
from qunetsim.components.host import Host
from qunetsim.components.network import Network
from qunetsim.objects import Qubit
from qunetsim.objects import Logger
import random

Logger.DISABLED = True
FlagMessages=np.logical_not(Logger.DISABLED)
KEY_LENGTH = 150
SAMPLE_SIZE = int(KEY_LENGTH)
WAIT_TIME = 5
INTERCEPTION = True

# Basis Declaration
BASIS = ['Z', 'X']  # |0>|1> = Z-Basis; |+>|-> = X-Basis
##########################################################################

def q_bit(host, encode):
  q = Qubit(host)
  if encode == '+':
    q.H()
  if encode == '-':
    q.X()
    q.H()
  if encode == '0':
    q.I()
  if encode == '1':
    q.X()
  return q

def encoded_bases(alice_bits, alice_basis):
  alice_encoded = ""
  for i in range(0, len(alice_bits)):
    if alice_basis[i] == 'X':  # X-Basis
      if alice_bits[i] == '0':
        alice_encoded += '+'
      if alice_bits[i] == '1':
        alice_encoded += '-'
    if alice_basis[i] == 'Z':  # Z-Basis
      if alice_bits[i] == '0':
        alice_encoded += '0'
      if alice_bits[i] == '1':
        alice_encoded += '1'
  # print("Alice encoded: {}".format(alice_encoded))
  return alice_encoded

def preparation():
  alice_basis = ""
  bob_basis = ""
  alice_bits = ""
  for kl in range(KEY_LENGTH):
    alice_basis += random.choice(BASIS)
    bob_basis += random.choice(BASIS)
    alice_bits += str(random.getrandbits(1))
  alice_encoded = encoded_bases(alice_bits, alice_basis)
  return alice_basis, bob_basis, alice_bits, alice_encoded

def alice_key_string(alice_bits, alice_basis, bob_basis):
  alice_key = ""
  for i in range(SAMPLE_SIZE):
    if alice_basis[i] == bob_basis[i]:
      alice_key += str(alice_bits[int(i)])
  print("Alice Key: {}".format(alice_key))
  return alice_key

def bob_key_string(bob_bits, bob_basis, alice_basis):
  bob_key = ""
  for i in range(SAMPLE_SIZE):
    if bob_basis[i] == alice_basis[i]:
      bob_key += str(bob_bits[i])
  print("Bob Key: {}".format(bob_key))
  return bob_key

def alice(host, receiver, alice_basis, alice_bits, alice_encoded):
  # For Qubit and Basis
  for i, encode in enumerate(alice_encoded):
    _, ack = host.send_qubit(receiver, q_bit(host, encode), await_ack=True)
    if FlagMessages:
      if ack is not None:
        print("{}'s qubit {} successfully sent to {}.".format('Alice', i, receiver))
  ################################################################################  
  # Sending Basis to Bob
  ack_basis_alice = host.send_classical('Bob', alice_basis, await_ack=True)
  if FlagMessages:
    if ack_basis_alice is not None:
      print("{}'s basis string successfully sent to Bob".format('Alice'))
  # Receiving Basis from Bob
  basis_from_bob = host.get_classical('Bob', wait=WAIT_TIME)
  if FlagMessages:
    if basis_from_bob is not None:
      print("{}'s basis string got successfully by {}".format(receiver, 'Alice'))
  ###############################################################################

  # For Key
  alice_key = alice_key_string(alice_bits, alice_basis, basis_from_bob[0].content)
  global alice_key_recovered
  alice_key_recovered=alice_key
  #################################################################################

  # For Sending Key
  # alice_brd_ack = host.send_classical(receiver, str(alice_key), await_ack=True)
  # if alice_brd_ack is not None:
  #     print("{}'s key successfully sent to {}".format(host.host_id, receiver))
  # bob_key = host.get_classical(receiver, wait=WAIT_TIME)
  # if bob_key is not None:
  #     print("{}'s key got successfully by {}".format(receiver, host.host_id))
  #     if alice_key == bob_key[0].content:
  #         print("Same key from {}'s side".format(host.host_id))
  #     else:
  #         print("No")
  ################################################################################

def eve_sniffing_quantum(host, receiver, eve_basis, alice_basis, alice_bits):
  #qubit.measure(non_destructive=True)
  # Attention, how this programmed it is doing an interception attack
  for i in range(0, len(eve_basis)):
    data=host.get_data_qubit(receiver, wait=WAIT_TIME)
    eve_bit=data.measure()#non_destructive=True)    
    if (eve_basis[i]==alice_basis[i]):
      eve_encoded = encoded_bases([alice_bits[i]], [eve_basis[i]])
      _, ack = host.send_qubit('Bob', q_bit(host,eve_encoded), await_ack=True)
    else:
      eve_encoded = encoded_bases([eve_bit], [eve_basis[i]])
      _, ack = host.send_qubit('Bob', q_bit(host,eve_encoded), await_ack=True)
    if FlagMessages:
      if ack is not None:
        print("{}'s qubit {} successfully sent to {}.".format('Eve', i, 'Bob'))
    #time.sleep(0.01)

def bob(host, receiver, bob_basis):
  bob_measured_bits = ""
  # For Qubit and Basis
  for i in range(0, len(bob_basis)):
    data = host.get_data_qubit(receiver, wait=WAIT_TIME)
    if FlagMessages:
      if data is not None:
        print("{}'s qubit {} got successfully by {}".format(receiver, i, host.host_id))
    # Measuring Alice's qubit based on Bob's basis
    if bob_basis[i] == 'Z':  # Z-basis
      bob_measured_bits += str(data.measure())
    if bob_basis[i] == 'X':  # X-basis
      data.H()
      bob_measured_bits += str(data.measure())
  
  print("Bob measured bit: {}".format(bob_measured_bits))
  ###############################################################################

  # Receiving Basis from Alice
  basis_from_alice = host.get_classical('Alice', wait=WAIT_TIME)
  if basis_from_alice is not None:
    print("{}'s basis string got successfully by {}.".format('Alice', 'Bob'))
  # Sending Basis to Alice
  ack_basis_bob = host.send_classical('Alice', bob_basis, await_ack=True)
  if ack_basis_bob is not None:
    print("{}'s basis string successfully sent to {}.".format('Bob', 'Alice'))
  ################################################################################

  # For sample key indices
  bob_key = bob_key_string(bob_measured_bits, bob_basis, basis_from_alice[0].content)
  global bob_key_recovered
  bob_key_recovered=bob_key
  ##############################################################################

  # For Broadcast Key
  # alice_key = host.get_classical(receiver, wait=WAIT_TIME)
  # if alice_key is not None:
  #     print("{}'s key got successfully by {}".format(receiver, host.host_id))
  #     if bob_key == alice_key[0].content:
  #         print("Same key from {}'s side".format(host.host_id))
  #     else:
  #         print("No")
  # bob_brd_ack = host.send_classical(receiver, str(bob_key), await_ack=True)
  # if bob_brd_ack is not None:
  #     print("{}'s key successfully sent to {}".format(host.host_id, receiver))
  ################################################################################

def main():
  network = Network.get_instance()
  # Define network topology
  nodes = ['Alice', 'Eve', 'Bob']
  network.start(nodes)
  network.delay = 0.0

  host_alice = Host('Alice')
  host_eve = Host('Eve')
  host_bob = Host('Bob')

  #host_alice.add_connection('Bob')
  host_alice.add_connections(['Eve','Bob'])
  host_alice.start()
  
  host_eve.add_connections(['Alice','Bob'])
  host_eve.delay = 0.1
  host_eve.start()
  
  #host_bob.add_connection('Alice')
  host_bob.add_connections(['Alice','Eve'])
  host_bob.delay = 0.5
  host_bob.start()

  network.add_host(host_alice)
  network.add_host(host_eve)
  network.add_host(host_bob)
  #network.draw_classical_network()
  #network.draw_quantum_network()
  
  network.start()

  alice_basis, bob_basis, alice_bits, alice_encoded = preparation()
  
  print("Alice bases: {}".format(alice_basis))
  print("Bob bases: {}".format(bob_basis))
  print("Alice bits: {}".format(alice_bits))
  print("Alice encoded: {}".format(alice_encoded))
  
  if INTERCEPTION:
    t1 = host_alice.run_protocol(alice, (host_eve.host_id, alice_basis, alice_bits, alice_encoded, ))
    eve_basis, aux_basis, aux_bits, aux_encoded = preparation()
    t2 = host_eve.run_protocol(eve_sniffing_quantum, (host_alice.host_id, eve_basis, alice_basis, alice_bits))
    t3 = host_bob.run_protocol(bob, (host_eve.host_id, bob_basis, ))
  else:
    t1 = host_alice.run_protocol(alice, (host_bob.host_id, alice_basis, alice_bits, alice_encoded, ))
    t3 = host_bob.run_protocol(bob, (host_alice.host_id, bob_basis, ))
  time.sleep(WAIT_TIME)
  #t1.join()
  #t2.join()

  network.stop(True)
  qber=0.0
  for i in range(0,len(alice_key_recovered)):
    if (alice_key_recovered[i]!=bob_key_recovered[i]):
      qber+=1
  qber=qber/len(alice_key_recovered)
  print("QBER: {:.2f}%".format(qber*100.0))
  if (qber>0.11):
    print('Eavesdropping detected!')
  else:
    print('No eavesdropping detected!')

main()

Alice bases: XXZZZZXZZXZXZZXZXXZXZXZXXZXXZXXZXZZZZZZZZXXXXZZZXXXZXXXZXZXXZZXZXXZZZXXXZZZXXXZXXZXXXXZXZZZXXZZXXZXZZZXXXXXXZZZXZXZZZZZZXZXXZZZZZXZZXXXZXZZXZZXXXXZXXX
Bob bases: XZXXZXZZXXXZXZXZXXZXZXZZXZZXZXZXZZZXZXXZZZXXXZZZZZXXZZZZZZZXZZZZZZZZZZXZZXXZXXZXXXZXZZXXZXXZZZXXXXXZZXZXXZZZXZXZZXZXZXZXXZXZXXZZZXXXZZZXZZXXXZXXZXXXZX
Alice bits: 111110000000111010011110111011101110001010011101000100110110011111000001011100101111011100000100000110010010001010110110010111111100011110100011101100
Alice encoded: --1110+00+0+11-0-+0-1-1+-1-+1--0-11000101++--101+++1++-1+1-+01-1--000++-011-++1+-1--+-1-000++10++0+110+-++-+001+1+110110+1+-11111-00+--1-01+00---+1-++
Bob measured bit: 111101000010011010011010000001110010000010011001000010000000000000000010000000101001101101100110010001110000001101100001100100001001110100001011001101
Alice's basis string got successfully by Bob.
Alice Key: 1100110100111111011110010111010110011000000010111010001110010101010111100011010
Bob's basis string successfully sent to