# Livrable 2 - Image processing

## The subject
The goal is to process a set of photographs by denoising them in order to make them better processable by Machine Learning algorithms. In this Jupyter notebook we will explain the pre-processing steps. The algorithms will rely on convolutional auto-encoders, and apply them to improve the image quality.

## Import

### Disable Tensorflow's warnings

In [None]:
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
RUN_DIR = 'tf/'

In [None]:
import datetime
import tensorflow as tf
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from urllib.request import urlopen
from io import BytesIO
from zipfile import ZipFile

## Global Variables

In [None]:
BATCH_SIZE: int = 16
IMG_HEIGHT: int = 228
IMG_WIDTH: int = 228
EPOCHS: int = 40
ZIP_PATH: str = 'https://raw.githubusercontent.com/Stan-fld/auto_encoder_data/main/data_ae.zip'
DATASET_PATH: str = RUN_DIR + 'data_ae'

## Load dataset

### Import dataset from github

In [None]:
http_response = urlopen(ZIP_PATH)
zipfile = ZipFile(BytesIO(http_response.read()))
zipfile.extractall(path=f'{RUN_DIR}data_ae')

### Normal datasets

In [None]:
# Train
train_data = tf.keras.utils.image_dataset_from_directory(f'{RUN_DIR}data_ae/training/',
                                                         image_size=(IMG_HEIGHT, IMG_WIDTH),
                                                         batch_size=BATCH_SIZE)

# Train
val_data = tf.keras.utils.image_dataset_from_directory(f'{RUN_DIR}data_ae/validation/',
                                                       image_size=(IMG_HEIGHT, IMG_WIDTH),
                                                       batch_size=BATCH_SIZE)

# Transforming BatchDataset into array 
train_data = np.concatenate(list(train_data.map(lambda x, y: x)))
val_data = np.concatenate(list(val_data.map(lambda x, y: x)))
train_data = train_data.astype('float32') / 255.
val_data = val_data.astype('float32') / 255.

### Noisy datasets

In [None]:
y_train = train_data

y_val = val_data

noise_factor = 0.2
x_train = train_data + noise_factor * np.random.normal(size=train_data.shape)  #A COMPLETER
x_val = val_data + noise_factor * np.random.normal(size=val_data.shape)  #A COMPLETER

x_train = np.clip(x_train, 0., 1.)
x_val = np.clip(x_val, 0., 1.)

## Implementation of functions to display the images.

In [None]:
def display_single_image(img):
    plt.figure(figsize=(4, 4))
    plt.imshow(img)
    plt.axis("off")


def display_image(x, n):
    plt.figure(figsize=(20, 5))
    for i in range(n):
        ax = plt.subplot(1, n, i + 1)
        plt.imshow(np.array(x[i]), vmax=1)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

In [None]:
#Print an image of both datasets for testing
display_single_image(y_train[0])
display_single_image(x_train[0])

In [None]:
print("Training set")
display_image(y_train, 5)
print("Noisy training set")
display_image(x_train, 5)

## Encoder

In [None]:
inputs = tf.keras.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3))

encoder = tf.keras.layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
encoder = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(encoder)
encoder = tf.keras.layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoder)
encoder = tf.keras.layers.MaxPooling2D((2, 2), padding='same')(encoder)

## Decoder

In [None]:
decoder = tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=(3, 3), activation='relu', padding='same')(encoder)
decoder = tf.keras.layers.UpSampling2D((2, 2))(decoder)
decoder = tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), activation='relu', padding='same')(decoder)
decoder = tf.keras.layers.UpSampling2D((2, 2))(decoder)

decoder = tf.keras.layers.Conv2DTranspose(filters=3, kernel_size=(3, 3), activation='sigmoid', padding='same')(decoder)

## Auto encoder

Plusieurs choix sont possibles pour la fonction de coût de l'autoencoder. Dans notre cas nous voulons débruiter les images nous avons choisi d'implémenter une fonction de coût calculant la différence entre les images ici le `DSSIM` afin d'augmenter les performances du débruitage. L'optimizer adam est un des optimiseurs les plus utilisés. Il permet une convergence rapide, ce qui réduit le temps d'entrainement. Nous pouvons notamment changer le learning_rate pour influer sur sa vitesse de convergence. Nous n'avons pas sélectionner de metrics pour comparer nos images. Seulement le score `DSSIM` qui est notre fonction loss.

In [None]:
# Loss function
def ssim_accuracy(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, 1.0)


optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss = tf.keras.losses.MeanSquaredError()

auto_encoder = tf.keras.Model(inputs, decoder, name="auto_encoder")
auto_encoder.compile(optimizer=optimizer, loss=loss, metrics=[ssim_accuracy])

## Save the best model

In [None]:
filename = RUN_DIR + "models/best_model.h5"
callback_best_model = tf.keras.callbacks.ModelCheckpoint(filepath=filename, verbose=0, save_best_only=True)

## Train the model

In [None]:
# auto_encoder.summary()

with tf.device('/GPU:0'):
    history = auto_encoder.fit(x_train, y_train,
                               batch_size=BATCH_SIZE,
                               epochs=EPOCHS,
                               verbose=1,
                               shuffle=True,
                               validation_data=(x_val, y_val),
                               callbacks=[callback_best_model]
                               )

## Loss curve

In [None]:
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs_range = range(EPOCHS)
plt.figure(figsize=(16, 8))
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()

## Output

In [None]:
decoded_images = auto_encoder.predict(x_val)
display_image(x_val, n=5)
display_image(decoded_images, n=5)

In [None]:
date: datetime = datetime.datetime.now().strftime("%m.%d.%Y_%H:%M:%S")
model_dir: str = f"models/autoenc_{date}"
auto_encoder.save(model_dir)
f = open(f"{model_dir}/model_summary.txt", "a")
auto_encoder.summary(print_fn=lambda x: f.write(x + '\n'))
f.close()

f = open(f"{model_dir}/model_history.csv", "a")
f.write(pd.DataFrame.from_dict(history.history).to_csv(index=False))
f.close()