# Classifier Guidance em Latent Diffusion com MNIST

Neste notebook vamos implementar um pipeline didático de Latent Diffusion com MNIST, incluindo:

- Um autoencoder simples que define o espaço latente.
- Um modelo de difusão treinado no espaço latente.
- Um classificador que recebe latentes ruidosos e prevê o dígito.
- Amostragem guiada por classe usando classifier guidance, modificando os passos de denoising via gradiente do classificador.

O objetivo é entender a ideia de classifier guidance no contexto de difusão latente, mais do que obter imagens de altíssima qualidade.

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

In [None]:
torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

In [None]:
transform = transforms.Compose([
    transforms.ToTensor()  # [0,1]
])

train_ds = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
test_ds  = datasets.MNIST(root="./data", train=False, download=True, transform=transform)

batch_size = 128
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

# Visualizar alguns exemplos
images, labels = next(iter(train_loader))
plt.figure(figsize=(10, 4))
for i in range(8):
    plt.subplot(1, 8, i+1)
    plt.imshow(images[i, 0].numpy(), cmap="gray")
    plt.title(int(labels[i]))
    plt.axis("off")
plt.tight_layout()
plt.show()

## Autoencoder Variacional (VAE)

O primeiro componente do nosso sistema é o VAE. O objetivo é comprimir a imagem de entrada $x \in \mathbb{R}^{784}$ em um vetor latente $z \in \mathbb{R}^{d}$, onde $d \ll 784$. O VAE aprende uma distribuição probabilística $q_\phi(z|x)$ (codificador) e $p_\theta(x|z)$ (decodificador).

Para permitir o treinamento via backpropagation, utilizamos o **truque de reparametrização**. Em vez de amostrar $z$ diretamente da distribuição estocástica, definimos:

$$
z = \mu + \sigma \odot \epsilon, \quad \text{onde } \epsilon \sim \mathcal{N}(0, I)
$$

Isso permite que o gradiente flua deterministicamente através de $\mu$ e $\sigma$.

In [None]:
class VAE(nn.Module):
    def __init__(self, latent_dim=32):
        super().__init__()

        # encoder
        self.enc = nn.Sequential(
            nn.Linear(784, 400),
            nn.ReLU()
        )
        self.fc_mu = nn.Linear(400, latent_dim)
        self.fc_logvar = nn.Linear(400, latent_dim)

        # decoder
        self.dec = nn.Sequential(
            nn.Linear(latent_dim, 400),
            nn.ReLU(),
            nn.Linear(400, 784),
            nn.Sigmoid()
        )

    def encode(self, x):
        h = self.enc(x)
        mu = self.fc_mu(h)
        logvar = self.fc_logvar(h)
        return mu, logvar

    def reparam(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        x_hat = self.dec(z)
        return x_hat.view(-1, 1, 28, 28)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        mu, logvar = self.encode(x)
        z = self.reparam(mu, logvar)
        x_hat = self.decode(z)
        return x_hat, mu, logvar

### Função de Perda: ELBO

A função de perda do VAE é derivada do *Evidence Lower Bound* (ELBO). Ela maximiza a probabilidade dos dados enquanto minimiza a divergência entre a distribuição latente aproximada e uma priori (geralmente Gaussiana unitária). A perda é composta por dois termos:

1.  **Erro de Reconstrução:** Mede a fidelidade da imagem decodificada $\hat{x}$ em relação a $x$.
2.  **Divergência KL:** Regulariza o espaço latente para que $q_\phi(z|x)$ se aproxime de $\mathcal{N}(0, I)$.

$$
\mathcal{L}_{VAE} = \mathbb{E}_{q}[\log p(x|z)] - D_{KL}(q(z|x) || p(z))
$$

Na prática, minimizamos o negativo do ELBO.

In [None]:
def vae_loss(x, x_hat, mu, logvar):
    recon = F.binary_cross_entropy(x_hat, x, reduction="sum")
    kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon + kl

### Treinamento do VAE

Treinaremos o VAE por poucas épocas, o suficiente para obtermos uma representação latente coerente. Note que aplicamos um peso $\beta$ (no código ajustado para 0.1) ao termo KL para evitar o *colapso posterior*, onde o codificador ignora a entrada e produz apenas a priori, o que prejudicaria a capacidade de reconstrução.

In [None]:
latent_dim = 16
vae = VAE(latent_dim).to(device)
optimizer_vae = torch.optim.Adam(vae.parameters(), lr=1e-3)

In [None]:
vae_epochs = 20

vae.train()
for epoch in range(vae_epochs):
    total_loss = 0

    for x, _ in train_loader:
        x = x.to(device)

        optimizer_vae.zero_grad()

        # forward
        recon_x, mu, logvar = vae(x)

        # perda de reconstrução (MSE)
        loss_recon = F.mse_loss(recon_x, x, reduction="sum")

        # KL Divergence
        loss_kl = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

        # Total (com peso KL reduzido para estabilidade)
        loss = loss_recon + 0.1 * loss_kl

        loss.backward()
        optimizer_vae.step()

        total_loss += loss.item()

    print(f"[VAE] Epoch {epoch+1}/{vae_epochs} Loss: {total_loss / len(train_loader.dataset):.4f}")

In [None]:
vae.eval()
x, _ = next(iter(test_loader))
x = x.to(device)

with torch.no_grad():
    x_hat, _, _ = vae(x)

x = x.cpu()
x_hat = x_hat.cpu()

plt.figure(figsize=(8,4))
for i in range(8):
    plt.subplot(2,8,i+1)
    plt.imshow(x[i,0], cmap="gray")
    plt.axis("off")

    plt.subplot(2,8,8+i+1)
    plt.imshow(x_hat[i,0], cmap="gray")
    plt.axis("off")

plt.suptitle("VAE - Original (linha 1) vs Reconstrução (linha 2)")
plt.tight_layout()
plt.show()

## Difusão Latente

Esta é a etapa que caracteriza o *Latent Diffusion*. Em vez de treinarmos o modelo de difusão nas imagens brutas, nós "congelamos" o VAE e codificamos todo o dataset MNIST para o espaço latente $z$.

A partir de agora, nosso dataset de treinamento consiste em pares $(z, y)$, onde $z$ são os vetores latentes amostrados e $y$ são os rótulos das classes. Isso reduz a dimensionalidade do problema e foca o modelo de difusão na aprendizagem da semântica estrutural dos dados comprimidos.

In [None]:
def encode_dataset(loader, vae):
    vae.eval()
    zs, ys = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            mu, logvar = vae.encode(x.view(x.size(0), -1))
            z = vae.reparam(mu, logvar)
            zs.append(z.cpu())
            ys.append(y)
    return torch.cat(zs), torch.cat(ys)

train_z, train_y = encode_dataset(train_loader, vae)
test_z, test_y = encode_dataset(test_loader, vae)

train_latent = TensorDataset(train_z, train_y)
test_latent  = TensorDataset(test_z, test_y)

train_latent_loader = DataLoader(train_latent, batch_size=batch_size, shuffle=True)
test_latent_loader  = DataLoader(test_latent, batch_size=batch_size)

### Processo Forward

Definimos o processo de difusão *forward* como uma cadeia de Markov fixa que adiciona ruído Gaussiano gradualmente aos dados. Seguindo a formulação DDPM (Ho et al., 2020), definimos uma schedule de variância $\beta_t$.

A propriedade fundamental das gaussianas nos permite amostrar $z_t$ diretamente de $z_0$ sem iterar pelos passos intermediários:

$$
q(z_t | z_0) = \mathcal{N}(z_t; \sqrt{\bar{\alpha}_t} z_0, (1 - \bar{\alpha}_t)I)
$$

Onde $\alpha_t = 1 - \beta_t$ e $\bar{\alpha}_t = \prod_{s=1}^t \alpha_s$.

In [None]:
T = 20
betas = torch.linspace(1e-4, 0.02, T).to(device)
alphas = 1 - betas
alphas_cumprod = torch.cumprod(alphas, dim=0)

sqrt_ac = torch.sqrt(alphas_cumprod)
sqrt_omac = torch.sqrt(1 - alphas_cumprod)

def forward_diffusion(z0, t):
    noise = torch.randn_like(z0)
    zt = sqrt_ac[t].unsqueeze(1) * z0 + sqrt_omac[t].unsqueeze(1) * noise
    return zt, noise

### Modelo de Difusão (Denoising Network)

Nossa rede neural, $\epsilon_\theta(z_t, t)$, é um Perceptron Multicamadas (MLP) condicionado pelo tempo. Seu objetivo não é prever $z_0$ diretamente, mas sim estimar o ruído $\epsilon$ que foi adicionado à imagem no tempo $t$.

A entrada da rede é a concatenação do latente ruidoso $z_t$ e o embedding de tempo.

In [None]:
class SinusoidalTimeEmbedding(nn.Module):
    def __init__(self, emb_dim):
        super().__init__()
        self.emb_dim = emb_dim
        half_dim = emb_dim // 2
        inv_freq = torch.exp(
            -torch.arange(0, half_dim, dtype=torch.float32) * np.log(10000.0) / half_dim
        )
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, t):
        # t: (B,)
        t = t.float().unsqueeze(1)  # (B,1)
        freqs = t * self.inv_freq.unsqueeze(0)  # (B, half_dim)
        emb = torch.cat([torch.sin(freqs), torch.cos(freqs)], dim=-1)
        if self.emb_dim % 2 == 1:
            emb = torch.cat([emb, torch.zeros_like(emb[:, :1])], dim=-1)
        return emb

In [None]:
class LatentDiffusionModel(nn.Module):
    def __init__(self, latent_dim, time_emb_dim=64, hidden_dim=256):
        super().__init__()
        self.time_mlp = SinusoidalTimeEmbedding(time_emb_dim)
        self.net = nn.Sequential(
            nn.Linear(latent_dim + time_emb_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, latent_dim)
        )

    def forward(self, zt, t):
        # zt: (B, latent_dim), t: (B,)
        t_emb = self.time_mlp(t)
        x = torch.cat([zt, t_emb], dim=-1)
        return self.net(x)

### Treinamento do Modelo de Difusão

O objetivo de treinamento é minimizar o erro quadrático médio (MSE) entre o ruído real adicionado e o ruído predito pelo modelo. A função de perda simplificada é dada por:

$$
\mathcal{L}_{\text{simple}} = \mathbb{E}_{z_0, \epsilon, t} \left[ \| \epsilon - \epsilon_\theta(z_t, t) \|^2 \right]
$$

Amostramos aleatoriamente passos de tempo $t$ e vetores de ruído $\epsilon$ para cada batch.

In [None]:
diffusion_model = LatentDiffusionModel(latent_dim).to(device)
diff_optimizer = torch.optim.Adam(diffusion_model.parameters(), lr=2e-4)

In [None]:
def sample_timesteps(b, T):
    return torch.randint(0, T, (b,), device=device)

n_epochs_diff = 20

for epoch in range(n_epochs_diff):
    diffusion_model.train()
    running_loss = 0.0
    for z0, _ in train_latent_loader:
        z0 = z0.to(device)
        b = z0.size(0)
        t = sample_timesteps(b, T)
        zt, noise = forward_diffusion(z0, t)

        noise_pred = diffusion_model(zt, t)
        loss = F.mse_loss(noise_pred, noise)

        diff_optimizer.zero_grad()
        loss.backward()
        diff_optimizer.step()

        running_loss += loss.item() * b

    epoch_loss = running_loss / len(train_latent_loader.dataset)
    print(f"[Diff] Epoch {epoch+1}/{n_epochs_diff} - Loss: {epoch_loss:.4f}")

In [None]:
# Amostra um batch
z0, _ = next(iter(train_latent_loader))
z0 = z0[:8].to(device)

# Visualiza forward
fig, axes = plt.subplots(3, 8, figsize=(10,4))

timesteps = [0, T//2, T-1]
for row, t in enumerate(timesteps):
    zt, _ = forward_diffusion(z0, torch.full((8,), t, device=device))
    imgs = vae.decode(zt).cpu().detach()

    for col in range(8):
        axes[row, col].imshow(imgs[col,0], cmap="gray")
        axes[row,col].axis("off")
    axes[row,0].set_ylabel(f"t={t}")

plt.suptitle("Forward diffusion no espaço latente")
plt.tight_layout()
plt.show()

## Classificador Latente

Para aplicar o **Classifier Guidance**, precisamos de um classificador $p_\phi(y | z_t, t)$. É crucial que este classificador seja treinado em representações latentes ruidosas $z_t$, pois durante a geração ele precisará fornecer gradientes para guiar a difusão a partir de estados altamente ruidosos.

O classificador recebe como entrada $z_t$ e o tempo $t$, e prevê o dígito $y$.

In [None]:
class LatentClassifier(nn.Module):
    def __init__(self, latent_dim, time_dim=64, hidden=256):
        super().__init__()
        self.time = SinusoidalTimeEmbedding(time_dim)
        self.net = nn.Sequential(
            nn.Linear(latent_dim + time_dim, hidden),
            nn.SiLU(),
            nn.Linear(hidden, hidden),
            nn.SiLU(),
            nn.Linear(hidden, 10)
        )

    def forward(self, zt, t):
        t_emb = self.time(t)
        return self.net(torch.cat([zt, t_emb], dim=1))

### Treinamento do Classificador

O treinamento simula o processo de difusão: para cada batch, amostramos um tempo $t$ e adicionamos ruído ao latente $z_0$ para obter $z_t$. O classificador é treinado com Cross Entropy convencional para prever a classe correta a partir dessa entrada ruidosa.

In [None]:
clf = LatentClassifier(latent_dim).to(device)
opt_clf = torch.optim.Adam(clf.parameters(), lr=1e-3)

In [None]:
n_epochs_clf = 10

for e in range(n_epochs_clf):
    clf.train()
    total = 0
    correct = 0

    for z0, y in train_latent_loader:
        z0, y = z0.to(device), y.to(device)
        b = z0.size(0)

        t = torch.randint(0, T, (b,), device=device)
        zt, _ = forward_diffusion(z0, t)

        logits = clf(zt, t)
        loss = F.cross_entropy(logits, y)

        opt_clf.zero_grad()
        loss.backward()
        opt_clf.step()

        total += b
        correct += (logits.argmax(1) == y).sum().item()

    print(f"[CLF] Epoch {e+1} Acc: {correct/total:.4f}")

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

clf.eval()
all_preds = []
all_true = []

with torch.no_grad():
    for z0, y in test_latent_loader:
        z0 = z0.to(device)
        y = y.to(device)

        t = torch.randint(0, T, (z0.size(0),), device=device)
        zt, _ = forward_diffusion(z0, t)

        preds = clf(zt, t).argmax(1)
        all_preds.append(preds.cpu())
        all_true.append(y.cpu())

all_preds = torch.cat(all_preds)
all_true = torch.cat(all_true)

cm = confusion_matrix(all_true, all_preds)

plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=False, cmap="Blues")
plt.title("Matriz de confusão do classificador no espaço latente")
plt.show()

## Amostragem com Classifier Guidance

Esta é a parte central da técnica. No processo de amostragem reverso padrão, estimamos a média da transição $p_\theta(z_{t-1}|z_t)$ como $\mu_\theta(z_t, t)$. Com guidance, perturbamos essa média utilizando o gradiente do classificador:

$$
\hat{\mu}_\theta(z_t, t) = \mu_\theta(z_t, t) + s \cdot \Sigma_\theta(z_t, t) \nabla_{z_t} \log p_\phi(y|z_t, t)
$$

Onde $s$ é a escala de guidance (*guidance scale*).
- Se $s > 1$, forçamos o modelo a gerar amostras que o classificador reconhece com alta confiança (maior fidelidade, menor diversidade).
- O gradiente $\nabla_{z_t}$ indica a direção no espaço latente que maximiza a probabilidade da classe alvo $y$.

In [None]:
def p_sample_step(diff_model, clf, zt, t, y, guidance=3.0):
    b = zt.size(0)
    t_batch = torch.full((b,), t, device=device, dtype=torch.long)

    # epsθ
    with torch.no_grad():
        eps_theta = diff_model(zt, t_batch)

    # grad log p(y|z_t)
    zt_req = zt.clone().detach().requires_grad_(True)
    logits = clf(zt_req, t_batch)
    log_probs = F.log_softmax(logits, dim=1)
    selected = log_probs[torch.arange(b), y]

    grad = torch.autograd.grad(selected.sum(), zt_req)[0]

    eps_guided = eps_theta - guidance * grad

    beta_t = betas[t]
    alpha_t = alphas[t]
    alpha_bar_t = alphas_cumprod[t]
    sqrt_om = torch.sqrt(1 - alpha_bar_t)

    mean = (1/torch.sqrt(alpha_t)) * (zt - (beta_t / sqrt_om) * eps_guided)

    if t > 0:
        return mean + torch.sqrt(beta_t) * torch.randn_like(zt)
    return mean

In [None]:
def sample_guided(diff_model, clf, vae, num=16, digit=3, guidance=3.0):
    z = torch.randn(num, latent_dim, device=device)
    y = torch.full((num,), digit, device=device)

    for t in reversed(range(T)):
        z = p_sample_step(diff_model, clf, z, t, y, guidance)

    with torch.no_grad():
        x = vae.decode(z).cpu()
    return x

In [None]:
fig, axes = plt.subplots(10, 8, figsize=(8,10))

for digit in range(10):
    samples = sample_guided(diffusion_model, clf, vae, num=8, digit=digit, guidance=7.0)
    for i in range(8):
        axes[digit, i].imshow(samples[i,0], cmap="gray")
        axes[digit, i].axis("off")

plt.suptitle("Latent Diffusion + Classifier Guidance (por classe)")
plt.tight_layout()
plt.show()

## Exercícios

### Exercício 1

Altere a dimensão latente do VAE e o número de passos no modelo de difusão. Qual combinação foi melhor para imagens mais realistas geradas?

### Exercício 2

Experimente variar o fator de guidance na geração guiada. O que você observa? Qual o valor gera imagens mais realistas?