**Avant de débuter ce TP** :

1. **Changez le type d'exécution sur Google Colab** : `Exécution > Modifiez le type d'exécution > T4 GPU`
2. **Installez les paquets ci-dessous** :

In [None]:
! pip install lightning torchmetrics torchinfo

3. Exécutez ce code pour supprimer quelques messages et avertissements éventuellement affichés.

In [None]:
import logging
logging.getLogger("lightning").setLevel(logging.ERROR)
logging.getLogger("lightning.pytorch.utilities.rank_zero").setLevel(logging.WARNING)
logging.getLogger("lightning.pytorch.accelerators.cuda").setLevel(logging.WARNING)
logger = logging.getLogger("lightning")
logger.propagate = False

import warnings
warnings.filterwarnings("ignore", ".*does not have many workers.*")

# Classification d'images par réseau de neurones convolutif

## Chargement des données

Nous allons travailler sur un jeu de données appelé [EMNIST](https://www.westernsydney.edu.au/icns/resources/reproducible_research3/publication_support_materials2/emnist), et plus spécifiquement sur un sous-jeu de données constitué d'images en nuances de gris de lettres manuscrites.
Pour ce faire, nous allons utiliser la classe [`torchvision.datasets.EMNIST`](https://pytorch.org/vision/stable/generated/torchvision.datasets.EMNIST.html) pour télécharger et charger ce jeu de données.

In [None]:
import torch
from torchvision.datasets import EMNIST
from torchvision.transforms import v2


dataset_train = EMNIST(
    root='data',
    split='letters',
    download=True,
    train=True,
    transform=v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
    target_transform=lambda x: x - 1
)

dataset_val = EMNIST(
    root='data',
    split='letters',
    download=True,
    train=False,
    transform=v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
    target_transform=lambda x: x - 1
)

### Exercice 1

**a)** Calculer la taille des jeux d'entraînement et de validation, c'est-à-dire le nombre d'images dans chacun des deux jeux.

**b)** Calculer la taille d'une image, c'est-à-dire la taille d'une image. On prendra pour acquis que toutes les observations ont la même taille (c'est bien le cas, vous n'avez pas à le vérifier).

In [None]:
# TODO

Il y a 26 classes dans ce jeu de données, correspondant aux 26 lettres de l'alphabet. Néanmoins, le jeu de données mélange les lettres majuscules et minuscules : des images pour les lettres *A* et *a* ont donc le même label.

Le code ci-dessous permet d'afficher neuf observations tirées aléatoirement dans le jeu d'entraînement.

In [None]:
import matplotlib.pyplot as plt


figure = plt.figure(figsize=(8, 8))
cols, rows = 3, 3
for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(dataset_train), size=(1,)).item()
    img, label = dataset_train[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(f"{chr(65 + label)} ou {chr(97 + label)}")
    plt.axis("off")
    plt.imshow(img.squeeze().T, cmap="gray")
plt.show()

### Exercice 2

Vous avez peut-être remarqué le code suivant dans la création des jeux de données :
```
transform=v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)],
target_transform=lambda x: x - 1
```

En lisant la documentation de la classe [torchvision.datasets.EMNIST](https://pytorch.org/vision/stable/generated/torchvision.datasets.EMNIST.html), vous verrez que :
* l'argument `transform` permet de transformer l'image brute,
* l'argument `target_transform` permet de transformer le label.

**a)** Expliquez l'utilité du code transformant les entrées. Pour ce faire, pour la première observation du jeu d'entraînement, affichez le tenseur de l'entrée brute et le tenseur de l'entrée transformée. En particulier, comparez la taille, le type de données (*dtype*) et les valeurs minimale et maximale pour ces deux tenseurs.

> **Remarque** : vous pouvez accéder aux données brutes de ce jeu de données grâce à l'attribut `data` (par exemple `dataset_train.data`)

**b)** Expliquez l'utilité du code transformant les labels.

> **Remarque** : vous pouvez accéder à la correspondance des labels bruts de ce jeu de données grâce à l'attribut `class_to_idx` (par exemple `dataset_train.class_to_idx`). Vous admettrez, sans le vérifier, qu'il n'y a aucun label manquant (`'N/A'`) dans les deux jeux de données.

In [None]:
# TODO

**Réponses** :
    
**a)** TODO

**b)** TODO

### Exercice 3

Calculez la distribution des classes sur les jeux d'entraînement et de validation. Est-ce que les classes sont équilibrées ? Est-ce que la précision (*accuracy*) est une métrique adaptée pour évaluer la performance d'un modèle ? Vous pouvez utiliser, au choix, les fonctions [numpy.unique()](https://numpy.org/doc/stable/reference/generated/numpy.unique.html), [numpy.bincount()](https://numpy.org/doc/stable/reference/generated/numpy.bincount.html) ou [torch.bincount()](https://pytorch.org/docs/stable/generated/torch.bincount.html) pour vous aider.

In [None]:
# TODO

### Exercice 4

Construisez les *dataloaders* pour les jeux d'entraînement et de validation. Vous utiliserez des lots de taille *64*. Mélangez les observations sur le jeu d'entraînement.

In [None]:
# TODO

## Un premier réseau de neurones convolutif

### Exercice 5

Le code suivant est l'ébauche de la définition d'un réseau de neurones convolutif. Répondez aux questions suivantes :

**a)** Quelles couches sont des couches de convolution ?

**b)** Pourquoi le nombre de canaux en entrée est fixé à `1` dans `self.cv1` ?

**c)** Indiquez, pour `self.cv1`, le nombre de canaux en sortie, la taille du noyau et la taille du pas.

**d)** De quel type de couche de regroupement ce réseau est-il constitué ?

**e)** Quelles sont les couches avec des paramètres entraînables ?

In [None]:
import lightning as L
from torchmetrics import Accuracy


class CNN(L.LightningModule):  # La classe hérite de la classe lightning.LightningModule
    def __init__(self):
        """Constructeur.
        
        Dans le constructeur, on exécute le constructeur de la clase mère et on définit
        toutes les couches et fonctions d'activation de notre réseau de neurones.
        """
        super().__init__()  # Toujours exécuter le constructeur de la classe mère
        
        # Initialisation des couches et fonctions d'activation
        self.cv1 = torch.nn.Conv2d(in_channels=1, out_channels=4, kernel_size=5, padding='same')
        self.relu1 = torch.nn.ReLU()
        self.pool1 = torch.nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = torch.nn.Flatten()
        self.fc2 = torch.nn.Linear(123, 64)
        self.relu2 = torch.nn.ReLU()
        self.fc3 = torch.nn.Linear(64, 10)
        self.relu3 = torch.nn.ReLU()
        
        # Initialisation de la fonction de perte
        self.loss = torch.nn.CrossEntropyLoss()
        
        # Initialisation des métriques
        self.accuracy_train = Accuracy(task="multiclass", num_classes=26)
        self.accuracy_val = Accuracy(task="multiclass", num_classes=26)
    
    def forward(self, x):
        """Implémente la passe avant.
        
        L'argument x est un tenseur correspondant soit à l'entrée une seule
        observation soit aux entrées d'un lot d'observations.
        """
        out = self.cv1(x)
        out = self.relu1(out)
        out = self.pool1(out)
        out = self.flatten(out)
        out = self.fc2(out)
        out = self.relu2(out)
        out = self.fc3(out)
        out = self.relu3(out)
        return out
    
    def step(self, batch, dataset):
        """Effectue une étape.

        Une étape consiste à passer d'un lot d'observations (l'argument batch)
        à l'évaluation de la fonction de coût pour ce lot d'observations.

        Parameters
        ----------
        batch : tuple
            Un lot d'observations. Le premier élément du tuple est le lot
            des entrées, le second est le lot des labels.
            
        dataset : {"training", "validation"}
            Jeu de données utilisé.

        Returns
        -------
        loss : Tensor, shape = (1,)
            La fonction de coût pour ce lot d'observations.
        """
        X, y = batch  # X correspond aux images, y aux classes
        logits = self(X)  # Passe avant, qui renvoie les logits
        loss = self.loss(logits, y)  # Évaluation de la fonction de coût
        y_pred = logits.argmax(1)  # Prédictions du modèle
        
        if dataset == "training":
            metric = self.accuracy_train
            name = "train"
            bar_step = True
        else:
            metric = self.accuracy_val
            name = "val"
            bar_step = False

        acc = metric(y_pred, y)
        self.log(f"loss_{name}", loss, prog_bar=bar_step, on_step=bar_step, on_epoch=True)
        self.log(f"accuracy_{name}", acc, prog_bar=bar_step, on_step=bar_step, on_epoch=True)
        
        return loss
    
    def training_step(self, batch):
        """Effectue une étape d'entraînement."""
        return self.step(batch, "training")

    def validation_step(self, batch):
        """Effectue une étape de validation."""
        return self.step(batch, "validation")
    
    def on_train_start(self):
        """Code exécuté au début de l'entraînement."""
        string = f"Version {self.trainer.logger.version}"
        print(f"{string}\n{'=' * len(string)}\n")
    
    def on_train_epoch_end(self):
        """Code exécuté à la fin de chaque époque d'entraînement."""
        metrics = self.trainer.callback_metrics
        string = (f"""
            Époque {self.trainer.current_epoch + 1} / {self.trainer.max_epochs}
            ------------------------------------------------
            |     Jeu      | Fonction de perte | Précision |
            | ------------ | ----------------- | --------- |
            | Entraînement |{metrics['loss_train'].item():^19.5f}|{metrics['accuracy_train'].item():^11.3%}|
            |  Validation  |{metrics['loss_val'].item():^19.5f}|{metrics['accuracy_val'].item():^11.3%}|
            ------------------------------------------------
        """)
        string = '\n'.join([line.strip() for line in string.split('\n')])
        print(string)

    def configure_optimizers(self):
        """Configure l'algorithme d'optimisation à utiliser."""
        optimizer = torch.optim.Adam(self.parameters())
        return optimizer

**Réponses** :

**a)** TODO

**b)** TODO

**c)** TODO

**d)** TODO

**e)** TODO

### Exercice 6

En essayant d'afficher le résumé de l'architecture de notre modèle avec le code ci-dessous, une erreur va être levée. Identifiez le problème correspondant à cette erreur et corrigez-le. Indiquez, avec des mots, quel était le problème.

In [None]:
from torchinfo import summary


summary(CNN(), input_size=(64, 1, 28, 28))

**Réponse** : TODO

### Exercice 7

Entraînez votre modèle pendant 10 époques. Comparez la performance du modèle à celle attendue pour un modèle trivial (qui prédirait toujours la même classe ou qui prédirait de façon totalement aléatoire les classes).

In [None]:
from lightning.pytorch.callbacks import TQDMProgressBar
from lightning.pytorch.loggers import CSVLogger


model = CNN()

trainer = L.Trainer(
    max_epochs=10,
    enable_model_summary=False,  # supprimer le résumé du modèle
    logger=CSVLogger('.'),  # sauvegarder les résultats dans un fichier CSV
    num_sanity_val_steps=0,  # ne pas effectuer d'étape de validation avant l'entraînement
    callbacks=[TQDMProgressBar(refresh_rate=100)]  # mettre à jour la barre de progression tous les 100 lots
)

trainer.fit(
    model=model,
    train_dataloaders=dataloader_train,
    val_dataloaders=dataloader_val
)

**Réponse** : TODO

### Exercice 8

Viusalisez les courbes de performance du modèle en utilisant la fonction `plot_loss_accuracy` définie ci-dessous. Vous pouvez utiliser l'argument `version` pour indiquer quelle version du modèle choisir.

In [None]:
def plot_loss_accuracy(savedir='.', version=None):
    """Affiche les courbes de la fonction de perte et d'accuracy.
    
    Parameters
    ----------
    savedir : str (default = '.')
        Chemin où les résultats sont sauvegardés.
        
    version : int or None (default = None)
        Numéro de la version du modèle.
    """
    # Récupère les résultats sous la forme d'un DataFrame
    import os
    import pandas as pd
    if version is None:
        version = max([
            int(folder.split('version_')[1])
            for folder in os.listdir(os.path.join(savedir, 'lightning_logs'))
            if folder.startswith('version')
        ])
    df = pd.read_csv(os.path.join(savedir, 'lightning_logs', f'version_{version}', 'metrics.csv'))
    df['epoch'] += 1  # On commence à compter à partir de 1

    loss_train = df.dropna(subset='loss_train_epoch').set_index('epoch')['loss_train_epoch']
    loss_val = df.dropna(subset='loss_val').set_index('epoch')['loss_val']

    accuracy_train = df.dropna(subset='accuracy_train_epoch').set_index('epoch')['accuracy_train_epoch']
    accuracy_val = df.dropna(subset='accuracy_val').set_index('epoch')['accuracy_val']

    # Affiche les résultats
    plt.figure(figsize=(13, 4))

    plt.subplot(1, 2, 1)
    plt.plot(loss_train.index, loss_train.to_numpy(), 'o-', label='Entraînement');
    plt.plot(loss_val.index, loss_val.to_numpy(), 'o-', label='Validation');
    plt.xlabel('Époque')
    plt.ylabel('Fonction de perte')
    plt.legend();

    plt.subplot(1, 2, 2)
    plt.plot(accuracy_train.index, accuracy_train.to_numpy(), 'o-', label='Entraînement');
    plt.plot(accuracy_val.index, accuracy_val.to_numpy(), 'o-', label='Validation');
    plt.xlabel('Époque')
    plt.ylabel('Précision')
    plt.legend();

In [None]:
plot_loss_accuracy()

## Un deuxième réseau de neurones convolutif

### Exercice 9

Complétez la méthode `__init__()` de la classe `LeNet5` définie ci-dessous dont l'achitecture séquentielle est décrite ci-dessous, correspondant à l'architecture de [LeNet5](https://en.wikipedia.org/wiki/LeNet) :

* une couche de convolution avec $6$ canaux en sortie, un noyau de taille $5 \times 5$ et du rembourrage (*padding*) de telle que sorte l'image en sortie soit de la même taille que l'image en entrée.
* la fonction d'activation sigmoïde.
* une couche de regroupement par le maximum avec un noyau de taille $2 \times 2$ et un pas de $2$.
* une couche de convolution avec $16$ canaux en sortie, un noyau de taille $5 \times 5$ et sans rembourrage.
* la fonction d'activation sigmoïde.
* une couche de regroupement par le maximum avec un noyau de taille $2 \times 2$ et un pas de $2$.
* une couche d'aplatissement (*flatten*).
* une couche linéaire avec une entrée de taille $400$ et une sortie de taille $256$.
* la fonction d'activation sigmoïde.
* une couche linéaire avec une sortie de taille $128$.
* la fonction d'activation sigmoïde.
* une couche linéaire avec une sortie de taille $26$.

Voici les liens vers les documentations des classes correspondantes :
[torch.nn.Conv2d](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html),
[torch.nn.Sigmoid](https://pytorch.org/docs/stable/generated/torch.nn.Sigmoid.html),
[torch.nn.MaxPool2d](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html),
[torch.nn.Flatten](https://pytorch.org/docs/stable/generated/torch.nn.Flatten.html) et
[torch.nn.Linear](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html).

In [None]:
class LeNet5(L.LightningModule):  # La classe hérite de la classe lightning.LightningModule
    def __init__(self):
        """Constructeur.
        
        Dans le constructeur, on exécute le constructeur de la clase mère et on définit
        toutes les couches et fonctions d'activation de notre réseau de neurones.
        """
        super().__init__()  # Toujours exécuter le constructeur de la classe mère
        
        ### BEGIN TODO ###
        # Initialisation du modèle
        #### END TODO ####
        
        # Initialisation de la fonction de perte
        self.loss = torch.nn.CrossEntropyLoss()
        
        # Initialisation des métriques
        self.accuracy_train = Accuracy(task="multiclass", num_classes=26)
        self.accuracy_val = Accuracy(task="multiclass", num_classes=26)
    
    def forward(self, x):
        """Implémente la passe avant.
        
        L'argument x est un tenseur correspondant soit à l'entrée une seule
        observation soit aux entrées d'un lot d'observations.
        """
        ### BEGIN TODO ###
        # y = 
        #### END TODO ####
        return y
    
    def step(self, batch, dataset):
        """Effectue une étape.

        Une étape consiste à passer d'un lot d'observations (l'argument batch)
        à l'évaluation de la fonction de coût pour ce lot d'observations.

        Parameters
        ----------
        batch : tuple
            Un lot d'observations. Le premier élément du tuple est le lot
            des entrées, le second est le lot des labels.
            
        dataset : {"training", "validation"}
            Jeu de données utilisé.

        Returns
        -------
        loss : Tensor, shape = (1,)
            La fonction de coût pour ce lot d'observations.
        """
        X, y = batch  # X correspond aux images, y aux classes
        logits = self(X)  # Passe avant, qui renvoie les logits
        loss = self.loss(logits, y)  # Évaluation de la fonction de coût
        y_pred = logits.argmax(1)  # Prédictions du modèle
        
        if dataset == "training":
            metric = self.accuracy_train
            name = "train"
            bar_step = True
        else:
            metric = self.accuracy_val
            name = "val"
            bar_step = False

        acc = metric(y_pred, y)
        self.log(f"loss_{name}", loss, prog_bar=bar_step, on_step=bar_step, on_epoch=True)
        self.log(f"accuracy_{name}", acc, prog_bar=bar_step, on_step=bar_step, on_epoch=True)
        
        return loss
    
    def training_step(self, batch):
        """Effectue une étape d'entraînement."""
        return self.step(batch, "training")

    def validation_step(self, batch):
        """Effectue une étape de validation."""
        return self.step(batch, "validation")
    
    def on_train_start(self):
        """Code exécuté au début de l'entraînement."""
        string = f"Version {self.trainer.logger.version}"
        print(f"{string}\n{'=' * len(string)}\n")
    
    def on_train_epoch_end(self):
        """Code exécuté à la fin de chaque époque d'entraînement."""
        metrics = self.trainer.callback_metrics
        string = (f"""
            Époque {self.trainer.current_epoch + 1} / {self.trainer.max_epochs}
            ------------------------------------------------
            |     Jeu      | Fonction de perte | Précision |
            | ------------ | ----------------- | --------- |
            | Entraînement |{metrics['loss_train'].item():^19.5f}|{metrics['accuracy_train'].item():^11.3%}|
            |  Validation  |{metrics['loss_val'].item():^19.5f}|{metrics['accuracy_val'].item():^11.3%}|
            ------------------------------------------------
        """)
        string = '\n'.join([line.strip() for line in string.split('\n')])
        print(string)

    def configure_optimizers(self):
        """Configure l'algorithme d'optimisation à utiliser."""
        optimizer = torch.optim.Adam(self.parameters())
        return optimizer

### Exercice 9

Affichez un résumé de cette architecture. Combien de paramètres entraînables a cette architecture ?

In [None]:
# TODO

**Réponse** : TODO

### Exercice 10

Entraînez votre nouveau réseau de neurones pendant 10 époques. Comparez sa performance à celle du modèle précédent.

In [None]:
model_lenet = LeNet5()

trainer_lenet = L.Trainer(
    max_epochs=10,
    enable_model_summary=False,  # supprimer le résumé du modèle
    logger=CSVLogger('.'),  # sauvegarder les résultats dans un fichier CSV
    num_sanity_val_steps=0,  # ne pas effectuer d'étape de validation avant l'entraînement
    callbacks=[TQDMProgressBar(refresh_rate=100)]  # mettre à jour la barre de progression tous les 100 lots
)

trainer_lenet.fit(
    model=model_lenet,
    train_dataloaders=dataloader_train,
    val_dataloaders=dataloader_val
)

In [None]:
plot_loss_accuracy()

**Réponse** : TODO