In [1]:
import random # random
import pandas as pd # pandas 
import numpy as np # numpy 
import cv2 # opencv
import os 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

from albumentations.core.transforms_interface import ImageOnlyTransform #RobidouxSharp



import timm

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models
import ttach as tta

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore') 


  check_for_updates()


In [2]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

CFG = {
    "IMG_SIZE" : 384,
    "RSR_IMG_SIZE" : 1024,
    "EPOCHS" : 100,
    "LEARNING_RATE" : 1e-5,
    "BATCH_SIZE" : 24,
    "SEED" : 42,
}

In [3]:
# fix seed
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(CFG['SEED']) # Seed 고정

In [4]:
# dataload
train_df = pd.read_csv("./train.csv")
test_df = pd.read_csv("./test.csv")

In [5]:
# # stratified k fold

# from sklearn.model_selection import StratifiedKFold
# skf = StratifiedKFold(n_splits=2)

# StratifiedKFold(n_splits=2, random_state=None, shuffle=False)
# for i, (train_index, test_index) in enumerate(skf.split(X, y)):
#      print(f"Fold {i}:")
#      print(f"  Train: index={train_index}")
#      print(f"  Test:  index={test_index}")

In [6]:
# raw_img_path fix 
path = "./train/"
train_df["img_path"] = train_df["img_path"].apply(lambda x : path + x.split("/")[-1])

path = "./up_train/"
train_df["upscale_img_path"] = train_df["upscale_img_path"].apply(lambda x : path + x.split("/")[-1])


path = "./test/"
test_df["img_path"] = test_df["img_path"].apply(lambda x : path + x.split("/")[-1])

path = "./up_test/"
test_df["upscale_img_path"] = test_df["img_path"].apply(lambda x : path + x.split("/")[-1])



In [7]:
# label encoding

from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
le.fit(train_df["label"])
train_df["label"] = le.transform(train_df["label"])

In [8]:
from sklearn.model_selection import StratifiedKFold
skf = StratifiedKFold(n_splits=5 , shuffle= True , random_state= CFG["SEED"])
for i, (train_index, test_index) in enumerate(skf.split(train_df["img_path"], train_df["label"])):
     print(f"Fold {i}:")
     print(f"  Train: index={train_index}")
     print(f"  Test:  index={test_index}")
     
     if i == 1:
          break
val_df = train_df.iloc[test_index]
train_df = train_df.iloc[train_index]

Fold 0:
  Train: index=[    0     2     3 ... 15830 15831 15833]
  Test:  index=[    1     4     5 ... 15808 15811 15832]
Fold 1:
  Train: index=[    0     1     2 ... 15830 15832 15833]
  Test:  index=[    6     8    13 ... 15825 15826 15831]


In [9]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, transforms=None , flag_mixup = False):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms
        self.flag_mixup = flag_mixup
        
    def __getitem__(self, index):
        img_path = self.img_path_list[index]
        image = cv2.imread(img_path)
        if self.label_list is not None:
            label = torch.zeros(len(le.classes_))
            label[self.label_list[index]] = 1.
            
        # 기존 image data
        if self.transforms is not None:
            image = self.transforms(image=image)['image']
        
        if self.flag_mixup :
            # mixup에 사용될 data 선택
            mixup_label = torch.zeros(len(le.classes_))

            while True:
              mixup_idx = random.randint(0, len(self.img_path_list)-1) # 전체 데이터 중 아무거나 선택 / 중복되는 클래스가 선택될 수 있음
              if self.label_list[mixup_idx] != self.label_list[index]: # 같은 카테고리 방지
                mixup_label[self.label_list[mixup_idx]] = 1.
                break
        
            # mix할 이미지
            mixup_image = cv2.imread(self.img_path_list[mixup_idx])
            if self.transforms is not None:
                mixup_image = self.transforms(image = mixup_image)["image"]

            # Select a random number from the given beta distribution
            # Mixup the images accordingly
            alpha = 0.4
            lam = np.random.beta(alpha, alpha)
            image = lam * image + (1 - lam) * mixup_image
            label = lam * label + (1 - lam) * mixup_label

        # label one-hot으로 생성
        if self.label_list is not None:
            return image, label
        else:
            return image
        
    def __len__(self):
        return len(self.img_path_list)

In [10]:
# Custom RobidouxSharp resize transform
class RobidouxSharpResize(ImageOnlyTransform):
    def __init__(self, height, width, always_apply=False, p=1.0):
        super(RobidouxSharpResize, self).__init__(always_apply, p)
        self.height = height
        self.width = width

    def apply(self, img, **params):
        return cv2.resize(img, (self.width, self.height), interpolation=cv2.INTER_CUBIC)  # Approximating RobidouxSharp

In [11]:
train_transform = A.Compose([
    RobidouxSharpResize(CFG['RSR_IMG_SIZE'], CFG['RSR_IMG_SIZE']),
    A.Resize(CFG["IMG_SIZE"], CFG["IMG_SIZE"], interpolation=2),
    A.HorizontalFlip(p=0.5),
    
   # 배경을 흐리게 하여 객체를 강조하는 효과 추가
    A.OneOf([
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.15),  # 잡음을 너무 과하게 넣지 않기 위해 확률과 강도를 줄임
        A.GaussianBlur(blur_limit=(3, 5), p=0.2),  # 블러 강도를 줄여 배경 흐림 처리만 약간 적용
        A.MotionBlur(blur_limit=3, p=0.2),
    ], p=0.4),  # 전체적인 적용 확률을 낮춰서 원본 이미지 손상을 줄임
    
    # 객체 강조를 위한 추가 전처리
    A.OneOf([
        A.CLAHE(clip_limit=1.5, p=0.2),  # 대비를 너무 과하게 적용하지 않도록 조정
        A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.2),  # 밝기와 대비를 적당히 조절하여 이미지 특징 유지
        A.Sharpen(alpha=(0.1, 0.3), lightness=(0.5, 1.0), p=0.2)  # 샤프닝 효과를 너무 강하지 않게 적용
    ], p=0.4),  # 전체적인 적용 확률을 낮춤


    A.Rotate(limit=15, p=0.4),
    
    # 선택적으로 배경을 흐리게 하여 조류를 강조
    A.OneOf([
        A.Blur(blur_limit=3, p=0.2),  # 블러 강도를 낮춰 원본 이미지의 세부 사항 보존
        A.ImageCompression(quality_lower=70, quality_upper=90, p=0.2),  # 압축 강도를 적당히 하여 특징을 유지
    ], p=0.3),  # 전체적인 확률을 조정하여 중요한 세부 사항 보존
    
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),
    ToTensorV2()
])

test_transform = A.Compose([
                            A.Resize(CFG["IMG_SIZE"],CFG["IMG_SIZE"],interpolation=2),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),
                            ToTensorV2()
                            ])

In [12]:
'''
train_transform = A.Compose([
                            RobidouxSharpResize(CFG['RSR_IMG_SIZE'], CFG['RSR_IMG_SIZE']),
                            A.Resize(CFG["IMG_SIZE"],CFG["IMG_SIZE"],interpolation=2),
                            A.HorizontalFlip(p= 0.5), 
                            A.Rotate(limit =30 , p = 0.5),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),
                            ToTensorV2()
                            ])

test_transform = A.Compose([
                            A.Resize(CFG["IMG_SIZE"],CFG["IMG_SIZE"],interpolation=2),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),
                            ToTensorV2()
                            ])
'''

'\ntrain_transform = A.Compose([\n                            RobidouxSharpResize(CFG[\'RSR_IMG_SIZE\'], CFG[\'RSR_IMG_SIZE\']),\n                            A.Resize(CFG["IMG_SIZE"],CFG["IMG_SIZE"],interpolation=2),\n                            A.HorizontalFlip(p= 0.5), \n                            A.Rotate(limit =30 , p = 0.5),\n                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),\n                            ToTensorV2()\n                            ])\n\ntest_transform = A.Compose([\n                            A.Resize(CFG["IMG_SIZE"],CFG["IMG_SIZE"],interpolation=2),\n                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),\n                            ToTensorV2()\n                            ])\n'

In [13]:
train_df["upscale_img_path"] = train_df["upscale_img_path"].apply(lambda x : x.replace("png" , "jpg"))
val_df["upscale_img_path"] = val_df["upscale_img_path"].apply(lambda x : x.replace("png" , "jpg"))

In [14]:

train_dataset = CustomDataset(train_df['img_path'].values, train_df['label'].values, train_transform , flag_mixup = True )
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

val_dataset = CustomDataset(val_df['img_path'].values, val_df['label'].values, test_transform)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=0)

In [15]:
class BaseModel(nn.Module):
    def __init__(self, num_classes=len(le.classes_)):
        super(BaseModel, self).__init__()
        self.backbone = timm.create_model('swinv2_large_window12to24_192to384.ms_in22k_ft_in1k	', pretrained=True).to(device)
        self.classifier = nn.Linear(1000, num_classes)
        
    def forward(self, x):
        x = self.backbone(x)
        x = self.classifier(x)
        return x

In [None]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    
    best_score = 0
    best_model = None
    
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(iter(train_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            
            output = model(imgs)
            loss = criterion(output, labels)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
                    
        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss : [{_train_loss:.5f}] Val Loss : [{_val_loss:.5f}] Val Macro F1 Score : [{_val_score:.5f}]')
       
        if scheduler is not None:
            scheduler.step(_val_score)
            
        if best_score < _val_score:
            best_score = _val_score
            best_model = model
            print("model save")
            torch.save(best_model.state_dict() , "./models/swinv2_384_64*64_scale_best.pt")
        torch.save(model.state_dict() , "./models/swinv2_384_64*64_scale_last.pt")
    
    return best_model

In [17]:
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, true_labels = [], []

    with torch.no_grad():
        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)
            
            pred = model(imgs)
            
            loss = criterion(pred, labels)
            
            preds += pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += labels.argmax(1).detach().cpu().numpy().tolist()
            
            val_loss.append(loss.item())
        
        _val_loss = np.mean(val_loss)
        _val_score = f1_score(true_labels, preds, average='macro')
        
    
    return _val_loss, _val_score

In [18]:
model = BaseModel()
model = torch.nn.DataParallel(model).to(device)
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, threshold_mode='abs', min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/528 [00:00<?, ?it/s]

  0%|          | 0/132 [00:00<?, ?it/s]

Epoch [1], Train Loss : [1.34411] Val Loss : [0.19037] Val Macro F1 Score : [0.95009]
model save


  0%|          | 0/528 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
load_model = BaseModel().to(device)
load_model = torch.nn.DataParallel(load_model)
load_model.load_state_dict(torch.load('/home/user/Desktop/test/swinv2_384_64*64_scale_best.pt'))
load_model.eval()

In [None]:
import ttach as tta

In [None]:
test_df

Unnamed: 0,id,img_path,upscale_img_path
0,TEST_00000,./test/TEST_00000.jpg,./up_test/TEST_00000.jpg
1,TEST_00001,./test/TEST_00001.jpg,./up_test/TEST_00001.jpg
2,TEST_00002,./test/TEST_00002.jpg,./up_test/TEST_00002.jpg
3,TEST_00003,./test/TEST_00003.jpg,./up_test/TEST_00003.jpg
4,TEST_00004,./test/TEST_00004.jpg,./up_test/TEST_00004.jpg
...,...,...,...
6781,TEST_06781,./test/TEST_06781.jpg,./up_test/TEST_06781.jpg
6782,TEST_06782,./test/TEST_06782.jpg,./up_test/TEST_06782.jpg
6783,TEST_06783,./test/TEST_06783.jpg,./up_test/TEST_06783.jpg
6784,TEST_06784,./test/TEST_06784.jpg,./up_test/TEST_06784.jpg


In [None]:
test_dataset = CustomDataset(test_df['img_path'].values, None, test_transform)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'] * 10, shuffle=False, num_workers=0)

In [None]:
def inference(model, test_loader, device):
    model.eval()
    preds = []
    i = 0
    with torch.no_grad():
        for imgs in tqdm(iter(test_loader)):
            imgs = imgs.float().to(device)
            
            pred = model(imgs)
            # print(pred)
            preds += pred.argmax(1).detach().cpu().numpy().tolist()
            # preds += pred.detach().cpu()
    preds = le.inverse_transform(preds)
    return preds

In [None]:
tta_ = tta.Compose(
    [
      tta.HorizontalFlip(),
      tta.Multiply(factors=[0.9, 1, 1.1])
    ]
)
tta_model = tta.ClassificationTTAWrapper(load_model, tta_)
preds = inference(tta_model, test_loader, device)

100%|██████████| 85/85 [21:33<00:00, 15.22s/it]


In [None]:
submission = pd.read_csv('./sample_submission.csv')
submission['label'] = preds
submission.to_csv('./0505-swinv2_384_64*64.csv',index=False)