In [1]:
'''
1. データセットの読み込み
'''
# tensorflow.keras のインポート
from tensorflow import keras

# CIFAR-10データセットの読み込み
(x_train, t_train), (x_test, t_test) = keras.datasets.cifar10.load_data()

In [2]:
'''
2.モデルの作成
'''
class CNN(keras.Model):
    '''畳み込みニューラルネットワーク
    
    Attributes:
      l1(Dense): 隠れ層
      l2(Dense): 出力層
    '''
    def __init__(self, output_dim):
        '''
        Parameters:
          output_dim(int): 出力層のユニット数(次元)
          
        Attributes:
          c1(Conv2D): 畳み込み層1
          p1(MaxPooling2D): プーリング層1
          c2(Conv2D): 畳み込み層2
          p2(MaxPooling2D): プーリング層2
          c3(Conv2D): 畳み込み層3
          p3(MaxPooling2D): プーリング層3
          f1(Flatten): Flatten
          d1(Dropout): ドロップアウト        
          l1(Dense): 全結合層          
          l2(Dense): 出力層
        '''
        super().__init__()
        
        # 正則化の係数
        weight_decay = 1e-4

        # (第1層)畳み込み層1 正則化を行う
        self.c1 = keras.layers.Conv2D(
            filters=32,                   # フィルター数32
            kernel_size=(3, 3),           # 3×3のフィルター
            padding='same',               # ゼロパディング
            input_shape=x_train[0].shape, # 入力データの形状
            kernel_regularizer=keras.regularizers.l2(
                weight_decay), # 正則化
            activation='relu'             # 活性化関数はReLU
            )

        # (第2層)プーリング層1：ウィンドウサイズは2×2
        self.p1 = keras.layers.MaxPooling2D(
            pool_size=(2, 2))             # 縮小対象の領域は2×2

        # (第3層)畳み込み層2　正則化を行う
        self.c2 = keras.layers.Conv2D(
            filters=128,                  # フィルターの数は128
            kernel_size=(3, 3),           # 3×3のフィルターを使用
            padding='same',               # ゼロパディングを行う
            kernel_regularizer=keras.regularizers.l2(
                weight_decay),           # 正則化
            activation='relu'             # 活性化関数はReLU
            )
 
        # (第4層)プーリング層2：ウィンドウサイズは2×2
        self.p2 = keras.layers.MaxPooling2D(
            pool_size=(2, 2))             # 縮小対象の領域は2×2       
        
        # (第5層)畳み込み層3　正則化を行う
        self.c3 = keras.layers.Conv2D(
            filters=256,                  # フィルターの数は256
            kernel_size=(3, 3),           # 3×3のフィルターを使用
            padding='same',               # ゼロパディングを行う
            kernel_regularizer=keras.regularizers.l2(weight_decay), # 正則化
            activation='relu'             # 活性化関数はReLU
            )

        # (第6層)プーリング層3：ウィンドウサイズは2×2
        self.p3 = keras.layers.MaxPooling2D(
            pool_size=(2, 2))             # 縮小対象の領域は2×2
       
        # Flaten
        # ニューロン数＝4×4×256=4,096
        # (4, 4, 256)を(,4096)にフラット化
        self.f1 = keras.layers.Flatten()

        # ドロップアウト：ドロップアウトは40％
        self.d1 = keras.layers.Dropout(0.4)

        # （第7層）全結合層
        self.l1 =  keras.layers.Dense(
            512,                          # 出力層のニューロン数は512
            activation='relu')            # 活性化関数はReLU

        # （第8層）出力層
        self.l2 =  keras.layers.Dense(
            10,                           # 出力層のニューロン数は10
            activation='softmax')         # 活性化関数はソフトマックス
        
        # すべての層をリストにする
        self.ls = [self.c1, self.p1, self.c2, self.p2, self.c3, self.p3,
                   self.f1, self.d1, self.l1, self.l2]

    def call(self, x):
        '''CNNのインスタンスからコールバックされる関数
        
        Parameters: x(ndarray(float32)):訓練データ、または検証データ
        Returns(float32): CNNの出力として要素数10の1階テンソル        
        '''
        for layer in self.ls:
            x = layer(x)
        
        return x

In [3]:
'''
3.損失関数の定義
'''
# 損失関数はスパースラベル対応クロスエントロピー誤差
cce = keras.losses.SparseCategoricalCrossentropy()

def loss(t, y):
    '''損失関数
    Parameters: t(ndarray(float32)):正解ラベル
                y(ndarray(float32)):予測値
                
    Returns: クロスエントロピー誤差
    '''
    return cce(t, y)

In [4]:
'''
4.オプティマイザーのオブジェクトの生成
  損失と精度を測定するオブジェクトの生成
'''
# 勾配降下アルゴリズムを使用するオプティマイザーを生成
optimizer = keras.optimizers.SGD(learning_rate=0.1)

# 損失を記録するオブジェクトを生成
train_loss = keras.metrics.Mean()
# 精度を記録するオブジェクトを生成
train_acc = keras.metrics.SparseCategoricalAccuracy()
# 検証時の損失を記録するオブジェクトを生成
val_loss = keras.metrics.Mean()
# 検証時の精度を記録するオブジェクトを生成
val_acc = keras.metrics.SparseCategoricalAccuracy()

In [5]:
'''
5.勾配降下アルゴリズムによるパラメーターの更新処理
'''
import tensorflow as tf

def train_step(x, t):
    '''学習を1回行う
    
    Parameters: x(ndarray(float32)):訓練データ
                t(ndarray(float32)):正解ラベル
                
    Returns:
      ステップごとのクロスエントロピー誤差
    '''
    # 自動微分による勾配計算を記録するブロック
    with tf.GradientTape() as tape:
        # モデルに入力して順伝播の出力値を取得
        outputs = model(x)
        # 出力値と正解ラベルの誤差
        tmp_loss = loss(t, outputs)
        
    # tapeに記録された操作を使用して誤差の勾配を計算        
    grads = tape.gradient(
        # 現在のステップの誤差
        tmp_loss,
        # バイアス、重みのリストを取得
        model.trainable_variables)
    # 勾配降下法の更新式を適用してバイアス、重みを更新
    optimizer.apply_gradients(zip(grads,
                                  model.trainable_variables))
    
    # 損失をMeanオブジェクトに記録
    train_loss(tmp_loss)
    # 精度をCategoricalAccuracyオブジェクトに記録
    train_acc(t, outputs)

In [6]:
'''
6. 検証データによる評価を行う関数
'''
def val_step(x, t):
    '''検証データをモデルに入力して損失と精度を測定
    
    Parameters: x(ndarray(float32)):検証データ
                t(ndarray(float32)):正解ラベル
    '''
    # 検証データの予測値を取得
    preds = model(x)
    # 出力値と正解ラベルの誤差
    tmp_loss = loss(t, preds)
    # 損失をMeanオブジェクトに記録
    val_loss(tmp_loss)
    # 精度をSpase_CategoricalAccuracyオブジェクトに記録
    val_acc(t, preds)

In [7]:
%%time
'''
7.モデルを生成して学習する
'''
from sklearn.utils import shuffle

# エポック数
epochs = 100
# ミニバッチのサイズ
batch_size = 64
# ステップ数
train_steps = len(x_train)*0.8 // batch_size
# ステップ数
val_steps = len(x_train)*0.2 // batch_size

# 出力層10ニューロンのモデルを生成
model = CNN(10)

# ImageDataGeneratorを生成
datagen = keras.preprocessing.image.ImageDataGenerator(
    rescale=1.0/255.0,      # ピクセル値を255で割って正規化する
    validation_split=0.2,   # 20パーセントのデータを検証用にする
    rotation_range=15,      # 15度の範囲でランダムに回転させる
    width_shift_range=0.1,  # 横サイズの0.1の割合でランダムに水平移動
    height_shift_range=0.1, # 縦サイズの0.1の割合でランダムに垂直移動
    horizontal_flip=True,  # 水平方向にランダムに反転、左右の入れ替え
    zoom_range=0.2         # ランダムに拡大
)

# 訓練データ用のジェネレーターを生成
training_generator = datagen.flow(x_train, t_train,
                                  batch_size=batch_size,
                                  subset='training')      # 訓練用のデータを生成
# 検証データ用のジェネレーターを生成
validation_generator = datagen.flow(x_train, t_train,
                                    batch_size=batch_size,
                                    subset='validation') # 検証用のデータを生成

# 学習を行う
for epoch in range(epochs):
    # 訓練時のステップカウンター  
    step_counter = 0  
    # 1ステップ毎にミニバッチで学習する
    for x_batch, t_batch in training_generator:
        # ミニバッチでバイアス、重みを更新
        train_step(x_batch, t_batch)
        step_counter += 1
        # すべてのステップが終了したらbreak
        if step_counter >= train_steps:
            break
    
    # 検証時のステップカウンター
    v_step_counter = 0
    # 検証データによるモデルの評価
    for x_val_batch, t_val_batch  in validation_generator:
        # 検証データのミニバッチで損失と精度を測定
        val_step(x_val_batch, t_val_batch)
        v_step_counter += 1
        # すべてのステップが終了したらbreak
        if v_step_counter >= val_steps:
            break
   
    # 1エポックごとに結果を出力
    print('epoch({}) train_loss: {:.4} train_acc: {:.4} '
          'val_loss: {:.4} val_acc: {:.4}'.format(
              epoch+1,
              train_loss.result(), # 訓練データの損失を出力
              train_acc.result(),  # 訓練データの精度を出力
              val_loss.result(),   # 検証データの損失を出力
              val_acc.result()     # 検証データの精度を出力
              ))

epoch(1) train_loss: 1.93 train_acc: 0.3003 val_loss: 1.69 val_acc: 0.3708
epoch(2) train_loss: 1.728 train_acc: 0.3754 val_loss: 1.561 val_acc: 0.4281
epoch(3) train_loss: 1.607 train_acc: 0.4214 val_loss: 1.465 val_acc: 0.4669
epoch(4) train_loss: 1.517 train_acc: 0.4551 val_loss: 1.401 val_acc: 0.4959
epoch(5) train_loss: 1.443 train_acc: 0.4832 val_loss: 1.375 val_acc: 0.5055
epoch(6) train_loss: 1.38 train_acc: 0.5065 val_loss: 1.319 val_acc: 0.5277
epoch(7) train_loss: 1.325 train_acc: 0.5268 val_loss: 1.277 val_acc: 0.5438
epoch(8) train_loss: 1.276 train_acc: 0.5453 val_loss: 1.232 val_acc: 0.5608
epoch(9) train_loss: 1.232 train_acc: 0.5617 val_loss: 1.193 val_acc: 0.5753
epoch(10) train_loss: 1.193 train_acc: 0.5759 val_loss: 1.167 val_acc: 0.5857
epoch(11) train_loss: 1.158 train_acc: 0.5889 val_loss: 1.144 val_acc: 0.5949
epoch(12) train_loss: 1.125 train_acc: 0.6006 val_loss: 1.117 val_acc: 0.6046
epoch(13) train_loss: 1.096 train_acc: 0.6116 val_loss: 1.093 val_acc: 0.613

In [8]:
'''
8. テストデータによるモデルの評価
'''
# 損失を記録するオブジェクトを生成
test_loss = keras.metrics.Mean()
# 精度を記録するオブジェクトを生成
test_acc = keras.metrics.SparseCategoricalAccuracy()

def test_step(x, t):
    '''テストデータをモデルに入力して損失と精度を測定
    
    Parameters: x(ndarray(float32)):テストデータ
                t(ndarray(float32)):正解ラベル
    '''
    # テストデータの予測値を取得
    preds = model(x)
    # 出力値と正解ラベルの誤差
    tmp_loss = loss(t, preds)
    # 損失をMeanオブジェクトに記録
    test_loss(tmp_loss)
    # 精度をSpaseCategoricalAccuracyオブジェクトに記録
    test_acc(t, preds)

# テストデータを正規化
x_test =  x_test.astype('float32') / 255
# テストデータで予測して損失と精度を取得
test_step(x_test, t_test)

print('test_loss: {:.4f}, test_acc: {:.4f}'.format(
    test_loss.result(),
    test_acc.result()
))

test_loss: 0.8192, test_acc: 0.8229
