In [1]:
from __future__ import annotations

import math
import os
from copy import deepcopy
from pathlib import Path
from typing import Tuple, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from tqdm.auto import tqdm

In [2]:
# --- Paths (resolved relative to the notebook's working directory) ---
DATA_ROOT = Path("./")
IMAGE_DIR = DATA_ROOT / "images"  # folder holding the image files named in the metadata
META_PATH = DATA_ROOT / "data.csv"  # CSV metadata read with pd.read_csv below

# --- Reproducibility / data split ---
RANDOM_SEED = 42  # shared by the split, the triplet sampler and the torch seeding below
VAL_RATIO = 0.2  # per-class validation fraction passed to stratified_per_class_split

# --- Training hyper-parameters ---
BATCH_SIZE = 32  # used by every DataLoader in this notebook
NUM_EPOCHS = 5  # number of training epochs; also T_max of the cosine LR schedule
LEARNING_RATE = 1e-4  # AdamW learning rate
IMAGE_SIZE = 224  # side length (pixels) of the square model input
EMBED_DIM = 256  # default embedding dimension of NoseEmbeddingModel
TRIPLET_MARGIN = 0.3  # margin passed to nn.TripletMarginLoss
LOG_INTERVAL = 50  # batches between tqdm.write log lines; a falsy value disables them
TOP_K = 1  # NOTE(review): not referenced by the visible cells — confirm whether it is still needed

# Module-level NumPy generator. NOTE(review): the visible cells create their own
# generators (split function, triplet dataset), so this one appears unused here.
rng = np.random.default_rng(RANDOM_SEED)
# Seed torch's CPU generator and the generators of all CUDA devices.
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed_all(RANDOM_SEED)

# Fail fast with explicit exceptions instead of `assert`: assertions are stripped
# under `python -O`, and FileNotFoundError matches the missing-image check below.
if not META_PATH.exists():
    raise FileNotFoundError(f"Metadata not found: {META_PATH}")
if not IMAGE_DIR.exists():
    raise FileNotFoundError(f"Image directory not found: {IMAGE_DIR}")

# Normalize the CSV headers to identifier-style column names used everywhere below.
df = pd.read_csv(META_PATH)
df = df.rename(columns={"dog ID": "dog_id", "nose print image": "image_name"})

# Surface a schema mismatch here with a readable message rather than as a bare
# KeyError on the first column access further down.
missing_columns = {"dog_id", "image_name"} - set(df.columns)
if missing_columns:
    raise KeyError(
        f"Metadata is missing expected columns {sorted(missing_columns)}; "
        f"found {list(df.columns)}"
    )

# Every image listed in the metadata must exist on disk before training starts.
available_mask = df["image_name"].apply(lambda name: (IMAGE_DIR / name).exists())
missing = df.loc[~available_mask, "image_name"].tolist()
if missing:
    raise FileNotFoundError(f"{len(missing)} images missing. Example: {missing[:3]}")

# Contiguous integer labels (sorted by dog_id) and the inverse lookup.
label_map = {dog_id: idx for idx, dog_id in enumerate(sorted(df["dog_id"].unique()))}
id_map = {idx: dog_id for dog_id, idx in label_map.items()}
df["label"] = df["dog_id"].map(label_map)


def stratified_per_class_split(
    frame: pd.DataFrame, val_ratio: float, seed: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split ``frame`` into train and validation frames, one ``dog_id`` at a time.

    For every ``dog_id`` group:

    * a group with a single row goes entirely to train;
    * otherwise ``round(len(group) * val_ratio)`` rows go to validation, clamped
      so that at least one row lands in validation and at least one stays in
      train. Because of that lower bound the realized validation fraction can
      exceed ``val_ratio`` when most groups are small.

    Args:
        frame: Table with a ``dog_id`` column. Rows are selected by index label,
            so the index is assumed to be unique.
        val_ratio: Target validation fraction applied within each group.
        seed: Seed for both the per-group shuffle and the final row shuffle.

    Returns:
        ``(train_df, val_df)``, each shuffled and re-indexed from 0. If no group
        yields rows (e.g. ``frame`` is empty) both frames are empty with
        ``frame``'s columns.
    """
    rng = np.random.default_rng(seed)
    train_parts = []
    val_parts = []

    for _, group in frame.groupby("dog_id"):
        # copy=True: shuffle a private array. Without it `to_numpy()` may hand
        # back a view of the index's own buffer, so an in-place shuffle could
        # scramble the index (or fail outright if the view is read-only).
        indices = group.index.to_numpy(copy=True)
        rng.shuffle(indices)

        if len(indices) == 1:
            train_parts.append(group)
            continue

        # At least one validation row, but never the whole group.
        val_count = max(1, int(round(len(indices) * val_ratio)))
        val_count = min(len(indices) - 1, val_count)

        train_parts.append(frame.loc[indices[val_count:]])
        val_parts.append(frame.loc[indices[:val_count]])

    if not train_parts:
        # pd.concat([]) raises; an empty input simply yields two empty frames.
        return pd.DataFrame(columns=frame.columns), pd.DataFrame(columns=frame.columns)

    train_df = pd.concat(train_parts).sample(frac=1.0, random_state=seed).reset_index(drop=True)
    if val_parts:
        val_df = pd.concat(val_parts).sample(frac=1.0, random_state=seed).reset_index(drop=True)
    else:
        val_df = pd.DataFrame(columns=frame.columns)
    return train_df, val_df


# Per-class train/validation split of the full metadata table.
train_df, val_df = stratified_per_class_split(df, VAL_RATIO, RANDOM_SEED)

# Triplet training needs a second image of the same dog to act as the positive,
# so keep only identities with at least two training images.
triplet_mask = train_df.groupby("dog_id")["image_name"].transform("count") >= 2
triplet_train_df = train_df.loc[triplet_mask].reset_index(drop=True)
excluded = len(train_df) - len(triplet_train_df)

# Summary. The split puts at least one image of every multi-image class into
# validation, so the realized validation ratio can be higher than VAL_RATIO.
print(f"총 샘플 수: {len(df):,}")
print(f"Train: {len(train_df):,}, Validation: {len(val_df):,}")
print(
    f"Validation 비율: {len(val_df) / len(df):.3f}, "
    f"Validation에 포함된 클래스 수: {val_df['dog_id'].nunique():,}"
)
print(f"Triplet 학습에 사용할 샘플: {len(triplet_train_df):,} (제외 {excluded:,})")
# Last expression of the cell: rendered as the cell's rich output.
train_df.head()

총 샘플 수: 20,000
Train: 13,956, Validation: 6,044
Validation 비율: 0.302, Validation에 포함된 클래스 수: 6,000
Triplet 학습에 사용할 샘플: 12,185 (제외 1,771)


Unnamed: 0,dog_id,image_name,label
0,4651,A*H6e0QKriV2QAAAAAAAAAAAAAAQAAAQ.jpg,4651
1,5144,A*B1jXQrueubcAAAAAAAAAAAAAAQAAAQ.jpg,5144
2,2148,A*xRIKTrCpOy4_9YRBiXiZ3QAAAQAAAQ.jpg,2148
3,5705,A*GGwZR7_S7yYAAAAAAAAAAAAAAQAAAQ.jpg,5705
4,3628,A*8JIqQpZyc3AAAAAAAAAAAAAAAQAAAQ.jpg,3628


In [9]:
class DogNoseDataset(Dataset):
    """Map-style dataset yielding ``(image, label, idx)`` for one metadata row.

    Args:
        frame: Table with ``image_name`` and ``label`` columns. It is re-indexed
            from 0 so that ``idx`` is a plain row position.
        image_dir: Directory that contains the files named in ``image_name``.
        transform: Optional callable applied to the RGB PIL image. When omitted
            the image is converted with ``transforms.ToTensor()`` so the item
            always matches the annotated tensor return type (and stays
            consistent with ``TripletDogDataset``).
    """

    def __init__(self, frame: pd.DataFrame, image_dir: Path, transform=None):
        self.frame = frame.reset_index(drop=True)
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self) -> int:
        """Number of rows (images) in the dataset."""
        return len(self.frame)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int, int]:
        """Load row ``idx`` and return ``(image, label, idx)``.

        ``idx`` is echoed back so callers can map batched outputs to rows of
        ``self.frame``.
        """
        row = self.frame.iloc[idx]
        # The context manager closes the file handle deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(self.image_dir / row["image_name"]) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        else:
            # Without this fallback a PIL image would be returned, which breaks
            # the declared return type and default DataLoader collation.
            image = transforms.ToTensor()(image)
        return image, int(row["label"]), idx


class TripletDogDataset(Dataset):
    """Map-style dataset yielding ``(anchor, positive, negative)`` image triplets.

    Row ``idx`` is the anchor. The positive is another row with the same
    ``dog_id`` and the negative is a row of a different ``dog_id``.

    Sampling is driven by a generator seeded with ``seed + idx``, so a given
    ``idx`` always selects the same positive and negative rows.
    NOTE(review): this means the triplet composition repeats every epoch (only
    the transform's own randomness, if any, varies) — confirm this is intended.

    Args:
        frame: Table with ``dog_id`` and ``image_name`` columns; re-indexed from 0.
        image_dir: Directory that contains the files named in ``image_name``.
        transform: Optional callable applied to each RGB PIL image. When omitted
            images are converted with ``transforms.ToTensor()``.
        seed: Base seed for the per-item generator.
    """

    def __init__(self, frame: pd.DataFrame, image_dir: Path, transform=None, seed: int = RANDOM_SEED):
        self.frame = frame.reset_index(drop=True)
        self.image_dir = image_dir
        self.transform = transform
        self.seed = seed
        # dog_id -> array of row positions belonging to that dog.
        grouped = self.frame.groupby("dog_id").indices
        self.grouped_indices = {dog_id: np.array(indices) for dog_id, indices in grouped.items()}
        self.dog_ids = list(self.grouped_indices.keys())

    def __len__(self) -> int:
        """Number of anchors (rows) in the dataset."""
        return len(self.frame)

    def _sample_positive(self, dog_id: int, anchor_idx: int, rng: np.random.Generator) -> int:
        """Pick a row of ``dog_id`` other than the anchor.

        Falls back to the anchor itself when the dog has only one row.
        """
        candidates = self.grouped_indices[dog_id]
        if len(candidates) == 1:
            return anchor_idx
        pos_idx = anchor_idx
        # Rejection sampling: redraw until the pick differs from the anchor.
        while pos_idx == anchor_idx:
            pos_idx = int(rng.choice(candidates))
        return pos_idx

    def _sample_negative(self, dog_id: int, rng: np.random.Generator) -> int:
        """Pick a row belonging to any dog other than ``dog_id``.

        Raises:
            ValueError: If the dataset holds fewer than two distinct dog IDs,
                in which case no negative exists.
        """
        if len(self.dog_ids) < 2:
            # Without this guard the rejection loop below would never terminate.
            raise ValueError(
                "TripletDogDataset needs at least two distinct dog IDs to sample a negative."
            )
        neg_dog = dog_id
        while neg_dog == dog_id:
            neg_dog = rng.choice(self.dog_ids)
        return int(rng.choice(self.grouped_indices[neg_dog]))

    def _load_image(self, image_name: str):
        """Open ``image_name`` as RGB and apply the transform (or ``ToTensor``)."""
        # The context manager closes the file handle deterministically;
        # convert() returns an independent, fully loaded image.
        with Image.open(self.image_dir / image_name) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            return self.transform(image)
        return transforms.ToTensor()(image)

    def __getitem__(self, idx: int):
        """Return ``(anchor_img, pos_img, neg_img)`` for anchor row ``idx``."""
        rng = np.random.default_rng(self.seed + idx)
        row = self.frame.iloc[idx]
        # Draw order (positive first, then negative) fixes the random stream.
        pos_idx = self._sample_positive(row["dog_id"], idx, rng)
        neg_idx = self._sample_negative(row["dog_id"], rng)

        pos_row = self.frame.iloc[pos_idx]
        neg_row = self.frame.iloc[neg_idx]

        anchor_img = self._load_image(row["image_name"])
        pos_img = self._load_image(pos_row["image_name"])
        neg_img = self._load_image(neg_row["image_name"])
        return anchor_img, pos_img, neg_img


def build_transforms(image_size: int, train: bool = True):
    """Build the torchvision preprocessing pipeline for training or evaluation.

    Both pipelines end with ``ToTensor`` followed by ``Normalize`` using the
    mean/std values below. The evaluation pipeline starts with a plain resize to
    ``image_size`` x ``image_size``; the training pipeline instead starts with a
    random resized crop, a random horizontal flip and a mild color jitter.

    Args:
        image_size: Side length of the square output image.
        train: ``True`` for the augmented pipeline, ``False`` for the
            deterministic evaluation pipeline.

    Returns:
        A ``transforms.Compose`` instance.
    """
    to_normalized_tensor = [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]

    if not train:
        resize = transforms.Resize((image_size, image_size))
        return transforms.Compose([resize, *to_normalized_tensor])

    # The random resized crop already outputs image_size, so no Resize is needed.
    augmentations = [
        transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    ]
    return transforms.Compose([*augmentations, *to_normalized_tensor])


# Augmented pipeline for triplet training, deterministic pipeline for embedding extraction.
train_transform = build_transforms(IMAGE_SIZE, train=True)
eval_transform = build_transforms(IMAGE_SIZE, train=False)

# Triplets come only from identities with >= 2 training images; the retrieval
# gallery uses the full training split (single-image identities included).
triplet_dataset = TripletDogDataset(triplet_train_df, IMAGE_DIR, transform=train_transform, seed=RANDOM_SEED)
train_gallery_dataset = DogNoseDataset(train_df, IMAGE_DIR, transform=eval_transform)
val_dataset = DogNoseDataset(val_df, IMAGE_DIR, transform=eval_transform)

# Between 1 and 4 worker processes, bounded by the reported CPU count.
num_workers = max(1, min(4, (os.cpu_count() or 1)))
# Training loader: shuffled, and the last incomplete batch is dropped.
triplet_loader = DataLoader(
    triplet_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=num_workers,
    drop_last=True,
)
# Gallery/validation loaders are not shuffled, so batches arrive in dataset row order.
train_gallery_loader = DataLoader(
    train_gallery_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=num_workers,
)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

# Last expression of the cell: dataset sizes shown as the cell output.
len(triplet_dataset), len(train_gallery_dataset), len(val_dataset)

(12185, 13956, 6044)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"사용 디바이스: {device}")


class NoseEmbeddingModel(nn.Module):
    """ResNet-50 backbone with a linear + batch-norm head producing unit-length embeddings.

    The backbone is created with the ``IMAGENET1K_V1`` weights and its final
    ``fc`` layer is replaced by an identity, so it outputs the pooled features.
    The head projects those features to ``embedding_dim`` and the result is
    L2-normalized along dimension 1.

    Args:
        embedding_dim: Size of the output embedding.
    """

    def __init__(self, embedding_dim: int = EMBED_DIM):
        super().__init__()
        self.backbone = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Read the feature width before discarding the classification layer.
        feature_dim = self.backbone.fc.in_features
        self.backbone.fc = nn.Identity()
        self.head = nn.Sequential(
            nn.Linear(feature_dim, embedding_dim),
            nn.BatchNorm1d(embedding_dim),
        )

    def forward(self, x):
        """Return the L2-normalized embedding for the image batch ``x``."""
        projected = self.head(self.backbone(x))
        return F.normalize(projected, p=2, dim=1)


# Model, loss and optimization objects shared (as module-level names) by the
# training and evaluation functions defined below.
model = NoseEmbeddingModel().to(device)
# Triplet margin loss on Euclidean (p=2) distances between embeddings.
criterion = nn.TripletMarginLoss(margin=TRIPLET_MARGIN, p=2)
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
# Cosine schedule spanning NUM_EPOCHS steps; the training loop steps it once per epoch.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS)


def run_metric_epoch(loader: DataLoader, epoch: int, phase: str, train: bool = True):
    """Run one pass over a triplet loader and return the sample-weighted mean loss.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and ``device``.

    Args:
        loader: Yields ``(anchor, positive, negative)`` image batches.
        epoch: Epoch number, used only in log messages.
        phase: Label shown in the progress bar and log lines (e.g. ``"Train"``).
        train: When ``True`` the model is put in training mode, gradients are
            enabled and the optimizer is stepped (with gradient-norm clipping
            at 5.0). When ``False`` the model is put in eval mode and gradients
            are disabled.

    Returns:
        Mean loss over all processed samples (0.0 if the loader is empty).
    """
    epoch_loss = 0.0
    total = 0

    # train(True) is train(); train(False) is eval().
    model.train(train)

    num_batches = len(loader)
    progress = tqdm(loader, leave=False)
    with torch.set_grad_enabled(train):
        for batch_idx, (anchor, positive, negative) in enumerate(progress, start=1):
            anchor = anchor.to(device)
            positive = positive.to(device)
            negative = negative.to(device)

            if train:
                optimizer.zero_grad()

            anchor_emb = model(anchor)
            positive_emb = model(positive)
            negative_emb = model(negative)
            loss = criterion(anchor_emb, positive_emb, negative_emb)

            if train:
                loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
                optimizer.step()

            # Read the scalar once per step: every .item() call copies the value
            # to the host and, on CUDA, waits for the device to finish.
            loss_value = loss.item()
            batch_size = anchor.size(0)
            epoch_loss += loss_value * batch_size
            total += batch_size

            progress.set_description(f"{phase} loss={loss_value:.4f}")
            # Log every LOG_INTERVAL batches and always on the final batch.
            if LOG_INTERVAL and (batch_idx % LOG_INTERVAL == 0 or batch_idx == num_batches):
                avg_loss = epoch_loss / max(total, 1)
                tqdm.write(
                    f"[{phase}] Epoch {epoch} Step {batch_idx}/{num_batches} | "
                    f"Batch loss {loss_value:.4f} | Running loss {avg_loss:.4f}"
                )

    return epoch_loss / max(total, 1)


def compute_embeddings(loader: DataLoader, desc: str):
    """Embed every batch of ``loader`` with the module-level ``model`` in eval mode.

    Args:
        loader: Yields ``(images, labels, indices)`` batches.
        desc: Progress-bar description.

    Returns:
        ``(embeddings, labels, indices)`` as CPU tensors concatenated over all
        batches. For an empty loader: an empty ``(0, EMBED_DIM)`` tensor and two
        empty long tensors.
    """
    model.eval()
    emb_chunks = []
    label_chunks = []
    index_chunks = []

    with torch.no_grad():
        batches = tqdm(loader, desc=desc, leave=False)
        for batch_images, batch_labels, batch_indices in batches:
            batch_emb = model(batch_images.to(device))
            emb_chunks.append(batch_emb.cpu())
            label_chunks.append(batch_labels.cpu())
            index_chunks.append(batch_indices.cpu())

    if not emb_chunks:
        # Nothing was embedded: return correctly typed empty tensors.
        return (
            torch.empty(0, EMBED_DIM),
            torch.empty(0, dtype=torch.long),
            torch.empty(0, dtype=torch.long),
        )
    return torch.cat(emb_chunks), torch.cat(label_chunks), torch.cat(index_chunks)


def evaluate_retrieval(epoch: int):
    """Score validation images by nearest-neighbour lookup in the training gallery.

    Embeds ``train_gallery_loader`` and ``val_loader``, L2-normalizes both sets,
    and for each validation embedding takes the gallery row with the highest
    dot-product similarity. A prediction is correct when that row's label
    equals the validation label.

    Args:
        epoch: Accepted for call-site symmetry; not used in the computation.

    Returns:
        ``(accuracy, cache)`` where ``cache`` holds the normalized embeddings,
        labels, dataset indices, best-match rows/labels and their similarity.
        Returns ``(0.0, {})`` when either set is empty.
    """
    gallery_embs, train_labels, train_indices = compute_embeddings(train_gallery_loader, "Train embed")
    query_embs, val_labels, val_indices = compute_embeddings(val_loader, "Val embed")
    if min(len(gallery_embs), len(query_embs)) == 0:
        return 0.0, {}

    train_norm = F.normalize(gallery_embs, p=2, dim=1)
    val_norm = F.normalize(query_embs, p=2, dim=1)

    # Rows: validation queries; columns: gallery items.
    scores = torch.matmul(val_norm, train_norm.T)
    best_sim, best_indices = scores.max(dim=1)
    pred_labels = train_labels[best_indices]
    hits = pred_labels.eq(val_labels).float()
    acc = hits.mean().item()

    cache = {
        "train_embeddings": train_norm,
        "train_labels": train_labels,
        "train_indices": train_indices,
        "val_embeddings": val_norm,
        "val_labels": val_labels,
        "val_indices": val_indices,
        "pred_indices": best_indices,
        "pred_labels": pred_labels,
        "similarity": best_sim,
    }
    return acc, cache

사용 디바이스: cuda


In [11]:
# Per-epoch records plus the best checkpoint seen so far.
# NOTE: re-running this cell resets these three names but not `model`/`optimizer`
# (they are created in another cell), so training continues from their current state.
history = []
best_state = None  # stays None unless some epoch's val_acc is strictly above 0.0
best_val_acc = 0.0

if len(triplet_dataset) == 0:
    raise ValueError("Triplet 학습에 사용할 데이터가 없습니다. 각 dog ID에 최소 2장의 이미지가 필요합니다.")

for epoch in range(1, NUM_EPOCHS + 1):
    # One optimization pass, then a retrieval evaluation, then one LR-schedule step.
    train_loss = run_metric_epoch(triplet_loader, epoch, "Train", train=True)
    val_acc, _ = evaluate_retrieval(epoch)
    scheduler.step()

    history.append({
        "epoch": epoch,
        "train_loss": train_loss,
        "val_acc": val_acc,
    })

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # deepcopy so the snapshot is not altered by later optimizer steps.
        best_state = {
            "model": deepcopy(model.state_dict()),
            "optimizer": deepcopy(optimizer.state_dict()),
        }

    print(
        f"Epoch {epoch}/{NUM_EPOCHS} | "
        f"Train loss {train_loss:.4f} | "
        f"Val NN acc {val_acc:.4f}"
    )

# Last expression of the cell: the list of per-epoch records.
history

  0%|          | 0/380 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Training curves: one panel per tracked metric, drawn only when history exists.
history_df = pd.DataFrame(history)
if not history_df.empty:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    panels = (
        ("train_loss", "Triplet Loss"),
        ("val_acc", "Val NN Accuracy"),
    )
    for ax, (metric, panel_title) in zip(axes, panels):
        history_df.plot(x="epoch", y=metric, ax=ax, title=panel_title)
    # Accuracy is a fraction, so pin its axis to [0, 1].
    axes[1].set_ylim(0, 1)
    plt.show()

# Last expression of the cell: the most recent epoch's record.
history_df.tail(1)

In [None]:
# Restore the best checkpoint (when one was recorded) before the final evaluation.
if best_state is None:
    print("경고: 최고 성능 가중치가 저장되지 않았습니다. 현재 모델 상태를 사용합니다.")
else:
    model.load_state_dict(best_state["model"])
    optimizer.load_state_dict(best_state["optimizer"])
    print(f"최고 Validation NN 정확도: {best_val_acc:.4f}")

# Final retrieval pass; eval_cache feeds the result-inspection cells below.
final_acc, eval_cache = evaluate_retrieval(NUM_EPOCHS + 1)
print(f"최종 최근접 이웃 정확도: {final_acc:.4f}")


In [None]:
# Build a per-query results table from the cached retrieval outputs.
if not eval_cache:
    raise RuntimeError("평가 캐시가 비어 있습니다. 먼저 evaluate_retrieval을 실행하세요.")

train_indices = eval_cache["train_indices"].numpy()
val_indices = eval_cache["val_indices"].numpy()
pred_indices = eval_cache["pred_indices"].numpy()
pred_labels = eval_cache["pred_labels"].numpy()

# Validation rows in the same order as the cached embeddings/predictions.
val_results = val_dataset.frame.iloc[val_indices].reset_index(drop=True)
val_results["pred_label"] = pred_labels
val_results["pred_dog_id"] = val_results["pred_label"].map(id_map)
val_results["correct"] = val_results["pred_label"] == val_results["label"]

# pred_indices are row positions in the gallery *embedding matrix*. Translate
# them to dataset row positions through train_indices instead of assuming the
# two orders coincide (they only do while the gallery loader is unshuffled).
matched_rows = train_indices[pred_indices]
match_gallery = train_gallery_dataset.frame.iloc[matched_rows].reset_index(drop=True)
val_results["match_image_name"] = match_gallery["image_name"].values
val_results["match_dog_id"] = match_gallery["dog_id"].values

# Last expression of the cell: preview of the results table.
val_results.head()


In [None]:
def show_predictions(df: pd.DataFrame, num_samples: int = 6):
    """Plot a random sample of query images titled with true and predicted dog IDs.

    Up to ``num_samples`` rows are drawn with ``RANDOM_SEED`` and laid out in a
    three-column grid. Each title shows the true and predicted ID plus a mark
    for whether the prediction was correct.

    Args:
        df: Results table with ``image_name``, ``dog_id``, ``pred_dog_id``,
            ``correct``, ``match_image_name`` and ``match_dog_id`` columns.
        num_samples: Maximum number of rows to display.

    Returns:
        The sampled rows restricted to the six columns listed above.

    Raises:
        ValueError: If ``df`` is empty.
    """
    if df.empty:
        raise ValueError("시각화할 데이터가 없습니다.")

    sample_count = min(num_samples, len(df))
    picks = df.sample(sample_count, random_state=RANDOM_SEED)

    n_cols = 3
    n_rows = math.ceil(len(picks) / n_cols)
    fig = plt.figure(figsize=(n_cols * 4.5, n_rows * 4.5))

    for slot, (_, record) in enumerate(picks.iterrows(), start=1):
        ax = fig.add_subplot(n_rows, n_cols, slot)
        ax.imshow(Image.open(IMAGE_DIR / record["image_name"]).convert("RGB"))
        ax.axis("off")
        verdict = " ✅" if record["correct"] else " ❌"
        ax.set_title(f"정답:{record['dog_id']}\n예측:{record['pred_dog_id']}" + verdict)

    fig.tight_layout()
    shown_columns = ["image_name", "dog_id", "pred_dog_id", "correct", "match_image_name", "match_dog_id"]
    return picks[shown_columns]


# Plot sampled validation queries; the returned table is shown as the cell output.
show_predictions(val_results)
