# Goal

To evaluate the usefulness of dimensionality reduction methods for digit classification. The methods to be evaluated are:

- Nothing (use all dimensions)
- Principal Components Analysis (PCA)
- Multidimensional Scaling (MDS)
- Kernel PCA with radial basis functions (Laplacian and Gaussian)
- Isomap
- Locally Linear Embedding (LLE)
- Laplacian Eigenmaps (Spectral Embedding)
- Hessian Eigenmaps
- Local Tangent Space Alignment (LTSA)
- Diffusion maps
- Autoencoder
- t-SNE (maybe)

I'll use a sample of 1000 images from the MNIST dataset with all 10 classes. To compare the dimension reduction methods, I'll project down to dimensions that are powers of two, arranged by order of magnitude: 2, 64, and 256. The highest dimension (256) will therefore represent approximately 33% of the original dimensions (784) and the lowest dimension (2) will represent approximately 0.26% of the original dimensions.

In [6]:
import numpy as np
import plotly.express as px
import plotly.graph_objects as go

from sklearn.decomposition import PCA, KernelPCA
from sklearn.manifold import MDS, LocallyLinearEmbedding, Isomap, SpectralEmbedding, TSNE
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.metrics import classification_report

from scipy.linalg import eigh

from kymatio.keras import Scattering2D
import pyshearlab as ps

from keras.layers import Input, Dense, Flatten
from keras.models import Model, Sequential
from keras.datasets import mnist
from keras.optimizers import SGD
from keras.utils import to_categorical

from tabulate import tabulate

In [7]:
def get_model(name = None, input_shape = None):
    """ Returns a Sequential model """
    
    return Sequential(
        [
            Input(shape = (input_shape,)),
            Dense(units = 10, activation = 'softmax', use_bias = False)
        ],
        name = name
    )

class DiffusionMap:
    def __init__(self, alpha = 0.15, n_components = None):
        self.alpha = alpha,
        self.n_components = n_components
        
    def __str__(self):
        return 'DiffusionMap(alpha = {}, n_components = {})'.format(self.alpha, self.n_components)

    def fit_transform(self, X):
        """ Function to find the diffusion matrix P

            args:
            -----
            alpha - to be used for gaussian kernel function
            X - feature matrix as numpy array
            n_components - number of lower dimensions

            returns:
            --------
            Diffusion_map as np.array object
        """

        dists = euclidean_distances(X, X)
        K = np.exp(-dists**2 / self.alpha)

        r = np.sum(K, axis = 0)
        Di = np.diag(1/r)
        P = np.matmul(Di, K)

        D_right = np.diag((r)**0.5)
        D_left = np.diag((r)**-0.5)
        P_prime = np.matmul(D_right, np.matmul(P, D_left))

        self.eigenValues, self.eigenVectors = eigh(P_prime)
        idx = self.eigenValues.argsort()[::-1]
        self.eigenValues = self.eigenValues[idx]
        self.eigenVectors = self.eigenVectors[:, idx]

        diffusion_coordinates = np.matmul(D_left, self.eigenVectors)
        
        self.diffusion_coordinates = diffusion_coordinates

        return diffusion_coordinates[:, :self.n_components]

class DimensionReduction:
    """ General class to run several dimension reduction methods on a dataset 
        and then train and evaluate linear classifier on the transformed data.
    """
    
    def __init__(self, dim_reduction_names, X_train, y_train, X_test, y_test, n_components, n_neighbors = 7, gamma = 0.15, alpha = 0.15):
        self.names = dim_reduction_names
        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test
        self.n_components = n_components
        self.n_neighbors = n_neighbors
        self.gamma = gamma
        self.alpha = alpha
        
        self.n_neighbors_hessian = int((self.n_components * (self.n_components + 3))/2 + 1)
        
        # initialize with all dim reduction methods
        all_methods = {
            'pca': PCA(n_components = self.n_components),
            'mds': MDS(n_components = self.n_components, max_iter = 20, n_init = 1),
            'kpca_gaussian': KernelPCA(n_components = self.n_components, kernel = 'rbf', gamma = self.gamma),
#             'kpca_laplacian': KernelPCA(n_components = self.n_components, kind = 'laplacian'), # 16SEPT - doesn't work yet
            'lle': LocallyLinearEmbedding(n_components = self.n_components, method = 'standard'),
            'isomap': Isomap(n_components = self.n_components, n_neighbors = self.n_neighbors),
            'lap': SpectralEmbedding(n_components = self.n_components, n_neighbors = self.n_neighbors, gamma = self.gamma),
            'hes': LocallyLinearEmbedding(n_components = self.n_components, n_neighbors = self.n_neighbors_hessian, method = 'hessian', eigen_solver = 'dense'),
            'ltsa': LocallyLinearEmbedding(n_components = self.n_components, n_neighbors = self.n_components, method = 'ltsa'),
            'diff': DiffusionMap(n_components = self.n_components, alpha = self.alpha)
        }
        
        # select only the desired methods
        self.methods = [all_methods[name] for name in self.names]
        
        print('Trial - Training linear classifier on data reduced to {} dimensions using the following methods:\n{}'.format(self.n_components, self.names))
        
    def fit_dim_reduction(self):
        """ Uses selected dim reduction methods to fit the dim reducing transformations """
        
        self.X_train_reduced = [method.fit_transform(self.X_train) for method in self.methods]
        
        return None
    
    def reduce_test_dimension(self):
        """ Performs dim reduction on test data using fitted dim reduction methods """
        
        self.X_test_reduced = [method.transform(self.X_test) for method in self.methods]
        
        return None
    
    def build_classifiers(self):
        """ Build a simple Sequential model for each dim reduction method """
        
        self.models = [get_model(name, input_shape = self.n_components) for name, method in zip(self.names, self.methods)]
        
        for model in self.models:
            model.compile(
                loss = 'categorical_crossentropy',
                optimizer = SGD(learning_rate = 0.01),
                metrics = ['accuracy']
            )
        
        print('All models built and compiled\n')
        
        return None
    
    def train_classifiers(self):
        """ Train all classifiers on the reduced dim training data """
        
        self.training_accuracy = [
            model.fit(
                x,
                self.y_train,
                batch_size = 25,
                epochs = 100,
                verbose = False
            ) for model, x in zip(self.models, self.X_train_reduced)
        ]
        
        print('All models trained on reduced dim training data\n')
        
        return None
    
    def test_classifiers(self):
        """ Evaluate all classifiers on the reduced dim testing data """
        
        self.test_accuracy = [
            model.evaluate(
                x = x,
                y = to_categorical(self.y_test)
            ) for model, x in zip(self.models, self.X_test_reduced)
        ]
        
        print('All models evaluated on reduced dim test data\n')
        
        print('Generating predictions...\n')
        
        self.predictions = [model.predict(x) for model, x in zip(self.models, self.X_test_reduced)]
        self.predictions_boolean = [np.argmax(y, axis = 1) for y in self.predictions]
        
        for name, pred in zip(self.names, self.predictions_boolean):
            print('Summary report for model: {}'.format(name))
            print(classification_report(self.y_test, pred))
        
        return None
        
    def get_model_accuracy(self):
        """ Show the training and testing accuracy for all models """
        
        headers = ['Method ({} dimensions)'.format(self.n_components), 'Training accuracy', 'Test accuracy']
        
        self.table = [
            [name, train_result.history['accuracy'][-1]*100, test_result[-1]*100] for name, train_result, test_result in zip(self.names, self.training_accuracy, self.test_accuracy)
        ]
        
        print(tabulate(self.table, headers = headers))
        print('\n{}\n'.format('*'*100))
        
        return None
        
    def run(self):
        """ Run through the set of experiments """
        
        print('Initializing dimension reduction methods - reducing to d={}\n'.format(self.n_components))
        self.fit_dim_reduction()
        self.reduce_test_dimension()
        
        print('Building classifier models for all dimension reduction methods')
        self.build_classifiers()
        
        print('Training classifiers on reduced dimension training set')
        self.train_classifiers()
        
        print('Testing classifiers on reduced dimension testing set')
        self.test_classifiers()
        
        self.get_model_accuracy()
        
        return None

# used in Shearlet transform
useGPU = True

# names of dim reduction methods
# names = ['pca', 'mds', 'kpca', 'lle', 'isomap', 'lap', 'ltsa', 'diff']

(X_train, y_train), (X_test, y_test) = mnist.load_data()

# only do this for grayscale images
X_train = X_train.astype('float32') / 255.
X_test = X_test.astype('float32') / 255.

X_train = X_train.reshape((len(X_train), np.prod(X_train.shape[1:])))
X_test = X_test.reshape((len(X_test), np.prod(X_test.shape[1:])))

# get a random subsample of training data - helps speed up dim reduction
N = 1000

random_indices = np.random.randint(
    0,
    high = X_train.shape[0], 
    size = N
)

X = X_train[random_indices, :]
y = y_train[random_indices]

# encode the class labels as one-hot vectors
y = to_categorical(y)
y_train = to_categorical(y_train)
# y_test = to_categorical(y_test)

In [9]:
X.shape, type(X)

((1000, 784), numpy.ndarray)

In [15]:
names = ['pca', 'kpca_gaussian', 'lle', 'isomap', 'ltsa']

trials = [
    DimensionReduction(
        names,
        X_train = X,
        y_train = y,
        X_test = X_test,
        y_test = y_test,
        n_components = k
    ) for k in [2, 16, 64, 256]
]

Trial - Training linear classifier on data reduced to 2 dimensions using the following methods:
['pca', 'kpca_gaussian', 'lle', 'isomap', 'ltsa']
Trial - Training linear classifier on data reduced to 16 dimensions using the following methods:
['pca', 'kpca_gaussian', 'lle', 'isomap', 'ltsa']
Trial - Training linear classifier on data reduced to 64 dimensions using the following methods:
['pca', 'kpca_gaussian', 'lle', 'isomap', 'ltsa']
Trial - Training linear classifier on data reduced to 256 dimensions using the following methods:
['pca', 'kpca_gaussian', 'lle', 'isomap', 'ltsa']


In [16]:
for trial in trials:
    trial.run()

Initializing dimension reduction methods - reducing to d=2



  self.M_lu = lu_factor(M)


Building classifier models for all dimension reduction methods
All models built and compiled

Training classifiers on reduced dimension training set
All models trained on reduced dim training data

Testing classifiers on reduced dimension testing set
All models evaluated on reduced dim test data

Generating predictions...

Summary report for model: pca
              precision    recall  f1-score   support

           0       0.38      0.94      0.54       980
           1       0.46      0.99      0.63      1135
           2       0.33      0.24      0.28      1032
           3       0.41      0.35      0.38      1010
           4       0.30      0.29      0.29       982
           5       0.00      0.00      0.00       892
           6       0.00      0.00      0.00       958
           7       0.30      0.74      0.43      1028
           8       0.00      0.00      0.00       974
           9       0.00      0.00      0.00      1009

    accuracy                           0.37     1

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Building classifier models for all dimension reduction methods
All models built and compiled

Training classifiers on reduced dimension training set
All models trained on reduced dim training data

Testing classifiers on reduced dimension testing set
All models evaluated on reduced dim test data

Generating predictions...

Summary report for model: pca
              precision    recall  f1-score   support

           0       0.87      0.96      0.91       980
           1       0.81      0.97      0.89      1135
           2       0.85      0.77      0.81      1032
           3       0.79      0.83      0.81      1010
           4       0.85      0.79      0.82       982
           5       0.81      0.65      0.72       892
           6       0.82      0.87      0.84       958
           7       0.83      0.85      0.84      1028
           8       0.85      0.71      0.78       974
           9       0.76      0.78      0.77      1009

    accuracy                           0.82     1

# Classifier performance on the full dataset

In [None]:
full_model = get_model(name = 'full_model', input_shape = 784)
full_model.compile(
    loss = 'categorical_crossentropy',
    optimizer = SGD(learning_rate = 0.01),
    metrics = ['accuracy']
)

In [None]:
history_full = full_model.fit(
    X_train,
    y_train,
    batch_size = 25,
    epochs = 50,
    verbose = False
)

test_full = full_model.evaluate(
    x = X_test,
    y = y_test
)

In [None]:
print('Training accuracy\n{}: {:.4f}%\n'.format(full_model.name, history_full.history['accuracy'][-1]*100))
print('Test accuracy\n{}: {:.4f}%\n'.format(full_model.name, test_full[-1]*100))

# Let's check a classifier's performance on the sample

In [None]:
sample_model = get_model(name = 'sample_model', input_shape = 784)
sample_model.compile(
    loss = 'categorical_crossentropy',
    optimizer = SGD(learning_rate = 0.01),
    metrics = ['accuracy']
)

In [None]:
history_sample = sample_model.fit(
    X,
    y,
    batch_size = 25,
    epochs = 50,
    verbose = False
)

test_sample = sample_model.evaluate(
    x = X_test,
    y = y_test
)

In [None]:
print('Training accuracy\n{}: {:.4f}%\n'.format(sample_model.name, history_sample.history['accuracy'][-1]*100))
print('Test accuracy\n{}: {:.4f}%\n'.format(sample_model.name, test_sample[-1]*100))

# Sept 2 - Wavelet decompositions

In [17]:
# wavelet decomposition
inputs = Input(shape = (28, 28))
x = Scattering2D(J = 3, L = 8)(inputs)
x = Flatten()(x)
x_out = Dense(10, activation = 'softmax')(x)
model_scatter = Model(inputs, x_out)

Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: Unable to identify source code of lambda function <function <lambda> at 0x7f54005f8e60>. It was defined in this code:
backend.fft = FFT(lambda x: tf.signal.fft2d(x, name='fft2d'),
                  lambda x: tf.signal.ifft2d(x, name='ifft2d'),
                  lambda x: tf.math.real(tf.signal.ifft2d(x, name='irfft2d')),
                  lambda x: None)

This code must contain a single distinguishable lambda. To avoid this problem, define each lambda in a separate expression.
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: Unable to identify source code of lambda function <function <lambda> at 0x7f54005f8e60>. It was defined in this code:
backend.fft = FFT(lambda x: tf.signal.fft2d(x, name='fft2d'),
    

In [18]:
model_scatter.summary()

Model: "functional_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_56 (InputLayer)        [(None, 28, 28)]          0         
_________________________________________________________________
scattering2d (Scattering2D)  (None, 217, 3, 3)         0         
_________________________________________________________________
flatten (Flatten)            (None, 1953)              0         
_________________________________________________________________
dense_55 (Dense)             (None, 10)                19540     
Total params: 19,540
Trainable params: 19,540
Non-trainable params: 0
_________________________________________________________________


In [19]:
model_scatter.compile(
        loss = 'categorical_crossentropy',
        optimizer = SGD(learning_rate = 0.01),
        metrics = ['accuracy']
)

In [20]:
X_scatter = np.array([img.reshape((28, 28)) for img in X])
X_scatter.shape

(1000, 28, 28)

In [21]:
history_scatter = model_scatter.fit(
    X_scatter,
    y,
    batch_size = 25,
    epochs = 50,
    verbose = False
)

In [22]:
print('model_scatter: {:.4f}%'.format(history_scatter.history['accuracy'][-1]*100))

model_scatter: 51.0000%


# Autoencoders

In [None]:
class Autoencoder:
    
    def __init__(self, X, y, n_components):
        self.X = X
        self.y = y
        self.n_components = n_components
        
        # create the encoder and full autoencoder models
        self.input_img = Input(shape = (784,))
        
        self.encoded = Dense(
            self.n_components,
            activation = 'relu'
        )(self.input_img)
        
        self.decoded = Dense(
            784,
            activation = 'sigmoid'
        )(self.encoded)
        
        self.autoencoder = Model(
            self.input_img,
            self.decoded
        )
        
        self.encoder = Model(
            self.input_img,
            self.encoded
        )
        
        # placeholder for encoded input
        self.encoded_input = Input(
            shape = (self.n_components,)
        )
        
        # create decoder
        self.decoder_layer = self.autoencoder.layers[-1]
        self.decoder = Model(
            self.encoded_input,
            self.decoder_layer(self.encoded_input)
        )
        
        # compile the autoencoder model
        self.autoencoder.compile(
            loss = 'categorical_crossentropy',
            optimizer = SGD(learning_rate = 0.01),
            metrics = ['accuracy']
        )
        
    def fit(self):
        self.autoencoder.fit(
            self.X,
            self.X,
            epochs = 50,
            batch_size = 25,
            shuffle = True,
            verbose = False
        )
        
    def fit_transform(self):
        return self.encoder.predict(self.X)
    
    def build_model(self):
        self.model = Sequential(
            [
                Input(shape = (self.n_components, )),
                Dense(units = 10, activation = 'softmax', use_bias = False)
            ]
        )
        
        self.model.compile(
            loss = 'categorical_crossentropy',
            optimizer = SGD(learning_rate = 0.01),
            metrics = ['accuracy']
        )
        
    def train_model(self):
        reduced_dim = self.fit_transform()
        self.history = self.model.fit(
            reduced_dim,
            self.y,
            batch_size = 25,
            epochs = 50,
            verbose = False
        )
        
        self.accuracy = self.history.history['accuracy'][-1]*100

## Autoencoder reducing to 2 dimensions

Make an ensemble of 50 autoencoders, then take the average accuracy as the reported accuracy.

In [None]:
N = 50
k = 2

auto2 = [Autoencoder(X, y, k) for _ in range(N)]
[a.fit() for a in auto2]

[a.build_model() for a in auto2]
[a.train_model() for a in auto2];

In [None]:
for a in auto2:
    print('model_autoencoder: {:.4f}%'.format(a.accuracy))
          
mean_accuracy2 = sum([a.accuracy for a in auto2])/N
print('Mean accuracy for {} models: {}'.format(N, mean_accuracy2))

## Autoencoder reducing to 16 dimensions

In [None]:
k = 16

auto16 = [Autoencoder(X, y, k) for _ in range(N)]
[a.fit() for a in auto16]

[a.build_model() for a in auto16]
[a.train_model() for a in auto16];

In [None]:
for a in auto16:
    print('model_autoencoder: {:.4f}%'.format(a.accuracy))
          
mean_accuracy16 = sum([a.accuracy for a in auto16])/N
print('Mean accuracy for {} models: {}'.format(N, mean_accuracy16))

## Autoencoder reducing to 256 dimensions

In [None]:
k = 256

auto256 = [Autoencoder(X, y, k) for _ in range(N)]
[a.fit() for a in auto256]

[a.build_model() for a in auto256]
[a.train_model() for a in auto256];

In [None]:
for a in auto256:
    print('model_autoencoder: {:.4f}%'.format(a.accuracy))
          
mean_accuracy256 = sum([a.accuracy for a in auto256])/N
print('Mean accuracy for {} models: {}'.format(N, mean_accuracy256))

## Autoencoder reducing to 783 dimensions

In [None]:
k = 783

auto783 = [Autoencoder(X, y, k) for _ in range(N)]
[a.fit() for a in auto783]

[a.build_model() for a in auto783]
[a.train_model() for a in auto783];

In [None]:
for a in auto783:
    print('model_autoencoder: {:.4f}%'.format(a.accuracy))
          
mean_accuracy783 = sum([a.accuracy for a in auto783])/N
print('Mean accuracy for {} models: {}'.format(N, mean_accuracy783))

# References

[1] Ham, J., Lee, D. D., Mika, S., & Schölkopf, B. (2004, July). A kernel view of the dimensionality reduction of manifolds. In *Proceedings of the Twenty-First International Conference on Machine Learning* (p. 47).