In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import pennylane as qml
from pennylane import numpy as np
import os
from matplotlib import pyplot as plt

comp_device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 16
epochs = 50
learning_rate = 5e-4
drop_rate = 0.6
n_qbits = 11
num_train_images = 3000
val_split = 0.25
num_val_images = int(num_train_images*val_split)
num_test_images = num_val_images
gamma_scheduler = 0.1
epochs_decay = 15
circuit_layers = 15

q_device = qml.device("default.qubit", wires=n_qbits)

project_dir = os.getcwd()
data_dir = os.path.join(project_dir, 'data')

In [2]:
def train_loop(dataloader, model, loss_fn, optimizer):
  
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    loss_value, correct = 0., 0.


    for X, y in dataloader:
        pred = model(X)
        loss = loss_fn(pred, y)
        loss_value += loss.item()
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    loss_value /= num_batches
    acc = correct/size
    print(f"Train: \n acc: {(100*acc):>0.1f}%, loss: {loss_value:>8f} \n")

    return loss_value, acc


def val_loop(dataloader, model, loss_fn):

    #total number of datapoints (num images in dataset)
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    val_loss, correct = 0., 0.

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            val_loss += loss_fn(pred, y).item()
            #.sum() because you have to sum for all the preds in the batch
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    val_loss /= num_batches
    val_acc = correct/size
    print(f"Validation: \n val_acc: {(100*val_acc):>0.1f}%, val_loss: {val_loss:>8f} \n")

    return val_loss,val_acc

    
"""
PLOTTING FUNCTIONS
"""

def plot_loss_metric(loss, val_loss, metric, val_metric, metric_name='accuracy'):

    epochs_range = range(len(loss))

    plt.figure(figsize=(20, 10))
    plt.subplot(1, 2, 1)
    if metric_name == 'accuracy':
        plt.plot(epochs_range, metric, label='Training Accuracy')
        plt.plot(epochs_range, val_metric, label='Validation Accuracy')
        plt.title('Training and Validation Accuracy')
    elif metric_name == 'recall':
        plt.plot(epochs_range, metric, label='Training Recall')
        plt.plot(epochs_range, val_metric, label='Validation Recall')
        plt.title('Training and Validation Recall')
    elif metric_name == 'precision':
        plt.plot(epochs_range, metric, label='Training Precision')
        plt.plot(epochs_range, val_metric, label='Validation Precision')
        plt.title('Training and Validation Precision')
    plt.legend(loc='lower right')
    

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')

    plt.show()



def save_list(list, path):

    with open(path, 'w') as fp:
        
        for i,item in enumerate(list):
            fp.write('%d\t%f\n' %(int(i+1),item))


In [3]:
training_data = datasets.MNIST(
    root=data_dir,
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)

test_val_data = datasets.MNIST(
    root=data_dir,
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

training_data = torch.utils.data.Subset(training_data, list(range(num_train_images)))
val_data = torch.utils.data.Subset(test_val_data, list(range(num_val_images)))
test_data = torch.utils.data.Subset(test_val_data, list(range(num_val_images, num_val_images+num_test_images)))

train_dl = DataLoader(training_data, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(val_data, batch_size=batch_size)
test_dl = DataLoader(test_data, batch_size=batch_size)

In [4]:
@qml.qnode(q_device, interface='torch')
def VQC(inputs, weights):

    inputs = torch.arctan(inputs)
    qml.AngleEmbedding(inputs, wires=range(n_qbits), rotation='Y')
    qml.StronglyEntanglingLayers(weights=weights, wires=range(n_qbits))

    return [qml.expval(qml.PauliZ(j)) for j in range(n_qbits)]


weights_shape = {"weights": (circuit_layers,n_qbits,3)}

class HybridCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.q_layer = qml.qnn.TorchLayer(VQC, weights_shape)
        self.conv1 = nn.Conv2d(1,8,3, padding=1)
        self.conv2 = nn.Conv2d(8,16,3, padding=1)
        self.conv3 = nn.Conv2d(16,32,3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.flatten = nn.Flatten()
        self.dense1 = nn.Linear(32*3*3, 64)
        self.dense2 = nn.Linear(64,11)
        self.dense3 = nn.Linear(11,10)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_rate)
        nn.init.xavier_uniform_(self.q_layer.weights)

    def forward(self, x):
        
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.conv3(x)
        x = self.relu(x)
        x = self.pool(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.dense2(x)
        x = self.q_layer(x)
        x = self.dense3(x)
        
        return x

In [None]:
hybrid_model = HybridCNN()
opt = torch.optim.Adam(hybrid_model.parameters(), lr=learning_rate)
loss_func = nn.CrossEntropyLoss()

In [None]:
loss_list = []
val_loss_list = []
acc_list = []
val_acc_list = []

for t in range(epochs):
    print(f"-------------------------------\nEpoch {t+1}")
    
    loss, acc = train_loop(train_dl, hybrid_model, loss_func, opt)
    val_loss, val_acc = val_loop(val_dl, hybrid_model, loss_func)
    loss_list.append(loss)
    acc_list.append(acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)


lists_dir = os.path.join(project_dir, 'nqbits_{}_layers_{}'.format(n_qbits, circuit_layers))

if os.path.exists(lists_dir):
    raise Exception("Directory name already used")
else:
    os.mkdir(lists_dir)

loss_file = 'loss_nqbits_{}_layers_{}.txt'.format(n_qbits, circuit_layers)
loss_file_path = os.path.join(lists_dir, loss_file)

val_loss_file = 'val_loss_nqbits_{}_layers_{}.txt'.format(n_qbits, circuit_layers)
val_loss_file_path = os.path.join(lists_dir, val_loss_file)

acc_file = 'acc_nqbits_{}_layers_{}.txt'.format(n_qbits, circuit_layers)
acc_file_path = os.path.join(lists_dir, acc_file)

val_acc_file = 'val_acc_nqbits_{}_layers_{}.txt'.format(n_qbits, circuit_layers)
val_acc_file_path = os.path.join(lists_dir, val_acc_file)

save_list(loss_list, loss_file_path)
save_list(val_loss_list, val_loss_file_path)
save_list(acc_list, acc_file_path)
save_list(val_acc_list, val_acc_file_path)


test_loss, test_acc = val_loop(test_dl, hybrid_model, loss_func)

with open(os.path.join(project_dir, 'loss_and_acc'), 'a') as fp:
    
    fp.write('%d\t%d\t%f\t%f\n' % (n_qbits, circuit_layers, test_loss, test_acc))