In [1]:
import numpy as np
from qiskit_aer import Aer
from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister, transpile
from qiskit.visualization import plot_histogram
from qiskit.circuit.library import StatePreparation
from qiskit_algorithms import AmplificationProblem, Grover
import matplotlib.pyplot as plt
import math
from qiskit.primitives import Sampler
import itertools
from collections import defaultdict

class QuantumKNN:
    """k-nearest-neighbours classifier for a single test point using a swap test.

    Construction normalizes the training points and the test point and
    immediately builds the full circuit (``self.quantum_circuit``).
    ``measure()`` samples that circuit on the Aer simulator and stores the
    counts; ``predict(k)`` turns the counts into one fidelity estimate per
    training index and returns the majority label among the ``k`` best.

    The vectors are passed to ``StatePreparation`` as given.
    NOTE(review): presumably the feature dimension must therefore be a power
    of two (>= 2) and there must be at least two training points - confirm.
    """

    def normalize_vector(self, vector):
        """Return ``vector`` scaled to unit Euclidean norm.

        Raises:
            ValueError: if the vector has zero norm (it cannot be turned into
                a quantum state; the division would otherwise yield NaNs).
        """
        vector = np.asarray(vector)
        norm = np.linalg.norm(vector)
        if norm == 0:
            raise ValueError("cannot normalize a zero-norm vector")
        return vector / norm

    def where_to_apply_x(self, bin_number_length):
        """List, for every index ``pos``, the qubits to flip before step ``pos``.

        Entry ``pos`` contains the bit positions set in ``pos ^ (pos - 1)``:
        every bit up to and including the lowest set bit of ``pos`` (all bits
        for ``pos == 0``, because ``0 ^ -1 == -1``).  Applying these flips
        cumulatively for ``pos = 0, 1, 2, ...`` maps basis state ``pos`` to
        the all-ones pattern at step ``pos``, and the flips cancel out after
        all ``2 ** bin_number_length`` steps.
        """
        indices = []
        for pos in range(2 ** bin_number_length):
            mask = pos ^ (pos - 1)
            indices.append(
                [bit for bit in range(bin_number_length) if (mask >> bit) & 1]
            )
        return indices

    def __init__(self, train_points, train_labels, test_point):
        """Normalize the data and build the circuit.

        Args:
            train_points: sequence of equally long feature vectors.
            train_labels: labels, indexable in the same order as
                ``train_points``.
            test_point: feature vector to classify.
        """
        self.train_points = [self.normalize_vector(train_point) for train_point in train_points]
        self.train_labels = train_labels
        self.test_point = self.normalize_vector(test_point)
        # Qubits needed to amplitude-encode one feature vector.
        self.o_n = math.ceil(np.log2(len(train_points[0])))
        # Qubits of the index register that enumerates the training points.
        self.o_m = math.ceil(np.log2(len(train_points)))
        # Filled in by measure(); None means "not sampled yet".
        self.counts = None
        self.quantum_circuit = self.construct_knn()

    def predict(self, num_neighbors):
        """Return the majority label among the ``num_neighbors`` best matches.

        Samples the circuit first if ``measure()`` has not been called yet.
        Index-register values that do not correspond to a training point
        (padding when the number of points is not a power of two) are removed
        *before* the top ``num_neighbors`` are selected, so they can neither
        take up neighbour slots nor leave the vote empty.  Ties are resolved
        in favour of the label encountered first in ranking order.

        Raises:
            ValueError: if ``num_neighbors`` is smaller than 1 or there is no
                labelled index to vote on.
        """
        if num_neighbors < 1:
            raise ValueError("num_neighbors must be at least 1")
        if getattr(self, "counts", None) is None:
            self.measure()

        self.fidelities = self.get_fidelities(self.counts)
        num_labels = len(self.train_labels)
        ranked = [ix for ix in self.fidelities.argsort()[::-1] if ix < num_labels]

        label_counts = defaultdict(int)
        for ix in ranked[:num_neighbors]:
            label_counts[self.train_labels[ix]] += 1
        if not label_counts:
            raise ValueError("no labelled training index available for voting")
        return max(label_counts, key=label_counts.get)

    def construct_knn(self):
        """Build and return the measured circuit.

        Layout (in this qubit order): one ``similarity`` ancilla, ``test_reg``
        and ``train_reg`` with ``o_n`` qubits each, and the ``train_basis``
        index register with ``o_m`` qubits.  The state preparation loads the
        test point, puts the index register into uniform superposition, loads
        each training point into ``train_reg`` controlled on its index, and
        runs a swap test.  That circuit is wrapped with the oracle in a
        Grover amplification circuit, after which the ancilla and the index
        register are measured.
        """
        # Quantum registers
        similarity = QuantumRegister(1, name="similarity")
        test_reg = QuantumRegister(self.o_n, name="test_reg")
        train_reg = QuantumRegister(self.o_n, name="train_reg")
        train_basis = QuantumRegister(self.o_m, name="train_basis")

        # State preparation circuit (no measurements)
        state_prep = QuantumCircuit(similarity, test_reg, train_reg, train_basis, name='state_prep')

        state_prep.append(StatePreparation(self.test_point), test_reg[:])
        state_prep.h(train_basis)

        # One state preparation per training point, controlled on the whole
        # index register.
        controlled_inits = [StatePreparation(train_state).control(self.o_m) for train_state in self.train_points]

        # Walk through *every* index value, not only those that have a
        # training point: the X flips only cancel after the complete
        # sequence.  Stopping early (fewer points than 2 ** o_m) would leave
        # the index register bit-flipped and mislabel the measured indices.
        where_x = self.where_to_apply_x(self.o_m)
        for pos, x_idx in enumerate(where_x):
            state_prep.x(train_basis[x_idx])
            if pos < len(controlled_inits):
                state_prep.append(controlled_inits[pos], train_basis[:] + train_reg[:])

        state_prep.barrier()

        # Swap test between the test register and the training register
        state_prep.h(similarity)
        for psi_bit, phi_bit in zip(test_reg, train_reg):
            state_prep.cswap(similarity, psi_bit, phi_bit)

        state_prep.h(similarity)

        state_prep.barrier()

        def construct_oracle(similarity_qubit_index, train_basis_indices, total_qubits):
            """Build the oracle circuit handed to AmplificationProblem."""
            oracle = QuantumCircuit(total_qubits, name='oracle')

            # Conjugate with X so the controlled phase acts on similarity == 0
            oracle.x(similarity_qubit_index)

            oracle.h(train_basis_indices)

            # NOTE(review): a phase of 2*pi equals a phase of 0, so these
            # controlled-phase gates look like identities and the oracle as
            # a whole like a no-op.  Left unchanged because get_fidelities()
            # may rely on the resulting statistics - confirm the intended
            # angle (pi?).
            control_qubits = [similarity_qubit_index]
            for aux_bit in train_basis_indices:
                oracle.mcp(2*math.pi, control_qubits, aux_bit)

            oracle.h(train_basis_indices)

            # Undo the X on the similarity qubit
            oracle.x(similarity_qubit_index)

            return oracle

        # Qubit positions follow the register order used for state_prep.
        similarity_qubit_index = 0
        train_basis_indices = list(range(1+self.o_n+self.o_n, 1+self.o_n+self.o_n+self.o_m))
        total_qubits = similarity.size + test_reg.size + train_reg.size + train_basis.size
        oracle = construct_oracle(similarity_qubit_index, train_basis_indices, total_qubits)

        # Grover amplification
        iterations = int(np.floor(np.pi / 4 * np.sqrt(2**self.o_m)))
        problem = AmplificationProblem(oracle=oracle, state_preparation=state_prep)
        grover = Grover(iterations=iterations, sampler=Sampler())
        amplified_circuit = grover.construct_circuit(problem)

        # Final circuit with measurements
        final_circuit = QuantumCircuit(similarity, test_reg, train_reg, train_basis)
        final_circuit.compose(amplified_circuit, inplace=True)

        # The ancilla register is added first and the index register second;
        # get_fidelities() parses the count keys as "<basis> <similarity>".
        c_train_basis = ClassicalRegister(self.o_m, name="meas_basis")
        c_ancilla = ClassicalRegister(1, name="meas_ancilla")
        final_circuit.add_register(c_ancilla)
        final_circuit.add_register(c_train_basis)

        final_circuit.measure(similarity, c_ancilla)

        for qbit, cbit in zip(train_basis, c_train_basis):
            final_circuit.measure(qbit, cbit)

        return final_circuit

    def measure(self, shots=1024):
        """Run the circuit on the Aer ``qasm_simulator`` and store the counts.

        Args:
            shots: number of repetitions of the circuit (default 1024).
        """
        backend = Aer.get_backend('qasm_simulator')
        tqc = transpile(self.quantum_circuit, backend)
        result = backend.run(tqc, shots=shots).result()
        self.counts = result.get_counts()

    def get_fidelities(self, counts):
        """Turn measurement counts into one fidelity estimate per index value.

        Args:
            counts: mapping from ``"<basis bits> <similarity bit>"`` to the
                number of shots with that outcome.

        Returns:
            Float array of length ``2 ** num_qubits`` (``num_qubits`` being
            the width of the basis bit string), indexed by the integer value
            of the basis bits.  Index values that never occur in ``counts``
            stay at 0.

        Raises:
            ValueError: if ``counts`` is empty.
        """
        if not counts:
            raise ValueError("counts is empty; nothing to estimate from")

        # Marginal counts of the index register and of the ancilla.
        basis_counts = defaultdict(int)
        similarity_counts = defaultdict(int)
        for count_key, shots in counts.items():
            basis, similarity = count_key.split(" ")
            basis_counts[basis] += shots
            similarity_counts[similarity] += shots

        # Normalized difference between ancilla-0 and ancilla-1 outcomes.
        exp_fidelity = np.abs(similarity_counts['0'] - similarity_counts['1']) / sum(similarity_counts.values())

        num_qubits = len(next(iter(basis_counts)))
        fidelities = np.zeros(2 ** num_qubits, dtype=float)
        for basis_state in basis_counts:
            ix = int(basis_state, 2)
            for similarity_state in similarity_counts:
                state_str = basis_state + ' ' + similarity_state
                if state_str in counts:
                    # Conditional frequency of this index given the ancilla
                    # outcome, counted positively for '0' and negatively for '1'.
                    fidelities[ix] += \
                        (-1) ** int(similarity_state) * \
                        (counts[state_str]) / similarity_counts[similarity_state] * \
                        (1 - exp_fidelity ** 2)
            fidelities[ix] *= 2 ** num_qubits / 2
            fidelities[ix] += exp_fidelity
        return fidelities


In [2]:
import random

# Size of the toy problem used by the following demo cells.
num_neighbors = 3    # k for the nearest-neighbour vote
num_points = 8       # number of training points
num_dimensions = 4   # features per point

# Uniform random data in [0, 1): the test point is drawn first, then the
# training points one by one.
test_point = np.random.rand(num_dimensions)
train_points = [np.random.rand(num_dimensions) for _ in range(num_points)]

# One random class label from {0, 1, 2, 3} per training point.
train_labels = [random.randint(0, 3) for _ in range(num_points)]

In [3]:
# Classical reference: majority vote among the num_neighbors training points
# with the smallest Euclidean distance to the test point.
distances = np.array([math.dist(test_point, train_point) for train_point in train_points])
nearest = distances.argsort()[:num_neighbors]

label_counts = defaultdict(lambda: 0)
for ix in nearest:
    label_counts[train_labels[ix]] += 1

# On a tie, max() keeps the label that was counted first.
pred_label = max(label_counts, key=label_counts.get)
print(pred_label)

2


In [4]:
# Quantum counterpart of the classical baseline: build the circuit, sample it,
# and vote with the same k (num_neighbors) so both predictions are comparable.
q_knn = QuantumKNN(train_points, train_labels, test_point)
q_knn.measure()
q_knn.predict(num_neighbors)

1

In [5]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import pickle

from sklearn import datasets
from sklearn.model_selection import train_test_split , KFold
from sklearn.preprocessing import Normalizer
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

from collections import Counter

# Iris Dataset

In [6]:
# Load iris and put the features and the class label into one frame.
iris = datasets.load_iris()
iris_df = pd.DataFrame(
    data=np.column_stack((iris['data'], iris['target'])),
    columns=iris['feature_names'] + ['target'],
)

# Every column except the last is a feature; the last one is the class.
iris_x = iris_df.drop(columns=['target'])
iris_y = iris_df['target']

# Shuffled split with a fixed seed so the experiment is repeatable.
iris_x_train, iris_x_test, iris_y_train, iris_y_test = train_test_split(
    iris_x,
    iris_y,
    test_size=0.14,
    shuffle=True,
    random_state=0,
)

# Plain arrays for the classifier: training points and their labels.
iris_x_train = iris_x_train.to_numpy()
iris_y_train = iris_y_train.to_numpy()

# Only the first 20 test samples are evaluated.
iris_x_test = iris_x_test.to_numpy()[:20]
iris_y_test = iris_y_test.to_numpy()[:20]

In [7]:
import os
import time

# Load the iris dataset.
iris = datasets.load_iris()
# np.c_ stacks the feature matrix and the target vector column-wise.
iris_df = pd.DataFrame(data= np.c_[iris['data'], iris['target']],
                      columns= iris['feature_names'] + ['target'])

iris_x= iris_df.iloc[:, :-1] # Data
iris_y= iris_df.iloc[:, -1] # Class

iris_x_train, iris_x_test, iris_y_train, iris_y_test= train_test_split(iris_x, iris_y,
                                                   test_size= 0.14,
                                                   shuffle= True, #shuffle the data to avoid bias
                                                   random_state= 0)

iris_x_train= np.asarray(iris_x_train) # Training Points
iris_y_train= np.asarray(iris_y_train) # Labels

# Only the first 20 test samples are evaluated.
iris_x_test= np.asarray(iris_x_test[:20])
iris_y_test= np.asarray(iris_y_test[:20])

# For each training-set size: classify every test point for every
# k in 1..tp and record predictions, accuracy, wall time and circuit width.
iris_results_dict = {}
training_points = [8, 16, 32, 64, 128]
for tp in training_points:
    print(f"Computing {len(iris_x_test)} points with {len(iris_x_train[:tp])} training points, started.")
    pred_y = defaultdict(list)  # str(k) -> one prediction per test point
    q_knn_results = []

    start_time = time.time()
    for test_point in iris_x_test:
        q_knn = QuantumKNN(iris_x_train[:tp], iris_y_train[:tp], test_point)
        q_knn.measure()
        q_knn_results.append(q_knn)
        # One circuit run serves every k; only the vote differs.
        for nn in range(1, tp + 1):
            pred_y[str(nn)].append(q_knn.predict(nn))

    #q_knn.quantum_circuit.draw('mpl', filename=f"results/iris_quantum_circuit_{tp}_training_points.png")
    accuracy = {pkey: accuracy_score(iris_y_test, preds) for pkey, preds in pred_y.items()}
    time_taken = time.time() - start_time
    iris_results_dict[str(tp)] = {
        "pred_y": dict(pred_y),
        "time_taken": time_taken,
        "accuracy": accuracy,
        "num_qubits": len(q_knn.quantum_circuit.qubits)
    }
    print(f"Computing {len(iris_x_test)} points with {len(iris_x_train[:tp])} training points, took: {time_taken}")
    print("-------------")

# Make sure the output directory exists so a long run is not lost at the
# very last step.
os.makedirs('results', exist_ok=True)
with open('results/iris_results.pkl', 'wb') as fp:
    pickle.dump(iris_results_dict, fp)

Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 13.411328554153442
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 56.51359581947327
-------------
Computing 20 points with 32 training points, started.
Computing 20 points with 32 training points, took: 187.17765641212463
-------------
Computing 20 points with 64 training points, started.
Computing 20 points with 64 training points, took: 602.1863422393799
-------------
Computing 20 points with 128 training points, started.
Computing 20 points with 128 training points, took: 2109.985020160675
-------------
Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 42.55013394355774
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 56.77265024185181
-------------
Computing 20 points with 32 training points, started.

# Wine Dataset

In [8]:
# Load the wine dataset and put the features and the class label into one frame.
wine = datasets.load_wine()
wine_df = pd.DataFrame(
    data=np.column_stack((wine['data'], wine['target'])),
    columns=wine['feature_names'] + ['target'],
)

# Every column except the last is a feature; the last one is the class.
wine_x = wine_df.drop(columns=['target'])
wine_y = wine_df['target']

# Standardize the features before the projection.
wine_x = pd.DataFrame(StandardScaler().fit_transform(wine_x))

# Project onto the first four principal components.
wine_num_components = 4
wine_pca = PCA(n_components=wine_num_components)
wine_pComp = wine_pca.fit_transform(wine_x)

wine_reduced_x = pd.DataFrame(
    data=wine_pComp,
    columns=[f'PC {i}' for i in range(wine_num_components)],
)

# Shuffled split with a fixed seed so the experiment is repeatable.
wine_x_train, wine_x_test, wine_y_train, wine_y_test = train_test_split(
    wine_reduced_x,
    wine_y,
    test_size=0.14,
    shuffle=True,
    random_state=0,
)

# Plain arrays for the classifier: training points and their labels.
wine_x_train = wine_x_train.to_numpy()
wine_y_train = wine_y_train.to_numpy()

# Only the first 20 test samples are evaluated.
wine_x_test = wine_x_test.to_numpy()[:20]
wine_y_test = wine_y_test.to_numpy()[:20]

In [9]:
import os
import time

# Load the wine dataset.
wine = datasets.load_wine()
# np.c_ stacks the feature matrix and the target vector column-wise.
wine_df = pd.DataFrame(data= np.c_[wine['data'], wine['target']],
                      columns= wine['feature_names'] + ['target'])

wine_x= wine_df.iloc[:, :-1] # Data
wine_y= wine_df.iloc[:, -1] # Class

#Scale the data
wine_x= pd.DataFrame(StandardScaler().fit_transform(wine_x))
# Create PCA object.
wine_num_components = 4
wine_pca = PCA(n_components=wine_num_components)

#Run PCA.
wine_pComp=wine_pca.fit_transform(wine_x)

wine_reduced_x = pd.DataFrame(data = wine_pComp
             , columns = [f'PC {i}' for i in range(wine_num_components)])

wine_x_train, wine_x_test, wine_y_train, wine_y_test= train_test_split(wine_reduced_x, wine_y,
                                                   test_size= 0.14,
                                                   shuffle= True, #shuffle the data to avoid bias
                                                   random_state= 0)
wine_x_train= np.asarray(wine_x_train) # Training Points
wine_y_train= np.asarray(wine_y_train) # Labels

# Only the first 20 test samples are evaluated.
wine_x_test= np.asarray(wine_x_test[:20])
wine_y_test= np.asarray(wine_y_test[:20])

# For each training-set size: classify every test point for every
# k in 1..tp and record predictions, accuracy, wall time and circuit width.
wine_results_dict = {}
training_points = [8, 16, 32, 64, 128]
for tp in training_points:
    print(f"Computing {len(wine_x_test)} points with {len(wine_x_train[:tp])} training points, started.")
    pred_y = defaultdict(list)  # str(k) -> one prediction per test point
    q_knn_results = []

    start_time = time.time()
    for test_point in wine_x_test:
        q_knn = QuantumKNN(wine_x_train[:tp], wine_y_train[:tp], test_point)
        q_knn.measure()
        q_knn_results.append(q_knn)
        # One circuit run serves every k; only the vote differs.
        for nn in range(1, tp + 1):
            pred_y[str(nn)].append(q_knn.predict(nn))

    #q_knn.quantum_circuit.draw('mpl', filename=f"results/wine_quantum_circuit_{tp}_training_points.png")
    accuracy = {pkey: accuracy_score(wine_y_test, preds) for pkey, preds in pred_y.items()}
    time_taken = time.time() - start_time
    wine_results_dict[str(tp)] = {
        "pred_y": dict(pred_y),
        "time_taken": time_taken,
        "accuracy": accuracy,
        "num_qubits": len(q_knn.quantum_circuit.qubits)
    }
    print(f"Computing {len(wine_x_test)} points with {len(wine_x_train[:tp])} training points, took: {time_taken}")
    print("-------------")

# Make sure the output directory exists so a long run is not lost at the
# very last step.
os.makedirs('results', exist_ok=True)
with open('results/wine_results.pkl', 'wb') as fp:
    pickle.dump(wine_results_dict, fp)

Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 25.5241596698761
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 117.59724688529968
-------------
Computing 20 points with 32 training points, started.
Computing 20 points with 32 training points, took: 304.92507696151733
-------------
Computing 20 points with 64 training points, started.
Computing 20 points with 64 training points, took: 1006.4113292694092
-------------
Computing 20 points with 128 training points, started.
Computing 20 points with 128 training points, took: 3642.2739400863647
-------------
Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 76.04200839996338
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 98.31896662712097
-------------
Computing 20 points with 32 training points, started.

# Breast Cancer

In [10]:
# Load the breast-cancer dataset and put the features and the class label
# into one frame.
breast_cancer = datasets.load_breast_cancer()
breast_cancer_df = pd.DataFrame(
    data=np.column_stack((breast_cancer['data'], breast_cancer['target'])),
    columns=np.append(breast_cancer['feature_names'], ['target']),
)

# Every column except the last is a feature; the last one is the class.
breast_cancer_x = breast_cancer_df.iloc[:, :-1]
breast_cancer_y = breast_cancer_df.iloc[:, -1]

# Standardize the features before the projection.
breast_cancer_x = pd.DataFrame(StandardScaler().fit_transform(breast_cancer_x))

# Project onto the first four principal components.
breast_cancer_num_components = 4
breast_cancer_pca = PCA(n_components=breast_cancer_num_components)
pComp = breast_cancer_pca.fit_transform(breast_cancer_x)

breast_cancer_reduced_x = pd.DataFrame(
    data=pComp,
    columns=[f'PC {i}' for i in range(breast_cancer_num_components)],
)

# Shuffled split with a fixed seed so the experiment is repeatable.
breast_cancer_x_train, breast_cancer_x_test, breast_cancer_y_train, breast_cancer_y_test = train_test_split(
    breast_cancer_reduced_x,
    breast_cancer_y,
    test_size=0.14,
    shuffle=True,
    random_state=0,
)

# Plain arrays for the classifier: training points and their labels.
breast_cancer_x_train = breast_cancer_x_train.to_numpy()
breast_cancer_y_train = breast_cancer_y_train.to_numpy()

# Only the first 20 test samples are evaluated.
breast_cancer_x_test = breast_cancer_x_test.to_numpy()[:20]
breast_cancer_y_test = breast_cancer_y_test.to_numpy()[:20]

In [11]:
import os
import time

# Load the breast-cancer dataset.
breast_cancer = datasets.load_breast_cancer()
# np.c_ stacks the feature matrix and the target vector column-wise.
breast_cancer_df = pd.DataFrame(data= np.c_[breast_cancer['data'], breast_cancer['target']],
                      columns= np.append(breast_cancer['feature_names'], ['target']))

breast_cancer_x= breast_cancer_df.iloc[:, :-1] # Data
breast_cancer_y= breast_cancer_df.iloc[:, -1] # Class

#Scale the data
breast_cancer_x= pd.DataFrame(StandardScaler().fit_transform(breast_cancer_x))
# Create PCA object.
breast_cancer_num_components = 4
breast_cancer_pca = PCA(n_components=breast_cancer_num_components)

#Run PCA.
pComp=breast_cancer_pca.fit_transform(breast_cancer_x)

breast_cancer_reduced_x = pd.DataFrame(data = pComp
             , columns = [f'PC {i}' for i in range(breast_cancer_num_components)])

breast_cancer_x_train, breast_cancer_x_test, breast_cancer_y_train, breast_cancer_y_test= train_test_split(
    breast_cancer_reduced_x, breast_cancer_y,
    test_size= 0.14,
    shuffle= True, #shuffle the data to avoid bias
    random_state= 0)

breast_cancer_x_train= np.asarray(breast_cancer_x_train) # Training Points
breast_cancer_y_train= np.asarray(breast_cancer_y_train) # Labels

# Only the first 20 test samples are evaluated.
breast_cancer_x_test= np.asarray(breast_cancer_x_test[:20])
breast_cancer_y_test= np.asarray(breast_cancer_y_test[:20])

# For each training-set size: classify every test point for every
# k in 1..tp and record predictions, accuracy, wall time and circuit width.
breast_cancer_results_dict = {}
training_points = [8, 16, 32, 64, 128]
for tp in training_points:
    print(f"Computing {len(breast_cancer_x_test)} points with {len(breast_cancer_x_train[:tp])} training points, started.")
    pred_y = defaultdict(list)  # str(k) -> one prediction per test point
    q_knn_results = []

    start_time = time.time()
    for test_point in breast_cancer_x_test:
        q_knn = QuantumKNN(breast_cancer_x_train[:tp], breast_cancer_y_train[:tp], test_point)
        q_knn.measure()
        q_knn_results.append(q_knn)
        # One circuit run serves every k; only the vote differs.
        for nn in range(1, tp + 1):
            pred_y[str(nn)].append(q_knn.predict(nn))

    #q_knn.quantum_circuit.draw('mpl', filename=f"results/breast_cancer_quantum_circuit_{tp}_training_points.png")
    accuracy = {pkey: accuracy_score(breast_cancer_y_test, preds) for pkey, preds in pred_y.items()}
    time_taken = time.time() - start_time
    breast_cancer_results_dict[str(tp)] = {
        "pred_y": dict(pred_y),
        "time_taken": time_taken,
        "accuracy": accuracy,
        "num_qubits": len(q_knn.quantum_circuit.qubits)
    }
    print(f"Computing {len(breast_cancer_x_test)} points with {len(breast_cancer_x_train[:tp])} training points, took: {time_taken}")
    print("-------------")

# Make sure the output directory exists so a long run is not lost at the
# very last step.
os.makedirs('results', exist_ok=True)
with open('results/breast_cancer_results.pkl', 'wb') as fp:
    pickle.dump(breast_cancer_results_dict, fp)

Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 24.70741033554077
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 148.76504731178284
-------------
Computing 20 points with 32 training points, started.
Computing 20 points with 32 training points, took: 326.3528046607971
-------------
Computing 20 points with 64 training points, started.
Computing 20 points with 64 training points, took: 1049.5465931892395
-------------
Computing 20 points with 128 training points, started.
Computing 20 points with 128 training points, took: 3641.968740940094
-------------
Computing 20 points with 8 training points, started.
Computing 20 points with 8 training points, took: 24.401777744293213
-------------
Computing 20 points with 16 training points, started.
Computing 20 points with 16 training points, took: 82.87090468406677
-------------
Computing 20 points with 32 training points, started.