# Распределение баллов

Задание 1: до **40** баллов

Задание 2: до **40** баллов

Дополнительно: до **20** баллов

# Задание 1. Классификация спутниковых снимков

В этом задании мы будем работать с [**EuroSAT Dataset**](https://github.com/phelber/eurosat). В датасете представлены космоснимки со спутника Sentinel-2, которые находятся в открытом и свободном доступе в рамках программы наблюдения Земли — Copernicus. Датасет охватывает 13 спектральных диапазонов и состоит из 10 классов с общим количеством 27 000 размеченых и привязанных к местности изображений.

Загрузите непредобученную сеть `efficientnet_lite0.ra_in1k` и обучите ее. Посчитайте метрики на тестовом датасете.

## Формат результата


* Значение метрики на тестовом датасете


Установка и импорт необходимых библиотек:

In [None]:
!pip install -q torchsat torchinfo
!pip install -q timm lightning torchmetrics torchinfo

In [None]:
import timm
import torch
import numpy as np
import torchmetrics
import torch.nn as nn
import matplotlib.pyplot as plt
import lightning as L

from tqdm import tqdm
from torchvision import transforms
from torch.utils.data import DataLoader
from torchsat.datasets.eurosat import EuroSAT

from torchmetrics import MetricCollection
from torchmetrics.classification import (
    MulticlassAccuracy, MulticlassF1Score, MulticlassAUROC,
)

L.seed_everything(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Загрузка данных

Воспользуемся пакетом [torchsat](https://torchsat.readthedocs.io/en/latest/index.html) для работы с датасетом

Загрузка данных в нем пока не поддерживается :( придется загрузить и распаковать архив самостоятельно:

In [None]:
!mkdir -p /content/eurosat-ms
!wget -q https://edunet.kea.su/repo/EduNet-web_dependencies/datasets/EuroSATallBands.zip
!unzip -q EuroSATallBands.zip -d /content/eurosat-ms
!rm EuroSATallBands.zip

In [None]:
from torchsat.datasets.eurosat import EuroSAT

dataset = EuroSAT(root="/content/eurosat-ms/ds/", mode="AllBand")

Создадим файл `annotation.csv` для разделения объектов на tran/test/val выборки и их фиксации:

In [None]:
from pathlib import Path
import pandas as pd

annotation = []
img_paths = Path('/content/eurosat-ms/ds/images/remote_sensing/otherDatasets/sentinel_2/').glob('**/*.tif')
for item in img_paths:
    row = {}
    row['img_path'] = item
    row['class'] = item.parent.stem
    annotation.append(row)
annotation = pd.DataFrame(annotation)
annotation.head(3)

In [None]:
annotation['class_idx'] = pd.get_dummies(annotation['class']).apply(lambda row: np.argmax(row), axis=1)
class_idx_to_class_name = {class_idx: class_name for class_idx, class_name in enumerate(pd.get_dummies(annotation['class']).columns)}
annotation.head(3)

In [None]:
class_idx_to_class_name

Сделаем разделение на выборки. Уменьшим количество объектов для обучения, чтобы смоделировать ситуацию малого количества данных.

In [None]:
from sklearn.model_selection import train_test_split


x_train, x, y_train, y = train_test_split(
    annotation['img_path'], annotation['class'], test_size=0.98, random_state=42,
    stratify=annotation['class']
)

x_val, x_test, y_val, y_test = train_test_split(
    x, y, test_size=0.9, random_state=42, stratify=y
)
print('train shape: ', x_train.shape)
print('val shape: ', x_val.shape)
print('test shape: ', x_test.shape)

Сохраним аннотации:

In [None]:
annotation.iloc[x_train.index].to_csv('train_eurosat.csv', index=False)
annotation.iloc[x_val.index].to_csv('val_eurosat.csv', index=False)
annotation.iloc[x_test.index].to_csv('test_eurosat.csv', index=False)

Напишем свой `CustomImageDataset`:

In [None]:
from torch.utils.data import Dataset, DataLoader
import tifffile as tff

class CustomImageDataset(Dataset):
    def __init__(
        self,
        annotations_csv,
        transforms=None,
    ):
        self.annotation = pd.read_csv(annotations_csv)
        self.transforms = transforms

    def __len__(self):
        return len(self.annotation)

    def __getitem__(self, idx):
        image = tff.imread(self.annotation.loc[idx, 'img_path'])
        label = self.annotation.loc[idx, 'class_idx']
        if self.transforms:
            image = self.transforms(image)
        label = torch.as_tensor(label).long()
        return image, label

In [None]:
train_set = CustomImageDataset('/content/train_eurosat.csv')

## Предварительный анализ

Посмотрим, что мы скачали

In [None]:
print("Image count: ", len(train_set))
image, label = train_set[0]
print("Type: ", type(image),
      "\nshape", image.shape,
      "\nClass", class_idx_to_class_name[label.item()])

Ага! У нас не 3 канала, как в обычном RGB, а 13! Давайте на них посмотрим

In [None]:
fig, ax = plt.subplots(ncols=13, figsize=(20, 3))
for band, a in enumerate(ax):
    a.imshow(image[:, :, band])
    a.axis("off")
    a.set_title("Band %i" % band)

Что значат эти Bands?

* Band 0 – Coastal aerosol
* Band 1 – Blue
* Band 2 – Green
* Band 3 – Red
* Band 4–6 – Vegetation red edge
* Band 7 – NIR (near infrared range)
* Band 8 – Narrow NIR
* Band 9 – Water vapour
* Band 10–12 – SWIR (short wave infrared spectral range)

Вооружившись этим знанием, посмотрим на картинки снова:

In [None]:
fig, ax = plt.subplots(ncols=7, figsize=(20, 3))

ax[0].imshow(image[:, :, 0], cmap="Greys")
ax[0].set_title("Coastal aerosol")

# get 1 image from 3 channel
b = image[:, :, 1] / image[:, :1].max()  # Normalize at 0 to 1
g = image[:, :, 2] / image[:, :2].max()
r = image[:, :, 3] / image[:, :3].max()
rgb = np.stack((r, g, b))
rgb = np.moveaxis(rgb, [0, 1, 2], [2, 0, 1])  # 3,64,64 -> 64,64,3 like permute(1,2,0)

ax[1].imshow(rgb)
ax[1].set_title("RGB")

ax[2].imshow(image[:, :, 4:6].sum(axis=2), cmap="inferno")
ax[2].set_title("Vegitation")

ax[3].imshow(image[:, :, 7], cmap="inferno")
ax[3].set_title("NIR")

ax[4].imshow(image[:, :, 8], cmap="inferno")
ax[4].set_title("NIR Narrow")

ax[5].imshow(image[:, :, 9], cmap="Blues")
ax[5].set_title("Water vapour")

ax[6].imshow(image[:, :, 10:].sum(axis=2), cmap="inferno")
ax[6].set_title("SWIR")
for a in ax:
    a.axis("off")

plt.show()

## Подготовка к обучению

Посчитаем среднее и стандартное отклонение, чтобы нормализовать данные. Данных много, и они могут не поместиться в память целиком. Придется считывать частями:






Считать std как среднее от стандартных отклонений по batch-ам не вполне корректно. Поэтому используем [альтернативный способ](https://stackoverflow.com/questions/10365119/mean-value-and-standard-deviation-of-a-very-huge-data-set).

Дисперсия:

$ Var(X) = E[X^{2}] - (E[X])^2$

$ std = \sqrt{Var}$

Для него потребуется хранить сумму квадратов всех заначений, что теоретически может привести к переполнению.

In [None]:
from tqdm.notebook import tqdm

mean_on_img = [train_set[idx][0].mean(axis=(0, 1)) for idx in tqdm(range(len(train_set)))]
mean_squared_on_img = np.power(mean_on_img, 2)
squared_mean = np.mean(mean_squared_on_img, axis=0)

mean = np.mean(mean_on_img, axis=0)
std = np.sqrt(squared_mean - mean**2)



Dataloader не сможет преобразовать uint16 numpy массив к тензору и выдаст ошибку. Стандартный ToTensor рассчитан на работу с картинками, где значения яркости находятся в интервале 0 .. 255, и тоже не сработает. Придется добавить к датасету трансформацию, которая преобразует значения во float.

In [None]:
from torchvision import transforms

all_transforms = transforms.Compose([
    transforms.Lambda(lambda np_arr: np_arr.astype(np.float32)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

train_set = CustomImageDataset('/content/train_eurosat.csv', transforms=all_transforms)
val_set = CustomImageDataset('/content/val_eurosat.csv', transforms=all_transforms)
test_set = CustomImageDataset('/content/test_eurosat.csv', transforms=all_transforms)

Нам не всегда нужны одинаковые `transforms` для каждой подвыборки,  поэтому в `torchvision`  можно добавлять `transforms` в уже созданый [список](https://discuss.pytorch.org/t/cannot-combine-compose-transforms/32157/5)


Инициализируем загрузчики

In [None]:
train_loader = DataLoader(train_set, batch_size=256, shuffle=True, num_workers=2)
val_loader = DataLoader(val_set, batch_size=256, shuffle=False, num_workers=2)
test_loader = DataLoader(test_set, batch_size=256, shuffle=False, num_workers=2)

##  Обучите сеть



In [None]:
class LModel(L.LightningModule):
    def __init__(self, model, lr=0.001, gamma=0.9):
        super().__init__()
        self.save_hyperparameters(logger=False)

        # for optimizer and shaduler
        self.lr = lr
        self.gamma = gamma

        # model
        self.model = model
        self.criterion = nn.CrossEntropyLoss()

        # metrics
        self.metrics = MetricCollection([
            MulticlassAccuracy(num_classes=10,),
            MulticlassF1Score(num_classes=10,),
        ])
        self.train_metrics = self.metrics.clone(postfix='/train')
        self.val_metrics = self.metrics.clone(postfix='/val')

    def configure_optimizers(self):
        # set optimizer
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=self.lr,
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "interval": "epoch",  # or 'step'
                "monitor": "loss" # only for self.log
            },
        }

    def training_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        loss = self.criterion(out, y)
        self.train_metrics.update(out.softmax(-1), y)
        self.log("loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        self.val_metrics.update(out.softmax(-1), y)

    def on_train_epoch_end(self):
        self.log_dict(self.train_metrics.compute())
        self.train_metrics.reset()

        self.log_dict(self.val_metrics.compute())
        self.val_metrics.reset()

    def test_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        self.metrics.update(out.softmax(-1), y)

    def on_test_epoch_end(self):
        self.log_dict(self.metrics.compute())
        self.metrics.reset()

In [None]:
import timm

model = timm.create_model(
    "efficientnet_lite0.ra_in1k", pretrained=False, num_classes=10, in_chans=13
)

In [None]:
pl_model = LModel(model)
trainer = L.Trainer(
    max_epochs=3,
    num_sanity_val_steps=0,
    log_every_n_steps=10,
    logger=L.pytorch.loggers.TensorBoardLogger(save_dir="./final_log/"),
)

trainer.fit(
    model=pl_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
)

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir final_log/lightning_logs

##  Оцените точность

Оцените точность своей модели на `test_set`

In [None]:
trainer.test(model=pl_model, dataloaders=[test_loader])

# Задание 2. Transfer learning

Теперь используйте технику `transfer learning` и дообучите предобученную сеть `efficientnet_lite0.ra_in1k`.

* Загрузите предобученную сеть
* Заморозьте параметры и обучите новый слой. Оцените качество на тесте
* Разморозьте параметры и поучите уже всю сеть. Оцените качество на тесте

## Формат результата


* Значение метрики на тестовом датасете


In [None]:
import timm

model = timm.create_model(
    "efficientnet_lite0.ra_in1k", pretrained=True, num_classes=10, in_chans=13
)

In [None]:
for param in list(model.named_parameters())[:-2]:
    param[1].requires_grad = False

In [None]:
for name, param in model.named_parameters():
    print(name, "\t", param[1].requires_grad)

In [None]:
pl_model = LModel(model)
trainer = L.Trainer(
    max_epochs=10,
    num_sanity_val_steps=0,
    log_every_n_steps=10,
    logger=L.pytorch.loggers.TensorBoardLogger(save_dir="./final_log/"),
)

trainer.fit(
    model=pl_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
)

In [None]:
trainer.test(model=pl_model, dataloaders=[test_loader])

In [None]:
for param in list(model.named_parameters()):
    param[1].requires_grad = True

In [None]:
pl_model = LModel(model)
trainer = L.Trainer(
    max_epochs=3,
    num_sanity_val_steps=0,
    log_every_n_steps=10,
    logger=L.pytorch.loggers.TensorBoardLogger(save_dir="./final_log/"),
)

trainer.fit(
    model=pl_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
)

In [None]:
trainer.test(model=pl_model, dataloaders=[test_loader])



# Задание 3. Регрессия


В этом задании вам нужно предсказать возраст человека по его фотографии.

## Формат результата


* Значение метрики на тестовом датасете


Датасет: http://yanweifu.github.io/FG_NET_data/

In [None]:
!mkdir -p /content/fg_net_data
!wget -nc  http://yanweifu.github.io/FG_NET_data/FGNET.zip
!unzip -q FGNET.zip -d /content/fg_net_data
!rm FGNET.zip

In [None]:
from pathlib import Path
import pandas as pd


annotation = []
img_paths = Path('/content/fg_net_data/FGNET/images').glob('**/*.JPG')
for item in img_paths:
    row = {}
    row['img_path'] = item
    row['age'] = np.float32(str(item).split('/')[-1][4:6])
    annotation.append(row)
annotation = pd.DataFrame(annotation)
annotation.head(3)

In [None]:
from sklearn.model_selection import train_test_split


x_train, x, y_train, y = train_test_split(
    annotation['img_path'], annotation['age'], test_size=0.4, random_state=42
)

x_val, x_test, y_val, y_test = train_test_split(
    x, y, test_size=0.4, random_state=42
)
print('train shape: ', x_train.shape)
print('val shape: ', x_val.shape)
print('test shape: ', x_test.shape)

In [None]:
annotation.age.hist()
plt.show()

In [None]:
import numpy as np

annotation['age_transformed'] = np.log1p(annotation.age.values)
annotation.age_transformed.hist()
plt.show()

In [None]:
annotation.iloc[x_train.index].to_csv('train_fg_net.csv', index=False)
annotation.iloc[x_val.index].to_csv('val_fg_net.csv', index=False)
annotation.iloc[x_test.index].to_csv('test_fg_net.csv', index=False)

In [None]:
train_data = pd.read_csv('/content/train_fg_net.csv')
val_data = pd.read_csv('/content/val_fg_net.csv')
test_data = pd.read_csv('/content/test_fg_net.csv')

train_data.head(3)

In [None]:
from sklearn.preprocessing import MinMaxScaler


scaler = MinMaxScaler()
scaler.fit(train_data['age_transformed'].values.reshape(-1, 1))

train_data['age_scaled'] = scaler.transform(train_data['age_transformed'].values.reshape(-1, 1))
val_data['age_scaled'] = scaler.transform(val_data['age_transformed'].values.reshape(-1, 1))
test_data['age_scaled'] = scaler.transform(test_data['age_transformed'].values.reshape(-1, 1))

train_data.to_csv('train_fg_net.csv', index=False)
val_data.to_csv('val_fg_net.csv', index=False)
test_data.to_csv('test_fg_net.csv', index=False)

train_data.head(3)

In [None]:
from torch.utils.data import Dataset, DataLoader
from PIL import Image

class CustomImageDataset(Dataset):
    def __init__(
        self,
        annotations_csv,
        transforms=None,
        target='age', # age/age_transformed/age_scaled
    ):
        self.annotation = pd.read_csv(annotations_csv)
        self.transforms = transforms
        self.target = target

    def __len__(self):
        return len(self.annotation)

    def __getitem__(self, idx):
        image = Image.open(self.annotation.loc[idx, 'img_path'])
        age = self.annotation.loc[idx, self.target]

        # 1-channel to 3-channel
        if image.mode != 'RGB':
            image = image.convert('RGB')

        if self.transforms:
            image = self.transforms(image)

        age = torch.as_tensor(age).float()
        return image, age

In [None]:
train_set = CustomImageDataset('/content/train_fg_net.csv')

In [None]:
print("Image count: ", len(train_set))
image, label = train_set[3]
print("Type: ", type(image),
      "\nsSize", image.size,
      "\nAge", label.item())

In [None]:
import matplotlib.pyplot as plt

plt.rcParams["figure.figsize"] = (15, 10)

def show(img, label_1, num, label_2=""):
    ax = plt.subplot(1, 6, num + 1)
    plt.imshow(img)
    plt.title("Age: "+str(label_1))
    ax.set_xlabel(label_2)
    plt.axis("off")


for i in range(6, 12):
    img, label = train_set[i * 6]
    show(img, int(label.item()), i - 6)

In [None]:
from tqdm.notebook import tqdm

mean_on_img = torch.stack(
    [torch.as_tensor(np.asarray(train_set[idx][0]).mean(axis=(0, 1))) for idx in tqdm(range(len(train_set)))]
)
mean_squared_on_img = torch.pow(mean_on_img, 2)
squared_mean = mean_squared_on_img.mean(axis=0)

mean_1 = mean_on_img.mean(axis=0)
std_1 = np.sqrt(squared_mean - mean_1**2)

In [None]:
from torchvision import transforms

img_transforms = transforms.Compose([
    transforms.Resize((256, 256)), # LongestMaxSize and PadIfNeeded
    transforms.ToTensor(),
    transforms.Normalize(mean_1, std_1),
])

train_set = CustomImageDataset('/content/train_fg_net.csv', transforms=img_transforms, target='age_scaled')
val_set = CustomImageDataset('/content/val_fg_net.csv', transforms=img_transforms, target='age_scaled')
test_set = CustomImageDataset('/content/test_fg_net.csv', transforms=img_transforms, target='age_scaled')

In [None]:
train_loader = DataLoader(train_set, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_set, batch_size=32, shuffle=False, num_workers=2)
test_loader = DataLoader(test_set, batch_size=32, shuffle=False, num_workers=2)

In [None]:
from torchmetrics.regression import MeanSquaredError


class LModel(L.LightningModule):
    def __init__(self, model, lr=0.001, gamma=0.9, scaler=None):
        super().__init__()
        self.scaler = scaler
        self.save_hyperparameters(logger=False)

        # for optimizer and shaduler
        self.lr = lr
        self.gamma = gamma

        # model
        self.model = model
        self.criterion = nn.HuberLoss() #nn.MSELoss()

        # metrics
        self.metrics = MetricCollection([
            MeanSquaredError(squared=False),
        ])
        self.train_metrics = self.metrics.clone(postfix='/train')
        self.val_metrics = self.metrics.clone(postfix='/val')

    def configure_optimizers(self):
        # set optimizer
        optimizer = torch.optim.AdamW(
            self.model.parameters(),
            lr=self.lr,
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "interval": "epoch",  # or 'step'
                "monitor": "loss" # only for self.log
            },
        }

    def training_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        loss = self.criterion(out, y.unsqueeze(1))
        self.train_metrics.update(out, y.unsqueeze(1))
        self.log("loss", loss, prog_bar=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        self.val_metrics.update(out, y.unsqueeze(1))

    def on_train_epoch_end(self):
        self.log_dict(self.train_metrics.compute())
        self.train_metrics.reset()

        self.log_dict(self.val_metrics.compute())
        self.val_metrics.reset()

    def test_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        if scaler:
            y = self.scaler.inverse_transform(y.cpu().numpy().reshape(-1, 1))
            out = self.scaler.inverse_transform(out.cpu().numpy().reshape(-1, 1))
            y, out = torch.as_tensor(y).reshape(-1, 1).to(self.device), torch.as_tensor(out).reshape(-1, 1).to(self.device)
        self.metrics.update(out, y)

    def on_test_epoch_end(self):
        self.log_dict(self.metrics.compute())
        self.metrics.reset()

In [None]:
import timm
L.seed_everything(42)


model = timm.create_model(
    "tf_efficientnet_b0", pretrained=True, num_classes=1)

In [None]:
L.seed_everything(42)

pl_model = LModel(model, scaler=scaler)
trainer = L.Trainer(
    max_epochs=20,
    log_every_n_steps=19,
    logger=L.pytorch.loggers.TensorBoardLogger(save_dir="./final_log/"),
)

trainer.fit(
    model=pl_model,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader
)

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir final_log/lightning_logs

In [None]:
trainer.test(model=pl_model, dataloaders=[test_loader])

In [None]:
# get batch
imgs, labels = next(iter(test_loader))
print("imgs shape: ", imgs.shape)
pred = model(imgs)
print("pred shape: ", pred.shape)
# inverse transform
predict = scaler.inverse_transform(pred.cpu().detach().numpy().reshape(-1, 1))
out = scaler.inverse_transform(labels.cpu().detach().numpy().reshape(-1, 1))

predict = np.round(np.exp(predict)-1).reshape(1, -1)[0]
out = np.round(np.exp(out)-1).reshape(1, -1)[0]

In [None]:
def imshow(image):
  npimg = image.numpy()
  npimg = np.transpose(npimg, (1,2,0))
  npimg = ((npimg * std_1.numpy()) + mean_1.numpy())
  plt.imshow(npimg, interpolation='nearest')

In [None]:
plt.figure(figsize=(25.0, 25.0))
for i in range(10):
    img = imgs[i]
    plt.subplot(1, 10, i + 1)
    plt.title(
        "pred: " + str(int(predict[i])) + " real: " + str(int(out[i]))
    )  # predicted and real values
    imshow(img)
    plt.axis("off")
plt.show()

# Задание 4. CustomResNet

Требуется самостоятельно реализовать свёрточную сеть на базе архитектуры ResNet-18, описанной в статье [Deep Residual Learning for Image Recognition](https://arxiv.org/abs/1512.03385).

Создайте свою собственную сеть на основе архитектуры ResNet, описанной в лекции. От 15  до 25 слоев.

Используйте заготовки классов `CustomResnet`, `BasicBlock`.

Прежде чем приступать к работе, просмотрите оригинальную [статью](https://arxiv.org/pdf/1512.03385.pdf).

*Не допускается целиком копировать код из исходников [PyTorch](https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py).

**Если вы используете готовый фрагмент кода, то должны быть приведены ссылка на источник и комментарии.


## Формат результата

* Графики loss и accuracy при обучении

<img src = "https://edunet.kea.su/repo/EduNet-web_dependencies/dev-2.0/Exercises/EX08/result_2_task_ex08.png" width="1200">

* Значение accuracy на тесте должно составить не менее 0.77
* Сравнительный анализ результатов точности вашей модели и библиотечной версии.

Установка и импорт необходимых библиотек:

In [None]:
!pip install -q timm lightning torchmetrics

In [None]:
import timm
import torch
import torchmetrics
import torch.nn as nn
import lightning as pl

from torchsummary import summary
from torchvision.models import resnet18
from torch.utils.data import DataLoader
from torchvision import models, datasets, transforms

## Структура модели

Прежде чем создавать собственную сеть, посмотрите архитектуру ResNet-18 из "Зоопарка моделей" PyTorch. Для этого используйте пакет [torchsummary](https://pypi.org/project/torch-summary/) или метод [add_graph](https://pytorch.org/docs/stable/tensorboard.html?highlight=add_graph#torch.utils.tensorboard.writer.SummaryWriter.add_graph) для TensorBoard.

Мы отказались от использования TB, и эталонное решение не должно использовать этот инструмент. Поэтому ограничимся torchsummary  :(

In [None]:
# Your code here

from torchsummary import summary
from torchvision.models import resnet18

resnet_original = resnet18()
summary(resnet_original, (3, 32, 32), device="cpu")

Визуализировав библиотечную модель, можно заметить, что пространственные размеры тензора на выходе из последнего сверточного блока 7×7.

Если подавать в такую сеть картинки, которые в 7 раз меньше (CIFAR-10 вместо ImageNet), то этот тензор схлопнется в вектор.

Из этого можно сделать вывод о необходимости отключить слои, которые содержат свертки с большим шагом в начале сети.

## Загрузка данных

Блок кода, отвечающий за загрузку данных.
Можно использовать без внесения изменений.

In [None]:
# Load and preprocess the data. Don't change this code

# https://github.com/facebookarchive/fb.resnet.torch/issues/180
cifar10_mean = (0.491, 0.482, 0.447)
cifar10_std = (0.247, 0.244, 0.262)


# Data preprocessing
transform = transforms.Compose(
    [
        transforms.ToTensor(),  # PIL Image to Pytorch tensor [0..255]->[0 .. 1]
        transforms.Normalize(
            cifar10_mean, cifar10_std
        ),  # https://pytorch.org/docs/stable/torchvision/transforms.html?highlight=transforms%20normalize#torchvision.transforms.Normalize
    ]
)

cifar_train = datasets.CIFAR10(
    "content", train=True, transform=transform, download=True
)
train_set, val_set = torch.utils.data.random_split(cifar_train, [45000, 5000])

Рекомендуется делать всю отладку на небольших фрагментах датасета, для чего создать объекты класса `Subset` и  соответствующие Dataloader-объекты.


```
mini_trainset , _ = torch.utils.data.random_split(trainset, [5000, 45000])
mini_testset, _  = torch.utils.data.random_split(testset, [1000, 9000])
...

```



## Блок кода для обучения

В него можно вносить изменения, как минимум менять гиперпараметры.

In [None]:
import lightning as pl
import torchmetrics


class LightningModel(pl.LightningModule):
    def __init__(self, model):
        super().__init__()
        self.model = model
        self.criterion = nn.CrossEntropyLoss()
        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=10)
        self.valid_acc = torchmetrics.Accuracy(task="multiclass", num_classes=10)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.model.parameters())
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        loss = self.criterion(out, y)
        self.train_acc.update(out, y)
        self.log("loss/train", loss.detach().item())
        return loss

    def on_train_epoch_end(self):
        self.log("accuracy/train", self.train_acc.compute(), prog_bar=True)
        self.train_acc.reset()

    def validation_step(self, batch, batch_idx):
        x, y = batch
        out = self.model(x)
        self.valid_acc.update(out, y)

    def on_validation_epoch_end(self):
        self.log("accuracy/val", self.valid_acc.compute(), prog_bar=True)
        self.valid_acc.reset()

## Основная часть задания

Создайте свою собственную сеть на основе архитектуры ResNet, описанной в лекции. От 15  до 25 слоев.

Используйте заготовки классов `CustomResnet`, `BasicBlock`.

Прежде чем приступать к работе, просмотрите оригинальную [статью](https://arxiv.org/pdf/1512.03385.pdf).

Не допускается копировать код из исходников [PyTorch](https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py).

Если вы используете готовый фрагмент кода, то должна быть приведена ссылка на источник и комментарии.

Цель — добиться точности > 0.76.
При разумной архитектуре и гиперпараметрах для этого достаточно 20 эпох.

In [None]:
class CustomResnet(nn.Module):
    def __init__(self, class_nums=10):
        super(CustomResnet, self).__init__()
        # Your code here
        self.core = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            BasicBlock(16, False),
            BasicBlock(16, True),  # 32x16x16
            BasicBlock(32, False),
            BasicBlock(32, True),  # 64x8x8
            BasicBlock(64, False),
            BasicBlock(64, False),  # 64x8x8
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(64, 10),
        )

    def forward(self, batch):
        return self.core(batch)


class BasicBlock(nn.Module):
    def __init__(self, in_channels, downsample):  # You can add params here
        super(BasicBlock, self).__init__()
        # Your code here
        self.downsamlpe = downsample
        if self.downsamlpe:
            out_channels = in_channels * 2
            stride = 2
            # according to article option 2
            self.downsample_layer = nn.Conv2d(
                in_channels, out_channels, kernel_size=1, stride=2
            )
        else:
            out_channels = in_channels
            stride = 1
            self.downsample_layer = None

        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(in_channels),
            nn.Conv2d(
                in_channels, out_channels, kernel_size=3, stride=stride, padding=1
            ),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        identity = x
        x = self.conv(x)
        if self.downsamlpe:
            identity = self.downsample_layer(identity)
        assert identity.shape == x.shape
        return x + identity

## Обучите вашу модель на CIFAR-10

Не забудьте вернуть в датасет данные, если вы удаляли их для ускорения отладки.

Оптимизатор, количество эпох, шаг обучения, критерий останова выберите на свое усмотрение.

Цель — добиться точности, лучшей, чем в прошлом задании.

In [None]:
train_loader = DataLoader(train_set, batch_size=128, num_workers=2, shuffle=True)
val_loader = DataLoader(val_set, batch_size=128, num_workers=2, shuffle=False)

In [None]:
import timm

model = CustomResnet()
lit_model = LightningModel(model)
trainer = pl.Trainer(max_epochs=20)
trainer.fit(model=lit_model, train_dataloaders=train_loader, val_dataloaders=val_loader)

Проверьте качество на тестовой части датасета.

In [None]:
cifar_test = datasets.CIFAR10(
    "content", train=False, transform=transform, download=True
)
test_loader = DataLoader(cifar_test, batch_size=256, shuffle=False)

test_metric = torchmetrics.Accuracy(task="multiclass", num_classes=10)
for batch in test_loader:
    x, y = batch
    out = lit_model.model(x)
    test_metric.update(out, y)

print(f"Accuracy on TEST {test_metric.compute().item():.2f}")

## Обучите ResNet-18

Теперь обучите ResNet-18  из `torchvision.models` на том же CIFAR-10.
Используйте непредобученную модель (`weights = None`)

Для обучения на CIFAR-10 потребуется изменить параметры линейного слоя на выходе модели, так как в этом датасете 10 классов, а не 1000, как в ImageNet.

Пример того, как можно подменить слой, можно найти [здесь](https://github.com/Gan4x4/CV-HSE2019/blob/master/helloworld/Change_model_structure.ipynb)

После завершения обучения сравните точность вашей модели с точностью ResNet-18  из `torchvision.models`, а также с точностью, полученной авторами статьи на CIFAR-10:


<img src = "https://edunet.kea.su/repo/EduNet-web_dependencies/dev-2.0/Exercises/EX08/resnet_accuracy.png" width="600">

In [None]:
from torchvision.models import resnet18

resnet_original = resnet18()
print(resnet_original.fc)
resnet_original.fc = nn.Linear(in_features=512, out_features=10, bias=True)
print(resnet_original.fc)

Заменим последний слой, отвечающий за классификацию, линейным слоем с 10-ю выходами

In [None]:
lit_model = LightningModel(resnet_original)
trainer = pl.Trainer(max_epochs=10)
trainer.fit(model=lit_model, train_dataloaders=train_loader, val_dataloaders=val_loader)

Оценим точность библиотечной модели на test

In [None]:
test_metric = torchmetrics.Accuracy(task="multiclass", num_classes=10)

for batch in test_loader:
    x, y = batch
    out = resnet_original(x)
    test_metric.update(out, y)

print(f"Accuracy on TEST {test_metric.compute().item():.2f}")

## Вывод:

Сравнительный анализ результатов точности вашей модели и библиотечной версии.

...

Так как библиотечная модель рассчитана на работу с большими картинками из ImageNet (224×224), вполне естественно, что она плохо работает на крошечных картинках из CIFAR-10, которые сильно сжимаются на первых слоях.
