# Data load

In [1]:
from PIL import Image
import sys, os, urllib.request, tarfile, cv2
import numpy as np
import matplotlib.pyplot as plt
from keras.utils import to_categorical
from keras.preprocessing import image

def convert(x, size=96):
    """Resize every image in ``x`` to a ``size`` x ``size`` square.

    Args:
        x: Indexable collection of images; each element is handed to
            ``cv2.resize`` unchanged.
        size: Edge length, in pixels, of the square output images.

    Returns:
        ``np.ndarray`` holding the resized images in input order.
    """
    return np.array([cv2.resize(x[idx], (size, size)) for idx in range(len(x))])

class AD:
    """Download a ``.tar.xz`` image archive, unpack it, and load images from it.

    Constructing an instance performs the download and the extraction
    immediately. Images are then read with :meth:`load_images`, using paths
    relative to the extraction directory ``data/``.
    """

    def __init__(self, download_dir, path):
        """Fetch the archive at ``path`` into ``download_dir`` and extract it.

        Args:
            download_dir: Directory that receives ``data.tar.xz``. Created
                (including missing parents) when it does not exist.
            path: URL of the ``.tar.xz`` archive to download.
        """
        # Extraction root; load_images() prefixes every file name with it.
        self.path = "data/"

        # makedirs also handles nested directories and an already-existing one.
        os.makedirs(download_dir, exist_ok=True)

        source_path = path
        dest_path = os.path.join(download_dir, "data.tar.xz")

        # Strip any "user:password@" part before echoing the URL, so that
        # credentials embedded in it do not end up in the notebook output.
        parts = urllib.parse.urlsplit(source_path)
        display_path = urllib.parse.urlunsplit(
            (parts.scheme, parts.netloc.rpartition("@")[2],
             parts.path, parts.query, parts.fragment))

        # download file
        def _progress(count, block_size, total_size):
            # total_size can be non-positive when the server does not report a
            # size; the last block usually overshoots, hence the clamp.
            if total_size > 0:
                percent = min(float(count * block_size) / float(total_size) * 100.0, 100.0)
            else:
                percent = 0.0
            sys.stdout.write('\rDownloading %s %.2f%%' % (display_path, percent))
            sys.stdout.flush()

        urllib.request.urlretrieve(source_path, filename=dest_path, reporthook=_progress)

        # untar
        with tarfile.open(dest_path, "r:xz") as tar:
            if hasattr(tarfile, "data_filter"):
                # The archive comes from the network: refuse members that
                # would escape the target directory (absolute paths, "..",
                # links pointing outside).
                tar.extractall(self.path, filter="data")
            else:
                # Older Python without extraction filters.
                tar.extractall(self.path)

    def load_images(self, path, num, size=224):
        """Load ``num`` sequentially numbered PNG files as one array.

        Files are expected to be named ``000.png``, ``001.png``, ... inside
        ``self.path + path``.

        Args:
            path: Directory relative to the extraction root; it is concatenated
                as-is, so it must end with a path separator.
            num: Number of images to read, starting at index 0.
            size: Edge length in pixels each image is resized to.

        Returns:
            ``np.ndarray`` built from the ``num`` resized images.
        """
        result = []
        for i in range(num):
            # "%03d" zero-pads to three digits and leaves larger numbers as-is.
            filename = self.path + path + "%03d.png" % i
            # The context manager closes the file handle once the pixel data
            # has been copied into an array.
            with Image.open(filename) as img:
                arr = image.img_to_array(img)
            result.append(cv2.resize(arr, (size, size)))
        return np.array(result)

Using TensorFlow backend.


In [2]:
print("\nHazelnut data download...")
# NOTE(review): the download URL embeds login credentials in plain text;
# consider supplying them through the environment instead.
Hazelnut = AD("./ad", "ftp://guest:GU.205dldo@ftp.softronics.ch/mvtec_anomaly_detection/hazelnut.tar.xz")

# Images from the "good" folders of the train and test splits.
hazelnut_train = Hazelnut.load_images("hazelnut/train/good/", 391)
hazelnut_test_normal = Hazelnut.load_images("hazelnut/test/good/", 40)

# Images from the four defect folders of the test split, stacked along the
# first axis in this order: crack, cut, print, hole.
hazelnut_test_anomaly = np.vstack([
    Hazelnut.load_images("hazelnut/test/crack/", 18),
    Hazelnut.load_images("hazelnut/test/cut/", 17),
    Hazelnut.load_images("hazelnut/test/print/", 17),
    Hazelnut.load_images("hazelnut/test/hole/", 18),
])


Hazelnut data download...
Downloading ftp://guest:GU.205dldo@ftp.softronics.ch/mvtec_anomaly_detection/hazelnut.tar.xz 100.00%

In [0]:
# Divide the pixel values by 255.
# NOTE: these are in-place updates of the arrays loaded above, so re-running
# this cell divides by 255 a second time; reload the images before re-running.
hazelnut_train /= 255
hazelnut_test_normal /= 255
hazelnut_test_anomaly /= 255

# Resize with convert()'s default size (96x96).
x_train = convert(hazelnut_train)
x_test_normal = convert(hazelnut_test_normal)
x_test_anomaly = convert(hazelnut_test_anomaly)

# 異常検知の学習

## cifar10

In [0]:
import keras
import matplotlib.pyplot as plt
from keras.preprocessing.image import array_to_img, img_to_array
from keras.preprocessing.image import ImageDataGenerator

from keras.datasets import cifar10
from keras.utils import to_categorical
from keras.layers import Input, Dense, Activation
from keras.optimizers import SGD, Adam
from keras.models import Model
from keras import backend as K
from keras.applications import MobileNetV2

# Output size used by resize() below (passed to cv2.resize as dsize=(Width, Height)).
Height = 96
Width = 96
# NOTE(review): `channel` is not referenced anywhere in the visible notebook.
channel = 3

def resize(x):
    """Resize every image in ``x`` to ``Width`` x ``Height`` pixels.

    Args:
        x: Indexable collection of images accepted by ``cv2.resize``.

    Returns:
        ``np.ndarray`` holding the resized images in input order.
    """
    return np.array([cv2.resize(x[idx], dsize=(Width, Height)) for idx in range(len(x))])

def cifar(x_class6, n_ref=3000, n_normal=300, x_anomaly=None, seed=None):
    """Build the classifier training set: CIFAR-10 reference images plus normal images.

    A random subset of the CIFAR-10 training images (labels 0-9) is resized
    with ``resize`` and stacked with a random subset of ``x_class6``, which is
    given the extra label 10. Labels are returned one-hot encoded with 11
    classes.

    Args:
        x_class6: Array of normal images. They are stacked directly under the
            resized CIFAR-10 images, so they must already have that size and
            value range.
        n_ref: Number of CIFAR-10 reference images to draw (without
            replacement); capped at the number available.
        n_normal: Number of normal images to draw (without replacement);
            capped at ``len(x_class6)``.
        x_anomaly: Optional array whose shape/min/max are printed next to the
            training-set statistics. When omitted, the notebook-level
            ``x_test_anomaly`` is used if it exists (this keeps the original
            output) and the anomaly lines are skipped otherwise.
        seed: Optional seed for reproducible sampling. ``None`` uses NumPy's
            global random state, as before.

    Returns:
        Tuple ``(X_train, Y_train)`` of stacked images and one-hot labels.
    """
    normal_label = 10  # CIFAR-10 occupies labels 0-9

    # dataset (only the training split is used)
    (x_train, y_train), _ = cifar10.load_data()

    rng = np.random if seed is None else np.random.RandomState(seed)

    # Randomly draw n_ref reference images (3000 by default).
    n_ref = min(n_ref, len(x_train))
    ref_idx = rng.choice(len(x_train), n_ref, replace=False)
    # Convert only the sampled images to float instead of the full data set.
    x = resize(x_train[ref_idx].astype('float32') / 255)
    y = y_train[ref_idx].reshape((-1, 1))

    # Randomly draw n_normal normal images (300 by default).
    x_class6 = np.asarray(x_class6)
    n_normal = min(n_normal, len(x_class6))
    normal_idx = rng.choice(len(x_class6), n_normal, replace=False)
    xx = x_class6[normal_idx]
    yy = np.full((n_normal, 1), normal_label)

    # Concatenate reference and normal data.
    X_train = np.vstack((x, xx))
    Y_train = to_categorical(np.vstack((y, yy)), num_classes=normal_label + 1)

    # Backward compatibility: fall back to the notebook global if present,
    # without failing on a fresh kernel where it is not defined yet.
    if x_anomaly is None:
        x_anomaly = globals().get("x_test_anomaly")
    has_anomaly = x_anomaly is not None

    print(X_train.shape)
    print(Y_train.shape)
    if has_anomaly:
        print(x_anomaly.shape)
    print(np.max(X_train))
    print(np.min(X_train))
    if has_anomaly:
        print(np.max(x_anomaly))
        print(np.min(x_anomaly))

    return X_train, Y_train

In [5]:
# Build the classifier training set; x_train holds the resized hazelnut training images.
X_train, Y_train = cifar(x_train)

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
(3300, 96, 96, 3)
(3300, 11)
(70, 96, 96, 3)
1.0
0.0
1.0
0.08235294


## MobileNet V2

In [0]:
def mobilenet(x, y):
    """Fine-tune an ImageNet-pretrained MobileNetV2 on ``(x, y)``.

    The network's last layer is dropped and replaced by a fresh softmax layer
    with one unit per column of ``y``; the whole model is then trained.

    Args:
        x: Training images; ``x.shape[1:]`` is used as the network input shape.
        y: One-hot encoded labels, one column per class.

    Returns:
        The trained Keras model.
    """
    # One output unit per one-hot column, so the head always matches the labels.
    classes = y.shape[1]

    mobile = MobileNetV2(include_top=True, input_shape=x.shape[1:], alpha=1.0,
                         weights='imagenet')

    # Drop the final layer.
    f_model = Model(inputs=mobile.input, outputs=mobile.layers[-2].output)

    # Attach the new fully connected softmax layer. (The original author left
    # an L2-normalisation / metric-learning layer disabled at this point.)
    c = Dense(classes, activation='softmax')(f_model.output)
    f_model = Model(inputs=f_model.input, outputs=c)

    f_model.compile(loss='categorical_crossentropy',
                    optimizer=SGD(lr=5e-4, decay=0.00005),
                    metrics=['accuracy'])

    f_model.fit(x,
                y,
                batch_size=64,
                shuffle=True,
                epochs=50,
                verbose=True)

    return f_model

In [9]:
# Fine-tune MobileNetV2 on the training set built above.
model = mobilenet(X_train, Y_train)







Downloading data from https://github.com/JonathanCMitchell/mobilenet_v2_keras/releases/download/v1.1/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_96.h5

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [0]:
# Remove layers: build a feature extractor ("f_net") that outputs the activations
# of the 24th layer from the end of the fine-tuned classifier.
# NOTE(review): which layer index -24 corresponds to is not visible here —
# confirm with the summary call below.
f_model = Model(inputs=model.input,outputs=model.layers[-24].output, name="f_net")
#f_model.summary()

# Partial Convolutions (U-Net)

In [0]:
# The minimum input size for Partial Convolutions is 256x256, so resize the images.
# NOTE: hazelnut_train / hazelnut_test_anomaly are used here after the earlier
# in-place division by 255, so this cell must run after that one.
x_class6_256 = convert(hazelnut_train, size=256)
x_test_anomaly_256 = convert(hazelnut_test_anomaly, size=256)

# Fetch the PConv-Keras code and switch into its directory so `libs` can be imported.
# NOTE: neither command is idempotent — on a re-run the clone target already
# exists and the working directory is already inside the repository.
!git clone https://github.com/shinmura0/PConv-Keras
%cd PConv-Keras/

Cloning into 'PConv-Keras'...
remote: Enumerating objects: 20, done.[K
remote: Counting objects: 100% (20/20), done.[K
remote: Compressing objects: 100% (20/20), done.[K
remote: Total 445 (delta 9), reused 0 (delta 0), pack-reused 425
Receiving objects: 100% (445/445), 63.89 MiB | 44.02 MiB/s, done.
Resolving deltas: 100% (121/121), done.
/content/PConv-Keras


In [0]:
import gc
from copy import deepcopy
import cv2
import numpy as np

import matplotlib
import matplotlib.pyplot as plt

from libs.util import random_mask
from libs.pconv_model import PConvUnet

# Settings
# Batch size passed to the training generator below.
BATCH_SIZE = 4

# Notebook magics: inline matplotlib figures and the autoreload extension.
%matplotlib inline
%load_ext autoreload
%autoreload 2

class DataGenerator(ImageDataGenerator):
    """``ImageDataGenerator`` that yields randomly masked batches for inpainting."""

    def flow(self, x, *args, **kwargs):
        """Yield ``([masked, mask], original)`` batches forever.

        Args:
            x: Image array forwarded to ``ImageDataGenerator.flow``.
            *args, **kwargs: Forwarded unchanged to ``ImageDataGenerator.flow``.

        Yields:
            ``([masked, mask], ori)`` where ``ori`` is an augmented batch,
            ``mask`` holds one ``random_mask`` per sample, and ``masked`` is a
            copy of ``ori`` with every position where ``mask == 0`` set to 1.
        """
        # Create the underlying iterator once. Re-creating it for every batch
        # (as before) rebuilt the iterator each time and only ever returned
        # its first batch, so the data set was never walked epoch by epoch.
        batches = super().flow(x, *args, **kwargs)

        while True:
            # Get augmented image samples
            ori = next(batches)

            # Get one mask per image sample
            mask = np.stack(
                [random_mask(ori.shape[1], ori.shape[2]) for _ in range(ori.shape[0])],
                axis=0)

            # Apply the masks: masked-out positions become 1
            masked = deepcopy(ori)
            masked[mask == 0] = 1

            # Explicit collection kept from the original implementation.
            gc.collect()

            # Yield ([masked, mask], ori) training batches
            yield [masked, mask], ori

# Training data: small rotations, slight zoom-in and horizontal flips
# (width/height shifts are intentionally left disabled).
train_datagen = DataGenerator(
    rotation_range=10,
    zoom_range=[0.9, 1],
    horizontal_flip=True,
)
train_generator = train_datagen.flow(x=x_class6_256, batch_size=BATCH_SIZE)

# Preview data: anomalous test images, horizontal flips only.
test_datagen = DataGenerator(horizontal_flip=True)
test_generator = test_datagen.flow(x=x_test_anomaly_256, batch_size=3)

# Draw one fixed batch; plot_callback() reuses these three arrays.
(masked, mask), ori = next(test_generator)

def plot_callback(model):
    """Display the fixed preview batch next to the model's predictions.

    Reads the notebook-level ``masked``, ``mask`` and ``ori`` arrays and shows
    one three-panel figure per sample (masked input, prediction, original).
    The figures are only displayed; nothing is written to disk.

    Args:
        model: Object whose ``predict([masked, mask])`` returns the predicted
            images for the preview batch.
    """
    pred_img = model.predict([masked, mask])
    titles = ('Masked Image', 'Predicted Image', 'Original Image')

    for idx in range(len(ori)):
        panels = (
            masked[idx, :, :, :],
            pred_img[idx, :, :, :] * 1.,
            ori[idx, :, :, :],
        )
        _, axes = plt.subplots(1, 3, figsize=(20, 5))
        for axis, panel, title in zip(axes, panels, titles):
            axis.imshow(panel)
            axis.set_title(title)
        plt.show()

## 学習（5,6時間かかる）

In [0]:
# Train the partial-convolution U-Net on the masked 256x256 training batches.
# plot_callback is handed to fit() to visualise the preview batch during training
# (presumably once per epoch — confirm in libs.pconv_model).
p_model = PConvUnet(img_rows=256, img_cols=256)
p_model.fit(train_generator,
            steps_per_epoch=1000,
            epochs=30,
            plot_callback=plot_callback)

# Ano-Unet V2 実行

In [0]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.neighbors import LocalOutlierFactor

def grid_score(target_img, lof, ms, p_model, f_model, divide):
    """Score each cell of a ``divide`` x ``divide`` grid by masking and inpainting it.

    For every grid cell the cell is masked out, ``p_model`` predicts the
    masked image, the prediction is resized to 96x96 and passed through
    ``f_model``, and the scaled features are scored with ``lof``. The cell's
    score is written to every pixel of that cell.

    Args:
        target_img: Image array of shape ``(height, width, channels)``.
        lof: Fitted outlier model providing ``_decision_function``.
        ms: Fitted scaler whose ``transform`` is applied to the features.
        p_model: Inpainting model; ``predict([image_batch, mask_batch])``.
        f_model: Feature extractor; ``predict(image_batch)``.
        divide: Number of grid cells along each axis.

    Returns:
        ``float32`` array of shape ``(height, width)`` with values in [0, 1].
        The cell whose masking produced the lowest outlier score gets 1, the
        cell with the highest gets 0, and the values are squared. If every
        cell scored the same, an all-zero map is returned.
    """
    shape = target_img.shape
    height, width = shape[0], shape[1]
    result = np.zeros((height, width), np.float32)

    for i in range(divide):
        # x runs along the columns (width), y along the rows (height). The
        # previous code mixed both axes, which was only right for square images.
        x_begin = int(i * width / divide)
        x_end = int((i + 1) * width / divide)
        for j in range(divide):
            y_begin = int(j * height / divide)
            y_end = int((j + 1) * height / divide)

            # Filled rectangle marks the cell with 1; the mask is its complement.
            img = np.zeros(target_img.shape, np.uint8)
            cv2.rectangle(img, (x_begin, y_begin), (x_end, y_end), (1, 1, 1), thickness=-1)
            mask = 1 - img

            # Image + mask: positions where mask == 0 are set to 1
            masked_img = deepcopy(target_img)
            masked_img[mask == 0] = 1
            predict_img = p_model.predict([np.expand_dims(masked_img, axis=0), np.expand_dims(mask, axis=0)])
            predict_img = cv2.resize(predict_img[0], (96, 96))

            score = f_model.predict(np.expand_dims(predict_img, axis=0))
            score = ms.transform(score.reshape((1, -1)))
            # NOTE(review): private scikit-learn API; confirm it exists in the
            # installed version.
            score = -lof._decision_function(score)

            result[y_begin:y_end, x_begin:x_end] = score[0]

    lowest = np.min(result)
    span = np.max(result) - lowest
    if span == 0:
        # No cell stands out; avoid 0/0 turning the whole map into NaN.
        return np.zeros_like(result)

    result = (result - lowest) / span  # 0-1
    result = 1 - result  # most anomalous location = 1
    return result ** 2

def get_lof(f_model, x, n_neighbors=5, max_samples=1000):
    """Fit a min-max scaler and a Local Outlier Factor model on features of ``x``.

    Args:
        f_model: Feature extractor; ``predict(x)`` returns one feature array
            per sample.
        x: Images of the normal class.
        n_neighbors: ``n_neighbors`` passed to ``LocalOutlierFactor``.
        max_samples: Only the first ``max_samples`` scaled feature vectors are
            used to fit the outlier model (the scaler is fitted on all of them).

    Returns:
        Tuple ``(lof, ms)``: the fitted outlier model and the fitted scaler.
    """
    # One flat feature vector per sample.
    train = f_model.predict(x)
    train = train.reshape((len(train), -1))

    ms = MinMaxScaler()
    train = ms.fit_transform(train)

    # fit the model (fit() returns the estimator itself, so nothing to keep)
    lof = LocalOutlierFactor(n_neighbors=n_neighbors)
    lof.fit(train[:max_samples])

    return lof, ms

def connect(x, x_predict):
    """Overlay a JET-coloured heat map on an image.

    Args:
        x: Image array; it is multiplied by 255 and halved before being added.
        x_predict: Heat map; it is multiplied by 255 and cast to ``uint8``
            before the colour map is applied.

    Returns:
        The result of ``array_to_img`` applied to the summed overlay.
    """
    # Pseudo-colour the heat map (OpenCV returns BGR), then convert to RGB.
    heat_bgr = cv2.applyColorMap(np.uint8(255 * x_predict), cv2.COLORMAP_JET)
    heat_rgb = cv2.cvtColor(heat_bgr, cv2.COLOR_BGR2RGB)

    # Blend with the source image; only the image term is halved.
    overlay = np.float32(heat_rgb) + x * 255 / 2
    return array_to_img(overlay)

def heatmap(target, lof, ms, p_model, f_model, divides=(20, 18, 16, 14)):
    """Plot ``target`` next to its anomaly heat map.

    The heat map is the sum of ``grid_score`` maps computed at several grid
    resolutions, scaled by its maximum and overlaid on ``target`` with
    ``connect``.

    Args:
        target: Image to analyse.
        lof: Fitted outlier model, forwarded to ``grid_score``.
        ms: Fitted scaler, forwarded to ``grid_score``.
        p_model: Inpainting model, forwarded to ``grid_score``.
        f_model: Feature extractor, forwarded to ``grid_score``.
        divides: Grid resolutions whose score maps are summed.

    Raises:
        ValueError: If ``divides`` is empty.
    """
    if len(divides) == 0:
        raise ValueError("divides must contain at least one grid resolution")

    # Accumulate in the given order (same summation order as before).
    result = None
    for divide in divides:
        grid = grid_score(target, lof, ms, p_model, f_model, divide=divide)
        result = grid if result is None else result + grid

    # Scale to a maximum of 1; skip when the map is all zero to avoid 0/0.
    peak = np.max(result)
    if peak > 0:
        result = result / peak

    plt.figure(figsize=(8, 16))
    plt.subplot(1, 2, 1)
    plt.imshow(target)
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(connect(target, result))
    plt.axis("off")
    plt.show()

In [0]:
# Fit the scaler and the outlier model on features of the normal training images.
lof, ms = get_lof(f_model, x_train)

# Show heat maps for the first 20 anomalous test images (256x256 versions).
for i in range(20):
    print(i)
    # Work on a copy of the test image.
    target = np.copy(x_test_anomaly_256[i])
    heatmap(target, lof, ms, p_model, f_model)

Output hidden; open in https://colab.research.google.com to view.