<a href="https://colab.research.google.com/github/mrdobson/SENSIP_2021_REU/blob/main/mnist_qnn_2class_loop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install qiskit
!pip install qiskit_machine_learning
!pip install pylatexenc



In [None]:
# not all of these are critical, but to be safe just including everything for now
import numpy as np
import matplotlib.pyplot as plt
import time
import torch
import qiskit

from torch import tensor
from torch import cat, no_grad
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.nn import (Module, Conv2d, Linear, Dropout2d, NLLLoss, CrossEntropyLoss,
                      MaxPool2d, Flatten, Sequential, ReLU)
import torch.optim as optim
import torch.nn.functional as F

from torchsummary import summary

from qiskit import Aer, QuantumCircuit
from qiskit.opflow import Z, I, StateFn, PauliSumOp, AerPauliExpectation, ListOp, Gradient
from qiskit.utils import QuantumInstance
from qiskit.circuit import Parameter
from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap
from qiskit.algorithms.optimizers import COBYLA, L_BFGS_B, ADAM

from qiskit_machine_learning.neural_networks import TwoLayerQNN, CircuitQNN
from qiskit_machine_learning.algorithms.classifiers import NeuralNetworkClassifier, VQC
from qiskit_machine_learning.algorithms.regressors import NeuralNetworkRegressor, VQR

from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.exceptions import QiskitMachineLearningError
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import confusion_matrix

In [None]:
qi_qasm = QuantumInstance(Aer.get_backend('qasm_simulator'), shots=1024)
qi_sv = QuantumInstance(Aer.get_backend('statevector_simulator'))

# select whether to run on GPU or not
gpu = True

# predefine number of epochs to train over
epochs = 7

# if use_cuda == true then use GPU accel functions
use_cuda = torch.cuda.is_available()

print('CUDA available:', use_cuda)

if use_cuda and gpu:
    device = torch.device('cuda')
    print('Training on GPU...')
else:
    device = torch.device('cpu')
    print('Training on CPU...')

CUDA available: True
Training on GPU...


In [None]:
# full data set range
data_set = list(range(0,10))
print(data_set)

QC_outputs = ['000', '001', '010', '011', '100', '101', '110', '111']

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


# Quantum Circuit Options

In [None]:
class QuantumCircuit:
    """ 
    This class provides a simple interface for interaction 
    with the quantum circuit 
    """
    
    def __init__(self, n_qubits, backend, shots):
        # --- Circuit definition ---
        self._circuit = qiskit.QuantumCircuit(n_qubits)
        
        all_qubits = [i for i in range(n_qubits)]
                
            
        self.theta_0 = qiskit.circuit.Parameter('theta0')
        self.theta_1 = qiskit.circuit.Parameter('theta1')
        self.theta_2 = qiskit.circuit.Parameter('theta2')
        self.theta_3 = qiskit.circuit.Parameter('theta3')
        self.theta_4 = qiskit.circuit.Parameter('theta4')
        self.theta_5 = qiskit.circuit.Parameter('theta5')
        self.theta_6 = qiskit.circuit.Parameter('theta6')

        
        self._circuit.h(all_qubits)
        self._circuit.barrier()
        self._circuit.ry(self.theta_0, all_qubits)
        self._circuit.cz(0,1)
        self._circuit.cz(1,2)
        self._circuit.ry(self.theta_1, 0)
        self._circuit.ry(self.theta_2, 1)
        self._circuit.ry(self.theta_3, 2)
        self._circuit.cz(0,1)
        self._circuit.cz(1,2)
        self._circuit.ry(self.theta_4, 0)
        self._circuit.ry(self.theta_5, 1)
        self._circuit.ry(self.theta_6, 2)

        self._circuit.measure_all()
        # ---------------------------

        self.backend = backend
        self.shots = shots
    
    def run(self, thetas):
        job = qiskit.execute(self._circuit, 
                             self.backend, 
                             shots = self.shots,
                             parameter_binds = [{self.theta_0: thetas[0],
                                                 self.theta_1: thetas[1],
                                                 self.theta_2: thetas[2],
                                                 self.theta_3: thetas[3],
                                                 self.theta_4: thetas[4],
                                                 self.theta_5: thetas[5],
                                                 self.theta_6: thetas[6],}])
        counts = job.result().get_counts(self._circuit)
        
        expects = np.zeros(8)
        for k in range(8):
            key = QC_outputs[k]
            perc = counts.get(key, 0) /self.shots
            expects[k] = perc
        
        return expects

In [None]:
class QuantumCircuit:
    def __init__(self, backend, shots):
        # circuit definition
        self._circuit = qiskit.QuantumCircuit(2)

        self.theta_0 = qiskit.circuit.Parameter('theta0')
        self.theta_1 = qiskit.circuit.Parameter('theta1')

        self._circuit.h(0)
        self._circuit.cx(0, 1)
        self._circuit.barrier()
        self._circuit.ry(self.theta_0, 0)
        self._circuit.ry(self.theta_1, 1)
        self._circuit.barrier()
        self._circuit.h(0)

        self._circuit.measure(0, 0)

        self.backend = backend
        self.shots = shots
      
    def run(self, thetas):
        job = qiskit.execute(self._circuit,
                             self.backend,
                             shots = self.shots,
                             parameter_binds = [{self.theta_0: thetas[0],
                                                 self.theta_1: thetas[1]}])
        counts = job.result().get_counts(self._circuit)

        expects = np.zeros(2)
        for k in range(2):
            key = QC_outputs[k]
            perc = counts.get(key, 0) /self.shots
            expects[k] = perc

        return expects

In [None]:
# validate bell circuit
circuit = QuantumCircuit(qi_qasm, 100)
circuit._circuit.draw('mpl')

CircuitError: ignored

# Data Loaders

In [None]:
# training data load function
def load_training(val_1=0, val_2=1, n_samples=70):
    # returns tuple with X_train.data and X_train.targets <-- what I don't have from iris
    X_train = datasets.MNIST(root='./data', train=True, download=True,
                             transform=transforms.Compose([transforms.ToTensor()]))

    # leave only labels samp_val_1 and samp_val_2
    idx = np.append(np.where(X_train.targets == val_1)[0][:n_samples],
                    np.where(X_train.targets == val_2)[0][:n_samples])

    X_train.data = X_train.data[idx]
    X_train.targets = X_train.targets[idx]

    # samp vals tunable above in the settings, this is to fit to our data loader
    X_train.targets[X_train.targets==val_1] = 0
    X_train.targets[X_train.targets==val_2] = 1
    ### DEBUG vnv that we remapped the correct samples 
    #print("xtrain targets ", X_train.targets, "\n")

    # perform training load
    train_loader = DataLoader(X_train, batch_size=1, shuffle=True)

    # return loader
    return train_loader


In [None]:
# testing data load function
def load_testing(val_1=0, val_2=1, n_samples=30):
    X_test = datasets.MNIST(root='./data', train=False, download=True,
                            transform=transforms.Compose([transforms.ToTensor()]))
    # selecting which samples to keep (first 100 0s and 1s)
    idx = np.append(np.where(X_test.targets == val_1)[0][:n_samples], 
                    np.where(X_test.targets == val_2)[0][:n_samples])

    X_test.data = X_test.data[idx]
    X_test.targets = X_test.targets[idx]

    # samp vals tunable above in the settings, to fit to data loader
    X_test.targets[X_test.targets==val_1] = 0
    X_test.targets[X_test.targets==val_2] = 1
    ### DEBUG    
    #print("xtest targets ", X_train.targets, "\n")

    # perform testing load
    test_loader = DataLoader(X_test, batch_size=1, shuffle=True)

    # return loader
    return test_loader


# CPU Functions

In [None]:
# function to perform model training
def perform_training(model, optimizer, loss_func, train_loader, val_1=0, val_2=1, epochs=5):
    # start training timer
    start_train = time.time()

    loss_list = [] # store loss history
    model.train()  # place model in training mode
    print('\nBegin training for samples {0} and {1}:'.format(val_1, val_2))

    for epoch in range(epochs):
        print('Starting epoch {} for 2 qubits'.format(epoch))
        total_loss = []
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad(set_to_none=True) # init gradient
            output = model(data)             # forward pass
            ### DEBUG
            #print("target is: ", target)
            #print("output is: ", output)
            loss = loss_func(output, target) # calc loss
            loss.backward()                  # backward pass
            optimizer.step()                 # optimize weights
            total_loss.append(loss.item())   # store loss
        loss_list.append(sum(total_loss)/len(total_loss))
        print('Training [{:0f}%]\tLoss: {:.4f}'.format(
              100. * (epoch + 1) / epochs, loss_list[-1]))
        
    # finish training timer, report training runtime
    end_train = time.time()
    print('Training runtime is: ', (end_train - start_train)/60, ' min')
        
    # plot results of cost reduction
    plt.figure()
    plt.plot(loss_list)
    plt.title('Cost Reduction for 2 qubits between {0} and {1}:'.format(val_1, val_2))
    plt.xlabel('Training Iterations ({0} epochs)'.format(epochs))
    plt.ylabel('Cross Entropy Loss')
    plt.show()

    return total_loss

In [None]:
# function to test and evaluate model
def perform_eval(model, loss_func, test_loader, total_loss, val_1=0, val_2=1):
    # start eval time
    start_eval = time.time()

    model.eval() # set into eval mode
    print('Begin eval for samples {0} and {1}:'.format(val_1, val_2))

    with no_grad():
        correct = 0
        for batch_idx, (data, target) in enumerate(test_loader):
            output = model(data)
            if len(output.shape) == 1:
                output = output.reshape(1, *output.shape)
            
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        
            loss = loss_func(output, target)
            total_loss.append(loss.item())
        # batch_size goes where the 1 is here
        print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'
              .format(sum(total_loss) / len(total_loss),
              correct / len(test_loader) / 1 * 100))

    # commenting this out because it reports it at a strange time in the console
    # cool visual, but doesn't work as expected, maybe put into a different function?
    """    
    # show handful of examples for 2 qubit system
    n_samples_show = 6
    count = 0
    fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

    model.eval()
    with no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            if count == n_samples_show:
                break
            output = model(data)
        
            pred = output.argmax(dim=1, keepdim=True) 
        
            # remap class values to samp_val 1 and 2
            # where pred == 0 set it to samp_val_1, same for pred == 1
            # below lines don't work for this, need to figure out another way but not critical
            #pred.item[pred.item()==0] = samp_val_1
            #pred.item[pred.item()==1] = samp_val_2

            axes[count].imshow(data[0].numpy().squeeze(), cmap='gray')
            axes[count].set_xticks([])
            axes[count].set_yticks([])
            axes[count].set_title('Predicted {}'.format(pred.item()))
        
            count += 1    
    """  
    
    # end eval time
    end_eval = time.time()
    print('Evaluation time is: ', (end_eval - start_eval)/60, ' min')

# GPU Functions

In [None]:
def perform_training_gpu(model, optimizer, loss_func, train_loader, val_1=0, val_2=1, epochs=5):
    # start training timer
    start_train = time.time()

    loss_list = [] # store loss history
    model.train()  # place model in training mode
    print('\nBegin training for samples {0} and {1}:'.format(val_1, val_2))

    for epoch in range(epochs):
        print('Starting epoch {} for 2 qubits'.format(epoch))
        total_loss = []
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad(set_to_none=True) # init gradient

            # gpu optimize data and target tensors
            data = data.cuda()
            target = target.cuda()

            output = model(data).cuda()             # forward pass
            ### DEBUG
            #print("target is: ", target)
            #print("output is: ", output)
            
            loss = loss_func(output, target) # calc loss
            loss.backward()                  # backward pass
            optimizer.step()                 # optimize weights
            total_loss.append(loss.item())   # store loss
        loss_list.append(sum(total_loss)/len(total_loss))
        print('Training [{:0f}%]\tLoss: {:.4f}'.format(
              100. * (epoch + 1) / epochs, loss_list[-1]))
        
    # finish training timer, report training runtime
    end_train = time.time()
    print('Training runtime is: ', (end_train - start_train)/60, ' min')
        
    # plot results of cost reduction
    plt.figure()
    plt.plot(loss_list)
    plt.title('Cost Reduction for 2 qubits between {0} and {1}:'.format(val_1, val_2))
    plt.xlabel('Training Iterations ({0} epochs)'.format(epochs))
    plt.ylabel('Cross Entropy Loss')
    plt.show()

    return total_loss

In [None]:
# function to test and evaluate model
def perform_eval_gpu(model, loss_func, test_loader, total_loss, val_1=0, val_2=1):
    # start eval time
    start_eval = time.time()

    model.eval() # set into eval mode
    print('Begin eval for samples {0} and {1}:'.format(val_1, val_2))

    with no_grad():
        correct = 0
        for batch_idx, (data, target) in enumerate(test_loader):

            # optimize for gpu acceleration
            data = data.cuda()
            target = target.cuda()

            output = model(data).cuda()

            if len(output.shape) == 1:
                output = output.reshape(1, *output.shape)
            
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
        
            loss = loss_func(output, target)
            total_loss.append(loss.item())
        # batch_size goes where the 1 is here
        print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'
              .format(sum(total_loss) / len(total_loss),
              correct / len(test_loader) / 1 * 100))

    # commenting this out because it reports it at a strange time in the console
    # cool visual, but doesn't work as expected, maybe put into a different function?
    """    
    # show handful of examples for 2 qubit system
    n_samples_show = 6
    count = 0
    fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

    model.eval()
    with no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            if count == n_samples_show:
                break
            output = model(data)
        
            pred = output.argmax(dim=1, keepdim=True) 
        
            # remap class values to samp_val 1 and 2
            # where pred == 0 set it to samp_val_1, same for pred == 1
            # below lines don't work for this, need to figure out another way but not critical
            #pred.item[pred.item()==0] = samp_val_1
            #pred.item[pred.item()==1] = samp_val_2

            axes[count].imshow(data[0].numpy().squeeze(), cmap='gray')
            axes[count].set_xticks([])
            axes[count].set_yticks([])
            axes[count].set_title('Predicted {}'.format(pred.item()))
        
            count += 1    
    """

    # end eval time
    end_eval = time.time()
    print('Evaluation time is: ', (end_eval - start_eval)/60, ' min')

# QNN Definition

In [None]:
# set up QNN elements
num_inputs = 2

# ZZ is 2nd order Pauli expansion circuit
fm = ZZFeatureMap(num_inputs)
#fm.draw(output='mpl')

# RealAmplitudes is used as an ansatz for ML, heuristic trial wave func
ansatz = RealAmplitudes(num_inputs, reps=1)
#ansatz.draw(output='mpl')

# define observable
observable = PauliSumOp.from_list([('Z'*num_inputs, 1)])
#print(observable)

# define two layer QNN
qnn = TwoLayerQNN(num_inputs, 
                  feature_map=fm, 
                  ansatz=ansatz, 
                  observable=observable,
                  quantum_instance=qi)
#print(qnn.operator)

In [None]:
class Net(Module):

    def __init__(self):
        super().__init__()
        self.conv1 = Conv2d(1, 2, kernel_size=5)
        self.conv2 = Conv2d(2, 16, kernel_size=5)
        self.dropout = Dropout2d()
        self.fc1 = Linear(256, 64)
        self.fc2 = Linear(64, 2)         # 2-dimensional input to QNN
        self.qnn = TorchConnector(qnn)
        self.fc3 = Linear(1, 1)          # 1-dimensional output from QNN

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = x.view(1, -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = x.cpu()
        print('x type is: ', type(x))
        x = self.qnn(x)  # apply QNN
        # typecast qnn output to a cuda tensor
        #x = x.type(dtype=torch.cuda.FloatTensor)
        x = self.fc3(x)
        return cat((x, 1 - x), -1)

if gpu:
    model = Net().cuda()
else:
    model = Net()

#summary(model, (1, 28, 28))

In [None]:
# define model, optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
if gpu:
    loss_func = CrossEntropyLoss().cuda()
else:
    loss_func = CrossEntropyLoss()

# Main Program

In [None]:
# total program runtime
start_eval = time.time()

acc_vals = []

# main program loop I guess?
for i in data_set:
    # want to exclude 9 to avoid repeats
    if i != 9:
        # iterate through indices larger than i to create subset
        for j in data_set:
            if j > i:
                subset = [i, j]

                # load a subset of data for training
                trn_load = load_training(i, j, 210)
                # load subset for testing
                tst_load = load_testing(i, j, 90)

                ### DEBUG
                print("model is: ", type(model))
                print("optim is: ", type(optimizer))
                print("loss func is: ", type(loss_func))
                print("test load is: ", type(tst_load))
                print("type of i is: ", type(i))
                print("type of j is: ", type(j))
                print("type of epoch is: ", type(epochs))

                # start training for subset
                if gpu:
                    tot_loss = perform_training_gpu(model, optimizer, loss_func, tst_load, i, j, epochs)
                else:
                    tot_loss = perform_training(model, optimizer, loss_func, tst_load, i, j, epochs)

                ### DEBUG
                print("tot loss type is: ", type(tot_loss))

                # then evaluate model for subset
                if gpu:
                    perform_eval_gpu(model, loss_func, tst_load, tot_loss, i, j)
                else:
                    perform_eval(model, loss_func, tst_load, tot_loss, i, j)

                # add a data structure to hold the accuracy values for each set of elements



# end eval time
end_eval = time.time()
print('Total program runtime is: ', (end_eval - start_eval)/60, ' min')

model is:  <class '__main__.Net'>
optim is:  <class 'torch.optim.adam.Adam'>
loss func is:  <class 'torch.nn.modules.loss.CrossEntropyLoss'>
test load is:  <class 'torch.utils.data.dataloader.DataLoader'>
type of i is:  <class 'int'>
type of j is:  <class 'int'>
type of epoch is:  <class 'int'>

Begin training for samples 0 and 1:
Starting epoch 0 for 2 qubits
x type is:  <class 'torch.Tensor'>


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


TypeError: ignored