# Import Libraries

In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2

from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping

import os, datetime, re
from os.path import join, basename, dirname
from glob import glob
from IPython.display import display
from PIL import Image

# import imageio
# import imgaug as ia
from imgaug import augmenters as iaa


from typing import NewType, Tuple, List

Path = NewType('Path', str) # 파일 경로임을 명확히 한다.

In [2]:
# GPU 비활성화
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [3]:
# GPU 확인
from tensorflow.python.client import device_lib

print(device_lib.list_local_devices())

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 14314436273896905588
xla_global_id: -1
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 5765005312
locality {
  bus_id: 1
  links {
  }
}
incarnation: 712657584621666670
physical_device_desc: "device: 0, name: NVIDIA GeForce RTX 3060 Ti, pci bus id: 0000:0a:00.0, compute capability: 8.6"
xla_global_id: 416903419
]


# Data Load

## 경로 설정

In [4]:
path_to_current = os.getcwd()
path_to_dataset = join(path_to_current, 'open')
print(f'{path_to_current = }')

path_to_current = 'e:\\VSCodeProjects\\DACON_Anomoly_Detection'


## train data -> DataFrame

In [5]:
df = pd.read_csv(join(path_to_dataset, 'train_df.csv'), index_col=['index'])

display(df)

Unnamed: 0_level_0,file_name,class,state,label
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,10000.png,transistor,good,transistor-good
1,10001.png,capsule,good,capsule-good
2,10002.png,transistor,good,transistor-good
3,10003.png,wood,good,wood-good
4,10004.png,bottle,good,bottle-good
...,...,...,...,...
4272,14272.png,transistor,good,transistor-good
4273,14273.png,transistor,good,transistor-good
4274,14274.png,grid,good,grid-good
4275,14275.png,zipper,good,zipper-good


## 모델학습을 위해 데이터 값 변형

In [6]:
# 절대 경로로 변환
df.file_name = df.file_name.apply(lambda path: join(path_to_dataset, 'train', path))

# class numbering
categories = {class_:i for i, class_ in enumerate(df['class'].unique())}
print(f"{categories = }")

# class: str -> int
df['class'] = df['class'].apply(lambda class_: categories[class_])

display(df)

categories = {'transistor': 0, 'capsule': 1, 'wood': 2, 'bottle': 3, 'screw': 4, 'cable': 5, 'carpet': 6, 'hazelnut': 7, 'pill': 8, 'metal_nut': 9, 'zipper': 10, 'leather': 11, 'toothbrush': 12, 'tile': 13, 'grid': 14}


Unnamed: 0_level_0,file_name,class,state,label
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,0,good,transistor-good
1,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,1,good,capsule-good
2,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,0,good,transistor-good
3,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,2,good,wood-good
4,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,3,good,bottle-good
...,...,...,...,...
4272,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,0,good,transistor-good
4273,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,0,good,transistor-good
4274,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,14,good,grid-good
4275,e:\VSCodeProjects\DACON_Anomoly_Detection\open...,10,good,zipper-good


In [7]:
x_train_all, y_train_all = df.file_name.values, df['class'].values
x_train, x_val, y_train, y_val = train_test_split(x_train_all, y_train_all, stratify=y_train_all,
                                                  test_size=.05, random_state=0)
print(f'{len(x_train) = }\n{len(x_val) = }')

len(x_train) = 4063
len(x_val) = 214


# Model Train

## 입력데이터 조정
* 과적합을 위해서 매번 입력이미지를 변형
* GPU 메모리 부족 방지를 위해 Generator 사용

In [8]:
input_shape = (224, 224)

# 과적합 방지를 위해서 매번 입력 이미지를 변형한다.
seq = iaa.Sequential([
    iaa.Affine(translate_percent={'x': (-0.1, 0.1), 'y': (-0.1, 0.1)},
               scale={"x": (0.9, 1.1), "y": (0.9, 1.1)}),
    iaa.Rotate(rotate=(-180, 180)),
])

def normalize(X):
    return X / 255

# 과적합 방지를 위해서 매번 입력 이미지를 변형한다.
seq = iaa.Sequential([
    iaa.Affine(translate_percent={'x': (-0.1, 0.1), 'y': (-0.1, 0.1)},
               scale={"x": (0.9, 1.1), "y": (0.9, 1.1)}),
    iaa.Rotate(rotate=(-35, 35)),
])

def steps(X, batch_size):
    return (len(X) + 1) // batch_size if len(X) % batch_size else len(X) // batch_size

def check(X, input_shape):
    arrs = []
    mask = []
    for i, image in enumerate(X):
        try:
            arr = np.array(Image.open(image).resize(input_shape), dtype=np.uint8)
            if arr.shape == (*input_shape, 3):
                arrs.append(arr)
                mask.append(True)
            else:
                mask.append(False)
        except:
            mask.append(False)
    return np.array(arrs, dtype=np.float32), np.array(mask)

# 메모리 부족현상 방지를 위해서 제너레이터 사용한다.
def x_y_generator(x, y, batch_size=64, epochs=1, transform=False, input_shape=(224, 224)):
    seq = iaa.Sequential([
        iaa.Affine(translate_percent={'x': (-0.1, 0.1), 'y': (-0.1, 0.1)},
                scale={"x": (0.9, 1.1), "y": (0.9, 1.1)}),
        iaa.Rotate(rotate=(-35, 35)),
    ])
        
    steps_ = steps(x, batch_size)
    batch_order = np.arange(steps_)
    while epochs:
        np.random.shuffle(batch_order)  # 배치들간의 순서 섞기
        for order in batch_order:
            start = order * batch_size
            end   = start + batch_size
            
            images, mask = check(x[start:end], input_shape=input_shape)
            # images = np.array([
            #     np.array(Image.open(image).resize(input_shape), dtype=np.float32) for image in x[start:end]
            # ])

            yield (
                normalize(seq(images=images)) if transform else normalize(images),
                y[start:end][mask],
            )
        epochs -= 1

In [9]:
classes = len(df['class'].unique())

model = tf.keras.applications.densenet.DenseNet121(
    include_top=False,          # include fully-connected layer
    weights='imagenet',         # 미리 학습된 데이터 사용
    input_tensor=None,
    input_shape=(*input_shape, 3),     # 224 x 224 이미지 사용
    pooling=None,
    classes=classes,            # 모든 품종의 클래스 사용
)

# 모델 커스터마이징
x = model.output
x = GlobalAveragePooling2D(name = "avg_pool")(x)
outputs = Dense(classes, activation=tf.nn.softmax, name="predictions")(x)

model = tf.keras.Model(model.input, outputs)

opt = tf.keras.optimizers.SGD(learning_rate=0.01)
model.compile(loss='sparse_categorical_crossentropy', optimizer=opt,  metrics=['accuracy'])

In [10]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 zero_padding2d (ZeroPadding2D)  (None, 230, 230, 3)  0          ['input_1[0][0]']                
                                                                                                  
 conv1/conv (Conv2D)            (None, 112, 112, 64  9408        ['zero_padding2d[0][0]']         
                                )                                                                 
                                                                                              

In [11]:
batch_size = 8
steps_per_epoch = steps(x_train, batch_size)
validation_steps = steps(x_val, batch_size)
epochs = 5

In [12]:
early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, patience=5, verbose=1,
                               restore_best_weights=True)

In [13]:
history_ = model.fit(x_y_generator(x_train, y_train, batch_size=batch_size, epochs=epochs, transform=True),
                     epochs=epochs, steps_per_epoch=steps_per_epoch,
                     validation_data=x_y_generator(x_val, y_val, batch_size=batch_size, epochs=-1),
                     validation_batch_size=batch_size, validation_steps=validation_steps,
                     callbacks=[early_stopping])

Epoch 1/5
Epoch 2/5

KeyboardInterrupt: 