<a href="https://colab.research.google.com/github/khaoula-kplr/Generative-IA/blob/main/2.VAE/vae_fashion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# üëñ Variational Autoencoders - Fashion-MNIST Dataset

* L'objectif principal de cet atelier est de vous guider √† travers la mise en place d'un Variational Autoencoder (VAE) en utilisant l'ensemble de donn√©es Fashion MNIST.

* Commencez par l'importation des diff√©rentes librairies que vous aurez besoin √† savoir : Numpy, Matplotlib, et bien sur Tensorflow et Keras.

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import (
    layers,
    models,
    datasets,
    callbacks,
    losses,
    optimizers,
    metrics,
)

from scipy.stats import norm

from utils import display

## 0. Parameters <a name="parameters"></a>

In [None]:
# Configuration des param√®tres pour l'entra√Ænement de votre VAE.
IMAGE_SIZE = 32
BATCH_SIZE = 100
VALIDATION_SPLIT = 0.2
EMBEDDING_DIM = 2
EPOCHS = 5
BETA = 500

## 1. Prepare the data <a name="prepare"></a>

* L'ensemble de donn√©es est pr√©install√© avec TensorFlow, donc il peut √™tre t√©l√©charg√© comme suit :

In [None]:
# Load the data
(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

* Ce sont des images en niveaux de gris de 28 √ó 28 pixels (valeurs de pixels entre 0 et 255) par d√©faut, que nous devons pr√©traiter pour nous assurer que les valeurs de pixels sont mises √† l'√©chelle entre 0 et 1.

* Nous allons √©galement ajouter un padding √† chaque image pour la redimensionner en 32 √ó 32 pixels, ce qui facilitera la manipulation de la forme du tensor lorsqu'il traverse le r√©seau.

In [None]:
# Preprocess the data


def preprocess(imgs):
    """
    Normalize and reshape the images
    """
    imgs = imgs.astype("float32") / 255.0
    imgs = np.pad(imgs, ((0, 0), (2, 2), (2, 2)), constant_values=0.0)
    imgs = np.expand_dims(imgs, -1)
    return imgs


x_train = preprocess(x_train)
x_test = preprocess(x_test)

In [None]:
# Show some items of clothing from the training set
display(x_train)

## 2. Build the variational autoencoder <a name="build"></a>

## **2.1 Explications**

### **2.1.1 Subclassing de la classe Layer**

Vous pouvez cr√©er de nouvelles couches dans Keras en sous-classant la classe Layer abstraite et en d√©finissant la m√©thode call, qui d√©crit comment un tenseur est transform√© par la couche.

Par exemple, dans l'autoencodeur variationnel, nous pouvons cr√©er une couche d'√©chantillonnage (Sampling layer) qui peut g√©rer l'√©chantillonnage de z √† partir d'une distribution normale avec des param√®tres d√©finis par z_mean et z_log_var.

Cela est utile lorsque vous souhaitez appliquer une transformation √† un tenseur qui n'est pas d√©j√† inclus parmi les types de couches Keras pr√©configur√©es.

### **2.1.2 Le tour de r√©param√©trisation**

Au lieu d'√©chantillonner directement √† partir d'une distribution normale avec les param√®tres z_mean et z_log_var, nous pouvons √©chantillonner epsilon √† partir d'une distribution normale standard, puis ajuster manuellement l'√©chantillon pour qu'il ait la moyenne et la variance correctes.

Cela est connu sous le nom de "tour de r√©param√©trisation" (reparameterization trick), et c'est important car cela signifie que les gradients peuvent se propager librement √† travers la couche. En gardant toute la composante al√©atoire de la couche contenue dans la variable epsilon, la d√©riv√©e partielle de la sortie de la couche par rapport √† son entr√©e peut √™tre montr√©e comme d√©terministe (c'est-√†-dire ind√©pendante de la valeur al√©atoire epsilon), ce qui est essentiel pour permettre la r√©tropropagation √† travers la couche.

## **2.2 Code**
* Tout d'abord, nous devons cr√©er un nouveau type de couche d'√©chantillonnage (Sampling layer) qui nous permettra d'√©chantillonner √† partir de la distribution d√©finie par z_mean et z_log_var.

Pour se faire :
* Nous cr√©ons une nouvelle couche en sous-classant la classe de base Layer de Keras.
* Nous utilisons la technique de r√©param√©trisation pour g√©n√©rer un √©chantillon √† partir de la distribution normale param√©tr√©e par z_mean et z_log_var.

In [None]:
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [None]:
# Encoder
encoder_input = layers.Input(
    shape=(IMAGE_SIZE, IMAGE_SIZE, 1), name="encoder_input"
)
x = layers.Conv2D(32, (3, 3), strides=2, activation="relu", padding="same")(
    encoder_input
)
x = layers.Conv2D(64, (3, 3), strides=2, activation="relu", padding="same")(x)
x = layers.Conv2D(128, (3, 3), strides=2, activation="relu", padding="same")(x)
shape_before_flattening = K.int_shape(x)[1:]  # the decoder will need this!

x = layers.Flatten()(x)
z_mean = layers.Dense(EMBEDDING_DIM, name="z_mean")(x)
z_log_var = layers.Dense(EMBEDDING_DIM, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

In [None]:
# Decoder
decoder_input = layers.Input(shape=(EMBEDDING_DIM,), name="decoder_input")
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.Reshape(shape_before_flattening)(x)
x = layers.Conv2DTranspose(
    128, (3, 3), strides=2, activation="relu", padding="same"
)(x)
x = layers.Conv2DTranspose(
    64, (3, 3), strides=2, activation="relu", padding="same"
)(x)
x = layers.Conv2DTranspose(
    32, (3, 3), strides=2, activation="relu", padding="same"
)(x)
decoder_output = layers.Conv2D(
    1,
    (3, 3),
    strides=1,
    activation="sigmoid",
    padding="same",
    name="decoder_output",
)(x)

decoder = models.Model(decoder_input, decoder_output)
decoder.summary()

*  Le code suivant montre comment nous construisons le mod√®le VAE global en tant que sous-classe de la classe abstraite Keras Model.

* Cela nous permet d'inclure le calcul du terme de divergence KL (KL divergence) de la fonction de perte dans une m√©thode de train personnalis√©e (custom train_step).

In [None]:
class VAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]
    # Cette fonction d√©crit ce que nous aimerions obtenir lorsque nous appelons le VAE sur une image d'entr√©e particuli√®re.
    def call(self, inputs):
        """Call the model on a particular input."""
        z_mean, z_log_var, z = encoder(inputs)
        reconstruction = decoder(z)
        return z_mean, z_log_var, reconstruction
    # Cette fonction d√©crit une √©tape d'entra√Ænement du VAE, y compris le calcul de la fonction de perte.

    def train_step(self, data):
        """Step run during training."""
        with tf.GradientTape() as tape:
            z_mean, z_log_var, reconstruction = self(data)
            reconstruction_loss = tf.reduce_mean(
                BETA
                * losses.binary_crossentropy(
                    data, reconstruction, axis=(1, 2, 3)
                )
            )
            kl_loss = tf.reduce_mean(
                tf.reduce_sum(
                    -0.5
                    * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                    axis=1,
                )
            )
            # La perte totale est la somme de la perte de reconstruction et de la perte de divergence KL.
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Step run during validation."""
        if isinstance(data, tuple):
            data = data[0]

        z_mean, z_log_var, reconstruction = self(data)
        reconstruction_loss = tf.reduce_mean(
            BETA
            * losses.binary_crossentropy(data, reconstruction, axis=(1, 2, 3))
        )
        kl_loss = tf.reduce_mean(
            tf.reduce_sum(
                -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                axis=1,
            )
        )
        total_loss = reconstruction_loss + kl_loss

        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

In [None]:
# Create a variational autoencoder
vae = VAE(encoder, decoder)

## 3. Train the variational autoencoder <a name="train"></a>

In [None]:
# Compile the variational autoencoder
optimizer = optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer)

In [None]:
# Create a model save checkpoint
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint",
    save_weights_only=False,
    save_freq="epoch",
    monitor="loss",
    mode="min",
    save_best_only=True,
    verbose=0,
)
tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")

* Nous pouvons maintenant entra√Æner l'autoencodeur VAE en utilisant les images d'entr√©e √† la fois comme entr√©e et sortie.

In [None]:
vae.fit(
    x_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    shuffle=True,
    validation_data=(x_test, x_test),
    callbacks=[model_checkpoint_callback, tensorboard_callback],
)

In [None]:
# Sauvegardez les mod√®les
vae.save("./models/vae")
encoder.save("./models/encoder")
decoder.save("./models/decoder")

## 3. Reconstruct using the variational autoencoder <a name="reconstruct"></a>

* Nous pouvons tester maintenant la capacit√© de l'autoencodeur VAE √† reconstruire des images en faisant passer des images de l'ensemble de test √† travers le VAE et en comparant la sortie avec les images originales.

In [None]:
# S√©lectionner un sous-ensemble de l'ensemble de test.
n_to_predict = 5000
example_images = x_test[:n_to_predict]
example_labels = y_test[:n_to_predict]

In [None]:
# Cr√©er des pr√©dictions de l'autoencodeur et les afficher.
z_mean, z_log_var, reconstructions = vae.predict(example_images)
print("Example real clothing items")
display(example_images)
print("Reconstructions")
display(reconstructions)

## 4. Embed using the encoder <a name="encode"></a>

* Nous pouvons visualiser comment les images sont int√©gr√©es dans l'espace latent.

In [None]:
# Encoder les images d'exemple
z_mean, z_var, z = encoder.predict(example_images)

In [None]:
# Quelques exemples des int√©grations (embeddings).
print(z[:10])

In [None]:
# Afficher les points encod√©s dans un espace en 2D.
figsize = 8

plt.figure(figsize=(figsize, figsize))
plt.scatter(z[:, 0], z[:, 1], c="black", alpha=0.5, s=3)
plt.show()

## 5. Generate using the decoder <a name="decode"></a>

In [None]:
# √âchantillonner quelques points dans l'espace latent √† partir de la distribution normale standard.
grid_width, grid_height = (6, 3)
z_sample = np.random.normal(size=(grid_width * grid_height, 2))

In [None]:
# D√©coder les points √©chantillonn√©s.
reconstructions = decoder.predict(z_sample)

In [None]:
# Convertir les int√©grations d'origine et les int√©grations √©chantillonn√©es en valeurs p.
p = norm.cdf(z)
p_sample = norm.cdf(z_sample)

In [None]:
# Tracez un graphique de...
figsize = 8
plt.figure(figsize=(figsize, figsize))

# ... les int√©grations d'origine ...
plt.scatter(z[:, 0], z[:, 1], c="black", alpha=0.5, s=2)

# ... et les points nouvellement g√©n√©r√©s dans l'espace latent.
plt.scatter(z_sample[:, 0], z_sample[:, 1], c="#00B0F0", alpha=1, s=40)
plt.show()

# Ajoutez en dessous une grille des images d√©cod√©es.
fig = plt.figure(figsize=(figsize, grid_height * 2))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

for i in range(grid_width * grid_height):
    ax = fig.add_subplot(grid_height, grid_width, i + 1)
    ax.axis("off")
    ax.text(
        0.5,
        -0.35,
        str(np.round(z_sample[i, :], 1)),
        fontsize=10,
        ha="center",
        transform=ax.transAxes,
    )
    ax.imshow(reconstructions[i, :, :], cmap="Greys")

## 6. Explore the latent space <a name="explore"></a>

In [None]:
# Coloriez les int√©grations en fonction de leur √©tiquette (type de v√™tement - voir tableau).
figsize = 8
fig = plt.figure(figsize=(figsize * 2, figsize))
ax = fig.add_subplot(1, 2, 1)
plot_1 = ax.scatter(
    z[:, 0], z[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=3
)
plt.colorbar(plot_1)
ax = fig.add_subplot(1, 2, 2)
plot_2 = ax.scatter(
    p[:, 0], p[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=3
)
plt.show()

| ID | Clothing Label |
| :- | :- |
| 0 | T-shirt/top |
| 1 | Trouser |
| 2 | Pullover |
| 3 | Dress |
| 4 | Coat |
| 5 | Sandal |
| 6 | Shirt |
| 7 | Sneaker |
| 8 | Bag |
| 9 | Ankle boot |

In [None]:
# Coloriez les int√©grations en fonction de leur √©tiquette (type de v√™tement - voir tableau).
figsize = 12
grid_size = 15
plt.figure(figsize=(figsize, figsize))
plt.scatter(
    p[:, 0], p[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=300
)
plt.colorbar()

x = norm.ppf(np.linspace(0, 1, grid_size))
y = norm.ppf(np.linspace(1, 0, grid_size))
xv, yv = np.meshgrid(x, y)
xv = xv.flatten()
yv = yv.flatten()
grid = np.array(list(zip(xv, yv)))

reconstructions = decoder.predict(grid)
# plt.scatter(grid[:, 0], grid[:, 1], c="black", alpha=1, s=10)
plt.show()

fig = plt.figure(figsize=(figsize, figsize))
fig.subplots_adjust(hspace=0.4, wspace=0.4)
for i in range(grid_size**2):
    ax = fig.add_subplot(grid_size, grid_size, i + 1)
    ax.axis("off")
    ax.imshow(reconstructions[i, :, :], cmap="Greys")