## Batchnorm

Применяет нормализацию по батчу; подробнее описано в [этой статье](https://arxiv.org/abs/1502.03167). Формула при этом используется следующая: $$x = \frac{x - E[x]}{\sqrt{var[x] + \epsilon}} * \gamma + \beta$$.

Среднее значение и стандартное отклонение рассчитываются для каждого измерения по мини-батчу, $\gamma$ и $\beta$ - обучаемые вектора параметров размера С (где С - размер инпута). По умолчанию элементы $\gamma$ устанавливаются равными единице, а элементы $\beta$ устанавливаются равными нулю. Стандартное отклонение вычисляется с помощью смещенной оценки, эквивалентной torch.var(input, unbiased=False).

Получается, что при этом мы нормализуем все входные данные к интервалу N~(0,1). Это может спасти нас от затухающих градиентов, если мы используем сигмоиду в качестве активации, потому что:

<img src="img/batchnorm.png" />

Чаще всего нормализация по батчу нужна для обучения сверточных нейронных сетей (CNN) и работы с картинками. 

В этом простеньком примере посмотрим, как использовать нормализацию. Возьмем стандартный датасет [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html): в этом датасете много маленьких картинок; всего у картинок 10 классов, по 6 тысяч картинок на класс. 

In [None]:
import os
import torch
from torch import nn
from torchvision.datasets import CIFAR10 # датасет есть в учебных датасетах torchvision
from torch.utils.data import DataLoader
from torchvision import transforms # здесь есть полезные инструменты для обработки картинок - нам понадобится функция для превращения картинки в тензор

Напишем самую простую архитектуру.

In [None]:
class MLP(nn.Module):
  '''
    Многослойный перцептрон.
    На входе: 32 * 32 * 3 (картинки 32х32 пикселя, 3 канала)
    Скрытый слой: 64 нейрона
    Второй скрытый слой: 32 нейрона (нормальная практика - делать на
    втором слое меньше в два раза)
    Выходной слой: 10 нейронов
    Поместим два слоя нормализации
  '''
  def __init__(self):
    super().__init__()
    self.layers = nn.Sequential(
      nn.Flatten(),
      nn.Linear(32 * 32 * 3, 64),
      nn.BatchNorm1d(64), # должно быть указано, сколько выходов у нейронов
      nn.ReLU(),
      nn.Linear(64, 32),
      nn.BatchNorm1d(32),
      nn.ReLU(),
      nn.Linear(32, 10)
    )


  def forward(self, x):
    '''Forward pass'''
    return self.layers(x)

In [None]:
# Зафиксируем рандом
torch.manual_seed(42)

# Приготовим датасет
dataset = CIFAR10(os.getcwd(), download=True, transform=transforms.ToTensor()) # датасет стандартный - у него свой класс
trainloader = torch.utils.data.DataLoader(dataset, batch_size=10, shuffle=True, num_workers=1)

# Инициализируем модель
mlp = MLP()

# выберем функцию потерь и вид градиентного спуска
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(mlp.parameters(), lr=1e-4)

# запускаем трейнлуп
for epoch in range(0, 5): # 5 эпох

  # печатаем эпоху
  print(f'Starting epoch {epoch + 1}')

  # текущий лосс обнуляем на каждой эпохе, чтобы его аккумулировать
  current_loss = 0.0

  # итерируемся по лоадеру
  for i, data in enumerate(trainloader, 0):

    # достаем фичи и цп из батча
    inputs, targets = data

    # обнуляем градиенты
    optimizer.zero_grad()

    # делаем форвард-проброс
    outputs = mlp(inputs)

    # вычисляем потери
    loss = loss_function(outputs, targets)

    # делаем бэкпроп
    loss.backward()

    # делаем оптимизацию весов
    optimizer.step()

    # печатаем стату
    current_loss += loss.item()
    if i % 500 == 499: # на последнем батче
        print(f'Loss after mini-batch {i + 1}: {current_loss / 500:.3f}')
        current_loss = 0.0

# обучились
print('Training process has finished.')

## Dropout

Dropout — это метод машинного обучения, при котором мы удаляем (или «выбрасываем») блоки нейронной сети для имитации одновременного обучения большого количества архитектур. Важно отметить, что отсев может значительно снизить вероятность переобучения во время тренировки.

Примерно так выглядит дропаут на схеме:

<img src="https://api.wandb.ai/files/authors/images/projects/81595/2d131f38.gif" />

Добавить дропаут в свою архитектуру очень просто:

    self.dropout = nn.Dropout(0.25) 
    
(Добавит слой дропаута с p = 0.25). 

Поиграемся с датасетом [Sonar](http://archive.ics.uci.edu/dataset/151/connectionist+bench+sonar+mines+vs+rocks), в котором содержатся данные от сигналов гидролокатора (если я правильно понимаю)), отражающихся от разных камней. Скачаем его и переименуем в csv для удобства (сами данные - просто таблица csv). 

In [None]:
!wget http://archive.ics.uci.edu/ml/machine-learning-databases/undocumented/connectionist-bench/sonar/sonar.all-data

In [None]:
import os

os.rename('sonar.all-data', 'sonar.csv')

Напишем простенький бейзлайн. 

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold

Считаем датасет.

In [None]:
# собственно считываем
data = pd.read_csv("sonar.csv", header=None)
X = data.iloc[:, 0:60]
y = data.iloc[:, 60]

# закодируем ytrue (в исходном датасете буковки)
encoder = LabelEncoder()
encoder.fit(y)
y = encoder.transform(y)

# сконвертируем в тензора
X = torch.tensor(X.values, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)

Определим архитектуру:

In [None]:
class SonarModel(nn.Module):
    '''
    Обычный MLP. На входе 60 признаков
    Потом линейный слой из 60 нейронов
    Потом 30 нейронов
    Потом выходной на один нейрон. У нас бинарная классификация:
    R - камень или M - металл
    '''
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(60, 60)
        self.act1 = nn.ReLU()
        self.layer2 = nn.Linear(60, 30)
        self.act2 = nn.ReLU()
        self.output = nn.Linear(30, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.act1(self.layer1(x))
        x = self.act2(self.layer2(x))
        x = self.sigmoid(self.output(x))
        return x

Напишем трейнлуп

In [None]:
def model_train(model, X_train, y_train, X_val, y_val,
                n_epochs=300, batch_size=16):
    loss_fn = nn.BCELoss() # для бинарной классификации
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
    # даже не будем писать dataloader: сделаем ручками
    batch_start = torch.arange(0, len(X_train), batch_size)

    model.train()
    for epoch in range(n_epochs):
        for start in batch_start:
            X_batch = X_train[start:start + batch_size]
            y_batch = y_train[start:start + batch_size]
            y_pred = model(X_batch)
            loss = loss_fn(y_pred, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # валидация
    model.eval()
    y_pred = model(X_val)
    acc = (y_pred.round() == y_val).float().mean()
    acc = float(acc)
    return acc

# 10-фолдовая кросс-валидация: моделька у нас супер-маленькая и датасет тоже, можем позволить
kfold = StratifiedKFold(n_splits=10, shuffle=True)
accuracies = []
for train, test in kfold.split(X, y):
    # создаем модель, учим ее и вычисляем Accuracy
    torch.manual_seed(42)
    model = SonarModel()
    acc = model_train(model, X[train], y[train], X[test], y[test])
    print(f"Accuracy: {acc:.2f}")
    accuracies.append(acc)

# оцениваем, что получилось в среднем
mean = np.mean(accuracies)
std = np.std(accuracies)
print(f"Baseline: {mean * 100:.2f} (+/- {std * 100:.2f})")

Неплохо, но не идеально.

Добавим дропаут на инпуты (иногда так делается):

In [None]:
class SonarModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.dropout = nn.Dropout(0.2) # 20% шанс отрубить какую-то фичу
        self.layer1 = nn.Linear(60, 60)
        self.act1 = nn.ReLU()
        self.layer2 = nn.Linear(60, 30)
        self.act2 = nn.ReLU()
        self.output = nn.Linear(30, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.dropout(x)
        x = self.act1(self.layer1(x))
        x = self.act2(self.layer2(x))
        x = self.sigmoid(self.output(x))
        return x

Посмотрим, что получилось:

In [None]:
kfold = StratifiedKFold(n_splits=10, shuffle=True)
accuracies = []
for train, test in kfold.split(X, y):
    # создаем модель, учим ее и вычисляем Accuracy
    torch.manual_seed(42)
    model = SonarModel()
    acc = model_train(model, X[train], y[train], X[test], y[test])
    print(f"Accuracy: {acc:.2f}")
    accuracies.append(acc)

# оцениваем, что получилось в среднем
mean = np.mean(accuracies)
std = np.std(accuracies)
print(f"Baseline: {mean * 100:.2f} (+/- {std * 100:.2f})")

Чет не оче. Давайте добавим дропауты на скрытые слои (так делается куда чаще):

In [None]:
class SonarModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(60, 60)
        self.act1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.2) # один после первого
        self.layer2 = nn.Linear(60, 30)
        self.act2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.2) # один после второго
        self.output = nn.Linear(30, 1)
        self.sigmoid = nn.Sigmoid()
 
    def forward(self, x):
        x = self.act1(self.layer1(x))
        x = self.dropout1(x)
        x = self.act2(self.layer2(x))
        x = self.dropout2(x)
        x = self.sigmoid(self.output(x))
        return x

In [None]:
kfold = StratifiedKFold(n_splits=10, shuffle=True)
accuracies = []
for train, test in kfold.split(X, y):
    # создаем модель, учим ее и вычисляем Accuracy
    torch.manual_seed(42)
    model = SonarModel()
    acc = model_train(model, X[train], y[train], X[test], y[test])
    print(f"Accuracy: {acc:.2f}")
    accuracies.append(acc)

# оцениваем, что получилось в среднем
mean = np.mean(accuracies)
std = np.std(accuracies)
print(f"Baseline: {mean * 100:.2f} (+/- {std * 100:.2f})")

В зависимости от рандома может оказаться одинаково с бейзлайном, но чаще покажет небольшое улучшение.