リスタートに伴い、前回公開したベースラインノートブックを更新しました！

主な変更点としては、
- 未知選手の検出のためにArcFaceを使用
- Pytorch lightningを使用したコードに変更
- docstring等を追加

（参考までに、リスタート前のコンペにてArcFaceを使用したところ、LBスコア0.9608となりました。  
これに加えてたかいとさんがdiscussionで公開した後処理を適用したところ、LBスコア0.9765となりました）

このノートブックを実行して訓練すると、val_f1スコアが0.99を超えます。  
ですが、LBスコアでは0.8を少し上回る程度となるため、より良い検証方法について検討する必要があります。



## 1. Setup & Imports

In [None]:
import math
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

import albumentations as A
import cv2
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
from albumentations.pytorch import ToTensorV2
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint, RichProgressBar
from pytorch_lightning.callbacks.progress.rich_progress import RichProgressBarTheme
from pytorch_lightning.loggers import WandbLogger
from sklearn.model_selection import StratifiedGroupKFold
from timm.utils import ModelEmaV3
from torch.utils.data import DataLoader, Dataset
from torchmetrics import F1Score
from tqdm import tqdm

## 2. Configuration

In [None]:
@dataclass
class Config:
    # パス設定
    data_dir: Path = Path("/kaggle/input/atmacup22")
    image_dir: Path = Path("/kaggle/input/atmacup22/images")
    crop_dir: Path = Path("/kaggle/dataset/crops/train")  # trainデータのクロップ画像の保存先
    output_dir: Path = Path("/kaggle/working/output")
    use_crops: bool = True  # 高速読み込みのため事前クロップした画像を使用

    # モデル設定
    model_name: str = "efficientnet_b0"
    num_classes: int = 11  # label_id 0-10
    pretrained: bool = True
    img_size: int = 224
    embedding_dim: int = 512

    # ArcFace設定
    arcface_s: float = 30.0
    arcface_m: float = 0.5

    # 訓練設定
    batch_size: int = 128
    num_workers: int = 8
    epochs: int = 20

    lr: float = 1e-3
    weight_decay: float = 1e-4
    ema_decay: float = 0.995
    use_ema: bool = True

    seed: int = 42

    # デバイス設定
    device: str = "cuda"

    # WandB設定
    use_wandb: bool = False
    wandb_project: str = "atmacup22"
    wandb_run_name: str | None = "baseline"

    def __post_init__(self):
        self.output_dir.mkdir(parents=True, exist_ok=True)


cfg = Config()

## 3. Preprocessing - Crop Images

In [None]:
def get_image_path(row: pd.Series, image_dir: Path) -> Path:
    """データフレームの行から画像パスを生成する

    Args:
        row (pd.Series): メタデータの行（quarter, angle, session, frameを含む）
        image_dir (Path): 画像ディレクトリのパス

    Returns:
        Path: 生成された画像ファイルのパス

    Note:
        ファイル名の形式: {quarter}__{angle}__{session:02d}__{frame:02d}.jpg
    """
    fname = f"{row['quarter']}__{row['angle']}__{row['session']:02d}__{row['frame']:02d}.jpg"
    return image_dir / fname


def process_single_crop(args: tuple) -> tuple[int, bool]:
    """単一の画像クロップ処理を実行する

    Args:
        args (tuple): 処理に必要な引数のタプル
            - idx (int): データフレームのインデックス
            - row (pd.Series): メタデータの行
            - image_dir (Path): 画像ディレクトリのパス
            - output_dir (Path): 出力ディレクトリのパス
            - padding_ratio (float): パディング比率

    Returns:
        tuple[int, bool]: (インデックス, 成功フラグ)

    Note:
        バウンディングボックスにパディングを追加してクロップし、
        JPEG品質95%で保存します。
    """
    idx, row, image_dir, output_dir, padding_ratio = args

    try:
        # 画像パスを取得して画像を読み込み
        img_path = get_image_path(row, image_dir)
        img = cv2.imread(str(img_path))

        if img is None:
            return idx, False

        # パディング付きのバウンディングボックスを取得
        x, y, w, h = int(row["x"]), int(row["y"]), int(row["w"]), int(row["h"])
        img_h, img_w = img.shape[:2]

        # パディングサイズを計算
        pad_w = int(w * padding_ratio)
        pad_h = int(h * padding_ratio)

        # クロップ範囲を計算（画像境界内に制限）
        x1 = max(0, x - pad_w)
        y1 = max(0, y - pad_h)
        x2 = min(img_w, x + w + pad_w)
        y2 = min(img_h, y + h + pad_h)

        # 画像をクロップ
        crop = img[y1:y2, x1:x2]

        # クロップした画像を保存
        output_path = output_dir / f"{idx}.jpg"
        cv2.imwrite(str(output_path), crop, [cv2.IMWRITE_JPEG_QUALITY, 95])

        return idx, True
    except Exception as e:
        print(f"インデックス {idx} の処理中にエラーが発生: {e}")
        return idx, False


def preprocess_crops(
    csv_path: Path,
    image_dir: Path,
    output_dir: Path,
    padding_ratio: float = 0.01,
    num_workers: int = None,
):
    """全てのバウンディングボックスを事前クロップして保存する

    Args:
        csv_path (Path): メタデータCSVファイルのパス
        image_dir (Path): 元画像が格納されているディレクトリのパス
        output_dir (Path): クロップした画像を保存するディレクトリのパス
        padding_ratio (float, optional): バウンディングボックスに追加するパディングの比率.
                                       デフォルトは0.1（10%）
        num_workers (int, optional): 並列処理のワーカー数.
                                   Noneの場合はCPUコア数を使用

    Note:
        - 並列処理を使用して高速化
        - 出力ディレクトリが存在しない場合は自動作成
        - 失敗したサンプルのインデックスを記録・表示
        - クロップした画像は{idx}.jpgの形式で保存
    """
    if num_workers is None:
        num_workers = multiprocessing.cpu_count()

    # 出力ディレクトリを作成
    output_dir.mkdir(parents=True, exist_ok=True)

    # CSVファイルを読み込み
    df = pd.read_csv(csv_path)
    print(f"{num_workers}ワーカーで{len(df)}サンプルを処理中...")

    # 並列処理用の引数リストを準備
    args_list = [(idx, row, image_dir, output_dir, padding_ratio) for idx, row in df.iterrows()]

    # 並列処理で実行
    success_count = 0
    failed_indices = []

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # 全てのタスクを投入
        futures = {executor.submit(process_single_crop, args): args[0] for args in args_list}

        # 完了したタスクから順次結果を取得
        for future in tqdm(as_completed(futures), total=len(futures), desc="クロッピング中"):
            idx, success = future.result()
            if success:
                success_count += 1
            else:
                failed_indices.append(idx)

    # 処理結果を表示
    print(f"完了: {success_count}/{len(df)} クロップを保存")
    if failed_indices:
        print(f"失敗したインデックス: {failed_indices[:10]}...")

## 4. Dataset & DataLoader

In [None]:
def get_train_transform(img_size: int) -> A.Compose:
    return A.Compose(
        [
            A.Resize(img_size, img_size),
            A.HorizontalFlip(p=0.5),
            A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),
            A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.5),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ]
    )


def get_val_transform(img_size: int) -> A.Compose:
    return A.Compose(
        [
            A.Resize(img_size, img_size),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ]
    )


class PlayerDataset(Dataset):
    def __init__(
        self,
        df: pd.DataFrame,
        image_dir: Path,
        transform: A.Compose,
        is_test: bool = False,
        cache_images: bool = False,
        crop_dir: Path = None,
    ):
        self.original_indices = df.index.tolist()
        self.df = df.reset_index(drop=True)
        self.image_dir = Path(image_dir)
        self.transform = transform
        self.is_test = is_test
        self.cache_images = cache_images
        self.image_cache = {}
        self.crop_dir = Path(crop_dir) if crop_dir else None
        self.use_crops = crop_dir is not None

    def __len__(self) -> int:
        return len(self.df)

    def _load_image(self, img_path: Path) -> np.ndarray:
        if self.cache_images and str(img_path) in self.image_cache:
            return self.image_cache[str(img_path)]

        img = cv2.imread(str(img_path))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.cache_images:
            self.image_cache[str(img_path)] = img

        return img

    def __getitem__(self, idx: int) -> dict:
        row = self.df.iloc[idx]

        if self.use_crops:
            # 事前クロップした画像を直接読み込む（元のインデックスを使用）
            original_idx = self.original_indices[idx]
            crop_path = self.crop_dir / f"{original_idx}.jpg"
            crop = self._load_image(crop_path)
        else:
            # フル画像を読み込んでクロップ
            img_path = get_image_path(row, self.image_dir)
            img = self._load_image(img_path)

            # パディング付きでバウンディングボックスをクロップ
            x, y, w, h = int(row["x"]), int(row["y"]), int(row["w"]), int(row["h"])
            img_h, img_w = img.shape[:2]

            # パディングを追加（bboxサイズの10%）
            pad_w = int(w * 0.1)
            pad_h = int(h * 0.1)

            x1 = max(0, x - pad_w)
            y1 = max(0, y - pad_h)
            x2 = min(img_w, x + w + pad_w)
            y2 = min(img_h, y + h + pad_h)

            crop = img[y1:y2, x1:x2]

        transformed = self.transform(image=crop)
        image = transformed["image"]

        result = {
            "image": image,
            "angle": row["angle"],
        }

        if not self.is_test:
            result["label"] = torch.tensor(row["label_id"], dtype=torch.long)

        return result


class TestDataset(Dataset):
    def __init__(
        self,
        df: pd.DataFrame,
        base_dir: str,
        transform: A.Compose,
    ):
        self.df = df.reset_index(drop=True)
        self.base_dir = base_dir
        self.transform = transform

    def __len__(self) -> int:
        return len(self.df)

    def __getitem__(self, idx: int) -> dict:
        row = self.df.iloc[idx]

        # rel_pathから直接画像を読み込む
        img_path = f"{self.base_dir}/{row['rel_path']}"
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # 変換を適用
        transformed = self.transform(image=img)
        image = transformed["image"]

        return {"image": image}


def create_dataloader(
    df: pd.DataFrame,
    image_dir: Path,
    img_size: int,
    batch_size: int,
    num_workers: int,
    is_train: bool = True,
    is_test: bool = False,
    crop_dir: Path = None,
) -> torch.utils.data.DataLoader:
    transform = get_train_transform(img_size) if is_train else get_val_transform(img_size)

    dataset = PlayerDataset(
        df=df,
        image_dir=image_dir,
        transform=transform,
        is_test=is_test,
        crop_dir=crop_dir,
    )

    return torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=is_train,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=is_train,
        persistent_workers=num_workers > 0,
        prefetch_factor=4 if num_workers > 0 else None,
    )


class PlayerDataModule(pl.LightningDataModule):
    def __init__(
        self,
        data_dir: Path,
        image_dir: Path,
        crop_dir: Path,
        img_size: int = 224,
        batch_size: int = 128,
        num_workers: int = 8,
        seed: int = 42,
        use_crops: bool = True,
    ):
        super().__init__()
        self.data_dir = data_dir
        self.image_dir = image_dir
        self.crop_dir = crop_dir if use_crops else None
        self.img_size = img_size
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.seed = seed

    def setup(self, stage: str = None):
        train_df = pd.read_csv(self.data_dir / "train_meta.csv")
        val_mask = train_df["quarter"] >= "Q2-016"
        self.train_data = train_df[~val_mask]
        self.val_data = train_df[val_mask]

        print(f"Train: {len(self.train_data)}, Val: {len(self.val_data)}")
        print(f"Val quarters: {self.val_data['quarter'].unique()}")

    def train_dataloader(self):
        return create_dataloader(
            self.train_data,
            self.image_dir,
            self.img_size,
            self.batch_size,
            self.num_workers,
            is_train=True,
            crop_dir=self.crop_dir,
        )

    def val_dataloader(self):
        return create_dataloader(
            self.val_data,
            self.image_dir,
            self.img_size,
            self.batch_size,
            self.num_workers,
            is_train=False,
            crop_dir=self.crop_dir,
        )

## 5. Model Definition

In [None]:
class ArcFaceHead(nn.Module):
    """メトリック学習のためのArcFaceヘッド。

    ArcFaceとは、顔認識や人物再識別などのタスクで使用される損失関数の一種です。
    正式名称は「Additive Angular Margin Loss」で、特徴量空間において
    クラス間の分離をより明確にするために角度マージンを導入します。

    主な特徴:
    1. 特徴量とクラス重みを単位球面上に正規化
    2. コサイン類似度ベースの分類
    3. 正解クラスに対してのみ角度マージン（m）を追加
    4. スケールファクター（s）で勾配の大きさを調整

    これにより、同じクラスの特徴量は密集し、異なるクラス間の特徴量は
    より大きな角度で分離されるため、識別性能が向上します。

    参考文献: ArcFace: Additive Angular Margin Loss for Deep Face Recognition
    https://arxiv.org/abs/1801.07698
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        s: float = 30.0,
        m: float = 0.5,
        easy_margin: bool = False,
    ):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.easy_margin = easy_margin

        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, x: torch.Tensor, labels: torch.Tensor = None) -> torch.Tensor:
        # 特徴量と重みを正規化
        x_norm = F.normalize(x, p=2, dim=1)
        w_norm = F.normalize(self.weight, p=2, dim=1)

        # コサイン類似度
        cosine = F.linear(x_norm, w_norm)

        if labels is None:
            # 推論モード: コサイン類似度を返す
            return cosine * self.s

        # 訓練モード: 角度マージンを適用
        sine = torch.sqrt(1.0 - torch.clamp(cosine * cosine, 0, 1))
        phi = cosine * self.cos_m - sine * self.sin_m  # cos(theta + m)

        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)

        # ワンホットエンコーディング
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, labels.view(-1, 1).long(), 1)

        # ターゲットクラスにのみマージンを適用
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        output *= self.s

        return output


class PlayerEmbeddingModel(nn.Module):
    """プレイヤー再識別のための埋め込みヘッド付きCNNモデル。

    このモデルは、プレイヤーの画像から特徴的な埋め込みベクトルを抽出し、
    同じプレイヤーの画像同士は近い埋め込みを、異なるプレイヤーの画像同士は
    遠い埋め込みを生成するように学習されます。

    アーキテクチャ:
    1. バックボーン（CNN）: 画像から高次元特徴量を抽出
    2. 埋め込み層: 特徴量を固定次元の埋め込みベクトルに変換
    3. ArcFaceヘッド: 訓練時に角度マージンを用いた分類損失を計算

    推論時は埋め込みベクトル同士のコサイン類似度を計算して
    プレイヤーの同一性を判定します。

    Args:
        model_name (str): 使用するバックボーンモデル名（timmライブラリ）
        embedding_dim (int): 埋め込みベクトルの次元数
        num_classes (int): 訓練データに含まれるプレイヤー数
        pretrained (bool): ImageNet事前訓練重みを使用するか
        arcface_s (float): ArcFaceのスケールパラメータ
        arcface_m (float): ArcFaceの角度マージンパラメータ
    """

    def __init__(
        self,
        model_name: str = "resnet18",
        embedding_dim: int = 512,
        num_classes: int = 11,
        pretrained: bool = True,
        arcface_s: float = 30.0,
        arcface_m: float = 0.5,
    ):
        super().__init__()

        # 分類器なしのバックボーン
        self.backbone = timm.create_model(
            model_name,
            pretrained=pretrained,
            num_classes=0,  # 分類器を削除
        )

        # バックボーンの出力特徴量数を取得
        backbone_out = self.backbone.num_features

        # 埋め込み層（ArcFace論文のBN-Dropout-FC-BN構成）
        self.embedding = nn.Sequential(
            nn.BatchNorm1d(backbone_out),
            nn.Dropout(0.3),
            nn.Linear(backbone_out, embedding_dim, bias=False),
            nn.BatchNorm1d(embedding_dim),
        )

        # 訓練用のArcFaceヘッド
        self.arcface = ArcFaceHead(
            in_features=embedding_dim,
            out_features=num_classes,
            s=arcface_s,
            m=arcface_m,
        )

        self.embedding_dim = embedding_dim

    def get_embedding(self, x: torch.Tensor) -> torch.Tensor:
        """正規化された埋め込み特徴量を抽出。"""
        features = self.backbone(x)
        embedding = self.embedding(features)
        return F.normalize(embedding, p=2, dim=1)

    def forward(self, x: torch.Tensor, labels: torch.Tensor = None) -> torch.Tensor:
        """順伝播。

        Args:
            x: 入力画像
            labels: ArcFaceマージン用のラベル（推論時はNone）

        Returns:
            訓練時（ラベル提供）: 分類損失用のArcFaceロジット
            推論時（ラベルなし）: 正規化された埋め込み
        """
        features = self.backbone(x)
        embedding = self.embedding(features)

        if labels is not None:
            # 訓練: ArcFaceロジットを返す
            return self.arcface(embedding, labels)
        else:
            # 推論: 正規化された埋め込みを返す
            return F.normalize(embedding, p=2, dim=1)


def create_model(
    model_name: str = "resnet18",
    num_classes: int = 11,
    pretrained: bool = True,
    embedding_dim: int = 512,
    arcface_s: float = 30.0,
    arcface_m: float = 0.5,
) -> PlayerEmbeddingModel:
    """モデルインスタンスを作成。"""
    return PlayerEmbeddingModel(
        model_name=model_name,
        embedding_dim=embedding_dim,
        num_classes=num_classes,
        pretrained=pretrained,
        arcface_s=arcface_s,
        arcface_m=arcface_m,
    )


class PlayerModule(pl.LightningModule):
    """ArcFaceを使用したプレイヤー再識別のためのLightningモジュール。"""

    def __init__(
        self,
        model_name: str = "resnet18",
        num_classes: int = 11,
        pretrained: bool = True,
        embedding_dim: int = 512,
        arcface_s: float = 30.0,
        arcface_m: float = 0.5,
        lr: float = 1e-3,
        weight_decay: float = 1e-4,
        epochs: int = 20,
        ema_decay: float = 0.9998,
        use_ema: bool = True,
    ):
        super().__init__()
        self.save_hyperparameters()

        self.model = create_model(
            model_name=model_name,
            num_classes=num_classes,
            pretrained=pretrained,
            embedding_dim=embedding_dim,
            arcface_s=arcface_s,
            arcface_m=arcface_m,
        )
        self.criterion = nn.CrossEntropyLoss()
        self.train_f1 = F1Score(task="multiclass", num_classes=num_classes, average="macro")
        self.val_f1 = F1Score(task="multiclass", num_classes=num_classes, average="macro")

        self.use_ema = use_ema
        self.ema_decay = ema_decay
        self.model_ema = None

    def setup(self, stage: str = None):
        """モデルがデバイスに移動された後にEMAを初期化。"""
        if self.use_ema and self.model_ema is None:
            self.model_ema = ModelEmaV3(
                self.model,
                decay=self.ema_decay,
                device=self.device,
            )

    def forward(self, x: torch.Tensor, labels: torch.Tensor = None) -> torch.Tensor:
        return self.model(x, labels)

    def get_embedding(self, x: torch.Tensor) -> torch.Tensor:
        """埋め込みを抽出（利用可能な場合はEMAモデルを使用）。"""
        if self.use_ema and self.model_ema is not None:
            return self.model_ema.module.get_embedding(x)
        return self.model.get_embedding(x)

    def forward_ema(self, x: torch.Tensor, labels: torch.Tensor = None) -> torch.Tensor:
        """EMAモデルを使用した順伝播。"""
        if self.model_ema is not None:
            return self.model_ema.module(x, labels)
        return self.model(x, labels)

    def on_before_zero_grad(self, optimizer):
        """各最適化ステップ後にEMAを更新。"""
        if self.use_ema and self.model_ema is not None:
            self.model_ema.update(self.model)

    def training_step(self, batch: dict, batch_idx: int) -> torch.Tensor:
        images = batch["image"]
        labels = batch["label"]

        # ArcFaceマージンのためにラベル付きで順伝播
        outputs = self(images, labels)
        loss = self.criterion(outputs, labels)

        preds = outputs.argmax(dim=1)
        self.train_f1(preds, labels)

        self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=True)
        self.log("train_f1", self.train_f1, prog_bar=True, on_step=False, on_epoch=True)

        return loss

    def validation_step(self, batch: dict, batch_idx: int) -> torch.Tensor:
        images = batch["image"]
        labels = batch["label"]

        # 検証にはEMAモデルを使用（適切なマージンのためにラベル付き）
        if self.use_ema and self.model_ema is not None:
            outputs = self.forward_ema(images, labels)
        else:
            outputs = self(images, labels)
        loss = self.criterion(outputs, labels)

        preds = outputs.argmax(dim=1)
        self.val_f1(preds, labels)

        self.log("val_loss", loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log("val_f1", self.val_f1, prog_bar=True, on_step=False, on_epoch=True)

        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(
            self.parameters(),
            lr=self.hparams.lr,
            weight_decay=self.hparams.weight_decay,
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=self.hparams.epochs)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "interval": "epoch",
            },
        }

## 6. Training Functions

In [None]:
@torch.no_grad()
def compute_prototypes(
    model: PlayerModule,
    dataloader: torch.utils.data.DataLoader,
    num_classes: int,
    device: str,
) -> dict:
    """訓練データからクラスプロトタイプ（平均埋め込み）を計算する

    プロトタイプとは、各クラスの代表的な特徴ベクトルのことです。
    具体的には、同じクラスに属する全ての埋め込みベクトルの平均を取り、
    正規化したものがプロトタイプになります。これにより、各クラスの
    「典型的な」特徴を表現するベクトルを得ることができます。

    推論時には、新しいサンプルの埋め込みベクトルと各クラスの
    プロトタイプとの類似度（コサイン類似度など）を計算することで、
    最も近いクラスを予測することができます。

    Returns:
        以下を含む辞書:
        - prototypes: [num_classes, embedding_dim] クラス重心のテンソル
        - all_embeddings: [N, embedding_dim] 全埋め込みのテンソル
        - all_labels: [N] 全ラベルのテンソル
    """
    model.eval()

    all_embeddings = []
    all_labels = []

    for batch in tqdm(dataloader, desc="埋め込み計算中"):
        images = batch["image"].to(device, non_blocking=True)
        labels = batch["label"]

        # 埋め込みを抽出
        embeddings = model.get_embedding(images)
        all_embeddings.append(embeddings.cpu())
        all_labels.append(labels)

    all_embeddings = torch.cat(all_embeddings, dim=0)
    all_labels = torch.cat(all_labels, dim=0)

    # クラスプロトタイプを計算（クラスごとの埋め込みの平均）
    embedding_dim = all_embeddings.shape[1]
    prototypes = torch.zeros(num_classes, embedding_dim)

    for c in range(num_classes):
        mask = all_labels == c
        if mask.sum() > 0:
            class_embeddings = all_embeddings[mask]
            # 平均を正規化して単位ベクトルプロトタイプを取得
            prototype = class_embeddings.mean(dim=0)
            prototypes[c] = torch.nn.functional.normalize(prototype, p=2, dim=0)

    return {
        "prototypes": prototypes,
        "all_embeddings": all_embeddings,
        "all_labels": all_labels,
    }

## 7. Inference Functions

In [None]:
@torch.no_grad()
def extract_embeddings(
    model: PlayerModule,
    loader: DataLoader,
    device: str,
) -> torch.Tensor:
    """画像から埋め込みベクトルを抽出する。

    埋め込みベクトルは、画像の特徴を高次元空間（通常512次元）で表現したベクトルです。
    同じプレイヤーの画像は類似した埋め込みベクトルを持ち、異なるプレイヤーの画像は
    異なる埋め込みベクトルを持つように学習されています。これにより、ベクトル間の
    距離やコサイン類似度を計算することで、プレイヤーの同一性を判定できます。
    """
    model.eval()
    embeddings = []

    for batch in tqdm(loader, desc="Extracting embeddings"):
        images = batch["image"].to(device, non_blocking=True)
        emb = model.get_embedding(images)
        embeddings.append(emb.cpu())

    return torch.cat(embeddings, dim=0)


def predict_with_prototypes(
    embeddings: torch.Tensor,
    prototypes: torch.Tensor,
    threshold: float = 0.5,
) -> list[int]:
    """プロトタイプとのコサイン類似度でラベルを予測する。

    Args:
        embeddings: テスト埋め込み（正規化済み）[N, embedding_dim]
        prototypes: クラスプロトタイプ（正規化済み）[num_classes, embedding_dim]
        threshold: 最小類似度閾値。最大類似度がこの値未満の場合、
                   -1（訓練データに存在しない不明プレイヤー）を予測する。

    Returns:
        list[int]: 予測ラベルのリスト（不明プレイヤーは-1）
    """
    # コサイン類似度を計算（両方とも既に正規化済み）
    similarities = F.linear(embeddings, prototypes)  # [N, num_classes]
    max_sims, max_indices = similarities.max(dim=1)

    # 最大類似度が閾値未満の場合、-1（不明プレイヤー）を予測
    predictions = []
    for sim, idx in zip(max_sims.tolist(), max_indices.tolist()):
        if sim < threshold:
            predictions.append(-1)
        else:
            predictions.append(idx)

    return predictions

## 8. Run Preprocessing

In [None]:
# 訓練データを事前クロップ
preprocess_crops(
    csv_path=cfg.data_dir / "train_meta.csv",
    image_dir=cfg.image_dir,
    output_dir=cfg.crop_dir,
)

## 9. Run Training

In [None]:
pl.seed_everything(cfg.seed)

# データモジュール
dm = PlayerDataModule(
    data_dir=cfg.data_dir,
    image_dir=cfg.image_dir,
    crop_dir=cfg.crop_dir,
    img_size=cfg.img_size,
    batch_size=cfg.batch_size,
    num_workers=cfg.num_workers,
    seed=cfg.seed,
    use_crops=cfg.use_crops,
)

# モデル
model = PlayerModule(
    model_name=cfg.model_name,
    num_classes=cfg.num_classes,
    pretrained=cfg.pretrained,
    embedding_dim=cfg.embedding_dim,
    arcface_s=cfg.arcface_s,
    arcface_m=cfg.arcface_m,
    lr=cfg.lr,
    weight_decay=cfg.weight_decay,
    ema_decay=cfg.ema_decay,
    use_ema=cfg.use_ema,
    epochs=cfg.epochs,
)

# コールバック
checkpoint_callback = ModelCheckpoint(
    dirpath=cfg.output_dir,
    filename="best_model",
    monitor="val_f1",
    mode="max",
    save_top_k=1,
    verbose=True,
    enable_version_counter=False,
)
progress_bar = RichProgressBar(
    theme=RichProgressBarTheme(
        description="cyan",
        progress_bar="blue",
        progress_bar_finished="bright_blue",
        progress_bar_pulse="#0080FF",
        batch_progress="cyan",
        time="grey82",
        processing_speed="grey82",
        metrics="grey82",
    )
)

callbacks = [checkpoint_callback, progress_bar]

if cfg.use_wandb:
    wandb_logger = WandbLogger(
        project=cfg.wandb_project,
        name=cfg.wandb_run_name,
        save_dir="/tmp",
        log_model=True,
    )
    wandb_logger.log_hyperparams(cfg.__dict__)
    callbacks.append(LearningRateMonitor(logging_interval="epoch"))
else:
    wandb_logger = None

# トレーナー
trainer = pl.Trainer(
    max_epochs=cfg.epochs,
    accelerator="auto",
    devices="auto",
    callbacks=callbacks,
    logger=wandb_logger if cfg.use_wandb else False,
    precision="16-mixed",
    enable_progress_bar=True,
    deterministic=True,
    default_root_dir="/tmp",
)

trainer.fit(model, dm)

## 10. Compute Prototypes

In [None]:
print("\n訓練データからクラスプロトタイプを計算中...")

# チェックポイントを手動で読み込み、EMAキーをフィルタリング
checkpoint = torch.load(checkpoint_callback.best_model_path, map_location=cfg.device, weights_only=False)
state_dict = checkpoint["state_dict"]

# model_emaキーをフィルタリング（推論には不要）
filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith("model_ema.")}

# 推論用にEMAなしの新しいモデルを作成
best_model = PlayerModule(
    model_name=cfg.model_name,
    num_classes=cfg.num_classes,
    pretrained=False,  # チェックポイントから読み込むため事前訓練重みは不要
    embedding_dim=cfg.embedding_dim,
    arcface_s=cfg.arcface_s,
    arcface_m=cfg.arcface_m,
    lr=cfg.lr,
    weight_decay=cfg.weight_decay,
    ema_decay=cfg.ema_decay,
    use_ema=False,  # 推論時はEMAを無効化
    epochs=cfg.epochs,
)
best_model.load_state_dict(filtered_state_dict, strict=False)
best_model = best_model.to(cfg.device)
best_model.eval()

# 全訓練データ用のデータローダーを作成（拡張なし）
train_df = pd.read_csv(cfg.data_dir / "train_meta.csv")
train_loader = create_dataloader(
    train_df,
    cfg.image_dir,
    cfg.img_size,
    cfg.batch_size,
    cfg.num_workers,
    is_train=False,  # 拡張なし
    crop_dir=cfg.crop_dir,
)

# プロトタイプを計算して保存
prototype_data = compute_prototypes(
    best_model,
    train_loader,
    cfg.num_classes,
    cfg.device,
)

# プロトタイプを保存
prototype_path = cfg.output_dir / "prototypes.pt"
torch.save(prototype_data, prototype_path)
print(f"プロトタイプを保存しました: {prototype_path}")
print(f"プロトタイプ形状: {prototype_data['prototypes'].shape}")
print(f"総埋め込み数: {prototype_data['all_embeddings'].shape[0]}")

## 11. Run Inference

In [None]:
base_dir = str(cfg.data_dir)  # /kaggle/input/atmacup22

test_df = pd.read_csv(cfg.data_dir / "test_meta.csv")
print(f"テストサンプル数: {len(test_df)}")

# チェックポイントを読み込む
ckpt_path = cfg.output_dir / "best_model.ckpt"
checkpoint = torch.load(ckpt_path, map_location=cfg.device, weights_only=False)
state_dict = checkpoint["state_dict"]

# model_emaキーを除外
filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith("model_ema.")}

# モデルを作成
inference_model = PlayerModule(
    model_name=cfg.model_name,
    num_classes=cfg.num_classes,
    pretrained=False,
    embedding_dim=cfg.embedding_dim,
    arcface_s=cfg.arcface_s,
    arcface_m=cfg.arcface_m,
    lr=cfg.lr,
    weight_decay=cfg.weight_decay,
    ema_decay=cfg.ema_decay,
    use_ema=False,
    epochs=cfg.epochs,
)
inference_model.load_state_dict(filtered_state_dict, strict=False)
inference_model = inference_model.to(cfg.device)
inference_model.eval()
print(f"モデルを読み込みました: {ckpt_path}")

# プロトタイプを読み込む
prototype_path = cfg.output_dir / "prototypes.pt"
prototype_data = torch.load(prototype_path, map_location="cpu", weights_only=True)
prototypes = prototype_data["prototypes"]
print(f"プロトタイプを読み込みました: {prototypes.shape}")

# データセットとデータローダーを作成
transform = get_val_transform(cfg.img_size)
test_dataset = TestDataset(test_df, base_dir, transform)
test_loader = DataLoader(
    test_dataset,
    batch_size=cfg.batch_size,
    shuffle=False,  # 順番を保持
    num_workers=cfg.num_workers,
    pin_memory=True,
    drop_last=False,
)

print("\nテスト埋め込みを抽出中...")
embeddings = extract_embeddings(inference_model, test_loader, cfg.device)
print(f"埋め込み形状: {embeddings.shape}")

print("\nプロトタイプとのコサイン類似度で予測中...")
predictions = predict_with_prototypes(embeddings, prototypes, threshold=0.5)

submission = pd.DataFrame({"label_id": predictions})
submission_path = cfg.output_dir / "submission.csv"
submission.to_csv(submission_path, index=False)
print(f"提出ファイルを保存しました: {submission_path}")

print("\n予測分布:")
print(pd.Series(predictions).value_counts().sort_index())