In [44]:
import itertools
import logging, warnings
import numbers

import pandas as pd
import qiskit as qk
import qiskit.aqua as aqua
import qiskit.aqua.utils.subsystem as ss
import qiskit.circuit as qcirc
import qiskit.circuit.instruction as qinst
import qiskit.extensions.quantum_initializer as qi
import qiskit.providers as qkp
import qiskit.result as qres
import qiskit.tools as qktools
import numpy as np
import scipy.stats as stats

from qiskit import aqua
from qiskit.aqua.algorithms import QuantumAlgorithm
from qiskit.aqua.algorithms.classifiers.qsvm._qsvm_abc import _QSVM_ABC
from qiskit.circuit.controlledgate import ControlledGate
from qiskit.circuit.gate import Gate
from sklearn import datasets, metrics
from sklearn.metrics import mean_squared_error
from typing import Dict, List, Optional, Union

logger = logging.getLogger(__name__)

UnionQInstBaseB = Union[aqua.QuantumInstance, qkp.BaseBackend]
OptionalQInstance = Optional[UnionQInstBaseB]

## The QKNN Classifier

In [2]:
class _QKNN(_QSVM_ABC):
    """The qknn classifier.
    A class maintaining:
        - a QKNeighborsClassifier quantum algorithm;
        - manages the running, testing and predicting using all available data in said quantum algorithm.
    """
    
    def __init__(self, qalgo):
        super().__init__(qalgo)

        
    def predict(self, data):
        circuits = self._qalgo.construct_circuits(data, self._qalgo.training_dataset)
        circuit_results = self._qalgo.get_circuit_results(circuits)
        fidelities = self._qalgo.get_all_fidelities(circuit_results)
        
        predicted_labels = self._qalgo.majority_vote(self._qalgo.training_labels, fidelities)

        return predicted_labels

    
    def run(self):
        circuits = self._qalgo.construct_circuits(self._qalgo.data_points, self._qalgo.training_dataset)
        circuit_results = self._qalgo.get_circuit_results(circuits)
        fidelities = self._qalgo.get_all_fidelities(circuit_results)

        predicted_labels = self._qalgo.majority_vote(self._qalgo.training_labels, fidelities)

        self._ret['counts'] = circuit_results.get_counts()
        self._ret['fidelities'] = fidelities
        self._ret['predicted_labels'] = predicted_labels

        
    @staticmethod
    def execute_all(qalgo, data, training_data):
        pass

## Some Quantum Gates

import qiskit_quantum_knn.qknn.quantumgates as gates

In [3]:
def swap():
    """Decomposition of the SWAP-gate.
    Returns:
        Instruction: the SWAP-gate.
    """
    swap_circuit = qk.QuantumCircuit(2, name='swap_gate')
    swap_circuit.cx(0, 1)
    swap_circuit.cx(1, 0)
    swap_circuit.cx(0, 1)
    return swap_circuit.to_instruction()


def fidelity_instruction():
    r"""Decomposition of the SWAP-measurement.
    Returns:
        Instruction: The Fidelity gate (swap measurement).
    """
    fidelity_circ = qk.QuantumCircuit(3, 1)
    fidelity_circ.h(0)
    fidelity_circ.cswap(0, 1, 2)
    fidelity_circ.h(0)
    fidelity_circ.measure(0, 0)

    fidelity_instr = fidelity_circ.to_instruction()

    return fidelity_instr


def init_to_state(reg_to_init: qk.QuantumRegister,
                  init_state: np.ndarray,
                  name: Optional[str] = None) -> Gate:
    """Initialize a :class:`QuantumRegister` to the provided state.
    Args:
        reg_to_init (QuantumRegister): register which needs to be initialized.
        init_state (np.ndarray): state to which the :py:attr:`reg_to_init` must be initialized to.
        name (str): optional, name for the ``init_gate``.
    Raises:
        ValueError: if the register and state do not have the same dimension.
    Returns:
        Gate: The initialiser.
    """
    # check if provided values are correct
    if len(init_state) != 2 ** len(reg_to_init):
        raise ValueError(
            "Dimensionality of the init_state does not coincide with the "
            "length of the register to initialise to: is {0} and {1}".format(len(init_state), len(reg_to_init)))

    init_circ = qk.QuantumCircuit(reg_to_init, name=name)  # create temp circuit
    init = qi.Isometry(init_state, 0, 0)                   # create Isometry for init
    init_circ.append(init, reg_to_init)                    # apply init to temp circuit

    basis_gates = ['u1', 'u2', 'u3', 'cx']  # list basis gates
    # transpile circuit so that it is decomposed to the basis gates above making it unitary and possible to convert from Instruction to Gate
    transpiled = qk.transpile(init_circ, basis_gates=basis_gates)
    init_gate = transpiled.to_gate()  # convert to Gate
    
    return init_gate


def controlled_initialize(reg_to_init: qk.QuantumRegister,
                          init_state: np.ndarray,
                          num_ctrl_qubits: Optional[int] = 1,
                          name: Optional[str] = None) -> ControlledGate:
    """Initialize a register to provided state with control.
    Args:
        reg_to_init (QuantumRegister): register which needs to be initialized.
        init_state (np.ndarray): state to which the ``reg_to_init`` must be initialized to.
        num_ctrl_qubits (int): optional, number of desired controls.
        name (str): optional, name for the ``init_gate``.
    Returns:
        ControlledGate: The produced controlled initialise.
    """
    # create the init state
    init_gate = init_to_state(reg_to_init, init_state, name)
    # make it controlled
    controlled_init = init_gate.control(num_ctrl_qubits=num_ctrl_qubits)

    return controlled_init


## Construction of a QKNN Quantum Circuit

from qiskit_quantum_knn.qknn import qknn_construction as qc

In [4]:
def create_qknn(state_to_classify: Union[List, np.ndarray],
                classified_states: Union[List, np.ndarray],
                add_measurement: bool = False) -> qk.QuantumCircuit:
    """ Construct one QkNN QuantumCircuit.
    Args:
        state_to_classify (numpy.ndarray): array of dimension N complex values describing the state to classify via KNN
        classified_states (numpy.ndarray): array containing M training samples of dimension N
        add_measurement (bool): controls if measurements must be added to the classical registers
    Returns:
        QuantumCircuit: the constructed circuit.
    """
    oracle = create_oracle(classified_states)
    return construct_circuit(state_to_classify, oracle, add_measurement)


def construct_circuit(state_to_classify: np.ndarray,
                      oracle: qinst.Instruction,
                      add_measurement: bool) -> qk.QuantumCircuit:
    r"""Setup for a QkNN QuantumCircuit.
    Args:
        state_to_classify (numpy.ndarray)
        oracle (qiskit Instruction): oracle :math:`\mathcal{W}` for applying training data.
        add_measurement (bool)
    Raises:
        ValueError: If the number of data points in :attr:`state_to_classify` is more than 2.
        ValueError: If the length of the vectors in the :attr:`classified_states` and/or test data are not a positive power of 2.
    Returns:
        QuantumCircuit: constructed circuit.
    """

    if len(np.array(state_to_classify).shape) != 1:
        raise ValueError(
            f"Please only one data point to classify. Number of data points "
            f"provided is: {np.array(state_to_classify).shape[0]}. "
        )

    # get the dimensions of the state to classify
    state_dimension = len(state_to_classify)

    # get the number of qubits for the registers containing the state to classify and the number of train samples
    n = np.log2(state_dimension)  # n qubits for describing states
    m = oracle.num_qubits - n     # n qubits for computational basis

    # Check if param is a power of 2
    if (n == 0 or not n.is_integer()) and (m == 0 or not m.is_integer()):
        raise ValueError(
            "Desired statevector length not a positive power of 2."
        )

    # step 1: initialise (creates registers, sets qubits to |0> or the state to classify
    qknn_circ = initialise_qknn(n, m, state_to_classify)
    # step 2: state trans. (applies oracle)
    qknn_circ = state_transformation(qknn_circ, oracle)
    # step 3: adds the measurement gates
    if add_measurement:
        qknn_circ = add_measurements(qknn_circ)

    logger.debug(f"Final circuit:\n{qknn_circ.draw(fold=90)}")

    return qknn_circ


def create_oracle(train_data: Union[List, np.ndarray]) -> qinst.Instruction:
    r"""Create an Oracle to perform as QRAM.
    Args:
        train_data (array-like): List of vectors with dimension ``len(r_train)`` to initialize ``r_train`` to.
    Returns:
        circuit.instruction.Instruction: Instruction of the Oracle.
    """
    # get shape of training data
    train_shape = np.shape(train_data)
    # check if training data is two dimensional
    if len(train_shape) != 2:
        raise ValueError("Provided training data not 2-dimensional. Provide"
                         "a matrix of shape n_samples x dim")
    # get the log2 values of the number of samples m from training data and dimension of vectors n
    m, n = np.log2(train_shape)
    # check if m should be ceiled.
    if not m.is_integer():
        warnings.warn("Number of training states not a positive power of 2,"
                      "adding extra qubit to comply.")
        m = np.ceil(m)

    # create quantum registers
    r_train = qk.QuantumRegister(n, name='train_states')
    r_comp_basis = qk.QuantumRegister(m, name='comp_basis')

    # initialize the list containing controlled inits – assign each training state i to r_train 
    # (so should be as long as number of samples)
    controlled_inits = [qcirc.ControlledGate] * train_shape[0]

    # create an empty circuit with the registers
    oracle_circ = qk.QuantumCircuit(r_train, r_comp_basis, name='oracle')

    # create all the controlled inits for each vector in train data
    for i, train_state in enumerate(train_data):
        controlled_inits[i] = \
            controlled_initialize(r_train,
                                  train_state,
                                  num_ctrl_qubits=r_comp_basis.size,
                                  name="phi_{}".format(i))

    # apply all the x-gates and controlled inits to the circuit
    # define the length of the binary string which will translate |i>
    bin_number_length = r_comp_basis.size
    where_x = where_to_apply_x(bin_number_length)

    for i, (c_init, x_idx) in enumerate(zip(controlled_inits, where_x)):
        # represent i in binary number with length bin_number_length, so leading zeros are included
        logger.debug(f"applying x-gates to: {x_idx}")
        # apply the x-gates
        oracle_circ.x(r_comp_basis[x_idx])
        # apply the controlled init
        oracle_circ.append(c_init, r_comp_basis[:] + r_train[:])

    logger.debug(f"Created oracle as:\n{oracle_circ.draw()}")

    return oracle_circ.to_instruction()


def where_to_apply_x(bin_number_length: int) -> List:
    r""" Create an array to apply :math:`X`-gates systematically to create all possible register combinations.
    Args:
        bin_number_length (int): the length of the highest binary value (or the number of qubits).
    Returns:
        List: All possible combinations.
    """
    powers_of_two = 2 ** np.arange(bin_number_length)
    indices = \
        [
            [
                ind for ind, v in enumerate(powers_of_two)
                if v & (pos ^ (pos - 1)) == v
            ] for pos in range(2 ** bin_number_length)
        ]
    return indices


def initialise_qknn(log2_dim: int,
                    log2_n_samps: int,
                    test_state: np.ndarray) -> qk.QuantumCircuit:
    r"""Creates the registers and applies the unclassified datum :math:`\psi`.
    Args:
        log2_dim (int): int, log2 value of the dimension of the test and train states.
        log2_n_samps (int): int, log2 value of the number of training samples M.
        test_state (numpy.ndarray): 2 ** log2_dimension complex values to initialise the r_1 test state in (psi).
    Returns:
        QuantumCircuit: The initialised circuit.
    """
    if len(test_state) != 2 ** log2_dim:
        raise ValueError(
            "Dimensionality of test state or provided dimension not correct;"
            " test state dim is {0:d}, and dimension given is {1:d}".format(len(test_state), 2 ** log2_dim))

    # register for control qubit
    r_0 = qk.QuantumRegister(1, name='control')
    # register for test state
    r_1 = qk.QuantumRegister(log2_dim, name='state_to_classify')
    # register for train state
    r_2 = qk.QuantumRegister(log2_dim, name='train_states')
    # register for computational basis
    r_3 = qk.QuantumRegister(log2_n_samps, name='comp_basis')

    # classical register for measuring the control and computational basis
    c_0 = qk.ClassicalRegister(r_0.size, name='meas_control')
    c_1 = qk.ClassicalRegister(r_3.size, name="meas_comp_basis")

    init_circ = qk.QuantumCircuit(r_0, r_1, r_2, r_3, c_0, c_1)
    init = qi.Isometry(test_state, 0, 0)
    init.name = "init test state"
    init_circ.append(init, r_1)
    init_circ.barrier()

    logger.debug(f"Initialised circuit as:\n{init_circ.draw()}")

    return init_circ


def state_transformation(qknn_circ: qk.QuantumCircuit,
                         oracle: qinst.Instruction) -> qk.QuantumCircuit:
    r"""applies :math:`H`-gates and the Oracle :math:`\mathcal{W}` to the circuit, and applies the :math:`SWAP`-test.
    Args:
        qknn_circ (QuantumCircuit): has been initialised according to initialise_qknn().
        oracle (qiskit Instruction): oracle W|i>|0> = W|i>|phi_i> for applying training data.
    Returns:
        QuantumCircuit: the transformed :class:`QuantumCircuit`.
    """
    # initialising registers for readability
    [control, test_register, train_register, comp_basis] = qknn_circ.qregs

    # perform equation 13 from Afham; Afham, Afrad; Goyal, Sandeep K. (2020).
    qknn_circ.h(control)
    qknn_circ.h(comp_basis)

    # perform equation 15, append to circuit
    qknn_circ.append(oracle, train_register[:] + comp_basis[:])

    # controlled swap
    for psi_bit, phi_bit in zip(test_register, train_register):
        qknn_circ.cswap(control, psi_bit, phi_bit)

    # final Hadamard gate
    qknn_circ.h(control)
    
    qknn_circ.barrier()

    logger.info(f"transformed registers to circuit:\n{qknn_circ.draw()}")

    return qknn_circ


def add_measurements(qknn_circ: qk.QuantumCircuit) -> qk.QuantumCircuit:
    """Adds measurement gates to the control and computational basis.
    Args:
        qknn_circ (qk.QuantumCircuit): has been build up by applying initialise_qknn() and state_transformation().
    Returns:
        QuantumCircuit: the :class:`QuantumCircuit` with measurements applied.
    """
    comp_basis_creg = qknn_circ.cregs[-1]
    comp_basis_qreg = qknn_circ.qregs[-1]
    qknn_circ.measure(qknn_circ.qregs[0], qknn_circ.cregs[0])
    for qbit, cbit in zip(comp_basis_qreg, reversed(comp_basis_creg)):
        qknn_circ.measure(qbit, cbit)

    logger.debug("Added measurements.")

    return qknn_circ

## Quantum KNN Algorithm

In [5]:
class QKNeighborsClassifier(QuantumAlgorithm):
    
    def __init__(self, n_neighbors: int = 3,
                 training_dataset: Optional[np.ndarray] = None,
                 training_labels: Optional[np.ndarray] = None,
                 test_dataset: Optional[np.ndarray] = None,
                 data_points: Optional[np.ndarray] = None,
                 quantum_instance: OptionalQInstance = None) -> None:
        super().__init__(quantum_instance)

        self.n_neighbors = n_neighbors

        # datasets for training, testing and predicting
        self.training_dataset = None
        self.test_dataset = None
        self.data_points = None
        # dictionaries containing the class vs labels and reverse, respectively
        self.class_to_label = None
        self.label_to_class = None
        # the number of classes in the provided data sets
        self.num_classes = None

        # setup of the data
        self.training_dataset = training_dataset
        self.training_labels = training_labels
        self.test_dataset = test_dataset
        self.data_points = data_points

        self.instance = _QKNN(self)
        
        
    def fit(self, X, y):
        """
        Args:
            X (array-like): Training data of shape [n_samples, n_features].
            y (array-like): Target values of shape [n_samples].
        """
        self.training_dataset = X
        self.training_labels = y
        
        
    @staticmethod
    def construct_circuit(state_to_classify: np.ndarray,
                          oracle: qinst.Instruction,
                          add_measurement: bool = False) -> qk.QuantumCircuit:
        r"""Construct a QkNN QuantumCircuit.
        The Oracle provided is mentioned in :afham2020:`Afham et al. (2020)`
        as the parameter :math:`\mathcal{W}`, and is created via the method
        :py:func:`~qiskit_quantum_knn.qknn.qknn_construction.create_oracle`.
        Args:
            state_to_classify (array-like): array of dimension ``N`` complex
                values describing the state to classify via kNN.
            oracle (qiskit Instruction): oracle :math:`\mathcal{W}` for applying
                training data.
            add_measurement (bool): controls if measurements must be added
                to the classical registers.
        Returns:
            QuantumCircuit: The constructed circuit.
        """
        return construct_circuit(state_to_classify, oracle, add_measurement)
    
    
    @staticmethod
    def construct_circuits(data_to_predict, training_data) -> qk.QuantumCircuit:
        """Constructs all quantum circuits for each datum to classify.
        Args:
            data_to_predict (array): data points, 2-D array, of shape
                ``(N, D)``, where ``N`` is the number of data points and ``D``
                is the dimensionality of the vector. ``D`` should coincide with
                the provided training data.
            training_data (array): data points which you want to know
                the distance of between :py:attr:`data_to_predict`.
        Returns:
            numpy.ndarray: The constructed circuits.
        Raises:
            AquaError: Quantum instance is not present.
        """
        measurement = True  # can be adjusted if statevector_sim
        oracle = create_oracle(training_data)

        # parallel_map() creates QuantumCircuits in parallel to be executed by a QuantumInstance
        logger.info("Starting parallel map for constructing circuits.")
        circuits = qktools.parallel_map(QKNeighborsClassifier.construct_circuit,
                                        data_to_predict,
                                        task_args=[oracle, measurement])
        logger.info("Done.")

        return circuits
    
    
    @staticmethod
    def execute_circuits(quantum_instance: UnionQInstBaseB, circuits) -> qres.Result:
        """Executes the provided circuits (type array-like)."""
        logger.info("Executing circuits")
        result = quantum_instance.execute(circuits)
        logger.info("Done.")
        return result
    
    
    def get_circuit_results(self, circuits, quantum_instance: OptionalQInstance = None) -> qres.Result:
        """Get the qiskit Results from the provided quantum circuits."""
        self._quantum_instance = self._quantum_instance \
            if quantum_instance is None else quantum_instance
        if self._quantum_instance is None:
            raise aqua.AquaError("Either provide a quantum instance or set one up.")

        return QKNeighborsClassifier.execute_circuits(self.quantum_instance, circuits)
    
    
    @staticmethod
    def get_all_fidelities(circuit_results: qres.Result):
        r"""Get all contrasts.
        Gets the fidelity values which are calculated via
        :func:`calculate_fidelities` and saves these in an array. For more about
        fidelities, see :meth:`calculate_fidelities`.
        Args:
            circuit_results (qiskit.result.Result): the results from a QkNN
                circuit build using ``QKNeighborsClassifier``.
        Returns:
            array: all fidelities corresponding to the QkNN.
        """
        logger.info("Getting fidelity values.")
        # get all counts from the circuit results
        all_counts = circuit_results.get_counts()
        # determine the length of the computational basis register by checking the length of the count result
        # the -2 compensates for the '0' or '1' at the end of the key
        num_qubits = len(list(all_counts[0].keys())[0]) - 2

        # initialize the array which will hold the contrast values
        n_occurrences = len(all_counts)  # number of occurring states
        n_datapoints = 2 ** num_qubits   # number of data points

        all_fidelities = np.empty(shape=(n_occurrences, n_datapoints))

        # loop over all counted states
        for i, counts in enumerate(all_counts):
            # calculate the contrast values q(i) for this set of counts
            all_fidelities[i] = \
                QKNeighborsClassifier.calculate_fidelities(counts)
        logger.info("Done.")

        return all_fidelities
    
    
    @staticmethod
    def calculate_fidelities(counts: Dict[str, int]) -> np.ndarray:
        r"""Calculate the fidelities :math:`F_i`.
        Args:
            counts (dict): counts pulled from a qiskit Result from the QkNN.
        Returns:
            array: the fidelity values.
            ndarray of length ``n_samples`` with each index ``i`` (representing
            state :math:`|i\rangle` from the computational basis) the fidelity
            belonging to :math:`|i\rangle`.
        """
        # first get the total counts of 0 and 1 in the control qubit
        subsystem_counts = ss.get_subsystems_counts(counts)
        # the counts from the control qubit are in the second register
        control_counts = QKNeighborsClassifier.setup_control_counts(subsystem_counts[1])
        total_counts = control_counts['0'] + control_counts['1']
        exp_fidelity = np.abs(control_counts['0'] - control_counts['1']) / \
            total_counts

        # now get the counts for the fidelities define possible states that the computational can be in
        num_qubits = len(list(subsystem_counts[0].keys())[0])
        comp_basis_states = \
            list(itertools.product(['0', '1'], repeat=num_qubits))
        # initialise dict which is going to contain the fidelity values
        fidelities = np.zeros(2 ** num_qubits, dtype=float)
        for comp_state in comp_basis_states:
            # convert list of '0's and '1's to one string
            comp_state = ''.join(comp_state)
            # init fidelity value for this state
            fidelity = 0.
            for control_state in control_counts.keys():
                state_str = comp_state + ' ' + control_state
                if state_str not in counts:
                    logger.debug(
                        "State {0:s} not found in counts {1}. Adding"
                        "naught to contrast value."
                        .format(state_str, counts))
                else:
                    fidelity += \
                        (-1) ** int(control_state) * \
                        (counts[state_str]) / control_counts[control_state] * \
                        (1 - exp_fidelity ** 2)
            index_state = int(comp_state, 2)
            fidelity *= 2 ** num_qubits / 2
            fidelity += exp_fidelity
            fidelities[index_state] = fidelity
        return fidelities
    
    
    @staticmethod
    def calculate_contrasts(counts: Dict[str, int]) -> np.ndarray:
        r"""Calculate contrasts :math:`q(i)`.
        Args:
            counts (dict): counts pulled from a qiskit Result from the QkNN.
        Returns:
            array: the contrasts values."""
        # first get the total counts of 0 and 1 in the control qubit
        subsystem_counts = ss.get_subsystems_counts(counts)
        # the counts from the control qubit are in the second register
        control_counts = QKNeighborsClassifier.setup_control_counts(subsystem_counts[1])

        # now get the counts for the contrasts define possible states that the computational can be in
        num_qubits = len(list(subsystem_counts[0].keys())[0])
        comp_basis_states = \
            list(itertools.product(['0', '1'], repeat=num_qubits))
        # initialise dict which is going to  contain the contrast values
        contrasts = np.zeros(2 ** num_qubits, dtype=float)
        for comp_state in comp_basis_states:
            # convert list of '0's and '1's to one string
            comp_state = ''.join(comp_state)
            # init contrast value for this state
            contrast = 0.
            for control_state in control_counts.keys():
                state_str = comp_state + ' ' + control_state
                if state_str not in counts:
                    logger.debug(
                        "State {0:s} not found in counts {1}. Adding"
                        "naught to contrast value."
                        .format(state_str, counts))
                else:
                    contrast += \
                        (-1) ** int(control_state) * \
                        (counts[state_str]) / control_counts[control_state]
            index_state = int(comp_state, 2)
            contrasts[index_state] = contrast

        return contrasts

    
    @staticmethod
    def setup_control_counts(control_counts: Dict[str, int]) -> Dict[str, int]:
        """Sets up control counts dict.
        Args:
            control_counts (dict): dictionary from a :py:class:`Result`
            representing the control qubit in the QkNN circuit.
        Returns:
            dict: The modified control counts.
                The same control_counts dict as provided but with non-counted
                occurrence added as well if needed.
        Raises:
            ValueError: if the provided dictionary does not coincide with the
                :py:class:`Result` from the QkNN.
        """
        # constant describing the states possible in control_counts
        control_states = np.array(['0', '1'])
        # check if substition of 0 count value must be done
        if control_states[0] not in control_counts:
            to_substitute = int(control_states[0])
        elif control_states[1] not in control_counts:
            to_substitute = int(control_states[1])
        else:
            to_substitute = None

        if to_substitute is not None:
            # if to_substitute = 1, make it -1 * (1 - 1) = 0, else, make it -1 * (0 - 1) = 1
            sole_occurrence = -1 * (to_substitute - 1)
            logger.debug(
                "Only one value is counted in the control qubit: {0:d},"
                "setting the counts of state {1:d} to 0."
                    .format(sole_occurrence, to_substitute))
            control_counts = {
                str(to_substitute): 0,
                str(sole_occurrence): control_counts[str(sole_occurrence)]
            }

        return control_counts
    
    
    def majority_vote(self, labels: np.ndarray, fidelities: np.ndarray) -> np.ndarray:
        """Performs majority vote with the :math:`k` nearest to determine class.
        Args:
            labels (array-like): The labels of the training data provided to
                the :class:`QKNeighborsClassifier`.
            fidelities (array-like): The fidelities calculated using
                :meth:`get_all_fidelities'.
        Returns:
            ndarray: The labels resulted from the majority vote.
        """
        logger.info("Performing majority vote.")
        # get the neighbors sorted on their distance (lowest first) per data point.
        if np.any(fidelities < -0.2) or np.any(fidelities > 1.2):
            raise ValueError("Fidelities contain values outside range 0<=F<=1:"
                             f"{fidelities[fidelities < -0.2]}"
                             f"{fidelities[fidelities > 1.2]}")

        sorted_neighbors = np.argpartition(1 - fidelities, -self.n_neighbors)
        # get the number of participating values
        n_queries = len(labels)
        
        sorted_neighbors = sorted_neighbors[sorted_neighbors < n_queries]\
            .reshape(sorted_neighbors.shape[0], n_queries)

        if n_queries == 1:
            n_closest_neighbors = sorted_neighbors[:self.n_neighbors]
        else:
            # when more than one data point is given to this majority vote: shape is (n_points, m)
            n_closest_neighbors = sorted_neighbors[:, :self.n_neighbors]

        voter_labels = np.take(labels, n_closest_neighbors)
        if n_queries == 1:
            votes, counts = stats.mode(voter_labels)
        else:
            votes, counts = stats.mode(voter_labels, axis=1)

        logger.info("Done.")
        return votes.real.flatten()
    
    
    @property
    def ret(self) -> Dict:
        """ Returns result.
        Returns:
            Dict: return value(s).
        """
        return self.instance.ret

    
    @ret.setter
    def ret(self, new_value):
        """ Sets result.
        Args:
            new_value: new value to set.
        """
        self.instance.ret = new_value

        
    def predict(self, data) -> np.ndarray:
        """Predict the labels of the provided data."""
        return self.instance.predict(data)

    
    def _run(self) -> Dict:
        return self.instance.run()

## Encode Classical Data to Quantum State (amplitude encoding)

from qiskit_quantum_knn.encoding import analog

In [6]:
def encode(classical_data):
    r"""Encodes the given classical state to a quantum state.
    Args:
        classical_data (vector_like): state(s) to encode.
    Returns:
        np.ndarray: the encoded quantum state.
    """
    # sum up every row of the matrix to get the lengths for each row via a_ij * a_ij = A_i
    amplitudes = np.sqrt(np.einsum('ij, ij->i', classical_data, classical_data))

    # set zero amplitudes to 1 to prevent division through zero
    amplitudes[amplitudes == 0] = 1

    # normalise the data by dividing the original through the amplitude
    normalised_data = classical_data / amplitudes[:, np.newaxis]

    return normalised_data

# Example

In [25]:
# initialising the quantum instance:
backend = qk.BasicAer.get_backend('qasm_simulator')
instance = aqua.QuantumInstance(backend, shots=10000)

# initialising the qknn model:
qknn = QKNeighborsClassifier(n_neighbors=3, quantum_instance=instance)


# use iris dataset:
iris = datasets.load_iris()
labels = iris.target
raw = iris.data
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['labels'] = labels
df = df.sample(n=len(df))


df.head()

Unnamed: 0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),labels
100,6.3,3.3,6.0,2.5,2
2,4.7,3.2,1.3,0.2,0
53,5.5,2.3,4.0,1.3,1
18,5.7,3.8,1.7,0.3,0
6,4.6,3.4,1.4,0.3,0


In [26]:
data_raw = df[['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']]
data_raw = data_raw.to_numpy()

In [37]:
# create the train and test sets
n_variables = 4  # should be positive power of 2                      
n_train_points = 16  #int(0.8*len(df))   # can be any positive integer
n_test_points = 10   #int(0.2*len(df))    # can be any positive integer

# encode the data
encoded_data = encode(data_raw[:, :n_variables])

train_data = encoded_data[:n_train_points]
train_labels = df['labels'][:n_train_points]
train_labels = train_labels.to_numpy()

test_data = encoded_data[n_train_points:(n_train_points + n_test_points), :n_variables]
test_labels = df['labels'][n_train_points:(n_train_points + n_test_points)]
test_labels = test_labels.to_numpy()

qknn.fit(train_data, train_labels)
qknn_prediction = qknn.predict(test_data)

print(qknn_prediction)
print(test_labels)

[0 2 0 2 2 1 2 0 1 2]
[0 1 0 1 1 0 2 2 0 1]


In [40]:
mse = mean_squared_error(test_labels, qknn_prediction)
mse

1.0

In [42]:
correct = 0
for i in range(len(qknn_prediction)):
    if qknn_prediction[i] == test_labels[i]:
        correct += 1
    
accuracy = correct/n_test_points

accuracy

0.3

In [45]:
accuracy = metrics.accuracy_score(test_labels, qknn_prediction)

In [46]:
accuracy

0.3