# Deep Learning 2024 - project 1
Kinga Frańczak, 313335

Grzegorz Zakrzewski, 313555

In [None]:
# tensorflow==2.10.1
# pandas==1.5.3
# numpy==1.26.4
# seaborn==0.13.2
# matplotlib==3.8.3

import pandas as pd
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Fix the NumPy and Keras random seeds before any data or model is created.
np.random.seed(0)
keras.utils.set_random_seed(0)

# Global seaborn plot styling.
sns.set_theme(font_scale=0.9, style="whitegrid")

#### Loading data

In [None]:
REPETITIONS = 3  # number of repetitions of each experiment
EPOCHS = 30  # number of epochs usually used to train the models

# Dataset locations. The test path previously duplicated the train path,
# so any evaluation on it would have scored the model on training images.
directory_train = './archive/cinic-10_image_classification_challenge-dataset/train'
directory_test = './archive/cinic-10_image_classification_challenge-dataset/test'

In [None]:
# Load the training directory once and split it 80% / 20% into the training
# and validation datasets; images are resized to 32x32 and the seed is fixed.
ds_train, ds_valid = keras.preprocessing.image_dataset_from_directory(
    directory=directory_train,
    validation_split=0.2,
    subset="both",
    seed=0,
    image_size=(32, 32),
)

#### Additional functions

In [None]:
def _fresh_optimizer(optimizer):
    """Return an optimizer that carries no state from earlier training runs.

    Optimizer objects are rebuilt from their own config; anything else
    (e.g. a string identifier such as ``"sgd"``) is returned unchanged.
    """
    if hasattr(optimizer, "get_config") and hasattr(optimizer, "from_config"):
        return type(optimizer).from_config(optimizer.get_config())
    return optimizer


def fit_model(
    model,
    name="",
    callbacks=None,
    optimizer="sgd",
    epochs=EPOCHS,
    repetitions=REPETITIONS,
):
    """Train clones of ``model`` several times and collect the training history.

    Every repetition clones the given model, compiles the clone with sparse
    categorical cross-entropy and the accuracy metric, and fits it on the
    module-level ``ds_train`` with ``ds_valid`` as validation data.

    Args:
        model: Keras model used as the architecture template; it is cloned,
            never trained directly.
        name: Label stored in the ``Name`` column of the result.
        callbacks: Optional list of Keras callbacks passed to ``fit``.
        optimizer: Optimizer identifier or optimizer instance. An instance is
            re-created from its config for each repetition so that no
            optimizer state is shared between repetitions.
        epochs: Number of epochs per repetition.
        repetitions: Number of independent trainings.

    Returns:
        A ``pandas.DataFrame`` with one row per epoch and repetition: the
        columns recorded in ``history.history`` plus ``Epoch`` (0-based),
        ``Name`` and ``Repetition``.
    """
    # Avoid the shared mutable default; copy so the caller's list is untouched.
    callbacks = [] if callbacks is None else list(callbacks)

    results = []
    for i in range(repetitions):
        # Clone from the original template each time instead of rebinding `model`.
        candidate = keras.models.clone_model(model)
        candidate.compile(
            optimizer=_fresh_optimizer(optimizer),
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )
        history = candidate.fit(ds_train, validation_data=ds_valid, epochs=epochs, callbacks=callbacks)
        frame = pd.DataFrame(history.history)
        frame["Name"] = name
        frame["Repetition"] = i
        # The row index of the history frame is the epoch number.
        frame = frame.reset_index().rename(columns={"index": "Epoch"})
        results.append(frame)
    return pd.concat(results)

### Experiment 0.1 - simple convolutional neural network - testing various architectures

In [None]:
# Model I - a single convolutional layer followed by the dense classifier

model1 = keras.models.Sequential(
    [
        keras.Input(shape=(32, 32, 3)),
        # Convolutional part
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.BatchNormalization(),
        keras.layers.Flatten(),
        # Dense part
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

# Train the model and store its per-epoch history.
results1 = fit_model(model1, name="1 conv layer")
results1.to_csv("results/experiment01_results1.csv", index=False)

In [None]:
# Model II - two convolutional layers with max pooling between them

model2 = keras.models.Sequential(
    [
        keras.Input(shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Flatten(),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

# Train the model and store its per-epoch history.
results2 = fit_model(model2, name="2 conv layers")
results2.to_csv("results/experiment01_results2.csv", index=False)

In [None]:
# Model III - three convolutional layers, max pooling after the first two

model3 = keras.models.Sequential(
    [
        keras.Input(shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Flatten(),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

# Train the model and store its per-epoch history.
results3 = fit_model(model3, name="3 conv layers")
results3.to_csv("results/experiment01_results3.csv", index=False)

In [None]:
# Reload the saved histories of all three architectures so the summary below
# can be rebuilt without retraining (results3 was previously not reloaded,
# although it is concatenated together with results1 and results2).
results1 = pd.read_csv("results/experiment01_results1.csv")
results2 = pd.read_csv("results/experiment01_results2.csv")
results3 = pd.read_csv("results/experiment01_results3.csv")

In [None]:
# Stack the three per-architecture histories and save the combined table.
experiment01_results = pd.concat(objs=[results1, results2, results3])
experiment01_results.to_csv(path_or_buf="results/experiment01_results.csv", index=False)

## #1 - Testing hyper-parameters

## #1a - training

In [None]:
# Baseline model: the best architecture from Experiment 0.1
# (three convolutional layers, same layout as Model III).
model = keras.models.Sequential(
    [
        keras.Input(shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Flatten(),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

### Experiment 1.1. - optimizers

In [None]:
# Optimizers compared in this experiment, passed to Keras by name.
OPTIMIZERS = ["sgd", "rmsprop", "adam", "adadelta"]

# One training history per optimizer, stacked into a single table.
experiment11_results = pd.concat(
    [
        fit_model(model, name=optimizer_name, optimizer=optimizer_name)
        for optimizer_name in OPTIMIZERS
    ]
)
experiment11_results.to_csv("results/experiment11_results.csv", index=False)

### Experiment 1.2. - learning rate

In [None]:
# The best optimizer from Experiment 1.1 is Adam, so only its learning rate
# is varied here.

# One training history per learning rate, stacked into a single table.
experiment12_results = pd.concat(
    [
        fit_model(
            model,
            name=f"Learning rate = {learning_rate}",
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        )
        for learning_rate in [0.0001, 0.0005, 0.001, 0.005, 0.01]
    ]
)
experiment12_results.to_csv("results/experiment12_results.csv", index=False)

## #1b - regularization

In [None]:
# The best optimizer (Adam) with the best learning rate value, as selected in
# Experiments 1.1 and 1.2; used by the regularization experiments below.
optimizer = keras.optimizers.Adam(learning_rate=0.001)

### Experiment 1.3. - dropout rate

In [None]:
experiment13_results = []

for dropout_rate in [0.2, 0.4, 0.6]:

    # The best architecture from Experiment 0.1, with a dropout layer
    # inserted between the flattened features and the dense classifier.
    model = keras.models.Sequential(
        [
            keras.Input(shape=(32, 32, 3)),
            keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
            keras.layers.MaxPooling2D(pool_size=(2, 2)),
            keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
            keras.layers.MaxPooling2D(pool_size=(2, 2)),
            keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
            keras.layers.BatchNormalization(),
            keras.layers.Flatten(),
            keras.layers.Dropout(dropout_rate),
            keras.layers.Dense(32, activation="relu"),
            keras.layers.Dense(10, activation="softmax"),
        ]
    )

    experiment13_results.append(
        fit_model(model, name=f"Dropout rate = {dropout_rate}", optimizer=optimizer)
    )

# Stack the histories of all dropout rates and save them.
experiment13_results = pd.concat(experiment13_results)
experiment13_results.to_csv("results/experiment13_results.csv", index=False)

### Experiment 1.4. - early stopping

In [None]:
# Model with the best architecture and dropout rate (0.2) from the previous
# experiments; the optimizer and learning rate are supplied when training.
model = keras.models.Sequential(
    [
        keras.Input(shape=(32, 32, 3)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.MaxPooling2D(pool_size=(2, 2)),
        keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation="relu"),
        keras.layers.BatchNormalization(),
        keras.layers.Flatten(),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(10, activation="softmax"),
    ]
)

In [None]:
# One training history per early-stopping patience value; training is allowed
# up to 40 epochs and stops once val_loss has not improved for `patience` epochs.
experiment14_results = pd.concat(
    [
        fit_model(
            model,
            name=f"Patience = {patience}",
            optimizer=optimizer,
            callbacks=[keras.callbacks.EarlyStopping(monitor="val_loss", patience=patience)],
            epochs=40,
        )
        for patience in [2, 4, 6]
    ]
)
experiment14_results.to_csv("results/experiment14_results.csv", index=False)

## #2 - Testing augmentation techniques

### Experiment 2.1 - image flips

In [None]:
# TODO: experiment not implemented yet - layer to use: keras.layers.RandomFlip

### Experiment 2.2 - image rotations

In [None]:
# TODO: experiment not implemented yet - layer to use: keras.layers.RandomRotation

### Experiment 2.3 - image shifts

In [None]:
# TODO: experiment not implemented yet - layer to use: keras.layers.RandomTranslation

### Experiment 2.4 - MixUp augmentation (more advanced technique)

In [None]:
# https://keras.io/examples/vision/mixup/

from tensorflow import data as tf_data
from tensorflow.random import gamma as tf_random_gamma

# tf.data's AUTOTUNE constant; passed as num_parallel_calls to Dataset.map below.
AUTO = tf_data.AUTOTUNE

def sample_beta_distribution(size, concentration_0=0.2, concentration_1=0.2):
    """Draw ``size`` Beta-distributed samples built from two Gamma draws.

    For independent X ~ Gamma(concentration_1) and Y ~ Gamma(concentration_0),
    the ratio X / (X + Y) follows Beta(concentration_1, concentration_0).

    Args:
        size: Number of samples to draw.
        concentration_0: Shape parameter of the second Gamma draw.
        concentration_1: Shape parameter of the first Gamma draw.

    Returns:
        A 1-D tensor with ``size`` values.
    """
    first_draw = tf_random_gamma(shape=[size], alpha=concentration_1)
    second_draw = tf_random_gamma(shape=[size], alpha=concentration_0)
    total = first_draw + second_draw
    return first_draw / total


def mix_up(ds_one, ds_two, alpha=0.2):
    """Blend two batches into one MixUp batch.

    Each sample of the result is ``lam * first + (1 - lam) * second`` with a
    separate ``lam`` per sample, applied to the images and the labels alike.

    Uses ``tf.shape`` / ``tf.reshape`` rather than ``keras.ops``: ``keras.ops``
    only exists in Keras 3, while this notebook pins tensorflow==2.10.1.

    Args:
        ds_one: ``(images, labels)`` batch. Labels are combined with a
            ``(batch, 1)`` weight, so they are expected to be rank-2 float
            (e.g. one-hot) labels, not sparse class indices.
        ds_two: Second ``(images, labels)`` batch with the same shapes.
        alpha: Both concentration parameters of the Beta distribution the
            mixing weights are drawn from.

    Returns:
        Tuple ``(images, labels)`` holding the mixed batch.
    """
    # Unpack the two batches
    images_one, labels_one = ds_one
    images_two, labels_two = ds_two
    batch_size = tf.shape(images_one)[0]

    # Sample one mixing weight per example and reshape it so it broadcasts
    # over image (batch, h, w, c) and label (batch, classes) tensors.
    lam = sample_beta_distribution(batch_size, alpha, alpha)
    x_l = tf.reshape(lam, (batch_size, 1, 1, 1))
    y_l = tf.reshape(lam, (batch_size, 1))

    # Combine a pair of images/labels (one from each batch) into one image/label
    images = images_one * x_l + images_two * (1 - x_l)
    labels = labels_one * y_l + labels_two * (1 - y_l)
    return (images, labels)

def _mixup_source(seed):
    """Return the training data as one-hot labelled, freshly shuffled batches.

    The batches of ``ds_train`` are split into single examples, labels are
    one-hot encoded, and the examples are re-batched after shuffling with
    ``seed``. ``drop_remainder=True`` keeps every batch the same size so two
    sources can be combined element-wise.
    """
    return (
        ds_train.unbatch()
        # 10 classes, matching the Dense(10) output layer of the models.
        .map(lambda image, label: (image, tf.one_hot(label, depth=10)), num_parallel_calls=AUTO)
        .shuffle(buffer_size=4096, seed=seed)
        .batch(32, drop_remainder=True)
    )


# mix_up expects two (images, labels) batches. Mapping it over ds_train alone
# would hand it the images as `ds_one` and the labels as `ds_two`, so two
# differently shuffled copies of the training data are zipped together first.
# NOTE: labels in ds_train_mixup are one-hot, so a model trained on it needs
# loss="categorical_crossentropy" rather than the sparse variant.
ds_train_mixup = tf_data.Dataset.zip((_mixup_source(1), _mixup_source(2))).map(
    lambda ds_one, ds_two: mix_up(ds_one, ds_two, alpha=0.2),
    num_parallel_calls=AUTO,
)

## #3 - Testing pre-trained models 