In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import scrapbook as sb
import time

In [None]:
# --- Layer hyper-parameters ---------------------------------------------
# Negative slope handed to LeakyReLU and momentum handed to BatchNormalization.
LEAKY_RELU_ALPHA = 0.3      # Keras default
BATCH_NORM_MOMENTUM = 0.99  # Keras default

# --- Per-block ConvBlock keyword arguments ------------------------------
# One independent dict per block so that each block can be tuned on its own.
conv_0_0_params = {'filters': 16}
conv_0_1_params = {'filters': 16}
conv_1_0_params = {'filters': 32}
conv_1_1_params = {'filters': 32}

# --- Architecture switches ----------------------------------------------
pooling = 'max'          # 'max' or 'average'
reduce = 'flatten'       # 'flatten' or 'gap' (global average pooling)
extra_dense = False      # insert an additional Dense layer before the classifier
extra_dense_units = 128  # width of that additional Dense layer

# --- Training -----------------------------------------------------------
train_batch_size = 20
num_epochs = 1

# --- Evaluation and input pipeline --------------------------------------
eval_batch_size = 20
prefetch_size = 100
shuffle_buffer_size = 1000

In [None]:
# Turn on TensorFlow eager execution for the rest of the notebook.
tf.enable_eager_execution()
# Restrict TensorFlow's logger to messages at ERROR level.
tf.logging.set_verbosity(tf.logging.ERROR)

In [None]:
# Load Fashion-MNIST through TensorFlow Datasets. `ds` holds the requested
# splits (train first, then test); `info` is the accompanying dataset metadata.
ds, info = tfds.load(
    'fashion_mnist',
    split=['train', 'test'],
    with_info=True,
)

In [None]:
# https://www.tensorflow.org/tutorials/eager/custom_layers
class ConvBlock(tf.keras.Model):
    """Conv2D, optional BatchNormalization and optional activation as one model.

    The three stages are applied either as conv -> bn -> activation or as
    bn -> activation -> conv, depending on `conv_first`.
    """

    def __init__(self, 
                 filters=16, 
                 kernel_size=3, 
                 strides=1,
                 padding='same', 
                 activation='leaky_relu',
                 batch_normalization=True, 
                 conv_first=True):
        """2D Convolution -> Batch Normalization -> Activation stack builder

        # Arguments
            ## Conv2D features:
            filters (int): number of filters used by Conv2D
            kernel_size (int): square kernel dimension
            strides (int): square stride dimension
            padding (str): one of 'same' or 'valid'

            ## Other cell features
            activation (str, callable or None): 'leaky_relu' (any letter case)
                selects a LeakyReLU layer with alpha=LEAKY_RELU_ALPHA; any
                other string, or a callable, is wrapped in a Keras Activation
                layer (which may raise for unknown names); None, or any other
                value, disables the activation stage.
            batch_normalization (bool): whether to use batch normalization
            conv_first (bool): conv -> bn         -> activation, if True; 
                               bn   -> activation -> conv,       if False
        """
        super(ConvBlock, self).__init__(name='')

        self.conv_first = conv_first
        self.conv = tf.keras.layers.Conv2D(
            filters, 
            kernel_size=kernel_size,
            strides=strides,
            padding=padding)

        if batch_normalization:
            self.batch_norm = \
                tf.keras.layers.BatchNormalization(momentum=BATCH_NORM_MOMENTUM)
        else:
            self.batch_norm = None

        # Determine which activation function to use:
        if isinstance(activation, str):
            if activation.lower() == 'leaky_relu':
                self.activation_fn = \
                    tf.keras.layers.LeakyReLU(alpha=LEAKY_RELU_ALPHA)
            else:
                self.activation_fn = \
                    tf.keras.layers.Activation(activation) # May raise an error
        elif callable(activation):
            # A callable such as tf.nn.relu used to be dropped silently, which
            # produced a block without any non-linearity; apply it instead.
            self.activation_fn = tf.keras.layers.Activation(activation)
        else:
            self.activation_fn = None

    def call(self, input_tensor, training=False):
        """Run the block on `input_tensor`.

        # Arguments
            input_tensor: tensor fed to the first stage of the block
            training: forwarded to the batch normalization layer, if any

        # Returns
            The output tensor of the last stage.
        """
        x = input_tensor
        # The convolution runs either before or after the bn/activation pair;
        # the pair itself is identical in both orderings.
        if self.conv_first:
            x = self.conv(x)
        if self.batch_norm is not None:
            x = self.batch_norm(x, training=training)
        if self.activation_fn is not None:
            x = self.activation_fn(x)
        if not self.conv_first:
            x = self.conv(x)
        return x

In [None]:
# Resolve the configured pooling method to a Keras layer class.
# Anything other than 'max' or 'average' trips the assertion.
pooling_name = pooling.lower()
assert pooling_name in ('max', 'average')
Pooling2D = (tf.keras.layers.MaxPooling2D if pooling_name == 'max'
             else tf.keras.layers.AveragePooling2D)

# Resolve the configured reduce method the same way ('flatten' or 'gap').
reduce_name = reduce.lower()
assert reduce_name in ('flatten', 'gap')
Reduce = (tf.keras.layers.Flatten if reduce_name == 'flatten'
          else tf.keras.layers.GlobalAveragePooling2D)

In [None]:
# Assemble the network with the Keras functional API:
# two conv blocks -> pooling -> two conv blocks -> reduce -> dense classifier.
inputs = tf.keras.Input(shape=(28, 28, 1))

layer_stack = [
    ConvBlock(**conv_0_0_params),    # conv_0_0
    ConvBlock(**conv_0_1_params),    # conv_0_1
    Pooling2D(),
    ConvBlock(**conv_1_0_params),    # conv_1_0
    ConvBlock(**conv_1_1_params),    # conv_1_1
    Reduce(),
]
# The explicit comparison is kept on purpose: a non-bool value such as the
# string 'False' compares unequal to True and therefore does not add the layer.
if extra_dense == True:
    layer_stack.append(tf.keras.layers.Dense(extra_dense_units))
# 10-way softmax classifier head.
layer_stack.append(tf.keras.layers.Dense(10, activation='softmax'))

# Thread the symbolic tensor through the layers in order.
x = inputs
for layer in layer_stack:
    x = layer(x)

In [None]:
# Tie the symbolic input to the final softmax output to obtain the Keras model.
model = tf.keras.Model(inputs=inputs, outputs=x)

In [None]:
# Training configuration: Adam optimizer, cross-entropy on integer labels,
# and accuracy reported as the metric.
compile_options = {
    'optimizer': 'adam',
    'loss': 'sparse_categorical_crossentropy',
    'metrics': ['accuracy'],
}
model.compile(**compile_options)

In [None]:
# Print the summary of the assembled model.
model.summary()

In [None]:
# Parameter count of the model; recorded with scrapbook at the end of the notebook.
param_count = model.count_params()

In [None]:
# `ds` was loaded with split=['train', 'test'], so it unpacks in that order.
fashion_train, fashion_test = ds

In [None]:
def parser(example):
    """Turn one dataset example into an (image, label) pair.

    # Arguments
        example: mapping with an "image" entry and a "label" entry

    # Returns
        Tuple of the image divided by 255 and the unchanged label.
        (Presumably the image holds 0-255 pixel values, so the result lies
        in [0, 1] -- confirm against the dataset.)
    """
    scaled_image = example["image"] / 255
    target = example["label"]
    return scaled_image, target

In [None]:
# Build the training input pipeline: shuffle -> parse -> batch -> prefetch.
# The pipeline starts from the untouched train split in `ds` rather than from
# `fashion_train` itself, so re-running this cell rebuilds the same pipeline
# instead of stacking a second shuffle/map/batch on top of the first one.
raw_train, raw_test = ds
fashion_train = (
    raw_train
    .shuffle(shuffle_buffer_size)
    .map(parser)
    .batch(train_batch_size)
    .prefetch(prefetch_size)
)

In [None]:
# Train one epoch per `fit` call so that wall-clock time, loss and accuracy
# can be recorded for every epoch separately.
# The step count comes from the dataset metadata instead of a hard-coded
# example count; a trailing partial batch is skipped by the floor division.
num_train_examples = info.splits['train'].num_examples
train_steps_per_epoch = num_train_examples // train_batch_size

time_per_epoch = []
loss = []
accuracy = []
for epoch in range(1, num_epochs + 1):
    # A fresh iterator per epoch: the timing below excludes its creation.
    fashion_iterator = fashion_train.make_one_shot_iterator()

    start = time.time()
    hist = model.fit(x=fashion_iterator, steps_per_epoch=train_steps_per_epoch, epochs=1)
    end = time.time()

    # `fit` ran a single epoch, so each history list has exactly one entry.
    time_per_epoch.append(end - start)
    loss.append(hist.history['loss'][0])
    accuracy.append(hist.history['acc'][0])

In [None]:
# Build the evaluation input pipeline: parse -> batch -> prefetch (no shuffle).
# The pipeline starts from the untouched test split in `ds` rather than from
# `fashion_test` itself, so re-running this cell rebuilds the same pipeline
# instead of applying map/batch to an already batched dataset.
raw_train, raw_test = ds
fashion_test = (
    raw_test
    .map(parser)
    .batch(eval_batch_size)
    .prefetch(prefetch_size)
)

In [None]:
# Evaluate on the test split. The number of steps comes from the dataset
# metadata instead of a hard-coded example count; a trailing partial batch is
# skipped by the floor division.
num_test_examples = info.splits['test'].num_examples
test_iterator = fashion_test.make_one_shot_iterator()
eval_results = model.evaluate(x=test_iterator, steps=num_test_examples // eval_batch_size)

In [None]:
# Record the results of this run with scrapbook, one scrap per metric.
scraps = (
    ("time_per_epoch", time_per_epoch),
    ("loss", loss),
    ("accuracy", accuracy),
    ("param_count", param_count),
    ("eval_results", eval_results),
)
for scrap_name, scrap_value in scraps:
    sb.glue(scrap_name, scrap_value)