In [94]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
import random

import perceval as pcvl

In [95]:
# Simple Linear Model
class LinearModelBaseline(nn.Module):
    def __init__(self, input_size, num_classes=10):
        super(LinearModelBaseline, self).__init__()
        self.image_size = input_size

        # Classical part
        self.classifier = nn.Linear(input_size, num_classes)

    def forward(self, x):
        # Data is already flattened
        output = self.classifier(x)
        return output

In [83]:
from merlin.datasets import mnist_digits

def get_mnist_data(source="percevalquest"):
    """
    Get MNIST dataset. The returned images are flattened, scaled with MinMaxScaler and converted to torch.tensors.

    Args:
        source (str): ["percevalquest", "full_dataset"] Whether to return the percevalquest subset of the full dataset of MNIST

    Returns:
        X_train, y_train, X_test, y_test
    """
    if source == "percevalquest":
        train_features, train_labels, train_metadata = mnist_digits.get_data_train_percevalquest()
        test_features, test_labels, test_metadata = mnist_digits.get_data_test_percevalquest()

    elif source == "full_dataset":
        # Define transform: convert images to tensors and normalize
        transform = transforms.ToTensor()

        # Download training data (60,000 samples)
        train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)

        # Separate images and labels
        train_features = torch.stack([data[0] for data in train_dataset])  # shape: [60000, 1, 28, 28]
        train_labels = torch.tensor([data[1] for data in train_dataset])  # shape: [60000]

        # Download test data (10,000 samples)
        test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

        test_features = torch.stack([data[0] for data in test_dataset])
        test_labels = torch.tensor([data[1] for data in test_dataset])

    else:
        raise ValueError(f"Unknown source: {source}")

    # Flatten the images from (N, 28, 28) to (N, 784)
    train_features = train_features.reshape(train_features.shape[0], -1)
    test_features = test_features.reshape(test_features.shape[0], -1)

    # Scale
    scaler = MinMaxScaler()
    train_features = scaler.fit_transform(train_features)
    test_features = scaler.transform(test_features)

    # Convert data to PyTorch tensors
    X_train = torch.FloatTensor(train_features)
    y_train = torch.LongTensor(train_labels)
    X_test = torch.FloatTensor(test_features)
    y_test = torch.LongTensor(test_labels)

    print(f"Dataset loaded: {len(X_train)} training samples, {len(X_test)} test samples")
    print(f"Train dataset shape : {X_train.shape}")
    print(f"Train label shape : {y_train.shape}")
    return X_train, y_train, X_test, y_test


source = "full_dataset"  # ["percevalquest", "full_dataset"]
X_train, y_train, X_test, y_test = get_mnist_data(source)

Dataset loaded: 60000 training samples, 10000 test samples
Train dataset shape : torch.Size([60000, 784])
Train label shape : torch.Size([60000])


In [96]:
# Important hyperparameters
M=16
N=3
n_components=M-1

In [97]:
# train PCA
def train_pca(X_train, X_test):
    pca = PCA(n_components=n_components)

    pca.fit(X_train)
    X_train_pca = torch.FloatTensor(pca.transform(X_train))
    X_test_pca = torch.FloatTensor(pca.transform(X_test))

    # Compute min and max from the training set
    min_val = X_train_pca.min()
    max_val = X_train_pca.max()

    # Avoid division by zero if max == min
    if max_val == min_val:
        raise ValueError("Cannot scale: all values in X_train_pca are the same.")

    # Scale both train and test using training set stats so they range between 0 and 1
    X_train_pca = (X_train_pca - min_val) / (max_val - min_val)
    X_test_pca = (X_test_pca - min_val) / (max_val - min_val)

    print(X_train_pca)
    print(X_train_pca.shape)
    return X_train_pca, X_test_pca

X_train_pca, X_test_pca = train_pca(X_train, X_test)

tensor([[0.4322, 0.4800, 0.4069,  ..., 0.4295, 0.5907, 0.3451],
        [0.6573, 0.4755, 0.2495,  ..., 0.4817, 0.4238, 0.4610],
        [0.3876, 0.3013, 0.4485,  ..., 0.3533, 0.4284, 0.4374],
        ...,
        [0.3556, 0.3601, 0.4661,  ..., 0.3589, 0.4295, 0.3065],
        [0.4338, 0.4021, 0.2704,  ..., 0.5083, 0.3800, 0.3305],
        [0.3567, 0.4070, 0.2597,  ..., 0.3449, 0.3277, 0.3550]])
torch.Size([60000, 15])


In [99]:
def initialize_left_and_right_circuits(n_modes):
    """
    Initialize random left and right side quantum circuits.
    """
    left_side = pcvl.GenericInterferometer(n_modes,
                            lambda idx: pcvl.BS(theta=np.pi*2*random.random()) // (0, pcvl.PS(phi=np.pi * 2 * random.random())),
                            shape=pcvl.InterferometerShape.RECTANGLE,
                            depth=2 * n_modes,
                            phase_shifter_fun_gen=lambda idx: pcvl.PS(phi=np.pi*2*random.random()))
    right_side = pcvl.GenericInterferometer(n_modes,
                            lambda idx: pcvl.BS(theta=np.pi*2*random.random()) // (0, pcvl.PS(phi=np.pi * 2 * random.random())),
                            shape=pcvl.InterferometerShape.RECTANGLE,
                            depth=2 * n_modes,
                            phase_shifter_fun_gen=lambda idx: pcvl.PS(phi=np.pi*2*random.random()))
    return left_side, right_side

left, right = initialize_left_and_right_circuits(M)

In [100]:
def create_circuit(input_size, n_modes, left, right, scale=1, input=None):
    """
    Create a quantum circuit.
    """
    left_side = left

    center = pcvl.Circuit(n_modes)
    for i in range(input_size):
        if input is None:
            param = pcvl.P(f"theta_{i}")
            center.add(i, pcvl.PS(phi=param))
        else:
            center.add(i, pcvl.PS(input[i] * scale))

    right_side = right

    circuit = pcvl.Circuit(n_modes)
    circuit.add(0, left_side, merge=True)
    circuit.add(0, center, merge=True)
    circuit.add(0, right_side, merge=True)
    return circuit

def create_input_state(n_photons, n_modes, photons_at_start=True):
    """
    Create a quantum input state.
    """
    input_state = [0] * n_modes
    i = 0
    index = 0
    while i < n_photons:
        if photons_at_start:
            input_state[i] = 1
        else:
            input_state[index] = 1

        index += n_modes // n_photons
        i += 1
    print(f"Input state : {input_state}")
    return input_state

photons_at_start = False
input_state = create_input_state(n_photons=N, n_modes=M, photons_at_start=photons_at_start)

Input state : [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]


In [102]:
def get_quantum_output_proba(data, scale=1):
    """
    Using Perceval, calculate the perfect output probability distributions out of the quantum circuit when sending in data in input.

    Args:
        data (torch.Tensor): [Num_data, Num_PCA_components] Input data to the circuit
        scale (float): Value to multiply each input value before encoding it into the circuit

    Returns:
        output (torch.Tensor): [Num_data, Num_possible_output_states] Output probabilities of the quantum circuit
    """
    circuit = create_circuit(input_size=M-1, n_modes=M, left=left, right=right, scale=scale)
    local_p = pcvl.Processor("SLOS", circuit)
    local_p.with_input(pcvl.BasicState(input_state))
    sampler = pcvl.algorithm.Sampler(local_p)
    # Using the add_iteration_list method does not seem to accelerate the process
    sampler.add_iteration_list([{"circuit_params": {f"theta_{i}": float(val) * float(scale) for i, val in enumerate(input)}} for input in data])
    prob_dist = sampler.probs()
    result_list = prob_dist["results_list"]

    output= []
    for r in result_list:
        result = r["results"]
        vector = []
        for state, value in result.items():
            vector.append(value)
        assert np.isclose(np.sum(vector), 1), f"Sum of vector is not 1, {np.sum(vector)} != 1"

        output.append(vector)

    output = torch.tensor(output)
    print(output.shape)
    return output

In [101]:
def get_quantum_output_sampling(data, n_sample, scale=1):
    """
    Using Perceval, calculate approximate output probability distributions out of the quantum circuit when sending in data in input by sampling.

    Args:
        data (torch.Tensor): [Num_data, Num_PCA_components] Input data to the circuit
        n_sample (int): Number of samples to generate
        scale (float): Value to multiply each input value before encoding it into the circuit

    Returns:
        output (torch.Tensor): [Num_data, Num_possible_output_states] Output probabilities of the quantum circuit
    """
    # Find all possible states and store them in "possible_states"
    circuit = create_circuit(input_size=M-1, n_modes=M, left=left, right=right, scale=scale, input=data[0])
    local_p = pcvl.Processor("SLOS", circuit)
    local_p.with_input(pcvl.BasicState(input_state))
    sampler = pcvl.algorithm.Sampler(local_p)
    prob_dist = sampler.probs()
    results = prob_dist["results"]
    possible_states = []
    for state, values in results.items():
        possible_states.append(state)
    possible_states = [str(state) for state in possible_states]

    # Sample from distribution
    circuit = create_circuit(input_size=M-1, n_modes=M, left=left, right=right, scale=scale)
    local_p = pcvl.Processor("SLOS", circuit)
    local_p.with_input(pcvl.BasicState(input_state))
    sampler = pcvl.algorithm.Sampler(local_p)

    # Using the add_iteration_list method does not seem to accelerate the process
    sampler.add_iteration_list([{"circuit_params": {f"theta_{i}": float(val) * float(scale) for i, val in enumerate(input)}} for input in data])
    sample_count = sampler.sample_count(n_sample)
    result_list = sample_count["results_list"]

    output= []
    for r in result_list:
        result = r["results"]
        total = 0
        vector = [0] * len(possible_states)
        for state, value in result.items():
            index = possible_states.index(str(state))
            vector[index] = value
            total += value
        vector = np.array(vector) / total
        assert np.isclose(np.sum(vector), 1), f"Sum of vector is not 1, {np.sum(vector)} != 1"

        output.append(vector)

    output = torch.tensor(output)
    print(output.shape)
    return output

In [103]:
# Get quantum output probabilities
scale = np.pi * 2
n_sampling = 1e6
train_output = get_quantum_output_proba(X_train_pca, n_sampling, scale=scale)
test_output = get_quantum_output_proba(X_test_pca, n_sampling, scale=scale)

KeyboardInterrupt: 

In [91]:
# Save the ouptutprobability distributions
train_file_path = f"./complete_dataset_saved_probas/train_{N}_{M}_scale_{scale:.2f}.pt"
test_file_path = f"./complete_dataset_saved_probas/test_{N}_{M}_scale_{scale:.2f}.pt"
torch.save(train_output, train_file_path)
torch.save(test_output, test_file_path)

In [69]:
def get_run_info(N, M, only_use_pca=False):
    """ Helper to display information """
    s = "#######################################################################################\n"
    s += f"N_M = {N}_{M}\n"
    s += f"only_use_pca = {only_use_pca}\n"
    s += f"scale_type = {scale}\n"
    return s

In [70]:
def get_summary_metrics(history):
    """ Helper to display metrics """
    linear_best_test_acc = max(history["linear"]["accuracy"]["test"])
    linear_best_test_acc_index = history["linear"]["accuracy"]["test"].index(max(history["linear"]["accuracy"]["test"]))
    linear_train_acc = history["linear"]["accuracy"]["train"][linear_best_test_acc_index]
    s = f"Linear best test accuracy: {linear_best_test_acc:.4f} and associated train accuracy {linear_train_acc:.4f} at epoch {linear_best_test_acc_index}\n"
    return s

In [92]:
def train(path_to_train, path_to_test, X_train, X_test, only_use_pca=False, balance_q_and_raw=False):
    """
    Train linear model that takes quantum outputs as inputs for classification.

    Args:
        path_to_train (str): Path to the tensor training data
        path_to_test (str): Path to the tensor testing data
        X_train (torch.Tensor): Raw MNIST training data
        X_test (torch.Tensor): Raw MNIST testing data

        only_use_pca (bool): If true, only use the quantum outputs and ignore the raw MNIST data
        balance_q_and_raw (bool): If true, balance the quantum outputs and the raw MNIST data with a custom-made normalization
    """
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()

    # Optimizer AdaGrad
    if only_use_pca:
        linear_model = LinearModelBaseline(input_size=output.shape[1], num_classes=10)
    else:
        linear_model = LinearModelBaseline(input_size=X_train.shape[1] + output.shape[1], num_classes=10)
    optimizer_linear = torch.optim.Adagrad(linear_model.parameters(), lr=0.05)

    # Create DataLoader for batching
    quantum_output_train = torch.load(path_to_train)
    quantum_output_test = torch.load(path_to_test)

    # Batch_size = 100
    batch_size = 100

    if only_use_pca:
        train_dataset = torch.utils.data.TensorDataset(quantum_output_train, y_train)
    else:
        # Scale X_train and quantum_output_train so they have the same norm to balance importance
        if balance_q_and_raw:
            q_output_norm = torch.norm(quantum_output_train, dim=1)
            raw_data_norm = torch.norm(X_train, dim=1)
            quantum_output_train = quantum_output_train / q_output_norm.unsqueeze(1)
            X_train = X_train / raw_data_norm.unsqueeze(1)

            q_output_norm = torch.norm(quantum_output_test, dim=1)
            raw_data_norm = torch.norm(X_test, dim=1)
            quantum_output_test = quantum_output_test / q_output_norm.unsqueeze(1)
            X_test = X_test / raw_data_norm.unsqueeze(1)

        train_combined = torch.cat((X_train, quantum_output_train), dim=1)
        print(train_combined.shape)
        train_dataset = torch.utils.data.TensorDataset(train_combined, y_train)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Training loop
    num_epochs = 100

    history = {
          'linear': {'loss': [], 'accuracy': {'train': [], 'test': []}},
          'epochs': []
    }

    for epoch in range(num_epochs):

        running_loss_linear = 0.0

        linear_model.train()

        for i, (input, labels) in enumerate(train_loader):

            # Linear model - Forward and Backward pass
            outputs = linear_model(input.float())
            loss = criterion(outputs, labels)
            optimizer_linear.zero_grad()
            loss.backward()
            optimizer_linear.step()
            running_loss_linear += loss.item()

        avg_loss_linear = running_loss_linear/len(train_loader)
        history['linear']['loss'].append(avg_loss_linear)
        history['epochs'].append(epoch + 1)

        linear_model.eval()
        with torch.no_grad():

            if only_use_pca:
                outputs = linear_model(quantum_output_train.float())
                _, predicted = torch.max(outputs, 1)
                linear_train_accuracy = (predicted == y_train).sum().item() / y_train.size(0)

                outputs = linear_model(quantum_output_test.float())
                _, predicted = torch.max(outputs, 1)
                linear_test_accuracy = (predicted == y_test).sum().item() / y_test.size(0)
            else:
                train_combined = torch.cat((X_train, quantum_output_train), dim=1)
                outputs = linear_model(train_combined.float())
                _, predicted = torch.max(outputs, 1)
                linear_train_accuracy = (predicted == y_train).sum().item() / y_train.size(0)

                test_combined = torch.cat((X_test, quantum_output_test), dim=1)
                outputs = linear_model(test_combined.float())
                _, predicted = torch.max(outputs, 1)
                linear_test_accuracy = (predicted == y_test).sum().item() / y_test.size(0)

        history['linear']['accuracy']['train'].append(linear_train_accuracy)
        history['linear']['accuracy']['test'].append(linear_test_accuracy)

        print(f'\nEpoch [{epoch+1}/{num_epochs}],\nLOSS ------------ Linear: {avg_loss_linear:.4f}'+
              f',\nTRAIN ACCURACY -- Linear: {linear_train_accuracy:.4f},\nTEST ACCURACY --- Linear: {linear_test_accuracy:.4f}\n')

    run_info = get_run_info(N, M, only_use_pca=only_use_pca)
    print(run_info)

    summary_metrics = get_summary_metrics(history)
    print(summary_metrics)
    return


only_use_pca = False
balance_q_and_raw = True
train(train_file_path, test_file_path, X_train, X_test, only_use_pca=only_use_pca, balance_pca_and_raw=balance_q_and_raw)

torch.Size([60000, 1148])

Epoch [1/100],
LOSS ------------ Linear: 0.4881,
TRAIN ACCURACY -- Linear: 0.9097,
TEST ACCURACY --- Linear: 0.9139


Epoch [2/100],
LOSS ------------ Linear: 0.3230,
TRAIN ACCURACY -- Linear: 0.9206,
TEST ACCURACY --- Linear: 0.9229


Epoch [3/100],
LOSS ------------ Linear: 0.2892,
TRAIN ACCURACY -- Linear: 0.9263,
TEST ACCURACY --- Linear: 0.9274


Epoch [4/100],
LOSS ------------ Linear: 0.2707,
TRAIN ACCURACY -- Linear: 0.9299,
TEST ACCURACY --- Linear: 0.9301


Epoch [5/100],
LOSS ------------ Linear: 0.2582,
TRAIN ACCURACY -- Linear: 0.9326,
TEST ACCURACY --- Linear: 0.9326


Epoch [6/100],
LOSS ------------ Linear: 0.2491,
TRAIN ACCURACY -- Linear: 0.9342,
TEST ACCURACY --- Linear: 0.9353


Epoch [7/100],
LOSS ------------ Linear: 0.2419,
TRAIN ACCURACY -- Linear: 0.9347,
TEST ACCURACY --- Linear: 0.9363


Epoch [8/100],
LOSS ------------ Linear: 0.2362,
TRAIN ACCURACY -- Linear: 0.9367,
TEST ACCURACY --- Linear: 0.9379


Epoch [9/100],
LOSS ---------

KeyboardInterrupt: 

Save the probability tensors to json format

In [93]:
# Save the saved probability tensors to json format together
import json
Ns = [3]
Ms = [12]
scales = [2 * np.pi]

data_dico = {}
for M in Ms:
    train_path = f"./complete_dataset_saved_probas/train_{N}_{M}_scale_{scale:.2f}.pt"
    test_path = f"./complete_dataset_saved_probas/test_{N}_{M}_scale_{scale:.2f}.pt"

    train_tensor = torch.load(train_path)
    test_tensor = torch.load(test_path)
    data_dico[M] = {"train": train_tensor.tolist(), "test": test_tensor.tolist()}

with open("./complete_dataset_saved_probas/quantum reservoir saved probas.json", "w") as f:
    json.dump(data_dico, f)