In [1]:
import numpy as np
import torch.nn as nn
import torch

from qiskit  import Aer, QuantumCircuit
from qiskit.utils import QuantumInstance
from qiskit.opflow import AerPauliExpectation
from qiskit.circuit import Parameter
from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap
from qiskit.primitives import Sampler
# from qiskit_algorithms.optimizers import COBYLA, L_BFGS_B
# from qiskit_algorithms.utils import algorithm_globals

import time

# from qiskit_machine_learning.algorithms.classifiers import NeuralNetworkClassifier, VQC
# from qiskit_machine_learning.algorithms.regressors import NeuralNetworkRegressor, VQR
# from qiskit_machine_learning.circuit.library import QNNCircuit
from qiskit_machine_learning.connectors import TorchConnector
# from qiskit_machine_learning.neural_networks import SamplerQNN, EstimatorQNN
from qiskit_machine_learning.neural_networks import CircuitQNN, TwoLayerQNN

from matplotlib import pyplot as plt
from IPython.display import clear_output

  from qiskit.opflow import AerPauliExpectation


In [2]:
# Define the XOR input and target data
XOR_INPUT = np.array([[0, 0, 0], [0, 1, 0], [0, 0, 1], [0, 1, 1]], dtype=np.float32)
XOR_TARGET = np.array([[0], [1], [1], [0]], dtype=np.float32)
OR_INPUT = np.array([[1, 0, 0], [1, 1, 0], [1, 0, 1], [1, 1, 1]], dtype=np.float32)
OR_TARGET = np.array([[0], [1], [1], [1]], dtype=np.float32)
AND_INPUT = np.array([[2, 0, 0], [2, 1, 0], [2, 0, 1], [2, 1, 1]], dtype=np.float32)
AND_TARGET = np.array([[0], [0], [0], [1]], dtype=np.float32)

In [3]:
# Convert the NumPy arrays to PyTorch tensors
inputsXOR = torch.from_numpy(XOR_INPUT).view(1, 4, 3)  # Add a batch and sequence dimension
targetsXOR = torch.from_numpy(XOR_TARGET).view(1, 4, 1)  # Add a batch and sequence dimension
inputsOR = torch.from_numpy(OR_INPUT).view(1, 4, 3)  # Add a batch and sequence dimension
targetsOR = torch.from_numpy(OR_TARGET).view(1, 4, 1)  # Add a batch and sequence dimension
inputsAND = torch.from_numpy(AND_INPUT).view(1, 4, 3)  # Add a batch and sequence dimension
targetsAND = torch.from_numpy(AND_TARGET).view(1, 4, 1)  # Add a batch and sequence dimension

In [4]:
class QLSTM(nn.Module):
    def __init__(self, 
                input_size: int, 
                hidden_size: int, 
                n_qubits: int=4,
                n_qlayers: int=1,
                batch_first=True,
                backend='statevector_simulator'):
        super(QLSTM, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.concat_size = input_size + hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.batch_first = batch_first
        
        self.clayer_in = nn.Linear(self.concat_size, n_qubits)
        self.clayer_out = nn.Linear(1, 1)

        self.qi = QuantumInstance(Aer.get_backend('statevector_simulator'))
        feature_map = ZZFeatureMap(self.n_qubits)
        ansatz = RealAmplitudes(self.n_qubits, reps=self.n_qlayers)

        self.qnn1 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
        self.qnn2 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
        self.qnn3 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
        self.qnn4 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)

        self.qlayer = {
            'forget': TorchConnector(self.qnn1),
            'input': TorchConnector(self.qnn2),
            'update': TorchConnector(self.qnn3),
            'output': TorchConnector(self.qnn4)
        }
        # print("self.qlayer",self.qlayer)

    def forward(self, x, init_states=None):
        # print(x)
        # print(init_states)
        if self.batch_first is True:
            batch_size, seq_length, features_size = x.size()
        else:
            seq_length, batch_size, features_size = x.size()
        # print("x.size()",x.size())
        hidden_seq = []
        if init_states is None:
            h_t = torch.zeros(batch_size, self.hidden_size)  # hidden state (output)
            c_t = torch.zeros(batch_size, self.hidden_size)  # cell state
        else:
            # for now we ignore the fact that in PyTorch you can stack multiple RNNs
            # so we take only the first elements of the init_states tuple init_states[0][0], init_states[1][0]
            h_t, c_t = init_states
            h_t = h_t[0]
            c_t = c_t[0]
        # print("HT",h_t)
        # print("CT",c_t)
        for t in range(seq_length):
            # get features from the t-th element in seq, for all entries in the batch
            x_t = x[:, t, :]
            # print(">>> x_t", x_t)
            # print(">>> h_t", h_t)
            # Concatenate input and hidden state
            v_t = torch.cat((h_t, x_t), dim=1)
            # print(">>> v_t", v_t)
            # match qubit dimension
            y_t = self.clayer_in(v_t)
            # print(">>> y_t", y_t)

            
            # print(">>>", y_t.size(), self.qlayer['forget'](y_t))
            # print(">>> forget", self.qlayer['forget'](y_t))
            # print(">>> input", self.qlayer['input'](y_t))
            # print(">>> update", self.qlayer['update'](y_t))
            # print(">>> output", self.qlayer['output'](y_t))

            # print("***>>>self.clayer_out(self.qlayer['forget'](y_t))***", self.clayer_out(self.qlayer['forget'](y_t)))

            f_t = torch.sigmoid(self.clayer_out(self.qlayer['forget'](y_t)))  # forget block
            # print("***>>>***", f_t.size())
            i_t = torch.sigmoid(self.clayer_out(self.qlayer['input'](y_t)))  # input block
            # print("***>>>***", i_t.size())
            g_t = torch.tanh(self.clayer_out(self.qlayer['update'](y_t)))  # update block
            # print("***>>>***", g_t.size())
            o_t = torch.sigmoid(self.clayer_out(self.qlayer['output'](y_t))) # output block
            # print("***>>>***", o_t.size())

            c_t = (f_t * c_t) + (i_t * g_t)
            h_t = o_t * torch.tanh(c_t)

            hidden_seq.append(h_t.unsqueeze(0))
        hidden_seq = torch.cat(hidden_seq, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        return hidden_seq, (h_t, c_t)

In [5]:
# input_dim = feature
class MultiTask_Network(nn.Module):
    def __init__(self, hidden_dim,
                output_dim_0, output_dim_1, output_dim_2,
                input_size: int, 
                hidden_size: int, 
                n_qubits: int=4,
                n_qlayers: int=1,
                batch_first=True,
                backend='statevector_simulator'):
        
        super(MultiTask_Network, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.concat_size = input_size + hidden_size
        self.n_qubits = n_qubits
        self.n_qlayers = n_qlayers
        self.batch_first = batch_first

        self.clayer_in = nn.Linear(self.concat_size, n_qubits)

        self.qrnn = QLSTM(input_size, hidden_size, n_qubits=n_qubits, n_qlayers=n_qlayers, backend=backend)
        
        self.clayer_out = nn.Linear(n_qubits, hidden_dim)
        # self.clayer_out_1 = nn.Linear(n_qubits, hidden_dim)     
        # self.clayer_out_2 = nn.Linear(n_qubits, hidden_dim)

        self.final_0 = nn.Linear(hidden_dim, output_dim_0)
        self.final_1 = nn.Linear(hidden_dim, output_dim_1)     
        self.final_2 = nn.Linear(hidden_dim, output_dim_2)   
        
    def forward(self, x : torch.Tensor, task_id : int):
        x, _ = self.qrnn(x)
        x = torch.sigmoid(x)
        if task_id == 0:
            xf = self.clayer_out(x)
            x = self.final_0(xf)
        elif task_id == 1:
            xf = self.clayer_out(x)
            x = self.final_1(xf)
        elif task_id == 2:
            xf = self.clayer_out(x)
            x = self.final_2(xf)
        else:
            assert False, 'Bad Task ID passed'
            
        return x
    

In [11]:
model = MultiTask_Network(250,
    output_dim_0=1, output_dim_1=1, output_dim_2=1,
    input_size=3, 
    hidden_size=3, 
    n_qubits=3,
    n_qlayers=3,
    batch_first=True,
    backend='statevector_simulator')

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 1e-3)

# Training the RNN
epochs = 1000

  self.qi = QuantumInstance(Aer.get_backend('statevector_simulator'))
  self.qnn1 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
  self.qnn2 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
  self.qnn3 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)
  self.qnn4 = TwoLayerQNN(self.n_qubits, feature_map, ansatz, exp_val=AerPauliExpectation(), quantum_instance=self.qi)


In [12]:
losses_per_epoch = []
for epoch in range(epochs):
    # Forward pass
    outputsXOR = model(inputsXOR, task_id = 0)
    lossXOR = criterion(outputsXOR, targetsXOR)
    outputsOR = model(inputsOR, task_id = 1)
    lossOR = criterion(outputsOR, targetsOR)
    outputsAND = model(inputsAND, task_id = 2)
    lossAND = criterion(outputsAND, targetsAND)

    loss = lossXOR + lossOR + lossAND
    losses_per_epoch.append(loss.item())

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Print the loss every 100 epochs
    if epoch % 100 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

Epoch 0, Loss: 2.7932498455047607
Epoch 100, Loss: 0.5515503883361816
Epoch 200, Loss: 0.44457605481147766
Epoch 300, Loss: 0.3646758198738098
Epoch 400, Loss: 0.32860812544822693
Epoch 500, Loss: 0.3051535487174988
Epoch 600, Loss: 0.2876550853252411
Epoch 700, Loss: 0.2713548243045807
Epoch 800, Loss: 0.25345638394355774
Epoch 900, Loss: 0.23456397652626038


In [8]:
with torch.no_grad():
    test_outputs = model(inputsXOR, task_id = 0)
    for i in range(len(XOR_INPUT)):
        input_data = XOR_INPUT[i]
        output = test_outputs[0, i, 0].item()
        target = XOR_TARGET[i, 0]
        print(f'Input: {input_data}, Output: {output}, Target: {target}')

Input: [0. 0. 0.], Output: 0.4847990572452545, Target: 0.0
Input: [0. 1. 0.], Output: 0.4988959729671478, Target: 1.0
Input: [0. 0. 1.], Output: 0.5066404342651367, Target: 1.0
Input: [0. 1. 1.], Output: 0.5162375569343567, Target: 0.0


In [9]:
with torch.no_grad():
    test_outputs = model(inputsOR, task_id = 1)
    for i in range(len(XOR_INPUT)):
        input_data = OR_INPUT[i]
        output = test_outputs[0, i, 0].item()
        target = OR_TARGET[i, 0]
        print(f'Input: {input_data}, Output: {output}, Target: {target}')

Input: [1. 0. 0.], Output: 0.7278559803962708, Target: 0.0
Input: [1. 1. 0.], Output: 0.765399694442749, Target: 1.0
Input: [1. 0. 1.], Output: 0.7632008790969849, Target: 1.0
Input: [1. 1. 1.], Output: 0.7835854291915894, Target: 1.0


In [10]:
with torch.no_grad():
    test_outputs = model(inputsAND, task_id = 2)
    for i in range(len(XOR_INPUT)):
        input_data = AND_INPUT[i]
        output = test_outputs[0, i, 0].item()
        target = AND_TARGET[i, 0]
        print(f'Input: {input_data}, Output: {output}, Target: {target}')

Input: [2. 0. 0.], Output: 0.21310588717460632, Target: 0.0
Input: [2. 1. 0.], Output: 0.2569143772125244, Target: 0.0
Input: [2. 0. 1.], Output: 0.2654947340488434, Target: 0.0
Input: [2. 1. 1.], Output: 0.2881806492805481, Target: 1.0
