In [None]:
!pip install perceval-quandela -q

In [None]:
import math
import numpy as np
from ast import literal_eval
import matplotlib.pyplot as plt
import pandas as pd
import perceval as pcvl
from perceval.components import BS, PS

### Problem Statement: CP for A*u + noise

We are studying with a noisy matrix-vector product:

$$
y = A \cdot u + \text{noise}(A, u)
$$

Where:
- $ A $ is a matrix.
- $ u $ is a vector.
- The term $ \text{noise}(A, u) $ introduces variability into the outcome.

In the context of **split conformal prediction**, we are looking to create a prediction interval for $ y $, given the model $ y = A \cdot u $ and noise. The goal is to predict $ y $ while accounting for the uncertainty introduced by the noise.

**Does split conformal prediction in this specific case keep coverage guarantee?**


1. **Fixed Predictor**:  
   The function $ f(A, u) $ that maps $ (A, u) $ to $ y $ is fixed in the sense that, given a pair $ (A, u) $, we know exactly what the model should output (if there were no noise). The variability comes from the noise term.

   $$
   y = A \cdot u + \text{noise}(A, u)
   $$

2. **Calibration**:  
   Since the fundamental idea of split conformal prediction is to calibrate the non-conformity score based on the calibration set:
   - First, we compute the residuals (the difference between the predicted and true values) on the calibration set, i.e., the difference between $ A \cdot u $ and the noisy $ y $.
   - Then, we calculate a threshold for the non-conformity score.

3. **Residuals**:  
   In the case of the noisy matrix-vector product $ y = A \cdot u + \text{noise}(A, u) $, the residuals are the differences between the predicted values and the actual noisy observations.

   - **Predicted value**: $ \hat{y} = A \cdot u $ (without the noise).
   - **Observed value**: $ y = A \cdot u + \text{noise}(A, u) $.

   Thus, the residual for a given pair $ (A, u) $ is:

   $$
   \text{residual} = |y - \hat{y}| = |(A \cdot u + \text{noise}(A, u)) - A \cdot u| = |\text{noise}(A, u)|
   $$

   $$
   \text{residual} = |\text{noise}(A, u)|
   $$

   So the residuals represent the noise added to the matrix-vector product, which we want to capture when calibrating the model for conformal prediction.

### Types of Noise

In this problem, we are using two types of noise to explore how they affect the split conformal prediction procedure:

1. **i.i.d. Uniform Noise**:  
   This is a standard type of noise, where each noise term for every element in the matrix-vector product is drawn independently from an Uniform distribution, therefore we would keep exchangeability among residuals.

2. **Non-exchangeable Noise**:  
   In this case, the noise increases linearly with the index. For instance, the noise term for each element in the matrix-vector product grows as the index increases, i.e., the noise at index $ i $ might be proportional to $ i $. This is not exchangeable noise, as the noise magnitude depends on the position in the vector or matrix. (Not implemented in this version)



## Conformal Inference

### Parameters

In [None]:
# Generating 100 samples of A (4x4 matrices) and u (4x1 vectors)
num_samples = 100
dim = 4

#np.random.seed(0)

# Adding noise to the y_true values using the add_noise function
noise_type = 'iid'  # You can change to 'nonexchangeable_trend' if needed

Genearating Unitary Matrices
- https://perceval.quandela.net/docs/v0.11/circuits.html#unitary-matrices
- https://perceval.quandela.net/docs/v0.11/reference/utils.html#perceval.utils.matrix.Matrix.random_unitary

#### Perceval example of photonic chip, unnecessary

In [None]:
#####################################################
### VECTOR MATRIX MULTIPLICATION PHOTONIC EXAMPLE ###
#####################################################

# Define a 2x2 matrix (for simplicity)
matrix = np.array([[1, 2],
                   [3, 4]])

# Create a circuit for 2 modes (2x2 matrix will operate on a 2-dimensional quantum state)
circuit = pcvl.Circuit(2)

# Apply phase shifts and beam splitters to simulate a unitary matrix transformation
circuit.add((0, 1), pcvl.BS())  # Apply a beam splitter between modes 0 and 1
circuit.add(0, pcvl.PS(np.pi / 4))  # Apply a phase shift on mode 0
circuit.add(1, pcvl.PS(np.pi / 2))  # Apply a phase shift on mode 1

# Define the input quantum state
input_state = pcvl.BasicState([1, 0])  # Represents the state |0> on mode 0 and |1> on mode 1

# Initialize the backend (SLOS or another appropriate backend)
simulator = pcvl.BackendFactory().get_backend('SLOS')

# Set the circuit in the simulator
simulator.set_circuit(circuit)

# Set the input state in the simulator
simulator.set_input_state(input_state)

# Perform the matrix-vector multiplication by running the circuit
output_distribution = simulator.prob_distribution()  # No input state argument

# Print the results (this is the equivalent of the result of matrix-vector multiplication)
print("Input state:", input_state)
print("Output distribution:")
for output_state, probability in output_distribution.items():
    print(f"{output_state}: {probability:.4f}")

Input state: |1,0>
Output distribution:
|1,0>: 0.5000
|0,1>: 0.5000


### Real data from Quix chip

In [None]:
quix_data = pd.read_csv('/content/drive/MyDrive/Measurment.csv')[["Expected","Measured"]]
quix_data_shuffled = quix_data.sample(frac=1).reset_index(drop=True)

# Convert string representations of lists into actual lists
quix_data_shuffled['Expected'] = quix_data_shuffled['Expected'].apply(literal_eval)
quix_data_shuffled['Measured'] = quix_data_shuffled['Measured'].apply(literal_eval)

quix_data_shuffled.head(5)

Unnamed: 0,Expected,Measured
0,"[0.056, 2.584]","[-0.017, 2.484]"
1,"[1.111, 1.84]","[1.002, 1.574]"
2,"[0.472, 6.187]","[0.042, 2.441]"
3,"[1.242, 1.205]","[1.246, 1.274]"
4,"[0.265, 1.179]","[2.387, 0.279]"


In [None]:
len(quix_data_shuffled)

400

### Data generation

In [None]:
def add_noise(y_true, noise_type):
    if noise_type == 'iid':
        noise = np.random.uniform(0, 1, size=y_true.shape)  # iid uniform noise
    #elif noise_type == 'nonexchangeable_trend':
    #    noise = 100 * (np.arange(len(y_true))**2) + np.random.normal(0, 0.5, size=y_true.shape)  # Increasing noise with trend component
    return y_true + noise

In [None]:
def generate_unitary_matrices_data(num_samples, dim, noise_type=None):
    """
    Generates a dataset of unitary matrices and input/output vectors.

    Args:
        num_samples (int): Number of samples to generate.
        dim (int): Dimension of the unitary matrices.
        noise_type (str or None): Type of noise to apply. If None, no noise is added.

    Returns:
        list: A list of tuples (y_noisy, y_true), where:
            - y_noisy is the output vector with noise.
            - y_true is the noise-free output vector.
    """
    # Generate unitary matrices
    A_samples = [pcvl.Matrix.random_unitary(dim) for _ in range(num_samples)]

    # Generate random input vectors
    u_samples = np.random.uniform(0, 1, (num_samples, dim))
    u_samples = u_samples / np.linalg.norm(u_samples, axis=1, keepdims=True)  # Normalize

    # Compute output vectors (y_true = A * u)
    y_true_samples = np.array([np.dot(A, u) for A, u in zip(A_samples, u_samples)])

    # Add noise if specified
    if noise_type:
        y_noisy_samples = np.array([add_noise(y_true, noise_type) for y_true in y_true_samples])
    else:
        y_noisy_samples = y_true_samples

    # Create the dataset as a list of tuples
    dataset = list(zip(y_noisy_samples, y_true_samples))
    return dataset

In [None]:
def generate_data(num_samples, dim, noise_type):
    # Generate random matrices A and vectors u
    A_samples = np.random.uniform(0, 1, (num_samples, dim, dim))  # 100 matrices of size 4x4
    u_samples = np.random.uniform(0, 1, (num_samples, dim))       # 100 vectors  of size 4x1

    # Generating y_true values for each sample (y_true = A * u)
    y_true_samples  = np.array([np.dot(A, u) for A, u in zip(A_samples, u_samples)])
    y_noisy_samples = np.array([add_noise(y_true, noise_type) for y_true in y_true_samples])

    dataset = list(zip(y_noisy_samples, y_true_samples))

    return dataset

In [None]:
# Generate set with 100 noisy output vectors and 100 non-noisy output vectors
#dataset = generate_data(100, 4, 'iid')
dataset = generate_unitary_matrices_data(100, 4, 'iid') #'iid' or None

# Extract first sample
noisy, true = dataset[0]

# Print dataset size and shapes of components
print(f"Number of samples in dataset: {len(dataset)}")
print(f"Shape of noisy vectors: {noisy.shape}")
print(f"Shape of non-noisy vectors): {true.shape}")

Number of samples in dataset: 100
Shape of noisy vectors: (4,)
Shape of non-noisy vectors): (4,)


### Splitting of data & Model training

In [None]:
np.random.shuffle(dataset)

train_data = dataset[:int(0.7*len(dataset))]                             # 70% for model training
calibration_data = dataset[int(0.7*len(dataset)):int(0.9*len(dataset))]  # 20% for conformal calibration
test_data = dataset[int(0.9*len(dataset)):]                              # 10% for conformal testing

X_train_noisy, Y_train_true = zip(*train_data)
X_calibration_noisy, Y_calibration_true = zip(*calibration_data)
X_test_noisy, Y_test_true = zip(*test_data)

# Tuples to arrays
X_train_noisy, Y_train_true = np.array(X_train_noisy), np.array(Y_train_true)
X_calibration_noisy, Y_calibration_true = np.array(X_calibration_noisy), np.array(Y_calibration_true)
X_test_noisy, Y_test_true = np.array(X_test_noisy), np.array(Y_test_true)

In [None]:
# Train a model
model_coef = np.linalg.lstsq(X_train_noisy, Y_train_true, rcond=None)[0]  # Least squares solution

#### Splitting of Quix data

In [None]:
calibration_data_quix = quix_data_shuffled[:int(0.5*len(quix_data_shuffled))]    # 50% for conformal calibration
test_data_quix = quix_data_shuffled[int(0.5*len(quix_data_shuffled)):]           # 50% for conformal testing

X_calibration_noisy = np.array(calibration_data_quix["Measured"].tolist())
Y_calibration_true = np.array(calibration_data_quix["Expected"].tolist())

X_test_noisy = np.array(test_data_quix["Measured"].tolist())
Y_test_true = np.array(test_data_quix["Expected"].tolist())

### Split Conformal Prediction

In [None]:
def split_conformal_prediction_with_test(X_calib, model_coef, Y_calib, alpha_value):

    y_pred_calib = X_calib#np.dot(X_calib, model_coef)

    # Compute residuals
    residuals = np.abs(Y_calib - y_pred_calib)

    ### As an approximation will take the norm among the 4 dimensions
    residuals_reduced = np.linalg.norm(residuals, axis=1)

    # Quantile 1-alpha
    ### Is this correct or do I have to have 1 threshold per dimension?
    #q_alpha = np.percentile(residuals, 100 * (1 - alpha_cp))

    q_alpha = np.percentile(residuals_reduced, 100 * (1 - alpha_cp))

    return q_alpha

In [None]:
# Empirical coverage and average prediction interval size
def compute_coverage_and_interval_size(X_test, model_coef, Y_test, q_alpha):

    y_pred = X_test#np.dot(X_test, model_coef)

    lower_bound = y_pred - q_alpha
    upper_bound = y_pred + q_alpha

    residuals_test_set = np.abs(Y_test - y_pred)
    residuals_test_set = np.linalg.norm(residuals_test_set, axis=1)

    pred_test_set = np.abs(Y_test)
    pred_norm_test_set = np.linalg.norm(pred_test_set, axis=1)

    # Empirical coverage: percentage of true values within the prediction intervals
    coverage = np.mean((Y_test >= lower_bound) & (Y_test <= upper_bound)) * 100 # Element-wise comparation

    # Average prediction interval size
    avg_interval_size = np.mean(upper_bound - lower_bound)

    return coverage, avg_interval_size, lower_bound, upper_bound, q_alpha, y_pred, residuals_test_set, pred_norm_test_set

### Permutation test (over labels/measurements)

Permutation test asseses the only condition for CP, exchangeability of data(labels) to check for indepence among observations, ie that there are no underlying dependecies in the data (DeFinetti...)

In [None]:
N = len(X_test_noisy)
max_permutations = math.factorial(N)

print(N, max_permutations)

200 788657867364790503552363213932185062295135977687173263294742533244359449963403342920304284011984623904177212138919638830257642790242637105061926624952829931113462857270763317237396988943922445621451664240254033291864131227428294853277524242407573903240321257405579568660226031904170324062351700858796178922222789623703897374720000000000000000000000000000000000000000000000000


In [None]:
Y_pred = X_test_noisy
Y_meas = Y_test_true

# Computing the residual as the diff between vectors and taking the norm as unique parameter/statistic
def mean_euclidean_distance(A, B):
    return np.mean(np.linalg.norm(A - B, axis=1))

# Observed distance/residual in initial data
D_obs = mean_euclidean_distance(Y_pred, Y_meas)

# Permutation test
num_permutations = 10000
D_perm = []

for _ in range(num_permutations):
    permuted = Y_meas.copy()
    np.random.shuffle(permuted)
    D_perm.append(mean_euclidean_distance(Y_pred, permuted))

D_perm = np.array(D_perm)

# p-value
p_value = np.mean(D_perm >= D_obs)

# Result
print(f"Observed distance/residual: {D_obs:.4f}")
print(f"p-value: {p_value:.4f}")

if p_value < 0.05:
    print("We reject the null hypothesis of Exchangeability.")
else:
    print("No sufficient evidence to reject Exchangeability.")

Observed distance/residual: 1.1455
p-value: 1.0000
No sufficient evidence to reject Exchangeability.


# Conformal Prediction Results
* $\text{Coverage (empirical):} \,  \frac{1}{n} \sum_{i \in \text{Test}} \mathbb{I}\left(y^*_i  \in \left[\hat{y}_i + q_{1-\alpha}, \hat{y}_i - q_{1-\alpha}\right]\right) \cdot 100\%$

* $\text{Average Size:} \, \frac{1}{n}\sum_{i \in \text{Test}} \left( \text{Upper Bound}_i - \text{Lower Bound}_i \right)$

* $\text{Expected Relative Residual:} \, \mathbb{E}\left[\frac{\|y_i - \hat{y}_i\|}{\|\hat{y}_i\|}\right] = \frac{1}{n}\sum_{i \in \text{Test}}\frac{\text{residuals}_i}{\text{pred_norm}_i}$

* $\text{Expected Relative Interval Size:} \, \mathbb{E}\left[\frac{\sqrt{2} \cdot q_{1-\alpha}}{\|\hat{y}_i\|}\right] = \frac{1}{n}\sum_{i \in \text{Test}}\frac{\sqrt{2} \cdot q_{1-\alpha}}{\text{pred_norm}_i}$

check comparing with ground truth Y_test

In [None]:
# Desired alpha/error level for Conformal Prediction
alpha_cp   = 0.1

q_alpha_iid = split_conformal_prediction_with_test(X_calib = X_calibration_noisy, model_coef = model_coef, Y_calib = Y_calibration_true, alpha_value=alpha_cp)
coverage_iid, avg_size_iid, lower_bound_iid, upper_bound_iid, q_alpha, y_pred_idd, residuals_test_set, pred_norm_test_set = compute_coverage_and_interval_size(X_test_noisy, model_coef, Y_test_true, q_alpha_iid)

max_abs_diff = np.max(np.abs(Y_test_true[:, 0] - Y_test_true[:, 1]))
print(f' Coverage marginal: {coverage_iid}\n Coverage (empirical) guarantee satisfied?: {coverage_iid >= (1-alpha_cp)*100}\n Avg Interval Size: {(np.abs(avg_size_iid))} \n Expected Relative Residual: {(np.mean(residuals_test_set/pred_norm_test_set))} \n Expected Relative Interval Size: {(np.mean((np.sqrt(2)*q_alpha)/pred_norm_test_set))}')

 Coverage marginal: 95.5
 Coverage (empirical) guarantee satisfied?: True
 Avg Interval Size: 5.117969851381282 
 Expected Relative Residual: 0.4721472064412991 
 Expected Relative Interval Size: 1.649903367993022
