# Multi Task Model

## Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
# Tensorflow imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import MobileNet

from loadData import createDataset

## Model parameters

In [None]:
# Log parameters
model_name = 'multiTaskModel_cascade'
savedModelPath = f'../../log/saved_models/{model_name}'
tb_log_dir = f'../../log/tensorboard/{model_name}'
cp_filepath = f'../../log/cps/{model_name}/latest_weights.h5'

# Dynamic hyperparameters
learningRate = 0.001
doDataAugmentation = True
doFineTuning = True
dropoutRate = 0.25
width_multiplier = 1
depth_multiplier = 1

# Training parameters
batch_size = 64
epochs = 20

## Model callbacks

In [None]:
callbacks = [
    # Checkpoint callback                    
    keras.callbacks.ModelCheckpoint(
                    filepath=cp_filepath, 
                    verbose=1, 
                    save_weights_only=True),

    # Tensorboard callback
    keras.callbacks.TensorBoard(log_dir=tb_log_dir, histogram_freq=1),

    # Early Stopping callback
    # keras.callbacks.EarlyStopping(
    #                 monitor="val_loss",
    #                 patience=2,
    #                 verbose=1)
]

## Data parameters

In [None]:
image_height = 160
image_width = 160

## Data Augmentation

In [None]:
data_augmentation = keras.Sequential(
    [
        keras.layers.RandomRotation(0.1),
        keras.layers.RandomBrightness(0.2),
    ]
)

## Creating the training datasets

In [None]:
x, y_face, y_mask, y_age = createDataset('../../data/m3/training', (image_height, image_width))

# for x,y in training_ds:
#     for batch in y:
#         print(batch)

## Creating the test dataset

In [None]:
# test_ds = createDataset('../../data/m3/training', batch_size, 0)

## Load the ResNet model

In [None]:
# Loading either the MobileNet architecture model or the previously saved model, and freeze it for transfer learning
mobilenet = MobileNet(
                input_shape=(image_height, image_width, 3), # Optional shape tuple, only to be specified if include_top is False
                alpha=width_multiplier, # Controls the width of the network. (Width multiplier)
                depth_multiplier=depth_multiplier, # Depth multiplier for depthwise convolution. (Resolution multiplier)
                dropout=dropoutRate, # Dropout rate. Default to 0.001.
                weights="imagenet",
                input_tensor=None,
                pooling='avg', # Optional pooling mode for feature extraction when include_top is False. (None, avg, max)
                include_top=False
                )
           
# Freeze the base model
# mobilenet.trainable = False

inputs = keras.Input(shape=(image_height, image_width, 3))

# Data Augmentation on input
if(doDataAugmentation):
    inputs = data_augmentation(inputs)

# Running base model in inference mode
base_model = mobilenet(inputs, training=False)

## Creating Task 1 (Face Detection) Top Model

In [None]:
# Add Dense layer
face_head = tf.keras.layers.Dense(256, activation='leaky_relu')(base_model)
face_head = tf.keras.layers.Dense(128, activation='leaky_relu')(face_head)

# Final layer for binary classification
face_outputs = keras.layers.Dense(1, activation='sigmoid', name='face_output')(face_head)

In [None]:
multiply_layer = keras.layers.Multiply()([base_model, face_outputs])
multiply_layer = keras.layers.Dense(512, activation='leaky_relu')(multiply_layer)

In [None]:
# face_model = keras.Model(inputs, face_outputs)

# face_model.compile(
#             optimizer=keras.optimizers.Adam(), # Learning Rate?
#             loss=keras.losses.BinaryCrossentropy(), 
#             metrics=['accuracy']
# )

# face_model.summary()

# face_model.fit(
#             training_ds,
#             validation_data=validation_ds,
#             epochs=epochs, 
#             callbacks=callbacks
# )

## Creating Task 2 (Mask Detection) Top Model

In [None]:
# Add Dense layer
mask_head = tf.keras.layers.Dense(128, activation='leaky_relu')(multiply_layer)
mask_head = tf.keras.layers.Dense(64, activation='leaky_relu')(mask_head)

# Final layer for binary classification
mask_outputs = keras.layers.Dense(1, activation='sigmoid', name='mask_output')(mask_head)

## Creating Task 3 (Age Prediction) Top Model

In [None]:
# Add Dense layer
age_head = tf.keras.layers.Dense(256, activation='leaky_relu')(multiply_layer)
age_head = tf.keras.layers.Dense(128, activation='leaky_relu')(age_head)
age_head = tf.keras.layers.Dense(64, activation='leaky_relu')(age_head)

# Final layer for binary classification
age_outputs = keras.layers.Dense(120, activation='softmax', name='age_output')(age_head)

## Creating and compiling the final model

In [None]:

model = keras.Model(inputs, [face_outputs, mask_outputs, age_outputs])
#keras.utils.plot_model(model)

# Using a joint loss function for the three tasks:
# [ Loss = gamma * Loss_task1 + gamma * Loss_task2 + gamma * Loss_task3 ]
# Because every task is dependant on every other task, the model receives the loss of every task when gamma > 0

gamma = 1

model.compile(
      optimizer=keras.optimizers.Adam(), # Learning Rate?
            loss={
                  'face_output': keras.losses.BinaryCrossentropy(), 
                  'mask_output': keras.losses.BinaryCrossentropy(),
                  'age_output': keras.losses.SparseCategoricalCrossentropy(ignore_class=-1)
                  },
            loss_weights={
                  'face_output': 0.33 * gamma,
                  'mask_output': 0.33 * gamma,
                  'age_output': 0.33 * gamma
                  }, 
            metrics={
                  'face_output': keras.metrics.BinaryAccuracy(), 
                  'mask_output': keras.metrics.BinaryAccuracy(),
                  'age_output': keras.metrics.SparseCategoricalAccuracy()
                  },
)

model.summary()

## Training the model with the dataset

In [39]:
history = model.fit(
            x,
            y={'face_output': y_face, 
             'mask_output': y_mask,
             'age_output': y_age},
            batch_size=batch_size,
            epochs=epochs, 
            callbacks=callbacks,
            validation_split=0.2,
            shuffle=True
        )

 43/256 [====>.........................] - ETA: 4:18 - loss: 2.6570 - face_output_loss: 2.0237 - mask_output_loss: 1.1500 - age_output_loss: 4.8779 - face_output_binary_accuracy: 0.4535 - mask_output_sparse_categorical_accuracy: 0.6999 - age_output_sparse_categorical_accuracy: 0.0389

KeyboardInterrupt: 

## Fine tuning

In [None]:
if doFineTuning:
      model_name = model_name + '_ft'
      mobilenet.trainable = True
      model.summary()

      model.compile(
            optimizer=keras.optimizers.Adam(1e-5),  # Low learning rate
            loss={
                  'face_output': keras.losses.BinaryCrossentropy(), 
                  'mask_output': keras.losses.BinaryCrossentropy(),
                  'age_output': keras.losses.SparseCategoricalCrossentropy(ignore_class=-1)
                  },
            loss_weights={
                  'face_output': gamma, 
                  'mask_output': gamma,
                  'age_output': gamma
                  }, 
                  metrics=['accuracy']
      )

      model.fit(training_ds, epochs=epochs)

## Save the model

In [None]:
model.save(savedModelPath)

## Test model with test dataset

In [None]:
results = model.evaluate(test_ds)

print(f'Loss: {results[0]}; Accuracy: {results[1]}')

## Predict new images

In [None]:
# https://www.tensorflow.org/tutorials/images/classification

img = tf.keras.utils.load_img(
    '../../data/m3/training/face/noMask/1_0_34_512938.jpg', target_size=(image_height, image_width)
)
img_array = tf.keras.utils.img_to_array(img)
img_array_batch = tf.expand_dims(img_array, 0) # Create a batch

preds = model.predict(img_array_batch)
age_output_indexes = np.array([i for i in range(0, 120)])
apparent_predictions = np.sum(preds[2][0] * age_output_indexes)

print(f"Face: {preds[0][0]*100}%")
print(f"Mask: {preds[1][0]*100}%")
print(f"Age: {np.argmax(preds[2][0])}")

ax = plt.subplot(1, 1, 1)
plt.imshow(img)
plt.title("Face: {:.2f}% | Mask: {:.2f}% | Age: {:.0f}".format(preds[0][0][1]*100, preds[1][0][1]*100, apparent_predictions))