In [None]:
!pip install timm
!unzip /content/drive/MyDrive/dacon/도배하자분류/open.zip -d /content/work_space
%cd work_space

In [None]:
import random
import math
import copy
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2
from google.colab.patches import cv2_imshow

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import timm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import f1_score, make_scorer

from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore')

CFG = {
    'IMG_SIZE':384,
    'EPOCHS':30,
    'LEARNING_RATE':1e-4,
    'BATCH_SIZE':16,
    'SEED':621
}

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정
CLS_MODEL_PATH = '/content/drive/MyDrive/dacon/도배하자분류/save_model/'

/content/work_space


In [None]:
remove_dict = {'가구수정': [0, 3, 7, 8, 10],
                '걸래받이수정': [87, 144, 160, 188, 285],
                '곰팡이' : [10, 33, 57, 63, 84, 100],
                '꼬임' : [27, 30, 33, 39, 55, 63, 82, 93, 100, 103, 107, 124, 137, 140, 146, 148, 155, 166, 178, 179, 190, 192, 208],
                '녹오염' : [0],
                '들뜸' : [10, 53],
                '면불량' : [86, 3, 22, 1, 4, 5, 7, 9, 10, 12, 18, 28, 30, 42, 44, 45, 46, 52, 53, 63, 66, 77, 78, 81, 84, 88, 90, 95, 96],
                '몰딩수정': [16, 34],
                '석고수정' : [20, 23, 25, 41, 42, 49, 53, 56],
                '오염' : [ 5, 13, 14, 49, 59, 71, 75, 105, 123, 129, 136, 151, 153, 157, 159, 166, 187, 191, 220, 222, 225, 262, 266, 267, 270,
                          279, 306, 328, 342, 344, 393, 434, 444, 449, 453, 493, 497, 533, 539, 577, 580, 586],
                '오타공' : [3, 8, 34, 68, 123],
                '울음' : [3, 4, 6, 10, 12, 14, 19],
                '터짐' : [10, 17, 64, 83, 90, 92, 102, 114, 116, 129],
                '피스': [2],
                '훼손' : [8, 11, 13, 39, 47, 62, 105, 107, 117, 120, 155, 156, 157, 173, 195, 196, 202, 206, 213, 223, 225, 232, 241, 252, 264, 277,
                         269, 291, 292, 297, 300, 302, 313, 342, 348, 361, 382, 386, 395, 402, 412, 416, 425, 437, 456, 486, 506, 517, 520, 522, 525, 541,
                         556, 558, 578, 589, 609, 618, 635, 659, 661, 662, 663, 664, 666, 679, 685]
               }


all_img_list = glob.glob('./train/*/*')

df = pd.DataFrame(columns=['img_path', 'label'])
df['img_path'] = all_img_list
df['label'] = df['img_path'].apply(lambda x : str(x).split('/')[2])

for k, v in remove_dict.items():
    for i in v:
        img_path = f'./train/{k}/{i}.png'
        index = df[df['img_path'] == img_path].index
        df.drop(index, axis=0, inplace=True)

df.loc[df['img_path']=='./train/몰딩수정/1.png', 'label'] = '틈새과다'
df.loc[df['img_path']=='./train/몰딩수정/26.png', 'label'] = '틈새과다'
df.loc[df['img_path']=='./train/몰딩수정/32.png', 'label'] = '틈새과다'
df.loc[df['img_path']=='./train/몰딩수정/50.png', 'label'] = '틈새과다'
df.loc[df['img_path']=='./train/몰딩수정/91.png', 'label'] = '틈새과다'
df.loc[df['img_path']=='./train/몰딩수정/124.png', 'label'] = '틈새과다'

le = preprocessing.LabelEncoder()
le_type0 = preprocessing.LabelEncoder()
le_type1 = preprocessing.LabelEncoder()
le_type2 = preprocessing.LabelEncoder()

le.fit_transform(df['label'])

label_transform = {'0' : ['곰팡이', '녹오염', '반점', '오염'],
                   '1' : ['면불량', '석고수정', '오타공', '울음', '이음부불량', '터짐', '훼손', '피스'],
                   '2' : ['가구수정', '걸레받이수정', '꼬임', '들뜸', '몰딩수정', '창틀,문틀수정', '틈새과다']}

train, val, _, _ = train_test_split(df, df['label'], test_size=0.20, stratify=df['label'], random_state=CFG['SEED'])

train_label3, val_label3 = train.copy(), val.copy()
train_type0, val_type0 = train.copy(), val.copy()
train_type1, val_type1 = train.copy(), val.copy()
train_type2, val_type2 = train.copy(), val.copy()

train_label3['label'] = train_label3['label'].apply(lambda label : [int(k) for k, v in label_transform.items() if label in v][0])
val_label3['label'] = val_label3['label'].apply(lambda label : [int(k) for k, v in label_transform.items() if label in v][0])

train_type0 = train[train['label'].isin(['곰팡이', '녹오염', '반점', '오염'])]
val_type0 = val[val['label'].isin(['곰팡이', '녹오염', '반점', '오염'])]
train_type0['label'] = le_type0.fit_transform(train_type0['label'])
val_type0['label'] = le_type0.transform(val_type0['label'])

train_type1 = train[train['label'].isin(['면불량', '석고수정', '오타공', '울음', '이음부불량', '터짐', '훼손', '피스'])]
val_type1 = val[val['label'].isin(['면불량', '석고수정', '오타공', '울음', '이음부불량', '터짐', '훼손', '피스'])]
train_type1['label'] = le_type1.fit_transform(train_type1['label'])
val_type1['label'] = le_type1.transform(val_type1['label'])

train_type2 = train[train['label'].isin(['가구수정', '걸레받이수정', '꼬임', '들뜸', '몰딩수정', '창틀,문틀수정', '틈새과다'])]
val_type2 = val[val['label'].isin(['가구수정', '걸레받이수정', '꼬임', '들뜸', '몰딩수정', '창틀,문틀수정', '틈새과다'])]
train_type2['label'] = le_type2.fit_transform(train_type2['label'])
val_type2['label'] = le_type2.transform(val_type2['label'])

In [None]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = self.img_path_list[index]

        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.label_list is not None:
            label = self.label_list[index]
            return image, label
        else:
            return image

    def __len__(self):
        return len(self.img_path_list)

In [None]:
def make_weights(labels, nclasses):
    labels = np.array(labels)
    weight_arr = np.zeros_like(labels)

    _, counts = np.unique(labels, return_counts=True)
    for cls in range(nclasses):
        weight_arr = np.where(labels == cls, 1/counts[cls], weight_arr)

    return weight_arr

train_sampler_weights = make_weights(train_type0['label'].values, len(np.unique(train_type0['label'])))
train_sampler_weights = torch.DoubleTensor(train_sampler_weights)
train_type0_train_sampler = torch.utils.data.sampler.WeightedRandomSampler(train_sampler_weights, len(train_sampler_weights))

train_sampler_weights = make_weights(train_type1['label'].values, len(np.unique(train_type1['label'])))
train_sampler_weights = torch.DoubleTensor(train_sampler_weights)
train_type1_train_sampler = torch.utils.data.sampler.WeightedRandomSampler(train_sampler_weights, len(train_sampler_weights))

train_sampler_weights = make_weights(train_type2['label'].values, len(np.unique(train_type2['label'])))
train_sampler_weights = torch.DoubleTensor(train_sampler_weights)
train_type2_train_sampler = torch.utils.data.sampler.WeightedRandomSampler(train_sampler_weights, len(train_sampler_weights))
# train_class_weights = torch.FloatTensor(compute_class_weight(class_weight = "balanced" , classes=np.unique(train_type0['label']), y=train_type0['label'])).to(device)

In [None]:
train_transform = A.Compose([
                    A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE'], p=1.0),
                    A.HorizontalFlip(always_apply=False, p=0.6),
                    A.ShiftScaleRotate(always_apply=False, p=0.5, shift_limit_x=(-0.05, 0.05), shift_limit_y=(-0.05, 0.05),
                                       scale_limit=(-0.250, 0.100), rotate_limit=(-15, 15), interpolation=0, border_mode=0, value=(0, 0, 0)),
                    A.RandomBrightnessContrast(always_apply=False, p=0.5, brightness_limit=(-0.05, 0.15), contrast_limit=(-0.05, 0.15), brightness_by_max=True),
                    A.RGBShift(always_apply=False, p=0.4, r_shift_limit=(-10, 10), g_shift_limit=(-10, 10), b_shift_limit=(-10, 10)),
                    A.ColorJitter(always_apply=False, p=0.4, brightness=(1.0, 1.0), contrast=(1.0, 1.0), saturation=(0.8, 1.2), hue=(-0.05, 0.05)),
                    A.GaussNoise(always_apply=False, p=0.4, var_limit=(15.00, 30.00), per_channel=True, mean=0.0),
                    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                    ToTensorV2()
                    ])

test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE'], p=1.0),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])


train_label3_dataset = CustomDataset(train_label3['img_path'].values, train_label3['label'].values, train_transform)
train_label3_loader = DataLoader(train_label3_dataset, batch_size = CFG['BATCH_SIZE'],  shuffle=True, num_workers=4)
val_label3_dataset = CustomDataset(val_label3['img_path'].values, val_label3['label'].values, test_transform)
val_label3_loader = DataLoader(val_label3_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

train_type0_dataset = CustomDataset(train_type0['img_path'].values, train_type0['label'].values, train_transform)
train_type0_loader = DataLoader(train_type0_dataset, batch_size = CFG['BATCH_SIZE'], sampler=train_type0_train_sampler, shuffle=False, num_workers=4)
val_type0_dataset = CustomDataset(val_type0['img_path'].values, val_type0['label'].values, test_transform)
val_type0_loader = DataLoader(val_type0_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

train_type1_dataset = CustomDataset(train_type1['img_path'].values, train_type1['label'].values, train_transform)
train_type1_loader = DataLoader(train_type1_dataset, batch_size = CFG['BATCH_SIZE'], sampler=train_type1_train_sampler, shuffle=False, num_workers=4)
origin_train_dataset = CustomDataset(train_type1['img_path'].values, train_type1['label'].values, test_transform)
origin_train_loader = DataLoader(origin_train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=4)
val_type1_dataset = CustomDataset(val_type1['img_path'].values, val_type1['label'].values, test_transform)
val_type1_loader = DataLoader(val_type1_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

train_type2_dataset = CustomDataset(train_type2['img_path'].values, train_type2['label'].values, train_transform)
train_type2_loader = DataLoader(train_type2_dataset, batch_size = CFG['BATCH_SIZE'], sampler=train_type2_train_sampler, shuffle=False, num_workers=4)
val_type2_dataset = CustomDataset(val_type2['img_path'].values, val_type2['label'].values, test_transform)
val_type2_loader = DataLoader(val_type2_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

In [None]:
class FocalLoss(nn.Module):
    def __init__(
        self,
        alpha: float = 1,
        gamma: float = 2.0,
        weight=None,
        label_smoothing: float = 0.0,
        reduction: str = "mean",
    ) -> None:
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.weight = weight
        self.reduction = reduction
        self.label_smoothing = label_smoothing

    def forward(self, inp: torch.Tensor, targ: torch.Tensor):
        ce_loss = F.cross_entropy(
            inp,
            targ,
            weight=self.weight,
            label_smoothing=self.label_smoothing,
            reduction="none",
        )
        p_t = torch.exp(-ce_loss)
        loss = self.alpha * (1 - p_t) ** self.gamma * ce_loss

        if self.reduction == "mean":
            loss = loss.mean()
        elif self.reduction == "sum":
            loss = loss.sum()
        return loss

In [None]:
class ArcFaceClassifier(nn.Module):
    r"""Implement of large margin arc distance: :
        Args:
            in_features: size of each input sample
            out_features: size of each output sample
            s: norm of input feature
            m: margin
            cos(theta + m)
        """
    def __init__(self, in_features, out_features, s=30.0, m=0.50, easy_margin=False):
        super(ArcFaceClassifier, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, input, label):
        # --------------------------- cos(theta) & phi(theta) ---------------------------
        cosine = F.linear(F.normalize(input), F.normalize(self.weight))

        if label is not None:
            sine = torch.sqrt((1.0 - torch.pow(cosine, 2)).clamp(0, 1))
            phi = cosine * self.cos_m - sine * self.sin_m
            if self.easy_margin:
                phi = torch.where(cosine > 0, phi, cosine)
            else:
                phi = torch.where(cosine > self.th, phi, cosine - self.mm)
            # --------------------------- convert label to one-hot ---------------------------
            # one_hot = torch.zeros(cosine.size(), requires_grad=True, device='cuda')
            one_hot = torch.zeros(cosine.size(), device=label.device)
            one_hot.scatter_(1, label.view(-1, 1).long(), 1)
            # -------------torch.where(out_i = {x_i if condition_i else y_i) -------------
            output = (one_hot * phi) + ((1.0 - one_hot) * cosine)  # you can use torch.where if your torch.__version__ is 0.4
            output *= self.s
            return output
        else:
            return cosine

In [None]:
class BaseModel_label3(nn.Module):
    def __init__(self, num_classes=3):
        super(BaseModel_label3, self).__init__()
        self.backbone =  timm.create_model('tf_efficientnetv2_s_in21ft1k', pretrained=True)
        in_features  = self.backbone.classifier.in_features
        self.backbone.classifier = nn.Sequential(
                                              nn.Dropout(0.4),
                                              nn.Linear(in_features, num_classes)
                                              )
    def forward(self, x):
        x = self.backbone(x)
        return x


class BaseModel_type0(nn.Module):
    def __init__(self, num_classes=len(le_type0.classes_)):
        super(BaseModel_type0, self).__init__()
        self.backbone =  timm.create_model('tf_efficientnetv2_m_in21ft1k', pretrained=True)
        in_features  = self.backbone.classifier.in_features
        self.backbone.classifier = nn.Sequential(
                                              nn.Dropout(0.4),
                                              nn.Linear(in_features, num_classes)
                                              )
    def forward(self, x):
        x = self.backbone(x)
        return x


class BaseModel_type1(nn.Module):
  def __init__(self, num_classes=len(le_type1.classes_)):
      super(BaseModel_type1, self).__init__()
      self.backbone = timm.create_model('tf_efficientnetv2_m_in21ft1k', pretrained=True)
      in_features  = self.backbone.classifier.in_features
      self.backbone.classifier = nn.Identity()
      self.embeddings = nn.Linear(in_features, 256)
      self.classifier = ArcFaceClassifier(256, num_classes)

  def forward(self, x, labels=None, return_embeddings=True):
      x = self.backbone(x)
      embeddings = self.embeddings(x)
      if not return_embeddings:
          logits = self.classifier(embeddings, labels)
          return logits
      else:
          return F.normalize(embeddings, p=2, dim=1)


class BaseModel_type2(nn.Module):
    def __init__(self, num_classes=len(le_type2.classes_)):
        super(BaseModel_type2, self).__init__()
        self.backbone =  timm.create_model('tf_efficientnetv2_m_in21ft1k', pretrained=True)
        in_features  = self.backbone.classifier.in_features
        self.backbone.classifier = nn.Sequential(
                                              nn.Dropout(0.4),
                                              nn.Linear(in_features, num_classes)
                                              )
    def forward(self, x):
        x = self.backbone(x)
        return x

In [None]:
class EarlyStopping:
    def __init__(self, patience=7, type_='score', verbose=False, delta=0, trace_func=print):
        self.patience = patience
        self.type_ = type_
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.stop = False
        self.val_acc_max = np.Inf
        self.delta = delta
        self.trace_func = trace_func

    def __call__(self, val_acc):

        if self.type_ == 'score':
            score = val_acc
        elif self.type_ == 'loss':
            score = -val_acc
        update_info = bool()

        if self.best_score is None:
            self.best_score = score
            update_info = True
            return update_info

        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            update_info = False
            if self.counter >= self.patience:
                self.stop = True
            return update_info

        else:
            self.best_score = score
            self.counter = 0
            update_info = True
            return update_info

def get_lr_at_epoch(cur_epoch, warmup_epoch=5, max_epcoh=30):
    lr = lr_func_cosine(cur_epoch, offset=warmup_epoch, max_epcoh=max_epcoh)
    # Perform warm up.
    if cur_epoch < warmup_epoch:
        lr_start = 1e-6
        lr_end = lr_func_cosine(warmup_epoch, offset=warmup_epoch, max_epcoh=max_epcoh)
        alpha = (lr_end - lr_start) / warmup_epoch
        lr = cur_epoch * alpha + lr_start
    return lr

def lr_func_cosine(cur_epoch, base_lr=CFG['LEARNING_RATE'], offset=5, max_epcoh=30):
    return (1e-6 + (base_lr - 1e-6) * (math.cos(
                math.pi * (cur_epoch - offset) / (max_epcoh - offset)) + 1.0 ) * 0.5)

def set_lr(optimizer, new_lr):
    for param_group in optimizer.param_groups:
        param_group["lr"] = new_lr

In [None]:
def train(model, optimizer, train_loader, origin_train_loader, val_loader, scheduler, early_stopping, model_type, device):
    model.to(device)
    # ce_criterion = nn.CrossEntropyLoss().to(device)
    ce_criterion = FocalLoss(label_smoothing=0.1).to(device)

    best_model = None
    warmup_epoch = 10

    for epoch in range(CFG['EPOCHS']):
        model.train()
        train_loss = []

        for iteration, (imgs, labels) in tqdm(enumerate(train_loader), total=len(train_loader)):
            epoch_exact = epoch + float(iteration) / len(train_loader)
            new_lr = get_lr_at_epoch(epoch_exact, warmup_epoch, max_epcoh=CFG['EPOCHS'])
            set_lr(optimizer, new_lr)

            imgs = imgs.float().to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            if isinstance(model, BaseModel_type1):
                output = model(imgs, labels, return_embeddings=False)
            else:
                output = model(imgs)

            loss = ce_criterion(output, labels)
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _train_loss = np.mean(train_loss)

        if isinstance(model, BaseModel_type1):
            _val_loss, _val_acc, _val_score, _val_cos_score = arc_validation(model, origin_train_loader, val_loader, device)
            print(f'Epoch [{epoch+1}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val Weighted F1 Score : [{_val_score:.5f}] COS Val Weighted F1 Score : [{_val_cos_score:.5f}]')
        else:
            _val_loss, _val_acc, _val_score = validation(model, ce_criterion, val_loader, device)
            print(f'Epoch [{epoch+1}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val acc : [{_val_acc:.5f}] Val Weighted F1 Score : [{_val_score:.5f}]')

        if early_stopping(_val_score):
            best_model = model
            best_epoch = epoch+1

        if early_stopping.stop:
              break

        if scheduler is not None:
            scheduler.step(_val_score)

    torch.save(best_model, f'{CLS_MODEL_PATH}{model_type}_epoch{best_epoch}.pt')
    return best_model

In [None]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, true_labels = [], []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            pred = model(imgs)

            loss = criterion(pred, labels)

            val_loss.append(loss.item())
            preds += pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += labels.detach().cpu().numpy().tolist()

        _val_loss = np.mean(val_loss)
        _val_acc = np.mean(np.array(preds)==np.array(true_labels))
        _val_score = f1_score(true_labels, preds, average='weighted')

    return _val_loss, _val_acc, _val_score

def arc_validation(model, origin_train_loader, val_loader, device):
    num_class = 8
    centroids = torch.zeros((num_class, 256), device=device)
    counts = torch.zeros(num_class, device=device)

    model.eval()
    embedding_list, labels_list = [], []
    val_loss, preds, cos_pred, true_labels = [], [], [], []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(origin_train_loader)):
            imgs = imgs.to(device)
            labels = labels.to(device)

            embeddings = model(imgs, None, return_embeddings=True)

            for emb, label in zip(embeddings, labels):
                centroids[label] += emb
                counts[label] += 1

            embedding_list.append(embeddings.detach().cpu().numpy())
            labels_list.append(labels.detach().cpu().numpy())

        for i in range(num_class):
            if counts[i] > 0:
                centroids[i] /= counts[i]

        centroids = centroids.detach().cpu().numpy()
        embedding_list = np.vstack(embedding_list)
        labels_list = np.concatenate(labels_list)

        knn = KNeighborsClassifier(metric='cosine')
        knn.fit(embedding_list, labels_list)

        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.to(device)
            labels = labels.to(device)

            embeddings = model(imgs)

            embeddings = embeddings.detach().cpu().numpy()

            cos_dis = cosine_similarity(embeddings, centroids)
            cos_pred += cos_dis.argmax(1).tolist()

            preds += knn.predict(embeddings).tolist()
            true_labels += labels.detach().cpu().numpy().tolist()

        _val_loss = np.mean(val_loss)
        _val_acc = np.mean(np.array(preds)==np.array(true_labels))
        _val_score = f1_score(true_labels, preds, average='weighted')
        _val_cos_score = f1_score(true_labels, cos_pred, average='weighted')

    return _val_loss, _val_acc, _val_score, _val_cos_score

**TRAIN**

In [None]:
model_type = 'label3'
model = BaseModel_label3()

model.eval()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=0.05)
early_stopping = EarlyStopping(patience=10, type_='score', verbose=False)
scheduler = None
infer_model = train(model, optimizer, train_label3_loader, None, val_label3_loader, scheduler, early_stopping, model_type, device)

In [None]:
model_type = 'type0'
model = BaseModel_type0()

model.eval()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=0.05)
early_stopping = EarlyStopping(patience=10, type_='score', verbose=False)
scheduler = None
infer_model = train(model, optimizer, train_type0_loader, None, val_type0_loader, scheduler, early_stopping, model_type, device)

In [None]:
model_type = 'type1'
model = BaseModel_type1()

model.eval()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=0.05)
early_stopping = EarlyStopping(patience=10, type_='score', verbose=False)
scheduler = None
infer_model = train(model, optimizer, train_type1_loader, origin_train_loader, val_type1_loader, scheduler, early_stopping, model_type, device)

In [None]:
model_type = 'type2'
model = BaseModel_type2()

model.eval()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=0.05)
early_stopping = EarlyStopping(patience=10, type_='score', verbose=False)
scheduler = None
infer_model = train(model, optimizer, train_type2_loader, None, val_type2_loader, scheduler, early_stopping, model_type, device)

In [None]:
def get_knn(model, origin_train_loader, device):
    model.eval()

    embedding_list, labels_list = [], []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(origin_train_loader)):
            imgs = imgs.to(device)
            labels = labels.to(device)

            embeddings = model(imgs, None, return_embeddings=True)

            embedding_list.append(embeddings.detach().cpu().numpy())
            labels_list.append(labels.detach().cpu().numpy())
            
        embedding_list = np.vstack(embedding_list)
        labels_list = np.concatenate(labels_list)

    knn = KNeighborsClassifier()

    scorer = make_scorer(f1_score, average='weighted')

    param_grid = {
        'n_neighbors': range(4, 10),
        'weights': ['uniform', 'distance'],
        'algorithm': ['ball_tree', 'kd_tree', 'brute'],
        'metric' : ['minkowski', 'cosine', 'euclidean', 'l1', 'l2']
    }
    grid_search = GridSearchCV(
        knn,
        param_grid=param_grid,
        scoring=scorer,
        cv=5,
        n_jobs=-1
    )
    grid_search.fit(embedding_list, labels_list)

    print(f"Best parameter: {grid_search.best_params_}")
    print(f"Best score: {grid_search.best_score_}")

    knn = KNeighborsClassifier(**grid_search.best_params_)
    knn.fit(embedding_list, labels_list)

    return knn

In [None]:
def inference(label3_model, model_dict, knn, test_df, device):
    label3_model.eval()

    test_transform = A.Compose([
                            A.Resize(384, 384, p=1.0),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

    test_dataset = CustomDataset(test_df['img_path'].values, None, test_transform)
    test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

    label3_preds = []

    with torch.no_grad():
        for imgs in tqdm(iter(test_loader)):
            imgs = imgs.float().to(device)

            label3_pred = label3_model(imgs)
            label3_preds += label3_pred.argmax(1).detach().cpu().numpy().tolist()

    test_df['label3'] = label3_preds

    type0 = test_df[test_df['label3'] == 0]
    type1 = test_df[test_df['label3'] == 1]
    type2 = test_df[test_df['label3'] == 2]
    type_list = [type0, type1, type2]

    for i, model_type in enumerate(model_dict):
        preds = []
        model = model_dict[model_type]['model']
        img_size = model_dict[model_type]['img_size']
        le_type = model_dict[model_type]['le_type']
        model.eval()

        test_transform = A.Compose([
                            A.Resize(img_size, img_size, p=1.0),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

        test_dataset = CustomDataset(type_list[i]['img_path'].values, None, test_transform, mode='test')
        test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4)

        model.eval()
        with torch.no_grad():
            for imgs in tqdm(iter(test_loader)):
                imgs = imgs.float().to(device)
                if isinstance(model, BaseModel_type1):
                    pred = model(imgs)
                    preds += knn.predict(pred.detach().cpu().numpy()).tolist()
                else:
                    pred = model(imgs)
                    preds += pred.argmax(1).detach().cpu().numpy().tolist()


        preds = le_type.inverse_transform(preds)
        type_list[i]['label'] = preds

        for data in type_list[i].iterrows():
            test_df.loc[test_df['id']==data[1]['id'], 'label3'] = data[1]['label']

    return test_df, type_list

**TEST**

In [None]:
label3_model = BaseModel_label3()
label3_model = torch.load(f'{CLS_MODEL_PATH}label3_epoch28.pt', map_location=device)
type0_model = BaseModel_type0()
type0_model = torch.load(f'{CLS_MODEL_PATH}tpye0_epoch10.pt', map_location=device)
type1_model = BaseModel_type1()
type1_model = torch.load(f'{CLS_MODEL_PATH}tpye1_epoch30.pt', map_location=device)
type2_model = BaseModel_type2()
type2_model = torch.load(f'{CLS_MODEL_PATH}tpye2_epoch15.pt', map_location=device)

In [None]:
knn = get_knn(type1_model, origin_train_loader, device)

In [None]:
model_dict = {'type0_model': {'model': type0_model,
                              'img_size': 440,
                              'le_type' : le_type0},

              'type1_model':{'model': type1_model,
                              'img_size': 440,
                              'le_type' : le_type1},

              'type2_model':{'model': type2_model,
                              'img_size': 384,
                              'le_type' : le_type2}
}

test_df = pd.read_csv('./test.csv')
pred_df, _ = inference(label3_model, model_dict, knn, test_df, device)

submit = pd.read_csv('./sample_submission.csv')
submit['label'] = pred_df['label3']
submit.to_csv('./baseline_submit.csv', index=False)