### Model 3 Implementation (Method 2)
The model is constructed with multiple outputs. The type loss is calculated as usual, 
while the usage loss is multiplied by the y_type ground-truth label.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os, math
import pandas as pd
import random, json
import cv2
import tensorflow as tf
import seaborn as sns
import time
from keras.callbacks import TensorBoard
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_sample_image

### Preparing the Dataset

In [None]:
# Logging: the run name embeds the current Unix timestamp, so each run writes
# to its own directory under logs/.
model_name  = "cnnMask_resnet_multiout_custom_loss_{}".format(int(time.time()))
# Use the TensorBoard callback that ships with TensorFlow so the callback comes
# from the same Keras implementation as the tf.keras model it is attached to.
tensorboard = tf.keras.callbacks.TensorBoard(log_dir = 'logs/{}'.format(model_name))

In [None]:
# --- Training hyper-parameters ---
BATCH_SIZE = 5
NUM_EPOCHS = 15
IMG_SIZE   = (128, 128)  # spatial size used for the generators' target_size and the model input

# --- Dataset locations ---
DATASET_DIR = '../FINAL_DATASET'
IMAGE_DIR   = '{}/croppedv2/'.format(DATASET_DIR)

# Fraction of the samples held out as the test split by train_test_split
percent_val = 0.1

# Both CSV files are loaded here; they are concatenated and re-split further down.
mask_train = pd.read_csv('{}/traindf.csv'.format(DATASET_DIR))
mask_test  = pd.read_csv('{}/test.csv'.format(DATASET_DIR))

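Sample all classes so that the number of images from each class is equal. Note: the balancing step is commented out in the cell below, so by default the rows are only shuffled within each class.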

In [None]:
# Merge the two CSV splits and group the rows by class label.
equal_classes = (
    pd.concat([mask_train, mask_test])
      .groupby('classname')
)

# Optional class balancing (disabled): uncomment the two lines below to resample
# every class, with replacement, to the same number of rows.
#num_per_class = equal_classes.size().max() # or equal_classes.size().max(), equal_classes.size().min()
#equal_classes = pd.DataFrame(equal_classes.apply(lambda x : x.sample(num_per_class, replace = True)).reset_index(drop = True))

# frac = 1 keeps every row; this turns the grouped object back into a DataFrame
# with the rows of each class in random order.
equal_classes = equal_classes.sample(frac = 1)

# Number of samples per class
equal_classes['classname'].value_counts()

In [None]:
# Derive the two training targets from the class name:
#   type  : 1.0 for both mask classes (correct or incorrect), 0.0 for the other two classes
#   usage : 1.0 for 'face_with_mask', 0.0 for 'face_with_mask_incorrect',
#           -1.0 for the classes without a mask (custom_loss ignores these samples)
equal_classes['type' ] = equal_classes['classname']
equal_classes['usage'] = equal_classes['classname']

replacement_dict = {
    'type': { 
        'face_other_covering'       : 0.0,
        'face_with_mask_incorrect'  : 1.0,
        'face_with_mask'            : 1.0,
        'face_no_mask'              : 0.0,
    },  
    'usage': { 
        'face_other_covering'       :-1.0,
        'face_with_mask_incorrect'  : 0.0,
        'face_with_mask'            : 1.0,
        'face_no_mask'              :-1.0,
    }
}

# The nested dict maps column name -> {old value: new value}, so only the
# 'type' and 'usage' columns are rewritten.
equal_classes = equal_classes.replace(replacement_dict)

# Printed explicitly: a bare expression in the middle of a cell is not displayed.
print(equal_classes[['type', 'usage']].value_counts())

# A fixed seed keeps the train/test split identical across kernel restarts, so
# reported metrics and saved models always refer to the same held-out samples.
SPLIT_SEED = 42
mask_train, mask_test = train_test_split(equal_classes,
                                         test_size    = percent_val,
                                         stratify     = equal_classes[['type', 'usage']],
                                         random_state = SPLIT_SEED)

# Test samples that carry a real usage label (i.e. a mask is present)
valid_mask_test = mask_test[mask_test['usage'] != -1.0]

In [None]:
label_columns = ['type', 'usage']
splits        = (mask_train, mask_test, valid_mask_test)

# Absolute number of samples per (type, usage) combination in each split
for split in splits:
    print(split[label_columns].value_counts())
print()

# The same counts expressed as a fraction of the split size
for split in splits:
    print(split[label_columns].value_counts() / len(split))

In [None]:
def adjust_image(input_image):
    """Randomly jitter saturation, brightness and contrast, then scale to [0, 1].

    Used as the ``preprocessing_function`` of the ``ImageDataGenerator``.

    Parameters
    ----------
    input_image : numpy.ndarray
        Rank-3 RGB image. Assumed to hold values on a 0..255 scale, which is
        what ImageDataGenerator is expected to pass -- TODO confirm if this
        function is ever called from elsewhere.

    Returns
    -------
    numpy.ndarray
        float32 array of the same shape with values in [0, 1].
    """
    # Each factor is drawn independently from {unchanged, -20%, +20%}.
    brightness = random.choice([1.0, 0.8, 1.2])
    contrast   = random.choice([1.0, 0.8, 1.2])
    saturation = random.choice([1.0, 0.8, 1.2])

    # cvtColor returns a new array, so the caller's image is never modified.
    img_proc = cv2.cvtColor(np.asarray(input_image, dtype = np.float32), cv2.COLOR_RGB2HSV)

    # For float32 images OpenCV yields H in [0, 360], S in [0, 1] and V on the
    # input scale. Only the saturation channel is scaled and clamped; clamping
    # the whole array to [0, 255] would truncate every hue above 255 degrees.
    img_proc[..., 1] *= saturation
    np.clip(img_proc[..., 1], 0.0, 1.0, out = img_proc[..., 1])

    img_proc = cv2.cvtColor(img_proc, cv2.COLOR_HSV2RGB)
    img_proc *= brightness
    # The offset has to be written back in place; a bare np.add(...) without
    # `out` discards its result and leaves the image untouched.
    # NOTE(review): this is an additive offset rather than a true contrast
    # scaling; the formula is kept as originally written.
    img_proc += (1 - contrast) * 100

    np.clip(img_proc, 0.0, 255.0, out = img_proc)

    return img_proc.astype(np.float32) * (1.0 / 255)

# Augmenting generator: random horizontal/vertical shifts (range 0.1), random
# horizontal flips, and adjust_image applied to every image it yields.
image_gen = tf.keras.preprocessing.image.ImageDataGenerator(
    width_shift_range      = 0.1,
    height_shift_range     = 0.1,
    horizontal_flip        = True,
    preprocessing_function = adjust_image,
)

In [None]:
filename_col = 'newFilename'
label_cols   = ['type', 'usage']

# Arguments common to the three iterators created below.
shared_flow_args = dict(directory   = IMAGE_DIR,
                        x_col       = filename_col,
                        y_col       = label_cols,
                        target_size = IMG_SIZE,
                        class_mode  = 'multi_output',
                        batch_size  = BATCH_SIZE,
                        dtype       = 'float32')

# NOTE(review): image_gen is created without a validation_split, so
# subset = "training" presumably selects every row of mask_train -- confirm.
train_ds = image_gen.flow_from_dataframe(mask_train, subset = "training", **shared_flow_args)

# NOTE(review): both evaluation iterators reuse image_gen, so test images go
# through the same random shifts, flips and colour jitter as training images.
# Consider a separate, non-augmenting generator for evaluation.
test_ds            = image_gen.flow_from_dataframe(mask_test, **shared_flow_args)
valid_mask_test_ds = image_gen.flow_from_dataframe(valid_mask_test, **shared_flow_args)

### Usage and Type classifier

In [None]:
def build_cnn_model():
    """Build the two-headed classifier on top of a ResNet50 backbone.

    The input size is derived from ``IMG_SIZE`` for both the Input layer and
    the backbone, so the two cannot drift apart when the image size changes.

    Returns
    -------
    tf.keras.Model
        Model with one image input and two sigmoid outputs, in this order:
        ``mask_type`` and ``mask_usage``.
    """
    input_shape = (*IMG_SIZE, 3)

    img   = tf.keras.layers.Input(shape = input_shape, dtype = 'float32')
    # NOTE(review): the backbone uses its default weights while adjust_image
    # feeds images scaled to [0, 1]; confirm this matches the preprocessing
    # those weights expect.
    base  = tf.keras.applications.ResNet50(include_top = False, input_shape = input_shape, pooling = 'max')(img)
    # With pooling = 'max' the backbone output should already be one vector per
    # image; Flatten is kept so the layer structure stays unchanged.
    base  = tf.keras.layers.Flatten()(base)
    base  = tf.keras.layers.Dense(256, activation = 'relu', dtype = 'float32')(base)
    # Two independent binary heads sharing the same 256-unit representation.
    t_out = tf.keras.layers.Dense(1, activation = 'sigmoid', dtype = 'float32', name = 'mask_type')(base)
    u_out = tf.keras.layers.Dense(1, activation = 'sigmoid', dtype = 'float32', name = 'mask_usage')(base)
    model = tf.keras.Model(inputs = img, outputs = [t_out, u_out])
    return model

model = build_cnn_model()
model.summary()

In [None]:
def custom_loss(y_true, y_pred):
    """Binary cross-entropy that ignores samples labelled -1.

    Samples whose label is -1 contribute zero loss. The summed loss is divided
    by the full batch size (masked samples included), so a batch with few
    valid samples yields a proportionally smaller loss.

    Parameters
    ----------
    y_true : tensor with one label per sample (1.0, 0.0 or -1.0 for "ignore").
    y_pred : tensor with one predicted probability per sample.

    Returns
    -------
    Scalar tensor; 0 for an empty batch.
    """
    # Bring both tensors to an explicit (batch, 1) layout so the per-sample
    # loss below always has shape (batch,).
    y_pred = tf.reshape(tf.convert_to_tensor(y_pred), (-1, 1))
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), (-1, 1))

    is_valid = tf.not_equal(y_true, -1.0)
    # Replace ignored labels with a legal value before computing the
    # cross-entropy, so it is never evaluated on the out-of-range label -1.
    safe_true = tf.where(is_valid, y_true, tf.zeros_like(y_true))

    bce_loss = tf.keras.losses.BinaryCrossentropy(reduction = tf.keras.losses.Reduction.NONE)(safe_true, y_pred)
    bce_loss = tf.where(tf.reshape(is_valid, (-1,)), bce_loss, tf.zeros_like(bce_loss))

    # tf.shape works on symbolic tensors with an unknown batch dimension,
    # unlike len(); divide_no_nan returns 0 instead of NaN for an empty batch.
    batch_size = tf.cast(tf.shape(y_true)[0], bce_loss.dtype)
    return tf.math.divide_no_nan(tf.reduce_sum(bce_loss), batch_size)

In [None]:
# Hand-made batch for a quick sanity check of custom_loss.
# The first label is -1, i.e. a sample the loss is supposed to ignore.
y_true_test = tf.constant([[-1.0], [1.0], [0.0], [1.0]])
y_pred_test = tf.constant([[ 1.0], [1.0], [0.0], [0.0]])

# Predictions identical to the labels: the loss should be zero.
print(custom_loss(y_true_test, y_true_test))

# Whatever is predicted for the sample with y_true == -1 must not change this value.
print(custom_loss(y_true_test, y_pred_test)) 

In [None]:
# Per-head losses: plain binary cross-entropy for the type head, the masking
# loss for the usage head.
head_losses = {
    'mask_type' : tf.keras.losses.BinaryCrossentropy(),
    'mask_usage': custom_loss,
}

# Metrics are attached to the mask_type head only in this first compile.
type_head_metrics = [
    'accuracy',
    tf.keras.metrics.Precision(),
    tf.keras.metrics.Recall(),
    tf.keras.metrics.TruePositives(),
    tf.keras.metrics.TrueNegatives(),
    tf.keras.metrics.FalsePositives(),
    tf.keras.metrics.FalseNegatives(),
]

model.compile(optimizer = 'adam',
              loss      = head_losses,
              metrics   = { 'mask_type' : type_head_metrics })

In [None]:
# Train both heads; test_ds serves as validation data and the TensorBoard callback logs the run.
history = model.fit(
    train_ds,
    validation_data = test_ds,
    epochs          = NUM_EPOCHS,
    callbacks       = [tensorboard],
)

In [None]:
# Evaluate on the full test split (the metrics configured above cover the mask_type head).
results = model.evaluate(
    test_ds,
    callbacks = [tensorboard],
)

In [None]:
# Re-compile with the same optimizer and losses, this time attaching the
# metrics to the mask_usage head.
# `model.loss` is the loss specification given to the previous compile();
# `model.losses` is a different attribute (the losses collected from layers,
# e.g. regularizers) and would not restore the per-head losses.
model.compile(optimizer = model.optimizer, 
              loss      = model.loss, 
              metrics   = {
                  'mask_usage' : [
                      'accuracy',
                      tf.keras.metrics.Precision(),
                      tf.keras.metrics.Recall(),
                      tf.keras.metrics.TruePositives(),
                      tf.keras.metrics.TrueNegatives(),
                      tf.keras.metrics.FalsePositives(),
                      tf.keras.metrics.FalseNegatives(),
                  ]
              })

In [None]:
# Evaluate on the test samples that carry a real usage label (usage != -1).
results_for_valid_mask = model.evaluate(
    valid_mask_test_ds,
    callbacks = [tensorboard],
)

In [None]:
# Persist the trained model under the same timestamped name used for the logs.
save_path = 'saved-models/{}'.format(model_name)
model.save(save_path)

In [None]:
# Pull one batch from the valid-mask iterator and display the raw model outputs for it.
x, y = next(valid_mask_test_ds)
pred = model(x)
pred