## Инициализайия и нормализация

В этом задании вам предстоит реализовать два вида нормализации: по батчам (BatchNorm1d) и по признакам (LayerNorm1d).

In [1]:
from __future__ import annotations

from typing import Callable, NamedTuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor

### 1. Реализация BatchNorm1d и LayerNorm1d.

#### 1.1. (2 балла) Реализуйте BatchNorm1d

Подсказка: чтобы хранить текущие значения среднего и дисперсии, вам потребуется метод `torch.nn.Module.register_buffer`, ознакомьтесь с документацией к нему. Подумайте, какие проблемы возникнут, если вы будете просто сохранять ваши значения в тензор

In [2]:
class BatchNorm1d(nn.Module):
    def __init__(
        self, num_features: int, momentum: float = 0.9, eps: float = 1e-5
    ) -> None:
        #комменты для собственного удобсва
        super().__init__()
        self.scale = nn.Parameter(torch.ones(num_features))
        self.shift = nn.Parameter(torch.zeros(num_features))
        self.register_buffer("running_mean", torch.zeros(num_features))
        self.register_buffer("running_var",torch.ones(num_features))
        self.momentum = momentum  # управляет скоростью обновления значений running_mean и running_var. Он определяет, насколько сильно текущие значения среднего и дисперсии зависят от новых батчей по сравнению с предыдущими.
        self.eps = eps
#При обновлении средних и дисперсий в батчевой нормализации по словам гпт(спрашивал правила нормировки) используется следующая формула:
#{running_mean}_{new} = momentum * running_mean_old+ (1 - momentum) * batch_mean

    def forward(self, x: Tensor) -> Tensor:
        if self.training:
            batch_mean = x.mean(0)
            batch_var = x.var(0, unbiased=False)
            self.running_mean= self.momentum * batch_mean + (1 - self.momentum) * self.running_mean #это сам питон уже посоветовал
            self.running_var= self.momentum * batch_var + (1 - self.momentum) * self.running_var
            return self.scale * (x - batch_mean) / torch.sqrt(batch_var + self.eps) + self.shift
        else:
            return self.scale * (x - self. running_mean) / torch.sqrt(self.running_var + self.eps) + self.shift
            
            

#### 1.2. (1 балл) Реализуйте LayerNorm1d

Отличия LayerNorm от BatchNorm - в том, что расчёт средних и дисперсий в BatchNorm происходит вдоль размерности батча (см. рисунок слева), а в LayerNorm - вдоль размерности признаков (см. рисунок справа).

<img src="../attachments/norm.png" width="800">

In [3]:
class LayerNorm1d(nn.Module):
    def __init__(self, num_features: int, eps: float = 1e-5) -> None:
        super(LayerNorm1d, self).__init__()
        self.scale = nn.Parameter(torch.ones(num_features))  
        self.shift = nn.Parameter(torch.zeros(num_features)) 
        self.eps = eps 

    def forward(self, x: Tensor) -> Tensor:
        # Вычисляем среднее и дисперсию для каждого примера по последней оси
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)
        x_normalized = (x - mean) / torch.sqrt(var + self.eps)
        return self.scale * x_normalized + self.shift

### 2. Эксперименты

В этом задании ваша задача - проверить, какие из приёмов хорошо справляются с нездоровыми активациями в промежуточных слоях. Вам будет дана базовая модель, у которой есть проблемы с инициализацией параметров, попробуйте несколько приёмов для устранения проблем обучения:
1. Хорошая инициализация параметров
2. Ненасыщаемая функция активации (например, `F.leaky_relu`)
3. Нормализация по батчам или по признакам (можно использовать встроенные `nn.BatchNorm1d` и `nn.LayerNorm`)
4. Более продвинутый оптимизатор (`torch.optim.RMSprop`)

#### 2.0. Подготовка: датасет, функции для обучения

Проверять наши гипотезы будем на датасете MNIST, для отладки добавим в функции для обучения возможность использовать только несколько батчей данных

In [4]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

train_dataset = datasets.MNIST(
    "data",
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)
test_dataset = datasets.MNIST(
    "data",
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [5]:
def training_step(
    batch: tuple[torch.Tensor, torch.Tensor],
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
) -> torch.Tensor:
    # прогоняем батч через модель
    x, y = batch
    logits = model(x)
    # оцениваем значение ошибки
    loss = F.cross_entropy(logits, y)
    # обновляем параметры
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    # возвращаем значение функции ошибки для логирования
    return loss


def train_epoch(
    dataloader: DataLoader,
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    max_batches: int = 100,
) -> Tensor:
    loss_values: list[float] = []
    for i, batch in enumerate(dataloader):
        loss = training_step(batch, model, optimizer)
        loss_values.append(loss.item())
        if i == max_batches:
            break
    return torch.tensor(loss_values).mean()


@torch.no_grad()
def test_epoch(
    dataloader: DataLoader, model: nn.Module, max_batches: int = 100
) -> Tensor:
    loss_values: list[float] = []
    for i, batch in enumerate(dataloader):
        x, y = batch
        logits = model(x)
        # оцениваем значение ошибки
        loss = F.cross_entropy(logits, y)
        loss_values.append(loss.item())
        if i == max_batches:
            break
        print(torch.tensor(loss_values).mean())
    return torch.tensor(loss_values).mean()


#### 2.1. Определение класса модели (2 балла)

Для удобства проведения экспериментов мы немного усложним создание модели, чтобы можно было задать разные способы инициализации параметров и нормализации промежуточных активаций, не меняя определение класса.

Добавьте в метод `__init__`:
- аргумент, который позволит использовать разные функции активации для промежуточных слоёв
- аргумент, который позволит задавать разные способы нормализации: `None` (без нормализации), `nn.BatchNorm` и `nn.LayerNorm`

In [6]:
def init_std_normal(model: nn.Module) -> None:
    """Функция для инициализации параметров модели стандартным нормальным распределением."""
    for param in model.parameters():
        torch.nn.init.normal_(param.data, mean=0, std=1)


from typing import Type


class MLP(nn.Module):
    """Базовая модель для экспериментов

    Args:
        input_dim (int): размерность входных признаков
        hidden_dim (int): размерност скрытого слоя
        output_dim (int): кол-во классов
        act_fn (Callable[[Tensor], Tensor], optional): Функция активации. Defaults to F.tanh.
        init_fn (Callable[[nn.Module], None], optional): Функция для инициализации. Defaults to init_std_normal.
        norm (Type[nn.BatchNorm1d  |  nn.LayerNorm] | None, optional): Способ нормализации промежуточных активаций.
            Defaults to None.
    """
    def __init__(
        self,
        input_dim: int,
        hidden_dim: int,
        output_dim: int,
        act_fn: Callable[[Tensor], Tensor] = torch.tanh,
        init_fn: Callable[[nn.Module], None] = init_std_normal,
        norm: Type[nn.BatchNorm1d | nn.LayerNorm] | None = None,
    ) -> None:
        super().__init__()
        # теперь линейные слои будем задавать
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.act_fn = act_fn
        if norm is not None:
            self.norm = norm(hidden_dim)
        else:
            self.norm = None
        # reinitialize parameters
        init_fn(self)

    def forward(self, x: Tensor) -> Tensor:
        h = self.fc1.forward(x.flatten(1))
        # here you can do normalization
        if self.norm:
            h = self.norm(h)
        h = self.act_fn(h)
        return self.fc2.forward(self.act_fn(h))

#### 2.2. Эксперименты (7 баллов)

Проведите по 3 эксперимента с каждой из модификаций с разными значениями `seed`, соберите статистику значений тестовой ошибки после 10 эпох обучения, сделайте выводы о том, что работает лучше

Проверяем:
1. Метод инициализации весов модели: $\mathcal{N}(0, 1)$ / Kaiming normal
2. Функция активации: tanh /  (или любая другая без насыщения)
3. Слой нормализации: None / BatchNorm / LayerNorm
4. Выбранный оптимизатор: SGD / RMSprop / Adam

Итого у нас 2 + 2 + 3 + 3 = 10 экспериментов, каждый нужно повторить 3 раза, посчитать среднее и вывести результаты в pandas.DataFrame.
Можно дополнительно потестировать разные сочетания опций, например инициализация + нормализация


Чтобы автоматизировать проведение экспериментов, можно использовать функцию, которая будет принимать все необходимые настройки эксперимента, запускать его и сохранять нужные метрики:

In [7]:
def run_experiment(
    model_gen: Callable[[], nn.Module],
    optim_gen: Callable[[nn.Module], torch.optim.Optimizer],
    seed: int,
    n_epochs: int = 10,
    max_batches: int | None = None,
    verbose: bool = False,
) -> float:
    """Функция для запуска экспериментов.

    Args:
        model_gen (Callable[[], nn.Module]): Функция для создания модели
        optim_gen (Callable[[nn.Module], torch.optim.Optimizer]): Функция для создания оптимизатора для модели
        seed (int): random seed
        n_epochs (int, optional): Число эпох обучения. Defaults to 10.
        max_batches (int | None, optional): Если указано, только `max_batches` минибатчей
            будет использоваться при обучении и тестировании. Defaults to None.
        verbose (bool, optional): Выводить ли информацию для отладки. Defaults to False.

    Returns:
        float: Значение ошибки на тестовой выборке в конце обучения
    """
    torch.manual_seed(seed)
    # создадим модель и выведем значение ошибки после инициализации
    model = model_gen()
    optim = optim_gen(model)
    epoch_losses: list[float] = []
    for i in range(n_epochs):
        train_loss = train_epoch(train_loader, model, optim, max_batches=max_batches)
        test_loss = test_epoch(test_loader, model, max_batches=max_batches)
        if verbose:
            print(f"Epoch {i} train loss = {train_loss:.4f}")
            print(f"Epoch {i} test loss = {test_loss:.4f}")

        epoch_losses.append(test_loss.item())

    last_epoch_loss = epoch_losses[-1]
    return last_epoch_loss

Пример использования:

In [15]:
losses = run_experiment(
    model_gen=lambda: MLP(784, 128, 10, init_fn=init_std_normal, norm=None),
    optim_gen=lambda x: torch.optim.SGD(x.parameters(), lr=0.01),
    seed=42,
    n_epochs=10,
    max_batches=100,
    verbose=True,
)

tensor(8.0549)
tensor(8.0197)
tensor(7.9458)
tensor(7.7150)
tensor(7.3126)
tensor(7.4582)
tensor(7.7400)
tensor(7.7048)
tensor(7.5503)
tensor(7.9380)
tensor(7.9343)
tensor(8.0560)
tensor(7.8657)
tensor(7.8225)
tensor(7.9060)
tensor(8.0657)
tensor(8.2160)
tensor(8.2739)
tensor(8.2516)
tensor(8.3053)
tensor(8.3724)
tensor(8.3384)
tensor(8.2979)
tensor(8.3386)
tensor(8.3031)
tensor(8.3595)
tensor(8.3882)
tensor(8.4139)
tensor(8.4045)
tensor(8.3217)
tensor(8.3446)
tensor(8.2913)
tensor(8.2716)
tensor(8.2577)
tensor(8.2792)
tensor(8.2442)
tensor(8.2493)
tensor(8.2404)
tensor(8.2385)
tensor(8.2310)
tensor(8.2261)
tensor(8.2536)
tensor(8.2768)
tensor(8.2492)
tensor(8.1883)
tensor(8.1934)
tensor(8.2441)
tensor(8.2467)
tensor(8.2316)
tensor(8.2588)
tensor(8.2663)
tensor(8.2670)
tensor(8.2528)
tensor(8.2562)
tensor(8.2708)
tensor(8.3035)
tensor(8.3358)
tensor(8.3341)
tensor(8.3209)
tensor(8.3420)
tensor(8.3404)
tensor(8.3443)
tensor(8.3229)
tensor(8.2975)
tensor(8.3104)
tensor(8.3530)
tensor(8.3

Для удобства задания настроек эксперимента можно определять их с помощью класса `Experiment`, в котором можно также реализовать логику для строкового представления:

In [9]:
input_dim = 784
hidden_dim = 128
output_dim = len(train_dataset.classes)


class Experiment(NamedTuple):
    init_fn: Callable[[nn.Module], None]
    act_fn: Callable[[Tensor], Tensor]
    norm: Type[nn.BatchNorm1d | nn.LayerNorm] | None
    optim_cls: Type[torch.optim.Optimizer]

    @property
    def model_gen(self) -> Callable[[], nn.Module]:
        return lambda: MLP(
            input_dim, hidden_dim, output_dim, init_fn=self.init_fn, norm=self.norm
        )

    @property
    def optim_gen(self) -> Callable[[nn.Module], torch.optim.Optimizer]:
        return lambda x: self.optim_cls(x.parameters(), lr=0.01)

    def __repr__(self) -> str:
        # TODO: попробуйте сделать представление эксперимента более читаемым
        return str(self)


Описываем все эксперименты:

In [10]:
options = [
    Experiment(
        init_fn=init_std_normal,
        act_fn=F.tanh,
        norm=None,
        optim_cls=torch.optim.SGD,
    ),
    Experiment(
        init_fn=init_std_normal,
        act_fn=F.silu,
        norm=nn.LayerNorm,
        optim_cls=torch.optim.SGD,
    ),
    Experiment(
        init_fn=init_std_normal,
        act_fn=F.relu,
        norm=nn.BatchNorm1d,
        optim_cls=torch.optim.RMSprop,
    ),
]

options

RecursionError: maximum recursion depth exceeded

Запускаем расчёты:

In [11]:
seeds = [42]  # здесь вам нужно 3 разных значения
results = []

for option in options:
    print(option)
    for seed in seeds:
        loss = run_experiment(
            model_gen=...,
            optim_gen=...,
            seed=seed,
            n_epochs=10,
            max_batches=None,
            verbose=False,
        )
        results.append([str(option), seed, loss])

RecursionError: maximum recursion depth exceeded

Выводим результаты:

In [None]:
import pandas as pd

pd.DataFrame(...)

ВЫВОДЫ: