# Import

In [None]:
import os
import math
import random
import warnings

import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
from torchvision.transforms import TrivialAugmentWide, RandomErasing
from sklearn.model_selection import train_test_split

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import log_loss
from torch.amp import GradScaler, autocast
from torch_ema import ExponentialMovingAverage
from torch.optim.lr_scheduler import _LRScheduler
import timm


# Pick the compute device: the second GPU when more than one is visible,
# otherwise the first GPU, and the CPU when CUDA is not available at all
# (previously "cuda:0" was selected even without CUDA, which fails on first use).
if torch.cuda.is_available():
    device = torch.device("cuda:1" if torch.cuda.device_count() > 1 else "cuda:0")
else:
    device = torch.device("cpu")

print("사용 중인 디바이스:", device)


# Hyperparameter Setting

In [None]:
# Global hyperparameters read by the dataset, training and inference cells.
CFG = dict(
    IMG_SIZE=448,            # generic input size (not referenced by the visible cells)
    CONVNEXT_IMG_SIZE=448,   # square input side used for the ConvNeXtV2 model
    EVA_IMG_SIZE=336,        # square input side used for the EVA model
    BATCH_SIZE=32,           # DataLoader batch size for train / val / test
    EPOCHS=100,              # upper bound on epochs per fold
    LEARNING_RATE=1e-4,      # AdamW lr and the scheduler's peak lr
    SEED=42,                 # seed for the RNGs and the stratified K-fold split
)

# Fix Random Seed

In [None]:
def seed_everything(seed):
    """Seed every RNG used by the notebook and make cuDNN deterministic.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU and the current CUDA
    device), exports ``PYTHONHASHSEED``, and switches cuDNN to deterministic
    mode with autotuning disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs once, up front, with the configured seed.
seed_everything(CFG['SEED'])

# CustomDataset

In [None]:
class CustomImageDataset(Dataset):
    """Image dataset over ``.jpg`` files (extension matched case-insensitively).

    Two layouts are supported:

    * ``is_test=False``: ``root_dir/<class_name>/<image>.jpg``. Each
      sub-directory of ``root_dir`` is one class; class indices follow the
      sorted directory names. ``samples`` holds ``(path, class_index)`` tuples.
    * ``is_test=True``: ``root_dir/<image>.jpg``. ``samples`` holds the sorted
      image paths only.

    Args:
        root_dir: Directory to scan.
        transform: Optional callable applied to the RGB PIL image.
        is_test: Selects the flat, unlabeled layout when true.

    Attributes set in training mode only: ``classes`` (sorted class names) and
    ``class_to_idx`` (name -> index).
    """

    def __init__(self, root_dir, transform=None, is_test=False):
        self.transform = transform
        self.is_test = is_test
        self.samples = []

        if is_test:
            self.samples = sorted(
                os.path.join(root_dir, f)
                for f in os.listdir(root_dir)
                if f.lower().endswith('.jpg')
            )
        else:
            # Only real, non-hidden sub-directories are classes. Stray files
            # (README, .DS_Store) used to crash the scan, and hidden folders
            # such as .ipynb_checkpoints would have become bogus classes.
            self.classes = sorted(
                entry for entry in os.listdir(root_dir)
                if not entry.startswith('.')
                and os.path.isdir(os.path.join(root_dir, entry))
            )
            self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
            for cls in self.classes:
                cls_dir = os.path.join(root_dir, cls)
                # Sort file names: os.listdir order is arbitrary, and index-based
                # splits (Subset / K-fold) need a stable sample order.
                for f in sorted(os.listdir(cls_dir)):
                    if f.lower().endswith('.jpg'):
                        self.samples.append((os.path.join(cls_dir, f), self.class_to_idx[cls]))

    def __len__(self):
        """Return the number of images found."""
        return len(self.samples)

    @staticmethod
    def _load_rgb(path):
        """Open ``path``, return an RGB copy, and close the file handle."""
        with Image.open(path) as img:
            return img.convert('RGB')

    def __getitem__(self, idx):
        """Return the (optionally transformed) image, plus its label in training mode."""
        if self.is_test:
            img = self._load_rgb(self.samples[idx])
            return self.transform(img) if self.transform else img

        img_path, label = self.samples[idx]
        img = self._load_rgb(img_path)
        return (self.transform(img) if self.transform else img, label)


# Data Load

In [None]:
# Data locations, relative to the working directory.
train_root = './train'  # scanned as one sub-folder per class
test_root = './test'    # scanned as a flat folder of images

In [None]:
class RandomHalfCrop:
    """With probability ``p``, keep only one random half of a PIL-style image.

    The half (left, right, top or bottom) is chosen uniformly. The image is
    returned untouched when the draw fails, when either side is 2 px or less,
    or when the halved side would be 1 px or less.
    """

    def __init__(self, p=0.3):
        self.p = p

    def __call__(self, img):
        width, height = img.size
        fire = random.random() < self.p
        if not (fire and width > 2 and height > 2):
            return img

        half_w = width // 2
        half_h = height // 2
        chosen = random.choice(['left', 'right', 'top', 'bottom'])

        # side -> (size of the halved dimension, crop box as (left, upper, right, lower))
        plans = {
            'left': (half_w, (0, 0, half_w, height)),
            'right': (half_w, (half_w, 0, width, height)),
            'top': (half_h, (0, 0, width, half_h)),
            'bottom': (half_h, (0, half_h, width, height)),
        }
        halved, box = plans[chosen]
        if halved > 1:
            return img.crop(box)
        return img

In [None]:
# Channel statistics used for input normalisation by every pipeline below.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]


def _build_train_transform(img_size):
    """Return the augmenting train pipeline producing ``img_size`` x ``img_size`` tensors.

    ``RandomHalfCrop`` runs BEFORE ``Resize``. It previously ran after it, so a
    cropped sample left the pipeline at half size and could not be stacked
    into a batch with uncropped samples.
    """
    return transforms.Compose([
        RandomHalfCrop(p=0.3),
        transforms.Resize((img_size, img_size)),
        transforms.RandomAffine(
            degrees=15,
            translate=(0.1, 0.1),
            scale=(0.9, 1.1),
            shear=10,
            interpolation=transforms.InterpolationMode.BILINEAR,
            # Fill exposed borders with roughly the normalisation mean in 0-255 RGB.
            fill=(124, 117, 104)
        ),
        TrivialAugmentWide(),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
        transforms.RandomGrayscale(p=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
        # Applied last because it works on the normalised tensor.
        transforms.RandomErasing(p=0.3, scale=(0.02, 0.1), ratio=(0.3, 3.3), value='random')
    ])


def _build_val_transform(img_size):
    """Return the deterministic resize -> tensor -> normalise pipeline."""
    return transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD)
    ])


# ConvNeXtV2 pipelines (448 px in CFG).
conv_train_transform = _build_train_transform(CFG['CONVNEXT_IMG_SIZE'])
conv_val_transform = _build_val_transform(CFG['CONVNEXT_IMG_SIZE'])

# EVA pipelines (336 px in CFG).
eva_train_transform = _build_train_transform(CFG['EVA_IMG_SIZE'])
eva_val_transform = _build_val_transform(CFG['EVA_IMG_SIZE'])


In [None]:
# Scan the training folder once without a transform; only its sample list and
# class names are used to build the split below.
full_dataset = CustomImageDataset(train_root, transform=None)
print(f"총 이미지 수: {len(full_dataset)}")

targets = [label for _, label in full_dataset.samples]
class_names = full_dataset.classes

# Stratified 80/20 hold-out split over sample indices. The seed comes from CFG
# (it was a hard-coded 42) so changing CFG['SEED'] changes this split too.
train_idx, val_idx = train_test_split(
    range(len(targets)), test_size=0.2, stratify=targets, random_state=CFG['SEED']
)

# NOTE(review): each Subset wraps a freshly scanned dataset, so the indices are
# only valid if every scan lists the samples in the same order as full_dataset.

# For ConvNeXtV2
conv_train_dataset = Subset(CustomImageDataset(train_root, transform=conv_train_transform), train_idx)
conv_val_dataset = Subset(CustomImageDataset(train_root, transform=conv_val_transform), val_idx)

# For EVA
eva_train_dataset = Subset(CustomImageDataset(train_root, transform=eva_train_transform), train_idx)
eva_val_dataset = Subset(CustomImageDataset(train_root, transform=eva_val_transform), val_idx)

print(f"[ConvNeXt] train 이미지 수: {len(conv_train_dataset)}, valid 이미지 수: {len(conv_val_dataset)}")
print(f"[EVA] train 이미지 수: {len(eva_train_dataset)}, valid 이미지 수: {len(eva_val_dataset)}")

# Model Define

In [None]:
class EvaModel(nn.Module):
    """Pretrained timm EVA-Large (patch 14, 336 px) backbone plus a new linear head.

    Args:
        num_classes: Output size of the linear classification head.
    """

    def __init__(self, num_classes):
        super().__init__()
        # num_classes=0 is passed to timm so the backbone is built without its
        # own classification layer; the head below is sized from num_features.
        backbone = timm.create_model(
            'hf_hub:timm/eva_large_patch14_336.in22k_ft_in1k',
            pretrained=True,
            num_classes=0,
        )
        self.backbone = backbone
        self.head = nn.Linear(backbone.num_features, num_classes)

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        features = self.backbone(x)
        return self.head(features)

In [None]:
class BaseModel(nn.Module):
    """Pretrained timm ConvNeXtV2-Base backbone plus a new linear head.

    Args:
        num_classes: Output size of the linear classification head.

    Attributes:
        feature_dim: ``num_features`` reported by the backbone.
    """

    def __init__(self, num_classes):
        super().__init__()
        # num_classes=0 is passed to timm so the backbone is built without its
        # own classification layer; the head below is sized from num_features.
        self.backbone = timm.create_model(
            'hf_hub:timm/convnextv2_base.fcmae_ft_in22k_in1k_384',
            pretrained=True,
            num_classes=0,
        )
        feature_dim = self.backbone.num_features
        self.feature_dim = feature_dim
        self.head = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        return self.head(self.backbone(x))

In [None]:
class CosineAnnealingWarmupRestarts(_LRScheduler):
    """Cosine annealing with linear warm-up and warm restarts.

    Each cycle ramps the learning rate linearly from ``min_lr`` to the cycle's
    peak over ``warmup_steps`` steps, then follows a half cosine back down to
    ``min_lr``. After each cycle the non-warm-up part of the cycle length is
    multiplied by ``cycle_mult`` and the peak is multiplied by ``gamma``.

    Args:
        optimizer (Optimizer): Wrapped optimizer.
        first_cycle_steps (int): Length of the first cycle, in steps.
        cycle_mult (float): Cycle length multiplier. Default: 1.
        max_lr (float): Peak learning rate of the first cycle. Default: 0.1.
        min_lr (float): Minimum learning rate. Default: 0.001.
        warmup_steps (int): Linear warm-up length, in steps. Default: 0.
        gamma (float): Per-cycle decay factor of the peak rate. Default: 1.
        last_epoch (int): Index of the last epoch. Default: -1.
    """

    def __init__(self,
                 optimizer : torch.optim.Optimizer,
                 first_cycle_steps : int,
                 cycle_mult : float = 1.,
                 max_lr : float = 0.1,
                 min_lr : float = 0.001,
                 warmup_steps : int = 0,
                 gamma : float = 1.,
                 last_epoch : int = -1
        ):
        assert warmup_steps < first_cycle_steps, \
            "warmup_steps must be smaller than first_cycle_steps"

        self.first_cycle_steps = first_cycle_steps  # length of the first cycle
        self.cycle_mult = cycle_mult                # cycle length multiplier
        self.base_max_lr = max_lr                   # peak lr of the first cycle
        self.max_lr = max_lr                        # peak lr of the current cycle
        self.min_lr = min_lr                        # floor lr
        self.warmup_steps = warmup_steps            # warm-up length
        self.gamma = gamma                          # per-cycle peak decay

        self.cur_cycle_steps = first_cycle_steps    # length of the current cycle
        self.cycle = 0                              # index of the current cycle
        self.step_in_cycle = last_epoch             # position inside the cycle

        # The base constructor performs an initial step() that advances
        # step_in_cycle, so the attributes above must already exist here.
        super().__init__(optimizer, last_epoch)

        # Start every param group from min_lr, overriding the optimizer's own lr.
        self.init_lr()

    def init_lr(self):
        """Reset every param group (and ``base_lrs``) to ``min_lr``."""
        self.base_lrs = []
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.min_lr
            self.base_lrs.append(self.min_lr)
        # Keep get_last_lr() in sync with what the optimizer actually holds.
        self._last_lr = [group['lr'] for group in self.optimizer.param_groups]

    def get_lr(self):
        """Return the learning rate of each param group for the current position."""
        if self.step_in_cycle == -1:
            return self.base_lrs
        elif self.step_in_cycle < self.warmup_steps:
            # Linear ramp from base_lr up to max_lr.
            return [(self.max_lr - base_lr)*self.step_in_cycle / self.warmup_steps + base_lr for base_lr in self.base_lrs]
        else:
            # Half-cosine decay from max_lr back to base_lr over the rest of the cycle.
            return [base_lr + (self.max_lr - base_lr) \
                    * (1 + math.cos(math.pi * (self.step_in_cycle-self.warmup_steps) \
                                    / (self.cur_cycle_steps - self.warmup_steps))) / 2
                    for base_lr in self.base_lrs]

    def step(self, epoch=None):
        """Advance the schedule by one step, or jump to ``epoch`` when it is given."""
        if epoch is None:
            epoch = self.last_epoch + 1
            self.step_in_cycle = self.step_in_cycle + 1
            if self.step_in_cycle >= self.cur_cycle_steps:
                # Restart: wrap the position and stretch the post-warm-up part.
                self.cycle += 1
                self.step_in_cycle = self.step_in_cycle - self.cur_cycle_steps
                self.cur_cycle_steps = int((self.cur_cycle_steps - self.warmup_steps) * self.cycle_mult) + self.warmup_steps
        else:
            if epoch >= self.first_cycle_steps:
                if self.cycle_mult == 1.:
                    self.step_in_cycle = epoch % self.first_cycle_steps
                    self.cycle = epoch // self.first_cycle_steps
                else:
                    # Closed form for geometrically growing cycle lengths.
                    n = int(math.log((epoch / self.first_cycle_steps * (self.cycle_mult - 1) + 1), self.cycle_mult))
                    self.cycle = n
                    self.step_in_cycle = epoch - int(self.first_cycle_steps * (self.cycle_mult ** n - 1) / (self.cycle_mult - 1))
                    self.cur_cycle_steps = self.first_cycle_steps * self.cycle_mult ** (n)
            else:
                self.cur_cycle_steps = self.first_cycle_steps
                self.step_in_cycle = epoch

        self.max_lr = self.base_max_lr * (self.gamma**self.cycle)
        self.last_epoch = math.floor(epoch)
        for param_group, lr in zip(self.optimizer.param_groups, self.get_lr()):
            param_group['lr'] = lr
        # This override bypasses the base step(), so record the applied rates
        # here; otherwise get_last_lr() would have nothing current to return.
        self._last_lr = [group['lr'] for group in self.optimizer.param_groups]

# Train / Validation

In [None]:

# (display name, model class, square input size, checkpoint file prefix)
model_configs = [
    ("ConvNeXtV2", BaseModel, CFG['CONVNEXT_IMG_SIZE'], "conv"),
    ("EVA", EvaModel, CFG['EVA_IMG_SIZE'], "eva")
]

# Second GPU when more than one is visible, else the first GPU, else the CPU
# (previously "cuda:0" was selected even when CUDA was unavailable).
if torch.cuda.is_available():
    device = torch.device("cuda:1" if torch.cuda.device_count() > 1 else "cuda:0")
else:
    device = torch.device("cpu")


NUM_FOLDS = 5
skf = StratifiedKFold(n_splits=NUM_FOLDS, shuffle=True, random_state=CFG['SEED'])


targets = [label for _, label in full_dataset.samples]
class_names = full_dataset.classes


def _train_one_epoch(model, loader, optimizer, scaler, ema, desc):
    """Train ``model`` for one pass over ``loader``; return the mean batch loss.

    Each sample's negative log-likelihood is weighted by ``1 - p(true class)``
    (detached), so samples the model already predicts confidently contribute
    less. Uses the module-level ``device``.
    """
    model.train()
    running_loss = 0.0

    for images, labels in tqdm(loader, desc=desc):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()

        with autocast(device_type=device.type):
            outputs = model(images)
            probs = F.softmax(outputs, dim=1)
            true_probs = probs.gather(1, labels.unsqueeze(1)).squeeze(1)
            weights = (1.0 - true_probs).detach()
            log_probs = F.log_softmax(outputs, dim=1)
            loss_per_sample = F.nll_loss(log_probs, labels, reduction='none')
            loss = (weights * loss_per_sample).mean()

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        ema.update()
        running_loss += loss.item()

    return running_loss / len(loader)


def _evaluate(model, loader, criterion, num_classes):
    """Score ``model`` on every batch of ``loader``.

    Returns:
        ``(mean batch loss, accuracy in percent, multi-class log loss)``. The
        log loss is ``inf`` when scikit-learn rejects the inputs.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            # Forward pass and bookkeeping sit INSIDE the batch loop; they used
            # to be dedented, so only the last batch was ever scored.
            with autocast(device_type=device.type):
                outputs = model(images)
                loss = criterion(outputs, labels)

            loss_sum += loss.item()
            probs = F.softmax(outputs.float(), dim=1)
            all_probs.append(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    try:
        val_logloss = log_loss(all_labels, np.vstack(all_probs), labels=list(range(num_classes)))
    except ValueError as err:
        # Treat an unscorable epoch as "no improvement" but say why.
        print(f"log_loss failed: {err}")
        val_logloss = float('inf')

    return loss_sum / len(loader), 100 * correct / total, val_logloss


for model_name, model_class, img_size, model_prefix in model_configs:
    print(f"\n\U0001f9e0 Start training for {model_name}...")

    # The transforms depend only on the model's input size, so build them once per model.
    train_transform = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    val_transform = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    for fold, (train_idx, val_idx) in enumerate(skf.split(np.zeros(len(targets)), targets)):
        print(f"\n\U0001f501 Fold {fold+1}/{NUM_FOLDS} - {model_name}")

        train_dataset = Subset(CustomImageDataset(train_root, transform=train_transform), train_idx)
        val_dataset = Subset(CustomImageDataset(train_root, transform=val_transform), val_idx)

        train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=4, pin_memory=True)
        val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=4, pin_memory=True)

        model = model_class(num_classes=len(class_names)).to(device)
        ema = ExponentialMovingAverage(model.parameters(), decay=0.9995)

        optimizer = torch.optim.AdamW(model.parameters(), lr=CFG['LEARNING_RATE'], weight_decay=1e-4)
        scheduler = CosineAnnealingWarmupRestarts(
            optimizer, first_cycle_steps=10, cycle_mult=2,
            max_lr=CFG['LEARNING_RATE'], min_lr=1e-6,
            warmup_steps=3, gamma=0.5
        )
        # Loss scaling is only meaningful for CUDA mixed precision.
        scaler = GradScaler(enabled=device.type == "cuda")
        criterion = nn.CrossEntropyLoss()

        checkpoint_path = f"{model_prefix}_fold{fold+1}_best.pth"
        best_logloss = float('inf')
        counter = 0   # epochs since the last improvement
        patience = 5  # early-stopping threshold

        for epoch in range(CFG['EPOCHS']):
            avg_train_loss = _train_one_epoch(
                model, train_loader, optimizer, scaler, ema,
                desc=f"[{model_name} Fold {fold+1}] Epoch {epoch+1}",
            )

            # Validate with the EMA weights swapped in and, on improvement, save
            # while they are still in place, so the checkpoint holds exactly the
            # weights that produced the best score (the raw weights used to be saved).
            with ema.average_parameters():
                val_loss, val_acc, val_logloss = _evaluate(model, val_loader, criterion, len(class_names))
                improved = val_logloss < best_logloss
                if improved:
                    best_logloss = val_logloss
                    torch.save(model.state_dict(), checkpoint_path)

            print(f"\n📊 Epoch {epoch+1} Summary - Train Loss: {avg_train_loss:.4f} | Val Loss: {val_loss:.4f} | Acc: {val_acc:.2f}% | LogLoss: {val_logloss:.4f}")

            scheduler.step()

            if improved:
                print(f"✅ Best model saved: {checkpoint_path}")
                counter = 0
            else:
                counter += 1
                if counter >= patience:
                    print("🛑 Early stopping")
                    break

# Inference

In [None]:
# Inference runs on the default CUDA device when available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NUM_FOLDS = 5  # number of per-model fold checkpoints to ensemble
BATCH_SIZE = CFG['BATCH_SIZE']  # NOTE(review): not referenced by the visible cells, which read CFG['BATCH_SIZE'] directly

In [None]:
def infer_probs(model_class, fold_paths, loader, model_name):
    """Average softmax predictions over a model's fold checkpoints.

    For every path in ``fold_paths`` a fresh ``model_class`` is built (sized by
    the module-level ``class_names``), the checkpoint is loaded onto the
    module-level ``device``, and ``loader`` is run through it in eval mode.

    Args:
        model_class: Callable accepting ``num_classes`` and returning a module.
        fold_paths: Checkpoint paths, one per fold.
        loader: Iterable yielding image batches (no labels).
        model_name: Label used in log lines and progress bars.

    Returns:
        NumPy array of class probabilities averaged across folds, in loader order.
    """
    per_fold = []

    with torch.no_grad():
        for fold_no, ckpt_path in enumerate(fold_paths, start=1):
            print(f"📦 {model_name} Fold {fold_no} → {ckpt_path}")

            model = model_class(num_classes=len(class_names)).to(device)
            state = torch.load(ckpt_path, map_location=device)
            model.load_state_dict(state)
            model.eval()

            progress = tqdm(loader, desc=f"{model_name} - Fold {fold_no}", leave=False)
            batch_probs = [
                F.softmax(model(images.to(device)), dim=1).cpu()
                for images in progress
            ]
            per_fold.append(torch.cat(batch_probs, dim=0))

    stacked = torch.stack(per_fold)
    return stacked.mean(dim=0).numpy()


In [None]:
def _make_test_transform(side):
    """Return the deterministic resize -> tensor -> normalise test pipeline."""
    return transforms.Compose([
        transforms.Resize((side, side)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])


# One pipeline per model, because each model has its own input size.
eva_transform = _make_test_transform(CFG['EVA_IMG_SIZE'])
conv_transform = _make_test_transform(CFG['CONVNEXT_IMG_SIZE'])

# Both datasets scan the same flat test folder and differ only in transform.
eva_dataset = CustomImageDataset(test_root, transform=eva_transform, is_test=True)
conv_dataset = CustomImageDataset(test_root, transform=conv_transform, is_test=True)

# shuffle=False keeps predictions in dataset order.
_test_loader_kwargs = dict(
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)
eva_loader = DataLoader(eva_dataset, **_test_loader_kwargs)
conv_loader = DataLoader(conv_dataset, **_test_loader_kwargs)


In [None]:
# Checkpoint file names follow "<prefix>_fold<k>_best.pth" with k = 1..NUM_FOLDS.
_fold_numbers = range(1, NUM_FOLDS + 1)
eva_fold_paths = [f"eva_fold{k}_best.pth" for k in _fold_numbers]
conv_fold_paths = [f"conv_fold{k}_best.pth" for k in _fold_numbers]

print("🚀 EVA 추론 시작")
eva_preds = infer_probs(
    EvaModel,
    eva_fold_paths,
    eva_loader,
    model_name="EVA",
)

print("🚀 ConvNeXtV2 추론 시작")
conv_preds = infer_probs(
    BaseModel,
    conv_fold_paths,
    conv_loader,
    model_name="ConvNeXtV2",
)

# Equal-weight average of the two models' fold-averaged probabilities.
ensemble_preds = (eva_preds + conv_preds) / 2


# Submission

In [None]:
def predict_fold_ensemble(model_class, fold_paths, loader, desc):
    """Return the fold-averaged softmax probabilities for one model family.

    A new ``model_class`` instance is created per checkpoint (output size taken
    from the module-level ``class_names``), loaded onto the module-level
    ``device``, switched to eval mode and run over ``loader``.

    Args:
        model_class: Callable accepting ``num_classes`` and returning a module.
        fold_paths: Checkpoint paths, one per fold.
        loader: Iterable yielding image batches (no labels).
        desc: Label used in log lines and progress bars.

    Returns:
        NumPy array with the mean class probabilities across all folds.
    """
    num_classes = len(class_names)
    fold_outputs = []

    with torch.no_grad():
        for index, weights_file in enumerate(fold_paths):
            fold_number = index + 1
            print(f"{desc} Fold {fold_number} → {weights_file}")

            net = model_class(num_classes=num_classes).to(device)
            net.load_state_dict(torch.load(weights_file, map_location=device))
            net.eval()

            collected = []
            for batch in tqdm(loader, desc=f"{desc} - Fold {fold_number}", leave=False):
                logits = net(batch.to(device))
                collected.append(F.softmax(logits, dim=1).cpu())

            fold_outputs.append(torch.cat(collected, dim=0))

    mean_probs = torch.stack(fold_outputs).mean(dim=0)
    return mean_probs.numpy()


In [None]:
# Load the submission template; its first column is kept as-is and every
# remaining column is treated as a class-probability column.
submission = pd.read_csv("sample_submission.csv", encoding="utf-8-sig")
class_columns = submission.columns[1:]

# Label the ensemble output with the training class names, then reorder it to
# the template's column order before writing it into the template.
pred_df = pd.DataFrame(data=ensemble_preds, columns=class_names)
submission[class_columns] = pred_df.loc[:, class_columns].to_numpy()

submission.to_csv(
    "ensemble_eva_convnext_kfold.csv",
    index=False,
    encoding="utf-8-sig",
)
print("✅ 최종 앙상블 저장 완료: ensemble_eva_convnext_kfold.csv")
