In [None]:
# Mount Google Drive at /content/drive (Colab only); PROJECT_DIR below points inside this mount.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install madgrad

In [None]:
import sys
import os
import gc
import warnings
import random
from copy import deepcopy
import random
import math

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import torch
from torch import nn
import torch.nn.functional as F
from torch.nn import Parameter
from torch.utils.data import Dataset, DataLoader
from madgrad import MADGRAD, MirrorMADGRAD
from torch.optim import AdamW
from transformers import (
    get_constant_schedule,
    get_constant_schedule_with_warmup,
    get_cosine_schedule_with_warmup,
    get_cosine_with_hard_restarts_schedule_with_warmup,
    get_linear_schedule_with_warmup,
    get_polynomial_decay_schedule_with_warmup
)

from sklearn.model_selection import StratifiedKFold
    
from tqdm.notebook import tqdm

# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")
# Register tqdm's pandas integration (progress bars for pandas apply-style calls).
tqdm.pandas()

In [None]:
# Root folder of the project on the mounted Google Drive; data paths below are built from it.
PROJECT_DIR = "/content/drive/MyDrive/ml/Контесты/rucode_6/a"

# EDA

In [None]:
classes = ['Red', 'Green', 'Violet', 'White', 'Yellow', 'Brown', 'Black', 'Blue', 'Cyan', 'Grey', 'Orange']
# Number of entries in each class folder of the training set, in the same order as `classes`.
counts = [
    len(os.listdir(f"{PROJECT_DIR}/data/train/{class_name}"))
    for class_name in classes
]

In [None]:
# Exploratory leftover: prints the seaborn barplot docstring; not needed by the cells below.
help(sns.barplot)

In [None]:
# Class balance of the training set. x/y are passed as keywords because
# seaborn >= 0.12 no longer accepts them positionally. Each bar is drawn in
# the colour its class is named after (the class names double as the palette).
ax = sns.barplot(x=classes, y=counts, palette=classes)
ax.set(xlabel="Class", ylabel="Number of training images");

# Dataset

In [None]:
classes = ['Red', 'Green', 'Violet', 'White', 'Yellow', 'Brown', 'Black', 'Blue', 'Cyan', 'Grey', 'Orange']
# Map each class name to its position in `classes`; that position is the integer class id.
target_encoder = {name: index for index, name in enumerate(classes)}

In [None]:
import os
import pandas as pd
from torchvision.io import read_image


class CarDataset(Dataset):
    """Image dataset laid out as ``<dir>/<class_name>/<image file>``.

    Every sub-directory of ``dir`` is treated as one class and every entry
    inside it as one image of that class. Only paths and labels are collected
    up front; images are read from disk on access.

    Args:
        dir: Root directory containing one sub-directory per class.
        transform: Optional callable applied to the image returned by
            ``read_image`` before it is handed out.
        target_encoder: Optional mapping from class name to the value that
            should be returned as the label (e.g. an integer class id).
            When omitted (or empty) the raw class name is returned.
    """

    def __init__(self, dir, transform=None, target_encoder: dict = None):
        self.transform = transform
        self.target_encoder = target_encoder
        self.filenames = []
        self.labels = []

        # os.listdir returns entries in arbitrary order; sorting keeps sample
        # indices stable between runs (important for seeded fold splits).
        # Non-directory entries in the root are skipped instead of crashing.
        class_names = sorted(
            entry for entry in os.listdir(dir)
            if os.path.isdir(f"{dir}/{entry}")
        )

        for class_name in tqdm(class_names):
            class_path = f"{dir}/{class_name}"
            for image in sorted(os.listdir(class_path)):
                self.filenames.append(f"{class_path}/{image}")
                self.labels.append(class_name)

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.filenames)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is whatever ``read_image`` yields for the stored path,
        passed through ``transform`` when one was given. The label is the
        class name, or ``target_encoder[class name]`` when an encoder is set.
        """
        image = read_image(self.filenames[idx])
        label = self.labels[idx]

        if self.transform is not None:
            image = self.transform(image)

        if self.target_encoder:
            label = self.target_encoder[label]

        return image, label

In [None]:
# Index the training images; class names are converted to integer ids via `target_encoder`.
train_dataset = CarDataset(f"{PROJECT_DIR}/data/train", target_encoder=target_encoder)

# Losses

In [None]:
class FocalLoss(nn.Module):
    """Focal loss built on top of cross-entropy.

    For every sample the cross-entropy ``ce`` is computed, the probability of
    the true class is recovered as ``p = exp(-ce)`` and the sample loss is
    ``(1 - p) ** gamma * ce``. With ``gamma == 0`` this is plain cross-entropy.

    Args:
        gamma: Focusing exponent applied to ``(1 - p)``.
        eps: Stored on the instance but not used in the computation; kept so
            existing callers that pass it keep working.
        reduction: ``"mean"`` (default) or ``"sum"`` to reduce over the
            batch; any other value returns the per-sample losses.
    """

    def __init__(self, gamma=0, eps=1e-7, reduction="mean"):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.eps = eps
        self.reduction = reduction
        # Per-sample losses are required: the focal factor has to be applied
        # to each sample individually, not to the batch-mean loss.
        self.ce = torch.nn.CrossEntropyLoss(reduction="none")

    def forward(self, input, target):
        """Return the focal loss of ``input`` (logits) against ``target`` (class ids)."""
        logp = self.ce(input, target)
        p = torch.exp(-logp)
        loss = (1 - p) ** self.gamma * logp

        if self.reduction == "mean":
            return loss.mean()
        if self.reduction == "sum":
            return loss.sum()
        return loss


class ArcFaceLoss(nn.Module):
    """Additive-angular-margin loss applied to cosine-similarity logits.

    ``forward`` treats its ``logits`` argument as cosines (e.g. the output of
    a layer that multiplies L2-normalised features by L2-normalised weights),
    shifts the angle by the margin ``m``, blends the shifted and unshifted
    cosines using a smoothed one-hot target, scales the result by ``s`` and
    feeds it to the chosen criterion.

    Args:
        s: Scale applied to the margin-adjusted cosines. ``None`` makes the
            scale a learnable parameter initialised to 45.
        m: Angular margin in radians.
        crit: ``"bce"`` selects ``nn.CrossEntropyLoss(reduction="none")``,
            ``"focal"`` selects ``FocalLoss(gamma=focal_loss_gamma)``.
        weight: Optional tensor of per-class weights, indexed by label.
        reduction: ``"mean"`` or ``"sum"``; any other value returns the
            criterion output unreduced. Only used when ``weight`` is ``None``.
        focal_loss_gamma: ``gamma`` forwarded to ``FocalLoss``.
        class_weights_norm: With ``weight`` set, ``"batch"`` divides the
            weighted loss sum by the sum of the weights in the batch; any
            other value takes the plain mean of the weighted losses.

    Raises:
        ValueError: If ``crit`` is neither ``"bce"`` nor ``"focal"``.
    """

    def __init__(self, s=45.0, m=0.1, crit="bce", weight=None, reduction="mean",
                 focal_loss_gamma=0, class_weights_norm="batch"):
        super().__init__()

        self.weight = weight
        self.reduction = reduction
        self.class_weights_norm = class_weights_norm

        if crit == "focal":
            self.crit = FocalLoss(gamma=focal_loss_gamma)
        elif crit == "bce":
            self.crit = nn.CrossEntropyLoss(reduction="none")
        else:
            # Fail at construction time instead of with an AttributeError on
            # the first forward pass.
            raise ValueError(f"Unknown crit {crit!r}; expected 'bce' or 'focal'")

        if s is None:
            # Learnable scale. Prefer the GPU as before, but fall back to the
            # CPU so the loss can also be built on machines without CUDA.
            device = "cuda" if torch.cuda.is_available() else "cpu"
            self.s = torch.nn.Parameter(torch.tensor([45.], device=device))
        else:
            self.s = s

        # Pre-computed constants for cos(theta + m) and its fallback branch.
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, logits, labels):
        """Return the loss for cosine ``logits`` of shape (batch, classes) and integer ``labels``."""
        cosine = logits.float()
        # Clamp before the square root: rounding can push |cosine| a hair
        # above 1, which would otherwise turn the whole loss into NaN.
        sine = torch.sqrt((1.0 - torch.pow(cosine, 2)).clamp(min=0.0))
        # cos(theta + m) = cos(theta) cos(m) - sin(theta) sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m
        # Below the threshold use the linear fallback `cosine - mm` instead.
        phi = torch.where(cosine > self.th, phi, cosine - self.mm)

        # Smoothed one-hot targets: 0.905 for the true class, 0.005 elsewhere.
        labels2 = torch.zeros_like(cosine)
        labels2.scatter_(1, labels.view(-1, 1).long(), 1)
        labels2 *= (1 - 0.1)
        labels2 += 0.005
        output = (labels2 * phi) + ((1.0 - labels2) * cosine)

        output = output * self.s
        loss = self.crit(output, labels)

        if self.weight is not None:
            # Move the weight table first so indexing with device-resident
            # labels works even when the table was created on the CPU.
            w = self.weight.to(logits.device)[labels]
            loss = loss * w
            if self.class_weights_norm == "batch":
                return loss.sum() / w.sum()
            return loss.mean()

        if self.reduction == "mean":
            loss = loss.mean()
        elif self.reduction == "sum":
            loss = loss.sum()
        return loss

# Modules

In [None]:
class ArcMarginProduct(nn.Module):
    """Bias-free linear layer that outputs cosine similarities.

    Holds one weight vector per output class. The forward pass L2-normalises
    both the incoming features and the weight rows, so every output entry is
    the cosine between a feature vector and a class vector.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        # Allocated uninitialised; reset_parameters() fills it right away.
        self.weight = nn.Parameter(torch.empty(out_features, in_features))
        self.reset_parameters()

    def reset_parameters(self):
        """(Re-)initialise the class weights with Xavier-uniform values."""
        nn.init.xavier_uniform_(self.weight)

    def forward(self, features):
        """Return the (batch, out_features) matrix of feature/class cosines."""
        unit_features = F.normalize(features)
        unit_weight = F.normalize(self.weight)
        return F.linear(unit_features, unit_weight)

# Model

# Loops

In [None]:
def train_epoch(model, data_loader, loss_function, optimizer, scheduler, device):
    """Run one training pass over ``data_loader`` and return the mean batch loss.

    Each batch is a sequence whose LAST element is the target; all preceding
    elements are moved to ``device`` and passed positionally to ``model``.
    This covers both ``(input_ids, attention_mask, target)`` batches and
    ``(image, label)`` batches.

    Per batch: zero the gradients, forward, compute the loss, backpropagate,
    clip the gradient norm to 1.0, then step the optimizer and the scheduler
    (the scheduler is therefore stepped once per batch, not once per epoch).

    Args:
        model: Module to train; called as ``model(*inputs)``.
        data_loader: Sized iterable of batches.
        loss_function: Callable ``loss_function(logits, target)`` returning a scalar tensor.
        optimizer: Optimizer holding the parameters of ``model``.
        scheduler: Learning-rate scheduler stepped after every optimizer step.
        device: Device the batch tensors are moved to.

    Returns:
        Mean of the per-batch losses as a float, or NaN when ``data_loader`` is empty.
    """
    model.train()

    num_batches = len(data_loader)
    if num_batches == 0:
        return float("nan")

    running_loss = 0.0

    for batch in tqdm(data_loader):
        *inputs, target = [item.to(device) for item in batch]

        optimizer.zero_grad()
        logits = model(*inputs)

        loss = loss_function(logits, target)
        running_loss += loss.item()

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    return running_loss / num_batches
    
    
def eval_epoch(model, data_loader, loss_function, device):
    """Evaluate ``model`` on ``data_loader`` and return the mean batch loss.

    Each batch is a sequence whose LAST element is the target; all preceding
    elements are moved to ``device`` and passed positionally to ``model``.
    The model is switched to eval mode and the whole pass runs under
    ``torch.no_grad()``, so no autograd graph is built for either the forward
    pass or the loss.

    Args:
        model: Module to evaluate; called as ``model(*inputs)``.
        data_loader: Sized iterable of batches.
        loss_function: Callable ``loss_function(logits, target)`` returning a scalar tensor.
        device: Device the batch tensors are moved to.

    Returns:
        Mean of the per-batch losses as a float, or NaN when ``data_loader`` is empty.
    """
    model.eval()

    num_batches = len(data_loader)
    if num_batches == 0:
        return float("nan")

    running_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(data_loader):
            *inputs, target = [item.to(device) for item in batch]
            logits = model(*inputs)
            running_loss += loss_function(logits, target).item()

    return running_loss / num_batches

In [None]:
def cross_validation(model, 
                     dataset, 
                     strat_array, 
                     loss_function, 
                     device=torch.device("cpu"),
                     random_state: int=69, 
                     n_folds: int=4, 
                     epochs: int=5, 
                     lr: float=1e-6,
                     start_fold: int=0, 
                     batch_size: int=32):
    """Train and evaluate a fresh copy of ``model`` on every stratified fold.

    For each fold a deep copy of ``model`` is trained for ``epochs`` epochs
    with MADGRAD and a linear decay schedule (no warm-up), and evaluated on
    the held-out part after every epoch. ``model`` itself is never trained.

    Args:
        model: Template model; deep-copied for each fold.
        dataset: Indexable dataset; fold subsets are taken with ``Subset``.
        strat_array: One label per dataset item, used for stratification.
        loss_function: Loss module; moved to ``device`` once and shared by all folds.
        device: Device used for training and evaluation.
        random_state: Seed for ``random``, NumPy, PyTorch and the fold split.
        n_folds: Number of stratified folds.
        epochs: Epochs per fold.
        lr: Learning rate passed to MADGRAD.
        start_fold: Folds with a smaller index are skipped (e.g. to resume a run).
        batch_size: Batch size of both loaders.

    Returns:
        A list with one dict per processed fold:
        ``{"fold": int, "train_losses": [float, ...], "eval_losses": [float, ...]}``.
    """
    random.seed(random_state)
    np.random.seed(random_state)
    torch.manual_seed(random_state)
    torch.cuda.manual_seed_all(random_state)
    
    loss_function.to(device)

    # Honour the n_folds / random_state arguments instead of hard-coded values.
    kfold = StratifiedKFold(n_folds, shuffle=True, random_state=random_state)
    # split() only needs the number of samples from X, so a placeholder avoids
    # handing the (possibly lazy) dataset object to scikit-learn.
    placeholder = np.zeros(len(strat_array))

    history = []

    for fold, (train_ids, eval_ids) in enumerate(kfold.split(placeholder, strat_array)):
        if fold < start_fold:
            continue

        print(f'FOLD {fold}')
        print('--------------------------------')

        fold_model = deepcopy(model)
        fold_model.to(device)

        # The optimizer must own the parameters of the copy that is actually
        # trained; binding it to `model` would leave `fold_model` untouched.
        optimizer = MADGRAD(
            fold_model.parameters(),
            lr = lr,
        )

        # Fold indices come back sorted, so without shuffling consecutive
        # batches would follow the dataset's storage order.
        train_loader = torch.utils.data.DataLoader(
                      torch.utils.data.Subset(dataset, train_ids),
                      batch_size=batch_size,
                      shuffle=True)

        eval_loader = torch.utils.data.DataLoader(
                      torch.utils.data.Subset(dataset, eval_ids),
                      batch_size=batch_size)

        # The scheduler is stepped once per batch inside train_epoch.
        total_steps = len(train_loader) * epochs

        scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps = 0,
                                                num_training_steps = total_steps)

        train_losses = []
        eval_losses = []

        for epoch_i in range(0, epochs):
            train_loss = train_epoch(fold_model, train_loader, loss_function, optimizer, scheduler, device)
            eval_loss = eval_epoch(fold_model, eval_loader, loss_function, device)
            train_losses.append(train_loss)
            eval_losses.append(eval_loss)
            print("EPOCH", epoch_i, train_loss, eval_loss)

        history.append({"fold": fold, "train_losses": train_losses, "eval_losses": eval_losses})

    return history


def single_model(model, 
                     dataset, 
                     loss_function, 
                     device=torch.device("cuda"),
                     random_state: int=69, 
                     epochs: int=5, 
                     lr: float=1e-6,
                     batch_size: int=32,
                     start_epoch: int=0,
                     path: str=None,
                     ):
    """Train ``model`` in place on the whole ``dataset``.

    Uses MADGRAD with a cosine learning-rate schedule (no warm-up) that spans
    ``epochs`` epochs.

    Args:
        model: Model to train; moved to ``device`` and updated in place.
        dataset: Training dataset.
        loss_function: Loss module; moved to ``device``.
        device: Device used for training.
        random_state: Seed for ``random``, NumPy and PyTorch.
        epochs: Total number of epochs the schedule is laid out for.
        lr: Learning rate passed to MADGRAD.
        batch_size: Batch size of the training loader.
        start_epoch: Epochs with a smaller index are skipped. The scheduler is
            NOT fast-forwarded for skipped epochs.
        path: Optional output directory. When given, a sub-directory
            ``epoch_<i>/`` is created for every epoch that is run; nothing is
            written into it by this function.

    Returns:
        List with the mean training loss of every epoch that was run.
    """
    random.seed(random_state)
    np.random.seed(random_state)
    torch.manual_seed(random_state)
    torch.cuda.manual_seed_all(random_state)
    
    loss_function.to(device)
    model.to(device)

    optimizer = MADGRAD(
        model.parameters(),
        lr = lr,
    )

    # Shuffle so batches do not follow the dataset's storage order.
    data_loader = torch.utils.data.DataLoader(
                    dataset,
                    batch_size=batch_size,
                    shuffle=True)
    
    # The scheduler is stepped once per batch inside train_epoch.
    total_steps = len(data_loader) * epochs 

    scheduler = get_cosine_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

    losses = []

    for epoch_i in range(0, epochs):
        if epoch_i < start_epoch:
            continue

        if path is not None:
            epoch_path = f"{path}/epoch_{epoch_i}/"
            # exist_ok keeps the cell re-runnable after a previous run.
            os.makedirs(epoch_path, exist_ok=True)

        epoch_loss = train_epoch(model, data_loader, loss_function, optimizer, scheduler, device)
        losses.append(epoch_loss)
        print("EPOCH", epoch_i, epoch_loss)

    return losses

# Cross Validation

In [None]:
# NOTE(review): `Model` is not defined in any visible cell of this notebook.
model = Model()
# Xavier-uniform initialisation for every parameter with more than one
# dimension; 1-D parameters keep whatever initialisation they came with.
for p in model.parameters():
    if p.dim() <= 1:
        continue
    nn.init.xavier_uniform_(p)