In [None]:
import tensforflow as tf
import tensorflow.examples.tutorial.mnist import input_data
import tensorflow.contrib.layers import flatten
import numpy as np

#Load data sets from MNIST DATABASE
mnist = input_data.read_data_sets("MNIST_data/",reshape=False,one_hot=True)
train_features,train_labels = mnist.train.images, mnist.train.labels
validation_featurs,validation_labels   = mnist.validation.images, mnist.validation.labels
test_features, test_labels = mnist.test.images,mnist.test.labels

assert(len(train_features) == len(train_labels))
assert(len(validation_features) == len(validation_labels))
assert(len(test_features) == len(test_labels))

print()
print("Image Shape: {}".format(train_features[0].shape))
print()
print("Training Set:   {} samples".format(len(train_features)))
print("Validation Set: {} samples".format(len(validation_features)))
print("Test Set:       {} samples".format(len(test_features)))

In [None]:
#Visualize data
import random
import matplotlib.pyplot as plt
%matplotlib inline

index_image = random.randint(0,len(train_features))
image = train_features[index_image]

plt.figure(figsize=(1,1))
plt.imshow(image, cmap="gray")
print(train_labels[index_image])

In [None]:
#model parameters

EPOCHS = 10
BATCH_SIZE = 128
mu=0
sigma = 0.1
learning_rate = 0.001

#define 5-layer lenet CONV=>RELU=>MAXPOOL=>CONV=>RELU=>MAXPOOL=>FC1=>RELU=>FC2=>RELU=>FC3
def LeNet(input):
       
    wc1 = tf.Variable(tf.truncated_normal(shape=[5,5,1,6],mu=mu, stddev=sigma))
    bc1 = tf.Variable(tf.zeros(6))
    
    # conv layer 1 input=32,32,1 output = 28,28,6
    conv1 = tf.nn.conv2d(input,wc1,strides=[1,2,2,1],padding="VALID")
    conv1 = tf.add(conv1,bc1)
    
    #activation input = 28,28,6 output = 14,14,6
    conv1 = tf.nn.relu(conv1)
    
    #pooling
    conv1 = tf.nn.maxpool(conv1,ksize=[1,2,2,1],strides=[1,2,2,1],padding="VALID")
    
    #conv layer 2 input = 14,14,6 output = 10,10,16
    wc2 = tf.Variable(tf.truncated_normal([5,5,6,16],mu=mu,stddev=sigma))
    bc2 = tf.Variable(tf.zeros(16))
    
    conv2 = tf.nn.conv2d(conv1,wc2,strides=[1,2,2,1],padding="VALID")
    conv2 = tf.add(conv2,bc2)
    
    #activation
    conv2 = tf.nn.relu(conv2)
    
    # max pooling input = 10,10,16 output = 5,5,16   
    conv2 = tf.nn.maxpool(conv2,ksize=[1,2,2,1],strides=[1,2,2,1],padding='VALID')
    
    wd1 = tf.Variable(tf.truncated_normal([400,120],mu=mu,stddev=sigma))
    bd1 = tf.Variable(tf.zeros(120))
    
    # fully connected layer
    fc1 =  tf.reshape(conv2,[-1,wd1.get_shape().as_list()[0]])
    fc1 =  tf.add(tf.matmul(fc1,wd1),bd1)
    fc1 =  tf.nn.relu(fc1)
    
    wd2 = tf.Variable(tf.truncated_normal([120,84],mu=mu,stddev=sigma))
    bd2 = tf.Variable(tf.zeros(84))

    fc2 = tf.add(tf.matmul(fc1,wd2),bd2)
    fc2 = tf.nn.relu(fc2)
    
    wd3 = tf.Variable(tf.truncated_normal([84,10],mu=mu,stddev=sigma))
    bd3 = tf.Variable(tf.zeros(10))
    
    logits = tf.add(tf.matmul(fc2,wd3),bd3)
    
    return logits
    

#function to normalize input image
def normalize_grayscale(image_data):    

    features = []
    for dataItem in image_data:
        feature = 0.1 + ((dataItem - 0)*(0.8))/(255)
        features.append(feature)
    return np.array(features)	


#preprocess data sets
#normalize data sets
train_features = normalize_grayscale(train_features)
validation_features = normalize_grayscale(validation_features)
test_features       = normalize_grayscale(test_features)

#resize input features of dimention 28x28x1 to 32x32x1
#add 2 rows of '0' on either side of input (both width,height)
train_features = np.padd(train_features,[[0,0],[2,2],[2,2],[0,0]], mode="CONSTANT")
validation_features = np.padd(validation_features,[[0,0],[2,2],[2,2],[0,0]],mode="CONSTANT")
test_features = np.padd(test_features,[[0,0],[2,2],[2,2],[0,0]],mode="CONSTANT")

#shuffle train data sets
train_features, train_labels = shuffle(train_features, train_labels)

x = tf.placeholder(tf.float32,[None,32,32,1])  #None = batch_size  
y = tf.placeholder(tf.float32,[None,10])

logits = LeNet(x)

cost = tf.reduce_mean(tf.nn.softmax.cross_entropy_with_logits(logits=logits,labels=y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

saver =  tf.train.Saver()

#Validate model accuracy
correct_prediction = tf.equal(tf.argmax(fc2,1),tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))

#evalute model procedures

def evaluate(x_data,y_data):
    
    num_examples = len(x_data)
    total_accuracy = 0
    total_loss = 0
    sess=tf.get_default_session()
    for step in range(0,num_examples,BATCH_SIZE):
        data_start=step
        data_end  = step+BATCH_SIZE
        batch_x, batch_y = x_data[data_start:data_end],y_data[data_start:data_end]
        loss = sess.run(cost,feed_dict={x: batch_x, y: batch_y})
        accu = sess.run(accuracy, feed_dict = {x: batch_x, y: batch_y})
        total_accuracy += accu*BATCH_SIZE
        total_loss     += loss*BATCH_SIZE
    
    return total_loss/num_examples, total_accuracy/num_examples

init = tf.global_variables_initializer()

with tf.Session as sess:
    sess.run(init)
    steps_per_epoch = len(train_features)//BATCH_SIZE
    num_examples = steps_per_epoch*BATCH_SIZE
  
    print("Training model...")
    print()
    for epoch_iter in range(EPOCHS)    
        for step in range(steps_per_epoch):
            batch_start = step*BATCH_SIZE
            batch_x, batch_y = train_features[batch_start:batch_start+BATCH_SIZE],train_labels[batch_start:batch_start+BATCH_SIZE]
            sess.run(optimizer, feed_dict={x: batch_x, y:batch_y})
         
        #steps_per_epoch = mnist.validation.num_examples // BATCH_SIZE
        #num_examples =  steps_per_epoch*BATCH_SIZE
        #for step in range(steps_per_epoch):
        #    batch_start = step*BATCH_SIZE
        #    batch_x, batch_y = validation_features[batch_start:batch_start+BATCH_SIZE]
        #    loss = sess.run(cost, feed_dict= {x:batch_x, y:batch_y})
        #    acc  = sess.run(accuracy,feed_dict={x: batch_x, y: batch_y})
        #    total_acc += (acc*batch_x.shape[0])
        #    total_loss += (loss*batch_x.shape[0])
        
        total_loss, total_acc = evaluate(validation_features,validation_labels)
        print("EPOCH {} ...".format(epoch_iter+1))
        print("Validation loss = {:.3f}".format(total_loss))
        print("Validation accuracy = {:.3f}".format(total_acc))
        print()

        saver.save(sess, './lenet')
        print('Model Saved')
        
    # Evaluate on the test data
    #steps_per_epoch = mnist.test.num_examples//BATCH_SIZE
    #num_examples = steps_per_epoch*BATCH_SIZE
    #for step in range(steps_per_epoch):
    #    batch_x, batch_y = mnist.test_next_batch(BATCH_SIZE)
    #    test_loss = sess.run(cost,feed_dict={x: batch_x, y: batch_y})
    #    test_acc  = sess.run(accuracy,feed_dict={x: batch_x, y: batch_y})
    #    total_acc += test_acc*batch_x.shape[0]
    #    total_loss += test_loss*batch_x.shape[0]


In [None]:
#test the model
print("Testing Lenet.......")

#restore the model
saver.restore(sess, tf.train.latest_checkpoint('.'))

total_loss , total_acc = evaluate(test_features,test_labels)
print("Test loss= {:.3f}".format(total_loss))
print("Test accuracy = {:.3f}".format(total_acc))

