# Final Model Test

After hyper-parameter tuning and regularisation, we have derived the
final model defined below. In this notebook, we run it against the final
test data.


In [None]:
from gc import collect
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
from h5py import File
from pandas import read_csv
from sklearn.metrics import accuracy_score, classification_report, \
    confusion_matrix
from tensorflow.config.experimental import list_physical_devices, \
    set_memory_growth
from tensorflow.keras.backend import clear_session
from tensorflow.keras.callbacks import CSVLogger, LearningRateScheduler, \
    ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Input, MaxPool2D
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.random import set_random_seed


### Model Creation Function

# TODO: RUN THE OTHER SCRIPTS AND DETERMINE THE BEST MODEL CONFIGURATION THEN INPUT BELOW

In [None]:
# Final model
def create_model(input_shape: Tuple[int, int, int], num_classes: int) -> Model:
    inputs = Input(shape=input_shape)
    
    layer = Conv2D(filters=64, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(inputs)
    layer = Conv2D(filters=64, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = MaxPool2D(pool_size=(2, 2), strides=2)(layer)

    layer = Conv2D(filters=128, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = Conv2D(filters=128, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = MaxPool2D(pool_size=(2, 2), strides=2)(layer)
    
    layer = Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = MaxPool2D(pool_size=(2, 2), strides=2)(layer)
    
    layer = Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = MaxPool2D(pool_size=(2, 2), strides=2)(layer)
    
    layer = Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding="same", 
                   activation="relu", kernel_initializer="he_normal",
                   bias_initializer="he_normal")(layer)
    layer = MaxPool2D(pool_size=(2, 2), strides=2)(layer)
    
    layer = Flatten()(layer)
    layer = Dense(units=4096, activation="relu", 
                  kernel_initializer="he_normal",
                  bias_initializer="he_normal")(layer)
    layer = Dense(units=4096, activation="relu", 
                  kernel_initializer="he_normal",
                  bias_initializer="he_normal")(layer)
    layer = Dense(num_classes, activation="softmax")(layer)
    
    model = Model(inputs=inputs, outputs=layer)
    model.compile(optimizer=Adam(learning_rate=0.0001), 
                  loss="categorical_crossentropy", metrics=["accuracy"])
    
    return model


### Other Functions


In [None]:
def refresh_session():
    # Call this before training a new model, to free up memory from the 
    # previous model
    clear_session()
    try:
        del model
    except NameError:
        pass
    collect()
    
    
def import_dataset(filepath: str = "./dataset.hdf5") \
        -> Tuple[np.ndarray, np.ndarray, np.ndarray, 
                 np.ndarray, np.ndarray, np.ndarray]:
    file = File(filepath, "r")
    train_data = file.get("tr_data")[()]
    val_data = file.get("val_data")[()]
    test_data = file.get("ts_data")[()]
    train_labels = file.get("tr_labels")[()]
    val_labels = file.get("val_labels")[()]
    test_labels = file.get("ts_labels")[()]
    
    return train_data, val_data, test_data, \
           train_labels, val_labels, test_labels


def get_test_results(test_model: Model, test_data: np.ndarray, 
                     test_labels: np.ndarray) -> Tuple:
    predicts = test_model.predict(test_data)
    pred_out = np.argmax(predicts, axis=1)
    test_out = np.argmax(test_labels, axis=1)
    labels = ["car", "heavy vehicles", "motorcycle"]
    
    return accuracy_score(test_out, pred_out), \
           confusion_matrix(test_out, pred_out), \
           classification_report(test_out, pred_out, target_names=labels)


def get_learn_rate(epoch: int) -> float:
    if epoch <= 10:
        lr = 1e-4
    elif epoch <= 20:
        lr = 5e-5
    elif epoch <= 30:
        lr = 1e-5
    elif epoch <= 40:
        lr = 5e-6
    else:
        lr = 1e-6
    print(f"Learning rate: {lr}")
    
    return lr


### Initialise Environment

In [None]:
# Configure tensorflow to optimise GPU utilisation
gpu_list = list_physical_devices("GPU")
for gpu in gpu_list:
    set_memory_growth(gpu, True)
del gpu_list

# Fix tensorflow random seed
set_random_seed(324)

tr_dat, val_dat, ts_dat, tr_lbls, val_lbls, ts_lbls = import_dataset()

in_shape = (tr_dat.shape[1], tr_dat.shape[2], tr_dat.shape[3])
num_cls = tr_lbls.shape[1]

lr_scheduler = LearningRateScheduler(get_learn_rate)
early_stopper = EarlyStopping(monitor="val_loss", patience=10, verbose=1, 
                              mode="min", restore_best_weights=True) 

# Combine training and validation data into one set for the final training
final_tr_dat = np.concatenate((tr_dat, val_dat), axis=0)
final_tr_lbls = np.concatenate((tr_lbls, val_lbls), axis=0)

# tr and val variables no longer needed, delete to free up space
del tr_dat
del val_dat
del tr_lbls
del val_lbls
collect()


### Model Creation and Training

In [None]:
refresh_session()
data_gen = ImageDataGenerator(
    rotation_range=45, width_shift_range=0.2, height_shift_range=0.2,
    zoom_range=0.2, horizontal_flip=True)

model = create_model(in_shape, num_cls)
print(model.summary())

checkpoint = ModelCheckpoint("./trained_models/grandfinal_best.hdf5", 
                             monitor="val_loss", verbose=0, 
                             save_best_only=True, mode="min")
logger = CSVLogger("./training_logs/grandfinal_log.csv")

model.fit_generator(
    data_gen.flow(final_tr_dat, final_tr_lbls, batch_size=32, shuffle=True),
    steps_per_epoch=(len(tr_dat) / 32), epochs=100, verbose=2, 
    callbacks=[checkpoint, logger, lr_scheduler, early_stopper], 
    validation_data=(ts_dat, ts_lbls))


# Training phase is complete: free training data memory
del final_tr_dat
del final_tr_lbls
refresh_session()


### Data Visualisation

In [None]:
plt.style.use("ggplot")

refresh_session()
model = load_model("./trained_models/grandfinal_best.hdf5")
acc_score, conf_matrix, class_report = \
    get_test_results(model, ts_dat, ts_lbls)

print(f"Test accuracy: {acc_score}")
print("Confusion Matrix:")
print(conf_matrix)
print(class_report)

log = read_csv("./training_logs/grandfinal_log.csv")

plt.figure(figsize=[12.5, 12.5])
plt.subplot(311)
plt.plot(log["val_loss"], label="loss")
plt.plot(log["val_acc"], label="accuracy")
plt.title("Test")
plt.legend(loc="upper left", bbox_to_anchor=(1.0, 1.0))

plt.subplot(312)
plt.plot(log["loss"], label="train loss")
plt.plot(log["val_loss"], label="validation loss")
plt.title("Loss")
plt.legend(loc="upper left", bbox_to_anchor=(1.0, 1.0))

plt.subplot(313)
plt.plot(log["acc"], label="train accuracy")
plt.plot(log["val_acc"], label="validation accuracy")
plt.title("Accuracy")
plt.legend(loc="upper left", bbox_to_anchor=(1.0, 1.0))



