# Dependency

In [None]:
import os
import cv2
import matplotlib.pyplot as plt
from PIL import Image
from google.colab.patches import cv2_imshow
%tensorflow_version 2.x
import tensorflow as tf
import matplotlib.pyplot as plt
import os.path, sys
import numpy as np

from tensorflow import keras
from tensorflow.keras import layers, datasets, models, optimizers
from tensorflow.keras.layers import Conv2D, BatchNormalization, Add, Activation, MaxPooling2D, Dropout, Flatten, Dense, AveragePooling2D
from tensorflow.keras import  Input
from tensorflow.keras import Model
from keras.preprocessing.image import ImageDataGenerator
from tqdm.auto import tqdm
import zipfile

import imutils
import numpy as np
from tensorflow.keras.models import load_model
import numpy as np
import gdown

# check GPU
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

# Dataset

In [None]:
gdown.download("https://drive.google.com/uc?export=download&id=1u57z2B_Kdwtzly1p0gEwm9bQ60AUeM7j", "./train.zip", quiet=False) 
gdown.download("https://drive.google.com/uc?export=donwload&id=1-0uFU0xH2swwQXVE7uGR1tCa7dtkshgd", "./dev.zip", quiet=False)
gdown.download("https://drive.google.com/uc?export=download&id=1-5B1d2UJNQvqY1eFIr7_DT8NngMZNboR", "./test.zip", quiet=False)

# Extract features

In [None]:
if os.path.exists("./train.zip"):
    print("Extracting the archive")
    with zipfile.ZipFile('./train.zip', 'r') as zip_ref:
      zip_ref.extractall("/content/localdata/")
    with zipfile.ZipFile('./dev.zip', 'r') as zip_ref:
      zip_ref.extractall("/content/localdata/")
    with zipfile.ZipFile('./test.zip', 'r') as zip_ref:
      zip_ref.extractall("/content/localdata/")
    print("Done")
    os.remove("./train.zip")   
    os.remove("./dev.zip") 
    os.remove("./test.zip")

Go to localdata

In [None]:
os.chdir("/content/localdata/")

In [None]:
!ls

# Check items for classes

In [None]:
print("***************")
print("**** TRAIN ****")
print("***************")
print()
directory = "./train"
l = os.listdir(directory)
print(f"num classes: {len(l)}")
l.sort()
for c in l:
  print(c+": "+str(len(os.listdir(directory+"/"+c))))

print()
print("***************")
print("***** DEV *****")
print("***************")
print()
directory = "./dev"
l = os.listdir(directory)
print(f"num classes: {len(l)}")
l.sort()
for c in l:
  print(c+": "+str(len(os.listdir(directory+"/"+c))))
  
print()
print("***************")
print("***** TEST ****")
print("***************")
print()
directory = "./test"
l = os.listdir(directory)
print(f"num classes: {len(l)}")
l.sort()
for c in l:
  print(c+": "+str(len(os.listdir(directory+"/"+c))))

# Dataset

In [None]:
class SSDataset():

  def __init__(self,train, valid, test, batch, target_size, data_aug):

    """
      Args:
        - train: path to the train file
        - test: path to the test file
        - batch: batch size
        - target_size: dimension of images
        - data_aug: boolean that we use for data augmentation
    """

    self.train_file = train
    self.valid_file = valid
    self.test_file = test
    self.batch_size = batch
    self.target_size = target_size
    self.data_aug = data_aug
    self.train_generator, self.valid_generator, self.test_generator = self.create_dataset()

  def create_dataset(self):
      """
        creation of the dataset, divided in train, dev and test
      """

      if self.data_aug:
          train_data = ImageDataGenerator(
              rescale = 1. / 255, # convert from uint8 to float32 in range 0,1
              brightness_range=[0.3,1.5]
          )
      else:
          train_data = ImageDataGenerator(
              rescale = 1. / 255, # convert from uint8 to float32 in range 0,1
          )

      train_generator = train_data.flow_from_directory(
          directory=self.train_file,
          target_size=self.target_size,
          color_mode="rgb",
          batch_size=self.batch_size,
          class_mode="categorical",
          shuffle=True, 
      )

      valid_data = ImageDataGenerator(
          rescale = 1. / 255)
      
      valid_generator = valid_data.flow_from_directory(
          directory=self.valid_file, 
          target_size=self.target_size,
          batch_size=self.batch_size,
          color_mode="rgb",
          shuffle=False,
          class_mode='categorical') # set as validation data

      test_datagen = ImageDataGenerator(
          rescale = 1. / 255)

      test_generator = test_datagen.flow_from_directory(
          directory=self.test_file,
          target_size=self.target_size,
          color_mode="rgb",
          batch_size=self.batch_size,
          class_mode="categorical",
          shuffle=False
      )

      return train_generator, valid_generator, test_generator

Init

In [None]:
data_aug = True
target_size =(128, 128)
batch_size = 64

train = "./train"
valid = "./dev"
test = "./test"

dataset = SSDataset(train,valid, test, batch_size, target_size, data_aug)
train_generator, valid_generator, test_generator = dataset.train_generator, dataset.valid_generator, dataset.test_generator

num_samples = train_generator.n
num_classes = train_generator.num_classes
input_shape = train_generator.image_shape
classnames = [k for k,v in train_generator.class_indices.items()]

print("\nClasses:\n%r" %classnames)

# Show batch

In [None]:
def show_batch(image_batch, label_batch):

  """
    Args:
      - image_batch: batch of images (not labels)
      - label_batch: batch of labels 
  """
  
  plt.figure(figsize=(12,12))
  for n in range(16):
      ax = plt.subplot(4,4,n+1)
      plt.imshow(image_batch[n])
      idx = np.where(label_batch[n] == 1)[0][0]
      plt.title(classnames[idx])
      plt.axis('off')

image_batch, label_batch = next(train_generator)
show_batch(image_batch, label_batch)

# Models

## Hyperparameters

In [None]:
class HParams():
  def __init__(self, input_shape, num_classes, lr, model_name, brightness, shift):

    """
      Args:
        - input_shape: shape of images
        - num_classes: number of classes
        - lr: learning rate
        - model_name: name of model
    """
    self.brightness = brightness
    self.shift = shift
    self.shape = input_shape
    self.num_classes = num_classes
    self.lr = lr
    self.optimizer = optimizers.Adam(self.lr)
    self.loss_fn = 'categorical_crossentropy'
    self.metrics = ['accuracy']
    self.drop_rate = 0.5
    self.n_res_net_blocks = 3
    self.n_paths = 4
    self.model_name = model_name #allowed values: Xception, ResNet, ResNeXt, VGG16, VGG19

params = HParams(input_shape, num_classes, 0.00001, "VGG16", True, False)

## Pretrained

In [None]:
class PretrainedModel():
  def __init__(self,name, trainable):

    """
      Args:
        - name: name of pretrained model
        - trainable: boolean used to indicate if layers of pretrained model are freezed or trained
    """
    
    self.name = name
    self.trainable = trainable
    self.model = self.build_model()
  
  def build_model(self):

    if self.name == "Xception":
      base_model = keras.applications.Xception(
          weights='imagenet',
          input_shape=input_shape,
          include_top=False
      )
    elif self.name == "ResNet":
      base_model = keras.applications.ResNet152V2(
          weights='imagenet',
          input_shape=input_shape,
          include_top=False
      )
    elif self.name == "VGG16":
      base_model = keras.applications.VGG16(
          weights='imagenet',
          input_shape=input_shape,
          include_top=False
      )
    else:
      base_model = keras.applications.VGG19(
          weights='imagenet',
          input_shape=input_shape,
          include_top=False
      )

    base_model.trainable = self.trainable
    inputs = keras.Input(shape=input_shape)

    # We freeze pretrained model weigths
    x = base_model(inputs, training=True)

    # Convert features of shape `base_model.output_shape[1:]` to vectors
    x = keras.layers.GlobalAveragePooling2D()(x)

    x = keras.layers.Dense(1024, activation='relu')(x)
    x = keras.layers.Dropout(params.drop_rate)(x)
    x = keras.layers.Dense(128, activation='relu')(x)
    outputs = keras.layers.Dense(params.num_classes, activation="softmax")(x)
    ss_model = keras.Model(inputs, outputs, name=params.model_name)
    ss_model.compile(loss=params.loss_fn, optimizer=params.optimizer, metrics=params.metrics)

    return ss_model

### Init

In [None]:
ss_model = PretrainedModel(params.model_name, True).model

# Training

In [None]:
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint
from datetime import datetime

nome_prova = f"{params.model_name}{input_shape}_lr{params.lr}_drop{params.drop_rate}_data_aug_{data_aug}_brightness_{params.brightness}_shift_{params.shift}_FACE_DETECTION_{datetime.now().strftime('%Y%m_%d_%H_%M_%S')}"

model_path = "/content/training/checkpoints/"+nome_prova+"/"
logs_path = "/content/training/tensorboard_logs/"+nome_prova

if not os.path.exists(model_path):
  os.makedirs(model_path)

if not os.path.exists(logs_path):
  os.makedirs(logs_path)

my_callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=5, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint(filepath=model_path+'model.{epoch:02d}-{val_loss:.2f}.h5'),
    tf.keras.callbacks.TensorBoard(log_dir=logs_path)
]

history = ss_model.fit(train_generator, epochs=20, verbose=1, validation_data=valid_generator, callbacks=my_callbacks)


# Test

In [None]:
loss, acc = ss_model.evaluate(test_generator, verbose=1)
print('Test loss: %f' %loss)
print('Test accuracy: %f' %acc)

# Show batch predict test

In [None]:
def show_batch(image_batch, label_batch):
  plt.figure(figsize=(12,12))
  for n in range(36):
      ax = plt.subplot(6,6,n+1)
      plt.imshow(image_batch[n])
      pred = ss_model(image_batch)
      idx = np.where(label_batch[n] == 1)[0][0]
      idx_pred = pred[n].numpy().argmax()
      plt.title(f"{classnames[idx]} -> {classnames[idx_pred]}")
      plt.axis('off')

image_batch, label_batch = next(test_generator)
show_batch(image_batch, label_batch)