# **Deep Hedging**
# Buchkov Viacheslav

In [1]:
import abc
import math
import os
import random
import sys
from functools import lru_cache
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm

# You may add any imports you need
from torch.cuda.amp import GradScaler

RANDOM_SEED = 12

In [2]:
def seed_everything(seed):
    """Seed every RNG used in the notebook and request deterministic cuDNN.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices).

    :param seed: integer seed shared by all generators
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # manual_seed_all covers every GPU, not only the current device; it is a
    # no-op when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may select different algorithms from run to run,
    # which defeats the deterministic flag above.
    torch.backends.cudnn.benchmark = False


seed_everything(RANDOM_SEED)

In [3]:
# Length of one hedging window, in calendar days.
N_DAYS = 5
# Directory that holds the pickled market data (data.pkl).
PATH = Path("data")

In [4]:
# Load the market data frame from the pickle.
data = pd.read_pickle(PATH / "data.pkl")
# Divide rub_rate by 100 (presumably percent -> fraction; usd_rate is used as
# stored — TODO confirm both rates end up in the same units).
data["rub_rate"] = data["rub_rate"] / 100
# Drop every row that contains a missing value.
data.dropna(inplace=True)
data

In [5]:
from src.base.instrument import Instrument
from src.forward.forward import Forward


def create_instrument(period_df: pd.DataFrame) -> Instrument:
    """Build a ``Forward`` from the earliest row of ``period_df``.

    :param period_df: market rows of one window; must provide the
        ``rub_rate``, ``usd_rate`` and ``ask`` columns
    :return: forward whose rate differential and spot come from the first row
        and whose term is ``N_DAYS`` expressed as a fraction of a 365-day year
    """
    first_row = period_df.loc[period_df.index.min()]
    rates_spread = first_row["rub_rate"] - first_row["usd_rate"]
    year_fraction = N_DAYS / 365
    return Forward(
        rates_difference=rates_spread,
        spot_price=first_row["ask"],
        term=year_fraction,
    )

In [6]:
import datetime as dt

# First window: from the earliest timestamp to N_DAYS calendar days later,
# both ends inclusive.
start_date = data.index.min()
end_date = start_date + dt.timedelta(days=N_DAYS)
data[(data.index >= data.index.min()) & (data.index <= end_date)]

In [7]:
# Instrument built from the rows of the first window.
fwd = create_instrument(
    data[(data.index >= data.index.min()) & (data.index <= end_date)]
)
fwd

In [8]:
# Exploratory pass over the first ten rolling windows: for each window build
# the instrument from its first row and record the payoff at the last ask.
start_date = data.index.min()
end_date = start_date + dt.timedelta(days=N_DAYS)
i = 1

feature_data = []
target_data = []
while end_date < data.index.max():
    features = data[(data.index >= start_date) & (data.index <= end_date)]
    last_ask = features.ask.iloc[-1]
    # Build the instrument once and reuse it for both the payoff and the log.
    instrument = create_instrument(features)
    target = instrument.payoff(spot=last_ask)
    print(instrument, last_ask)

    feature_data.append(features.to_numpy())
    target_data.append(target)

    # Slide the window start to the next row of the index.
    start_date = data.index[i]
    end_date = start_date + dt.timedelta(days=N_DAYS)
    i += 1

    # Only the first ten windows are inspected here.
    if i > 10:
        break

target_data

## Dataset.

In [70]:
from typing import Union, Type
from torch.utils.data import Dataset

from src.base.instrument import Instrument


class SpotDataset(Dataset):
    """Rolling-window dataset pairing market rows with an instrument payoff.

    Item ``idx`` is the slice of ``data`` whose index lies in
    ``[data.index[idx], data.index[idx] + n_days]`` (calendar days, both ends
    inclusive), together with the payoff of an instrument created from the
    first row of that slice and evaluated at the ask of its last row.
    """

    BID_COLUMN = "bid"
    ASK_COLUMN = "ask"
    RATE_DOMESTIC_COLUMN = "rub_rate"
    RATE_FOREIGN_COLUMN = "usd_rate"

    def __init__(
        self,
        instrument_cls: Type[Instrument],
        n_days: int = N_DAYS,
        path: Path = PATH,
        data: Union[pd.DataFrame, None] = None,
    ):
        """
        :param instrument_cls: class called with ``rates_difference``,
            ``spot_price`` and ``term`` to create the hedged instrument
        :param n_days: window length in calendar days
        :param path: directory containing ``data.pkl``; used only when
            ``data`` is not given
        :param data: optional pre-loaded frame indexed by timestamp
        :raises FileNotFoundError: if ``data`` is ``None`` and ``data.pkl``
            is missing from ``path``
        """
        self.instrument_cls = instrument_cls
        self.n_days = n_days

        frame = self._create_df(path) if data is None else data
        # Forward-fill gaps; ``ffill()`` replaces the deprecated
        # ``fillna(method="ffill")`` spelling.
        self.data = frame.ffill()

    @staticmethod
    def _create_df(path: Path) -> pd.DataFrame:
        """Read ``data.pkl`` from ``path``, failing loudly when it is absent."""
        pickle_path = Path(path) / "data.pkl"
        if not pickle_path.is_file():
            raise FileNotFoundError(f"Expected pickled market data at {pickle_path}")
        return pd.read_pickle(pickle_path)

    def _create_instrument(self, period_df: pd.DataFrame) -> Instrument:
        """Create the instrument from the earliest row of ``period_df``."""
        start = period_df.loc[period_df.index.min()]
        rates_difference = (
            start[self.RATE_DOMESTIC_COLUMN] - start[self.RATE_FOREIGN_COLUMN]
        )
        return self.instrument_cls(
            rates_difference=rates_difference,
            spot_price=start[self.ASK_COLUMN],
            # Use this dataset's own window length, not the module default.
            term=self.n_days / 365,
        )

    def __len__(self):
        """Number of rows strictly earlier than ``n_days`` before the last timestamp."""
        cutoff = self.data.index.max() - dt.timedelta(days=self.n_days)
        return int((self.data.index < cutoff).sum())

    def __getitem__(self, idx: int):
        """Return ``(window rows as a float array, one-element payoff array)``."""
        start_date = self.data.index[idx]
        end_date = start_date + dt.timedelta(days=self.n_days)

        in_window = (self.data.index >= start_date) & (self.data.index <= end_date)
        features = self.data[in_window]
        last_ask = features[self.ASK_COLUMN].iloc[-1]
        target = self._create_instrument(features).payoff(spot=last_ask)

        return features.astype(float).to_numpy(), np.array([target])

In [71]:
spot_dataset = SpotDataset(instrument_cls=Forward)

In [72]:
spot_dataset[0]

In [73]:
# Unshuffled batches of 32 windows; an incomplete final batch is dropped.
loader = DataLoader(spot_dataset, batch_size=32, shuffle=False, drop_last=True)

In [74]:
# Pull a single batch from the loader to inspect it.
for feature, target in loader:
    print(feature)
    # print(target)
    break

In [76]:
feature.dtype, target.dtype

In [52]:
feature.shape

In [54]:
# Size of dimension 1 of the sampled batch, i.e. the number of rows per window.
SEQUENCE_LENGTH = feature.shape[1]
SEQUENCE_LENGTH

In [53]:
target.shape

In [None]:
# import pickle
#
# with open('spot_dataset.pickle', 'wb') as handle:
#     pickle.dump(spot_dataset, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [17]:
from tqdm import tqdm
from torch.utils.data import DataLoader


def train_epoch(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    criterion: nn.Module,
    loader: DataLoader,
    tqdm_desc: Union[str, None] = "Model",
) -> float:
    """Run one optimisation pass over ``loader`` and return the mean loss.

    The model is called as ``model(features, hidden=..., return_hidden=True)``
    and its returned state is detached and passed to the next batch.

    :param model: network to train
    :param optimizer: optimizer stepping ``model``'s parameters
    :param criterion: loss module; expected to return the batch mean (the
        default reduction of ``nn.MSELoss``)
    :param loader: training batches of ``(features, labels)``
    :param tqdm_desc: progress-bar label, or ``None`` to disable the bar
    :return: per-sample average loss over the samples actually seen
        (``0.0`` for an empty loader)
    """
    device = next(model.parameters()).device
    iterator = loader if tqdm_desc is None else tqdm(loader, desc=tqdm_desc)

    total_loss = 0.0
    n_samples = 0

    model.train()
    hidden = None
    for features, labels in iterator:
        optimizer.zero_grad()

        features = features.to(device)
        pred, hidden = model(features, hidden=hidden, return_hidden=True)
        # Cut the graph so back-propagation stops at the batch boundary.
        hidden = [h_t.detach() for h_t in hidden]

        loss = criterion(pred, labels.to(device))
        loss.backward()
        optimizer.step()

        # ``loss`` is a batch mean: weight it by the batch size so that the
        # final division yields a true per-sample average.
        batch_size = features.shape[0]
        total_loss += loss.item() * batch_size
        n_samples += batch_size

    # Divide by the samples seen rather than len(loader.dataset): the two
    # differ whenever the loader uses drop_last.
    return total_loss / n_samples if n_samples else 0.0


@torch.no_grad()
def validation_epoch(
    model: nn.Module,
    criterion: nn.Module,
    loader: DataLoader,
    tqdm_desc: Union[str, None] = None,
) -> float:
    """Evaluate ``model`` on ``loader`` and return the mean loss.

    The model is switched to evaluation mode and called as
    ``model(features, hidden=..., return_hidden=True)``; the returned state
    is passed to the next batch.

    NOTE: this function previously returned the list of per-batch
    predictions while discarding the loss it had accumulated, although
    ``train`` stores the return value in ``val_losses``. It now returns the
    loss.

    :param model: network to evaluate
    :param criterion: loss module; expected to return the batch mean
    :param loader: validation batches of ``(features, labels)``
    :param tqdm_desc: progress-bar label, or ``None`` to disable the bar
    :return: per-sample average loss over the samples actually seen
        (``0.0`` for an empty loader)
    """
    device = next(model.parameters()).device
    iterator = loader if tqdm_desc is None else tqdm(loader, desc=tqdm_desc)

    total_loss = 0.0
    n_samples = 0

    # Evaluation mode: dropout / normalisation layers must not behave as in
    # training while validating.
    model.eval()
    hidden = None
    for features, labels in iterator:
        features = features.to(device)
        pred, hidden = model(features, hidden=hidden, return_hidden=True)

        loss = criterion(pred, labels.to(device))

        # Weight the batch mean by the batch size (see the return value).
        batch_size = features.shape[0]
        total_loss += loss.item() * batch_size
        n_samples += batch_size

    return total_loss / n_samples if n_samples else 0.0

In [18]:
from typing import Tuple, List, Optional, Any


def train(
    model: nn.Module,
    optimizer: torch.optim.Optimizer,
    scheduler: Optional[Any],
    train_loader: DataLoader,
    val_loader: DataLoader,
    num_epochs: int,
    print_logs: bool = True,
) -> Tuple[List[float], List[float]]:
    """Train ``model`` with an MSE criterion for ``num_epochs`` epochs.

    Each epoch runs ``train_epoch`` followed by ``validation_epoch`` and then
    steps ``scheduler`` when one is supplied.

    :param model: network to train
    :param optimizer: optimizer stepping ``model``'s parameters
    :param scheduler: optional learning-rate scheduler, stepped once per epoch
    :param train_loader: training batches
    :param val_loader: validation batches
    :param num_epochs: number of epochs to run
    :param print_logs: show tqdm progress bars when ``True``
    :return: ``(train_losses, val_losses)``, one entry per epoch
    """
    criterion = nn.MSELoss()
    train_losses: List[float] = []
    val_losses: List[float] = []

    for epoch in range(1, num_epochs + 1):
        # ``None`` descriptions switch the progress bars off.
        desc_train = f"Training {epoch}/{num_epochs}" if print_logs else None
        desc_val = f"Validation {epoch}/{num_epochs}" if print_logs else None

        epoch_train = train_epoch(
            model, optimizer, criterion, train_loader, tqdm_desc=desc_train
        )
        epoch_val = validation_epoch(model, criterion, val_loader, tqdm_desc=desc_val)

        if scheduler is not None:
            scheduler.step()

        train_losses.append(epoch_train)
        val_losses.append(epoch_val)

    return train_losses, val_losses

In [77]:
# Rebuild the loader with one window per batch and keep its first
# (feature, target) pair for the experiments below.
loader = DataLoader(spot_dataset, batch_size=1, shuffle=False, drop_last=True)
for feature, target in loader:
    break

In [78]:
feature.shape

In [102]:
# class Hedger(nn.Module):
#     def __init__(self, input_size: int = 4, hidden_size: int = 64):
#         super().__init__()
#         self.input_size = input_size
#         self.hidden_size = hidden_size
#         self.lstm = nn.LSTMCell(input_size, self.hidden_size)
#
#         self.hedging_weights = nn.Sequential(
#             nn.Linear(self.hidden_size, self.hidden_size),
#             nn.ReLU(),
#             nn.Linear(self.hidden_size, self.hidden_size),
#             nn.ReLU(),
#             nn.Linear(self.hidden_size, 1)
#         )
#
#     def linear(self, spot: torch.Tensor, hidden: [(torch.Tensor), None] = None,
#                 predict_length: int = 1, return_hidden: bool = False) -> [torch.Tensor,
#                                                                           (torch.Tensor, torch.Tensor, torch.Tensor)]:
#         model_device = spot.device
#         outputs = []
#         if hidden is None:
#             h_t = torch.zeros(spot.size(0), self.hidden_size, dtype=torch.float32).to(model_device)
#             c_t = torch.zeros(spot.size(0), self.hidden_size, dtype=torch.float32).to(model_device)
#         elif len(hidden) != 2:
#             raise ValueError(f"Expected two hidden state variables, got {len(hidden)}")
#         else:
#             h_t, c_t = hidden
#
#         i = 0
#         for input_t in spot.chunk(spot.size(1), dim=1):
#             input_t = input_t.squeeze(0)
#             h_t, c_t = self.lstm(input_t, (h_t, c_t))
#
#             if i > 0:
#                 output = self.hedging_weights(h_t)
#                 outputs.append(output)
#
#             i += 1
#
#         outputs = torch.stack(outputs, 1).squeeze(2)
#         if return_hidden:
#             return outputs, (h_t, c_t)
#         else:
#             return outputs

In [103]:
# hedger = Hedger()
# hedger(feature.to(torch.float32)).shape

In [119]:
class Hedger(nn.Module):
    """LSTM that maps a market path to hedge positions for its interior steps.

    ``forward`` returns one value per time step except the first and the
    last, so an input of shape ``(batch, steps, input_size)`` yields an
    output of shape ``(batch, steps - 2)``.
    """

    def __init__(self, input_size: int = 4, num_layers: int = 1, hidden_size: int = 64):
        """
        :param input_size: number of features per time step
        :param num_layers: number of stacked LSTM layers
        :param hidden_size: width of the LSTM state and of the MLP head
        """
        super().__init__()
        self.input_size = input_size
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size, self.hidden_size, num_layers=num_layers, batch_first=True
        )

        # Per-step head turning an LSTM output vector into a single weight.
        self.hedging_weights = nn.Sequential(
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, 1),
        )

    def forward(
        self,
        spot: torch.Tensor,
        hidden: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        return_hidden: bool = False,
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]]:
        """Compute hedge weights for every interior time step.

        :param spot: batch-first input of shape ``(batch, steps, input_size)``
        :param hidden: optional ``(h_0, c_0)`` pair, each of shape
            ``(num_layers, batch, hidden_size)``; zeros when omitted
        :param return_hidden: also return the final ``(h_n, c_n)`` state
        :return: weights of shape ``(batch, steps - 2)``, optionally followed
            by the final LSTM state
        :raises ValueError: if ``hidden`` does not hold exactly two items
        """
        if hidden is None:
            # nn.LSTM expects states shaped (num_layers, batch, hidden_size)
            # even when batch_first=True — that flag only affects the
            # input and output tensors.
            state_shape = (self.num_layers, spot.size(0), self.hidden_size)
            h_0 = torch.zeros(state_shape, dtype=spot.dtype, device=spot.device)
            c_0 = torch.zeros_like(h_0)
        elif len(hidden) != 2:
            raise ValueError(f"Expected two hidden state variables, got {len(hidden)}")
        else:
            h_0, c_0 = hidden

        # nn.LSTM returns (per-step outputs, (h_n, c_n)).
        lstm_out, (h_n, c_n) = self.lstm(spot, (h_0, c_0))
        # Drop the first and last step: only interior steps carry a position.
        outputs = self.hedging_weights(lstm_out)[:, 1:-1, :].squeeze(2)

        if return_hidden:
            # Return the actual recurrent state so callers can feed it back.
            return outputs, (h_n, c_n)
        return outputs

    def get_pnl(self, spot: torch.Tensor) -> float:
        """Cash P&L of trading the predicted positions along one path.

        Positions start and end at zero. Increases in position are paid for
        at column 1 of ``spot`` and decreases are sold at column 0
        (presumably ask and bid — TODO confirm the dataset column order).

        :param spot: input of shape ``(1, steps, input_size)``; a batch with
            more than one path cannot be reduced to a single number and makes
            ``Tensor.item()`` raise
        :return: net cash flow as a Python float
        """
        hedging_weights = self.forward(spot, return_hidden=False)

        # Allocate the zero padding next to the weights (same device/dtype).
        flat = hedging_weights.new_zeros(spot.shape[0], 1)
        positions = torch.cat([flat, hedging_weights, flat], dim=1)
        trades = positions.diff(n=1, dim=1)

        bought = trades.clamp(min=0)
        sold = trades.clamp(max=0)

        buy_price = spot[:, 1:, 1]
        sell_price = spot[:, 1:, 0]
        # ``sold`` is non-positive, so the second term is a cash inflow.
        pnl = -(buy_price * bought).sum(dim=1) - (sell_price * sold).sum(dim=1)

        return pnl.item()

In [120]:
# Smoke test: run an untrained Hedger on the sampled window, cast to float32.
hedger = Hedger()
weights = hedger(feature.to(torch.float32))
weights.shape

In [121]:
feature.shape

In [122]:
# Pad the predicted weights with one zero column on each side, i.e. a zero
# position before the first and after the last predicted step.
weights_all = torch.concat(
    [
        torch.zeros(feature.shape[0], 1, requires_grad=False),
        weights,
        torch.zeros(feature.shape[0], 1, requires_grad=False),
    ],
    dim=1,
)
weights_all.shape

In [123]:
# First difference along dimension 1: the change in position at each step.
weights_diff = weights_all.diff(n=1, dim=1)
weights_diff.shape

In [131]:
# Hand-made position path for a single sample: 0 -> 1 -> 1 -> 0.
example = torch.Tensor([[0, 1, 1, 0]]).to(torch.float64)
example.shape

In [132]:
# First four rows and first two columns of the sampled window
# (presumably the bid and ask columns — TODO confirm the column order).
spot_example = feature[:, :4, :2]
spot_example

In [133]:
# Hand-computed reference P&L for the example path (the two figures are
# presumably copied from spot_example — TODO confirm).
pnl_example = -61.0680 + 61.0120
pnl_example

In [135]:
# Position changes between consecutive steps of the example path.
diff_example = example.diff(n=1, dim=1)
diff_example

In [140]:
spot_example.shape, diff_example.shape

In [145]:
diff_example > 0

In [147]:
# Split the position changes into increases (bought) and decreases (sold);
# all other entries are set to zero.
bought = torch.where(diff_example > 0, diff_example, 0)
sold = torch.where(diff_example < 0, diff_example, 0)
bought, sold

In [149]:
spot_example.shape, bought.shape

In [151]:
spot_example[:, :, 1]

In [158]:
# Cash flows of the trades, using the rows after the first one: increases are
# priced at column 1 and decreases at column 0 of spot_example.
cash_outflow = -spot_example[:, 1:, 1] @ bought.T
cash_inflow = -spot_example[:, 1:, 0] @ sold.T
cash_outflow, cash_inflow

In [162]:
-61.0680 + 61.0120

In [160]:
(cash_outflow + cash_inflow).item()

In [161]:
pnl_example, (cash_outflow + cash_inflow).item()

In [169]:
class Hedger(nn.Module):
    """LSTM that maps a market path to hedge positions for its interior steps.

    ``forward`` returns one value per time step except the first and the
    last, so an input of shape ``(batch, steps, input_size)`` yields an
    output of shape ``(batch, steps - 2)``.
    """

    def __init__(self, input_size: int = 4, num_layers: int = 1, hidden_size: int = 64):
        """
        :param input_size: number of features per time step
        :param num_layers: number of stacked LSTM layers
        :param hidden_size: width of the LSTM state and of the MLP head
        """
        super().__init__()
        self.input_size = input_size
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(
            input_size, self.hidden_size, num_layers=num_layers, batch_first=True
        )

        # Per-step head turning an LSTM output vector into a single weight.
        self.hedging_weights = nn.Sequential(
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, 1),
        )

    def forward(
        self,
        spot: torch.Tensor,
        hidden: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
        return_hidden: bool = False,
    ) -> Union[torch.Tensor, Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]]:
        """Compute hedge weights for every interior time step.

        :param spot: batch-first input of shape ``(batch, steps, input_size)``
        :param hidden: optional ``(h_0, c_0)`` pair, each of shape
            ``(num_layers, batch, hidden_size)``; zeros when omitted
        :param return_hidden: also return the final ``(h_n, c_n)`` state
        :return: weights of shape ``(batch, steps - 2)``, optionally followed
            by the final LSTM state
        :raises ValueError: if ``hidden`` does not hold exactly two items
        """
        if hidden is None:
            # nn.LSTM expects states shaped (num_layers, batch, hidden_size)
            # even when batch_first=True — that flag only affects the
            # input and output tensors.
            state_shape = (self.num_layers, spot.size(0), self.hidden_size)
            h_0 = torch.zeros(state_shape, dtype=spot.dtype, device=spot.device)
            c_0 = torch.zeros_like(h_0)
        elif len(hidden) != 2:
            raise ValueError(f"Expected two hidden state variables, got {len(hidden)}")
        else:
            h_0, c_0 = hidden

        # nn.LSTM returns (per-step outputs, (h_n, c_n)).
        lstm_out, (h_n, c_n) = self.lstm(spot, (h_0, c_0))
        # Drop the first and last step: only interior steps carry a position.
        outputs = self.hedging_weights(lstm_out)[:, 1:-1, :].squeeze(2)

        if return_hidden:
            # Return the actual recurrent state so callers can feed it back.
            return outputs, (h_n, c_n)
        return outputs

    def get_pnl(self, spot: torch.Tensor) -> float:
        """Cash P&L of trading the predicted positions along one path.

        Positions start and end at zero. Increases in position are paid for
        at column 1 of ``spot`` and decreases are sold at column 0
        (presumably ask and bid — TODO confirm the dataset column order).

        :param spot: input of shape ``(1, steps, input_size)``; a batch with
            more than one path cannot be reduced to a single number and makes
            ``Tensor.item()`` raise
        :return: net cash flow as a Python float
        """
        hedging_weights = self.forward(spot, return_hidden=False)

        # Allocate the zero padding next to the weights (same device/dtype)
        # instead of always on the CPU.
        flat = hedging_weights.new_zeros(spot.shape[0], 1)
        positions = torch.cat([flat, hedging_weights, flat], dim=1)
        trades = positions.diff(n=1, dim=1)

        bought = trades.clamp(min=0)
        sold = trades.clamp(max=0)

        buy_price = spot[:, 1:, 1]
        sell_price = spot[:, 1:, 0]
        # ``sold`` is non-positive, so the second term is a cash inflow.
        pnl = -(buy_price * bought).sum(dim=1) - (sell_price * sold).sum(dim=1)

        return pnl.item()

In [170]:
# Smoke test of get_pnl on the sampled window with an untrained model.
hedger = Hedger()
hedger.get_pnl(feature.to(torch.float32))

In [6]:
# from operator import is_
# from typing import List, Tuple, Union
# from dataclasses import dataclass

## @dataclass
# class ExperimentConfig:
#     TRAIN_ROOT: str = ROOT_PATH / ROOT_PATH / 'train'
#     VAL_ROOT: str = ROOT_PATH / ROOT_PATH / 'val'
#     CKPT_ROOT: str = GDRIVE_ROOT_PATH

#     NORMALIZE: bool = False

#     N_EPOCHS: int = 20
#     # LR: float = 1e-2
#     LR: float = 0.5
#     BATCH_SIZE: int = 32

#     N_CLASSES = len(os.listdir(ROOT_PATH / ROOT_PATH / 'train'))

#     NUM_WORKERS: int = 2

#     if torch.backends.mps.is_available():
#         DEVICE = torch.device('mps')
#     elif torch.cuda.is_available():
#         DEVICE = torch.device('cuda')
#     else:
#         DEVICE = torch.device('cpu')

#     IS_PRETRAINED: bool = False


# @dataclass
# class AugmentationHyperparams:
#     RESIZE_HEIGHT: Union[int, None] = None
#     RESIZE_WIDTH: Union[int, None] = None
#     RANDOM_CROP_SIZE: int = 32
#     RANDOM_CROP_PADDING: int = 4
#     FLIP_PROB: float = 0.5
#     ROTATION_DEG: float = 15
#     JITTER_PARAM: float = 0.25
#     BRIGHTNESS: float = 0.2
#     CONTRAST: float = 0.15
#     SATURATION: float = 0.15
#     HUE: float = 0.15

## Задание 1

5 баллов
Добейтесь accuracy на валидации не менее 0.44. В этом задании запрещено пользоваться предобученными моделями и ресайзом картинок.

Советы:
1. Аугментации.
2. Оптимайзеры.
3. Регуляризация.

In [7]:
# Build the train / test loaders from default-constructed settings.
# NOTE(review): get_dataloaders and ExperimentConfig are expected to be
# defined elsewhere in the notebook — confirm before running this cell.
train_dataloader, test_dataloader = get_dataloaders(
    config=ExperimentConfig(), augmentation_hyperparams=AugmentationHyperparams()
)

### Модель

In [None]:
class YourNet(torch.nn.Module):
    """Four-convolution CNN classifier that also tracks running accuracy.

    The feature extractor is conv/max-pool three times followed by a final
    conv + ReLU; the feature map is averaged over its spatial dimensions and
    fed to a linear classifier.
    """

    def __init__(self, n_classes, kernel_size=3, stride=1, padding=1, dilation=1):
        """
        :param n_classes: number of output classes
        :param kernel_size: kernel size shared by all convolutions
        :param stride: stride shared by all convolutions
        :param padding: padding shared by all convolutions
        :param dilation: dilation shared by all convolutions
        """
        super().__init__()
        self.accuracy_list = []

        self.pretrained = False

        conv_kwargs = dict(
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
        )
        channels = (3, 16, 32, 64, 128)
        final_stage = len(channels) - 2

        layers = []
        for stage, (c_in, c_out) in enumerate(zip(channels[:-1], channels[1:])):
            layers.append(nn.Conv2d(in_channels=c_in, out_channels=c_out, **conv_kwargs))
            # Every convolution is followed by 2x2 max-pooling except the
            # last one, which is followed by a ReLU.
            if stage == final_stage:
                layers.append(nn.ReLU())
            else:
                layers.append(nn.MaxPool2d(kernel_size=2))
        self.net = nn.Sequential(*layers)

        self.classifier = nn.Linear(in_features=128, out_features=n_classes)

    def _forward(self, x):
        """Return class logits for a batch of images."""
        pooled = self.net(x).mean(dim=(2, 3))  # average over height and width
        return self.classifier(pooled)

    def forward(self, images, target=None):
        """Return logits; when ``target`` is given, also record per-sample hits.

        images ~ (batch size, num channels, height, width)
        target ~ (batch size)
        output ~ (batch size, num classes)
        """
        output = self._forward(images)
        if target is None:
            return output

        hits = target == torch.argmax(output, dim=-1)
        self.accuracy_list.extend(hits.tolist())
        return output

    def get_accuracy(self, reset=False):
        """Return the mean of the recorded hits, or clear them when ``reset``.

        :param reset: when true, empty the record and return ``None``
        """
        if not reset:
            return torch.Tensor(self.accuracy_list).mean()
        self.accuracy_list = []
        return None

### Тренировочный цикл

In [None]:
from IPython.display import clear_output


def plot_losses(
    train_losses: List[float],
    val_losses: List[float],
    train_accs: List[float],
    val_accs: List[float],
):
    """
    Redraw the loss and accuracy curves of the train and validation samples
    :param train_losses: list of train losses at each epoch
    :param val_losses: list of validation losses at each epoch
    :param train_accs: list of train accuracies at each epoch
    :param val_accs: list of validation accuracies at each epoch
    """
    # Wipe the previous figure so the cell output shows only the latest one.
    clear_output()
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(13, 4))

    panels = (
        (loss_ax, train_losses, val_losses, "loss", "Loss Dynamics"),
        (acc_ax, train_accs, val_accs, "accuracy", "Accuracy Dynamics"),
    )
    for ax, train_curve, val_curve, y_label, title in panels:
        # Epochs are numbered from 1 on the x axis.
        ax.plot(range(1, len(train_curve) + 1), train_curve, label="train")
        ax.plot(range(1, len(val_curve) + 1), val_curve, label="val")
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.set_xlabel("epoch")
        ax.legend()

    plt.show()

In [None]:
def train_one_epoch(
    model, train_dataloader, criterion, optimizer, tqdm_desc: str = "Training"
):
    """Train ``model`` for one epoch, with fp16 mixed precision on CUDA.

    :param model: classifier exposing ``forward(images, target)`` and
        ``get_accuracy(reset=...)``
    :param train_dataloader: batches of ``(features, labels)``
    :param criterion: loss returning the batch mean
    :param optimizer: optimizer stepping ``model``'s parameters
    :param tqdm_desc: progress-bar label, or ``None`` to disable the bar
    :return: ``(loss summed per sample / len(dataset), accuracy)``
    """
    from contextlib import nullcontext

    device = next(model.parameters()).device

    if tqdm_desc is None:
        iterator = train_dataloader
    else:
        iterator = tqdm(train_dataloader, desc=tqdm_desc)

    # fp16 autocast and loss scaling are only used when the model actually
    # lives on a CUDA device; elsewhere the scaler is a pass-through and the
    # forward pass runs in full precision.
    use_amp = device.type == "cuda"
    scaler = GradScaler(enabled=use_amp)

    running_loss = 0.0
    model.train()
    model.get_accuracy(reset=True)
    for features, labels in iterator:
        optimizer.zero_grad()

        features = features.to(device)
        labels = labels.to(device)

        if use_amp:
            amp_context = torch.autocast(device_type="cuda", dtype=torch.float16)
        else:
            amp_context = nullcontext()

        with amp_context:
            # Passing the labels lets the model record per-sample accuracy.
            logits = model(features, labels)
            loss = criterion(logits, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)

        scaler.update()

        # Weight the batch-mean loss by the batch size.
        running_loss += loss.item() * features.shape[0]

    return running_loss / len(train_dataloader.dataset), model.get_accuracy().item()


def predict(model, test_dataloder, criterion, tqdm_desc: str = "Training"):
    """Evaluate ``model`` on the given loader without tracking gradients.

    :param model: classifier exposing ``forward(images, target)`` and
        ``get_accuracy(reset=...)``
    :param test_dataloder: batches of ``(features, labels)`` to evaluate on
        (the misspelt name is kept because callers pass it by keyword)
    :param criterion: loss returning the batch mean
    :param tqdm_desc: progress-bar label, or ``None`` to disable the bar
    :return: ``(loss summed per sample / len(dataset), accuracy)``
    """
    device = next(model.parameters()).device

    # Iterate the loader that was passed in, not a module-level global.
    if tqdm_desc is None:
        iterator = test_dataloder
    else:
        iterator = tqdm(test_dataloder, desc=tqdm_desc)

    running_loss = 0.0
    model.eval()
    model.get_accuracy(reset=True)
    # No autograd graph is needed for evaluation.
    with torch.no_grad():
        for features, labels in iterator:
            features = features.to(device)
            labels = labels.to(device)
            # Passing the labels lets the model record per-sample accuracy.
            logits = model(features, labels)

            loss = criterion(logits, labels)

            # Weight the batch-mean loss by the batch size.
            running_loss += loss.item() * features.shape[0]

    return running_loss / len(test_dataloder.dataset), model.get_accuracy().item()


def train(
    model,
    train_dataloader,
    test_dataloader,
    criterion,
    optimizer,
    device="cuda:0",
    n_epochs=10,
    scheduler=None,
    ckpt_path: Path = ExperimentConfig.CKPT_ROOT,
):
    """Train and validate ``model``, plotting and checkpointing every epoch.

    :param model: classifier with a ``pretrained`` attribute
    :param train_dataloader: training batches
    :param test_dataloader: validation batches
    :param criterion: loss function
    :param optimizer: optimizer stepping ``model``'s parameters
    :param device: device the model is moved to before training
    :param n_epochs: number of epochs to run
    :param scheduler: optional learning-rate scheduler, stepped once per epoch
    :param ckpt_path: directory the state dict is written to after each epoch
    """
    model.to(device)

    train_losses, train_accs = [], []
    val_losses, val_accs = [], []

    for epoch in range(1, n_epochs + 1):
        epoch_train_loss, epoch_train_acc = train_one_epoch(
            model=model,
            train_dataloader=train_dataloader,
            criterion=criterion,
            optimizer=optimizer,
            tqdm_desc=f"Training {epoch}/{n_epochs}",
        )
        epoch_val_loss, epoch_val_acc = predict(
            model=model,
            test_dataloder=test_dataloader,
            criterion=criterion,
            tqdm_desc=f"Validating {epoch}/{n_epochs}",
        )

        if scheduler is not None:
            scheduler.step()

        train_losses.append(epoch_train_loss)
        val_losses.append(epoch_val_loss)
        train_accs.append(epoch_train_acc)
        val_accs.append(epoch_val_acc)
        plot_losses(train_losses, val_losses, train_accs, val_accs)

        # The file name is the same every epoch, so each save overwrites the
        # previous checkpoint.
        ckpt_name = (
            f"{model.__class__.__name__}_pretrained_exp.pt"
            if model.pretrained
            else f"{model.__class__.__name__}_exp.pt"
        )
        torch.save(model.state_dict(), ckpt_path / ckpt_name)

In [None]:
%%time
# Train the plain CNN with Adam and a cosine-annealed learning rate whose
# period equals the number of epochs.
model = YourNet(n_classes=ExperimentConfig.N_CLASSES)
optimizer = torch.optim.Adam(model.parameters(), lr=ExperimentConfig.LR)
criterion = torch.nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=ExperimentConfig.N_EPOCHS
)

train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    criterion=criterion,
    optimizer=optimizer,
    device=ExperimentConfig.DEVICE,
    n_epochs=ExperimentConfig.N_EPOCHS,
    scheduler=scheduler,
)

In [None]:
class YourResNet18(torch.nn.Module):
    """torchvision ResNet-18 wrapper that also tracks running accuracy."""

    def __init__(self, n_classes, is_pretrained: bool = ExperimentConfig.IS_PRETRAINED):
        """
        :param n_classes: number of output classes
        :param is_pretrained: forwarded to torchvision as ``pretrained``
        """
        super().__init__()
        self.pretrained = is_pretrained
        self.accuracy_list = []

        self.model = torchvision.models.resnet18(
            num_classes=n_classes, pretrained=is_pretrained
        )

    def _forward(self, x):
        """Return class logits for a batch of images."""
        return self.model(x)

    def forward(self, images, target=None):
        """Return logits; when ``target`` is given, also record per-sample hits.

        images ~ (batch size, num channels, height, width)
        target ~ (batch size)
        output ~ (batch size, num classes)
        """
        output = self._forward(images)
        if target is None:
            return output

        hits = target == torch.argmax(output, dim=-1)
        self.accuracy_list.extend(hits.tolist())
        return output

    def get_accuracy(self, reset=False):
        """Return the mean of the recorded hits, or clear them when ``reset``.

        :param reset: when true, empty the record and return ``None``
        """
        if not reset:
            return torch.Tensor(self.accuracy_list).mean()
        self.accuracy_list = []
        return None

In [None]:
%%time


@dataclass
class AugmentationHyperparams:
    """Augmentation settings for this experiment: only FLIP_PROB is set."""

    # Resizing
    RESIZE_HEIGHT: Optional[int] = None
    RESIZE_WIDTH: Optional[int] = None
    # Random crop
    RANDOM_CROP_SIZE: Optional[int] = None
    RANDOM_CROP_PADDING: Optional[int] = None
    # Flip and rotation
    FLIP_PROB: Optional[float] = 0.25
    ROTATION_DEG: Optional[float] = None
    # Colour jitter
    JITTER_PARAM: Optional[float] = None
    BRIGHTNESS: Optional[float] = None
    CONTRAST: Optional[float] = None
    SATURATION: Optional[float] = None
    HUE: Optional[float] = None


# Rebuild the loaders with the AugmentationHyperparams declared in this cell.
train_dataloader, test_dataloader = get_dataloaders(
    config=ExperimentConfig(), augmentation_hyperparams=AugmentationHyperparams()
)

# ResNet-18 trained with SGD (momentum 0.9, weight decay 0.01) and a
# cosine-annealed learning rate.
model = YourResNet18(n_classes=ExperimentConfig.N_CLASSES)
# optimizer = torch.optim.Adam(model.parameters(), lr=ExperimentConfig.LR)
optimizer = torch.optim.SGD(
    model.parameters(), lr=ExperimentConfig.LR, momentum=0.9, weight_decay=0.01
)
criterion = torch.nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=ExperimentConfig.N_EPOCHS
)

train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    criterion=criterion,
    optimizer=optimizer,
    device=ExperimentConfig.DEVICE,
    n_epochs=ExperimentConfig.N_EPOCHS,
    scheduler=scheduler,
)

In [None]:
%%time


@dataclass
class AugmentationHyperparams:
    """Augmentation settings for this experiment: only FLIP_PROB is set."""

    # Resizing
    RESIZE_HEIGHT: Optional[int] = None
    RESIZE_WIDTH: Optional[int] = None
    # Random crop
    RANDOM_CROP_SIZE: Optional[int] = None
    RANDOM_CROP_PADDING: Optional[int] = None
    # Flip and rotation
    FLIP_PROB: Optional[float] = 0.25
    ROTATION_DEG: Optional[float] = None
    # Colour jitter
    JITTER_PARAM: Optional[float] = None
    BRIGHTNESS: Optional[float] = None
    CONTRAST: Optional[float] = None
    SATURATION: Optional[float] = None
    HUE: Optional[float] = None


# #@dataclass
# class AugmentationHyperparams:
#     RESIZE_HEIGHT: Union[int, None] = None
#     RESIZE_WIDTH: Union[int, None] = None
#     RANDOM_CROP_SIZE: int = 32
#     RANDOM_CROP_PADDING: int = 4
#     FLIP_PROB: float = 0.5
#     ROTATION_DEG: float = 15
#     JITTER_PARAM: float = 0.25
#     BRIGHTNESS: float = 0.2
#     CONTRAST: float = 0.15
#     SATURATION: float = 0.15
#     HUE: float = 0.15

# Rebuild the loaders with the AugmentationHyperparams declared in this cell.
train_dataloader, test_dataloader = get_dataloaders(
    config=ExperimentConfig(), augmentation_hyperparams=AugmentationHyperparams()
)

# Plain CNN trained with SGD (no momentum or weight decay passed) and a
# cosine-annealed learning rate.
model = YourNet(n_classes=ExperimentConfig.N_CLASSES)
optimizer = torch.optim.SGD(model.parameters(), lr=ExperimentConfig.LR)
# optimizer = torch.optim.Adam(model.parameters(), lr=ExperimentConfig.LR)
criterion = torch.nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=ExperimentConfig.N_EPOCHS
)

train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    criterion=criterion,
    optimizer=optimizer,
    device=ExperimentConfig.DEVICE,
    n_epochs=ExperimentConfig.N_EPOCHS,
    scheduler=scheduler,
)

In [None]:
class YourMobileNet(torch.nn.Module):
    """torchvision MobileNetV3-Large wrapper that also tracks running accuracy."""

    def __init__(self, n_classes, is_pretrained: bool = ExperimentConfig.IS_PRETRAINED):
        """
        :param n_classes: number of output classes
        :param is_pretrained: forwarded to torchvision as ``pretrained``
        """
        super().__init__()
        self.pretrained = is_pretrained
        self.accuracy_list = []

        self.model = torchvision.models.mobilenet_v3_large(
            num_classes=n_classes, pretrained=is_pretrained
        )

    def _forward(self, x):
        """Return class logits for a batch of images."""
        return self.model(x)

    def forward(self, images, target=None):
        """Return logits; when ``target`` is given, also record per-sample hits.

        images ~ (batch size, num channels, height, width)
        target ~ (batch size)
        output ~ (batch size, num classes)
        """
        output = self._forward(images)
        if target is None:
            return output

        hits = target == torch.argmax(output, dim=-1)
        self.accuracy_list.extend(hits.tolist())
        return output

    def get_accuracy(self, reset=False):
        """Return the mean of the recorded hits, or clear them when ``reset``.

        :param reset: when true, empty the record and return ``None``
        """
        if not reset:
            return torch.Tensor(self.accuracy_list).mean()
        self.accuracy_list = []
        return None

In [None]:
%%time
# Train the MobileNet wrapper with Adam and a cosine-annealed learning rate,
# reusing the loaders built in an earlier cell.
model = YourMobileNet(n_classes=ExperimentConfig.N_CLASSES)
optimizer = torch.optim.Adam(model.parameters(), lr=ExperimentConfig.LR)
criterion = torch.nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer, T_max=ExperimentConfig.N_EPOCHS
)

train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    criterion=criterion,
    optimizer=optimizer,
    device=ExperimentConfig.DEVICE,
    n_epochs=ExperimentConfig.N_EPOCHS,
    scheduler=scheduler,
)

In [None]:
from enum import Enum


class PretrainMode(Enum):
    """How a model's weights are initialised and which of them are trained."""

    # Train from scratch, no pretrained weights.
    NOT_PRETRAINED = "NOT_PRETRAINED"
    # Pretrained weights with the backbone frozen (see YourModel's
    # "Freezing weights" branch).
    LINEAR_PROBING = "LINEAR_PROBING"
    # Presumably pretrained weights with every layer left trainable —
    # TODO confirm against YourModel.
    FINE_TUNING = "FINE_TUNING"

In [None]:
class YourModel(torch.nn.Module):
    """ResNet-50 classifier supporting scratch training, linear probing and fine-tuning."""

    def __init__(
        self,
        n_classes: int,
        arhcitecture: nn.Module,
        pretrain_mode: PretrainMode = PretrainMode.LINEAR_PROBING,
    ):
        """
        :param n_classes: number of output classes
        :param arhcitecture: currently unused; the backbone is always
            ResNet-50 (the misspelt name is kept for caller compatibility)
        :param pretrain_mode: ``NOT_PRETRAINED`` builds a randomly initialised
            network; ``LINEAR_PROBING`` loads pretrained weights and freezes
            everything except the new classification layer;
            ``FINE_TUNING`` loads pretrained weights and trains all layers
        """
        super().__init__()
        self.accuracy_list = []

        # Compare against the enum member: ``pretrain_mode.NOT_PRETRAINED``
        # is itself a member and therefore always truthy.
        self.pretrained = pretrain_mode is not PretrainMode.NOT_PRETRAINED

        if self.pretrained:
            self.model = torchvision.models.resnet50(pretrained=True)

            # Freezing weights
            if pretrain_mode is PretrainMode.LINEAR_PROBING:
                for param in self.model.parameters():
                    param.requires_grad = False

            # Replace the head after freezing so the new layer stays trainable.
            in_features = self.model.fc.in_features
            self.model.fc = nn.Linear(in_features=in_features, out_features=n_classes)
        else:
            self.model = torchvision.models.resnet50(
                pretrained=False, num_classes=n_classes
            )

    def _forward(self, x):
        """Return class logits for a batch of images."""
        logits = self.model(x)

        return logits

    def forward(self, images, target=None):
        """Return logits; when ``target`` is given, also record per-sample hits.

        images ~ (batch size, num channels, height, width)
        target ~ (batch size)
        output ~ (batch size, num classes)
        """
        output = self._forward(images)

        # get binary mask and save it to self.accuracy_list
        if target is not None:
            pred = torch.argmax(output, dim=-1)
            self.accuracy_list.extend((target == pred).tolist())

        return output

    def get_accuracy(self, reset=False):
        """Return the mean of the recorded hits, or clear them when ``reset``.

        :param reset: when true, empty the record and return ``None``
        """
        if reset:
            self.accuracy_list = []
            return None
        else:
            return torch.mean(torch.Tensor(self.accuracy_list))

In [None]:
class Experiment:
    """Assemble data, optimizer, loss and LR schedule for one training run.

    Args:
        experiment_config: Configuration object exposing ``LR``, ``MOMENTUM``,
            ``WEIGHT_DECAY``, ``LABEL_SMOOTHING``, ``ETA_MIN``, ``N_EPOCHS``
            and ``DEVICE``; it is also handed to ``get_dataloaders``.
        augmentation_hyperparams: Augmentation settings, passed through
            unchanged to ``get_dataloaders``.
    """

    def __init__(
        self,
        experiment_config: ExperimentConfig,
        augmentation_hyperparams: AugmentationHyperparams,
    ):
        self.experiment_config = experiment_config
        self.augmentation_hyperparams = augmentation_hyperparams

    def _build_optimizer(self, model: nn.Module) -> torch.optim.Optimizer:
        """Return SGD when momentum and weight decay are both set, else Adam."""
        config = self.experiment_config
        if config.MOMENTUM is not None and config.WEIGHT_DECAY is not None:
            return torch.optim.SGD(
                model.parameters(),
                lr=config.LR,
                momentum=config.MOMENTUM,
                weight_decay=config.WEIGHT_DECAY,
            )
        return torch.optim.Adam(model.parameters(), lr=config.LR)

    def _build_criterion(self) -> nn.Module:
        """Return cross-entropy, with label smoothing when it is configured."""
        smoothing = self.experiment_config.LABEL_SMOOTHING
        if smoothing is not None:
            return torch.nn.CrossEntropyLoss(label_smoothing=smoothing)
        return torch.nn.CrossEntropyLoss()

    def _build_scheduler(self, optimizer: torch.optim.Optimizer):
        """Return a cosine schedule spanning N_EPOCHS, floored at ETA_MIN (or 0)."""
        config = self.experiment_config
        eta_min = config.ETA_MIN if config.ETA_MIN is not None else 0.0
        return torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer, T_max=config.N_EPOCHS, eta_min=eta_min
        )

    def run(self, model: nn.Module) -> nn.Module:
        """Train ``model`` according to the stored configuration.

        Args:
            model: Network to train; it is handed to ``train`` as is.

        Returns:
            The same ``model`` object that was passed in.
        """
        train_dataloader, test_dataloader = get_dataloaders(
            config=self.experiment_config,
            augmentation_hyperparams=self.augmentation_hyperparams,
        )

        optimizer = self._build_optimizer(model)
        criterion = self._build_criterion()
        scheduler = self._build_scheduler(optimizer)

        # Read device and epoch count from this experiment's own config (not
        # from the global ExperimentConfig class) so that per-instance
        # overrides apply and the epoch count matches the scheduler's T_max.
        train(
            model=model,
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
            criterion=criterion,
            optimizer=optimizer,
            device=self.experiment_config.DEVICE,
            n_epochs=self.experiment_config.N_EPOCHS,
            scheduler=scheduler,
        )

        return model

In [None]:
%%time


@dataclass
class ExperimentConfig:
    """Settings for the task-1 run (no pretrained weights).

    Only the annotated names are dataclass fields and can be overridden via
    the constructor; ``N_CLASSES`` and ``DEVICE`` are plain class attributes
    computed once when the class body executes.
    """

    # --- data / checkpoint locations (values are path objects) ---
    TRAIN_ROOT: str = ROOT_PATH / ROOT_PATH / "train"
    VAL_ROOT: str = ROOT_PATH / ROOT_PATH / "val"
    CKPT_ROOT: str = GDRIVE_ROOT_PATH

    # --- per-channel normalisation statistics ---
    # Alternative kept for reference:
    #   MEAN = (0.485, 0.456, 0.406), STD = (0.229, 0.224, 0.225)
    # NOTE(review): None presumably switches normalisation off - confirm in
    # get_dataloaders.
    MEAN: Union[Tuple[float], None] = None
    STD: Union[Tuple[float], None] = None

    # --- optimisation schedule ---
    N_EPOCHS: int = 100
    LR: float = 0.5  # alternative kept for reference: 1e-2
    BATCH_SIZE: int = 32

    NUM_WORKERS: int = 2

    # --- optimizer / loss knobs; Experiment.run treats None as "not set" ---
    MOMENTUM: Union[float, None] = 0.9
    WEIGHT_DECAY: Union[float, None] = 2e-05
    ETA_MIN: Union[float, None] = 0.00001
    LABEL_SMOOTHING: Union[float, None] = 0.1

    IS_PRETRAINED: bool = False

    # --- derived class-level constants (not dataclass fields) ---
    # One class per entry of the training directory.
    N_CLASSES = len(os.listdir(TRAIN_ROOT))

    # Preference order: Apple MPS, then CUDA, then CPU.
    DEVICE = torch.device(
        "mps"
        if torch.backends.mps.is_available()
        else "cuda"
        if torch.cuda.is_available()
        else "cpu"
    )


@dataclass
class AugmentationHyperparams:
    """Optional augmentation settings handed to ``get_dataloaders``.

    Every field defaults to ``None``, so all of them are annotated as
    optional, in the same ``Union[..., None]`` style as the resize fields.
    NOTE(review): ``None`` presumably disables the corresponding transform -
    confirm against ``get_dataloaders``.
    """

    # Target image size.
    RESIZE_HEIGHT: Union[int, None] = None
    RESIZE_WIDTH: Union[int, None] = None
    # Random crop.
    RANDOM_CROP_SIZE: Union[int, None] = None
    RANDOM_CROP_PADDING: Union[int, None] = None
    # Geometric augmentations.
    FLIP_PROB: Union[float, None] = None
    ROTATION_DEG: Union[float, None] = None
    # Colour augmentations.
    JITTER_PARAM: Union[float, None] = None
    BRIGHTNESS: Union[float, None] = None
    CONTRAST: Union[float, None] = None
    SATURATION: Union[float, None] = None
    HUE: Union[float, None] = None


# Task 1 trains from scratch (ExperimentConfig.IS_PRETRAINED is False), so the
# mode is requested explicitly instead of relying on YourModel's default of
# LINEAR_PROBING. The backbone factory is passed because YourModel declares
# `arhcitecture` as a parameter.
model = YourModel(
    n_classes=ExperimentConfig.N_CLASSES,
    arhcitecture=torchvision.models.resnet50,
    pretrain_mode=PretrainMode.NOT_PRETRAINED,
)
experiment = Experiment(
    experiment_config=ExperimentConfig(),
    augmentation_hyperparams=AugmentationHyperparams(),
)
final_model = experiment.run(model)

### Валидация результатов задания

In [None]:
def evaluate_task(model, test_dataloader, device="cuda:0"):
    """Measure the accuracy of ``model`` over every batch of ``test_dataloader``.

    Args:
        model: Module whose call ``model(images, labels)`` records per-sample
            hits and which exposes ``get_accuracy(reset=...)``.
        test_dataloader: Iterable yielding ``(images, labels)`` batches.
        device: Device the model and each batch are moved to.

    Returns:
        Whatever ``model.get_accuracy()`` reports after the full pass.
    """
    model = model.to(device)
    model.eval()

    # Drop hits recorded by earlier calls so only this pass is counted.
    model.get_accuracy(reset=True)

    for batch_images, batch_labels in tqdm(test_dataloader):
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)
        # The forward pass is run only for its accuracy bookkeeping.
        with torch.no_grad():
            model(batch_images, batch_labels)

    return model.get_accuracy()

In [None]:
# Grading cell for task 1: rebuild the model, restore its saved weights and
# score it with evaluate_task.
# NOTE(review): `YourResNet50` and a module-level `test_dataloader` are not
# defined in the visible part of this notebook - confirm they exist before
# running this cell.
model = YourResNet50(n_classes=ExperimentConfig.N_CLASSES)
# The checkpoint file is named after the model's class and lives in CKPT_ROOT.
model.load_state_dict(
    torch.load(ExperimentConfig.CKPT_ROOT / f"{model.__class__.__name__}.pt")
)

# evaluate_task is called with its default device here.
accuracy = evaluate_task(model, test_dataloader)
# The score grows linearly with accuracy and is clipped to [0, 10];
# an accuracy of 0.44 or more yields the full 10 points.
print(
    f"Оценка за это задание составит {np.clip(10 * accuracy / 0.44, 0, 10):.2f} баллов"
)

# Отчёт об экспериментах

* Описание проведенных вами экспериментов
* Ссылка на трекер (wandb или любой другой)
* Ваши мысли и наблюдения

## Задание 2

5 баллов
Добейтесь accuracy на валидации не менее 0.84. В этом задании делать ресайз и использовать претрейн можно.

Советы:
1. Аугментации
2. Предобученные модели (https://pytorch.org/vision/stable/models.html)
3. Сначала оцените качество исходной модели без дообучения и используйте его как baseline.

### Импортируйте предобученную модель в наш интерфейс

In [None]:
%%time


@dataclass
class ExperimentConfig:
    """Settings for the task-2 run (pretrained weights allowed).

    Only the annotated names are dataclass fields and can be overridden via
    the constructor; ``N_CLASSES`` and ``DEVICE`` are plain class attributes
    computed once when the class body executes.
    """

    # --- data / checkpoint locations (values are path objects) ---
    TRAIN_ROOT: str = ROOT_PATH / ROOT_PATH / "train"
    VAL_ROOT: str = ROOT_PATH / ROOT_PATH / "val"
    CKPT_ROOT: str = GDRIVE_ROOT_PATH

    # NOTE(review): presumably toggles input normalisation inside
    # get_dataloaders - confirm there.
    NORMALIZE: bool = True

    # --- optimisation schedule ---
    N_EPOCHS: int = 20
    LR: float = 0.5  # alternative kept for reference: 1e-2
    BATCH_SIZE: int = 32

    NUM_WORKERS: int = 2

    # --- optimizer / loss knobs; Experiment.run treats None as "not set" ---
    MOMENTUM: Union[float, None] = 0.9
    WEIGHT_DECAY: Union[float, None] = 2e-05
    ETA_MIN: Union[float, None] = 0.00001
    LABEL_SMOOTHING: Union[float, None] = 0.1

    IS_PRETRAINED: bool = True

    # --- derived class-level constants (not dataclass fields) ---
    # One class per entry of the training directory.
    N_CLASSES = len(os.listdir(TRAIN_ROOT))

    # Preference order: Apple MPS, then CUDA, then CPU.
    DEVICE = torch.device(
        "mps"
        if torch.backends.mps.is_available()
        else "cuda"
        if torch.cuda.is_available()
        else "cpu"
    )


# The 224x224 resize is declared directly as the field defaults below. Poking
# RESIZE_* onto the previous AugmentationHyperparams class right before the
# name is rebound had no lasting effect and failed on a fresh kernel.
@dataclass
class AugmentationHyperparams:
    """Augmentation settings for task 2, handed to ``Experiment``.

    Images are resized to 224x224 by default; every other field defaults to
    ``None`` and is therefore annotated as optional.
    NOTE(review): ``None`` presumably disables the corresponding transform -
    confirm against ``get_dataloaders``.
    """

    # Target image size.
    RESIZE_HEIGHT: Union[int, None] = 224
    RESIZE_WIDTH: Union[int, None] = 224
    # Random crop.
    RANDOM_CROP_SIZE: Union[int, None] = None
    RANDOM_CROP_PADDING: Union[int, None] = None
    # Geometric augmentations.
    FLIP_PROB: Union[float, None] = None
    ROTATION_DEG: Union[float, None] = None
    # Colour augmentations.
    JITTER_PARAM: Union[float, None] = None
    BRIGHTNESS: Union[float, None] = None
    CONTRAST: Union[float, None] = None
    SATURATION: Union[float, None] = None
    HUE: Union[float, None] = None


# Task 2 run. The backbone factory is passed explicitly because YourModel
# declares `arhcitecture` as a parameter.
model = YourModel(
    n_classes=ExperimentConfig.N_CLASSES,
    arhcitecture=torchvision.models.resnet50,
    pretrain_mode=PretrainMode.LINEAR_PROBING,
)
# Pass a hyper-parameter *instance*, as the task-1 run does, not the class.
experiment = Experiment(
    experiment_config=ExperimentConfig(),
    augmentation_hyperparams=AugmentationHyperparams(),
)
final_model = experiment.run(model)

In [None]:
# Baseline check: build a ResNet-18 based model in linear-probing mode and
# evaluate it without any further training.
model = YourModel(
    arhcitecture=torchvision.models.resnet18,
    n_classes=ExperimentConfig.N_CLASSES,
    pretrain_mode=PretrainMode.LINEAR_PROBING,
)
# Evaluate on the device selected by the config (mps / cuda / cpu) rather than
# evaluate_task's hard-coded "cuda:0" default, which fails on machines without CUDA.
evaluate_task(model, test_dataloader, device=ExperimentConfig.DEVICE)

### Валидация результатов задания

In [None]:
# Grading cell for task 2: rebuild the model, restore its saved weights and
# score it with evaluate_task.
# NOTE(review): `YourResNet50` and a module-level `test_dataloader` are not
# defined in the visible part of this notebook - confirm they exist before
# running this cell.
model = YourResNet50(n_classes=ExperimentConfig.N_CLASSES)
# The checkpoint file is named after the model's class and lives in CKPT_ROOT.
model.load_state_dict(
    torch.load(ExperimentConfig.CKPT_ROOT / f"{model.__class__.__name__}.pt")
)

# evaluate_task is called with its default device here.
accuracy = evaluate_task(model, test_dataloader)
# The score is linear in accuracy and clipped to [0, 10]: 0 points at an
# accuracy of 0.5 or less, the full 10 points at 0.84 or more.
print(
    f"Оценка за это задание составит {np.clip(10 * (accuracy - 0.5) / 0.34, 0, 10):.2f} баллов"
)

# Отчёт об экспериментах

* Описание проведенных вами экспериментов
* Ссылка на трекер (wandb или любой другой)
* Ваши мысли и наблюдения