In [None]:
import os
import random
import pandas as pd
import numpy as np
import re
import glob
import cv2
import copy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torch.optim.lr_scheduler import LambdaLR

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn import preprocessing
from sklearn.metrics import f1_score
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore') 

def seed_everything(seed):
    """Seed every random number generator used by the notebook.

    Sets PYTHONHASHSEED, seeds Python's `random`, NumPy and PyTorch
    (`torch.manual_seed` and `torch.cuda.manual_seed`), then puts cuDNN into
    deterministic mode with benchmarking turned off.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Interpreter-level sources of randomness.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators.
    for seeder in (torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)

    # cuDNN flags: deterministic on, benchmark off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Global hyper-parameters shared by the dataset, the loaders and the training loop.
CFG = dict(
    IMG_SIZE=384,
    EPOCHS=50,
    BATCH_SIZE=16,
    SEED=42,
)

seed_everything(CFG['SEED'])  # fix the seed

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Collect every training image path (one sub-directory per class).
# glob returns paths in arbitrary, filesystem-dependent order, so the list is
# sorted to make the seeded train/validation split pick the same rows each run.
all_img_list = sorted(glob.glob('/content/drive/MyDrive/data/dacon/도배하자/open/train/*/*'))
test = pd.read_csv('/content/drive/MyDrive/data/dacon/도배하자/open/test.csv')

# One row per image; the name of the parent directory is the class label.
data = pd.DataFrame({'img_path': all_img_list})
data['label'] = data['img_path'].apply(lambda x: os.path.basename(os.path.dirname(str(x))))

In [None]:
# Stratified 60/40 hold-out split. Only the two frames are needed, so the
# label column is used for stratification but not passed as a second array.
train_data, val_data = train_test_split(
    data,
    test_size=0.4,
    stratify=data['label'],
    random_state=CFG['SEED'],
)

In [None]:
# Encode string class names as integer ids.
# The raw labels are looked up in `data` by row index instead of being read
# from `train_data` / `val_data`, so re-running this cell never fits the
# encoder on labels that were already converted to integers.
encoder = preprocessing.LabelEncoder()
train_raw_labels = data.loc[train_data.index, 'label']
val_raw_labels = data.loc[val_data.index, 'label']

# `.assign` builds new frames rather than writing into the frames returned by
# train_test_split, which avoids pandas' chained-assignment warning.
train_data = train_data.assign(label=encoder.fit_transform(train_raw_labels))
val_data = val_data.assign(label=encoder.transform(val_raw_labels))

In [None]:
class CustomDataset(Dataset):
    """Image dataset that reads files with OpenCV and applies an albumentations pipeline.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with `img_path_list`,
            or None for unlabeled data (then `__getitem__` returns only the image).
        transform: True selects the augmenting (training) pipeline; any other
            value selects the plain resize + normalize pipeline.
    """

    def __init__(self, img_path_list, label_list, transform = True):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transform = transform

        if self.transform == True:
            # Training pipeline: resize, random augmentations, normalize, to tensor.
            # NOTE(review): RandomBrightness and Cutout appear to be deprecated in
            # newer albumentations releases - confirm against the installed version.
            self.transform = A.Compose([
                                A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                                A.VerticalFlip(p = 0.5),
                                A.RandomBrightness(limit=0.2, always_apply=False, p=0.5),
                                A.CLAHE(clip_limit=5.0, tile_grid_size=(8, 8), always_apply=False, p=0.5),
                                A.Cutout(num_holes=8, max_h_size=20, max_w_size=20, p=0.5),
                                A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                                ToTensorV2()
                                ])

        else:
            # Evaluation pipeline: deterministic resize + normalize only.
            self.transform = A.Compose([
                                        A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                                        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                                        ToTensorV2()
                                        ])


    def __getitem__(self, index):
        """Return `(image, label)` when labels are present, otherwise just `image`.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file at the stored path.
        """
        img_path = self.img_path_list[index]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'Could not read image: {img_path}')
        # OpenCV decodes to BGR; the normalization statistics above are listed in
        # RGB order, so convert before transforming.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = self.transform(image = image)['image']

        if self.label_list is not None:
            label = self.label_list[index]
            return image, label
        else:
            return image

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.img_path_list)

In [None]:
def cutmix_data(img, target_a, alpha = 1.0):
    """Apply CutMix to a batch: paste one random box from a shuffled copy of the batch.

    Args:
        img: Batch tensor indexed as (batch, channel, height, width). It is
            modified in place and also returned.
        target_a: Labels of the original batch, indexable by a tensor of positions.
        alpha: Parameter of the symmetric Beta(alpha, alpha) distribution the
            initial mixing ratio is drawn from.

    Returns:
        Tuple `(img, target_a, target_b, lam)` where `target_b` holds the labels
        of the samples the pasted box came from and `lam` is the fraction of each
        image that still belongs to `target_a`.
    """
    lam = np.random.beta(alpha, alpha)
    batch_size = img.size(0)
    # Build the permutation on the batch's own device so this also runs on CPU.
    index = torch.randperm(batch_size, device=img.device)

    # Advanced indexing copies, so the in-place paste below cannot alias itself.
    shuffled_data = img[index]
    target_b = target_a[index.to(target_a.device)]

    image_h, image_w = img.shape[2:]

    # Box with area ratio (1 - lam), centred at a uniformly random point and
    # clipped to the image borders.
    cx = np.random.uniform(0, image_w)
    cy = np.random.uniform(0, image_h)
    w = image_w * np.sqrt(1 - lam)
    h = image_h * np.sqrt(1 - lam)
    x0 = int(np.round(max(cx - w / 2, 0)))
    x1 = int(np.round(min(cx + w / 2, image_w)))
    y0 = int(np.round(max(cy - h / 2, 0)))
    y1 = int(np.round(min(cy + h / 2, image_h)))

    img[:, :, y0:y1, x0:x1] = shuffled_data[:, :, y0:y1, x0:x1]

    # Clipping and rounding change the box area, so recompute lam from the
    # pixels actually replaced; otherwise the loss weights would not match the
    # mixed image.
    lam = 1.0 - ((x1 - x0) * (y1 - y0)) / (image_h * image_w)

    return img, target_a, target_b, lam

def cutmix_criterion(criterion, pred, y_a, y_b, lam):
    """Blend the loss against both label sets, weighting `y_a` by `lam` and `y_b` by `1 - lam`."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

##### Label Smoothing + Focal Loss

In [None]:
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross entropy against a smoothed one-hot target distribution."""

    def __init__(self):
        super().__init__()

    def forward(self, y, targets, smoothing=0.1):
        """Return the batch-mean smoothed cross entropy.

        The true class receives probability `1 - smoothing`; the remaining
        `smoothing` mass is split evenly over the other `y.shape[1] - 1` classes.
        """
        num_classes = y.shape[1]
        log_probs = F.log_softmax(y, dim=-1)

        # Start from the uniform "off" value, then overwrite the true-class slot.
        soft_targets = torch.full_like(log_probs, smoothing / (num_classes - 1))
        soft_targets.scatter_(1, targets.data.unsqueeze(1), 1. - smoothing)

        per_sample = -(soft_targets * log_probs).sum(dim=-1)
        return per_sample.mean()

class FocalLoss(nn.Module):
    """Focal loss: cross entropy scaled by `alpha * (1 - p_true) ** gamma`."""

    def __init__(self, alpha=0.25, gamma=2):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        """Return the batch-mean focal loss for logits `inputs` and class ids `targets`."""
        per_sample_ce = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-CE) recovers the probability assigned to the true class.
        prob_true = torch.exp(-per_sample_ce)
        modulated = self.alpha * (1 - prob_true) ** self.gamma * per_sample_ce
        return modulated.mean()

class LabelSmoothingFocalLoss(nn.Module):
    """Weighted sum of label-smoothing cross entropy and focal loss.

    Args:
        smoothing: Smoothing factor forwarded to the cross-entropy term.
        alpha: Blend weight; the result is
            `(1 - alpha) * smoothed_ce + alpha * focal`. This is not passed to
            `FocalLoss`, which is built with its own defaults.
    """

    def __init__(self, smoothing=0.1, alpha=0.25):
        super(LabelSmoothingFocalLoss, self).__init__()
        self.smoothing = smoothing
        self.alpha = alpha
        self.cross_entropy = LabelSmoothingCrossEntropy()
        self.focal_loss = FocalLoss()

    def forward(self, inputs, targets):
        """Return the blended loss for logits `inputs` and class ids `targets`."""
        # Pass the configured smoothing through; previously the constructor
        # argument was stored but the cross-entropy default was always used.
        ce_loss = self.cross_entropy(inputs, targets, smoothing=self.smoothing)
        fl_loss = self.focal_loss(inputs, targets)
        loss = (1 - self.alpha) * ce_loss + self.alpha * fl_loss
        return loss

In [None]:
# Training split: augmenting pipeline, reshuffled each epoch, incomplete last batch dropped.
train_dataset = CustomDataset(
    train_data['img_path'].values,
    train_data['label'].values,
    transform=True,
)
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=True,
    num_workers=6,
    pin_memory=True,
    drop_last=True,
)

# Validation split: plain pipeline, fixed order, every sample kept.
val_dataset = CustomDataset(
    val_data['img_path'].values,
    val_data['label'].values,
    transform=False,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=6,
    pin_memory=True,
    drop_last=False,
)

In [None]:
class Swin_B(nn.Module):
    """torchvision Swin-B backbone followed by a linear layer over its 1000 outputs.

    Args:
        num_class: Size of the final output. The default is evaluated once, when
            this class statement runs, from `encoder.classes_`.
    """

    def __init__(self, num_class=len(encoder.classes_)):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision's model builders; name
        # the weights explicitly instead (same ImageNet-1k checkpoint).
        self.backbone = models.swin_b(weights=models.Swin_B_Weights.IMAGENET1K_V1)
        # Maps the backbone's 1000 outputs to `num_class` outputs.
        self.classifier = nn.Linear(1000, num_class)

    def forward(self, x):
        """Run the backbone, then the linear classifier, and return its output."""
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [None]:
class EarlyStopping:
    """Track validation loss, checkpoint improvements and flag when to stop.

    Args:
        patience: Number of consecutive non-improving calls after which
            `early_stop` is set to True.
        delta: Margin added to the best score when deciding whether a new
            score counts as an improvement.
        path: File the model's `state_dict` is written to.
        verbose: Print a message whenever a checkpoint is written.
        save_best_only: When True, write a checkpoint only if the loss is below
            the lowest loss saved so far; when False, write on every call to
            `save_checkpoint`.
    """

    def __init__(self, patience=5, delta=0, path='checkpoint.pt', verbose=False, save_best_only=True):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.path = path
        self.save_best_only = save_best_only
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # `np.Inf` was removed in NumPy 2.0; `np.inf` works on every version.
        self.val_loss_min = np.inf

    def __call__(self, val_loss, model):
        """Record one validation result and update the counter / stop flag."""
        # Higher score is better, so negate the loss.
        score = -val_loss

        if self.best_score is None:
            # First call: this loss is the baseline.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            # No sufficient improvement.
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improvement: remember it, checkpoint, and reset the counter.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Write `model.state_dict()` to `self.path` according to `save_best_only`."""
        if self.save_best_only:
            if val_loss < self.val_loss_min:
                if self.verbose:
                    print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
                torch.save(model.state_dict(), self.path)
                self.val_loss_min = val_loss
        else:
            if self.verbose:
                print('Validation loss decreased. Saving model ...')
            torch.save(model.state_dict(), self.path)


In [None]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train with CutMix and early stopping, then restore the lowest-val-loss weights.

    Args:
        model: Network to train; moved to `device`.
        optimizer: Optimizer already bound to `model`'s parameters.
        train_loader: Iterable of `(imgs, labels)` training batches.
        val_loader: Loader handed to `validation` after every epoch.
        scheduler: Learning-rate scheduler stepped once per epoch, or None.
        device: Device used for the model and the batches.

    Returns:
        Tuple `(model, mean_val_score)`: the model loaded with the weights of the
        epoch with the lowest validation loss, and the mean validation weighted
        F1 over the epochs that were actually run.
    """
    model.to(device)
    criterion = LabelSmoothingFocalLoss().to(device)

    early_stopping = EarlyStopping(patience = 5, verbose = True)

    best_val_loss = float('inf')
    best_state_dict = None
    score_sum = 0.0
    epochs_run = 0

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []

        torch.cuda.empty_cache()

        # Pass the loader itself (not iter(loader)) so tqdm can show a total.
        for imgs, labels in tqdm(train_loader):

            imgs = imgs.float().to(device)
            labels = labels.to(device)

            imgs, targets_a, targets_b, lam = cutmix_data(imgs, labels)

            optimizer.zero_grad()
            output = model(imgs)

            loss = cutmix_criterion(criterion, output, targets_a, targets_b, lam)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val Weighted F1 Score : [{_val_score:.5f}]')

        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                # Only plateau schedulers take a metric. The F1 score is passed,
                # so such a scheduler should be created with mode='max'.
                scheduler.step(_val_score)
            else:
                # For every other scheduler a positional argument is the
                # deprecated `epoch`, so passing the score would corrupt the schedule.
                scheduler.step()

        if _val_loss < best_val_loss:
            best_val_loss = _val_loss
            best_state_dict = copy.deepcopy(model.state_dict())

        early_stopping(_val_loss, model)

        score_sum += _val_score
        epochs_run = epoch

        if early_stopping.early_stop:
            print('Early Stopping')
            break

    # No snapshot exists if no epoch ran or the validation loss was never
    # below infinity (e.g. NaN); keep the current weights in that case.
    if best_state_dict is not None:
        model.load_state_dict(best_state_dict)

    # True division: floor division collapsed an F1 in [0, 1] to 0.
    mean_score = score_sum / epochs_run if epochs_run else 0.0
    return model, mean_score

In [None]:
def validation(model, criterion, val_loader, device):
    """Evaluate `model` on `val_loader` without gradients.

    Returns:
        Tuple `(mean_loss, weighted_f1)`: the mean of the per-batch losses and
        the weighted F1 score of the argmax predictions.
    """
    model.eval()
    batch_losses, predicted, expected = [], [], []

    with torch.no_grad():
        for imgs, labels in tqdm(val_loader, desc="Validation"):
            imgs = imgs.to(device)
            labels = labels.to(device)

            logits = model(imgs)
            batch_losses.append(criterion(logits, labels).item())

            # Collect class ids on the host as plain Python ints.
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            expected.extend(labels.detach().cpu().numpy().tolist())

    weighted_f1 = f1_score(expected, predicted, average='weighted')
    return (np.mean(batch_losses), weighted_f1)

In [None]:
def label_decoder(preds):
    """Translate integer class ids (0-18) into their Korean class names.

    Raises:
        KeyError: If an id is not one of the 19 known classes.
    """
    # Position in this tuple is the class id.
    class_names = (
        '가구수정',
        '걸레받이수정',
        '곰팡이',
        '꼬임',
        '녹오염',
        '들뜸',
        '면불량',
        '몰딩수정',
        '반점',
        '석고수정',
        '오염',
        '오타공',
        '울음',
        '이음부불량',
        '창틀,문틀수정',
        '터짐',
        '틈새과다',
        '피스',
        '훼손',
    )
    # Keep a dict lookup so unknown or negative ids raise KeyError.
    id_to_name = dict(enumerate(class_names))

    decoded = []
    for class_id in preds:
        decoded.append(id_to_name[class_id])
    return decoded

In [None]:
model = Swin_B()
optimizer = torch.optim.AdamW(params = model.parameters(), lr = 3e-5)
# Cosine annealing moves the learning rate from the optimizer's lr towards
# eta_min, so eta_min must be below 3e-5 for the schedule to decay; the previous
# value of 0.01 was above it. T_max follows the configured number of epochs.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max = CFG['EPOCHS'], eta_min = 1e-6)
# train() returns (model, mean validation score); unpack it so `infer_model`
# is the model itself rather than the whole tuple.
infer_model, mean_val_score = train(model, optimizer, train_loader, val_loader, scheduler, device)

In [None]:
# Test metadata and the submission template.
test = pd.read_csv('/content/drive/MyDrive/data/dacon/도배하자/open/test.csv')
submit = pd.read_csv('/content/drive/MyDrive/data/dacon/도배하자/open/sample_submission.csv')

# Unlabeled dataset (label_list=None) with the plain pipeline, read in file order.
test_dataset = CustomDataset(
    test['img_path'].values,
    None,
    transform=False,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=6,
)

In [None]:
def inference(model, test_loader, device):
    """Predict a class name for every image yielded by `test_loader`.

    Args:
        model: Trained network; switched to eval mode.
        test_loader: Iterable of image batches (no labels).
        device: Device the batches are moved to before the forward pass.

    Returns:
        List of class names produced by `label_decoder`, in loader order.
    """
    model.eval()
    preds = []
    with torch.no_grad():
        # Pass the loader itself (not iter(loader)) so tqdm can show a total.
        for imgs in tqdm(test_loader):
            imgs = imgs.float().to(device)

            pred = model(imgs)

            # Only the argmax is needed; the softmax probabilities that used
            # to be accumulated here were never read.
            preds += pred.argmax(1).detach().cpu().numpy().tolist()

    preds = label_decoder(preds)
    return preds

In [None]:
# Run the trained model over the test loader; `preds` is the list of decoded class names.
preds = inference(infer_model, test_loader, device)

In [None]:
# Fill the submission template with the predictions and write it as UTF-8 CSV.
# NOTE(review): the filename mentions `vit_b`, but the model instantiated above is Swin_B - confirm the intended name.
submit['label'] = preds
submit.to_csv('./cutmix_labelsmoothFocal_vit_b_384.csv', encoding = 'utf-8', index=False)