In [3]:
# from swaptest import cswaptest
import numpy as np
import sklearn, qiskit
from sklearn.model_selection import train_test_split
import collections
from qiskit.primitives import Sampler

def create_dataset(n_samples, test_ratio=0.2):
    # Generate synthetic data for Class 0
    mean0 = [2, 2]
    cov0 = [[1, 0.5], [0.5, 1]]  # diagonal covariance
    data0 = np.random.multivariate_normal(mean0, cov0, n_samples)
    labels0 = np.zeros(n_samples)

    # Generate synthetic data for Class 1
    mean1 = [6, 5]
    cov1 = [[1, -0.5], [-0.5, 1]]  # diagonal covariance
    data1 = np.random.multivariate_normal(mean1, cov1, n_samples)
    labels1 = np.ones(n_samples)

    # Combine data and labels
    data = np.vstack((data0, data1))
    labels = np.concatenate((labels0, labels1))

    # Splitting dataset into training and testing sets
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=test_ratio, random_state=42
    )
    return train_x, test_x, train_y, test_y


def encoded_circuit(input_vector):
    encode = Encoding(input_vector, "dc_amplitude_encoding")
    return encode.qcircuit


def get_index_fredkin_gate(N, padding=0):
    """Get paramaters for log2(N) Fredkin gates

    Args:
        - N (int): dimensional of states
        - padding (int, optional): Defaults to 0.

    Returns:
        - list of int: params for the second and third Frekin gates
    """
    indices = []
    for i in range(0, int(np.log2(N))):
        indices.append(2**i + padding)
    return indices


def create_integrated_swap_test_circuit(vector1, vector2):
    """[summary]

    Args:
        - vector1 (numpy array): First vector
        - vector2 (numpy array): Second vector

    Returns:
        QuantumCircuit: full circuit
    """
    N = len(vector1)
    cs1 = get_index_fredkin_gate(N)
    cs2 = get_index_fredkin_gate(N, N - 1)
    total_qubits = (N - 1) * 2 + 1
    # Construct circuit
    qc = qiskit.QuantumCircuit(total_qubits, 1)
    qc.h(0)
    qc.compose(encoded_circuit(vector1), qubits=[*range(1, N)], inplace=True)
    qc.compose(encoded_circuit(vector2), qubits=[*range(N, 2 * N - 1)], inplace=True)
    for i in range(len(cs1)):
        qc.cswap(0, cs1[i], cs2[i])
    qc.h(0)
    qc.measure(0, 0)
    return qc


def integrated_swap_test_circuit(vector1, vector2):
    """Return fidelity between two same - dimension vectors by cswaptest
    Args:
        - vector1 (numpy array): First vector, don't need to normalize
        - vector2 (numpy array): Second vector, don't need to normalize
    Returns:
        - float: Fidelity = sqrt((p(0) - p(1)) / n_shot)
    """
    if len(vector1) != len(vector2):
        raise Exception("Two states must have the same dimensional")
    vector1 = vector1 / np.linalg.norm(vector1)
    vector2 = vector2 / np.linalg.norm(vector1)
    qc = create_integrated_swap_test_circuit(vector1, vector2)
    sampler = Sampler()
    counts = sampler.run(qc, shots = 10000).result().quasi_dists[0]
    return np.sqrt(np.abs((counts.get(0, 0) - counts.get(1, 0)) / 1024))


def get_fidelity(vector1, vector2, iteration: int = 1):
    """Run ist circuit many times

    Args:
        - vector1 (numpy array): First vector, don't need to normalize
        - vector2 (numpy array): Second vector, don't need to normalize
        - iteration (int): Number of iteration

    Returns:
        - Float: mean fidelity
    """
    fidelities = np.array([])
    for _ in range(0, iteration):
        fidelity = integrated_swap_test_circuit(vector1, vector2)
        fidelities = np.append(fidelities, fidelity)
    return np.average(fidelities)


def encode(xss):
    """Convert normal vector to normalized state

    Args:
        xss (list of list): dataset

    Returns:
        list of list: normalized dataset
    """
    amplitudes = np.sqrt(np.einsum("ij,ij->i", xss, xss))
    amplitudes[amplitudes == 0] = 1
    normalised_data = xss / amplitudes[:, np.newaxis]
    return normalised_data


def get_major_vote(labels):
    """Get major label value

    Args:
        labels (list): list of value

    Returns:
        int: major vote
    """
    x = collections.Counter(labels)
    return x.most_common(1)[0][0]


def sort_return_index(xs):
    """Note that we must sort for large to small, because scalar product between
    two similar vectors is almost 1 (other wise with Eucliean distance), I wrong this
    point and take few hours to catch this error :))

    Args:
        xs (list): list of scalar products between train vectors and one test vector

    Returns:
        list: sorted list but return indices
    """
    new_xs = sorted(range(len(xs)), key=lambda k: xs[k])
    new_xs.reverse()
    return new_xs


def get_sublist_with_indices(xs, indices, k):
    return [xs[index] for index in indices][:k]


def distances(xs, yss, iteration: int = 1):
    """Return a lots of distance

    Args:
        - xs (list of float): vector
        - yss (list of list): dataset
        - iteration (int): number of iteration

    Returns:
        - list of values: all distances from vector to others vector in dataset
    """
    distances = []
    for ys in yss:
        distances.append(get_fidelity(vector1=xs, vector2=ys, iteration=iteration))
    return distances


def predict(train_datas, train_labels, test_datas, k: int = 1, iteration: int = 1):
    """Return predicted labels QKNN algorithm

    Args:
        - train_datas (numpy array 2D): Vectors in train data
        - train_labels (numpy array 1D): Labels in train data
        - test_datas (numpy array 2D): Vectors in test data
        - k (int): Number of neighboors
        - iteration (int): number of iteration
    Returns:
        - list of int: predicted labels
    """
    predict_labels = []
    i = 0
    for i, test_data in enumerate(test_datas):
        xs = distances(test_data, train_datas, iteration)
        indices_of_sorted_xs = sort_return_index(xs)
        labels = get_sublist_with_indices(train_labels, indices_of_sorted_xs, k)
        predict_labels.append(get_major_vote(labels))
        if i % 10 == 0:
            print("Progress " + (str(int(i / len(test_datas) * 100)) + "%"))
    return predict_labels


def bench_mark(ground_truth, predict):
    """Return predict labels QKNN algorithm

    Args:
        - ground_truth (numpy array 1D): truth labels
        - predict (numpy array 1D): predict labels

    Returns:
        - Tuple: benchmark on classifer problem
    """
    accuracy = sklearn.metrics.accuracy_score(ground_truth, predict)
    precision = sklearn.metrics.precision_score(
        ground_truth, predict, average="weighted"
    )
    recall = sklearn.metrics.recall_score(ground_truth, predict, average="weighted")
    f1 = sklearn.metrics.f1_score(ground_truth, predict, average="micro")
    matrix = sklearn.metrics.confusion_matrix(ground_truth, predict)
    return accuracy, precision, recall, matrix


"""
function to load classical data in a quantum device
"""

import numpy as np
import qiskit


class bin_tree:
    size = None
    values = None

    def __init__(self, values):
        self.size = len(values)
        self.values = values

    def parent(self, key):
        return int((key - 0.5) / 2)

    def left(self, key):
        return int(2 * key + 1)

    def right(self, key):
        return int(2 * key + 2)

    def root(self):
        return 0

    def __getitem__(self, key):
        return self.values[key]


class Encoding:
    qcircuit = None
    quantum_data = None
    classical_data = None
    num_qubits = None
    tree = None
    output_qubits = []

    def __init__(self, input_vector, encode_type="amplitude_encoding"):
        if encode_type == "amplitude_encoding":
            self.amplitude_encoding(input_vector)
        if encode_type == "qubit_encoding":
            self.qubit_encoding(input_vector)
        if encode_type == "dc_amplitude_encoding":
            self.dc_amplitude_encoding(input_vector)
        if encode_type == "basis_encoding":
            self.basis_encoding(input_vector)

    def basis_encoding(self, input_vector, n_classical=1):
        """
        encoding a binary string x in a basis state |x>
        """
        self.num_qubits = int(len(input_vector))
        self.quantum_data = qiskit.QuantumRegister(self.num_qubits)
        self.classical_data = qiskit.ClassicalRegister(n_classical)
        self.qcircuit = qiskit.QuantumCircuit(self.quantum_data, self.classical_data)
        for k, _ in enumerate(input_vector):
            if input_vector[k] == 1:
                self.qcircuit.x(self.quantum_data[k])

    def qubit_encoding(self, input_vector, n_classical=1):
        """
        encoding a binary string x as
        """
        input_pattern = qiskit.QuantumRegister(len(input_vector))
        classical_register = qiskit.ClassicalRegister(n_classical)
        self.qcircuit = qiskit.QuantumCircuit(input_pattern, classical_register)
        for k, _ in enumerate(input_vector):
            self.qcircuit.ry(input_vector[k], input_pattern[k])

    @staticmethod
    def _recursive_compute_beta(input_vector, betas):
        if len(input_vector) > 1:
            new_x = []
            beta = []
            for k in range(0, len(input_vector), 2):
                norm = np.sqrt(input_vector[k] ** 2 + input_vector[k + 1] ** 2)
                new_x.append(norm)
                if norm == 0:
                    beta.append(0)
                else:
                    if input_vector[k] < 0:
                        beta.append(
                            2 * np.pi - 2 * np.arcsin(input_vector[k + 1] / norm)
                        )  ## testing
                    else:
                        beta.append(2 * np.arcsin(input_vector[k + 1] / norm))
            Encoding._recursive_compute_beta(new_x, betas)
            betas.append(beta)
            output = []

    @staticmethod
    def _index(k, circuit, control_qubits, numberof_controls):
        binary_index = "{:0{}b}".format(k, numberof_controls)
        for j, qbit in enumerate(control_qubits):
            if binary_index[j] == "1":
                circuit.x(qbit)

    def amplitude_encoding(self, input_vector):
        """
        load real vector x to the amplitude of a quantum state
        """
        self.num_qubits = int(np.log2(len(input_vector)))
        self.quantum_data = qiskit.QuantumRegister(self.num_qubits)
        self.qcircuit = qiskit.QuantumCircuit(self.quantum_data)
        newx = np.copy(input_vector)
        betas = []
        Encoding._recursive_compute_beta(newx, betas)
        self._generate_circuit(betas, self.qcircuit, self.quantum_data)

    def dc_amplitude_encoding(self, input_vector):
        self.num_qubits = int(len(input_vector)) - 1
        self.quantum_data = qiskit.QuantumRegister(self.num_qubits)
        self.qcircuit = qiskit.QuantumCircuit(self.quantum_data)
        newx = np.copy(input_vector)
        betas = []
        Encoding._recursive_compute_beta(newx, betas)
        self._dc_generate_circuit(betas, self.qcircuit, self.quantum_data)

    def _dc_generate_circuit(self, betas, qcircuit, quantum_input):

        k = 0
        linear_angles = []
        for angles in betas:
            linear_angles = linear_angles + angles
            for angle in angles:
                qcircuit.ry(angle, quantum_input[k])
                k += 1

        self.tree = bin_tree(quantum_input)
        my_tree = self.tree

        last = my_tree.size - 1
        actual = my_tree.parent(last)
        level = my_tree.parent(last)
        while actual >= 0:
            left_index = my_tree.left(actual)
            right_index = my_tree.right(actual)
            while right_index <= last:

                qcircuit.cswap(
                    my_tree[actual], my_tree[left_index], my_tree[right_index]
                )

                left_index = my_tree.left(left_index)
                right_index = my_tree.left(right_index)
            actual -= 1
            if level != my_tree.parent(actual):
                level -= 1

        # set output qubits
        next_index = 0
        while next_index < my_tree.size:
            self.output_qubits.append(next_index)
            next_index = my_tree.left(next_index)

    def _generate_circuit(self, betas, qcircuit, quantum_input):
        numberof_controls = 0  # number of controls
        control_bits = []
        for angles in betas:
            if numberof_controls == 0:
                qcircuit.ry(angles[0], quantum_input[self.num_qubits - 1])
                numberof_controls += 1
                control_bits.append(quantum_input[self.num_qubits - 1])
            else:
                for k, angle in enumerate(reversed(angles)):
                    Encoding._index(k, qcircuit, control_bits, numberof_controls)

                    qcircuit.mcry(
                        angle,
                        control_bits,
                        quantum_input[self.num_qubits - 1 - numberof_controls],
                        None,
                        mode="noancilla",
                    )

                    Encoding._index(k, qcircuit, control_bits, numberof_controls)
                control_bits.append(
                    quantum_input[self.num_qubits - 1 - numberof_controls]
                )
                numberof_controls += 1

def print_result(results):
    accuracy, precision, recall, matrix = results
    print("accuracy: ", accuracy)
    print("precision: ", precision)
    print("recall: ", recall)
    print("matrix: ", matrix)


def processing(invocation_input):
    k = invocation_input["k"]
    iteration = invocation_input["iteration"]
    num_samples = invocation_input["num_samples"]
    test_ratio = invocation_input["test_ratio"]
    train_x, test_x, train_y, test_y = create_dataset(num_samples, test_ratio=0.2)
    hat_y = np.asarray(predict(train_x, train_y, test_x, k, iteration))
    global result
    result = bench_mark(test_y, hat_y)
    circuit = qiskit.QuantumCircuit(1, 1)
    return circuit

def post_processing(job_result):
    print_result(result)
    return job_result

Progress 0%
Progress 25%
Progress 50%
Progress 75%
Predict labels:  [0. 1. 0. 1. 0. 1. 1. 1. 1. 0. 0. 0. 1. 0. 1. 1. 1. 1. 0. 0. 1. 0. 1. 0.
 0. 1. 0. 0. 0. 1. 1. 1. 0. 0. 0. 0. 1. 1. 1. 1.]
Test labels:  [0. 0. 0. 1. 1. 1. 0. 1. 1. 0. 0. 1. 1. 0. 1. 1. 0. 1. 0. 0. 1. 0. 1. 0.
 0. 0. 0. 0. 1. 1. 0. 0. 0. 0. 1. 1. 1. 1. 1. 0.]
accuracy:  0.7
precision:  0.7035087719298245
recall:  0.7
matrix:  [[14  7]
 [ 5 14]]


In [4]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Seed for reproducibility
np.random.seed(42)

# Number of samples per class
n_samples = 50

# Generate synthetic data for Class 0
mean0 = [2, 2]
cov0 = [[1, 0.5], [0.5, 1]]  # diagonal covariance
data0 = np.random.multivariate_normal(mean0, cov0, n_samples)
labels0 = np.zeros(n_samples)

# Generate synthetic data for Class 1
mean1 = [6, 5]
cov1 = [[1, -0.5], [-0.5, 1]]  # diagonal covariance
data1 = np.random.multivariate_normal(mean1, cov1, n_samples)
labels1 = np.ones(n_samples)

# Combine data and labels
data = np.vstack((data0, data1))
labels = np.concatenate((labels0, labels1))

# Splitting dataset into training and testing sets
train_x, test_x, train_y, test_y = train_test_split(
    data, labels, test_size=0.2, random_state=42
)

[[7.64844768 3.32503845]
 [6.71550707 5.73802701]
 [2.2803935  2.89206979]
 [2.95105452 2.44929748]
 [4.68004127 5.08909442]
 [1.59495412 3.4472323 ]
 [2.01168515 2.36879772]
 [5.19171999 4.56254123]
 [3.49252377 2.08022007]
 [6.16584633 5.61597654]
 [2.65389295 1.22914477]
 [3.0713331  1.60781815]
 [6.36243695 3.82175276]
 [2.88138988 2.57217751]
 [5.55374974 5.37342135]
 [2.63419637 2.16846662]
 [7.15677822 4.31681421]
 [7.37719541 3.69136756]
 [0.91800394 2.45604051]
 [2.5405444  1.48283348]
 [2.11286518 1.87827805]
 [2.00937389 1.36425414]
 [3.77496643 3.2126789 ]
 [6.81489269 5.04150611]
 [0.64148816 1.57276828]
 [3.05181224 3.24867348]
 [4.58628564 5.59303204]
 [1.91187619 1.52679391]
 [6.58699466 6.60346096]
 [2.80894067 3.18463869]
 [2.58393791 0.82089775]
 [0.24864422 1.01607895]
 [1.89807704 0.67723339]
 [2.0741187  1.77511135]
 [2.72001381 3.03426115]
 [4.62110834 6.85272458]
 [2.74709465 0.8338144 ]
 [5.66196156 4.83456279]
 [1.24870443 2.81334809]
 [7.58928412 6.13088504]


In [2]:
# from swaptest import cswaptest
import numpy as np
import sklearn, qiskit
from sklearn.model_selection import train_test_split
import collections
from qiskit.primitives import Sampler

def create_dataset(n_samples, test_ratio=0.2):
    # Generate synthetic data for Class 0
    mean0 = [2, 2]
    cov0 = [[1, 0.5], [0.5, 1]]  # diagonal covariance
    data0 = np.random.multivariate_normal(mean0, cov0, n_samples)
    labels0 = np.zeros(n_samples)

    # Generate synthetic data for Class 1
    mean1 = [6, 5]
    cov1 = [[1, -0.5], [-0.5, 1]]  # diagonal covariance
    data1 = np.random.multivariate_normal(mean1, cov1, n_samples)
    labels1 = np.ones(n_samples)

    # Combine data and labels
    data = np.vstack((data0, data1))
    labels = np.concatenate((labels0, labels1))

    # Splitting dataset into training and testing sets
    train_x, test_x, train_y, test_y = train_test_split(
        data, labels, test_size=test_ratio, random_state=42
    )
    
    import pandas as pd
    pd.DataFrame(train_x).to_csv('train_x.csv')
    pd.DataFrame(test_x).to_csv('test_x.csv')
    pd.DataFrame(train_y).to_csv('train_y.csv')
    pd.DataFrame(test_y).to_csv('test_y.csv')
    return train_x, test_x, train_y, test_y

create_dataset(100, 0.2)

(array([[ 3.7050156 ,  3.10442198],
        [ 5.13220877,  4.02182665],
        [ 0.33957146,  1.12545764],
        [ 1.28110753,  0.5427208 ],
        [ 7.2992976 ,  4.5924492 ],
        [ 7.01574111,  4.75400733],
        [ 3.08836895,  2.61985786],
        [ 0.05329865,  2.13785629],
        [ 5.66308439,  5.71768534],
        [ 1.61969961,  0.78780112],
        [ 5.68901489,  5.16335561],
        [ 4.98075327,  4.99962534],
        [ 2.04308654,  0.9647758 ],
        [ 6.63155791,  6.09076559],
        [ 5.60551381,  5.40367185],
        [ 3.35217906,  1.55149292],
        [ 4.99601771,  6.34799447],
        [ 1.19863692,  1.45550523],
        [ 7.44763333,  2.89710707],
        [ 1.30541373, -0.22147798],
        [ 5.47093414,  6.00363276],
        [ 3.12975595,  1.88369794],
        [ 1.38041246,  1.01616774],
        [ 2.68062501,  2.46290647],
        [ 2.884787  ,  1.26831032],
        [ 5.55807029,  6.02025008],
        [ 2.58779281,  2.58913434],
        [ 5.58116837,  3.711