In [None]:
## このプログラムは「CIFAR-10 - Object Recognition in Images」において
## 作成したノートブックで動作します
## ローカル環境のJupyter Notebookで作成したノートブックでも動作可能です

import numpy as np
from keras.datasets import cifar10
from keras.utils import to_categorical

def prepare_data():
    """データを用意する
    
    Returns:
    X_train(ndarray):
        訓練データ(50000.32.32.3)
    X_test(ndarray):
        テストデータ(10000.32.32.3)
    y_train(ndarray):
        訓練データのOne-Hot化した正解ラベル(50000,10)
    y_train(ndarray):
        テストデータのOne-Hot化した正解ラベル10000,10)
    y_test_label(ndarray):
        テストデータの正解ラベル(10000)
    """
    (X_train, y_train), (X_test, y_test) = cifar10.load_data()
    
    # 訓練用とテスト用の画像データを標準化する
    # 4次元テンソルのすべての軸方向に対して平均、標準偏差を求めるので
    # axis=(0,1,2,3)は省略してもよい
    mean = np.mean(X_train,axis=(0,1,2,3))
    std = np.std(X_train,axis=(0,1,2,3))
    # 標準化する際に分母の標準偏差に極小値を加える
    x_train = (X_train-mean)/(std+1e-7)
    x_test = (X_test-mean)/(std+1e-7)
    
    # テストデータの正解ラベルを2階テンソルから1階テンソルへフラット化
    y_test_label = np.ravel(y_test)
    # 訓練データとテストデータの正解ラベルをOne-Hot表現に変換(10クラス化)
    y_train, y_test = to_categorical(y_train), to_categorical(y_test)
    
    return X_train, X_test, y_train, y_test, y_test_label 

In [None]:
from keras.layers import Input, Conv2D, Dense, Activation
from keras.layers import AveragePooling2D, GlobalAvgPool2D
from keras.layers import BatchNormalization
from keras import regularizers
from keras.models import Model

def make_convlayer(input, fsize, layers):
    """畳み込み層を生成する

       Parameters: inp(Input): 入力層
                   fsize(int): フィルターのサイズ
                   layers(int) : 層の数
       Returns:
         Conv2Dを格納したTensorオブジェクト
    """
    x = input
    for i in range(layers):
        x =Conv2D(
            filters=fsize,
            kernel_size=3,
            padding="same")(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
    return x

def create_model():
    """モデルを生成する

    Returns:
      Conv2Dを格納したModelオブジェクト
    """
    input = Input(shape=(32,32,3))
    x = make_convlayer(input, 64, 3)
    x = AveragePooling2D(2)(x)
    x = make_convlayer(x, 128, 3)
    x = AveragePooling2D(2)(x)
    x = make_convlayer(x, 256, 3)
    x = GlobalAvgPool2D()(x)
    x = Dense(10, activation="softmax")(x)

    model = Model(input, x)
    return model

In [None]:
from scipy.stats import mode

def ensemble_majority(models, X):
    """多数決をとるアンサンブル

    Parameters:
        models(list): Modelオブジェクトのリスト
        X(array): 検証用のデータ
    Returns:
        各画像の正解ラベルを格納した(10000)のnp.ndarray
    """
    # (データ数,モデル数)のゼロ行列を作成
    pred_labels = np.zeros((X.shape[0],   # 行数は画像の枚数と同じ
                            len(models))) # 列数はモデルの数
    # modelsからインデックス値と更新をフリーズされたモデルを取り出す
    for i, model in enumerate(models):
        # モデルごとの予測確率(データ数,クラス数)の各行(axis=1)から
        # 最大値のインデックスをとって、(データ数,モデル数)の
        # モデル列の各行にデータの数だけ格納する
        pred_labels[:, i] = np.argmax(model.predict(X), axis=1)
    # mode()でpred_labelsの各行の最頻値のみを[0]指定で取得する
    # (データ数,1)の形状をravel()で(,データ数)の形状にフラット化する    
    return np.ravel(mode(pred_labels, axis=1)[0])

In [None]:
from tensorflow.keras.callbacks import Callback

class Checkpoint(Callback):
    """Callbackのサブクラス
    
    Attributes:
        model(object): 学習中のModelオブジェクト
        filepath(str): 重みを保存するファイルのパス
        best_val_acc : 最高精度を保持する
    """
    def __init__(self, model, filepath):
        """
        Parameters:
            model(Model): 現在実行中のModelオブジェクト
            filepath(str): 重みを保存するファイルのパス
            best_val_acc(int): 1モデルの最も高い精度を保持
        """
        self.model = model
        self.filepath = filepath
        self.best_val_acc = 0.0

    def on_epoch_end(self, epoch, logs):
        """エポック終了時に呼ばれるメソッドをオーバーライド
        
        これまでのエポックより精度が高い場合は重みをファイルに保存する
        
        Parameters:
            epoch(int): エポックの回数
            logs(dict): {'val_acc':損失, 'val_acc':精度}
        """
        if self.best_val_acc < logs['val_acc']:
            # 前回のエポックより精度が高い場合は重みを保存する
            self.model.save_weights(self.filepath)  # ファイルパス
            # 精度をlogsに保存
            self.best_val_acc = logs['val_acc']
            # 重みが保存されたことを精度と共に通知する
            print('Weights saved.', self.best_val_acc)


In [None]:
import math
import pickle
import numpy as np

from sklearn.metrics import accuracy_score

from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import LearningRateScheduler
from keras.callbacks import History

def train(X_train, X_test, y_train, y_test, y_test_label):
    """
    学習を行う
    
    Parameters:
        X_train(ndarray): 訓練データ
        X_test(ndarray): 訓練データの正解ラベル
        y_train(ndarray): テストデータ
        y_test(ndarray): テストデータの正解ラベル(One-Hot表)
        y_test_label(ndarray): テストデータの正解ラベル
    """
    models_num  = 5   # アンサンブルするモデルの数
    batch_size = 1024 # ミニバッチの数
    epoch = 80        # エポック数   
    models = []       # モデルを格納するリスト
    # 各モデルの学習履歴を保持するdict
    history_all = {"hists":[], "ensemble_test":[]}
    # 各モデルの推測結果を登録する2階テンソルを0で初期化
    # (データ数, モデル数)
    model_predict = np.zeros((X_test.shape[0], # 行数は画像の枚数
                             models_num))      # 列数はモデルの数

    # モデルの数だけ繰り返す
    for i in range(models_num):
        # 何番目のモデルかを表示
        print('Model',i+1)
        # CNNのモデルを生成
        train_model = create_model()
        # モデルをコンパイルする
        train_model.compile(optimizer='adam',
                            loss='categorical_crossentropy',
                            metrics=["acc"])
        # コンパイル後のモデルをリストに追加
        models.append(train_model)

        # コールバックに登録するHistoryオブジェクトを生成
        hist = History()
        # コールバックに登録するCheckpointオブジェクトを生成
        cpont = Checkpoint(train_model,       # Modelオブジェクト
                           f'weights_{i}.h5') # 重みを保存するファイル名
        # ステップ減衰関数
        def step_decay(epoch):
            initial_lrate = 0.001 # ベースにする学習率
            drop = 0.5            # 減衰率
            epochs_drop = 10.0    # ステップ減衰は10エポックごと
            lrate = initial_lrate * math.pow(
                drop,
                math.floor((1+epoch)/epochs_drop)
            )
            return lrate
            
        lrate = LearningRateScheduler(step_decay) # スケジューラ―オブジェクト
        
        # データ拡張
        datagen = ImageDataGenerator(
            rotation_range=15,      # 15度の範囲でランダムに回転させる
            width_shift_range=0.1,  # 横サイズの0.1の割合でランダムに水平移動
            height_shift_range=0.1, # 縦サイズの0.1の割合でランダムに垂直移動
            horizontal_flip=True,   # 水平方向にランダムに反転、左右の入れ替え
            zoom_range=0.2,         # ランダムに拡大
            )

        # 学習を行う
        train_model.fit_generator(
            datagen.flow(X_train,
                         y_train,
                         batch_size=batch_size),
            epochs=epoch,
            steps_per_epoch=X_train.shape[0] // batch_size,
            validation_data=(X_test, y_test),
            verbose=1,
            callbacks=[hist, cpont, lrate] # コールバック
            )       

        # 学習に用いたモデルで最も精度が高かったときの重みを読み込む
        train_model.load_weights(f'weights_{i}.h5')
        
        # 対象のモデルのすべての重み更新をフリーズする
        for layer in train_model.layers:
            layer.trainable = False

        # テストデータで推測し、各画像ごとにラベルの最大値を求め、
        # 対象のインデックスを正解ラベルとしてmodel_predictのi列に格納
        model_predict[:, i] = np.argmax(train_model.predict(X_test),
                                       axis=-1) # 行ごとの最大値を求める

        # 学習に用いたモデルの学習履歴をhistory_allのhistsキーに登録
        history_all['hists'].append(hist.history)
        
        # 多数決のアンサンブルを実行
        ensemble_test_pred = ensemble_majority(models, X_test)
        
        # scikit-learn.accuracy_score()でアンサンブルによる精度を取得
        ensemble_test_acc = accuracy_score(y_test_label, ensemble_test_pred)
        
        # アンサンブルの精度をhistory_allのensemble_testキーに追加
        history_all['ensemble_test'].append(ensemble_test_acc)
        # 現在のアンサンブルの精度を出力
        print('Current Ensemble Accuracy : ', ensemble_test_acc)

    history_all['corrcoef'] = np.corrcoef(model_predict,
                                          rowvar=False) # 列ごとの相関係数を求める
    print('Correlation predicted value')
    print(history_all['corrcoef'])

In [None]:
# 実行部

# データを用意する
X_train, X_test, y_train, y_test, y_test_label  = prepare_data()

# アンサンブルを実行
train(X_train, X_test, y_train, y_test, y_test_label)