In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.autograd import Function
from torchvision import datasets, transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

from torchsummary import summary

from qiskit_aer import AerSimulator
from qiskit.visualization import *
from qiskit import QuantumCircuit as QC, transpile
from qiskit.circuit import Parameter


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

if device.type == 'cuda':
    print('Training on GPU...')
    print(f'GPU: {torch.cuda.get_device_name(0)}')
else:
    print('Training on CPU...')

In [None]:
class QuantumCircuit:
    def __init__(self, n_qubits, backend, shots):
        self._circuit = QC(n_qubits)
 
        all_qubits = [i for i in range(n_qubits)]
        self.theta = Parameter('theta')
 
        self._circuit.h(all_qubits)
        self._circuit.barrier()
        self._circuit.ry(self.theta, all_qubits)
 
        self._circuit.measure_all()

        self.backend = backend
        self.shots = shots
 
    def run(self, thetas):
        circuits = [self._circuit.assign_parameters({self.theta: theta})
                   for theta in thetas]
 
        transpiled = transpile(circuits, self.backend)
        job = self.backend.run(transpiled, shots=self.shots)
        result = job.result()

        if len(thetas) == 1:
            counts_dict = result.get_counts(0)
        else:
            counts_dict = result.get_counts(circuits[0])
 
        counts = np.array(list(counts_dict.values()))
        states = np.array(list(counts_dict.keys())).astype(float)
 
        probabilities = counts / self.shots
        expectation = np.sum(states * probabilities)
 
        return np.array([expectation])

In [None]:
simulator = AerSimulator(method='automatic')

circuit = QuantumCircuit(1, simulator, 100)
print('Expected value for rotation pi {}'.format(circuit.run([np.pi])[0]))
circuit._circuit.draw('mpl')

In [None]:
class HybridFunction(Function):
    @staticmethod
    def forward(ctx, inputs, quantum_circuit, shift):
        ctx.shift = shift
        ctx.quantum_circuit = quantum_circuit

        expectation_z = []
        for input in inputs:
            expectation_z.append(ctx.quantum_circuit.run(input.tolist()))

        expectation_z = np.array(expectation_z)
        result = torch.tensor(expectation_z).to(inputs.device)

        ctx.save_for_backward(inputs, result)
        return result

    @staticmethod
    def backward(ctx, grad_output):
        input, expectation_z = ctx.saved_tensors
        input_list = np.array(input.tolist())
 
        shift_right = input_list + np.ones(input_list.shape) * ctx.shift
        shift_left = input_list - np.ones(input_list.shape) * ctx.shift
 
        gradients = []
        for i in range(len(input_list)):
            expectation_right = ctx.quantum_circuit.run(shift_right[i])
            expectation_left  = ctx.quantum_circuit.run(shift_left[i])
 
            gradient = expectation_right - expectation_left
            gradients.append(gradient)
 
        gradients = np.array(gradients)
        gradients = torch.from_numpy(gradients).to(input.device)
 
        gradients = gradients.unsqueeze(-1)
        gradients = gradients.transpose(0, 1)

        return gradients.float() * grad_output.float(), None, None

In [None]:
class Hybrid(nn.Module):
    def __init__(self, backend, shots, shift):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QuantumCircuit(1, backend, shots)
        self.shift = shift
 
    def forward(self, input):
        return HybridFunction.apply(input, self.quantum_circuit, self.shift)

In [None]:
n_samples = 512
batch_size = 256

X_train = datasets.MNIST(root='./data', train=True, download=True,
                         transform=transforms.Compose([transforms.ToTensor()]))

X_train.data = X_train.data[:n_samples]
X_train.targets = X_train.targets[:n_samples]

train_loader = torch.utils.data.DataLoader(X_train, batch_size=batch_size, shuffle=True)

n_samples_show = 6

data_iter = iter(train_loader)
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

images, targets = next(data_iter)

while n_samples_show > 0:
    axes[n_samples_show - 1].imshow(images[n_samples_show].numpy().squeeze(), cmap='gray')
    axes[n_samples_show - 1].set_xticks([])
    axes[n_samples_show - 1].set_yticks([])
    axes[n_samples_show - 1].set_title("Labeled: {}".format(int(targets[n_samples_show])))
 
    n_samples_show -= 1

plt.show()

n_samples = 2048

X_test = datasets.MNIST(root='./data', train=False, download=True,
                        transform=transforms.Compose([transforms.ToTensor()]))

X_test.data = X_test.data[:n_samples]
X_test.targets = X_test.targets[:n_samples]

test_loader = torch.utils.data.DataLoader(X_test, batch_size=batch_size, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 10)
 
        self.hybrid = nn.ModuleList([
            Hybrid(AerSimulator(method='automatic'), 100, np.pi / 2)
            for i in range(10)
        ])

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = torch.chunk(x, 10, dim=1)
 
        x = tuple([hy(x_) for hy, x_ in zip(self.hybrid, x)])
        return torch.cat(x, -1)


model = Net().to(device)

summary(model, (1, 28, 28))

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_func = nn.CrossEntropyLoss().to(device)

epochs = 50
loss_list = []

model.train()
for epoch in range(epochs):
    total_loss = []
    for batch_idx, (data, target) in enumerate(train_loader):
        print(f'Epoch {epoch+1}, Batch {batch_idx+1}/2...', end='\r')
        optimizer.zero_grad()

        data = data.to(device)
        target = target.to(device)
 
        output = model(data)

        loss = loss_func(output, target)

        loss.backward()

        optimizer.step()
 
        total_loss.append(loss.item())
 
    loss_list.append(sum(total_loss)/len(total_loss))
    print('Training [{:.0f}%]\tLoss: {:.4f}'.format(
        100. * (epoch + 1) / epochs, loss_list[-1]))

In [None]:
plt.plot(loss_list)
plt.title('Hybrid NN Training Convergence')
plt.xlabel('Training Iterations')
plt.ylabel('Neg Log Likelihood Loss')
plt.show()

In [None]:
model.eval()
with torch.no_grad():
 
    correct = 0
    total_loss = []
 
    for batch_idx, (data, target) in enumerate(test_loader):
 
        data = data.to(device)
        target = target.to(device)

        output = model(data)
 
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
 
        loss = loss_func(output, target)
        total_loss.append(loss.item())
 
    print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        sum(total_loss) / len(total_loss),
        correct / len(test_loader) * 100 / batch_size)
Â        )

n_samples_show = 8
count = 0
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

model.eval()
with torch.no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        if count == n_samples_show:
            break
 
        data_cuda = data.to(device)
        target_cuda = target.to(device)

        output_cuda = model(data_cuda)
 
        pred = output_cuda.argmax(dim=1, keepdim=True)

        axes[count].imshow(data[count].numpy().squeeze(), cmap='gray')

        axes[count].set_xticks([])
        axes[count].set_yticks([])
        axes[count].set_title('Predicted {}'.format(pred[count].item()))
 
        count += 1

plt.show()