In [20]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sys
import os

from fvgp import GP
from fvgp.gp_kernels import exponential_kernel

from sklearn.datasets import load_digits
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.utils import shuffle
from sklearn.decomposition import PCA

from scipy.stats import wasserstein_distance
from scipy.stats import norm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


In [2]:
# 1. Load and Preprocess the Digits Dataset
digits = load_digits()
X, y = digits.data, digits.target

# Hold out 10% of the samples for testing (split stratified on the labels)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42, stratify=y
)

# Rescale each sample in place so that it resembles a probability
# distribution: shift by the row minimum, add a small offset so every entry
# is strictly positive, then divide by the row sum.
for split in (X_train, X_test):
    for i in range(len(split)):
        shifted = (split[i] - np.min(split[i])) + 1e-8
        split[i] = shifted / np.sum(shifted)


In [5]:
# 2. Compute PCA Directions Covering 90% of Variance and Random Directions
pca = PCA(n_components=0.90).fit(X_train)

# Principal axes, one per row: (n_pca_directions, n_features)
pca_directions = pca.components_
num_pca_directions = len(pca_directions)

# Draw as many Gaussian random directions as there are PCA directions and
# scale each one to unit Euclidean norm. The seed is fixed for reproducibility.
np.random.seed(42)
random_directions = np.random.randn(num_pca_directions, X_train.shape[1])
random_direction_norms = np.linalg.norm(random_directions, axis=1, keepdims=True)
random_directions = random_directions / random_direction_norms

# Stack PCA directions first, random directions second:
# (2 * num_pca_directions, n_features)
directions = np.concatenate([pca_directions, random_directions], axis=0)
num_directions = len(directions)

print(f"Number of PCA directions: {num_pca_directions}")
print(f"Total number of directions (PCA + random): {num_directions}")


Number of PCA directions: 20
Total number of directions (PCA + random): 40


In [6]:
# 3. Project the Data onto the Directions
# Column d of each result is the coordinate of every sample along direction d.
X_train_proj = np.dot(X_train, directions.T)  # (n_train_samples, num_directions)
X_test_proj = np.dot(X_test, directions.T)    # (n_test_samples, num_directions)

In [8]:
# 4. Compute Pairwise Sliced Wasserstein Distances and Cache Them
def compute_sliced_wasserstein_matrix(X_proj1, X_proj2):
    """Pairwise distance matrix averaged over all projection directions.

    For every direction (column) ``d`` the distance between sample ``i`` of
    ``X_proj1`` and sample ``j`` of ``X_proj2`` is the 1-D Wasserstein
    distance between the two single-point distributions located at
    ``X_proj1[i, d]`` and ``X_proj2[j, d]``. For single points that distance
    is exactly ``|X_proj1[i, d] - X_proj2[j, d]|``, so it is computed with a
    broadcast absolute difference instead of one ``wasserstein_distance``
    call per pair. The per-direction matrices are summed and divided by the
    number of directions.

    Parameters
    ----------
    X_proj1 : array-like, shape (n1, n_directions)
        Projected samples; one row per sample, one column per direction.
    X_proj2 : array-like, shape (n2, n_directions)
        Projected samples with the same number of columns as ``X_proj1``.

    Returns
    -------
    numpy.ndarray, shape (n1, n2)
        Direction-averaged pairwise distances.

    Raises
    ------
    ValueError
        If either input is not 2-D, the column counts differ, or there are
        no directions to average over.
    """
    X_proj1 = np.asarray(X_proj1, dtype=float)
    X_proj2 = np.asarray(X_proj2, dtype=float)
    if X_proj1.ndim != 2 or X_proj2.ndim != 2:
        raise ValueError("X_proj1 and X_proj2 must both be 2-D arrays")
    if X_proj1.shape[1] != X_proj2.shape[1]:
        raise ValueError(
            "X_proj1 and X_proj2 must have the same number of directions, "
            f"got {X_proj1.shape[1]} and {X_proj2.shape[1]}"
        )

    # The direction count comes from the inputs themselves rather than from a
    # notebook-level global, so the function also works for other projections.
    n_dirs = X_proj1.shape[1]
    if n_dirs == 0:
        raise ValueError("at least one projection direction is required")

    n1 = X_proj1.shape[0]
    n2 = X_proj2.shape[0]
    distance_matrix = np.zeros((n1, n2))

    # Loop over directions (not one big 3-D broadcast) so that only a single
    # (n1, n2) temporary is alive at any time.
    for d in range(n_dirs):
        print(f"Processing direction {d+1}/{n_dirs}", end='\r')
        distance_matrix += np.abs(X_proj1[:, d, None] - X_proj2[None, :, d])

    # Average over all directions
    distance_matrix /= n_dirs
    return distance_matrix

In [9]:
# Compute and cache the distance matrices
# The three matrices are evaluated in the order train/train, train/test,
# test/test and kept in notebook-level variables for reuse by the kernel.
print("Computing sliced Wasserstein distance matrices...")
(
    distance_matrix_train_train,
    distance_matrix_train_test,
    distance_matrix_test_test,
) = (
    compute_sliced_wasserstein_matrix(left_proj, right_proj)
    for left_proj, right_proj in (
        (X_train_proj, X_train_proj),
        (X_train_proj, X_test_proj),
        (X_test_proj, X_test_proj),
    )
)
print("\nSliced Wasserstein distance matrices computed.")


Computing sliced Wasserstein distance matrices...
Processing direction 40/40
Sliced Wasserstein distance matrices computed.


In [10]:
# 5. Define the GP Kernel Function Using Precomputed Distance Matrices
def SW_kernel(X1, X2, hyperparameters):
    """Kernel matrix between X1 and X2 built from sliced Wasserstein distances.

    The distance matrix is passed, together with the length scale, to fvgp's
    ``exponential_kernel``. The precomputed train/test distance matrices are
    reused only when ``X1`` and ``X2`` really are the training or test points
    (same shape and same values). Matching on the number of rows alone would
    silently return a cached matrix for any unrelated point set that happens
    to have the same length. Every other input pair is projected onto
    ``directions`` and its distances are computed on the fly.

    Parameters
    ----------
    X1, X2 : array-like, shape (n_samples, n_features)
        Input points in the original (unprojected) feature space.
    hyperparameters : sequence of float
        Only ``hyperparameters[0]`` is read; it is used as the length scale.

    Returns
    -------
    The value returned by ``exponential_kernel`` for the selected distance
    matrix, presumably of shape (len(X1), len(X2)) -- TODO confirm against
    the fvgp documentation.
    """
    length_scale = hyperparameters[0]

    # np.array_equal returns False immediately on a shape mismatch, so these
    # checks are cheap whenever the inputs cannot be the cached point sets.
    x1_is_train = np.array_equal(X1, X_train)
    x2_is_train = np.array_equal(X2, X_train)
    x1_is_test = np.array_equal(X1, X_test)
    x2_is_test = np.array_equal(X2, X_test)

    if x1_is_train and x2_is_train:
        distance_matrix = distance_matrix_train_train
    elif x1_is_test and x2_is_test:
        distance_matrix = distance_matrix_test_test
    elif x1_is_train and x2_is_test:
        distance_matrix = distance_matrix_train_test
    elif x1_is_test and x2_is_train:
        # Only the train/test matrix is cached; its transpose is test/train.
        distance_matrix = distance_matrix_train_test.T
    else:
        # For any other cases, compute the distances on-the-fly
        X1_proj = np.asarray(X1).dot(directions.T)
        X2_proj = np.asarray(X2).dot(directions.T)
        distance_matrix = compute_sliced_wasserstein_matrix(X1_proj, X2_proj)

    return exponential_kernel(distance_matrix, length_scale)

In [11]:
# 6. Initialize Hyperparameters and Bounds
# The kernel reads a single hyperparameter, its length scale; start it at 1.0.
initial_length_scale = 1.0
init_hyperparameters = np.array([initial_length_scale])

# Search interval for the length scale, stored as one [lower, upper] row
length_scale_lower, length_scale_upper = 0.1, 10.0
length_scale_bounds = np.array([[length_scale_lower, length_scale_upper]])

In [12]:
# 7. Train GP Models Using One-vs-Rest Strategy
num_classes = 10  # Digits 0-9
gp_models = []

# Training options shared by every per-class model (MCMC training)
train_options = dict(
    hyperparameter_bounds=length_scale_bounds,
    method='mcmc',
    max_iter=1000,
    tolerance=1e-3,
)

print("Training GP models...")
for class_label in range(num_classes):
    print(f"Training GP model for class {class_label}...")

    # One-vs-rest target: 1.0 where the label equals this class, 0.0 elsewhere
    y_train_binary = np.where(y_train == class_label, 1.0, 0.0)

    # Same small noise variance (1e-6) for every training point
    gp_model = GP(
        X_train,
        y_train_binary,
        init_hyperparameters=init_hyperparameters,
        gp_kernel_function=SW_kernel,
        noise_variances=np.full(len(y_train_binary), 1e-6),
    )
    gp_model.train(**train_options)

    gp_models.append(gp_model)
    print(f"GP model for class {class_label} trained.\n")

print("All GP models trained.")

Training GP models...
Training GP model for class 0...


  metr_ratio = np.exp(prior_star + likelihood_star - prior - likelihood)


GP model for class 0 trained.

Training GP model for class 1...
GP model for class 1 trained.

Training GP model for class 2...
GP model for class 2 trained.

Training GP model for class 3...
GP model for class 3 trained.

Training GP model for class 4...
GP model for class 4 trained.

Training GP model for class 5...
GP model for class 5 trained.

Training GP model for class 6...
GP model for class 6 trained.

Training GP model for class 7...
GP model for class 7 trained.

Training GP model for class 8...
GP model for class 8 trained.

Training GP model for class 9...
GP model for class 9 trained.

All GP models trained.


In [13]:
# 8. Define the Probit Link Function (Prefer over Logit, Gaussian Assumptions)
def probit(mu, sigma2):
    """Probit link with variance adjustment.

    Returns the standard normal CDF evaluated at ``mu / sqrt(1 + sigma2)``.
    ``mu`` and ``sigma2`` may be scalars or arrays that broadcast together.
    """
    scale = np.sqrt(1 + sigma2)
    return norm.cdf(mu / scale)

In [16]:
# 8. Predict Probabilities Using the Trained GP Models
def predict_probs(X_test, gp_models):
    """Per-class probabilities for ``X_test`` from a list of trained GP models.

    For each model, the posterior mean (key ``"f(x)"``) and the posterior
    variance (key ``"v(x)"``, requested with ``variance_only=True``) at
    ``X_test`` fill one column of a (n_test, n_models) array. Both arrays are
    then passed through ``probit``.

    Returns
    -------
    Array of shape (n_test, len(gp_models)); column k belongs to
    ``gp_models[k]``. The columns are not normalised to sum to one.
    """
    out_shape = (X_test.shape[0], len(gp_models))
    means = np.zeros(out_shape)
    variances = np.zeros(out_shape)

    for column, model in enumerate(gp_models):
        # Mean first, then variance, for each model in turn
        mean_result = model.posterior_mean(X_test)
        means[:, column] = mean_result["f(x)"].flatten()

        variance_result = model.posterior_covariance(X_test, variance_only=True)
        variances[:, column] = variance_result["v(x)"].flatten()

    # Turn means and variances into probabilities via the probit link
    return probit(means, variances)

In [22]:
# 9. Predict Class Labels and Evaluate the Classifier
probabilities = predict_probs(X_test, gp_models)

# Predicted label = index of the one-vs-rest model with the highest probability
y_pred = probabilities.argmax(axis=1)

# Accuracy expressed as a percentage
accuracy = 100 * accuracy_score(y_test, y_pred)
print(f'\nAccuracy: {accuracy:.0f}%')
print('Classification Report:', classification_report(y_test, y_pred), sep='\n')


Accuracy: 99%
Classification Report:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        18
           1       0.90      1.00      0.95        18
           2       1.00      1.00      1.00        18
           3       1.00      1.00      1.00        18
           4       1.00      1.00      1.00        18
           5       1.00      1.00      1.00        18
           6       1.00      1.00      1.00        18
           7       1.00      1.00      1.00        18
           8       1.00      0.89      0.94        18
           9       1.00      1.00      1.00        18

    accuracy                           0.99       180
   macro avg       0.99      0.99      0.99       180
weighted avg       0.99      0.99      0.99       180



In [23]:
def save_matrix(matrix, filename):
    """Write ``matrix`` to ``filename`` with ``np.save`` and report the path."""
    np.save(file=filename, arr=matrix)
    print(f"Saved {filename}")

# Persist the three cached distance matrices as .npy files in output_dir
output_dir = "distance_matrices"
# exist_ok=True creates the directory only when it is missing, replacing the
# separate exists-then-create check (which can race with another process).
os.makedirs(output_dir, exist_ok=True)

for matrix_name, matrix in (
    ("distance_matrix_train_train", distance_matrix_train_train),
    ("distance_matrix_train_test", distance_matrix_train_test),
    ("distance_matrix_test_test", distance_matrix_test_test),
):
    save_matrix(matrix, os.path.join(output_dir, f"{matrix_name}.npy"))

Saved distance_matrices/distance_matrix_train_train.npy
Saved distance_matrices/distance_matrix_train_test.npy
Saved distance_matrices/distance_matrix_test_test.npy
