In [1]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import time
import matplotlib.pyplot as plt

import scipy.stats
from sklearn.decomposition import FastICA, PCA
from sklearn.manifold import LocallyLinearEmbedding
from sklearn.random_projection import GaussianRandomProjection
from sklearn.neighbors import KNeighborsClassifier

In [2]:
from data_helpers.wine_quality_data_helper import load_wine_quality_data
from data_helpers.mnist_data_helper import load_mnist_data

2024-03-25 14:17:30.060229: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
class ICAWrapper:
    """Uniform fit/transform facade over scikit-learn's ``FastICA``.

    Parameters
    ----------
    n_components : int
        Number of independent components to extract.
    max_iter : int, default=1000
        Iteration cap forwarded to ``FastICA``.
    random_state : int or None, default=None
        Seed forwarded to ``FastICA``. Pass an integer to make repeated
        runs reproducible; ``None`` keeps the library's unseeded default.
    """
    NAME = 'ICA'

    def __init__(self, n_components, max_iter=1000, random_state=None) -> None:
        self.model = FastICA(
            n_components=n_components,
            max_iter=max_iter,
            random_state=random_state,
        )

    def fit(self, X):
        """Fit the wrapped model on ``X`` and return the model's ``fit`` result."""
        return self.model.fit(X)

    def fit_transform(self, X):
        """Fit the wrapped model on ``X`` and return the transformed data."""
        return self.model.fit_transform(X)

    def transform(self, X):
        """Project ``X`` with the already-fitted wrapped model."""
        return self.model.transform(X)

    def get_kurtosis(self, X):
        """Return the mean per-component kurtosis of ``X`` after ICA.

        NOTE: this calls ``fit_transform``, so the wrapped model is REFIT
        on ``X``; any earlier fit (e.g. on a training fold) is replaced.
        Kurtosis is scipy's default (Fisher / excess) taken column-wise,
        then averaged over the components.
        """
        X_star = self.model.fit_transform(X)
        return scipy.stats.kurtosis(X_star).mean()

class PCAWrapper:
    """Uniform fit/transform facade over scikit-learn's ``PCA``.

    Parameters
    ----------
    n_components : int
        Number of principal components to keep.
    random_state : int or None, default=None
        Seed forwarded to ``PCA``; it only matters when scikit-learn picks
        a randomized solver. ``None`` keeps the library default.
    """
    NAME = 'PCA'

    def __init__(self, n_components, random_state=None) -> None:
        self.model = PCA(n_components=n_components, random_state=random_state)

    def fit(self, X):
        """Fit the wrapped model on ``X`` and return the model's ``fit`` result."""
        return self.model.fit(X)

    def fit_transform(self, X):
        """Fit the wrapped model on ``X`` and return the transformed data."""
        return self.model.fit_transform(X)

    def transform(self, X):
        """Project ``X`` with the already-fitted wrapped model."""
        return self.model.transform(X)

    def get_kurtosis(self, X):
        """Return the mean per-component kurtosis of ``X`` after PCA.

        NOTE: this calls ``fit_transform``, so the wrapped model is REFIT
        on ``X``; any earlier fit is replaced.
        """
        X_star = self.model.fit_transform(X)
        return scipy.stats.kurtosis(X_star).mean()

class RPWrapper:
    """Uniform fit/transform facade over ``GaussianRandomProjection``.

    Parameters
    ----------
    n_components : int
        Dimensionality of the projected space.
    random_state : int or None, default=None
        Seed forwarded to ``GaussianRandomProjection``. Pass an integer to
        draw the same projection matrix on every run; ``None`` keeps the
        library's unseeded default.
    """
    NAME = 'RP'

    def __init__(self, n_components, random_state=None) -> None:
        self.model = GaussianRandomProjection(
            n_components=n_components,
            random_state=random_state,
        )

    def fit(self, X):
        """Fit the wrapped model on ``X`` and return the model's ``fit`` result."""
        return self.model.fit(X)

    def fit_transform(self, X):
        """Fit the wrapped model on ``X`` and return the projected data."""
        return self.model.fit_transform(X)

    def transform(self, X):
        """Project ``X`` with the already-fitted wrapped model."""
        return self.model.transform(X)

    def get_kurtosis(self, X):
        """Return the mean per-component kurtosis of the projection of ``X``.

        NOTE: this calls ``fit_transform``, so the wrapped model is REFIT
        on ``X``; any earlier fit is replaced.
        """
        X_star = self.model.fit_transform(X)
        return scipy.stats.kurtosis(X_star).mean()

    def get_reconstruction_error(self, X):
        """Return the mean squared error of projecting ``X`` and mapping it back.

        NOTE: this calls ``fit_transform``, so the wrapped model is REFIT
        on ``X`` before the round trip.

        Returns
        -------
        float
            Mean of the element-wise squared difference between ``X`` and
            its reconstruction, taken over every entry.
        """
        X_star = self.model.fit_transform(X)
        X_reconstructed = self.model.inverse_transform(X_star)
        # Coerce to ndarray first: if X is a DataFrame, np.mean may otherwise
        # reduce per column instead of over every entry.
        residual = np.asarray(X) - np.asarray(X_reconstructed)
        return np.mean(np.square(residual))
    
class LLEWrapper:
    """Uniform fit/transform facade over ``LocallyLinearEmbedding``.

    Parameters
    ----------
    n_components : int
        Dimensionality of the embedding.
    n_neighbors : int, default=10
        Neighbourhood size forwarded to ``LocallyLinearEmbedding``.
    random_state : int or None, default=None
        Seed forwarded to ``LocallyLinearEmbedding``; ``None`` keeps the
        library default.
    """
    NAME = 'LLE'

    def __init__(self, n_components, n_neighbors=10, random_state=None) -> None:
        self.model = LocallyLinearEmbedding(
            n_components=n_components,
            n_neighbors=n_neighbors,
            random_state=random_state,
        )

    def fit(self, X):
        """Fit the wrapped model on ``X`` and return the model's ``fit`` result."""
        return self.model.fit(X)

    def fit_transform(self, X):
        """Fit the wrapped model on ``X`` and return the embedded data."""
        return self.model.fit_transform(X)

    def transform(self, X):
        """Embed ``X`` with the already-fitted wrapped model."""
        return self.model.transform(X)

    def get_kurtosis(self, X):
        """Return the mean per-component kurtosis of the embedding of ``X``.

        NOTE: this calls ``fit_transform``, so the wrapped model is REFIT
        on ``X``; any earlier fit is replaced.
        """
        X_star = self.model.fit_transform(X)
        return scipy.stats.kurtosis(X_star).mean()

    def get_reconstruction_error(self):
        """Return ``reconstruction_error_`` recorded by the most recent fit.

        Takes no data argument: the value is read straight from the fitted
        model, so the wrapper must have been fitted beforehand.
        """
        return self.model.reconstruction_error_

In [4]:
# --- Experiment configuration -------------------------------------------
# Dataset key; the loading cell accepts 'wine_quality' or 'mnist' and raises
# ValueError for anything else. It is also part of the results CSV path.
DATASET_NAME = 'wine_quality'
# DATASET_NAME = 'mnist'

# Wrapper class (not an instance) for the reduction method under test; the
# evaluation loop instantiates it once per fold as METHOD(components).
# METHOD = RPWrapper
# METHOD = ICAWrapper
METHOD = LLEWrapper
# Number of contiguous cross-validation folds used by the evaluation loop.
K_FOLDS = 5
# When True, the final cell writes the metrics frame to the results CSV.
SAVE = True

In [5]:
# Map each supported dataset key to (display name, loader callable).
_DATASET_LOADERS = {
    'wine_quality': ('Wine Quality', load_wine_quality_data),
    'mnist': ('MNIST', load_mnist_data),
}

if DATASET_NAME not in _DATASET_LOADERS:
    raise ValueError(f'Invalid dataset name {DATASET_NAME}')

DATASET_STR, _load_dataset = _DATASET_LOADERS[DATASET_NAME]
# Only the first two values returned by the loader are used here.
X, y, _, _ = _load_dataset()

# Optional subsampling for quick experiments (disabled):
# n = 5000
# X = X[:n]
# y = y[:n]

In [6]:
# if METHOD == LLEWrapper:
#     model = LocallyLinearEmbedding(n_components=2, n_neighbors=10)
#     X_star = model.fit_transform(X)
#     plt.title(f'Locally Linear Embedding - {DATASET_STR} - 2 Components')
#     for label in np.unique(y):
#         X_label = X_star[y == label]
#         plt.scatter(X_label[:, 0], X_label[:, 1], label=label)
#     plt.ylabel('Component 2')
#     plt.xlabel('Component 1')
#     plt.savefig(f"figures/{DATASET_NAME}_LLE_2D.png")

In [7]:
# Resume from a previous run's metrics when a CSV already exists for this
# dataset/method pair; otherwise start with an empty frame.
df_path = f'results/{DATASET_NAME}/{METHOD.NAME}_metrics.csv'
if not os.path.exists(df_path):
    df = pd.DataFrame()
else:
    # Index rows by component count, then re-add it as a regular (last)
    # column so it survives the index-less CSV export.
    df = pd.read_csv(df_path).set_index('num_components')
    df['num_components'] = df.index

In [8]:
# K-fold evaluation of the selected reduction method for 2..11 components.
#
# Fold layout: contiguous, unshuffled slices. The first `n_extra` folds take
# one extra row so that every sample lands in exactly one test fold; with a
# plain `N // K` fold size the trailing `N % K` rows would never be tested.
n, n_extra = divmod(X.shape[0], K_FOLDS)
fold_sizes = [n + 1 if fold < n_extra else n for fold in range(K_FOLDS)]
fold_edges = [sum(fold_sizes[:fold]) for fold in range(K_FOLDS + 1)]

for components in tqdm(range(2, 12), desc=f'{METHOD.NAME} components'):
    training_time_list = []
    evaluation_time_list = []
    kurtosis_list = []
    reconstruction_error_list = []
    acc_list = []

    for start, stop in zip(fold_edges[:-1], fold_edges[1:]):
        X_train = np.concatenate([X[:start], X[stop:]])
        y_train = np.concatenate([y[:start], y[stop:]])
        X_test = X[start:stop]
        y_test = y[start:stop]

        model = METHOD(components)

        # Time the fit on the training fold; only the fitted model is needed.
        t0 = time.perf_counter()
        model.fit_transform(X_train)
        training_time_list.append(time.perf_counter() - t0)

        # Time the projection of the held-out fold and keep the result so the
        # classifier below does not have to recompute it.
        t0 = time.perf_counter()
        X_test_star = model.transform(X_test)
        evaluation_time_list.append(time.perf_counter() - t0)

        # Downstream quality: 3-NN accuracy in the reduced space. Training
        # features come from transform() so both folds go through the same
        # mapping as the test fold.
        knn = KNeighborsClassifier(n_neighbors=3)
        knn.fit(model.transform(X_train), y_train)
        acc_list.append(knn.score(X_test_star, y_test))

        # Method-specific diagnostics. These run last because the ICA and RP
        # helpers refit the wrapped model on the data they are given.
        if METHOD is ICAWrapper:
            kurtosis_list.append(model.get_kurtosis(X_test))

        if METHOD is RPWrapper:
            reconstruction_error_list.append(model.get_reconstruction_error(X_test))

        if METHOD is LLEWrapper:
            reconstruction_error_list.append(model.get_reconstruction_error())

    # Aggregate the per-fold measurements into one row keyed by component count.
    row = {
        'num_components': components,
        'training_time_mean': np.mean(training_time_list),
        'training_time_std': np.std(training_time_list),
        'evaluation_time_mean': np.mean(evaluation_time_list),
        'evaluation_time_std': np.std(evaluation_time_list),
        'accuracy_mean': np.mean(acc_list),
        'accuracy_std': np.std(acc_list),
    }
    if METHOD is RPWrapper or METHOD is LLEWrapper:
        row['reconstruction_error_mean'] = np.mean(reconstruction_error_list)
        row['reconstruction_error_std'] = np.std(reconstruction_error_list)
    if METHOD is ICAWrapper:
        row['kurtosis_mean'] = np.mean(kurtosis_list)
        row['kurtosis_std'] = np.std(kurtosis_list)

    # Write cell by cell so rows loaded from an earlier run are updated in
    # place and new rows/columns are created on demand.
    for column, value in row.items():
        df.loc[components, column] = value

  0%|          | 0/10 [00:00<?, ?it/s]

2


 10%|█         | 1/10 [00:10<01:30, 10.08s/it]

3


 20%|██        | 2/10 [00:19<01:15,  9.44s/it]

4


 30%|███       | 3/10 [00:28<01:07,  9.64s/it]

5


 40%|████      | 4/10 [00:38<00:58,  9.78s/it]

6


 50%|█████     | 5/10 [00:48<00:48,  9.74s/it]

7


 60%|██████    | 6/10 [00:57<00:37,  9.50s/it]

8


 70%|███████   | 7/10 [01:07<00:28,  9.46s/it]

9


 80%|████████  | 8/10 [02:06<00:50, 25.26s/it]

10


 90%|█████████ | 9/10 [03:06<00:36, 36.24s/it]

11


100%|██████████| 10/10 [04:06<00:00, 24.65s/it]


In [9]:
# Persist the metrics table (without the index column) when saving is enabled.
if SAVE:
    output_dir = os.path.dirname(df_path)
    os.makedirs(output_dir, exist_ok=True)
    df.to_csv(df_path, index=False)