In [None]:
import pandas as pd
import numpy as np

import os
import random

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import cv2
from PIL import Image
from tqdm import tqdm

from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision.models as models
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch import nn, optim
from torchvision.transforms import AutoAugment, AutoAugmentPolicy
from torchvision.models import convnext_tiny, ConvNeXt_Tiny_Weights

from torch.optim.lr_scheduler import OneCycleLR


from sklearn.metrics import log_loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

In [None]:
DEBUG = False

if DEBUG: CFG = {
    'IMG_SIZE': 232,
    'BATCH_SIZE': 32,
    'EPOCHS': 4,
    'LEARNING_RATE': 1e-4,
    'SEED' : 42
}
else: CFG = {
    'IMG_SIZE': 232,
    'BATCH_SIZE': 32,
    'EPOCHS': 30,
    'LEARNING_RATE': 1e-4,
    'SEED' : 42
}

In [None]:
import shutil

shutil.copytree('/kaggle/input/hai-competition/train', '/kaggle/working/hai-competition/train')
shutil.copytree('/kaggle/input/train-cleaned/kaggle/working/hai-competition/train_cleaned', '/kaggle/working/hai-competition/train_cleaned')

In [None]:
train_root1 = '/kaggle/working/hai-competition/train'
train_root2 = '/kaggle/working/hai-competition/train_cleaned'
test_root = '/kaggle/input/hai-competition/test'

In [None]:
removing_data = [
    '5시리즈_G60_2024_2025_0010.jpg', '6시리즈_GT_G32_2018_2020_0018.jpg', '7시리즈_G11_2016_2018_0040.jpg', '911_992_2020_2024_0030.jpg', 'E_클래스_W212_2010_2016_0022.jpg', 'K5_2세대_2016_2018_0007.jpg', 'F150_2004_2021_0018.jpg', 'G_클래스_W463b_2019_2025_0030.jpg', 'GLE_클래스_W167_2019_2024_0068.jpg', 'Q5_FY_2021_2024_0032.jpg',
    'Q30_2017_2019_0075.jpg', 'Q50_2014_2017_0031.jpg', 'SM7_뉴아트_2008_2011_0053.jpg', 'X3_G01_2022_2024_0029.jpg', 'XF_X260_2016_2020_0023.jpg', '뉴_ES300h_2013_2015_0000.jpg', '뉴_G80_2025_2026_0042.jpg', '뉴_G80_2025_2026_0043.jpg', '뉴_SM5_임프레션_2008_2010_0033.jpg', '더_기아_레이_EV_2024_2025_0078.jpg',
    '더_뉴_K3_2세대_2022_2024_0001.jpg', '더_뉴_그랜드_스타렉스_2018_2021_0078.jpg', '더_뉴_그랜드_스타렉스_2018_2021_0079.jpg', '더_뉴_그랜드_스타렉스_2018_2021_0080.jpg', '더_뉴_아반떼_2014_2016_0031.jpg', '더_뉴_파사트_2012_2019_0067.jpg', '레니게이드_2019_2023_0041.jpg', '박스터_718_2017_2024_0011.jpg', '싼타페_TM_2019_2020_0009.jpg', '아반떼_MD_2011_2014_0081.jpg',
    '아반떼_N_2022_2023_0064.jpg', '익스플로러_2016_2017_0072.jpg', '콰트로포르테_2017_2022_0074.jpg', '프리우스_4세대_2019_2022_0052.jpg', '아반떼_N_2022_2023_0035.jpg', 'E_클래스_W212_2010_2016_0069.jpg', 'ES300h_7세대_2019_2026_0028.jpg', 'G_클래스_W463_2009_2017_0011.jpg', 'GLB_클래스_X247_2020_2023_0008.jpg', 'GLS_클래스_X167_2020_2024_0013.jpg',
    'K3_2013_2015_0045.jpg', 'K5_3세대_2020_2023_0081.jpg', 'Q7_4M_2020_2023_0011.jpg', 'RAV4_5세대_2019_2024_0020.jpg', 'S_클래스_W223_2021_2025_0008.jpg', 'S_클래스_W223_2021_2025_0071.jpg', 'X4_F26_2015_2018_0068.jpg', '그랜드_체로키_WL_2021_2023_0018.jpg', '레이_2012_2017_0063.jpg', '레인지로버_5세대_2023_2024_0030.jpg',
    '레인지로버_스포츠_2세대_2018_2022_0014.jpg', '레인지로버_스포츠_2세대_2018_2022_0017.jpg', '마칸_2019_2021_0035.jpg', '머스탱_2015_2023_0086.jpg', '아반떼_MD_2011_2014_0009.jpg', '아반떼_MD_2011_2014_0082.jpg', '컨티넨탈_GT_3세대_2018_2023_0007.jpg', '타이칸_2021_2025_0065.jpg', '파나메라_2010_2016_0000.jpg', '파나메라_2010_2016_0036.jpg',
    '3시리즈_F30_2013_2018_0036.jpg', '4시리즈_F32_2014_2020_0027.jpg', '5시리즈_G60_2024_2025_0056.jpg', '7시리즈_F01_2009_2015_0029.jpg', '7시리즈_F01_2009_2015_0044.jpg', '911_992_2020_2024_0006.jpg', 'C_클래스_W204_2008_2015_0068.jpg', 'CLS_클래스_C257_2019_2023_0021.jpg',
     '4시리즈_G22_2024_2025_0031.jpg', 'A_클래스_W177_2020_2025_0034.jpg', 'EQA_H243_2021_2024_0063.jpg', 'G_클래스_W463b_2019_2025_0049.jpg', 'SM7_뉴아트_2008_2011_0045.jpg',
    'SM7_뉴아트_2008_2011_0067.jpg', 'SM7_뉴아트_2008_2011_0069.jpg', 'SM7_뉴아트_2008_2011_0083.jpg', 'SM7_뉴아트_2008_2011_0020.jpg', 
    'SM7_뉴아트_2008_2011_0001.jpg', 'X3_G01_2022_2024_0029.jpg', 'X7_G07_2019_2022_0052.jpg', 'XJ_8세대_2010_2019_0064.jpg', 'YF쏘나타_하이브리드_2011_2015_0003.jpg', 
    'YF쏘나타_하이브리드_2011_2015_0072.jpg', 'YF쏘나타_하이브리드_2011_2015_0013.jpg', 'YF쏘나타_2009_2012_0026.jpg', 'YF쏘나타_2009_2012_0068.jpg', 
    'YF쏘나타_2009_2012_0045.jpg', '그랜저TG_2007_2008_0022.jpg', '그랜저TG_2007_2008_0023.jpg', '그랜저TG_2007_2008_0075.jpg', '그랜저TG_2007_2008_0008.jpg', '그랜저TG_2007_2008_0009.jpg', 
    '뉴_A6_2012_2014_0046.jpg', '뉴_GV80_2024_2025_0010.jpg', '뉴_GV80_2024_2025_0021.jpg', '뉴_GV80_2024_2025_0069.jpg', '뉴_G80_2025_2026_0023.jpg', 
    '뉴_G80_2025_2026_0035.jpg', '뉴_G80_2025_2026_0042.jpg', '뉴_G80_2025_2026_0043.jpg', '뉴_SM5_플래티넘_2013_2014_0047.jpg', '뉴_QM5_2012_2014_0001.jpg', '뉴_QM5_2012_2014_0002.jpg', '뉴쏘렌토_R_2013_2014_0009.jpg', 
    '더_뉴_QM6_2024_2025_0040.jpg', '더_뉴_스파크_2019_2022_0040.jpg', '더_올뉴투싼_하이브리드_2021_2023_0027.jpg', '더_올뉴투싼_하이브리드_2021_2023_0038.jpg', 
    '더_올뉴투싼_하이브리드_2021_2023_0042.jpg', '더_올뉴G80_2021_2024_0001.jpg', '더_올뉴G80_2021_2024_0054.jpg', '더_올뉴G80_2021_2024_0070.jpg', 
    '더_올뉴G80_2021_2024_0076.jpg', '디_올뉴코나_2023_2025_0058.jpg', '레인지로버_4세대_2018_2022_0048.jpg', '베뉴_2020_2024_0005.jpg', '박스터_718_2017_2024_0044.jpg', '박스터_718_2017_2024_0051.jpg', '박스터_718_2017_2024_0082.jpg', 
    '아베오_2012_2016_0018.jpg', '아베오_2012_2016_0052.jpg', '아반떼_N_2022_2023_0003.jpg', '아반떼_N_2022_2023_0035.jpg', '아반떼_N_2022_2023_0064.jpg', '에쿠스_신형_2010_2015_0044.jpg', '카이엔_PO536_2019_2023_0035.jpg', 
    '티볼리_에어_2016_2019_0047.jpg', 
    
]

print(len(removing_data))

deleted_count = 0

for p in removing_data:
    path = os.path.join(train_root1, '_'.join(p.split('_')[:-1]), p)
    if os.path.isfile(path):
        try:
            os.remove(path)
            deleted_count += 1
            
        except Exception as e:
            print(f"Error removing {path}: {e}")
    else:
        print(f"File not found: {path}")

    if train_root2:
      path = os.path.join(train_root2, '_'.join(p.split('_')[:-1]), p)
      if os.path.isfile(path):
          try:
              os.remove(path)
              deleted_count += 1
              # print(f"Removed: {path}")
          except Exception as e:
              print(f"Error removing {path}: {e}")
      else:
          print(f"File not found: {path}")

print(f"\nTotal deleted: {deleted_count} files.")

In [None]:
test = pd.read_csv('/kaggle/input/hai-competition/test.csv')
submission = pd.read_csv('/kaggle/input/hai-competition/sample_submission.csv')

In [None]:
class CustomImageDataset(Dataset):
    def __init__(self, root_dir1, root_dir2, transform=None, is_test=False):
        self.root_dir1 = root_dir1
        self.root_dir2 = root_dir2
        self.transform = transform
        self.is_test = is_test
        self.samples = []
        

        if is_test:
            
            for fname in sorted(os.listdir(root_dir1)):
                if fname.lower().endswith(('.jpg')):
                    img_path = os.path.join(root_dir1, fname)
                    self.samples.append((img_path,))
        else:
            
            self.classes = sorted(os.listdir(root_dir1))
            self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
            self.idx_to_class = {v: k for k, v in self.class_to_idx.items()}

            # root 1            
            for cls_name in self.classes:
                cls_folder = os.path.join(root_dir1, cls_name)
                model_label = self.class_to_idx[cls_name]
               
                for fname in os.listdir(cls_folder):
                    if fname.lower().endswith(('.jpg')):
                        img_path = os.path.join(cls_folder, fname)
                        self.samples.append((img_path, model_label))

            # root 2
            if root_dir2:
                for cls_name in self.classes:
                    cls_folder = os.path.join(root_dir2, cls_name)
                    model_label = self.class_to_idx[cls_name]
                   
                    for fname in os.listdir(cls_folder):
                        if fname.lower().endswith(('.jpg')):
                            img_path = os.path.join(cls_folder, fname)
                            self.samples.append((img_path, model_label))

    

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        if self.is_test:
            img_path = self.samples[idx][0]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image
        else:
            img_path, model_label = self.samples[idx]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, model_label

    def get_model_name(self, model_idx):
      return self.idx_to_class[model_idx]


In [None]:
class TestImageDataset(Dataset):
    def __init__(self, root_dir, transform=None, is_test=False):
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = is_test
        self.samples = []
        

        if is_test:
        
            for fname in sorted(os.listdir(root_dir)):
                if fname.lower().endswith(('.jpg')):
                    img_path = os.path.join(root_dir, fname)
                    self.samples.append((img_path,))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        if self.is_test:
            img_path = self.samples[idx][0]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = np.array(image)  # PIL → np.ndarray
                augmented = self.transform(image=image)
                image = augmented['image']
                
            return image
        else:
            img_path, model_label = self.samples[idx]
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = np.array(image)  # PIL → np.ndarray
                augmented = self.transform(image=image)
                image = augmented['image']
                
            return image, model_label

    def get_model_name(self, model_idx):
      return self.idx_to_class[model_idx]

In [None]:
def random_half_crop_horizontal(img,  **kwargs):
    h, w, _ = img.shape
    if np.random.rand() < 0.5:
        return img[:, :w // 2, :]  
    else:
        return img[:, w // 2:, :] 

def random_half_crop_vertical(img, **kwargs):
    h, w, _ = img.shape
    if np.random.rand() < 0.5:
        return img[:h // 2, :, :] 
    else:
        return img[h // 2:, :, :] 

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

train_transform = A.OneOf([
   
    A.Compose([
        A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ], p=1.0),

   
    A.Compose([
        A.SomeOf([ 
            A.Lambda(image=random_half_crop_horizontal, p=1.0),
            A.Lambda(image=random_half_crop_vertical, p=1.0),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1.0),
            A.VerticalFlip(p=1.0),
            A.HorizontalFlip(p=1.0),
          
        ], n=2, replace=False), 

        A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ], p=1.0)
], p=1.0)

val_transform = A.Compose([
    A.Resize(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE']),
    A.Normalize(mean=(0.485, 0.456, 0.406), 
                std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

In [None]:
full_dataset = CustomImageDataset(train_root1, train_root2, transform=None)
print(f"Total num of images: {len(full_dataset)}")

In [None]:
targets = [model for _, model in full_dataset.samples]
class_names = full_dataset.classes

# Stratified Split
train_idx, val_idx = train_test_split(
    range(len(targets)), test_size=0.3, stratify=targets, random_state=42
)

class TransformedSubset(Dataset):
    def __init__(self, dataset, indices, transform):
        self.dataset = dataset
        self.indices = indices
        self.transform = transform

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        real_idx = self.indices[idx]
        image, label = self.dataset[real_idx]
        image = np.array(image)  # PIL → np.ndarray
        augmented = self.transform(image=image)
        image = augmented['image']
        return image, label

train_dataset = TransformedSubset(full_dataset, train_idx, train_transform)
val_dataset = TransformedSubset(full_dataset, val_idx, val_transform)
print(f'train len: {len(train_dataset)}, valid len: {len(val_dataset)}')

# DataLoader 정의
train_loader = DataLoader(train_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=2, shuffle=True, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=CFG['BATCH_SIZE'], num_workers=2, shuffle=False, pin_memory=True)

In [None]:
from torchvision.models import convnext_base, ConvNeXt_Base_Weights

weights = ConvNeXt_Base_Weights.DEFAULT
model = convnext_base(weights=weights)


model.classifier[2] = nn.Linear(model.classifier[2].in_features, len(full_dataset.class_to_idx))

model = model.to(device)

In [None]:
save_path = '/kaggle/working/'

import logging

log_path = os.path.join(save_path, 'train_log.txt')
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ]
)

In [None]:
import time
import os

early_stop_counter = 0
patience = 3
best_logloss = float('inf')

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG['LEARNING_RATE'])

scheduler = OneCycleLR(
    optimizer,
    max_lr=CFG['LEARNING_RATE'],
    steps_per_epoch=len(train_loader),
    epochs=CFG['EPOCHS']
)

for epoch in range(CFG['EPOCHS']):
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0
    train_probs = []
    train_true = []

    for images, model_labels in tqdm(train_loader, desc=f"[Epoch {epoch+1}/{CFG['EPOCHS']}] Training", leave=True, dynamic_ncols=True):
        images, model_labels = images.to(device), model_labels.to(device)

        optimizer.zero_grad()
        model_logits = model(images)
        loss = criterion(model_logits, model_labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        train_loss += loss.item()
        preds = model_logits.argmax(dim=1)
        correct_train += (preds == model_labels).sum().item()
        total_train += model_labels.size(0)
        train_probs.extend(F.softmax(model_logits, dim=1).detach().cpu().numpy())
        train_true.extend(model_labels.cpu().numpy())

    avg_train_loss = train_loss / len(train_loader)
    train_acc = 100 * correct_train / total_train
    train_logloss = log_loss(train_true, train_probs, labels=list(range(len(full_dataset.class_to_idx))))

    # ---------- VALID ----------
    model.eval()
    val_loss = 0.0
    correct_val = 0
    total_val = 0
    val_probs = []
    val_true = []

    with torch.no_grad():
        for images, model_labels in tqdm(val_loader, desc=f"[Epoch {epoch+1}/{CFG['EPOCHS']}] Validation"):
            images, model_labels = images.to(device), model_labels.to(device)

            model_logits = model(images)
            loss = criterion(model_logits, model_labels)
            val_loss += loss.item()

            preds = model_logits.argmax(dim=1)
            correct_val += (preds == model_labels).sum().item()
            total_val += model_labels.size(0)
            val_probs.extend(F.softmax(model_logits, dim=1).detach().cpu().numpy())
            val_true.extend(model_labels.detach().cpu().numpy())

    avg_val_loss = val_loss / len(val_loader)
    val_acc = 100 * correct_val / total_val
    val_logloss = log_loss(val_true, val_probs, labels=list(range(len(full_dataset.class_to_idx))))

    print(f"\n Epoch {epoch+1} Summary")
    print(f"Train Loss : {avg_train_loss:.4f} || Valid Loss : {avg_val_loss:.4f}")
    print(f"Train Acc  : {train_acc:.2f}% | Valid Acc  : {val_acc:.2f}%")
    print(f"Train LogLoss: {train_logloss:.4f} | Valid LogLoss: {val_logloss:.4f}")

   
    if val_logloss < best_logloss:
        best_logloss = val_logloss
        torch.save(model.state_dict(), os.path.join(save_path, 'best_model.pth'))
        logging.info(f"Best model saved at epoch {epoch+1} (Valid LogLoss: {val_logloss:.4f})")
        early_stop_counter = 0
    else:
        early_stop_counter += 1
        logging.info(f"⏸ No improvement. Early stop counter: {early_stop_counter}/{patience}")


    if early_stop_counter >= patience:
        logging.info(f"\nEarly stopping at epoch {epoch+1}. Saving final model...")
        torch.save(model.state_dict(), os.path.join(save_path, 'final_model.pth'))
        break

In [None]:
tta_transforms = [
    A.Compose([
        A.HorizontalFlip(p=1.0),
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ]),
    A.Compose([
        A.VerticalFlip(p=1.0),
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ]),
    A.Compose([
        A.RandomBrightnessContrast(p=1.0),
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ]),
    A.Compose([  # identity transform
        A.Resize(CFG['IMG_SIZE'], CFG['IMG_SIZE']),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2()
    ]),
]

In [None]:
test_dataset = TestImageDataset(test_root, transform=val_transform, is_test=True)
test_loader = DataLoader(test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

In [None]:
model_path = '/kaggle/input/convnextbase_ob_best/pytorch/default/1/convnext_OB_best_model.pth'

model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device);

In [None]:
model.eval()
model_results = np.zeros((len(test_dataset), len(submission.columns[1:])))

with torch.no_grad():
    for tta in tta_transforms:
        
        tta_dataset = TestImageDataset(test_root, transform=tta, is_test=True)
        tta_loader = DataLoader(tta_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False)

        start_idx = 0
        for images in tqdm(tta_loader, desc="Predicting with TTA"):
            images = images.to(device)
            logits = model(images)
            probs = F.softmax(logits, dim=1).cpu().numpy()

            if start_idx == 0: print(probs)

            batch_size = images.size(0)
            model_results[start_idx:start_idx + batch_size] += probs
            start_idx += batch_size

model_results /= len(tta_transforms)

In [None]:
test = pd.read_csv('/kaggle/input/hai-competition/test.csv')
submission = pd.read_csv('/kaggle/input/hai-competition/sample_submission.csv')

In [None]:
class_columns = submission.columns[1:]
submission[class_columns] = model_results

submission.to_csv(save_path + 'final_submission.csv', index=False, encoding='utf-8-sig')