<a href="https://colab.research.google.com/github/pietropadovese/Prado-Pictures-Recognizer/blob/main/AMD_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install visualkeras

import pandas as pd
import numpy as np
import os
import re
import shutil
import zipfile

import tensorflow as tf
import keras

from scipy.optimize import fsolve
from math import exp

from tensorflow.keras.layers import(
    Conv2D,
    Dense,
    Flatten,
    Add,
    MaxPool2D,
    AveragePooling2D,
    GlobalAveragePooling2D,
    BatchNormalization,
    Dropout,
    ReLU,
    PReLU,
    concatenate,
  )

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger
import tensorflow.keras.backend as K

import matplotlib.pyplot as plt

import visualkeras



Collecting visualkeras
  Downloading visualkeras-0.1.3-py3-none-any.whl.metadata (11 kB)
Collecting aggdraw>=1.3.11 (from visualkeras)
  Downloading aggdraw-1.3.19-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (655 bytes)
Downloading visualkeras-0.1.3-py3-none-any.whl (16 kB)
Downloading aggdraw-1.3.19-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (993 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m993.7/993.7 kB[0m [31m43.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: aggdraw, visualkeras
Successfully installed aggdraw-1.3.19 visualkeras-0.1.3


# 1) Store the Data

In [None]:
# Download file from Kaggle
# NOTE(review): the right-hand sides below are quoted string literals, so the environment
# variables receive the text "userdata.get('KAGGLE_USERNAME')" / "userdata.get('KAGGLE_KEY')"
# rather than real credentials. Presumably these are redacted placeholders for a secrets
# lookup — confirm and restore the actual lookup before re-running.
os.environ["KAGGLE_USERNAME"] = "userdata.get('KAGGLE_USERNAME')"
os.environ["KAGGLE_KEY"] = "userdata.get('KAGGLE_KEY')"
# First command fetches the whole dataset archive (prado-museum-pictures.zip);
# the second fetches only prado.csv (delivered zipped), which is then unpacked.
!kaggle datasets download maparla/prado-museum-pictures
!kaggle datasets download maparla/prado-museum-pictures -f prado.csv
!unzip prado.csv.zip

Dataset URL: https://www.kaggle.com/datasets/maparla/prado-museum-pictures
License(s): MIT
Downloading prado-museum-pictures.zip to /content
100% 24.9G/24.9G [22:54<00:00, 17.8MB/s]
100% 24.9G/24.9G [22:54<00:00, 19.5MB/s]
Dataset URL: https://www.kaggle.com/datasets/maparla/prado-museum-pictures
License(s): MIT
Downloading prado.csv.zip to /content
 93% 17.0M/18.3M [00:02<00:00, 14.5MB/s]
100% 18.3M/18.3M [00:02<00:00, 9.25MB/s]
Archive:  prado.csv.zip
  inflating: prado.csv               


In [None]:
# Read the CSV file holding the image URLs and the metadata of each work
col_to_keep = ['work_id', 'work_image_url', 'author']
df = pd.read_csv("prado.csv")
# The work id is the last path segment of the image URL (i.e. the image file name)
df = df.assign(
    work_id = df['work_image_url'].map(lambda url: url.rsplit('/', 1)[-1])
)[col_to_keep]

In [None]:
# Inspect the most common authors
values, counts = np.unique(df['author'], return_counts = True)
count_df = (
    pd.DataFrame({'author': values, 'count': counts})
      .sort_values(by = 'count', ascending = False)
)

# Rows 1..5 of the ranking: the first row is skipped (Anonimo, per the original comment).
# NOTE(review): this relies on the anonymous bucket being the most frequent entry — confirm.
top_five = count_df['author'][1:6].values

# Map a short folder-friendly key (first word of the name) to the full author name
authors = {}
for full_name in top_five:
  authors[full_name.split(" ")[0]] = full_name

In [None]:
# Store the paintings of each author in a different folder
zip_file_path = 'prado-museum-pictures.zip'
zip_prefix = 'images/images/'  # folder inside the archive that contains the pictures
os.makedirs('./extracted_images', exist_ok = True)

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:

  # Remove the archive folder as a *prefix*. str.lstrip() treats its argument as a set of
  # characters, so lstrip('images/images/') also removed leading 'i', 'm', 'a', 'g', 'e',
  # 's' and '/' characters belonging to the file names themselves; every such file then
  # failed the membership test below and was silently left out of the dataset.
  namelist = [el[len(zip_prefix):] for el in zip_ref.namelist() if el.startswith(zip_prefix)]
  available = set(namelist)  # set membership instead of scanning the list for every file

  for author, author_full_name in authors.items():

    df_reduced = df[df['author'] == author_full_name]

    author_dir = f'./Data/{author}'
    os.makedirs(author_dir, exist_ok = True)

    for file in df_reduced['work_id'].unique():

      if file in available:

        extracted_path = zip_ref.extract(f'{zip_prefix}{file}', path = './extracted_images/')

        # Explicit destination file (not just the folder) so that re-running the cell
        # replaces an already-moved picture instead of raising "already exists".
        new_name = os.path.join(author_dir, file)

        shutil.move(extracted_path, new_name)

In [None]:
# Delete the downloaded dataset archive from the working directory
!rm prado-museum-pictures.zip

# 2) Load the Data for the models

In [None]:
# Shape of the tensors fed to every model: height, width, RGB channels
INPUT_SHAPE = (224, 224, 3)

# Relative path, matching the './Data/<author>' folders written by the extraction step.
# (On Colab the working directory is /content, so this resolves to the same location as
# the previous hard-coded '/content/Data', but it also works elsewhere.)
gb_folder = './Data'

# One sub-folder per author -> one integer label per author (labels="inferred").
# subset="both" returns the training and the validation split in a single call;
# the fixed seed keeps the 90/10 split identical across runs.
train_ds, val_ds = tf.keras.utils.image_dataset_from_directory(gb_folder,
                                                               labels="inferred",
                                                               label_mode="int",
                                                               color_mode="rgb",
                                                               batch_size=32,
                                                               image_size=INPUT_SHAPE[:2],
                                                               shuffle=True,
                                                               seed=42,
                                                               validation_split=0.1,
                                                               subset="both"
                                                               )

Found 2058 files belonging to 5 classes.
Using 1853 files for training.
Using 205 files for validation.


Let's make sure to use buffered prefetching so we can yield data from disk without having I/O become blocking:
- Dataset.cache: keeps the images in memory after they're loaded off disk during the first epoch.
- Dataset.prefetch: overlaps data preprocessing and model executing while training.

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Apply the same cache -> prefetch pipeline to both splits.
# NOTE(review): cache() comes after the loader's own shuffling/batching, so the cached
# batches are presumably replayed in the same order every epoch — confirm whether a
# shuffle after cache() is wanted for the training split.
train_ds, val_ds = (
    split.cache().prefetch(buffer_size=AUTOTUNE) for split in (train_ds, val_ds)
)

In [None]:
# Create folders to save best model and metrics.
# os.makedirs(..., exist_ok=True) makes the cell safe to re-run (a second `mkdir` would
# complain that the folder already exists) and independent of the shell.
for folder in ('model_saves', 'csv_logs', 'training_history', 'plots'):
  os.makedirs(folder, exist_ok = True)

In [None]:
# Defining a preprocessing step that will be used in all models

def preprocessing(inputs):
  """
  Chain the shared preprocessing layers onto `inputs` and return the resulting tensor.

  Layers are applied in this order: rescaling by 1/255, random horizontal flip,
  random rotation (factor 0.2) and per-image standardisation.
  :param inputs: Tensor the layers are applied to (the model classes pass their Keras Input)
  :return: Output tensor of the last preprocessing layer
  """
  steps = [
      tf.keras.layers.Rescaling(1./255),
      tf.keras.layers.RandomFlip(mode = "horizontal"),
      tf.keras.layers.RandomRotation(factor = 0.2),
      tf.keras.layers.Lambda(function=tf.image.per_image_standardization,
                             name="Per_image_standardisation"),
  ]

  X = inputs
  for step in steps:
    X = step(X)

  return X

In [None]:
# Defining class weights and output bias

def _count_images_per_class(data_dir):
  """
  Count the image files stored in each class sub-folder of `data_dir`.

  Sub-folders are visited in sorted name order, which is the order in which
  image_dataset_from_directory(labels="inferred") assigns the integer labels, so
  position i of the result refers to label i.
  :param data_dir: Folder containing one sub-folder per class
  :return: (list of class names, np.ndarray with the number of images per class)
  """
  # NOTE(review): mirrors the image formats documented for the Keras loader — confirm
  # against the installed version if other formats are present.
  image_extensions = ('.bmp', '.gif', '.jpeg', '.jpg', '.png')

  names = sorted(
      entry for entry in os.listdir(data_dir)
      if os.path.isdir(os.path.join(data_dir, entry)) and not entry.startswith('.')
  )

  n_images = [
      sum(
          file_name.lower().endswith(image_extensions)
          for _, _, file_names in os.walk(os.path.join(data_dir, name))
          for file_name in file_names
      )
      for name in names
  ]

  return names, np.array(n_images)


# Counts are read from disk instead of being hard-coded: the previous literal list was in
# descending-count order (not label order) and did not add up to the number of files the
# loader reports, so weights and biases could end up attached to the wrong class.
class_names, class_obs = _count_images_per_class(gb_folder)

if len(class_obs) == 0 or (class_obs == 0).any():
  raise ValueError(f"Every class folder in {gb_folder!r} must contain at least one image")

class_frequencies = class_obs / class_obs.sum()

# "Balanced" weights: total / (n_classes * class_count), i.e. rarer classes weigh more
class_weights = {
    cls: float(class_obs.sum() / (len(class_obs) * n_obs))
    for cls, n_obs in enumerate(class_obs)
}


def eqn(x, frequency=class_frequencies):
  """
  Residual of softmax(x) - frequency; it is zero when the biases `x` make an
  otherwise-zero output layer predict exactly the class frequencies.
  :param x: Sequence of candidate biases, one per class
  :param frequency: Target class frequencies
  :return: List with one residual per class
  """
  sum_exp = sum([exp(x_i) for x_i in x])

  return [exp(x[i])/sum_exp - frequency[i] for i in range(len(frequency))]


# softmax(log p) == p whenever p sums to one, so the biases have a closed form and no
# numerical root finding is needed (the system is only determined up to a constant shift).
output_bias = np.log(class_frequencies).tolist()

output_bias = tf.keras.initializers.Constant(output_bias)

# 3) Creating the models

In [None]:
class Sequential:
    """
    Plain (non-residual) convolutional classifier.

    Four convolutional stages with 64, 128, 256 and 512 filters, each closed by a 2x2
    average pooling, followed by two small dense layers and a 5-way softmax output.
    The Keras model is built in the constructor and exposed through get_model().
    """

    def __init__(self, input_shape = (INPUT_SHAPE)):
        """
        :param input_shape: Shape of a single input image, without the batch dimension
        """
        self.inputs = tf.keras.Input(shape = input_shape)
        self.model = self._build_model()

    def _build_model(self):
        """
        Assemble preprocessing, convolutional stages and classification head.
        :return: tf.keras.Model mapping self.inputs to the class probabilities
        """
        X = preprocessing(self.inputs)

        X = self._conv_block(X, 64, 2)

        X = self._conv_block(X, 128, 2)

        X = self._conv_block(X, 256, 3)

        X = self._conv_block(X, 512, 3)

        X = tf.keras.layers.Flatten()(X)

        X = tf.keras.layers.Dense(32, activation = 'relu')(X)
        X = BatchNormalization()(X)
        X = PReLU()(X)

        X = tf.keras.layers.Dense(16, activation = 'relu')(X)
        X = BatchNormalization()(X)
        X = PReLU()(X)

        # The bias initialiser (defined at notebook level) sets the initial output biases
        X = tf.keras.layers.Dense(5, activation = 'softmax', kernel_initializer = "HeNormal", bias_initializer = output_bias)(X)

        return tf.keras.Model(inputs = self.inputs, outputs = X)

    def _conv_block(self, inputs, filters, repetitions, kernel_size = (3,3)):
        """
        Stack `repetitions` Conv2D -> BatchNormalization -> ReLU groups and pool the result.
        :param inputs: Input tensor of the stage
        :param filters: Number of filters of every convolution in the stage
        :param repetitions: Number of stacked convolution groups
        :param kernel_size: Kernel size of the convolutions
        :return: Output tensor after the 2x2 average pooling
        """
        # Each convolution must consume the output of the previous one. Feeding `inputs`
        # on every iteration left only the last convolution connected to the model, so
        # `repetitions` had no effect on the network depth.
        X = inputs

        for _ in range(repetitions):

            X = Conv2D(filters = filters,
                        kernel_size = kernel_size,
                        padding = 'same',
                        kernel_initializer = "HeNormal",
                        )(X)

            X = BatchNormalization()(X)

            X = ReLU()(X)

        X = AveragePooling2D(pool_size = (2,2))(X)

        return X

    def get_model(self):
        """
        :return: The tf.keras.Model built in the constructor
        """
        return self.model

In [None]:
class ResNet:
    """
    Residual convolutional classifier.

    A convolutional stem is followed by four residual stages (64, 128, 256, 512 filters),
    each ending in a 2x2 average pooling, then two small dense layers and a 5-way softmax.
    The Keras model is built in the constructor and exposed through get_model().
    """

    def __init__(self, input_shape = (INPUT_SHAPE)):
        """
        :param input_shape: Shape of a single input image, without the batch dimension
        """
        self.inputs = tf.keras.Input(shape = input_shape)
        self.model = self._build_model()

    def _build_model(self):
        """
        Assemble preprocessing, stem, residual stages and classification head.
        :return: tf.keras.Model mapping self.inputs to the class probabilities
        """
        features = preprocessing(self.inputs)

        # Stem. The third positional argument of _conv_block is kernel_size, so the
        # original call `_conv_block(X, 64, 1)` builds a 1x1 convolution.
        # NOTE(review): possibly meant as "one repetition" with the default 3x3 kernel — confirm.
        features = self._conv_block(features, 64, kernel_size = 1)

        # (filters, convolutions per stage, whether the shortcut needs a projection)
        stages = (
            (64, 2, False),
            (128, 3, True),
            (256, 3, True),
            (512, 3, True),
        )
        for n_filters, n_convs, needs_projection in stages:
            features = self._identity_block(features,
                                            n_filters,
                                            layer_rep = n_convs,
                                            increase_dim = needs_projection)

        features = tf.keras.layers.Flatten()(features)

        # Two identical Dense -> BatchNormalization -> PReLU groups of decreasing width
        for units in (32, 16):
            features = tf.keras.layers.Dense(units, activation = 'relu')(features)
            features = BatchNormalization()(features)
            features = PReLU()(features)

        probabilities = tf.keras.layers.Dense(5,
                                              activation = 'softmax',
                                              kernel_initializer = "HeNormal",
                                              bias_initializer = output_bias)(features)

        return tf.keras.Model(inputs = self.inputs, outputs = probabilities)

    def _conv_block(self, inputs, filters, kernel_size = (3,3)):
        """
        Conv2D -> BatchNormalization -> ReLU.
        :param inputs: Input tensor
        :param filters: Number of convolution filters
        :param kernel_size: Kernel size of the convolution
        :return: Output tensor of the ReLU
        """
        convolved = Conv2D(filters = filters,
                           kernel_size = kernel_size,
                           padding = 'same',
                           kernel_initializer = "he_normal",
                           )(inputs)
        normalised = BatchNormalization()(convolved)

        return ReLU()(normalised)

    def _identity_block(self, inputs, filter, layer_rep, increase_dim):
        """
        Residual stage: `layer_rep` convolutions on the main path, a shortcut from the
        stage input, their sum, a ReLU and a 2x2 average pooling.
        :param inputs: Input tensor of the stage
        :param filter: Number of filters of every convolution in the stage
        :param layer_rep: Number of convolutional layers on the main path
        :param increase_dim: Whether the input and the output have different dimensions;
                             if True the shortcut goes through a 1x1 convolution
        :return: Output tensor after the pooling
        """
        shortcut = inputs
        main_path = inputs

        # All but the last convolution carry their own activation
        for _ in range(layer_rep - 1):
            main_path = self._conv_block(main_path, filter)

        # The last convolution is only normalised: its activation comes after the addition
        main_path = Conv2D(filters = filter,
                           kernel_size = 3,
                           padding = "same",
                           kernel_initializer = "he_normal",
                           )(main_path)
        main_path = BatchNormalization()(main_path)

        if increase_dim:
            # Project the shortcut so its channel count matches the main path
            shortcut = Conv2D(filters = filter,
                              kernel_size = 1,
                              padding = "same",
                              kernel_initializer = "he_normal",
                              )(shortcut)
            shortcut = BatchNormalization()(shortcut)

        merged = Add()([main_path, shortcut])
        merged = ReLU()(merged)

        return AveragePooling2D(pool_size = (2,2))(merged)

    def get_model(self):
        """
        :return: The tf.keras.Model built in the constructor
        """
        return self.model

In [None]:
class DenseNet:
    """
    Densely connected convolutional classifier.

    A strided 7x7 convolution and a max pooling are followed by four dense blocks of
    6, 12, 24 and 16 repetitions, with a transition block between consecutive dense
    blocks, global average pooling, two small dense layers and a 5-way softmax.
    The Keras model is built in the constructor and exposed through get_model().
    """

    def __init__(self, input_shape = (INPUT_SHAPE)):
        """
        :param input_shape: Shape of a single input image, without the batch dimension
        """
        self.inputs = tf.keras.Input(shape = input_shape)
        self.model = self._build_model()

    def _build_model(self):
        """
        Assemble preprocessing, stem, dense/transition blocks and classification head.
        :return: tf.keras.Model mapping self.inputs to the class probabilities
        """
        X = preprocessing(self.inputs)

        X = Conv2D(64, 7, strides = 2, padding = 'same')(X)
        X = MaxPool2D(3, strides = 2, padding = 'same')(X)

        block_sizes = [6, 12, 24, 16]

        for index, repetition in enumerate(block_sizes):

            X = self._dense_block(X, repetition)

            # Transitions sit only *between* dense blocks. Previously one more transition
            # was built after the last block and then thrown away, because the pooling
            # below read the dense-block output directly; the model graph is unchanged.
            if index < len(block_sizes) - 1:
                X = self._transition_block(X)

        X = GlobalAveragePooling2D()(X)


        X = tf.keras.layers.Dense(32, activation = 'relu')(X)
        X = BatchNormalization()(X)
        X = PReLU()(X)

        X = tf.keras.layers.Dense(16, activation = 'relu')(X)
        X = BatchNormalization()(X)
        X = PReLU()(X)

        X = tf.keras.layers.Dense(5, activation = 'softmax', kernel_initializer = "HeNormal", bias_initializer = output_bias)(X)

        return tf.keras.Model(inputs = self.inputs, outputs = X)


    def _conv_block(self, inputs, filters, kernel = 1):
        """
        Conv2D -> BatchNormalization -> ReLU.
        :param inputs: Input tensor
        :param filters: Number of convolution filters
        :param kernel: Kernel size of the convolution
        :return: Output tensor of the ReLU
        """
        X = Conv2D(filters = filters,
               kernel_size = kernel,
               padding = 'same',
               kernel_initializer = "he_normal"
               )(inputs)

        X = BatchNormalization()(X)
        X = ReLU()(X)

        return X

    def _dense_block(self, X, repetitions, filters = 32):
        """
        Repeatedly compute new feature maps and concatenate them to the running tensor.
        :param X: Input tensor of the block
        :param repetitions: Number of (1x1 conv, 3x3 conv, concatenate) steps
        :param filters: Number of channels every step adds to the running tensor
        :return: Tensor holding the input and all newly computed feature maps
        """
        for _ in range(repetitions):

            # 1x1 convolution with 4 * filters channels, then the 3x3 convolution
            y = self._conv_block(X, filters*4)
            y = self._conv_block(y, filters, 3)
            X = concatenate([y,X])

        return X

    def _transition_block(self, X):
        """
        Halve the number of channels with a 1x1 convolution, then apply a 2x2
        average pooling with stride 2.
        :param X: Input tensor
        :return: Output tensor of the pooling
        """
        X = self._conv_block(X, K.int_shape(X)[-1] //2 )
        X = AveragePooling2D(2, strides = 2, padding = 'same')(X)

        return X

    def get_model(self):
        """
        :return: The tf.keras.Model built in the constructor
        """
        return self.model

# 4) Training the models

In [None]:
def scheduler(epoch, lr, warmup_epochs = 10, decay_rate = 0.05):
    """
    Learning-rate schedule: constant during the first epochs, exponential decay afterwards.
    :param epoch: Int. Index of the current epoch (the first epoch is 0)
    :param lr: Float. Learning rate currently used by the optimiser
    :param warmup_epochs: Int. Number of initial epochs during which lr is left untouched
    :param decay_rate: Float. From `warmup_epochs` on, lr is multiplied by exp(-decay_rate)
                       at every epoch
    :return: Float. Learning rate to use for the given epoch
    """
    if epoch < warmup_epochs:
        return lr

    # Plain Python maths: no need to build a TensorFlow op for a scalar constant
    return float(lr * exp(-decay_rate))

# Keras callback that applies `scheduler` to the optimiser's learning rate during training
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(schedule = scheduler)

In [None]:
def compile_and_fit(model_class, name, epochs = 50):
  """
  Compile the wrapped Keras model and train it on the notebook's datasets.

  The model is trained on `train_ds` and validated on `val_ds` with the notebook-level
  `class_weights` and `lr_scheduler`. The best model according to the validation loss is
  saved to ./model_saves/best_<name>.keras and the per-epoch metrics are written to
  csv_logs/<name>_training_log.csv (overwritten on every call).
  :param model_class: Model wrapper instance exposing get_model()
  :param name: Str. Label used in the checkpoint and log file names
  :param epochs: Int. Number of training epochs
  :return: History object returned by model.fit
  """

  # The callbacks write into these folders; create them here so the function does not
  # depend on a separate set-up cell having been run first.
  os.makedirs('./model_saves', exist_ok = True)
  os.makedirs('csv_logs', exist_ok = True)

  model = model_class.get_model()
  model.compile(loss = 'sparse_categorical_crossentropy',
                optimizer = 'adam',
                metrics = ['accuracy'])

  # define checkpoint callback: keep only the model with the lowest validation loss
  checkpoint_callback = ModelCheckpoint(
    filepath = f'./model_saves/best_{name}.keras',
    monitor = 'val_loss',
    save_best_only = True,
    mode = 'min',
    verbose = 1
  )

  csv_logger = CSVLogger(f'csv_logs/{name}_training_log.csv', separator = ',', append = False)

  history = model.fit(
      train_ds,
      validation_data = val_ds,
      epochs = epochs,
      class_weight = class_weights,
      callbacks = [checkpoint_callback, csv_logger, lr_scheduler]
  )

  return history



In [None]:
# Instantiate the Sequential wrapper and train its model under the name 'sequential'
sequential = Sequential(INPUT_SHAPE)
history_sequential = compile_and_fit(model_class = sequential, name = 'sequential')

Epoch 1/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step - accuracy: 0.4446 - loss: 1.8320
Epoch 1: val_loss improved from inf to 2.04503, saving model to ./model_saves/best_sequential.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 68ms/step - accuracy: 0.4462 - loss: 1.8281 - val_accuracy: 0.4585 - val_loss: 2.0450 - learning_rate: 0.0010
Epoch 2/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step - accuracy: 0.6868 - loss: 1.1676
Epoch 2: val_loss improved from 2.04503 to 1.47333, saving model to ./model_saves/best_sequential.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 63ms/step - accuracy: 0.6869 - loss: 1.1676 - val_accuracy: 0.5317 - val_loss: 1.4733 - learning_rate: 0.0010
Epoch 3/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step - accuracy: 0.7511 - loss: 0.9716
Epoch 3: val_loss improved from 1.47333 to 0.94107, saving model to ./model_saves/best_sequ

In [None]:
# Instantiate the ResNet wrapper and train its model under the name 'resnet'
resnet = ResNet(INPUT_SHAPE)
history_resnet = compile_and_fit(model_class = resnet, name = 'resnet')

Epoch 1/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 182ms/step - accuracy: 0.4884 - loss: 1.5817
Epoch 1: val_loss improved from inf to 21.14794, saving model to ./model_saves/best_resnet.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m24s[0m 210ms/step - accuracy: 0.4896 - loss: 1.5790 - val_accuracy: 0.2146 - val_loss: 21.1479 - learning_rate: 0.0010
Epoch 2/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 182ms/step - accuracy: 0.6740 - loss: 1.1059
Epoch 2: val_loss improved from 21.14794 to 5.11909, saving model to ./model_saves/best_resnet.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 201ms/step - accuracy: 0.6741 - loss: 1.1056 - val_accuracy: 0.3512 - val_loss: 5.1191 - learning_rate: 0.0010
Epoch 3/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 182ms/step - accuracy: 0.7661 - loss: 0.9027
Epoch 3: val_loss improved from 5.11909 to 2.50882, saving model to ./model_saves/best_re

In [None]:
# Instantiate the DenseNet wrapper and train its model under the name 'densenet'
densenet = DenseNet(INPUT_SHAPE)
history_densenet = compile_and_fit(model_class = densenet, name = 'densenet')

Epoch 1/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 192ms/step - accuracy: 0.4353 - loss: 1.6463
Epoch 1: val_loss improved from inf to 61.84329, saving model to ./model_saves/best_densenet.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 289ms/step - accuracy: 0.4374 - loss: 1.6433 - val_accuracy: 0.1512 - val_loss: 61.8433 - learning_rate: 0.0010
Epoch 2/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 193ms/step - accuracy: 0.7064 - loss: 1.1307
Epoch 2: val_loss improved from 61.84329 to 8.67159, saving model to ./model_saves/best_densenet.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 226ms/step - accuracy: 0.7063 - loss: 1.1307 - val_accuracy: 0.2585 - val_loss: 8.6716 - learning_rate: 0.0010
Epoch 3/50
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 192ms/step - accuracy: 0.7060 - loss: 1.0302
Epoch 3: val_loss did not improve from 8.67159
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━