# Final model

In [None]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/Colab Notebooks/AN2DL/Homework1

## Import libraries

In [None]:
import random
import os
from pathlib import Path

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'
plt.style.use('ggplot')

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.utils import class_weight

import tensorflow as tf
from tensorflow import keras
from keras import layers

print(tf.__version__)

# Define auxiliary functions

In [None]:
def nice_plot(hist):
    """Prints the diagnostic plot"""
    fig, (ax1,ax2) = plt.subplots(2, sharex=True) # Two subplots, the axes array is 1-d
    
    fig.suptitle('Training and validation')

    ax1.plot(hist.history['accuracy'], label='Training accuracy')
    ax1.plot(hist.history['val_accuracy'], label='Validation accuracy')
    #ax1.set_title('Training and validation accuracy')
    ax1.set_ylabel('Accuracy')
    ax1.set_xlabel('Epochs')
    ax1.legend(loc='upper left')
    
    ax2.plot(history.history['loss'], label='Training loss')
    ax2.plot(history.history['val_loss'], label='Validation loss')
    #ax2.set_title('Training and validation loss')
    ax2.set_ylabel('Loss')
    ax2.set_xlabel('Epochs')
    ax2.legend(loc='upper left')
    
    plt.show()
    

def evaluate_model(model, dataset):
    """Prints the accuracy of the model evaluated on the given dataset"""
    test_loss, test_acc = model.evaluate(dataset)
    print(f'Accuracy: {test_acc*100:.2f}%')
    

def print_trainability(model):
    """Show which layers are trainable and which are not"""
    for i, layer in enumerate(model.layers):
        print(f'{i:<2} {layer.name:<13} {layer.trainable}')


def compile_model(model, lr=1e-3, opt='Adam'):
    """Compile model according to learning rate and optimizer with CategoricalCrossentropy loss"""
    if opt == 'Adam':
        chosen_opt = keras.optimizers.Adam(learning_rate=lr)
    elif opt == 'RMSprop':
        chosen_opt = keras.optimizers.RMSprop(learning_rate=lr)
    else:
        print('No valid optimizer chosen')
        return
    model.compile(
        loss=keras.losses.CategoricalCrossentropy(),
        optimizer=chosen_opt,
        metrics=['accuracy'])


def fit_model(model, patience=20, append_callback=None, weights=None):
    """Fit model with EarlyStopping on validation accuracy and class weights if specified"""
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            mode='max',
            patience=patience,
            restore_best_weights=True),
    ]

    if append_callback:
        callbacks.append(append_callback)

    if weights:
        history = model.fit(train_dataset,epochs=5000,validation_data=validation_dataset,callbacks=callbacks,class_weight=weights)
    else:
        history = model.fit(train_dataset,epochs=5000,validation_data=validation_dataset,callbacks=callbacks)
    
    return history

# Global settings

In [None]:
IMAGE_SHAPE = [96,96]
INPUT_SHAPE = (*IMAGE_SHAPE,3)
BATCH_SIZE = 16
SEED = 42
DATASET_DIR = Path() / 'training_data_final'
MODELS_DIR = Path() / 'models'
MODELS_DIR.mkdir(parents=True, exist_ok=True)
CLASSES = [f'Species{i+1}' for i in range(8)]
NUM_CLASSES = len(CLASSES)

In [None]:
tf.random.set_seed(SEED)

# Instantiate the dataset generators

Create two generators, one for training and one for validation.
It should be more correct to create one generator only and select `subset='training'` and `subset='validation'`, but in this way augmentation is applied to both sets. I prefer to create [two generators with the same split and the same seed.](https://stackoverflow.com/questions/71744605/keras-imagedatagenerator-validation-split-does-not-split-validation-data-as-expe)

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# train generator with augmentation
train_image_gen  = ImageDataGenerator(rotation_range=40,
                                      width_shift_range=0.2,
                                      height_shift_range=0.2,
                                      zoom_range=[0.5,1.5],
                                      brightness_range=[0.5,1.5],
                                      shear_range=0.2,
                                      vertical_flip=True,
                                      horizontal_flip=True,
                                      fill_mode='reflect',
                                      validation_split = 0.15,
                                      )

# validation generator without augmentation
validation_image_gen = ImageDataGenerator(validation_split = 0.15)

train_dataset = train_image_gen.flow_from_directory(directory=DATASET_DIR,
                                                    target_size=IMAGE_SHAPE,
                                                    color_mode='rgb',
                                                    classes=None,
                                                    class_mode='categorical',
                                                    batch_size=BATCH_SIZE,
                                                    shuffle=True,
                                                    seed=SEED,
                                                    subset='training',
                                                    )

validation_dataset = validation_image_gen.flow_from_directory(directory=DATASET_DIR,
                                                              target_size=IMAGE_SHAPE,
                                                              color_mode='rgb',
                                                              classes=None,
                                                              class_mode='categorical',
                                                              batch_size=BATCH_SIZE,
                                                              shuffle=False,
                                                              seed=SEED,
                                                              subset='validation'
                                                              )

# Vanilla CNN baseline

In [None]:
train_dataset_not_augmented = validation_image_gen.flow_from_directory(directory=DATASET_DIR,
                                                                       target_size=IMAGE_SHAPE,
                                                                       color_mode='rgb',
                                                                       classes=None,
                                                                       class_mode='categorical',
                                                                       batch_size=BATCH_SIZE,
                                                                       shuffle=True,
                                                                       seed=SEED,
                                                                       subset='training',
                                                                       )

In [None]:
inputs = keras.Input(shape=INPUT_SHAPE)
x = layers.Rescaling(1./255)(inputs)

for size in [32, 64, 128, 256, 512]:
    x = layers.Conv2D(size, 3, padding='same', activation='relu', kernel_initializer=keras.initializers.HeUniform(seed=SEED))(x)
    x = layers.MaxPooling2D(2)(x)

x = layers.Flatten()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(512, kernel_initializer=keras.initializers.HeUniform(seed=SEED), activation='relu')(x)
x = layers.Dropout(0.3)(x)
outputs = layers.Dense(NUM_CLASSES, activation='softmax', kernel_initializer=keras.initializers.GlorotUniform())(x)

# Connect input and output through the Model class
model_baseline = keras.Model(inputs, outputs, name='vanilla_CNN')

# Compile the model
compile_model(model_baseline, lr=1e-3, opt='Adam')

In [None]:
history = model_baseline.fit(
    x = train_dataset_not_augmented,
    epochs = 5000,
    validation_data = validation_dataset,
    callbacks = keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=15, restore_best_weights=True)
)

In [None]:
evaluate_model(model_baseline, validation_dataset)
nice_plot(history)

# Supernet comparison

In the separate `supernet_choice.ipynb` notebook for clarity.

# Keras tuner

In the separate `keras_tuner.ipynb` notebook for clarity.

# Compute the class weights to account for class imbalance

Class weights are computed [according to this.](https://datascience.stackexchange.com/questions/13490/how-to-set-class-weights-for-imbalanced-classes-in-keras)

In [None]:
class_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                  classes=np.unique(train_dataset.classes),
                                                  y=train_dataset.classes)

class_weights = dict(zip(np.unique(train_dataset.classes), class_weights))
class_weights

# Training VGG16

In [None]:
MODELS_DIR_VGG = MODELS_DIR / 'model_vgg16'
MODELS_DIR_VGG.mkdir(parents=True, exist_ok=True)

## Instantiate supernet

In [None]:
conv_base  = keras.applications.VGG16(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224,3)
)
conv_base.trainable = False

## Build model

In [None]:
inputs = keras.Input(shape=INPUT_SHAPE)

x = layers.Resizing(224, 224, interpolation='bicubic', name='resizing')(inputs)
x = keras.applications.vgg16.preprocess_input(x)
x = conv_base(x)

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dense(384, kernel_initializer=keras.initializers.HeUniform(seed=SEED))(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)
x = layers.Dropout(0.3)(x)

x = layers.Dense(256, kernel_initializer=keras.initializers.HeUniform(seed=SEED))(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)
x = layers.Dropout(0.3)(x)

outputs = layers.Dense(NUM_CLASSES, activation='softmax', kernel_initializer=keras.initializers.GlorotUniform(seed=SEED))(x)

model_vgg16 = keras.Model(inputs, outputs, name='vgg16')

compile_model(model_vgg16, lr=1e-3, opt='Adam')

## Perform transfer learning

In [None]:
history = fit_model(model_vgg16, patience=20, weights=class_weights)
evaluate_model(model_vgg16, validation_dataset)
nice_plot(history)

In [None]:
model_vgg16.save(MODELS_DIR_VGG / '01_no_finetuning')

## Fine tuning (1st pass)

In [None]:
DEFROST = 4

In [None]:
model_vgg16 = keras.models.load_model(MODELS_DIR_VGG / '01_no_finetuning')

model_vgg16.get_layer('vgg16').trainable = True
for i, layer in enumerate(model_vgg16.get_layer('vgg16').layers[:-DEFROST]):
    layer.trainable = False

compile_model(model_vgg16, lr=1e-4, opt='Adam')

In [None]:
history = fit_model(model_vgg16, patience=20)
evaluate_model(model_vgg16, validation_dataset)
nice_plot(history)

In [None]:
model_vgg16.save(MODELS_DIR_VGG / '02_finetuning_pass1')

## Fine tuning (2nd pass)

In [None]:
DEFROST = 8

In [None]:
model_vgg16 = keras.models.load_model(MODELS_DIR_VGG / '02_finetuning_pass1')

model_vgg16.get_layer('vgg16').trainable = True
for i, layer in enumerate(model_vgg16.get_layer('vgg16').layers[:-DEFROST]):
    layer.trainable = False

compile_model(model_vgg16, lr=1e-4, opt='Adam')

In [None]:
append_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6)
history = fit_model(model_vgg16, patience=20, append_callback=append_callback)
evaluate_model(model_vgg16, validation_dataset)
nice_plot(history)

In [None]:
model_vgg16.save(MODELS_DIR_VGG / '03_finetuning_pass2')

## Fine tuning (3rd pass)

In [None]:
DEFROST = 12

In [None]:
model_vgg16 = keras.models.load_model(MODELS_DIR_VGG / '03_finetuning_pass2')

model_vgg16.get_layer('vgg16').trainable = True
for i, layer in enumerate(model_vgg16.get_layer('vgg16').layers[:-DEFROST]):
    layer.trainable = False

compile_model(model_vgg16, lr=1e-5, opt='Adam')

In [None]:
append_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6)
history = fit_model(model_vgg16, patience=30, append_callback=append_callback)
evaluate_model(model_vgg16, validation_dataset)
nice_plot(history)

In [None]:
model_vgg16.save(MODELS_DIR_VGG / '04_finetuning_pass3')

# Training ResNet-50

In [None]:
MODELS_DIR_RESNET = MODELS_DIR / 'model_resnet50'
MODELS_DIR_RESNET.mkdir(parents=True, exist_ok=True)

## Instantiate supernet

In [None]:
conv_base  = keras.applications.ResNet50(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224,3)
)
conv_base.trainable = False

## Build model

In [None]:
inputs = keras.Input(shape=INPUT_SHAPE)

x = layers.Resizing(224, 224, interpolation='bicubic', name='resizing')(inputs)
x = keras.applications.resnet.preprocess_input(x)
x = conv_base(x)

x = layers.GlobalAveragePooling2D()(x)

x = layers.Dense(512, kernel_initializer=keras.initializers.HeUniform(seed=SEED))(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)
x = layers.Dropout(0.5)(x)

outputs = layers.Dense(NUM_CLASSES, activation='softmax', kernel_initializer=keras.initializers.GlorotUniform(seed=SEED))(x)

model_resnet50 = keras.Model(inputs, outputs, name='resnet50')

compile_model(model_resnet50, lr=1e-3, opt='Adam')

## Perform transfer learning

In [None]:
history = fit_model(model_resnet50, patience=20, weights=class_weights)
evaluate_model(model_resnet50, validation_dataset)
nice_plot(history)

In [None]:
model_resnet50.save(MODELS_DIR_RESNET / '01_no_finetuning')

## Fine tuning (1st pass)

In [None]:
DEFROST = 32

In [None]:
model_resnet50 = keras.models.load_model(MODELS_DIR_RESNET / '01_no_finetuning')

model_resnet50.get_layer('resnet50').trainable = True
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers[:-DEFROST]):
    layer.trainable = False

# make sure BatchNorm layers are frozen
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers):
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

compile_model(model_resnet50, lr=1e-4, opt='Adam')

In [None]:
append_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6)
history = fit_model(model_resnet50, patience=20, append_callback=append_callback)
evaluate_model(model_resnet50, validation_dataset)
nice_plot(history)

In [None]:
model_resnet50.save(MODELS_DIR_RESNET / '02_finetuning_pass1')

## Fine tuning (2nd pass)

In [None]:
DEFROST = 64

In [None]:
model_resnet50 = keras.models.load_model(MODELS_DIR_RESNET / '02_finetuning_pass1')

model_resnet50.get_layer('resnet50').trainable = True
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers[:-DEFROST]):
    layer.trainable = False

# make sure BatchNorm layers are frozen
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers):
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

compile_model(model_resnet50, lr=1e-4, opt='Adam')

In [None]:
append_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=15, min_lr=1e-6)
history = fit_model(model_resnet50, patience=20, append_callback=append_callback)
evaluate_model(model_resnet50, validation_dataset)
nice_plot(history)

In [None]:
model_resnet50.save(MODELS_DIR_RESNET / '03_finetuning_pass2')

## Fine tuning (3rd pass)

In [None]:
DEFROST = 96

In [None]:
model_resnet50 = keras.models.load_model(MODELS_DIR_RESNET / '03_finetuning_pass2')

model_resnet50.get_layer('resnet50').trainable = True
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers[:-DEFROST]):
    layer.trainable = False

# make sure BatchNorm layers are frozen
for i, layer in enumerate(model_resnet50.get_layer('resnet50').layers):
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

compile_model(model_resnet50, lr=1e-5, opt='Adam')

In [None]:
append_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=15, min_lr=1e-7)
history = fit_model(model_resnet50, patience=30, append_callback=append_callback)
evaluate_model(model_resnet50, validation_dataset)
nice_plot(history)

In [None]:
model_resnet50.save(MODELS_DIR_RESNET / '04_finetuning_pass3')

# Building the ensemble

Choose the best models from the previous trainings and put them together.

**Important note:** due to stochasticity, if the notebook if re-run it may happen that a different unfreeze configuration performs better. In such case, adjust manually here to build the ensemble from the two best models of the supernets.

In [None]:
model_vgg16 = keras.models.load_model(MODELS_DIR_VGG / '04_finetuning_pass3')
model_resnet50 = keras.models.load_model(MODELS_DIR_RESNET / '03_finetuning_pass2')
models = [model_vgg16, model_resnet50]

[On Stack Overflow](https://stackoverflow.com/questions/67647843/is-there-a-way-to-ensemble-two-keras-h5-models-trained-for-same-classes) there is also specified a way to weigh them differently.

In [None]:
inputs = keras.Input(shape=INPUT_SHAPE)
outputs = [model(inputs) for model in models]
outputs_ensemble = layers.Average()(outputs)
model_ensemble = keras.Model(inputs=inputs, outputs=outputs_ensemble, name='ensemble_vgg16-resnet50')
model_ensemble.save(MODELS_DIR / 'ensemble_vgg16-resnet50')
# keras.utils.plot_model(ensemble_model, to_file=str(Path() / 'report_material' / 'ensemble_vgg16-resnet50.png')

# Implementing Test-Time Augmentation

The source code of the `model.py` to implement TTA is as follows:

In [None]:
import os
import tensorflow as tf
import numpy as np

class model:
    def __init__(self, path):
        self.model = tf.keras.models.load_model(os.path.join(path, 'ensemble_vgg16-resnet50'))
        
    def predict(self, X):

        PERFORM_TTA = False
        
        if PERFORM_TTA:

            TTA_STEPS = 10
            BATCH_SIZE = 32

            test_gen = tf.keras.preprocessing.image.ImageDataGenerator(
                rotation_range=40,
                width_shift_range=0.2,
                height_shift_range=0.2,
                zoom_range=[0.5,1.5],
                brightness_range=[0.5,1.5],
                shear_range=0.2,
                vertical_flip=True,
                horizontal_flip=True,
                fill_mode='reflect')

            # make predictions
            yhats = []
            
            for i in range(TTA_STEPS):
                # predict augmented test set
                # preds = self.model.predict(test_gen.flow(X, batch_size=BATCH_SIZE, shuffle=False)) # version with the new .predict()
                preds = self.model.predict_generator(test_gen.flow(X, batch_size=BATCH_SIZE, shuffle=False), steps=len(X)/BATCH_SIZE)
                yhats.append(preds)

            yhats = np.array(yhats)
            yhats = np.mean(yhats, axis=0) # take average of predictions of augmented images
            yhats = np.argmax(yhats, axis=1) # argmax across classes

        else:

            yhats = self.model.predict(X)
            yhats = tf.argmax(yhats, axis=1) # argmax across classes

        return yhats

# Computing metrics

In the separate `metrics.ipynb` notebook for clarity.