#### Set up a CNN for image recognition
Images are rectangles in the foreground and various backgrounds as noise. The goal is to determine the orientation of the image, horizontal or vertical

In [1]:
import pandas as pd
import numpy as np
from numpy import random
import tensorflow as tf
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import time

IMAGE_HEIGHT  = 28
IMAGE_WIDTH   = 28
CHANNELS      = 1
NUM_CLASSES   = 2                       # Vertical or Horizontal alignment
BATCH_SIZE    = 128
EPOCHS        = 2
LEARNING_RATE = .0003
DROPOUT       = True                   # Toggle to dropout or not
DROPOUT_KEEP  = 0.6
POOL1         = False
POOL2         = True
TEST_PCT      = 0.15
VALID_PCT     = 0.15                    # Validation is 20% of remaining (after Test)
DATA_DIR      = '/home/tom/data/'
FILENAME      = 'rectangles-images.txt'
SUMMARIES_DIR = '/home/tom/tflogs'     # where to store Summary data

In [2]:
# Both the data and the label are in the passed dataframe
# Split them apart and put into numpy array
def split_label(df):
    labels = np.array(df['label'], dtype='f')
    # convert labels to a "one-hot" vector
    labels = (np.arange(NUM_CLASSES) == labels[:, None]).astype(np.float32)

    data = df.iloc[:,:len(df.columns)-1]
    # Turn "data" into a np array which will be used to load the tensor
    data = np.array(data, dtype='f')
    data = np.reshape(data, [len(df),IMAGE_HEIGHT,IMAGE_WIDTH,CHANNELS])
    return data, labels

In [3]:
# Data consists of images of rectangles, 28x28, single channel
df = pd.read_csv(DATA_DIR + FILENAME)
print('data has {:,} rows and {} columns'.format(df.shape[0], df.shape[1]))

data has 62,000 rows and 785 columns


In [4]:
# Split incoming data into Train/Validation/Test
df_train, df_test = train_test_split(df, test_size=TEST_PCT)
df_train, df_val  = train_test_split(df_train, test_size=VALID_PCT)
print('Training:    {:,} rows'.format(df_train.shape[0]))
print('Validation:  {:,} rows'.format(df_val.shape[0]))
print('Test:        {:,} rows'.format(df_test.shape[0]))

Training:    44,795 rows
Validation:  7,905 rows
Test:        9,300 rows


In [5]:
# Split the features from the labels
train_data, train_labels = split_label(df_train)
val_data,   val_labels = split_label(df_val)
test_data,  test_labels  = split_label(df_test)

In [6]:
image_batch = tf.placeholder(tf.float32, shape=[None,IMAGE_HEIGHT,IMAGE_WIDTH,CHANNELS])
label_batch = tf.placeholder(tf.float32, shape=[None,NUM_CLASSES])

In [7]:
def conv_layer(img, size, depth, filters, strides):
    weight = tf.Variable(tf.truncated_normal(shape=[size, size, depth, filters], stddev=0.3))
    bias   = tf.Variable(tf.truncated_normal([filters]))
    print(img.get_shape())
    print(weight.get_shape())
    conv   = tf.nn.conv2d(input=img,
                          filter=weight,
                          strides=[1,strides,strides,1],
                          padding='SAME')
    print(conv.get_shape())
    conv = tf.nn.bias_add(conv, bias)
    return tf.nn.relu(conv)

In [8]:
L1size    = 3            # size of a filter
L1depth   = CHANNELS
L1filters = 32           # number of filters
L1strides = 1
l1_out        = conv_layer(image_batch, L1size, L1depth, L1filters, L1strides)
if POOL1:
    l1_out    = tf.nn.max_pool(value=l1_out,
                           ksize=[1,2,2,1],
                           strides=[1,2,2,1],
                           padding='SAME')

(?, 28, 28, 1)
(3, 3, 1, 32)
(?, 28, 28, 32)


In [9]:
L2size    = 3            # size of a filter
L2depth   = L1filters
L2filters = 64           # number of filters
L2strides = 1
l2_out    = conv_layer(l1_out, L2size, L2depth, L2filters, L2strides)
if POOL2:
    l2_out = tf.nn.max_pool(value=l2_out,
                            ksize=[1,2,2,1],
                            strides=[1,2,2,1],
                            padding='SAME')

(?, 28, 28, 32)
(3, 3, 32, 64)
(?, 28, 28, 64)


In [10]:
# Reshape to allow conversion to Fully Connected layer
L2_flat = tf.reshape(l2_out, [-1, 14*14*L2filters])

FCw    = tf.Variable(tf.truncated_normal(shape=[14*14*L2filters,32], stddev=0.3))
FCb    = tf.Variable(tf.truncated_normal([32]))
fc      = tf.matmul(L2_flat, FCw) + FCb
fc_out  = tf.nn.relu(fc)

In [11]:
OLw    = tf.Variable(tf.truncated_normal(shape=[32,NUM_CLASSES], stddev=0.3))
OLb    = tf.Variable(tf.truncated_normal([NUM_CLASSES]))
ol     = tf.matmul(fc_out, OLw) + OLb

keep_prob = tf.placeholder(tf.float32)

if DROPOUT:
    ol = tf.nn.dropout(ol, keep_prob)
    
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(ol, label_batch))
optimize = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)

correct_prediction = tf.equal(tf.argmax(ol,1), tf.argmax(label_batch,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))

In [12]:
# Capture Tensorboard data
summary_loss = tf.scalar_summary('Loss function', loss)
summary_accuracy = tf.scalar_summary('accuracy', accuracy)

num_training_batches = int(len(train_data) / BATCH_SIZE)

start = time.time()
count = 0                           # For Tensorboard

with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    train_writer = tf.train.SummaryWriter(SUMMARIES_DIR + '/train/'+'LearnRate', sess.graph)
    test_writer = tf.train.SummaryWriter(SUMMARIES_DIR + '/test/'+'LearnRate', sess.graph)
    for i in range(EPOCHS):
        x,y = shuffle(train_data,train_labels)
        print('Epoch {}'.format(i+1))
        for j in range(num_training_batches):
            x_mini = x[j*BATCH_SIZE:j*BATCH_SIZE+BATCH_SIZE]
            y_mini = y[j*BATCH_SIZE:j*BATCH_SIZE+BATCH_SIZE]
            _, tb = sess.run([optimize, summary_loss], feed_dict={
                image_batch:x_mini,
                label_batch:y_mini,
                keep_prob:DROPOUT_KEEP})
            if j % 20 ==0:      
                count += 1                
                train_writer.add_summary(tb, count)
                _, tb = sess.run([accuracy, summary_accuracy], feed_dict={
                        image_batch:val_data,
                        label_batch:val_labels,
                        keep_prob:1.0})
                test_writer.add_summary(tb, count)
        !aplay /usr/share/sounds/chime_down.wav
    # Training is done, run the test
    score = sess.run(accuracy, feed_dict={
                image_batch:test_data,
                label_batch:test_labels,
                keep_prob:1.0})
    print('Accuracy against test set: {:.1%}'.format(score))
train_writer.close()
test_writer.close()
print('Elapsed time: {} minutes'.format((time.time() - start)//60))
!aplay /usr/share/sounds/bicycle_bell.wav

Epoch 1
Playing WAVE '/usr/share/sounds/chime_down.wav' : Unsigned 8 bit, Rate 11025 Hz, Mono
Epoch 2
Playing WAVE '/usr/share/sounds/chime_down.wav' : Unsigned 8 bit, Rate 11025 Hz, Mono
Accuracy against test set: 57.2%
Elapsed time: 26.0 minutes
Playing WAVE '/usr/share/sounds/bicycle_bell.wav' : Signed 16 bit Little Endian, Rate 11127 Hz, Mono
