# GDrive connection

In [None]:
from google.colab import drive
drive.mount('/gdrive')

In [None]:
%cd /gdrive/My Drive/HMK1

# Installs

In [None]:
!pip install tensorflow==2.10.1 # needed for ConvNextLarge model

In [None]:
!pip install keras-cv --upgrade # needed for cutout data augmentation

# Imports

In [None]:
from google.colab import drive
import tensorflow as tf
import numpy as np
import os
import random
import scipy as sp
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix

from keras import backend as K
from keras.callbacks import *
from PIL import Image

tfk = tf.keras
tfkl = tf.keras.layers
print(tf.__version__)


2.10.1


# Env Setup

In [None]:
# Fix every random number generator in use so that results are as reproducible as possible
seed = 42

# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so setting it here
# presumably only affects sub-processes spawned later - confirm if hash ordering matters.
os.environ['PYTHONHASHSEED'] = str(seed)

# Python, NumPy and TensorFlow generators
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)
tf.compat.v1.set_random_seed(seed)

# Number of images per batch, used by the data loaders and the training cell
batch_size = 32

# Data merging

> ⚠️ first-time use only!



In [None]:
def folderToPaths(full_img_dir):
    """Return the sorted paths of every entry found directly inside a directory.

    Args:
        full_img_dir: Path of the directory to list.

    Returns:
        A list with one path per directory entry, each obtained by joining
        ``full_img_dir`` with the entry name, in ascending string order.
    """
    return sorted(os.path.join(full_img_dir, entry) for entry in os.listdir(full_img_dir))

In [None]:
# Imported here because this cell runs before the later cells that import shutil
import shutil

# Directories containing the original per-class data and the flattened, merged copy
dataset_dir = 'data/original_data'
merged_dataset_dir = "data/merged_data"

# Class labels; the position of a label in this list is its numeric class id
labels = ['Species1',       # 0
          'Species2',       # 1
          'Species3',       # 2
          'Species4',       # 3
          'Species5',       # 4
          'Species6',       # 5
          'Species7',       # 6
          'Species8'        # 7
]

# Merge all the images into a single folder. One-time use only!
# Make sure the destination exists, otherwise the first copy fails on a fresh checkout.
os.makedirs(merged_dataset_dir, exist_ok=True)

for idx, class_label in enumerate(labels):
  paths = folderToPaths(full_img_dir = '{}/{}/'.format(dataset_dir, class_label))

  for path in paths:
    # New file name = "<class id>_<last 9 characters of the source path>".
    # NOTE(review): presumably every source file name is exactly 9 characters long - confirm.
    s = str(idx) + "_" + path[-9:]
    new_path = os.path.join(merged_dataset_dir, s)
    shutil.copy(path, new_path)

# Data splitting - train/val/test version

In [None]:
paths_merged = folderToPaths(merged_dataset_dir)
# The class id is the character 11 positions from the end of each path
labels_of_paths = [p[-11] for p in paths_merged] # get all labels from the image path

validation_percentage = 0.15
test_percentage = 0.2

# First split: hold out the test set. Passing the labels as a second array makes
# train_test_split return them already aligned with the paths, and random_state pins
# the split so that re-running this cell always yields the same partition.
X_train_val, X_test, labels_of_paths_train_val, labels_test = train_test_split(
    paths_merged, labels_of_paths,
    test_size = test_percentage, shuffle = True,
    stratify = labels_of_paths, random_state = seed)

# Second split: validation_percentage is taken from the remaining train+val portion,
# not from the whole dataset.
X_train, X_val, labels_train, labels_val = train_test_split(
    X_train_val, labels_of_paths_train_val,
    test_size = validation_percentage, shuffle = True,
    stratify = labels_of_paths_train_val, random_state = seed)

## Moving the file into the folders

In [None]:
import shutil
from tqdm import tqdm
import glob


def _empty_class_folders(split_dir):
    """Delete the files inside each sub-folder of ``split_dir``, keeping the folders.

    A missing ``split_dir`` is ignored; entries that are not regular files are left alone.
    """
    if not os.path.isdir(split_dir):
        return
    for class_name in os.listdir(split_dir):
        class_dir = os.path.join(split_dir, class_name)
        if not os.path.isdir(class_dir):
            continue
        for file_name in os.listdir(class_dir):
            file_path = os.path.join(class_dir, file_name)
            if os.path.isfile(file_path):
                os.remove(file_path)


def _copy_into_class_folders(split_dir, image_paths, image_labels):
    """Copy every image into ``split_dir/<label + 1>/``, creating the folders first.

    Args:
        split_dir: Root folder of the split.
        image_paths: Paths of the images to copy.
        image_labels: Class ids (digit strings) aligned with ``image_paths``.
    """
    # Create the class folders up front: copying to a missing folder would instead
    # write a single file named after the class.
    for label in sorted(set(image_labels)):
        os.makedirs(os.path.join(split_dir, str(int(label) + 1)), exist_ok=True)

    for label, image_path in tqdm(zip(image_labels, image_paths), total=len(image_labels)):
        shutil.copy(image_path, os.path.join(split_dir, str(int(label) + 1)))


# Rebuild each split folder from scratch so that files from a previous split do not linger
for split_dir, split_paths, split_labels in (
        ("data/train", X_train, labels_train),
        ("data/test", X_test, labels_test),
        ("data/val", X_val, labels_val)):
    _empty_class_folders(split_dir)
    _copy_into_class_folders(split_dir, split_paths, split_labels)

## Keras data loader with test set

In [None]:
training_dir = "data/train"
validation_dir = "data/val"
test_dir = "data/test"

# Options shared by the three loaders: one-hot labels inferred from the folder names,
# RGB images resized to 96x96 and grouped into batches of batch_size.
loader_options = dict(
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    batch_size=batch_size,
    image_size=(96, 96),
    seed=seed)

# Only the training set is shuffled.
train_data = tf.keras.utils.image_dataset_from_directory(
    training_dir, shuffle=True, **loader_options)

# Validation and test sets keep a fixed order: a shuffled dataset changes order on every
# pass, so labels read in one pass would not line up with predictions made in another.
val_data = tf.keras.utils.image_dataset_from_directory(
    validation_dir, shuffle=False, **loader_options)

test_data = tf.keras.utils.image_dataset_from_directory(
    test_dir, shuffle=False, **loader_options)


# Data splitting - train/val version

In [None]:
paths_merged = folderToPaths(merged_dataset_dir)
# The class id is the character 11 positions from the end of each path
labels_of_paths = [p[-11] for p in paths_merged]

val_percentage = 0.2

# Passing the labels as a second array makes train_test_split return them already aligned
# with the paths; random_state pins the split so that re-running this cell is repeatable.
X_train, X_val, labels_train, labels_val = train_test_split(
    paths_merged, labels_of_paths,
    test_size = val_percentage, shuffle = True,
    stratify = labels_of_paths, random_state = seed)

## Moving the files

In [None]:
import shutil
from tqdm import tqdm
import glob


def _empty_class_folders(split_dir):
    """Delete the files inside each sub-folder of ``split_dir``, keeping the folders.

    A missing ``split_dir`` is ignored; entries that are not regular files are left alone.
    """
    if not os.path.isdir(split_dir):
        return
    for class_name in os.listdir(split_dir):
        class_dir = os.path.join(split_dir, class_name)
        if not os.path.isdir(class_dir):
            continue
        for file_name in os.listdir(class_dir):
            file_path = os.path.join(class_dir, file_name)
            if os.path.isfile(file_path):
                os.remove(file_path)


def _copy_into_class_folders(split_dir, image_paths, image_labels):
    """Copy every image into ``split_dir/<label + 1>/``, creating the folders first.

    Args:
        split_dir: Root folder of the split.
        image_paths: Paths of the images to copy.
        image_labels: Class ids (digit strings) aligned with ``image_paths``.
    """
    # Create the class folders up front: copying to a missing folder would instead
    # write a single file named after the class.
    for label in sorted(set(image_labels)):
        os.makedirs(os.path.join(split_dir, str(int(label) + 1)), exist_ok=True)

    for label, image_path in tqdm(zip(image_labels, image_paths), total=len(image_labels)):
        shutil.copy(image_path, os.path.join(split_dir, str(int(label) + 1)))


# Rebuild each split folder from scratch so that files from a previous split do not linger
for split_dir, split_paths, split_labels in (
        ("data/train2", X_train, labels_train),
        ("data/val2", X_val, labels_val)):
    _empty_class_folders(split_dir)
    _copy_into_class_folders(split_dir, split_paths, split_labels)

100%|██████████| 2832/2832 [10:46<00:00,  4.38it/s]
100%|██████████| 709/709 [02:36<00:00,  4.52it/s]


## Keras data loader without test set

In [None]:
training_dir = "data/train2"
validation_dir = "data/val2"

# Options common to both loaders: one-hot labels inferred from the folder names,
# RGB images resized to 96x96, shuffled batches of batch_size drawn with a fixed seed.
shared_loader_options = dict(
    labels='inferred',
    label_mode='categorical',
    color_mode='rgb',
    batch_size=batch_size,
    image_size=(96, 96),
    shuffle=True,
    seed=seed)

train_data = tf.keras.utils.image_dataset_from_directory(training_dir, **shared_loader_options)

val_data = tf.keras.utils.image_dataset_from_directory(validation_dir, **shared_loader_options)

Found 2832 files belonging to 8 classes.
Found 709 files belonging to 8 classes.


# Cyclical Learning Rate

Cyclical learning rate (CLR) is a technique used in deep learning that involves varying the learning rate of the optimizer during training. Instead of using a fixed learning rate throughout the entire training process, the learning rate is gradually increased and then decreased in a cyclical manner. This technique was introduced by Leslie N. Smith in 2015.

The idea behind CLR is to allow the model to explore a wider range of learning rates and find the optimal learning rate for the given problem. This is achieved by gradually increasing the learning rate from a lower bound to an upper bound and then decreasing it back to the lower bound. This cycle can be repeated multiple times during the training process.

The benefits of CLR include faster convergence to the optimal solution and better generalization performance of the model. It can also help prevent the model from getting stuck in local minima by allowing it to escape to a different part of the loss landscape.

CLR can be implemented using various techniques, including triangular learning rate policy, triangular2 learning rate policy, and exponential learning rate policy. These policies differ in how the learning rate is varied over each cycle. The choice of policy depends on the specific problem and the architecture of the model being trained.

In [None]:
class CyclicLR(Callback):
    """
    code taken from https://github.com/bckenstler/CLR/blob/master/clr_callback.py

    This callback implements a cyclical learning rate policy (CLR).
    The method cycles the learning rate between two boundaries with
    some constant frequency, as detailed in this paper (https://arxiv.org/abs/1506.01186).
    The amplitude of the cycle can be scaled on a per-iteration or
    per-cycle basis.
    This class has three built-in policies, as put forth in the paper.
    "triangular":
        A basic triangular cycle w/ no amplitude scaling.
    "triangular2":
        A basic triangular cycle that scales initial amplitude by half each cycle.
    "exp_range":
        A cycle that scales initial amplitude by gamma**(cycle iterations) at each
        cycle iteration.
    For more detail, please see paper.

    # Example
        ```python
            clr = CyclicLR(base_lr=0.001, max_lr=0.006,
                                step_size=2000., mode='triangular')
            model.fit(X_train, Y_train, callbacks=[clr])
        ```

    Class also supports custom scaling functions:
        ```python
            clr_fn = lambda x: 0.5*(1+np.sin(x*np.pi/2.))
            clr = CyclicLR(base_lr=0.001, max_lr=0.006,
                                step_size=2000., scale_fn=clr_fn,
                                scale_mode='cycle')
            model.fit(X_train, Y_train, callbacks=[clr])
        ```
    # Arguments
        base_lr: initial learning rate which is the
            lower boundary in the cycle.
        max_lr: upper boundary in the cycle. Functionally,
            it defines the cycle amplitude (max_lr - base_lr).
            The lr at any cycle is the sum of base_lr
            and some scaling of the amplitude; therefore
            max_lr may not actually be reached depending on
            scaling function.
        step_size: number of training iterations per
            half cycle. Authors suggest setting step_size
            2-8 x training iterations in epoch.
        mode: one of {triangular, triangular2, exp_range}.
            Default 'triangular'.
            Values correspond to policies detailed above.
            If scale_fn is not None, this argument is ignored.
        gamma: constant in 'exp_range' scaling function:
            gamma**(cycle iterations)
        scale_fn: Custom scaling policy defined by a single
            argument lambda function, where
            0 <= scale_fn(x) <= 1 for all x >= 0.
            mode parameter is ignored
        scale_mode: {'cycle', 'iterations'}.
            Defines whether scale_fn is evaluated on
            cycle number or cycle iterations (training
            iterations since start of cycle). Default is 'cycle'.

    # Raises
        ValueError: if scale_fn is None and mode is not one of the
            three built-in policies.
    """

    def __init__(self, base_lr=0.001, max_lr=0.006, step_size=2000., mode='triangular',
                 gamma=1., scale_fn=None, scale_mode='cycle'):
        super(CyclicLR, self).__init__()

        self.base_lr = base_lr
        self.max_lr = max_lr
        self.step_size = step_size
        self.mode = mode
        self.gamma = gamma
        if scale_fn is None:
            # Built-in policies: each one fixes both the scaling function and
            # the quantity (cycle number or iteration count) it is evaluated on.
            if self.mode == 'triangular':
                self.scale_fn = lambda x: 1.
                self.scale_mode = 'cycle'
            elif self.mode == 'triangular2':
                self.scale_fn = lambda x: 1/(2.**(x-1))
                self.scale_mode = 'cycle'
            elif self.mode == 'exp_range':
                self.scale_fn = lambda x: gamma**(x)
                self.scale_mode = 'iterations'
            else:
                # Fail at construction time instead of leaving scale_fn undefined,
                # which would only surface as an AttributeError once training starts.
                raise ValueError(
                    "mode must be one of 'triangular', 'triangular2' or 'exp_range' "
                    "when scale_fn is None, got {!r}".format(mode))
        else:
            self.scale_fn = scale_fn
            self.scale_mode = scale_mode
        self.clr_iterations = 0.
        self.trn_iterations = 0.
        self.history = {}
        self._reset()

    def _reset(self, new_base_lr=None, new_max_lr=None,
               new_step_size=None):
        """Resets cycle iterations.
        Optional boundary/step size adjustment.
        """
        if new_base_lr is not None:
            self.base_lr = new_base_lr
        if new_max_lr is not None:
            self.max_lr = new_max_lr
        if new_step_size is not None:
            self.step_size = new_step_size
        self.clr_iterations = 0.

    def clr(self):
        """Returns the learning rate for the current value of clr_iterations."""
        # 1-based index of the cycle the current iteration belongs to
        # (a full cycle lasts 2*step_size iterations).
        cycle = np.floor(1+self.clr_iterations/(2*self.step_size))
        # Distance from the peak of the cycle: 1 at its boundaries, 0 at the peak.
        x = np.abs(self.clr_iterations/self.step_size - 2*cycle + 1)
        if self.scale_mode == 'cycle':
            return self.base_lr + (self.max_lr-self.base_lr)*np.maximum(0, (1-x))*self.scale_fn(cycle)
        else:
            return self.base_lr + (self.max_lr-self.base_lr)*np.maximum(0, (1-x))*self.scale_fn(self.clr_iterations)

    def on_train_begin(self, logs=None):
        """Sets the optimizer learning rate when training starts.

        Uses base_lr if no cycle iteration has been counted yet, otherwise
        the rate given by clr() for the iterations counted so far.
        """
        logs = logs or {}

        if self.clr_iterations == 0:
            K.set_value(self.model.optimizer.lr, self.base_lr)
        else:
            K.set_value(self.model.optimizer.lr, self.clr())

    def on_batch_end(self, epoch, logs=None):
        """Records the current learning rate and logs, then applies the next rate.

        # Arguments
            epoch: first positional argument of the hook; not used here.
            logs: optional dict of values; each one is appended to history.
        """
        logs = logs or {}
        self.trn_iterations += 1
        self.clr_iterations += 1

        # Store the rate that was in effect for the batch that just finished.
        self.history.setdefault('lr', []).append(K.get_value(self.model.optimizer.lr))
        self.history.setdefault('iterations', []).append(self.trn_iterations)

        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)

        K.set_value(self.model.optimizer.lr, self.clr())

# Final Model

## Supernet instantiation

In [None]:
# ConvNeXtLarge backbone with ImageNet weights and without its classification head
supernet = tf.keras.applications.convnext.ConvNeXtLarge(
    weights="imagenet",
    include_top=False,
    input_shape=(224,224,3)
)
supernet.summary()


# Freeze the first 87 layers; every layer from the 88th onwards stays trainable
count = 1
for layer in supernet.layers:
    layer.trainable = count >= 88
    count += 1

## Final model building

In [None]:
import keras_cv #for CutOut augmentation
from keras import regularizers

inputs = tfk.Input((96,96,3))

# Resizing followed by the augmentation layers, listed in the order they are applied
input_pipeline = [
    # RESIZING
    tfkl.Resizing(224,224,interpolation = "bicubic"),
    # AUGMENTATION
    keras_cv.layers.RandomCutout(0.35, 0.35),
    tf.keras.layers.RandomBrightness(0.25, value_range=(0, 255), seed=seed),
    tf.keras.layers.RandomFlip(mode="horizontal_and_vertical", seed=seed),
    tf.keras.layers.RandomTranslation(
        (-0.25, 0.25),
        (-0.25, 0.25),
        fill_mode="reflect",
        interpolation="bilinear",
        seed=seed),
    tf.keras.layers.RandomRotation(
        (-0.25, 0.25),
        fill_mode="reflect",
        interpolation="bilinear",
        seed=seed),
    tf.keras.layers.RandomContrast(0.2, seed=seed),
]

x = inputs
for stage in input_pipeline:
    x = stage(x)

# Backbone followed by the classification head
x = supernet(x)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dropout(0.3)(x)

hidden_layer = tf.keras.layers.Dense(
    1024,
    activation='relu',
    kernel_initializer = tfk.initializers.HeUniform(seed),
    kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4),
    bias_regularizer=regularizers.L2(1e-4),
    activity_regularizer=regularizers.L2(1e-4))
x = hidden_layer(x)

# One softmax output per class
output_layer = tf.keras.layers.Dense(
    8,
    activation='softmax',
    kernel_initializer = tfk.initializers.GlorotUniform(seed))
outputs = output_layer(x)


model = tfk.Model(inputs=inputs, outputs=outputs, name='model')
model.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(learning_rate = 1e-4),
    metrics='accuracy')

model.summary()

In [None]:
# len(train_data) is the number of batches in one pass over the training set
batches_per_epoch = len(train_data)
training_samples = int(batches_per_epoch*batch_size)
# Half a learning-rate cycle lasts five epochs' worth of batches
step_size = 5*training_samples // batch_size

clr = CyclicLR(
    base_lr=1e-5,
    max_lr=1e-4,
    step_size=step_size,
    mode='triangular')

# Stop once validation accuracy has not improved for 8 epochs and restore the best weights
early_stopping = tfk.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=8,
    restore_best_weights=True)

training_run = model.fit(
    train_data,
    validation_data = val_data,
    epochs = 500,
    callbacks = [early_stopping, clr]
)
history = training_run.history

As stated in the report, we decided to modify the learning rate during training. However, we did this by hand. A possible future improvement would be to implement a proper LR scheduler.

In order to be able to submit our model to CodaLab, we used a small trick, since keras_cv was not available in the test environment: we built a second model with the same structure except for the cutout layer, then copied the weights from the complete one.

In [None]:
# Second ConvNeXtLarge backbone (ImageNet weights, no classification head) for the submission model
supernet1 = tf.keras.applications.convnext.ConvNeXtLarge(
    weights="imagenet",
    include_top=False,
    input_shape=(224,224,3)
)


# Same freezing scheme as the first backbone: the first 87 layers are frozen, the rest stay trainable
count = 1
for layer in supernet1.layers:
    layer.trainable = count >= 88
    count += 1

In [None]:
from keras.layers import Dense,Flatten,GlobalAveragePooling2D, MaxPooling2D, BatchNormalization,Concatenate
from keras import regularizers

inputs = tfk.Input((96,96,3))

# Resizing followed by the augmentation layers (no cutout layer here), in application order
input_pipeline = [
    # RESIZING
    tfkl.Resizing(224,224,interpolation = "bicubic"),
    # AUGMENTATION
    tf.keras.layers.RandomBrightness(0.25, value_range=(0, 255), seed=seed),
    tf.keras.layers.RandomFlip(mode="horizontal_and_vertical", seed=seed),
    tf.keras.layers.RandomTranslation(
        (-0.25, 0.25),
        (-0.25, 0.25),
        fill_mode="reflect",
        interpolation="bilinear",
        seed=seed),
    tf.keras.layers.RandomRotation(
        (-0.25, 0.25),
        fill_mode="reflect",
        interpolation="bilinear",
        seed=seed),
    tf.keras.layers.RandomContrast(0.2, seed=seed),
]

x = inputs
for stage in input_pipeline:
    x = stage(x)

# Backbone followed by the classification head
x = supernet1(x)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dropout(0.3)(x)

hidden_layer = tf.keras.layers.Dense(
    1024,
    activation='relu',
    kernel_initializer = tfk.initializers.HeUniform(seed),
    kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4),
    bias_regularizer=regularizers.L2(1e-4),
    activity_regularizer=regularizers.L2(1e-4))
x = hidden_layer(x)

# One softmax output per class
output_layer = tf.keras.layers.Dense(
    8,
    activation='softmax',
    kernel_initializer = tfk.initializers.GlorotUniform(seed))
outputs = output_layer(x)

# Tie the input and output tensors together into a model
model2 = tfk.Model(inputs=inputs, outputs=outputs, name='model')

model2.compile(
    loss=tfk.losses.CategoricalCrossentropy(),
    optimizer=tfk.optimizers.Adam(learning_rate = 1e-4),
    metrics='accuracy')

model2.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 96, 96, 3)]       0         
                                                                 
 resizing_1 (Resizing)       (None, 224, 224, 3)       0         
                                                                 
 random_brightness_1 (Random  (None, 224, 224, 3)      0         
 Brightness)                                                     
                                                                 
 random_flip_1 (RandomFlip)  (None, 224, 224, 3)       0         
                                                                 
 random_translation_1 (Rando  (None, 224, 224, 3)      0         
 mTranslation)                                                   
                                                                 
 random_rotation_1 (RandomRo  (None, 224, 224, 3)      0     

In [None]:
# Copy the weights of `model` into `model2`, the copy that was built without the cutout layer
model2.set_weights(model.get_weights()) # Final model for Codalab submission

In [None]:
# Write the submission model to the file "model.h5"
model2.save("model.h5")

# Prediction

We used Test Time Augmentation during model inference to improve our accuracy.

Test Time Augmentation (TTA) is a technique used in machine learning and computer vision to improve the accuracy of a trained model's predictions. TTA involves applying data augmentation techniques to test images, in addition to the original image, and then taking the average or maximum of the model's predictions on all the augmented images to make a final prediction.

The idea behind TTA is to introduce more variety into the test set to account for variations in the input images that were not present in the training set. By applying different types of transformations to the test images, such as rotation, scaling, or cropping, the model is forced to make more robust predictions and handle variations in the input data.

In [None]:
def flip_lr(images):
    """Return `images` with the order of its elements reversed along axis 2.

    Assumes a (batch, height, width, channels) layout, in which case every
    image is mirrored left-to-right - TODO confirm against the caller.
    """
    mirrored = np.flip(images, axis=2)
    return mirrored

def shift(images, shift, axis):
    """Roll `images` by `shift` positions along `axis`.

    The roll is circular: elements pushed past one end re-enter at the other.
    """
    offset = shift
    return np.roll(images, offset, axis=axis)

def rotate(images, angle):
    """Rotate `images` by `angle` degrees in the plane spanned by axes 1 and 2.

    The output keeps the input shape (reshape=False); points that fall outside
    the input are filled using scipy's 'nearest' boundary mode.

    Args:
        images: Array with at least three dimensions.
        angle: Rotation angle in degrees.

    Returns:
        The rotated array, with the same shape as `images`.
    """
    # Import the submodule explicitly: `import scipy as sp` alone does not
    # guarantee that `sp.ndimage` has been loaded.
    from scipy import ndimage

    return ndimage.rotate(
        images, angle, axes=(1, 2),
        reshape=False, mode='nearest')

def combine_predictions(predictions):
    """Average the predictions over axis 0 and return the argmax over the last axis.

    Args:
        predictions: Stacked prediction arrays; axis 0 indexes the individual
            predictions being combined.

    Returns:
        The index of the highest averaged score along the last axis.
    """
    return np.argmax(np.mean(predictions, axis=0), axis=-1)

def tta_predict(m, x_test):
    """Predict with test-time augmentation.

    `m.predict` is run on `x_test` as-is and on seven transformed copies
    (flip along axis 2, rolls of +/-3 along axes 2 and 1, rotations of +/-10
    degrees); the eight outputs are averaged and reduced with an argmax.

    Args:
        m: Object exposing a `predict` method.
        x_test: Batch accepted by `flip_lr`, `shift`, `rotate` and `m.predict`.

    Returns:
        A tensor holding the result of `combine_predictions` on the stacked outputs.
    """
    # Each transform is applied just before its prediction, in this order
    augmentations = (
        ("plain", lambda batch: batch),
        ("flip", flip_lr),
        ("w_minus", lambda batch: shift(batch, -3, axis=2)),
        ("w_plus", lambda batch: shift(batch, 3, axis=2)),
        ("h_minus", lambda batch: shift(batch, -3, axis=1)),
        ("h_plus", lambda batch: shift(batch, 3, axis=1)),
        ("rot_minus", lambda batch: rotate(batch, -10)),
        ("rot_plus", lambda batch: rotate(batch, 10)),
    )
    scores = {name: m.predict(transform(x_test)) for name, transform in augmentations}

    # Order in which the individual predictions are stacked before averaging
    stack_order = ("plain", "h_minus", "h_plus", "w_minus", "w_plus",
                   "flip", "rot_minus", "rot_plus")
    out = combine_predictions(np.stack(tuple(scores[name] for name in stack_order)))
    return tf.convert_to_tensor(out)

In [None]:
# tta_predict transforms its input with NumPy, so the batched dataset is first
# collected into plain arrays. Images and labels are read in the same pass so
# that they stay aligned with each other.
test_batches = [(images.numpy(), targets.numpy()) for images, targets in test_data]
x_test = np.concatenate([images for images, _ in test_batches], axis=0)
# Labels are one-hot encoded (label_mode='categorical'); convert them to class indices
y_test = np.argmax(np.concatenate([targets for _, targets in test_batches], axis=0), axis=-1)

predictions = tta_predict(model2, x_test)