In [None]:
# Importing the libraries
import qiskit as qs
from qiskit.circuit.library import GroverOperator, MCMT, ZGate
import qiskit_aer as aer
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import math

## Grover's Algorithm

In [None]:
# Give of list of numbers we want to find a specific winner in the list. Let us
# start by solving this problem classically.

# Here's our list
my_list = [0,1,7,3]

# Here's our winner
WINNER = 7

# We will define an oracle function that will return True if our winner is found.
def my_oracle(value):
    if value == WINNER:
        return True
    else:
        return False
    
# We can now (inefficently) loop through the list and call the oracle function
for i, value in enumerate(my_list):
    if my_oracle(value):
        print(f"Winner found at index {i} with value {value}")
        print(f"Orcale called {i+1} times")
        break

Obviously there are better algorithms for this job however classically this problem has a computational scaling of N/2. Using a qunatum algorithm we can achieve $\sqrt{N}$ scaling.

We can describe a list using a 2 quibit quantum state:

[00, 01, 10, 11]

Now our winner is the state 11.

In [None]:
# Our orcale is now going to be a quantum circuit
def get_grover_oracle(winner):

    num_qbits = len(winner)

    circuit = qs.QuantumCircuit(num_qbits, name='oracle')

    # Qiskit uses reversed bit ordering so me have to reverse the winner string
    winner = winner[::-1]

    # Find the 0 bits in the winner string
    zero_inds = [i for i, bit in enumerate(winner) if bit == '0']

    if len(zero_inds) > 0:
        # Add an x gate to the qubits in the winner that were set to 0
        circuit.x(zero_inds)

    circuit.compose(MCMT(ZGate(), num_qbits-1, 1), inplace=True)
    
    if len(zero_inds) > 0:
        # Add an x gates and a multi controlled z gate
        circuit.x(zero_inds)

    return circuit

In [None]:
# Here's our new winner
QUANTUM_WINNER = '11'

# Let's get the oracle circuit that will mark our winner
grover_oracle = get_grover_oracle(QUANTUM_WINNER)
grover_oracle.decompose().draw(output='mpl')

In [None]:
# We can convert this into it's own gate
grover_oracle = grover_oracle.to_gate(label='oracle')

We can set up a new quantum system and see what this gate does to the state vectors.

In [None]:
circuit = qs.QuantumCircuit(len(QUANTUM_WINNER))
circuit.draw(output='mpl')

First we can take a look at the state vectors of an initial quantum state.

In [None]:
# Helper function to display the state vectors
def get_state_vectors(circuit):
    simulator = aer.StatevectorSimulator()
    result = simulator.run(circuit.decompose()).result()
    sv = np.round(result.get_statevector(), 3)
    state_labels = [f"{i:0{len(QUANTUM_WINNER)}b}" for i in range(2**len(QUANTUM_WINNER))]
    df = pd.DataFrame(np.array([state_labels, sv]).T, columns=['State', 'Vector'])
    display(df)
    return sv

In [None]:
sv = get_state_vectors(circuit)

In [None]:
# Now let's apply the hadamard gates to put the qubits in superposition
circuit.h(range(len(QUANTUM_WINNER)))
circuit.draw(output='mpl')

In [None]:
sv = get_state_vectors(circuit)

In [None]:
# And finally lets add our oracle
# Now let's apply the hadamard gates to put the qubits in superposition
circuit.append(grover_oracle, range(len(QUANTUM_WINNER)))
circuit.draw(output='mpl')

In [None]:
sv = get_state_vectors(circuit)

You can see that the winner state has been marked by changing it's amplitude. Now that our winner state is marked we can apply the next part of the algorithm.

In [None]:
# The grover operator is built into qiskit; we can decompose
# so we can see what it is composed of.

grover_op = GroverOperator(get_grover_oracle(QUANTUM_WINNER).decompose())
grover_op.decompose().draw(output='mpl')

So this Grover's operator will alter our quantum state and slowly map the probability amplitudes onto the marked states. There is an equation to calculate the optimum number of operations to perform.

In [None]:
optimal_num_iterations = math.floor(
    math.pi / (4 * math.asin(math.sqrt(1 / 2**grover_op.num_qubits)))
)
print(f"Optimal number of iterations: {optimal_num_iterations}")

In [None]:
# Now we can apply the grover operator to our circuit twice.

for i in range(optimal_num_iterations):
    circuit.append(grover_op.decompose(), range(grover_op.num_qubits))

# Our cicuit is now finished and we can measure the state of all
# quibits to get the result.
circuit.measure_all()
circuit.draw(output='mpl')

In [None]:
simulator = aer.QasmSimulator()
result = simulator.run(circuit.decompose(), shots=1000).result()
counts = result.get_counts()

plt.bar(counts.keys(), counts.values())

Let's tidy this into a neat little function so that we can test some other cases. For simplicity we will redefine the problem slightly:

- The list will contain numbers 0 to $2^n$ where $n$ is a integer number of qubits.
- The winner is any number form the list.

In [None]:
# Helper functions

def get_state_map(my_list, n):
    state_map = {}
    for i, val in enumerate(my_list):
        state_map[val] = f"{i:0{n}b}"
    return state_map

def convert_to_quantum_state(value, state_map):
    return state_map[value]

def convert_to_classical_value(state, state_map: dict):
    for key, val in state_map.items():
        if val == state:
            return key

def get_quantum_state(list):
    n = math.ceil(math.log2(max(list)+1))
    state_map = get_state_map(list)
    states = []
    for number in list:
        states.append(convert_to_quantum_state(number, state_map))
    return states

def convert_counts_to_classical(counts, state_map):
    classical_counts = {}
    for key in counts.keys():
        new_key = convert_to_classical_value(key, state_map)
        classical_counts[new_key] = counts[key]
    return classical_counts

def create_plotting_counts(counts, list):
    for i, val in enumerate(list):
        if val not in counts.keys():
            counts[val] = 0

    return counts

def get_key_with_max_val(dictionary):
    return max(dictionary, key=dictionary.get)

In [None]:
def get_winner(my_list, winner, shots=1000):

    n = math.ceil(math.log2(max(my_list)+1))

    state_map = get_state_map(my_list, n)

    winner_state = convert_to_quantum_state(winner, state_map)

    grover_oracle = get_grover_oracle(winner_state)

    grover_op = GroverOperator(grover_oracle)

    optimal_num_iterations = math.floor(
        math.pi / (4 * math.asin(math.sqrt(1 / 2**grover_op.num_qubits)))
    )

    circuit = qs.QuantumCircuit(grover_op.num_qubits)
    circuit.h(range(grover_op.num_qubits))
    
    for i in range(optimal_num_iterations):
        circuit.append(grover_op.decompose(), range(grover_op.num_qubits))
    
    circuit.measure_all()
    circuit = circuit.decompose()

    simulator = aer.QasmSimulator()
    result = simulator.run(circuit.decompose(), shots=shots).result()
    counts = result.get_counts()

    classical_counts = convert_counts_to_classical(counts, state_map)
    plotting_counts = create_plotting_counts(classical_counts, my_list)

    result_df = pd.DataFrame(plotting_counts.items(), columns=['Value', 'Count'])
    

    # create a my_list of labels
    tick_label = [str(i) for i in list(result_df['Value'])]
    plt.bar(result_df['Value'], result_df['Count'], tick_label=tick_label)
    plt.show()

    winner = get_key_with_max_val(classical_counts)
    
    print(f"Winner is {winner} with {classical_counts[winner]} votes")

    return winner

In [None]:
# Number of quibits to use
n = 6

# Create a list of unique random numbers
my_list = np.random.choice(np.arange(2**n), 2**n, replace=False)

# Pick a random winner
my_winner = np.random.choice(my_list)

print(f"List: {my_list}")
print(f"Winner: {my_winner}")

In [None]:
_ = get_winner(my_list, my_winner, shots=1)

# Quantum Kernels

In [None]:
from qiskit.circuit.library import ZFeatureMap, ZZFeatureMap
from qiskit.circuit.library import ZZFeatureMap
from qiskit.primitives import Sampler
from qiskit_algorithms.state_fidelities import ComputeUncompute
from qiskit_machine_learning.kernels import FidelityQuantumKernel
from qiskit_machine_learning.datasets import ad_hoc_data

In [None]:
# Load a dataset
adhoc_dimension = 2
train_features, train_labels, test_features, test_labels, adhoc_total = ad_hoc_data(
    training_size=20,
    test_size=5,
    n=adhoc_dimension,
    gap=0.3,
    plot_data=False,
    one_hot=False,
    include_sample_total=True,
)

In [None]:
# Plotting functions

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

def plot_cov(cov):
    fig = plt.figure(figsize=(4, 3))
    sns.heatmap(cov, fmt='g', cmap='Greens', square = True)
    plt.show()

def plot_features(ax, features, labels, class_label, marker, face, edge, label):
    # A train plot
    ax.scatter(
        # x coordinate of labels where class is class_label
        features[np.where(labels[:] == class_label), 0],
        # y coordinate of labels where class is class_label
        features[np.where(labels[:] == class_label), 1],
        marker=marker,
        facecolors=face,
        edgecolors=edge,
        label=label,
    )

def plot_dataset(train_features, train_labels, test_features, test_labels, adhoc_total):

    plt.figure(figsize=(5, 5))
    plt.ylim(0, 2 * np.pi)
    plt.xlim(0, 2 * np.pi)
    plt.imshow(
        np.asmatrix(adhoc_total).T,
        interpolation="nearest",
        origin="lower",
        cmap="RdBu",
        extent=[0, 2 * np.pi, 0, 2 * np.pi],
    )
    
def heatmap(
    x_axis: str,
    y_axis: str,
    z_axis: str,
    df_X: pd.DataFrame,
    df_mean: pd.DataFrame,
) -> plt:

    n = int(math.sqrt(len(df_X)))
    xmin, xmax = df_X[x_axis].min(), df_X[x_axis].max()
    ymin, ymax = df_X[y_axis].min(), df_X[y_axis].max()
    extent = [xmin, xmax, ymin, ymax]
    plt.subplots()
    plt.imshow(
        df_mean[z_axis].values.reshape(n, n),
        cmap="viridis",
        origin="lower",  # Lower left corner of (0,0) is correct for data
        extent=extent,  # Sets axes labels according to data
        aspect=xmax / ymax,  # Correct aspect ratio
    )
    plt.xlabel(x_axis)
    plt.ylabel(y_axis)
    plt.colorbar(label=z_axis)
    return plt

This dataset contains features, (x,y) coordinates, that each have a binary label. We can plot the dataset using a heatmap as shown below.

In [None]:
plot_dataset(train_features, train_labels, test_features, test_labels, adhoc_total)

![image.png](attachment:image.png)

A ZZFeatureMap feature map can be used to map these features on to a block sphere as shown above where positive binary values correspond to a postive expecation value of the normal $w$.

In [None]:
adhoc_feature_map = ZZFeatureMap(feature_dimension=adhoc_dimension, reps=2, entanglement="linear")
adhoc_feature_map.decompose().draw(output='mpl')

In [None]:
# Creating the kernel
sampler = Sampler()
fidelity = ComputeUncompute(sampler=sampler)
adhoc_kernel = FidelityQuantumKernel(fidelity=fidelity, feature_map=adhoc_feature_map)

In [None]:
cov_matrix = adhoc_kernel.evaluate(x_vec=train_features)
plot_cov(cov_matrix)

Let's plot our test data over the function, remember red = +1 and blue = -1

In [None]:
plt.figure(figsize=(10, 10))
plt.ylim(0, 2 * np.pi)
plt.xlim(0, 2 * np.pi)
plt.imshow(
    np.asmatrix(adhoc_total).T,
    interpolation="nearest",
    origin="lower",
    cmap="RdBu",
    extent=[0, 2 * np.pi, 0, 2 * np.pi],
)
plt.scatter(
    # x coordinate of labels where class is class_label
    test_features[np.where(test_labels[:] == 0), 0],
    # y coordinate of labels where class is class_label
    test_features[np.where(test_labels[:] == 0), 1],
    marker="s",
    facecolors="g",
    edgecolors="w",
    label="A_test",
)
plt.scatter(
    # x coordinate of labels where class is class_label
    test_features[np.where(test_labels[:] == 1), 0],
    # y coordinate of labels where class is class_label
    test_features[np.where(test_labels[:] == 1), 1],
    marker="s",
    facecolors="pink",
    edgecolors="w",
    label="A_test",
)

In [None]:
from sklearn.svm import SVC

adhoc_svc = SVC(kernel=adhoc_kernel.evaluate)

adhoc_svc.fit(train_features, train_labels)

adhoc_score_callable_function = adhoc_svc.score(test_features, test_labels)

print(f"Callable kernel classification test score: {adhoc_score_callable_function}")

In [None]:
adhoc_svc.predict(test_features)
results_df = pd.DataFrame({"Test X Input": test_features[:, 0], "Test Y Input": test_features[:, 1], "Predicted Label": adhoc_svc.predict(test_features)})
display(results_df)

In [None]:
import torch
import gpytorch

class QuantumKernel(gpytorch.kernels.Kernel):
    def __init__(self, **kwargs):
        super(QuantumKernel, self).__init__(has_lengthscale=False, **kwargs)
        # Add any custom parameters here

    def forward(self, x1, x2, diag=False, **params):
        
        return adhoc_kernel.evaluate(x_vec=x1, y_vec=x2)


In [None]:
class GPClassificationModel(gpytorch.models.ApproximateGP):
    def __init__(self, inducing_points):
        variational_distribution = gpytorch.variational.CholeskyVariationalDistribution(inducing_points.size(0))
        variational_strategy = gpytorch.variational.VariationalStrategy(self, inducing_points, variational_distribution, learn_inducing_locations=True)
        super(GPClassificationModel, self).__init__(variational_strategy)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = QuantumKernel()

    def forward(self, x):
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)


In [None]:
torch_train_features = torch.tensor(train_features)
torch_train_labels = torch.tensor(train_labels)
inducing_points = torch_train_features.unsqueeze(-1)  # Inducing points

In [None]:
likelihood = gpytorch.likelihoods.BernoulliLikelihood()
model = GPClassificationModel(inducing_points=inducing_points)

In [None]:
# # Use the Adam optimizer
# optimizer = torch.optim.Adam(model.parameters(), lr=0.005)  # Use a smaller learning rate to stabilize training

# # "Loss" for GPs - the variational ELBO
# mll = gpytorch.mlls.VariationalELBO(likelihood, model, torch_train_labels.numel())

# model.train()
# likelihood.train()

# training_iterations = 20
# batch_size = 2  # Use smaller batches to save memory

# for i in range(training_iterations):
#     for batch_idx in range(0, len(torch_train_features), batch_size):
#         optimizer.zero_grad()
#         batch_x = torch_train_features[batch_idx:batch_idx + batch_size]
#         batch_y = torch_train_labels[batch_idx:batch_idx + batch_size]
#         output = model(batch_x)
#         loss = -mll(output, batch_y)
#         loss.backward()
#         optimizer.step()
#     print(f'Iteration {i + 1}/{training_iterations} - Loss: {loss.item()}')


In [None]:
# class GPClassificationModel(gpytorch.models.ApproximateGP):
#     def __init__(self, inducing_points):
#         variational_distribution = gpytorch.variational.CholeskyVariationalDistribution(inducing_points.size(0))
#         variational_strategy = gpytorch.variational.VariationalStrategy(self, inducing_points, variational_distribution, learn_inducing_locations=True)
#         super(GPClassificationModel, self).__init__(variational_strategy)
#         self.mean_module = gpytorch.means.ConstantMean()
#         self.covar_module = QuantumKernel()

#     def forward(self, x):
#         mean_x = self.mean_module(x)
#         covar_x = self.covar_module(x)
#         return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

# # Create training data
# train_x = torch_train_features = torch.tensor(train_features)
# train_y = torch.tensor(train_labels)
# inducing_points = torch_train_features.unsqueeze(-1)  # Inducing points

# # Initialize the model and likelihood
# likelihood = gpytorch.likelihoods.BernoulliLikelihood()
# model = GPClassificationModel(inducing_points)

# # Training procedure
# optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# mll = gpytorch.mlls.VariationalELBO(likelihood, model, train_y.numel())

# model.train()
# likelihood.train()

# training_iterations = 10
# batch_size = 2

# for i in range(training_iterations):
#     for batch_idx in range(0, len(train_x), batch_size):
#         optimizer.zero_grad()
#         batch_x = train_x[batch_idx:batch_idx + batch_size]
#         batch_y = train_y[batch_idx:batch_idx + batch_size]
#         output = model(batch_x)
#         loss = -mll(output, batch_y)
#         loss.backward()
#         optimizer.step()
#     print(f'Iteration {i + 1}/{training_iterations} - Loss: {loss.item()}')

# model.eval()
# likelihood.eval()

# # Test points
# test_x = torch.rand(10, 2, 1)
# with torch.no_grad(), gpytorch.settings.fast_pred_var():
#     observed_pred = likelihood(model(test_x))

# # Plot the predictions
# with torch.no_grad():
#     f, ax = plt.subplots(1, 1, figsize=(4, 3))
#     lower, upper = observed_pred.confidence_region()
#     ax.plot(test_x.squeeze().numpy(), observed_pred.mean.numpy(), 'b')
#     ax.fill_between(test_x.squeeze().numpy(), lower.numpy(), upper.numpy(), alpha=0.5)
#     ax.set_ylim([-0.1, 1.1])
#     ax.legend(['Observed Data', 'Mean', 'Confidence'])
#     plt.show()


In [None]:
import twinlab as tl

gardening_df = tl.load_example_dataset("gardening")

In [None]:
binary_gardening_df = gardening_df.copy()
# for value in binary_gardening_df["Fruits produced"].values:
#     if value > 0:
#         binary_gardening_df["Fruits produced"].replace(value, 1, inplace=True)

# heatmap(x_axis="Sunlight [hours/day]", y_axis="Water [times/week]", z_axis="Fruits produced", df_X=binary_gardening_df.iloc[:, :-1], df_mean=pd.DataFrame(binary_gardening_df.iloc[:, -1]))

In [None]:
tl_data = tl.Dataset("gardening_df")
tl_data.upload(gardening_df)
tl_gp = tl.Emulator("gardening_df")
tl_gp.train(tl_data, ["Sunlight [hours/day]", "Water [times/week]"], ["Fruits produced"])
tl_gp.summarise()

In [None]:
class kernel:
    def __init__(self,
                 theta : list):
        self.theta = theta

    def __call__(self,
                 xi : np.array,
                 xj : np.array):
        return NotImplementedError

class SE(kernel):

    def __init__(self,
                 theta: list):
        super().__init__(theta)

    def __call__(self, xi, xj):
        return (self.theta[1]**2) * np.exp(-0.5 * ((xi - xj)/ self.theta[0])**2)
    
class QK(kernel):
    def __init__(self, feature_dimension: int):
        self.feature_map = ZZFeatureMap(feature_dimension=feature_dimension, reps=1, entanglement='linear', insert_barriers=True)

    def __call__(self, xi, xj):
        return FidelityQuantumKernel(feature_map=self.feature_map).evaluate(xi, xj)

In [None]:
def cov_matrix(x0, x1, k):
    K = np.zeros((len(x0),len(x1)))
    for i, xi in enumerate(x0):
        for j, xj in enumerate(x1):
            K[i][j] = k(xi, xj)
    return K

In [None]:
x = binary_gardening_df.iloc[:, :-1].values

kernel = QK(feature_dimension=2)

cov = cov_matrix(x, x, kernel)

plot_cov(cov)

In [None]:
def eval_posterior(x_data, Y, x_eval, mu, k):

    Kxy = cov_matrix(x_eval, x_data, k)
    Kyy_inv = np.linalg.inv(cov_matrix(x_data, x_data, k))
    Kxx = cov_matrix(x_eval, x_eval, k)

    mu_out = mu*np.ones(len(x_eval)) + np.dot(np.dot(Kxy,Kyy_inv),(Y - mu*np.ones(len(x_data))))
    cov = Kxx - np.dot(Kxy,np.dot(Kyy_inv, Kxy.T))

    return mu_out, cov

In [None]:
num_samples = 24
sample_points = 5

mu = -0.32584294324713525

kernel = QK(feature_dimension=2)

x = np.array([[i, j] for i in range(0, 16) for j in range(0, 7)])

# Randomly choose 3 sample points form the dataset
samples= binary_gardening_df.sample(sample_points)
x_data = samples.iloc[:, :-1].values
y_data = samples.iloc[:, -1].values

mu_GP, cov_GP = eval_posterior(x_data, y_data, x, mu, kernel)

## We now plot

np.random.seed(12)
s = np.random.multivariate_normal(mu_GP, cov_GP, size= (1,))


In [None]:
df_X = pd.DataFrame(x[:100], columns=["Sunlight [hours/day]", "Water [times/week]"])
df_mean = pd.DataFrame(s[0][:100], columns=["Fruits produced"])

In [None]:
binary_df_mean = df_mean.copy()
# for value in df_mean["Fruits produced"].values:
#     if value > 0:
#         df_mean["Fruits produced"].replace(value, 1, inplace=True)
#     else:
#         df_mean["Fruits produced"].replace(value, 0, inplace=True)

In [None]:
tl.plotting.heatmap(x_axis="Sunlight [hours/day]", y_axis="Water [times/week]", z_axis="Fruits produced", df_X=df_X, df_mean=binary_df_mean)

In [None]:
tl_b_gp = tl.Emulator("b_gp")
tl_b_df = tl.Dataset("b_df")
tl_b_df.upload(binary_gardening_df)

tl_b_gp.train(tl_b_df, ["Sunlight [hours/day]", "Water [times/week]"], ["Fruits produced"])

predictions = tl_b_gp.predict(df_X)

In [None]:
tl_df_mean = pd.DataFrame(predictions[0][:100], columns=["Fruits produced"])

In [None]:
tl_b_gp.heatmap(x1_axis="Sunlight [hours/day]", x2_axis="Water [times/week]", y_axis="Fruits produced")

In [None]:
binary_tl_df_mean = tl_df_mean.copy()
# for value in binary_tl_df_mean["Fruits produced"].values:
#     if value > 0:
#         binary_tl_df_mean["Fruits produced"].replace(value, 1, inplace=True)
#     else:
#         binary_tl_df_mean["Fruits produced"].replace(value, 0, inplace=True)

In [None]:
tl.plotting.heatmap(x_axis="Sunlight [hours/day]", y_axis="Water [times/week]", z_axis="Fruits produced", df_X=gardening_df.iloc[:, :-1], df_mean=pd.DataFrame(gardening_df.iloc[:, -1]))