In [1]:
import numpy as np
from qiskit import QuantumCircuit
from qiskit.circuit.library import StatePreparation
from qiskit.quantum_info import DensityMatrix, partial_trace
from qclib.machine_learning.datasets import digits
from qclib.state_preparation.util.baa import _split_combinations

from qdna.compression import SchmidtCompressor
from qdna.quantum_info import correlation

In [2]:
def plot_digits(vectors, num_data_qubits):
    import matplotlib.pyplot as plt
    _dpi = 96
    matrix_dim_1 = 2**(int(np.ceil(num_data_qubits/2)))
    matrix_dim_2 = 2**(int(np.floor(num_data_qubits/2)))

    ncols = len(vectors)
    _, axes = plt.subplots(
        nrows=1,
        ncols=ncols,
        figsize=(ncols*10*matrix_dim_1/_dpi, 10*matrix_dim_2/_dpi),
        dpi=_dpi
    )
    for ax, vector in zip(axes, vectors):
        ax.set_axis_off()
        image = vector.reshape(matrix_dim_1, matrix_dim_2)
        ax.imshow(image, cmap=plt.cm.gray, interpolation='none')

    plt.show()

***
## Experiment main routine
***

In [3]:
# Creates an example of the circuit.
def complete_circuit(n_qubits, state, compressor):
    # Typical state initializer.
    initializer = StatePreparation(state)

    # Creates the quantum circuit.
    circuit = QuantumCircuit(n_qubits)

    # Circuit on Alice's side.
    circuit.append(initializer.definition, range(n_qubits))
    circuit.append(compressor.definition, range(n_qubits))
    # circuit.reset(compressor.latent_qubits)

    return circuit

In [4]:
def experiment(n_qubits, test_input, compressor, verbose=1):
    # Applies the compression process to each of the test samples.

    rhos = []

    # Iterates through all test samples.
    for i, test_sample in enumerate(np.concatenate([test_samples for _, test_samples in test_input.items()])):
        circuit = complete_circuit(n_qubits, test_sample, compressor)

        # Compares the trash state with the reference state |0>.
        trash_state = np.array(
            partial_trace(DensityMatrix(circuit), compressor.latent_qubits)
        ).reshape(-1)
        trash_state = np.concatenate([[np.real(e), np.imag(e)] for e in trash_state])

        # Stores and prints the results.
        rhos.append(trash_state)
        
        if verbose > 0:
            print(i, '-', rhos[-1])

    return rhos

***
## Calculates the typical state
***

In [5]:
# Estimate the centroid.
# Simply the average of the training samples (or a random selection of samples).
def calculate_typical_state(n_qubits, training_input):
    centroid = np.zeros(2**n_qubits)
    for train_sample in np.concatenate([train_samples for _, train_samples in training_input.items()]):
        centroid += train_sample

    typical_state = centroid / np.linalg.norm(centroid)

    return typical_state

***
## Find the best partitioning configuration.
***

In [6]:
def find_partitioning(n_qubits, typical_state, n_trash_partition, verbose=0):
    n_latent_partition = n_qubits - n_trash_partition
    min_entropy = np.iinfo(np.int32).max # 2147483648
    for partition in _split_combinations(range(n_qubits), n_latent_partition):
        set_a = set(partition)
        set_b = set(range(n_qubits)).difference(set_a)
        entropy = correlation(typical_state, set_a, set_b)

        if verbose > 0:
            print('latent', set_a, 'trash', set_b, 'entropy', entropy)

        if entropy <= min_entropy:
            min_entropy = entropy
            latent_partition = sorted(partition)
            trash_partition = sorted(set(range(n_qubits)).difference(set(latent_partition)))

    return latent_partition, trash_partition

***
## Experiment
***

In [7]:
from torch import (
    optim,
    tensor,
    float32,
    no_grad,
    complex64,
    sum as sumt
)

# Define torch NN module
from torch.nn import (
    Module,
    Linear,
    Sequential,
    MSELoss,
    CrossEntropyLoss,
    CELU,
    ReLU,
    Sigmoid
)

class NeuralNet(Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear_stack = Sequential(
            Linear(input_size, hidden_size, dtype=float32),
            CELU(),
            Linear(hidden_size, hidden_size, dtype=float32),
            CELU(),
            Linear(hidden_size, output_size, dtype=float32),
            Sigmoid()
        )
        
    def forward(self, x):
        x = self.linear_stack(x)
        return x.reshape(-1)

In [8]:
def new_data(batch_size, x, y):
    x_new, y_new = [], [],
    for _ in range(batch_size):
        n = np.random.randint(len(x))
        x_new.append(x[n])
        y_new.append(y[n])
        
    return (
        tensor(np.array(x_new), dtype=float32),
        tensor(np.array(y_new), dtype=float32)
    )

In [9]:
import warnings
warnings.simplefilter('ignore')

def full_experiment(class_label, n_trash_partition, reps=1):
    # Dataset load.
    seed = 42
    _, training_input, test_input, class_labels = digits.load(
        classes=[class_label],
        training_size=150,
        test_size=20,
        random_seed=seed
    )
    
    feature_dim = len(training_input[class_labels[0]][0])
    n_qubits = int(np.ceil(np.log2(feature_dim)))

    # Calculate the typical state.
    typical_state = calculate_typical_state(n_qubits, training_input)

    print('target class:', class_label)

    # Search for the best partitioning.
    _, trash_partition = find_partitioning(
        n_qubits,
        typical_state,
        n_trash_partition,
        verbose=0
    )

    print('\t', "trash qubits:", n_trash_partition)
    print('\t\ttrash_partition:', trash_partition)

    # Buld the compressor.
    compressor = SchmidtCompressor(
        typical_state,
        opt_params={
            'partition': trash_partition,
            'lr': 0
        }
    )

    # CREATE THE DATASETS FROM THE EXTRACTED FEATURES

    training_rhos = {}
    test_rhos = {}

    # Calculates the training dataset.
    training_input[class_label] = np.array(list(training_input[class_label]) * 9) # balance dataset
    training_rhos[0] = experiment(
        n_qubits,
        training_input,
        compressor,
        verbose=0
    )

    # Calculate test density matrices.
    test_rhos[0] = experiment(
        n_qubits,
        test_input,
        compressor,
        verbose=0
    )

    # Calculate density matrices to the other classes.
    training_rhos[1] = []
    test_rhos[1] = []
    for i in [j for j in range(10) if j not in class_labels]:
        # Dataset load.
        _, _training_input, _test_input, _ = digits.load(
            classes=[i],
            training_size=150,
            test_size=20,
            random_seed=seed
        )

        training_rhos[1].extend(
            experiment(
                n_qubits,
                _training_input,
                compressor,
                verbose=0
            )
        )

        test_rhos[1].extend(
            experiment(
                n_qubits,
                _test_input,
                compressor,
                verbose=0
            )
        )

    # FORMAT DATA

    # Format the training and test data.
    train_data = np.concatenate([
        [rho] for m in range(len(training_rhos)) for rho in training_rhos[m]
    ])
    test_data = np.concatenate([
        [rho] for m in range(len(test_rhos)) for rho in test_rhos[m]
    ])

    # Format the training and test labels.
    train_targets = np.concatenate([
        [m] * len(training_rhos[m]) for m in range(len(training_rhos))
    ])
    test_targets = np.concatenate([
        [m] * len(test_rhos[m]) for m in range(len(test_rhos))
    ])

    rho_size = len(train_data[0])
    print('\t\trho_size:', rho_size)
    
    total_losses = []
    tps = []
    tns = []
    total_ps = []
    total_ns = []
    accs = []
    fps = []
    fns = []
    f1s = []
    mccs = []


    # REPEAT THE WHOLE TRAINING+TESTING PROCESS `reps` TIMES
    #
    # This process uses the datasets created from the features
    # extracted using the Schmidt Compressor.

    for rep in range(reps):
        print('\t\trep:', rep)
        
        # TRAINING
        
        model  = NeuralNet(rho_size, 3*rho_size, 1)
        optimizer = optim.Adam(model.parameters(), lr=0.01)
        loss_func = MSELoss()

        loss_list = []  # Store loss history
        model.train()   # Set model to training mode

        iters = 1000
        batch_size = 25
        for i in range(iters):
            # Random sampling of data.
            x_batch, y_batch = new_data(batch_size, train_data, train_targets)

            optimizer.zero_grad(set_to_none=True) # Initialize gradient
            output = model(x_batch)               # Forward pass
            loss = loss_func(output, y_batch)     # Calculate loss
            loss.backward()                       # Backward pass
            optimizer.step()                      # Optimize weights

            loss_list.append(loss.item())         # Store loss

        print(
            f"Training [{100.0 * (i + 1) / iters:.0f}%]\t"
            f"Loss: {loss_list[-1]:.4f}\t"
            f"Avg. Loss: {sum(loss_list) / len(loss_list):.4f}"
        )

        # TESTING

        total_loss = []
        tp = 0
        tn = 0
        total_p = 0
        total_n = 0
        acc = 0
        fp = 0
        fn = 0
        f1 = 0
        mcc = 0

        model.eval()  # set model to evaluation mode
        with no_grad():
            for (data, target) in zip(test_data, test_targets):
                data = tensor(data, dtype=float32)
                target = tensor([target], dtype=float32)

                output = model(data)

                pred = np.round(output, 0)

                if pred == target:
                    if pred == 0:
                        tp += 1
                    else:
                        tn += 1
                
                if pred != target:
                    if pred == 0:
                        fp += 1
                    else:
                        fn += 1

                if target == 0:
                    total_p += 1
                else:
                    total_n += 1

                loss = loss_func(output, target)
                total_loss.append(loss.item())

            acc = (tp+tn) / (total_p+total_n)

            f1 = (2*tp) / (2*tp+fp+fn)

            mcc = (tp*tn - fp*fn) / np.sqrt((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn))

            print(
                "Performance on test data:\n\tLoss: {:.4f}\n\t"
                "Accuracy: {:.1f}%\n\t"
                "MCC: {:.1f}%\n\t"
                "Accuracy 0: {:.1f}%\n\t"
                "Accuracy 1: {:.1f}%".format(
                    sum(total_loss) / (total_p+total_n),
                    acc * 100,
                    mcc,
                    tp / total_p * 100,
                    tn / total_n * 100
                )
            )

        total_losses.append(sum(total_loss))
        tps.append(tp)
        tns.append(tn)
        total_ps.append(total_p)
        total_ns.append(total_n)
        accs.append(acc)
        fps.append(fp)
        fns.append(fn)
        f1s.append(f1)
        mccs.append(mcc)

    return (total_losses, tps, tns, accs, fps, fns, f1s, mccs)

results = {}
for class_label in range(0, 10):
    results[class_label] = full_experiment(class_label, n_trash_partition=2, reps=10)


target class: 0
	 trash qubits: 2
		trash_partition: [3, 4]
		rho_size: 32
		rep: 0
Training [100%]	Loss: 0.0297	Avg. Loss: 0.0212
Performance on test data:
	Loss: 0.0000
	Accuracy: 100.0%
	MCC: 1.0%
	Accuracy 0: 100.0%
	Accuracy 1: 100.0%
		rep: 1
Training [100%]	Loss: 0.0245	Avg. Loss: 0.0217
Performance on test data:
	Loss: 0.0025
	Accuracy: 99.5%
	MCC: 1.0%
	Accuracy 0: 100.0%
	Accuracy 1: 99.4%
		rep: 2
Training [100%]	Loss: 0.0112	Avg. Loss: 0.0222
Performance on test data:
	Loss: 0.0083
	Accuracy: 99.0%
	MCC: 0.9%
	Accuracy 0: 100.0%
	Accuracy 1: 98.9%
		rep: 3
Training [100%]	Loss: 0.0098	Avg. Loss: 0.0222
Performance on test data:
	Loss: 0.0016
	Accuracy: 100.0%
	MCC: 1.0%
	Accuracy 0: 100.0%
	Accuracy 1: 100.0%
		rep: 4
Training [100%]	Loss: 0.0007	Avg. Loss: 0.0215
Performance on test data:
	Loss: 0.0055
	Accuracy: 99.5%
	MCC: 1.0%
	Accuracy 0: 100.0%
	Accuracy 1: 99.4%
		rep: 5
Training [100%]	Loss: 0.0033	Avg. Loss: 0.0230
Performance on test data:
	Loss: 0.0017
	Accuracy:

In [10]:
from IPython.display import Markdown, display, Latex

table = '| class | avg. TP | std. TP | avg. TN | std. TN | avg. FP | std. FP | avg. FN | std. FN | avg. acc | std. acc | avg. F1 | std. F1 | avg. MCC | std. MCC |\n'
table += '|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|\n'
for class_label in range(0, 10):
    result = results[class_label]
    avg_total_loss = np.mean(result[0])
    std_total_loss = np.std(result[0])

    avg_tp = np.mean(result[1])
    std_tp = np.std(result[1])

    avg_tn = np.mean(result[2])
    std_tn = np.std(result[2])

    avg_acc = np.mean(result[3])
    std_acc = np.std(result[3])

    avg_fp = np.mean(result[4])
    std_fp = np.std(result[4])

    avg_fn = np.mean(result[5])
    std_fn = np.std(result[5])

    avg_f1 = np.mean(result[6])
    std_f1 = np.std(result[6])

    avg_mcc = np.mean(result[7])
    std_mcc = np.std(result[7])

    table += f'| {class_label} | {avg_tp} | {std_tp} | {avg_tn} | {std_tn} | {avg_fp} | {std_fp} | {avg_fn} | {std_fn} | {round(avg_acc, 4)} | {round(std_acc, 4)} | {round(avg_f1, 4)} | {round(std_f1, 4)} | {round(avg_mcc, 4)} | {round(std_mcc, 4)} |\n'

display(Markdown(table))

| class | avg. TP | std. TP | avg. TN | std. TN | avg. FP | std. FP | avg. FN | std. FN | avg. acc | std. acc | avg. F1 | std. F1 | avg. MCC | std. MCC |
|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|
| 0 | 20.0 | 0.0 | 178.2 | 0.7483314773547882 | 0.8 | 0.7483314773547882 | 0.0 | 0.0 | 0.996 | 0.0038 | 0.9807 | 0.0179 | 0.9789 | 0.0195 |
| 1 | 17.1 | 0.3 | 169.7 | 5.235456045083369 | 9.3 | 5.23545604508337 | 2.9 | 0.3 | 0.9387 | 0.0257 | 0.7462 | 0.0803 | 0.7238 | 0.0831 |
| 2 | 18.7 | 0.45825756949558394 | 163.4 | 7.49933330370107 | 15.6 | 7.4993333037010705 | 1.3 | 0.45825756949558394 | 0.9151 | 0.0367 | 0.7006 | 0.0851 | 0.6877 | 0.081 |
| 3 | 16.3 | 0.6403124237432849 | 167.1 | 5.80430874437258 | 11.9 | 5.80430874437258 | 3.7 | 0.6403124237432849 | 0.9216 | 0.0265 | 0.6845 | 0.0626 | 0.6565 | 0.0626 |
| 4 | 19.0 | 0.6324555320336759 | 174.2 | 1.4000000000000001 | 4.8 | 1.4000000000000001 | 1.0 | 0.6324555320336759 | 0.9709 | 0.0054 | 0.8681 | 0.0207 | 0.8566 | 0.0214 |
| 5 | 18.8 | 1.077032961426901 | 166.7 | 5.657738063926255 | 12.3 | 5.657738063926255 | 1.2 | 1.0770329614269007 | 0.9322 | 0.0252 | 0.7436 | 0.0665 | 0.7321 | 0.0632 |
| 6 | 19.8 | 0.39999999999999997 | 174.3 | 4.001249804748511 | 4.7 | 4.001249804748512 | 0.2 | 0.4000000000000001 | 0.9754 | 0.0193 | 0.8965 | 0.0711 | 0.891 | 0.0719 |
| 7 | 16.6 | 1.2 | 173.9 | 3.6455452267116364 | 5.1 | 3.6455452267116364 | 3.4 | 1.2 | 0.9573 | 0.0149 | 0.8003 | 0.0505 | 0.7824 | 0.0546 |
| 8 | 13.3 | 0.6403124237432849 | 161.3 | 7.642643521714199 | 18.7 | 7.642643521714199 | 5.7 | 0.6403124237432849 | 0.8774 | 0.0357 | 0.5324 | 0.0693 | 0.4904 | 0.0706 |
| 9 | 15.8 | 1.7204650534085253 | 157.2 | 8.749857141690942 | 21.8 | 8.749857141690942 | 4.2 | 1.7204650534085253 | 0.8693 | 0.038 | 0.5563 | 0.0533 | 0.5234 | 0.0508 |
