In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

import torch, torchvision
import torch.nn.functional as F
from torch import nn, optim
from torchvision import transforms, datasets

### 세팅 

In [2]:
# Compute device selection.
# CUDA is used when PyTorch can see a GPU (this needs a CUDA-enabled PyTorch
# build); otherwise everything runs on the CPU.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print("Using Device:", DEVICE)

Using Device: cuda


In [3]:
# Fashion MNIST dataset (fetched into ./.data/ when download=True finds nothing there).
def _load_fashion_mnist(train):
    """Build the Fashion-MNIST train or test split with a ToTensor transform."""
    return datasets.FashionMNIST(
        root='./.data/',
        train=train,
        download=True,
        transform=transforms.ToTensor(),
    )


trainset = _load_fashion_mnist(train=True)
testset = _load_fashion_mnist(train=False)

In [4]:
# Training uses a single "normal" class: keep only samples labelled SELECT_NORMAL.
SELECT_NORMAL = 2
normal_mask = trainset.targets == SELECT_NORMAL
trainset.data = trainset.data[normal_mask]
trainset.targets = trainset.targets[normal_mask]

# The test set is restricted to these classes (the normal class plus two others).
test_label = [2, 4, 6]
actual_testdata = torch.isin(testset.targets, torch.tensor(test_label))
testset.data, testset.targets = testset.data[actual_testdata], testset.targets[actual_testdata]

# One sample per batch, in dataset order.
test_loader = torch.utils.data.DataLoader(
    testset,
    batch_size=1,
    shuffle=False,
    num_workers=2,
)

train_data_size, test_data_size = len(trainset), len(testset)

print("Train data size:", train_data_size, "Test data size:", test_data_size)

Train data size: 6000 Test data size: 3000


#### 데이터 증강 기법 사용 class 

In [5]:
class GaussianNoise(nn.Module):
    """Add zero-mean Gaussian noise to a tensor while in training mode.

    In eval mode (after ``.eval()``) the input is returned unchanged, so the
    same module can sit in a pipeline that is reused for inference.

    Args:
        std (float): Standard deviation of the noise. Must be non-negative.

    Raises:
        ValueError: If ``std`` is negative.
    """

    def __init__(self, std=0.1):
        super().__init__()
        if std < 0:
            # Fail at construction time rather than on the first forward pass.
            raise ValueError(f"std must be non-negative, got {std}")
        self.std = std

    def forward(self, x):
        """Return ``x`` plus noise (training mode) or ``x`` itself (eval mode)."""
        if not self.training:
            return x
        # randn_like matches x's shape, dtype and device; it replaces the
        # legacy `x.data.new(...).normal_()` idiom. The input is not modified.
        return x + torch.randn_like(x) * self.std

In [6]:
# Augmentation pipeline for training images: random horizontal and vertical
# flips, RandomRotation(20), then additive Gaussian noise with std 0.1.
augmentation_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    GaussianNoise(0.1),
]
transform = transforms.Compose(augmentation_steps)

class AugmentedDataset(torch.utils.data.Dataset):
    """Virtually enlarge a dataset by cycling over it with a transform applied.

    The wrapper reports ``len(dataset) * augmentation_factor`` items. Item
    ``i`` is the original item ``i % len(dataset)`` with ``transform`` applied
    on every access, so a random transform yields a different augmented view
    each time the same index is read.

    Args:
        dataset: Source dataset; must support ``len()`` and return ``(x, y)``
            pairs from ``dataset[i]``.
        transform: Optional callable applied to ``x``. ``None`` disables it.
        augmentation_factor (int): How many times larger the wrapped dataset
            appears than the original. Must be at least 1.

    Raises:
        ValueError: If ``augmentation_factor`` is smaller than 1.
    """

    def __init__(self, dataset, transform=None, augmentation_factor=1):
        if augmentation_factor < 1:
            raise ValueError(
                f"augmentation_factor must be >= 1, got {augmentation_factor}"
            )
        self.dataset = dataset
        self.transform = transform
        self.augmentation_factor = augmentation_factor
        self.original_length = len(dataset)

    def __len__(self):
        # Total number of samples = original size * augmentation factor.
        return self.original_length * self.augmentation_factor

    def __getitem__(self, idx):
        """Return the (possibly transformed) sample at position ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        total = len(self)
        if idx < 0:
            # Support negative indices the way sequences do.
            idx += total
        if not 0 <= idx < total:
            # Without this bound, the modulo below would accept any index and
            # plain iteration (`for item in ds`, `list(ds)`) would never stop.
            raise IndexError(f"index out of range for dataset of size {total}")

        # Cycle through the original samples.
        x, y = self.dataset[idx % self.original_length]

        # `is not None` rather than truthiness: a transform object that
        # defines __len__/__bool__ must not be silently skipped.
        if self.transform is not None:
            x = self.transform(x)

        return x, y

In [7]:
# Split into train/validation first, then augment the training part only, so
# the validation samples are never transformed.
BATCH_SIZE = 1024
n_val = int(len(trainset) * 0.2)
n_train = len(trainset) - n_val

# Fixed seed keeps the train/validation split identical between runs.
split_rng = torch.Generator().manual_seed(2025)
augset, valset = torch.utils.data.random_split(trainset, [n_train, n_val], generator=split_rng)

augset = AugmentedDataset(augset, transform=transform, augmentation_factor=10)

loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=0)
train_loader = torch.utils.data.DataLoader(augset, shuffle=True, **loader_kwargs)
val_loader = torch.utils.data.DataLoader(valset, shuffle=False, **loader_kwargs)

# data size check
print("Train data size:", len(augset),"Val data size:", len(valset),"Test data size:", len(testset))

Train data size: 48000 Val data size: 1200 Test data size: 3000


### 모델 및 Training Setting 

In [8]:
class EarlyStopping():
    """Track validation loss, checkpoint the best model and signal when to stop.

    Call the instance once per epoch with the validation loss and the model.
    Each improvement saves ``model.state_dict()`` to ``path`` and resets the
    patience counter; after ``patience`` consecutive calls without an
    improvement, ``early_stop`` becomes True.

    Args:
        patience (int): Number of consecutive non-improving calls to tolerate.
        verbose (bool): If True, print a message on every call.
        delta (float): Minimum decrease of the loss (relative to the best one
            seen so far) that counts as an improvement.
        path (str): File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=10, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss and update the stopping state."""
        score = -val_loss  # lower validation loss is better

        # NaN compares False against everything, so without this check a NaN
        # loss would be treated as an improvement, overwrite the checkpoint and
        # poison best_score for every later comparison.
        is_nan = val_loss != val_loss
        improved = (not is_nan) and (
            self.best_score is None or score >= self.best_score + self.delta
        )

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        '''Save the model's state_dict to ``self.path`` and remember the loss.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

### 그리드 서치 

In [9]:
import torch
import copy, os
from tqdm import tqdm, trange

# Learning rate scaled linearly with the batch size: 1e-3 (the default
# learning rate for AdamW in PyTorch) is taken as the rate for a batch of 256.
BASE_LR, BASE_BATCH_SIZE = 1e-3, 256
lr = BASE_LR * BATCH_SIZE / BASE_BATCH_SIZE

class GridSearchTrainer:
    """Train every (model, loss) combination and save the best weights of each.

    Training is reconstruction-style: each batch ``x`` is fed through the
    model and the loss is ``criterion(model(x), x)``; the labels yielded by
    the loaders are ignored.

    Args:
        models (dict): ``{"model_name": model_instance}``.
        criterions (dict): ``{"loss_name": loss_module}``; each value must
            support ``.to(device)`` and be callable as ``loss(output, x)``.
        train_loader: Iterable of ``(x, label)`` training batches.
        val_loader: Iterable of ``(x, label)`` validation batches.
        n_epochs (int): Maximum number of epochs per combination.
        patience (int): Stop a combination after this many consecutive epochs
            without a new best validation loss.
        save_dir (str): Directory for the ``.pth`` files (created if missing).
        verbose (bool): Print per-epoch losses and early-stopping messages.
        device: Target device; defaults to CUDA when available, else CPU.

    Note:
        The optimizer learning rate is read from the module-level ``lr``.
    """

    def __init__(self, models, criterions, train_loader, val_loader, n_epochs=50, patience=10, save_dir='./checkpoints', verbose=True, device=None):
        self.models = models
        self.criterions = criterions
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.n_epochs = n_epochs
        self.patience = patience
        self.save_dir = save_dir
        self.verbose = verbose
        self.device = device if device is not None else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        os.makedirs(save_dir, exist_ok=True)

    def train_one_epoch(self, model, optimizer, criterion):
        """Run one optimisation pass over ``train_loader``.

        Returns:
            float: Mean of the per-batch losses.
        """
        model.train()
        total_loss = 0
        pbar = tqdm(self.train_loader, desc="Train", leave=False)
        for x, _ in pbar:
            x = x.to(self.device)
            output = model(x)
            loss = criterion(output, x)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            pbar.set_postfix(loss=loss.item())
        return total_loss / len(self.train_loader)

    def validate_one_epoch(self, model, criterion):
        """Evaluate on ``val_loader`` without gradient tracking.

        Returns:
            float: Mean of the per-batch losses.
        """
        model.eval()
        total_loss = 0
        pbar = tqdm(self.val_loader, desc="Validation", leave=False)
        with torch.no_grad():
            for x, _ in pbar:
                x = x.to(self.device)
                output = model(x)
                loss = criterion(output, x)
                total_loss += loss.item()
                pbar.set_postfix(loss=loss.item())
        return total_loss / len(self.val_loader)

    def run(self):
        """Train all combinations and write one checkpoint per combination.

        Returns:
            list[dict]: One record per combination with the keys ``model``,
            ``loss``, ``best_val_loss`` and ``save_path``.
        """
        results = []

        for model_name, model in self.models.items():
            model = model.to(self.device)
            # Snapshot the untrained weights. The same model object is reused
            # for every loss, so without restoring this snapshot the second
            # and later losses would continue from weights already trained
            # with the previous loss, making the comparison unfair.
            initial_state = copy.deepcopy(model.state_dict())

            for loss_name, criterion in self.criterions.items():
                print(f'▶ Training [{model_name}] with [{loss_name}]')

                model.load_state_dict(initial_state)
                optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
                criterion = criterion.to(self.device)
                best_val_loss = float('inf')
                best_model = None
                early_stop_counter = 0

                # Epoch-level progress bar.
                for epoch in trange(self.n_epochs, desc=f"{model_name} | {loss_name}"):
                    train_loss = self.train_one_epoch(model, optimizer, criterion)
                    val_loss = self.validate_one_epoch(model, criterion)

                    if self.verbose:
                        print(f'[Epoch {epoch+1}/{self.n_epochs}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}')

                    if val_loss < best_val_loss:
                        best_val_loss = val_loss
                        # Deep copy: state_dict() tensors alias the live
                        # parameters and would change with further training.
                        best_model = copy.deepcopy(model.state_dict())
                        early_stop_counter = 0
                        if self.verbose:
                            print(f'>> Best Updated (Val Loss: {best_val_loss:.4f})')
                    else:
                        early_stop_counter += 1

                    if early_stop_counter >= self.patience:
                        if self.verbose:
                            print(f'>> Early Stopping at Epoch {epoch+1}')
                        break

                if best_model is None:
                    # No epoch ever improved (n_epochs == 0 or the loss was
                    # always NaN). Save the current weights instead of None so
                    # the checkpoint can still be loaded with load_state_dict.
                    best_model = copy.deepcopy(model.state_dict())

                # "+" in loss names is replaced to keep file names simple.
                clean_loss_name = loss_name.replace("+", "and")
                save_path = f'{self.save_dir}/{model_name}_{clean_loss_name}.pth'
                torch.save(best_model, save_path)
                print(f'>> Saved Best [{model_name}] + [{loss_name}] -> {save_path}')

                results.append({
                    "model": model_name,
                    "loss": loss_name,
                    "best_val_loss": best_val_loss,
                    "save_path": save_path
                })

        return results


### 모델 및 loss 불러오기 

In [10]:
import torch.nn as nn
import model

def get_model_classes():
    """
    Collect the nn.Module subclasses exposed by the `model` package.

    Only classes whose `__module__` starts with 'model.' are kept.
    Returns a dict mapping attribute name -> class.
    """
    def _is_model_class(candidate):
        if not isinstance(candidate, type):
            return False
        if not issubclass(candidate, nn.Module):
            return False
        return candidate.__module__.startswith('model.')

    exported = ((attr, getattr(model, attr)) for attr in dir(model))
    return {attr: candidate for attr, candidate in exported if _is_model_class(candidate)}

# Instantiate every discovered class with its default constructor arguments.
# Despite the name, the values here are model instances, not classes.
model_classes = {}
for name, cls in get_model_classes().items():
    model_classes[name] = cls()
print("Available models:", model_classes.keys())

# loss function
from loss.losses import FlexibleLoss
# (display name, FlexibleLoss mode) for each loss in the grid.
_LOSS_MODES = [
    ("MSE", "mse"),
    ("MSE+SSIM", "mse+ssim"),
    ("MSE+SSIM+Perceptual", "mse+ssim+perceptual"),
]
loss_functions = {label: FlexibleLoss(mode=mode) for label, mode in _LOSS_MODES}

Available models: dict_keys(['Autoencoder', 'Autoencoder2D', 'CAE', 'DenoisingAutoencoder', 'DiffusionUNet', 'GANDiscriminator', 'GANGenerator', 'GANomaly', 'PatchEmbed', 'RobustAutoencoder', 'SimpleDDPM', 'SkipConnectionAutoencoder', 'TransformerAnomalyDetector', 'VAE'])


### Trainer 실행 

In [None]:
# Grid-search budget: at most EPOCHS epochs per (model, loss) pair, stopping
# early after PATIENCE epochs without a better validation loss.
EPOCHS = 50
PATIENCE = 20

trainer_config = dict(
    models=model_classes,
    criterions=loss_functions,
    train_loader=train_loader,
    val_loader=val_loader,
    n_epochs=EPOCHS,
    patience=PATIENCE,
    save_dir='./checkpoints',
    verbose=False,
    device=DEVICE,
)
trainer = GridSearchTrainer(**trainer_config)
results = trainer.run()

# One row per (model, loss) combination.
results_df = pd.DataFrame(results)

▶ Training [Autoencoder] with [MSE]


Autoencoder | MSE:   4%|▍         | 2/50 [00:25<10:09, 12.69s/it]

In [None]:
# Display the grid-search summary table (one row per model/loss combination).
results_df

### eval

In [None]:
import os
import torch
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, f1_score
from tqdm import tqdm

class Evaluator:
    """Evaluate saved checkpoints as reconstruction-error anomaly detectors.

    For every ``*.pth`` file in ``checkpoint_dir`` the matching model is
    rebuilt, its weights are loaded, and each sample is scored by its mean
    squared reconstruction error. A decision threshold is taken from a
    percentile of the validation scores, then ROC-AUC, PR-AUC and F1 are
    computed on the test set.

    Args:
        checkpoint_dir (str): Directory holding ``<model>_<loss>.pth`` files.
        val_loader: Loader of ``(x, label)`` batches used for the threshold.
        test_loader: Loader of ``(x, label)`` batches used for the metrics.
        device: Target device; defaults to CUDA when available, else CPU.
        percentile (float): Fraction in [0, 1]; the threshold is this
            percentile of the validation scores.
        save_dir (str): Directory created at construction time. This class
            does not write into it itself.
        normal_class (int): Label of the normal class; every other test label
            is treated as an anomaly.
    """

    def __init__(self, checkpoint_dir, val_loader, test_loader, device=None, percentile=0.95, save_dir='./eval_results', normal_class=2):
        self.checkpoint_dir = checkpoint_dir
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.device = device if device is not None else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.percentile = percentile
        self.save_dir = save_dir
        self.normal_class = normal_class
        os.makedirs(self.save_dir, exist_ok=True)

    def load_model(self, model_name):
        """Instantiate the model class called ``model_name`` on ``self.device``.

        Raises:
            KeyError: If ``get_model_classes()`` has no class with that name.
        """
        available = get_model_classes()
        if model_name not in available:
            raise KeyError(
                f"Unknown model '{model_name}'; available: {sorted(available)}"
            )
        return available[model_name]().to(self.device)

    def get_scores(self, loader, model):
        """Compute one anomaly score per sample.

        Returns:
            tuple: ``(scores, labels)`` as NumPy arrays, where each score is
            the mean squared error between the model output and the input.
        """
        model.eval()
        scores, labels = [], []
        with torch.no_grad():
            for x, y in tqdm(loader, desc="Scoring"):
                x = x.to(self.device)
                output = model(x)
                if isinstance(output, tuple):
                    # NOTE(review): assumes element 1 of a tuple output is the
                    # reconstruction; verify against each model's forward().
                    output = output[1]
                error = F.mse_loss(output, x, reduction='none')
                # Average over every non-batch dimension.
                score = error.flatten(start_dim=1).mean(dim=1)
                scores.append(score.cpu())
                labels.append(y.cpu())
        return torch.cat(scores).numpy(), torch.cat(labels).numpy()

    def run(self):
        """Evaluate every checkpoint and return the metrics as a DataFrame.

        Returns:
            pandas.DataFrame: One row per checkpoint with the columns
            ``checkpoint``, ``threshold``, ``roc_auc``, ``pr_auc`` and ``f1``.
        """
        # Sorted so the row order does not depend on os.listdir's ordering.
        checkpoint_files = sorted(
            f for f in os.listdir(self.checkpoint_dir) if f.endswith(".pth")
        )
        results = []

        for ckpt in checkpoint_files:
            print(f"\n▶ Evaluating {ckpt}")

            # File names look like "<model>_<loss>.pth"; only the model part
            # is needed to rebuild the architecture.
            model_name = ckpt[:-4].partition("_")[0]
            model = self.load_model(model_name)
            # map_location lets checkpoints saved from GPU tensors load on a
            # CPU-only machine. torch.load unpickles the file, so only load
            # checkpoints from a trusted source.
            state_dict = torch.load(
                os.path.join(self.checkpoint_dir, ckpt),
                map_location=self.device,
            )
            model.load_state_dict(state_dict)

            # Threshold from the validation scores.
            val_scores, _ = self.get_scores(self.val_loader, model)
            threshold = np.percentile(val_scores, self.percentile * 100)
            print(f" >> Threshold (@{self.percentile*100:.0f}%) = {threshold:.4f}")

            # Test evaluation: 1 = anomaly, 0 = normal class.
            test_scores, test_labels = self.get_scores(self.test_loader, model)
            test_labels = (test_labels != self.normal_class).astype(int)

            preds = (test_scores > threshold).astype(int)

            auc_score = roc_auc_score(test_labels, test_scores)
            precision, recall, _ = precision_recall_curve(test_labels, test_scores)
            pr_auc = auc(recall, precision)
            f1 = f1_score(test_labels, preds)

            print(f"ROC-AUC: {auc_score:.4f} | PR-AUC: {pr_auc:.4f} | F1: {f1:.4f}")

            results.append({
                "checkpoint": ckpt,
                "threshold": threshold,
                "roc_auc": auc_score,
                "pr_auc": pr_auc,
                "f1": f1
            })

        df = pd.DataFrame(results)
        return df

In [None]:
# Evaluate every checkpoint written by the grid search, using the 95th
# percentile of the validation scores as the anomaly threshold.
evaluator_config = dict(
    checkpoint_dir='./checkpoints',
    val_loader=val_loader,
    test_loader=test_loader,
    device=DEVICE,
    percentile=0.95,
    save_dir='./eval_results',
)
evaluator = Evaluator(**evaluator_config)
eval_results = evaluator.run()

# Persist the metrics table, then display it as the cell output.
eval_results.to_csv('./eval_results/eval_results.csv', index=False)
eval_results