In [1]:
from tensorflow.examples.tutorials.mnist import input_data
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np

In [2]:
def get_shape(tensor):
    "return the shape of tensor as list"
    return tensor.get_shape().as_list()

def print_var(string):
    "print var_name and var_shape in collection named string"
    print("\n".join(["{} : {}".format(v.name, get_shape(v)) for v in tf.get_collection(string)]))
    
def clip(x, vmax = 1, vmin = 1e-10):
    "clip the x between vmax, vmin"
    return tf.clip_by_value(x, clip_value_max=vmax, clip_value_min=vmin)

def batch_norm(x, phase_train, name= None):
    '''
        batch normalization for 4D tensor
    '''
    nchannel = get_shape(x)[3]
    with tf.variable_scope(name or "batch_norm"):
        beta = tf.Variable(tf.constant(0.01, shape = [nchannel]), name = 'beta')
        gamma = tf.Variable(tf.constant(1.0, shape = [nchannel]), name = 'gamma')
        batch_mean, batch_var = tf.nn.moments(x, [0, 1, 2], name = 'moments')
        ema = tf.train.ExponentialMovingAverage(decay = 0.7)
        def mean_var_with_update():
            ema_apply_op = ema.apply([batch_mean, batch_var])
            with tf.control_dependencies([ema_apply_op]):
                return tf.identity(batch_mean), tf.identity(batch_var) 

        mean, var = tf.cond(tf.cast(phase_train, tf.bool), mean_var_with_update, lambda:(ema.average(batch_mean), ema.average(batch_var))) 
    normed = tf.nn.batch_normalization(x=x, mean=mean, variance=var, offset=beta, scale=gamma, variance_epsilon=1e-3)
    return normed

def convolution_layer(input_, filter_shape, istrain, scope = None):
    with tf.variable_scope(scope or "convolution_layer"):
        w = tf.Variable(initial_value = tf.random_normal(shape = filter_shape, mean = 0.0, stddev = 0.1),
                        name = 'weight')
        conv = tf.nn.conv2d(input_, w, strides = [1, 1, 1, 1], padding = "VALID")
        norm = batch_norm(conv, istrain) 
    return norm

In [3]:
mnist = input_data.read_data_sets('../MNIST_data', one_hot=True)

Extracting ../MNIST_data/train-images-idx3-ubyte.gz
Extracting ../MNIST_data/train-labels-idx1-ubyte.gz
Extracting ../MNIST_data/t10k-images-idx3-ubyte.gz
Extracting ../MNIST_data/t10k-labels-idx1-ubyte.gz


In [4]:
train = {}
test = {}
val = {}
train['input'] = mnist.train.images
train['output'] = mnist.train.labels
test['input'] = mnist.test.images
test['output'] = mnist.test.labels
val['input'] = mnist.validation.images
val['output'] = mnist.validation.labels

In [5]:
# Model parameter
image_size = 28*28
image_row = 28
conv_filter1 = [3, 3]
conv_channel1 = 32
conv_filter2 = [3, 3]
conv_channel2 = 64
conv_filter3 = [3, 3]
conv_channel3 = 64
conv_filter4 = [3, 3]
conv_channel4 = 128
conv_filter5 = [3, 3]
conv_channel5 = 128
conv_filter6 = [3, 3]
conv_channel6 = 256
conv_filter7 = [3, 3]
conv_channel7 = 10
nclasses = 10

# Train parameter
learning_rate = 1e-4
epoch = 20
ntrain = len(train['input'])
batch_size = 200
nbatch = ntrain//batch_size
log_every = 50

In [6]:
x = tf.placeholder(dtype = tf.float32, shape = [None, image_size], name = 'image')
y = tf.placeholder(dtype = tf.float32, shape = [None, nclasses], name = 'label')
istrain = tf.placeholder(dtype = tf.bool)

x_reshape = tf.reshape(x, [-1, image_row, image_row, 1]) # 4D 
conv1 = convolution_layer(x_reshape, [conv_filter1[0], conv_filter1[1], 1, conv_channel1], istrain, "conv_layer1")
# 26*26*32
relu1 = tf.nn.relu(conv1)
pool1 = tf.nn.max_pool(relu1, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = 'VALID')
# 13*13*32
conv2 = convolution_layer(pool1, [conv_filter2[0], conv_filter2[1], conv_channel1, conv_channel2], istrain, "conv_layer2")
# 11*11*64
relu2 = tf.nn.relu(conv2)
conv3 = convolution_layer(relu2, [conv_filter3[0], conv_filter3[1], conv_channel2, conv_channel3], istrain, "conv_layer3")
# 9*9*64
relu3 = tf.nn.relu(conv3)
conv4 = convolution_layer(relu3, [conv_filter3[0], conv_filter3[1], conv_channel3, conv_channel4], istrain, "conv_layer4")
# 7*7*128
relu4 = tf.nn.relu(conv4)
conv5 = convolution_layer(relu4, [conv_filter3[0], conv_filter3[1], conv_channel4, conv_channel5], istrain, "conv_layer5")
# 5*5*128
relu5 = tf.nn.relu(conv5)
conv6 = convolution_layer(relu5, [conv_filter3[0], conv_filter3[1], conv_channel5, conv_channel6], istrain, "conv_layer6")
# 3*3*256
relu6 = tf.nn.relu(conv6)
conv7 = convolution_layer(relu6, [conv_filter3[0], conv_filter3[1], conv_channel6, conv_channel7], istrain, "conv_layer7")
# 1*1*10

In [7]:
conv_shape = get_shape(conv7)    
flat_size = conv_shape[1]*conv_shape[2]*conv_shape[3]
y_hat = tf.reshape(conv7, [-1, flat_size])

In [8]:
print_var("trainable_variables")

conv_layer1/weight:0 : [3, 3, 1, 32]
conv_layer1/batch_norm/beta:0 : [32]
conv_layer1/batch_norm/gamma:0 : [32]
conv_layer2/weight:0 : [3, 3, 32, 64]
conv_layer2/batch_norm/beta:0 : [64]
conv_layer2/batch_norm/gamma:0 : [64]
conv_layer3/weight:0 : [3, 3, 64, 64]
conv_layer3/batch_norm/beta:0 : [64]
conv_layer3/batch_norm/gamma:0 : [64]
conv_layer4/weight:0 : [3, 3, 64, 128]
conv_layer4/batch_norm/beta:0 : [128]
conv_layer4/batch_norm/gamma:0 : [128]
conv_layer5/weight:0 : [3, 3, 128, 128]
conv_layer5/batch_norm/beta:0 : [128]
conv_layer5/batch_norm/gamma:0 : [128]
conv_layer6/weight:0 : [3, 3, 128, 256]
conv_layer6/batch_norm/beta:0 : [256]
conv_layer6/batch_norm/gamma:0 : [256]
conv_layer7/weight:0 : [3, 3, 256, 10]
conv_layer7/batch_norm/beta:0 : [10]
conv_layer7/batch_norm/gamma:0 : [10]


In [12]:
cross_entropy = -tf.reduce_mean(y*tf.log(clip(y_hat)))

correct_prediction = tf.equal(tf.argmax(y_hat, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

run_train = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

In [None]:
train_accuracy_tract = []
val_accuracy_tract = []

for epoch_ in range(epoch):
    index = np.arange(len(train['input']))
    np.random.shuffle(index)
    trX = train['input'][index]
    trY = train['output'][index]
    val_feed_dict = {x : val['input'], 
                     y : val['output'],
                     istrain : False}
    
    for nbatch_ in range(nbatch):
        train_feed_dict = {x : trX[batch_size*nbatch_:batch_size*(nbatch_+1)], 
                           y : trY[batch_size*nbatch_:batch_size*(nbatch_+1)],
                           istrain : True}
        sess.run(run_train, feed_dict=train_feed_dict)
        
        if nbatch_%log_every == log_every - 1:
            train_accuracy = 100*sess.run(accuracy, feed_dict=train_feed_dict)
            val_accuracy = 100*sess.run(accuracy, feed_dict= val_feed_dict)
            train_accuracy_tract.append(train_accuracy)
            val_accuracy_tract.append(val_accuracy)
            print("Epoch(%d/%d) %d th batch train : %.1f %%, validation : %.1f %%"%(epoch_+1, epoch, nbatch_+1, train_accuracy, val_accuracy))

test_feed_dict = {x : test['input'],  
                  y : test['output'],
                  istrain : False}

test_accuracy = 100*sess.run(accuracy, feed_dict=test_feed_dict)
print("Test accuracy=%.1f %%"%(test_accuracy))

plt.plot(train_accuracy_tract, 'r-', label = 'Train accuracy')
plt.plot(val_accuracy_tract, 'b-', label = 'Validation accuracy')
plt.legend()
plt.show()

Epoch(1/20) 50 th batch train : 28.0 %, validation : 25.7 %
