In [1]:
#!/usr/bin/python3
# -*- coding:utf-8 -*-

import itertools
import os
import pathlib
import random
import sys
import cv2
import numpy as np
from matplotlib import pyplot as plt, image as mpimg
import pandas as pd
from difficulty_levels import DifficultyLevels
from tensorflow import keras
import tensorflow as tf
from typing import Optional
from tensorflow.python.keras.callbacks import ModelCheckpoint


# Name of the folder below "../post_processing" that contains one sub-folder per participant
download_folder = "tracking_data_download"
# NOTE(review): not referenced anywhere in the visible part of this notebook - confirm it is still needed
labeled_images_folder = "labeled_images"

# Default seed used by set_random_seed() for the Python, NumPy and TensorFlow random generators
RANDOM_SEED = 42
# Number of difficulty classes, handed to the classifier as the size of its softmax output layer
NUMBER_OF_CLASSES = 3

# Output folder for the training history plot, the saved model and the weight checkpoints
results_folder = "ml_results"
# Directory whose entries are listed to discover the available participants
data_folder_path = os.path.join("..", "post_processing", download_folder)

# Target size every image is cropped or padded to before it is passed to the network
NEW_IMAGE_SIZE = (128, 128)

In [2]:
def set_random_seed(seed=RANDOM_SEED):
    """
    Seed every random number generator this notebook relies on.

    Args:
        seed: value handed to Python's `random` module, to NumPy and to TensorFlow (in that order)
    """
    seeders = (random.seed, np.random.seed, tf.random.set_seed)
    for apply_seed in seeders:
        apply_seed(seed)

In [3]:
def show_result_plot(train_history, epochs, metric="categorical_accuracy", output_folder=results_folder,
                     output_name="train_history.png"):
    """
    Plot the training and validation curves of a finished training run, save the figure and show it.

    Args:
        train_history: the history object returned by `model.fit()`; its `history` dict must contain the keys
                       `metric`, `val_<metric>`, "loss" and "val_loss"
        epochs: the number of epochs that were requested for the run. Only kept for backward compatibility:
                the x-axis is derived from the number of epochs that were actually recorded, so the plot also
                works if the training stopped before (or ran longer than) `epochs`.
        metric: name of the metric that is plotted next to the loss
        output_folder: folder the figure is written to (created if it does not exist yet)
        output_name: file name of the saved figure
    """
    acc = train_history.history[f"{metric}"]
    val_acc = train_history.history[f"val_{metric}"]
    loss = train_history.history["loss"]
    val_loss = train_history.history["val_loss"]

    # use the recorded history length instead of the requested epoch count; otherwise matplotlib raises
    # a shape mismatch error whenever the two numbers differ
    epochs_range = range(len(loss))
    plt.figure(figsize=(8, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')

    # make sure the target folder exists so a long training run does not end with a failing savefig()
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    # save plot to file and show in a new window
    plt.savefig(os.path.join(output_folder, output_name))
    plt.show()

In [4]:
class DifficultyImageClassifier:
    """
    Custom CNN for predicting the difficulty level with images of a user's face.

    Typical usage: create the classifier with a training and a validation generator, call `build_model()`,
    then `train_classifier()` and finally `evaluate_classifier()`.
    """

    def __init__(self, train_generator, val_generator, num_classes, num_epochs=32):
        """
        Args:
            train_generator: batch generator for the training data; must expose `n` (number of samples)
                             and `batch_size`
            val_generator: batch generator for the validation data; must expose `n` and `batch_size`
            num_classes: number of output classes (units of the softmax layer)
            num_epochs: number of epochs to train for
        """
        self.n_classes = num_classes
        self.n_epochs = num_epochs

        self.train_generator = train_generator
        self.validation_generator = val_generator

        # only full batches are used per epoch
        self.step_size_train = train_generator.n // train_generator.batch_size
        self.step_size_val = val_generator.n // val_generator.batch_size

        self.model_name = "Difficulty-CNN-Model-Generator.h5"
        self.model_path = os.path.join(results_folder, self.model_name)

        self.checkpoint_path = os.path.join(results_folder, "checkpoints_gen",
                                            "checkpoint-improvement-{epoch:02d}-{val_categorical_accuracy:.3f}.ckpt")

        # created by build_model(); kept as None until then so that a wrong call order is reported clearly
        self.sequential_model = None

    def _require_model(self):
        """Return the built model or raise a descriptive error if `build_model()` has not been called yet."""
        if self.sequential_model is None:
            raise RuntimeError("The model has not been built yet - call build_model() first.")
        return self.sequential_model

    def build_model(self, input_shape: tuple[int, int, int]) -> tf.keras.Model:
        """
        Create and compile the CNN.

        Args:
            input_shape: shape of one input image without the batch dimension, i.e. (height, width, channels)

        Returns:
            the compiled model (also stored in `self.sequential_model`)
        """
        self.sequential_model = tf.keras.Sequential(
            [
                tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
                tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                tf.keras.layers.Dropout(0.25),

                tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
                tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                tf.keras.layers.Dropout(0.25),

                tf.keras.layers.Conv2D(128, (3, 3), activation='relu'),
                tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                tf.keras.layers.Dropout(0.25),

                tf.keras.layers.Flatten(),
                # units in the last layer should be a power of two
                tf.keras.layers.Dense(units=1024, activation="relu"),
                tf.keras.layers.Dropout(0.5),

                # units must be the number of classes -> we want a vector that looks like this: [0.2, 0.5, 0.3]
                tf.keras.layers.Dense(units=self.n_classes, activation="softmax")
            ]
        )

        self.sequential_model.summary()
        self.sequential_model.compile(optimizer="adam",
                                      loss="categorical_crossentropy",
                                      metrics=["categorical_accuracy"])

        return self.sequential_model

    def train_classifier(self):
        """
        Train the model on the training generator, validate on the validation generator after each epoch,
        save the best weights as checkpoints, save the final model and plot the training history.

        Returns:
            the history object returned by `model.fit()`

        Raises:
            RuntimeError: if `build_model()` has not been called before
        """
        model = self._require_model()
        num_workers = 8

        # create the output folders up front (this also creates the results folder itself), so that neither
        # the checkpoints nor the final model.save() can fail on a missing directory after hours of training
        os.makedirs(os.path.dirname(self.checkpoint_path), exist_ok=True)

        # use the public tf.keras callbacks for both callbacks; mixing in the private
        # `tensorflow.python.keras` implementation with a `tf.keras` model is not supported
        checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(self.checkpoint_path,
                                                                 monitor='val_categorical_accuracy', verbose=1,
                                                                 mode="max", save_best_only=True,
                                                                 save_weights_only=True)
        lr_callback = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, verbose=1)

        history = model.fit(self.train_generator,
                            steps_per_epoch=self.step_size_train,
                            validation_data=self.validation_generator,
                            validation_steps=self.step_size_val,
                            use_multiprocessing=False,
                            workers=num_workers,
                            epochs=self.n_epochs,
                            callbacks=[checkpoint_callback, lr_callback],
                            verbose=1)

        model.save(self.model_path)

        show_result_plot(history, self.n_epochs, metric="categorical_accuracy",
                         output_name="train_history_custom_generator.png")

        return history

    def evaluate_classifier(self):
        """
        Evaluate the model on the validation generator and print the loss and the accuracy (in percent).

        Raises:
            RuntimeError: if `build_model()` has not been called before
        """
        val_loss, val_acc = self._require_model().evaluate(self.validation_generator,
                                                           steps=self.step_size_val,
                                                           verbose=1)
        print("Validation loss: ", val_loss)
        print("Validation accuracy: ", val_acc * 100)

    def predict(self, test_images):
        """
        Predict the class probabilities for the given images.

        Args:
            test_images: input accepted by `model.predict()`, e.g. an array of shape (n, height, width, channels)
                         or a batch generator. NOTE(review): the images presumably have to be preprocessed
                         exactly like the training images (same size, pixel values in [0, 1]) - confirm.

        Returns:
            the output of `model.predict()`: one softmax vector with `n_classes` entries per image

        Raises:
            RuntimeError: if `build_model()` has not been called before
        """
        return self._require_model().predict(test_images)

In [5]:
class CustomImageDataGenerator(tf.keras.utils.Sequence):
    """
    Keras `Sequence` that loads labeled images batch by batch from the paths listed in a data frame.

    Structure based on https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly
    """

    def __init__(self, data_frame, x_col_name, y_col_name, batch_size, num_classes=3,
                 images_base_path=".", use_grayscale=False, shuffle=False):
        """
        Args:
            data_frame: data frame with one row per image
            x_col_name: name of the column that holds the image path (relative to `images_base_path`)
            y_col_name: name of the column that holds the label
            batch_size: number of rows per batch
            num_classes: length of the one-hot label vector
            images_base_path: folder the image paths are relative to
            use_grayscale: load the images with one channel instead of three
            shuffle: if True, the rows of the data frame are reshuffled at the end of every epoch
        """
        super().__init__()

        self.original_df = data_frame.copy()
        self.df = data_frame.copy()
        self.X_col = x_col_name
        self.y_col = y_col_name
        self.batch_size = batch_size
        self.n_classes = num_classes
        self.images_base_path = images_base_path
        self.use_grayscale = use_grayscale
        self.should_shuffle = shuffle

        self.n = len(self.df)
        self.indices = self.df.index.to_list()

        num_channels = 1 if self.use_grayscale else 3
        self.output_size = (*NEW_IMAGE_SIZE, num_channels)

        # create a random order for the batches
        self.index_order = self.generate_random_index_list()

    def generate_random_index_list(self):
        """
        Return the start positions of all *full* batches in random order.

        The start of a trailing partial batch is deliberately left out: `__len__()` only counts full batches,
        so after shuffling a partial batch could otherwise take the place of a full one.
        """
        number_of_full_batches = self.n // self.batch_size
        sample_indices = [batch * self.batch_size for batch in range(number_of_full_batches)]

        random.shuffle(sample_indices)
        return sample_indices

    def __len__(self):
        """Number of (full) batches per epoch."""
        return self.n // self.batch_size

    def on_epoch_end(self):
        """Reshuffle the rows of the data frame after each epoch if shuffling was requested."""
        if self.should_shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)

    def __getitem__(self, index):
        """
        Return one batch in the form (X, y) where X are the images and y the corresponding one-hot labels.

        Args:
            index: the number of the current batch from 0 to __len__() - 1
        """
        actual_index = self.index_order[index]

        # Take all rows (by position) starting from the current start position until the start of the next batch
        sample_rows = self.df.iloc[actual_index:actual_index + self.batch_size]

        return self.__get_data(sample_rows)

    def __get_data(self, sample):
        """
        Load the images and labels for the given rows.

        Rows whose image cannot be loaded are left out of the batch (the error itself was already written
        to stderr), so the returned arrays can contain fewer than `batch_size` entries.

        Raises:
            RuntimeError: if not a single image of the batch could be loaded
        """
        # Size the arrays by the actual number of rows so that no uninitialized rows can be returned
        X = np.empty((len(sample), *self.output_size))
        y = np.empty((len(sample), self.n_classes))

        # Load and preprocess the images and labels for the current sample
        loaded = 0
        for _, row in sample.iterrows():
            image_path = os.path.join(self.images_base_path, row[self.X_col])
            image = self.__scale_and_convert_image(image_path)  # load image, crop / pad and scale it
            if image is None:
                continue  # unreadable image: skip it instead of writing an invalid row into the batch

            X[loaded, ] = image
            y[loaded, ] = DifficultyLevels.get_one_hot_encoding(row[self.y_col])  # string label -> one-hot-vector
            loaded += 1

        if loaded == 0:
            raise RuntimeError("None of the images of the current batch could be loaded.")

        return X[:loaded], y[:loaded]

    def __scale_and_convert_image(self, image_path):
        """
        Load the image, crop or pad it to the generator's output size and scale the pixel values to [0, 1].

        Returns:
            the image as array of shape `self.output_size` or None if the image could not be processed
        """
        try:
            color_mode = "grayscale" if self.use_grayscale else "rgb"

            image = tf.keras.preprocessing.image.load_img(image_path, color_mode=color_mode)
            image_arr = tf.keras.preprocessing.image.img_to_array(image)

            # crop or pad image depending on its size; the target dimensions are taken from `output_size`
            # so that the result always matches the batch array and the shape reported by get_image_shape()
            target_height, target_width = self.output_size[:2]
            resized_img = tf.image.resize_with_crop_or_pad(image_arr,
                                                           target_height=target_height,
                                                           target_width=target_width)

            # normalize pixel values to [0, 1] so the ml model can work with smaller values
            scaled_img = resized_img.numpy() / 255.0
            return scaled_img

        except Exception as e:
            sys.stderr.write(f"\nError in processing image '{image_path}': {e}")
            return None

    def get_image_shape(self):
        """Return the shape of one preprocessed image: (*NEW_IMAGE_SIZE, channels)."""
        return self.output_size

In [6]:
def merge_participant_image_logs(participant_list):
    """
    Read the "labeled_images.csv" of every given participant and merge them into one data frame.

    Args:
        participant_list: names of the participant folders below "../post_processing/<download_folder>"

    Returns:
        one data frame with the rows of all participants (in the order of `participant_list`), a fresh
        0..n-1 index and an additional column "index" that contains the same numbers. For an empty
        participant list an empty data frame with only the "index" column is returned.
    """
    post_processing_folder_path = os.path.join("..", "post_processing")

    # collect all logs first and concatenate once; concatenating inside the loop copies all previously
    # read rows again for every participant
    participant_frames = []
    for participant in participant_list:
        images_label_log = os.path.join(post_processing_folder_path, download_folder, participant, "labeled_images.csv")
        participant_frames.append(pd.read_csv(images_label_log))

    if participant_frames:
        # ignore_index replaces the duplicate per-file indexes with one continuous index
        image_data_frame_numbered = pd.concat(participant_frames, ignore_index=True)
    else:
        image_data_frame_numbered = pd.DataFrame()

    # add the index numbers as own column
    image_data_frame_numbered["index"] = image_data_frame_numbered.index

    return image_data_frame_numbered

In [7]:
def split_train_test(participant_list, train_ratio=0.8):
    """
    Randomly split the participants into a training and a validation group.

    The split is done per participant so that all images of one participant end up in the same group.
    The given list is not modified; the random order comes from Python's `random` module.

    Args:
        participant_list: names of all participants
        train_ratio: fraction of the participants (rounded down) that is used for training

    Returns:
        a tuple (train_participants, test_participants)
    """
    # shuffle a copy: shuffling the argument itself would silently reorder the caller's list
    shuffled_participants = list(participant_list)
    random.shuffle(shuffled_participants)

    train_split = int(len(shuffled_participants) * train_ratio)
    train_participants = shuffled_participants[:train_split]
    test_participants = shuffled_participants[train_split:]
    print(f"{len(train_participants)} participants used for training: {train_participants}")
    print(f"{len(test_participants)} participants used for validation: {test_participants}")

    return train_participants, test_participants

In [8]:
def start_preprocessing():
    """
    Run the whole pipeline: select the participants, split them into a training and a validation group,
    create the image generators, build and train the CNN and evaluate it on the validation data.

    Raises:
        ValueError: if the split leaves the training or the validation group without participants
    """
    set_random_seed()  # set seed for reproducibility

    # participants that are left out for testing
    without_participants = {"participant_1", "participant_2", "participant_4", "participant_5", "participant_6",
                            "participant_7", "participant_8", "participant_9", "participant_11", "participant_12",
                            "participant_13"}

    # os.listdir() returns the entries in arbitrary order, so sort them: otherwise the seeded shuffle below
    # could produce a different split on another machine. Entries that are not folders (e.g. hidden system
    # files) are skipped because every participant is expected to be a folder with a "labeled_images.csv".
    all_participants = sorted(
        entry for entry in os.listdir(data_folder_path)
        if entry not in without_participants and os.path.isdir(os.path.join(data_folder_path, entry))
    )

    train_participants, test_participants = split_train_test(all_participants)
    if not train_participants or not test_participants:
        raise ValueError(f"Not enough participants for a train / validation split: {all_participants}")

    train_data = merge_participant_image_logs(train_participants)
    val_data = merge_participant_image_logs(test_participants)

    train_batch_size = 32
    val_batch_size = 32
    print(f"Train batch size: {train_batch_size} (Data len: {len(train_data)})")
    print(f"Validation batch size: {val_batch_size} (Data len: {len(val_data)})")

    for difficulty_level in train_data.difficulty.unique():
        difficulty_level_df = train_data[train_data.difficulty == difficulty_level]
        print(f"Found {len(difficulty_level_df)} train images for category \"{difficulty_level}\".")

    # the image paths in the logs are relative to this folder
    images_path = os.path.join("..", "post_processing")
    use_gray = False
    train_generator = CustomImageDataGenerator(data_frame=train_data, x_col_name="image_path", y_col_name="difficulty",
                                               batch_size=train_batch_size, images_base_path=images_path,
                                               use_grayscale=use_gray, shuffle=False)

    val_generator = CustomImageDataGenerator(data_frame=val_data, x_col_name="image_path", y_col_name="difficulty",
                                             batch_size=val_batch_size, images_base_path=images_path,
                                             use_grayscale=use_gray, shuffle=False)

    image_shape = train_generator.get_image_shape()
    number_epochs = 32

    classifier = DifficultyImageClassifier(train_generator, val_generator, num_classes=NUMBER_OF_CLASSES,
                                           num_epochs=number_epochs)
    classifier.build_model(input_shape=image_shape)
    classifier.train_classifier()
    classifier.evaluate_classifier()

In [None]:
# Entry point of the notebook: prepares the data, then builds, trains and evaluates the CNN
start_preprocessing()

2 participants used for training: ['participant_14', 'participant_10']
1 participants used for validation: ['participant_3']
Train batch size: 32 (Data len: 53056)
Validation batch size: 32 (Data len: 20222)
Found 17702 train images for category "medium".
Found 17600 train images for category "hard".
Found 17754 train images for category "easy".
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 126, 126, 32)      896       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 63, 63, 32)        0         
_________________________________________________________________
dropout (Dropout)            (None, 63, 63, 32)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 61, 61, 64)        18496     
______________________________________