# Import and Definitions

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
import os
import shutil
import csv
from datetime import datetime

SEED = 678759
np.seed = SEED

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def plot_hist(hist):
    plt.plot(hist.history["accuracy"])
    plt.plot(hist.history["val_accuracy"])
    plt.title("model accuracy")
    plt.ylabel("accuracy")
    plt.xlabel("epoch")
    plt.legend(["train", "validation"], loc="upper left")
    plt.show()

In [None]:
def create_csv(results, results_dir):

    csv_fname = 'results_'
    csv_fname += datetime.now().strftime('%b%d_%H-%M-%S') + '.csv'

    with open(os.path.join(results_dir, csv_fname), 'w') as f:

        f.write('Id,Category\n')

        for key, value in results.items():
            f.write(key + ',' + str(value) + '\n')

In [None]:
def shuffle_validation_weighted(base_dir, split = 0.2, reset = False):
    # Create validation_set weighted on number of occurrences in training_set
    train_dir = os.path.join(base_dir, 'training')
    valid_dir = os.path.join(base_dir, 'validation')

    if not reset:
        # First identify training and validation dir
        if not os.path.exists(valid_dir):
            os.makedirs(valid_dir)

        # Count elements in each dir in training
        class_and_card = {name: len(os.listdir(os.path.join(train_dir, name))) for name in os.listdir(train_dir) if
                          os.path.join(train_dir, name)}

        print(class_and_card)
        # Get images per class wrt total images
        class_and_card_validation = {name: int(class_and_card[name] * split) for name in class_and_card}

        print(class_and_card_validation)
        # Select images to move
        for key, item in class_and_card_validation.items():
            source_dir = os.path.join(train_dir, key)
            images = os.listdir(source_dir)
            np.random.shuffle(images)
            target_dir = os.path.join(valid_dir, key)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            for i in range(item):
                shutil.move(os.path.join(source_dir, images[i]), target_dir)
    else:
        # Restore initial state
        # For each class, move images to train_dir in respective folders
        classes = [name for name in os.listdir(valid_dir)]
        for class_name in classes:
            source_dir = os.path.join(valid_dir, class_name)
            images = os.listdir(source_dir)
            target_dir = os.path.join(train_dir, class_name)
            for img in images:
                shutil.move(os.path.join(source_dir, img), target_dir)

# Hyperparameters 

In [None]:
# We executed a scoped GridSearch on the following parameters
# Dense Layers -> [1, 2] (tested parameters)
# Neurons Number -> [256, 512]
# Image Size -> [224 (default)] (same for heigth and width)
# Batch Size -> [16]
# Valid Percentage -> [0.2]
# Starting Learning Rate -> [1e-3, 1e-4]
# LR1 values -> // not tested
# LR2 values -> // not tested
# Below we have the best parameters

batch_size = 16
img_w, img_h = 224, 224
num_classes = 3
valid_split_perc = 0.2

dense_neurons = [256, 512, 1024]
learning_rate = 1e-4
train_after_layer = [35, 40, 45]

nb_train_samples = 4492
nb_val_samples = 1122

epochs_fine = 100

# Dataset Loading and Preprocessing

In [None]:
cwd = os.getcwd()
dataset_dir = os.path.join(cwd, 'MaskDataset')
if os.path.exists(dataset_dir):
  shutil.rmtree(dataset_dir)
!unzip '/content/drive/My Drive/Keras3/MaskDataset'

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
  inflating: MaskDataset/training/0/12637.jpg  
  inflating: MaskDataset/training/0/12638.jpg  
  inflating: MaskDataset/training/0/12639.jpg  
  inflating: MaskDataset/training/0/12644.jpg  
  inflating: MaskDataset/training/0/12650.jpg  
  inflating: MaskDataset/training/0/12652.jpg  
  inflating: MaskDataset/training/0/12655.jpg  
  inflating: MaskDataset/training/0/12658.jpg  
  inflating: MaskDataset/training/0/12663.jpg  
  inflating: MaskDataset/training/0/12666.jpg  
  inflating: MaskDataset/training/0/12667.jpg  
  inflating: MaskDataset/training/0/12668.jpg  
  inflating: MaskDataset/training/0/12672.jpg  
  inflating: MaskDataset/training/0/12678.jpg  
  inflating: MaskDataset/training/0/12681.jpg  
  inflating: MaskDataset/training/0/12684.jpg  
  inflating: MaskDataset/training/0/12689.jpg  
  inflating: MaskDataset/training/0/12700.jpg  
  inflating: MaskDataset/training/0/12703.jpg  
  inflating: MaskDataset

In [None]:
shuffle_validation_weighted(dataset_dir, split=valid_split_perc, reset=False)
shuffle_validation_weighted(dataset_dir, split=valid_split_perc, reset=True)
shuffle_validation_weighted(dataset_dir, split=valid_split_perc, reset=False)

{'0': 1900, '1': 1897, '2': 1817}
{'0': 380, '1': 379, '2': 363}


In [None]:
train_datagen = ImageDataGenerator(rotation_range=40,
                                   width_shift_range=0.2,
                                   height_shift_range=0.2,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True,
                                   vertical_flip=True,
                                   fill_mode="nearest",
                                   preprocessing_function=tf.keras.applications.resnet50.preprocess_input)

train_generator = train_datagen.flow_from_directory(os.path.join(dataset_dir, 'training'),
                                                    target_size=(img_w, img_h),
                                                    batch_size=batch_size,
                                                    shuffle=True,
                                                    class_mode='categorical',
                                                    seed = SEED)

val_datagen = ImageDataGenerator(preprocessing_function=tf.keras.applications.resnet50.preprocess_input)

val_generator = val_datagen.flow_from_directory(os.path.join(dataset_dir, 'validation'),
                                                target_size=(img_w, img_h),
                                                batch_size=batch_size,
                                                shuffle=False,
                                                class_mode='categorical',
                                                seed = SEED)

Found 4492 images belonging to 3 classes.
Found 1122 images belonging to 3 classes.


# Model Definition

In [None]:
base_model = keras.applications.ResNet50(weights='imagenet',
                                                 include_top=False,
                                                 input_shape=(img_w, img_h, 3))
for lr in train_after_layer:
    print(base_model.layers[lr].name)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
conv2_block3_3_conv
conv3_block1_1_bn
conv3_block1_0_conv


# Grid Search Training

In [None]:
# We used grid search and we obtained the best results with:
# dense_neurons = 512
# train_after_layer = 35
    
#   BEST RESULT --> Epoch 12/100
#                   280/280 [==============================] - 96s 342ms/step - loss: 0.2615 - accuracy: 0.8954 - val_loss: 0.3087 - val_accuracy: 0.8946

# Since we got better result with another model, we didn't investigate ResNet furtherly
for lr in train_after_layer:
    for nr in dense_neurons:
        
        base_model = keras.applications.ResNet50(weights='imagenet',
                                                 include_top=False,
                                                 input_shape=(img_w, img_h, 3))

        path = str(lr) + '_' + str(nr)
        print(path)
        
        cb_early_stopper = EarlyStopping(monitor = 'val_loss', patience = 5)
        cb_checkpointer = ModelCheckpoint(filepath = path + '.hdf5', monitor = 'val_loss', save_best_only = True)

        print(base_model.layers[lr].name)

        for layer in base_model.layers[:lr]:
            if isinstance(layer, layers.Conv2D) or isinstance(layer, layers.SeparableConv2D):
                layer.trainable = False


        model = keras.models.Sequential()

        model.add(base_model)
        model.add(layers.GlobalAveragePooling2D())
        model.add(layers.Dense(nr, activation='relu'))
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(num_classes, activation='softmax'))

        model.compile(loss='categorical_crossentropy',
                      optimizer=keras.optimizers.Adam(lr=learning_rate),
                      metrics=['accuracy'])

        hist = model.fit(train_generator,
                         steps_per_epoch=nb_train_samples // batch_size,
                         epochs=epochs_fine,
                         validation_data=val_generator,
                         validation_steps=nb_val_samples // batch_size,
                         callbacks=[cb_checkpointer, cb_early_stopper],
                         verbose = 1)

        plot_hist(hist)

model.load_weights("best.hdf5")

# Testing and Creating CSV

In [None]:
model.load_weights('basic_85.hdf5')

test_dir = os.path.join(cwd, 'test')

test_data_gen = ImageDataGenerator(preprocessing_function=tf.keras.applications.resnet50.preprocess_input)

test_gen = test_data_gen.flow_from_directory(test_dir, target_size=(img_h, img_w), 
                                                 color_mode='rgb',
                                                 class_mode='categorical',
                                                 classes = None,
                                                 batch_size=1,
                                                 shuffle=False)
test_gen.reset()

predictions = model.predict_generator(test_gen, len(test_gen), verbose=1)

results = {}

images = test_gen.filenames
i = 0

for p in predictions:
  prediction = np.argmax(p)
  import ntpath
  image_name = ntpath.basename(images[i])
  results[image_name] = str(prediction)
  i = i + 1
  
create_csv(results, cwd)