In [None]:
# Cell 1: base path configuration & seed fixing
import os, random, numpy as np, torch
import warnings
# Silence one specific UserWarning (matched by its message) raised from sklearn modules.
warnings.filterwarnings(
    "ignore",
    message="The number of unique classes is greater than 50% of the number of samples",
    category=UserWarning,
    module=r"sklearn\.")     # applies to sklearn modules only

# Absolute dataset root.
# NOTE(review): the original comment labelled this a Windows path, but "/car2" is POSIX-style — confirm.
ROOT       = "/car2"
TRAIN_DIR  = os.path.join(ROOT, "data", "train")
TEST_DIR   = os.path.join(ROOT, "data", "test")

# Global hyper-parameters shared by the cells below.
CFG = dict(
    # IMG_SIZES = {0:256, 6:384, 12:512, 20:640, 28:768},
    # presumably {starting epoch: image resolution} for progressive resizing — TODO confirm
    IMG_SIZES = {0: 256, 6: 384, 11: 512, 17: 640},
    # BATCH_SIZES = {448: 48}, # batch size per resolution
    BATCH_SIZES = {256: 224, 384: 96, 512: 48, 640:32, 768:32},
    # ★★★ learning rate per resolution ★★★
    LRS = {256: 4e-5, 384: 8e-5, 512: 6e-5, 640: 2e-5, 768: 8e-6},
    EPOCH    = 30,
    FT_EPOCHS = 6,
    FINAL_IMG_SIZE = 768,
    LR       = 4e-5,
    FOLDS    = 5,
    SEED     = 2025
)

def seed_everything(seed:int=42):
    """Seed every random source used by the notebook and pin cuDNN behaviour.

    Seeds Python's ``random``, NumPy and PyTorch (CPU plus all CUDA devices),
    exports ``PYTHONHASHSEED``, and switches cuDNN to deterministic mode with
    benchmarking disabled.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all RNGs with the configured seed before anything else runs.
seed_everything(CFG["SEED"])

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Echo the resolved paths and runtime settings for the notebook log.
print(f"🔧  ROOT  : {ROOT}")
print(f"🖼️   Train: {TRAIN_DIR}")
print(f"🖼️   Test : {TEST_DIR}")
print(f"🚀  Device: {device}  |  Seed: {CFG['SEED']}")


In [None]:
# ─── Files to exclude -----------------------------------------------------
# File names (not full paths) that are dropped from the training set; they are
# compared against `img_path.name` when the records are built further down.
EXCLUDE_FILES = {
    # Pure noise
    "5시리즈_G60_2024_2025_0010.jpg",
    "6시리즈_GT_G32_2018_2020_0018.jpg",
    "7시리즈_G11_2016_2018_0040.jpg",
    "911_992_2020_2024_0030.jpg",
    "E_클래스_W212_2010_2016_0022.jpg",
    "K5_2세대_2016_2018_0007.jpg",
    "F150_2004_2021_0018.jpg",
    "G_클래스_W463b_2019_2025_0030.jpg",
    "GLE_클래스_W167_2019_2024_0068.jpg",
    "Q5_FY_2021_2024_0032.jpg",
    "Q30_2017_2019_0075.jpg",
    "Q50_2014_2017_0031.jpg",
    "SM7_뉴아트_2008_2011_0053.jpg",
    "X3_G01_2022_2024_0029.jpg",
    "XF_X260_2016_2020_0023.jpg",
    "뉴_ES300h_2013_2015_0000.jpg",
    "뉴_G80_2025_2026_0042.jpg", "뉴_G80_2025_2026_0043.jpg",
    "뉴_SM5_임프레션_2008_2010_0033.jpg",
    "더_기아_레이_EV_2024_2025_0078.jpg",
    "더_뉴_K3_2세대_2022_2024_0001.jpg",
    "더_뉴_그랜드_스타렉스_2018_2021_0078.jpg",
    "더_뉴_그랜드_스타렉스_2018_2021_0079.jpg",
    "더_뉴_그랜드_스타렉스_2018_2021_0080.jpg",
    "더_뉴_아반떼_2014_2016_0031.jpg",
    "더_뉴_파사트_2012_2019_0067.jpg",
    "레니게이드_2019_2023_0041.jpg",
    "박스터_718_2017_2024_0011.jpg",
    "싼타페_TM_2019_2020_0009.jpg",
    "아반떼_MD_2011_2014_0081.jpg",
    "아반떼_N_2022_2023_0064.jpg", "아반떼_N_2022_2023_0035.jpg",
    "익스플로러_2016_2017_0072.jpg",
    "콰트로포르테_2017_2022_0074.jpg",
    "프리우스_4세대_2019_2022_0052.jpg",
    # Vehicle interior shots
    "E_클래스_W212_2010_2016_0069.jpg",
    "ES300h_7세대_2019_2026_0028.jpg",
    "G_클래스_W463_2009_2017_0011.jpg",
    "GLB_클래스_X247_2020_2023_0008.jpg",
    "GLS_클래스_X167_2020_2024_0013.jpg",
    "K3_2013_2015_0045.jpg",
    "K5_3세대_2020_2023_0081.jpg",
    "Q7_4M_2020_2023_0011.jpg",
    "RAV4_5세대_2019_2024_0020.jpg",
    "S_클래스_W223_2021_2025_0008.jpg", "S_클래스_W223_2021_2025_0071.jpg",
    "X4_F26_2015_2018_0068.jpg",
    "그랜드_체로키_WL_2021_2023_0018.jpg",
    "레이_2012_2017_0063.jpg",
    "레인지로버_5세대_2023_2024_0030.jpg",
    "레인지로버_스포츠_2세대_2018_2022_0014.jpg", "레인지로버_스포츠_2세대_2018_2022_0017.jpg",
    "마칸_2019_2021_0035.jpg",
    "머스탱_2015_2023_0086.jpg",
    "아반떼_MD_2011_2014_0009.jpg", "아반떼_MD_2011_2014_0082.jpg",
    "컨티넨탈_GT_3세대_2018_2023_0007.jpg",
    "타이칸_2021_2025_0065.jpg",
    "파나메라_2010_2016_0000.jpg", "파나메라_2010_2016_0036.jpg",
    "3시리즈_F30_2013_2018_0036.jpg",
    "4시리즈_F32_2014_2020_0027.jpg",
    "5시리즈_G60_2024_2025_0056.jpg",
    "7시리즈_F01_2009_2015_0029.jpg", "7시리즈_F01_2009_2015_0044.jpg",
    "911_992_2020_2024_0006.jpg",
    "C_클래스_W204_2008_2015_0068.jpg",
    "CLS_클래스_C257_2019_2023_0021.jpg",
    # Rear trunk open
    "Q30_2017_2019_0074.jpg", "글래디에이터_JT_2020_2023_0075.jpg",
    "뉴_CC_2012_2016_0001.jpg", "뉴_CC_2012_2016_0002.jpg",
    "더_뉴_코나_2021_2023_0081.jpg",
    "2시리즈_액티브_투어러_U06_2022_2024_0004.jpg",
    "A8_D5_2018_2023_0084.jpg",
}

# ---------- Cell-2 : StratifiedKFold (이미지 단위) --------------------
import os, hashlib, cv2, numpy as np, pandas as pd
from pathlib import Path
from sklearn.model_selection import StratifiedKFold
import pickle # 캐싱을 위해 pickle 라이브러리 import
from tqdm.auto import tqdm # 진행 상황 확인을 위해 tqdm import

# 1) ────────────────────────── Duplicate-detection hash function ─────────────────
def sha1(path: str) -> str:
    """Return the hexadecimal SHA-1 digest of the file at *path*.

    The file is fed to the hash in fixed-size chunks, so the whole content
    never has to sit in memory at once; the digest equals hashing the full
    byte string in one go.
    """
    digest = hashlib.sha1()
    with open(path, "rb") as stream:
        while chunk := stream.read(1 << 20):
            digest.update(chunk)
    return digest.hexdigest()

# Re-bind TRAIN_DIR as a Path so it can be globbed / iterated below.
TRAIN_DIR = Path(TRAIN_DIR)

# 2. Location of the hash cache file
CACHE_PATH = Path(ROOT) / "hash_cache.pkl"

# 3. Load the cache when it exists; otherwise compute and store it.
# NOTE(review): an existing cache is used as-is, so images added or removed
# after it was written are not picked up until the file is deleted — confirm
# this is intended. The cache is a pickle; only load files you created yourself.
if CACHE_PATH.exists():
    print(f"✅ Loading hash cache from: {CACHE_PATH}")
    with open(CACHE_PATH, "rb") as f:
        path_to_hash = pickle.load(f)
else:
    print(f"⚠️ Hash cache not found. Creating a new one... (This will take a minute)")
    path_to_hash = {}
    # Collect every image path up front.
    # NOTE(review): glob results are not sorted here; row order (and therefore
    # which duplicate is kept and how folds are assigned) follows this order.
    all_img_paths = list(TRAIN_DIR.glob("**/*.jpg"))
    for img_path in tqdm(all_img_paths, desc="Computing Hashes"):
        path_to_hash[str(img_path)] = sha1(img_path)

    # Persist the cache for the next run.
    with open(CACHE_PATH, "wb") as f:
        pickle.dump(path_to_hash, f)
    print(f"✅ Hash cache created and saved to: {CACHE_PATH}")

# 2) Build the class list from the sub-directory names of TRAIN_DIR.
# (The original note mentions class aliases; no alias handling is visible here.)
class_names    = sorted([p.name for p in TRAIN_DIR.iterdir() if p.is_dir()])
cls2id        = {c: i for i, c in enumerate(class_names)}

records, seen_hash = [], set()

# Use the pre-computed hash dictionary instead of reading the disk again.
for img_path_str, h in tqdm(path_to_hash.items(), desc="Filtering Duplicates"):
    img_path = Path(img_path_str)

    # Skip files that were manually flagged for exclusion.
    if img_path.name in EXCLUDE_FILES:
        continue

    # Keep only the first file seen for each content hash.
    if h in seen_hash:
        continue
    seen_hash.add(h)

    # The class name is the name of the image's parent folder.
    class_name = img_path.parent.name
    records.append([img_path_str, cls2id[class_name]])

df = pd.DataFrame(records, columns=["img_path", "label"])
print(f"#images {len(df):,} | #classes {len(class_names)}")

# 3) StratifiedKFold ----------------------------------------------------
# Every row gets the index of the fold in which it serves as validation data.
df["fold"] = -1
skf = StratifiedKFold(n_splits=CFG["FOLDS"], shuffle=True,
                      random_state=CFG["SEED"])
for fold, (_, val_idx) in enumerate(skf.split(df, y=df["label"])):
    df.loc[val_idx, "fold"] = fold

# Show per-class image counts by fold for the first few labels.
print("\n◆ per-class 이미지 수 by fold")
print(df.groupby(["label", "fold"]).size().unstack(fill_value=0).head())

In [None]:
# ───────────────────── 셀 3 (재수정) ─────────────────────
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2, numpy as np

# Normalization statistics for plain "convnext_base"
# MEAN, STD = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

# Normalization statistics for "convnext_base.clip_laion2b_augreg_ft_in12k_in1k_384"
MEAN, STD = [0.48145466, 0.4578275, 0.40821073], [0.26862954, 0.26130258, 0.27577711]

IMG_MAX   = max(CFG["IMG_SIZES"].values())           # largest configured resolution (640 with the IMG_SIZES set in cell 1)

# 1) Half-crop ----------------------------------------------------------
def half_crop(img, **kw):
    """Return one randomly chosen half of an (H, W, C) image.

    The half (top, bottom, left or right) is drawn with ``np.random.choice``;
    the split is at ``H // 2`` or ``W // 2``. Extra keyword arguments are
    accepted and ignored.
    """
    rows, cols, _ = img.shape
    side = str(np.random.choice(["top", "bottom", "left", "right"]))
    mid_row, mid_col = rows // 2, cols // 2

    everything = slice(None)
    regions = {
        "top":    (slice(None, mid_row), everything),
        "bottom": (slice(mid_row, None), everything),
        "left":   (everything, slice(None, mid_col)),
        "right":  (everything, slice(mid_col, None)),
    }
    return img[regions[side]]
# Albumentations Lambda transform that runs half_crop on the image.
half_crop_aug = A.Lambda(image=half_crop)

# 2) Top-right corner masking ---------------------------------------------
def mask_rand_tr(img, frac=(0.10, 0.25), **kw):
    """Black out a random rectangle anchored at the top-right corner.

    Args:
        img: (H, W, C) image array.
        frac: ``(low, high)`` range from which the rectangle's height and
            width fractions are drawn independently with ``np.random.uniform``.
        **kw: accepted and ignored.

    Returns:
        A copy of ``img`` whose top-right ``ph x pw`` region is set to 0.
        The input array is left untouched, so callers that keep a reference
        to the original image do not see it change.
    """
    h, w, _ = img.shape
    # Height fraction is drawn first, then width, to keep the RNG order stable.
    ph = int(h * np.random.uniform(*frac))
    pw = int(w * np.random.uniform(*frac))

    out = img.copy()
    out[0:ph, w - pw:w] = 0
    return out
# Albumentations Lambda transform that runs mask_rand_tr on the image.
mask_ur_aug = A.Lambda(image=mask_rand_tr)

# 3) Letter-box --------------------------------------------------------
def letterbox_block(sz):
    """Return the two transforms used to letter-box an image to ``sz`` x ``sz``.

    The list contains a ``LongestMaxSize`` (cubic interpolation) followed by a
    ``PadIfNeeded`` with a constant border filled with 0, both configured with
    ``sz``. It is meant to be unpacked into an ``A.Compose``.
    """
    shrink = A.LongestMaxSize(max_size=sz, interpolation=cv2.INTER_CUBIC)
    pad = A.PadIfNeeded(
        min_height=sz,
        min_width=sz,
        border_mode=cv2.BORDER_CONSTANT,
        fill=0,
    )
    return [shrink, pad]

# 4) build_aug ---------------------------------------------------------
def build_aug(sz: int, phase: str = "train"):
    """Build the Albumentations pipeline for resolution ``sz``.

    Args:
        sz: target side length; every branch is configured with ``sz`` x ``sz``.
        phase: ``"train"`` returns the augmenting pipeline; any other value
            returns the letter-box + normalize pipeline used for val / test.

    Returns:
        An ``A.Compose`` ending in ``A.Normalize(MEAN, STD)`` and ``ToTensorV2()``.
    """
    if phase == "train":
        # ---- previous training pipeline, kept for reference ----
        # return A.Compose([
        #     # (a) sticker masking with 10 % probability
        #     A.OneOf([mask_ur_aug, A.NoOp()], p=0.10),

        #     # (b) half-crop 20 %  vs  RandomResizedCrop 80 %
        #     A.OneOf([
        #         A.Lambda(image=half_crop, p=1.0),                    # half-crop
        #         A.RandomResizedCrop(size=(sz, sz),                   # ★ tuple!
        #                             scale=(0.8, 1.0),
        #                             ratio=(0.75, 1.333), p=1.0),
        #     ], p=1.0),
        #     # A.RandomResizedCrop(size=(sz, sz),                   # ★ tuple!
        #     #         scale=(0.5, 1.0),
        #     #         ratio=(0.75, 1.333), p=1.0),
        #     # (c) letter-box to exactly sz×sz
        #     *letterbox_block(sz),

        #     # (d) mild transforms
        #     A.HorizontalFlip(p=0.25),
        #     A.Perspective(scale=(0.05, 0.1), p=0.20),
        #     A.OneOf([
        #         A.ColorJitter(0.4, 0.4, 0.4, 0.1, p=1.0),
        #         A.Affine(translate_percent=(0.05, 0.05),
        #                  scale=(0.9, 1.1), rotate=(-15, 15), p=1.0),
        #     ], p=1.0),
        #     A.ToGray(p=0.10),

        #     # (e) CoarseDropout — after the letter-box
        #     A.CoarseDropout(
        #         num_holes_range=(1, 2),
        #         hole_height_range=(int(sz*0.10), int(sz*0.25)),
        #         hole_width_range =(int(sz*0.10), int(sz*0.25)),
        #         fill=0, p=0.50
        #     ),

        #     A.Normalize(MEAN, STD),
        #     ToTensorV2(),
        # ])

        # 1. RandomResizedCrop branch
        rrc = A.RandomResizedCrop(
                size=(sz, sz),
                scale=(0.5, 1.0),  # lower bound is 0.5; the earlier note saying "down to 30%" did not match this value
                ratio=(0.75, 1.333),
                p=1.0
            )

        # 2. Letter-box branch (same transforms as the val pipeline below)
        letter = A.Compose([
                *letterbox_block(sz) # LongestMaxSize + PadIfNeeded
            ])

        return A.Compose([
            # (a) sticker masking, OneOf applied with p=0.10.
            # NOTE(review): the OneOf also contains NoOp, so the effective masking
            # rate is presumably about half of 10 % — confirm against the intent.
            A.OneOf([mask_ur_aug, A.NoOp()], p=0.10),

            # ★★★ pick one of the two resizing strategies (both listed with p=1.0) ★★★
            A.OneOf([rrc, letter], p=1.0),

            # 2. Geometric transforms: cover varied framing and viewing angles
            A.HorizontalFlip(p=0.5),
            # Perspective and Affine are each applied with low probability so they occasionally stack
            A.Perspective(scale=(0.05, 0.1), p=0.3),
            A.Affine(translate_percent=0.1, scale=(0.9, 1.1), rotate=(-15, 15), shear=(-10, 10), p=0.3),

            # 3. Colour / lighting / noise transforms: cover difficult lighting
            A.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.05, p=0.8),
            A.ToGray(p=0.15),
            # Simulate low-light noise or blur (one of the two, 20 % of the time)
            A.OneOf([
                A.GaussianBlur(p=1.0),
                A.ISONoise(p=1.0),
            ], p=0.2),

            # 4. Occlusion simulation (currently disabled)
            # A.Erasing(
            #     p=0.3,
            #     scale=(0.02, 0.25), # erase 2% ~ 25% of the image with random noise
            #     ratio=(0.3, 3.3)
            # ),
            # 5. Normalization
            A.Normalize(MEAN, STD),
            ToTensorV2(),
        ])

    # ----- val / test -----
    return A.Compose([
        *letterbox_block(sz),
        A.Normalize(MEAN, STD),
        ToTensorV2(),
    ])

# Initialise the pipelines at the warm-up resolution (256).
train_tf = build_aug(256, "train")
val_tf   = build_aug(256, "val")
print("✅ Albumentations pipeline ready (progressive-resize compatible)")


In [None]:
# ────────────────────────────────  셀 4  ────────────────────────────────
import cv2, torch, numpy as np
from torch.utils.data import Dataset, DataLoader, Sampler
from functools import partial
from collections import defaultdict

class HardPairSampler(Sampler):
    """Batch sampler mixing uniformly random batches with "hard pair" batches.

    Each iteration yields ``total_samples // batch_size`` lists of dataset
    indices. With probability ``prob`` (and only when ``hard_pairs`` is not
    empty) a batch is assembled from class pairs drawn from ``hard_pairs``:
    one index from each class of the pair, repeated ``batch_size // 2`` times.
    Otherwise the batch is ``batch_size`` distinct indices drawn uniformly.

    Args:
        cls2idx: mapping ``class id -> list of dataset indices``.
        hard_pairs: iterable of ``(class_a, class_b)`` tuples, or ``None``.
        batch_size: number of indices per yielded batch.
        total_samples: dataset length; indices are drawn from ``range(total_samples)``.
        prob: probability of emitting a hard-pair batch.
    """

    def __init__(self, cls2idx, hard_pairs, batch_size, total_samples, prob=0.3):
        self.cls2idx = cls2idx
        self.hard_pairs = list(hard_pairs) if hard_pairs else []
        self.batch_size = batch_size
        self.total_samples = total_samples
        self.prob = prob

        self.all_indices = list(range(total_samples))

    def _pick(self, cls):
        """Return a random index of class ``cls``.

        A class that is missing from ``cls2idx`` or has no samples (e.g. it
        only occurs in the validation fold) falls back to a uniformly random
        index instead of always returning index 0 or raising ``IndexError``.
        """
        candidates = self.cls2idx.get(cls)
        return random.choice(candidates if candidates else self.all_indices)

    def __iter__(self):
        for _ in range(len(self)):
            if self.hard_pairs and random.random() < self.prob:
                batch_indices = []
                for _ in range(self.batch_size // 2):
                    # Draw a confusable class pair, then one sample from each side.
                    cls_a, cls_b = random.choice(self.hard_pairs)
                    batch_indices.append(self._pick(cls_a))
                    batch_indices.append(self._pick(cls_b))
                # An odd batch_size would leave the batch one sample short;
                # top it up so every batch has exactly batch_size items.
                if len(batch_indices) < self.batch_size:
                    batch_indices.append(random.choice(self.all_indices))
                yield batch_indices
            else:
                # Plain uniform batch without repeats inside the batch.
                yield random.sample(self.all_indices, self.batch_size)

    def __len__(self):
        return self.total_samples // self.batch_size

# ---------------- Dataset ----------------------------------------------
class CarDataset(Dataset):
    """Image dataset backed by a DataFrame with an ``img_path`` column.

    Args:
        df: DataFrame with ``img_path`` and, unless ``is_test`` is set, ``label``.
        transform: optional callable invoked as ``transform(image=img)["image"]``.
        is_test: when True no labels are read and items are ``(image, path)``;
            otherwise items are ``(image, label)``.

    Attributes:
        cls2idx: ``label -> list of row positions`` (empty for test sets);
            used for class-aware sampling.
    """

    def __init__(self, df, transform=None, is_test: bool=False):
        self.paths  = df["img_path"].tolist()
        self.labels = None if is_test else df["label"].tolist()
        self.tf = transform
        self.is_test = is_test

        # Per-class index cache (for anchor / positive sampling).
        cls2idx = defaultdict(list)
        if not is_test:
            for idx, lbl in enumerate(self.labels):
                cls2idx[lbl].append(idx)
        self.cls2idx = cls2idx

    def __len__(self):
        return len(self.paths)

    def _load(self, idx):
        """Read image ``idx`` as RGB and apply the transform, if any."""
        path = self.paths[idx]
        bgr = cv2.imread(path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if bgr is None:
            raise FileNotFoundError(f"cv2.imread could not read image: {path}")
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        if self.tf:
            img = self.tf(image=img)["image"]
        return img

    def __getitem__(self, idx):
        img = self._load(idx)
        if self.is_test:
            return img, self.paths[idx]
        # Training / validation: always exactly one image and one label.
        return img, self.labels[idx]

# ---------------- CutMix / MixUp helper ---------------------------------
def rand_bbox(W, H, lam):
    """Draw a random CutMix box inside a ``W`` x ``H`` image.

    The box is centred on a uniformly random pixel and has side lengths
    ``int(W * sqrt(1 - lam))`` and ``int(H * sqrt(1 - lam))`` before being
    clipped to the image bounds.

    Returns:
        ``(x1, y1, x2, y2)`` as Python ints.
    """
    side_ratio = np.sqrt(1. - lam)
    half_w = int(W * side_ratio) // 2
    half_h = int(H * side_ratio) // 2

    # x is drawn before y; the order matters for reproducibility.
    centre_x = np.random.randint(W)
    centre_y = np.random.randint(H)

    x_lo, x_hi = np.clip([centre_x - half_w, centre_x + half_w], 0, W)
    y_lo, y_hi = np.clip([centre_y - half_h, centre_y + half_h], 0, H)
    return int(x_lo), int(y_lo), int(x_hi), int(y_hi)

def collate_cutmix(batch, alpha: float = 1.0, prob: float = 0.1):
    """Collate ``(image, label)`` samples, applying CutMix with probability ``prob``.

    Returns:
        ``(imgs, labels, labels2, lam)``. Without mixing, ``labels2`` is
        ``labels`` and ``lam`` is 1.0. With mixing, a box from a shuffled copy
        of the batch is pasted into ``imgs``, ``labels2`` holds the labels of
        the pasted images and ``lam`` is the fraction of each image that was
        not replaced.
    """
    image_seq, label_seq = list(zip(*batch))
    imgs = torch.stack(image_seq)
    labels = torch.tensor(label_seq)

    apply_mix = np.random.rand() < prob
    if not apply_mix:
        return imgs, labels, labels, 1.0

    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(imgs.size(0))
    donor_imgs = imgs[perm]
    labels2 = labels[perm]

    _, _, H, W = imgs.shape
    x1, y1, x2, y2 = rand_bbox(W, H, lam)
    imgs[:, :, y1:y2, x1:x2] = donor_imgs[:, :, y1:y2, x1:x2]

    # Recompute lam from the clipped box so it matches the pasted area.
    box_area = (x2 - x1) * (y2 - y1)
    lam = 1.0 - box_area / (W * H)
    return imgs, labels, labels2, lam

def collate_mixup(batch, alpha=0.2):
    """Collate ``(image, label)`` samples and blend each image with a shuffled partner.

    ``lam`` is drawn from ``Beta(alpha, alpha)``; every image becomes
    ``lam * image + (1 - lam) * partner``.

    Returns:
        ``(mixed_imgs, labels, partner_labels, lam)``.
    """
    image_seq, label_seq = list(zip(*batch))
    imgs = torch.stack(image_seq)
    labels = torch.tensor(label_seq)

    lam = np.random.beta(alpha, alpha)
    perm = torch.randperm(imgs.size(0))

    mixed = lam*imgs + (1-lam)*imgs[perm]
    partner_labels = labels[perm]
    return mixed, labels, partner_labels, lam

def collate_plain(batch):
    """Collate ``(image, label)`` samples into ``(stacked images, label tensor)``."""
    image_seq, label_seq = list(zip(*batch))
    stacked = torch.stack(image_seq)
    targets = torch.tensor(label_seq)
    return stacked, targets

# ---------------- collate_fn dispatcher ---------------------------------
def get_collate_fn(epoch):
    """Return the collate function scheduled for ``epoch``.

    * ``epoch <= 15``: CutMix (``alpha=1.0``, ``prob=0.3``)
    * ``15 < epoch <= 20``: MixUp (``alpha=0.2``)
    * later epochs: plain stacking

    ``functools.partial`` objects are returned instead of lambdas so the
    collate function can be pickled when DataLoader workers are started with
    the ``spawn`` method.
    """
    if epoch <= 15:
        return partial(collate_cutmix, alpha=1.0, prob=0.3)
    if epoch <= 20:
        return partial(collate_mixup, alpha=0.2)
    return collate_plain


In [None]:
# ────────────────────────────────  셀 5  ────────────────────────────────
def _collate_test(batch):
    """Collate ``(image, path)`` test samples into ``(stacked images, list of paths)``."""
    return torch.stack([item[0] for item in batch]), [item[1] for item in batch]


def make_loaders(fold:int,
                 df_full,
                 epoch:int,
                 train_tf,
                 val_tf,
                 batch_size:int = 32,
                 num_workers:int = 10,
                 hard_pairs: set | None = None):
    """Build the train / val / test DataLoaders for one fold.

    Args:
        fold: rows with ``df_full.fold == fold`` form the validation set; all
            other rows form the training set.
        df_full: DataFrame with ``img_path``, ``label`` and ``fold`` columns.
        epoch: current epoch number, used to pick the collate function.
        train_tf: transform for the training set.
        val_tf: transform for the validation and test sets.
        batch_size: batch size for all three loaders.
        num_workers: DataLoader worker count; 0 loads in the main process.
        hard_pairs: optional set of ``(class_a, class_b)`` pairs passed to
            ``HardPairSampler``.

    Returns:
        ``(train_loader, val_loader, test_loader)``.
    """
    train_df = df_full[df_full.fold != fold].reset_index(drop=True)
    val_df   = df_full[df_full.fold == fold].reset_index(drop=True)

    train_set = CarDataset(train_df, transform=train_tf)
    val_set   = CarDataset(val_df,   transform=val_tf)

    # Batches come from the hard-pair sampler instead of plain shuffling.
    sampler = HardPairSampler(
        cls2idx=train_set.cls2idx,
        hard_pairs=hard_pairs,
        batch_size=batch_size,
        total_samples=len(train_set)
    )

    # persistent_workers is only valid with at least one worker; DataLoader
    # raises ValueError for persistent_workers=True with num_workers=0.
    worker_kwargs = dict(
        num_workers=num_workers,
        pin_memory=True,
        persistent_workers=num_workers > 0,
    )

    train_loader = DataLoader(
        train_set,
        batch_sampler=sampler,   # used instead of shuffle / batch_size
        collate_fn=get_collate_fn(epoch),
        **worker_kwargs
    )

    val_loader = DataLoader(
        val_set,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=collate_plain,
        **worker_kwargs
    )

    # Read test.csv directly so the test order follows the file.
    test_df = pd.read_csv(os.path.join(ROOT, "data", "test.csv"))
    # Paths in test.csv may be relative, so anchor them under ROOT/data.
    test_df['img_path'] = test_df['img_path'].apply(lambda p: os.path.join(ROOT, "data", p))
    test_set = CarDataset(test_df, transform=val_tf, is_test=True)

    test_loader = DataLoader(
        test_set,
        batch_size=batch_size,
        shuffle=False,
        # Module-level function rather than a lambda so it can be pickled.
        collate_fn=_collate_test,
        **worker_kwargs
    )

    return train_loader, val_loader, test_loader


In [None]:
# ────────────────────────────────  셀 6 (모델 정의) ────────────────────────────────
import torch, torch.nn as nn, torch.nn.functional as F, timm
from einops import rearrange

# --------------------------- GeM Pool -----------------------------------------
class GeM(nn.Module):
    """Generalized-mean pooling over the last two dimensions.

    The input is clamped to at least ``eps``, raised to the learnable power
    ``p``, averaged over the full spatial extent, raised to ``1 / p`` and
    flattened to ``(batch, channels)``.
    """

    def __init__(self, p: float = 3.0, eps: float = 1e-6):
        super().__init__()
        self.p   = nn.Parameter(torch.full((1,), float(p)))
        self.eps = eps

    def forward(self, x):
        height, width = x.size(-2), x.size(-1)
        powered = x.clamp(min=self.eps).pow(self.p)
        averaged = F.avg_pool2d(powered, kernel_size=(height, width))
        return averaged.pow(1.0/self.p).flatten(1)

# -------------------- Sub-center ArcFace Head (k=3, s=30, m=0.25) -------------
class ArcMarginProduct_subcenter(nn.Module):
    def __init__(self,
                 in_features:  int,
                 out_features: int,
                 k: int   = 3,
                 s: float = 30.0,   # ⬆ scale 30 
                 m: float = 0.25):  # ⬆ margin 0.50 → 0.25
        super().__init__()
        self.out_features, self.k, self.s, self.m = out_features, k, s, m
        self.weight = nn.Parameter(torch.randn(out_features * k, in_features))
        nn.init.xavier_uniform_(self.weight)

    def forward(self, x, label: torch.Tensor | None = None):
        x_norm = F.normalize(x, dim=1)
        w_norm = F.normalize(self.weight, dim=1)

        cosine = F.linear(x_norm, w_norm)           # (B, out*k)
        cosine = cosine.view(-1, self.out_features, self.k)
        cos_max, _ = torch.max(cosine, dim=2)       # (B, out)

        if label is None:               # inference (margin X)
            return self.s * cos_max
        # ---------- margin 추가 (학습) ----------
        theta   = torch.acos(cos_max.clamp(-1+1e-7, 1-1e-7))
        cos_m   = torch.cos(theta + self.m)
        one_hot = F.one_hot(label, self.out_features).float().to(x.device)
        logits  = self.s * (one_hot * cos_m + (1.0 - one_hot) * cos_max)
        return logits

# --------------------------- Backbone + Head -----------------------------------
class CarNet(nn.Module):
    """ConvNeXt feature backbone with two GeM-pooled stages and a sub-center ArcFace head.

    The last two feature maps returned by the backbone are pooled, layer-
    normalized, concatenated and passed to ``ArcMarginProduct_subcenter``.

    Args:
        n_classes: number of output classes.
        k, s, m: forwarded to the ArcFace head.
        drop_path_rate: forwarded to ``timm.create_model``.
    """

    def __init__(self,
                 n_classes: int,
                 k: int = 3,
                 s: float = 30.0,
                 m: float = 0.25,
                 drop_path_rate: float = 0.1):
        super().__init__()
        # features_only=True makes the backbone return a list of feature maps.
        self.backbone = timm.create_model(
            "convnext_base.clip_laion2b_augreg_ft_in12k_in1k_384",
            pretrained=True,
            features_only=True,
            drop_path_rate=drop_path_rate,
        )

        # Channel counts of the second-to-last and last feature maps.
        stage_channels = self.backbone.feature_info.channels()
        penultimate_dim, last_dim = stage_channels[-2], stage_channels[-1]

        # One GeM pooling layer and one LayerNorm per feature map.
        self.pool1 = GeM(p=3)
        self.pool2 = GeM(p=3)
        self.norm1 = nn.LayerNorm(penultimate_dim)
        self.norm2 = nn.LayerNorm(last_dim)

        # The head consumes the concatenation of both pooled vectors.
        self.head = ArcMarginProduct_subcenter(
            in_features=penultimate_dim + last_dim,
            out_features=n_classes,
            k=k, s=s, m=m)

    def forward(self, x, label: torch.Tensor | None = None, return_feat=False):
        """Return logits, or ``(logits, combined feature)`` when ``return_feat`` is set."""
        stage_maps = self.backbone(x)

        # Pool and normalize each of the last two stages independently.
        penultimate_vec = self.norm1(self.pool1(stage_maps[-2]))
        last_vec = self.norm2(self.pool2(stage_maps[-1]))

        feat_combined = torch.cat([penultimate_vec, last_vec], dim=1)
        logits = self.head(feat_combined, label)

        if return_feat:
            return logits, feat_combined
        return logits

# --------------------------- Example instance -----------------------------------
n_classes = len(class_names)         # number of class folders (the original note says 396)
model = CarNet(n_classes).to(device)
model = model.to(memory_format=torch.channels_last)
model = torch.compile(model)         # PyTorch ≥ 2.0

print("✅ Model initialized – ConvNeXt-B (DropPath 0.1) + GeM + Sub-center ArcFace "
      f"(k=3, s=30, m=0.25)")

In [None]:
# ───────────────────────────────  Cell 7: training setup  ───────────────────────────────
# 1) The model was already created in cell 6 (model)

# 2) Loss (initial value only; per the original note the epoch loop later
#    changes the smoothing from 0.10 to 0.05 — that code is not in this cell)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# 3) Optimizer ── backbone lr = CFG["LR"], head lr = ×5
def param_groups(model, base_lr, head_lr_mul=5):
    """Split parameters into backbone and head optimizer groups.

    A parameter goes to the head group when its name contains ``"head"``.

    Returns:
        ``[backbone_group, head_group]`` where the head group's learning rate
        is ``base_lr * head_lr_mul``.
    """
    named = list(model.named_parameters())
    head_params = [param for name, param in named if "head" in name]
    backbone_params = [param for name, param in named if "head" not in name]

    backbone_group = {"params": backbone_params, "lr": base_lr}
    head_group = {"params": head_params, "lr": base_lr*head_lr_mul}
    return [backbone_group, head_group]

# AdamW over the two parameter groups built by param_groups.
optimizer = torch.optim.AdamW(
    param_groups(model, CFG["LR"]),
    lr=CFG["LR"], weight_decay=1e-2
)


# 5) AMP & EMA
# The gradient scaler is only enabled when running on CUDA.
scaler      = torch.amp.GradScaler(enabled=(device=="cuda"))
ema_decay   = 0.997
# Detached copies of the current parameters, used as the EMA state.
ema_weights = [p.clone().detach() for p in model.parameters()]

print("✅ Optimizer ready – Scheduler will be created inside the Fold loop.")

In [None]:
# # ──────────────────────────────  셀 8  (본학습용 LR Finder / 돌린 후 주석 처리) ─────────────────────────────
# # ──────────────────────────────  반드시 주석 처리!!!!!!!!!!!!!!!!!!!!!!!!! ─────────────────────────────

# from torch_lr_finder import LRFinder           # pip install torch-lr-finder
# tmp_loader = DataLoader(                       # 작은 서브셋(예: 2~3k 샘플)
#     CarDataset(df.sample(3000, random_state=0).reset_index(drop=True),
#                transform=train_tf),
#     batch_size=CFG["BATCH"], shuffle=True, num_workers=4)

# lr_finder = LRFinder(model, optimizer, criterion, device=device)
# lr_finder.range_test(tmp_loader,
#                      start_lr=1e-5, end_lr=1e-2,
#                      num_iter=1000)
# lr_finder.plot()    # 그래프 확인
# lr_finder.reset()   # 옵티마이저 상태 복구
# # ▲▲▲ LR Finder 끝 ───────────────────────────

In [None]:
# ──────────────────────────────  셀 9  (W&B 로깅 통합) ─────────────────────────────
import time, math, os, numpy as np, wandb, torch
from pathlib import Path
from tqdm.auto       import tqdm
from sklearn.metrics import log_loss, confusion_matrix
from timm.layers     import DropPath
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR

# ╭─ W&B basics ───────────────────────────────────────────────────────────────╮
WANDB_PROJECT = "hecto_car_version3_0613"
WANDB_RUNNAME = f"convnextB_k5_bs"
TOP_K         = 300        # number of confusion-matrix cells kept when mining hard pairs
HEAD_MULT     = 5          # back-bone LR 1 ×, head LR 5 ×
device        = "cuda" if torch.cuda.is_available() else "cpu"
# ╰────────────────────────────────────────────────────────────────────────────╯


# ───────── DropPath helper ───────────────────────────────────────────────────
def set_drop_path(model, p: float):
    """Set ``drop_prob`` to ``p`` on every ``DropPath`` module inside ``model``."""
    drop_layers = (layer for layer in model.modules() if isinstance(layer, DropPath))
    for layer in drop_layers:
        layer.drop_prob = p

# ───────── EMA and misc helpers ──────────────────────────────────────────────
def update_ema(model, ema_w, d):
    """Update the EMA tensors in place: ``ema = d * ema + (1 - d) * param``.

    ``ema_w`` must be ordered like ``model.parameters()``.
    """
    fresh_weight = 1 - d
    with torch.no_grad():
        for shadow, param in zip(ema_w, model.parameters()):
            shadow.mul_(d)
            shadow.add_(param.data, alpha=fresh_weight)

def param_groups(model, lr, head_mult: int = HEAD_MULT):
    """Split parameters into backbone and head optimizer groups.

    Parameters whose name contains ``"head"`` form the second group, which
    uses ``lr * head_mult``; everything else uses ``lr``.
    """
    named = list(model.named_parameters())
    head_params = [param for name, param in named if "head" in name]
    backbone_params = [param for name, param in named if "head" not in name]
    return [
        {"params": backbone_params, "lr": lr},
        {"params": head_params, "lr": lr * head_mult},
    ]

def topk_accuracy(logits, labels, topk=(1, 5)):
    """Return top-k accuracies, one float per entry of ``topk``.

    A sample counts as correct for ``k`` when its label is among the ``k``
    highest-scoring classes, so every value lies in [0, 1] and does not
    decrease as ``k`` grows. ``k`` values above the number of classes are
    treated as "all classes".

    Args:
        logits: ``(B, C)`` score tensor.
        labels: ``(B,)`` integer class labels.
        topk: iterable of ``k`` values.
    """
    with torch.no_grad():
        maxk = min(max(topk), logits.size(1))
        _, pred = logits.topk(maxk, 1, True, True)
        pred = pred.t()                                    # (maxk, B)
        correct = pred.eq(labels.view(1, -1).expand_as(pred))
        n_samples = labels.numel()
        # Divide the hit count by the number of samples. Averaging over the
        # k x B entries would shrink the result by a factor of k.
        return [(correct[:k].reshape(-1).float().sum() / n_samples).item()
                for k in topk]

def grad_global_norm(model):
    """Return the global L2 norm of all existing parameter gradients as a float.

    Squared sums are accumulated as tensors and converted to a Python number
    once, so there is a single host synchronisation per call instead of one
    per parameter. Returns 0.0 when no parameter has a gradient.
    """
    squared = [p.grad.detach().float().pow(2).sum()
               for p in model.parameters() if p.grad is not None]
    if not squared:
        return 0.0
    # Bring the partial sums onto one device in case parameters are spread out.
    target = squared[0].device
    total = torch.stack([s.to(target) for s in squared]).sum()
    return total.sqrt().item()

def build_scheduler(optimizer, warm_iters, main_iters, eta_min=0.0):
    """Return a cosine scheduler, optionally preceded by a linear warm-up.

    With ``warm_iters == 0`` a plain ``CosineAnnealingLR`` is returned.
    Otherwise a ``SequentialLR`` runs ``LinearLR`` (factor 0.05 -> 1.0) for
    ``warm_iters`` steps and then switches to the cosine schedule.
    """
    cosine_kwargs = dict(T_max=main_iters, eta_min=eta_min)

    if warm_iters != 0:
        # The warm-up scheduler is created before the cosine one.
        warmup = LinearLR(optimizer, start_factor=0.05, end_factor=1.0,
                          total_iters=warm_iters)
        cosine = CosineAnnealingLR(optimizer, **cosine_kwargs)
        return SequentialLR(optimizer, [warmup, cosine], milestones=[warm_iters])

    return CosineAnnealingLR(optimizer, **cosine_kwargs)

# ─────────────── train / val loops ───────────────────────────────────────────
def train_one_epoch(model, loader, scaler, optim, scheduler, ema_w, epoch):
    """Run one training epoch with AMP, gradient clipping, per-step LR scheduling and EMA.

    Args:
        model: network called as ``model(x, label=...)``.
        loader: yields either ``(x, y)`` or ``(x, y1, y2, lam)`` batches.
        scaler: AMP gradient scaler.
        optim: optimizer stepped through ``scaler``.
        scheduler: LR scheduler, stepped once per batch.
        ema_w: EMA tensors updated after every step.
        epoch: epoch number, used only for the progress-bar label.

    Returns:
        ``(mean loss, mean gradient norm)`` over the batches of this epoch.

    Relies on the module-level ``device``, ``criterion`` and ``ema_decay``.
    """
    model.train()
    run_loss = grad_acc = 0.0
    pbar = tqdm(loader, desc=f"Ep{epoch:02d} ▸ train", leave=False)

    for i, batch in enumerate(pbar, 1):
        # Mixing collate functions return 4 items; the plain one returns 2.
        if len(batch) == 4:
            x, y1, y2, lam = batch
        else:
            x, y1 = batch;  y2, lam = y1, 1.0

        x   = x.to(device, memory_format=torch.channels_last)
        y1, y2 = y1.to(device), y2.to(device)

        with torch.amp.autocast(device_type="cuda", enabled=(device=="cuda")):
            if lam < 1.0:
                # The head applies its margin to the label it is given, so a
                # mixed batch is forwarded once per label set and the two
                # losses are blended with lam.
                logits1 = model(x, label=y1)
                logits2 = model(x, label=y2)
                loss = lam*criterion(logits1, y1) + (1-lam)*criterion(logits2, y2)
            else:
                logits = model(x, label=y1)
                loss   = criterion(logits, y1)

        scaler.scale(loss).backward()
        # Unscale before clipping so the threshold applies to the true gradients.
        scaler.unscale_(optim)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        # Measured after clipping, so this is the norm of the clipped gradients.
        grad_acc += grad_global_norm(model)

        scaler.step(optim);  scaler.update();  scheduler.step()
        optim.zero_grad(set_to_none=True)
        update_ema(model, ema_w, ema_decay)

        run_loss += loss.item()
        pbar.set_postfix(L=f"{run_loss/i:.4f}")

    # NOTE(review): `i` is only bound inside the loop, so an empty loader fails here.
    return run_loss/i, grad_acc/i


def validate_one_epoch(model, loader, epoch, hard_prev):
    """Evaluate ``model`` on ``loader`` and mine the most-confused class pairs.

    Args:
        model: network invoked as ``model(x, label=None, return_feat=True)``.
        loader: iterable of ``(images, labels)`` batches.
        epoch: epoch number, used only in the progress-bar label.
        hard_prev: set of ``(true, predicted)`` pairs from the previous call.

    Returns:
        An 11-tuple: mean batch loss, log-loss, top-1, top-5, probability
        matrix, label array, new hard-pair set, number of misclassified
        samples, mean max-probability, number of newly added pairs and
        number of retired pairs.

    Relies on the module-level names ``device``, ``criterion``,
    ``n_classes`` and ``TOP_K``.
    """
    model.eval()
    batch_losses, logit_chunks, label_chunks = [], [], []

    amp_ctx = torch.amp.autocast(device_type="cuda", enabled=(device=="cuda"))
    with torch.no_grad(), amp_ctx:
        for images, targets in tqdm(loader, desc=f"Ep{epoch:02d} ▸ val", leave=False):
            images = images.to(device, memory_format=torch.channels_last)
            targets = targets.to(device)
            logits, _ = model(images, label=None, return_feat=True)
            batch_losses.append(criterion(logits, targets).item())
            logit_chunks.append(logits.cpu())
            label_chunks.append(targets.cpu())

    logits_all = torch.cat(logit_chunks)
    labels_all = torch.cat(label_chunks)
    probs = logits_all.softmax(1).numpy()
    labels_np = labels_all.numpy()

    val_ll = log_loss(labels_all, probs, labels=list(range(n_classes)))
    top1, top5 = topk_accuracy(logits_all, labels_all)
    avg_maxP = probs.max(1).mean()

    # ── Hard-pair mining: keep the TOP_K largest off-diagonal confusion cells
    cm = confusion_matrix(labels_np, probs.argmax(1), labels=range(n_classes))
    cm[np.diag_indices_from(cm)] = 0
    top_cells = np.argpartition(cm.reshape(-1), -TOP_K)[-TOP_K:]
    rows, cols = np.unravel_index(top_cells, cm.shape)

    hard_new = set()
    for true_cls, pred_cls in zip(rows, cols):
        if cm[true_cls, pred_cls] > 0:
            hard_new.add((int(true_cls), int(pred_cls)))

    new_cnt = len(hard_new - hard_prev)
    retired_cnt = len(hard_prev - hard_new)
    conf_err = int(cm.sum())

    return (np.mean(batch_losses), val_ll, top1, top5, probs, labels_np,
            hard_new, conf_err, avg_maxP, new_cnt, retired_cnt)

# ───────── Fold-level hyperparameters ─────────────────────────────────────────
# Multiplier applied to the base LR when the fine-tune stage starts.
FT_LR_SCALE = 1.5
# Per-resolution multiplier applied to CFG["LR"] (keyed by image size in px).
SCALE_MAP   = {256:1.0, 384:1.0, 512:0.9, 640:0.5, 768:0.3}

# Accumulators filled once per fold by the training loop below.
fold_best, all_logits, all_labels = [], [], []

# One complete training run per cross-validation fold: progressive resizing,
# a final fine-tune stage, per-epoch validation/logging and best-checkpoint
# saving (selected by validation log-loss).
for fold in range(CFG["FOLDS"]):
    hard_pairs_global = set()
    run = wandb.init(project=WANDB_PROJECT,
                     name=f"{WANDB_RUNNAME}_fold{fold}",
                     config={**CFG, "fold": fold}, reinit=True)

    # ── Model & optimizer (initial LR is the 256 px stage LR)
    model = CarNet(len(class_names), k=3, s=30.0, m=0.25)\
            .to(device).to(memory_format=torch.channels_last)

    base_lr_init = CFG["LR"] * SCALE_MAP[256]
    optimizer = torch.optim.AdamW(param_groups(model, base_lr_init, HEAD_MULT),
                                  lr=base_lr_init,  weight_decay=1e-2)

    scaler      = torch.amp.GradScaler(enabled=(device=="cuda"))
    ema_decay   = 0.997          # read as a module-level global by train_one_epoch
    ema_weights = [p.detach().clone() for p in model.parameters()]
    best_ll, last_ll = math.inf, None
    # Validation outputs of the best epoch, so the saved OOF arrays match the
    # saved checkpoint and the reported best log-loss.
    best_probs = best_lbls = None

    finetune_start_epoch = CFG["EPOCH"] - CFG["FT_EPOCHS"]

    # ── First DataLoader / scheduler (256 px)
    img_sz   = 256
    batch_sz = CFG["BATCH_SIZES"][img_sz]

    train_tf = build_aug(img_sz, "train")
    val_tf   = build_aug(img_sz, "val")
    train_loader, val_loader, _ = make_loaders(
        fold, df_full=df, epoch=0,
        train_tf=train_tf, val_tf=val_tf,
        batch_size=batch_sz, num_workers=10, hard_pairs=hard_pairs_global)

    steps_ep  = len(train_loader)
    warm_it   = 3 * steps_ep
    # Length of the first stage = first epoch > 0 at which the stage changes.
    # CFG["IMG_SIZES"] maps epoch -> image size, so indexing it with a fixed
    # epoch (as before) yielded a pixel size, not an epoch count, and raised
    # KeyError for schedules without that key.
    first_switch_ep = min(
        [k for k in list(CFG["IMG_SIZES"].keys())+[finetune_start_epoch] if k > 0] or [CFG["EPOCH"]])
    main_it   = max(1, first_switch_ep * steps_ep - warm_it)
    scheduler = build_scheduler(optimizer, warm_it, main_it, eta_min=base_lr_init*0.1)

    # ── Epoch loop ────────────────────────────────────────────────────────────
    for ep in range(CFG["EPOCH"]):
        # Detect a stage change (resolution switch / entering fine-tune).
        # When epoch 0 is a key of CFG["IMG_SIZES"], the loaders and scheduler
        # created above are rebuilt here right away.
        is_resize_epoch        = ep in CFG["IMG_SIZES"]
        is_first_finetune_ep   = (ep == finetune_start_epoch)

        if is_resize_epoch or is_first_finetune_ep:
            if is_first_finetune_ep:               # Fine-tune at the final size
                img_sz   = CFG["FINAL_IMG_SIZE"]
                batch_sz = CFG["BATCH_SIZES"][img_sz]
                base_lr  = CFG["LR"] * SCALE_MAP[768] * FT_LR_SCALE
                print(f"✨ Epoch {ep}: Entering Fine-tune at {img_sz}px")
            else:                                   # Progressive resize
                img_sz   = CFG["IMG_SIZES"][ep]
                batch_sz = CFG["BATCH_SIZES"][img_sz]
                base_lr  = CFG["LR"] * SCALE_MAP.get(img_sz, 0.3)
                print(f"✨ Epoch {ep}: Resize → {img_sz}px | Base LR {base_lr:.2e}")

            # Reset optimizer LRs: group 0 gets base_lr, the others base_lr * HEAD_MULT.
            # `initial_lr` is overwritten too so the new scheduler starts from these values.
            for i, pg in enumerate(optimizer.param_groups):
                pg["lr"] = base_lr if i == 0 else base_lr * HEAD_MULT
                pg["initial_lr"] = pg["lr"]

            # Rebuild the DataLoaders for the new resolution / batch size
            train_tf = build_aug(img_sz, "train")
            val_tf   = build_aug(img_sz, "val")
            train_loader, val_loader, _ = make_loaders(
                fold, df_full=df, epoch=ep,
                train_tf=train_tf, val_tf=val_tf,
                batch_size=batch_sz, num_workers=10,
                hard_pairs=hard_pairs_global)

            steps_ep  = len(train_loader)
            if is_first_finetune_ep:
                # Restart the EMA from the current weights
                for e, p in zip(ema_weights, model.parameters()):
                    e.copy_(p.detach())
                warm_it = 0
                main_it = (CFG["EPOCH"] - ep) * steps_ep
                eta_min = 0.0
            else:
                # Epochs remaining until the next stage change
                next_switch_ep = min(
                    [k for k in list(CFG["IMG_SIZES"].keys())+[finetune_start_epoch] if k > ep] or [CFG["EPOCH"]])
                stage_epochs = next_switch_ep - ep
                warm_it = 3 * steps_ep
                main_it = max(1, stage_epochs*steps_ep - warm_it)
                eta_min = base_lr * 0.1

            scheduler = build_scheduler(optimizer, warm_it, main_it, eta_min)

        # Loss function: label smoothing 0.05 up to epoch 15, none afterwards;
        # during fine-tune the head attribute `m` is also set to 0.
        if ep >= finetune_start_epoch:
            model.head.m = 0.0
            criterion = torch.nn.CrossEntropyLoss(label_smoothing=0.0)
        else:
            criterion = torch.nn.CrossEntropyLoss(label_smoothing=(0.05 if ep <= 15 else 0.0))

        # DropPath rate grows linearly with the epoch index
        set_drop_path(model.backbone, 0.1 * ep / CFG["EPOCH"])

        # Train / validate
        tr_loss, tr_gn = train_one_epoch(model, train_loader, scaler,
                                         optimizer, scheduler, ema_weights, ep)

        vl_loss, vl_ll, top1, top5, vl_probs, vl_lbls,\
        hard_pairs_global, conf_err, avg_maxP, hp_new, hp_ret = \
            validate_one_epoch(model, val_loader, ep, hard_pairs_global)

        logdiff = 0.0 if last_ll is None else (vl_ll - last_ll)
        last_ll = vl_ll

        wandb.log({
            "epoch": ep, "train_loss": tr_loss, "val_loss": vl_loss,
            "logloss": vl_ll, "logloss_diff": logdiff,
            "loss_gap": abs(tr_loss-vl_loss),
            "top1": top1, "top5": top5,
            "conf_err": conf_err, "avg_maxP_val": avg_maxP,
            "hard_pairs": len(hard_pairs_global),
            "hard_pairs/new": hp_new, "hard_pairs/retired": hp_ret,
            "grad_norm": tr_gn,
            "lr": scheduler.get_last_lr()[0],
        })

        print(f"[Ep{ep:02d}] Train {tr_loss:.4f} | Val {vl_loss:.4f} | "
              f"LL {vl_ll:.5f} Δ{logdiff:+.5f} | Top1 {top1*100:.2f}% | "
              f"HP {len(hard_pairs_global)}(+{hp_new}/-{hp_ret}) | "
              f"ConfErr {conf_err} | LR {scheduler.get_last_lr()[0]:.2e}")

        if vl_ll < best_ll:
            best_ll = vl_ll
            best_probs, best_lbls = vl_probs, vl_lbls
            torch.save({
                "model": model.state_dict(),
                "ema"  : [w.cpu() for w in ema_weights]},
                f"{ROOT}/best_model_fold{fold}.pth")
            wandb.save(f"{ROOT}/best_model_fold{fold}.pth", base_path=ROOT)

    # No epoch ever improved on +inf (e.g. NaN log-loss): fall back to the
    # last epoch's outputs so the OOF arrays still cover this fold.
    if best_probs is None:
        best_probs, best_lbls = vl_probs, vl_lbls

    fold_best.append(best_ll)
    all_logits.append(best_probs);  all_labels.append(best_lbls)
    run.finish()
    print(f"🏁 Fold {fold} best LL {best_ll:.4f}")

# ───── Save OOF arrays ────────────────────────────────────────────────────────
# From here on ROOT is a pathlib.Path (later cells re-wrap it, which is a no-op).
ROOT = Path(ROOT)
# `all_logits` holds the per-fold validation probability matrices collected by
# the fold loop; they are concatenated in fold order, not dataframe order.
np.save(ROOT / "oof_logits.npy",  np.concatenate(all_logits).astype(np.float32))
np.save(ROOT / "oof_labels.npy", np.concatenate(all_labels).astype(np.int32))
print("🔚 CV mean LogLoss :", f"{np.mean(fold_best):.4f}")

In [None]:
# ─────────────────  Cell 10-prep : train ↓ embeds  ─────────────────
import torch, numpy as np, os
from tqdm.auto import tqdm
from pathlib import Path

# Cell-level configuration for the embedding-extraction step.
ROOT  = Path(ROOT)
BATCH = 96          # batch size for embedding extraction
device = "cuda" if torch.cuda.is_available() else "cpu"
n_cls  = len(class_names)

# 0) 헬퍼 ────────────────────────────────────────────────────────────
def backbone_forward_feat(bk, x):
    """Return the last feature map of a backbone through one common interface.

    A backbone that exposes ``forward_features`` is called through that
    method. Anything else (e.g. a timm ``FeatureListNet``) is called
    directly and the final element of its output sequence is returned.
    Per the original note the result is expected to be a (B, C, H, W) tensor.
    """
    if not hasattr(bk, "forward_features"):
        # Multi-stage output (list or tuple): keep only the last stage.
        stages = bk(x)
        return stages[-1]
    return bk.forward_features(x)

# util.py ------------------------------------------
def extract_feat(model: CarNet, x: torch.Tensor) -> torch.Tensor:
    """Return the feature tensor the model emits alongside its logits.

    The model is called without labels and with ``return_feat=True``; the
    logits are discarded. Per the original note this is the same feature
    the classification head receives during training.
    """
    outputs = model(x, label=None, return_feat=True)
    _logits, features = outputs
    return features


# 1) Dataset (no augmentation; the validation transform is sufficient).
#    `val_tf` is whatever validation transform is currently bound at notebook level.
train_set = CarDataset(df, transform=val_tf)
# shuffle=False keeps the embeddings in dataframe row order; the collate
# function keeps only element 0 of each sample.
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=BATCH, shuffle=False,
    num_workers=8, pin_memory=True,
    collate_fn=lambda b: torch.stack([t[0] for t in b]))

# 2) Backbone: the fold-0 checkpoint is used for everything so that all
#    embeddings live in one feature space.
ckpt  = torch.load(ROOT / "best_model_fold0.pth", map_location=device)
# Drop the "_orig_mod." prefix from state-dict keys (presumably left by
# torch.compile — TODO confirm).
state = {k.replace("_orig_mod.",""): v for k,v in ckpt["model"].items()}

net = CarNet(n_cls, k=3).to(device).eval().to(memory_format=torch.channels_last)
net.load_state_dict(state, strict=True)

# 3) Extract one embedding per training image, in loader (dataframe) order.
embeds = []
with torch.no_grad(), torch.amp.autocast(device_type=device):
    for x in tqdm(train_loader, desc="⏳ extract train emb"):
        x = x.to(device, memory_format=torch.channels_last)

        # Go through CarNet's own forward pass so the embedding is the fused
        # feature returned next to the logits (single call).
        f = extract_feat(net, x)

        # Up-cast before leaving torch: under autocast the features may be
        # float16 or bfloat16, and NumPy cannot represent bfloat16, so
        # `.numpy()` would raise on a CPU-only run.
        embeds.append(f.float().cpu().numpy())

embeds = np.vstack(embeds).astype("float32")          # (N, dim)
labels  = df["label"].to_numpy().astype("int32")

np.save(ROOT / "train_embeds.npy", embeds)
np.save(ROOT / "train_labels.npy", labels)
print("✅ saved train_embeds.npy :", embeds.shape)


In [None]:
# ───────────────────────────────  Cell 10  (k = 20, 30 added) ───────────────────────────────
#  • Reuses the FAISS index / embeddings built earlier
#  • For each k ∈ {20, 30, 50}:
#      knn_prob_train_k{k}.npy
#      knn_majority_ratio_k{k}.npy
#    are computed and saved when missing, skipped when present
# ─────────────────────────────────────────────────────────────────────────────────────────

from pathlib import Path
# ------------------------------------------------------------------
ROOT = Path(ROOT)          # ← converts a str to Path; an existing Path is kept
# ------------------------------------------------------------------

import faiss, torch, os, numpy as np
from tqdm.auto import tqdm

# --------------- Load the artefacts of the previous step ---------------------------------
train_emb = np.load(f"{ROOT}/train_embeds.npy").astype("float32")   # (N,dim)
train_lbl = np.load(f"{ROOT}/train_labels.npy")                     # (N,)
faiss.normalize_L2(train_emb)                                       # inner product on unit vectors == cosine

index_path = f"{ROOT}/faiss_ip.index"
# NOTE(review): an index file left over from an earlier run is reused as-is,
# even if train_embeds.npy has changed since — delete it to force a rebuild.
index = faiss.read_index(index_path) if os.path.exists(index_path) \
        else faiss.IndexFlatIP(train_emb.shape[1])
if index.ntotal == 0:                        # add vectors only for a fresh index
    index.add(train_emb)
    faiss.write_index(index, index_path)
print(f"🔧 FAISS index ready  • vectors = {index.ntotal}")

# Same as before
n_classes = len(class_names)

# ───────── k-NN class probabilities (the query sample itself excluded) ────────
# Row i of `train_emb` is assumed to be vector id i of the index. That holds
# when the index was built by `index.add(train_emb)` above — TODO confirm for
# index files reused from an earlier run.
row_ids = np.arange(len(train_lbl))[:, None]

for K in [20, 30, 50]:
    prob_file  = ROOT / f"knn_prob_train_k{K}.npy"
    ratio_file = ROOT / f"knn_majority_ratio_k{K}.npy"

    if prob_file.exists() and ratio_file.exists():
        print(f"✅ k={K} already exists – skipped")
        continue

    print(f"⇢ computing k-NN  (k={K}) …")

    D, I = index.search(train_emb, K + 1)   # K+1 hits so the query itself can be dropped

    # Remove the self hit by id rather than by position: with duplicate or
    # identical embeddings the query is not guaranteed to be hit 0, and a
    # blind `I[:, 1:]` would keep the sample's own label among its neighbours.
    is_self = I == row_ids
    # Rows where the query was not returned at all: drop the farthest hit.
    is_self[~is_self.any(1), -1] = True
    I = I[~is_self].reshape(len(I), K)

    # Per-sample class histogram of the K neighbours, normalised to sum to 1.
    knn_prob  = np.zeros((len(train_lbl), n_classes), dtype=np.float32)
    for n, nbr in enumerate(I):
        cls, cnt         = np.unique(train_lbl[nbr], return_counts=True)
        knn_prob[n, cls] = cnt / K

    # Share of neighbours that belong to the most frequent class.
    maj_ratio = knn_prob.max(1)

    np.save(prob_file,  knn_prob)
    np.save(ratio_file, maj_ratio)
    print(f"  • saved {prob_file.name} | majority_ratio mean {maj_ratio.mean():.4f}")

print("\n🏁  k-NN probability files regenerated without self-neighbor.\n")


In [None]:
# ────────────────  셀 12 : OOF  ‖  flip-TTA + Global-T ────────────────
import os, numpy as np, pandas as pd, torch, optuna, joblib, kornia
from tqdm.auto import tqdm
from PIL import Image
from sklearn.metrics import log_loss
from pathlib import Path

# Cell-level configuration for OOF inference.
ROOT       = Path(ROOT)
DEVICE     = "cuda" if torch.cuda.is_available() else "cpu"
FOLDS      = CFG["FOLDS"]
IMG_SIZE = CFG["FINAL_IMG_SIZE"]     # inference runs at the final resolution
N_CLASSES  = len(class_names)

# Validation-phase transform at IMG_SIZE (rebinds the notebook-level `val_tf`).
val_tf = build_aug(IMG_SIZE, phase="val")

# ---------- Dataset ----------
class CarDatasetOOF(torch.utils.data.Dataset):
    """Dataset yielding ``(image, label, original_index)`` for OOF inference.

    The wrapped dataframe must provide ``img_path``, ``label`` and ``index``
    columns. The ``index`` column is the one created by ``reset_index()`` at
    the call site, so it carries the row's index in the full dataframe.
    Images are transformed with the module-level ``val_tf``.
    """

    def __init__(self, df):
        self.df = df

    def __len__(self):
        return len(self.df)

    def __getitem__(self, i):
        record = self.df.iloc[i]
        pixels = np.array(Image.open(record.img_path).convert("RGB"))
        image = val_tf(image=pixels)["image"]
        # Return the stored 'index' column, not the positional row.name.
        return image, record.label, record['index']

def collate(batch):
    """Merge ``(image, label, index)`` samples into three batch tensors.

    Returns:
        ``(images, labels, indices)``: the images stacked along a new
        leading axis, the labels and the indices as 1-D tensors.
    """
    images = torch.stack([sample[0] for sample in batch])
    labels = torch.tensor([sample[1] for sample in batch])
    indices = torch.tensor([sample[2] for sample in batch])
    return images, labels, indices

# ---------- OOF logits (flip-TTA, EMA weights loaded over the raw ones) ----------
df_full    = df
oof_logits = np.empty((len(df_full), N_CLASSES), np.float32)
oof_labels = df_full.label.to_numpy().astype(np.int32)

for f in range(FOLDS):
    print(f"🔎  OOF – fold {f}")
    # reset_index() keeps each row's index in df_full as an 'index' column,
    # which CarDatasetOOF returns with every sample.
    loader = torch.utils.data.DataLoader(
        CarDatasetOOF(df_full[df_full.fold == f].reset_index()), BATCH, False,
        num_workers=10, pin_memory=True, collate_fn=collate)

    ckpt  = torch.load(ROOT/f"best_model_fold{f}.pth", map_location=DEVICE)
    state = {k.replace("_orig_mod.",""):v for k,v in ckpt["model"].items()}
    model = CarNet(N_CLASSES, k=3, s=30.0, m=0.25)\
            .to(DEVICE).to(memory_format=torch.channels_last).eval()
    model.load_state_dict(state, strict=True)
    # Overwrite the parameters with the EMA copy stored in the checkpoint
    for p_ema, p in zip(ckpt["ema"], model.parameters()):
        p.data.copy_(p_ema.to(DEVICE))

    # Optional EMA buffers: applied only when present and matching in count
    if "ema_buf" in ckpt and len(ckpt["ema_buf"]) == len(list(model.buffers())):
        for b_ema, b in zip(ckpt["ema_buf"], model.buffers()):
            b.data.copy_(b_ema.to(DEVICE))

    with torch.no_grad(), torch.amp.autocast(device_type=DEVICE, enabled=True):
        for x, _, idx in tqdm(loader, leave=False):
            x = x.to(DEVICE, memory_format=torch.channels_last)
            log1 = model(x)
            log2 = model(torch.flip(x, dims=[3]))      # horizontal flip
            # Up-cast before `.numpy()`: under autocast the logits may be
            # float16 or bfloat16, and NumPy cannot represent bfloat16.
            tta_logits = ((log1 + log2) / 2).float().cpu().numpy()
            # NOTE(review): `idx` holds df_full index labels and is used
            # positionally here — assumes df_full has a default 0..N-1 index.
            oof_logits[idx.numpy()] = tta_logits
    del model; torch.cuda.empty_cache()

np.save(ROOT/"oof_logits_raw.npy", oof_logits.astype(np.float32))
np.save(ROOT/"oof_labels.npy",     oof_labels)

# ---------- Global-T 단일 최적화 ----------
def obj_global(trial):
    """Optuna objective: OOF log-loss after dividing all logits by one temperature.

    The temperature ``T`` is sampled log-uniformly from [0.2, 4.0]. Reads the
    module-level ``oof_logits``, ``oof_labels`` and ``N_CLASSES``.
    """
    temperature = trial.suggest_float("T", 0.2, 4.0, log=True)
    scaled_logits = torch.tensor(oof_logits) / temperature
    calibrated = torch.softmax(scaled_logits, 1).numpy()
    return log_loss(oof_labels, calibrated, labels=np.arange(N_CLASSES))

# Search the single global temperature that minimises OOF log-loss
# (200 TPE trials, fixed sampler seed).
study = optuna.create_study(direction="minimize",
                            sampler=optuna.samplers.TPESampler(seed=42))
study.optimize(obj_global, n_trials=200, show_progress_bar=False)
T_global = study.best_params["T"]; LL_global = study.best_value
print(f"🌡️ Global T = {T_global:.4f} | OOF LL = {LL_global:.6f}")

# ---------- Save probabilities & T ----------
np.save(ROOT/"best_Ts.npy", np.array([T_global], np.float32))   # length-1 array
# Despite the file name, "oof_logits_tta.npy" stores temperature-scaled
# softmax probabilities, not logits.
prob_oof = torch.softmax(torch.tensor(oof_logits)/T_global,1)\
                 .numpy().astype(np.float32)
np.save(ROOT/"oof_logits_tta.npy", prob_oof)


In [None]:
# ────────────────  셀 13 : Test  ‖ flip-TTA + Global-T ────────────────
import torch, numpy as np, os, pandas as pd
from tqdm.auto import tqdm
from pathlib import Path
from torch.utils.data import DataLoader

# Cell-level configuration for test inference.
ROOT     = Path(ROOT)
DEVICE   = "cuda" if torch.cuda.is_available() else "cpu"
N_FOLDS  = CFG["FOLDS"];  N_CLASS = len(class_names)
BATCH = 32
# IMG_SIZE is the notebook-level value set by the OOF cell.
val_tf = build_aug(IMG_SIZE, phase="val")

# Read test.csv directly so predictions follow the submission row order.
test_df = pd.read_csv(os.path.join(ROOT, "data", "test.csv"))
# Paths in test.csv may be relative, so anchor them under ROOT/data.
test_df['img_path'] = test_df['img_path'].apply(lambda p: os.path.join(ROOT, "data", p))

test_set = CarDataset(test_df, transform=val_tf, is_test=True)

# The collate function keeps only element 0 of each sample.
loader = DataLoader(test_set, BATCH, False, num_workers=10, pin_memory=True,
                    collate_fn=lambda b: torch.stack([x[0] for x in b]))

# best_Ts.npy is saved as a length-1 array (a 0-D array also works here).
# Index the element explicitly: float() on an array with ndim > 0 is
# deprecated since NumPy 1.25.
T_global = float(np.load(ROOT/"best_Ts.npy").reshape(-1)[0])

# Per-fold test probabilities: (fold, sample, class)
probs_fold = np.zeros((N_FOLDS, len(test_set), N_CLASS), np.float32)

# Predict the test set with every fold model (flip-TTA on logits, then
# temperature-scaled softmax) and average the probabilities over folds.
for f in range(N_FOLDS):
    print(f"🔸 Test – fold {f}")
    ckpt  = torch.load(ROOT/f"best_model_fold{f}.pth", map_location=DEVICE)
    state = {k.replace("_orig_mod.",""):v for k,v in ckpt["model"].items()}
    model = CarNet(N_CLASS, k=3, s=30.0, m=0.25)\
            .to(DEVICE).to(memory_format=torch.channels_last).eval()
    model.load_state_dict(state, strict=True)
    # Overwrite the parameters with the EMA copy stored in the checkpoint
    for p_ema, p in zip(ckpt["ema"], model.parameters()):
        p.data.copy_(p_ema.to(DEVICE))

    # Optional EMA buffers: applied only when present and matching in count
    if "ema_buf" in ckpt and len(ckpt["ema_buf"]) == len(list(model.buffers())):
        for b_ema, b in zip(ckpt["ema_buf"], model.buffers()):
            b.data.copy_(b_ema.to(DEVICE))

    # `ofs` is the write offset into `out`; the loader is unshuffled, so rows
    # follow test_set order.
    out = np.empty((len(test_set), N_CLASS), np.float32); ofs = 0
    with torch.no_grad(), torch.amp.autocast(device_type=DEVICE, enabled=True):
        for x in tqdm(loader, leave=False):
            x = x.to(DEVICE, memory_format=torch.channels_last)
            log1 = model(x)
            log2 = model(torch.flip(x, [3]))       # horizontal flip
            logits = (log1 + log2) / 2
            prob   = torch.softmax(logits / T_global, 1)
            bsz = len(x); out[ofs:ofs+bsz] = prob.cpu().numpy(); ofs += bsz
    probs_fold[f] = out; del model; torch.cuda.empty_cache()

# Despite the file name, "test_logits.npy" stores fold-averaged probabilities.
prob_test = probs_fold.mean(0).astype(np.float32)
np.save(ROOT/"test_logits.npy",  prob_test)
np.save(ROOT/"test_probs_f.npy", probs_fold)
print("✅ test_logits.npy 저장 :", prob_test.shape)


In [None]:
# ─────────────────────  셀 14 : Test k-NN (k = 20·30·50 동시) ─────────────────────
import torch, numpy as np, os, pandas as pd, faiss
from tqdm.auto import tqdm
from torch.utils.data import DataLoader          # ★ 추가

# ── Parameters ────────────────────────────────────────────────────────────────
K_LIST      = [20, 30, 50]                    # neighbour counts to evaluate
EMB_BATCH   = 96                              # batch size for embedding extraction
INDEX_PATH  = f"{ROOT}/faiss_ip.index"        # FAISS index file read below
LABEL_PATH  = f"{ROOT}/train_labels.npy"      # train labels file read below

# ── 헬퍼: timm FeatureListNet 호환 ────────────────────────────────────────────
def backbone_forward_feat(bk, x):
    """Return the last feature map of a backbone through one common interface.

    • Backbone with ``forward_features``  → ``bk.forward_features(x)``
    • Anything else (e.g. FeatureListNet)  → last element of ``bk(x)``
    Per the original note both paths are expected to give a (B, C, H, W) tensor.
    """
    if hasattr(bk, "forward_features"):
        return bk.forward_features(x)
    stages = bk(x)
    return stages[-1]

# ── 1. Load the index & train labels ──────────────────────────────────────────
assert os.path.exists(INDEX_PATH), "train 임베딩 index 가 없습니다."
index        = faiss.read_index(INDEX_PATH)          # inner-product index over L2-normalised vectors
train_labels = np.load(LABEL_PATH)                   # (N_train,)
n_train      = index.ntotal
print(f"🔧  FAISS index ready  • vectors = {n_train}")

# ── 2. Test DataLoader (same as cell 13) ──────────────────────────────────────
# test_paths = sorted([os.path.join(TEST_DIR, f)
#                      for f in os.listdir(TEST_DIR) if f.lower().endswith(".jpg")])
# test_df  = pd.DataFrame({"img_path": test_paths})
# test_set = CarDataset(test_df, transform=val_tf, is_test=True)

# test.csv is read directly; its image paths are anchored under ROOT/data.
test_df = pd.read_csv(os.path.join(ROOT, "data", "test.csv"))
test_df['img_path'] = test_df['img_path'].apply(lambda p: os.path.join(ROOT, "data", p))
# `val_tf` is the notebook-level validation transform currently bound.
test_set = CarDataset(test_df, transform=val_tf, is_test=True)

def collate(batch):                  # list of (img, ...) samples → one Tensor
    """Stack element 0 of every sample along a new leading batch axis."""
    images = [sample[0] for sample in batch]
    return torch.stack(images, 0)

# Unshuffled loader so the embeddings follow test_set order.
loader = DataLoader(test_set, batch_size=EMB_BATCH, shuffle=False,
                    num_workers=8, pin_memory=True, collate_fn=collate)

n_test   = len(test_set)
n_class  = len(class_names)

# ── 3. Load the backbone (fold-0 model) & extract test embeddings (once) ──────
ckpt  = torch.load(f"{ROOT}/best_model_fold0.pth", map_location=device)
# Drop the "_orig_mod." prefix from state-dict keys (presumably left by
# torch.compile — TODO confirm).
state = {k.replace("_orig_mod.", ""): v for k, v in ckpt["model"].items()}

embed_net = CarNet(n_class, k=3).to(device)
embed_net.load_state_dict(state, strict=True)
embed_net = embed_net.to(memory_format=torch.channels_last).eval()

# Extract one embedding per test image, in loader order.
emb_test = []
with torch.no_grad(), torch.amp.autocast(device_type="cuda" if device == "cuda" else "cpu"):
    for x in tqdm(loader, desc="⇢ extract test emb"):
        x = x.to(device, memory_format=torch.channels_last)

        # Same feature path as the train embeddings (extract_feat).
        f = extract_feat(embed_net, x)
        # Up-cast before `.numpy()`: under autocast the features may be
        # float16 or bfloat16, and NumPy cannot represent bfloat16.
        emb_test.append(f.float().cpu().numpy())

emb_test = np.vstack(emb_test).astype("float32")
faiss.normalize_L2(emb_test)                            # unit vectors → inner product == cosine

# ── 4. Search per k & save the class probabilities ────────────────────────────
for K in K_LIST:
    out_file = f"{ROOT}/knn_prob_test_k{K}.npy"
    if os.path.exists(out_file):
        print(f"⏩  {out_file} already exists – skip")
        continue

    print(f"⇢ computing k-NN  (k={K}) …")
    D, I = index.search(emb_test, K)                    # ids of the K nearest train vectors

    # Per-sample class histogram of the K neighbours, normalised to sum to 1.
    knn_prob = np.zeros((n_test, n_class), dtype=np.float32)
    for n, nbr in enumerate(I):
        cls, cnt = np.unique(train_labels[nbr], return_counts=True)
        knn_prob[n, cls] = cnt / K

    np.save(out_file, knn_prob)
    # Mean share of neighbours that belong to the most frequent class.
    maj_ratio = knn_prob.max(1).mean()
    print(f"  • saved {os.path.basename(out_file)}  | majority_ratio mean {maj_ratio:.4f}")

print("🏁 k-NN probability files ready for k =", ", ".join(map(str, K_LIST)))


In [None]:
# ─────────────────────  Cell 15 : 최종 확률 산출 ─────────────────────
"""
● 이 셀은 모델 폴더 안에서 단독으로 실행
   (ROOT = 현재 모델 디렉터리)  
● 결과물: prob_test_blend.npy  ← 다른 모델과 앙상블 단계에서 사용
"""
import numpy as np, pandas as pd, os
from pathlib import Path
from sklearn.metrics import log_loss

# Model directory: taken from the ROOT environment variable, defaulting to
# the current working directory.
ROOT     = Path(os.getenv("ROOT", "."))
USE_KNN  = True                              # blend k-NN probabilities into the model output

# ── 1. Load the required files ──────────────────────────────────────
prob_oof   = np.load(ROOT / "oof_logits_tta.npy")      # (N_train,C)
labels_oof = np.load(ROOT / "oof_labels.npy").astype(int)
prob_test  = np.load(ROOT / "test_logits.npy")         # (N_test ,C)

# Class names come from the sample_submission header when it is found under
# ROOT; otherwise the class count falls back to the probability matrix width
# instead of failing with StopIteration.
sample_sub = next(ROOT.rglob("sample_submission.csv"), None)
if sample_sub is not None:
    class_names = pd.read_csv(sample_sub, nrows=0).columns[1:].tolist()
    C = len(class_names)
else:
    C = prob_oof.shape[1]

# ── 2. OOF log-loss of the pure model ───────────────────────────────
oof_ll = log_loss(labels_oof, prob_oof, labels=np.arange(C))
print(f"🔎  OOF LogLoss (pure model) = {oof_ll:.6f}")

# ── 3. k-NN blend (optional) ────────────────────────────────────────
prob_out = prob_test.copy()
if USE_KNN:
    # Only consider k values whose train AND test probability files exist.
    # The k-NN cells write k = 20/30/50, so a hard-coded k = 15 would raise
    # FileNotFoundError unless those files were produced separately.
    k_list = [k for k in (15, 20, 30, 50)
              if (ROOT / f"knn_prob_train_k{k}.npy").exists()
              and (ROOT / f"knn_prob_test_k{k}.npy").exists()]
    # β is the model weight; β = 1.00 means "no k-NN contribution".
    beta_grid = np.append(np.linspace(0.60, 0.95, 15), 1.00)
    best_k = None; best_b = 1.0; best_ll = 1e9

    if not k_list:
        print("⚠️  no k-NN probability files found – keeping pure-model probabilities")
    else:
        print("\n🔎  Grid-search β for k-NN blend")

    for k in k_list:
        knn_oof = np.load(ROOT / f"knn_prob_train_k{k}.npy")
        for b in beta_grid:
            mix = b*prob_oof + (1-b)*knn_oof
            mix /= mix.sum(1, keepdims=True)
            ll  = log_loss(labels_oof, mix, labels=np.arange(C))
            if ll < best_ll:
                best_ll, best_b, best_k = ll, b, k

    # Apply the blend to the test probabilities only if it beat the pure model.
    if best_k is not None and best_b < 1.0:
        print(f"✅  best k={best_k}  β={best_b:.2f}  OOF LL={best_ll:.6f}")
        knn_test = np.load(ROOT / f"knn_prob_test_k{best_k}.npy")
        prob_out = best_b*prob_test + (1-best_b)*knn_test
        prob_out /= prob_out.sum(1, keepdims=True)

# ── 4. Save the final test probabilities ────────────────────────────
np.save(ROOT / "prob_test_blend.npy", prob_out.astype(np.float32))
print("💾  prob_test_blend.npy saved :", prob_out.shape)


**5fold x 2 모델 앙상블**

셀 15까지 실행한 뒤, 각 모델의 결과물을 아래와 같은 폴더 구조로 모두 옮긴 후 셀 16을 실행합니다.

/work/
 ├─ model_A/          (ex. convnext_base)
 │   ├─ best_model_fold0.pth … fold4.pth
 │   ├─ oof_logits_tta.npy     ← Cell12
 │   ├─ oof_labels.npy         ← Cell12 (Cell16에서 사용)
 │   ├─ best_Ts.npy            ← Cell12
 │   ├─ test_logits.npy        ← Cell13
 │   ├─ knn_prob_train_k15.npy ← Cell10
 │   ├─ knn_prob_test_k15.npy  ← Cell14
 │   └─ prob_test_blend.npy    ← Cell15
 └─ model_B/          (ex. swin_large)
     ├─ best_model_fold0.pth … fold4.pth
     ├─ oof_logits_tta.npy
     ├─ oof_labels.npy
     ├─ best_Ts.npy
     ├─ test_logits.npy
     ├─ knn_prob_train_k15.npy
     ├─ knn_prob_test_k15.npy
     └─ prob_test_blend.npy    ← Cell15

In [None]:
# ─────────────────────  Cell 16 : 모델 간 앙상블 ───────────────────
"""
● MODELS 리스트에 ‘prob_test_blend.npy’ 가 있는 모델 폴더 경로만 추가
● OOF 파일(oof_logits_tta.npy)이 존재하는 모델만 가중치 탐색 대상
"""
import numpy as np, pandas as pd, os, itertools
from pathlib import Path
from sklearn.metrics import log_loss

# ------------------------------------------------------------------------------
# 1) Model directories to ensemble
MODEL_DIRS = [
    Path("/work/model_A"),   # 5-fold modelA
    Path("/work/model_B"),   # 5-fold modelB
]

# ------------------------------------------------------------------------------
# 2) Load files. Dictionaries are keyed by directory name.
probs_test  = {}
probs_oof   = {}
labels_oof  = None
for d in MODEL_DIRS:
    # prob_test_blend.npy is mandatory for every model; the OOF file is optional.
    probs_test[d.name] = np.load(d / "prob_test_blend.npy")
    oof_path = d / "oof_logits_tta.npy"
    if oof_path.exists():
        probs_oof[d.name] = np.load(oof_path)
        # Labels are taken from the first model that has OOF predictions.
        # NOTE(review): assumes every model's OOF rows share the same order — confirm.
        if labels_oof is None:
            labels_oof = np.load(d / "oof_labels.npy")

model_names   = list(probs_test.keys())
have_oof      = list(probs_oof.keys())
C             = probs_test[model_names[0]].shape[1]     # number of classes

print("🗂  models :", model_names)
print("🗂  with OOF :", have_oof)

# ------------------------------------------------------------------------------
# 3) Decide the ensemble weights
weights = {n:0.0 for n in model_names}

if len(have_oof) == 0:                     # no OOF at all → equal weights
    for n in weights: weights[n] = 1/len(weights)

elif len(have_oof) == 1:                   # a single OOF model → weight 1
    weights[have_oof[0]] = 1.0

elif len(have_oof) == 2:                   # two OOF models → 1-D β grid
    best_ll, best_b = 1e9, 0.5
    betas = np.linspace(0.0,1.0,21)
    a,b = have_oof
    for β in betas:
        mix = β*probs_oof[a] + (1-β)*probs_oof[b]
        mix /= mix.sum(1,keepdims=True)
        # Pass the full label set explicitly so the score stays defined even
        # when a class is absent from labels_oof.
        ll = log_loss(labels_oof, mix, labels=np.arange(C))
        if ll < best_ll: best_ll, best_b = ll, β
    weights[a] = float(best_b)
    weights[b] = float(1-best_b)

else:                                      # three or more OOF models → equal weights
    for n in have_oof: weights[n] = 1/len(have_oof)

# Spread any residual weight evenly over the models without OOF predictions.
# Skipped when no model has OOF: the equal weights assigned above already sum
# to 1, and redistributing a zero residual would reset every weight to 0 and
# turn the final probabilities into NaN.
residual = 1 - sum(weights.values())
no_oof   = [n for n in model_names if n not in have_oof]
if have_oof and no_oof:
    for n in no_oof:
        weights[n] = residual / len(no_oof)

print("⚖️  final weights :", weights)

# ------------------------------------------------------------------------------
# 4) Weighted sum of the test probabilities, re-normalised per row
prob_final = sum(weights[n]*probs_test[n] for n in model_names)
prob_final /= prob_final.sum(1, keepdims=True)

# ------------------------------------------------------------------------------
# 5) Fill in sample_submission (first match found anywhere under /work)
ss_path = next(Path("/work").rglob("sample_submission.csv"))
sub_df  = pd.read_csv(ss_path)
# Column 0 is left untouched; every other column receives the probabilities.
# NOTE(review): assumes the row and column order of prob_final matches the
# sample file — confirm.
sub_df.iloc[:,1:] = prob_final
sub_df.to_csv("submission.csv", index=False,
              encoding="utf-8-sig", float_format="%.9f")

print("✅  submission.csv saved :", sub_df.shape)
