# 3. Clasificación de imágenes con Redes Neuronales

En este notebook, veremos como llevar a cabo un proyecto de clasificación de imágenes usando redes neuronales. Para ello, haremos un ejemplo de imágenes de satélite. Al final del notebook, sabrás qué arquitectura se usan para clasificación de imágenes, y cómo diseñarla y entrenarla desde 0 o coger una ya preentrenada. 

## Descargamos el dataset!

In [None]:
import torch
torch.manual_seed(123)

from torchvision import datasets

data_path = '../EuroSAT/'
eurosat = datasets.EuroSAT(data_path, download=True)

Tenemos muchos datasets que podemos descargar directamente desde Pytorch. Incluye reconocimiento de objetos, dígitos, plantas, cáncer, etc. Podemos ver la documentación en: https://pytorch.org/vision/stable/datasets.html
y abajo desplegamos la lista de datasets:

In [None]:
dir(datasets) # En Pytorch tenemos para probar multitud de datasets

Vemos un ejemplo de las 10 clases que componen este dataset.

In [None]:
import matplotlib.pyplot as plt

class_names = eurosat.classes

fig = plt.figure(figsize=(16,6))
num_classes = 10
for i in range(num_classes):
    ax = fig.add_subplot(2, 5, 1 + i, xticks=[], yticks=[])
    ax.set_title(class_names[i])
    img = next(img for img, label in eurosat if label == i)
    plt.imshow(img)
plt.show()



In [None]:
eurosat

In [None]:
type(eurosat).__mro__

Vemos que el dataset EUROSAT es una subclase de *torch.utils.data.dataset.Dataset*. Pytorch usa esta clase para trabajar con datasets. Se caracteriza por tener dos métodos: *\_\_len\_\_* y *\_\_getitem\_\_*.

- *\_\_len\_\_* nos dice el tamaño del dataset.
- *\_\_getitem\_\_* nos permite indexar elementos del dataset.

In [None]:
len(eurosat)

In [None]:
img, label = eurosat[100]
print(img, label, class_names[label])

plt.imshow(img)
plt.show()

Los objetos del dataset son imágenes PIL. Para poder trabajar con nuestras redes neuronales en Pytorch necesitamos convertirlas en tensores! Esto se consigue con la función *torchvision.transforms.ToTensor()*.

In [None]:
from torchvision import transforms

to_tensor = transforms.ToTensor()
img_t = to_tensor(img)
img_t.shape

In [None]:
tensor_eurosat = datasets.EuroSAT(data_path, download=False,
                         transform=transforms.ToTensor())

In [None]:
plt.imshow(img_t.permute(1, 2, 0))  # <1>
plt.show()

### Dividimos en train-test y normalizamos!

En un proyecto de machine learning, solemos dividir los datos en train, validación y test. De tal modo, que aprendemos los patrones con el conjunto de train y vemos como de bueno es el modelo con un conjunto de validación que no ha sido usado para entrenar. Por último se le pasa un test independiente para ver su capacidad de generalización y posterior aplicabilidad. En este caso, dividiremos nuestros datos en train-test. Nos quedamos con un 80% de los datos para entrenar.

In [None]:
import random

indices = list(range(len(eurosat)))
random.seed(310) 
random.shuffle(indices)

train_size = int(0.8 * len(eurosat))
train_dataset_split = torch.utils.data.Subset(tensor_eurosat, indices[:train_size])

imgs = torch.stack([img_t for img_t, _ in train_dataset_split], dim=3)
imgs.shape

Es importante normalizar los datos, para ellos le quitamos la media a cada canal y dividimos por la desviación típica. Esto lo haremos con la función *transforms.Normalize*.

In [None]:
imgs.view(3, -1).mean(dim=1)

In [None]:
imgs.view(3, -1).std(dim=1)

In [None]:
transformed_eurosat = datasets.EuroSAT(
    data_path,
    transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.3452, 0.3810, 0.4083),
                             (0.2036, 0.1367, 0.1150))
    ]))

eurosat_train = torch.utils.data.Subset(transformed_eurosat, indices[:train_size])
eurosat_val = torch.utils.data.Subset(transformed_eurosat, indices[train_size:])

In [None]:
img_t, _ = eurosat_train[120]

plt.imshow(img_t.permute(1, 2, 0))
plt.show()

Podemos recuperar la imagen original invirtiendo la normalización previamente usada.

In [None]:
invTrans = transforms.Compose([ transforms.Normalize(mean = [ 0., 0., 0. ],
                                                     std = [ 1/0.2036, 1/0.1367, 1/0.1150 ]),
                                transforms.Normalize(mean = [ -0.2036, -0.1367, -0.1150 ],
                                                     std = [ 1., 1., 1. ]),
                               ])

inv_tensor = invTrans(img_t)

plt.imshow(inv_tensor.permute(1, 2, 0))
plt.show()

## Mi primera red convolucional: LeNet (1989)

La primera red convolucional que vamos a implementar se llama LeNet. Fue de las primeras en existir y es muy simple. Podemos definirla de forma sencilla en pocas líneas.

Se compone de capas convolucionales con activaciones de tangente hiperbólica seguidas de average pooling. Por último tiene un clasificador no lineal. Podemos ver la primera parte de la red neuronal con convoluciones como el extractor de características de la imagen. Las convoluciones nos extraen información local de cada píxel. Con el pooling logramos reducir la dimensión de la imagen siendo más tratable. Las primeras capas almacenan información más local de la imagen mientras que las más profundas tienen características más globales, esto es debido a la composición de distintas funciones no lineales que agrupa información de distintos puntos de la imagen. Luego el clasificador trata de predecir las probabilidades asociadas a cada clase. En este caso al tener más de dos clases se usa una capa softmax.

* Pooling: Operación mediante la cual se reduce el tamaño de los datos. Se suele reemplazar cada cuadrícula de cierto tamaño, por el máximo o la media de ella. Por ejemplo, en Lenet, cada cuadrícula de 2x2 es reemplazada por la media de los 4 números.

* Convolución: Es una operación mediante la cual se aplica una operación lineal a cada píxel, donde los pesos solo influyen en un vecindario (es muy poco práctico aplicar una operación lineal que dependa de TODOS los píxeles!, sobreajustaría). En este caso, consigue aprender información local de la imagen como aristas, esquinas, etc.

* SoftMax: Esta capa convierte un vector de números reales en números entre 0 y 1. Podemos entender que esta capa nos devuelve una distribución de probabilidad para las clases. 

$$ SoftMax(\mathbf{x})_i = \frac{e^{\mathbf{x_i}}}{\sum^D_{j=1}e^{\mathbf{x_j}}}$$

*torch.nn* contiene capas que nos serán útiles para las redes neuronales. Además *torch.nn.F* contiene funciones que son propias también de las redes neuronales y son usadas en estas capas.

Las redes neuronales heredan de la clase *nn.Module* y en el *\__init__* podemos definir las distintas capas que usaremos. Es muy importante el método forward. Ahí definimos cómo será el flow de la red, es decir, qué capas y en que orden se aplicarán sobre los datos. Cuando apliquemos la red sobre unos datos se ejecutará este método. En este caso devolvemos logits sin normalizar y probabilidades.

In [None]:
import torch.nn.functional as F
import torch.nn as nn

class LeNet5(nn.Module):

    def __init__(self, n_classes):
        super(LeNet5, self).__init__()
        
        self.feature_extractor = nn.Sequential(            
            nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5, stride=1),
            nn.Tanh(),
            nn.AvgPool2d(kernel_size=2),
            nn.Conv2d(in_channels=6, out_channels=36, kernel_size=5, stride=1),
            nn.Tanh(),
            nn.AvgPool2d(kernel_size=2),
            nn.Conv2d(in_channels=36, out_channels=64, kernel_size=5, stride=1),
            nn.Tanh(),
            nn.AvgPool2d(kernel_size=2)
        )

        self.classifier = nn.Sequential(
            nn.Linear(in_features=1024, out_features=128),
            nn.Tanh(),
            nn.Linear(in_features=128, out_features=n_classes),
        )


    def forward(self, x):
        x = self.feature_extractor(x) # Se extraen los rasgos automáticamente
        x = torch.flatten(x, 1) # Ponemos el volumen como un vector
        logits = self.classifier(x) # Se usa una red neuronal para clasificar los rasgos
        probs = F.softmax(logits, dim=1) # Calculamos la probabilidad
        return logits, probs

En Pytorch tenemos que disponer los datos en la clase Dataset, en este caso los datos de torch vision vienen ya directamente en este formato. Para poder trabajar con ellos en batches debemos cargarlos en un DataLoader.

In [None]:
train_loader = torch.utils.data.DataLoader(eurosat_train, batch_size=64,
                                           shuffle=True)
val_loader = torch.utils.data.DataLoader(eurosat_val, batch_size=64,
                                         shuffle=False)

Definimos el modelo y definimos el optimizador, la pérdida el bucle de entrenamiento, ... tal y como habíamos visto en el notebook anterior. Qué simplicidad!! Pytorch te amo!!

En este caso no podemos usar la misma función de pérdida que en regresión. La función de pérdida depende de cada problema. En clasificación se suele utilizar la entropía cruzada. Esta función se define como:

$$CE(y, y') = \sum_{c=1}^C y_{o,c} log(y'_{o,c})$$

Siendo $y_{o, c}$ los datos que tienen como clase real la clase $c$ y  $log(y'_{o,c})$ la probabilidad correspondiente asignada por la red neuronal. Esta pérdida busca separar las clases lo máximo posible, queriendo que la probabilidad se lo más alta en la clase auténtica y lo más baja en las clase que no lo son. De hecho penaliza bastante (por el logaritmo) los fallos estrepitosos, considerando también el suavizado de probabilidades en casos difíciles.

Pytorch espera que la entrada de la función de pérdida sea las puntuaciones antes de la capa softmax. Podemos aplicar esta capa para predecir posteriormente y tener interpretabilidad del modelo.

In [None]:
lenet5_model = LeNet5(n_classes=10)  #  <2>
optimizer = torch.optim.SGD(lenet5_model.parameters(), lr=1e-2)  #  <3>
loss_fn = nn.CrossEntropyLoss()  #  <4>

In [None]:
device = (torch.device('cuda') if torch.cuda.is_available()
          else torch.device('cpu'))
print(f"Training on device {device}.")

In [None]:
import datetime

def training_loop(n_epochs, optimizer, model, loss_fn, train_loader):
    model = model.to(device=device)
    for epoch in range(1, n_epochs + 1):
        loss_train = 0.0
        model.train() # Modo train
        
        for imgs, labels in train_loader:
            imgs = imgs.to(device=device)  # <1>
            labels = labels.to(device=device)
            logits, _ = model(imgs)
            loss = loss_fn(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_train += loss.item()

        if epoch == 1 or epoch % 10 == 0:
            print('{} Epoch {}, Training loss {}'.format(
                datetime.datetime.now(), epoch,
                loss_train / len(train_loader)))
            validate(model, train_loader, val_loader, device)
    return model

In [None]:
def validate(model, train_loader, val_loader, device):
    for name, loader in [("train", train_loader), ("val", val_loader)]:
        correct = 0
        total = 0
        
        model.eval() # Ponemos nuestrom modelo en modo evaluación

        with torch.no_grad():  # <1>
            for imgs, labels in loader:
                imgs = imgs.to(device=device)
                labels = labels.to(device=device)
                outputs, _ = model(imgs)
                _, predicted = torch.max(outputs, dim=1) # <2>
                total += labels.shape[0]  # <3>
                correct += int((predicted == labels).sum())  # <4>

        print("Accuracy in {}: {:.2f}".format(name , correct / total))

In [None]:
lenet_model_trained = training_loop(
    n_epochs = 100,
    optimizer = optimizer,
    model = lenet5_model,
    loss_fn = loss_fn,
    train_loader = train_loader,
)

Cada vez que usemos nuestra red no vamos a volverla a entrenar!! Por ello podemos guardar los pesos para cargarlos posteriormente.

In [None]:
import os
model_path = "../models/"
if not os.path.exists(model_path):
    os.mkdir(model_path)

In [None]:
torch.save(lenet_model_trained.state_dict(), model_path + 'lenet5.pt')

In [None]:
loaded_model = LeNet5(n_classes=10)  
loaded_model.load_state_dict(torch.load(model_path
                                        + 'lenet5.pt'))

**Ejercicio 1:** Prueba a modificar la red: hacerla más profunda, más ancha, un clasificador más complejo,... ¿cómo altera esto el proceso de entrenamiento?

## Pretrained model

Una cosa muy interesante de Pytorch es que tenemos modelos preentrenados muy potentes que solo tendremos que reentrenar en nuestros datos. En este ejemplo, usamos la efficientnet.

https://pytorch.org/vision/stable/models.html#semantic-segmentation

In [None]:
import torchvision
efficientnet = torchvision.models.efficientnet_b0(weights="IMAGENET1K_V1")
print(efficientnet)

Tenemos que modificar el clasificador para que se adapte a nuestro número de clases.

In [None]:
efficientnet.classifier[1] = nn.Linear(in_features = 1280, out_features = 10)

In [None]:
import datetime

def training_loop(n_epochs, optimizer, model, loss_fn, train_loader):
    model = model.to(device=device)
    for epoch in range(1, n_epochs + 1):
        loss_train = 0.0
        model.train() # Modo train
        
        for imgs, labels in train_loader:
            imgs = imgs.to(device=device)  # <1>
            labels = labels.to(device=device)
            logits = model(imgs)
            loss = loss_fn(logits, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_train += loss.item()

        if epoch == 1 or epoch % 2 == 0:
            print('{} Epoch {}, Training loss {}'.format(
                datetime.datetime.now(), epoch,
                loss_train / len(train_loader)))
            validate(model, train_loader, val_loader, device)
    return model

In [None]:
def validate(model, train_loader, val_loader, device):
    for name, loader in [("train", train_loader), ("val", val_loader)]:
        correct = 0
        total = 0
        
        model.eval() # Ponemos nuestrom modelo en modo evaluación

        with torch.no_grad():  # <1>
            for imgs, labels in loader:
                imgs = imgs.to(device=device)
                labels = labels.to(device=device)
                outputs = model(imgs)
                _, predicted = torch.max(outputs, dim=1) # <2>
                total += labels.shape[0]  # <3>
                correct += int((predicted == labels).sum())  # <4>

        print("Accuracy in {}: {:.2f}".format(name , correct / total))

In [None]:
optimizer = torch.optim.SGD(efficientnet.parameters(), lr=1e-3) 
loss_fn = nn.CrossEntropyLoss()
efficientnet_trained = training_loop(
    n_epochs = 20,
    optimizer = optimizer,
    model = efficientnet,
    loss_fn = loss_fn,
    train_loader = train_loader,
)

In [None]:
torch.save(efficientnet_trained.state_dict(), model_path + 'efficientnet.pt')

In [None]:
efficientnet

**Ejercicio 2:** Examina las redes disponibles en torchvision y prueba otra.