# 1. Setup

In [None]:
import matplotlib.pyplot as plt
import keras.optimizers as optimizers
from keras.layers import Flatten, Dense
from keras import models
from keras import regularizers
import tensorflow as tf
import math
import os

In [None]:
# image parameters
input_height = 69
input_width = 69

# augmentation parameters
rescale = True
if rescale:
    rescale_size=1./255
else:
    rescale_size=1
augmentation=False

rotation_range=40
width_shift_range=0.2
height_shift_range=0.1
shear_range=0.2
zoom_range=0.2
horizontal_flip=True
fill_mode='nearest'

In [None]:
# best loss function for multi-class classification, measures the distance between two probability distributions
# the probability distribution of the output of the network and the true distribution of the labels
loss_function='categorical_crossentropy'

optimizer='rmsprop'
optimizer_learning_rate=1e-4
epochs=100
batch_size=32
num_classes = 9
regularizer=regularizers.l1_l2(l1=0.001, l2=0.001) # simultaneous l1 and l2, add 0.001*weight_coefficient_value + 0.001 * 1/2*weight^2

if optimizer == 'rmsprop':
    optimizer=optimizers.RMSprop(learning_rate=optimizer_learning_rate)

In [None]:
# customized metrics for multi class classification

import keras.backend as K

def precision(y_true, y_pred):
    """Precision metric.
    Computes the precision, a metric for multi-label classification of
    how many selected items are relevant.
    """
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def recall(y_true, y_pred):
    """Recall metric.
    Computes the recall, a metric for multi-label classification of
    how many relevant items are selected.
    """
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

metrics = [
    precision,
    recall,
    tf.keras.metrics.CategoricalAccuracy(name='acc')
]

In [None]:
paths = {
    'TRAIN_PATH' : os.path.join('..','workspace', 'images', 'train'),
    'TEST_PATH' : os.path.join('..','workspace', 'images','test'),
    'EVAL_PATH' : os.path.join('..','workspace', 'images','eval'),
    'IMAGES_PATH': os.path.join('..','workspace','images','all'),
    'LOG_DIR' : os.path.join('..','model', 'log_dir')
 }

In [None]:
# training set class distribution
def plotTrainingDistribution():
    files_per_label = dict()
    for i in range(9):
      path = os.path.join(paths['TRAIN_PATH'],str(i))
      n_images = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
      files_per_label[i] = n_images
    plt.bar(list(files_per_label.keys()), files_per_label.values(), color='g')
    plt.show()
    print(files_per_label)
    return files_per_label

files_per_label = plotTrainingDistribution()

In [None]:
# class weights computation
tot_images = sum(list(files_per_label.values()))
weights = dict([ (class_label , tot_images/(num_classes * n_images)) for class_label, n_images in files_per_label.items()])
print(weights)

In [None]:
# training set image data generator
from keras.preprocessing.image import ImageDataGenerator
train_datagen = ImageDataGenerator(rescale=1./255)
train_dir=paths['TRAIN_PATH']
train_generator = train_datagen.flow_from_directory(train_dir, target_size=(input_width, input_height), batch_size=batch_size, class_mode='categorical')

In [None]:
# validation set image data generator
val_datagen = ImageDataGenerator(rescale=rescale_size)
validation_dir=paths['EVAL_PATH']
validation_generator = val_datagen.flow_from_directory(validation_dir, target_size=(input_width, input_height), batch_size=batch_size, class_mode='categorical')

# 3. Model creation

In [None]:
from keras.applications import VGG16

# pre-trained model on which we applied feature extraction
conv_base = VGG16(weights='imagenet',
                    include_top=False, # exclude fully connected layer
                    input_shape=(input_width, input_height, 3))

In [None]:
def build_model(input_conv_base):
    #build the cnn using the pre-trained cnn
    built_model = models.Sequential()
    built_model.add(input_conv_base)
    # add fully connected layer
    built_model.add(Flatten())
    built_model.add(Dense(64, activation='relu'))
    built_model.add(Dense(9, activation='softmax'))

    return built_model

In [None]:
conv_base.summary()

In [None]:
conv_base.trainable = False # freeze conv_base otherwise representation previously learned got updated
# only the weights of the densely connected layer will be trained

In [None]:
conv_base.summary()

In [None]:
#build model
model = build_model(conv_base)
model.compile(optimizer=optimizer,
              loss=loss_function,
              metrics=metrics)

In [None]:
model.summary()

# 4. Model training

In [None]:
number_training = tot_images

n_images_eval = 0
for i in range(9):
    path = os.path.join(paths['EVAL_PATH'],str(i))
    # compute number of images in each eval folder and sum it up
    n_images_eval = n_images_eval + len([f for f in os.listdir(path)if os.path.isfile(os.path.join(path, f))])

number_eval = n_images_eval

In [None]:
import keras

callbacks_list = [
        # interrupts training when accuracy has stopped improving accuracy on the validation set for at least 3+1=4 epochs
        keras.callbacks.EarlyStopping(
            monitor='acc', # should be part of the metrics specific during compilation
            patience=5,
        ),
        # monitor the model's validation loss and reduce the LR when the validation loss has stopped improving, effective strategy to escape local minima
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2, # divides LR by 5 when triggered
            patience=3 # called when stopped improving for 3 epochs
        )
]

history = model.fit(
      train_generator,
      steps_per_epoch=int(math.ceil((1. * number_training) / batch_size)),
      epochs=epochs,
      class_weight=weights,
      validation_data=validation_generator,
      validation_steps=int(math.ceil((1. * number_eval) / batch_size)),
      callbacks=callbacks_list)

  # 5. Visualization

In [None]:
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(acc) + 1)

In [None]:
plt.plot(epochs, acc, 'r', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation ACC')
plt.legend()
plt.figure()

In [None]:
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()

In [None]:
precision = history.history['precision']
val_precision = history.history['val_precision']
plt.plot(epochs, precision, 'r', label='Training precision')
plt.plot(epochs, val_precision, 'b', label='Validation precision')
plt.title('Training and validation Precision')
plt.legend()
plt.figure()

In [None]:
recall = history.history['recall']
val_recall = history.history['val_recall']
plt.plot(epochs, recall, 'r', label='Training recall')
plt.plot(epochs, val_recall, 'b', label='Validation recall')
plt.title('Training and validation Recall')
plt.legend()
plt.figure()

In [None]:
# smooth curves if they look noisy
# replace each point with an exponential moving average of the previous points
def smooth_curve(points, factor=0.8):
  smoothed_points = []
  for point in points:
    if smoothed_points:
      previous = smoothed_points[-1]
      smoothed_points.append(previous * factor + point * (1 - factor))
    else:
      smoothed_points.append(point)
  return smoothed_points

In [None]:
plt.plot(epochs,
         smooth_curve(acc), 'r', label='Smoothed training acc')
plt.plot(epochs,
         smooth_curve(val_acc), 'b', label='Smoothed validation acc')
plt.title('Training and validation MAE')
plt.legend()
plt.figure()

In [None]:
plt.plot(epochs,
         smooth_curve(loss), 'r', label='Smoothed training loss')
plt.plot(epochs,
         smooth_curve(val_loss), 'b', label='Smoothed validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()
# display average, the model may improve even if not reflected

  # 6. Model testing

In [None]:
number_test = 0
for i in range(9):
      path = os.path.join(paths['TEST_PATH'],str(i))
      n_images = len([f for f in os.listdir(path)if os.path.isfile(os.path.join(path, f))])
      number_test += n_images

In [None]:
test_dir=paths['TEST_PATH']
test_datagen = ImageDataGenerator(rescale=rescale_size) # it should not be augmented

test_generator = test_datagen.flow_from_directory(test_dir, target_size=(input_width, input_height), batch_size=batch_size, class_mode='categorical', classes=None, shuffle=False)

In [None]:
# classification report and confusion matrixx

import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

test_generator.reset()
Y_pred = model.predict_generator(test_generator, number_test // batch_size+1)
y_pred = np.argmax(Y_pred, axis=1)
print('Confusion Matrix')
print(confusion_matrix(test_generator.classes, y_pred))
print('Classification Report')
target_names = ['0','1','2','3','4','5','6','7','8']
print(classification_report(test_generator.classes, y_pred, target_names=target_names))

In [None]:
# ROC curve and AUC

from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import roc_curve, roc_auc_score, auc
fig, c_ax = plt.subplots(1,1, figsize = (12, 8))

def multiclass_roc_auc_score(y_test, y_pred, average="macro"):
    lb = LabelBinarizer()
    lb.fit(y_test)
    y_test = lb.transform(y_test)
    y_pred = lb.transform(y_pred)

    for (idx, c_label) in enumerate(target_names):
        fpr, tpr, thresholds = roc_curve(y_test[:,idx].astype(int), y_pred[:,idx])
        c_ax.plot(fpr, tpr, label = '%s (AUC:%0.2f)'  % (c_label, auc(fpr, tpr)))
    c_ax.plot(fpr, fpr, 'b-', label = 'Random Guessing')
    c_ax.legend()
    return roc_auc_score(y_test, y_pred, average=average)

test_generator.reset()
y_pred = model.predict_generator(test_generator, verbose = True)
y_pred = np.argmax(y_pred, axis=1)
print("Multiclass roc auc score:", multiclass_roc_auc_score(test_generator.classes, y_pred))

# 7. Model exportation

In [None]:
model.save("models/VGG_ feature_extractor.h5")

# 8. Plot model as graph of layers

In [None]:
from keras.utils import plot_model

In [None]:
plot_model(model, show_shapes=True, to_file='feature_extraction_model.png')