Ce notebook sert de base pour l'entraînement du réseau avec transfer learning.

# Training and testing

In [None]:
import tensorflow as tf
# import tensorflow_addons as tfa 
# Peut servir pour utiliser des variations d'Adam ou pour faire plus de data augmentation
import pathlib
import numpy as np
import matplotlib.pyplot as plt
import os

In [None]:
# Build a mirrored distribution strategy and report how many replicas
# (detected devices) it keeps in sync.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

## Move posters to use the data loader

In [None]:
import pandas as pd

# Sort the poster files into <source>/<category>/<genre>/ folders, as listed in
# movefiles.csv, so that the data loader can derive labels from the folder names.
source = pathlib.Path('../data/posters')
movefiles = pd.read_csv('../data/movefiles.csv')

# Create every category/genre folder up front.
# exist_ok=True keeps the cell idempotent: re-running it (e.g. after a kernel
# restart) no longer raises FileExistsError on folders created by a previous run.
for cat in movefiles.category.unique():
    p = source/cat
    p.mkdir(exist_ok=True)
    for genre in movefiles.genre.unique():
        q = p/genre
        q.mkdir(exist_ok=True)

# Catch-all folder for posters that are not listed in the CSV.
p = source/'autres'
p.mkdir(exist_ok=True)

# Move each listed poster that is still at the top level of `source`.
# Files already moved by a previous run no longer exist there and are skipped.
for i, row in movefiles.iterrows():
    s = source/row['name']
    if s.exists():
        s.replace(source/row['category']/row['genre']/row['name'])

# Whatever .jpg is left at the top level was not listed: move it to 'autres'.
for autre in source.glob('*.jpg'):
    autre.replace(source/'autres'/autre.name)

## Data loader and data augmentation

In [None]:
# Root folder of the image data; the 'train', 'val' and 'test' splits are read
# from its sub-folders.
data_dir = pathlib.Path('../data/posters')

# Lets tf.data pick the parallelism / prefetch buffer sizes at runtime.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Batch size and image size used by the input pipeline and the model.
IMG_WIDTH = 256
IMG_HEIGHT = 256
BATCH_SIZE = 32

# Number of entries found two levels below each split folder (pattern '*/*').
NUMBER_TRAIN_SAMPLES = sum(1 for _ in data_dir.joinpath('train').glob('*/*'))
NUMBER_VAL_SAMPLES = sum(1 for _ in data_dir.joinpath('val').glob('*/*'))
NUMBER_TEST_SAMPLES = sum(1 for _ in data_dir.joinpath('test').glob('*/*'))

# Batches needed to see each split once: ceil(samples / batch size),
# written as an integer ceiling division.
STEP_SIZE_TRAIN = -(-NUMBER_TRAIN_SAMPLES // BATCH_SIZE)
STEP_SIZE_VAL = -(-NUMBER_VAL_SAMPLES // BATCH_SIZE)
STEP_SIZE_TEST = -(-NUMBER_TEST_SAMPLES // BATCH_SIZE)

# Class names; labels are built by comparing a file's parent folder name
# against this array, so the order here defines the one-hot positions.
CLASS_NAMES = np.array([
    'Action',
    'Animation',
    'Comédie',
    'Comédie-dramatique',
    'Documentaire',
    'Drame',
    'Thriller-Policier',
])

In [None]:
# Le code pour le data loader est adapté de: https://www.tensorflow.org/tutorials/load_data/images

def get_label(file_path):
    """Return the label of an image as a boolean vector over CLASS_NAMES.

    The class is taken from the name of the folder that directly contains the
    file (second-to-last path component); the result is True at the position
    of the matching entry of CLASS_NAMES and False elsewhere.
    """
    class_dir = tf.strings.split(file_path, os.path.sep)[-2]
    return class_dir == CLASS_NAMES

def decode_img(img):
    """Decode JPEG bytes into a 3-channel image resized to the model input size.

    Args:
        img: the raw (compressed) content of a JPEG file.

    Returns:
        The image resized to IMG_HEIGHT x IMG_WIDTH with Lanczos-3 interpolation.
    """
    # convert the compressed string to a 3D uint8 tensor
    img = tf.image.decode_jpeg(img, channels=3)
    # tf.image.resize expects size=[new_height, new_width]; the original passed
    # [width, height], which only worked because both are currently equal.
    return tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH], method='lanczos3')

def process_path(file_path):
    """Turn an image file path into an (image, label) pair.

    The label comes from get_label(file_path) and the image from decoding the
    file content with decode_img.
    """
    label = get_label(file_path)
    # read the raw file content, then decode and resize it
    return decode_img(tf.io.read_file(file_path)), label

In [None]:
from classification_models.tfkeras import Classifiers

# ResNet18 is not available in keras, so the model constructor and its matching
# input preprocessing function are fetched from classification_models instead.
ResNet18, preprocess_input = Classifiers.get('resnet18')

def augment(image,label):
    """Apply random data augmentation to one training image.

    Args:
        image: a single image tensor (height, width, 3 channels).
        label: the label of the image, passed through unchanged.

    Returns:
        The (augmented image, label) pair; the image keeps the
        IMG_HEIGHT x IMG_WIDTH x 3 size.
    """
    image = tf.image.random_flip_left_right(image)
    # Vertical flips and 90-degree rotations were tried and left disabled:
    #     image = tf.image.random_flip_up_down(image)
    #     image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
    image = tf.image.random_hue(image, 0.1)
    image = tf.image.random_saturation(image, 0.5, 2)
    # NOTE(review): the brightness delta is added to the pixel values; if the
    # images are in the [0, 255] range here, 0.3 is almost a no-op - confirm.
    image = tf.image.random_brightness(image, 0.3)
    image = tf.image.random_contrast(image, 0.2, 1.8)
    # Random translation: pad the image by `padding` pixels in each dimension,
    # then crop a window of the original size at a random position.
    padding = 50
    # resize_with_crop_or_pad takes (target_height, target_width) and
    # random_crop takes [height, width, channels]; the original passed width
    # first, which only worked because both sizes are currently equal.
    image = tf.image.resize_with_crop_or_pad(image, IMG_HEIGHT+padding, IMG_WIDTH+padding)
    image = tf.image.random_crop(image, size=[IMG_HEIGHT, IMG_WIDTH, 3])
    return image, label


def prepare_data(ds, phase, shuffle_buffer_size=1000):
    """Build the batched input pipeline for one split.

    Args:
        ds: a dataset of (image, label) pairs.
        phase: name of the split; augmentation is applied only for 'train'.
        shuffle_buffer_size: size of the shuffle buffer.

    Returns:
        A shuffled, endlessly repeating dataset of preprocessed batches of
        BATCH_SIZE elements. Because it repeats, consumers must bound it
        themselves (steps_per_epoch / validation_steps / steps).
    """
    ds = ds.shuffle(buffer_size=shuffle_buffer_size)
    ds = ds.repeat()
    if phase == 'train':
        ds = ds.map(augment, num_parallel_calls=AUTOTUNE)
    # The preprocessing required by ResNet18 must be applied here, after the
    # augmentation. Dataset.map returns a NEW dataset: the original dropped
    # the return value, so the preprocessing was silently never applied.
    ds = ds.map(lambda img, l : (preprocess_input(img), l), num_parallel_calls=AUTOTUNE)
    ds = ds.batch(BATCH_SIZE)
    # `prefetch` lets the dataset fetch batches in the background while the model
    # is training.
    ds = ds.prefetch(buffer_size=AUTOTUNE)
    return ds

In [None]:
# Per split: list the image files, ...
list_ds = {
    split: tf.data.Dataset.list_files(str(data_dir/split/'*/*'))
    for split in ['train', 'val', 'test']
}
# ... turn every path into an (image, label) pair, ...
labeled_ds = {
    split: files.map(process_path, num_parallel_calls=AUTOTUNE)
    for split, files in list_ds.items()
}
# ... and build the final batched pipeline (augmentation for 'train' only).
dataset = {
    split: prepare_data(samples, split)
    for split, samples in labeled_ds.items()
}

## Build the model

In [None]:
# Build the model inside the distribution strategy scope.
with strategy.scope():
    # Pre-trained convolutional base without its classification head.
    # The input shape is (height, width, channels); the original passed the
    # width first, which only worked because both sizes are currently equal.
    base_model = ResNet18((IMG_HEIGHT, IMG_WIDTH, 3), include_top=False, weights='imagenet')
    # Freeze the base: only the new head below is trained at first.
    base_model.trainable = False
    global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
    # One output per class (instead of a hard-coded 7). The softmax makes the
    # model output probabilities, which the plotting helpers display as such.
    prediction_layer = tf.keras.layers.Dense(len(CLASS_NAMES), activation="softmax")

    model = tf.keras.Sequential([
      base_model,
      global_average_layer,
      prediction_layer
    ])

In [None]:
# Adapté de https://www.tensorflow.org/tutorials/distribute/keras

# Directory in which the training checkpoints are stored
checkpoint_dir = '../data/checkpoints'
# Path template of the checkpoint files; the "{epoch}" placeholder is left
# unformatted here and passed as-is to the ModelCheckpoint callback.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

def decay(epoch):
    """Return the learning rate to use for the given epoch index.

    Step schedule (adapt the table below to change the decay):
        epoch <= 15 -> 1e-3
        epoch <= 30 -> 1e-5
        epoch <= 60 -> 1e-6
        epoch <= 90 -> 1e-7
        otherwise   -> 1e-8
    """
    # (last epoch index of the step, learning rate), in increasing epoch order
    schedule = ((15, 1e-3), (30, 1e-5), (60, 1e-6), (90, 1e-7))
    for last_epoch, rate in schedule:
        if epoch <= last_epoch:
            return rate
    return 1e-8

class PrintLR(tf.keras.callbacks.Callback):
    """Callback printing the optimizer's learning rate at the end of each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        # Use the model this callback is attached to (`self.model`, set by
        # Keras) rather than whatever the global name `model` points to.
        # `epoch` is 0-based, hence the +1 in the message.
        print('\nLearning rate for epoch {} is {}'.format(epoch + 1,
                                                          self.model.optimizer.lr.numpy()))

# Callbacks shared by both training phases.
callbacks = []
# TensorBoard logs
callbacks.append(tf.keras.callbacks.TensorBoard(log_dir='./logs'))
# weights-only checkpoints, written to the checkpoint_prefix path template
callbacks.append(
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True)
)
# learning rate given by decay(epoch)
callbacks.append(tf.keras.callbacks.LearningRateScheduler(decay))
# prints the learning rate at the end of each epoch
callbacks.append(PrintLR())

In [None]:
# Layer / parameter overview of the model while the ResNet18 base is frozen.
model.summary()

In [None]:
# The final Dense layer of the model already applies a softmax, so the loss
# receives probabilities, not logits: from_logits must be False (with True the
# softmax would effectively be applied twice and distort the loss).
model.compile(optimizer=tf.optimizers.Adam(),
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

In [None]:
# Initial training: only the last layer is trained.
initial_epochs = 30

history = model.fit(
    dataset['train'],
    steps_per_epoch=STEP_SIZE_TRAIN,
    epochs=initial_epochs,
    validation_data=dataset['val'],
    validation_steps=STEP_SIZE_VAL,
    callbacks=callbacks,
)

In [None]:
# Test-set loss and accuracy after the initial training phase.
loss0, accuracy0 = model.evaluate(x=dataset['test'], steps=STEP_SIZE_TEST)
print(accuracy0)

In [None]:
# Unfreeze the whole pre-trained base (first layer of the Sequential model) at once.
model.layers[0].trainable = True

# Re-compile so that the change of `trainable` takes effect.
# The final Dense layer already applies a softmax, so the loss receives
# probabilities, not logits: from_logits must be False.
model.compile(optimizer=tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])

# Epoch index at which the fine-tuning run stops (counted from the start of
# the initial training).
total_epochs =  130

In [None]:
# Layer / parameter overview again, now that the ResNet18 base is trainable.
model.summary()

In [None]:
# Fine-tuning: adjust the learning rates in the `decay` function used by the callbacks.
#
# Resume right after the last completed epoch. `history.epoch` holds the
# 0-based indices of the epochs already run, so the next index is its length;
# the original `history.epoch[-1]` re-ran the last epoch index (one extra
# epoch, and that epoch's checkpoint overwritten).
history2 = model.fit(dataset['train'],
                    epochs=total_epochs,
                    initial_epoch=len(history.epoch),
                    validation_data=dataset['val'],
                    steps_per_epoch=STEP_SIZE_TRAIN,
                    validation_steps=STEP_SIZE_VAL,
                    callbacks=callbacks,
                    )

In [None]:
# Test-set loss and accuracy after fine-tuning.
loss1, accuracy1 = model.evaluate(x=dataset['test'], steps=STEP_SIZE_TEST)
print(accuracy1)

# Results

In [None]:
# Stitch the metrics of both training phases into single curves.
acc = history.history['accuracy'] + history2.history['accuracy']
val_acc = history.history['val_accuracy'] + history2.history['val_accuracy']
loss = history.history['loss'] + history2.history['loss']
val_loss = history.history['val_loss'] + history2.history['val_loss']

fig, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

# Top panel: accuracy, with a vertical marker where fine-tuning starts.
acc_ax.plot(acc, label='Training Accuracy')
acc_ax.plot(val_acc, label='Validation Accuracy')
acc_ax.plot([initial_epochs-1, initial_epochs-1],
            acc_ax.get_ylim(), label='Start Fine Tuning')
acc_ax.legend(loc='lower right')
acc_ax.set_title('Training and Validation Accuracy')

# Bottom panel: loss, with the same marker.
loss_ax.plot(loss, label='Training Loss')
loss_ax.plot(val_loss, label='Validation Loss')
loss_ax.plot([initial_epochs-1, initial_epochs-1],
             loss_ax.get_ylim(), label='Start Fine Tuning')
loss_ax.legend(loc='upper right')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('epoch')

plt.show()

In [None]:
# model = tf.keras.models.load_model('../data/final_model.h5')

In [None]:
def show_batch(image_batch, label_batch):
    """Display up to 25 images of a batch in a 5x5 grid, titled with their class.

    Args:
        image_batch: batch of images; each one is divided by 255 for display.
        label_batch: batch of one-hot labels (tensors) aligned with CLASS_NAMES.
    """
    plt.figure(figsize=(10,10))
    # Never index past the end of the batch (e.g. a batch smaller than 25).
    for n in range(min(25, len(image_batch))):
        plt.subplot(5,5,n+1)
        plt.imshow(image_batch[n]/255)
        # The class name is the entry of CLASS_NAMES where the one-hot label is set.
        plt.title(CLASS_NAMES[label_batch[n].numpy()==1][0].title())
        plt.axis('off')

In [None]:
# Display one training batch.
# Warnings about an invalid value range are likely here; the original author
# attributes them to the preprocessing done for ResNet18.

image_batch, label_batch = next(iter(dataset['train']))
show_batch(image_batch, label_batch)

In [None]:
def plot_image(img, ground_truth_array, class_names, predictions_array, in_test=True, distance=0):
    """Draw one image on the current axes with a caption coloured by correctness.

    Args:
        img: the image to display (divided by 255 before plotting).
        ground_truth_array: label scores; the true class is its argmax.
        class_names: sequence mapping a class index to its name.
        predictions_array: predicted scores; the predicted class is its argmax.
        in_test: if true, the caption shows the predicted class, its score as
            a percentage and the true class (blue when correct, red otherwise);
            if false, it shows the true class and `distance` (blue when
            correct, black otherwise).
        distance: value printed in the caption when `in_test` is false.
    """
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(img/255, cmap=plt.cm.binary)

    true_label = np.argmax(ground_truth_array)
    predicted_label = np.argmax(predictions_array)
    is_correct = predicted_label == true_label

    if not in_test:
        caption = "{} \n Distance:{}".format(class_names[true_label], distance)
        plt.xlabel(caption, color='blue' if is_correct else 'black')
        return

    caption = "Préd:{} {:2.0f}% \n({})".format(
        class_names[predicted_label],
        100*np.max(predictions_array),
        'L:'+class_names[true_label])
    plt.xlabel(caption, color='blue' if is_correct else 'red')

def plot_value_array(ground_truth_array, predictions_array):
    """Draw the predicted scores as a bar chart on the current axes.

    All bars are grey except the predicted class (red) and the true class
    (blue); blue is applied last, so a correct prediction ends up blue.
    """
    class_indices = range(len(predictions_array))

    plt.grid(False)
    plt.xticks(class_indices)
    plt.yticks([])

    bars = plt.bar(class_indices, predictions_array, color="#777777")
    plt.ylim([0, 1])

    predicted_idx = np.argmax(predictions_array)
    true_idx = np.argmax(ground_truth_array)
    bars[predicted_idx].set_color('red')
    bars[true_idx].set_color('blue')


def plot_batch_results(batch, predictions, name=None):
    """Plot every image of a batch next to the bar chart of its predicted scores.

    Args:
        batch: an (images, labels) pair.
        predictions: predicted scores, one row per image of the batch.
        name: optional file name; when given, the figure is also saved there.
    """
    num_images = len(batch[1])
    num_cols = 4
    # Rows needed for num_cols images per row (ceiling division, at least one
    # row). The original `1 + num_images//4` produced an extra empty row
    # whenever the number of images was a multiple of 4.
    num_rows = max(1, -(-num_images // num_cols))
    plt.figure(figsize=(2 * 2 * num_cols, 2 * num_rows))
    for i in range(num_images):
        # Each image takes two grid cells: the image, then its score chart.
        plt.subplot(num_rows, 2 * num_cols, 2*i+1)
        plot_image(batch[0][i], batch[1][i], CLASS_NAMES, predictions[i])

        plt.subplot(num_rows, 2 * num_cols, 2*i+2)
        plot_value_array(batch[1][i], predictions[i])

    plt.tight_layout()
    if name is not None:
        plt.savefig(name)
In [None]:
# Predict one test batch and plot the results.
batch = next(iter(dataset['test']))
# `batch` is an (images, labels) pair: only the images are model inputs.
# Passing the whole pair to predict() hands the labels to the model as a
# second input.
prediction = model.predict(batch[0])
plot_batch_results(batch, prediction)

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

labels = []
predictions = []

# The test dataset repeats endlessly, so it is bounded explicitly to
# STEP_SIZE_TEST batches - the same amount of data as model.evaluate above.
# (The original loop processed STEP_SIZE_TEST + 1 batches.)
for images, one_hot_labels in dataset['test'].take(STEP_SIZE_TEST):
    # Only the images are model inputs; the labels are kept for the comparison.
    proba = model.predict(images)
    predictions.extend(np.argmax(proba, axis=1))
    labels.extend(np.argmax(one_hot_labels.numpy(), axis=1))

# Row-normalised matrix: each row (true class) sums to 1.
conf_matrix = confusion_matrix(labels, predictions, normalize='true')
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=CLASS_NAMES)

disp.plot(cmap=plt.cm.coolwarm_r, xticks_rotation='vertical')
plt.title('Matrice de confusion')

# Keras preprocessing : not used

In [None]:
from classification_models.tfkeras import Classifiers

ResNet18, preprocess_input = Classifiers.get('resnet18')

# One Keras image generator per split. Only 'train' gets random augmentation;
# every split applies the ResNet18 preprocessing function.
image_generator = {
    'train': tf.keras.preprocessing.image.ImageDataGenerator(
        preprocessing_function=preprocess_input,
        horizontal_flip=True,  # vertical_flip is deliberately left disabled
        rotation_range=20,
        width_shift_range=.15,
        height_shift_range=.15,
        shear_range=0.1,
        zoom_range=0.2,
        brightness_range=(0.8, 1.2),
    ),
}
image_generator.update({
    split: tf.keras.preprocessing.image.ImageDataGenerator(
        preprocessing_function=preprocess_input
    )
    for split in ('val', 'test')
})

# Directory iterators built from the generators above, one per split.
# NOTE(review): this rebinds the name `dataset` already used for the tf.data
# pipelines earlier in the notebook.
dataset = {
    split: image_generator[split].flow_from_directory(
        directory=str(data_dir/split),
        target_size=(IMG_HEIGHT, IMG_WIDTH),
        interpolation='bilinear',
        batch_size=BATCH_SIZE,
        shuffle=True,
    )
    for split in ['train', 'val', 'test']
}

In [None]:
def plotImages(images_arr):
    """Show up to five images side by side, without axes.

    Each image is divided by 255 before being displayed.
    """
    fig, axes = plt.subplots(1, 5, figsize=(20,20))
    for ax, img in zip(axes.flatten(), images_arr):
        ax.imshow(img/255)
        ax.axis('off')
    plt.tight_layout()
    plt.show()

In [None]:
# Read the first image of the first training batch five times and show the
# five results side by side.
augmented_images = [dataset['train'][0][0][0] for i in range(5)]
plotImages(augmented_images)