In [None]:
TEAM_NAME = "EXPERIMENTS"  # enter team name

In [None]:
import warnings

warnings.filterwarnings("ignore")

import csv
import json
import os
import time
from itertools import chain, combinations

import matplotlib.lines as mlines
import matplotlib.pyplot as plt
import pennylane as qml
import pennylane.numpy as np
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

In [None]:
seed = 100232 ## seed
# Set training hyperparameters (modify as needed)

shots = 500  # number of circuit executions
iterations = 50  # number of training iterations
batch_size = 30  # number of training examples used in one iteration
stepsize = 0.125  # learning rate, positive value between 0.0 and 1.0

In [None]:
DATA_PATH = "data/"

file = open(DATA_PATH + "params.json")

params = json.load(file)
delta = params["delta"]
n_points = params["n_points"]

file.close()

# Load data
Xs = np.zeros(shape=(n_points, 2))
Ys = np.zeros(shape=(n_points,))

with open(DATA_PATH + "binary_data.csv", mode="r") as file:
    csvFile = csv.reader(file)
    for i, row in enumerate(csvFile):
        Xs[i] = np.array([float(row[0]), float(row[1])])
        Ys[i] = float(row[2])
        if i == n_points:
            break

n_samples = 200  # number of train/test samples
X_data, Y_data = Xs[:n_samples], Ys[:n_samples]
X_train, X_test, Y_train, Y_test = train_test_split(X_data, Y_data)

def powerset(iterable, mx):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    pset = chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
    return [l for l in list(pset) if len(l) == mx]

n_wires = 2  # number of qubits
S_size = 2  # number of interactions considered
depth = 2  # number of layers in ansatz
pset = powerset(range(n_wires), S_size)

def encode_data(x):
    """Non-linear encoding (transformation) of one input data vector

    Args:
        x : shape (2,) tensor containing one input data vector

    Returns:
        triple of data encoded coefficients phi_1, phi_2, phi_{1,2}
    """

    return x[0], x[1], (np.pi - x[0]) * (np.pi - x[1])

def feature_map(x):
    """Short depth feature map with entanglement

    Args:
        x : shape (3,) tensor containing one encoded data vector
    """
    for _ in range(2):
        qml.Hadamard(wires=0)
        qml.Hadamard(wires=1)
        qml.RX(x[0], wires=0)
        qml.RX(x[1], wires=1)
        qml.CNOT(wires=[0, 1])
        qml.RX(x[2], wires=1)
        qml.CNOT(wires=[0, 1])

def ansatz(params):
    """VQC ansatz using single-qubits unitaries and entangling gates

    Args:
        params : shape (`depth` + 1, `n_wires`, 3) tensor containing trainable parameters
    """
    for j in range(n_wires):
         qml.Rot(params[0, j, 0], params[0, j, 1], params[0, j, 2], wires=j)
    
    for i in range(1,depth+1):
         for s in pset:
            qml.CZ(wires=s)
            for j in range(n_wires):
                qml.Rot(params[i, j, 0], params[i, j, 1], params[i, j, 2], wires=j)

def circuit(params, x):
    """Havlicek et al. variational quantum circuit

    Args:
        params : shape (`depth` + 1, `n_wires`, 3) tensor containing trainable parameters
        x : shape (2,) tensor containing one input data vector

    Returns:
        shape(2 * 'n_wires',) tensor containing Z basis measurement probability on each qubit
    """

    feature_map(encode_data(x))  # prepare initial feature map state
    ansatz(params)  # apply discriminator circuit

    return qml.probs(wires=range(n_wires))


dev_local = qml.device("default.qubit", wires=n_wires, shots=shots)

qnode_local = qml.QNode(circuit, dev_local)

np.random.seed(seed)
params = (
    2 * np.pi * np.random.randn(depth + 1, n_wires, 3, requires_grad=True)
)  # random initial circuit parameters

def err_prob(params, x, y, R, bias=0):
    """Error probability of assigning a wrong label

    Args:
        x : shape (2,) tensor containing one input data vector
        y : shape (1,) tensor containing associated label
        params : shape (`depth` + 1, `n_wires`, 3) tensor containing trainable parameters
        R : int number of circuit evaluations (shots)
        bias : optional bias parameter in [-1,+1]

    Returns:
        probability that the VQC computed label m(x) = y' is assigned incorrectly
    """

    bprobs = qnode_local(params, x)  # probabilities for each basis state
    cprobs = (
        bprobs[0] + bprobs[3],
        bprobs[1] + bprobs[2],
    )  # probabilities for each eigenstate of parity Z_1 Z_2
    prob_correct = cprobs[int(0.5 - 0.5 * y)]  # probability of choosing correct label

    # Intermediate evaluation of equation from supplement using derived p_y(x) i.e. prob_correct
    val = (np.sqrt(R) * (0.5 - prob_correct - 0.5 * y * bias)) / (
        np.sqrt(2 * (1 - prob_correct) * prob_correct)
    )

    return 1 / (
        1 + np.exp(-val)
    )  # binomial CDF for R >> 1 approximated using sigmoid function

def cost(params, Xs, Ys, R):
    """Cost function for circuit optimization

    Args:
        params : shape (`depth` + 1, `n_wires`, 3) tensor containing trainable parameters
        Xs : shape (n,2) tensor containing input data vectors
        Ys : shape (n,) tensor containing associated labels
        R : int number of circuit evaluations (shots)

    Returns:
        the empirical risk i.e. the error probability averaged over all data points

    """

    ep_sum = 0  # initialize error probabilities sum
    for i in range(
        Xs.shape[0]
    ):  # compute sum of error probabilities over all data points
        ep_sum += err_prob(params, Xs[i], Ys[i], R)

    return ep_sum / Xs.shape[0]  # return average over all data points


In [None]:
# Gradient-based training
costs = []
times = []

opt = qml.AdagradOptimizer(stepsize)

for i in range(iterations):

    # Generate the batch
    batch_index = np.random.randint(0, X_train.shape[0], (batch_size,))
    X_batch = X_train[batch_index]
    Y_batch = Y_train[batch_index]

    t0 = time.time()

    # Update parameters by a single step.
    params = opt.step(lambda var: cost(var, X_batch, Y_batch, shots), params)

    t1 = time.time()

    # Compute cost
    cost_current = cost(params, X_train, Y_train, shots)
    

    costs.append(cost_current)
    times.append(t1 - t0)
    print(f"Cost at step {i}: {round(float(costs[-1]), 4)}") ##checkflag
    # print(f"Time at step {i}: {round(float(times[-1]), 4)}") ##checkflag

print(f"Initial cost: {round(float(costs[0]), 4)}")
print(f"Cost at step {iterations}: {round(float(costs[-1]), 4)}")
print(f"Avg time per step: {round(sum(times)/len(times), 4)} sec")

# Output folder
# time_str = time.strftime("%Y%m%d-%H%M%S")
VQC_OUT = f"vqc/{str(seed)}"
os.makedirs(VQC_OUT, exist_ok=True)

params_csv = f"{VQC_OUT}/{TEAM_NAME}_params.csv"
costs_csv = f"{VQC_OUT}/{TEAM_NAME}_costs.csv"

np.savetxt(params_csv, params.reshape(n_wires * (depth + 1), 3))
print(f"\nParameters saved to {params_csv}")

np.savetxt(
    costs_csv,
    np.vstack((range(iterations), costs[:iterations])).T,
    delimiter=", ",
)
print(f"Costs saved to {costs_csv}")

In [None]:
split_xs = lambda x: [[x[i][j] for i in range(len(x))] for j in [0, 1]]

xi, xj = split_xs(Xs)
xi_train, xj_train = split_xs(X_train)
xi_test, xj_test = split_xs(X_test)

final_params = params_csv
# Calculate accuracy
Y_pred = np.zeros(shape=Y_test.shape)

for ind, x in enumerate(X_test):

    # How many of the qubit states are orthogonal?
    bprobs = qnode_local(final_params, x)  # probabilities for each basis state
    cprobs = (
        bprobs[0] + bprobs[3],
        bprobs[1] + bprobs[2],
    )  # probabilities for each eigenstate of parity Z_1 Z_2

    # out of the majority of the number shots,
    # classify +1 for orthogonal and -1 for correspondence
    if cprobs[0] >= 0.5:
        Y_pred[ind] = 1.0
    else:
        Y_pred[ind] = -1.0

preds = [1 if Y_pred[i] == Y_test[i] else 0 for i in range(len(Y_pred))]

n_preds = len(Y_pred)
n_correct = sum(preds)
n_incorrect = n_preds - n_correct
accuracy = n_correct / n_preds

# Plot predictions
preds_png = f"{VQC_OUT}/{TEAM_NAME}_preds.png"

label_correct = mlines.Line2D(
    [],
    [],
    color="none",
    marker="o",
    markerfacecolor="y",
    markeredgecolor="y",
    label=f"correct ({n_correct})",
)
label_incorrect = mlines.Line2D(
    [],
    [],
    color="none",
    marker="o",
    markerfacecolor="k",
    markeredgecolor="k",
    label=f"incorrect ({n_incorrect})",
)

plt.scatter(xi, xj, marker="o", c=["r" if v == 1 else "b" for v in Ys])
plt.scatter(xi_test, xj_test, c=["y" if v == 1 else "k" for v in preds])
plt.xticks([0, 2 * np.pi], [r"$0$", r"$2\pi$"])
plt.yticks([0, 2 * np.pi], [r"$0$", r"$2\pi$"])
plt.xlabel("$\\theta$")
plt.ylabel("$\phi$")
plt.legend(handles=[label_correct, label_incorrect], loc="upper right")
plt.title(f"Predictions, $\Delta = {delta}$")
plt.savefig(preds_png)
plt.show()

In [None]:
fig, ax = plt.subplots(1,2)

ax[0,0].set_title("VQC training")
ax[0,0].set_xlabel("Trial Step")
ax[0,0].set_ylabel("$R_{emp}$")
ax[0,0].plot(range(1, iterations + 1), costs)

ax[0,1].set_title(f"Predictions, $\Delta = {delta}$")
ax[0,1].scatter(xi, xj, marker="o", c=["r" if v == 1 else "b" for v in Ys])
ax[0,1].scatter(xi_test, xj_test, c=["y" if v == 1 else "k" for v in preds])
ax[0,1].xticks([0, 2 * np.pi], [r"$0$", r"$2\pi$"])
ax[0,1].yticks([0, 2 * np.pi], [r"$0$", r"$2\pi$"])
ax[0,1].xlabel("$\\theta$")
ax[0,1].ylabel("$\phi$")
ax[0,1].legend(handles=[label_correct, label_incorrect], loc="upper right")


plt.savefig(preds_png)

plt.show()