In [None]:
# If the installation fails, check whether the notebook's Internet setting is On/Off.
!pip install easydict

In [None]:
import os, random, pickle
from os.path import join
from glob import glob
from time import time

from tqdm.notebook import tqdm

from easydict import EasyDict as edict

import numpy as np
from PIL import Image

from sklearn.svm import OneClassSVM

from sklearn.decomposition import PCA, KernelPCA

from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

from sklearn.preprocessing import StandardScaler, MinMaxScaler

import pandas as pd

import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML, display


#### DATA SET

train : 3629장 -> normal.<br>
test : 467 (good) + 1258 (fault case) -> normal (good) and abnormal (fault case). <br>
all : 5354장 <br>


{ClassName} Original dataset과 동일 <br>
|--val.csv : col1 : ID. test 영상 파일명, col2 : label. good:1, fault:-1 <br>
|--train <br>
|&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;|---good <br>
|__test <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;|--- good&fault case <br>

In [None]:
class MVTec_AD():
    """Loader for per-class MVTec AD images with an on-disk pickle cache.

    Every image is converted to 8-bit grayscale and resized to ``size``.
    Loaded splits are pickled under ``{OUT_PATH}/cache`` so that later calls
    with the same (mode, flatten, size, class) return the cached result.
    """

    def __init__(self, DB_PATH, OUT_PATH, size=(86,86), flatten=True):
        '''
        DB_PATH : str. Dataset root, e.g. {workspace}/dataset (one sub-folder per class).
        OUT_PATH : str. Directory under which the ``cache`` folder is created.
        size : (int, int). default (86, 86). Passed unchanged to PIL ``Image.resize``,
               which interprets it as (width, height).
        flatten : bool. [num_of_img_per_class, h*w]/[num_of_img_per_class, h, w]
        '''
        self.DB_PATH = DB_PATH
        self.out_path = OUT_PATH
        self.size, self.flatten = size, flatten

        self.class_names = ['bottle', 'cable', 'capsule', 'carpet', 'grid',
                            'hazelnut', 'leather', 'metal_nut', 'pill', 'screw',
                            'tile', 'transistor', 'wood', 'zipper']


    def read_mvtec(self, cls="bottle", mode='train'):
        '''
        Load one split of one class, using the pickle cache when available.

        input
          cls : str. class name.
          mode : str. {train, test, val}
        output
          db_dict : dict.
                  if val in mode
                   {'imgs':[num_of_img_per_class, h*w or h, w], 'labels':gt, 'class_name':class_name}
                  else
                   {'imgs':[num_of_img_per_class, h*w or h, w], 'class_name':class_name}
        raises
          FileNotFoundError : no image could be located for (cls, mode); nothing
                              is written to the cache in that case.
        '''

        # The cache file name layout is intentionally unchanged ('t_' is appended
        # straight onto the mode) so caches written by earlier runs are still found.
        cache_dir = join(self.out_path, 'cache')
        cache_file = join(cache_dir, mode)
        if self.flatten:
            cache_file += 't_'
        cache_file += f'{str(self.size)}_{cls}.pkl'

        # Cache hit: return the stored split.
        if os.path.isfile(cache_file):
            # NOTE: unpickling can execute arbitrary code; only load cache files
            # that were produced by this class.
            with open(cache_file, 'rb') as f:
                return pickle.load(f)

        # Cache miss: collect image paths (and labels for the validation split).
        labels = None
        if mode == 'val':
            csv = pd.read_csv(join(self.DB_PATH, cls, 'val.csv'))
            # Column 0 holds the integer image id, column 1 the label.
            img_paths = [f'{self.DB_PATH}/{cls}/test/{img_id:03d}.png'
                         for img_id in csv.iloc[:, 0]]
            labels = csv.iloc[:, 1].tolist()
        else:
            img_paths = sorted(glob(join(self.DB_PATH, cls, mode, '*.png')))

        # Fail early with a clear message instead of caching an empty split or
        # crashing later inside reshape.
        if not img_paths:
            raise FileNotFoundError(
                f"No images found for class '{cls}', mode '{mode}' under '{self.DB_PATH}'")

        data = edict()
        data.imgs = img_paths
        if labels is not None:
            data.labels = labels
        data.class_name = cls

        data = self.read_img(data)

        # Write the cache; makedirs also handles a nested or already existing folder.
        os.makedirs(cache_dir, exist_ok=True)
        with open(cache_file, "wb") as f:
            pickle.dump(data, f)

        return data

    def read_img(self, db_dict):
        '''
        Replace the list of image paths in ``db_dict.imgs`` with the pixel data.

        input
          db_dict : dict-like with attribute access; ``imgs`` is a list of image paths.
        output
          db_dict : the same object; ``imgs`` is now a numpy array with one grayscale,
                    resized image per path, flattened to one row per image when
                    ``self.flatten`` is set.
        '''
        im_path = db_dict.imgs

        pixels = []
        for im in im_path:
            # The context manager closes the underlying file once it is decoded.
            with Image.open(im) as img:
                pixels.append(np.array(img.convert("L").resize(self.size)))

        db_dict.imgs = np.array(pixels)
        if self.flatten:
            db_dict.imgs = db_dict.imgs.reshape(len(im_path), -1)

        return db_dict

In [None]:
import matplotlib.pyplot as plt

def plot_gallery(images, titles, h, w, n_row=3, n_col=4):
    """Show up to ``n_row * n_col`` grayscale images in a grid on a new figure.

    images : indexable of arrays; each entry is reshaped to (h, w) before display.
    titles : indexable of str, one title per image.
    h, w   : int. Height and width used to reshape each image.
    n_row, n_col : int. Grid layout.

    If fewer images (or titles) than grid cells are supplied, only the available
    ones are drawn and the remaining cells stay empty.
    """
    plt.figure(figsize=(1.8 * n_col, 2.4 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    # Clamp to the data actually provided to avoid an IndexError on short inputs.
    n_shown = min(n_row * n_col, len(images), len(titles))
    for i in range(n_shown):
        plt.subplot(n_row, n_col, i + 1)
        plt.imshow(images[i].reshape((h, w)), cmap=plt.cm.gray)
        plt.title(titles[i], size=12)
        plt.xticks(())
        plt.yticks(())

def title(y_pred, y_test, target_names, i):
    """Build the two-line 'predicted / true' caption for sample ``i``.

    ``target_names`` maps a label value to its display name; only the last
    space-separated word of each name is shown.
    """
    def _short_name(label):
        # Keep just the final word of the display name.
        return target_names[label].rsplit(' ', 1)[-1]

    predicted = _short_name(y_pred[i])
    actual = _short_name(y_test[i])
    return f'predicted: {predicted}\ntrue:      {actual}'

In [None]:
# Load the val / train / test splits of a single class.
DB_PATH = "../input/2021-ml-tp04/MVTecAD"
OUT_PATH = "./"

size = (86, 86)
cls = 'bottle'
dataset = MVTec_AD(DB_PATH, OUT_PATH, size=size, flatten=True)

# Same loading order as before: val, then train, then test.
val, train, test = (dataset.read_mvtec(cls=cls, mode=split)
                    for split in ('val', 'train', 'test'))

# This dataset is meant for unsupervised learning, so the train data consists
# only of normal-case images.
# As the keys of `train` show, the base code does not provide labels for train.
# The dataset layout is described at
# https://colab.research.google.com/drive/1pdgvoPs3KDLq6pV9oLxkXDh6waEp76HT?usp=sharing
# (the same address as "MVTecAD_colab" in the Dataset description); please read it as well.
# ++ If you do need train labels, use the data.csv in each class's train folder.
print(train.keys(), val.keys(), test.keys(), sep='\n')

#### Reconstruction based anomaly detection

In [None]:
# [1] Reconstruction based anomaly detection
# Fit a scaler and PCA on the train images, reconstruct the validation images
# from their PCA projection, and score every image by its reconstruction error.

# A float in (0, 1) makes PCA keep enough components to explain that fraction
# of the variance.
n_components = 0.8

scaler = StandardScaler()
train_sc = scaler.fit_transform(train.imgs)  # fit on train only
val_sc = scaler.transform(val.imgs)          # reuse the train statistics

pca = PCA(n_components=n_components, random_state=777)
pca.fit(train_sc)

val_pca = pca.transform(val_sc)

# Derive the number of images from the data instead of hard-coding it.
num_data_imgs = val.imgs.shape[0]
img_width = size[0]   # PIL size is (width, height)
img_height = size[1]

# Undo PCA, then undo the standardisation, and restore the image layout.
# Array layout is (n, height, width).
Reconstruction_imgs = pca.inverse_transform(val_pca)
Reconstruction_imgs = scaler.inverse_transform(Reconstruction_imgs).reshape(
    num_data_imgs, img_height, img_width)

ori = val.imgs.reshape(num_data_imgs, img_height, img_width)
Original_imgs = ori
# Signed per-pixel error, kept for visualisation.
Reconstruction_error = Original_imgs - Reconstruction_imgs

# score & predict normal/abnormal
min_max_scaler = MinMaxScaler()

# Use the squared error so positive and negative pixel errors cannot cancel.
squared_error = np.square(Reconstruction_error).sum(axis=(1, 2))
# Labels are good: 1, fault: -1, so a higher score must mean "more normal":
# invert the scaled error.
cls_score = 1.0 - min_max_scaler.fit_transform(squared_error.reshape(-1, 1)).reshape(-1)

th = 0.5
# Build predictions in a new array; thresholding in place would overwrite the
# continuous scores that the ROC computation below needs.
y_pred = np.where(cls_score > th, 1, -1)

gt_list = np.asarray(val.labels)
fpr, tpr, _ = roc_curve(gt_list, cls_score)
img_roc_auc = roc_auc_score(gt_list, cls_score)

plt.plot(fpr, tpr, label='%s ROCAUC: %.3f' % (cls, img_roc_auc))
plt.title(f'{cls} ROCAUC: {img_roc_auc:.3f}')

print(f'{cls} ROCAUC: {img_roc_auc:.3f}')


target_names = {-1:'abnormal', 1:'normal'}
orginal_title = [f'{cls}_{target_names[label]}' for label in gt_list]
plot_gallery(ori, orginal_title, img_height, img_width)


prediction_titles = [title(y_pred, gt_list, target_names, i)
                     for i in range(y_pred.shape[0])]
# Reconstruction img
plot_gallery(Reconstruction_imgs, prediction_titles, img_height, img_width)
# Reconstruction error img
plot_gallery(Reconstruction_error, prediction_titles, img_height, img_width)


plt.show()

#### Embedding feature based anomaly detection

In [None]:
DB_PATH = "../input/2021-ml-tp04/MVTecAD"

OUT_PATH = './'
FIG_PATH = join(OUT_PATH, "fig")
os.makedirs(FIG_PATH, exist_ok=True)

# Define the settings before they are used so the cell does not depend on
# values left over from an earlier cell.
n_components = 180
size = (86, 86)

dataset = MVTec_AD(DB_PATH, OUT_PATH, flatten=True, size=size)

y_preds = []
y_test = []

roc_auc = dict()
total_roc_auc = []

plt.figure(figsize=[10, 10])
# print(f'\n=====clf:{clf_name}, DA:{dr_name}=====')
# total_roc_auc = []

pbar = tqdm(dataset.class_names)

sample_submit = pd.read_csv('../input/2021-ml-tp04/sample_submit.csv', index_col=0)
# Make sure the column can hold float scores, and resolve its position once for
# positional assignment below.
sample_submit['score'] = sample_submit['score'].astype(float)
score_col = sample_submit.columns.get_loc('score')

# Row offset into sample_submit.
# NOTE(review): assumes the submission rows are ordered class by class in
# `dataset.class_names` order, with test images sorted by file name - confirm.
i = 0

for cls in pbar:
    train = dataset.read_mvtec(cls=cls, mode='train')
    val = dataset.read_mvtec(cls=cls, mode='val')
    test = dataset.read_mvtec(cls=cls, mode='test')


    clf = OneClassSVM(kernel="rbf", gamma = 0.001, nu = 0.005) # tuned model parameters

    # Parameter histories
    # Parameter settings that were tried in order to improve the score.
    #kernel="rbf", gamma = 0.001, nu = 0.01 // kaggle: 0.53167
    #kernel="rbf", gamma = 0.005, nu = 0.01 // kaggle: 0.55846
    #kernel="rbf", gamma = 0.01, nu = 0.01 // kaggle: 0.57840
    #kernel="rbf", gamma = 0.05, nu = 0.01 // kaggle: 0.53205
    #kernel="rbf", gamma = 0.02, nu = 0.01 // kaggle: 0.50525
    #kernel="rbf", gamma = 1, nu = 0.01 // kaggle: 0.51593
    #kernel="rbf", gamma = 0.01, nu = 0.05 // kaggle: 0.55829
    #kernel="rbf", gamma = 0.01, nu = 0.005 // kaggle: 0.54271
    #kernel="rbf", gamma= 'auto', nu=0.01 // kaggle: 0.51190
    #kernel="linear", gamma= '0.001', nu=0.01 // kaggle: 0.50098

    #
    # kernel="rbf", gamma = 0.001, nu = 0.01 // kaggle: 0.71074
    # kernel="rbf", gamma = 0.01, nu = 0.01 // kaggle: 0.70439
    # kernel="rbf", gamma = 0.001, nu = 0.005 // kaggle: 0.71098


    #pca = PCA(n_components = n_components, random_state = 777)
    pca = PCA(n_components = n_components, random_state = 777, svd_solver = 'randomized', whiten = True)

    # Parameter histories
    # n_components = n_components, random_state = 777, svd_solver = 'randomized', whiten = True // kaggle: 0.57840
    # pca = PCA(n_components = n_components, random_state = 777)
    # n_components = n_components, random_state = 777 // kaggle: 0.57840

    #sc = MinMaxScaler()
    sc = StandardScaler()
    train_sc = sc.fit_transform(train.imgs) # fit on train and transform it
    val_sc = sc.transform(val.imgs)         # transform val with the train statistics


    train_pca = pca.fit_transform(train_sc) # fit PCA on the standardised train data
    val_pca = pca.transform(val_sc)


    clf.fit(train_pca)                          # train on the reduced train features
    y_pred = clf.predict(val_pca)               # predictions for the validation split
    cls_score_val = clf.score_samples(val_pca)  # continuous validation scores

    test_sc = sc.transform(test.imgs)
    test_pca = pca.transform(test_sc)
    cls_score = clf.score_samples(test_pca)     # continuous test scores for submission


    # eval~valid data
    y_preds.append(y_pred)
    gt_list = np.array(val.labels)
    fpr, tpr, _ = roc_curve(gt_list, cls_score_val)
    img_roc_auc = roc_auc_score(gt_list, cls_score_val)
    total_roc_auc.append(img_roc_auc)
    plt.plot(fpr, tpr, label='%s ROCAUC: %.3f' % (cls, img_roc_auc))

    # Standardise the test scores per class with a fresh scaler (previously this
    # reused a `scaler` object leaked from another cell) and write them with a
    # single positional assignment; chained `df['score'][i:j] = ...` is not
    # guaranteed to update the frame.
    n_test = cls_score.shape[0]
    sample_submit.iloc[i:i + n_test, score_col] = (
        StandardScaler().fit_transform(cls_score.reshape(-1, 1)).reshape(-1))
    i += n_test

print('Average ROCAUC: %.3f' % np.mean(total_roc_auc))
clf_name = type(clf).__name__
pca_name = type(pca).__name__
plt.title(f'{clf_name}_{pca_name}_{n_components}\nAverage image ROCAUC: {np.mean(total_roc_auc):.3f}' )

plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend(loc='lower right')

# Result Figure
plt.savefig(f'{FIG_PATH}/{clf_name}_{pca_name}_{n_components}.jpg')

# Submit CSV
sample_submit.to_csv(f'./{clf_name}_{pca_name}_{n_components}.csv')

In [None]:
# Scratch check: reload the test split for whatever class `cls` is currently bound to
# and display the square root of the second dimension of `test.imgs`.
# NOTE(review): presumably a sanity check of the image side length for flattened,
# square images - confirm, or remove this cell from the final notebook.
test = dataset.read_mvtec(cls=cls, mode='test')
np.sqrt(test.imgs.shape[1])