<h2 style="text-align:center">LeNet in TensorFlow</h2>

In [1]:
import warnings
warnings.filterwarnings("ignore")

import os
import cv2
import pickle
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split

import tensorflow as tf

# Start from an empty default graph so ops from a previous run do not pile up
tf.reset_default_graph()
# Name of this run; used later as the TensorBoard log sub-directory
experiment_name = "LeNet_exp"

<h3>Load data</h3>

In [2]:
def unpickle(file):
    """Read the pickle stored at path `file` and return the loaded object.

    The file is decoded with ``encoding='bytes'``, so byte strings pickled
    by Python 2 come back as ``bytes`` (e.g. dictionary keys like b'data').
    """
    # NOTE: pickle can execute arbitrary code while loading; only use this
    # on files from a trusted source.
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

In [3]:
def load_cifar10(data_path):
    """Load the CIFAR-10 python batches stored in directory `data_path`.

    Reads ``data_batch_1`` .. ``data_batch_5`` and ``test_batch`` through
    `unpickle` and turns every flat pixel row into a channels-last image.

    Args:
        data_path: directory containing the batch files. A trailing path
            separator is optional.

    Returns:
        Tuple ``(train_data, train_labels, test_data, test_labels)``. The
        data arrays have shape (N, 32, 32, 3) and the label arrays (N,).
    """
    def to_images(flat_rows):
        # Each row is reshaped to (3, 32, 32); moving axis 1 to the end
        # gives (N, 32, 32, 3). -1 lets N follow the data actually loaded
        # instead of hard-coding 50000 / 10000.
        return np.rollaxis(flat_rows.reshape((-1, 3, 32, 32)), 1, 4)

    train_batches = []
    train_labels = []
    for i in range(1, 6):
        data_dict = unpickle(os.path.join(data_path, "data_batch_" + str(i)))
        train_batches.append(data_dict[b'data'])
        train_labels.extend(data_dict[b'labels'])

    test_data_dict = unpickle(os.path.join(data_path, "test_batch"))

    # Stack the five batches once rather than re-copying inside the loop
    train_data = to_images(np.vstack(train_batches))
    train_labels = np.array(train_labels)

    test_data = to_images(test_data_dict[b'data'])
    test_labels = np.array(test_data_dict[b'labels'])

    return train_data, train_labels, test_data, test_labels

In [4]:
# Relative path of the directory that holds the CIFAR-10 batch files
data_path = "../../cifar-10-batches-py/"
train_data, train_labels, test_data, test_labels = load_cifar10(data_path)

In [None]:
# Sanity check: show the shapes of the loaded image and label arrays
print("train_data.shape:", train_data.shape, "train_labels.shape:", train_labels.shape)
print("test_data.shape:", test_data.shape, "test_labels.shape:", test_labels.shape)

train_data.shape: (50000, 32, 32, 3) train_labels.shape: (50000,)
test_data.shape: (10000, 32, 32, 3) test_labels.shape: (10000,)


<h3>Data preprocessing</h3>

In [None]:
def scale_data(data):
    """
    Scale raw pixel intensities from [0, 255] to [0, 1].

    Returns a new float32 array; the input array is not modified.
    """
    scaled = data.astype(np.float32)  # astype makes a copy, so the division below is safe
    scaled /= 255.0
    return scaled

# NOTE: these rebind train_data/test_data to the scaled arrays, so running
# this cell a second time would divide the pixel values by 255 again.
train_data = scale_data(train_data)
test_data = scale_data(test_data)

In [None]:
def convert_grayscale(data_image):
    """
    Convert a batch of RGB images to single-channel grayscale.

    Args:
        data_image: array of shape (N, H, W, 3) with channels ordered
            R, G, B, in a dtype accepted by cv2.cvtColor.

    Returns:
        float64 array of shape (N, H, W, 1).
    """
    output = np.zeros(data_image.shape[:-1])
    for i, image in enumerate(data_image):
        # The images are RGB, so RGB2GRAY is required; BGR2GRAY would apply
        # the red weight to the blue channel and vice versa.
        output[i] = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    # Append the channel axis without hard-coding a 32x32 image size
    return output[..., np.newaxis]
    
# NOTE: rebinds train_data/test_data to the single-channel arrays, so this
# cell expects 3-channel input and is meant to run once per kernel session.
train_data = convert_grayscale(train_data)
test_data = convert_grayscale(test_data)

print("train_data.shape: ", train_data.shape)
print("test_data.shape: ", test_data.shape)

In [None]:
def one_hot_labels(label_data, num_classes=10):
    """
    One-hot encode integer class labels.

    `label_data` is flattened first; row k of the result is the row of the
    `num_classes` x `num_classes` identity matrix selected by label k.
    """
    flat_labels = label_data.reshape(-1)
    identity = np.eye(num_classes)
    return identity[flat_labels]

# NOTE: rebinds the label arrays to their one-hot form; the cell expects
# integer class labels, so it is meant to run once per kernel session.
train_labels = one_hot_labels(train_labels)
test_labels = one_hot_labels(test_labels)

# Show the first five encoded labels
print(train_labels[:5])

<h3>Train/validation split</h3>

In [None]:
# Hold out 10% of the training data for validation; the fixed random_state
# makes the split repeatable.
X_train, X_valid, y_train, y_valid = train_test_split(
    train_data,
    train_labels,
    test_size=0.1,
    random_state=42,
)

print("X_train.shape:", X_train.shape, "y_train.shape", y_train.shape)
print("X_valid.shape:", X_valid.shape, "y_valid.shape", y_valid.shape)

<h3>LeNet model</h3>

In [None]:
class LeNet():
    """LeNet-style convolutional network built from TensorFlow 1.x graph ops.

    The class keeps no state of its own: every method adds ops (and, for the
    layer builders, new variables) to the current default graph.
    """

    def forward(self, input_image, name="LeNet"):
        """ Forward pass of the network.

        Args:
            input_image: single-channel image batch. The flatten step below
                assumes 32x32 inputs, for which conv3 produces 1x1x120.
            name: name scope wrapping all ops of the network.

        Returns:
            Tensor of 10 raw class scores (logits) per example. No softmax is
            applied because the loss op applies it internally.
        """
        with tf.name_scope(name):
            # 1st conv layer : CONV + TANH + AVERAGE POOL
            conv_1 = self.conv2d(input_image, num_input_channels=1, num_filters=6,
                                 filter_shape=[5, 5], strides=[1, 1], name="conv1")
            avgpool_1 = self.avgpool2d(conv_1, filter_shape=[2, 2],
                                       strides=[2, 2], name="avgpool1")

            # 2nd conv layer : CONV + TANH + AVERAGE POOL
            # (scope is "conv2" so it matches the "conv1"/"conv3" naming)
            conv_2 = self.conv2d(avgpool_1, num_input_channels=6, num_filters=16,
                                 filter_shape=[5, 5], strides=[1, 1], name="conv2")
            avgpool_2 = self.avgpool2d(conv_2, filter_shape=[2, 2],
                                       strides=[2, 2], name="avgpool2")

            # 3rd conv layer : CONV + TANH
            conv_3 = self.conv2d(avgpool_2, num_input_channels=16, num_filters=120,
                                 filter_shape=[5, 5], strides=[1, 1], name="conv3")

            # Flatten the 1x1x120 feature map into a vector of 120 features
            flattened = tf.reshape(conv_3, [-1, 1 * 1 * 120])

            # Fully connected layer 1 : DENSE + TANH
            fc_1 = self.fully_connected(flattened, 120, 84, act=True, name="fc1")

            # Output, fully connected layer 2 : DENSE only (raw logits).
            # Softmax lives inside tf.nn.softmax_cross_entropy_with_logits_v2.
            logits = self.fully_connected(fc_1, 84, 10, act=False, name="fc2")

            return logits

    def train_optimizer(self, loss_value, learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-08):
        """ Use an AdamOptimizer to train the network.

        Args:
            loss_value: scalar loss tensor to minimise.
            learning_rate, beta1, beta2, epsilon: passed to tf.train.AdamOptimizer.

        Returns:
            The training op produced by `minimize(loss_value)`.
        """
        with tf.name_scope("optimizer"):
            adam = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=beta1,
                                          beta2=beta2, epsilon=epsilon)
            train_step = adam.minimize(loss_value)

            return train_step

    def conv2d(self, input, num_input_channels, num_filters, filter_shape=[5, 5], strides=[1, 1], name="conv"):
        """ Convolution layer: VALID convolution + bias + tanh.

        Args:
            input: 4-D input tensor (batch, height, width, channels).
            num_input_channels: channel count of `input`.
            num_filters: number of output feature maps.
            filter_shape: [height, width] of the kernel.
            strides: [vertical, horizontal] stride.
            name: name scope; also the prefix of the variable names.

        Returns:
            The tanh-activated feature maps.
        """
        with tf.name_scope(name):
            kernel_shape = [filter_shape[0], filter_shape[1], num_input_channels, num_filters]
            w = tf.Variable(tf.truncated_normal(kernel_shape, stddev=0.02), name=name+"_W")
            b = tf.Variable(tf.constant(0.1, shape=[num_filters]), name=name+"_b")

            # conv, bias and then activation
            output = tf.nn.conv2d(input, w, strides=[1, strides[0], strides[1], 1], padding='VALID')
            output = tf.nn.bias_add(output, b)
            output = tf.tanh(output)

            # Histogram summaries for Tensorboard
            tf.summary.histogram("weights", w)
            tf.summary.histogram("biases", b)
            tf.summary.histogram("activations", output)

            return output

    def avgpool2d(self, input, filter_shape=[2, 2], strides=[2, 2], name="avgpool"):
        """ Average pooling layer with VALID padding.

        Args:
            input: input tensor to pool.
            filter_shape: [height, width] of the pooling window.
            strides: [vertical, horizontal] stride.
            name: name scope and op name.
        """
        with tf.name_scope(name):
            return tf.nn.pool(input, pooling_type='AVG',
                              window_shape=[filter_shape[0], filter_shape[1]],
                              strides=[strides[0], strides[1]], padding='VALID',
                              name=name)

    def fully_connected(self, input, neurons_in, neurons_out, act=True, name="fc"):
        """ Fully connected layer.

        Args:
            input: 2-D tensor with `neurons_in` features per row.
            neurons_in: number of input features.
            neurons_out: number of output units.
            act: if True apply tanh, otherwise return the affine output.
            name: name scope; also the prefix of the variable names.
        """
        with tf.name_scope(name):
            W = tf.Variable(tf.truncated_normal([neurons_in, neurons_out], stddev=0.02), name=name+"_W")
            b = tf.Variable(tf.constant(0.1, shape=[neurons_out]), name=name+"_b")

            # Histogram summaries for Tensorboard
            tf.summary.histogram("weights", W)
            tf.summary.histogram("biases", b)

            output = tf.add(tf.matmul(input, W), b)

            # Only hidden layers get the tanh; the output layer stays linear
            return tf.tanh(output) if act else output

Define the TensorFlow operations for training

In [None]:
# tf Graph input: grayscale 32x32 images and one-hot labels for 10 classes
X = tf.placeholder(tf.float32, [None, 32, 32, 1])
y = tf.placeholder(tf.float32, [None, 10])

# Add visualizer of input images in Tensorboard
tf.summary.image("input", tensor=X, max_outputs=3)

# The model instance is bound to the name `LeNet` because a later cell calls
# `LeNet.train_optimizer(...)`. Instantiate only while the name still refers
# to the class, so that re-running this cell does not fail with
# "'LeNet' object is not callable".
if isinstance(LeNet, type):
    LeNet = LeNet()
logits = LeNet.forward(X)

print("logits.shape: ", logits.shape)


In [None]:
def compute_loss_xent(logits, targets):
    """ Compute the mean softmax cross entropy as our loss function.

    Args:
        logits: unnormalised class scores, one row per example.
        targets: per-example class distribution (e.g. one-hot rows); expected
            to have the same shape as `logits`.

    Returns:
        Scalar tensor with the batch-averaged cross entropy. The value is
        also registered as the TensorBoard scalar "xent".
    """
    with tf.name_scope("cross_entropy"):
        # The op expects labels as a floating-point distribution shaped like
        # the logits. An int32 cast would truncate fractional labels and a
        # squeeze would drop the static shape, so only the dtype is aligned.
        # stop_gradient keeps the labels constant during backprop.
        targets = tf.stop_gradient(tf.cast(targets, logits.dtype))
        # Calculate cross entropy from logits and targets
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(
            logits=logits, labels=targets)
        # Take the average loss across batch size
        cross_entropy_mean = tf.reduce_mean(cross_entropy, name="cross_entropy")

        # Display scalar on Tensorboard
        tf.summary.scalar("xent", cross_entropy_mean)

        return cross_entropy_mean

def compute_accuracy(logits, targets):
    """ Fraction of examples whose top-scoring class equals the target class.

    Both arguments are reduced with argmax over axis 1; the result is also
    registered as the TensorBoard scalar "accuracy".
    """
    with tf.name_scope("accuracy"):
        predicted_class = tf.argmax(logits, 1)
        true_class = tf.argmax(targets, 1)
        # 1.0 where the prediction is right, 0.0 otherwise
        hits = tf.cast(tf.equal(predicted_class, true_class), tf.float32)
        accuracy = tf.reduce_mean(hits)

        # Display scalar on Tensorboard
        tf.summary.scalar("accuracy", accuracy)

        return accuracy

In [None]:
# Define the loss function and optimizer.
# `optimizer` holds the training op returned by train_optimizer (the result
# of AdamOptimizer.minimize), not the optimizer object itself.
cost = compute_loss_xent(logits, targets=y)
optimizer = LeNet.train_optimizer(cost, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-08)

# Evaluate model: accuracy of the logits against the one-hot targets
accuracy = compute_accuracy(logits, targets=y)

In [None]:
# Op that initialises all global variables; it is run once at the start of
# the training session.
init = tf.global_variables_initializer()

<h3>Merging all summaries</h3>

In [None]:
# Single op that evaluates every summary registered so far (input images,
# layer histograms, "xent" and "accuracy" scalars)
merged_summary = tf.summary.merge_all()

<h3>Saving and restoring model</h3>

In [None]:
# Saver for the graph's variables.
# NOTE(review): none of the cells visible here calls saver.save()/restore(),
# so no checkpoint is written yet.
saver = tf.train.Saver()

<h3>Launch the execution Graph</h3>

In [None]:
epochs = 50
batch_size = 128
display_step = 1   # print metrics every `display_step` epochs
summary_step = 1   # write TensorBoard summaries every `summary_step` epochs
# Number of mini-batches needed to visit every training example once per
# epoch (the last batch may be smaller). A fixed count of 74 would cover
# only 74 * 128 = 9,472 examples.
num_batches = int(np.ceil(len(X_train) / batch_size))

with tf.Session() as sess:
    sess.run(init)
    # Visualizing the Graph
    writer = tf.summary.FileWriter("./tensorboard/" + experiment_name)
    writer.add_graph(sess.graph)

    try:
        for i in range(epochs):
            for j in range(num_batches):
                start = j * batch_size
                batch_x = X_train[start:start + batch_size]
                batch_y = y_train[start:start + batch_size]
                # One run applies the update and fetches the batch metrics,
                # avoiding a second forward pass over the same batch
                _, loss, acc = sess.run([optimizer, cost, accuracy],
                                        feed_dict={X: batch_x, y: batch_y})

            if (i % summary_step) == 0:
                # Run the merged summary (on the last batch of the epoch)
                # and write it to disk
                s = sess.run(merged_summary, feed_dict={X: batch_x, y: batch_y})
                writer.add_summary(s, (i + 1))

            # Test the epoch number, not the constant total `epochs`
            if ((i + 1) % display_step) == 0:
                print("Epoch:", "%03d," % (i + 1),
                      "loss=", "%.5f," % (loss),
                      "train accuracy:", "%.5f" % (acc))
    finally:
        # Flush pending events and release the log file even on failure
        writer.close()
    print("Training complete")