<a href="https://colab.research.google.com/github/riorafe/Mini-ResNet-using-MNIST_dataset/blob/master/ResNet_using_MNIST_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Lets get started**


1. Mount google drive

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Before begin to code, we should **understand the dataset**

Run the code below, I have prepared the demonstration

In [0]:
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np


# import data and set one_hot param True so the label will be encoded with one_hot encoder automatically
mnist = input_data.read_data_sets('/content/drive/My Drive/MNIST_data/', one_hot=True)
# we take the 50 data per batch
x_batch, y_batch = mnist.train.next_batch(50)


print('\nAs we know, NIST dataset is image with 28x28 dimension, but in MNIST dataset, they got flatten')
print('\nShape of original x_batch (flatten): ', x_batch.shape)
print('Data type of original x_batch: ', x_batch.dtype, '\n')
print(x_batch[0]) # UNCOMMENT THIS ONE IF U WANT TO SEE
print('\nand its already got normalized by 255, so we dont need to normalize it again\n')


print('\nThen for label data,')
print('shape of original y_batch: ', y_batch.shape)
print('data type of original y_batch: ', y_batch.dtype, '\n')
print(y_batch[0])
print('Looks like our label data are ready to be trained\n')


print('\nBut i need the x_batch to be in [50, 28, 28, 1] shape to fit in my neural net')
x_batch = np.array([image.reshape(28, 28, 1) for image in x_batch])
print('so i do -> x_batch = np.array([image.reshape(28, 28, 1) for image in x_batch])')
print('\nShape of reshaped x_batch : ', x_batch.shape)
print('Data type of reshaped x_batch: ', x_batch.dtype, '\n')


2. Start to code

In [0]:
# ============================
# == Made by: Rio Rafelino ===
# Forky Engineering Specialist
# ============================

import tensorflow as tf 
import numpy as np
from tensorflow import keras
from tensorflow.examples.tutorials.mnist import input_data
from sklearn.preprocessing import OneHotEncoder

# variabel buat disesuaikan
dataDir = '/content/drive/My Drive/MNIST_data/'
modelDir = '/content/drive/My Drive/Model/'
imgSize = (28,28) # ukuran MNIST dataset
inputChannel = 1 # gambar sudah grayscale
num_output = 10 # klasifikasi digit menjadi 10 kelas
earlyStopping = 5

class Model:
  def __init__(self):
    self.x = tf.placeholder(tf.float32, shape = (None, imgSize[0], imgSize[1], inputChannel), name = 'x_placeholder')
    self.y_true = tf.placeholder(tf.float32, shape = [None, num_output], name= 'y_true_placeholder')
    self.y_prediction = tf.placeholder(tf.float32, shape = [None, num_output], name = 'y_prediction_placeholder')
    self.setupCNN()
    self.learningRate = 0.01
    self.loss = tf.nn.softmax_cross_entropy_with_logits_v2(labels = self.y_true, logits = self.y_prediction)
    self.optimizer = tf.train.AdamOptimizer(learning_rate = self.learningRate)
    self.train = self.optimizer.minimize(tf.reduce_mean(self.loss))
    (self.sess, self.saver) = self.setupTF()

  def setupCNN(self):
    input_img = tf.reshape(self.x, [-1, imgSize[0], imgSize[1], inputChannel])
    
    kernelVals = [5, 5, 3, 3, 3]
    featureVals = [1, 32, 64, 64, 128, 128, 256, 256]
    strideVals = poolVals = [(2,2), (2,2), (1,1), (1,1), (1,1)]
    numLayers = len(strideVals)

    layer = input_img

    for i in range (numLayers):
      shortcut_layer = layer

      initializer = tf.contrib.layers.xavier_initializer()

      kernel = tf.Variable(initializer([kernelVals[i], kernelVals[i], featureVals[i], featureVals[i + 1]]), name='k1_'+str(i))
      layer = tf.nn.conv2d(layer, kernel, padding='SAME',  strides=(1,1,1,1), name='c1_'+str(i))
      # layer = tf.layers.batch_normalization(layer, name='b1_'+str(i))
      layer = tf.layers.batch_normalization(layer)
      layer = tf.nn.relu(layer, name = 'r1_'+str(i))

      kernel = tf.Variable(initializer([kernelVals[i], kernelVals[i], featureVals[i+1], featureVals[i + 2]]), name='k2_'+str(i))
      layer = tf.nn.conv2d(layer, kernel, padding='SAME',  strides=(1,1,1,1), name='c2_'+str(i))
      # layer = tf.layers.batch_normalization(layer, name='b2_'+str(i))
      layer = tf.layers.batch_normalization(layer)
      layer = tf.nn.relu(layer, name = 'r2_'+str(i))

      if i >= 1:
        kernel = tf.Variable(initializer([kernelVals[i], kernelVals[i], featureVals[i+2], featureVals[i + 3]]), name='k3_'+str(i))
        layer = tf.nn.conv2d(layer, kernel, padding='SAME',  strides=(1,1,1,1), name='c3_'+str(i))
        # layer = tf.layers.batch_normalization(layer, name='b3_'+str(i))
        layer = tf.layers.batch_normalization(layer)

        shortcut_layer = tf.nn.conv2d(shortcut_layer, kernel, padding='SAME',  strides=(1,1,1,1), name ='cs_'+str(i))
        # shortcut_layer = tf.layers.batch_normalization(shortcut_layer, name ='bs_'+str(i))
        shortcut_layer = tf.layers.batch_normalization(shortcut_layer)

        layer = layer + shortcut_layer
        layer = tf.nn.relu(layer, name='r3_'+str(i))
      else:
        kernel = tf.Variable(initializer([kernelVals[i], kernelVals[i], featureVals[i+2], featureVals[i + 3]]), name='k3_'+str(i))
        layer = tf.nn.conv2d(layer, kernel, padding='SAME',  strides=(1,1,1,1), name='c3_'+str(i))
        # layer = tf.layers.batch_normalization(layer, name='b3_'+str(i))
        layer = tf.layers.batch_normalization(layer)
        layer = tf.nn.relu(layer, name='r3_'+str(i))
      
      layer = tf.nn.max_pool(layer, (1, poolVals[i][0], poolVals[i][1], 1), (1, strideVals[i][0], strideVals[i][1], 1), 'VALID', name='p_'+str(i))

    output_flatten = tf.reshape(layer, [-1, 7 * 7 * 256])

    # fully connected
    weight1 = tf.Variable(tf.random_normal([12544, 1568]))
    bias1 = tf.Variable(tf.random_normal([1568]))
    y1 = tf.matmul(output_flatten, weight1) + bias1

    weight2 = tf.Variable(tf.random_normal([1568, num_output]))
    bias2 = tf.Variable(tf.random_normal([num_output]))
    self.y_prediction = tf.matmul(y1, weight2) + bias2
    
  def setupTF(self):
    sess = tf.Session()
    saver = tf.train.Saver(max_to_keep=1)
    latestSnap = tf.train.latest_checkpoint(modelDir)
    # cek apakah ada trained model
    if latestSnap :
      saver.restore(sess, latestSnap)
    else:
      sess.run(tf.global_variables_initializer())
    return (sess,saver);

  def trainBatch(self, x_batch, y_batch, batch):
    self.learningRate = 0.01 if batch < 5 else (0.001 if batch < 50 else 0.0001) # learning rate decay
    self.sess.run(self.train, feed_dict={
        self.x: x_batch,
        self.y_true: y_batch
    })
  
  def validate(self, x_batch, y_batch):
    matches = tf.equal(tf.argmax(self.y_true, 1), tf.argmax(self.y_prediction, 1))
    acc = tf.reduce_mean(tf.cast(matches, tf.float32))
    acc_val = self.sess.run(acc, feed_dict={
        self.x: x_batch,
        self.y_true: y_batch
    })
    return acc_val
  
  def save(self):
    self.saver.save(self.sess, modelDir)
    
class Main:
  def __init__(self):
    model = Model()
    data = self.dataLoader()
    self.train(model, data)
  
  def dataLoader(self):
    try:
      mnist = input_data.read_data_sets('/content/drive/My Drive/MNIST_data/', one_hot=True)
      print("\nData loaded successfully\n")
      return mnist
    except:
      print("\nData failed to load\n")

  def train(self, model, data):
    epoch = 0
    noImprovementSince = 0
    bestAccVal = 0
    
    while True:
      epoch += 1
      # load dan preprocess data tiap batch
      x_batch, y_batch = data.train.next_batch(50)
      # reshape dari flatten balik ke bentuk image
      x_batch = np.array([image.reshape(28, 28, 1) for image in x_batch])
      y_batch = np.float32(y_batch)
      model.trainBatch(x_batch, y_batch, epoch)
      accVal = self.validate(model, data)
      print ('Batch ke: ', epoch, 'Akurasi: ', accVal*100)

      if accVal > bestAccVal:
        bestAccVal = accVal
        noImprovementSince = 0
        model.save()
        print('Model lebih baik, model disimpan..')
      else:
        noImprovementSince += 1
        print('Model tidak lebih baik, tidak ada perubahan sejak: ', noImprovementSince)
      
      if noImprovementSince >= earlyStopping:
        print('Training dihentikan')
        break
  
  def validate(self, model, data):
    x_batch = data.test.images
    y_batch = data.test.labels
    x_batch = np.array([image.reshape(28, 28, 1) for image in x_batch])
    y_batch = np.float32(y_batch)
    acc = model.validate(x_batch, y_batch)
    return acc

Main()