In [1]:
# Module import
import os
import re
# import gc # garbage collection module
import glob
import numpy as np
import pandas as pd
import cv2

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
import pytorch_lightning as L
from transformers import Swinv2Config, Swinv2Model, AutoImageProcessor, AutoModelForImageClassification
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

from tqdm import tqdm
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split, StratifiedGroupKFold

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Hyperparameters Setting
CFG = {
    'IMG_SIZE': 224,
    'EPOCHS': 5,
    'LEARNING_RATE': 3e-4,
    'BATCH_SIZE': 32,
    'SEED': 41
}

In [3]:
L.seed_everything(CFG['SEED']) # Seed 고정

INFO:lightning_fabric.utilities.seed:Seed set to 41


41

In [4]:
basedir = os.getcwd() + '\..'

In [5]:
data_dir = basedir + '/Bird_image_data'
df = pd.read_csv(data_dir + '/train.csv')
df.head()
train, val, _, _ = train_test_split(df, df['label'], test_size=0.3, stratify=df['label'], random_state=CFG['SEED'])

In [6]:
# CustomDataset : Dataframe에서 받은 이미지 경로 리스트와 라벨 리스트
class CustomDataset(Dataset):
    # Dataframe에서 받은 이미지 경로 리스트와 라벨 리스트를 로드
    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        img_path = data_dir + self.img_path_list[index][1:]
        image = cv2.imread(img_path) # 이미지 경로 리스트에서 각 파일 경로 로드

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.label_list is not None: # 해당 이미지에 라벨이 있을 경우
            label = self.label_list[index] # 이미지에 대한 라벨도 함께 반환
            return image, label
        else: # 라벨이 없을 경우
            return image # 이미지만 반환

    def __len__(self):
        return len(self.img_path_list)

In [None]:
# Dataset의 길이가 일정하지 않을 경우 padding을 통해 맞추는 함수 클래스
# class CustomCollateFn:
#     def __init__(self, transforms=None):
#         self.transforms = transforms

#     def __call__(self, batch):
#         if self.transforms is not None:
#             pixel_values = torch.stack([self.transform(data['image']) for data in batch])
#             return {
#                 'pixel_values':pixel_values,
#             }
#         else:
#             pixel_values = torch.stack([self.transform(data['image']) for data in batch])
#             label = torch.LongTensor([data['label'] for data in batch])
#             return {
#                 'pixel_values':pixel_values,
#                 'label':label,
#             }

In [7]:
train_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])

test_transform = A.Compose([
                            A.Resize(CFG['IMG_SIZE'],CFG['IMG_SIZE']),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            ToTensorV2()
                            ])
# train dataset 설정 및 로드
train_dataset = CustomDataset(train['img_path'].values, train['label'].values, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

# val dataset 설정 및 로드
val_dataset = CustomDataset(val['img_path'].values, val['label'].values, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [None]:
class CustomModel(nn.Module):
    def __init__(self, model):
        super(CustomModel, self).__init__()
        self.model = model
        self.clf = nn.Sequential(
            nn.Tanh(),
            nn.LazyLinear(25),
        )

#     @torch.compile
    def forward(self, x, label=None):
        x = self.model(x).pooler_output
        x = self.clf(x)
        loss = None
        if label is not None:
            loss = nn.CrossEntropyLoss()(x, label)
        probs = nn.LogSoftmax(dim=-1)(x)
        return probs, loss

class LitCustomModel(L.LightningModule):
    def __init__(self, model):
        super().__init__()
        self.model = CustomModel(model)
        self.validation_step_output = []

    def configure_optimizers(self):
        opt = torch.optim.AdamW(self.parameters(), lr=1e-5)
        return opt

    def training_step(self, batch, batch_idx=None):
        x = batch['pixel_values']
        label = batch['label']
        probs, loss = self.model(x, label)
        self.log(f"train_loss", loss, on_step=True, on_epoch=False)
        return loss

    def validation_step(self, batch, batch_idx=None):
        x = batch['pixel_values']
        label = batch['label']
        probs, loss = self.model(x, label)
        self.validation_step_output.append([probs,label])
        return loss

    def predict_step(self, batch, batch_idx=None):
        x = batch['pixel_values']
        probs, _ = self.model(x)
        return probs

    def validation_epoch_end(self, step_output):
        pred = torch.cat([x for x, _ in self.validation_step_output]).cpu().detach().numpy().argmax(1)
        label = torch.cat([label for _, label in self.validation_step_output]).cpu().detach().numpy()
        score = f1_score(label,pred, average='macro')
        self.log("val_score", score)
        self.validation_step_output.clear()
        return score

In [None]:
skf = StratifiedKFold(n_splits=N_SPLIT, random_state=SEED, shuffle=True)

In [None]:
# Inference
test = pd.read_csv(data_dir + '/test.csv')
test_dataset = CustomDataset(test['img_path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

def inference(model, test_loader, device):
    model.eval()
    preds = []
    with torch.no_grad():
        for imgs in tqdm(iter(test_loader)):
            imgs = imgs.float().to(device)

            pred = model(imgs)

            preds += pred.argmax(1).detach().cpu().numpy().tolist()

    preds = le.inverse_transform(preds)
    return preds

preds = inference(infer_model, test_loader, device)