> Modelado generativo

En este *notebook*, vamos a utilizar un autocodificador variacional o VAE (del inglés, [Variational Autoencoder](https://en.wikipedia.org/wiki/Variational_autoencoder)) como una especie de "máquina para esbozar caras". El *codificador* aprende a comprimir cada imagen en solo unas pocas variables *latentes* (un boceto aproximado que captura las características principales pero no los detalles) y el *decodificador* aprende a convertir ese boceto de nuevo en una imagen completa. Vamos a forzar a que este "espacio comprimido" sigua una distribución [gaussiana](https://es.wikipedia.org/wiki/Distribuci%C3%B3n_normal) estándar (una "nube" de puntos), para así saber qué tipo de variables latentes son válidas. Esa es la parte interesante: como este espacio es suave, los puntos cercanos corresponden a caras similares, y podemos muestrear puntos aleatorios de esta nube gaussiana para generar nuevas caras que el modelo nunca ha visto antes. La imagen de abajo intenta transmitir la idea principal (observa que a la izquierda tenemos un espacio 3D, mientras que el de la derecha es 2D).

<div style="text-align: center;">
<img src="https://raw.githubusercontent.com/manuvazquez/uc3m_computation_and_intelligence/master/labs/notebooks/figures/vae.svg" alt="Description" width="1200">
</div>

En principio, podrías ejecutar el *notebook* tanto en *Colab* como localmente. ¿Se está ejecutando el *notebook* en *Colab*?

In [None]:
try:
    import google.colab
    running_in_colab = True
except ImportError:
    running_in_colab = False

running_in_colab

Si ejecutamos el *notebook* en *Colab* necesitamos instalar un par de librerías de Python. Si no, podríamos elegir una GPU si hay varias disponibles.

In [None]:
import os

if running_in_colab:
    !pip install equinox numpyro
else:
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"

El resto de `import`s necesarios van aquí

In [None]:
import pathlib
import random

import numpy as np
import torch
from torch import nn
import torchvision
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import Dataset
from torch import optim

from PIL import Image

import jax
import jax.numpy as jnp
import jax.random as jr
import equinox as eqx
import optax
from jaxtyping import Array, Float, Int
import numpyro.distributions as dist

import matplotlib.pyplot as plt

import kagglehub

En aras de la reproducibilidad, fijamos las *semillas* aleatorias.

In [None]:
torch.manual_seed(42)
np.random.seed(42)

Dispositivo a usar...deberías ver *[CudaDevice(id=0)]* o similar (siendo el prefijo *Cuda* lo importante) si quieres (deberías) usar la GPU disponible.

In [None]:
jax.devices()

En este *notebook* usaremos las librerías [JAX](https://docs.jax.dev/en/latest/) y [Equinox](https://docs.kidger.site/equinox/), que adoptan un enfoque más [funcional](https://es.wikipedia.org/wiki/Programaci%C3%B3n_funcional) para la computación. *PyTorch* solo se usa para el manejo de datos.

# Datos

Se descargan imágenes del [CelebFaces Dataset](https://www.kaggle.com/datasets/arnrob/celeba-small-images-dataset)

In [None]:
imgs_dir = pathlib.Path(kagglehub.dataset_download("arnrob/celeba-small-images-dataset"))
print("Path to dataset files:", imgs_dir)

Algo de código para preparar las imágenes para el entrenamiento. Esencialmente, necesitamos construir un `DataLoader` de *PyTorch* a partir de las imágenes en el directorio de arriba.

In [None]:
class CustomImageDataset(Dataset):
    
    def __init__(self, img_dir, transform=None, n_samples=None):
        
        self.img_dir = img_dir
        self.transform = transform
        self.image_files = []

        # the names of *all* image files
        print(f"Scanning directory: {img_dir}")
        all_files = [os.path.join(img_dir, f) for f in os.listdir(img_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]

        # optionally limit the number of samples and shuffle for randomness
        if n_samples is not None and n_samples < len(all_files):
            self.image_files = random.sample(all_files, n_samples)
        else:
            self.image_files = all_files
            random.shuffle(self.image_files) # Shuffle if using all files

        # we don't have *actual* labels, but in the usual `Dataset` one is expected; it is set to 0 (dummy label) for all images
        self.labels = [0] * len(self.image_files)

    def __len__(self):

        return len(self.image_files)

    def __getitem__(self, idx):

        img_path = self.image_files[idx]
        image = Image.open(img_path).convert('RGB')
        
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)
        
        return image, label


def get_celeba(
        batch_size: int,
        dataset_directory: str | pathlib.Path,
        n: int | None = None,
        data_subset: str = "training", # either 'training' or 'validation'
    ) -> torch.utils.data.DataLoader:

    # size of the images after resizing
    img_size: tuple[int, int] = (64, 64)

    train_transformation = transforms.Compose([
        transforms.Resize(img_size), # *images* are resized,...
        transforms.ToTensor(), # ...converted to *tensors*,...
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), # ...and normalized to have values in [-1, 1]
    ])

    # the path to the specific data subset (e.g., 'training')
    actual_image_directory = pathlib.Path(dataset_directory) / data_subset
    
    if not actual_image_directory.is_dir():
        raise ValueError(f"Specified data_subset '{data_subset}' not found in '{dataset_directory}'.")

    train_dataset = CustomImageDataset(actual_image_directory, train_transformation, n_samples=n)

    # a `DataLoader` is returned
    return torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

Creemos un `DataLoader` que recorra $10,000$ imágenes en batches de tamaño $32$

In [None]:
trainloader = get_celeba(32, imgs_dir, n=10_000)
trainloader

Un `DataLoader` de *PyTorch* es en última instancia un iterador...

<font color='red'>TO-DO</font>: Obtén el primer elemento de él. ¿Qué es?

In [None]:
# image = 

<font color='red'>TO-DO</font>: Extrae la primera imagen del `DataLoader` de arriba. ¿de qué tipo (Python) es?

$d_x$ en la imagen de arriba sería aquí el número total de píxeles en una imagen, es decir, *ancho* $\times$ *alto* $\times$ 3 canales (RGB)

Vamos a implementar una función para mostrar un `Tensor` de *PyTorch* como imagen. Observa que la función `transforms.Normalize` de arriba está haciendo $\frac{x - 0.5}{0.5} = 2(x-0.5)$...que debe deshacerse para obtener una imagen lista para "consumo humano".

In [None]:
def show_image(img):
    
    # image is "unnormalized"
    img = img / 2 + 0.5
    
    # pytorch expects the channel dimension first whereas matplotlib expects it last
    plt.imshow(np.transpose(img, (1, 2, 0)))
    
    plt.show()

<font color='red'>TO-DO</font>: Muestra la imagen que has extraido arriba.

In [None]:
# show_image(image)

<font color='red'>TO-DO</font>: ¿Qué pasa si omites la parte de *desnormalización* en `show_image`?

*PyTorch* tiene una función de para "pegar" juntas un conjunto de imágenes. Podemos usarla para mostrar un *batch* completo.

In [None]:
show_image(torchvision.utils.make_grid(next(iter(trainloader))[0]))

# Modelo

El modelo engloba dos componentes, el *encoder/compresor* (implementando la función `f_enc` en la imagen de arriba) y el *decoder/descompresor* (implementando la función `f_dec`). En medio de ellos tenemos el "espacio comprimido", conocido como el espacio *latente*. Podemos elegir su dimensión (tamaño).

In [None]:
d_z = 3

## Codificador

Una clase que define la arquitectura del *encoder* (es decir, el compresor). Esto asume imágenes de $64 \times 64$. Si tuvieran otro tamaño, habría que hacer ajustes.

In [None]:
class Encoder(eqx.Module):

    layers: list

    def __init__(self, d_z: int, input_channels: int = 3, rng_key: jr.PRNGKey = jr.PRNGKey(42)):

        key1, key2, key3, key4, key5, key6 = jr.split(rng_key, 6)

        self.layers = [
            eqx.nn.Conv2d(in_channels=input_channels, out_channels=32, kernel_size=4, stride=2, padding=1, key=key1),
            jax.nn.relu,
            eqx.nn.Conv2d(in_channels=32, out_channels=32, kernel_size=4, stride=2, padding=1, key=key2),
            jax.nn.relu,
            eqx.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2, padding=1, key=key3),
            jax.nn.relu,
            eqx.nn.Conv2d(in_channels=64, out_channels=64, kernel_size=4, stride=2, padding=1, key=key4),
            jax.nn.relu,
            eqx.nn.Conv2d(in_channels=64, out_channels=256, kernel_size=4, stride=1, padding=0, key=key5),
            jnp.ravel,
            eqx.nn.Linear(256, 2*d_z, key=key6),
            lambda x: x.at[d_z:].set(jax.nn.softplus(x[d_z:]))
        ]
    
    def __call__(self, x):

        for layer in self.layers:

            x = layer(x)
        
        return x

Lo instanciamos

In [None]:
encoder = Encoder(d_z=d_z)

El objeto `encoder` se comporta en última instancia como una función que acepta una imagen (en forma de un array) como entrada, y devuelve un vector de tamaño $2 \times d_z$ que proporciona la media y desviación típica (apiladas verticalmente) de una distribución gaussiana en el espacio latente. Efectivamente, el encoder no solo te da un $z$ en el espacio comprimido, sino también una medida de su incertidumbre.

Lo aplicamos sobre la primera imagen del primer *batch*.

In [None]:
z_mean_std = encoder(next(iter(trainloader))[0][0].numpy())
z_mean, z_std = jnp.split(z_mean_std, 2)
z_mean, z_std

Observa que, como debe ser, las desviaciones típicas son no-negativas.

## Decodificador

La arquitectura para el *decoder*, es decir, el descompresor.

In [None]:
class Decoder(eqx.Module):

    layers: list

    def __init__(self, d_z: int, input_channels: int = 3, rng_key: jr.PRNGKey = jr.PRNGKey(42)):

        key1, key2, key3, key4, key5, key6 = jr.split(rng_key, 6)

        self.layers = [
            eqx.nn.Linear(d_z, 256, key=key1),
            lambda x: jnp.reshape(x, (256, 1, 1)),
            jax.nn.relu,
            eqx.nn.ConvTranspose2d(in_channels=256, out_channels=64, kernel_size=4, stride=1, padding=0, key=key2),
            jax.nn.relu,
            eqx.nn.ConvTranspose2d(in_channels=64, out_channels=64, kernel_size=4, stride=2, padding=1, key=key3),
            jax.nn.relu,
            eqx.nn.ConvTranspose2d(in_channels=64, out_channels=32, kernel_size=4, stride=2, padding=1, key=key4),
            jax.nn.relu,
            eqx.nn.ConvTranspose2d(in_channels=32, out_channels=32, kernel_size=4, stride=2, padding=1, key=key5),
            jax.nn.relu,
            eqx.nn.ConvTranspose2d(in_channels=32, out_channels=input_channels, kernel_size=4, stride=2, padding=1, key=key6),
            jax.nn.tanh
        ]
    
    def __call__(self, x):

        for layer in self.layers:

            x = layer(x)
        
        return x

`decoder` se comporta como una función que actúa sobre vectores en el espacio latente (de dimensión $d_z$).

In [None]:
decoder = Decoder(d_z=d_z)

Vamos a obtener una muestra a partir de la media y desviación estándar de arriba (en este paradigma de programación *funcional*, debemos pasar una clave/seed de generador de números pseudoaleatorios, aquí `jr.PRNGKey(42)` cada vez que queramos generar un número aleatorio)...

In [None]:
z = dist.Normal(loc=z_mean, scale=z_std).sample(jr.PRNGKey(42))

...y *decodificarla* para (¿quizás?) recuperar la imagen original.

In [None]:
x_est = decoder(z)
x_est.shape

<font color='red'>TO-DO</font>: Visualiza la imagen. ¿Cuál es el problema?

# Entrenamiento

Algunos hiperparámetros que se pueden ajustar

In [None]:
learning_rate = 1e-3
# n_epochs = 40
n_epochs = 10
d_z = 75

Las redes neuronales para el decoder y encoder se instancian *e* inicializan

In [None]:
encoder = Encoder(d_z=d_z)
decoder = Decoder(d_z=d_z)

Por conveniencia, reuniremos ambas cosas en una `tuple` de Python

In [None]:
model = encoder, decoder

Definimos la función de pérdida (la que se debe minimizar). Antes de eso, y por claridad, también definimos la [divergencia de Kullback-Leibler](https://es.wikipedia.org/wiki/Divergencia_de_Kullback-Leibler), que nos da una forma de cuantificar como de diferentes son dos distribuciones de probabilidad. **Ignora** este código por ahora (en este curso), ya que tiene que ver con la teoría matemática en la que se basa un VAE.

In [None]:
def kl_loss(mean: Float[Array, 'feature'], sd: Float[Array, 'feature']) -> Float[Array, '']:

    return -0.5 * jnp.sum(1 + 2*jnp.log(sd) - mean**2 - sd**2)

def loss(model, x: Float[Array, 'batch channel width height'], rng_key) -> Float[Array, '']:

    # never mind for now...but this is variance assumed for the decoded `x`
    x_var = 0.1

    encoder, decoder = model
    z_mean_std = jax.vmap(encoder)(x)

    z = dist.Normal(loc=z_mean_std[:, :d_z], scale=z_mean_std[:, d_z:]).sample(rng_key)
    
    x_pred = jax.vmap(decoder)(z)

    log_likelihood = dist.Normal(loc=x_pred, scale=jnp.sqrt(x_var)).log_prob(x).sum()

    kl_divergence = jax.vmap(kl_loss)(z_mean_std[:, :d_z], z_mean_std[:, d_z:]).sum()
    
    return -log_likelihood + kl_divergence

La función de pérdida no es más que una...función, a la que puedes llamar como cualquier otra función. Obtengamos un *batch* de imágenes del `DataLoader` de arriba

In [None]:
images, _ = next(iter(trainloader))
images.shape

<font color='red'>TO-DO</font>: Explica el tamaño del `Tensor` de arriba.

<font color='red'>TO-DO</font>: Llama a la función de pérdida (con el modelo de arriba) sobre las imágenes. A la vista de la definición, la función `loss` recibe:

- el modelo,

- un array de *numpy* o de *JAX*, por lo que debes convertir `images`, que es un tensor de *PyTorch* (puedes usar el método `numpy()` sobre `images`)

- una clave de generador de números pseudoaleatorios (puedes usar de nuevo `jr.PRNGKey(42)`...u otra cosa) para producir los números aleatorios necesarios.

Una ventaja de *JAX*/*Equinox* es que si tienes una función (de Python), puedes obtener fácilmente el [gradiente](https://es.wikipedia.org/wiki/Gradiente) de esa función usando `jax.grad`. En este caso, como estamos usando *Equinox*, llamamos al *wrapper* equivalente `eqx.filter_grad`.

In [None]:
grad_loss = eqx.filter_grad(loss)

Ahora, `grad_loss` es una función que recibe los mismos argumentos que `loss`, así que puedes...

<font color='red'>TO-DO</font>: ...llamar a la función tal como llamas a `loss` arriba. ¿Qué resulta? Ten en cuenta que estamos calculando el *gradiente* de la función de pérdida!!

Empaquetatamos en una única función todas las operaciones que hay que hacer sobre cada *batch* durante el entrenamiento.

In [None]:
@eqx.filter_jit
def take_step(model, opt_state, x: jax.Array, rng_key):

    loss_value, grads = eqx.filter_value_and_grad(loss)(model, x, rng_key)

    updates, opt_state = optim.update(grads, opt_state, model)
    model = eqx.apply_updates(model, updates)
    
    return model, opt_state, loss_value

## Bucle

Vamos a implementar el bucle de entrenamiento. Verás que la evolución de la *pérdida* es ruidosa: es de esperar. Además, la primera iteración podría tardar un poco (ya que los datos se están leyendo en memoria).

In [None]:
optim = optax.adam(learning_rate)
opt_state = optim.init(eqx.filter(model, eqx.is_array))

key = jr.PRNGKey(42)

for epoch in range(n_epochs):

    key, subkey = jr.split(key)

    for x, _ in trainloader:

        model, opt_state, loss_value = take_step(model, opt_state, x.numpy(), subkey)

    print(epoch, loss_value)

# Resultados

Veamos la primera imagen en el último *batch* procesado en el bucle de entrenamiento (todavía en `x`)

In [None]:
show_image(x[0])

Codifiquémosla en el espacio latente y decodifiquémosla de vuelta. Formalmente, el codificador devuelve la media y desviación típica (apiladas) en el espacio de datos pero, por simplicidad, podemos tomar la media como si fuera una muestra.

In [None]:
encoder, decoder = model
show_image(decoder(encoder(x[0].numpy())[:d_z]))

# Experimentos

<font color='red'>TO-DO</font>: Genera un par de imágenes nuevas extrayendo muestras en el *espacio latente* y llamando al decoder sobre ellas.

<font color='red'>TO-DO</font>: Entrena durante más *epochs* para intentar mejorar la calidad de las *reconstrucciones*. ¿Mejora?

<font color='red'>TO-DO</font>: Entrena con muy pocas imágenes, digamos 10. El número de imágenes que usas para entrenar (del número total que hay en el conjunto de datos) está controlado por el parámetro `n` de la función `get_celeba` arriba. Usa un número mucho mayor de epochs, digamos 500, o el modelo no habrá visto suficientes ejemplos para aprender algo. Luego, genera unas pocas imágenes y compáralas. ¿Qué observas?

<font color='red'>TO-DO</font>: Experimenta con la dimensión del espacio latente. ¿Puedes obtener buenos resultados con una dimensión pequeña, digamos $d_z=10$?

# Preguntas de muestra

## ¿Qué ocurre típicamente cuando usas una dimensión del espacio latente *más pequeña* (por ejemplo, 10)?
- [ ] El modelo siempre entrena más rápido y se vuelve perfecto
- [ ] El modelo tiene menos capacidad para capturar detalles, por lo que las reconstrucciones pueden volverse más borrosas o perder información
- [ ] El modelo deja de usar la red decoder
- [ ] El modelo no puede entrenarse en absoluto

## ¿Por qué establecemos semillas aleatorias (para, por ejemplo, *numpy* o *PyTorch*) al principio?
- [ ] Para hacer que el entrenamiento se ejecute solo una vez
- [ ] Para evitar usar la GPU por error
- [ ] Para hacer los resultados más reproducibles cuando ejecutamos el código de nuevo
- [ ] Para evitar que el código use cualquier número aleatorio