##**import**

In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist
from time import time
import os
print(tf.__version__)

2.4.0


##**Checkpoint function**

In [2]:
def load(model, checkpoint_dir):
  print(" [*] Reading checkpoints...")

  ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
  if ckpt:
    ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
    checkpoint = tf.train.Checkpoint(dnn=model)
    checkpoint.restore(save_path=os.path.join(checkpoint_dir, ckpt_name))
    counter = int(ckpt_name.split('-')[1])
    print(" [*] Success to read {}".format(ckpt_name))
    return True, counter
  else:
    print(" [*] Failed to find a checkpoint")
    return False, 0

def check_folder(dir):
  if not os.path.exists(dir):
    os.makedirs(dir)
  return dir

##**Data load & pre-processing function**

In [3]:
def load_mnist():
  (train_data, train_labels), (test_data, test_labels) = mnist.load_data()
  train_data = np.expand_dims(train_data, axis=-1)
  test_data = np.expand_dims(test_data, axis=-1)

  train_data, test_data = normalize(train_data, test_data)

  train_labels = to_categorical(train_labels, 10)
  test_labels = to_categorical(test_labels, 10)

  return train_data, train_labels, test_data, test_labels

def normalize(train_data, test_data):
  train_data = train_data.astype(np.float32) / 255.0
  test_data = test_data.astype(np.float32) / 255.0
  return train_data, test_data

##**Performance function**

In [4]:
def loss_fn(model, images, labels):
  logits = model(images, training=True)
  loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_pred=logits, y_true=labels, from_logits=True))
  return loss

def accuracy_fn(model, images, labels):
  logits = model(images, training=False)
  prediction = tf.equal(tf.argmax(logits, -1), tf.argmax(labels, -1))
  accuracy = tf.reduce_mean(tf.cast(prediction, tf.float32))
  return accuracy

def grad(model, images, labels):
  with tf.GradientTape() as tape:
    loss = loss_fn(model, images, labels)
  return tape.gradient(loss, model.variables)

##**Model function**

In [5]:
def flatten():
  return tf.keras.layers.Flatten()

def dense(label_dim, weight_init):
  return tf.keras.layers.Dense(units=label_dim, use_bias=True, kernel_initializer=weight_init)

def relu():
  return tf.keras.layers.Activation(tf.keras.activations.relu)

##**Create model(class version)**

In [6]:
class create_model_class(tf.keras.Model):
  def __init__(self, label_dim):
    super(create_model_class, self).__init__()
    weight_init = tf.keras.initializers.glorot_uniform()

    self.model = tf.keras.Sequential()
    self.model.add(flatten())

    for i in range(4):
      self.model.add(dense(512, weight_init))
      self.model.add(relu())
    
    self.model.add(dense(label_dim, weight_init))

  def call(self, x, training=None, mask=None):
    x = self.model(x)
    return x

##**Create model(function version)**

In [7]:
def create_model_function(label_dim):
  weight_init = tf.keras.initializers.glorot_uniform()

  model = tf.keras.Sequential()
  model.add(flatten())

  for i in range(4):
    model.add(dense(512, weight_init))
    model.add(relu())

  model.add(dense(label_dim, weight_init))

  return model

##**Define data & hyper-parameter**

In [8]:
train_x, train_y, test_x, test_y = load_mnist()

learning_rate = 0.001
batch_size = 128

training_epochs = 1
training_iterations = len(train_x) // batch_size

label_dim = 10

train_flag = True

train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).\
    shuffle(buffer_size=100000).\
    prefetch(buffer_size=batch_size).\
    batch(batch_size, drop_remainder=True)

test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).\
    shuffle(buffer_size=100000).\
    prefetch(buffer_size=len(test_x)).\
    batch(len(test_x))

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


##**Define model & optimizer & writer**

In [9]:
network = create_model_function(label_dim)

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

checkpoint_dir = 'checkpoints'
logs_dir = 'logs'
model_dir = 'nn_deep'

checkpoint_dir = os.path.join(checkpoint_dir, model_dir)
check_folder(checkpoint_dir)
checkpoint_prefix = os.path.join(checkpoint_dir, model_dir)
logs_dir = os.path.join(logs_dir, model_dir)

##**Restore checkpoint & start train or test phase**


In [10]:
if train_flag :

    checkpoint = tf.train.Checkpoint(dnn=network)

    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)
    start_time = time()

    could_load, checkpoint_counter = load(network, checkpoint_dir)    

    if could_load:
        start_epoch = (int)(checkpoint_counter / training_iterations)        
        counter = checkpoint_counter        
        print(" [*] Load SUCCESS")
    else:
        start_epoch = 0
        start_iteration = 0
        counter = 0
        print(" [!] Load failed...")
    
    with summary_writer.as_default():  
        for epoch in range(start_epoch, training_epochs):
            for idx, (train_input, train_label) in enumerate(train_dataset):                
                grads = grad(network, train_input, train_label)
                optimizer.apply_gradients(grads_and_vars=zip(grads, network.variables))

                train_loss = loss_fn(network, train_input, train_label)
                train_accuracy = accuracy_fn(network, train_input, train_label)

                for test_input, test_label in test_dataset:                
                    test_accuracy = accuracy_fn(network, test_input, test_label)

                tf.summary.scalar(name='train_loss', data=train_loss, step=counter)
                tf.summary.scalar(name='train_accuracy', data=train_accuracy, step=counter)
                tf.summary.scalar(name='test_accuracy', data=test_accuracy, step=counter)

                print(
                    "Epoch: [%2d] [%5d/%5d] time: %4.4f, train_loss: %.8f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                    % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy,
                       test_accuracy))
                counter += 1
        checkpoint.save(file_prefix=checkpoint_prefix + '-{}'.format(counter))
              
else :
    _, _ = load(network, checkpoint_dir)
    for test_input, test_label in test_dataset:    
        test_accuracy = accuracy_fn(network, test_input, test_label)

    print("test_Accuracy: %.4f" % (test_accuracy))

 [*] Reading checkpoints...
 [*] Failed to find a checkpoint
 [!] Load failed...
Epoch: [ 0] [    0/  468] time: 1.3946, train_loss: 2.04618406, train_accuracy: 0.4766, test_Accuracy: 0.2935
Epoch: [ 0] [    1/  468] time: 1.9490, train_loss: 1.94298220, train_accuracy: 0.4922, test_Accuracy: 0.3949
Epoch: [ 0] [    2/  468] time: 2.4984, train_loss: 1.71891379, train_accuracy: 0.5547, test_Accuracy: 0.4039
Epoch: [ 0] [    3/  468] time: 3.0393, train_loss: 1.61172628, train_accuracy: 0.5781, test_Accuracy: 0.5402
Epoch: [ 0] [    4/  468] time: 3.5894, train_loss: 1.41488147, train_accuracy: 0.7109, test_Accuracy: 0.7236
Epoch: [ 0] [    5/  468] time: 4.1456, train_loss: 1.08942461, train_accuracy: 0.7891, test_Accuracy: 0.7326
Epoch: [ 0] [    6/  468] time: 4.6846, train_loss: 0.83004171, train_accuracy: 0.7734, test_Accuracy: 0.7281
Epoch: [ 0] [    7/  468] time: 5.2321, train_loss: 0.74739665, train_accuracy: 0.7969, test_Accuracy: 0.7825
Epoch: [ 0] [    8/  468] time: 5.7755,

#**Test accuracy :96.30%**
