# Mask classifier

3 classes:
- 0: NO PERSON in the image is wearing a mask
- 1: ALL THE PEOPLE in the image are wearing a mask
- 2: SOMEONE in the image is not wearing a mask"

For more details see [https://www.kaggle.com/c/artificial-neural-networks-and-deep-learning-2020]

In [None]:
from IPython.display import display

import tensorflow as tf
from tensorflow import keras
from keras.preprocessing.image import ImageDataGenerator
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from pathlib import Path
import json

In [None]:
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)

cwd = Path.cwd()

## Preprocessing

Prior to running this notebook one should have prepared the dataset folders by using `prepare_dataset.py`, which creates a handout set for validation from the original training images, by extracing a certain percentage from each directory.

The script is expected to find a `MaskDataset` folder as extracted from the provided zip file, and will create the structure expected by `flow_from_directory`.

In [None]:
dataset_dir = cwd.joinpath('MaskDataset')
train_dir = dataset_dir.joinpath('training')
valid_dir = dataset_dir.joinpath('validation')

class_names = ['NO_MASK', 'ALL_MASK', 'SOME_MASK']

Set up the image data generators for automatic augmentation

In [None]:
APPLY_AUGMENTATION = True
preprocessing_function = keras.applications.vgg16.preprocess_input

if APPLY_AUGMENTATION:
    train_data_gen = ImageDataGenerator(
        rotation_range=30,
        width_shift_range=10,
        height_shift_range=10,
        brightness_range=(0.75,1.15), # 0 is black, 1 is original image
        zoom_range=.15, # % of size
        horizontal_flip=True,
        # vertical_flip=True,
        fill_mode='constant',
        cval=0,
        preprocessing_function=preprocessing_function
    )
else:
    train_data_gen = ImageDataGenerator(preprocessing_function=preprocessing_function)

# Do not perform augmentation on validation set
valid_data_gen = ImageDataGenerator(preprocessing_function=preprocessing_function)

Load data from disk and split it in batches

In [None]:
BATCH_SIZE = 8
TARGET_SIZE = (224,224)

flow_from_directory_kwargs = dict(
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE
)

train_gen = train_data_gen.flow_from_directory(train_dir, **flow_from_directory_kwargs)
valid_gen = valid_data_gen.flow_from_directory(valid_dir, shuffle=False, **flow_from_directory_kwargs)

from_generator_kwargs = dict(
    output_types=(tf.float32, tf.float32),
    output_shapes=((None, *TARGET_SIZE, 3), (None, len(class_names)))
)

train_dataset = tf.data.Dataset.from_generator(lambda: train_gen, **from_generator_kwargs)
valid_dataset = tf.data.Dataset.from_generator(lambda: valid_gen, **from_generator_kwargs)

train_dataset = train_dataset.repeat()
valid_dataset = valid_dataset.repeat()

Show some (augmented) images from the first training batch

In [None]:
plt.figure(figsize=(10,10))

images, labels = next(iter(train_dataset))
for i in range(8):
    ax = plt.subplot(4, 4, i+1)
    plt.imshow(images[i])
    plt.title(class_names[np.argmax(labels[i])])
    plt.axis('off')

## Model

Set up the callbacks

In [None]:
# Create directory for our models
models_dir = cwd.joinpath('vgg')
Path.mkdir(models_dir, exist_ok=True)

def gen_callbacks(model_name):
    ENABLE_CHECKPOINT = True
    ENABLE_TENSORBOARD = True
    ENABLE_EARLYSTOP = False
    ENABLE_REDUCE_LR = False
    callbacks = []

    # Create directory for this particular model
    model_dir = models_dir.joinpath(model_name)
    Path.mkdir(model_dir, exist_ok=False)

    # Checkpointing callback
    if ENABLE_CHECKPOINT:
        ckpt_dir = model_dir.joinpath('ckpts')
        Path.mkdir(ckpt_dir, exist_ok=True)

        callbacks.append(keras.callbacks.ModelCheckpoint(
            str(model_dir.joinpath('ckpts','ckpt_{epoch:02d}.hdf5')),
            monitor='val_loss',
            save_best_only=True,
            save_weights_only=True
        ))

    # Tensorboard callback
    if ENABLE_TENSORBOARD:
        tb_dir = model_dir.joinpath('logs')
        Path.mkdir(tb_dir, exist_ok=True)

        callbacks.append(keras.callbacks.TensorBoard(
            log_dir=str(tb_dir),
            histogram_freq=1,
            update_freq='epoch',
            profile_batch=0
        ))

    # Early stop callback
    if ENABLE_EARLYSTOP:
        early_stop = keras.callbacks.EarlyStopping(
            monitor='val_loss',
            min_delta=1e-2,
            verbose=1,
            patience=10  # min. number of epochs of non-improving in order to stop
        )
        callbacks.append(early_stop)

    # Reduce learning rate on plateau
    if ENABLE_REDUCE_LR:
        reduce_lr = keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=np.sqrt(.1),
            verbose=1,
            patience=5,
            cooldown=0,
            min_lr=.5e-6
        )
        callbacks.append(reduce_lr)
    return callbacks

Define the model using the Sequential API

In [None]:
def gen_model(model_name):

    KERNEL_REGULARIZER = None
    ENABLE_DROPOUT = True
    DROPOUT_RATE_CONV = 0.2
    DROPOUT_RATE_DENSE = 0.5
    ENABLE_BATCH_NORMALIZATION = False

    CONV_PADDING='same'

    # Instantiate VGG16 network without top classifier
    base_model = keras.applications.VGG16(
        include_top=False,
        weights='imagenet',
        pooling='avg',
        input_tensor=keras.Input(shape=(*TARGET_SIZE,3))
    )
    # Freeze the model to perform transfer learning
    for layer in base_model.layers:
        layer.trainable = False

    # Input of classifier head
    head = keras.Sequential(name='mlp_head')
    head.add(keras.Input(shape=base_model.output_shape[1:]))

    if ENABLE_BATCH_NORMALIZATION:
        head.add(keras.layers.BatchNormalization(name='batchnorm_flatten'))

    # MLP layers
    INITIAL_UNITS = 64
    MLP_DEPTH = 0

    for i in range(MLP_DEPTH):
        head.add(keras.layers.Dense(
            INITIAL_UNITS,
            activation='relu',
            kernel_regularizer=KERNEL_REGULARIZER,
            name=f'dense_{i+1}'
        ))

        if ENABLE_DROPOUT:
            head.add(keras.layers.Dropout(DROPOUT_RATE_DENSE, name=f'dropout_dense_{i+1}'))

        if ENABLE_BATCH_NORMALIZATION:
            head.add(keras.layers.BatchNormalization(name=f'batchnorm_dense_{i+1}'))

    # Softmax layer
    head.add(keras.layers.Dense(
        len(class_names),
        activation='softmax',
        name='dense_softmax'
    ))

    # Combine the model parts
    model = keras.Sequential([base_model, head], name=model_name)

    # Show a summary of the model
    model.summary()

    return model

Alternatively we can load a pretrained model, with weights, and optionally unfreeze layers for finetuning

In [None]:
def load_model(model_name, epoch):
    with open(models_dir.joinpath(model_name, 'model.json')) as f:
        # Load pre-saved configuration
        model = keras.models.model_from_json(f.read())

        # Load weights from checkpoint
        model.load_weights(str(models_dir.joinpath(model_name, 'ckpts', f'ckpt_{epoch:02d}.hdf5')))

        # Unfreeze layers for finetuning
        if True:
            for layer in model.layers[0].layers[17:]:
                layer.trainable = True

        model.summary()
        return model

Choose loss, learning rate, and optimizer. Then compile the model.

In [None]:
def compile_model(model):
    # We are using a categorical (i.e. one-hot output)
    loss = keras.losses.CategoricalCrossentropy()

    # Learning rate can be adjusted
    learning_rate = 1e-5
    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

    metrics = ['accuracy']
    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics
    )

## Training

Fit the model to data

In [None]:
NUM_EPOCHS = 30
RUN_SUFFIX = '_step1'

# Start with a fresh model or load a pretrained one
model = gen_model('Mask_vgg16_gap_1')
# model = load_model('Mask_vgg16_gap_1' + '_step2', epoch=30)
compile_model(model)

model_dir = models_dir.joinpath(model.name + RUN_SUFFIX)

history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=NUM_EPOCHS,
    steps_per_epoch=len(train_gen),
    validation_steps=len(valid_gen),
    callbacks=gen_callbacks(model.name + RUN_SUFFIX)
)

In [None]:
# Save model configuration to file
with open(model_dir.joinpath('model.json'), 'w') as f:
    f.write(model.to_json())

# Save history to file
df = pd.DataFrame(history.history)
df.to_csv(model_dir.joinpath('metrics.csv'))

In [None]:
model.layers[0].summary()
for i, layer in enumerate(model.layers[0].layers):
    print(i, layer.name, layer.trainable)

## Evaluation

We might want to load the best weights from the previous run, according to the validation loss

In [None]:
model.load_weights(str(models_dir.joinpath(model.name + '_step3', 'ckpts', 'ckpt_03.hdf5')))

In [None]:
def plot_confusion_matrix(labels, predictions):
    # Compute the confusion matrix
    cm = tf.math.confusion_matrix(labels, predictions).numpy()

    # Draw the figure
    figure = plt.figure(figsize=(5, 5))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Normalize the confusion matrix.
    cmn = np.around(cm.astype('float') / cm.sum(axis=1)[:, np.newaxis], decimals=2)

    # Use white text if squares are dark; otherwise black.
    threshold = cmn.max() / 2.
    for i, j in [(i,j) for i in range(cmn.shape[0]) for j in range(cmn.shape[1])]:
        color = "white" if cmn[i, j] > threshold else "black"
        plt.text(j, i, cmn[i, j], horizontalalignment="center", color=color)
        
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    return figure

predictions = model.predict(valid_gen)
pred_classes = np.argmax(predictions, axis=1)
true_classes = valid_gen.classes

figure = plot_confusion_matrix(true_classes, pred_classes)

## Prediction

Perform a batch prediction on the test dataset, then save the results to a file.

In [None]:
test_dir = dataset_dir.joinpath('test')

img_paths = list(test_dir.iterdir())

def load_preprocess(img_path):
    img = keras.preprocessing.image.load_img(img_path).resize(TARGET_SIZE)
    img = keras.preprocessing.image.img_to_array(img)
    img = preprocessing_function(img)

    return img

img_batch = np.array([load_preprocess(p) for p in img_paths])
predictions = map(np.argmax, model.predict(img_batch))

results = pd.DataFrame({'Id': map(lambda p: p.name, img_paths), 'Category': predictions})

results.to_csv(model_dir.joinpath('results.csv'), index=False)