In [None]:
# Load pickled data
import pickle
import numpy as np

import cv2
import random
import matplotlib

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
from math import *
import sys
training_file = "./datasets/train.p"
x_training_file = "./datasets/x_train_augmented_test.p"
y_training_file = "./datasets/y_train_augmented_test.p"
validation_file = "./datasets/valid.p"
testing_file = "./datasets/test.p"

with open(training_file, mode='rb') as f:
    train = pickle.load(f)
with open(validation_file, mode='rb') as f:
    valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
    test = pickle.load(f)

X_train, y_train = train['features'], train['labels']
X_train, y_train = shuffle(X_train, y_train)

X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.10, random_state=42)

In [None]:
# TODO: Number of training examples
n_train = X_train.shape[0]
n_valid = X_valid.shape[0]
# TODO: Number of testing examples.
#n_test = X_test.shape[0]

# TODO: What's the shape of an traffic sign image?
image_shape = X_train[0].shape

# TODO: How many unique classes/labels there are in the dataset.
unique_labels = np.unique(y_train)
n_classes = len(unique_labels)

print("Number of training examples =", n_train)
print("Number of validation examples =", n_valid)
#print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)


# Show an example of each class
i = 1
for label in unique_labels:
        # Pick the first image for each label.
        index = np.where(y_train == label)[0][0]
        image = X_train[index]
        plt.subplot(7, 7, i)
        plt.axis('off')
        i += 1
        _ = plt.imshow(image)
        
plt.show()

In [None]:
# Let's check the distribution of training data in terms of classes

x= np.arange(0,n_classes)
plt.hist(y_train, bins=n_classes,rwidth=0.85)
plt.show()

In [None]:
# PRE PROCESSING

def pre_process(img):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = ((img - [128.0]) / 128.0)
    return img.reshape(32,32,1)

X_train = np.array(list(map(pre_process, X_train)))
X_valid = np.array(list(map(pre_process, X_valid)))
print("After pre-processing")
print(X_train.shape)

In [None]:
# MODEL ARCHITECTURE

import tensorflow as tf


EPOCHS = 100
BATCH_SIZE = 100

from tensorflow.contrib.layers import flatten
import tfhelper

strides = {  "l1": 1, "p1": 2, 'l2': 1, 'p2': 2, 'l3': 1, 'l4':1, "out": 1}

dropout = { "l3": .75,  "l4": .75, 'out': 0.75 }


shapes = {
    'input': [32,32,1],
    'l1': [28,28,6],
    'p1': [14,14,6],
    'l2': [10,10,16],
    'p2': [5,5,16],
    'flat': [400],
    'l3': [120],
    'l4': [84],
    'out': [43]
}

pipeline = ['input', 'l1','p1',  'l2', 'p2', 'flat', 'l3', 'l4', 'out']

In [None]:
x = tf.placeholder(tf.float32, (None, 32, 32, 1))
y = tf.placeholder(tf.int32, (None))
one_hot_y = tf.one_hot(y, 43)

In [None]:

mu = 0
sigma = 0.1

# Layer 1: Convolutional.
height_l1 = 1 + ceil(float(shapes['input'][0] - shapes['l1'][0] ) / float(strides['l1']))
width_l1 =  1 + ceil((shapes['input'][1] - shapes['l1'][1] ) / float(strides['l1']))
shape_l1 = [height_l1, width_l1, shapes['input'][-1], shapes['l1'][-1]]
print("Filter l1 : "+str(height_l1)+"x"+str(width_l1))
weights_l1 = tf.Variable(tf.truncated_normal(shape_l1, mean=mu, stddev=sigma))
strides_l1 = [1, strides['l1'], strides['l1'], 1]
print("Weights l1 : "+str(weights_l1))
l1 = tf.nn.conv2d(x, weights_l1, strides=strides_l1, padding="VALID")
bias_l1 = tf.Variable(tf.zeros(shapes['l1'][-1]))
print("bias l1 : "+str(bias_l1))
l1 = tf.nn.bias_add(l1, bias_l1)
l1 = tf.nn.relu(l1, name='l1')

print(l1)
# Pooling
strides_p1 = [1, strides['p1'], strides['p1'], 1]
p1 = tf.nn.max_pool(l1, ksize=strides_p1, strides=strides_p1, padding='VALID', name='p1')
print(p1)
# Layer 2: Convolutional.
height_l2 = 1 + ceil(float(shapes['p1'][0] - shapes['l2'][0] ) / float(strides['l2']))
width_l2 =  1 + ceil((shapes['p1'][1] - shapes['l2'][1] ) / float(strides['l2']))
shape_l2 = [height_l2, width_l2, shapes['p1'][-1], shapes['l2'][-1]]
print("Filter l2 : "+str(height_l2)+"x"+str(width_l2))
weights_l2 = tf.Variable(tf.truncated_normal(shape_l1, mean=mu, stddev=sigma))
strides_l2 = [1, strides['l1'], strides['l1'], 1]

l2 = tf.nn.conv2d(x, weights_l2, strides=strides_l2, padding="VALID")
bias_l2 = tf.Variable(tf.zeros(shapes['l2'][-1]))
print("Weights l2 : "+str(weights_l2))
print("bias l2 : "+str(bias_l2))
l2 = tf.nn.bias_add(l2, bias_l2)
l2 = tf.nn.relu(l2, name='l1')
print(l2)
# Pooling.

strides_p2 = [1, strides['p2'], strides['p2'], 1]
p2 = tf.nn.max_pool(l1, ksize=strides_p2, strides=strides_p2, padding='VALID', name='p2')
print(p2)
# Flatten.
flat = flatten(p2)
print(flat)

# Layer 3: Fully Connected.

l3_neurons = shapes['l3'][0]
l3_incoming = shapes['flat'][0]

l3_weights = tf.Variable(tf.truncated_normal([l3_incoming, l3_neurons], mean=mu, stddev=sigma))
l3_biases = tf.Variable(tf.zeros([l3_neurons])
l3 = tf.nn.relu(tf.add(tf.matmul(flat, l3_weights), l3_biases))
print("Weights l3 : "+str(l3_weights))
print("bias l3 : "+str(l3_biases))
l3 = tf.nn.dropout(l3, tf.constant(dropout['l3']))

print(l3)
# Layer 4: Fully Connected.

l4_neurons = shapes['l4'][0]
l4_incoming = shapes['l3'][0]

l4_weights = tf.Variable(tf.truncated_normal([l4_incoming, l4_neurons], mean=mu, stddev=sigma))
l4_biases = tf.Variable(tf.zeros([l4_neurons])
l4 = tf.nn.relu(tf.add(tf.matmul(l3, l4_weights), l4_biases))
print("Weights l4 : "+str(l4_weights))
print("bias l4 : "+str(l4_biases))
l4 = tf.nn.dropout(l4, tf.constant(dropout['l4']))
print(l4)


# Layer 5: Fully Connected.
l5_neurons = shapes['out'][0]
l5_incoming = shapes['l4'][0]

l5_weights = tf.Variable(tf.truncated_normal([l5_incoming, l5_neurons], mean=mu, stddev=sigma))
l5_biases = tf.Variable(tf.zeros([l5_neurons])
l5 = tf.nn.relu(tf.add(tf.matmul(l4, l5_weights), l5_biases))
print("Weights l5 : "+str(l5_weights))
print("bias l5 : "+str(l5_biases))
logits = tf.nn.dropout(l5, tf.constant(dropout['out']))
print(logits)

In [None]:
def evaluate(X_data, y_data):
    num_examples = len(X_data)
    total_accuracy = 0
    sess = tf.get_default_session()
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        accuracy = sess.run(accuracy_operation, feed_dict={x: batch_x, y: batch_y})
        total_accuracy += (accuracy * len(batch_x))
    return total_accuracy / num_examples

def update_progress(progress):
    sys.stdout.write('\r[{0}{1}] {2:.2f}%'.format('#'*ceil(progress/10), ' '*(10-ceil(progress/10)), progress))


In [None]:
rate = 0.003

cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
loss_operation = tf.reduce_mean(cross_entropy)

optimizer = tf.train.AdamOptimizer(learning_rate=rate)
training_operation = optimizer.minimize(loss_operation)

correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
saver = tf.train.Saver()

In [None]:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)
    print(X_train.shape)
    print("Training...")
    print()
    for i in range(EPOCHS):
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]

            sess.run(training_operation, feed_dict={x: batch_x, y: batch_y})
            update_progress(end * 100 / num_examples)


        print("\n\nEPOCH {} ...".format(i + 1))
        validation_accuracy = evaluate(X_valid, y_valid)
        print("Validation Accuracy = {:.3f}".format(validation_accuracy))
        print()