In [1]:
# Defining random seeds to enable reproducibility
from numpy.random import seed
seed(1)

import tensorflow as tf
tf.random.set_seed(1)
 
import random
random.seed(1)
 
import pickle
import numpy as np
import keras_tuner
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras import applications
from tensorflow.keras import optimizers
from tensorflow.keras.layers import InputLayer, Flatten, Dense
from tensorflow.python.client import device_lib
from sklearn.model_selection import StratifiedKFold


print(device_lib.list_local_devices())
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))


def load_patras_dataset():
    """
    Loads the datasets encoded in .pkl files and returns its decoded form.

    Returns
    -------
    list
        A list of n-dimensional arrays representing the subjects samples that will be used to \\
        train the drunkenness classification model.
    list
        A list of n-dimensional arrays representing the subjects samples that will be used to \\
        test the drunkenness classification model.
    ndarray
        A n-dimensional array representing the training set labels.
    ndarray
        A n-dimensional array representing the test set labels.
    """

    print("Loading Sober-Drunk Face Dataset, from Patras University")
    
    # Defining the sample and label sets filenames
    sets = [
        "Insert the balanced x_training-set.pkl file path here",
        "Insert the balanced x_test-set.pkl file path here",
        "Insert the balanced y_training-set.pkl file path here",
        "Insert the balanced y_test-set.pkl file path here"
    ]
    
    # Defining an empty list for storing the decoded datasets
    loaded_datasets = []
    
    # Iterating over the dataset files
    for set_ in sets:
        # Opening the .pkl file in read mode
        with open(set_, 'rb') as file:
            # Appending the decoded dataset to the datasets list
            loaded_datasets.append(pickle.load(file))
    
    # Unpacking the datasets list into individual subsets
    x_train, x_test, y_train, y_test = loaded_datasets
    
    # Converting the label list to the n-dimensional array format
    y_train= np.array(y_train)
    y_test = np.array(y_test)
    
    # Printing the dataset length
    print("\nSamples total: {0}".format((len(x_train)) + (len(x_test))))
    
    print("\nDataset splitting: ")
    
    # Printing the training and test sets length
    print("\nTraining set: {0}".format(len(x_train)))
    print("Test set: {0}".format(len(x_test)))
    
    # Returning the training and test sets, and its respective labels
    return x_train, x_test, y_train, y_test


def min_max_norm(dataset):
    """
    Normalizes the keyframes according to the minimum-maximum norm, \\
    such that pixel values ranges from 0 to 1.

    Parameters
    ----------
    dataset : list
        A list of n-dimensional arrays representing the subjects keyframes.

    Returns
    -------
    ndarray
        A n-dimensional array representing keyframes with pixel values ranging from 0 to 1.
    """

    # Converting the dataset type from list to n-dimensional array
    dataset = np.asarray(dataset, dtype="int16")

    # Finding the keyframes minimum and maximum values
    x_min = dataset.min(axis=(1, 2), keepdims=True)
    x_max = dataset.max(axis=(1, 2), keepdims=True)

    # Applying the minimum-maximum norm to each keyframe
    norm_dataset = (dataset - x_min) / (x_max - x_min)

    # Printing the minimum and maximum values from a given sample for sanity check
    print("\nMinMax normalization")
    print("dataset shape: ", norm_dataset.shape)
    print("min: ", norm_dataset[0].min())
    print("max: ", norm_dataset[0].max())

    # Returning the normalized dataset
    return norm_dataset


class MyHyperModel(keras_tuner.HyperModel):
    """
    A custom keras tuner class for running the the stratified k-fold cross-validation \\
    method during the hyperparameter search process.

    Attributes
    ----------
    features_shape : ndarray
        A n-dimensional array describing the flattened feature maps shape.

    Methods
    -------
    build(hp)
        Builds a keras sequential model and returns it.
    fit(hp, model, x, y, x_test, y_test, *args, **kwargs)
        Defines a custom training loop to enable the k-fold cross-validation \\
        during the hyperparameter search.
    """
    
    def __init__(self, features_shape):
        """
        Parameters
        ----------
        features_shape : ndarray
            A n-dimensional array describing the flattened feature maps shape.
        """

        # Instantiating the dense layer input shape
        self.features_shape = features_shape
    
    
    def build(self, hp): 
        """
        Builds a keras sequential model and returns it.

        Parameters
        ----------
        hp : Any
            Defines the hyperparameters to search while building the model.

        Returns
        -------
        Sequential
            A single layer binary classification model.
        """
        
        # Defining a Sequential model
        model = models.Sequential()

        # Adding an input layer to receive the flattened feature array
        model.add(InputLayer(input_shape=self.features_shape, name="input"))

        # Adding an output layer to classify the received features as sober (0) or drunk (1)
        model.add(Dense(1, activation='sigmoid', name="output"))

        # Defining the learning rate hyperparameter search space
        learning_rate = hp.Choice("learning rate", values=[1e-3, 1e-4, 1e-5, 1e-6])

        # Defining the optmization function
        adam = optimizers.Adam(learning_rate=learning_rate)

        # Compiling the model
        model.compile(loss=keras.losses.BinaryCrossentropy(from_logits=False),
                    optimizer=adam,
                    metrics=['accuracy'])
    
        # Printing the model summary
        model.summary()

        # Returning the sequential model
        return model

    
    def fit(self, hp, model, x, y, x_test, y_test, *args, **kwargs):
        """
        Defines a custom training loop to enable the k-fold cross-validation \\
        during the hyperparameter search.

        Parameters
        ----------
        hp : Any
            Defines the hyperparameters to search while building the model.
        model: Sequential
            The keras sequential model returned from the build function.
        x : ndarray
            A n-dimensional array representing the training samples.
        y : ndarray
            A n-dimensional array representing the training samples labels.
        x_test : ndarray
            A n-dimensional array representing the test samples to evaluate the model \\
            classification performance.
        y_test : ndarray
            A n-dimensional array representing the test samples labels.
        *args : Any
            Positional arguments passed by the Tuner.search() function, such as training \\
            and validation data.
        **kwargs : list
            Keyword arguments passed by the Tuner.search() function. It always contains a \\
            callback argument, a list of callback functions called during the model training. 

        Returns
        -------
        dictionary
            The name of the objective function to track as the dictionary key and its \\
            respective value.

        Notes
        -----
        The Keras tuner approach for comparing trials is not the same method we used to evaluate the model, hence, \\
        after obtaining the best hyperparameter settings we retrained the model in order to verify its training behavior \\
        and to assess its classification performance. Check this GitHub discussion for more details regarding Keras tuner \\
        score calculation: https://github.com/keras-team/keras-tuner/discussions/581.
        """

        # Defining the stratified cross-validation folds
        folds = list(StratifiedKFold(n_splits=5, shuffle=False, random_state=None).split(x, y))

        # Defining the epochs hyperparameter search space
        epochs = hp.Choice('epochs', values=[10, 20, 40, 60, 80, 100, 200, 300, 400, 500])

        # Defining the batch size hyperparameter search space
        batch_size = hp.Int('batch_size', min_value=2, max_value=48, step=2)

        # Instantiating an empty list for storing the model classification accuracies on the test set samples
        test_accuracies = []

        # Iterating over the stratified cross-validation folds
        for j, (train_indices, valid_indices) in enumerate(folds):
            print('\nFold ',j)
            # Defining the training and validation sets
            x_train, x_valid = x[train_indices], x[valid_indices]
            y_train, y_valid = y[train_indices], y[valid_indices]
            # Fitting the model
            history = model.fit(x_train, y_train, 
                                validation_data=(x_valid, y_valid),
                                batch_size=batch_size, 
                                epochs=epochs,
                                **kwargs)
            
            # Evaluating the model on unseen data
            test_loss, test_acc = model.evaluate(x_test, y_test, verbose=1)

            # Appending the model classification accuracies to the test set accuracies list
            test_accuracies.append(test_acc)
        
        # Returning the objective metric to track 
        return {'test_accuracy': np.mean(test_accuracies)}


def feature_extraction(x_train, x_test):
    """
    Uses a pre-trained model to extract common features from facial thermal images. \\
    Such features will be used in the transfer learning step to train a binary \\
    classification model, which is expected to abstract new knowledge from these \\ 
    generic representations.

    Parameters
    ----------
    x_train : ndarray
        A n-dimensional array representing the training samples.
    x_test : ndarray
        A n-dimensional array representing the test samples to evaluate the model \\
        classification performance.
    
    Returns
    -------
    ndarray
        A n-dimensional array representing the flattened training feature maps.
    ndarray
        A n-dimensional array representing the flattened test feature maps.
    ndarray
        A n-dimensional array describing the flattened feature maps shape.
    """
    
    # Defining a VGG pre-trained model with new input shape and without dense layers
    vgg = applications.VGG16(weights='imagenet', include_top=False, input_shape=(128,160,3))

    # Defining the VGG model output as the last pooling layer feature maps
    output = vgg.layers[-1].output

    # Adding a Flatten layer to the output
    output = Flatten()(output)

    # Defining a functional model for feature extraction
    vgg_model = models.Model(vgg.input, output)

    # Changing the model 'trainable' parameter to False
    vgg_model.trainable = False

    print("\n")

    # Iterating over the model layers
    for layer in vgg_model.layers:
        # Freezing the layers weights
        layer.trainable = False
        # Printing the layers 'trainable' parameter for sanity check
        print("{}: {}".format(layer.name, layer.trainable))

    # Extracting the training features
    train_features = vgg_model.predict(x_train, verbose=1)

    # Extracting the test features
    test_features = vgg_model.predict(x_test, verbose=1)

    print('\nTraining Bottleneck Features: {0}'.format(train_features.shape))
    print('Test Bottleneck Features: {0}'.format(test_features.shape))

    print('\nModel output shape: {0}'.format(vgg_model.output_shape))

    # Defining the flattened fetaure maps shape for passing to build function
    features_shape = train_features[0].shape

    # Returning the training and test features, and its array shape
    return train_features, test_features, features_shape


# Loading the training and test datasets
x_train, x_test, y_train, y_test = load_patras_dataset()

# Applying the min-max normalization
x_train = min_max_norm(x_train)
x_test = min_max_norm(x_test)

# Reshaping datsets to the tensor format (channel last)
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], x_train.shape[2], 3)
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], x_test.shape[2], 3)

# Extracting features from the training and test datasets
train_features, test_features, features_shape = feature_extraction(x_train, x_test)

# Defining the random search object
tuner = keras_tuner.RandomSearch(
    MyHyperModel(features_shape),
    objective=keras_tuner.Objective("test_accuracy", "max"),
    max_trials=64,
    seed=1,
    project_name="1st_frame",
)

# Defining the early stopping callback
early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

# Running the random search
tuner.search(train_features, y_train, test_features, y_test, callbacks=[early_stopping])

# Printing the top 10 results out of the 64 trials
tuner.results_summary()


Trial 64 Complete [00h 00m 04s]
test_accuracy: 0.5800000190734863

Best test_accuracy So Far: 0.7800000071525574
Total elapsed time: 00h 10m 18s
Results summary
Results in ./frame_average
Showing 10 best trials
Objective(name='test_accuracy', direction='max')
Trial summary
Hyperparameters:
learning rate: 1e-05
epochs: 300
batch_size: 30
Score: 0.7800000071525574
Trial summary
Hyperparameters:
learning rate: 1e-05
epochs: 60
batch_size: 34
Score: 0.7600000023841857
Trial summary
Hyperparameters:
learning rate: 0.0001
epochs: 10
batch_size: 26
Score: 0.7399999976158143
Trial summary
Hyperparameters:
learning rate: 0.001
epochs: 60
batch_size: 6
Score: 0.6600000023841858
Trial summary
Hyperparameters:
learning rate: 1e-06
epochs: 300
batch_size: 28
Score: 0.6599999904632569
Trial summary
Hyperparameters:
learning rate: 1e-05
epochs: 500
batch_size: 16
Score: 0.6400000095367432
Trial summary
Hyperparameters:
learning rate: 1e-05
epochs: 200
batch_size: 8
Score: 0.6400000095367432
Trial sum