<a href="https://colab.research.google.com/github/nicologhielmetti/AN2DL-challenges/blob/master/custom_arch_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install gdown, download a file from Google Drive by id and unzip the archive.
# NOTE(review): presumably ANDL2.zip is the challenge dataset and the download
# lands in /content (Colab working directory) — confirm when running elsewhere.
!pip install gdown
!gdown https://drive.google.com/uc?id=1Mv7vKoI-QL6kV-1TIDE7N67_L0LXvJAg
!unzip /content/ANDL2.zip

Collecting gdown
  Downloading gdown-3.12.2.tar.gz (8.2 kB)
  Installing build dependencies ... [?25l- \ | / - done
[?25h  Getting requirements to build wheel ... [?25l- done
[?25h    Preparing wheel metadata ... [?25l- done
Collecting filelock
  Downloading filelock-3.0.12-py3-none-any.whl (7.6 kB)
Collecting PySocks!=1.5.7,>=1.5.6; extra == "socks"
  Downloading PySocks-1.7.1-py3-none-any.whl (16 kB)
Building wheels for collected packages: gdown
  Building wheel for gdown (PEP 517) ... [?25l- done
[?25h  Created wheel for gdown: filename=gdown-3.12.2-py3-none-any.whl size=9681 sha256=86dcda712b90c12f822999c4f5bd3280dcdb22a1dc57924eb39bbd959e136b6d
  Stored in directory: /home/nicolo/.cache/pip/wheels/ba/e0/7e/726e872a53f7358b4b96a9975b04e98113b005cd8609a63abc
Successfully built gdown
Installing collected packages: filelock, gdown, PySocks
Successfully installed PySocks-1.7.1 filelock-3.0.12 gdown-3.12.2
Downloading...
From: https://drive.google

In [2]:
# Mount Google Drive under /content/drive; later cells read and write
# '/content/drive/My Drive/...' paths.
# NOTE: google.colab is only importable inside Colab — the recorded output of
# this cell is a ModuleNotFoundError from a run outside of it.
from google.colab import drive
drive.mount('/content/drive')

ModuleNotFoundError: No module named 'google.colab'

In [None]:
import json
import ntpath
import os
import shutil
from datetime import datetime
from functools import partial

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorboard import program
from tensorflow.keras.preprocessing.image import ImageDataGenerator

SEED = 1996


In [None]:
class ConvBlock(tf.keras.Model):
    """Conv2D (3x3, stride 1, 'same') -> optional BatchNorm -> ReLU -> 2x2 max-pooling.

    Args:
        num_filters: number of filters of the convolution.
        regularizer: regularizer applied to both the kernel and the bias of
            the convolution (``None`` disables it).
        use_batch_norm: when truthy, a BatchNormalization layer is inserted
            between the convolution and the activation.
    """

    def __init__(self, num_filters, regularizer, use_batch_norm):
        super(ConvBlock, self).__init__()
        self.conv2d = tf.keras.layers.Conv2D(filters=num_filters,
                                             kernel_size=(3, 3),
                                             strides=(1, 1),
                                             padding='same',
                                             kernel_regularizer=regularizer,
                                             bias_regularizer=regularizer)
        # Always define the attribute so call() can test it; previously it was
        # missing when batch norm was disabled and call() raised AttributeError.
        self.batch_norm = tf.keras.layers.BatchNormalization() if use_batch_norm else None
        self.activation = tf.keras.layers.ReLU()
        self.pooling = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))

    def call(self, inputs, **kwargs):
        """Apply the block to ``inputs`` and return the pooled feature map."""
        x = self.conv2d(inputs)
        if self.batch_norm is not None:
            x = self.batch_norm(x)
        x = self.activation(x)
        x = self.pooling(x)
        return x


class CNNClassifier(tf.keras.Model):
    """Stack of ConvBlocks followed by a fully connected softmax classifier.

    Args:
        depth_conv: number of ConvBlocks in the feature extractor.
        start_f: filters of the first ConvBlock; doubled for every next block.
        num_classes: units of the final softmax layer.
        init_neurons_fc: units of each hidden Dense layer.
        regularizer: regularizer applied to kernels and biases of the
            convolutions and of the hidden Dense layers (not the output layer).
        dropout: dropout rate added after each hidden Dense layer, or ``None``
            for no dropout.
        depth_fc: number of hidden Dense layers.
        use_batch_norm: forwarded to every ConvBlock.
    """

    def __init__(self, depth_conv, start_f, num_classes, init_neurons_fc=512,
                 regularizer=None, dropout=None, depth_fc=4, use_batch_norm=True):
        super(CNNClassifier, self).__init__()

        self.feature_extractor = tf.keras.Sequential()

        num_filters = start_f
        for _ in range(depth_conv):
            self.feature_extractor.add(ConvBlock(num_filters=num_filters, regularizer=regularizer,
                                                 use_batch_norm=use_batch_norm))
            num_filters *= 2

        self.flatten = tf.keras.layers.Flatten()
        self.classifier = tf.keras.Sequential()

        for _ in range(depth_fc):
            self.classifier.add(tf.keras.layers.Dense(units=init_neurons_fc, activation='relu',
                                                      kernel_regularizer=regularizer,
                                                      bias_regularizer=regularizer))
            if dropout is not None:
                self.classifier.add(tf.keras.layers.Dropout(dropout))

        self.classifier.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

    def call(self, inputs, **kwargs):
        """Return the class probabilities for a batch of images."""
        x = self.feature_extractor(inputs)
        x = self.flatten(x)
        x = self.classifier(x)
        return x

    def summary(self, line_length=None, positions=None, print_fn=None):
        """Print the summaries of the two sub-models instead of the outer model.

        The formatting arguments are forwarded to both sub-summaries (they
        used to be silently ignored).
        """
        for sub_model in (self.feature_extractor, self.classifier):
            sub_model.summary(line_length=line_length, positions=positions, print_fn=print_fn)

In [None]:
def divideDatasetInTargetFolders(json_definition, dataset_path):
    """Move every listed file into a sub-folder of ``dataset_path`` named after its label.

    Args:
        json_definition: mapping ``{file name: label}``; the label is turned
            into the folder name with ``str()``.
        dataset_path: directory that contains the files and receives the
            per-label folders.

    Files that cannot be found are reported on stdout and skipped. The
    ``augmented/training`` and ``augmented/validation`` folders are created as
    well. The function is safe to call more than once: already existing
    folders are reused instead of raising ``FileExistsError``.
    """
    for elem, label in json_definition.items():
        dest_dir = os.path.join(dataset_path, str(label))
        os.makedirs(dest_dir, exist_ok=True)
        try:
            shutil.move(os.path.join(dataset_path, elem),
                        os.path.join(dest_dir, elem))
        except FileNotFoundError as e:
            # Best effort: typically the file was already moved by a previous run.
            print("File not found: " + str(e))

    for sub_dir in ("training", "validation"):
        os.makedirs(os.path.join(dataset_path, "augmented", sub_dir), exist_ok=True)


def getMaxImageSize(dataset_dir):
    """Return ``(max_width, max_height)`` over the ``.jpg`` files of a folder.

    Args:
        dataset_dir: folder to scan, resolved against the current working
            directory (an absolute path is used as is). Sub-folders are not
            visited.

    Returns:
        Tuple ``(max_w, max_h)``; the two values may come from different
        images. ``(0, 0)`` when the folder holds no ``.jpg`` file.

    Every entry whose name does not end in ``.jpg`` is reported on stdout.
    """
    max_w = 0
    max_h = 0
    path = os.path.join(os.getcwd(), dataset_dir)
    for filename in os.listdir(path):
        if filename.endswith(".jpg"):
            # Close each image right away: an opened image keeps its file
            # handle, and a large folder would otherwise exhaust descriptors.
            with Image.open(os.path.join(path, filename)) as image:
                width, height = image.size
            max_w = max(max_w, width)
            max_h = max(max_h, height)
        else:
            print("This file -> " + filename + " is not .jpg")
    return max_w, max_h


def getMinImageSize(dataset_dir, max_w, max_h):
    """Return ``(min_width, min_height)`` over the ``.jpg`` files of a folder.

    Args:
        dataset_dir: folder to scan (sub-folders are not visited).
        max_w: starting value for the minimum width; also the width returned
            when no image is narrower.
        max_h: starting value for the minimum height; also the height
            returned when no image is shorter.

    Returns:
        Tuple ``(min_w, min_h)``; the two values may come from different images.

    Every entry whose name does not end in ``.jpg`` is reported on stdout.
    """
    min_w = max_w
    min_h = max_h
    for filename in os.listdir(dataset_dir):
        if filename.endswith(".jpg"):
            # Close each image right away so file handles do not accumulate.
            with Image.open(os.path.join(dataset_dir, filename)) as image:
                width, height = image.size
            min_w = min(min_w, width)
            min_h = min(min_h, height)
        else:
            print("This file -> " + filename + " is not .jpg")
    return min_w, min_h

In [None]:
# Dataset folders, resolved against the current working directory.
train_path = os.path.join(os.getcwd(), 'MaskDataset/training')
test_path  = os.path.join(os.getcwd(), 'MaskDataset/test')

In [None]:
# Load the ground-truth mapping (file name -> label) and sort the training
# images into one folder per label. The context manager closes the JSON file,
# which the previous json.load(open(...)) left open.
with open(os.path.join(os.getcwd(), 'MaskDataset/train_gt.json')) as gt_file:
    division_dict = json.load(gt_file)

divideDatasetInTargetFolders(division_dict, train_path)

In [None]:
# remember to check both train and test datasets to be sure of max dimensions
max_w, max_h = max(getMaxImageSize(os.path.join(train_path, '0')),
                   getMaxImageSize(os.path.join(train_path, '1')),
                   getMaxImageSize(os.path.join(train_path, '2')))
print("Maximum width and height: " + str((max_w, max_h)))

min_w, min_h = min(getMinImageSize(os.path.join(train_path, '0'), max_w, max_h),
                   getMinImageSize(os.path.join(train_path, '1'), max_w, max_h),
                   getMinImageSize(os.path.join(train_path, '2'), max_w, max_h))
print("Minimum width and height:  " + str((min_w, min_h)))
print("Maximum width  expansion:  " + str(max_w - min_w) + ", increase ratio: " +
      str(float(max_w) / float(max_w - min_w)))
print("Maximum height expansion:  " + str(max_h - min_h) + ", increase ratio: " +
      str(float(max_h) / float(max_h - min_h)))

In [None]:
# Keras expresses image sizes as (height, width). One tuple is shared by the
# resize function, all three generators and the dataset shapes so that
# training, validation and test images have the same geometry, matching the
# (None, max_h, max_w, 3) input shape the model is built with. Previously the
# training/validation generators used (max_w, max_h) while the test one used
# (max_h, max_w).
image_size = (max_h, max_w)
preproc_fun_fixed = partial(tf.keras.preprocessing.image.smart_resize, size=image_size)

# NOTE(review): the validation subset below is drawn from this same generator,
# so validation images are augmented too — confirm this is intended.
train_data_gen = ImageDataGenerator(rotation_range=10,
                                    width_shift_range=10,
                                    height_shift_range=10,
                                    zoom_range=0.3,
                                    horizontal_flip=True,
                                    fill_mode='reflect',
                                    rescale=1. / 255,
                                    validation_split=0.3,
                                    preprocessing_function=preproc_fun_fixed
                                    )

test_data_gen = ImageDataGenerator(rescale=1. / 255, preprocessing_function=preproc_fun_fixed)

classes = ['0', '1', '2']
save_dir = os.path.join(train_path, 'augmented')

# The test images have no labels: give them a single dummy class so that
# flow_from_dataframe can be used with class_mode='categorical'.
images = pd.DataFrame({'filename': os.listdir(test_path)})
images["class"] = 'test'

bs = 32

train_gen = train_data_gen.flow_from_directory(train_path,
                                               target_size=image_size,
                                               seed=SEED,
                                               classes=classes,
                                               #save_prefix='training_aug',
                                               #save_to_dir=os.path.join(save_dir, 'training'),
                                               subset='training',
                                               shuffle=True,
                                               batch_size=bs
                                               )

valid_gen = train_data_gen.flow_from_directory(train_path,
                                               target_size=image_size,
                                               seed=SEED,
                                               classes=classes,
                                               #save_prefix='validation',
                                               #save_to_dir=os.path.join(save_dir, 'validation'),
                                               subset='validation',
                                               shuffle=False,
                                               batch_size=bs
                                               )

test_gen = test_data_gen.flow_from_dataframe(images,
                                             test_path,
                                             batch_size=bs,
                                             target_size=image_size,
                                             class_mode='categorical',
                                             shuffle=False,
                                             seed=SEED)

# set the right order for predictions
test_gen.reset()


def _as_dataset(generator, num_labels):
    """Wrap a Keras image iterator into a tf.data.Dataset of (images, one-hot labels)."""
    return tf.data.Dataset.from_generator(lambda: generator,
                                          output_types=(tf.float32, tf.float32),
                                          output_shapes=(
                                              [None, max_h, max_w, 3],
                                              [None, num_labels]
                                          ))


# Dataset.repeat() returns a new dataset, so its result has to be kept (the
# bare calls used before had no effect).
train_set = _as_dataset(train_gen, len(classes)).repeat()
validation_set = _as_dataset(valid_gen, len(classes)).repeat()
# The test generator only knows the dummy 'test' class, hence its label width
# is taken from the generator rather than from `classes`.
test_set = _as_dataset(test_gen, len(test_gen.class_indices)).repeat()

In [None]:
# Architecture hyper-parameters.
start_f = 6      # filters of the first ConvBlock (CNNClassifier doubles them block after block)
depth_conv = 6   # number of ConvBlocks
depth_fc = 4     # number of hidden Dense layers

# L1 regularization (library default factor) and 40% dropout on the hidden
# Dense layers; batch normalization stays at its default (enabled).
model = CNNClassifier(depth_conv=depth_conv,
                      start_f=start_f,
                      num_classes=len(classes),
                      regularizer=tf.keras.regularizers.l1(),
                      dropout=0.4,
                      depth_fc=depth_fc
                      )

# Create the weights for a variable batch size of max_h x max_w images with 3 channels.
model.build(input_shape=(None, max_h, max_w, 3))

# CNNClassifier.summary() prints the feature extractor and the classifier summaries.
model.summary()

In [None]:
# Callback list shared with the following cells (they append to it and pass it
# to model.fit). Everything below only runs when `tensorboard` is True.
callbacks = []
tensorboard = False
if tensorboard:
  # Launch TensorBoard from Python on <cwd>/tracking_dir.
  # NOTE(review): launch() happens before the directory is created below, and
  # the %tensorboard magic at the end points at the hard-coded
  # /content/tracking_dir rather than at tracking_address — confirm both.
  tracking_address = os.path.join(os.getcwd(), "tracking_dir")
  tb = program.TensorBoard()
  tb.configure(argv=[None, '--logdir', tracking_address])
  url = tb.launch()

  if not os.path.exists(tracking_address):
      os.makedirs(tracking_address)

  # One experiment folder per run, named after the model and the start time.
  now = datetime.now().strftime('%b%d_%H-%M-%S')

  model_name = 'CNN'

  exp_dir = os.path.join(tracking_address, model_name + '_' + str(now))
  if not os.path.exists(exp_dir):
      os.makedirs(exp_dir)

  # Weight checkpoints, one file per epoch (the epoch number is part of the name).
  ckpt_dir = os.path.join(exp_dir, 'ckpts')
  if not os.path.exists(ckpt_dir):
      os.makedirs(ckpt_dir)

  ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
                                                    save_weights_only=True)  # False to save the model directly
  callbacks.append(ckpt_callback)

  # TensorBoard event files.
  tb_dir = os.path.join(exp_dir, 'tb_logs')
  if not os.path.exists(tb_dir):
      os.makedirs(tb_dir)

  # By default shows losses and metrics for both training and validation
  tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                              profile_batch=0,
                                              histogram_freq=1)  # if 1 shows weights histograms
  callbacks.append(tb_callback)
  # Notebook magics: load the TensorBoard extension and show it inline.
  %load_ext tensorboard
  %tensorboard --logdir /content/tracking_dir

In [None]:
early_stop = True
if early_stop:
    # Stop after 10 epochs without val_loss improvement and roll the model
    # back to its best weights.
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, verbose=1, restore_best_weights=True)
    callbacks.append(es_callback)
    # Keep the best weights on Drive. CNNClassifier is a subclassed model:
    # Keras cannot save a whole subclassed model in HDF5 format, so only the
    # weights are written — which is also what the later cells read back with
    # model.load_weights().
    # NOTE(review): this path is relative to the working directory while the
    # load_weights() calls use '/content/drive/My Drive/weights_nik.h5'; they
    # only match when the notebook runs from /content.
    cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath = os.getcwd() + '/drive/My Drive/weights_nik.h5',
      verbose=1, save_best_only=True, save_weights_only=True)
    callbacks.append(cp_callback)

In [None]:
# Training configuration: categorical cross-entropy on the softmax output,
# Adam with a fixed learning rate, accuracy as the reported metric.
loss = tf.keras.losses.CategoricalCrossentropy()
# maybe explore learning rate solutions
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
metrics = ['accuracy']
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

In [None]:
# train=True  -> fit the model; retrain=True additionally starts from the
#                weights previously saved on Drive.
# train=False -> skip training and only load the saved weights.
# NOTE(review): with retrain=True the weights file must already exist, so a
# first run from scratch needs retrain=False.
train = True
retrain = True
if train:
  if retrain:
    model.load_weights('/content/drive/My Drive/weights_nik.h5')
  # Steps are given explicitly: one epoch is one pass over each generator.
  model.fit(x=train_set,
            epochs=100,  #### set repeat in training dataset
            steps_per_epoch=len(train_gen),
            validation_data=validation_set,
            validation_steps=len(valid_gen),
            callbacks=callbacks)
else:
  model.load_weights('/content/drive/My Drive/weights_nik.h5')

In [None]:
#testing

In [None]:
def create_csv(results, results_dir='/content/drive/My Drive'):
    """Dump ``results`` into a timestamped CSV file inside ``results_dir``.

    The file is named ``results_<Mon><day>_<H>-<M>-<S>.csv`` after the current
    time. It starts with the header ``Id,Category`` followed by one
    ``key,value`` line per item of ``results`` (keys must be strings, values
    are converted with ``str()``).
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    target_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(target_path, 'w') as f:
        f.write('Id,Category\n')
        f.writelines(key + ',' + str(value) + '\n' for key, value in results.items())

In [None]:
# Model.predict accepts generators directly; predict_generator is deprecated.
# One pass over the test generator (len(test_gen) batches).
predictions = model.predict(test_gen, steps=len(test_gen), verbose=1)

In [None]:
# Map every test image (base file name) to its predicted class index, stored
# as a string. Predictions follow the order of test_gen.filenames because the
# test generator is created with shuffle=False.
assert len(predictions) == len(test_gen.filenames), \
    "number of predictions does not match the number of test images"

results = {
    ntpath.basename(filename): str(np.argmax(class_probs))
    for filename, class_probs in zip(test_gen.filenames, predictions)
}

In [None]:
# Write the predictions to a timestamped CSV in create_csv's default folder
# ('/content/drive/My Drive').
create_csv(results)

In [None]:
# NOTE(review): the file name is hard-coded from an earlier run, and
# create_csv() already writes into '/content/drive/My Drive' by default, so
# this copy looks redundant — confirm before relying on it.
!cp results_Nov15_11-49-28.csv "/content/drive/My Drive/"