In [2]:
import torch
import torch.nn.functional as F  # functions of the neural network library
import load_dataset as load  # module with function to load data
from Conv_Layer import *
from torch.nn import AdaptiveAvgPool2d
from Pooling import *
import gc
import time
from Dense import *

In [3]:
def float_to_int(x):
    """Convert a single-element tensor to an integer tensor.

    The value is first truncated with ``x.int()``. One is added only when the
    remainder left by the truncation is strictly greater than 0.5, so a
    remainder of exactly 0.5 keeps the truncated value.

    Args:
        x: single-element tensor

    Returns: integer tensor holding the converted value
    """
    truncated = x.int()
    remainder = x - truncated
    return truncated + 1 if remainder > 0.5 else truncated

def batch_float_to_int(x):
    """Apply ``float_to_int`` to every element obtained by iterating over x.

    Args:
        x: iterable of single-element tensors (e.g. a 1-D tensor)

    Returns: new tensor built with ``torch.tensor`` from the converted values
    """
    converted = []
    for xi in x:
        converted.append(float_to_int(xi))
    return torch.tensor(converted)

def copy_images(images, number_copies):
    """Repeat every image of a batch along a newly inserted second axis.

    Args:
        images: tensor batch * I * I
        number_copies: number of times each image is repeated

    Returns: tensor batch * number_copies * I * I
    """
    with_copy_axis = images[:, None]
    return with_copy_axis.repeat(1, number_copies, 1, 1)


def copy_images_bottom_channel(images, J):
    """Enlarge every matrix of a batch by a factor J with nearest-neighbour copies.

    Args:
        images: tensor batch * D * D; the size of the last axis is used as the
            side length for both output axes
        J: integer scale factor

    Returns: tensor batch * (D*J) * (D*J)
    """
    side = images.size()[-1] * J
    # F.interpolate needs a channel axis, so one is added and removed again
    enlarged = F.interpolate(images.unsqueeze(1), size=(side, side), mode='nearest')
    return enlarged.squeeze(1)

# conv = Conv_RBS_density_I2_3D(I,K,J,device)
# 
# for batch_idx, (data, target) in enumerate(train_loader):
#     new_size = I
#     adaptive_avg_pool = AdaptiveAvgPool2d((new_size, new_size))
#     data = adaptive_avg_pool(data).to(device)
#     init_density_matrix = to_density_matrix(F.normalize(data.squeeze().resize(data.size()[0],I**2), p=2, dim=1).to(device), device)
#     print(init_density_matrix.shape)
#     copied_density_matrix = copy_images_bottom_channel(init_density_matrix, J)
#     print(copied_density_matrix.shape)
#     conv(copied_density_matrix)
#     break



def normalize_and_scale(tensor, scale=9):
    """Min-max normalise a tensor and stretch the result to [0, scale].

    Args:
        tensor: non-empty tensor of real values
        scale: upper bound of the output range; the default 9 keeps the
            previously hard-coded range

    Returns: tensor of the same shape whose smallest entry is 0 and whose
        largest entry is ``scale``. When all entries are equal (including a
        single-element tensor) the result is all zeros.
    """
    min_val = tensor.min()
    span = tensor.max() - min_val
    shifted = tensor - min_val
    if span == 0:
        # Constant input: min-max normalisation would compute 0 / 0 and
        # return NaN. `shifted` is already all zeros and stays connected to
        # the autograd graph, so it is returned as is (times scale).
        return shifted * scale
    return shifted / span * scale


class MeasureLayer(nn.Module):
    """Measurement layer holding one trainable observable per batch slot.

    For sample ``i`` the layer evaluates ``trace(rho[i] @ observable[i])`` and
    passes the resulting vector through ``normalize_and_scale``.
    """

    def __init__(self, batch_size, I, J, device):
        """
        Args:
            batch_size: number of observables, i.e. the largest number of
                samples evaluated per call
            I, J: each observable is a square matrix of side I*I*J
            device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        dimension = I * I * J
        # Move the data to the device BEFORE wrapping it in nn.Parameter.
        # nn.Parameter(...).to(device) returns a plain non-leaf tensor on any
        # non-CPU device; such a tensor is not registered in parameters() and
        # is therefore never updated by an optimizer.
        self.observable = nn.Parameter(torch.randn(batch_size, dimension, dimension).to(device))
        self.device = device
        self.batch_size = batch_size

    def forward(self, rho):
        """
        Args:
            rho: tensor size batch * dimension * dimension
        Returns: 1-D float tensor with one value per evaluated sample, after
            ``normalize_and_scale``. At most ``batch_size`` samples are
            evaluated; a shorter batch is accepted.
        """
        rho = rho.to(self.device)
        count = min(rho.shape[0], self.batch_size)
        # trace(A @ B) = sum_ij A_ij * B_ji, computed for all samples at once
        output = torch.einsum('bij,bji->b', rho[:count], self.observable[:count]).float()
        return normalize_and_scale(output)


class MeasureHWLayer(nn.Module):
    """Measurement layer with one trainable observable per batch slot, each a
    square matrix of side ``int(binom(I+I+J, k))``.

    For sample ``i`` the layer evaluates ``trace(rho[i] @ observable[i])`` and
    passes the resulting vector through ``normalize_and_scale``.
    """

    def __init__(self, batch_size, I, J, device, hw_k=None):
        """
        Args:
            batch_size: number of observables, i.e. the largest number of
                samples evaluated per call
            I, J: the first argument given to ``binom`` is I+I+J
            device: torch device (cpu, cuda, etc...)
            hw_k: second argument given to ``binom``. When omitted, the
                module-level variable ``k`` is used, which is what this class
                silently relied on before the argument existed.
        """
        super().__init__()
        subset_size = k if hw_k is None else hw_k
        dimension = int(binom(I + I + J, subset_size))
        # Move the data to the device BEFORE wrapping it in nn.Parameter.
        # nn.Parameter(...).to(device) returns a plain non-leaf tensor on any
        # non-CPU device; such a tensor is not registered in parameters() and
        # is therefore never updated by an optimizer.
        self.observable = nn.Parameter(torch.randn(batch_size, dimension, dimension).to(device))
        self.device = device
        self.batch_size = batch_size

    def forward(self, rho):
        """
        Args:
            rho: tensor size batch * dimension * dimension
        Returns: 1-D float tensor with one value per evaluated sample, after
            ``normalize_and_scale``. At most ``batch_size`` samples are
            evaluated; a shorter batch is accepted.
        """
        rho = rho.to(self.device)
        count = min(rho.shape[0], self.batch_size)
        # trace(A @ B) = sum_ij A_ij * B_ji, computed for all samples at once
        output = torch.einsum('bij,bji->b', rho[:count], self.observable[:count]).float()
        return normalize_and_scale(output)



class TrainedMeasureLayer(nn.Module):
    """Measurement layer using observables supplied by the caller.

    The observables are stored as a plain tensor attribute (not an
    ``nn.Parameter``), so they do not appear in ``parameters()``. For sample
    ``i`` the layer evaluates ``trace(rho[i] @ observable[i])`` and passes the
    resulting vector through ``normalize_and_scale``.
    """

    def __init__(self, batch_size, observable, device):
        """
        Args:
            batch_size: largest number of samples evaluated per call
            observable: tensor indexed by sample, one square matrix per sample
            device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        self.observable = observable.to(device)
        self.device = device
        self.batch_size = batch_size

    def forward(self, rho):
        """
        Args:
            rho: tensor size batch * dimension * dimension
        Returns: 1-D float tensor with one value per evaluated sample, after
            ``normalize_and_scale``. At most ``batch_size`` samples are
            evaluated; a shorter batch is accepted.
        """
        # Everything stays on self.device; the previous code ended with a
        # transfer to a module-level `device` variable instead.
        rho = rho.to(self.device)
        count = min(rho.shape[0], self.batch_size)
        # trace(A @ B) = sum_ij A_ij * B_ji, computed for all samples at once
        output = torch.einsum('bij,bji->b', rho[:count], self.observable[:count]).float()
        return normalize_and_scale(output)


class MeasureLayerTrained(nn.Module):
    """Measurement layer whose observable is a fixed diagonal matrix.

    Every sample is measured against ``diag(trained_observable_diagonal)``;
    the resulting vector is passed through ``normalize_and_scale``.
    """

    def __init__(self, batch_size, trained_observable_diagonal, device):
        """
        Args:
            batch_size: largest number of samples evaluated per call
            trained_observable_diagonal: 1-D tensor with the diagonal entries
                of the observable
            device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        self.diagonal_element = trained_observable_diagonal
        self.device = device
        self.batch_size = batch_size

    def forward(self, rho):
        """
        Args:
            rho: tensor size batch * dimension * dimension
        Returns: 1-D float tensor with one value per evaluated sample, after
            ``normalize_and_scale``. At most ``batch_size`` samples are
            evaluated; a shorter batch is accepted.
        """
        rho = rho.to(self.device)
        # The diagonal is moved alongside rho so both operands share a device.
        diagonal = self.diagonal_element.to(self.device)
        count = min(rho.shape[0], self.batch_size)
        # trace(rho @ diag(d)) = sum_i rho_ii * d_i, so only the diagonal of
        # rho is needed and no dense diagonal matrix has to be built.
        rho_diagonal = torch.diagonal(rho[:count], dim1=-2, dim2=-1)
        output = (rho_diagonal * diagonal).sum(dim=-1).float()
        return normalize_and_scale(output)


class Tomograph_state(nn.Module):
    """Extracts the trailing ``out`` x ``out`` corner of every matrix in a
    batch, applies a ReLU and L2-normalises the result along dimension 1."""

    def __init__(self, out, device):
        """
        Args:
            out: side length of the corner that is kept
            device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        self.device = device
        self.out = out

    def forward(self, input):
        """
        Args:
            input: tensor size batch * dimension * dimension
        Returns: tensor size batch * out * out on ``self.device``
        """
        start = -self.out
        corner = input[:, start:, start:]
        rectified = F.relu(corner)
        normalised = F.normalize(rectified, p=2, dim=1)
        return normalised.to(self.device)


def map_HW_to_distribution(batch_x, I, J, k, batch, device):
    """Accumulate the diagonal entries of each matrix into I+I+J output slots.

    ``map_RBS(I+I+J, k)`` supplies a dictionary; for every (key, value) pair
    the diagonal entry at position ``value`` is added to the three output
    slots ``key[0]``, ``key[1]`` and ``key[2]``.

    Args:
        batch_x: batch of square matrices
        I, J, k: arguments forwarded to ``map_RBS`` as (I+I+J, k)
        batch: number of matrices processed
        device: torch device (cpu, cuda, etc...)

    Returns: tensor batch * (I+I+J)
    """
    width = I + I + J
    diagonals = torch.stack([torch.diag(matrix) for matrix in batch_x]).to(device)
    distribution = torch.zeros(batch, width).to(device)
    index_table = map_RBS(I + I + J, k)
    for b in range(batch):
        for slots, position in index_table.items():
            weight = diagonals[b][position]
            for component in range(3):
                distribution[b][slots[component]] += weight
    return distribution


def map_HW_to_measure(batch_x, I, J, k, batch, device):
    """Stack the main diagonals of a batch of matrices.

    Args:
        batch_x: batch of square matrices
        I, J, k, batch: accepted but not used by this function
        device: torch device (cpu, cuda, etc...)

    Returns: tensor batch * dimension holding one diagonal per matrix
    """
    diagonals = []
    for matrix in batch_x:
        diagonals.append(torch.diag(matrix))
    return torch.stack(diagonals).to(device)


def get_reduced_layers_structure(n, out):
    """Return the gate list of the size-n structure with the gates of the
    size-(n - out) structure removed.

    Both structures come from ``PQNN_building_brick``; every value ``y`` of
    the returned gate dictionary is turned into the pair ``(y, y + 1)``.

    Args:
        n: size passed to ``PQNN_building_brick`` for the full structure
        out: the structure of size ``n - out`` is removed from the full one

    Returns: list of (y, y + 1) tuples in the order of the full structure.

    Raises:
        ValueError: if a gate of the smaller structure is not present in the
            full one (raised by ``list.remove``).
    """
    # The full structure was previously built with a hard-coded size of 8,
    # which ignored `n`; it now follows the argument.
    _, full_dictionary, _ = PQNN_building_brick(0, n, index_first_RBS=0, index_first_param=0)
    list_gates = [(y, y + 1) for y in full_dictionary.values()]

    _, removed_dictionary, _ = PQNN_building_brick(0, n - out, index_first_RBS=0, index_first_param=0)
    list_gates_delete = [(y, y + 1) for y in removed_dictionary.values()]

    # list.remove drops the FIRST match, so the list is reversed while
    # removing: for a gate that occurs several times, its last occurrence in
    # the original order is the one that disappears.
    list_gates.reverse()
    for gate in list_gates_delete:
        list_gates.remove(gate)
    list_gates.reverse()
    return list_gates

# # Example of using the class
# I = 2
# J = I# Dimension of the tensor
# dimension = I*I*J
# batch_size = 10
# device = torch.device("mps")

# model = MeasureLayer(batch_size, I, J, device_cpu)
# # x = torch.eye(dimension,dimension).to(device_cpu)
# x = torch.randn(10, dimension,dimension).to(device)
# y = torch.tensor((5,5,5,5,5,5,5,5,5,5), dtype=torch.float32).to(device_cpu)
# print(model(x))
# criterion = torch.nn.MSELoss(reduction='sum')
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# for t in range(2000):
#     model.train()
#     # Forward pass: Compute predicted y by passing x to the model
#     y_pred = model(x)
#     # Compute and print loss
#     loss = criterion(y_pred.to(device_cpu), y)
#     # if t % 100 == 99:
#     #     print(t, loss.item())
# 
#     # Zero gradients, perform a backward pass, and update the weights.
#     optimizer.zero_grad()
#     # loss.requires_grad = True
#     loss.backward()
#     optimizer.step()
# 
# print(model(x))

In [30]:
# import torch
# 
# def transform_input(input_tensor, output_size):
#     batch, input_size = input_tensor.shape
# 
#     step = input_size // output_size
#     # Initialize an output tensor
#     output_tensor = torch.zeros((batch, output_size), dtype=input_tensor.dtype)
#     for i in range(output_size):
#         # Sum up 'step' number of elements for each segment of the input
#         output_tensor[:, i] = input_tensor[:, i*step:(i+1)*step].sum(dim=1)
#     return output_tensor
# 
# # Example usage
# batch = 2
# input_size = 22
# output_size = 10
# input_tensor = torch.randn(batch, input_size)
# output_tensor = transform_input(input_tensor, output_size)
# print(input_tensor)
# print(output_tensor)


In [21]:
# def map_HW_to_distribution(batch_x, I, J, k, batch, device):
#     output = I+I+J
#     diagonal_x = torch.stack([torch.diag(x) for x in batch_x]).to(device)
#     y = torch.zeros(batch,output).to(device)
#     map = map_RBS(I+I+J,k)
#     for b in range(batch):
#         for key, value in map.items():
#             coef = diagonal_x[b][value]
#             y[b][key[0]] += coef
#             y[b][key[1]] += coef
#             y[b][key[2]] += coef
#     return y
# 
# def map_HW_to_measure(batch_x, I, J, k, batch, device):
#     return torch.stack([torch.diag(x)[-10:] for x in batch_x]).to(device)
# I = 3
# J = 2
# k = 3
# batch = 1
# device = torch.device("mps")
# size = int(binom(I+I+J,k))
# batch_x = torch.randn(batch,size,size).to(device)
# print(batch_x.shape)
# map_HW_to_distribution(batch_x, I, J, k, batch, device).shape

torch.Size([1, 56, 56])


torch.Size([1, 8])

In [5]:
# I = 3
# J = 3
# k = 3
# batch = 2
# device = torch.device("mps")
# size = int(binom(I+I+J,k))
# batch_x = torch.randn(batch,size,size).to(device)
# print(batch_x.shape)
# map_HW_to_measure(batch_x, I, J, k, batch, device).shape

torch.Size([2, 84, 84])


torch.Size([2, 10])

In [4]:
# import torch
# 
# 
# 
# # Example
# input_tensor = torch.randn(10, 1) * 100  # Generating a tensor with values roughly from -100 to 100
# output_tensor = normalize_and_scale(input_tensor)
# 
# print(output_tensor)
def Passage_matrix_I_to_HW_3D(I, J, k, device):
    """ Build the 0/1 matrix used to pass from the Image basis to the HW basis.
    For every triple (line, I+column, 2*I+channel) a single 1 is written at
    the row given by map_RBS(I+I+J, k) and the column given by
    map_RBS_I2_3D_bottom(I, J).
    Args:
        - I: line and column both range over I values
        - J: channel ranges over J values
        - k: second argument of binom and of map_RBS
        - device: torch device (cpu, cuda, etc...)
    Output:
        - Passage_matrix: uint8 tensor matrix of size
        (int(binom(I+I+J,k)), I*I*J).
    """
    row_count = int(binom(I+I+J, k))
    Passage_matrix = torch.zeros((row_count, I*I*J), dtype=torch.uint8, device=device)
    mapping_input = map_RBS_I2_3D_bottom(I, J)
    mapping_output = map_RBS(I+I+J, k)
    triples = [(line, I + column, 2*I + channel)
               for line in range(I)
               for column in range(I)
               for channel in range(J)]
    for triple in triples:
        Passage_matrix[mapping_output[triple], mapping_input[triple]] = 1
    return Passage_matrix


class Basis_Change_I_to_HW_density_3D(nn.Module):
    """ This module changes the basis of a density operator from the Image
    basis to the HW basis by conjugating it with the passage matrix. """

    def __init__(self, I, J, k, device):
        """ Stores the float version of Passage_matrix_I_to_HW_3D(I, J, k, device). """
        super().__init__()
        self.Passage_matrix = Passage_matrix_I_to_HW_3D(I, J, k, device).float()

    def forward(self, input_state):
        """ Computes P @ input_state @ P.T where P is the stored passage matrix.
        Arg:
            - input_state: a torch tensor of dimension (nbr_batch, d, d) where
            d is the number of columns of the passage matrix.
        Output:
            - a torch tensor of dimension (nbr_batch, D, D) where D is the
            number of rows of the passage matrix.
        """
        passage = self.Passage_matrix
        left_applied = torch.matmul(passage, input_state)
        return torch.matmul(left_applied, passage.T)

class Dense_RBS_density_3D(nn.Module):
    """ This module applies a sequence of RBS gate modules to a density operator. """

    def __init__(self, I, J, k, list_gates, device):
        """ Args:
            - I, J, k: forwarded to RBS_Unitaries as (I+I+J, k)
            - list_gates: list of tuples, one per RBS gate module to create
            - device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        # Shared object handed to every gate module during the forward pass:
        self.RBS_Unitaries_dict = RBS_Unitaries(I+I+J, k, list_gates, device)
        gate_modules = []
        for gate in list_gates:
            gate_modules.append(RBS_Dense_density(gate, device))
        self.RBS_gates = nn.ModuleList(gate_modules)

    def forward(self, input_state):
        """ Feedforward through all gate modules, in the order of list_gates.
        Arg:
            - input_state = tensor on which the gate modules are applied one
            after the other; it is converted to float first.
        Output:
            - the tensor returned by the last gate module (the float input
            itself when there is no gate).
        """
        state = input_state.float()
        unitaries = self.RBS_Unitaries_dict
        for gate_module in self.RBS_gates:
            state = gate_module(state, unitaries)
        return state


class Dense_RBS_density_3D_para(nn.Module):
    """ This module applies a sequence of RBS gate modules, each created with
    its own angle, to a density operator. """

    def __init__(self, I, J, k, list_gates, angles, device):
        """ Args:
            - I, J, k: forwarded to RBS_Unitaries as (I+I+J, k)
            - list_gates: list of tuples, one per RBS gate module to create
            - angles: angles[i] is given to the gate module built from list_gates[i]
            - device: torch device (cpu, cuda, etc...)
        """
        super().__init__()
        # Shared object handed to every gate module during the forward pass:
        self.RBS_Unitaries_dict = RBS_Unitaries(I+I+J, k, list_gates, device)
        gate_modules = []
        for position, gate in enumerate(list_gates):
            gate_modules.append(RBS_Dense_density_para(gate, angles[position], device))
        self.RBS_gates = nn.ModuleList(gate_modules)

    def forward(self, input_state):
        """ Feedforward through all gate modules, in the order of list_gates.
        Arg:
            - input_state = tensor on which the gate modules are applied one
            after the other; it is converted to float first.
        Output:
            - the tensor returned by the last gate module (the float input
            itself when there is no gate).
        """
        state = input_state.float()
        unitaries = self.RBS_Unitaries_dict
        for gate_module in self.RBS_gates:
            state = gate_module(state, unitaries)
        return state


In [36]:
# Experiment cell: apply the full pyramid of gates to a random matrix, apply
# one extra gate on (6, 7) to a second copy of that result, and print the
# summed difference of the two outputs after Tomograph_state.
I = 3
# O, n and K are assigned here but not read anywhere in this cell.
O = I//2
n = 2*I
k = 3
K = I
J = 2
device = torch.device("mps")
# NOTE(review): despite its name, device_cpu is also the "mps" device.
device_cpu = torch.device("mps")
batch_size = 1

# Gate list over I+I+J indices; every gate gets the same angle 0.31.
list_gates_pyramid = []
angles = []
PQNN_param_dictionary, PQNN_dictionary, PQNN_layer = PQNN_building_brick(0, I+I+J, index_first_RBS=0, index_first_param=0)
for x, y in PQNN_dictionary.items():
    list_gates_pyramid.append((y,y+1))
    angles.append(0.31)

list_gates_pyramid_reduced = get_reduced_layers_structure(I+I+J, 5)
angles_reduced = [0.31 for i in range(len(list_gates_pyramid_reduced))]

# list_gates_pyramid2 = []
# PQNN_param_dictionary, PQNN_dictionary, PQNN_layer = PQNN_building_brick(0, 5, index_first_RBS=0, index_first_param=0)
# for x, y in PQNN_dictionary.items():
#     list_gates_pyramid2.append((y,y+1))

# Single extra gate applied after the pyramid in the comparison below.
list_gates_naive = [(6,7)]
angles_naive = [0.31]
dense_naive = Dense_RBS_density_3D_para(I, J, k, list_gates_naive, angles_naive, device)


dimension = int(binom(I+I+J, k))
dense1 = Dense_RBS_density_3D_para(I, J, k, list_gates_pyramid, angles, device)
# dense1_reduced is built but not applied anywhere in this cell.
dense1_reduced = Dense_RBS_density_3D_para(I, J, k, list_gates_pyramid_reduced, angles_reduced, device)
tomo = Tomograph_state(10, device)
tomop = Tomograph_state(10, device)
# dense2 = Dense_RBS_density_3D(0, 5, k, list_gates_pyramid2, device)

# x was the loop variable above; from here on it is the random input matrix.
x = torch.randn(batch_size, dimension, dimension).to(device)
# d1 and d2 come from the same call dense1(x); only d2 is fed to the extra gate.
d1 = dense1(x)
d2 = dense1(x)
d1p = dense_naive(d2)
# print(torch.sum(d1[0][-10:,-10:]-d1p[0][-10:,-10:]))
# print(d1.shape)
# print(d1p.shape)
# The names tomo / tomop are rebound from the modules to their output tensors,
# so the modules are no longer reachable under these names afterwards.
tomo = tomo(d1)
tomop = tomop(d1p)
print(torch.sum(tomop-tomo))
# d2 = dense2(tomo)
# d2.shape

tensor(0.1466, device='mps:0', grad_fn=<SumBackward0>)


In [14]:
# Training setup cell: hyper-parameters, data loaders, the gate list, the
# sequential model, the loss and the optimizer read by the training loop.
I = 12
O = I//2
n = 2*I
k = 3
K = 2
J = 2

dimension = int(binom(I+I+J, k))
# Side length of the observables handed to TrainedMeasureLayer below.
final_dimension = int(binom(O+J, k))
device = torch.device("mps")
# NOTE(review): despite its name, device_cpu is also the "mps" device.
device_cpu = torch.device("mps")
# trained_observable_diagonal = torch.linspace(-1, 1, steps=int(binom(O+J, k)))
batch_size = 10  # the number of examples per batch
# Random observables, one per batch slot. TrainedMeasureLayer keeps them as a
# plain tensor, so the optimizer created below does not update them.
observable = torch.randn(batch_size, final_dimension, final_dimension)
class_number = 10
train_loader, test_loader, dim_in, dim_out = load.load_MNIST(batch_size=batch_size)
scala = 1000
reduced_loader = reduce_MNIST_dataset(train_loader, scala)
# list_gates_pyramid = []
# PQNN_param_dictionary, PQNN_dictionary, PQNN_layer = PQNN_building_brick(0, I+I+J, index_first_RBS=0, index_first_param=0)
# for x, y in PQNN_dictionary.items():
#     list_gates_pyramid.append((y,y+1))
#
# list_gates_pyramid2 = []
# PQNN_param_dictionary, PQNN_dictionary, PQNN_layer = PQNN_building_brick(0, 5, index_first_RBS=0, index_first_param=0)
# for x, y in PQNN_dictionary.items():
#     list_gates_pyramid2.append((y,y+1))

# Every ordered pair (i, j) of distinct indices below O+J.
list_gates = [(i, j) for i in range(O+J) for j in range(O+J) if i != j]

# full_model = nn.Sequential(Conv_RBS_density_I2_3D(I,2,J,device),
#                            Basis_Change_I_to_HW_density_3D(I, J, k, device),
#                            Dense_RBS_density_3D(I, J, k, list_gates_pyramid, device),
#                            Tomograph_state(class_number, device),
#                            Dense_RBS_density_3D(0, 5, k, list_gates_pyramid2, device))
# full_model = nn.Sequential(Conv_RBS_density_I2_3D(I,2,J,device),
#                            Conv_RBS_density_I2_3D(I,3,J,device),
#                            Basis_Change_I_to_HW_density_3D(I, J, k, device),
#                            Dense_RBS_density_3D(I, J, k, list_gates_pyramid, device),
#                            Tomograph_state(class_number, device),
#                            Dense_RBS_density_3D(0, 5, k, list_gates_pyramid2, device))

# Two convolution + pooling stages (sizes I -> O -> O//2), then the basis
# change, the dense gate layer and the measurement layer.
full_model = nn.Sequential(Conv_RBS_density_I2_3D(I,2,J,device),
                           Pooling_2D_density_3D(I, O, J, device),
                           Conv_RBS_density_I2_3D(O,2,J,device),
                           Pooling_2D_density_3D(O, O//2, J, device),
                           Basis_Change_I_to_HW_density_3D(O//2, J, k, device),
                           Dense_RBS_density_3D(O//2, J, k, list_gates, device),
                           TrainedMeasureLayer(batch_size, observable, device_cpu))

# The loss is summed (not averaged) over the entries of each batch.
loss_function = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.Adagrad(full_model.parameters(), lr=1e-1, lr_decay=1e-6, weight_decay=0, initial_accumulator_value=1e-6, eps=1e-10)

# optimizer = torch.optim.Adam(full_model.parameters(), lr=1e-2)
# loss_function = torch.nn.CrossEntropyLoss()

def train_net(network, train_loader, loss_function, optimizer, device):
    """Train the network for one pass over train_loader.

    Reads the module-level variables ``I``, ``J`` and ``device_cpu`` and the
    helpers ``to_density_matrix``, ``copy_images_bottom_channel`` and
    ``batch_float_to_int``.

    Args:
        network: model called on the prepared batch
        train_loader: iterable of (data, target) batches that also exposes
            ``.dataset`` (its length is the accuracy denominator)
        loss_function: callable (prediction, target) -> scalar loss tensor
        optimizer: optimizer stepping the parameters of ``network``
        device: torch device on which the input data is prepared

    Returns: (train_accuracy, train_loss), i.e. the number of exact matches
        divided by ``len(train_loader.dataset)`` and the mean loss per batch.
        Both are 0.0 when the loader yields no batch.
    """
    network.train()
    train_loss = 0
    train_accuracy = 0
    num_batches = 0
    # The pooling layer is stateless, so it is built once instead of per batch.
    adaptive_avg_pool = AdaptiveAvgPool2d((I, I))
    for data, target in train_loader:
        num_batches += 1
        data = adaptive_avg_pool(data).to(device)
        # reshape replaces the deprecated non-in-place Tensor.resize and also
        # keeps the batch axis when the batch holds a single image.
        flat_images = data.reshape(data.size(0), I**2)
        init_density_matrix = to_density_matrix(F.normalize(flat_images, p=2, dim=1).to(device), device)
        copied_density_matrix = copy_images_bottom_channel(init_density_matrix, J).to(device)
        out_network = network(copied_density_matrix)

        # training
        prediction = out_network.to(device_cpu)
        # The target follows the prediction's device; previously the two were
        # moved to two different device variables.
        loss = loss_function(prediction, target.float().to(prediction.device))
        train_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # predict digital number
        predict_number = batch_float_to_int(prediction.detach())
        labels = target.to(predict_number.device).view_as(predict_number)
        train_accuracy += predict_number.eq(labels).sum().item()

        # delete variable to free memory
        del out_network, prediction
        gc.collect()

    if num_batches == 0:
        # Nothing was processed: avoid dividing by zero / reading an index
        # that was never bound.
        return 0.0, 0.0
    train_accuracy /= len(train_loader.dataset)
    train_loss /= num_batches
    return train_accuracy, train_loss


# Run 100 training passes over reduced_loader and print the metrics of each.
for epoch in range(100):
    # start = time.time()
    train_accuracy, train_loss = train_net(full_model, reduced_loader, loss_function, optimizer, device)
    # train_accuracy is a fraction; it is multiplied by 100 for display only.
    print(f'Epoch {epoch}: Loss = {train_loss:.6f}, accuracy = {train_accuracy*100:.4f} %')
    # end = time.time()
    # print("The time of execution of above program is :",(end-start), "s")

Epoch 0: Loss = 176.554815, accuracy = 10.0000 %
Epoch 1: Loss = 135.451085, accuracy = 16.6667 %
Epoch 2: Loss = 132.114540, accuracy = 13.3333 %
Epoch 3: Loss = 130.146144, accuracy = 16.6667 %
Epoch 4: Loss = 128.482192, accuracy = 20.0000 %
Epoch 5: Loss = 126.988327, accuracy = 20.0000 %
Epoch 6: Loss = 125.584359, accuracy = 21.6667 %
Epoch 7: Loss = 124.202453, accuracy = 21.6667 %
Epoch 8: Loss = 122.785531, accuracy = 23.3333 %
Epoch 9: Loss = 121.297185, accuracy = 25.0000 %
Epoch 10: Loss = 119.756743, accuracy = 25.0000 %
Epoch 11: Loss = 118.279426, accuracy = 21.6667 %
Epoch 12: Loss = 116.766685, accuracy = 21.6667 %
Epoch 13: Loss = 118.514075, accuracy = 21.6667 %
Epoch 14: Loss = 112.010890, accuracy = 18.3333 %
Epoch 15: Loss = 110.431975, accuracy = 20.0000 %
Epoch 16: Loss = 108.331834, accuracy = 21.6667 %
Epoch 17: Loss = 106.515694, accuracy = 16.6667 %
Epoch 18: Loss = 105.260644, accuracy = 18.3333 %
Epoch 19: Loss = 103.880637, accuracy = 20.0000 %
Epoch 20: 