In [None]:
# 필요한 라이브러리를 설치합니다.
# PyTorch 기반 SOTA 이미지 모델 라이브러리 ; resnet34 불러오기 위함
!pip install timm

In [None]:
import os
import time

import timm # 모델
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from torch.optim.lr_scheduler import CosineAnnealingLR
from PIL import Image # 이미지 입출력
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score

# EDA

In [None]:
from pathlib import Path
import pandas as pd

CANDIDATES = [Path("."), Path(".."), Path("../.."), Path("../../..")]
ROOT = None
for base in CANDIDATES:
    if (base / "train.csv").exists() and (base / "train").exists():
        ROOT = base.resolve()
        break
if ROOT is None:
    raise FileNotFoundError("train.csv 및 train 폴더를 찾지 못했습니다.")

TRAIN_DIR = ROOT / "train"
print("ROOT:", ROOT)
print("TRAIN_DIR exists:", TRAIN_DIR.exists())

train_df = pd.read_csv(ROOT / "train.csv")
train_df.head()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

counts = train_df["target"].value_counts().sort_index()
display(counts)

plt.figure(figsize=(10,4))
sns.barplot(x=counts.index, y=counts.values)
plt.title("Label distribution")
plt.xticks(rotation=90)
plt.show()

imbalance_ratio = counts.max() / counts.min()
print(f"Imbalance ratio (max/min): {imbalance_ratio:.2f}")

In [None]:
dup_id = train_df.duplicated(subset=["ID"]).sum()
dup_pair = train_df.duplicated(subset=["ID","target"]).sum()
id_conflict = (train_df.groupby("ID")["target"].nunique()>1).sum()

print("중복 ID:", dup_id)
print("중복 (ID,target):", dup_pair)
print("서로 다른 라벨을 가진 ID(라벨 충돌):", id_conflict)

In [None]:
from PIL import Image, UnidentifiedImageError
from tqdm import tqdm
import numpy as np
from collections import Counter

missing, unreadable, flat_images, modes = [], [], [], []

for fname in tqdm(train_df["ID"]):
    p = TRAIN_DIR / fname
    if not p.exists():
        missing.append(fname)
        continue
    try:
        im = Image.open(p)
        modes.append(im.mode)
        arr = np.array(im)
        if arr.size == 0 or arr.std() == 0:
            flat_images.append(fname)
    except (UnidentifiedImageError, OSError):
        unreadable.append(fname)

print("없는 파일 수:", len(missing))
print("읽기 불가 파일 수:", len(unreadable))
print("단색/이상치 의심 수:", len(flat_images))
print("Image modes:", Counter(modes))

In [None]:
from PIL import Image
import matplotlib.pyplot as plt

widths, heights = [], []
checked = 0
for fname in train_df["ID"]:
    p = TRAIN_DIR / fname
    if not p.exists():
        continue
    try:
        im = Image.open(p)
        w, h = im.size
        widths.append(w); heights.append(h)
        checked += 1
    except:
        pass

print("Checked images:", checked)
plt.figure(); plt.hist(widths, bins=30); plt.title("Width"); plt.show()
plt.figure(); plt.hist(heights, bins=30); plt.title("Height"); plt.show()

# Transform(Albumentations)

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

IMG_SIZE = 224

# 기본 학습 증강 : 기하/색상 약하게 + 표준화
base_train_transform = A.Compose([
    A.SmallestMaxSize(max_size=256),
    A.CenterCrop(height=IMG_SIZE, width=IMG_SIZE),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.03, scale_limit=0.10, rotate_limit=10, p=0.4),
    A.ColorJitter(brightness=0.15, contrast=0.15, saturation=0.15, hue=0.03, p=0.4),
    A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),
    ToTensorV2(),
])

# 강한 학습 증강 : 소수 클래스에만 적용 (의도적 노이즈 포함)
strong_train_transform=A.Compose([
    A.SmallestMaxSize(max_size=256),
    A.CenterCrop(height=IMG_SIZE, width=IMG_SIZE),
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.06, scale_limit=0.15, rotate_limit=15, p=0.7),
    A.RandomBrightnessContrast(brightness_limit=0.25, contrast_limit=0.25, p=0.5),
    A.CLAHE(clip_limit=2.0, p=0.2),
    A.GaussNoise(var_limit=(5.0, 30.0), p=0.4),
    A.MotionBlur(blur_limit=5, p=0.2),
    A.JpegCompression(quality_lower=60, quality_upper=95, p=0.3),
    A.CoarseDropout(max_holes=4, max_height=IMG_SIZE//8, max_width=IMG_SIZE//8, p=0.3),
    A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.255)),
    ToTensorV2(),
])

# 검증/테스트 : 학습과 동일한 전처리
valid_transform = A.Compose([
    A.SmallestMaxSize(max_size=256),
    A.CenterCrop(height=IMG_SIZE, width=IMG_SIZE),
    A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),
    ToTensorV2(),
])

tst_transform = valid_transform

trn_transform = base_train_transform

In [None]:
# 데이터셋 클래스를 정의합니다.
class ImageDataset(Dataset):
    def __init__(self, csv, path, transform=None, transform_strong=None, minority_set=None):
        self.df = pd.read_csv(csv).values
        self.path = path
        self.transform = transform
        self.transform_strong = transform_strong
        self.minority_set = set(minority_set) if minority_set is not None else None

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx): # DataLoader가 배치 단위로 호출할 때 행
        name, target = self.df[idx]
        target = int(target)
        
        # EXIF 회전 보정 + numpy 변환
        img = np.array(Image.open(os.path.join(self.path, name))) # PIL.Image.open()으로 읽고 numpy array로 변환 img는 (H,W,C)형태 배열
        img = ImageOps.exif_transpose(img)
        img = np.array(img)
        
        # 클래스별로 강/약 증강 분기
        t = self.transform
        if (self.transform_strong is not None) and (self.minority_set is not None) and (target in self.minority_set):
            t = self.transform_strong
        
        if t is not None:
            img = t(image=img)['image']
            
        return img, target # (이미지 텐서, 라벨) 반환 -> DataLoader에서 (batch_size, C, H, W)형태로 묶임

In [None]:
# one epoch 학습을 위한 함수입니다.
def train_one_epoch(loader, model, optimizer, loss_fn, device):
    model.train()
    train_loss = 0 # 에폭 전체 손실 합계 저장
    preds_list = [] # 모델 예측값 저장
    targets_list = [] # 정답(라벨) 저장

    pbar = tqdm(loader) # 진행 상태 출력
    for image, targets in pbar: # DataLoader에서 image, targets(라벨)을 배치 단위로 불러옴
        image = image.to(device) # 이미지를 GPU로 이동
        targets = targets.to(device)

        model.zero_grad(set_to_none=True) # 이전 배치에서 계산된 gradient 초기화, 메모리 최적화

        preds = model(image) # 모델 forward pass 수행으로 예측값 출력
        loss = loss_fn(preds, targets) # 예측값과 정답을 비교해 손실 계산
        loss.backward() # 역전파 파라미터별 gradient 계산
        optimizer.step() # gradient를 이용해 모델 파라미터 업데이트

        train_loss += loss.item() # 현재 배치 손실 누적
        preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())
        targets_list.extend(targets.detach().cpu().numpy())

        pbar.set_description(f"Loss: {loss.item():.4f}")

    train_loss /= len(loader)
    train_acc = accuracy_score(targets_list, preds_list)
    train_f1 = f1_score(targets_list, preds_list, average='macro')

    ret = {
        "train_loss": train_loss,
        "train_acc": train_acc,
        "train_f1": train_f1,
    }

    return ret

In [None]:
@torch.no_grad()
def valid_one_epoch(loader, model, loss_fn, device):
    model.eval()
    val_loss = 0.0
    preds_all, t_all = [], []

    for images, targets in loader:
        images = images.to(device)
        targets = targets.to(device)

        logits = model(images)
        loss = loss_fn(logits, targets)
        val_loss += loss.item()

        preds = logits.argmax(1)
        preds_all.extend(preds.detach().cpu().numpy())
        t_all.extend(targets.detach().cpu().numpy())

    val_loss /= len(loader)
    val_acc = accuracy_score(t_all, preds_all)
    val_f1  = f1_score(t_all, preds_all, average="macro")
    return {"val_loss": val_loss, "val_acc": val_acc, "val_f1": val_f1}


# 하이퍼파라미터

In [None]:
# device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # cuda 하면 GPU 사용

# data config
data_path = '/root/cv_data/' # 데이터셋 저장 경로 : 이미지 폴더와 csv 파일들 위치

# model config
model_name = 'resnet34' # 'resnet50' 'efficientnet-b0', ...

# training config
img_size = 224 # 입력 이미지 크기인데 32는 너무 작으니 224 정도로 바꾸기 
LR = 1e-3 # 학습률 AdamW은 1e-3 또는 5-4 / SGD는 1e-2 또는 1e-3 많이 사용
EPOCHS = 20 # 20~50 정도 돌려보기
BATCH_SIZE = 64 # 한번에 학습할 이미지 개수 -> GPU 메모리 용량에 따라 조정 64나 128이면 더 안정적인 gradient 추정
num_workers = 0 # DataLoader의 병렬 데이터 로딩 worker 수 0이면 메인 프로세스에서만 로딩(느려짐) 서버는 4~8 권장(코어 수 따라)

# 데이터 로드

In [None]:
# augmentation을 위한 transform 코드
trn_transform = A.Compose([
    # 이미지 크기 조정
    A.Resize(height=img_size, width=img_size),
    # images normalization
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # numpy 이미지나 PIL 이미지를 PyTorch 텐서로 변환
    ToTensorV2(),
])

# test image 변환을 위한 transform 코드
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.read_csv("/root/cv_data/train.csv")

# 클래스 비율 유지하며 train/val 분리
trn_df, val_df = train_test_split(
    df,
    test_size = 0.2, # 8:2 분리
    stratify = df['target'], # 클래스 비율 유지
    random_state=42
)

# 임시 csv 저장
trn_df.to_csv('/root/cv_data/train_split.csv', index=False)
val_df.to_csv('/root/cv_data/val_split.csv', index=False)

In [None]:
# Dataset 정의
# minority_set 로드
minority_path = '/root/cv_data/minority_set.txt'
if os.path.exists(minority_path):
    with open(minority_path, 'r') as f:
        txt = f.read().strip()
    minority_set = set(map(int, txt.split(','))) if txt else set()
else:
    minority_set = set()
    
trn_dataset = ImageDataset(
    "/root/cv_data/train_split.csv",
    "/root/cv_data/train",
    transform=trn_transform, # 약 증강
    transform_strong=strong_train_transform, # 강 증강
    minority_set=minority_set
)

val_dataset = ImageDataset(
    "/root/cv_data/val_split.csv",
    "/root/cv_data/train",
    transform=tst_transform # val데이터와 tst데이터는 같은 방식으로 변환해야 하기 때문
)

tst_dataset = ImageDataset(
    "/root/cv_data/sample_submission.csv",
    "/root/cv_data/test",
    transform=tst_transform
)
print(len(trn_dataset), len(val_dataset), len(tst_dataset))

In [None]:
# DataLoader 정의
# WeightedRandomSampler 적용
from torch.utils.data import DataLoader, WeightedRandomSampler

# sampler : 클래스 불균형 보정
sample_weights = np.load('/root/cv_data/sample_weights.npy')
sampler = WeightedRandomSampler(
    weights=torch.tensor(sample_weights, dtype=torch.double),
    num_samples=len(sample_weights),
    replacement=True
)

trn_loader = DataLoader(
    trn_dataset,
    batch_size=BATCH_SIZE,
    sampler=sampler, # shuffle이 false여서 shuffle 대신 sampler 사용
    shuffle=False, # 매 에폭마다 데이터 섞어서 배치 구성
    num_workers=num_workers,
    pin_memory=True, # gpu 전송을 빠르게 하기 위해 메모리 고정
    drop_last=False # 데이터 수가 배치 크기로 안 나눠떨어질 때 마지막 배치 버릴지 여부
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
    pin_memory=True
)

tst_loader = DataLoader(
    tst_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False, # 테스트 용은 데이터 순서 유지해야 함
    num_workers=0,
    pin_memory=True
)

In [None]:
# load model
model = timm.create_model(
    model_name,        # ex) "resnet34", "efficientnet-b0"
    pretrained=True,   # ImageNet 사전학습 가중치 사용
    num_classes=17     # 대회 클래스 개수
).to(device)

# 클래스 가중치(CE Loss)
class_weights = torch.tensor(np.load('/root/cv_data/class_weights.npy'), dtype=torch.float, device=device)

loss_fn = nn.CrossEntropyLoss(
    weight=class_weights,
    label_smoothing=0.05 # 라벨 노이즈/경계 완화
)

# Optimizer (Adam → AdamW)
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=LR,             # 초기 learning rate (예: 1e-3)
    weight_decay=1e-4  # 정규화 효과, 과적합 방지
)

# Scheduler (CosineAnnealingLR)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=EPOCHS,      # 전체 epoch 수와 맞춤
    eta_min=1e-6       # 최소 learning rate (0까지 떨어지지 않도록)
)


In [None]:
model

In [None]:
best_f1 = -1.0  # 가장 높은 val macro-F1 저장용, -1.0으로 초기화(첫 에포크 성능은 무조건 갱신되도록)

for epoch in range(EPOCHS):
    # 1) Train
    tr = train_one_epoch(trn_loader, model, optimizer, loss_fn, device)

    # 2) Validation 한 에포크 끝나고 검증
    va = valid_one_epoch(val_loader, model, loss_fn, device)

    # 3) Scheduler step (CosineAnnealingLR이면 epoch마다 호출 - 그래야 학습률 변함)
    scheduler.step()

    # 4) 로그 출력
    curr_lr = optimizer.param_groups[0]["lr"] # 현재 optimizer가 쓰는 학습률 가져옴
    print(
        f"[{epoch+1:02d}/{EPOCHS}] " # 두 자리 정수로 출력
        f"train_loss={tr['train_loss']:.4f}  train_f1={tr['train_f1']:.4f}  "
        f"val_loss={va['val_loss']:.4f}  val_f1={va['val_f1']:.4f}  lr={curr_lr:.6f}"
    )

    # 5) 베스트 모델 체크포인트 (대회 지표: macro-F1)
    if va["val_f1"] > best_f1: 
        best_f1 = va["val_f1"]
        torch.save(model.state_dict(), "best_resnet34_2.pth")
        print(f">> best updated! val_f1={best_f1:.4f}")


# 추론

In [None]:
# 1) best 모델 가중치 불러오기
model.load_state_dict(torch.load("best_resnet34_2.pth", map_location=device))
model.to(device)
model.eval()  # 평가 모드 전환

# 2) 추론 실행
preds_list = []
with torch.no_grad():  # 추론 시 gradient 계산 안 함, 한번에 감싸고 루프 전 모델 로드(가장 좋은 가중치로 추론하기 위함)
    for image, _ in tqdm(tst_loader):
        image = image.to(device)

        preds = model(image)  # (batch_size, num_classes) 출력
        preds = preds.argmax(dim=1)  # 가장 확률 높은 클래스 선택

        preds_list.extend(preds.detach().cpu().numpy())  # numpy로 변환 후 리스트에 추가


In [None]:
pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
pred_df['target'] = preds_list

In [None]:
pred_df.head()

In [None]:
sample_submission_df = pd.read_csv("/root/cv_data/sample_submission.csv")
assert (sample_submission_df['ID'] == pred_df['ID']).all()

In [None]:
pred_df.to_csv("code2_pred.csv", index=False)