<a href="https://colab.research.google.com/github/machine-perception-robotics-group/MPRGDeepLearningLectureNotebook/blob/master/12_gan/anomaly_detection_vae.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 繰り返し処理による異常検知

---
## 目的

Variational Autoencoder (VAE) を用いた繰り返し処理による異常検知の仕組みについて理解する．

## モジュールのインポート
はじめに必要なモジュールをインポートします．

### GPUの確認
GPUを使用した計算が可能かどうかを確認します．

`GPU availability: True`と表示されれば，GPUを使用した計算を行うことが可能です．
Falseとなっている場合は，上部のメニューバーの「ランタイム」→「ランタイムのタイプを変更」からハードウェアアクセラレータをGPUにしてください．

In [None]:
# モジュールのインポート
import os
from time import time
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

import torchvision
import torchvision.transforms as transforms

# GPUの確認
use_cuda = torch.cuda.is_available()
print('Use CUDA:', use_cuda)

## データセット（MVTec-AD）

この演習では，[MVTec Anomaly Detection (MVTec-AD) Dataset](https://www.mvtec.com/company/research/datasets/mvtec-ad)を使用します．

MVTec-AD Datasetは，異常検知評価データセットです．
このデータセットには下図に示すような，さまざまな種類の物体の画像データが含まれており，それぞれ正常，異常の画像データが含まれています．
今回はこのデータのうち，「capsule」のデータを例に異常検知を行います．

![MVTec-AD.jpg](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/143078/f608231e-295c-8f31-b48c-d9f46e6a3aad.jpeg)


### データセットのダウンロード

下記のURLからMVTec-ADの「capsule」の画像データをダウンロードします．

※ オリジナルのMVTec-ADの画像サイズは1枚あたり1024x1024 pixelsですが，ファイルサイズの削減などの観点から，下記のzipファイル内の画像サイズは512x512 pixelsとしています．

解凍したzipファイルの中身を表示して確認します．
まず，capsuleというフォルダがあり，この中に画像データがあります．
さらに，この中に学習用データの`train`フォルダや評価用データの`test`フォルダがあります．
`test`フォルダの中には，欠陥の種類ごとに画像フォルダが分かれており，いくつかの画像データが格納されています．

In [None]:
import gdown
gdown.download('https://drive.google.com/uc?id=1iYsTe0OVD3dawP7HWsMZp7YDlzkwDllG', 'anomaly_detection_data.zip', quiet=True)
!unzip -q anomaly_detection_data.zip

!echo "directory =============="
!ls anomaly_detection_data
!ls anomaly_detection_data/capsule
!ls anomaly_detection_data/capsule/train/
!ls anomaly_detection_data/capsule/test

### データセットクラスの作成

次に，このMVTec-ADを読み込むためのデータセットクラスを定義します．
ここでは`MVTecAD`というクラス名で，定義を行います．

まず，`__init__`で，読み込む画像のフォルダを指定する`image_dir`と画像に対する前処理を定義する`transform`を引数として読みこみます．

そして，`__getitem__`では，指定された番号`i`番目の画像を読み込み，必要に応じて前処理を行ってから画像データを返すよう定義します．

In [None]:
class MVTecAD(torch.utils.data.Dataset):
    def __init__(self, image_dir, transform):
        self.transform = transform
        self.image_dir = image_dir

    def __len__(self):
        return len(os.listdir(self.image_dir))

    def __getitem__(self, i):
        filename = '{:0>3}.png'.format(i)
        image = Image.open(os.path.join(self.image_dir, filename))
        if self.transform:
            image = self.transform(image)
        return image, torch.zeros(1)

## ネットワークモデル

次にVAEのネットワークモデルを定義します．
ここでは，繰り返し処理による異常検知の論文 [1] で使用されている構造と同様のネットワークを定義します．
少し大きな構造ですが，Encoder, Decoderそれぞれ，8層の畳み込みからなるVAEを定義します．
その他はこれまでのAE, VAEの演習と同様の処理を定義します．

In [None]:
class VAE(nn.Module):
    def __init__(self, z_dim=100, input_c=1):
        super(VAE, self).__init__()

        self.z_dim = z_dim

        # Encoder
        self.encoder = nn.Sequential(
                nn.Conv2d(input_c, 32, kernel_size=4, stride=2, padding=1), # 128 -> 64
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(32, 32, kernel_size=4, stride=2, padding=1),      # 64 -> 32
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),      # 32 -> 32
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),      # 32 -> 16
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),      # 16 -> 16
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),     # 16 -> 8
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1),     # 8 -> 8
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(64, 32, kernel_size=3, stride=1, padding=1),      # 8 -> 8
                nn.LeakyReLU(negative_slope=0.2, inplace=True),
                nn.Conv2d(32, z_dim, kernel_size=8, stride=1)               # 8 -> 1
            )

        self.mu_fc = nn.Linear(z_dim, z_dim)
        self.logvar_fc = nn.Linear(z_dim, z_dim)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Upsample(scale_factor=8, mode='nearest'),
            nn.Conv2d(z_dim, 32, kernel_size=3, stride=1, padding=1),  # 1 -> 8
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),     # 8 -> 8
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),    # 8 -> 8
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1),    # 8 -> 16
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),     # 16 -> 16
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.Conv2d(64, 32, kernel_size=3, stride=1, padding=1),     # 16 -> 32
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),     # 32 -> 32
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),     # 32 -> 64
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Upsample(scale_factor=2, mode='nearest'),
            nn.Conv2d(32, input_c, kernel_size=3, stride=1, padding=1),  # 64 -> 128
            nn.Sigmoid())

    def initialize(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d) or isinstance(m, nn.LayerNorm):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def forward(self, x):
        h      = self.encoder(x)

        h = torch.flatten(h, start_dim=1)
        mu     = self.mu_fc(h)                    # 平均ベクトル
        logvar = self.logvar_fc(h)                # 分散共分散行列の対数
        z      = self.reparameterize(mu, logvar)  # 潜在変数

        x_hat  = self.decoder(z.view(z.size(0), -1, 1, 1))
        self.mu     = mu.squeeze()
        self.logvar = logvar.squeeze()
        return x_hat

## データセット・ネットワークモデル・最適化手法・誤差関数の設定

次に，学習を開始するための，データセット，ネットワークモデル，最適化手法，誤差関数を設定します．

データセットの設定では，学習データのバリエーションを増幅させるために，transformに画像変換の処理を加えた前処理を定義します．


誤差関数に関しては，前回行ったVAEと同様の誤差関数を定義して使用します．

In [None]:
batch_size = 10

# データセットの設定
transform = transforms.Compose([
                transforms.Resize((128, 128)),
                transforms.RandomAffine(degrees=[-60, 60], translate=(0.1, 0.1), scale=(0.5, 1.5)),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomVerticalFlip(p=0.5),
                transforms.ToTensor()
            ])
train_data = MVTecAD(image_dir="./anomaly_detection_data/capsule/train/good", transform=transform)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)

# ネットワークモデル・最適化手法の設定
model = VAE(z_dim=100, input_c=3)
if use_cuda:
    model = model.cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=2e-4, betas=(0.5, 0.999), weight_decay=1e-5)

# 誤差関数
def loss_function(recon_x, x, mu, logvar):
    recon = F.binary_cross_entropy(recon_x, x, reduction='sum')
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon + kl_div

## 学習

学習を開始します．
学習自体は通常のVAEと同様に画像を再構成するよう学習を行います．

※ **この学習は非常に時間がかかり演習時間内に終えることが難しいため，学習の演算を割愛し，学習済みモデルを用いて異常検知の確認を行います．**
ご興味のある方は，講義終了後にご自身でうごかしてみてください．

In [None]:
epochs = 1000

model.train()
for epoch in range(1, epochs + 1):
    for idx, (inputs, _) in enumerate(train_loader):
        if use_cuda:
            inputs = inputs.cuda()
        output = model(inputs)
        loss = loss_function(output, inputs, model.mu, model.logvar)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if idx % 100 == 0 and epoch % 10 == 0:
            print('%d epoch [%d/%d] | loss: %.4f |' % (epoch, idx, len(train_loader), loss.item()))

    if epoch % 100 == 0:
        torch.save(model.state_dict(), "anomaly_det_model_%04d.pt" % epoch)

## VAEによる画像の再構成結果の確認


異常検知を行う前に，上記の学習により，正しく画像が再構成できているかを確認します．
まず，欠陥のある画像データを読み込むよう，`test_bad_data`および`test_loader`を定義します．

そして，欠陥画像をVAEへと入力して得られた再構成画像を可視化して確認します．

結果は左から，入力画像，VAEからの出力画像，その2つの画像の差分（絶対値）です．

ボケた画像が生成されますが，ある程度正しい形状や色を保った画像が出力されていることがわかります．
また，大きな割れ目のある部分は差分が大きくなっていることがわかります．

In [None]:
# 学習済みモデルの読み込み
model.load_state_dict(torch.load("./anomaly_detection_data/trained_model.pt"))

transform_test = transforms.Compose([transforms.Resize((128,128)), transforms.ToTensor()])
test_bad_data = MVTecAD(image_dir="./anomaly_detection_data/capsule/test/crack", transform=transform_test)
test_loader = torch.utils.data.DataLoader(test_bad_data, batch_size=1, shuffle=False)

model.eval()
with torch.no_grad():
    for ind, (inputs, _) in enumerate(test_loader):
        if use_cuda:
            inputs = inputs.cuda()

        reconstructed = model(inputs).detach()
        b = inputs.data.cpu().numpy()[0].transpose(1,2,0)

        a = reconstructed.data.cpu().numpy()[0].transpose(1,2,0)

        diff = np.abs(a - b)

        plt.figure(figsize=(12, 4))
        plt.subplot(1, 3, 1)
        plt.imshow(b, cmap='gray', vmin = 0, vmax = 1, interpolation='none')
        plt.subplot(1, 3, 2)
        plt.imshow(a, cmap='gray', vmin = 0, vmax = 1, interpolation='none')
        plt.subplot(1, 3, 3)
        plt.imshow(diff, interpolation='none')
        plt.show()

        if ind == 2:
            break

## 繰り返しによる異常検知

それでは次に，このVAEを用いて繰り返し処理による異常検知を行います．

まず，欠陥のある画像データを読み込むための`test_bad_data`および`test_loader`を定義します．


異常検知の処理の流れは次のようになります．

まず1枚の画像を用意します．
次に，その画像をVAEへと入力し，出力画像と元画像の差（MSE）を計算し，$E(x_t)$とします．
そして，その勾配$\nabla E(x_t)$と二乗誤差$(x_t - f_{VAE} (x_t))^2$の積を入力画像へ加えることで新たな画像$x_t$を生成します．

この$x_t$を再度VAEへと入力して同様の手順を繰り返すことで，異常部分が徐々に再構成され正常な画像へと近づいていきます．


![a.jpg](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/143078/09698b6d-1b24-cc9f-abf2-b6f43c3cf2c7.jpeg)





In [None]:
max_iter = 99
alpha = 0.5
lam = 0.05
decay_rate = 0.1
minimum = 1e12
th = 0.5

transform_test = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])
test_bad_data = MVTecAD(image_dir="./anomaly_detection_data/capsule/test/crack", transform=transform_test)
test_bad_loader = torch.utils.data.DataLoader(test_bad_data, batch_size=1, shuffle=False)

model.eval()

mse_loss = nn.MSELoss(reduction='sum')
if use_cuda:
    mse_loss = mse_loss.cuda()

for index, (x_org, _) in enumerate(test_bad_loader):
    # 2つ目のサンプルを例として実行するため，1つ目を飛ばして実行
    if index == 0:
        continue

    img = x_org[0].data.numpy()

    x_t_images = []
    grad_images = []
    reconstructed_images = []

    if use_cuda:
        x_org = x_org.cuda()

    x_org.requires_grad_(True)
    rec_x = model(x_org).detach()

    loss = mse_loss(x_org, rec_x)
    loss.backward()
    grads = x_org.grad.data
    x_t = x_org - alpha*grads*(x_org - rec_x)**2

    grad_images.append(grads[0].cpu().numpy().transpose(1,2,0))
    reconstructed_images.append(rec_x[0].data.cpu().numpy().transpose(1,2,0))
    x_t_images.append(x_t[0].data.cpu().numpy().transpose(1,2,0))

    losses = torch.zeros(max_iter)

    for i in range(max_iter):
        x_t = Variable(x_t.clamp(min=0, max=1), requires_grad=True)
        rec_x = model(x_t).detach()
        rec_loss = mse_loss(x_t, rec_x)
        losses[i] = rec_loss.item()

        if minimum <= rec_loss:
            minimum = min(minimum, rec_loss)
        if rec_loss <= th:
            break

        l1 = torch.abs(x_t - x_org).sum()
        loss = rec_loss + lam*l1
        loss.backward()
        grads = x_t.grad.data

        mask = (x_t - rec_x)**2
        energy = grads * mask

        x_t = x_t - alpha*energy

        grad_images.append(grads[0].cpu().numpy().transpose(1,2,0))
        reconstructed_images.append(rec_x[0].data.cpu().numpy().transpose(1,2,0))
        x_t_images.append(x_t[0].data.cpu().numpy().transpose(1,2,0))

    break  # 1サンプル分の処理が終わった段階でforループを抜ける

### 結果の表示

上記の処理で得られた結果を可視化して確認します．

まず，オリジナルの入力画像と繰り返し処理で得られた画像$x_t$を可視化します．

結果を確認すると，繰り返し処理を行うことで，欠陥部分が徐々に復元され，正常な画像へと近づいていることがわかります．


In [None]:
plt.imshow(img.transpose(1, 2, 0), vmin = 0, vmax = 1, interpolation='none')
plt.title("orig image")
plt.show()

plt.figure(figsize=(30, 30))
for i, x_t_img in enumerate(x_t_images):
    plt.subplot(10, 10, i+1)
    plt.imshow(x_t_img, vmin = 0, vmax = 1, interpolation='none')
plt.show()

### 類似度（SSIM）による欠陥領域の可視化

入力画像と繰り返し処理で得られた画像の類似度の差から，欠陥領域の特定を行います．

ここでは，類似どの指標としてStructural Similarity (SSIM) とStructural Dissimilarity (DSSIM) を使用します．

元画像`input_image`と，n回目の反復で得られた画像`n_iter_image`のSSIMを計算します．
ここで，SSIMの計算には，Pythonのscikit-imageの関数を活用します．

この関数を適用することで．画像全体での類似度`ssim`と微小領域（小さなパッチ）ごとの類似度`ssim_img`を獲得します．

DSSIMはSSIMの逆の関係性を示した指標のため，`1 - ssim`を行うことで獲得できます．

この微小領域ごとのDSSIMを可視化すると，欠陥領域に高いDSSIMの値となっていることがわかります．
このようにすることで欠陥領域を特定することが可能となります．


In [None]:
from skimage.metrics import structural_similarity as ssim

iter_index = 10  # 何回目の反復画像と比較するか

input_image = np.mean(img.transpose(1, 2, 0), axis=2)
n_iter_image = np.mean(x_t_images[iter_index], axis=2)

ssim_value, ssim_img = ssim(input_image, n_iter_image, win_size=5, multichannel=False, full=True)
dssim = 1. - ssim_value
dssim_img = 1. - ssim_img

plt.figure(figsize=(10,3))
plt.subplot(1, 3, 1)
plt.imshow(input_image, cmap='gray', vmin=0, vmax=1, interpolation='none')
plt.subplot(1, 3, 2)
plt.imshow(n_iter_image, cmap='gray', vmin=0, vmax=1, interpolation='none')
plt.subplot(1, 3, 3)
plt.imshow(dssim_img, vmin=0, vmax=1, interpolation='none')
plt.colorbar()
plt.tight_layout()
plt.show()

## 課題

1. その他の画像（正常や他の異常画像）に対する結果を確認しましょう．
2. 反復回数を増加させたときにどのような再構成画像が生成されるか確認しましょう．

## 参考文献

[1] David Dehaene, Oriel Frigo, Sébastien Combrexelle, Pierre Eline, "Iterative energy-based projection on a normal data manifold for anomaly localization," in ICLR, 2020.