In [None]:
# Mount Google Drive at /gdrive (this import only exists on Google Colab).
from google.colab import drive
drive.mount('/gdrive')

In [None]:
# Move into the Drive folder that holds the dataset archive.
%cd /gdrive/My Drive/Datasets/homework1

In [None]:
!mkdir ~/leaves_hw
!cp ./dataset_70_15_15.zip ~/leaves_hw/

In [None]:
# Work from the local copy and list its contents as a sanity check.
%cd ~/leaves_hw
!ls

In [None]:
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
from sklearn.utils import class_weight
from PIL import Image

# Short aliases for the Keras namespaces used throughout the notebook.
tfk = tf.keras
tfkl = tf.keras.layers
# Record the TensorFlow version this run used.
print(tf.__version__)

In [None]:
# Random seed for reproducibility
seed = 57

# NOTE: PYTHONHASHSEED is read when the interpreter starts, so setting it here
# only affects subprocesses launched later, not the running kernel.
os.environ['PYTHONHASHSEED'] = str(seed)

# Seed every random number generator used in this notebook (each one once).
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)


In [None]:
# Data / training settings
input_shape = (256, 256, 3)  # (height, width, channels) of the images fed to the model
epochs = 400  # value passed as `epochs` to model.fit
n_classes = 14  # number of units of the softmax output layer

In [None]:
# Load the dataset to be used for classification

# ################################
# ################################
# ################################
# ################################

!unzip dataset_70_15_15.zip

# ################################
# ################################
# ################################
# ################################


In [None]:
# Root of the extracted dataset and its three split sub-folders.
dataset_dir = './dataset'
training_dir, validation_dir, test_dir = (
    os.path.join(dataset_dir, split_name)
    for split_name in ('training', 'validation', 'test')
)

In [None]:
# Read the class names, one per line, from classes.csv.
# The position of a name in this list is used as the integer index of the class
# (the list is passed as `classes=` to the directory iterators below).
# The `with` block closes the file; blank lines are skipped so that a trailing
# newline cannot produce an empty class name.
with open("./dataset/classes.csv") as classes_file:
    lables = [line.strip() for line in classes_file if line.strip()]

print(lables)


In [None]:
from tensorflow.keras.applications.vgg16 import preprocess_input



def add_noise(img, variability=5):
    '''Return a copy of `img` with zero-mean Gaussian noise, clipped to [0, 255].

    The standard deviation of the noise is drawn uniformly from
    [0, variability) on every call, so each image gets a different noise level.

    Args:
        img: image array holding pixel values in the 0-255 range.
        variability: upper bound of the noise standard deviation.

    Returns:
        A new array with the same shape as `img`. Floating point inputs keep
        their dtype; integer inputs are returned as float32. The input array
        itself is not modified.
    '''
    img = np.asarray(img)
    if not np.issubdtype(img.dtype, np.floating):
        # Integer images cannot hold the fractional / negative noise values.
        img = img.astype(np.float32)

    deviation = variability * random.random()
    noise = np.random.normal(0, deviation, img.shape)

    # np.clip returns a new array: its result has to be kept, otherwise the
    # clipping silently has no effect.
    return np.clip(img + noise, 0., 255.).astype(img.dtype, copy=False)


def image_prep(img):
    '''Preprocessing function for the training generator: noise, then VGG16 preprocessing.

    The noise is added first, while the image is still in the 0-255 pixel range
    in which the [0, 255] clipping of `add_noise` is meaningful; the noisy image
    is then converted with `preprocess_input`.
    '''
    noisy = add_noise(img)
    return preprocess_input(noisy)

In [None]:
# Images are divided into folders, one for each class. 
# If the images are organized in such a way, we can exploit the 
# ImageDataGenerator to read them from disk.

from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Training images are augmented on the fly (rotation, shifts, zoom, flips) and
# then passed through `image_prep`.
train_data_generator = ImageDataGenerator(
    rotation_range=180,
    height_shift_range=10,
    width_shift_range=10,
    zoom_range=0.7,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest',
    preprocessing_function=image_prep,
)

# Validation and test images are not augmented: only the VGG16 preprocessing is applied.
valid_data_generator = ImageDataGenerator(preprocessing_function=preprocess_input)
test_data_generator = ImageDataGenerator(preprocessing_function=preprocess_input)


def _flow_from(data_generator, directory, batch_size, shuffle):
    """Create a directory iterator with the settings shared by all three splits.

    Images are resized to 256x256 and read as RGB. `classes=lables` fixes the
    class order to the one listed in `lables` instead of the order of the
    folder names.
    """
    return data_generator.flow_from_directory(
        directory=directory,
        target_size=(256, 256),
        color_mode='rgb',
        classes=lables,
        batch_size=batch_size,
        shuffle=shuffle,
        seed=seed,
    )


# Only the training split is shuffled; validation and test keep the on-disk order.
train_gen = _flow_from(train_data_generator, training_dir, batch_size=20, shuffle=True)
valid_gen = _flow_from(valid_data_generator, validation_dir, batch_size=20, shuffle=False)
test_gen = _flow_from(test_data_generator, test_dir, batch_size=8, shuffle=False)

In [None]:
from sklearn.utils import class_weight


# "Balanced" weights computed from the labels of the training split.
train_classes = np.unique(train_gen.labels)
class_weights = class_weight.compute_class_weight(
    class_weight="balanced",
    classes=train_classes,
    y=train_gen.labels,
)

print(class_weights)

# Rescale the weights so that the smallest one is exactly 1.
minimum = min(class_weights)

# Map class index -> weight. Keys are taken from the classes actually present
# in the training labels rather than from a hard-coded range, so every weight
# stays attached to the right class index.
cv = {
    int(class_idx): weight / minimum
    for class_idx, weight in zip(train_classes, class_weights)
}

print(cv)

In [None]:
def get_next_batch(generator):
  """Fetch the next batch from `generator`, describe its first sample and plot it.

  Args:
      generator: iterator yielding batches whose element 0 is the image batch
          and element 1 the one-hot target batch. The images are assumed to be
          VGG16-preprocessed (as produced by the generators in this notebook).

  Returns:
      The batch exactly as returned by `next(generator)`.
  """
  batch = next(generator)

  images = batch[0]    # inputs  X
  targets = batch[1]   # targets t (one-hot encoded)

  print("(Input) image shape:", images.shape)
  print("Target shape:", targets.shape)

  # Describe only the first sample of the batch
  first_image = images[0]
  first_target = targets[0]
  target_idx = np.argmax(first_target)
  print()
  print("Categorical label:", first_target)
  print("Label:", target_idx)
  print("Class name:", lables[target_idx])

  # VGG16 preprocessing turns RGB into BGR and subtracts the ImageNet channel
  # means, so the array is not directly displayable (casting it to uint8 would
  # wrap negative values around). Undo it: flip back to RGB, add the means
  # (given here in RGB order) and clip to the valid pixel range.
  imagenet_mean_rgb = np.array([123.68, 116.779, 103.939])
  displayable = np.clip(first_image[..., ::-1] + imagenet_mean_rgb, 0, 255)

  plt.figure(figsize=(6, 4))
  plt.imshow(np.uint8(displayable))

  return batch

In [None]:
# Pull one batch from the training generator and show info about its first sample.
# The returned batch is not needed here, hence the throwaway name.
_ = get_next_batch(train_gen)

In [None]:
# Model used:
def build_model(input_shape):
    """Build and compile the VGG16 transfer-learning classifier.

    A frozen, ImageNet-pretrained VGG16 (without its classifier head) is
    followed by Flatten -> Dropout(0.3) -> Dense(128, relu) ->
    Dense(n_classes, softmax).

    Reads the module-level `n_classes` and `seed`.

    Args:
        input_shape: shape of a single input image; used both for VGG16 and
            for the model's Input layer.

    Returns:
        The compiled model (categorical cross-entropy loss, Adam with
        learning rate 0.0005, accuracy metric).
    """
    supernet = tfk.applications.VGG16(
        include_top=False,       # drop VGG's own classifier, keep the convolutional part
        weights="imagenet",      # start from the ImageNet-pretrained weights
        input_shape=input_shape,
    )
    supernet.trainable = False   # the backbone is not trained in this phase

    input_layer = tfkl.Input(shape=input_shape, name='Input')
    sup = supernet(input_layer)
    flattening_layer = tfkl.Flatten(name='Flatten')(sup)

    dp6 = tfkl.Dropout(0.3, seed=seed)(flattening_layer)
    classifier_layer_6 = tfkl.Dense(
        units=128,
        name='Classifier6',
        kernel_initializer=tfk.initializers.GlorotUniform(seed),
        activation='relu',
    )(dp6)

    output_layer = tfkl.Dense(
        units=n_classes,
        activation='softmax',
        kernel_initializer=tfk.initializers.GlorotUniform(seed),
        name='Output',
    )(classifier_layer_6)

    model = tfk.Model(inputs=input_layer, outputs=output_layer, name='model')

    optimizer = tfk.optimizers.Adam(learning_rate=0.0005)

    # Compile the model. `metrics` is given as a list, the documented form of
    # the argument, rather than as a bare string.
    model.compile(
        loss=tfk.losses.CategoricalCrossentropy(),
        optimizer=optimizer,
        metrics=['accuracy'],
    )

    return model

In [None]:

# Utility function to create folders and callbacks for training
from datetime import datetime

def create_folders_and_callbacks(model_name, use_checkpoint=False, use_tensorboard=False):
  """Create the experiment folders and return the training callbacks.

  Creates `logs_and_results/<model_name>_<timestamp>/` with the sub-folders
  `ckpts` and `tb_logs`, then builds the callback list.

  Args:
      model_name: prefix of the experiment folder name.
      use_checkpoint: if True, also save the model into `ckpts/cp` with a
          ModelCheckpoint callback. Off by default.
      use_tensorboard: if True, also log to `tb_logs` with a TensorBoard
          callback. Off by default.

  Returns:
      List of callbacks: the optional ones (if enabled), a
      LearningRateScheduler and an EarlyStopping on `val_loss` (patience 7,
      best weights restored).
  """
  import math

  exps_dir = 'logs_and_results'
  now = datetime.now().strftime('%b%d_%H-%M-%S')
  exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
  ckpt_dir = os.path.join(exp_dir, 'ckpts')
  tb_dir = os.path.join(exp_dir, 'tb_logs')

  # makedirs also creates the missing parent folders (exps_dir, exp_dir).
  for folder in (ckpt_dir, tb_dir):
    os.makedirs(folder, exist_ok=True)

  callbacks = []

  # Model checkpoint (optional)
  if use_checkpoint:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(ckpt_dir, 'cp'),
        save_weights_only=False,
        save_best_only=False))

  # Visualize learning on TensorBoard (optional)
  if use_tensorboard:
    callbacks.append(tf.keras.callbacks.TensorBoard(
        log_dir=tb_dir,
        profile_batch=0,
        histogram_freq=1))

  # Learning rate schedule: unchanged while epoch <= 4, then multiplied by
  # exp(-0.1) at every epoch.
  def lr_scheduler(epoch, lr):
    if epoch > 4:
      lr = lr * math.exp(-0.1)
    # The scheduler must hand back a plain Python float, not a tensor.
    return float(lr)

  callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_scheduler, verbose=1))

  # Early stopping
  callbacks.append(tf.keras.callbacks.EarlyStopping(
      monitor='val_loss', patience=7, restore_best_weights=True))

  return callbacks

In [None]:
# Build the transfer-learning model and print its architecture
model = build_model(input_shape)
model.summary()

In [None]:
# Create folders and callbacks and fit
callbacks = create_folders_and_callbacks(model_name='CNN_')

# Train the model on the augmented training generator, validating on valid_gen.
# `cv` holds the per-class weights; only the history dict of the run is kept.
history = model.fit(
    x = train_gen,
    epochs = epochs,
    validation_data = valid_gen,
    class_weight = cv,
    callbacks = callbacks,
).history

In [None]:
# Save the trained model to Drive (path without extension).
model.save("/gdrive/My Drive/Datasets/homework1/models_VGG_TL_2___/train_TL_VGG_1")

In [None]:
# Save the same model again as a single .h5 file; this is the file reloaded below.
model.save("/gdrive/My Drive/Datasets/homework1/models_VGG_TL_2___/train_TL_VGG_1.h5")

In [None]:
# Reload the transfer-learning model from the .h5 file as the starting point for fine tuning.
model = tfk.models.load_model("/gdrive/My Drive/Datasets/homework1/models_VGG_TL_2___/train_TL_VGG_1.h5")

.
.
.
FINE TUNING
.
.
.

In [None]:
# Make the VGG16 backbone trainable, then freeze its first 14 layers again so
# that only the remaining layers are updated during fine tuning.
vgg_base = model.get_layer('vgg16')
vgg_base.trainable = True

for frozen_layer in vgg_base.layers[:14]:
  frozen_layer.trainable = False

In [None]:
# Compile again after changing the `trainable` flags, with a lower learning rate
# for fine tuning. `metrics` is given as a list, the documented form of the argument.
model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(1e-4),
    metrics=['accuracy'],
)

In [None]:

# Utility function to create folders and callbacks for training
from datetime import datetime

def create_folders_and_callbacks_ft(model_name, use_checkpoint=False, use_tensorboard=False):
  """Create the experiment folders and return the callbacks for fine tuning.

  Creates `logs_and_results/<model_name>_<timestamp>/` with the sub-folders
  `ckpts` and `tb_logs`, then builds the callback list. Early stopping here
  monitors `val_accuracy` with patience 5.

  Args:
      model_name: prefix of the experiment folder name.
      use_checkpoint: if True, also save the model into `ckpts/cp` with a
          ModelCheckpoint callback. Off by default.
      use_tensorboard: if True, also log to `tb_logs` with a TensorBoard
          callback. Off by default.

  Returns:
      List of callbacks: the optional ones (if enabled), a
      LearningRateScheduler and an EarlyStopping on `val_accuracy`
      (patience 5, best weights restored).
  """
  import math

  exps_dir = 'logs_and_results'
  now = datetime.now().strftime('%b%d_%H-%M-%S')
  exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
  ckpt_dir = os.path.join(exp_dir, 'ckpts')
  tb_dir = os.path.join(exp_dir, 'tb_logs')

  # makedirs also creates the missing parent folders (exps_dir, exp_dir).
  for folder in (ckpt_dir, tb_dir):
    os.makedirs(folder, exist_ok=True)

  callbacks = []

  # Model checkpoint (optional)
  if use_checkpoint:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(ckpt_dir, 'cp'),
        save_weights_only=False,
        save_best_only=False))

  # Visualize learning on TensorBoard (optional)
  if use_tensorboard:
    callbacks.append(tf.keras.callbacks.TensorBoard(
        log_dir=tb_dir,
        profile_batch=0,
        histogram_freq=1))

  # Learning rate schedule: unchanged while epoch <= 4, then multiplied by
  # exp(-0.1) at every epoch.
  def lr_scheduler(epoch, lr):
    if epoch > 4:
      lr = lr * math.exp(-0.1)
    # The scheduler must hand back a plain Python float, not a tensor.
    return float(lr)

  callbacks.append(tf.keras.callbacks.LearningRateScheduler(lr_scheduler, verbose=1))

  # Early stopping
  callbacks.append(tf.keras.callbacks.EarlyStopping(
      monitor='val_accuracy', patience=5, restore_best_weights=True))

  return callbacks

In [None]:
# Fresh experiment folders and callbacks for the fine-tuning run
callbacks = create_folders_and_callbacks_ft(model_name='CNN_FT_')

# Fine-tune the model
# Same generators and class weights as the first training run; only the
# history dict of the run is kept (it replaces the previous `history`).
history = model.fit(
    x = train_gen,
    epochs = epochs,
    validation_data = valid_gen,
    class_weight = cv,
    callbacks = callbacks
).history

In [None]:
# Save the fine-tuned model to Drive as a separate .h5 file.
model.save("/gdrive/My Drive/Datasets/homework1/models_VGG_TL_2___/train_TL_VGG_1_TL_1.h5")