# Initial Configuration

In [None]:
import os, sys, pathlib
import glob
from tqdm import tqdm
from PIL import Image
import split_folders
import numpy as np

import tensorflow as tf
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, GlobalAveragePooling2D, Lambda
from keras.applications import MobileNetV2
from keras.preprocessing.image import ImageDataGenerator
from keras import backend as K
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import matplotlib.pyplot as plt

In [None]:
#--------- CONFIG ---------#
# Mini-batch size shared by the ImageDataGenerator flows (via `gen_args`) and by
# the steps-per-epoch / prediction-step computations further down.
batch_size = 32
#--------------------------#

# Dataset Preprocessing

## Split dataset into train-val-test

In [None]:
# Split `raw_dataset_face_mask` into `dataset_face_mask` using a 70/15/15 ratio
# and a fixed seed so the split is repeatable.
split_folders.ratio(
    "raw_dataset_face_mask",
    output="dataset_face_mask",
    seed=2020,
    ratio=(0.70, 0.15, 0.15),
)

## Set data directories

In [None]:
# Root of the split dataset and its three partitions.
data_dir = pathlib.Path("./dataset_face_mask/")
train_dir, val_dir, test_dir = (data_dir / part for part in ("train", "val", "test"))

## Resize every image to 96x96

In [None]:
# Resize every PNG under `data_dir` to 96x96 in place (each file is overwritten).
# The file list is materialised first so tqdm knows the total and the directory
# tree is not being walked while its files are rewritten.
png_files = sorted(data_dir.glob('**/*.png'))
for filename in tqdm(png_files):
    # The context manager releases the file handle before the file is overwritten.
    with Image.open(filename) as im:
        # Image.LANCZOS is the filter formerly aliased as Image.ANTIALIAS;
        # the alias was removed in Pillow 10.
        imResize = im.resize((96, 96), Image.LANCZOS)
    # No `quality` argument: it is a JPEG-style option and PNG is lossless.
    imResize.save(filename, 'PNG')

# Create ImageDataGenerators

## Define ImageDataGenerator

In [None]:
# Augmenting generator: random rotation, shifts, shear, zoom and horizontal flip.
imageGenerator = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
)

# Keyword arguments shared by every `flow_from_directory` call.
# The class list is given explicitly; presumably its order fixes the label
# indices (WithoutMask -> 0, WithMask -> 1) - the sanity-check cell prints them.
gen_args = dict(
    target_size=(96, 96),
    batch_size=batch_size,
    classes=["WithoutMask", "WithMask"],
    seed=2020,
    color_mode="rgb",
    class_mode="binary",
)

## Set Train, Validation and Test ImageDataGenerators

In [None]:
# Only the training flow uses the augmenting generator. Validation and test
# images go through a plain ImageDataGenerator (no random transforms) so the
# reported metrics are computed on the unmodified images and are reproducible.
evalGenerator = ImageDataGenerator()

train_data = imageGenerator.flow_from_directory(directory = train_dir, shuffle = True,  **gen_args)
val_data   = evalGenerator.flow_from_directory(directory = val_dir,   shuffle = True,  **gen_args)
# shuffle=False keeps the prediction order aligned with `test_data.labels`.
test_data  = evalGenerator.flow_from_directory(directory = test_dir,  shuffle = False, **gen_args)

In [None]:
# Sanity check: display the class-name -> index mapping of each flow; the three
# mappings are expected to be identical.
train_data.class_indices, val_data.class_indices, test_data.class_indices

# Set TensorFlow Session

In [None]:
# TensorFlow 1.x exposes `get_default_graph` / `Session` at the top level.
# When those attributes are missing (TensorFlow 2.x) an AttributeError is raised
# and the compat.v1 accessor is used instead. Only AttributeError is caught so
# that unrelated failures (or a KeyboardInterrupt) are not silently swallowed.
try:
    graph = tf.get_default_graph()
    K.clear_session()  # Clear previous models from memory to avoid conflicts with previous sessions
    sess = tf.Session()
    K.set_session(sess)
except AttributeError:
    graph = tf.compat.v1.get_default_graph()
    K.clear_session()

# Callbacks

In [None]:
from callbacks import EarlyStopping, ReduceLROnPlateau, MakeLRGreatAgain, ModelCheckpoint, BetterCSVLogger, TerminateOnNaN #, LearningRateScheduler

def lr_schedule(epoch):
    """Return the learning rate for `epoch`.

    1e-3 for epochs below 80, 1e-4 for epochs below 100, 1e-5 afterwards.
    """
    for upper_bound, rate in ((80, 0.001), (100, 0.0001)):
        if epoch < upper_bound:
            return rate
    return 0.00001

# Callbacks passed to every `fit_generator` call in this notebook.
# NOTE(review): these classes come from the local `callbacks` module, not from
# keras.callbacks; their argument semantics presumably match the Keras
# namesakes - confirm.
callbacks = [
    EarlyStopping(
        monitor='val_loss',
        min_delta=0.0,
        patience=115,
        verbose=1,
    ),
    ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=20,
        verbose=1,
        min_delta=0.001,
        cooldown=0,
        min_lr=0.00001,
    ),
    MakeLRGreatAgain(
        monitor='val_loss',
        factor_min=10,
        factor_max=100,
        patience=59,
        verbose=1,
        min_delta=0.001,
        cooldown=0,
        min_lr=0.00001,
    ),
#     LearningRateScheduler(schedule=lr_schedule, verbose=1),
    # Checkpoint file names encode epoch, loss and val_loss.
    ModelCheckpoint(
        filepath="./weights/epoch{epoch:02d}_loss{loss:.4f}_val{val_loss:.4f}.h5",
        monitor='val_loss',
        verbose=1,
        save_best_only=True,
        save_weights_only=False,
        mode='auto',
        period=1,
    ),
    BetterCSVLogger(
        filename=f"./log/training_log.csv",
        separator=',',
        append=True,
    ),
    TerminateOnNaN(),
]

# Keras Model - Sequential API

## CNNs

### Initialise the CNNs

In [None]:
def _build_small_cnn():
    """Build the small CNN: Conv(32, 3x3) -> MaxPool(2x2) -> Conv(32, 3x3) -> global average pooling -> 1-unit sigmoid."""
    return Sequential([
        Conv2D(32, (3, 3), input_shape=(96, 96, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(32, (3, 3), activation='relu'),
        GlobalAveragePooling2D(),
        Dense(1, activation='sigmoid'),
    ])


# Two separate instances of the same architecture, one per optimizer under comparison.
model_clf_adam = _build_small_cnn()
model_clf_rmsprop = _build_small_cnn()

### Compile the models

In [None]:
# Each model gets the optimizer its name refers to, so the Adam/RMSProp
# comparison plotted later actually compares two different optimizers.
model_clf_adam.compile(optimizer= "adam", loss='binary_crossentropy', metrics=['accuracy'])
model_clf_rmsprop.compile(optimizer= "rmsprop", loss='binary_crossentropy', metrics=['accuracy'])

### Train the models

In [None]:
# Batches per epoch: enough to cover every training / validation sample once.
train_steps = np.ceil(train_data.samples / batch_size)
val_steps = np.ceil(val_data.samples / batch_size)

# Train the `model_clf_adam` network for up to 1000 epochs with the shared callbacks.
history_clf_adam = model_clf_adam.fit_generator(
    train_data,
    steps_per_epoch=train_steps,
    epochs=1000,
    validation_data=val_data,
    validation_steps=val_steps,
    callbacks=callbacks,
)

In [None]:
# Batches per epoch: enough to cover every training / validation sample once.
train_steps = np.ceil(train_data.samples / batch_size)
val_steps = np.ceil(val_data.samples / batch_size)

# Train the `model_clf_rmsprop` network for up to 1000 epochs with the shared callbacks.
history_clf_rmsprop = model_clf_rmsprop.fit_generator(
    train_data,
    steps_per_epoch=train_steps,
    epochs=1000,
    validation_data=val_data,
    validation_steps=val_steps,
    callbacks=callbacks,
)

### Plot loss/accuracy

In [None]:
# Older Keras releases log accuracy under "acc"/"val_acc", newer ones under
# "accuracy"/"val_accuracy". Use whichever key is present so this cell does not
# raise KeyError depending on the installed version.
acc_key = "acc" if "acc" in history_clf_adam.history else "accuracy"
val_acc_key = f"val_{acc_key}"

# plot accuracy (the first 5 epochs are left out of every curve)
acc_adam     = history_clf_adam.history[acc_key][5:]
val_acc_adam = history_clf_adam.history[val_acc_key][5:]
epochs_adam  = range(len(acc_adam))

acc_rmsprop     = history_clf_rmsprop.history[acc_key][5:]
val_acc_rmsprop = history_clf_rmsprop.history[val_acc_key][5:]
epochs_rmsprop  = range(len(acc_rmsprop))

plt.plot(epochs_adam, acc_adam, "bo", label = "Training acc - Adam")
plt.plot(epochs_adam, val_acc_adam, "b", label = "Validation acc - Adam")
plt.plot(epochs_rmsprop, acc_rmsprop, "ro", label = "Training acc - RMSProp")
plt.plot(epochs_rmsprop, val_acc_rmsprop, "r", label = "Validation acc - RMSProp")
plt.title("Adam/RMSProp optimizers comparison (Accuracy)")
plt.legend()
# Start a second figure so the loss curves are not drawn over the accuracy plot.
plt.figure()


# plot Loss
loss_adam     = history_clf_adam.history["loss"][5:]
val_loss_adam = history_clf_adam.history["val_loss"][5:]

loss_rmsprop     = history_clf_rmsprop.history["loss"][5:]
val_loss_rmsprop = history_clf_rmsprop.history["val_loss"][5:]

plt.plot(epochs_adam, loss_adam, "bo", label = "Training loss - Adam")
plt.plot(epochs_adam, val_loss_adam, "b", label = "Validation loss - Adam")
plt.plot(epochs_rmsprop, loss_rmsprop, "ro", label = "Training loss - RMSProp")
plt.plot(epochs_rmsprop, val_loss_rmsprop, "r", label = "Validation loss - RMSProp")
plt.title("Adam/RMSProp optimizers comparison (Loss)")
plt.legend()

plt.show()

## MobileNetV2

### Initialise MobileNetV2

In [None]:
# MobileNetV2 backbone with ImageNet weights and without its top classification layers.
mnv2 = MobileNetV2(
    input_shape=(96, 96, 3),
    include_top=False,
    weights="imagenet",
)

# Stack global average pooling and a single sigmoid unit on top of the backbone.
model_mnv2 = Sequential()
model_mnv2.add(mnv2)
model_mnv2.add(GlobalAveragePooling2D())
model_mnv2.add(Dense(1, activation="sigmoid"))

### Compile MobileNetV2

In [None]:
# Same optimizer, loss and metric settings as the `model_clf_rmsprop` classifier.
model_mnv2.compile(
    optimizer="rmsprop",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

### Train MobileNetV2

In [None]:
# Batches per epoch: enough to cover every training / validation sample once.
train_steps = np.ceil(train_data.samples / batch_size)
val_steps = np.ceil(val_data.samples / batch_size)

# Train the MobileNetV2-based model for up to 1000 epochs with the shared callbacks.
history_mnv2 = model_mnv2.fit_generator(
    train_data,
    steps_per_epoch=train_steps,
    epochs=1000,
    validation_data=val_data,
    validation_steps=val_steps,
    callbacks=callbacks,
)

### Plot loss/accuracy

In [None]:
# Older Keras releases log accuracy under "acc"/"val_acc", newer ones under
# "accuracy"/"val_accuracy". Use whichever key is present.
acc_key = "acc" if "acc" in history_mnv2.history else "accuracy"
val_acc_key = f"val_{acc_key}"

# Skip the first 5 epochs with `[5:]`, exactly as done for the RMSProp curves
# plotted alongside, so both models share the same x-axis origin.
# (`[:5]` would keep only the first 5 epochs instead.)
acc_mnv2      = history_mnv2.history[acc_key][5:]
val_acc_mnv2  = history_mnv2.history[val_acc_key][5:]
loss_mnv2     = history_mnv2.history["loss"][5:]
val_loss_mnv2 = history_mnv2.history["val_loss"][5:]
epochs_mnv2   = range(len(acc_mnv2))

# The *_rmsprop series and `epochs_rmsprop` come from the earlier CNN plotting cell.
plt.plot(epochs_mnv2, acc_mnv2, "bo", label = "Training acc - MobileNetV2")
plt.plot(epochs_mnv2, val_acc_mnv2, "b", label = "Validation acc - MobileNetV2")
plt.plot(epochs_rmsprop, acc_rmsprop, "ro", label = "Training acc - RMSProp")
plt.plot(epochs_rmsprop, val_acc_rmsprop, "r", label = "Validation acc - RMSProp")
plt.title("Classifier/MobileNetV2 performance comparison (Accuracy)")
plt.legend()

# Start a second figure so the loss curves are not drawn over the accuracy plot.
plt.figure()

plt.plot(epochs_mnv2, loss_mnv2, "bo", label = "Training loss")
plt.plot(epochs_mnv2, val_loss_mnv2, "b", label = "Validation loss")
plt.plot(epochs_rmsprop, loss_rmsprop, "ro", label = "Training loss - RMSProp")
plt.plot(epochs_rmsprop, val_loss_rmsprop, "r", label = "Validation loss - RMSProp")
plt.title("Classifier/MobileNetV2 performance comparison (Loss)")
plt.legend()

plt.show()

# Model Selection

## Select the best Model

In [None]:
def get_weights_path(path):
    """Return the path of the checkpoint in `path` with the lowest validation loss.

    File names are expected to follow ``epoch{NN}_loss{X}_val{Y}.h5``, the
    pattern used for the ModelCheckpoint `filepath` in this notebook. When
    several checkpoints share the lowest validation loss, the one from the
    latest epoch wins. Directory entries that do not match the pattern are
    ignored.

    Args:
        path: Directory that holds the saved checkpoints.

    Returns:
        `path` joined with the file name of the best checkpoint.

    Raises:
        FileNotFoundError: If `path` contains no checkpoint matching the pattern
            (`os.listdir` raises the same error when `path` does not exist).
    """
    bestLoss = float("inf")
    lastEpoch = -1
    bestWeights = ""

    for file in os.listdir(path):
        try:
            valLoss = float(file.split("val")[1].split(".h5")[0])
            epoch = int(file.split("_loss")[0].split("epoch")[1])
        except (IndexError, ValueError):
            # Not a checkpoint written with the expected naming scheme.
            continue
        if (valLoss < bestLoss) or ((valLoss == bestLoss) and (epoch > lastEpoch)):
            bestLoss = valLoss
            lastEpoch = epoch
            # os.path.join works whether or not `path` ends with a separator.
            bestWeights = os.path.join(path, file)

    if not bestWeights:
        raise FileNotFoundError(f"There is no model saved in `{path}`...")

    return bestWeights

# Pick the checkpoint with the lowest validation loss among those saved under ./weights/.
bestWeights = get_weights_path("./weights/")
print(bestWeights)

## Load the best Model

In [None]:
# Load the selected checkpoint as a full model (checkpoints are written with save_weights_only=False).
# NOTE(review): `bestModel` is not used by the prediction cells below, which
# call the in-memory models directly - confirm this is intended.
bestModel = load_model(bestWeights)

# Model Predict

## Predict with `clf_adam` model

In [None]:
# The model ends in a sigmoid unit, so predictions are probabilities in [0, 1].
# Threshold at 0.5 to obtain class labels: casting the probabilities straight to
# int would truncate every value below 1.0 to 0.
probs_adam = model_clf_adam.predict_generator(test_data, steps = np.ceil(test_data.samples/batch_size))
preds_adam = (probs_adam > 0.5).astype(np.int32).reshape(test_data.labels.shape)

## Predict with `clf_rmsprop` model

In [None]:
# The model ends in a sigmoid unit, so predictions are probabilities in [0, 1].
# Threshold at 0.5 to obtain class labels: casting the probabilities straight to
# int would truncate every value below 1.0 to 0.
probs_rmsprop = model_clf_rmsprop.predict_generator(test_data, steps = np.ceil(test_data.samples/batch_size))
preds_rmsprop = (probs_rmsprop > 0.5).astype(np.int32).reshape(test_data.labels.shape)

## Predict with `mnv2` model

In [None]:
# The model ends in a sigmoid unit, so predictions are probabilities in [0, 1].
# Threshold at 0.5 to obtain class labels: casting the probabilities straight to
# int would truncate every value below 1.0 to 0.
probs_mnv2 = model_mnv2.predict_generator(test_data, steps = np.ceil(test_data.samples/batch_size))
preds_mnv2 = (probs_mnv2 > 0.5).astype(np.int32).reshape(test_data.labels.shape)

# Model Evaluation

## Evaluate `clf_adam`

In [None]:
# Confusion matrix of `preds_adam` against the test labels, label order [0, 1].
cm_adam = confusion_matrix(test_data.labels, preds_adam, labels=[0, 1])
print(f"Confusion matrix:\n\n{cm_adam}")

In [None]:
# Test accuracy of `preds_adam`, as a percentage rounded to two decimals.
test_acc_adam = round(accuracy_score(test_data.labels, preds_adam) * 100, 2)
print(f"Accuracy = {test_acc_adam}%")

In [None]:
# Classification report for `preds_adam`; class names are taken from the test flow.
report_adam = classification_report(
    test_data.labels,
    preds_adam,
    target_names=test_data.class_indices,
)
print(f"Classification report:\n\n{report_adam}")

## Evaluate `clf_rmsprop`

In [None]:
# Confusion matrix of `preds_rmsprop` against the test labels, label order [0, 1].
cm_rmsprop = confusion_matrix(test_data.labels, preds_rmsprop, labels=[0, 1])
print(f"Confusion matrix:\n\n{cm_rmsprop}")

In [None]:
# Test accuracy of `preds_rmsprop`, as a percentage rounded to two decimals.
test_acc_rmsprop = round(accuracy_score(test_data.labels, preds_rmsprop) * 100, 2)
print(f"Accuracy = {test_acc_rmsprop}%")

In [None]:
# Classification report for `preds_rmsprop`; class names are taken from the test flow.
report_rmsprop = classification_report(
    test_data.labels,
    preds_rmsprop,
    target_names=test_data.class_indices,
)
print(f"Classification report:\n\n{report_rmsprop}")

## Evaluate `mnv2`

In [None]:
# Confusion matrix of `preds_mnv2` against the test labels, label order [0, 1].
cm_mnv2 = confusion_matrix(test_data.labels, preds_mnv2, labels=[0, 1])
print(f"Confusion matrix:\n\n{cm_mnv2}")

In [None]:
# Test accuracy of `preds_mnv2`, as a percentage rounded to two decimals.
test_acc_mnv2 = round(accuracy_score(test_data.labels, preds_mnv2) * 100, 2)
print(f"Accuracy = {test_acc_mnv2}%")

In [None]:
# Classification report for `preds_mnv2`; class names are taken from the test flow.
report_mnv2 = classification_report(
    test_data.labels,
    preds_mnv2,
    target_names=test_data.class_indices,
)
print(f"Classification report:\n\n{report_mnv2}")