In [None]:
import tensorflow as tf
import keras
from keras.datasets import fashion_mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, Flatten
from keras.optimizers import SGD
#from keras.callbacks import EarlyStopping
from keras.utils import np_utils
from sklearn.utils import shuffle

import matplotlib.pyplot as plt

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

# Number of label classes used for the one-hot encoding
class_num = 10

# Training data: reshape to a rank-4 tensor (N, 28, 28, 1), cast to float32
# and divide the pixel values by 255
x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255
# Convert the ground-truth labels to one-hot vectors
y_train = tf.keras.utils.to_categorical(y_train, class_num)

# Test data: same preprocessing as the training data
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255
# Convert the ground-truth labels to one-hot vectors
y_test = tf.keras.utils.to_categorical(y_test, class_num)

In [None]:
class CNN(tf.keras.Model):
    '''Convolutional classifier: two 3x3 convolutions followed by a softmax layer.

    Attributes:
        conv2D_1(Conv2D): first convolution (32 filters, 3x3, 'same' padding, ReLU).
        conv2D_2(Conv2D): second convolution (64 filters, 3x3, 'same' padding, ReLU).
        flatten(Flatten): flattens the convolutional feature maps.
        dropout1(Dropout): dropout with rate 0.5 applied before the output layer.
        d1(Dense): output layer with 10 units and softmax activation.
    '''
    def __init__(self):
        super().__init__()
        self.conv2D_1 = tf.keras.layers.Conv2D(
            filters=32,
            kernel_size=(3,3),
            padding='same',
            input_shape=(28, 28, 1),
            activation='relu'
        )
        self.conv2D_2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=(3,3),
            padding='same',
            activation='relu'
        )

        self.flatten = tf.keras.layers.Flatten()
        self.dropout1 = tf.keras.layers.Dropout(0.5)
        self.d1 = tf.keras.layers.Dense(10, activation='softmax')

    @tf.function
    def call(self, x, training=None):
        '''Forward pass, invoked when the model instance is called.

        Parameters:
            x: batch of input images; this notebook feeds float32 arrays
               reshaped to (batch, 28, 28, 1).
            training: True to apply dropout; False or None to skip it.
        Returns:
            Output of the final Dense layer: 10 softmax values per input sample.
        '''
        x = self.conv2D_1(x)
        x = self.conv2D_2(x)
        x = self.flatten(x)
        # Forward the flag to the layer instead of branching in Python: the
        # Dropout layer itself is a no-op unless it is told it is training.
        x = self.dropout1(x, training=training)
        x = self.d1(x)
        return x

model = CNN()

In [None]:
# Cell 3
# Cross-entropy loss over one-hot encoded labels
loss_fn = tf.keras.losses.CategoricalCrossentropy()
# Plain stochastic gradient descent
optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

In [None]:
# Cell 4
# Metrics accumulated across the training mini-batches
train_loss = tf.keras.metrics.Mean()
train_accuracy = tf.keras.metrics.CategoricalAccuracy()

@tf.function
def train_step(x, t):
    '''Run one optimization step on a mini-batch and record its metrics.

    Parameters:
        x: mini-batch of inputs passed to the model.
        t: one-hot encoded targets for the mini-batch.
    '''
    # Record the forward pass so the loss can be differentiated
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        batch_loss = loss_fn(t, predictions)

    # Back-propagate and update every trainable variable of the model
    variables = model.trainable_variables
    gradients = tape.gradient(batch_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    # Fold this batch into the running loss and accuracy
    train_loss.update_state(batch_loss)
    train_accuracy.update_state(t, predictions)

In [None]:
# Cell 5
# Metrics accumulated across the validation mini-batches
val_loss = tf.keras.metrics.Mean()
val_accuracy = tf.keras.metrics.CategoricalAccuracy()

@tf.function
def valid_step(val_x, val_y):
    '''Evaluate the model on one validation mini-batch and record its metrics.

    Parameters:
        val_x: mini-batch of validation inputs.
        val_y: one-hot encoded targets for the mini-batch.
    '''
    # Inference mode: no gradient tape, training=False
    predictions = model(val_x, training=False)
    batch_loss = loss_fn(val_y, predictions)

    # Fold this batch into the running loss and accuracy
    val_loss.update_state(batch_loss)
    val_accuracy.update_state(val_y, predictions)

In [None]:
# Cell 6
class EarlyStopping:
    '''Signal that training should stop once the monitored loss stops improving.

    The instance is called once per epoch with the current loss. It remembers
    the lowest loss seen so far and counts consecutive calls that fail to
    reach it.

    Attributes:
        epoch(int): consecutive calls without improvement.
        pre_loss(float): lowest loss seen so far (starts at +inf).
        patience(int): number of non-improving calls that are tolerated.
        verbose(int): when truthy, print a message at the moment of stopping.
    '''
    def __init__(self, patience=10, verbose=0):
        self.epoch = 0
        self.pre_loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def __call__(self, current_loss):
        '''Update the state with the latest loss.

        Parameters:
            current_loss: loss value of the epoch that just finished.
        Returns(bool):
            True when the loss has failed to improve for more than `patience`
            consecutive calls, otherwise False.
        '''
        # Test for improvement (not for degradation) so that a NaN loss, for
        # which every comparison is False, counts as "no improvement" rather
        # than overwriting the best loss and resetting the counter.
        if current_loss <= self.pre_loss:
            self.epoch = 0
            self.pre_loss = current_loss
            return False

        self.epoch += 1
        if self.epoch > self.patience:
            if self.verbose:
                print('early stopping')
            return True

        return False

In [None]:
# Cell 7
from sklearn.model_selection import train_test_split

# Hold out 20% of the training set for validation.
# NOTE(review): no random_state is passed, so the split differs on every run.
tr_x, val_x, tr_y, val_y = train_test_split(
    x_train,
    y_train,
    test_size=0.2,
)

In [None]:
epochs = 100
batch_size = 64
log_interval = 1  # print the metrics every `log_interval` epochs

# Number of mini-batches per epoch. Ceiling division keeps the final partial
# batch; using the raw sample count here would make the loops below slice far
# past the end of the data and feed empty batches to the step functions.
tr_steps = (tr_x.shape[0] + batch_size - 1) // batch_size
val_steps = (val_x.shape[0] + batch_size - 1) // batch_size

# Per-epoch metric values, stored as plain Python floats
history = {'loss':[], 'accuracy':[], 'val_loss':[], 'val_accuracy':[]}

ers = EarlyStopping(patience=5,
                    verbose=1)

for epoch in range(epochs):
    # Start every epoch with empty metric accumulators
    train_loss.reset_states()
    train_accuracy.reset_states()
    val_loss.reset_states()
    val_accuracy.reset_states()

    # Re-shuffle the training samples (inputs and labels together) each epoch
    x_, y_ = shuffle(tr_x, tr_y)

    for step in range(tr_steps):
        start = step * batch_size
        end = start + batch_size
        train_step(x_[start:end], y_[start:end])

    for step in range(val_steps):
        start = step * batch_size
        end = start + batch_size
        valid_step(val_x[start:end], val_y[start:end])

    # Convert the scalar metric tensors to floats for storing, printing,
    # plotting and comparing.
    avg_train_loss = float(train_loss.result())
    avg_train_acc = float(train_accuracy.result())
    avg_val_loss = float(val_loss.result())
    avg_val_acc = float(val_accuracy.result())

    history['loss'].append(avg_train_loss)
    history['val_loss'].append(avg_val_loss)
    history['accuracy'].append(avg_train_acc)
    history['val_accuracy'].append(avg_val_acc)

    if (epoch + 1) % log_interval == 0:
        print(
            'epoch({}) train_loss: {:.4} train_acc: {:.4} val_loss: {:.4} val_acc: {:.4}'.format(
                epoch+1,
                avg_train_loss,
                avg_train_acc,
                avg_val_loss,
                avg_val_acc
            )
        )

    # Stop once the validation loss has stopped improving
    if ers(avg_val_loss):
        break

model.summary()




In [None]:
# Loss curves for the training and validation data
for history_key, curve_label in (
    ('loss', 'loss (Training)'),
    ('val_loss', 'loss (Validation)'),
):
    plt.plot(history[history_key], marker='.', label=curve_label)

plt.legend(loc='best')
plt.grid()
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()