# Урок 4
Эта демонстрация разбита на 3 ноутбука:

1. Свертки и пулинги.
2. Даталоадеры.
3. **Задача классификации с использованием CNN.**

В этой части мы решим задачу предсказания пола человека по фото.

Метрикой качества будет _accuracy_, датасет возьмем IMDB-Wiki.
В качестве бейзлайна возьмем FC-сеть, затем улучшим его с помощью сверток.
После этого мы возьмем ResNet, зафайнтюним его под нашу задачу и посмотрим на качество.

Мы увидим, что сверточные сети действительно улучшают точность предсказания. Также мы посмотрим на fine-tuning: как быстро он обучится и какое качество даст.

In [1]:
import numpy as np
import cv2
import torch
from albumentations.pytorch import ToTensorV2
import albumentations as A
from scipy.io import loadmat
from torch.utils.data import Dataset


class ImdbWikiDataset(Dataset):
    def __init__(self, image_size: int = 128):
        # Из кодов выше
        imdb_dat = loadmat("imdb_crop/imdb.mat")["imdb"][0][0]
        imdb_paths = [f"imdb_crop/{path[0]}" for path in imdb_dat[2][0]]
        imdb_genders = imdb_dat[3][0]
        bad_indices = set(np.where(np.isnan(imdb_genders))[0])
        imdb_paths = [x for i, x in enumerate(imdb_paths) if i not in bad_indices]
        imdb_genders = [
            int(x) for i, x in enumerate(imdb_genders) if i not in bad_indices
        ]

        # Не будем читать картинки при создании датасета, чтобы сберечь ОЗУ.
        self.paths = imdb_paths
        self.labels = imdb_genders
        self.transforms = A.Compose(
            [
                # Подгонит под размер (128, 128)
                A.Resize(image_size, image_size),
                # A.HorizontalFlip(p=0.5),
                # Пиксели в отрезке [0; 255] - это uint8.
                # Переведем в отрезок [0.0; 1.0] - нейросети будет проще.
                A.ToFloat(max_value=255),
                # Поменяет (H, W, C) -> (C, H, W) и превратит в тензор PyTorch
                ToTensorV2(),
                # Для обогащения: будем переворачивать
            ]
        )
        assert len(self.paths) == len(self.labels)

    def __getitem__(self, index) -> tuple[torch.Tensor, int]:
        # Читать будем только одну картинку - и возвращать пару (тензор картинки, ее label)
        img_numpy = cv2.imread(self.paths[index])
        img_tensor = self.transforms(image=img_numpy)["image"]

        label = self.labels[index]
        return img_tensor, label

    def __len__(self):
        return len(self.paths)

## Классификация с использованием сверток

Попробуем решить задачу классификации пола на таком большом датасете.
Какие модели будем использовать:
- FC (_бейзлайн_);
- одна свертка и нелинейность;
- три свертки;
- три свертки и batch normalization;
- три свертки, batch normalization, dropout; 

Оптимизировать будем бинарную кросс-энтропию (BCE), в качестве метрики качества выберем accuracy.

In [2]:
import random
import os
from dataclasses import dataclass


@dataclass
class Config:
    seed: int = 0

    # Данные
    batch_size: int = 64
    do_shuffle_train: bool = True
    img_size: int = 128
    ratio_train_val_test: tuple[float, float, float] = (0.8, 0.1, 0.1)

    # Модель
    hidden_dim: int = 512
    p_dropout: float = 0.3

    # Обучение
    n_epochs: int = 10
    eval_every: int = 2000
    lr: float = 1e-5


def enable_determinism():
    os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
    torch.use_deterministic_algorithms(True)


def fix_seeds(seed: int):
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.mps.manual_seed(seed)


config = Config()
enable_determinism()
fix_seeds(config.seed)

In [3]:
from torch.utils.data import DataLoader, random_split

# Готовим заново датасеты
generator = torch.Generator()
generator.manual_seed(config.seed)

dataset = ImdbWikiDataset(image_size=config.img_size)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, lengths=config.ratio_train_val_test, generator=generator
)


# https://pytorch.org/docs/stable/notes/randomness.html#dataloader
def seed_worker(_):
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)


train_loader = DataLoader(
    train_dataset,
    batch_size=config.batch_size,
    shuffle=config.do_shuffle_train,
    generator=generator,
    drop_last=True,
    # Для скорости будем готовить данные в 4 процессах
    num_workers=4,
    pin_memory=True,
    # Это для воспроизводимости https://pytorch.org/docs/stable/notes/randomness.html#dataloader
    worker_init_fn=seed_worker,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=config.batch_size,
    shuffle=False,
    drop_last=True,
    num_workers=4,
    pin_memory=True,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=config.batch_size,
    shuffle=False,
    drop_last=True,
)
# Здесь не будем использовать test_loader и test_dataset. Но обычно работают так:
# - на train данных обучают модель;
# - на val данных подбирают гиперпараметры;
# - на test данных финально оценивают качество модели (после подбора гиперпараметров).

In [4]:
import torch.nn as nn


class FcModel(nn.Module):
    def __init__(self, config: Config):
        super().__init__()
        self.img_size = config.img_size
        self.hidden_dim = config.hidden_dim
        n_channels = 3
        self.fc = nn.Sequential(
            nn.Linear(self.img_size * self.img_size * n_channels, self.hidden_dim),
            nn.ReLU(),
            nn.BatchNorm1d(self.hidden_dim),
            nn.Linear(self.hidden_dim, self.hidden_dim // 2),
            nn.ReLU(),
            nn.BatchNorm1d(self.hidden_dim // 2),
            nn.Linear(self.hidden_dim // 2, self.hidden_dim // 4),
            nn.ReLU(),
            nn.BatchNorm1d(self.hidden_dim // 4),
            nn.Linear(self.hidden_dim // 4, 1),
            # Будем предсказывать логиты вероятностей
        )

    def forward(self, x):
        # Схлопнем (N, C, H, W) -> (N, C * H * W)
        x = x.reshape((x.shape[0], -1))
        # И прогоним через линейные слои
        return self.fc(x)


model = FcModel(config)
x, y = next(iter(train_loader))
print(x.shape)
print(model(x).shape)

In [5]:
# Пойдем учиться.
# В качестве ошибки возьмем BCE
import torch.nn.functional as F
import tqdm
import wandb
from torch.optim import Adam


def calc_accuracy(model: nn.Module, loader: DataLoader, device: torch.device):
    count_correct, count_total = 0, 0
    model.eval()
    for img_batch, true_labels in loader:
        img_batch = img_batch.to(device)
        true_labels = true_labels.to(device)
        with torch.no_grad():
            pred_val = model(img_batch).squeeze()
        # Будем предсказывать самый вероятный класс (т.е. порог 0.5 вероятности).
        # Тогда p > 0.5 будет на положительных логитах, а p < 0.5 - на отрицательных
        pred_labels = pred_val >= 0
        count_correct += (pred_labels == true_labels).sum().item()
        count_total += len(true_labels)
    model.train()
    return count_correct / count_total


def train_loop(
    config: Config,
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    device: torch.device,
    params_subset: list | None = None,
):
    if params_subset is None:
        params_subset = model.parameters()
    optimizer = Adam(params_subset, lr=config.lr)
    model.to(device)

    for epoch in range(config.n_epochs):
        print(f"Epoch #{epoch + 1}/#{config.n_epochs}")
        for i, (img_batch, true_labels) in enumerate(tqdm.tqdm(train_loader)):
            step = epoch * len(train_loader) + i
            img_batch, true_labels = img_batch.to(device), true_labels.to(device)

            optimizer.zero_grad()
            pred_labels = model(img_batch).squeeze()
            loss = F.binary_cross_entropy_with_logits(pred_labels, true_labels.float())
            loss.backward()
            optimizer.step()

            wandb.log({"loss": loss.cpu().item()}, step=step)
            if (i + 1) % config.eval_every == 0:
                # Подсчитаем accuracy на всем валидационном датасете
                wandb.log(
                    {"accuracy": calc_accuracy(model, val_loader, device)}, step=step
                )
        # В конце эпохи тоже напечатаем accuracy на val-датасете
        wandb.log({"accuracy": calc_accuracy(model, val_loader, device)}, step=step)

In [6]:
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    # Apple Silicon
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print("using device", device)

using device mps


In [7]:
wandb.init(project="lesson-4", name="simple-fc", config=config.__dict__)
train_loop(config, model, train_loader, val_loader=val_loader, device=device)
wandb.finish()

[34m[1mwandb[0m: Currently logged in as: [33mtheotheo46[0m ([33mtheotheo46-trs[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


Epoch #1/#10


  0%|          | 0/5653 [00:00<?, ?it/s]Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'ImdbWikiDataset' on <module '__main__' (built-in)>
  0%|          | 0/5653 [01:19<?, ?it/s]


KeyboardInterrupt: 

Accuracy 69% - негусто. А какое качество дало бы константное предсказание?

In [None]:
from collections import Counter

c = Counter(dataset.labels)
c[1] / (c[1] + c[0])

Бейзлайн лучше, чем константное предсказние, но несильно.

Давайте попробуем улучшить accuracy через сверточные сети.

In [9]:
class CnnModelBase(nn.Module):
    def build_model(self):
        raise NotImplementedError()

    def explain_output(self, x: torch.Tensor):
        # Печатает размеры тензора на выходе каждого слоя
        print("## Модель ##")
        print(model)
        print("## Размерности")
        print("Пришел x:", x.shape)
        current = x
        for one_layer in self.net:
            print("#######")
            print("Слой:".ljust(8), one_layer)
            print("До:".ljust(8), current.shape)
            current = one_layer(current)
            print("После:".ljust(8), current.shape)
        print("## После всей модели")
        print(self(x).shape)

    def __init__(self, config: Config):
        super().__init__()
        self.config = config
        # Нам хватит поменьше размерности внутри
        self.hidden_dim = 64
        self.n_channels = 3
        self.net = self.build_model()
        self.head = nn.Linear(in_features=self.hidden_dim, out_features=1)

    def forward(self, x):
        x = self.net(x).squeeze()
        x = self.head(x).squeeze()
        return x

In [None]:
class SimpleCnn(CnnModelBase):
    def build_model(self):
        return nn.Sequential(
            nn.Conv2d(
                in_channels=self.n_channels, out_channels=self.hidden_dim, kernel_size=3
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=self.config.img_size - 3 + 1),
        )


model = SimpleCnn(config)
x, y = next(iter(train_loader))
model.explain_output(x)

In [None]:
wandb.init(project="lesson-4", name="1-conv", config=config.__dict__)
train_loop(config, model, train_loader, val_loader, device=device)
wandb.finish()

In [None]:
class Cnn3Layers(CnnModelBase):
    def build_model(self):
        return nn.Sequential(
            nn.Conv2d(
                in_channels=self.n_channels, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=28),
        )


model = Cnn3Layers(config)
x, _ = next(iter(train_loader))
model.explain_output(x)

In [None]:
wandb.init(project="lesson-4", name="3-conv", config=config.__dict__)
train_loop(config, model, train_loader, val_loader, device=device)
wandb.finish()

In [None]:
class Cnn3LayersBn(CnnModelBase):
    def build_model(self):
        return nn.Sequential(
            nn.Conv2d(
                in_channels=self.n_channels, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            # >>>>>
            nn.BatchNorm2d(self.hidden_dim),
            # <<<<<
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            # >>>>>
            nn.BatchNorm2d(self.hidden_dim),
            # <<<<<
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            # >>>>>
            nn.BatchNorm2d(self.hidden_dim),
            # <<<<<
            nn.MaxPool2d(kernel_size=28),
        )


model = Cnn3LayersBn(config)
x, _ = next(iter(train_loader))
model.explain_output(x)

In [None]:
wandb.init(project="lesson-4", name="3-conv-bn", config=config.__dict__)
train_loop(config, model, train_loader, val_loader, device=device)
wandb.finish()

In [None]:
class Cnn3LayersBnDropout(CnnModelBase):
    def build_model(self):
        return nn.Sequential(
            nn.Conv2d(
                in_channels=self.n_channels, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.BatchNorm2d(self.hidden_dim),
            # >>>>>
            nn.Dropout(p=self.config.p_dropout),
            # <<<<<
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.BatchNorm2d(self.hidden_dim),
            # >>>>>
            nn.Dropout(p=self.config.p_dropout),
            # <<<<<
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(
                in_channels=self.hidden_dim, kernel_size=3, out_channels=self.hidden_dim
            ),
            nn.ReLU(),
            nn.BatchNorm2d(self.hidden_dim),
            # >>>>>
            nn.Dropout(p=self.config.p_dropout),
            # <<<<<
            nn.MaxPool2d(kernel_size=28),
        )


model = Cnn3LayersBn(config)
x, _ = next(iter(train_loader))
model.explain_output(x)

In [None]:
wandb.init(project="lesson-4", name="3-conv-bn-dropout", config=config.__dict__)
train_loop(config, model, train_loader, val_loader, device=device)
wandb.finish()

**Вывод**: сверточные сети действительно помогли выбить большее качество.

## Fine-tuning готовой модели
Возьмем обученный ResNet и попробуем адаптировать его мощь под нашу задачу.

In [None]:
from torchvision.models import resnet34


def make_resnet():
    """Сделать модель resnet для fine-tuning.

    1. Скачивает готовый обученный ResNet.
    2. Заменяет последний слой в нем на Linear(..., 1).
    3. Инициализирует веса этому слою.
    """
    fix_seeds(config.seed)
    base_model = resnet34()
    base_model.fc = nn.Linear(base_model.fc.in_features, 1)
    torch.nn.init.xavier_uniform_(base_model.fc.weight)
    return base_model


resnet = make_resnet()
resnet(x).shape

In [None]:
fc_params = [v for k, v in resnet.named_parameters() if k in {'fc.weight', 'fc.bias'}]
fc_params

In [None]:
wandb.init(project="lesson-4", name="resnet-finetune", config=config.__dict__)
train_loop(
    config, resnet, train_loader, val_loader, device=device, params_subset=fc_params
)
wandb.finish()

Вообще, мы могли зафайнтюнить не только ResNet, но и любую сеть из рассмотренных на лекции!

In [None]:
# VGG
from torchvision.models import vgg11

vgg = vgg11()
print(vgg)

Вы можете самостоятельно попробовать поменять ResNet на любую из изученных в лекции архитектур - и сравнить качество.

## Резюме

1. Научились работать с датасетами/даталоадерами.
2. Посмотрели на качество FC и CNN в классификации картинок - увидели, что CNN выбивает большее качество.
3. Попробовали сделать fine-tuning ResNet под нашу задачу.