In [1]:
# !pip install --upgrade tensorflow

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical

import numpy as np
import matplotlib.pyplot as plt
import os
from scipy import ndimage
import time

print(tf.__version__)
print(keras.__version__)

2.1.0
2.2.4-tf


## 1. Set hyper parameters

In [0]:
lr = 0.001
train_epoch = 12
bsize = 80

tf.random.set_seed(777)

#### + Create a checkpoint directory

In [0]:
cur_dir = os.getcwd()
ckpt_dir_name = 'checkpoints'
model_dir_name = 'mnist_cnn_best'

ckpt_dir = os.path.join(cur_dir, ckpt_dir_name, model_dir_name)
ckpt_prefix = os.path.join(ckpt_dir_name, model_dir_name)

os.makedirs(ckpt_dir, exist_ok=True)

## 2. Data Augmentation (rotate & shift)

In [0]:
def data_augmentation(images, labels):
  aug_images = []; aug_labels = []

  for image, label in zip(images, labels):
    aug_images.append(image)
    aug_labels.append(label)
    
    bg_value = np.median(image)
    for _ in range(4):
      angle = np.random.randint(-15, 15, 1)
      rot_image = ndimage.rotate(image, angle[0], reshape=False, cval=bg_value)

      shift = np.random.randint(-2, 2, 2)
      shift_image = ndimage.shift(rot_image, shift, cval=bg_value)
      
      aug_images.append(shift_image)
      aug_labels.append(label)

  aug_images = np.array(aug_images)
  aug_labels = np.array(aug_labels)
  return aug_images, aug_labels

## 3. Make a data pipelining

In [0]:
mnist = keras.datasets.mnist

(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train, y_train = data_augmentation(X_train, y_train)

X_train = X_train.astype(np.float32) / 255.
X_train = np.expand_dims(X_train, axis=-1)
y_train = to_categorical(y_train, num_classes=10)
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).\
                  shuffle(buffer_size=500000).batch(bsize)

X_test = X_test.astype(np.float32) / 255.
X_test = np.expand_dims(X_test, axis=-1)
y_test = to_categorical(y_test, num_classes=10)
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).\
                  batch(bsize)

## 4. Build a neural network model

In [0]:
class ConvBNRelu(tf.keras.Model):
  def __init__(self, filters, kernel_size=3, strides=1, padding='same'):
    super(ConvBNRelu, self).__init__()
    self.conv = keras.layers.Conv2D(filters=filters, kernel_size=kernel_size,
                                    strides=strides, padding=padding, 
                                    kernel_initializer='glorot_normal')
    self.batchnorm = keras.layers.BatchNormalization()

  def call(self, inputs, training=False):
    layer = self.conv(inputs)
    layer = self.batchnorm(layer)
    layer = tf.nn.relu(layer)
    return layer

In [0]:
class DenseBNRelu(tf.keras.Model):
  def __init__(self, units):
    super(DenseBNRelu, self).__init__()
    self.dense = keras.layers.Dense(units=units, 
                                    kernel_initializer='glorot_normal')
    self.batchnorm = keras.layers.BatchNormalization()
  
  def call(self, inputs, training=False):
    layer = self.dense(inputs)
    layer = self.batchnorm(layer)
    layer = tf.nn.relu(layer)
    return layer

In [0]:
class MNISTModel(keras.Model):
  def __init__(self):
    super(MNISTModel, self).__init__()
    self.conv1 = ConvBNRelu(filters=32)
    self.pool1 = keras.layers.MaxPool2D(pool_size=2, strides=1,
                                        padding='same')
    self.conv2 = ConvBNRelu(filters=64)
    self.pool2 = keras.layers.MaxPool2D(pool_size=2, strides=1,
                                        padding='same')
    self.conv3 = ConvBNRelu(filters=128)
    self.pool3 = keras.layers.MaxPool2D(pool_size=2, strides=1,
                                        padding='same')
    self.pool3_flat = keras.layers.Flatten()

    self.dense4 = DenseBNRelu(units=256)
    self.drop4 = keras.layers.Dropout(rate=0.4)
    self.dense5 = keras.layers.Dense(units=10,
                                     kernel_initializer='glorot_normal')
  
  def call(self, inputs, training=False):
    net = self.conv1(inputs)
    net = self.pool1(net)
    net = self.conv2(net)
    net = self.pool2(net)
    net = self.conv3(net)
    net = self.pool3(net)
    net = self.pool3_flat(net)

    net = self.dense4(net)
    net = self.drop4(net)
    net = self.dense5(net)
    return net

In [0]:
models = []
num_models = 5
for _ in range(num_models):
  models.append(MNISTModel())

## 5. Define a loss function

In [0]:
def loss_fn(model, images, labels):
  logits = model(images, training=True)
  loss = tf.reduce_mean(
      tf.keras.losses.categorical_crossentropy(y_true=labels, 
                                          y_pred=logits, from_logits=True))
  return loss

## 6. Calculate a gradient

In [0]:
def grad(model, images, labels):
  with tf.GradientTape() as tape:
    loss = loss_fn(model, images, labels)
  return tape.gradient( loss, model.trainable_variables )

## 7. Select optimizer

In [0]:
lr_decay = keras.optimizers.schedules.ExponentialDecay(initial_learning_rate=lr,
                                                       decay_steps=X_train.shape[0]/bsize*num_models*5,
                                                       decay_rate=0.5,
                                                       staircase=True)
optimizer = keras.optimizers.Adam(learning_rate=lr_decay)

## 8. Define a metric for model's performance

In [0]:
def evaluate(models, images, labels):
  predictions = np.zeros_like(labels)
  for model in models:
    logits = model(images, training=False)
    predictions += logits
  correct_predictions = tf.equal( tf.argmax(predictions, 1),
                                  tf.argmax(labels, 1) )
  accuracy = tf.reduce_mean(tf.cast(correct_predictions, dtype=tf.float32))
  return accuracy

## 9. Make a checkpoint for saving

In [0]:
checkpoints = []
for i in range(num_models):
  checkpoints.append(tf.train.Checkpoint(cnn=models[i]))

## 10. Train and Validate a neural network model

In [15]:
print("Learning started. It takes some time.")
start_time = time.time()

for epoch in range(train_epoch):
  avg_loss = 0.; avg_train_acc = 0.; avg_test_acc = 0.
  train_step = 0; test_step = 0

  for images, labels in train_dataset:
    for model in models:
      grads = grad(model, images, labels)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))
      
      loss = loss_fn(model, images, labels)
      avg_loss += loss / num_models
    acc = evaluate(models, images, labels)
    avg_train_acc += acc
    train_step += 1
  avg_loss = avg_loss / train_step
  avg_train_acc = avg_train_acc / train_step

  for images, labels in test_dataset:
    acc = evaluate(models, images, labels)
    avg_test_acc += acc
    test_step += 1
  avg_test_acc = avg_test_acc / test_step

  print(f'Epoch: {epoch+1:3}  |  loss = {avg_loss:10.8f}  |  ' +
        f'train_accuracy = {avg_train_acc:6.4f}  |  ' +
        f'test_accuracy = {avg_test_acc:6.4f}')

  for idx, checkpoint in enumerate(checkpoints):
    checkpoint.save(file_prefix = ckpt_prefix+f'-{idx}')

print('Learning Finished!')
print(f'time elapsed : {(time.time() - start_time)/60:.4f} min.')

Learning started. It takes some time.
Epoch:   1  |  loss = 0.05480709  |  train_accuracy = 0.9396  |  test_accuracy = 0.9938
Epoch:   2  |  loss = 0.02539429  |  train_accuracy = 0.9959  |  test_accuracy = 0.9943
Epoch:   3  |  loss = 0.01816603  |  train_accuracy = 0.9976  |  test_accuracy = 0.9960
Epoch:   4  |  loss = 0.01358163  |  train_accuracy = 0.9986  |  test_accuracy = 0.9956
Epoch:   5  |  loss = 0.01055219  |  train_accuracy = 0.9991  |  test_accuracy = 0.9962
Epoch:   6  |  loss = 0.00614742  |  train_accuracy = 0.9996  |  test_accuracy = 0.9966
Epoch:   7  |  loss = 0.00448895  |  train_accuracy = 0.9998  |  test_accuracy = 0.9962
Epoch:   8  |  loss = 0.00382253  |  train_accuracy = 0.9998  |  test_accuracy = 0.9961
Epoch:   9  |  loss = 0.00324192  |  train_accuracy = 0.9999  |  test_accuracy = 0.9964
Epoch:  10  |  loss = 0.00286996  |  train_accuracy = 0.9999  |  test_accuracy = 0.9968
Epoch:  11  |  loss = 0.00185576  |  train_accuracy = 1.0000  |  test_accuracy = 0