# 02_evaluation_report — カテゴリ横断評価（Mahalanobis / PaDiM）

本ノートでは、指定カテゴリの MVTec AD を対象に、Mahalanobis 距離ベースと PaDiM の2手法で精度を比較します。

評価ポリシー:
- 共分散（およびPaDiM統計）の推定は、各カテゴリの train/good 全画像を使用（CVは行わない）
- モデルの精度評価は test データで実施
- 評価カテゴリはリストで与え、順に推論を実行（現在の設定: tile のみ。leather, wood, bottle などは `eval_categories` に追加して評価可能）


In [2]:
# 依存ライブラリの読み込み
import os
from pathlib import Path
import json
from datetime import datetime, timezone
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision import models
from PIL import Image
import warnings

# sklearn が無い環境でも動作するよう、読み込みはベストエフォート
try:
    from sklearn.metrics import roc_auc_score, average_precision_score, f1_score
except Exception as e:
    roc_auc_score = average_precision_score = f1_score = None
    print('[WARN] sklearn.metrics を読み込めませんでした。一部指標が計算できない可能性があります。', e)

# anomaly_detectors からコア関数をインポート
import importlib, anomaly_detectors
importlib.invalidate_caches(); importlib.reload(anomaly_detectors)
from anomaly_detectors import (
    fit_mahalanobis, all_mahalanobis_scores,
    fit_padim, padim_heatmap, all_padim_scores,
)

# Output directory for evaluation artifacts, created eagerly.
RESULTS = Path('runs')
RESULTS.mkdir(parents=True, exist_ok=True)

# Dataset root: read from the MVTEC_ROOT environment variable, defaulting to
# 'data/mvtec'. The directory is created if it does not exist yet.
MVTEC_ROOT = Path(os.getenv('MVTEC_ROOT', 'data/mvtec'))
MVTEC_ROOT.mkdir(parents=True, exist_ok=True)


In [3]:
# Build a torchvision model from its builder name, with pretrained weights when available.
def load_backbone_from_name(name: str):
    """Load a torchvision model by name and return it with ``.eval()`` applied.

    The registry API (``get_model`` / ``get_model_weights``) is preferred. Only
    when that API cannot be imported (older torchvision) does the function fall
    back to the legacy ``models.<name>(pretrained=True)`` constructor, so that
    genuine failures of the registry API (for example a failed weight download)
    are reported as-is instead of being masked by a second attempt.

    Args:
        name: torchvision model builder name, e.g. ``"resnet18"``.

    Returns:
        The constructed model after ``.eval()``.

    Raises:
        ValueError: on the legacy path, when ``name`` is not a callable
            attribute of ``torchvision.models``. Errors raised by the registry
            API propagate unchanged.
    """
    try:
        from torchvision.models import get_model, get_model_weights
    except ImportError:
        # Legacy API fallback for old torchvision releases.
        builder = getattr(models, name, None)
        # Sub-modules such as ``models.resnet`` are attributes too, so require a callable.
        if not callable(builder):
            raise ValueError(f"Unknown backbone name: {name}")
        return builder(pretrained=True).eval()

    try:
        weights = get_model_weights(name).DEFAULT  # pretrained weights
    except Exception as err:
        # Best effort: models without registered weights are built with random
        # initialisation, but say so because such features are rarely useful.
        weights = None
        print(f"[WARN] No pretrained weights resolved for '{name}' ({err}); using random initialization.")
    return get_model(name, weights=weights).eval()


In [None]:
# Load the fixed pipeline parameters from JSON.
# The encoding is given explicitly so decoding does not depend on the
# platform's default locale encoding.
with open("assets/fixed_pipeline.json", encoding="utf-8") as f:
    cfg = json.load(f)

# Settings shared by both methods.
backbone = cfg['common']['backbone']
model = load_backbone_from_name(backbone)
image_size = cfg['common']['image_size']
threshold_percentile = cfg['common']['threshold_percentile']
# Method-specific settings.
MD_layer = cfg['mahalanobis']['layer']
padim_layers = cfg['padim']['layers']
padim_channel_subsample = cfg['padim']['d']

# Run-time settings.
num_workers = 0  # alternative: min(4, os.cpu_count() or 1)
batch_size = 32
# Other categories that can be appended: 'leather', 'wood', 'bottle', 'cable', 'capsule'
eval_categories = ['tile']

In [None]:
# Select the compute device: CUDA when PyTorch reports it as available, otherwise CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"[INFO] Using device: {device}")

## データユーティリティ（ノートブック内に保持）


In [5]:
# Preprocessing pipeline: resize to an image_size x image_size square, convert
# to a tensor, then normalise each channel with the ImageNet mean/std below.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]
_transform = transforms.Compose(
    [
        transforms.Resize(size=(image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(imagenet_mean, imagenet_std),
    ]
)

class ImagePathDataset(Dataset):
    """Minimal image dataset that yields ``(image, label)`` pairs.

    Args:
        paths: Iterable of image file paths; each entry is converted to ``Path``.
        labels: Labels indexed in parallel with ``paths`` (stored as given).
        transform: Optional callable applied to the RGB PIL image; when ``None``
            the PIL image itself is returned.
    """

    def __init__(self, paths, labels, transform=None):
        self.paths = [Path(p) for p in paths]
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transform if any, and return ``(image, label)``."""
        path = self.paths[idx]
        label = self.labels[idx]
        # Close the underlying file deterministically instead of relying on
        # garbage collection; convert() yields an image that no longer needs it.
        with Image.open(path) as src:
            img = src.convert('RGB')
        if self.transform is not None:
            img = self.transform(img)
        return img, label

from typing import List, Tuple, Any
def _existing_category_root(category: str) -> Path:
    """Return the first existing dataset directory for ``category``.

    Locations are probed in this order: ``MVTEC_ROOT/<category>``,
    ``datasets/MVTecAD/<category>``, ``MVtec_dataset/<category>``.

    Raises:
        FileNotFoundError: if none of the candidate locations exists.
    """
    search_bases = (MVTEC_ROOT, Path('datasets/MVTecAD'), Path('MVtec_dataset'))
    found = next(
        (base / category for base in search_bases if (base / category).exists()),
        None,
    )
    if found is None:
        raise FileNotFoundError(f'カテゴリが見つかりません: {category}')
    return found

def _list_images(d: Path) -> List[Path]:
    """Recursively list image files under ``d`` in sorted order.

    A path is included when it is a regular file whose suffix, compared
    case-insensitively, is ``.png``, ``.jpg`` or ``.jpeg``.

    Args:
        d: Directory to search.

    Returns:
        Sorted list of matching file paths; empty when ``d`` does not exist.
    """
    exts = {'.png', '.jpg', '.jpeg'}
    if not d.exists():
        return []
    # rglob('*') also yields directories; a directory named like "x.png" must
    # not be handed to the image loader, so keep regular files only.
    return sorted(
        p for p in d.rglob('*')
        if p.suffix.lower() in exts and p.is_file()
    )

def build_train_and_test_loaders(category: str, batch_size: int = 32) -> Tuple[DataLoader, DataLoader]:
    """Build the train and test ``DataLoader`` pair for one category.

    The train loader covers every image under ``<root>/train/good`` with the
    integer label ``0`` and is shuffled. The test loader covers every image
    under each sub-directory of ``<root>/test``, labelled with that
    sub-directory's name, in sorted sub-directory order and unshuffled.

    Args:
        category: Category name, resolved via ``_existing_category_root``.
        batch_size: Batch size used for both loaders.

    Returns:
        ``(train_loader, test_loader)``.

    Raises:
        FileNotFoundError: propagated from ``_existing_category_root``.
        AssertionError: if no train/good images or no test images are found.
    """
    root = _existing_category_root(category)
    train_good = _list_images(root / 'train' / 'good')
    assert len(train_good) > 0, f'No train/good images: {category}'

    # Pinned host memory only speeds up host-to-GPU copies. Requesting it when
    # CUDA is unavailable brings no benefit and makes recent PyTorch versions
    # emit a UserWarning, so enable it only when a GPU is present.
    pin = torch.cuda.is_available()

    # train: use every good image
    train_ds = ImagePathDataset(train_good, [0] * len(train_good), transform=_transform)
    train_loader = DataLoader(
        train_ds, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=pin,
    )

    # test: the sub-directory name is the label (anything but 'good' is anomalous)
    test_dir = root / 'test'
    test_paths, test_labels = [], []
    for sub in sorted((d for d in test_dir.iterdir() if d.is_dir()), key=lambda p: p.name):
        paths = _list_images(sub)
        if paths:
            test_paths.extend(paths)
            test_labels.extend([sub.name] * len(paths))
    assert len(test_paths) > 0, f'No test images: {category}'

    test_ds = ImagePathDataset(test_paths, test_labels, transform=_transform)
    test_loader = DataLoader(
        test_ds, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin,
    )
    return train_loader, test_loader


## 指標計算ユーティリティ
- 閾値は train/good のスコア分布のパーセンタイルで設定（設定ファイルの `threshold_percentile` を使用。99 を指定した場合が train/good 上の FPR=1% に相当）
- AUROC/AUPRC はしきい値に依らないため、test スコアと真値で計算


In [6]:
def fpr_threshold_from_neg(scores_neg: np.ndarray, fpr: float = 0.01) -> float:
    """Return the score threshold that yields the requested FPR on negatives.

    Higher scores mean "more anomalous", so the cut is made on the right tail:
    the threshold is the ``(1 - fpr)`` quantile of ``scores_neg``.

    Args:
        scores_neg: Scores of negative (normal) samples.
        fpr: Target false-positive rate; clipped to ``[1e-6, 1 - 1e-6]``.

    Returns:
        The threshold as a Python ``float``.
    """
    tail_mass = float(np.clip(fpr, 1e-6, 1 - 1e-6))
    quantile_level = 1.0 - tail_mass
    threshold = np.quantile(scores_neg, quantile_level)
    return float(threshold)

def to_binary_labels(labels: list) -> np.ndarray:
    """Map labels to a binary int64 array: ``'good'`` -> 0, anything else -> 1.

    Each label is compared through ``str()``, so non-string labels are allowed.
    """
    flags = (int(str(label) != 'good') for label in labels)
    return np.fromiter(flags, dtype=np.int64)

def compute_metrics(y_true: np.ndarray, y_score: np.ndarray, y_pred: np.ndarray) -> dict:
    """Compute AUROC, AUPRC, F1 and accuracy for binary predictions.

    The result always contains the keys ``'auroc'``, ``'auprc'``, ``'f1'`` and
    ``'acc'`` so that downstream tabulation can rely on them. A metric is
    ``None`` when its scorer is unavailable (the module-level name is ``None``)
    or when computing it raised.

    Args:
        y_true: Ground-truth binary labels.
        y_score: Continuous scores, used for AUROC and AUPRC.
        y_pred: Thresholded binary predictions, used for F1 and accuracy.

    Returns:
        Dict mapping metric name to a ``float`` or ``None``.
    """
    # (output key, scorer, prediction array fed to the scorer)
    scorers = (
        ('auroc', roc_auc_score, y_score),
        ('auprc', average_precision_score, y_score),
        ('f1', f1_score, y_pred),
    )
    out = {}
    for key, scorer, y_hat in scorers:
        if scorer is None:
            out[key] = None
            continue
        try:
            out[key] = float(scorer(y_true, y_hat))
        except Exception as e:
            # Best effort: keep evaluating the other metrics, but report why
            # this one is missing instead of dropping the error silently.
            print(f'[WARN] failed to compute {key}: {e}')
            out[key] = None
    out['acc'] = float((y_true == y_pred).mean())
    return out


## カテゴリ評価関数（Mahalanobis / PaDiM）
- 各カテゴリで学習（train/good 全使用）→ test 評価
- しきい値は train/good のスコアの `threshold_percentile` パーセンタイルで設定（99 を指定した場合が train/good 上の FPR=1% に相当）


In [None]:
def evaluate_category(category: str, backbone: str) -> dict:
    """Fit and evaluate the Mahalanobis and PaDiM detectors on one category.

    For each method the detector is fitted on the train loader, the decision
    threshold is the ``threshold_percentile`` percentile of its train scores,
    and a test sample is flagged as anomalous when its score is at or above
    that threshold.

    Note: ``backbone`` is only recorded in the returned dict; the feature
    extractor actually used is the module-level ``model``.

    Args:
        category: Category name passed to ``build_train_and_test_loaders``.
        backbone: Backbone name stored alongside the results.

    Returns:
        ``{'category', 'backbone', 'results'}`` where ``results`` maps
        ``'mahalanobis'`` and ``'padim'`` to a dict holding ``'threshold'``
        plus the entries returned by ``compute_metrics``.
    """
    train_loader, test_loader = build_train_and_test_loaders(category, batch_size=batch_size)

    # Binary ground truth for the test set.
    y_true = to_binary_labels(test_loader.dataset.labels)

    def _threshold_and_score(state, score_fn) -> dict:
        # Threshold from the train-score distribution, then score the test set.
        train_scores = score_fn(state, train_loader).numpy()
        cutoff = np.percentile(train_scores, threshold_percentile)
        test_scores = score_fn(state, test_loader).numpy()
        flagged = (test_scores >= cutoff).astype(np.int64)
        summary = {'threshold': float(cutoff)}
        summary.update(compute_metrics(y_true, test_scores, flagged))
        return summary

    results = {}

    # ---- Mahalanobis ----
    print('start Mahalanobis fitting...')
    mahalanobis_state = fit_mahalanobis(train_loader, model, feature_node=MD_layer, device=device)
    results['mahalanobis'] = _threshold_and_score(mahalanobis_state, all_mahalanobis_scores)

    # ---- PaDiM ----
    print('start PaDiM fitting...')
    padim_state = fit_padim(
        train_loader,
        model,
        layers=padim_layers,
        d=padim_channel_subsample,
        device=device,
    )
    results['padim'] = _threshold_and_score(padim_state, all_padim_scores)

    return {'category': category, 'backbone': backbone, 'results': results}


## 実行（カテゴリ一括）
- 結果は runs/eval/<category>/metrics_{method}.json にも保存します。


In [8]:
# Evaluate every configured category, persist one JSON file per method, and
# collect flat rows for the summary tables.
# (To silence UserWarning output, this loop can be wrapped in
# warnings.catch_warnings() with a filter on UserWarning.)
all_rows = []
for cat in eval_categories:
    print(f'[RUN] category={cat}, backbone={backbone}')
    out = evaluate_category(cat, backbone)
    print('completed.')
    ts = datetime.now(timezone.utc).isoformat()

    # Persist: runs/eval/<category>/metrics_<method>.json
    out_dir = RESULTS / 'eval' / cat
    out_dir.mkdir(parents=True, exist_ok=True)
    for method, metrics in out['results'].items():
        payload = {
            'category': cat, 'method': method, 'backbone': backbone,
            'timestamp': ts, 'metrics': metrics,
        }
        target = out_dir / f'metrics_{method}.json'
        target.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding='utf-8')
        all_rows.append({'category': cat, 'method': method, 'backbone': backbone, **metrics})

df = pd.DataFrame(all_rows)
df_pivot = df.pivot(index='category', columns='method', values='auroc')
print('[Summary] AUROC by method/category')
display(df_pivot)
print('[All metrics]')
display(df)


[RUN] category=tile, backbone=resnet18
start Mahalanobis fitting...




KeyboardInterrupt: 