### **Autoencoders, VAE, GANs y Difusión + DiT**


#### **0. Preparación del entorno**

> Si faltan librerías, descomenta la instalación.


In [None]:
# %pip install -q torch torchvision datasets diffusers transformers accelerate matplotlib numpy pillow


In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader


In [None]:
def get_device():
    if torch.cuda.is_available():
        return torch.device("cuda")
    if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
        return torch.device("mps")
    return torch.device("cpu")

device = get_device()
print("Dispositivo:", device)


In [None]:
def show_images_grid(images, ncols=8, title=None, cmap="gray"):
    if isinstance(images, torch.Tensor):
        imgs = images.detach().cpu()
        if imgs.ndim == 2:
            imgs = imgs.unsqueeze(0).unsqueeze(0)
        elif imgs.ndim == 3:
            imgs = imgs.unsqueeze(1)
    else:
        proc = []
        for im in images:
            arr = np.array(im) if isinstance(im, Image.Image) else np.array(im)
            if arr.ndim == 2:
                arr = arr[None, ...]
            elif arr.ndim == 3 and arr.shape[-1] in (1, 3):
                arr = np.transpose(arr, (2, 0, 1))
            proc.append(torch.tensor(arr))
        imgs = torch.stack(proc)

    imgs = imgs.float()
    if imgs.max() > 1.5:
        imgs = imgs / 255.0

    n = len(imgs)
    nrows = math.ceil(n / ncols)
    fig, axes = plt.subplots(nrows, ncols, figsize=(1.6*ncols, 1.6*nrows))
    axes = np.array(axes).reshape(-1)
    for ax in axes: ax.axis("off")

    for i in range(n):
        img = imgs[i]
        if img.shape[0] == 1:
            axes[i].imshow(img[0], cmap=cmap)
        else:
            axes[i].imshow(np.transpose(img.numpy(), (1,2,0)))
    if title:
        fig.suptitle(title, y=1.02)
    plt.tight_layout()
    plt.show()


#### **1. Repaso breve: Autoencoders, VAE y GANs**

##### **AE**
- Encoder: $x\to z$
- Decoder: $z\to \hat{x}$
- Optimiza reconstrucción (MSE/BCE)

**Fortalezas:** compresión, representación latente, anomalías.  
**Limitaciones:** reconstrucciones "promedio", no siempre gran generador.

##### **VAE**
- Encoder produce $\mu$, $\log\sigma^2$
- Reparametrización: $z = \mu + \sigma \odot \epsilon$

Pérdida:
$$
\mathcal{L}=\mathcal{L}_{rec}+\beta D_{KL}(q(z|x)||p(z))
$$

**Fortalezas:** latente probabilístico, muestreo.  
**Limitaciones:** blur, trade-off con KL.

##### **GAN**
- G genera, D discrimina.

**Fortalezas:** nitidez visual.  
**Limitaciones:** inestabilidad, mode collapse.


##### **Preguntas rápidas (responder en markdown)**
1. Diferencia principal entre el latente de AE y VAE.
2. ¿Por qué GAN puede ser inestable?
3. ¿Qué ventaja conceptual aporta difusión?


In [None]:
# TODO: escribe tus respuestas aquí si prefieres usar una celda de código.


#### **2. Dataset base: MNIST**


In [None]:
from datasets import load_dataset
from torchvision import transforms

mnist = load_dataset("ylecun/mnist")
to_tensor = transforms.ToTensor()

def transform_batch(examples):
    examples["image"] = [to_tensor(img) for img in examples["image"]]
    return examples

mnist = mnist.with_transform(transform_batch)

train_loader = DataLoader(mnist["train"], batch_size=128, shuffle=True)
test_loader  = DataLoader(mnist["test"], batch_size=128, shuffle=False)

batch = next(iter(train_loader))
show_images_grid(batch["image"][:16], ncols=4, title="MNIST - muestra")
print(batch["image"].shape)


#### **3. Autoencoder guiado**

Completa las partes con **TODO** y entrena el modelo.


In [None]:
class EncoderAE(nn.Module):
    def __init__(self, latent_dim=32):
        super().__init__()
        # TODO: puedes modificar esta arquitectura
        self.net = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, 3, stride=2, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(32*7*7, latent_dim),
        )

    def forward(self, x):
        return self.net(x)

class DecoderAE(nn.Module):
    def __init__(self, latent_dim=32):
        super().__init__()
        self.fc = nn.Linear(latent_dim, 32*7*7)
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, 4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, z):
        h = self.fc(z).view(-1, 32, 7, 7)
        return self.deconv(h)

class AutoEncoder(nn.Module):
    def __init__(self, latent_dim=32):
        super().__init__()
        self.encoder = EncoderAE(latent_dim)
        self.decoder = DecoderAE(latent_dim)

    def forward(self, x):
        # TODO: x -> z -> x_hat
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat


In [None]:
ae = AutoEncoder(latent_dim=32).to(device)
opt_ae = torch.optim.Adam(ae.parameters(), lr=1e-3)

def train_ae(model, loader, optimizer, steps=200):
    model.train()
    losses = []
    step = 0
    while step < steps:
        for batch in loader:
            x = batch["image"].to(device)
            x_hat = model(x)
            loss = F.mse_loss(x_hat, x)  # prueba BCE también

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            losses.append(loss.item())
            step += 1
            if step >= steps:
                break
    return losses


In [None]:
ae_losses = train_ae(ae, train_loader, opt_ae, steps=200)
plt.plot(ae_losses)
plt.title("AE - pérdida")
plt.xlabel("Paso")
plt.ylabel("MSE")
plt.show()
print("Última pérdida:", ae_losses[-1])


In [None]:
ae.eval()
with torch.no_grad():
    xb = next(iter(test_loader))["image"][:8].to(device)
    xb_hat = ae(xb)

show_images_grid(xb.cpu(), ncols=8, title="AE - originales")
show_images_grid(xb_hat.cpu(), ncols=8, title="AE - reconstrucciones")


##### **Ejercicio conceptual (AE)**
1. Prueba `latent_dim = 8` y `latent_dim = 2`.
2. Compara MSE vs BCE.
3. ¿Cómo usarías AE para detección de anomalías?


In [None]:
## Tus respuestas

#### **4. VAE guiado**


In [None]:
class EncoderVAE(nn.Module):
    def __init__(self, latent_dim=16):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1,16,3,stride=2,padding=1),
            nn.ReLU(),
            nn.Conv2d(16,32,3,stride=2,padding=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        self.mu = nn.Linear(32*7*7, latent_dim)
        self.logvar = nn.Linear(32*7*7, latent_dim)

    def forward(self, x):
        h = self.features(x)
        # TODO
        return self.mu(h), self.logvar(h)

class DecoderVAE(nn.Module):
    def __init__(self, latent_dim=16):
        super().__init__()
        self.fc = nn.Linear(latent_dim, 32*7*7)
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(32,16,4,stride=2,padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16,1,4,stride=2,padding=1),
            nn.Sigmoid(),
        )
    def forward(self, z):
        h = self.fc(z).view(-1,32,7,7)
        return self.deconv(h)

class VAE(nn.Module):
    def __init__(self, latent_dim=16):
        super().__init__()
        self.encoder = EncoderVAE(latent_dim)
        self.decoder = DecoderVAE(latent_dim)

    def reparameterize(self, mu, logvar):
        # TODO: reparametrización
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + std * eps

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        x_hat = self.decoder(z)
        return x_hat, mu, logvar

def vae_loss(x_hat, x, mu, logvar, beta=1.0):
    # TODO: reconstrucción + KL
    rec = F.binary_cross_entropy(x_hat, x, reduction="mean")
    kl = -0.5 * torch.mean(1 + logvar - mu.pow(2) - logvar.exp())
    return rec + beta * kl, rec, kl


In [None]:
vae = VAE(latent_dim=16).to(device)
opt_vae = torch.optim.Adam(vae.parameters(), lr=1e-3)

hist = []
steps = 220
step = 0
vae.train()
while step < steps:
    for batch in train_loader:
        x = batch["image"].to(device)
        x_hat, mu, logvar = vae(x)
        loss, rec, kl = vae_loss(x_hat, x, mu, logvar, beta=1.0)

        opt_vae.zero_grad()
        loss.backward()
        opt_vae.step()

        hist.append((loss.item(), rec.item(), kl.item()))
        step += 1
        if step >= steps:
            break

hist = np.array(hist)
plt.plot(hist[:,0], label="total")
plt.plot(hist[:,1], label="rec")
plt.plot(hist[:,2], label="kl")
plt.legend(); plt.title("VAE - pérdidas"); plt.xlabel("Paso"); plt.show()
print("Última:", hist[-1])


In [None]:
vae.eval()
with torch.no_grad():
    xb = next(iter(test_loader))["image"][:8].to(device)
    xb_hat, mu, logvar = vae(xb)

show_images_grid(xb.cpu(), ncols=8, title="VAE - originales")
show_images_grid(xb_hat.cpu(), ncols=8, title="VAE - reconstrucciones")

with torch.no_grad():
    z = torch.randn(16, 16, device=device)
    samp = vae.decoder(z)
show_images_grid(samp.cpu(), ncols=4, title="VAE - muestras")


##### **Actividad guiada (VAE)**

Cambia `beta` a: `[0.1, 1.0, 4.0]`

Describe el efecto en:

- reconstrucción,
- regularización,
- muestras generadas.


In [None]:
## Tus respuestas

#### **5. GAN (repaso + 1 paso de entrenamiento)**


In [None]:
class GeneratorGAN(nn.Module):
    def __init__(self, z_dim=64):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(z_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 28*28),
            nn.Tanh()
        )
    def forward(self, z):
        return self.fc(z).view(-1,1,28,28)

class DiscriminatorGAN(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28,256),
            nn.LeakyReLU(0.2),
            nn.Linear(256,128),
            nn.LeakyReLU(0.2),
            nn.Linear(128,1)
        )
    def forward(self, x):
        return self.fc(x)


In [None]:
G = GeneratorGAN().to(device)
D = DiscriminatorGAN().to(device)

bce = nn.BCEWithLogitsLoss()
opt_g = torch.optim.Adam(G.parameters(), lr=2e-4, betas=(0.5, 0.999))
opt_d = torch.optim.Adam(D.parameters(), lr=2e-4, betas=(0.5, 0.999))

batch = next(iter(train_loader))
x_real = batch["image"][:64].to(device)
x_real_scaled = x_real * 2 - 1

z = torch.randn(x_real.size(0), 64, device=device)
x_fake = G(z)

# D
d_real = D(x_real_scaled)
d_fake = D(x_fake.detach())  # detach para no actualizar G aquí
loss_d = bce(d_real, torch.ones_like(d_real)) + bce(d_fake, torch.zeros_like(d_fake))
opt_d.zero_grad(); loss_d.backward(); opt_d.step()

# G
d_fake2 = D(x_fake)
loss_g = bce(d_fake2, torch.ones_like(d_fake2))
opt_g.zero_grad(); loss_g.backward(); opt_g.step()

print("loss_D:", float(loss_d), "loss_G:", float(loss_g))
show_images_grid(((x_fake[:16]+1)/2).cpu(), ncols=4, title="GAN - muestras (sin entrenamiento real)")


##### **Preguntas (GAN)**
1. ¿Por qué escalamos las imágenes reales a `[-1, 1]`?
2. ¿Para qué sirve `detach()`?
3. Explica *mode collapse*.


In [None]:
## Tus respuestas

#### **6. Difusión: idea central (añadir ruido y aprender un denoiser)**

**Forward:**
$$
x_t = \sqrt{\bar{\alpha}_t}x_0 + \sqrt{1 - \bar{\alpha}_t}\epsilon
$$

**Reverse:**

- una red predice el ruido (u otro objetivo equivalente),
- se aplica un scheduler para retroceder desde ruido puro hasta una muestra.


In [None]:
def linear_beta_schedule(T=200, beta_start=1e-4, beta_end=0.02):
    return torch.linspace(beta_start, beta_end, T)

T = 200
betas = linear_beta_schedule(T)
alphas = 1.0 - betas
alpha_bars = torch.cumprod(alphas, dim=0)

plt.plot(betas.numpy(), label="beta_t")
plt.plot(alpha_bars.numpy(), label="alpha_bar_t")
plt.title("Schedule lineal")
plt.xlabel("t")
plt.legend()
plt.show()


In [None]:
def q_sample(x0, t, noise=None):
    if noise is None:
        noise = torch.randn_like(x0)

    if isinstance(t, int):
        t = torch.tensor([t], device=x0.device).repeat(x0.shape[0])

    a_bar_t = alpha_bars.to(x0.device)[t].view(-1,1,1,1)
    x_t = torch.sqrt(a_bar_t)*x0 + torch.sqrt(1-a_bar_t)*noise
    return x_t, noise


In [None]:
xb = next(iter(test_loader))["image"][:4].to(device)
times = [0, 20, 60, 120, 199]
viz = []
for t in times:
    x_t, _ = q_sample(xb, t)
    viz.extend([im for im in x_t.cpu()])
show_images_grid(viz, ncols=len(times), title="Forward diffusion (más t = más ruido)")


##### **Actividad de observación**
- ¿En qué `t` deja de ser reconocible el dígito?
- Relaciona eso con `alpha_bar_t`.


In [None]:
## Tus respuestas

#### **7. DDPM y DDIM (noción general)**

##### **7.1 DDPM (entrenamiento)**

El DDPM define primero un **proceso forward** (de degradación) que **sí conocemos**: tomamos una imagen real $x_0$ y la corrompemos paso a paso con ruido Gaussiano hasta llegar a algo casi indistinguible de $\mathcal{N}(0,I)$.

La forma más útil del forward no es simular $(x_1, x_2,\cdots)$, sino usar la **forma cerrada**:

$$
x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1-\bar{\alpha}_t}\epsilon,
\quad \epsilon\sim\mathcal{N}(0,I)
$$

donde:

* $\beta_t$ es el "nivel de ruido" del paso $t$,
* $\alpha_t = 1-\beta_t$,
* $\bar{\alpha}_t = \prod_{s=1}^{t}\alpha_s$.

Con eso, tus 4 pasos quedan así (con el "por qué"):

1. **Escoge $t$**
   Se toma $t$ aleatorio (uniforme en $\{1,\dots,T\})$ para que el modelo aprenda a denoiser **en todos los niveles de ruido**, no solo en uno.

2. **Genera $x_t$**
   Se construye $x_t$ en una sola operación usando la ecuación cerrada. Esto hace el entrenamiento eficiente (no necesitas simular toda la cadena).

3. **Predice ruido**
   Entrenas una red $\epsilon_\theta(x_t, t)$ para aproximar el ruido real $\epsilon$ que metiste.
   Intuición: si sabes "qué ruido hay", puedes **restarlo** y recuperar algo más cercano a $x_0$.

4. **Minimiza MSE**
   $$
   \mathcal{L}(\theta)=\mathbb{E}_{x_0,t,\epsilon}\left[||\epsilon-\epsilon_\theta(x_t,t)||^2\right]
   $$

¿Por qué MSE es tan natural? Porque el forward usa ruido Gaussiano y el proceso inverso se modela como gaussiano condicionado; con esa elección, el MSE se alinea con maximizar una cota variacional (ELBO) en la formulación original de DDPM.

**Detalle práctico importante:** el modelo necesita saber $t$. Por eso se usa un **embedding temporal** (posicional/sinusoidal o MLP), para que la red sepa si está denoising "poco ruido" o "mucho ruido".

##### **7.2 DDPM (muestreo)**

Para generar, ya no tienes $x_0$. Arrancas con:

$$
x_T \sim \mathcal{N}(0,I)
$$

y aplicas un **proceso reverse** aprendido:

$$
x_{t-1} \sim p_\theta(x_{t-1}\mid x_t)
$$

En DDPM, este reverse suele escribirse como una gaussiana con media $\mu_\theta(x_t,t)$ y varianza (a veces fija) $\sigma_t^2$. La clave es que $\mu_\theta$ se calcula a partir de $\epsilon_\theta(x_t,t)$. Una forma común (conceptual) es:

* Primero estimas $x_0$ desde $x_t$ y el ruido predicho:

$$
\hat{x}_0 = \frac{1}{\sqrt{\bar{\alpha}_t}}\left(x_t - \sqrt{1-\bar{\alpha}_t}\epsilon_\theta(x_t,t)\right)
$$

* Luego construyes $x_{t-1}$ como "mezcla" entre $x_t$ y $\hat{x}_0$ más un poco de ruido:

$$
x_{t-1} = \mu_\theta(x_t,t) + \sigma_t z,\quad z\sim\mathcal{N}(0,I)
$$

**Intuición:**
Cada paso quita una fracción de ruido *y vuelve a inyectar un poquito* (el término $\sigma_t z)$ para mantener consistencia con la cadena probabilística. Eso hace el muestreo **estocástico**.

##### **7.3 DDIM**

DDIM (Denoising Diffusion Implicit Models) parte del mismo entrenamiento (misma $\epsilon_\theta)$, pero cambia el muestreo: en vez de seguir estrictamente la cadena estocástica de DDPM, define una familia de trayectorias donde puedes hacer el proceso **casi determinista**.

La idea central:

* DDPM: reverse **estocástico** (ruido en cada paso).
* DDIM: reverse **determinista o con menos ruido**, usando una reparametrización que permite saltar pasos.

Una forma de verlo:

* Usas $\hat{x}_0$ como antes.

* Calculas directamente $x_{t-1}$ (o $x_{\tau_{k-1}})$ combinando $\hat{x}_0$ y $\epsilon*\theta$, con un parámetro $\eta$ que controla cuán estocástico es:

* $\eta=0$: muestreo casi determinista (rápido, reproducible).

* $\eta>0$: reintroduces ruido (más diversidad, a veces mejor cobertura).

DDIM permite usar un subconjunto de tiempos $\tau_1 > \tau_2 > \dots > \tau_K$ con $K \ll T$. Por ejemplo, en vez de 1000 pasos, haces 50 o 20. Sigues usando la misma red, pero recorres una trayectoria "más directa".



#### **Preguntas**

1. **Pregunta conceptual:**
   ¿Por qué muestrear $t$ al azar es mejor que entrenar siempre con un mismo $t$?.

2. **Experimento de velocidad/calidad:**
   Usa el mismo denoiser y compara muestreo con:

* todos los pasos $T$,
* solo 50 pasos (submuestreo de tiempos),
  y describe qué se pierde primero: nitidez, coherencia global o diversidad.

3. **Diversidad vs determinismo:**
   Explica por qué $\eta=04$ (DDIM determinista) reduce diversidad aunque mantenga calidad.
 

In [None]:
## Tus respuestas

#### **8. Denoiser mínimo para difusión**


In [None]:
class TimeEmbedding(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(1, dim),
            nn.ReLU(),
            nn.Linear(dim, dim),
        )
    def forward(self, t):
        return self.net((t.float()/T).unsqueeze(-1))

class TinyDenoiser(nn.Module):
    def __init__(self, time_dim=32):
        super().__init__()
        self.time_mlp = TimeEmbedding(time_dim)
        self.conv1 = nn.Conv2d(1,32,3,padding=1)
        self.conv2 = nn.Conv2d(32,32,3,padding=1)
        self.conv3 = nn.Conv2d(32,1,3,padding=1)
        self.to_scale = nn.Linear(time_dim, 32)
        self.to_shift = nn.Linear(time_dim, 32)

    def forward(self, x, t):
        temb = self.time_mlp(t)
        scale = self.to_scale(temb).unsqueeze(-1).unsqueeze(-1)
        shift = self.to_shift(temb).unsqueeze(-1).unsqueeze(-1)

        h = self.conv1(x)
        h = F.relu(h * (1 + scale) + shift)
        h = F.relu(self.conv2(h))
        return self.conv3(h)


In [None]:
denoiser = TinyDenoiser().to(device)
opt_diff = torch.optim.Adam(denoiser.parameters(), lr=1e-3)

def train_denoiser(model, loader, optimizer, steps=250):
    model.train()
    losses = []
    step = 0
    while step < steps:
        for batch in loader:
            x0 = batch["image"].to(device)
            bsz = x0.size(0)

            # TODO
            t = torch.randint(0, T, (bsz,), device=device)
            x_t, noise = q_sample(x0, t)
            pred_noise = model(x_t, t)
            loss = F.mse_loss(pred_noise, noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            losses.append(loss.item())
            step += 1
            if step >= steps:
                break
    return losses


In [None]:
diff_losses = train_denoiser(denoiser, train_loader, opt_diff, steps=260)
plt.plot(diff_losses)
plt.title("Denoiser - pérdida")
plt.xlabel("Paso")
plt.ylabel("MSE")
plt.show()
print("Última pérdida:", diff_losses[-1])


#### **Preguntas de interpretación**

1. ¿Por qué predecir ruido?.
2. ¿Por qué muestrear `t` aleatorio?.
3. ¿Qué pasa si `T` aumenta mucho?.


In [None]:
## Tus respuestas

#### **9. Muestreo didáctico (DDPM)**


In [None]:
@torch.no_grad()
def sample_toy_ddpm(model, n_samples=16):
    model.eval()
    x = torch.randn(n_samples, 1, 28, 28, device=device)

    for t_inv in range(T-1, -1, -1):
        t = torch.full((n_samples,), t_inv, device=device, dtype=torch.long)
        pred_noise = model(x, t)

        a_t = alphas.to(device)[t].view(-1,1,1,1)
        a_bar_t = alpha_bars.to(device)[t].view(-1,1,1,1)
        beta_t = betas.to(device)[t].view(-1,1,1,1)

        x = (1/torch.sqrt(a_t)) * (x - ((1-a_t)/torch.sqrt(1-a_bar_t)) * pred_noise)

        if t_inv > 0:
            x = x + torch.sqrt(beta_t) * torch.randn_like(x)

    return x.clamp(0,1)


In [None]:
samples = sample_toy_ddpm(denoiser, n_samples=16)
show_images_grid(samples.cpu(), ncols=4, title="Muestras (toy DDPM)")


#### **Ejercicio guiado (muestreo)**

- Entrena más pasos y compara calidad.
- Cambia `T` a 100.
- Quita la parte estocástica y describe el efecto.


In [None]:
## Tus respuestas

#### **10. Difusión más allá de imágenes**

##### **Texto**
La difusión en texto es más compleja porque los tokens son discretos. Se usan variantes sobre embeddings continuos o híbridas.

##### **Audio**
Aplicaciones: síntesis, denoising, restauración, TTS.

##### **Datos científicos**
Aplicaciones: imágenes médicas, simulación, estructuras, series.

**Idea reusable:** representación + ruido + denoiser + métrica.


##### **Actividad de discusión**
Escoge  un dominio (texto, audio o datos científicos) y responde:

1. ¿Qué sería `x0`?
2. ¿Qué ruido usarían?
3. ¿Qué métrica(s) usarían?
4. ¿Qué riesgos técnicos/éticos ven?


In [None]:
## Tus respuestas

#### **11. DiT (Diffusion + Transformers)**

Un **DiT** usa un **Transformer tipo ViT** como *denoiser* en difusión: en vez de un UNet, procesa la imagen ruidosa $x_t$ como **secuencia de patches** (tokens), añade **embeddings de tiempo $t$** y **condición** (clase/texto), y predice $\hat\epsilon$ (o $x_0$)/(v)). El **scheduler** (DDPM/DDIM) sigue igual: solo cambia la red que predice.

**Cómo entra $t$ y la condición**

* $t$ y condición -> embeddings.
* Se inyectan en los bloques Transformer (típicamente vía **modulación de LayerNorm/AdaLN-Film** y/o **cross-attention** para texto).


**Limitaciones clave**

* Atención completa cuesta **(O(N^2))** con (N) patches (sube con resolución).
* UNet trae sesgos multi-escala "gratis"; DiT suele necesitar **más escala** para brillar.

**En una frase:** DiT = difusión donde el denoiser es un Transformer sobre patches, con tiempo/condición embebidos.


In [None]:
class PatchEmbed(nn.Module):
    def __init__(self, in_ch=1, patch=4, emb_dim=128, img_size=28):
        super().__init__()
        self.patch = patch
        self.grid = img_size // patch
        self.n_patches = self.grid * self.grid
        self.proj = nn.Conv2d(in_ch, emb_dim, kernel_size=patch, stride=patch)

    def forward(self, x):
        h = self.proj(x)
        return h.flatten(2).transpose(1,2)  # [B,N,D]

class TinyDiT(nn.Module):
    def __init__(self, img_size=28, patch=4, emb_dim=128, depth=3, nhead=4):
        super().__init__()
        self.patch_embed = PatchEmbed(1, patch, emb_dim, img_size)
        n_patches = self.patch_embed.n_patches
        self.pos_embed = nn.Parameter(torch.randn(1, n_patches, emb_dim) * 0.02)
        self.time_mlp = nn.Sequential(
            nn.Linear(1, emb_dim), nn.ReLU(), nn.Linear(emb_dim, emb_dim)
        )
        enc_layer = nn.TransformerEncoderLayer(
            d_model=emb_dim, nhead=nhead, dim_feedforward=emb_dim*4, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=depth)
        self.patch = patch
        self.to_patch_pixels = nn.Linear(emb_dim, patch*patch)

    def forward(self, x_t, t):
        B, C, H, W = x_t.shape
        h = self.patch_embed(x_t) + self.pos_embed
        t_emb = self.time_mlp((t.float()/T).unsqueeze(-1))
        h = h + t_emb.unsqueeze(1)
        h = self.transformer(h)

        p = self.patch
        G = self.patch_embed.grid
        patch_vals = self.to_patch_pixels(h).view(B, G, G, p, p)
        x = patch_vals.permute(0,1,3,2,4).contiguous().view(B,1,G*p,G*p)
        x = F.interpolate(x, size=(H,W), mode="bilinear", align_corners=False)
        return x


In [None]:
tiny_dit = TinyDiT().to(device)
xb = next(iter(train_loader))["image"][:8].to(device)
tb = torch.randint(0, T, (xb.size(0),), device=device)
out = tiny_dit(xb, tb)
print("Entrada:", xb.shape, "Salida:", out.shape)
print("Parámetros TinyDiT:", sum(p.numel() for p in tiny_dit.parameters()))


#### **Ejercicio (DiT)**
1. Cambia `patch=2`, `patch=7` y analiza implicancias.
2. ¿Cómo agregarías condición por clase?
3. ¿Qué papel tendría el encoder de texto en un DiT condicionado por texto?


In [None]:
## Tus respuestas

#### **12. Generación condicionada por texto (visión general)**

Un pipeline **texto -> imagen** suele funcionar así:

1. **Encoder de texto (CLIP/T5)**
   Convierte el prompt en embeddings (c) que capturan semántica (objetos, estilo, relaciones).

* CLIP: fuerte alineamiento texto-imagen (muy usado en difusión).
* T5: común en algunos modelos por su capacidad lingüística.

2. **Modelo de difusión (UNet o DiT)**
   Actúa como **denoiser condicionado**: recibe $z_t$ (imagen/latente ruidoso), el tiempo $t$ y la condición $c$.
   La condición entra típicamente vía:

* **cross-attention** (tokens visuales atienden a tokens de texto), y/o
* modulación (AdaLN/FiLM).

3. **Scheduler (DDPM/DDIM/Euler/…)**
   Define *cómo* actualizas $z_t \rightarrow z_{t-1}$: número de pasos, ruido, y fórmula de actualización.

* Cambiar scheduler afecta velocidad/calidad sin reentrenar el denoiser.

4. **VAE / Latent Diffusion (muy común)**
   En vez de denoising en píxeles, se trabaja en un **latente comprimido**:

* **VAE encoder:** imagen -> latente $z$ (menor resolución/dimensiones).
* **Difusión:** denoise en $z_t$ (más barato).
* **VAE decoder:** $z_0$ -> píxeles finales.

**Por qué latentes es más eficiente**
Denoising en latente reduce drásticamente el costo (menos "pixeles" efectivos), permitiendo alta resolución y modelos más grandes.

**Pieza clave extra: Classifier-Free Guidance (CFG)**
En muestreo se combinan dos predicciones:

* una **condicionada** (con texto) y otra **no condicionada** (texto "vacío"), para aumentar "obediencia al prompt" a cambio de algo de diversidad.


In [None]:
# EJEMPLO OPCIONAL (no ejecutar si no tienes GPU/memoria suficiente)
# from diffusers import AutoPipelineForText2Image
# model_id = "stabilityai/sdxl-turbo"
# pipe = AutoPipelineForText2Image.from_pretrained(
#     model_id,
#     torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
# ).to(device)
# prompt = "Un laboratorio de IA futurista, estilo infografía técnica"
# img = pipe(prompt=prompt, num_inference_steps=4, guidance_scale=0.0).images[0]
# img


#### **Actividad escrita**
1. ¿Qué hace el encoder de texto?
2. ¿Qué significa `guidance` (intuición)?
3. ¿Por qué usar un VAE dentro de un pipeline de difusión grande?


In [None]:
## Tus respuestas

#### **13. Comparación integrada**


In [None]:
# TODO: completar la comparación
comparacion = {
    "Modelo": ["AE", "VAE", "GAN", "Difusión"],
    "Estabilidad": ["", "", "", ""],
    "Calidad visual": ["", "", "", ""],
    "Velocidad de muestreo": ["", "", "", ""],
    "Latente interpretable": ["", "", "", ""],
    "Comentario": ["", "", "", ""],
}
comparacion


#### **14. Mini-proyecto (elige uno)**

**Opción A**: Agregar condición de clase (0-9) al denoiser de difusión.

**Opción B**:  Comparar AE vs VAE con el mismo `latent_dim`.

**Opción C**: Modificar `TinyDiT` para incluir condición de clase.

**Opción D**: Cambiar MNIST por Fashion-MNIST y comparar resultados.

##### **Entregable sugerido**
- capturas/figuras,
- tabla comparativa,
- respuestas conceptuales,
- código comentado.


#### **15. Ejercicios de codificación**


1. Implementa un scheduler coseno y comparar con lineal.  
2. Construye un `TinyUNet` con skip connection.  
3. Cambia el objetivo del denoiser para predecir `x0`.  
4. Agrega embedding de clase al denoiser.  
5. Implementa el muestreo rápido (menos pasos) y comparar calidad.


In [None]:
# Plantilla - Ejercicio 1
def cosine_beta_schedule(T=200, s=0.008):
    """TODO: Implementar schedule coseno y devolver betas."""
    raise NotImplementedError("Completar scheduler coseno")


In [None]:
# Plantilla - Ejercicio 4
class ClassConditionedDenoiser(nn.Module):
    def __init__(self, num_classes=10, time_dim=32, class_dim=16):
        super().__init__()
        # TODO: embedding de clase + bloque de tiempo + convs
        raise NotImplementedError("Completar arquitectura")

    def forward(self, x_t, t, y):
        # TODO: usar etiqueta y (0-9) como condición
        raise NotImplementedError("Completar forward")
