# Classifying all of MNIST

The current state of the art (SOTA) handles only 2 digits: `IBM=(0,1)`, `Google=(3,6)`. We shall attempt to classify all 10 digits.

This codebook attempts to classify the full MNIST dataset using a hybrid quantum-classical neural network. It is divided into 6 parts:

0. Python imports
1. Data preparation 
2. Quantum neural network 
3. Classical neural network 
4. Hybrid quantum-classical neural network 
5. Results 

The current problem we're facing is

> The simulations run too slowly. An average iteration in `Net` below takes ~10ns for a classical layer, whereas a quantum layer takes ~0.5s, and that is with parallelization (8/16 threads).

This is most likely because most of Qiskit is written in Python rather than in a C-family language or Fortran from top to bottom. As far as I can tell, the only C layer in all of Qiskit is the `Aer` package, which is why it takes 0.5s and not something like 15s.

I do not yet know of a way to circumvent this problem. The following are a few considerations:
- Using **TorchQuantum**: TorchQuantum does not support Apple Silicon installations (I am using an M2 laptop). The issue is tracked [here](https://github.com/mit-han-lab/torchquantum/issues/98)
- Using the **GPU**: In the current benchmark, the M2 CPU is faster than the Colab GPU
- Using **Apple Silicon** to its full extent: PyTorch already supports it, but Qiskit does not. The issue is tracked [here](https://github.com/Qiskit/qiskit-aer/issues/1762). With full Apple Silicon support, the M2 could deliver performance on the same order of magnitude as a Titan
- Using a **Different Algorithm**: See [QCNN.ipynb](./QCNN.ipynb)
- Using **Runtime Primitives**: The `EstimatorQNN` class actually returns the energy levels measured in various ways. Either the circuit does not learn when using those, or I don't know how to use it properly (I can't find many examples online)
- Using `TorchConnector`: There is no speed/learning benefit. Under the hood it uses the same `EstimatorQNN` class

### The situation
We know this model works because we see learning happen under various configurations. For the $(0,1)$ case it is almost a perfect classifier. For the $(0,1,3,6)$ case it is random for small training sizes, but as the training set grows it becomes increasingly better than random (I was able to reach ~40%; perfectly random is ~25%).

It stands to reason that if Qiskit were faster we would see the same thing happening for the full MNIST dataset.

In [1]:
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.autograd import Function
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

import qiskit
from qiskit.visualization import *
from qiskit_aer import AerSimulator
from qiskit_machine_learning.neural_networks import EstimatorQNN
from concurrent.futures import ThreadPoolExecutor
import time as t

from utils import gtt, make_filt

ModuleNotFoundError: No module named 'torchvision'

In [None]:
n_train = 1000
n_test = 100
qubits = 13
shots = 256
threads = 8

train_loader, test_loader = gtt(n_train, [i for i in range(10)])

In [None]:
def unweight(dic):
    return [k for k, v in dic.items() for i in range(v)]

def get_probabilities(results):

    if isinstance(results, dict):
        results = [results]

    probabilities = []
    for result in results:
        arr = np.mean([list(map(int, x)) for x in unweight(result)], axis=0)
        probabilities.append(arr)

    return probabilities[0] if len(probabilities)==1 else probabilities
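
As a rough illustration of what `get_probabilities` computes, the sketch below derives the same per-position frequency of `1` bits directly from a counts dictionary, without expanding it shot by shot. The name `bit_frequencies` and the sample counts are made up for illustration. Keep in mind that Qiskit bitstrings list the highest-numbered qubit first, so position 0 is not qubit 0.

```python
def bit_frequencies(counts):
    """Return, for each bitstring position, the fraction of shots that read '1'."""
    total = sum(counts.values())
    width = len(next(iter(counts)))
    ones = [0] * width
    for bitstring, n in counts.items():
        for pos, bit in enumerate(bitstring):
            if bit == "1":
                ones[pos] += n  # weight each bitstring by its shot count
    return [c / total for c in ones]


# Hypothetical counts: 1 shot read "00" and 3 shots read "11"
sample_counts = {"00": 1, "11": 3}
print(bit_frequencies(sample_counts))  # [0.75, 0.75]
```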

```js
"IN IMAGE" = Conv = Conv = DropOut = Linear
||
Quantum
||
Linear = "OUTPUT"
```

This is based on the standard hybrid PyTorch tutorial in the Qiskit docs. The general idea is to use

2 Convolutional Layers &rarr; Linear as a link &rarr; Quantum Layer &rarr; Linear as a link &rarr; 10 outputs for 10 digits
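
To make the sizes in this pipeline concrete, here is a small shape calculation. It assumes the settings used in the `Net` class of this notebook (28x28 input, two `kernel_size=4` convolutions with 6 and 16 output channels, each followed by a 2x2 max-pool); `conv_out` and `pool_out` are hypothetical helper names.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output side length of a square convolution."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size, kernel):
    """Output side length of a max-pool whose stride equals its kernel size."""
    return size // kernel


side = 28
side = pool_out(conv_out(side, 4), 2)  # conv1 + pool: 28 -> 25 -> 12
side = pool_out(conv_out(side, 4), 2)  # conv2 + pool: 12 -> 9 -> 4
flat_features = 16 * side * side       # 16 channels after conv2
print(flat_features)  # 256
```

Under these assumptions the first linear link maps 256 features down to `qubits` values, and the second maps the quantum layer's output to the 10 digit classes.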

In [None]:
class QuantumCircuit:
    """Builds `threads` circuits of the same shape and samples them in parallel."""

    def __init__(self, n_qubits):
        # NOTE: `n_qubits` is not used; the width comes from the global `qubits`
        self.threads = threads  # number of parallel threads
        circs = []
        for _ in range(threads):  # one circuit per thread
            circ = qiskit.QuantumCircuit(qubits)
            all_qubits = list(range(qubits))
            circ.h(all_qubits)
            # Parametrisation: a single angle shared by all qubits
            params = [qiskit.circuit.Parameter('theta')]
            circ.rx(params[0], all_qubits)
            circ.measure_all()

            # Bind the angle once, to a random value in [0, 1)
            param_dict = {param: np.random.random() for param in params}
            bound_circuit = circ.assign_parameters(parameters=param_dict)

            circs.append(bound_circuit)

        self.circuits = circs

    def runner(self, circuit):
        """Sample one circuit and return the per-bit frequency of 1s."""
        backend = AerSimulator()
        # `qiskit.execute` was removed in Qiskit 1.0; `backend.run` replaces it
        result = backend.run(circuit, shots=int(shots / threads)).result()
        return get_probabilities(result.get_counts())

    def run(self, inputs):
        """Run all circuits in parallel and average their results."""
        # NOTE: `inputs` is not bound to the circuits, which keep the random
        # angles chosen in `__init__`
        with ThreadPoolExecutor(max_workers=len(self.circuits)) as executor:
            reses = list(executor.map(self.runner, self.circuits))

        return np.average(reses, axis=0)
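
To see what the per-qubit part of this circuit measures, here is a hand-rolled one-qubit statevector sketch (plain Python, no Qiskit; `p_one` is a hypothetical helper). It suggests that with `rx` after `h` the probability of reading 1 stays at 0.5 for every angle, because $|+\rangle$ is an eigenstate of $X$ and the rotation only adds a global phase. The `ry` rotation used in the Qiskit tutorial gives $(1+\sin\theta)/2$ instead.

```python
import math


def p_one(theta, gate):
    """P(measure 1) for |0> -> H -> gate(theta), on a two-amplitude state."""
    a0 = a1 = 1 / math.sqrt(2)  # H|0> = |+>
    c, s = math.cos(theta / 2), math.sin(theta / 2)
    if gate == "rx":    # RX = [[c, -i s], [-i s, c]]
        b1 = -1j * s * a0 + c * a1
    elif gate == "ry":  # RY = [[c, -s], [s, c]]
        b1 = s * a0 + c * a1
    else:
        raise ValueError(f"unsupported gate: {gate}")
    return abs(b1) ** 2


for theta in (0.0, math.pi / 4, math.pi / 2):
    print(f"theta={theta:.3f}  rx: {p_one(theta, 'rx'):.3f}  ry: {p_one(theta, 'ry'):.3f}")
```

If the quantum layer is meant to respond to its inputs, this would be one thing to check before spending time on simulation speed.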

In [None]:
class HybridFunction(Function): # more or less the same as in the tutorial
    """ Hybrid quantum - classical function definition """

    @staticmethod  # forward and backward of an autograd Function must be static
    def forward(ctx, input, quantum_circuit):
        """ Forward pass computation """
        ctx.shift = np.pi / 2  # Store the shift value for the backward pass
        # Store the quantum circuit for the backward pass
        ctx.quantum_circuit = quantum_circuit

        results = []
        for i in range(len(input)):
            expectation_z = ctx.quantum_circuit.run(input[i].tolist())
            results.append(torch.tensor(np.array([expectation_z])))

        # Save the input and the result for the backward pass
        results = torch.stack(results).squeeze(1)
        ctx.save_for_backward(input, results)

        return results

    @staticmethod
    def backward(ctx, grad_output):
        """ Backward pass computation """
        input, expectation_z = ctx.saved_tensors  # Load the saved tensors
        # Convert the input to a numpy array
        input_list = np.array(input.tolist())

        shift_right = input_list + np.ones(input_list.shape) * ctx.shift # Shift right
        shift_left = input_list - np.ones(input_list.shape) * ctx.shift # Shift left

        gradients = []
        for i in range(len(input_list)):
            expectation_right = ctx.quantum_circuit.run(shift_right[i]) # Run the quantum circuit for the right shift
            expectation_left = ctx.quantum_circuit.run(shift_left[i]) # Run the quantum circuit for the left shift

            gradient = torch.tensor(np.array([expectation_right])) - \
                torch.tensor(np.array([expectation_left])) # Compute the gradient
            gradients.append(gradient)

        # gradients = np.array([gradients]).T
        gradients = torch.stack(gradients).squeeze(1)
        # One gradient per forward argument: (input, quantum_circuit)
        return gradients.float() * grad_output.float(), None
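
The `backward` pass above follows the parameter-shift idea: evaluate the circuit at $\theta \pm \pi/2$ and take the difference. Below is a minimal sketch of that rule on an analytic stand-in, assuming a single qubit prepared with H followed by RY($\theta$), for which $P(1) = (1+\sin\theta)/2$. The function names are hypothetical. Note that the textbook rule divides the difference by 2, which `backward` does not, so its gradient is scaled by a constant factor.

```python
import math


def expectation(theta):
    """Analytic P(1) for H followed by RY(theta); stands in for a circuit run."""
    return (1 + math.sin(theta)) / 2


def parameter_shift(f, theta, shift=math.pi / 2):
    """Gradient estimate from two shifted evaluations of f."""
    return (f(theta + shift) - f(theta - shift)) / 2


theta = 0.3
estimate = parameter_shift(expectation, theta)
exact = math.cos(theta) / 2  # derivative of (1 + sin(theta)) / 2
print(estimate, exact)
```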

In [None]:
class Hybrid(nn.Module): # more or less the same as in the tutorial
    """ Hybrid quantum - classical layer definition """

    def __init__(self):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QuantumCircuit(qubits)

    def forward(self, input):
        return HybridFunction.apply(input, self.quantum_circuit)

In [None]:
class Net(nn.Module): # the actual neural net as mentioned in the tutorial
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=4)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=4)
        out_conv1 = F.max_pool2d(self.conv1(torch.rand(1,1,28,28)), 2)
        out_conv2 = F.max_pool2d(self.conv2(out_conv1), 2)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(out_conv2.view(1,-1).shape[1], qubits)
        self.hybrid = Hybrid()
        out_hybrid = self.hybrid(torch.rand(qubits,qubits))
        self.fc2 = nn.Linear(out_hybrid.shape[1], 10)

        # Each conv reduces the size; the more reduction the better, so that the quantum layer does the heavy lifting

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = x.view(x.shape[0], -1)
        x = self.fc1(x) # No ReLU here: the values are passed as-is to the QC
        x = self.hybrid(x).type(torch.FloatTensor)
        x = self.fc2(x)
        return x

model = Net()
print(model)

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001) # Adam optimizer
loss_func = nn.CrossEntropyLoss() # Cross entropy loss since we're doing classification

epochs = 20 # for now 20
loss_list = [3] # we need to initialize this to something, 3 is arbitrary

model.train() # Set the model to training mode

outputs = []
targets = []
for epoch in range(epochs):
    total_loss = []
    times = []
    for batch_idx, (data, target) in enumerate(train_loader):
        now = t.time()
        optimizer.zero_grad() # Clear the gradients
        # Forward pass
        output = model(data)

        outputs.append(output.argmax(dim=1)) # Predicted class per sample
        targets.append(target)

        loss = loss_func(output, target) # Loss

        loss.backward() # Backward pass
        optimizer.step() # Optimize the weights

        total_loss.append(loss.item())
        times.append(t.time() - now)

    print(f"Avg Itr Time: {np.round(np.average(times),1)}s x {len(times)} itrs = {np.round(np.sum(times)/60,1)}min")
    loss_list.append(sum(total_loss)/len(total_loss))

    diff = np.abs(loss_list[-1] - loss_list[-2]) / loss_list[-1]
    if diff <= 0.001: # Early stopping criterion: relative loss change of at most 0.1%
        break

    print(f'Training [{100. * (epoch + 1) / epochs:.0f}%]\tLoss: {loss_list[-1]:.4f}')
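
The early-stopping test in the loop above compares the last two epoch losses. Pulled out on its own it looks roughly like this; `should_stop` and the sample loss values are hypothetical, and the 0.1% tolerance mirrors the threshold used in the loop.

```python
def relative_change(loss_list):
    """Relative change between the two most recent losses."""
    return abs(loss_list[-1] - loss_list[-2]) / loss_list[-1]


def should_stop(loss_list, tol=0.001):
    """True once the loss has moved by at most `tol` (relative) in one epoch."""
    return len(loss_list) >= 2 and relative_change(loss_list) <= tol


print(should_stop([3, 2.30]))         # False: the loss is still moving
print(should_stop([3, 2.30, 2.299]))  # True: change is below 0.1%
```

Because `loss_list` starts with an arbitrary placeholder, the first comparison is against that placeholder rather than a real loss.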

In [None]:
plt.plot(loss_list)
plt.title('Hybrid NN Training Convergence')
plt.xlabel('Epoch')
plt.ylabel('CrossEntropy Loss')

In [None]:
model.eval() # Set the model to evaluation mode
with torch.no_grad(): # Don't compute gradients
    correct = 0
    n_seen = 0
    total_loss = [] # Reset so that training losses are not averaged in
    for batch_idx, (data, target) in enumerate(test_loader): # Loop over the test set
        output = model(data)

        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        n_seen += target.size(0) # Count samples, not batches

        loss = loss_func(output, target)
        total_loss.append(loss.item())

    print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        sum(total_loss) / len(total_loss),
        correct / n_seen * 100)
        )

In [None]:
# THIS IS JUST TO VISUALIZE THE PREDICTIONS
n_samples_show = 6
count = 0
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

model.eval()
with torch.no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        if count == n_samples_show:
            break
        output = model(data)
        print(output)

        pred = output.argmax(dim=1, keepdim=True)

        axes[count].imshow(data[0].numpy().squeeze(), cmap='gray')

        axes[count].set_xticks([])
        axes[count].set_yticks([])
        axes[count].set_title('Predicted {}'.format(pred.item()))

        count += 1

In [None]:
# For dark mode
from IPython.display import HTML
HTML("""
<style>
  html{filter:invert(1)}
  div.prompt{opacity: 0.5;}
  .btn-default{border-color: transparent;}
  #header-container{display:none !important;}
  div.cell.selected, div.cell.selected.jupyter-soft-selected{border-color: transparent;}
</style>
""")