# Using a Convolutional Neural Network (CNN) to Classify Images

Source code here:
https://github.com/wwoo/tf_box_classify

This is a simple example demonstrating how to classify images from a webcam using TensorFlow.  The model is a convolutional neural network (CNN) that classifies images of a box into four classes (upright, tilted, open and spilled).

In [None]:
import tensorflow as tf
import numpy as np
import os
import time

from IPython.display import display, Image

In [None]:
# hyper parameters to use for training
# Number of images per training batch (default batch_size of get_batch_inputs).
# The training loop also reuses this value as its validation interval, in steps.
TRAIN_BATCH_SIZE = 10
# Number of passes over the training list (default num_epochs of get_batch_inputs).
TRAIN_EPOCHS = 5
# Number of images per validation batch.
VALID_BATCH_SIZE = 40
# num_epochs for the validation input queue.
# NOTE(review): None presumably means "no epoch limit" -- confirm against the
# tf.train.slice_input_producer documentation.
VALID_EPOCHS = None
# Passed as shuffle= to tf.train.slice_input_producer.
SHUFFLE_BATCHES = True
# Learning rate handed to the Adam optimizer.
LEARNING_RATE = 0.01
# Number of classes; also the depth of the one-hot label vectors.
NUM_CLASSES = 4
# Default dropout keep probability of conv_net.
KEEP_PROB = 0.75

In [None]:
# image parameters
# Height and width, in pixels, that each decoded image is reshaped to.
IMAGE_SIZE = 150
# Divisor applied to IMAGE_SIZE when batches are resized (1 keeps the size).
IMAGE_RESIZE_FACTOR = 1
# Channel count requested from the JPEG decoder (1 = single channel / grayscale).
IMAGE_CHANNELS = 1

Let's see a sample of our training images.

In [None]:
# Render a few of the processed training images inline, in file order.
for sample_path in ('../images/normal_processed/normal-0.jpg',
                    '../images/normal_processed/normal-5.jpg',
                    '../images/normal_processed/normal-10.jpg',
                    '../images/normal_processed/normal-15.jpg'):
    display(Image(filename=sample_path))

Let's have a look at an excerpt of our labeled training data:

In [None]:
# Print up to the first 20 lines of the labeled training data. Stop at
# end-of-file so a file shorter than 20 lines does not print blank lines.
with open('../train.txt', 'r') as f:
    for _ in range(20):
        line = f.readline()
        if not line:
            # readline() returns an empty string only at end-of-file;
            # a blank line in the file is still '\n' and is printed.
            break
        print(line.rstrip())

Our validation dataset is similarly labeled.

In [None]:
# Print up to the first 20 lines of the labeled validation data. Stop at
# end-of-file so a file shorter than 20 lines does not print blank lines.
with open('../valid.txt', 'r') as f:
    for _ in range(20):
        line = f.readline()
        if not line:
            # readline() returns an empty string only at end-of-file;
            # a blank line in the file is still '\n' and is printed.
            break
        print(line.rstrip())

We're going to use TensorFlow input queues to read our training and validation data.  Labeled training data is read from `train.txt`, and validation data from `valid.txt`.

We define `get_batch_inputs` to retrieve the next batch of labeled data during training and validation. The batch size is `TRAIN_BATCH_SIZE` for training and `VALID_BATCH_SIZE` for validation.

Assuming images are 150 x 150 pixels (single channel / grayscale) and using a batch size of 10, `get_batch_inputs` returns two values:

1. A 4-D tensor of shape [10, 150, 150, 1], which is a batch of training images (batch size first, then height, width and channels)
2. A 2-D tensor of shape [10, 4], which is the corresponding batch of labels represented as "one-hot" vectors

In [None]:
def get_batch_inputs(train_file, batch_size=TRAIN_BATCH_SIZE, num_epochs=TRAIN_EPOCHS):
    """Build the queue-based input pipeline for one labeled listing file.

    Args:
        train_file: path of a text file listing image paths and integer labels
            (read with get_image_label_list).
        batch_size: number of examples per batch.
        num_epochs: forwarded to tf.train.slice_input_producer.

    Returns:
        A pair (images, labels): the preprocessed image batch and the matching
        labels as float one-hot vectors of depth NUM_CLASSES.
    """
    filenames, class_ids = get_image_label_list(train_file)

    # Queue that hands out one (filename, label) pair at a time.
    pair_queue = tf.train.slice_input_producer(
        [filenames, class_ids], num_epochs=num_epochs, shuffle=SHUFFLE_BATCHES)

    decoded, class_id = read_image_from_disk(pair_queue)
    # Pin the image to a fixed shape before it is batched.
    decoded = tf.reshape(decoded, [IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS])

    raw_images, class_id_batch = tf.train.batch(
        [decoded, class_id], batch_size=batch_size)

    images = preprocess_images(raw_images)
    one_hot_labels = tf.one_hot(
        tf.to_int64(class_id_batch), NUM_CLASSES, on_value=1.0, off_value=0.0)

    return images, one_hot_labels

We'll also define a few helper functions called by `get_batch_inputs`: 

`get_image_label_list` to read our text files into lists. 

`preprocess_images` to resize our images and convert them to grayscale.

`read_image_from_disk` to convert JPEG images on disk to tensors.

In [None]:
def read_image_from_disk(input_queue):
    """Decode the JPEG referenced by one (filename, label) queue element.

    Args:
        input_queue: indexable pair whose element 0 is the image file name
            and element 1 is its label.

    Returns:
        A pair (image, label): the image decoded with IMAGE_CHANNELS
        channels, and the label passed through unchanged.
    """
    filename, label = input_queue[0], input_queue[1]
    jpeg_bytes = tf.read_file(filename)
    decoded = tf.image.decode_jpeg(
        jpeg_bytes, channels=IMAGE_CHANNELS, name="decode_jpeg")

    return decoded, label


def preprocess_images(image_batch, resize_factor=IMAGE_RESIZE_FACTOR):
    """Resize a batch of images to IMAGE_SIZE / resize_factor pixels square.

    No grayscale conversion is applied here: the result is a resize of
    ``image_batch`` exactly as it was passed in. (The images are already
    decoded with IMAGE_CHANNELS channels in read_image_from_disk.) An
    earlier rgb_to_grayscale op whose output was never used has been removed.

    Args:
        image_batch: batch of images to resize.
        resize_factor: divisor applied to IMAGE_SIZE to get the new edge length.

    Returns:
        The batch resized to (new_image_size, new_image_size).
    """
    # Round to a whole number of pixels; the resize op needs integer sizes.
    new_image_size = int(round(IMAGE_SIZE / resize_factor))

    return tf.image.resize_images(image_batch, new_image_size, new_image_size)

def get_image_label_list(image_label_file):
    """Read an image/label listing file into two parallel lists.

    Each non-blank line holds an image path followed by an integer class
    label, separated by whitespace. Blank lines are skipped, and the last
    line is parsed correctly whether or not it ends with a newline.

    Args:
        image_label_file: path of the listing file.

    Returns:
        A pair (filenames, labels): a list of path strings and a list of
        int labels, in file order.

    Raises:
        ValueError: if a non-blank line lacks a label or the label is not
            an integer.
    """
    filenames = []
    labels = []
    # The context manager closes the file even if a line fails to parse.
    with open(image_label_file, "r") as listing:
        for line in listing:
            entry = line.strip()
            if not entry:
                continue
            # Split on the last whitespace run so the label is always the
            # final token, even if the path itself contains spaces.
            filename, label = entry.rsplit(None, 1)
            filenames.append(filename)
            labels.append(int(label))

    print("get_image_label_list: read " + str(len(filenames)) + " items.")

    return filenames, labels

Define some helper functions for convolution and max pooling.

`conv2d` performs a convolution with a stride of 1 and adds a bias. We're using `padding='SAME'`, so the output tensor will have the same height and width as the input. We then apply the ReLU activation function to the output tensor.

`maxpool2d` performs max pooling, which (with the default `k=2`) will halve the height and width of the input tensor.


In [None]:
def conv2d(x, W, b, strides=1):
    """Apply a SAME-padded 2-D convolution, add the bias, then ReLU.

    Args:
        x: input tensor.
        W: convolution filter.
        b: bias added to the convolution output.
        strides: stride used for both spatial dimensions.

    Returns:
        The ReLU-activated result.
    """
    stride_spec = [1, strides, strides, 1]
    convolved = tf.nn.conv2d(x, W, strides=stride_spec, padding='SAME')
    biased = tf.nn.bias_add(convolved, b)
    return tf.nn.relu(biased)

def maxpool2d(x, k=2):
    """Max-pool ``x`` with a k x k window and a stride of k, SAME padding."""
    window = [1, k, k, 1]
    return tf.nn.max_pool(x, ksize=window, strides=list(window), padding='SAME')

`conv_net` builds most of the CNN specific parts of our TensorFlow graph. 

Each layer of the CNN performs a convolution, ReLU activation and down sampling, which is then fed as input to the next CNN layer.

In [None]:
def conv_net(x, weights, biases, image_size, keep_prob=KEEP_PROB):
    """Build the CNN: four conv + max-pool layers, one dense layer, an output layer.

    Args:
        x: input image batch; reshaped to [-1, image_size, image_size, 1].
        weights: dict with keys 'wc1'..'wc4', 'wd1' and 'out'.
        biases: dict with keys 'bc1'..'bc4', 'bd1' and 'out'.
        image_size: height and width of the input images.
        keep_prob: keep probability passed to tf.nn.dropout.

    Returns:
        The output-layer tensor (a matmul plus bias; no softmax is applied).
    """
    net = tf.reshape(x, shape=[-1, image_size, image_size, 1])

    # Each entry is (name scope, filter key, bias key) for one convolution
    # + max pooling layer; the layers are chained in this order.
    conv_layers = (
        ('layer1', 'wc1', 'bc1'),
        ('layer2', 'wc2', 'bc2'),
        ('layer3', 'wc3', 'bc3'),
        ('layer4', 'wc4', 'bc4'),
    )
    for scope_name, filter_key, bias_key in conv_layers:
        with tf.name_scope(scope_name):
            net = conv2d(net, weights[filter_key], biases[bias_key])
            net = maxpool2d(net, k=2)

    with tf.name_scope('fully_connected'):
        # Flatten to the input width that the dense weight matrix expects.
        flat_width = weights['wd1'].get_shape().as_list()[0]
        dense = tf.reshape(net, [-1, flat_width])
        dense = tf.add(tf.matmul(dense, weights['wd1']), biases['bd1'])
        dense = tf.nn.relu(dense)

        # Dropout on the dense activations.
        dense = tf.nn.dropout(dense, keep_prob)

    with tf.name_scope('output'):
        # Class scores for each example.
        logits = tf.add(tf.matmul(dense, weights['out']), biases['out'])

    return logits

Purely for debugging and educational purposes, generate a summary of (one) first layer convolution and activation.  These can be viewed in Tensorboard, which is a tool for visualising TensorFlow graphs.

In [None]:
def generate_image_summary(x, weights, biases, step, image_size=IMAGE_SIZE):
    """Create image summaries of the first-layer convolution and its ReLU.

    Every call adds new ops to the graph, tagged with ``step``.

    Args:
        x: input image batch; reshaped to [-1, image_size, image_size, 1].
        weights: dict providing the first-layer filter under 'wc1'.
        biases: dict providing the first-layer bias under 'bc1'.
        step: integer embedded, zero-padded to 5 digits, in the summary tags.
        image_size: height and width of the input images.

    Returns:
        A pair (conv_summary, relu_summary) of image summary ops.
    """
    x = tf.reshape(x, shape=[-1, image_size, image_size, 1])

    with tf.name_scope('generate_image_summary'):
        x = tf.nn.conv2d(x, weights['wc1'], strides=[1, 1, 1, 1], padding='SAME')
        x = tf.nn.bias_add(x, biases['bc1'])
        # Only one image is rendered (max_images=1), so take the first
        # example and the first feature map. This avoids coupling the slice
        # to TRAIN_BATCH_SIZE, which failed for smaller batches.
        x_slice = tf.slice(x, [0, 0, 0, 0], [1, image_size, image_size, 1])
        conv_summary = tf.image_summary("img_conv_{:05d}".format(step), x_slice, max_images=1)
        relu_summary = tf.image_summary("img_relu_{:05d}".format(step), tf.nn.relu(x_slice), max_images=1)

    return conv_summary, relu_summary

With all our helper functions defined, we're ready to start building our TensorFlow graph.

Firstly, create input nodes to retrieve image batches. We'll define two batches:

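`train_image_batch` and `train_label_batch` to get the next batch of training images and corresponding labels, and `valid_image_batch` and `valid_label_batch` to get the next batch of validation images and labels.  These are fed into `x_` and `y_` placeholders for each step of training and validation.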

In [None]:
# Read inventory of training images and labels
with tf.name_scope('batch_inputs'):
    train_file = "../train.txt"
    valid_file = "../valid.txt"

    # Training batches: TRAIN_BATCH_SIZE examples each, for TRAIN_EPOCHS epochs.
    train_image_batch, train_label_batch = get_batch_inputs(train_file, batch_size=TRAIN_BATCH_SIZE,
                                                            num_epochs=TRAIN_EPOCHS)

    # Validation batches: VALID_BATCH_SIZE examples each.
    valid_image_batch, valid_label_batch = get_batch_inputs(valid_file, batch_size=VALID_BATCH_SIZE,
                                                            num_epochs=VALID_EPOCHS)

    # Use the same expression as preprocess_images so the placeholder always
    # matches the resized batches, and so the size is an int (a float would
    # not be a valid placeholder dimension).
    image_size = int(round(IMAGE_SIZE / IMAGE_RESIZE_FACTOR))

    # These are image and label batch placeholders which we'll feed in during training
    x_ = tf.placeholder("float32", shape=[None, image_size, image_size, IMAGE_CHANNELS])
    y_ = tf.placeholder("float32", shape=[None, NUM_CLASSES])

Define weights and biases for each of our convolution layers:

In [None]:
# Store weights for our convolution & fully-connected layers.
# Every variable is initialised from tf.random_normal with the listed shape;
# the tuples keep the variables in a fixed creation order.
with tf.name_scope('weights'):
    weight_shapes = (
        ('wc1', [5, 5, 1, 32]),         # 5x5 conv, 1 input, 32 outputs
        ('wc2', [5, 5, 32, 64]),        # 5x5 conv, 32 inputs, 64 outputs
        ('wc3', [5, 5, 64, 128]),       # 5x5 conv, 64 inputs, 128 outputs
        ('wc4', [5, 5, 128, 256]),      # 5x5 conv, 128 inputs, 256 outputs
        ('wd1', [10*10*256, 1024]),     # fully connected, 10*10*256 inputs, 1024 outputs
        ('out', [1024, NUM_CLASSES]),   # 1024 inputs, 4 class labels (prediction)
    )
    weights = {}
    for weight_name, weight_shape in weight_shapes:
        weights[weight_name] = tf.Variable(tf.random_normal(weight_shape))

# Store biases for our convolution and fully-connected layers:
# one bias per output of the matching weight tensor.
with tf.name_scope('biases'):
    bias_shapes = (
        ('bc1', [32]),
        ('bc2', [64]),
        ('bc3', [128]),
        ('bc4', [256]),
        ('bd1', [1024]),
        ('out', [NUM_CLASSES]),
    )
    biases = {}
    for bias_name, bias_shape in bias_shapes:
        biases[bias_name] = tf.Variable(tf.random_normal(bias_shape))

Dropping some of our activations will help prevent over-fitting on our training data.

In [None]:
# Placeholder for the dropout keep probability (the value handed to
# tf.nn.dropout inside conv_net); it is supplied through feed_dict at run time.
keep_prob = tf.placeholder(tf.float32)

Now that we've defined our weights, biases and dropout probability, we can call `conv_net` to run our current batch input through the convolution, maxpool and fully-connected layers.

The output of the final layer is a set of unnormalized class scores (logits) across our `NUM_CLASSES`; the softmax that turns them into probability distributions is applied as part of the loss below.

In [None]:
# Build our graph: pred holds the output-layer scores that conv_net computes
# for whatever batch is fed into x_ (no softmax has been applied yet).
pred = conv_net(x_, weights, biases, image_size, keep_prob)

We now measure the "loss" between our predicted probability distributions and the true distributions from our one-hot vectors.

In [None]:
# Calculate loss
with tf.name_scope('cross_entropy'):
    # Softmax cross-entropy between the network output and the one-hot
    # labels, averaged into a single scalar.
    per_example_loss = tf.nn.softmax_cross_entropy_with_logits(pred, y_)
    cost = tf.reduce_mean(per_example_loss)
    # Scalar summary so the loss can be plotted in Tensorboard.
    cost_summary = tf.scalar_summary("cost_summary", cost)

Next, we define a training step to minimise the loss between the predicted and true distribution.  TensorFlow keeps track of the whole graph and performs auto-differentiation at each step.

In [None]:
# Run optimizer step
with tf.name_scope('train'):
    # Adam optimizer; train_step is the op that applies one update minimising cost.
    optimizer = tf.train.AdamOptimizer(learning_rate=LEARNING_RATE)
    train_step = optimizer.minimize(cost)

Optionally, we'll define some steps to check the accuracy of our training.  We'll call this every n steps and write out a summary for visualisation via Tensorboard.

In [None]:
# Evaluate model accuracy
with tf.name_scope('predict'):
    # Highest-scoring class per example, for the prediction and for the label.
    predicted_class = tf.argmax(pred, 1)
    true_class = tf.argmax(y_, 1)
    correct_pred = tf.equal(predicted_class, true_class)
    # Fraction of examples in the batch that were classified correctly.
    accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
    accuracy_summary = tf.scalar_summary("accuracy_summary", accuracy)
    # Histograms of the first convolution layer's filter and bias values.
    w_summary = tf.histogram_summary("weights", weights['wc1'])
    b_summary = tf.histogram_summary("biases", biases['bc1'])

At this point, we've finished defining what our graph looks like.  To start running the graph we've built, we need to create a TensorFlow session and initialize the variables in our graph:

In [None]:
# Session used to run the graph.
sess = tf.Session()

# Summary writer targeting ./logs; it is given the session's graph so the
# graph can be inspected in Tensorboard. Summaries are added during training.
writer = tf.train.SummaryWriter("./logs", sess.graph)

# Op that initializes the graph's variables (the weights and biases).
init_op = tf.initialize_all_variables()    
# we need init_local_op step only on tensorflow 0.10rc due to a regression from 0.9
# https://github.com/tensorflow/models/pull/297
# NOTE(review): presumably needed for the epoch counter behind num_epochs in
# the input queues -- confirm against the linked pull request.
init_local_op = tf.initialize_local_variables()

We're using TensorFlow input queues to feed our graph with each new batch of training data, until all the data is considered.

Each image in the training set is used to train our model `TRAIN_EPOCHS` times.  The input queue runner retrieves the next batch of training images and labels, feeds these into the `x_` and `y_` placeholders.  We then run `train_step` to execute one step of our training.  Since TensorFlow has a complete view of our graph dependencies, it runs all the previous dependent nodes of the graph before `train_step`.

In [None]:
# Training loop: initialise variables, start the input queue threads, then
# run one optimizer step per training batch until the training queue is
# exhausted. Every TRAIN_BATCH_SIZE steps, evaluate on a validation batch
# and write summaries for Tensorboard.
with sess.as_default():

    sess.run(init_op)
    sess.run(init_local_op) # we need this only with tensorflow 0.10rc
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    step = 0

    print(" == Start training == ")

    try:
        while not coord.should_stop():
            step += 1
            x, y = sess.run([train_image_batch, train_label_batch])
            # Train with dropout enabled, using the shared KEEP_PROB constant
            # rather than a separately hard-coded copy of its value.
            train_step.run(feed_dict={keep_prob: KEEP_PROB, x_: x, y_: y})

            if step % TRAIN_BATCH_SIZE == 0:
                x, y = sess.run([valid_image_batch, valid_label_batch])
                # NOTE(review): this creates new summary ops on every
                # evaluation, so the graph grows as training proceeds. The
                # per-step tags look deliberate, so the call is left as is.
                conv_summary, relu_summary = generate_image_summary(x_, weights, biases, step, image_size)
                # Evaluate with dropout disabled (keep_prob of 1.0).
                (cost_summary_str, accuracy_summary_str, acc,
                 conv_summary_str, relu_summary_str,
                 w_summary_str, b_summary_str) = sess.run(
                    [cost_summary, accuracy_summary, accuracy, conv_summary,
                     relu_summary, w_summary, b_summary],
                    feed_dict={keep_prob: 1.0, x_: x, y_: y})

                # write summaries for viewing in Tensorboard
                for summary_str in (accuracy_summary_str, cost_summary_str,
                                    conv_summary_str, relu_summary_str,
                                    w_summary_str, b_summary_str):
                    writer.add_summary(summary_str, step)

                print("Accuracy at step %s: %s" % (step, acc))

    except tf.errors.OutOfRangeError:
        # The training queue has run out of data (all epochs consumed):
        # report accuracy on one last validation batch.
        x, y = sess.run([valid_image_batch, valid_label_batch])
        result = sess.run([accuracy], feed_dict={keep_prob: 1.0, x_: x, y_: y})

        print("Validation accuracy: %s" % result[0])

    finally:
        coord.request_stop()
        try:
            coord.join(threads)
        finally:
            # Close the writer so pending summaries reach the event file, and
            # release the session even if joining the threads raised.
            writer.close()
            sess.close()

print("You done! (☞ﾟヮﾟ)☞")