In [None]:
#!/usr/bin/env python3
import logging
from pathlib import Path

import cv2
import numpy as np
import timm
import torch
from anomalib import TaskType
from anomalib.data.datamodules import Folder
from anomalib.data.datasets import FolderDataset
from anomalib.data.utils import ValSplitMode
from anomalib.engine.engine import Engine
from anomalib.loggers import AnomalibCometLogger
from anomalib.models import (
    Cfa,
    Cflow,
    Csflow,
    Dfkde,
    Dfm,
    Draem,
    Dsr,
    EfficientAd,
    Fastflow,
    Fre,
    Ganomaly,
    Padim,
    Patchcore,
    ReverseDistillation,
    Stfpm,
    Supersimplenet,
    Uflow,
    VlmAd,
    WinClip,
)
from anomalib.models.components.base import AnomalibModule
from anomalib.models.components.classification import FeatureScalingMethod
from anomalib.models.image.efficient_ad.torch_model import EfficientAdModelSize
from anomalib.models.image.reverse_distillation.anomaly_map import (
    AnomalyMapGenerationMode,
)
from anomalib.models.image.vlm_ad.utils import ModelName
from anomalib.pre_processing import PreProcessor
from anomalib.utils.post_processing import (
    anomaly_map_to_color_map,
    superimpose_anomaly_map,
)
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
from matplotlib import pyplot as plt
from PIL import Image
from torchvision.transforms.v2 import Compose, Resize, ToTensor

# logging.basicConfig(level=logging.DEBUG)

In [None]:
MODEL_NAMES = [
    # Catalogue of the model names handled by the model-construction cell.
    # Each entry is annotated as: list index # speed note # short description.
    # Speed marks as used in these notes: ◎ very fast, ◯ fast, △ varies / somewhat slow, × slow.
    # NOTE(review): the descriptions and acronym expansions below are translated as the
    # original author wrote them; several (e.g. CFA, C-Flow, CS-Flow, DFM, DSR, U-Flow)
    # do not look like the names used in anomalib's model documentation — confirm before
    # relying on them.
    #
    # 0 # △ generally fast (depends on data size) # statistical method for verifying the factor structure of observed variables (Continuous Flow Analysis)
    # SUPPORTED_BACKBONES = ("vgg19_bn", "resnet18", "wide_resnet50_2", "efficientnet_b5")
    "CFA",
    # 1 # △ varies with approach/conditions # flow-matching method that extends discrete generative models using continuous probabilities (Conditional Flow Matching)
    "C-Flow",
    # 2 # △ varies with approach/conditions # continuous-state discrete flow-matching model (Continuous-State Flow Matching)
    # no backbone
    "CS-Flow",
    # 3 # △ processing time grows with large datasets # kernel density estimation model for outlier detection in high-dimensional data (Distribution-Free Kernel Density Estimation)
    "DFKDE",
    # 4 # × slow # generative flow-matching (Deep Feature Matching) style model (Deep Flow Matching)
    "DFM",
    # 5 # × slow # model that detects anomalies via deep self-supervision and reconstruction (Dual Reconstruction AutoEncoder-based Model)
    # no backbone
    "DRAEM",
    # 6 # ◯ relatively fast # model that detects outliers using normalizing flows (Deep Subspace Reconstruction)
    # no backbone
    "DSR",
    # 7 # ◎ very fast # anomaly detection algorithm focused on computational efficiency (Efficient Anomaly Detection)
    # no backbone
    "Efficient AD",
    # 8 # ◯ fast with GPU inference # anomaly detection via fast flow-based generation
    # SUPPORTED_BACKBONES = ("cait_m48_448", "deit_base_distilled_patch16_384", "resnet18", "wide_resnet50_2")
    "FastFlow",
    # 9 # ◎ fast # anomaly detection network based on reconstruction error (Feature Reconstruction Error)
    "FRE",
    # 10 # × slow # anomaly detection model that uses the reconstruction error of a generative adversarial network (GAN) (Generative Adversarial Network Anomaly Detection)
    # no backbone
    "GANomaly",
    # 11 # △ varies with architecture/implementation # model that detects anomalies in feature space with multivariate distributions (Patch Distribution Modeling)
    "PaDiM",
    # 12 # ◎ very fast # patch-based anomaly detection model in a high-dimensional feature space
    "PatchCore",
    # 13 # ◯ fast # anomaly detection model using supervised reverse distillation
    "Reverse Distillation",
    # 14 # △ varies with architecture/implementation # anomaly detection model using an unsupervised spatial attention mechanism (flow-based patch matching) (Student-Teacher Feature Pyramid Matching)
    "STFPM",
    # 15 # ◎ very fast # simple and effective anomaly detection neural network
    "SuperSimpleNet",
    # 16 # △ varies with approach/conditions # anomaly detection model using optical flow (U-Net structure) (U-Net-based Flow)
    # AVAILABLE_EXTRACTORS = ["mcait", "resnet18", "wide_resnet50_2"]
    "U-Flow",
    # 17 # △ longer inference time # anomaly detection model using a large vision-language model (Vision-Language Model for Anomaly Detection)
    # no backbone
    "VLM-AD",
    # 18 # △ longer inference time # CLIP-based anomaly detection model using a window attention mechanism (Windowed CLIP)
    # no backbone
    "WinCLIP",
]

In [None]:
BACKBONES = [
    # Catalogue of backbone names, grouped by architecture family.
    # Each entry is annotated as: list index # speed note (the original author's rating).
    # Speed marks as used in these notes: ◎ very fast, ◯ fast, △ somewhat slow, × slow.
    # The index comments were renumbered to match the actual list positions
    # (the original numbering skipped 12, shifting every later index by one).
    #
    # ResNet: general-purpose models
    # 0 # ◎ very fast
    "resnet18",
    # 1 # ◯ fast
    "resnet50",
    # 2 # △ somewhat slow
    "resnet101",
    # 3 # × slow
    "resnet152",
    # Wide ResNet: models with wider layers
    # 4 # ◯ fast
    "wide_resnet50_2",
    # 5 # △ somewhat slow
    "wide_resnet101_2",
    # EfficientNet: lightweight, high-performance models
    # 6 # ◎ very fast
    "efficientnet_b0",
    # 7 # ◯ fast
    "efficientnet_b3",
    # 8 # △ somewhat slow
    "efficientnet_b4",
    # 9 # △ somewhat slow
    "efficientnet_b5",
    # Vision Transformer (ViT): models that take image patches as features
    # 10 # △ somewhat slow
    "vit_base_patch16_224",
    # 11 # × slow
    "vit_large_patch16_224",
    # Swin Transformer: models with a local attention mechanism
    # 12 # △ somewhat slow
    "swin_base_patch4_window7_224",
    # 13 # × slow
    "swin_large_patch4_window7_224",
    # DenseNet: models with dense connections
    # 14 # ◯ fast
    "densenet121",
    # 15 # ◯ fast
    "densenet169",
    # 16 # △ somewhat slow
    "densenet201",
    # RegNet: models with an efficient architecture
    # 17 # ◎ very fast
    "regnetx_002",
    # 18 # ◯ fast
    "regnetx_004",
    # 19 # ◎ very fast
    "regnety_032",
    # MobileNet: lightweight models
    # 20 # ◎ very fast
    "mobilenetv2_100",
    # 21 # ◎ very fast
    "mobilenetv3_large_100",
    # VGG: simple, widely used model
    # 22 # △ somewhat slow
    "vgg19_bn",
    # Large ViT-derived model suited to large datasets (Class-Attention in Image Transformers)
    # 23 # × slow
    "cait_m48_448",
    # ViT model made lighter and more efficient through distillation (Data-efficient Image Transformer Base Distilled Patch)
    # 24 # × slow
    "deit_base_distilled_patch16_384",
    # Vision Transformer model described by the original author as using multi-head layers
    # and hierarchical attention for high accuracy on vision tasks
    # 25 # △ somewhat slow
    "mcait",
]

In [None]:
# ----- Settings -----
# Root of the dataset used by the mask-less Folder datamodule ("train" / "test" sub-directories).
DATASET_PATH = "./datasets/custom"
# Root of the dataset with ground-truth masks, used for SuperSimpleNet, VLM-AD and WinCLIP.
DATASET_PATH_ANOMALY = "./datasets/custom_anomaly"
# Side length (pixels) of the square that the pre-processing Resize transform produces.
IMAGE_SIZE = 128
# Batch size for both training and evaluation dataloaders (also passed to Cflow as fiber_batch_size).
BATCH_SIZE = 1
# Passed to the Engine as max_epochs.
EPOCHS = 1
# NOTE(review): not referenced by any other cell visible in this notebook — confirm whether it is still needed.
CHECKPOINT_PATH = "./checkpoint.ckpt"

In [None]:
# --- Model / backbone selection: keep exactly one assignment of each uncommented ---
# MODEL_NAME = "CFA"
# MODEL_NAME = "C-Flow"
# MODEL_NAME = "CS-Flow"
# MODEL_NAME = "DFKDE"
# MODEL_NAME = "DFM"
# MODEL_NAME = "DRAEM"
# MODEL_NAME = "DSR"
# MODEL_NAME = "Efficient AD"
# MODEL_NAME = "FastFlow"
# MODEL_NAME = "FRE"
# MODEL_NAME = "GANomaly"
# MODEL_NAME = "PaDiM"
# MODEL_NAME = "PatchCore"
# MODEL_NAME = "Reverse Distillation"
# MODEL_NAME = "STFPM"
# TODO
# MODEL_NAME = "SuperSimpleNet"
# MODEL_NAME = "U-Flow"
# MODEL_NAME = "VLM-AD"
MODEL_NAME = "WinCLIP"
# BACKBONE = ""
# BACKBONE = "resnet18"
# BACKBONE = "resnet50"
# BACKBONE = "efficientnet_b0"
BACKBONE = "wide_resnet50_2"
# BACKBONE = "mcait"

# Collect the names of the backbone's feature-extraction stages from timm.
# "" and "mcait" are never looked up in timm; they simply get an empty list.
if BACKBONE not in ("", "mcait"):
    feature_model = timm.create_model(BACKBONE, features_only=True)
    layers = feature_model.feature_info.module_name()  # type: ignore
    print(layers)
else:
    layers = []

# Pre-processing: resize every input to IMAGE_SIZE x IMAGE_SIZE.
transform = Compose([Resize(size=(IMAGE_SIZE, IMAGE_SIZE))])
pre_processor = PreProcessor(transform=transform)

In [None]:
# ----- Model construction -----
# Builds `model` for the selected MODEL_NAME. Backbone-based models receive BACKBONE and a
# slice of `layers` (the feature-stage names collected from timm); most models also receive
# the shared `pre_processor`.
# The commented-out `max_epochs` / `callbacks` lines after each model are per-model notes
# kept from the original notebook; they are not applied anywhere in this cell.
# An unrecognised MODEL_NAME raises ValueError instead of leaving `model` undefined
# (or silently reusing a model left over from an earlier run of this cell).
if MODEL_NAME == "CFA":
    model = Cfa(
        backbone=BACKBONE,
        pre_processor=pre_processor,
        visualizer=False,
        gamma_c=1,
        gamma_d=1,
        num_nearest_neighbors=3,
        num_hard_negative_features=3,
        radius=1e-5,
    )
    # max_epochs = 30
    # callbacks = [EarlyStopping(patience=5, monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "C-Flow":
    model = Cflow(
        backbone=BACKBONE,
        pre_processor=pre_processor,
        layers=layers[-3:],
        fiber_batch_size=BATCH_SIZE,
        pre_trained=True,
        decoder="freia-cflow",
        condition_vector=128,
        coupling_blocks=8,
        clamp_alpha=1.9,
        permute_soft=False,
        lr=0.0001,
    )
    # max_epochs = 50
    # callbacks = [EarlyStopping(patience=5, monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "CS-Flow":
    model = Csflow(
        pre_processor=pre_processor,
        cross_conv_hidden_channels=1024,
        n_coupling_blocks=4,
        clamp=3,
        num_channels=3,
    )
    # max_epochs = 240
elif MODEL_NAME == "DFKDE":
    model = Dfkde(
        backbone=BACKBONE,
        layers=[layers[-1]],
        # pre_processor = pre_processor,
        pre_trained=True,
        # n_pca_components=16,
        n_pca_components=1,
        feature_scaling_method=FeatureScalingMethod.SCALE,
        max_training_points=40000,
    )
    # max_epochs = 1
    # callbacks = [EarlyStopping(monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "DFM":
    model = Dfm(
        backbone=BACKBONE,
        layer=layers[-2],
        pre_processor=pre_processor,
        pre_trained=True,
        pooling_kernel_size=4,
        pca_level=0.97,
        score_type="fre",
    )
    # max_epochs = 1
elif MODEL_NAME == "DRAEM":
    model = Draem(
        pre_processor=pre_processor,
        enable_sspcab=False,
        sspcab_lambda=0.1,
        anomaly_source_path=None,
        beta=(0.1, 1.0),
    )
    # max_epochs = 700
    # callbacks = [EarlyStopping(patience=20, monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "DSR":
    model = Dsr(
        pre_processor=pre_processor,
        latent_anomaly_strength=0.2,
        upsampling_train_ratio=0.7,
    )
    # max_epochs = 700
elif MODEL_NAME == "Efficient AD":
    model = EfficientAd(
        pre_processor=pre_processor,
        imagenet_dir="./datasets/imagenette",
        teacher_out_channels=384,
        model_size=EfficientAdModelSize.S,
        lr=0.0001,
        weight_decay=0.00001,
        padding=False,
        pad_maps=True,
    )
    # max_epochs = 1000
elif MODEL_NAME == "FastFlow":
    model = Fastflow(
        backbone=BACKBONE,
        pre_processor=pre_processor,
        pre_trained=True,
        flow_steps=8,
        conv3x3_only=False,
        hidden_ratio=1.0,
    )
    # max_epochs = 500
    # callbacks = [EarlyStopping(monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "FRE":
    model = Fre(
        backbone=BACKBONE,
        layer=layers[-2],
        pre_processor=pre_processor,
        input_dim=IMAGE_SIZE * IMAGE_SIZE,
        latent_dim=IMAGE_SIZE,
        pre_trained=True,
        pooling_kernel_size=2,
    )
    # max_epochs = 1220
elif MODEL_NAME == "GANomaly":
    model = Ganomaly(
        pre_processor=pre_processor,
        # NOTE(review): hard-coded to 32 while the datamodule uses BATCH_SIZE — confirm this is intended.
        batch_size=32,
        n_features=64,
        latent_vec_size=100,
        extra_layers=0,
        add_final_conv_layer=True,
        wadv=1,
        wcon=50,
        wenc=1,
        lr=0.0002,
        beta1=0.5,
        beta2=0.999,
    )
    # max_epochs = 100
    # callbacks = [EarlyStopping(monitor="image_AUROC", mode="max")]
elif MODEL_NAME == "PaDiM":
    model = Padim(
        backbone=BACKBONE,
        layers=layers[-4:-1],
        pre_processor=pre_processor,
        n_features=100,
        pre_trained=True,
    )
    # max_epochs = 1
elif MODEL_NAME == "PatchCore":
    model = Patchcore(
        backbone=BACKBONE,
        layers=layers[-3:-1],
        pre_processor=pre_processor,
        pre_trained=True,
        coreset_sampling_ratio=0.1,
        num_neighbors=9,
    )
    # max_epochs = 1
elif MODEL_NAME == "Reverse Distillation":
    model = ReverseDistillation(
        backbone=BACKBONE,
        layers=layers[-4:-1],
        pre_processor=pre_processor,
        anomaly_map_mode=AnomalyMapGenerationMode.ADD,
        pre_trained=True,
    )
    # max_epochs = 200
    # callbacks = [EarlyStopping(monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "STFPM":
    model = Stfpm(
        backbone=BACKBONE,
        layers=layers[-4:-1],
        pre_processor=pre_processor,
    )
    # max_epochs = 100
    # callbacks = [EarlyStopping(patience=5, monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "SuperSimpleNet":
    model = Supersimplenet(
        backbone=BACKBONE,
        layers=layers[-3:-1],
        pre_processor=pre_processor,
        perlin_threshold=0.2,
        supervised=False,
    )
    # max_epochs = 1
elif MODEL_NAME == "U-Flow":
    model = Uflow(
        backbone=BACKBONE,
        # pre_processor=pre_processor,
        flow_steps=4,
        affine_clamp=2.0,
        affine_subnet_channels_ratio=1.0,
        permute_soft=False,
    )
    # max_epochs = 200
    # callbacks = [EarlyStopping(patience=20, monitor="pixel_AUROC", mode="max")]
elif MODEL_NAME == "VLM-AD":
    model = VlmAd(
        model=ModelName.LLAMA_OLLAMA,
        api_key=None,
        k_shot=0,
    )
    # max_epochs = 1
elif MODEL_NAME == "WinCLIP":
    model = WinClip(
        # pre_processor=pre_processor,
        class_name="transistor",
        k_shot=0,
        scales=(2, 3),
        few_shot_source=None,
    )
else:
    # Fail loudly: without this branch `model` would be undefined on a fresh kernel,
    # or would still point at whatever an earlier run of this cell created.
    raise ValueError(
        f"Unsupported MODEL_NAME {MODEL_NAME!r}; expected one of {MODEL_NAMES}"
    )

In [None]:


# Dataloader options shared by both dataset layouts.
loader_options = dict(
    train_batch_size=BATCH_SIZE,
    eval_batch_size=BATCH_SIZE,
    num_workers=1,
)

if MODEL_NAME not in ("SuperSimpleNet", "VLM-AD", "WinCLIP"):
    # Mask-less dataset: normal images only, under "train" and "test".
    datamodule = Folder(
        name="custom",
        root=DATASET_PATH,
        normal_dir="train",
        normal_test_dir="test",
        val_split_mode=ValSplitMode.FROM_TRAIN,
        # Author's note: validation is effectively skipped, because with validation data
        # the scores came out as only 1 or 0. A ratio of 0 would fall back to the default
        # split, so a very small value is used instead.
        val_split_ratio=0.0001,
        **loader_options,
    )
else:
    # Dataset with ground-truth masks.
    datamodule = Folder(
        name="custom_anomaly",
        root=DATASET_PATH_ANOMALY,
        normal_dir=Path("train", "good"),
        abnormal_dir=Path("test", "broken_large"),
        normal_test_dir=Path("test", "good"),
        mask_dir=Path("ground_truth", "broken_large"),
        val_split_mode=ValSplitMode.FROM_TEST,
        val_split_ratio=0.2,
        **loader_options,
    )
datamodule.setup()

# Report the size of each split: train, validation, test (in that order).
for split_attr in ("train_data", "val_data", "test_data"):
    print(len(getattr(datamodule, split_attr)))

In [None]:
# ----- Training -----
# The engine trains for EPOCHS epochs on a single automatically selected device.
# No callbacks are passed: the `callbacks=callbacks` argument stays disabled.
engine = Engine(max_epochs=EPOCHS, accelerator="auto", devices=1)
engine.fit(model=model, datamodule=datamodule)

In [None]:
def get_item(prediction, key):
    """Fetch attribute ``key`` from ``prediction`` and reduce it to its first element.

    Args:
        prediction: Any object; the value is read with attribute access.
        key: Name of the attribute to read.

    Returns:
        * ``None`` if the attribute does not exist, or if it is an empty
          list/tuple, an empty tensor or an empty array.
        * For a list or tuple: its first element.
        * For a ``torch.Tensor``: a Python scalar when it holds exactly one
          element, otherwise ``tensor[0]`` (the first slice along dim 0).
        * For a ``numpy.ndarray``: a Python scalar when it holds exactly one
          element, otherwise the first element in flattened order.
        * Anything else (int, float, str, None, ...) is returned unchanged.
    """
    absent = object()
    raw = getattr(prediction, key, absent)
    if raw is absent:
        return None

    # Plain Python sequences: first entry, or None when there is none.
    if isinstance(raw, (list, tuple)):
        return None if len(raw) == 0 else raw[0]

    # Tensors: a tensor with more than one element always has dim() > 0,
    # so indexing with [0] is safe in the final case.
    if isinstance(raw, torch.Tensor):
        element_count = raw.numel()
        if element_count == 0:
            return None
        if element_count == 1:
            return raw.item()
        return raw[0]

    # Arrays: note that this yields the first *scalar*, not the first row.
    if isinstance(raw, np.ndarray):
        element_count = raw.size
        if element_count == 0:
            return None
        if element_count == 1:
            return raw.item()
        return raw.flat[0]

    # Scalars and everything else pass through untouched.
    return raw

In [None]:
# ----- Decision threshold from the training scores -----
# Predict on the training dataloader, gather every image score and set the threshold to
# mean + 3 * std (the "99.7 %" rule, which assumes roughly normally distributed scores).
# This is deliberately best-effort: if prediction or score extraction fails for the chosen
# model, the error is printed and the threshold falls back to 0.
try:
    train_predictions = engine.predict(
        model=model, dataloaders=datamodule.train_dataloader()
    )
    train_scores = []
    for prediction in train_predictions or []:
        batch_scores = getattr(prediction, "pred_score", None)
        if batch_scores is None:
            continue
        if isinstance(batch_scores, torch.Tensor):
            # double() keeps the exact value for every floating dtype before leaving torch
            batch_scores = batch_scores.detach().cpu().double().numpy()
        # Collect ALL scores of the batch, not only the first one, so the statistics
        # remain correct when BATCH_SIZE > 1.
        train_scores.extend(np.asarray(batch_scores, dtype=float).ravel().tolist())
    if not train_scores:
        raise ValueError("no pred_score values were returned for the training data")
    # Threshold (99.7 %)
    threshold = np.mean(train_scores) + 3 * np.std(train_scores)
    # # Alternative: upper quartile + 1.5 x interquartile range
    # threshold = np.percentile(train_scores, 75) + 1.5 * (
    #     np.percentile(train_scores, 75) - np.percentile(train_scores, 25)
    # )
except Exception as e:
    print(f"exception: {e}")
    threshold = 0
print(f"threshold: {threshold}")

In [None]:
# Run prediction through the engine, letting the datamodule supply the dataloader
# (presumably its test split — TODO confirm against anomalib's Engine.predict).
# The result is indexed and iterated by the plotting cell, one entry per figure row.
predictions = engine.predict(model=model, datamodule=datamodule)

In [None]:
# ----- Visualisation of the predictions -----
# One row per entry of `predictions`: the left panel shows the input image with its score
# and label in the title, the right panel shows the anomaly map blended over the image
# (left empty when the prediction carries no anomaly map).
# squeeze=False keeps `ax` two-dimensional, so ax[i, j] also works with a single row.
fig, ax = plt.subplots(
    nrows=len(predictions), ncols=2, figsize=(8, 6), squeeze=False
)
fig.suptitle(f"{MODEL_NAME}[{BACKBONE}]", x=0.2)
fig.subplots_adjust(hspace=0.4, wspace=0.0, left=0, right=0.3)


def set_ax_style(ax, image):
    """Hide both axes of ``ax`` and draw ``image`` on it unless ``image`` is None."""
    ax.axes.xaxis.set_visible(False)
    ax.axes.yaxis.set_visible(False)
    if image is not None:
        ax.imshow(image)


# TODO
# Value range over the first anomaly map of every prediction that has one. A single
# shared range keeps the colours comparable between rows.
first_maps = [
    prediction.anomaly_map[0]
    for prediction in predictions
    if getattr(prediction, "anomaly_map", None) is not None
]
if first_maps:
    map_min = min(float(first_map.min()) for first_map in first_maps)
    map_max = max(float(first_map.max()) for first_map in first_maps)
    map_ptp = map_max - map_min
else:
    # No prediction carries an anomaly map; superimpose_anomaly_map_g then returns None.
    map_min = map_max = map_ptp = None


def superimpose_anomaly_map_g(
    anomaly_map: np.ndarray,
    image: np.ndarray,
    alpha: float = 0.4,
    gamma: int = 0,
) -> "np.ndarray | None":
    """Blend a JET-coloured anomaly map over ``image`` using the shared value range.

    Args:
        anomaly_map: 2-D array of anomaly values.
        image: RGB uint8 image; it is resized to the anomaly map's size.
        alpha: Weight of the colour map in the blend; the image gets ``1 - alpha``.
        gamma: Scalar added to the weighted sum (passed to ``cv2.addWeighted``).

    Returns:
        The blended RGB image, or ``None`` when no anomaly maps are available.
    """
    if map_ptp is None:
        return None
    if map_ptp > 0:
        scaled = (anomaly_map - map_min) / map_ptp
    else:
        # All maps share one value: avoid dividing by zero and draw a flat map.
        scaled = np.zeros_like(anomaly_map, dtype=np.float32)
    normalized_map = (np.clip(scaled, 0.0, 1.0) * 255).astype(np.uint8)
    color_map = cv2.applyColorMap(normalized_map, cv2.COLORMAP_JET)
    rgb_color_map = cv2.cvtColor(color_map, cv2.COLOR_BGR2RGB)
    height, width = rgb_color_map.shape[:2]
    # cv2.resize takes the target size as (width, height)
    image = cv2.resize(image, (width, height))
    return cv2.addWeighted(rgb_color_map, alpha, image, (1 - alpha), gamma)


for i, prediction in enumerate(predictions):

    image_path = get_item(prediction, "image_path")
    image_size = prediction.image.shape[-2:]
    img_height, img_width = image_size
    # PIL's resize expects (width, height), the reverse of the tensor's (H, W) order.
    # Converting to RGB guarantees three channels, which the blend with the colour map needs.
    with Image.open(image_path) as image_file:
        image = np.array(image_file.convert("RGB").resize((img_width, img_height)))
    set_ax_style(ax[i, 0], image)
    pred_score = get_item(prediction, "pred_score")

    if pred_score is None:
        pred_score = 0.0

    if get_item(prediction, "gt_mask") is not None:
        # A ground-truth mask is present: use the label attached to the prediction
        pred_labels = (
            "Anomaly" if get_item(prediction, "pred_label") else "Normal"
        )
    else:
        # No mask: decide with the threshold derived from the training scores
        pred_labels = "Anomaly" if pred_score > threshold else "Normal"
    ax[i, 0].set_title(
        f"[{Path(image_path).name}] Score: {pred_score:.2f} [{pred_labels}]",
        loc="left",
    )

    anomaly_map = get_item(prediction, "anomaly_map")
    if anomaly_map is None:
        # Nothing to draw; still hide the axes of the empty right-hand panel
        set_ax_style(ax[i, 1], anomaly_map)
        continue
    anomaly_map = anomaly_map.detach().cpu().numpy().squeeze()

    heat_map = superimpose_anomaly_map_g(anomaly_map=anomaly_map, image=image)
    set_ax_style(ax[i, 1], heat_map)