# Base Pipeline
This is the base pipeline with which we have trained the models. 

Before writing this notebook down we have tried many different options. Also, we have made several variations of this notebook as well.

In [None]:
# Set to false if working on local env
onColab = True

# Decide batch size and validation split
batch_size = 128
validation_split = 0.3

# Set path to training folder. This folder can contain as many subfolders(/classes) as you need
dataset_dir = 'training'
dataset_path = './' + dataset_dir
img_h, img_w = (256, 256)

In [None]:
#@title Import libraries
import os
import time
import scipy
import random
import matplotlib as mpl
import matplotlib.pyplot as plt
from PIL import Image
import tensorflow as tf
import numpy as np
import cv2

from tensorflow.keras.preprocessing.image import ImageDataGenerator
tfk = tf.keras
tfkl = tf.keras.layers

In [None]:
if onColab:
  from google.colab import drive
  drive.mount('/gdrive')
  %cd /gdrive/MyDrive/University/ANN/CHALLENGE1/with_img_divided/moreDivision
  !ls

In [None]:
# set to True to unzip dataset.zip
doUnzipData = False

if doUnzipData:
  !unzip dataset.zip

In [None]:
#@title init seed everywhere
seed =20

random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

In [None]:
# we also leave the option to make a proper test set. Due to the scarcity of a lot of classes, we have decided to use only the validation set.
test_path = './test'

def load_data(dontUseFun=True, fun=None, isTest = False):
  """
  load data from folder -> train, val, test. It loads data and augment it as well.
  dontUseFun: if it's true then the data is not preprocessed. If it's false, the data is preprocessed with fun (which must not be none)
  fun: function to preprocess data (only used if dontUseFun is false)
  isTest: False if we don't have/want a test set, true otherwise
  """
  image_generator = ImageDataGenerator(preprocessing_function = fun, validation_split=validation_split, 
                                          rotation_range=30,  # augment with rotation
                                          height_shift_range=50,  # augment with shift
                                          width_shift_range=50,
                                          zoom_range=0.3,  # augment with zoom
                                          horizontal_flip=True,  # augment by flipping
                                          vertical_flip=True, 
                                          fill_mode='reflect') 
  if dontUseFun:
    image_generator = ImageDataGenerator(validation_split=validation_split, 
                                          rotation_range=30,
                                          height_shift_range=50,
                                          width_shift_range=50,
                                          zoom_range=0.3,
                                          horizontal_flip=True,
                                          vertical_flip=True, 
                                          fill_mode='reflect')  


  # Obtain a data generator with the 'ImageDataGenerator.flow_from_directory' method
  train_gen = image_generator.flow_from_directory(directory=dataset_path,
                                                target_size=(img_h, img_w),
                                                color_mode='rgb',
                                                classes=None, # can be set to None
                                                class_mode='categorical',
                                                subset='training',
                                                batch_size=batch_size,
                                                shuffle=True,
                                                seed=seed)

  valid_gen = image_generator.flow_from_directory(directory=dataset_path,
                                                target_size=(img_h, img_w),
                                                color_mode='rgb',
                                                classes=None, # can be set to None
                                                class_mode='categorical',
                                                subset='validation',
                                                batch_size=batch_size,
                                                shuffle=False,
                                                seed=seed)
  test_gen = None
  if isTest:
    test_image_gen = ImageDataGenerator(preprocessing_function = fun)
    if dontUseFun:
      test_image_gen = ImageDataGenerator(rotation_range=30)
    
    test_gen = test_image_gen.flow_from_directory(directory=test_path,
                                                target_size=(img_h, img_w),
                                                color_mode='rgb',
                                                classes=None, # can be set to None
                                                class_mode='categorical',
                                                batch_size=batch_size,
                                                shuffle=True, # sometimes, instead of train, we use the test for training
                                                seed=seed)
  return train_gen, valid_gen, test_gen

In [None]:
# we need this ONLY for the labels
train_gen = load_data(True, None, False)[0]
labels = list(train_gen.class_indices.keys())

In [None]:
print(labels)

In [None]:
#@title SETTINGS
# IF YOU CHANGE NET GO CHECK THE FINE TUNING CODE IF IT IS ALRIGHT!

model_dir = 'NAME_OF_FOLDER_IN_WHICH_YOU_SAVE_THE_MODEL'

# Highly dependent on the network chosen:
n_not_trainable_layers = 398

# TO USE THIS PARAMETER ADD HIDDEN LAYER (in previous pipelines we used hidden layer and we left this parameters here in case we wanted to swap back again)
drop_out_last_dense = 0
fc_dense_n_neurons = 0

########################################################
# choose network and its preprocessing function
net = tfk.applications.DenseNet121

def tl_preprocess(x):
  return tf.keras.applications.densenet.preprocess_input(x)
########################################################

In [None]:
def compute_weight(val_split):
  """
  compute class weight for each class in the training folder. 
  We also consider that a fraction of the data (val_split) is used for validation and not for training.
  """
  w = []
  for i in range(len(labels)):
    class_imgs = next(os.walk('{}/{}/'.format('./training', labels[i])))[2]
    w.append(len(class_imgs) * (1 - val_split))

  tot = sum(w)
  for i in range(len(w)):
    w[i] = tot / w[i]
  m = max(w)
  w = np.array(w) / m
  w[np.argmax(w)] = 0.99999999999999
  d = {}
  for i in range(len(w)):
    d[i] = w[i]
  return d

class_weight = compute_weight(validation_split)

In [None]:
comment = 'HIGH LEVEL COMMENT ON EXPERIMENT OF CURRENT MODEL'
comment += '\nnot trainable layers: ' + str(n_not_trainable_layers)
comment += ' drop_out_last_dense ' + str(drop_out_last_dense)
comment += ' fc_dense_n_neurons ' + str(fc_dense_n_neurons)
comment += '\nvalidation_split = ' + str(validation_split) 
comment += '. batch_size = ' + str(batch_size) 
comment += '. seed = ' + str(seed)
comment += '\nweigths: '
comment += str(class_weight)
print(comment)

In [None]:
# We leave the option to create a test set automatically. In this pipeline we have never used this.
create_test_set = False

if create_test_set:
  import shutil
  os.mkdir(test_path)
  for i in range(len(labels)):
      class_imgs = next(os.walk('{}/{}/'.format(dataset_dir, labels[i])))[2]
      # In this way we always get the same data. 
      # TODO: randomize using seed the data we take
      class_imgs.sort()
      for j in range(5):
        class_img = class_imgs[j]
        dest_dir = './test/' + labels[i] + '/'
        if not os.path.exists(dest_dir):
          os.mkdir(dest_dir)
        shutil.move('{}/{}/'.format(dataset_dir, labels[i]) + class_img, dest_dir)


In [None]:
def preprocess(x):
  # x = clean_image(x) # <- it was a function to darken the background of the images automatically. It required opencv, so we dropped it
  if tl_preprocess:
    return tl_preprocess(np.uint8(x))
  return x

In [None]:
train_gen, valid_gen, test_gen = load_data(False, preprocess)

In [None]:
def get_next_batch(generator):
  batch = next(generator)

  image = batch[0]
  target = batch[1]

  print("(Input) image shape:", image.shape)
  print("Target shape:",target.shape)

  # Visualize only the first sample
  image = image[0]
  target = target[0]
  target_idx = np.argmax(target)
  print()
  print("Categorical label:", target)
  print("Label:", target_idx)
  print("Class name:", labels[target_idx])
  fig = plt.figure(figsize=(6, 4))
  plt.imshow(image)
  plt.title("image which will be fed to the cnn")
  return batch

# Uncomment following lines only if you want to check that the dataset was loaded ok

# Get a sample from dataset and show info
# _ = get_next_batch(train_gen)

In [None]:
supernet =  net(
    weights="imagenet",
    input_shape=(img_h, img_w, 3),
    include_top=False
)

In [None]:
print(len(supernet.layers))

In [None]:
# let last block trainable
for i, layer in enumerate(supernet.layers[:n_not_trainable_layers]):
  layer.trainable=False
for i, layer in enumerate(supernet.layers):
  print(i, layer.name, layer.trainable)
# supernet.summary()

# Now we use the supernet as layer of our network
inputs = tfk.Input(shape=(img_h, img_w, 3))
x = supernet(inputs)

# MODIFY BELOW IF YOU WANT TO CHANGE CLASSIFIER

# Now we build our classifier (which will be actually trained)
x = tfkl.GlobalAveragePooling2D(name="avg_pool")(x)
x = tfkl.BatchNormalization()(x)

x = tfkl.Dropout(0.3, seed=seed)(x)
outputs = tfkl.Dense(
    len(labels), 
    activation='softmax',
    kernel_initializer = tfk.initializers.GlorotUniform(seed))(x)

model = tfk.Model(inputs=inputs, outputs=outputs, name='model')

# Compile the model
model.compile(loss=tfk.losses.CategoricalCrossentropy(), optimizer=tfk.optimizers.Adam(), metrics='accuracy')

model.summary()

In [None]:
c = [tfk.callbacks.ModelCheckpoint('./ckpt_' + model_dir, save_best_only=True), tfk.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', patience=8, restore_best_weights=True)] 
history = model.fit(
    x = train_gen,
    batch_size = batch_size,
    epochs = 200,
    validation_data = valid_gen,
    class_weight = class_weight,
    callbacks = c
).history

In [None]:
model.save('./' + model_dir)

In [None]:
def compareModels(history1, h1lbl, history2=None, h2lbl=''):
  '''
  Plot history1 against history2. 
  If history2 is None then Plot twice history1 (it'll be overlapped)
  h1lbl and h2lbl are the labels of the two histories
  '''
  if history2 == None: 
    history2 = history1
    h2lbl = h1lbl
  # Plot the training
  plt.figure(figsize=(15,5))
  plt.plot(history1['loss'], alpha=.3, color='#ff7f0e', linestyle='--')
  plt.plot(history1['val_loss'], label=h1lbl, alpha=.8, color='#ff7f0e')
  plt.plot(history2['loss'], alpha=.3, color='#4D61E2', linestyle='--')
  plt.plot(history2['val_loss'], label=h2lbl, alpha=.8, color='#4D61E2')
  plt.legend(loc='upper left')
  plt.title('Categorical Crossentropy')
  plt.grid(alpha=.3)

  plt.figure(figsize=(15,5))
  plt.plot(history1['accuracy'], alpha=.3, color='#ff7f0e', linestyle='--')
  plt.plot(history1['val_accuracy'], label=h1lbl, alpha=.8, color='#ff7f0e')
  plt.plot(history2['accuracy'], alpha=.3, color='#4D61E2', linestyle='--')
  plt.plot(history2['val_accuracy'], label=h2lbl, alpha=.8, color='#4D61E2')
  plt.legend(loc='upper left')
  plt.title('Accuracy')
  plt.grid(alpha=.3)

  plt.show()
compareModels(history, model_dir)

In [None]:
model = tf.keras.models.load_model('./' + model_dir)

In [None]:
from sklearn.metrics import confusion_matrix
Y_pred = model.predict(valid_gen)
y_pred = np.argmax(Y_pred, axis=1)
print('Confusion Matrix')
cm = confusion_matrix(valid_gen.classes, y_pred)
print(cm)

In [None]:
# Plot the confusion matrix
import seaborn as sns
plt.figure(figsize=(10,8))
sns.heatmap(cm.T, xticklabels=labels, yticklabels=labels)
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

In [None]:
# Let's replot the heatmap but this time we put vmax = 200, so that tomatoes don't cloud the colors of the heatmap and we can visualize the distribution a bit better
plt.figure(figsize=(10,8))
sns.heatmap(cm.T, xticklabels=labels, yticklabels=labels, vmax=200)
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

In [None]:
# # Compute the classification metrics
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
target = valid_gen.classes
predictions = Y_pred
accuracy = accuracy_score(target, np.argmax(predictions, axis=1))
precision = precision_score(target, np.argmax(predictions, axis=1), average='macro')
recall = recall_score(target, np.argmax(predictions, axis=1), average='macro')
f1 = f1_score(target, np.argmax(predictions, axis=1), average='macro')
print('Accuracy:',accuracy.round(4))
print('Precision:',precision.round(4))
print('Recall:',recall.round(4))
print('F1:',f1.round(4))