In [None]:
import warnings
from sklearn.datasets import load_boston

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from tqdm.notebook import tqdm

In [None]:
import pennylane as qml
import numpy as np
import torch
from torch.autograd import Variable

import functools
from collections import OrderedDict


In [None]:
# hyperparameters
batch_size = 4
epochs = 100

total_sample = 506
train_sample = 404

In [None]:

def gen_heatmap(_model, x1_num=100, x2_num=100, name='model'):
    pred = []
    for i in range(x1_num):
        temp_history = []
        for j in range(x2_num):
            if name == 'Hybrid MLP':
                x1 = np.pi /2 * i/x1_num
                x2 = np.pi /2 * j/x2_num
            else:
                x1 = i/x1_num * 5 + 4
                x2 = j/x2_num  * 40
            y_pred = _model(torch.tensor([x1, x2], dtype=torch.float32))

            prediction = y_pred.detach().item()

            temp_history.append(prediction)
        pred.append(temp_history)

    # make the plot more intuitive
    # horizon direction: LSTAT
    # vertical direction: number of room
    pred = list(reversed(pred))
    

    
    # return heatmap
    return pred

def display_heatmap(pred, name='model'):
    plt.xticks([])
    plt.yticks([])

    plt.xlabel('LSTAT')
    plt.ylabel('# room')
    plt.title(name)
    # plt.title(name).set_color("white")

    im = plt.imshow(pred, vmin=5, vmax=45, cmap=plt.cm.get_cmap('ocean'))
    # im = plt.imshow(pred, vmin=11, vmax=49, cmap=plt.cm.get_cmap('ocean'))
    
    plt.rcParams['axes.grid'] = False
    # this stop the warning
    
    plt.colorbar(im)
    
    plt.title(name)
    
    plt.show()

In [None]:

loss_eval = torch.nn.L1Loss()

# use L1 loss
def eval_model(_model, _data_loader=None, epoch_num=-1, scale=1., history_list=None, store_history=True):
    _loss = 0
    for xs, ys in _data_loader:
        pred = _model(xs * scale)
        _loss += loss_eval(pred, ys).detach()
        
    avg_loss = _loss
    if store_history:
        history_list.append(avg_loss)
    return history_list

Multi layer perceptron
=======================================

![title](img/regression_mlp.png)

In [None]:
def train_mlp():
    with warnings.catch_warnings(): 
    # You should probably not use this dataset.
        warnings.filterwarnings("ignore")
        X, y = load_boston(return_X_y=True)
        
        # to tensor, and only select # numnber of rooms and LSTAT as input X
        X = torch.tensor(X[:, [5, 12]]).float()
        y = np.float32(y.reshape((total_sample, 1)))
    
    eval_loss_history = []
    # sequential model
    fc_1 = torch.nn.Linear(2, 4)
    fc_2 = torch.nn.Linear(4, 16)
    fc_3 = torch.nn.Linear(16, 1)
    relu = torch.nn.ReLU()
    layers = [fc_1, relu, fc_2, relu, fc_3]
    model = torch.nn.Sequential(*layers)


    opt = torch.optim.Adam(model.parameters(), lr=0.003)
    loss = torch.nn.MSELoss()

    data_loader = torch.utils.data.DataLoader(
        list(zip(X, y))[:train_sample], batch_size=batch_size, shuffle=True, drop_last=True
    )

    test_loader = torch.utils.data.DataLoader(
        list(zip(X, y))[train_sample:], batch_size=batch_size, shuffle=True, drop_last=True
    )

    # model & traning

    for epoch in range(epochs):
        _loss = 0

        for xs, ys in data_loader:
            # print(xs.dtype)
            # print(ys.dtype)
            opt.zero_grad()

            loss_evaluated = loss(model(xs), ys)
            loss_evaluated.backward()
            _loss += loss_evaluated

            opt.step()

        # accuracy = test(model, X[train_sample:], y[train_sample:], logits=True)
        # acc.append(accuracy)

        eval_loss_history = eval_model(model, _data_loader=test_loader, epoch_num=epoch, history_list=eval_loss_history)
    return [eval_loss_history, gen_heatmap(model, name='MLP')]

### Hybrid Multi layer perceptron
=======================================

![title](img/regression_hmlp.png)

In [None]:
def train_hybrid():
    h_eval_loss_history = []
    n_qubits = 4
    dev = qml.device("default.qubit", wires=n_qubits)

    @qml.qnode(dev, interface="torch")
    def qnode(inputs, weights):
        qml.RX(inputs[0], wires=0)
        qml.RX(inputs[1], wires=2)

        qml.CNOT(wires=[0, 1])
        qml.CNOT(wires=[1, 2])
        qml.CNOT(wires=[2, 3])
        qml.CNOT(wires=[3, 0])

        qml.RY(weights[0, 0], wires=0)
        qml.RY(weights[0, 1], wires=1)
        qml.RY(weights[0, 2], wires=2)
        qml.RY(weights[0, 3], wires=3)

        qml.CNOT(wires=[0, 1])
        qml.CNOT(wires=[1, 2])
        qml.CNOT(wires=[2, 3])
        qml.CNOT(wires=[3, 0])

        return [qml.expval(qml.PauliZ(wires=0)), 
                qml.expval(qml.PauliZ(wires=1)),
                qml.expval(qml.PauliZ(wires=2)),
                qml.expval(qml.PauliZ(wires=3))]


    # hybrid circuits, sequential model
    fc_test = torch.nn.Linear(4, 40, bias=False)
    # fc_1 = torch.nn.Linear(4, 4, bias=False)
    fc_2 = torch.nn.Linear(40, 1)
    relu = torch.nn.ReLU()
    torch.nn.init.uniform_(fc_2.weight, a=-50, b=50)

    # pennylane cirtuit convertion, to pytorch layer
    n_layers = 1
    weight_shapes = {"weights": (n_layers, n_qubits)}
    qlayer = qml.qnn.TorchLayer(qnode, weight_shapes, init_method={'weights': functools.partial(torch.nn.init.uniform_, b=np.pi*0.6)})

    # h_layers = [qlayer, fc_1, relu, fc_2]
    h_layers = OrderedDict(([
                ('qlayer', qlayer),
                ('fc_test', fc_test),
                ('relu', relu),
                # ('fc_1', fc_1), 
                # ('relu', relu), 
                ('fc_2', fc_2)]))
    h_model = torch.nn.Sequential(h_layers)

    opt = torch.optim.Adam(h_model.parameters(), lr=0.003)
    loss = torch.nn.MSELoss()

    # handle input different comparing with MLP, use input as radian

    def NormalizeData(data):
        data = np.transpose(data)
        data[0] = (data[0] - np.min(data[0])) / (np.max(data[0]) - np.min(data[0]))
        data[1] = (data[1] - np.min(data[1])) / (np.max(data[1]) - np.min(data[1]))
        return np.transpose(data)

    def StandardizeData(data):
        data = np.transpose(data)
        mean = [np.mean(data[0]), np.mean(data[1])]
        std = [np.std(data[0]), np.std(data[1])]
        data[0] = (data[0] - mean[0]) - std[0]
        data[1] = (data[1] - mean[1]) - std[1]
        return np.transpose(data)

    def MinMaxScaler(data):
        data = np.transpose(data)
        std = [np.std(data[0]), np.std(data[1])]
        data[0] = std[0] * (np.max(data[0]) - np.min(data[0])) + np.min(data[0])
        data[1] = std[1] * (np.max(data[1]) - np.min(data[1])) + np.min(data[1])
        return np.transpose(data)

    def robustScaler(data):
        data = np.transpose(data)
        q1 = [np.quantile(data[0], 0.25), np.quantile(data[1], 0.25)]
        q3 = [np.quantile(data[0], 0.25), np.quantile(data[1], 0.75)]
        data[0] = (data[0]-q1[0])/(q3[0] - q1[0])
        data[1] = (data[1]-q1[1])/(q3[1] - q1[1])
        return np.transpose(data)

    def hybrid_dataloader():
        with warnings.catch_warnings(): 
            # You should probably not use this dataset.
            warnings.filterwarnings("ignore")
            X, y = load_boston(return_X_y=True)
            # # to tensor, and only select # numnber of rooms and LSTAT as input X
            X = X[:, [5, 12]]
            X = NormalizeData(X)
            # y = NormalizeData(y)
            # plt.scatter(X[:, 0], X[:, 1], c=y)
            X = torch.tensor(X).float()
            # print(y.dtype)
            y = torch.tensor(y.reshape((total_sample, 1))).float()
            # print(y.dtype)

            return [torch.utils.data.DataLoader(
            list(zip(X, y))[:train_sample], batch_size=batch_size, shuffle=True, drop_last=True
            ),
                   torch.utils.data.DataLoader(
            list(zip(X, y))[train_sample:], batch_size=batch_size, shuffle=True, drop_last=True
            )]

    # model & traning

    flag = True

    for epoch in range(epochs):
        _loss = 0

        for xs, ys in hybrid_dataloader()[0]:

            xs *= np.pi/2

            opt.zero_grad()

            loss_evaluated = loss(h_model(xs), ys)
            loss_evaluated.backward()
            _loss += loss_evaluated

            opt.step()

        h_eval_loss_history = eval_model(h_model, _data_loader=hybrid_dataloader()[1], scale=np.pi/2, epoch_num=epoch, history_list=h_eval_loss_history)
    return [h_eval_loss_history, gen_heatmap(h_model, name='Hybrid MLP')]

Comparision
=======================================

In [None]:
runs = 3

In [None]:
mlp_loss = 0
mlp_heatmap = 0
hmlp_loss = 0
hmlp_heatmap = 0

# size: [runs, 2, epoch, 100, 100]
for i in range(runs):
    result_mlp = train_mlp()
    result_hybrid = train_hybrid()
    
    mlp_loss += np.asarray(result_mlp[0])
    mlp_heatmap += np.asarray(result_mlp[1])
    hmlp_loss += np.asarray(result_hybrid[0])
    hmlp_heatmap += np.asarray(result_hybrid[1])

mlp_loss /= runs
mlp_heatmap /= runs
hmlp_loss /= runs
hmlp_heatmap /= runs

In [None]:
# plot benchmark comparing between mlp and hmlp

plt.style.use("bmh")

fig, ax = plt.subplots(1, 1, figsize=(6, 4))

ax.set_ylabel("Loss")
ax.set_xlabel("Epoch #")

ax.plot(mlp_loss, '.-', label="MLP loss")
ax.plot(hmlp_loss, '.-', label="H_MLP loss")
# ax.plot(*, '.-', label="Backprop")
ax.set_ylabel("Loss")
ax.set_xlabel("Epoch #")

ax.legend()
plt.show()

display_heatmap(mlp_heatmap, name='MLP')
display_heatmap(hmlp_heatmap, name='Hybrid MLP')