# Project 3 - Autoencoder for Dimensionality Reduction

# Import libraries

In [None]:
import logging, os

import tensorflow as tf

import keras
from keras import layers

from tensorflow.keras.models import save_model, load_model

from sklearn.utils import shuffle
import numpy as np

import matplotlib.pyplot as plt

import optuna
from optuna.visualization import plot_pareto_front, plot_optimization_history, plot_slice

from math import ceil

# Load MNIST dataset

In [None]:
keras.datasets.mnist.load_data(path="mnist.npz")
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Split dataset into training and validation sets

In [None]:
def split_data(x, y, training_size):
    x, y = shuffle(x, y)
    x_train, y_train = x[0 : training_size], y[0 : training_size]
    x_val, y_val     = x[training_size : ],  y[training_size : ]
    return (x_train, y_train), (x_val, y_val)

# Normalize and flatten data

In [None]:
def normalize_data(x, factor):
    return x.astype('float32') / factor

def flatten_data(x):
    return np.reshape(x, (len(x), 784))

def tensor_data(x):
    return np.reshape(x, (len(x), 28, 28, 1))

In [None]:
TRAINING_SIZE = int(0.8 * x_train.shape[0])
(x_train, y_train), (x_val, y_val) = split_data(x_train, y_train, TRAINING_SIZE)

x_train = normalize_data(x_train, 255)
x_val   = normalize_data(x_val, 255)
x_test  = normalize_data(x_test, 255)

x_d_train = flatten_data(x_train)
x_d_val   = flatten_data(x_val)
x_d_test  = flatten_data(x_test)

x_c_train = tensor_data(x_train)
x_c_val   = tensor_data(x_val)
x_c_test  = tensor_data(x_test)

In [None]:
# print shapes
print(x_train.shape)
print(x_val.shape)
print(x_test.shape)

print(x_d_train.shape)
print(x_d_val.shape)
print(x_d_test.shape)

print(x_c_train.shape)
print(x_c_val.shape)
print(x_c_test.shape)

# Define plot functions

In [None]:
def print_digits(decoded_imgs):
    n = 10  # How many digits we will display
    plt.figure(figsize=(20, 4))
    for i in range(n):
        # Display original
        ax = plt.subplot(2, n, i + 1)
        plt.imshow(x_test[i].reshape(28, 28))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # Display reconstruction
        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(decoded_imgs[i].reshape(28, 28))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

# Define Autoencoder model

In [None]:
INITIAL_DIM = x_train.shape[1] ** 2
NEW_DIM = 32

In [None]:
@keras.saving.register_keras_serializable()
class Autoencoder(keras.Model):
    def __init__(self, model_type, dim_list, activation, optimizer, loss, patience):
        super(Autoencoder, self).__init__()
        self.es = keras.callbacks.EarlyStopping(monitor='loss', verbose=1, patience=patience, min_delta=0.0001)
        self.dim_list = dim_list
        self.activation = activation
        self.encoder = keras.Sequential()
        self.decoder = keras.Sequential()
        self.loss = loss
        if model_type == 'dense':
            self.build_dense()
        elif model_type == 'conv':
            self.build_conv()
        self.compile(optimizer=optimizer, loss=loss)

    def build_dense(self):
        self.encoder.add(layers.Flatten(input_shape=(INITIAL_DIM,)))
        for dim in self.dim_list:
            self.encoder.add(layers.Dense(dim, activation=self.activation))
        for dim in reversed(self.dim_list):
            self.decoder.add(layers.Dense(dim, activation=self.activation))
        self.decoder.add(layers.Dense(INITIAL_DIM, activation='sigmoid'))
        
    def build_conv(self):
        for filters, kernel_size, pool_size, padding in self.dim_list[:-1]:
            self.encoder.add(layers.Conv2D(filters, kernel_size, activation=self.activation, padding=padding))
            self.encoder.add(layers.MaxPooling2D(pool_size, padding='same'))
        for i, config in enumerate(reversed(self.dim_list[:-1])):
            filters, kernel_size, pool_size, padding = config
            if i == len(self.dim_list[:-1]) - 1:
                padding = 'valid'
            self.decoder.add(layers.Conv2D(filters, kernel_size, activation=self.activation, padding=padding))
            self.decoder.add(layers.UpSampling2D(pool_size))
        if self.loss == 'binary_crossentropy':
            activation = 'sigmoid'
        elif self.loss == 'mean_squared_error':
            activation = 'linear'
        self.decoder.add(layers.Conv2D(1, self.dim_list[-1][0], activation=activation, padding=self.dim_list[-1][1]))

    def encode(self, x):
        return self.encoder(x)

    def decode(self, x):
        return self.decoder(x)    
    
    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def predict(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

## Binary handling

In [None]:
def save_encoded_binary(encoded_imgs, output_file):
	with open(output_file, 'wb') as f:
		# write magic number
		f.write((0).to_bytes(4, byteorder='big'))
		# write number of images
		f.write((len(encoded_imgs)).to_bytes(4, byteorder='big'))
		# write number of rows
		f.write((NEW_DIM).to_bytes(4, byteorder='big'))
		# write number of columns
		f.write((1).to_bytes(4, byteorder='big'))
		# write data
		for i in range(len(encoded_imgs)):
			for j in range(NEW_DIM):
				f.write(encoded_imgs[i][j].astype('float32').tobytes())

def save_decoded_binary(decoded_imgs, output_file):
	with open(output_file, 'wb') as f:
		# write magic number
		f.write((0).to_bytes(4, byteorder='big'))
		# write number of images
		f.write((len(decoded_imgs)).to_bytes(4, byteorder='big'))
		# write number of rows
		f.write((28).to_bytes(4, byteorder='big'))
		# write number of columns
		f.write((28).to_bytes(4, byteorder='big'))
		# write data
		for i in range(len(decoded_imgs)):
			for j in range(28):
				for k in range(28):
					f.write(decoded_imgs[i][j * 28 + k].astype('float32').tobytes())

# Optimizing the Dense Autoencoder

To skip logs click [here](#Dense-Autoencoder-optimization-results).

In [None]:
def objective_dense(trial):
    dim_list = []
    layers = trial.suggest_int('num_layers', 2, 5)
    x_train, x_val, x_test = x_d_train, x_d_val, x_d_test
    for i in range(layers):
        if i == 0:
            dim_list.append(trial.suggest_int('dim_' + str(i), 50, 512))
        elif i == layers - 1:
            min_element = min(dim_list)
            if min_element > 50:
                dim_list.append(trial.suggest_int('dim_' + str(i), 10, 50))
        else:
            min_element = min(dim_list)
            dim_list.append(trial.suggest_int('dim_' + str(i), 10, min_element))
    
    print('Dim list:', dim_list)
    activation = trial.suggest_categorical('activation', ['relu', 'elu', 'gelu'])
    optimizer = trial.suggest_categorical('optimizer', ['adam', 'adagrad', 'adadelta', 'adamax', 'nadam'])
    loss = 'binary_crossentropy'
    patience = trial.suggest_int('patience', 1, 10)
    autoencoder = Autoencoder('dense', dim_list, activation, optimizer, loss, patience)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256, 512])
    history = autoencoder.fit(x_train, x_train,
                              epochs=50,
                              batch_size=batch_size,
                              shuffle=True,
                              validation_data=(x_val, x_val),
                              callbacks=[autoencoder.es])
    save_model(autoencoder, 'model_' + 'dense_' + str(trial.number) + '.keras')
    np.save('history_' + 'dense_' + str(trial.number) + '.npy', history.history)
    encoded_imgs = autoencoder.encode(x_test).numpy()
    print('Shape of encoded vector:', encoded_imgs.shape)
    print('######################')
    return history.history['val_loss'][-1] # return validation loss of last epoch

In [None]:
study_dense = optuna.create_study(direction='minimize')
study_dense.optimize(objective_dense, n_trials=50, n_jobs=1)

## Dense Autoencoder optimization results

In [None]:
# plot best model decoded images
x_train, x_val, x_test = x_d_train, x_d_val, x_d_test
model = load_model('model_' + 'dense_' + str(study_dense.best_trial.number) + '.keras')
decoded_imgs = model.predict(x_test).numpy()
print_digits(decoded_imgs)

In [None]:
print(study_dense.best_params)
print(study_dense.best_value)
print(study_dense.best_trial)

In [None]:
df_dense = study_dense.trials_dataframe()

df_dense_sorted = df_dense.copy(deep=True)
df_dense_sorted = df_dense_sorted.dropna(subset=['value'])
df_dense_sorted = df_dense_sorted.sort_values(by=['value'], ascending=True)
df_dense_sorted = df_dense_sorted.reset_index(drop=True)
df_dense_sorted

In [None]:
plot_optimization_history(study_dense)

In [None]:
plot_slice(study_dense)

In [None]:
path = '/kaggle/working/'

sorted_trials = sorted([trial for trial in study_dense.trials if trial.value is not None], key=lambda trial: trial.value)
plt.figure(figsize=(20, 20))
for i, trial in enumerate(sorted_trials[:10]):
    history = np.load('history_' + 'dense_' + str(trial.number) + '.npy', allow_pickle=True).item()
    plt.subplot(5, 2, i + 1)
    plt.plot(history['loss'])
    plt.plot(history['val_loss'])
    plt.title('Trial ' + str(trial.number))
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(['Train', 'Validation'], loc='upper right')
    plt.subplots_adjust(hspace=0.5)
plt.show()

# Optimizing the Convolutional Autoencoder

To skip logs click [here](#Convolutional-Autoencoder-optimization-results).

In [None]:
def objective_conv(trial):
    dim_list = []
    layers = trial.suggest_int('num_layers', 4, 6)
    x_train, x_val, x_test = x_c_train, x_c_val, x_c_test
    kernel_size = 3
    pool_size = 2
    padding = 'same'
    for i in range(layers):
        if i == 0:
            dim_list.append([trial.suggest_int('filters_' + str(i), 16, 64), (kernel_size, kernel_size), (pool_size, pool_size), padding])
        elif i == layers - 1:
            dim_list.append([(kernel_size, kernel_size), padding])
        else:
            # take min filters size from list
            min_element = min([dim_list[i][0] for i in range(len(dim_list))])
            dim_list.append([trial.suggest_int('filters_' + str(i), 8, min_element), (kernel_size, kernel_size), (pool_size, pool_size), padding])
    print('Dim list:', dim_list)
    activation = trial.suggest_categorical('activation', ['relu', 'elu', 'gelu'])
    optimizer = trial.suggest_categorical('optimizer', ['adam', 'adagrad', 'adadelta', 'adamax', 'nadam'])
    loss = 'binary_crossentropy'
    patience = trial.suggest_int('patience', 1, 10)
    autoencoder = Autoencoder('conv', dim_list, activation, optimizer, loss, patience)
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256, 512])
    history = autoencoder.fit(x_train, x_train,
                              epochs=50,
                              batch_size=batch_size,
                              shuffle=True,
                              validation_data=(x_val, x_val),
                              callbacks=[autoencoder.es])
    save_model(autoencoder, 'model_' + 'conv_' + str(trial.number) + '.keras')
    np.save('history_' + 'conv_' + str(trial.number) + '.npy', history.history)
    encoded_imgs = autoencoder.encode(x_test).numpy()
    print('Shape of encoded vector:', encoded_imgs.shape)
    flattened_size = 1
    for dim in encoded_imgs.shape[1:]:
        flattened_size *= dim

    # constraint: flatten size must be less than 50
    c0 = flattened_size - 50
    trial.set_user_attr('constraint', [c0])

    print('######################')
    return history.history['val_loss'][-1] # return validation loss of last epoch

def constraints(trial):
    return trial.user_attrs['constraint']

In [None]:
sampler = optuna.samplers.NSGAIISampler(constraints_func=constraints)
study_conv = optuna.create_study(direction='minimize', sampler=sampler)
study_conv.optimize(objective_conv, n_trials=50, n_jobs=1)

## Convolutional Autoencoder optimization results

In [None]:
# plot best model decoded images
x_train, x_val, x_test = x_c_train, x_c_val, x_c_test
model = load_model('model_' + 'conv_' + str(study_conv.best_trial.number) + '.keras')
decoded_imgs = model.predict(x_test)
decoded_imgs = np.reshape(decoded_imgs, (len(decoded_imgs), 784))
print_digits(decoded_imgs)

In [None]:
print(study_conv.best_params)
print(study_conv.best_value)
print(study_conv.best_trial)

In [None]:
df_conv = study_conv.trials_dataframe()

df_conv_sorted = df_conv.copy(deep=True)
df_conv_sorted = df_conv_sorted.dropna(subset=['value'])
df_conv_sorted = df_conv_sorted.sort_values(by=['value'], ascending=True)
df_conv_sorted = df_conv_sorted.reset_index(drop=True)
df_conv_sorted

In [None]:
plot_optimization_history(study_conv)

In [None]:
plot_slice(study_conv)

In [None]:
path = '/kaggle/working/'

sorted_trials = sorted([trial for trial in study_conv.trials if trial.value is not None], key=lambda trial: trial.value)
plt.figure(figsize=(20, 20))
for i, trial in enumerate(sorted_trials[:10]):
    history = np.load('history_' + 'conv_' + str(trial.number) + '.npy', allow_pickle=True).item()
    plt.subplot(5, 2, i + 1)
    plt.plot(history['loss'])
    plt.plot(history['val_loss'])
    plt.title('Trial ' + str(trial.number))
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(['Train', 'Validation'], loc='upper right')
    plt.subplots_adjust(hspace=0.5)
plt.show()