In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time
from copy import copy

import torch
from torch.autograd import Function
from torchvision import datasets, transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

import qiskit
from qiskit.visualization import *
from qiskit_aer import Aer

In [None]:
class QMLCircuit:
    def __init__(self, n_qubits, backend):
        # 学習用量子回路の作成
        self._circuit = qiskit.QuantumCircuit(n_qubits, 1)
        self.n_params = 3 * (2 * n_qubits + 1)
        self.n_qubits = n_qubits
        self.all_qubits = [i for i in range(n_qubits)]
        self.params = [qiskit.circuit.Parameter('p{}'.format(i)) for i in range(self.n_params)]
        
        for qubit in self.all_qubits:
            self._circuit.u(self.params[3 * qubit],
                            self.params[3 * qubit + 1],
                            self.params[3 * qubit + 2], qubit)
        self._circuit.barrier()
        
        for qubit in self.all_qubits:
            control = qubit
            target = (qubit + 1) % n_qubits
            self._circuit.cu(self.params[3 * n_qubits + 3 * qubit],
                             self.params[3 * n_qubits + 3 * qubit + 1],
                             self.params[3 * n_qubits + 3 * qubit + 2],
                             0,
                             control_qubit = control, target_qubit = target)
        self._circuit.u(self.params[-3], self.params[-2], self.params[-1], self.n_qubits - 1)
        
        # 本例では状態ベクトルより期待値を計算しているため、測定ゲートはコメントアウトしています
        #self._circuit.measure(self.n_qubits - 1, 0)

        self.backend = backend
    
    def run(self, data, params):
        param_dict = {}
        params = tuple(params.detach().numpy())
        for i in range(self.n_params):
            param_dict[self.params[i]] = params[i]

        # データを振幅エンコーディングした量子回路を作成し、学習用量子回路と結合
        init_circ = qiskit.QuantumCircuit(self.n_qubits, 1)
        init_statevec = self.amplitude_embedding(data)
        init_circ.initialize(init_statevec, self.all_qubits)
        #self._circuit = init_circ + self._circuit ###########################
        self._circuit = init_circ
        
        #self.bound_circuit = self._circuit.bind_parameters(param_dict)
        self.bound_circuit = self._circuit.assign_parameters(param_dict)
        
        # 量子回路を実行
        job = qiskit.execute(self.bound_circuit,
                             self.backend, )
        
        # 状態ベクトルを取得し、期待値を計算
        outputstate = job.result().get_statevector(self.bound_circuit)
        expectation = np.sum((np.abs(outputstate)**2)[2**(self.n_qubits - 1):])
        
        return np.array([expectation])
    
    def amplitude_embedding(self, data):
        data = np.array(data, dtype = np.float)
        dim = 2 ** self.n_qubits
        if len(data) < dim:
            data = np.pad(data, (0, dim - len(data)), 'constant', constant_values=(0, 0))
        if np.sum(data**2) == 0:
            data += 1
        vec = data / np.sqrt(np.sum(data ** 2))
        return vec

In [None]:
n_qubits = 7
n_params = 3 * (2 * n_qubits + 1)
params = torch.nn.parameter.Parameter(torch.Tensor(n_params))
data = torch.Tensor(np.array([np.random.randint(0, 2) for i in range(2 ** n_qubits)]))
simulator = Aer.get_backend('statevector_simulator')

start = time.time()
circuit = QMLCircuit(n_qubits, simulator)
print('Expected value {}'.format(circuit.run(data, params)[0]))
print(time.time() - start)
circuit._circuit.draw('mpl')

In [None]:
class HybridFunction(Function):
    
    @staticmethod
    def forward(ctx, f, data, params):
        # 順伝播の計算
        def f_each(data, params):
            return torch.tensor([f(torch.flatten(d), params) for d in data], dtype=torch.float64)
        expectation_z = f_each(data, params)
        ctx.save_for_backward(data, params, expectation_z)
        ctx.f = f_each
        return expectation_z
        
    @staticmethod
    def backward(ctx, grad_output):
        # 逆伝播の計算
        data, params, res = ctx.saved_tensors
        delta = 0.001
        gradients = []
        for i in range(len(params)):
            params[i] += delta
            gradient  = torch.sum((ctx.f(data, params) - res) / delta * grad_output)
            params[i] -= delta
            gradients.append(gradient)
        return None, None, torch.Tensor(gradients), None

class Hybrid(nn.Module):
    # 量子回路レイヤーの定義
    def __init__(self, n_qubits, backend, shots):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QMLCircuit(n_qubits, backend)
        
    def forward(self, data, params):
        def f(data, params):
            circ = copy(self.quantum_circuit)
            res = circ.run(data, params)
            return res
        return HybridFunction.apply(f, data, params)

In [None]:
# 学習用サンプルデータ
# MNISTから"0"と"1"を100サンプルずつ抜き出す
n_samples = 100

X_train = datasets.MNIST(root='./data', train=True, download=True,
                         transform=transforms.Compose([transforms.ToTensor()]))

idx = np.append(np.where(X_train.targets == 0)[0][:n_samples], 
                np.where(X_train.targets == 1)[0][:n_samples])

X_train.data = X_train.data[idx]
X_train.targets = X_train.targets[idx]

train_loader = torch.utils.data.DataLoader(X_train, batch_size=16, shuffle=True)

In [None]:
n_samples_show = 6

data_iter = iter(train_loader)
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

while n_samples_show > 0:
    images, targets = data_iter.__next__()
    images = (torch.nn.AvgPool2d(3)(images))
    axes[n_samples_show - 1].imshow(images[0][0].numpy(), cmap='gray')
    axes[n_samples_show - 1].set_xticks([])
    axes[n_samples_show - 1].set_yticks([])
    axes[n_samples_show - 1].set_title("Labeled: {}".format(targets[0].item()))
    
    n_samples_show -= 1
print(images.shape)

In [None]:
n_samples = 50

X_test = datasets.MNIST(root='./data', train=False, download=True,
                        transform=transforms.Compose([transforms.ToTensor()]))

idx = np.append(np.where(X_test.targets == 0)[0][:n_samples], 
                np.where(X_test.targets == 1)[0][:n_samples])

X_test.data = X_test.data[idx]
X_test.targets = X_test.targets[idx].float()

test_loader = torch.utils.data.DataLoader(X_test, batch_size=16, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self, n_qubits, n_params):
        super(Net, self).__init__()
        self.hybrid = Hybrid(n_qubits, qiskit.Aer.get_backend('statevector_simulator'), 1)
        self.weight = torch.nn.parameter.Parameter(torch.Tensor(n_params))
        torch.nn.init.uniform_(self.weight, 0.0, 2 * np.pi)

    def forward(self, x):
        x = self.hybrid(x, self.weight)
        return x

In [None]:
n_qubits = 7
n_params = 3 * (2 * n_qubits + 1)
model = Net(n_qubits, n_params)
optimizer = optim.Adam(model.parameters(), lr=0.02)
loss_func = nn.MSELoss()

epochs = 40
loss_list = []

model.train()
for epoch in range(epochs):
    total_loss = []
    for batch_idx, (data, target) in enumerate(train_loader):
        data = (torch.nn.AvgPool2d(3)(data)).float()
        optimizer.zero_grad()
        # モデルの出力
        output = model(data)
        # 損失関数の計算
        loss = loss_func(output.squeeze(), target.double())
        # 誤差逆伝播
        loss.backward()
        # パラメータ更新
        optimizer.step()
        total_loss.append(loss.item())
    loss_list.append(sum(total_loss)/len(total_loss))
    print('Training [{:.1f}%]\tLoss: {:.4f}'.format(
        100. * (epoch + 1) / epochs, loss_list[-1]))

In [None]:
plt.plot(loss_list)
plt.title('Training Loss History')
plt.xlabel('Training Iterations')
plt.ylabel('MSE Loss')

In [None]:
model.eval()
with torch.no_grad():
    
    correct = 0
    n_sample = 0
    output_list = []
    target_list = []
    for batch_idx, (data, target) in enumerate(test_loader):
        data = (torch.nn.AvgPool2d(3)(data)).float()
        output = model(data)
        
        pred = (output > 0.5).squeeze().int()
        correct += torch.sum(pred == target).item()
        n_sample += len(target)
        
        output_list.append(output.squeeze())
        target_list.append(target.double())

    total_output = torch.cat(output_list)
    total_target = torch.cat(target_list)
    total_loss = loss_func(total_output, total_target)
    print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        total_loss.item(),
        correct / n_sample * 100)
        )

In [None]:
model.eval()
with torch.no_grad():
    
    correct = 0
    n_sample = 0
    output_list = []
    target_list = []
    for batch_idx, (data, target) in enumerate(train_loader):
        data = (torch.nn.AvgPool2d(3)(data)).float()
        output = model(data)
        
        pred = (output > 0.5).squeeze().int()
        correct += torch.sum(pred == target).item()
        n_sample += len(target)
        
        output_list.append(output.squeeze())
        target_list.append(target.double())

    total_output = torch.cat(output_list)
    total_target = torch.cat(target_list)
    total_loss = loss_func(total_output, total_target)
    print('Performance on train data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        total_loss.item(),
        correct / n_sample * 100)
        )