In [None]:
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import matplotlib.pyplot as plt
import pennylane as qml

from sklearn.metrics import accuracy_score

In [None]:
# CONTROL PARAMETERS CELL
# Every tunable value of the experiment lives here; later cells only read them.

# --- data ---------------------------------------------------------------
name = ["Lagos", "Perth"]   # the two names substituted into the .npy file names
seed = 25                   # seed given to both NumPy and PyTorch

split_size = 0.8            # fraction of the samples assigned to the training split
batch_size = 40             # samples per DataLoader batch

# --- quantum layer ------------------------------------------------------
n_qubits = 2                # wires of the device; also the width fed to the circuit

# --- classical network --------------------------------------------------
hidden_layers = [4]         # width of each hidden Linear layer, in order
dropout = [0.0]             # dropout probability per hidden layer
learning_rate = 5e-5        # initial Adam learning rate

# --- learning-rate schedule (StepLR) --------------------------------------
step_size = 50              # epochs between two learning-rate decays
gamma = 0.75                # multiplicative decay factor

# --- training -----------------------------------------------------------
epochs = 500                # number of passes over the training split

In [None]:
# Seed both RNGs before anything random happens (NumPy first, then PyTorch).
np.random.seed(seed)
torch.manual_seed(seed)

# Load the feature matrix and its labels for the configured pair of names.
data = np.load('Dati/dataset_{}_x_{}_1000.npy'.format(*name))
label = np.load('Dati/labels_{}_x_{}_1000.npy'.format(*name))

# Shuffle samples and labels with one shared permutation so they stay aligned,
# converting to float32 tensors in the same step.
indices = np.random.permutation(data.shape[0])
data = torch.tensor(data[indices], dtype=torch.float32)
label = torch.tensor(label[indices], dtype=torch.float32)

# Labels become a column (one value per row) to match the model's single output.
label = label.reshape(label.shape[0], 1)

print(len(label))

In [None]:
def seed_worker(worker_id):
    """Seed NumPy inside a DataLoader worker process.

    `worker_init_fn` must be a callable taking the worker id. Passing
    `np.random.seed(seed)` would call it on the spot and hand `None` to the
    DataLoader, so no per-worker seeding would ever happen.
    """
    # Inside a worker, torch.initial_seed() already differs per worker;
    # NumPy only accepts 32-bit seeds, hence the modulo.
    np.random.seed(torch.initial_seed() % 2**32)


# Split the full dataset into training and validation subsets.
dataset = TensorDataset(data, label)
train_size = int(split_size * len(dataset))
val_size = len(dataset) - train_size   # remainder, so both sizes always sum to len(dataset)
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create DataLoaders for training and validation sets.
# Only the training loader is shuffled.
train_loader = DataLoader(train_dataset, batch_size, shuffle=True, worker_init_fn=seed_worker)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False, worker_init_fn=seed_worker)

In [None]:
# PennyLane `default.qubit` device with one wire per qubit; both circuits
# defined below are bound to it through their @qml.qnode decorators.
dev = qml.device("default.qubit", wires=n_qubits)

In [None]:
@qml.qnode(dev, interface='torch')
def ZeroEntanlerCircuit(inputs, weights):
    """Variational circuit without entangling gates.

    `inputs` is amplitude-embedded on all wires (padded with 0.0 and
    normalised), then each wire gets one RX rotation by its own entry of
    `weights`. Returns the Pauli-Z expectation value of every wire.
    """
    qubit_wires = range(n_qubits)

    qml.AmplitudeEmbedding(inputs, wires=qubit_wires, pad_with=0.0, normalize=True)

    for wire in qubit_wires:
        qml.RX(weights[wire], wires=wire)

    return [qml.expval(qml.PauliZ(wires=wire)) for wire in qubit_wires]


# One trainable rotation angle per qubit.
weight_shapes = {"weights": (n_qubits,)}

In [None]:
@qml.qnode(dev, interface='torch')
def EntanlerCircuit(inputs, weights):
    """Variational circuit with a nearest-neighbour CNOT chain.

    `inputs` is amplitude-embedded on all wires (padded with 0.0 and
    normalised), each wire gets one RX rotation by its own entry of
    `weights`, and neighbouring wires are then entangled with CNOTs.
    Returns the Pauli-Z expectation value of every wire.
    """
    qml.AmplitudeEmbedding(inputs, wires=range(n_qubits), pad_with=0.0, normalize=True)
    for i in range(n_qubits):
        qml.RX(weights[i], wires=i)

    # The chain is derived from n_qubits so it never addresses a wire the
    # device does not have: a fixed CNOT on wires [1, 2] fails as soon as
    # n_qubits == 2. For n_qubits == 3 this yields CNOT(0,1), CNOT(1,2).
    for control in range(n_qubits - 1):
        qml.CNOT(wires=[control, control + 1])

    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

# One trainable rotation angle per qubit.
weight_shapes = {"weights": (n_qubits,)}

In [None]:

# Wrap the chosen circuit as a torch module. To use the variant without
# entangling gates instead, swap in the commented line below.
#qlayer = qml.qnn.TorchLayer( ZeroEntanlerCircuit, weight_shapes)
qlayer = qml.qnn.TorchLayer(qnode=EntanlerCircuit, weight_shapes=weight_shapes)

In [None]:
class BlockLayer(torch.nn.Module):
    """Hidden block: Linear -> ReLU -> Dropout.

    Args:
        input_feat: number of input features of the linear layer.
        output_feat: number of output features of the linear layer.
        d_out: dropout probability applied after the activation.
    """

    def __init__(self, input_feat, output_feat, d_out):
        super().__init__()
        linear = torch.nn.Linear(input_feat, output_feat)
        activation = torch.nn.ReLU()
        regulariser = torch.nn.Dropout(d_out)
        self.block = torch.nn.Sequential(linear, activation, regulariser)

    def forward(self, x):
        """Run `x` through the three stacked sub-modules."""
        return self.block(x)


class HybridNetwork(torch.nn.Module):
    """Classical feed-forward stack followed by the quantum layer and a linear head.

    Layer order: Linear -> ReLU -> Dropout, one `BlockLayer` per additional
    hidden width, Linear down to `n_qubits`, Tanh, `qlayer`, Linear to a
    single output.

    Reads three module-level names: `data` (its first row gives the input
    width), `n_qubits` and `qlayer`.

    Args:
        hidden_layers: widths of the hidden layers, in order (at least one).
        d_out: dropout probability for each hidden layer, same order.
    """

    def __init__(self, hidden_layers, d_out):
        super().__init__()

        # Modules are created in exactly the order they are applied.
        stack = [
            torch.nn.Linear(len(data[0]), hidden_layers[0]),
            torch.nn.ReLU(),
            torch.nn.Dropout(d_out[0]),
        ]
        stack.extend(
            BlockLayer(hidden_layers[k], hidden_layers[k + 1], d_out[k + 1])
            for k in range(len(hidden_layers) - 1)
        )
        stack.extend([
            torch.nn.Linear(hidden_layers[-1], n_qubits),
            torch.nn.Tanh(),
            qlayer,
            torch.nn.Linear(n_qubits, 1),
        ])
        self.layers = torch.nn.ModuleList(stack)

        self._initialize_weights()

    def _initialize_weights(self):
        """Draw the weights of the first quantum layer uniformly from [-pi, pi]."""
        quantum = next(
            (module for module in self.layers if isinstance(module, qml.qnn.TorchLayer)),
            None,
        )
        if quantum is not None:
            torch.nn.init.uniform_(quantum.weights, -np.pi, np.pi)

    def forward(self, x):
        """Apply every layer in sequence and return the final output."""
        out = x
        for module in self.layers:
            out = module(out)
        return out
      
      
     

In [None]:
def init_weights_he(m):
    """He (Kaiming) normal initialisation for linear layers.

    Intended for `Module.apply`: any module that is not a `torch.nn.Linear`
    is left untouched. Linear weights are re-drawn with the ReLU gain and
    the bias, when present, is set to zero.
    """
    if not isinstance(m, torch.nn.Linear):
        return

    # He initialisation (normal distribution)
    torch.nn.init.kaiming_normal_(m.weight, nonlinearity='relu')

    # Initialise the bias to zero
    if m.bias is not None:
        torch.nn.init.zeros_(m.bias)

In [None]:
# Build the hybrid model, then re-initialise every Linear layer with He init
# (modules that are not Linear are left as they are).
model = HybridNetwork(hidden_layers, dropout)
model.apply(init_weights_he)

# Binary classification on raw logits: the sigmoid is folded into the loss.
lossFunction = torch.nn.BCEWithLogitsLoss()

# Adam optimiser with a step-wise learning-rate decay.
opt = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.StepLR(opt, step_size=step_size, gamma=gamma)



In [None]:
def evaluate_model(model, loader, loss_fn):
    """Compute accuracy and mean batch loss of `model` over `loader`.

    The model output is treated as a logit: it is passed through a sigmoid
    and rounded to obtain the predicted class, which is compared with the
    labels element-wise.

    Args:
        model: callable mapping a batch of inputs to logits. If it is a
            `torch.nn.Module` it is switched to eval mode for the duration
            of the call and its previous mode is restored afterwards.
        loader: iterable yielding `(inputs, labels)` batches.
        loss_fn: callable `loss_fn(output, labels)` returning a scalar tensor.

    Returns:
        `(accuracy, loss)`: fraction of correct predictions over all samples,
        and the loss averaged over batches.

    Raises:
        ValueError: if `loader` yields no batches.
    """
    is_module = isinstance(model, torch.nn.Module)
    was_training = model.training if is_module else False
    if is_module:
        model.eval()  # dropout must be off, whatever mode the caller left the model in

    batch_probs = []
    batch_labels = []
    loss_sum = 0.0
    n_batches = 0

    try:
        with torch.no_grad():
            for inputs, targets in loader:
                logits = model(inputs)
                loss_sum += loss_fn(logits, targets).item()

                batch_probs.append(torch.sigmoid(logits))
                batch_labels.append(targets)
                n_batches += 1
    finally:
        if is_module:
            model.train(was_training)

    if n_batches == 0:
        raise ValueError("evaluate_model: loader yielded no batches")

    predicted = torch.round(torch.cat(batch_probs))
    labels = torch.cat(batch_labels)

    # Computed in torch so tensors on any device work without a NumPy round trip.
    accuracy = (predicted == labels).sum().item() / labels.numel()

    return accuracy, loss_sum / n_batches

In [None]:
# This is the actual model training (and validation). It may take a while, depending on:
# epoch number, network architecture, dataset size.

# Per-epoch history, one entry per epoch.
train_losses = []
val_losses = []

train_accuracies = []
val_accuracies = []


# training the model
for epoch in range(epochs):

    model.train()  # Set the model to training mode
    train_running_loss = 0.0

    # The batch names are deliberately different from the module-level `data`
    # and `label` tensors: reusing those names would leave them pointing at the
    # last batch, and re-running an earlier cell would silently use it.
    for batch_data, batch_label in train_loader:
        # Clear gradients first so nothing left over from an interrupted run accumulates.
        opt.zero_grad(set_to_none=True)

        output = model(batch_data)
        loss = lossFunction(output, batch_label)
        loss.backward()
        opt.step()

        train_running_loss += loss.item()

    # Mean of the batch losses seen while the weights were being updated.
    train_losses.append(train_running_loss / len(train_loader))
    scheduler.step()

    # Metrics on both splits with the end-of-epoch weights, in eval mode.
    model.eval()
    train_acc, train_loss = evaluate_model(model, train_loader, lossFunction)
    val_acc, val_loss = evaluate_model(model, val_loader, lossFunction)

    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)
    val_losses.append(val_loss)

    # Progress report every 10 epochs.
    if (epoch + 1) % 10 == 0:
        print(
            "Epoch: {}\tTraining Loss: {:.4f}\tVal Loss: {:.4f}\tTraining Accuracy: {:.2f}%\tValidation Accuracy: {:.2f}%".format(
                epoch + 1, train_losses[-1], val_loss, 100 * train_acc, 100 * val_acc)
            )

# Summary of the hyper-parameters used for this run.
print(' ')
print('train parameter: ')
print(f'batch_size: {batch_size} \t hidden layer: {hidden_layers} \t dropout: {dropout}')
print(f'lr: {learning_rate} \t step size: {step_size} \t gamma: {gamma}')
print(f'epochs: {epochs}')

In [None]:
# Best validation accuracy reached in any epoch, as a percentage.
print(max(val_accuracies) * 100)

In [None]:
# Loss curves. Each label is attached to its own line, so the legend cannot
# drift out of sync with the plotting order.
loss_fig, loss_ax = plt.subplots()
loss_ax.plot(train_losses, label="Loss on train")
loss_ax.plot(val_losses, label="Loss on validation")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.set_title("Loss per epoch")
loss_ax.legend()
plt.show()

In [None]:
# Accuracy curves. Each label is attached to its own line, so the legend
# cannot drift out of sync with the plotting order.
acc_fig, acc_ax = plt.subplots()
acc_ax.plot(train_accuracies, label="Accuracies on train")
acc_ax.plot(val_accuracies, label="Accuracies on validation")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.set_title("Accuracy per epoch")
acc_ax.legend()
plt.show()