## Homework 3: VQ-VAE

Ранее мы с вами познакомились с **вариационными автокодировщиками**, которые используют непрерывные скрытые переменные $\mathbf{z}_e$. Однако для многих типов данных дискретные представления могут быть более естественными.

**Vector Quantized Variational Autoencoder** (**VQ-VAE** ) — подход, позволяющий обучить автокодировщики с дискретным латентным пространством. Вместо того чтобы кодировать данные в непрерывное распределение ($\boldsymbol{\mu}, \boldsymbol{\sigma}^2$), `VQ-VAE` отображает их в один из векторов словаря (кодовой книги).

**Основная идея векторной квантизации состоит в следующем**:

1. Создается **кодовая книга** (**codebook**) — обучаемый словарь из $K$ векторов-прототипов $\{\mathbf{e}_k\}_{k=1}^K$, где каждый вектор $\mathbf{e}_k \in \mathbb{R}^D$.

<center><img src="images/codebook.png" width=300></center>

2. **Кодировщик** $q_{\boldsymbol{\phi}}(c|\mathbf{x})$ преобразует входной объект $\mathbf{x}$ в непрерывное представление $\mathbf{z}_e \in \mathbb{R}^D$.

3. Для непрерывного вектора $\mathbf{z}_e$ находится **ближайший** к нему вектор $\mathbf{e}_{k^*}$ из кодовой книги:

$$k^* = \arg\min_k \|\mathbf{z}_e - \mathbf{e}_k\|$$

Результатом такого преобразования является **квантованный вектор** $\mathbf{z}_q = \mathbf{e}_{k^*}$.

<center><img src="images/clusters.png" width=250></center>

4. Квантованный вектор $\mathbf{z}_q$ передается в **декодер** $p_{\boldsymbol{\theta}}(\mathbf{x}|\mathbf{z}_q)$ для восстановления исходного объекта $\mathbf{\hat{x}}$.

В этой работе мы реализуем `VQ-VAE` и исследуем его свойства.

### Задание

Вам предстоит реализовать модель `VQ-VAE` и обучить модель на датасете `CIFAR10`.

За выполнение домашнего задания можно получить до **10 баллов**. Для части заданий мы написали для вас скелет. Заполните в них пропуски, выделенные с помощью `...`.

In [None]:
import os
from typing import List, Tuple, Dict

import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.utils import make_grid
from torchvision.datasets import CIFAR10 

from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from IPython.display import clear_output

### Задание 1: Dataset (0.5 балла)

Для обучения будем использовать датасет `CIFAR-10`.

**Ваша задача**:

- Загрузить `CIFAR-10`

- Создать преобразования `ToTensor` и `Normalize` в диапазон $[-1, 1]$

- Создать `Dataset`-ы и `DataLoader`-ы для обучающей и валидационной выборок

In [None]:
# --- YOUR CODE HERE ---

### Задание 2: Residual Blocks (0.5 балла)

В оригинальной статье `VQ-VAE` авторы использовали стек `ResNet` блоков как в кодировщике, так и в декодере, чтобы построить достаточно глубокие и мощные сверточные сети. 

Структура блоков имеет следующий вид: `ReLU` $\rightarrow$ `Conv3x3` $\rightarrow$ `ReLU` $\rightarrow$ `Conv1x1`

Реализуйте классы `ResidualBlock` и `ResidualStack`, следуя архитектуре из статьи.

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_hiddens):
        super().__init__()
        self.block = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=in_channels,
                      out_channels=num_residual_hiddens,
                      kernel_size=3, stride=1, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=num_residual_hiddens,
                      out_channels=num_hiddens,
                      kernel_size=1, stride=1, bias=False)
        )

    def forward(self, x):
        # Implement the residual connection

        return ...
    

class ResidualBlockStack(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super().__init__()
        self._num_residual_layers = num_residual_layers
        # Create a list self.layers and add num_residual_layers instances of ResidualBlock to the list.
        self.layers = ...

    def forward(self, x):
        # Pass x through all layers and apply Relu after the entire stack
        return ...

### Задание 3: Encoder и Decoder (1 балл)

**Encoder** и **Decoder** в `VQ-VAE` — это сверточные нейросети, отвечающие за преобразование данных между исходным пространством изображений и пространством непрерывных латентных векторов

- **Encoder**:  сжимает входное изображение $\mathbf{x}$ в латентный вектор $\mathbf{z}_e$

- **Decoder**:  будет принимать на вход квантованный вектор $\mathbf{z}_q$, полученный из $\mathbf{z}_e$ после квантизации и восстанавливает из нее изображение $\hat{\mathbf{x}}$

Ваша задача — реализовать классы `Encoder` и `Decoder`.

In [None]:
class Encoder(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens):
        super().__init__()
        
        # Define the layers based on the VQ-VAE paper description:
        # 1. Conv2d with stride 2 (kernel 4x4, padding 1) -> num_hiddens // 2 channels
        self.conv_1 = ...
        
        # 2. Conv2d with stride 2 (kernel 4x4, padding 1) -> num_hiddens channels
        self.conv_2 = ...
        
        # 3. Conv2d with stride 1 (kernel 3x3, padding 1) -> num_hiddens channels
        self.conv_3 = ...
        
        # 4. ResidualStack 
        self.residual_stack = ...

    def forward(self, inputs):
        # Implement the forward pass:
        # Conv1 -> ReLU -> Conv2 -> ReLU -> Conv3 -> ResidualStack
        return ...

In [None]:
class Decoder(nn.Module):
    def __init__(self, in_channels, num_hiddens, num_residual_layers, num_residual_hiddens, out_channels):
        super().__init__()
        
        # Define the layers:
        # 1. Conv2d with stride 1 (kernel 3x3, padding 1) -> num_hiddens channels
        self.conv_1 = ...
        
        # 2. ResidualStack
        self.residual_stack = ...
        
        # 3. ConvTranspose2d with stride 2 (kernel 4x4, padding 1) -> num_hiddens // 2 channels
        self.conv_trans_1 = ...

        # 4. ConvTranspose2d with stride 2 (kernel 4x4, padding 1) -> out_channels
        self.conv_trans_2 = ...

    def forward(self, inputs):
        # Implement the forward pass:
        # Conv1 -> ResidualStack (applies ReLU) -> ConvTranspose1 -> ReLU -> ConvTranspose2
        return ...

### Задание 4: VectorQuantizer (2 балла)

`VectorQuantizer` — это основной блок `VQ-VAE`, отвечающий за дискретизацию непрерывного латентного пространства.

Его основная задача для каждого вектора $\mathbf{z}_e$ найти индекс $k^*$ ближайшего вектора $\mathbf{e}_{k^*}$ из **обучаемой кодовой книги**:
$\{\mathbf{e}_k\}_{k=1}^K$.$$k^* = \arg\min_k \|\mathbf{z}_e - \mathbf{e}_k\|^2$$

И используя найденные индексы, извлечь соответствующие квантованные векторы $\mathbf{z}_q = \mathbf{e}_{k^*}$ из кодовой книги, которые и будут передаваться декодеру.

Однако проблема в том, что операция `argmin` недифференцируема, поскольку её производная почти везде равна нулю. Чтобы решить эту проблему, в `VQ-VAE` предложили использовать `Straight-Through Estimator`.

**Идея STE**:

На **прямом проходе** (**forward pass**) мы используем обычный результат операции квантования, т.е. выбираем ближайший вектор $\mathbf{z}_q = \mathbf{e}_{k^*}$.

На **обратном проходе** (**backward pass**) градиент $\nabla_{\mathbf{z}_q} \mathcal{L}$, который пришел от декодера к $\mathbf{z}_q$, используется в качестве аппроксимации для градиента по $\mathbf{z}_e$:

$$\nabla_{\mathbf{z}_e} \mathcal{L} \approx \nabla_{\mathbf{z}_q} \mathcal{L}$$

В итоге, градиент от декодера фактически пропускается через недифференцируемый блок квантования без изменений.

Хотя `STE` дает смещенную оценку градиента он всё же предоставляет кодировщику полезный обучающий сигнал. Градиент $\nabla_{\mathbf{z}_q} \mathcal{L}$ указывает, в каком направлении должен был бы измениться выбранный вектор $\mathbf{e}_{k^*}$, чтобы улучшить реконструкцию. Применяя этот же градиент к $\mathbf{z}_e$, мы подталкиваем выход кодировщика в такое направление, чтобы в следующий раз он с большей вероятностью был квантован в более "правильный" вектор из кодовой книги.


### VQ Loss

STE решает проблему проброса градиента к кодировщику, однако не предоставляет никакого обучающего сигнала для самой кодовой книги. 

Поэтому, чтобы обучить кодовую книгу и стабилизировать кодировщик, в `VQ-VAE` вводят два дополнительных лосса: 

$$L_{VQ} = \underbrace{\|\text{sg}[\mathbf{z}_e] - \mathbf{e}_{k^*}\|^2}_{\text{Codebook Loss}} + \underbrace{\beta \|\mathbf{z}_e - \text{sg}[\mathbf{e}_{k^*}]\|^2}_{\text{Commitment Loss}}$$

**Codebook Loss** отвечает за обучение кодовой книги, минимизируя $L_2$-расстояние между выходом кодировщика $\mathbf{z}_e$ и выбранным вектором $\mathbf{e}_{k^*}$. Это притягивает вектор $\mathbf{e}_{k^*}$ к $\mathbf{z}_e$. Оператор `stop-gradient` у $\mathbf{z}_e$ блокирует поток градиента к кодировщику. Это важно, так как гарантирует, что данное слагаемое лосса будет обновлять только веса кодовой книги.

**Commitment Loss** отвечает за обучение кодировщика, притягивая выход кодировщика $\mathbf{z}_e$ к выбранному вектору $\mathbf{e}_{k^*}$. Это заставляет кодировщик **фиксироваться** (**commit**) на выученных векторах словаря и предотвращает неконтролируемый рост его выходов. `Stop-gradient` у $\mathbf{e}_{k^*}$ блокирует поток градиента к кодовой книге, гарантируя, что это слагаемое будет отвечать только за кодировщик.

$\beta$ — это гиперпараметр, контролирующий силу фиксации.

### Perplexity

Часто для оценки того, насколько эффективно используется кодовая книга используется метрика **perplexity**. Она показывает эффективное количество векторов словаря, которые модель использует в среднем.

1. Для батча, состоящего из $N$ векторов $\mathbf{z}_e$ кодировщик выбирает $N$ индексов $k^*$.

2. Вычисляется среднее распределение $p$ использования кодов по этому батчу: $p = (p_1, ..., p_K)$, где $p_k$ — это средняя частота выбора $k$-го вектора из словаря.

3. **Perplexity** — это экспонента от энтропии $H(p)$ этого среднего распределения:$$PPL = e^{H(p)} = e^{-\sum_{k=1}^K p_k \log p_k}$$

**Hint**:

Низкая перплексия ($PPL << K$) указывает на **коллапс кодовой книги** (**codebook collapse**), высокая перплексия ($PPL \approx K$) указывает на хорошее, разнообразное использование словаря.

Ваша задача — заполнить пропуски в классе `VectorQuantizer`.

In [None]:
class VectorQuantizer(nn.Module):

    def __init__(self, num_embeddings: int, embedding_dim: int, beta: float):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_embeddings = num_embeddings
        self.beta = beta

        self.embedding = nn.Embedding(self.num_embeddings, self.embedding_dim)
        self.embedding.weight.data.uniform_(-1./self.num_embeddings, 1./self.num_embeddings)

    def forward(self, inputs: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Connects the encoder output to the decoder input.
        Args:
            inputs (Tensor): Input tensor from encoder z_e [B, D, H, W].
        Returns:
            Tuple[Tensor, Tensor, Tensor]: 
                vq_loss (scalar = Codebook Loss + beta * Commitment Loss), 
                quantized_ste (tensor [B, D, H, W] with straight-through gradient), 
                encoding_indices (tensor [N] (N=B*H*W) for perplexity calculation).
        """

        # Calculate L2 distance between inputs and embedding weights
        distances = ...
        
        # Find the indices of the closest embeddings
        encoding_indices = ...  
        
        # Get the quantized latent vectors using embedding lookup 
        quantized = ...  
        
        # Calculate the codebook loss: ||sg[z_e] - e_k*||^2, use detach() for sg

        codebook_loss = ...
        
        # Calculate the commitment loss: ||z_e - sg[e_k*]||^2
        commitment_loss = ...
        
        # Combine the losses
        vq_loss = ...
        
        # Apply the Straight-Through Estimator

        quantized_ste = ...                              
        
        return ...
    
    def calculate_perplexity(self, encoding_indices: torch.Tensor) -> torch.Tensor:
        return ...


### Задание 5: VQ-VAE (1 балл)

Теперь, когда у нас есть `Encoder`, `Decoder` и `VectorQuantizer`, мы можем собрать итоговую модель.

Реализуйте класс `VQVAE`, объединив все ранее созданные компоненты.

In [None]:
class VQVAE(nn.Module):
    def __init__(self, in_channels: int, embedding_dim: int, num_embeddings: int,
                 num_hiddens: int, num_residual_layers: int, num_residual_hiddens: int,
                 beta: float):
        super().__init__()

        # Initialize the encoder
        self._encoder = ...

        # Initialize the pre-quantization Conv2d with kernel_size=1, stride=1.
        self.pre_vq_conv = ...

        # Initialize the vector quantization layer
        self.vq_layer = ...

        # Initialize the decoder
        self.decoder = ...

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Forward pass through the VQ-VAE model.
        Args:
            x (Tensor): Input image tensor [B, C_in, H, W].
        Returns:
            Tuple[Tensor, Tensor, Tensor]:
                vq_loss (scalar = Codebook Loss + beta * Commitment Loss),
                x_hat_logits (tensor [B, C_in, H, W] - output logits from Decoder),
                encoding_indices (tensor [N = B*H'*W'] - flat indices for perplexity calculation).
        """
        # Pass `x` through the encoder to get `z_e`
        z_e = ...

        # Pass `z_e` through the pre-quantization conv
        z_e_pre_vq = ...

        # Pass `z_e_pre_vq` through the quantization layer
        vq_loss, z_q, encoding_indices = ...

        # Pass the quantized vector `z_q` through the decoder
        x_hat_logits = ...
        
        return ...
    
    @torch.no_grad()
    def decode_from_indices(self, 
                            indices: torch.Tensor,
                            final_activation=nn.Tanh()) -> torch.Tensor:
        """
        Takes a grid of indices and decodes them into images.
        """
        self.eval()
        
        # Get the quantized vectors `z_q`
        z_q = ...
            
        # Pass `z_q` through the decoder
        out_logits = ...
        
        # Apply the final activation
        out = ...
            
        return out
    
    @torch.no_grad()
    def sample(self, 
               num_samples: int, 
               latent_shape: Tuple[int, int],
               device: torch.device, 
               final_activation=nn.Tanh()) -> torch.Tensor:
        """
        Generates images from random codes
        """
        self.eval()
        # Generate random integer indices
        indices = ...
        
        # Use `decode_from_indices` for decoding
        return self.decode_from_indices(indices, final_activation)

### Задание 6: Training and Validation Loop (1 балл)

Теперь, когда все компоненты модели готовы, мы можем собрать полный цикл обучения. Обучение `VQ-VAE` происходит путем минимизации суммарной функции потерь:
$$L_{total} = L_{rec} + L_{VQ},$$

- $L_{rec} = \|\mathbf{x} - \mathbf{\hat{x}}\|^2$ — лосс, вычисляемый между оригинальным $\mathbf{x}$ и восстановленным $\mathbf{\hat{x}}$ объектами
- $L_{VQ} = \|\mathbf{z}_e - \text{sg}[\mathbf{e}_{k^*}]\|^2 + \beta \|\text{sg}[\mathbf{z}_e] - \mathbf{e}_{k^*}\|^2$ — лосс, вычисляемый внутри `VectorQuantizer` для обучения кодовой книги и кодировщика.

Мы будем отслеживать все компоненты функции потерь (`total`, `reconstruction`, `vq`) и `Perplexity` на обучающей и валидационной выборках, чтобы контролировать процесс обучения.

Реализуйте функции `train_step_vqvae`, `validate_step_vqvae` и основной цикл `train_loop_vqvae`.

In [None]:
def train_step_vqvae(model, optimizer, train_loader, device):

    model.train()

    epoch_losses = {'loss': 0.0, 'recon_loss': 0.0, 'vq_loss': 0.0, 'perplexity': 0.0}
    num_batches = len(train_loader)

    for batch_x, _ in tqdm(train_loader, desc="Train", leave=False):
        x = batch_x.to(device)
        optimizer.zero_grad()

        # Get model outputs
        ...

        # Apply final activation
        ...

        # Calculate reconstruction loss, total loss, perplexity
        ...

        # Get average losses over batches
       
    return epoch_losses

In [None]:
@torch.no_grad()
def validate_step_vqvae(model, val_loader, device):
    
    model.eval()
    epoch_losses = {'loss': 0.0, 'recon_loss': 0.0, 'vq_loss': 0.0, 'perplexity': 0.0}
    num_batches = len(val_loader)

    for batch_x, _ in tqdm(val_loader, desc="Val", leave=False):
        x = batch_x.to(device)

        # --- YOUR CODE HERE ---
        
    return epoch_losses


In [None]:
def visualize_reconstructions(model, data_loader, device, epoch, num_images=10, final_activation=F.tanh):
    model.eval()
    
    x_batch, _ = next(iter(data_loader))
    originals = x_batch[:num_images].to(device)

    _, x_hat_logits, _ = model(originals)
    reconstructions = final_activation(x_hat_logits)
    
    originals_cpu = originals.cpu()
    reconstructions_cpu = reconstructions.cpu()

    images_to_plot = torch.cat([originals_cpu, reconstructions_cpu], dim=0)
    grid = make_grid(images_to_plot * 0.5 + 0.5, nrow=num_images)
    
    plt.figure(figsize=(num_images * 1.5, 4))
    plt.imshow(grid.permute(1, 2, 0))
    plt.title(f"Reconstructions: Epoch {epoch} (Top: Original, Bottom: Reconstructed)")
    plt.axis('off')
    plt.show()

In [None]:
def plot_all_losses(train_stats, val_stats):
    clear_output(wait=True)
    epochs = range(1, len(train_stats['loss']) + 1)
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    axes = axes.flatten() 
    
    metrics_to_plot = [
        ('loss', 'Total Loss'), 
        ('recon_loss', 'Reconstruction Loss'), 
        ('vq_loss', 'VQ + Commitment Loss'), 
        ('perplexity', 'Perplexity')
    ]
    
    for ax, (key, title) in zip(axes, metrics_to_plot):
        ax.plot(epochs, train_stats[key], label=f'Train {title}')
        ax.plot(epochs, val_stats[key], label=f'Validation {title}')
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Value')
        ax.grid(True)
        ax.legend()
        
    fig.suptitle('Training VQ-VAE', fontsize=14)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()
    
def save_checkpoint(model, optimizer, epoch, path_template):
    save_path = path_template.format(epoch=epoch)
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()
    }, save_path)

    print(f"Checkpoint saved: {save_path}")

In [None]:
def train_loop_vqvae(model, optimizer, train_loader, val_loader, num_epochs, device,
                     checkpoint_path=None):
    model = model.to(device)
    train_history = {'loss': [], 'recon_loss': [], 'vq_loss': [], 'perplexity': []}
    val_history = {'loss': [], 'recon_loss': [], 'vq_loss': [], 'perplexity': []}

    for epoch in range(1, num_epochs + 1):
        train_epoch_stats = ...
        val_epoch_stats = ...

        for key in train_history:
            ...

        clear_output(wait=True)
        plot_all_losses(train_history, val_history) 
        
        visualize_reconstructions(model, val_loader, device, epoch) 

        print(f"[Epoch {epoch}/{num_epochs}]")
        print(f"  Train: Loss={train_epoch_stats['loss']:.4f}, Recon={train_epoch_stats['recon_loss']:.4f}, VQ={train_epoch_stats['vq_loss']:.4f}, PPL={train_epoch_stats['perplexity']:.2f}")
        print(f"  Val:   Loss={val_epoch_stats['loss']:.4f}, Recon={val_epoch_stats['recon_loss']:.4f}, VQ={val_epoch_stats['vq_loss']:.4f}, PPL={val_epoch_stats['perplexity']:.2f}")

        save_checkpoint(model, optimizer, epoch, checkpoint_path)

In [None]:
loss_fn_recon = nn.MSELoss() 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_epochs = 300
learning_rate = 5e-4

in_channels = 3
embedding_dim = 64
num_embeddings = 512
num_hiddens = 256
num_residual_layers = 2
num_residual_hiddens = 64
beta = 0.25

model = VQVAE(
        in_channels=in_channels, 
        embedding_dim=embedding_dim, 
        num_embeddings=num_embeddings, 
        num_hiddens=num_hiddens, 
        num_residual_layers=num_residual_layers, 
        num_residual_hiddens=num_residual_hiddens, 
        beta=beta
    )

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
checkpoint_path = "checkpoints_vq/vqvae_epoch_{epoch}.pth"

In [None]:
train_loop_vqvae(
        model=model,
        optimizer=optimizer,
        train_loader=train_loader,
        val_loader=val_loader,
        num_epochs=num_epochs,
        device=device,
        checkpoint_path=checkpoint_path
    )

### Задание 7: Sampling (0.5 балла)

После обучения `VQ-VAE` мы можем использовать его декодер для генерации новых изображений. Давайте попробуем это сделать, загрузив обученную модель.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vqvae_checkpoint_path = 'checkpoints_vq/vqvae_epoch_300.pth' 

model_vqvae = VQVAE(
    in_channels=3,
    embedding_dim=64,
    num_embeddings=512,
    num_hiddens=256,
    num_residual_layers=2,
    num_residual_hiddens=64,
    beta=0.25
).to(device)

checkpoint = torch.load(vqvae_checkpoint_path, map_location=device)
model_vqvae.load_state_dict(checkpoint['model_state_dict'])
model_vqvae.eval()

In [None]:
@torch.no_grad()
def visualize_generation(model, device, latent_shape=(8, 8), num_images=50, nrow=10):
    model.eval()
    
    samples = model.sample(num_images, latent_shape, device).cpu()
    
    grid = make_grid(samples * 0.5 + 0.5, nrow=nrow)
    
    plt.figure(figsize=(nrow*1.5, (num_images//nrow)*1.7 ))
    plt.imshow(grid.permute(1, 2, 0))
    plt.title(f"Generated Samples")
    plt.axis('off')
    plt.show()

In [None]:
visualize_generation(model_vqvae, device)

**Вопрос**:

Почему сгенерированные изображения имеют такой вид?
Как это можно исправить?

**Ваш ответ**:

### Learnable Prior

Итак, мы реализовали и обучили модель `VQ-VAE`. Она хорошо обучилась сжимать изображения в дискретное латентное пространство, а затем их восстанавливать. Однако, как вы могли заметить сама по себе `VQ-VAE` не является хорошей генеративной моделью. 

Чтобы `VQ-VAE` могла генерировать качественные изображения, нам необходимо научиться моделировать априорное распределение $p(\mathbf{z})$ над этими дискретными скрытыми переменными. То есть, нам нужна модель, которая понимает, какие последовательности индексов являются осмысленными.

Для этой задачи отлично подходят авторегрессионные модели. Авторы оригинальной статьи предложили использовать для этого модель `Gated PixelCNN`.

### Задание 8: Latent Dataset (1 балл)

Поскольку PixelCNN будет работать не в пространстве пикселей, а в пространстве дискретных латентных переменных, нам необходимо предварительно создать этот датасет.

**Ваша задача**:

- Загрузить обученную модель VQ-VAE

- Реализовать функцию `get_code_indices`, которая прогоняет датасет изображений через кодировщик и `Vector Quantizer` и возвращает сетку индексов кодовой книги для каждого изображения

- Создать и сохранить датасеты индексов для обучающей и валидационной выборок `CIFAR-10`

- Реализовать класс `LatentCodeDataset` для загрузки полученных данных

- Создать `DataLoader`-ы для обучающей и валидационной выборок

In [None]:
# --- YOUR CODE HERE ---

vqvae_model = ...
train_prior_loader = ...
val_prior_loader = ...

In [None]:
def get_code_indices(model, data_loader, device):
    all_indices = []
    all_labels = []
    model.eval()

    for images, labels in tqdm(data_loader, desc="Encoding"):
        images = images.to(device)

        # Pass images through the encoder
        z_e = ...

        # Pass z_e through the pre_vq_conv layer
        z_e_pre_vq = ...

        # Get the encoding indices from the vq_layer.
        ...

        # Append results 
    
    return ..., ...

In [None]:
class LatentCodeDataset(Dataset):
    def __init__(self, data_path):
        ...

    def __len__(self):
        return ...

    def __getitem__(self, idx):
        return ...

### Задание 9: Gated PixelCNN (1.5 балла)

Итак, чтобы `VQ-VAE` мог генерировать качественные изображения, нам необходимо смоделировать априорное распределение $p(\mathbf{z})$. Мы будем делать это с помощью авторегрессионной модели `Gated PixelCNN`. Вот её основные идеи:

- **Маскированные cвертки** (**Masked Convolutions**) видят только контекст (пиксели/коды выше и левее). `Маска A` применяется к первому слою, `маска B` — ко всем последующим.

- Для эффективного вычисления, в `Gated PixelCNN` используется два потока сверток:
    - **Вертикальный поток**: видит все строки над текущей
    - **Горизонтальный поток**: видит пиксели левее в текущей строке, а также обуславливается выходом вертикального потока

- Вместо стандартной **ReLU** используется **Gated Activations** $\mathbf{y} = \tanh(W_f * \mathbf{x}) \odot \sigma(W_g * \mathbf{x})$, что, как было показано авторами, значительно улучшает производительность.

<center><img src="images/GatedPixelCNN.png" width=350></center>

В этом задании вам нужно реализовать классы `GatedMaskedConv2d` и `GatedPixelCNN`.

In [None]:
class GatedActivation(nn.Module):
    def __init__(self): 
        super().__init__()
    def forward(self, x):
        # Split the input tensor `x` into two halves along the channel dimension (dim=1)
        
        # Apply tanh to the first half and sigmoid to the second half and return their element-wise product

        return ...

In [None]:
class GatedMaskedConv2d(nn.Module):
    def __init__(self, mask_type, dim, kernel_size, residual=True, n_classes=None):
        super().__init__()
        assert kernel_size % 2 == 1, "Kernel size must be odd"
        self.mask_type = mask_type
        self.n_classes = n_classes
        self.residual = residual
        
        # Conditional embedding
        if self.n_classes is not None: 
            self.class_cond_embedding = nn.Embedding(n_classes, 2*dim)
        
        # Vertical stack convolutions
        ks_v = (kernel_size//2 + 1, kernel_size)
        pad_v = (kernel_size//2, kernel_size//2)
        self.vert_stack = nn.Conv2d(dim, dim*2, ks_v, 1, pad_v)
        self.vert_to_horiz = nn.Conv2d(2*dim, 2*dim, 1)
        
        # Horizontal stack convolutions
        ks_h = (1, kernel_size//2 + 1)
        pad_h = (0, kernel_size//2)
        self.horiz_stack = nn.Conv2d(dim, dim*2, ks_h, 1, pad_h)
        self.horiz_resid = nn.Conv2d(dim, dim, 1)
        
        self.gate = GatedActivation()

    def make_causal(self):
        """Applies Mask 'A' to the convolutional weights."""
        
        # Zero out the bottom row of the vertical stack's kernel
        ...
        
        # Zero out the right-most column of the horizontal stack's kernel
        ...

    def forward(self, x_v, x_h, y=None):
        if self.mask_type == 'A': 
            self.make_causal()
        
        y_cond = 0
        if y is not None and self.class_cond_embedding is not None:
            y_cond = self.class_cond_embedding(y).unsqueeze(-1).unsqueeze(-1)

        # 1. Vertical stack:
        #    Apply self.vert_stack to x_v
        #    Add y_cond
        #    Apply self.gate to get out_v
        out_v = ...

        # 2. Horizontal stack:
        #    Apply self.vert_to_horiz to h_vert
        #    Apply self.horiz_stack to x_h
        #    Sum all components: vert_to_horiz + horiz_stack + y_cond
        #    Apply self.gate to get out
        out = ...

        # 3. Residual connection for the horizontal stream:
        if self.residual:
            out_h = ...
        else:
            out_h = ...

        return out_v, out_h

In [None]:
class GatedPixelCNN(nn.Module):
    def __init__(self, num_embeddings=512, embedding_dim=128, n_layers=15, n_classes=10):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_embeddings = num_embeddings

        # Create the embedding layer 
        self.embedding = ...

        for i in range(n_layers):
            mask_type = 'A' if i == 0 else 'B'
            kernel_size = 7 if i == 0 else 3
            residual = False if i == 0 else True
            self.layers.append(
                GatedMaskedConv2d(mask_type, embedding_dim, kernel_size, residual, n_classes)
            )

        # Create the final output convolution layers 
        #    nn.Sequential: Conv1x1 (D -> 512) -> ReLU -> Conv1x1 (512 -> K)
        self.output_conv = ...

    def forward(self, x, y=None):
        # Embed the input indices x 
        # x_emb = ...

        # Initialize vertical and horizontal streams
        x_v, x_h = ...

        # Pass through all layers
        ...

        # Apply the final output convolutions to `x_h` to get logits
        logits = ...
        
        return logits
        

    @torch.no_grad()
    def sample(self, h=None, shape=(8, 8), num_samples=10, device='gpu'):
        self.eval()
        latents = torch.zeros((num_samples, *shape), dtype=torch.int64).to(device)
        
        if h is not None:
             h = h.to(device)
        
        for i in range(shape[0]):
            for j in range(shape[1]):

                latents = ...
                
        return latents

### Задание 10: Training and Validation Loop for PixelCNN (0.5 балла)

Реализуйте функции `train_prior`, `validate_prior` и основной цикл `train_loop_prior`.

In [None]:
def train_prior(model, optimizer, train_loader, criterion, device):
    model.train()
    total_loss = 0.0

    for codes, labels in tqdm(train_loader, desc="Prior Train", leave=False):
        codes, labels = codes.to(device), labels.to(device)

        # --- YOUR CODE HERE ---

    return ...

@torch.no_grad()
def validate_prior(model, val_loader, criterion, device):
    model.eval()
    total_loss = 0.0

    for codes, labels in tqdm(val_loader, desc="Prior Val", leave=False):
        codes, labels = codes.to(device), labels.to(device)

        # --- YOUR CODE HERE ---

    return ...

In [None]:
def plot_prior_losses(train_losses, val_losses):
    clear_output(wait=True) 
    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Cross Entropy')
    plt.title('Loss Curve')
    plt.legend()
    plt.grid(True)
    
def visualize_prior_generation(prior_model, vqvae_model, device, shape=(8, 8), n_classes=10):
    prior_model.eval()
    vqvae_model.eval()
    
    labels_to_generate = torch.arange(n_classes).to(device)

    latent_codes = prior_model.sample(labels_to_generate, shape=shape, 
                                        batch_size=n_classes, device=device)
    
    generated_images = vqvae_model.decode_from_indices(latent_codes).cpu()
    
    grid = make_grid(generated_images * 0.5 + 0.5, nrow=n_classes) 
    plt.figure(figsize=(n_classes * 1.5, 3))
    plt.imshow(grid.permute(1, 2, 0))
    plt.title(f'Generated Images with Prior')
    plt.axis('off') 
    plt.show()

In [None]:
def train_loop_prior(prior_model, vqvae_model, optimizer, train_loader, val_loader, criterion, num_epochs, device):
    prior_model.to(device)
    vqvae_model.to(device) 
    train_losses, val_losses = [], []
    latent_shape = next(iter(train_loader))[0].shape[1:] 

    for epoch in range(1, num_epochs + 1):
        avg_train_loss = ...
        avg_val_loss = ...
        
        # --- YOUR CODE HERE ---
        
        clear_output(wait=True)
        plot_prior_losses(train_losses, val_losses)
        visualize_prior_generation(prior_model, vqvae_model, device, shape=latent_shape)
        
        print(f"[Epoch {epoch}/{num_epochs}] Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

In [None]:
pixelcnn_embedding_dim = 128
pixelcnn_n_layers = 12 

prior_num_epochs = 10
prior_learning_rate = 1e-4


prior_model = GatedPixelCNN(
        num_embeddings=num_embeddings, 
        embedding_dim=pixelcnn_embedding_dim, 
        n_layers=pixelcnn_n_layers, 
        n_classes=10
).to(device)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(prior_model.parameters(), lr=prior_learning_rate)

train_loop_prior(prior_model, vqvae_model, optimizer, train_prior_loader, val_prior_loader, 
                     criterion, prior_num_epochs, device)


In [None]:
visualize_prior_generation(prior_model, vqvae_model, device, shape=(8, 8), n_classes=10)

### Задание 11: Выводы (0.5 балла)

Сгенерируйте 50 изображений с помощью функции `visualize_prior_generation`, посмотрите на них и сравните их с теми, что вы получили в **Задании 7**.

Ответьте на следующие вопросы:

1. Сравните качество изображений, сгенерированных с помощью **learnable prior**, и изображений, полученных путем наивного сэмплирования. В чем разница?

2. Почему обучение PixelCNN поверх дискретных кодов позволяет генерировать осмысленные изображения? Объясните роль **prior** $p(\mathbf{z})$.

**Ваш ответ**: