# Classification on MNIST with CNN

This code is supporting material for the book Building Machine Learning Systems with Python by Willi Richert, Luis Pedro Coelho and Matthieu Brucher published by PACKT Publishing

It is made available under the MIT License

Let's try to classify the MNIST database (handwritten digits) with a convolutional network.

We will start with some hyperparameters.

In [None]:
import tensorflow as tf
import numpy as np

# --- Optimisation schedule ---
n_epochs = 10            # number of passes over the training set
batch_size = 128         # samples per gradient step
learning_rate = 0.0002   # step size given to the optimizer
step = 1000              # progress is printed when the sample offset is a multiple of this

# --- Network geometry ---
image_shape = [28, 28, 1]  # height, width, channels of one image
dim_W3 = 64                # filters of the first convolution
dim_W2 = 128               # filters of the second convolution
dim_W1 = 1024              # units of the hidden dense layer
dropout_rate = 0.1         # rate given to the dropout layer

# --- Output location ---
export_dir = "data/classifier-mnist"

It is time to load the data and shape it as we want

In [None]:
# `fetch_mldata` depended on the mldata.org service, which is no longer
# available, and the function was removed from scikit-learn 0.22.
# `fetch_openml` provides the same MNIST data set.
try:
    from sklearn.datasets import fetch_openml
except ImportError:
    # scikit-learn < 0.20 only ships the legacy loader.
    from sklearn.datasets import fetch_mldata
    mnist = fetch_mldata('MNIST original')
else:
    # as_frame=False requests plain NumPy arrays rather than a DataFrame
    # (the keyword exists from scikit-learn 0.22 onwards).
    mnist = fetch_openml('mnist_784', version=1, as_frame=False)

# Convert to float32, reshape every sample to 28x28 with a trailing channel
# axis, and divide by 255 (the largest 8-bit pixel value).
mnist.data = np.asarray(mnist.data, dtype=np.float32).reshape([-1, 28, 28, 1]) / 255.
mnist.num_examples = len(mnist.data)
# OpenML delivers the labels as strings; the graph is fed int64 labels.
mnist.labels = np.asarray(mnist.target).astype(np.int64)

We should split our data between training and testing data (6 to 1)

In [None]:
from sklearn.model_selection import train_test_split
# Keep one sample in seven for testing, which gives the 6-to-1 split.
held_out_fraction = 1. / 7.
X_train, X_test, y_train, y_test = train_test_split(
    mnist.data, mnist.labels, test_size=held_out_fraction)

The convolutional network builder will be stored in a class

In [None]:
class CNN():
    """Builder for a two-convolution image classifier (TensorFlow 1.x graph API).

    The instance only stores layer sizes; the graph is created by
    `build_model`.

    Args:
        image_shape: [height, width, channels] of one input image.
        dim_W1: number of units of the hidden dense layer.
        dim_W2: number of filters of the second convolution.
        dim_W3: number of filters of the first convolution.
        classes: number of output classes.
        dropout_rate: rate given to the dropout layer. When None (default),
            the module-level `dropout_rate` hyperparameter is read when the
            graph is built, as before.
    """

    def __init__(
            self,
            image_shape=(28, 28, 1),
            dim_W1=1024,
            dim_W2=128,
            dim_W3=64,
            classes=10,
            dropout_rate=None
            ):

        # Copy into a fresh list: build_model concatenates it with [None], and
        # neither the caller's sequence nor the default may be shared/mutated.
        self.image_shape = list(image_shape)

        self.dim_W1 = dim_W1
        self.dim_W2 = dim_W2
        self.dim_W3 = dim_W3
        self.classes = classes
        self.dropout_rate = dropout_rate

    def build_model(self):
        """Create the placeholders, the network, the loss and the accuracy.

        Returns:
            A tuple (image, Y, cost, accuracy, logits, training) where
            `image`, `Y` and `training` are the placeholders named "image",
            "label" and "is_training", `cost` is the mean sparse softmax
            cross-entropy, `accuracy` is the tensor named "accuracy" and
            `logits` is the raw output of `discriminate`.
        """
        image = tf.placeholder(tf.float32, [None]+self.image_shape, name="image")
        Y = tf.placeholder(tf.int64, [None], name="label")
        training = tf.placeholder(tf.bool, name="is_training")

        # The network output is unnormalised: it is handed to the loss as
        # logits, and argmax is all the accuracy needs.
        logits = self.discriminate(image, training)
        cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=Y, logits=logits))
        accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(logits, axis=1), Y), tf.float32), name="accuracy")

        return image, Y, cost, accuracy, logits, training

    def create_conv2d(self, input, filters, kernel_size, name):
        """Return a same-padded, leaky-ReLU convolution named "Conv2d_<name>"."""
        layer = tf.layers.conv2d(
                    inputs=input,
                    filters=filters,
                    kernel_size=kernel_size,
                    activation=tf.nn.leaky_relu,
                    name="Conv2d_" + name,
                    padding="same")
        return layer

    def create_maxpool(self, input, name):
        """Return a 2x2, stride-2 max pooling layer named "MaxPool_<name>"."""
        layer = tf.layers.max_pooling2d(
                    inputs=input,
                    pool_size=[2,2],
                    strides=2,
                    name="MaxPool_" + name)
        return layer

    def create_dropout(self, input, name, is_training):
        """Return a dropout layer named "DropOut_<name>" driven by `is_training`."""
        rate = self.dropout_rate
        if rate is None:
            # Backward-compatible fallback to the notebook-level hyperparameter.
            rate = dropout_rate
        layer = tf.layers.dropout(
                    inputs=input,
                    rate=rate,
                    name="DropOut_" + name,
                    training=is_training)
        return layer

    def create_dense(self, input, units, name):
        """Return dense -> batch normalisation -> leaky ReLU for `input`.

        The op names ("Dense<name>", "BatchNorm_<name>", "LeakyRELU_<name>")
        are kept exactly as they were: the restore cells of this notebook
        look the output tensor up by name.
        """
        layer = tf.layers.dense(
                inputs=input,
                units=units,
                name="Dense" + name,
                )
        # NOTE(review): training=True is hard-coded, so batch statistics are
        # also used at evaluation time and the `is_training` placeholder never
        # reaches this layer. Using the placeholder here would presumably also
        # require running the UPDATE_OPS collection with the train step --
        # confirm before changing.
        layer = tf.layers.batch_normalization(
                inputs=layer,
                momentum=0,
                epsilon=1e-8,
                training=True,
                name="BatchNorm_" + name,
        )
        layer = tf.nn.leaky_relu(layer, name="LeakyRELU_" + name)
        return layer

    def discriminate(self, image, training):
        """Stack the layers and return the per-class output for `image`.

        Two convolution/max-pool stages are followed by a dense layer with
        dropout and a final dense layer with `self.classes` units.
        """
        h1 = self.create_conv2d(image, self.dim_W3, 5, "Layer1")
        h1 = self.create_maxpool(h1, "Layer1")

        h2 = self.create_conv2d(h1, self.dim_W2, 5, "Layer2")
        h2 = self.create_maxpool(h2, "Layer2")

        # Flatten with the statically known feature-map size rather than a
        # hard-coded 7 * 7, so image shapes other than 28x28 work as well.
        flat_dim = 1
        for dim in h2.get_shape().as_list()[1:]:
            flat_dim *= dim
        h2 = tf.reshape(h2, (-1, flat_dim))

        h3 = self.create_dense(h2, self.dim_W1, "Layer3")
        h3 = self.create_dropout(h3, "Layer3", training)

        h4 = self.create_dense(h3, self.classes, "Layer4")
        return h4

And now we can instantiate it and create our optimizer. We take the opportunity to create our two objects to save the Tensorflow graph, Saver and builder.

In [None]:
# Start from an empty graph so that re-running the cell does not duplicate ops.
tf.reset_default_graph()

model_sizes = dict(
        image_shape=image_shape,
        dim_W1=dim_W1,
        dim_W2=dim_W2,
        dim_W3=dim_W3,
        )
cnn_model = CNN(**model_sizes)
(image_tf, Y_tf, cost_tf,
 accuracy_tf, output_tf, training_tf) = cnn_model.build_model()

# NOTE(review): the Saver is built before the optimizer, so it presumably does
# not track Adam's slot variables -- confirm before reordering these lines.
saver = tf.train.Saver(max_to_keep=10)

adam = tf.train.AdamOptimizer(learning_rate, beta1=0.5)
train_step = adam.minimize(cost_tf)

# Second export mechanism, used next to the Saver checkpoints.
builder = tf.saved_model.builder.SavedModelBuilder(export_dir)

This is a helper function that computes the global accuracy for the training and the testing data.
It will be used for each epoch, but in real life, you should "trust" the per-batch values instead, as the global value is very costly to compute.

In [None]:
# One (train_accuracy, test_accuracy) pair is appended per show_train call.
accuracy_vec = []


def _dataset_accuracy(sess, images, labels):
    """Return the accuracy over a whole data set, evaluated batch by batch.

    The graph handles (accuracy_tf, training_tf, Y_tf, image_tf) are read
    from module scope at call time, so rebinding them after restoring a
    graph is honoured.
    """
    n_samples = len(images)
    if n_samples == 0:
        return float("nan")

    weighted_sum = 0.
    for j in range(0, n_samples, batch_size):
        Xs = images[j:j+batch_size]
        Ys = labels[j:j+batch_size]
        batch_accuracy = sess.run(accuracy_tf,
                feed_dict={
                    training_tf: False,
                    Y_tf: Ys,
                    image_tf: Xs
                    })
        # Weight by the batch length: the last batch can be shorter, and a
        # plain mean of per-batch accuracies would give it too much weight.
        weighted_sum += float(batch_accuracy) * len(Xs)
    return weighted_sum / n_samples


def show_train(sess, epoch):
    """Print and record the accuracy on the full training and test sets.

    Args:
        sess: session holding the current network variables.
        epoch: epoch number, only used in the printed report.

    Side effects:
        Appends (train_accuracy, test_accuracy) to `accuracy_vec` and prints
        the predictions for the first 10 test images next to their labels.
    """
    train_accuracy = _dataset_accuracy(sess, X_train, y_train)
    test_accuracy = _dataset_accuracy(sess, X_test, y_test)
    accuracy_vec.append((train_accuracy, test_accuracy))

    result = sess.run(output_tf,
                feed_dict={
                    training_tf: False,
                    image_tf: X_test[:10]
                    })

    print('Epoch #%i\n  train accuracy = %f\n  test accuracy = %f' % (epoch, train_accuracy, test_accuracy))
    # The samples shown come from X_test / y_test, so label them as test images.
    print('Result for the 10 first test images: %s' % np.argmax(result, axis=1))
    print('Reference for the 10 first test images: %s' % y_test[:10])

Let's train our model and save it.

In [None]:
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # Accuracy of the freshly initialised network, reported as epoch -1.
    show_train(sess, -1)

    n_train = len(X_train)
    for epoch in range(n_epochs):
        # New random visiting order for every epoch.
        permut = np.random.permutation(n_train)

        print("epoch: %i" % epoch)
        for j in range(0, n_train, batch_size):
            report = (j % step == 0)
            if report:
                print("  batch: %i" % j)

            batch = permut[j:j+batch_size]
            Xs, Ys = X_train[batch], y_train[batch]

            train_feed = {training_tf: True, Y_tf: Ys, image_tf: Xs}
            sess.run(train_step, feed_dict=train_feed)

            if report:
                # Same mini-batch again, this time with training switched off.
                eval_feed = {training_tf: False, Y_tf: Ys, image_tf: Xs}
                temp_cost, temp_prec = sess.run([cost_tf, accuracy_tf],
                                                feed_dict=eval_feed)
                print("    cost: %f\n    prec: %f" % (temp_cost, temp_prec))

        # One numbered checkpoint per epoch, then the full accuracy report.
        saver.save(sess, './classifier', global_step=epoch)
        show_train(sess, epoch)

    saver.save(sess, './classifier-final')
    export_tags = [tf.saved_model.tag_constants.TRAINING]
    builder.add_meta_graph_and_variables(sess, export_tags)
builder.save()

We can check the global training and testing classification error, as we created a function to compute the accuracy after each epoch.

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline

# One row per show_train call; column 0 is the train accuracy, column 1 the test accuracy.
accuracy = np.array(accuracy_vec)
error_rates = 1 - accuracy

curves = ((0, 'k-', "train"), (1, 'r-', "test"))
for column, line_style, legend_label in curves:
    plt.semilogy(error_rates[:, column], line_style, label=legend_label)

plt.title('Classification error per Epoch')
plt.xlabel('Epoch')
plt.ylabel('Classification error')
plt.legend()

We now check that Saver properly saved the network and is able to restore it.

In [None]:
tf.reset_default_graph()
# Rebuild the graph from the exported meta file; the call returns a Saver.
new_saver = tf.train.import_meta_graph("classifier-final.meta")

with tf.Session() as sess:
    latest_checkpoint = tf.train.latest_checkpoint('./')
    new_saver.restore(sess, latest_checkpoint)

    graph = tf.get_default_graph()
    # show_train reads these module-level handles, so they are rebound to the
    # tensors of the restored graph, looked up by name.
    lookup = graph.get_tensor_by_name
    training_tf = lookup('is_training:0')
    Y_tf = lookup('label:0')
    image_tf = lookup('image:0')
    accuracy_tf = lookup('accuracy:0')
    output_tf = lookup('LeakyRELU_Layer4/Maximum:0')

    show_train(sess, 0)

And the same for builder.

In [None]:
tf.reset_default_graph()
with tf.Session() as sess:
    # Load graph and variables exported under the TRAINING tag.
    saved_tags = [tf.saved_model.tag_constants.TRAINING]
    tf.saved_model.loader.load(sess, saved_tags, export_dir)

    graph = tf.get_default_graph()
    # Rebind the module-level handles read by show_train to the loaded graph.
    tensor_names = (
        'is_training:0',
        'label:0',
        'image:0',
        'accuracy:0',
        'LeakyRELU_Layer4/Maximum:0',
    )
    training_tf, Y_tf, image_tf, accuracy_tf, output_tf = [
        graph.get_tensor_by_name(tensor_name) for tensor_name in tensor_names
    ]

    show_train(sess, 0)

# Text prediction with LSTMs

LSTMs are good tools to predict new values in a sequence. Can they predict text from Aesop's fables?

In [None]:
# Training corpus: one fable kept as a single multi-line string.
text="""A slave named Androcles once escaped from his master and fled to the forest. As he was wandering about there he came upon a Lion lying down moaning and groaning. At first he turned to flee, but finding that the Lion did not pursue him, he turned back and went up to him.
As he came near, the Lion put out his paw, which was all swollen and bleeding, and Androcles found that a huge thorn had got into it, and was causing all the pain. He pulled out the thorn and bound up the paw of the Lion, who was soon able to rise and lick the hand of Androcles like a dog. Then the Lion took Androcles to his cave, and every day used to bring him meat from which to live.
But shortly afterwards both Androcles and the Lion were captured, and the slave was sentenced to be thrown to the Lion, after the latter had been kept without food for several days. The Emperor and all his Court came to see the spectacle, and Androcles was led out into the middle of the arena. Soon the Lion was let loose from his den, and rushed bounding and roaring towards his victim.
But as soon as he came near to Androcles he recognised his friend, and fawned upon him, and licked his hands like a friendly dog. The Emperor, surprised at this, summoned Androcles to him, who told him the whole story. Whereupon the slave was pardoned and freed, and the Lion let loose to his native forest."""

We now remove commas and periods and then split the text into words.

In [None]:
# Lower-case the text, drop commas and periods, then split on whitespace.
cleaned_text = text.lower()
for punctuation in (",", "."):
    cleaned_text = cleaned_text.replace(punctuation, "")
training_data = cleaned_text.split()

Python itself has a module to count words:

In [None]:
import collections

def build_dataset(words):
    """Number the distinct words of `words` by decreasing frequency.

    Returns:
        A pair (dictionary, reverse_dictionary): `dictionary` maps each word
        to its rank in `Counter(words).most_common()` (0 for the most common
        word) and `reverse_dictionary` maps each rank back to its word.
    """
    ranked = collections.Counter(words).most_common()
    dictionary = {word: rank for rank, (word, _) in enumerate(ranked)}
    reverse_dictionary = {rank: word for word, rank in dictionary.items()}
    return dictionary, reverse_dictionary

# Build both lookup tables, then encode the corpus as a list of word indices.
vocabulary_tables = build_dataset(training_data)
dictionary, reverse_dictionary = vocabulary_tables
training_data_args = list(map(dictionary.__getitem__, training_data))

Our RNN will be a simple LSTM layer and then a dense layer to specify the word it selected. The input will be split so that we get several elements each time (here 3 words).

In [None]:
import tensorflow as tf
from tensorflow.contrib import rnn

def RNN(x):
    """Return the dense output computed from the last step of a 1-layer LSTM.

    `x` is split along axis 1 into `n_input` pieces that are fed to the cell
    one after the other; `n_input`, `n_hidden` and `vocab_size` are read from
    module scope when the graph is built.
    """
    # One tensor per input position (e.g. [had] [a] [general] -> [20] [6] [33]).
    word_steps = tf.split(x, n_input, 1)

    # Single LSTM layer with n_hidden units, unrolled over the pieces.
    cell = rnn.BasicLSTMCell(n_hidden)
    step_outputs, _ = rnn.static_rnn(cell, word_steps, dtype=tf.float32)

    # Every step yields an output; only the last one is used for the prediction.
    last_output = step_outputs[-1]
    return tf.layers.dense(inputs=last_output, units=vocab_size)

Let's add our traditional hyper parameters:

In [None]:
import random
import numpy as np

tf.reset_default_graph()

# Size of the output layer: one unit per distinct word.
vocab_size = len(dictionary)

# Model size
n_input = 3       # number of past words used as input
n_hidden = 512    # number of units in the RNN cell

# Training schedule
learning_rate = 0.001
training_iters = 50000
display_step = 1000

# Graph inputs: the n_input previous word indices and the index of the next word.
x = tf.placeholder(tf.float32, shape=[None, n_input])
y = tf.placeholder(tf.int64, shape=[None])

And now the functions to optimize and our prediction functions as well. As for the MNIST CNN, we use sparse_softmax_cross_entropy_with_logits because we only want one word.

In [None]:
# Network output, one value per vocabulary entry.
pred = RNN(x)

# Mean sparse softmax cross-entropy against the index of the expected word.
per_sample_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred, labels=y)
cost = tf.reduce_mean(per_sample_loss)
rmsprop = tf.train.RMSPropOptimizer(learning_rate=learning_rate)
optimizer = rmsprop.minimize(cost)

# A prediction is correct when the highest-scoring index equals the label.
predicted_index = tf.argmax(pred, 1)
correct_pred = tf.equal(predicted_index, y)
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

This train loop is a little bit different than the previous ones, as it does one sample at a time, and then we average the loss and the accuracy before we display it.

In [None]:
with tf.Session() as session:
    session.run(tf.global_variables_initializer())

    # Each sample uses n_input words as input plus the following word as target.
    end_offset = n_input + 1
    offset = random.randint(0, end_offset)
    step = 0
    acc_total = 0
    loss_total = 0
    last_valid_offset = len(training_data) - end_offset

    while step < training_iters:
        # Wrap around to a random start once the window would leave the text.
        if offset > last_valid_offset:
            offset = random.randint(0, n_input+1)

        window = slice(offset, offset + n_input)
        target_position = offset + n_input

        # A batch made of a single sample, shaped [1, n_input].
        symbols_in_keys = np.array(training_data_args[window]).reshape([1, n_input])
        symbols_out_onehot = [training_data_args[target_position]]

        fetches = [optimizer, accuracy, cost, pred]
        feed = {x: symbols_in_keys, y: symbols_out_onehot}
        _, acc, loss, onehot_pred = session.run(fetches, feed_dict=feed)

        loss_total += loss
        acc_total += acc

        if (step+1) % display_step == 0:
            print("Iter= %i , Average Loss= %.6f, Average Accuracy= %.2f%%" % (step+1, loss_total/display_step, 100*acc_total/display_step))
            # Restart the running sums for the next reporting window.
            acc_total = 0
            loss_total = 0
            symbols_in = training_data[window]
            symbols_out = training_data[target_position]
            predicted_index = np.argmax(onehot_pred, axis=1)[0]
            symbols_out_pred = reverse_dictionary[predicted_index]
            print("%s - [%s] vs [%s]" % (symbols_in, symbols_out, symbols_out_pred))

        step += 1
        offset += (n_input+1)

# Classification with LSTM

We start this time with hyperparameters because the way we reshape our images depends on our network architecture.

In [None]:
import tensorflow as tf
from tensorflow.contrib import rnn

tf.reset_default_graph()

# --- Input layout: each image is read as a sequence of rows ---
n_input = 28      # pixels per row
time_steps = 28   # rows per image, i.e. unrolled time steps (images are (28, 28))

# --- Network ---
num_units = 128   # hidden LSTM units
n_classes = 10

# --- Training ---
learning_rate = 0.001   # learning rate for adam
batch_size = 128
n_epochs = 10
step = 100

Let's go back to our data:

In [None]:
import os
import numpy as np

from sklearn.model_selection import train_test_split

# `fetch_mldata` depended on the mldata.org service, which is no longer
# available, and the function was removed from scikit-learn 0.22.
# `fetch_openml` provides the same MNIST data set.
try:
    from sklearn.datasets import fetch_openml
except ImportError:
    # scikit-learn < 0.20 only ships the legacy loader.
    from sklearn.datasets import fetch_mldata
    mnist = fetch_mldata('MNIST original')
else:
    # as_frame=False requests plain NumPy arrays rather than a DataFrame
    # (the keyword exists from scikit-learn 0.22 onwards).
    mnist = fetch_openml('mnist_784', version=1, as_frame=False)

# Convert to float32, reshape every sample to time_steps rows of n_input
# pixels, and divide by 255 (the largest 8-bit pixel value).
mnist.data = np.asarray(mnist.data, dtype=np.float32).reshape([-1, time_steps, n_input]) / 255.
mnist.num_examples = len(mnist.data)
# OpenML delivers the labels as strings. int64 matches the dtype of the label
# placeholder the batches are fed into.
mnist.labels = np.asarray(mnist.target).astype(np.int64)

X_train, X_test, y_train, y_test = train_test_split(mnist.data, mnist.labels, test_size=(1. / 7.))

This is the network we will use (we don't store it in a class this time)

In [None]:
# Graph inputs: a batch of images seen as time_steps rows of n_input pixels,
# and the matching class indices.
x = tf.placeholder(tf.float32, [None, time_steps, n_input])
y = tf.placeholder(tf.int64, [None])

# Turn the [batch_size, time_steps, n_input] tensor into a list of
# "time_steps" tensors of shape [batch_size, n_input]. The list is called
# `inputs` so that the builtin `input` is not shadowed at module level.
inputs = tf.unstack(x, time_steps, 1)

# forget_bias is a float; 1.0 is the value the former `True` evaluated to.
lstm_layer = rnn.BasicLSTMCell(num_units, forget_bias=1.0)
outputs, _ = rnn.static_rnn(lstm_layer, inputs, dtype=tf.float32)

# Only the output of the last time step feeds the classification layer.
prediction = tf.layers.dense(inputs=outputs[-1], units=n_classes)

loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=prediction, labels=y))
opt = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)

correct_prediction = tf.equal(tf.argmax(prediction, 1), y)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

Here we go for the training:

In [None]:
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(n_epochs):
        # New random visiting order for every epoch.
        permut = np.random.permutation(len(X_train))
        print("epoch: %i" % epoch)
        for j in range(0, len(X_train), batch_size):
            report = (j % step == 0)
            if report:
                print("  batch: %i" % j)

            batch = permut[j:j+batch_size]
            Xs = X_train[batch]
            Ys = y_train[batch]
            feed = {x: Xs, y: Ys}

            sess.run(opt, feed_dict=feed)

            if report:
                # Fetch both values in one run instead of evaluating the
                # network twice on the same mini-batch.
                acc, los = sess.run([accuracy, loss], feed_dict=feed)
                print("  accuracy %f" % acc)
                print("  loss %f" % los)
                print("")