## Import

In [None]:
import random
import pandas as pd
import numpy as np
import os
import cv2

from sklearn import preprocessing
from sklearn.model_selection import StratifiedKFold

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# import wandb
# import datetime

from tqdm.auto import tqdm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

import torchvision.models as models

from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings(action='ignore') 

## Config, Fixed Random Seed & GPU Setup

In [None]:
# Global hyper-parameters shared by the training and inference cells.
CFG = dict(
    ARCHITECTURE='convnext_large',
    IMG_SIZE=224,
    EPOCHS=50,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=64,
    SEED=41,
    NFOLD=5,
)

def seed_everything(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    exports ``PYTHONHASHSEED`` and configures cuDNN for reproducible runs.

    Args:
        seed: integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # Also seed the remaining GPUs so multi-GPU runs are covered.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may select different kernels from run to run,
    # which defeats the point of fixing the seed, so it is switched off.
    torch.backends.cudnn.benchmark = False

# Fix the random seed before anything else runs.
seed_everything(CFG['SEED'])

# Restrict CUDA to device index 1, then fall back to the CPU when no GPU is usable.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## CustomDataset & Data load

In [None]:
class CustomDataset(Dataset):
    """Image dataset that reads files lazily from disk with OpenCV.

    Args:
        img_paths: indexable collection of image file paths.
        labels: indexable collection of labels aligned with ``img_paths``,
            or ``None`` for unlabeled (inference) data.
        transforms: optional callable invoked as
            ``transforms(image=image)['image']``.
    """

    def __init__(self, img_paths, labels, transforms=None):
        self.img_paths = img_paths
        self.labels = labels
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)``, or just ``image`` when there are no labels.

        Raises:
            FileNotFoundError: if OpenCV cannot read the file at ``index``.
        """
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {img_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.labels is None:
            return image
        return image, self.labels[index]

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_paths)
    
def get_data(df, infer=False):
    """Extract image paths (and labels) from a competition data frame.

    Each value of the ``img_path`` column gets ``'data/'`` inserted after its
    first two characters (e.g. ``'./train/0.jpg'`` -> ``'./data/train/0.jpg'``).
    The input frame is left untouched, so calling this twice on the same
    frame never prefixes a path twice.

    Args:
        df: frame with an ``img_path`` column and, unless ``infer`` is true,
            an ``artist`` column.
        infer: when true, return only the paths.

    Returns:
        The array of rewritten paths, or a ``(paths, labels)`` tuple of arrays
        when ``infer`` is false.
    """
    img_paths = df['img_path'].map(lambda path: path[:2] + 'data/' + path[2:]).values

    if infer:
        return img_paths
    return img_paths, df['artist'].values

# def get_data(df, infer=False):
#     if infer:
#         return df['img_path'].values
#     return df['img_path'].values, df['artist'].values

## Data Augmentation

In [None]:
def get_transform(train_mode, img_size = 224):
    """Build the albumentations pipeline for the requested mode.

    Args:
        train_mode: ``'train'`` (resize to 2x, random crop, augmentations),
            ``'valid'`` (resize to 2x, random crop) or any other value
            (plain resize to ``img_size``).
        img_size: side length of the square output image.

    Returns:
        An ``A.Compose`` pipeline that ends with normalization and tensor
        conversion.
    """
    def finishing_steps():
        # Every mode ends with the same normalization and tensor conversion.
        return [
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
            ToTensorV2(),
        ]

    # Anything that is neither 'train' nor 'valid' gets the plain resize pipeline.
    if train_mode not in ('train', 'valid'):
        return A.Compose([A.Resize(p=1, height=img_size, width=img_size)] + finishing_steps())

    # 'train' and 'valid' both resize to twice the target size and crop back.
    # NOTE: the validation crop is random as well, not a center crop.
    steps = [
        A.Resize(p=1, height=img_size*2, width=img_size*2),
        A.RandomCrop(p=1, height=img_size, width=img_size),
    ]

    if train_mode == 'train':
        one_of_distortions = A.OneOf(
            [A.MotionBlur(p=1), A.OpticalDistortion(p=1), A.GaussNoise(p=1)],
            p=0.3,
        )
        steps.append(A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p = 0.5))
        steps.append(one_of_distortions)
        steps.append(A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2, always_apply=False, p=0.3))

    return A.Compose(steps + finishing_steps())


## Model Define

In [None]:
class BaseModel(nn.Module):
    """Pretrained torchvision ConvNeXt-Large followed by a linear classifier.

    Args:
        num_classes: number of output classes of the final linear layer.
    """

    def __init__(self, num_classes=50):
        super().__init__()
        # The backbone keeps its own 1000-way output; the classifier maps
        # those 1000 values to `num_classes` logits.
        self.backbone = models.convnext_large(pretrained=True)
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        return self.classifier(self.backbone(x))

## SAM Optimizer

In [None]:
class SAM(torch.optim.Optimizer):
    """Sharpness-Aware Minimization wrapper around a base optimizer.

    A training step consists of two forward/backward passes: ``first_step``
    perturbs the weights along the gradient direction (scaled by ``rho``),
    and ``second_step`` restores them and lets the base optimizer update
    using the gradients computed at the perturbed point.

    Args:
        params: parameters (or parameter groups) to optimize.
        base_optimizer: optimizer class, e.g. ``torch.optim.AdamW``.
        rho: non-negative size of the weight perturbation.
        **kwargs: forwarded to ``base_optimizer`` (learning rate, ...).
    """

    def __init__(self, params, base_optimizer, rho=0.05, **kwargs):
        assert rho >= 0.0, f"Invalid rho, should be non-negative: {rho}"

        super().__init__(params, dict(rho=rho, **kwargs))

        # Build the base optimizer on these param groups, then share its
        # groups so both optimizers read the same hyper-parameters.
        self.base_optimizer = base_optimizer(self.param_groups, **kwargs)
        self.param_groups = self.base_optimizer.param_groups

    @torch.no_grad()
    def first_step(self, zero_grad=False):
        """Move every parameter with a gradient to ``w + e(w)``."""
        total_norm = self._grad_norm()
        for group in self.param_groups:
            step_scale = group["rho"] / (total_norm + 1e-12)

            for param in group["params"]:
                if param.grad is None:
                    continue
                perturbation = param.grad * step_scale.to(param)
                param.add_(perturbation)
                # Remembered so second_step can undo the perturbation.
                self.state[param]["e_w"] = perturbation

        if zero_grad:
            self.zero_grad()

    @torch.no_grad()
    def second_step(self, zero_grad=False):
        """Undo the perturbation and apply the base optimizer's update."""
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                # Return from "w + e(w)" to "w".
                param.sub_(self.state[param]["e_w"])

        # The update itself uses the gradients taken at the perturbed point.
        self.base_optimizer.step()

        if zero_grad:
            self.zero_grad()

    @torch.no_grad()
    def step(self, closure=None):
        """Run both SAM steps; ``closure`` must do a full forward-backward pass."""
        assert closure is not None, "Sharpness Aware Minimization requires closure, but it was not provided"
        # step() runs under no_grad, so gradients are re-enabled for the closure.
        closure = torch.enable_grad()(closure)

        self.first_step(zero_grad=True)
        closure()
        self.second_step()

    def _grad_norm(self):
        """Return the L2 norm over the gradients of all parameters."""
        # Gather every per-parameter norm on one device, in case of model parallelism.
        shared_device = self.param_groups[0]["params"][0].device
        per_param_norms = []
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is not None:
                    per_param_norms.append(param.grad.norm(p=2).to(shared_device))
        return torch.norm(torch.stack(per_param_norms), p=2)

## Train

In [None]:
def rand_bbox(size, lam):
    """Sample a random CutMix box covering roughly ``1 - lam`` of the image.

    Args:
        size: 4-element shape of the image batch; ``size[2]`` and ``size[3]``
            are used as the extents of the two spatial axes.
        lam: mixing ratio in ``[0, 1]``; the box side ratio is
            ``sqrt(1 - lam)``.

    Returns:
        ``(bbx1, bby1, bbx2, bby2)``: box bounds clipped to the image, where
        the ``x`` pair indexes axis 2 and the ``y`` pair indexes axis 3.
    """
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1. - lam)
    # The `np.int` alias was removed in NumPy 1.24; the builtin truncates identically.
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    # Box center drawn uniformly over the image.
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2

def train(model, optimizer, train_loader, test_loader, scheduler, device, k_idx,beta=1, cutmix_prob=0.5):
    """Train ``model`` for one fold with CutMix and a two-step SAM optimizer.

    Every batch does two forward/backward passes: the first feeds
    ``optimizer.first_step``, the second (at the perturbed weights) feeds
    ``optimizer.second_step``. Gradients are zeroed between the passes in both
    the CutMix and the plain branch, so the second step never sees gradients
    accumulated from the first pass. The recorded training loss is always the
    first-pass loss.

    Args:
        model: network to train; moved to ``device``.
        optimizer: optimizer exposing ``zero_grad``, ``first_step`` and
            ``second_step`` (see ``SAM``).
        train_loader: loader yielding ``(img, label)`` training batches.
        test_loader: loader yielding ``(img, label)`` validation batches.
        scheduler: optional LR scheduler stepped once per epoch, or ``None``.
        device: device used for the model and the batches.
        k_idx: zero-based fold index, used in the checkpoint file name.
        beta: Beta-distribution parameter for CutMix; ``<= 0`` disables CutMix.
        cutmix_prob: probability of applying CutMix to a batch.

    Returns:
        ``model`` with the weights of the best-scoring epoch loaded, or
        ``None`` if the validation score never exceeded 0.
    """
    model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)

    checkpoint_path = f'./saved/best_model_fold{k_idx+1}.pt'
    # torch.save does not create missing directories.
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

    best_score = 0
    best_model = None

    for epoch in tqdm(range(1,CFG["EPOCHS"]+1)):
        model.train()
        train_loss = []
        for img, label in train_loader:
            img, label = img.float().to(device), label.to(device)

            optimizer.zero_grad()

            r = np.random.rand()

            if beta > 0 and r < cutmix_prob:
                lam = np.random.beta(beta, beta)
                # Use the batch's device instead of a hard-coded .cuda() so CPU runs work.
                rand_index = torch.randperm(img.size()[0]).to(device)
                target_a = label
                target_b = label[rand_index]
                bbx1, bby1, bbx2, bby2 = rand_bbox(img.size(), lam)
                img[:, :, bbx1:bbx2, bby1:bby2] = img[rand_index, :, bbx1:bbx2, bby1:bby2]

                # Re-derive lam from the clipped box so it matches the area actually pasted.
                lam = float(1 - ((bbx2 - bbx1) * (bby2 - bby1) / (img.size()[-1] * img.size()[-2])))

                def compute_loss(pred, lam=lam, target_a=target_a, target_b=target_b):
                    return criterion(pred, target_a) * lam + criterion(pred, target_b) * (1. - lam)
            else:
                def compute_loss(pred, label=label):
                    return criterion(pred, label)

            # First pass: gradient at w, used to move to the perturbed weights.
            loss = compute_loss(model(img))
            loss.backward()
            optimizer.first_step(zero_grad=True)

            # Second pass: gradient at the perturbed weights drives the real update.
            compute_loss(model(img)).backward()
            optimizer.second_step(zero_grad=True)

            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)

        val_loss, val_score = validation(model, criterion, test_loader, device)

        # wandb.log({"train_loss": tr_loss, "valid_loss": val_loss, "valid_f1": val_score})

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}] Val F1 Score : [{val_score:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if best_score < val_score:
            best_model = model
            best_score = val_score
            torch.save(best_model.state_dict(), checkpoint_path)
            print('Model Saved.')

    # `best_model` is the same object as `model`, which kept training after
    # its best epoch; reload the checkpoint so the returned weights are the best ones.
    if best_model is not None:
        best_model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    # wandb.finish()
    return best_model

def competition_metric(true, pred):
    """Return the macro-averaged F1 score of ``pred`` against ``true``."""
    score = f1_score(y_true=true, y_pred=pred, average="macro")
    return score

def validation(model, criterion, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` without tracking gradients.

    Args:
        model: network to evaluate; switched to eval mode.
        criterion: loss function called as ``criterion(logits, label)``.
        test_loader: loader yielding ``(img, label)`` batches.
        device: device the batches are moved to.

    Returns:
        ``(mean batch loss, competition_metric score)`` over the whole loader.
    """
    model.eval()

    batch_losses = []
    predicted = []
    expected = []

    with torch.no_grad():
        for img, label in test_loader:
            img = img.float().to(device)
            label = label.to(device)

            logits = model(img)
            batch_losses.append(criterion(logits, label).item())

            # The predicted class is the index of the largest logit.
            predicted.extend(logits.argmax(1).tolist())
            expected.extend(label.tolist())

    return np.mean(batch_losses), competition_metric(expected, predicted)

In [None]:
# Load the training table.
df = pd.read_csv('./data/train.csv')

# Label encoding: fit on the artist names, then replace them with integer ids.
le = preprocessing.LabelEncoder().fit(df['artist'].values)
df['artist'] = le.transform(df['artist'].values)

## 5-fold Run

In [None]:
# Stratified k-fold training: one model is trained and checkpointed per fold.
# Fold count, image size and scheduler horizon all come from CFG so they
# cannot drift away from the values used by the rest of the notebook.
kf = StratifiedKFold(n_splits=CFG['NFOLD'], shuffle=True, random_state=CFG['SEED'])
for idx, (train_idx, valid_idx) in enumerate(kf.split(X=df['img_path'], y=df['artist'])):
    print(f'[{idx + 1}] Fold Training................') 
    
    # now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # wandb.init(project="dacon_artist", 
    #            name=f'experiment_fold{idx+1}_{now}',
    #            entity="jang346",
    #            config = CFG)
    
    # Explicit copies, so any column rewriting done while extracting the
    # paths can never leak back into the shared `df`.
    train_df = df.iloc[train_idx, :].copy()
    val_df = df.iloc[valid_idx, :].copy()
    
    train_img_paths, train_labels = get_data(train_df)
    val_img_paths, val_labels = get_data(val_df)
    
    train_dataset = CustomDataset(train_img_paths, train_labels, get_transform(train_mode='train', img_size=CFG['IMG_SIZE']))
    train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

    val_dataset = CustomDataset(val_img_paths, val_labels, get_transform(train_mode='valid', img_size=CFG['IMG_SIZE']))
    val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)
    
    # A fresh model and optimizer per fold, so no weights carry over between folds.
    model = BaseModel(num_classes=len(le.classes_))
    base_optimizer = torch.optim.AdamW
    optimizer = SAM(model.parameters(), base_optimizer, lr=CFG["LEARNING_RATE"])

    # The cosine schedule spans exactly the number of epochs that train() runs.
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG['EPOCHS'])

    train(model, optimizer, train_loader, val_loader, scheduler, device, k_idx=idx)

## Inference

In [None]:
def predict(model: nn.Module, test_loader, weight_save_path, device) -> torch.Tensor:
    """Ensemble several checkpoints by averaging their raw model outputs.

    Args:
        model: network whose architecture matches every checkpoint.
        test_loader: loader yielding image batches only (no labels).
        weight_save_path: iterable of checkpoint file paths (state dicts).
        device: device used for the model and the batches.

    Returns:
        1-D integer tensor holding the predicted class index per test sample.
    """
    model = model.to(device)
    weight_path_list = list(weight_save_path)
    test_probs = np.zeros(shape=(len(test_loader.dataset), len(le.classes_)))
    for weight in weight_path_list:
        # map_location lets checkpoints saved on a GPU load on any device.
        model.load_state_dict(torch.load(weight, map_location=device))
        model.eval()

        # Collect per-batch outputs and concatenate once, instead of
        # re-copying the growing array on every batch.
        batch_outputs = []
        with torch.no_grad():
            for img in tqdm(test_loader):
                img = img.float().to(device)
                batch_outputs.append(model(img).cpu().numpy())

        # Average over the checkpoints actually supplied.
        test_probs += np.concatenate(batch_outputs) / len(weight_path_list)

    # torch.max returns (values, indices); only the indices are needed.
    _, test_preds = torch.max(torch.tensor(test_probs), dim=1)

    return test_preds  # predicted label indices

In [None]:
# Build the test loader and ensemble the per-fold checkpoints.
test_df = pd.read_csv('./data/test.csv')

test_img_paths = get_data(test_df, infer=True)
# Image size comes from CFG so inference matches the training resolution.
test_dataset = CustomDataset(test_img_paths, None, get_transform(train_mode='test', img_size=CFG['IMG_SIZE']))
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# One checkpoint per fold, numbered from 1 like the files written during training.
weight_save_path = [f'./saved/best_model_fold{i}.pt' for i in range(1, CFG['NFOLD'] + 1)]

infer_model = BaseModel(num_classes=len(le.classes_))

preds = predict(infer_model, test_loader, weight_save_path, device)

## Submit

In [None]:
# Map the predicted class ids back to artist names.
preds = le.inverse_transform(preds)

# Fill the sample submission's 'artist' column and write the result.
submit = pd.read_csv('./data/sample_submission.csv').assign(artist=preds)
submit.to_csv('./submit.csv', index=False)