# PBL_02 不良箇所自動検出 良否判定モデル構築用サンプルコード

本jupyter notebookはPBL_02 不良箇所自動検出のサンプルコードです。<br>
**で囲まれた箇所をご自身の環境に合わせて変更いただいた上で本jupyter notebookを上から下まで実行いただけると提出可能なファイルが出力されます。<br>
<br>
本jupyter notebookの構成は以下のようになっております。<br>
<ol>
<li>Googleドライブと接続</li>
    Googleドライブとサンプルコードを接続する。
<li>ライブラリimport</li>
    必要なライブラリのimportを行う。
<li>パラメータ設定</li>
    画像分類アルゴリズム"VGG"に関する設定とデータ、ウェイトのパス設定を行う。
<li>VGGのネットワーク定義</li>
    VGGのネットワークを本課題向けにカスタマイズし、カスタマイズしたVGGを宣言する。<br>
    具体的には、VGGの基本設定、事前重み有無設定、出力層のカスタマイズを行う。
<li>学習・検証データの読み込み</li>
    3. で指定した格納先の学習・検証データを読み込む。
<li>モデルの学習</li>
    読み込んだデータを用いてVGGを学習させる。
<li>モデルによる判定</li>
    構築したモデルによる判定を実施する。
<li>学習・検証データに対する精度評価</li>
    学習・検証データに対する精度をF1-score、Precision、Recallで評価します
<li>提出ファイルの出力</li>
    テストデータに対して良否判定を行い、その結果を提出フォーマットであるtsv形式で出力を行う。
</ol>

## 1. Googleドライブと接続
学習データなどを読み込めるようにするため、Googleドライブとサンプルコードを接続します。

In [None]:
# Goolgle Colaboratoryを使用する場合のみ実行。
from google.colab import drive
drive.mount('/content/drive')

## 2. ライブラリimport
本サンプルコードで使用するライブラリのバージョンを指定します。<br>
便利なプログラムをひとまとめにし、誰でも使いやすい状態にしたものを"ライブラリ"と読んでいます。<br>
本サンプルコードでは、いくつかの"ライブラリ"を使用するため、使用するライブラリが何かをこちらで宣言（import ~~）しています。<br>
宣言することでgoogle colaboratoryはどのライブラリを使用するのか認識し、ライブラリのプログラムを使用できるようになります。<br>
これに関する解説はSignate Cloudの [Gym > pandas入門道場 > Introduction](https://biz.quest.signate.jp/quests/10007/contents/1) をご覧ください。<br>

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.metrics import (
    f1_score,
    precision_score,
    recall_score,
)
from tensorflow import keras
from keras import optimizers
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
from keras.models import Sequential, Model
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, Input
from keras.layers import Activation, Dropout, Flatten, Dense
from keras import backend as K
# from keras import optimizers
from keras.utils import np_utils
from keras.applications.vgg16 import VGG16
import keras.utils as image
import glob
import tensorflow as tf
tf.random.set_seed(1)
plt.style.use('ggplot')

## 3. パラメータ設定
画像分類アルゴリズム"VGG"に関する設定とデータ・ウェイトのパス設定を行います。<br>
次セルの「画像分類アルゴリズム"VGG"に関する設定」でパラメータを変更することにより、精度向上が期待できます。

In [None]:
# 画像分類アルゴリズム"VGG"に関する設定
# 入力画像サイズの高さと幅（サイズを大きくするとより画像の特徴が明らかになり、特徴を学習しやすくなる可能性があります。一方、学習に要する時間が増えてしまいます。
IMG_WIDTH, IMG_HEIGHT = 224, 224
TARGET_SIZE = (IMG_WIDTH, IMG_HEIGHT)
# 判定分類数（4分類を判定するモデルを構築し、そのモデルの判別結果を最後に良品、不良品の2分類に変換する前提）
NB_CLASSES = 4
# 学習時のエポック数（学習データを何回学習させるかのパラメータ。回数を増やすとその分モデルは学習データの特徴を学習できる。ただし、学習用データに過度に適合したモデルになる可能性がある。）
EPOCHS = 30
# バッチサイズ
# （一度の学習で何枚の画像データを使用するかというパラメータ。100枚の学習データがあった際にバッチサイズを5とすると、5枚ずつ20回学習することを意味する。
# ここで言う"一度の学習"とは、VGGモデルが持つパラメータを更新するタイミングのことを言う。バッチサイズ5のときは、5枚のデータ毎にパラメータが更新されることになる。）
BATCH_SIZE = 5

In [None]:
if K.image_data_format() == 'channels_first':
    input_shape = (3, IMG_WIDTH, IMG_HEIGHT)
else:
    input_shape = (IMG_WIDTH, IMG_HEIGHT, 3)

In [None]:
# # データとウェイトに関する設定
# 学習データ保存先にはbridge, horn, potato, regularのフォルダがあり、各フォルダ配下に画像が格納されている想定
train_data_dir = '/content/drive/MyDrive/DXQuest/train'
# 検証用データ保存先にはbridge, horn, potato, regularのフォルダがあり、各フォルダ配下に画像が格納されている想定。
# 本サンプルコードでは、簡便さを重視し、検証用データも学習データと同じものを使用。通常、学習と検証用のデータは任意の割合で分割する
validation_data_dir = '/content/drive/MyDrive/DXQuest/train'
# テストデータ保存先には画像データが格納されている想定
test_data_dir = '/content/drive/MyDrive/DXQuest/test'
# 画像分類アルゴリズムのweightファイル保存場所
weight_dir = '/content/drive/MyDrive/DXQuest/weights'
# weightファイルの名前
save_weights_path = os.path.join(weight_dir, 'weights.h5') # 'weights.h5'のファイル名は変更可

## 4. VGGのネットワーク定義
VGGのネットワークを本課題向けにカスタマイズします。<br>
具体的には、VGGの基本設定と事前重みの設定（以下「weights='imagenet'」がこれに相当）、出力層のカスタマイズなどを行います。<br>
事前重みの設定とは、あらかじめ様々なデータでVGGを学習させて見つけ出した良いパラメータ値をパラメータの初期値として設定することを意味します。これにより、始めからある程度汎用性のあるモデルになります。
更に、今回のデータでこのモデルを学習させることで学習速度を速め、かつ安定したモデルにすることを目指します。<br>
出力層のカスタマイズとは、今回の課題に合わせて、モデルの出力数を4種or2種に調整することを意味します。<br>
<br>
精度向上を目指すためには、以下の「VGGの学習方法の定義」セルにてoptimizerの種類と学習率を変更することが効果的かもしれません。これらに関する解説は、Signate Cloudの [Gym > DeepLearning入門〜画像分類編〜 > 畳み込みニューラルネットワーク](https://biz.quest.signate.jp/quests/10017) を参考にしてください。

In [None]:
# VGGの基本設定と事前重みの設定
base_model = VGG16(
    weights='imagenet',
    include_top=False,
    input_tensor=Input(shape=(IMG_WIDTH, IMG_HEIGHT, 3))
)

In [None]:
# ネットワーク構造の確認
base_model.summary()

In [None]:
# 出力層のカスタマイズ（出力に近い層を本課題に合わせて変更）
top_model = base_model.output
top_model = Flatten(name='flatten')(top_model)
top_model = Dense(512, activation='relu')(top_model)
top_model = Dropout(0.5)(top_model)
top_model = Dense(NB_CLASSES, activation='softmax')(top_model)

In [None]:
# カスタマイズ後のVGGの定義
model = Model(
    inputs=base_model.input,
    outputs=top_model
)
for layer in base_model.layers:
    layer.trainable = False

In [None]:
#  VGGの学習方法の定義
model.compile(
    loss='categorical_crossentropy',
    optimizer=keras.optimizers.RMSprop(learning_rate=1e-4), #optimizerの種類（"RMSprop"の箇所）と学習率（"lr"の箇所）を変更することにより、精度向上が期待できます。
    metrics=['accuracy'],
)

In [None]:
# ネットワーク構造の確認
model.summary()

## 5. 学習・検証データの読み込み
3.で指定した格納先の学習・検証データを読み込みます。<br>
読み込み前、読み込み時にこれらのデータに前処理（特徴量の強調、クラス分布の平準化、データの分布平準化）を施すことで精度向上が期待できます。

In [None]:
# VGGに入力できるよう画像サイズの圧縮
train_datagen = ImageDataGenerator(rescale=1.0/255) # 前処理を（）内に追加可能
valid_datagen = ImageDataGenerator(rescale=1.0/255) # 前処理を（）内に追加可能
# test_datagen = ImageDataGenerator(rescale=1.0/255) #要確認

In [None]:
#学習・検証データの読み込み
train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size=TARGET_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

validation_generator = valid_datagen.flow_from_directory(
    validation_data_dir,
    target_size=TARGET_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

In [None]:
# 学習・検証データとして上記にて読み込んだ画像を設定
nb_train_samples = train_generator.samples
nb_validation_samples = validation_generator.samples

## 6. モデルの学習
読み込んだ学習データを用いてVGGを学習させます。

In [None]:
model.fit(x=train_generator,
          epochs=EPOCHS,
          validation_data=validation_generator,
          steps_per_epoch=nb_train_samples/BATCH_SIZE,
          validation_steps=nb_validation_samples/BATCH_SIZE
          )

In [None]:
if not os.path.exists(weight_dir):
  os.mkdir(weight_dir)
model.save_weights(save_weights_path)

In [None]:
# loss curveの表示
plt.figure(figsize=[10,8])
plt.plot(model.history.history['loss'], 'r')
plt.plot(model.history.history['val_loss'], 'b')
plt.legend(['Training loss', 'Validation Loss'])
plt.xlabel('Epochs', fontsize=16)
plt.ylabel('Loss', fontsize=16)
plt.title('Loss Curves', fontsize=16)

plt.show()

In [None]:
# accuracy curveの表示
plt.figure(figsize=[10,8])
plt.plot(model.history.history['accuracy'], 'r')
plt.plot(model.history.history['val_accuracy'], 'b')
plt.legend(['Training Accuracy', 'Validation Accuracy'])
plt.xlabel('Epochs', fontsize=16)
plt.ylabel('Accuracy', fontsize=16)
plt.title('Accuracy Curves', fontsize=16)

plt.show()

## 7. モデルによる判定
学習・検証用データに構築したモデルで良品、不良品の判定を行います。

In [None]:
# weightファイルの読み込み
print('load model...')
model.load_weights(save_weights_path)

In [None]:
# 良否判定実行関数
def get_predict(model,
                train_data_dir: str,
                test_data_dir: str):
    """This function will performs model inferencing using test data
    and stores the results into the lists.

    Args:
        model (object): The trained model.
        train_data_dir (str): The location of train images.
        test_data_dir (str): The location of test images.

    Returns:
        filenames (list): filenames of predicted images.
        true_classes (list): true classes of predicted images.
        pred_classes (list): prediction classes of predicted images.
    """

    data_datagen = ImageDataGenerator(rescale=1/255.)

    test_generator = data_datagen.flow_from_directory(
        test_data_dir,
        target_size=TARGET_SIZE,
        class_mode=None,
        batch_size=1,
        shuffle=False,
    )
    preds = model.predict_generator(test_generator)

    preds_class_idx = preds.argmax(axis=-1)

    # get prediction class
    train_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size=TARGET_SIZE,
        batch_size=BATCH_SIZE,
    )

    idx_to_class = {v: k for k, v in train_generator.class_indices.items()}
    pred_classes = np.vectorize(idx_to_class.get)(preds_class_idx)
    filenames_to_class = list(zip(test_generator.filenames, pred_classes))

    # get true class
    filenames = []
    true_classes = []

    for item in test_generator.filenames:
        filenames.append(item)
        # get true class from the filenames
        true_class = item.split('/')[0]
        true_classes.append(true_class)

    return filenames, true_classes, pred_classes

In [None]:
# 精度算出関数
def get_f1(true_labels_list: list,
           predictions_list: list,
           average_method: str,
          ) -> (float, float, float):
    """This function will performs model inferencing using test data
    and stores the results into the lists.

    Args:
        true_labels_list (list): List of true labels.
        predictions_list (list): List of predictions.
        average_method (string): method to average score.

    Returns:
        f1 (float): return f1 metric.
        precision (float): return precision metric.
        recall (float): return recall metric.
    """
    f1 = f1_score(
        y_true=true_labels_list,
        y_pred=predictions_list,
        average=average_method
    )

    precision = precision_score(
        y_true=true_labels_list,
        y_pred=predictions_list,
        average=average_method,
    )

    recall = recall_score(
        y_true=true_labels_list,
        y_pred=predictions_list,
        average=average_method,
    )

    f1 = round(f1, 2)
    precision = round(precision, 2)
    recall = round(recall, 2)

    return f1, precision, recall

In [None]:
# 良否判定実行
train_filenames, train_true_classes, train_pred_classes = get_predict(
    model=model,
    train_data_dir=train_data_dir,
    test_data_dir=train_data_dir,
)
valid_filenames, valid_true_classes, valid_pred_classes = get_predict(
    model=model,
    train_data_dir=train_data_dir,
    test_data_dir=validation_data_dir,
)

## 8. 学習・検証データに対する精度評価
学習・検証データに対する精度をF1-score、Precision、Recallで評価します。

In [None]:
# 精度算出
train_f1, train_prec, train_recall = get_f1(
    true_labels_list=train_true_classes,
    predictions_list=train_pred_classes,
    average_method='weighted',
)
valid_f1, valid_prec, valid_recall = get_f1(
    true_labels_list=valid_true_classes,
    predictions_list=valid_pred_classes,
    average_method='weighted',
)

In [None]:
# 精度表示
print('{:15}{:<15.2f}{:<15.2f}'.format('F1-score:', train_f1, valid_f1))
print('{:15}{:<15.2f}{:<15.2f}'.format('Precision:', train_prec, valid_prec))
print('{:15}{:<15.2f}{:<15.2f}'.format('Recall:', train_recall, valid_recall))

## 9. 提出ファイルの出力
テストデータに対して良否判定を行い、その結果を提出フォーマットであるtsv形式で出力を行います。

In [None]:
# 分類とラベルの対応確認
label_map = (train_generator.class_indices)
print(label_map)

In [None]:
# テストデータに対して1つずつ予測し、テストデータのファイル名と判定結果をリストに保存
file_list = []
pred_list = []
for file in glob.glob(test_data_dir + '/*'):
    image_data = file
    filename = file.split('/')[-1]
    img = image.load_img(image_data, target_size=(IMG_WIDTH, IMG_HEIGHT))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = x / 255
    pred = model.predict(x)[0]
    judge = np.argmax(pred)

    # *bridge, horn, potatoを不良（'1'）に、regularを良品（'0'）に変換。if文の条件分岐は上の「分類とラベルの対応確認」セルの結果を参考に変更すること*
    if judge==0:
        judge=1
    elif judge==1:
        judge=1
    elif judge==2:
        judge=1
    else:
        judge=0

    pred_list.append(judge)
    file_list.append(filename)

In [None]:
#判別結果をDataFrameに変換し、tsvファイルに出力
df = pd.DataFrame([file_list, pred_list]).T
df.to_csv('/content/drive/MyDrive/DXQuest/my_submission.tsv',
         index=False,
         header=False,
         sep='\t')