<a href="https://colab.research.google.com/github/kugmax/tensorflow_learn/blob/master/Digit_Recognizer_(MNIST).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import tensorflow.contrib as tf_contrib
import numpy as np
from keras.datasets import mnist
from keras.utils import to_categorical

from tensorflow.python.framework import ops
import time
import random

Using TensorFlow backend.


In [0]:
# Drop whatever is in the default TensorFlow graph so that rebuilding the
# model below starts from an empty graph.
ops.reset_default_graph()

In [0]:
#from tensorboardcolab import *

In [0]:
#tbc=TensorBoardColab()

In [0]:
def normalize(X_train, X_test):
  """Standardize both splits using statistics of the training split only.

  One scalar mean and one scalar standard deviation are computed over every
  element of ``X_train`` and then applied to both arrays, so the test split
  never contributes to the statistics.

  Args:
    X_train: numpy array of training samples (any rank).
    X_test: numpy array of test samples.

  Returns:
    Tuple ``(X_train, X_test)`` with the standardized arrays.
  """
  mean = np.mean(X_train)
  std = np.std(X_train)

  # A constant training set has zero spread; dividing by it would fill both
  # outputs with NaN/inf, so fall back to centering only.
  if std == 0:
    std = 1.0

  X_train = (X_train - mean) / std
  X_test = (X_test - mean) / std

  return X_train, X_test

In [0]:
def load_mnist() :
    """Load MNIST and return model-ready train/test arrays.

    Images get a trailing channel axis and are standardized with
    ``normalize``; labels are one-hot encoded into 10 classes. The training
    split is then shuffled with a fixed seed so the order is reproducible.

    Returns:
        Tuple ``(train_data, train_labels, test_data, test_labels)``.
    """
    (train_data, train_labels), (test_data, test_labels) = mnist.load_data()
    train_data = np.expand_dims(train_data, axis=-1)
    test_data = np.expand_dims(test_data, axis=-1)

    # TODO: check whether standardizing the inputs is still needed, given
    # that the ResNet below applies batch normalization.
    train_data, test_data = normalize(train_data, test_data)

    train_labels = to_categorical(train_labels, 10)
    test_labels = to_categorical(test_labels, 10)

    # Draw ONE permutation and index both arrays with it. This keeps images
    # and labels aligned by construction instead of relying on two
    # separately re-seeded in-place shuffles producing the same order.
    seed = 7
    np.random.seed(seed)
    order = np.random.permutation(len(train_data))
    train_data = train_data[order]
    train_labels = train_labels[order]

    return train_data, train_labels, test_data, test_labels

In [0]:
# Materialize the dataset once; later cells read these four arrays.
train_x, train_y, test_x, test_y = load_mnist()

In [8]:
# Sanity check: display the shapes of the image and one-hot label arrays.
train_x.shape, train_y.shape, test_x.shape, test_y.shape

((60000, 28, 28, 1), (60000, 10), (10000, 28, 28, 1), (10000, 10))

In [0]:
# Input geometry: square images of img_size x img_size pixels with c_dim
# channels, classified into label_dim classes.
img_size = 28
c_dim = 1
label_dim = 10

# Output locations. log_dir receives the TensorBoard summaries written by the
# training cell; checkpoint_dir is not referenced by the cells visible here.
checkpoint_dir = './checkpoint_dir'
log_dir = './log'

# Training schedule: number of passes over the training set, samples per
# step, and the number of full batches that fit in one pass.
epoches = 100
batch_size = 100
iteration = len(train_x) // batch_size

# Starting learning rate; the training cell feeds it through a placeholder.
init_lr = 0.001

In [0]:
#train_y[55]

In [0]:
#train_x[55]

In [0]:
import matplotlib.pyplot as plt

In [0]:
def show_img(img, title=""):
    """Draw one image in grayscale on the current matplotlib axes.

    Args:
        img: 2-D image, or a 3-D image whose last axis is a single channel
            (the layout of the samples in this notebook).
        title: text placed above the image.
    """
    # imshow accepts (H, W), (H, W, 3) or (H, W, 4) arrays but rejects
    # (H, W, 1), so drop a lone trailing channel axis first.
    if np.ndim(img) == 3 and np.shape(img)[-1] == 1:
        img = np.squeeze(img, axis=-1)
    plt.imshow(img, cmap="gray")
    plt.title(title)

In [0]:
#show_img(train_x[55], train_y[55])

In [0]:
# Shared kernel initializer (variance scaling) and L2 kernel regularizer
# (scale 1e-4); both are passed to every conv and dense layer defined below.
weight_init = tf_contrib.layers.variance_scaling_initializer()
weight_regularizer = tf_contrib.layers.l2_regularizer(0.0001)

In [0]:
def conv(x, channels, kernel=4, stride=2, padding='SAME', use_bias=True, scope='conv_0'):
  """Apply a 2-D convolution inside its own variable scope.

  Args:
    x: input tensor.
    channels: number of output filters.
    kernel: kernel size passed to ``tf.layers.conv2d``.
    stride: stride passed to ``tf.layers.conv2d``.
    padding: padding mode ('SAME' by default).
    use_bias: whether the layer adds a bias term.
    scope: variable scope name that holds the layer's variables.

  Returns:
    The convolved tensor.
  """
  with tf.variable_scope(scope):
    # Every conv shares the notebook-level initializer and regularizer.
    return tf.layers.conv2d(
        inputs=x,
        filters=channels,
        kernel_size=kernel,
        strides=stride,
        padding=padding,
        use_bias=use_bias,
        kernel_initializer=weight_init,
        kernel_regularizer=weight_regularizer)

In [0]:
def batch_norm(x, is_training=True, scope='batch_norm'):
  """Batch-normalize ``x`` with learned offset and scale.

  Args:
    x: input tensor.
    is_training: forwarded to the layer to select training or inference
      behavior.
    scope: variable scope name for the layer's variables.

  Returns:
    The normalized tensor.
  """
  return tf_contrib.layers.batch_norm(
      inputs=x,
      decay=0.9,
      epsilon=1e-05,
      center=True,
      scale=True,
      # None here (instead of a collection name) means no separate update
      # ops have to be run by the training loop.
      updates_collections=None,
      is_training=is_training,
      scope=scope)

In [0]:
def relu(x):
    """Return the element-wise rectified linear activation of ``x``."""
    return tf.nn.relu(features=x)

In [0]:
def global_avg_pooling(x):
    """Average ``x`` over axes 1 and 2, keeping them as size-1 dimensions."""
    return tf.reduce_mean(x, axis=[1, 2], keepdims=True)

In [0]:
def flatten(x) :
    """Flatten every non-batch dimension of ``x`` via ``tf.layers.flatten``."""
    return tf.layers.flatten(inputs=x)

In [0]:
def fully_conneted(x, units, use_bias=True, scope='fully_0'):
    """Flatten ``x`` and apply a dense layer inside its own variable scope.

    Args:
        x: input tensor; it is flattened before the dense layer.
        units: number of output units.
        use_bias: whether the dense layer adds a bias term.
        scope: variable scope name for the layer's variables.

    Returns:
        The output tensor of the dense layer.
    """
    with tf.variable_scope(scope):
        flat = flatten(x)
        return tf.layers.dense(
            flat,
            units=units,
            use_bias=use_bias,
            kernel_initializer=weight_init,
            kernel_regularizer=weight_regularizer)

In [0]:
#  bn-relu-conv2d-bn-relu-conv2d->+input
   
def resblock(x_init, channels, is_training=True, downsample=False, use_bias=True, scope='resblock'):
  """Residual block: BN -> ReLU -> conv -> BN -> ReLU -> conv, plus shortcut.

  Args:
    x_init: block input; also the source of the shortcut branch.
    channels: number of filters of every conv in the block.
    is_training: forwarded to both batch-norm layers.
    downsample: when True the first conv uses stride 2 and the shortcut is
      projected with a 1x1 stride-2 conv so both branches can be added.
    use_bias: whether the convs add a bias term.
    scope: variable scope name for the block.

  Returns:
    Sum of the residual branch and the (possibly projected) shortcut.
  """
  first_stride = 2 if downsample else 1

  with tf.variable_scope(scope):
    residual = relu(batch_norm(x_init, is_training, scope='batch_norm_0'))
    residual = conv(residual, channels, kernel=3, stride=first_stride,
                    use_bias=use_bias, scope='conv_0')

    shortcut = x_init
    if downsample:
      # Project the raw block input so its shape matches the residual.
      shortcut = conv(x_init, channels, kernel=1, stride=2,
                      use_bias=use_bias, scope='conv_init')

    residual = relu(batch_norm(residual, is_training, scope='batch_norm_1'))
    residual = conv(residual, channels, kernel=3, stride=1,
                    use_bias=use_bias, scope='conv_1')

    return residual + shortcut
   

In [0]:
def model(x, is_training=True, reuse=False):
  """Build the ResNet classifier and return its logits.

  The network is a stem conv followed by four stages of residual blocks
  with ``residual_list`` blocks each. The channel count doubles from one
  stage to the next, and the first block of every stage after the first
  downsamples. A final BN/ReLU, global average pooling and a dense layer
  produce ``label_dim`` logits.

  Args:
    x: batch of input images.
    is_training: forwarded to every batch-norm layer.
    reuse: passed to the "model" variable scope so a second call can share
      the variables created by the first.

  Returns:
    Logits tensor with ``label_dim`` values per sample.
  """
  with tf.variable_scope("model", reuse=reuse):
    residual_list = [3, 4, 6, 3]
    ch = 32

    x = conv(x, channels=ch, kernel=3, stride=1, scope='conv')

    for stage, n_blocks in enumerate(residual_list):
      stage_channels = ch * (2 ** stage)
      # Start at 0 for every stage: the first stage used to start at 1 and
      # therefore built one block fewer than residual_list asks for.
      for i in range(n_blocks):
        x = resblock(x, channels=stage_channels, is_training=is_training,
                     downsample=(stage > 0 and i == 0),
                     scope='resblock%d_%d' % (stage, i))

    x = batch_norm(x, is_training, scope='batch_norm')
    x = relu(x)

    x = global_avg_pooling(x)
    x = fully_conneted(x, units=label_dim, scope='logit')

    print(x.shape) #must be 10

    return x

In [0]:
def classification_loss(logit, label) :
    """Compute mean softmax cross-entropy and accuracy for a batch.

    Args:
        logit: unnormalized class scores, one row per sample.
        label: target distribution with the same shape as ``logit``.

    Returns:
        Tuple ``(loss, accuracy)`` of scalar tensors.
    """
    per_sample_xent = tf.nn.softmax_cross_entropy_with_logits_v2(labels=label, logits=logit)
    loss = tf.reduce_mean(per_sample_xent)

    # A sample counts as correct when the arg-max class of the logits
    # matches the arg-max class of the label.
    predicted_class = tf.argmax(logit, -1)
    true_class = tf.argmax(label, -1)
    is_correct = tf.equal(predicted_class, true_class)
    accuracy = tf.reduce_mean(tf.cast(is_correct, tf.float32))

    return loss, accuracy

In [25]:
""" Graph Input """
# Training placeholders hold exactly one mini-batch of images and labels.
train_inptus = tf.placeholder(tf.float32, [batch_size, img_size, img_size, c_dim], name='train_inputs')
train_labels = tf.placeholder(tf.float32, [batch_size, label_dim], name='train_labels')

# Test placeholders are sized for the whole test set, so every evaluation
# feeds all of test_x / test_y at once.
test_inptus = tf.placeholder(tf.float32, [len(test_x), img_size, img_size, c_dim], name='test_inputs')
test_labels = tf.placeholder(tf.float32, [len(test_y), label_dim], name='test_labels')

# Learning rate is fed per step so the training loop can change it.
lr = tf.placeholder(tf.float32, name='learning_rate')

""" Model """
# The second call reuses the variables created by the first and runs the
# batch-norm layers with is_training=False.
train_logits = model(train_inptus)
test_logits = model(test_inptus, is_training=False, reuse=True)

# NOTE(review): train_loss is the cross-entropy only; the L2 terms registered
# through kernel_regularizer do not appear to be added to it in the visible
# cells -- confirm whether that is intended.
train_loss, train_accuracy = classification_loss(logit=train_logits, label=train_labels)
test_loss, test_accuracy = classification_loss(logit=test_logits, label=test_labels)


""" Training """
# Momentum SGD on the training loss.
optim = tf.train.MomentumOptimizer(lr, momentum=0.9).minimize(train_loss)

"""" Summary """
# Scalar summaries for TensorBoard, merged into one op per split.
summary_train_loss = tf.summary.scalar("train_loss", train_loss)
summary_train_accuracy = tf.summary.scalar("train_accuracy", train_accuracy)

summary_test_loss = tf.summary.scalar("test_loss", test_loss)
summary_test_accuracy = tf.summary.scalar("test_accuracy", test_accuracy)

train_summary = tf.summary.merge([summary_train_loss, summary_train_accuracy])
test_summary = tf.summary.merge([summary_test_loss, summary_test_accuracy])

(100, 10)
(10000, 10)


In [0]:
def _random_crop(batch, crop_shape, padding=None):
    oshape = np.shape(batch[0])

    if padding:
        oshape = (oshape[0] + 2 * padding, oshape[1] + 2 * padding)
    new_batch = []
    npad = ((padding, padding), (padding, padding), (0, 0))
    for i in range(len(batch)):
        new_batch.append(batch[i])
        if padding:
            new_batch[i] = np.lib.pad(batch[i], pad_width=npad,
                                      mode='constant', constant_values=0)
        nh = random.randint(0, oshape[0] - crop_shape[0])
        nw = random.randint(0, oshape[1] - crop_shape[1])
        new_batch[i] = new_batch[i][nh:nh + crop_shape[0],
                       nw:nw + crop_shape[1]]
    return new_batch

In [0]:
def data_augmentation(batch, img_size):
  """Augment a batch by padding 4 pixels per side and randomly cropping.

  Args:
    batch: sequence of images to augment.
    img_size: height and width of the returned crops.

  Returns:
    Whatever ``_random_crop`` returns for the batch.
  """
  return _random_crop(batch, [img_size, img_size], 4)


In [0]:
# Open a session on the graph built above and initialize every variable
# before training starts.
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
#ops.reset_default_graph()

In [0]:
model_dir = "{}{}_{}_{}_{}".format('rest_net', '34', 'MNIST', batch_size, init_lr)
# Summary writer: one sub-directory of log_dir per model configuration.
writer = tf.summary.FileWriter(log_dir + '/' + model_dir, sess.graph)

epoch_lr = init_lr
start_epoch = 0
start_batch_id = 0
counter = 1  # global step used as the x-axis of the summaries

# The learning rate is divided by 10 when training reaches 50% and 75% of
# the total number of epochs. The thresholds must be derived from `epoches`;
# deriving them from the running `epoch` only ever matches at epoch 0.
lr_decay_epochs = (int(epoches * 0.5), int(epoches * 0.75))

# The evaluation feed never changes, so build it once outside the loops.
test_feed_dict = {
    test_inptus : test_x,
    test_labels : test_y
}

start_time = time.time()
for epoch in range(start_epoch, epoches):
    if epoch in lr_decay_epochs:
        epoch_lr = epoch_lr * 0.1

    for idx in range(start_batch_id, iteration):
        # Slice the next mini-batch.
        batch_x = train_x[idx*batch_size:(idx+1)*batch_size]
        batch_y = train_y[idx*batch_size:(idx+1)*batch_size]

        # TODO: try other augmentation strategies.
        batch_x = data_augmentation(batch_x, img_size)

        train_feed_dict = {
            train_inptus : batch_x,
            train_labels : batch_y,
            lr : epoch_lr
        }

        # One optimization step plus the training summaries.
        _, summary_str, train_loss_tmp, train_accuracy_tmp = sess.run(
            [optim, train_summary, train_loss, train_accuracy],
            feed_dict=train_feed_dict
        )
        writer.add_summary(summary_str, counter)

        # Evaluate on the test set after every step.
        # NOTE(review): this feeds the whole test set each iteration, which
        # dominates the step time; consider evaluating once per epoch.
        summary_str, test_loss_tmp, test_accuracy_tmp = sess.run(
            [test_summary, test_loss, test_accuracy], feed_dict=test_feed_dict)
        writer.add_summary(summary_str, counter)

        counter += 1

    # After an epoch, start_batch_id is set to zero;
    # a non-zero value is only for the first epoch after loading a
    # pre-trained model.
    start_batch_id = 0

    # Report progress every 25 epochs with the metrics of the last step.
    if (epoch+1) % 25 == 0:
        print("Epoch: [%2d] [%5d/%5d] time: %4.4f, train_accuracy: %.2f, test_accuracy: %.2f, learning_rate : %.4f" \
              % (epoch, idx, iteration, time.time() - start_time, train_accuracy_tmp, test_accuracy_tmp, epoch_lr))