# CNN fine tuning for Intel Image Classification

# 環境設定とデータの準備

In [None]:
# 環境確認
import sys, keras
print('Python:', sys.version)
print('Keras :', keras.__version__)

In [None]:
# Google Colab環境を使用する場合、Google DriveをGoogle Colab環境にマウントしておきます。
# local環境の場合、このセルは実行不要です。
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# ディレクトリ構成の定義（環境に合わせて修正してください）
base_path     = '/content/drive/MyDrive/Colab Notebooks/easy_fine_tuning_with_keras/'  # 本 .ipynb を置いているフォルダ
datasets_path = base_path + 'datasets/intel-image-classification/archive.zip'          # データファイル

Keras の [Intel Image Classification](https://www.kaggle.com/puneet6060/intel-image-classification) の"Download"ボタンから archive.zip ファイルをダウンロードし、datasets_pathで指定したフォルダに保存してください。

## 前処理： データ取得 / 変換 / 分割

zipファイルから画像データを読み込み、リサイズ、正規化し、訓練データ/バリデーションデータ/テストデータに分割します。<br/>
archive.zipファイル内の画像データのパス名は以下の形式になっています。<br/>
<li>seg_train/seg_train/(label)/....jpg</li>
<li>seg_test/seg_test/(label)/....jpg</li>

In [None]:
import zipfile, io, re
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split

def preprocessing(image_size, train_data_volume=0, test_data_volume=0):
    '''データセットを取得し前処理を行う
        parameters:
            image_size        - 画像サイズ。
            train_data_volume - １ラベルの訓練データ数の上限値。0にすると上限なし。
            test_data_volume  - １ラベルのテストデータ数の上限値。0にすると上限なし。
        returns:
            X_train, y_train, X_valid, y_valid, X_test, y_test, labels, num_classes
    '''

    # 訓練データをzipファイルから読み込む
    with zipfile.ZipFile(datasets_path) as z:
        # ラベルを抽出する
        labels = []
        for data_path in z.namelist():  # データファイルのリスト
            # data_pathからラベルの部分'([^/]*)'を抽出し、
            # 抽出したラベル match.group(1) がlabelsに含まれていないならば、
            # 抽出したラベルをlabelsに追加する。
            match = re.search('^seg_train/seg_train/([^/]*)/', data_path)
            if match and not(match.group(1) in labels): 
                labels.append(match.group(1))
        labels.sort()
        num_classes = len(labels)
        print('labels:', labels)

        # 画像データを読み込み加工する。ラベルを対応づける。
        X_train, y_train = get_imgs_with_labels(
            z, 'seg_train/seg_train/', labels, image_size, train_data_volume)

    # テストデータをzipファイルから読み込む。
    with zipfile.ZipFile(datasets_path) as z:
        # 画像データを読み込み加工する。ラベルを対応づける。
        X_test, y_test = get_imgs_with_labels(
            z, 'seg_test/seg_test/', labels, image_size, test_data_volume)
        
    # 正規化
    X_train = X_train.astype('float32') / 255
    X_test  = X_test .astype('float32') / 255

    # ラベルをone-hot-vector化する。
    y_train = np.identity(num_classes, dtype=int)[y_train]
    y_test  = np.identity(num_classes, dtype=int)[y_test ]

    # trainデータをtrainデータとvalidationデータに分割する。
    X_train, X_valid, y_train, y_valid = train_test_split(
        X_train, y_train, random_state = 0,
        stratify = y_train, test_size = 0.2 )

    # データの確認
    print(X_train.shape, y_train.shape)
    print(X_valid.shape, y_valid.shape)
    print(X_test .shape, y_test .shape)

    i = 0
    plt.title(labels[y_train[i].argmax()])
    plt.imshow(X_train[i])
    plt.show()
    
    return X_train, y_train, X_valid, y_valid, X_test, y_test, labels, num_classes

In [None]:
def get_imgs_with_labels(z, src_dir, labels, image_size, data_volume=0):
    ''' 画像データを読み込み加工する。ラベルを対応づける。
        parameters:
            z           - 読み込み元のZipFileオブジェクト
            src_dir     - 画像データの抽出元ディレクトリ
            labels      - ラベルのリスト
            image_size  - リサイズ後の画像サイズ
            data_volume - 各ラベルに対応するデータ数の上限値。0にすると上限なし。
        returns:
            X, y
    '''
    # 画像データのパスとラベルのリストを作る
    data_paths = z.namelist()  # データファイルのリスト
    img_paths = []
    y = []
    for i, label in enumerate(labels):
        src_path_pattern = '^' + src_dir + label + '/.*jpg$'
        c = 0
        for data_path in data_paths:
            if re.search(src_path_pattern, data_path):
                img_paths.append(data_path)
                y.append(i)
                c += 1
                if c == data_volume: break

    # 画像データを読み込み、リサイズし、配列に変換する。
    X = []
    for img_path in img_paths:
        image = Image.open(io.BytesIO(z.read(img_path))) # 読み込み
        image = image.convert('RGB') # RGB変換
        image = image.resize((image_size, image_size)) # リサイズ
        data = np.asarray(image) # 配列に変換
        X.append(data)

    X = np.array(X)
    y = np.array(y)
    return X, y

In [None]:
image_size        = 150
train_data_volume = 100  # 全データを使用する場合は0を指定する
test_data_volume  = 100  # 全データを使用する場合は0を指定する
X_train, y_train, X_valid, y_valid, X_test, y_test, labels, num_classes \
    = preprocessing(image_size, train_data_volume, test_data_volume)


## 共通の関数の定義

In [None]:
# 機械学習関連のパッケージ
from keras.models import Model, load_model
from keras.layers.core import Dense
from keras.layers.pooling import GlobalAveragePooling2D
from keras.layers.normalization import BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.optimizers import Adam

# fine tuningに用いる学習済みモデル
from keras.applications.vgg16 import VGG16
from keras.applications.resnet50 import ResNet50
from keras.applications.xception import Xception
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.applications.mobilenet_v2 import MobileNetV2

In [None]:
# fine-tuning する model を構築する
def build_FT_model(base_model, num_classes):
    # bese_model に top_model を追加する
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)

    model.summary()  # モデル構造を確認
    print('total number of layers', len(model.layers))
    
    return model

In [None]:
# fine-tuning する model のコンパイル／学習／評価を行う
model_dir = base_path + 'model/'  # モデルを保存するフォルダ

def eval_FT_model(
        model, model_name, 
        X_train, y_train, X_valid, y_valid, X_test, y_test, 
        n_frozen_layers=0, lr=1e-3):

    # 学習させるlayerを指定する
    for i in range(len(model.layers)):
        # 前から(n_frozen_layers)個のlayerのパラメーターは学習不可にする。
        # ただし、Batch Normalization layerは全て学習可のままにする。
        model.layers[i].trainable = \
            (i >= n_frozen_layers) or \
            isinstance(model.layers[i], BatchNormalization)

    # コンパイル
    model.compile(optimizer=Adam(lr=lr),
        loss='categorical_crossentropy', metrics=['accuracy'] )
    
    # Data Augmentation の方法を定義
    datagen = ImageDataGenerator(
        width_shift_range=0.1, height_shift_range=0.1,
        horizontal_flip=True, vertical_flip=False )

    # 学習
    hist = model.fit_generator(
        datagen.flow(X_train, y_train, batch_size=32),
        steps_per_epoch=X_train.shape[0] // 32,
        epochs=50,
        validation_data=(X_valid, y_valid),
        callbacks=[
            EarlyStopping(monitor='val_loss', patience=10, verbose=1),
            ReduceLROnPlateau(monitor='val_loss', 
                              factor=0.25, patience=4, verbose=1)],
        shuffle=True, verbose=1 )

    # 学習曲線のプロット
    learning_plot(model_name, hist.history)

    # モデル評価
    last_score = model.evaluate(X_test, y_test, verbose = 1)
    print(f'test loss for the last model: {last_score[0]:.4f}')
    print(f'test acc  for the last model: {last_score[1]:.2%}')

    model.save(f'{model_dir}model_{model_name}_last.hdf5')

In [None]:
# 学習曲線のプロット
def learning_plot(model_name, history):
    plt.figure(figsize = (18,6))

    # loss
    plt.subplot(1, 2, 1)
    plt.plot(history['loss'],     label='loss',     marker='o')
    plt.plot(history['val_loss'], label='val_loss', marker='o')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.title(model_name)
    plt.legend(loc='best')
    plt.grid(color='gray', alpha=0.2)

    # accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history['accuracy'],     label='acc',     marker='o')
    plt.plot(history['val_accuracy'], label='val_acc', marker='o')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.title(model_name)
    plt.legend(loc='best')
    plt.grid(color='gray', alpha=0.2)

    plt.show()

## VGG16

In [None]:
# モデル読み込み
base_model = VGG16(
    include_top=False, weights='imagenet',
    input_shape=(image_size, image_size, 3))

# fine-tuning model を構築する
model = build_FT_model(base_model, num_classes)

In [None]:
# モデルの評価
model_name      = 'VGG16'
n_frozen_layers = 1 # 凍結するlayer数
lr              = 1e-4
eval_FT_model(
    model, model_name, 
    X_train, y_train, X_valid, y_valid, X_test, y_test, 
    n_frozen_layers=n_frozen_layers, lr=lr)

## ResNet50

In [None]:
# モデル読み込み
base_model = ResNet50(
    include_top=False, weights='imagenet',
    input_shape=(image_size, image_size, 3))

# fine-tuning model を構築する
model = build_FT_model(base_model, num_classes)

In [None]:
# モデルの評価
model_name      = 'ResNet50'
n_frozen_layers = 143 # 凍結するlayer数
lr              = 1e-3
eval_FT_model(
    model, model_name, 
    X_train, y_train, X_valid, y_valid, X_test, y_test, 
    n_frozen_layers=n_frozen_layers, lr=lr)

## Xception

In [None]:
# モデル読み込み
base_model = Xception(
    include_top=False, weights='imagenet',
    input_shape=(image_size, image_size, 3))

# fine-tuning model を構築する
model = build_FT_model(base_model, num_classes)

In [None]:
# モデルの評価
model_name      = 'Xception'
n_frozen_layers = 26 # 凍結するlayer数
lr              = 1e-4
eval_FT_model(
    model, model_name, 
    X_train, y_train, X_valid, y_valid, X_test, y_test, 
    n_frozen_layers=n_frozen_layers, lr=lr)

## InceptionResNetV2

In [None]:
# モデル読み込み
base_model = InceptionResNetV2(
    include_top=False, weights='imagenet',
    input_shape=(image_size, image_size, 3))

# fine-tuning model を構築する
model = build_FT_model(base_model, num_classes)

In [None]:
# モデルの評価
model_name      = 'InceptionResNetV2'
n_frozen_layers = 367 # 凍結するlayer数
lr              = 1e-3
eval_FT_model(
    model, model_name, 
    X_train, y_train, X_valid, y_valid, X_test, y_test, 
    n_frozen_layers=n_frozen_layers, lr=lr)

## MobileNetV2

In [None]:
# モデル読み込み
base_model = MobileNetV2(
    include_top=False,
    weights='imagenet',
    input_shape=(image_size, image_size, 3))

# fine-tuning model を構築する
model = build_FT_model(base_model, num_classes)

In [None]:
# モデルの評価
model_name      = 'MobileNetV2'
n_frozen_layers = 37 # 凍結するlayer数
lr              = 1e-4
eval_FT_model(
    model, model_name, 
    X_train, y_train, X_valid, y_valid, X_test, y_test, 
    n_frozen_layers=n_frozen_layers, lr=lr)

## モデル予測のデモンストレーション

In [None]:
import random

def pred_model(model_name, X_test, y_test):
    
    # モデル読み込み
    model = load_model(f'{model_dir}model_{model_name}_last.hdf5')
    
    # モデル評価
    score = model.evaluate(X_test, y_test, verbose = 0)
    print(f'test loss for the last model: {score[0]:.4f}')
    print(f'test acc  for the last model: {score[1]:.2%}')
    
    # ランダムに30件のデータを抽出する
    samples = random.sample(range(len(X_test)), 30)
    X_samples = X_test[samples]
    y_samples = y_test[samples]

    # 正解ラベル、予測ラベル、予測確率を算出
    true_label_idx = np.argmax(y_samples, axis=1) # 正解ラベル
    pred_label_idx = np.argmax(model.predict(X_samples), axis=1) # 予測ラベル
    pred_probs     = np.max   (model.predict(X_samples), axis=1)
    pred_probs     = [f'{i:.4f}' for i in pred_probs]  # 予測確率

    # 表示
    plt.figure(figsize = (16, 16))
    for i in range(30):
        plt.subplot(5, 6, i+1)
        plt.axis('off')
        plt.title(
            f'pre : {labels[pred_label_idx[i]]}\n' +
            f'      {pred_probs[i]}\n' +
            f'true: {labels[true_label_idx[i]]}',
            color=('black' if pred_label_idx[i] == true_label_idx[i] else 'red' ) )
        plt.imshow(X_samples[i])
    plt.show()

In [None]:
model_name = 'VGG16'
pred_model(model_name, X_test, y_test)