All the necessary imports

In [0]:
import pickle
from google.colab import files
import io
from google.colab import drive
import numpy as np
import random
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior() 

Fetching the data

In [0]:
def load_data(training_data_file, testing_data_file):
    """Load the pickled training and testing datasets from disk.

    Parameters
    ----------
    training_data_file : str or path-like
        Path of the pickle file holding the training data.
    testing_data_file : str or path-like
        Path of the pickle file holding the testing data.

    Returns
    -------
    tuple
        ``(train_data, test_data)``: the two unpickled objects, in that order.
    """
    # NOTE(security): pickle.load can execute arbitrary code while
    # deserialising; only point this at files from a trusted source.
    with open(training_data_file, mode='rb') as f:
        train_data = pickle.load(f)
    with open(testing_data_file, mode='rb') as f:
        test_data = pickle.load(f)
    # Return the objects that were actually loaded; the previous
    # `return train, test` referenced names that are never defined.
    return train_data, test_data

In [0]:
# Mount Google Drive so the pickled datasets stored there can be read.
drive.mount('/content/drive')
data_path="/content/drive/My Drive/traffic_sign_classifier_data/"

# Each file is opened in a `with` block so its handle is closed as soon as the
# pickle has been read (the handles were previously left open).
# NOTE(security): pickle.load can execute arbitrary code; only load trusted files.
with open(data_path+'train.p','rb') as training_data_file:
    train_data = pickle.load(training_data_file)
with open(data_path+'test.p','rb') as test_data_file:
    test_data = pickle.load(test_data_file)
with open(data_path+'valid.p', 'rb') as valid_data_file:
    valid_data = pickle.load(valid_data_file)

# Split every dataset into its images ('features') and class ids ('labels').
X_train, y_train = train_data['features'], train_data['labels']
X_test, y_test = test_data['features'], test_data['labels']
X_valid,y_valid = valid_data['features'], valid_data['labels']

The pickled data is a dict with 4 key-value pairs, namely
features, labels, size and coords.

Caveat with coords --- the coordinates refer to the original image, whereas the pickled data contains images downsampled to 32*32 (this ate my head all the way).



---

Getting to know the data

In [0]:
# Sizes of the training, validation and testing splits.
n_train, n_validation, n_test = len(X_train), len(X_valid), len(X_test)

# Shape of a single traffic sign image, read from the first training example.
image_shape = np.shape(X_train[0])

# Count of distinct labels that occur in the training set.
n_classes = len(np.unique(y_train))

# Print the summary, one "caption value" line per statistic.
for caption, value in (
    ("Number of training examples =", n_train),
    ("Number of testing examples =", n_test),
    ("Number of validation examples =", n_validation),
    ("Image data shape =", image_shape),
    ("Number of classes =", n_classes),
):
    print(caption, value)



---

Visualize some samples

In [0]:
%matplotlib inline

# Show 20 randomly chosen training images in a 4x5 grid, each titled with its label.
figs, axs = plt.subplots(4, 5, figsize = (16, 8))
axs = axs.ravel()
for i in range(20):
    # randrange excludes the upper bound. random.randint(0, len(X_train)) is
    # inclusive on both ends and could return len(X_train), which is one past
    # the last valid index and raises an IndexError.
    index = random.randrange(len(X_train))
    image = X_train[index]
    axs[i].axis('off')
    axs[i].imshow(image)
    axs[i].set_title(y_train[index])



Preprocessing the data
Each image is normalized (centered by subtracting its mean, then divided by its standard deviation) and then converted to gray-scale.

In [0]:
def data_preprocess(X):
    """Standardize one image and collapse its channels to gray-scale.

    The input is converted to float32, shifted by its overall mean and scaled
    by its overall standard deviation (plus the float32 machine epsilon so the
    divisor is never zero). The third axis is then averaged by summing
    ``value / 3`` along it, keeping that axis with length 1.

    Parameters
    ----------
    X : array-like
        Image with the channels on axis 2 (the division by 3 assumes three
        channels).

    Returns
    -------
    numpy.ndarray
        The standardized gray-scale image; axis 2 has length 1.
    """
    pixels = np.float32(X)

    # Standardize in place: zero mean, (approximately) unit variance.
    pixels -= pixels.mean()
    pixels /= pixels.std() + np.finfo(np.float32).eps

    # Gray-scale conversion: sum of one third of every channel.
    return (pixels / 3).sum(axis=2, keepdims=True)


def _preprocess_split(images):
    """Run data_preprocess on every image and collect the results in one array."""
    return np.array(list(map(data_preprocess, images)))


# Replace each split with its preprocessed version (training, testing, validation).
X_train = _preprocess_split(X_train)

X_test = _preprocess_split(X_test)

X_valid = _preprocess_split(X_valid)


Defining model architecture
The LeNet-5 architecture, as taught in the Udacity course, is used here.
A paper describing a model for the same problem is available at http://yann.lecun.com/exdb/publis/pdf/sermanet-ijcnn-11.pdf

In [0]:
def LeNet(x, keep_prob_conv=0.80, keep_prob_fc=0.70, n_classes=43):
    """Build the LeNet-5 style classification graph and return its logits.

    Architecture: two (5x5 convolution -> ReLU -> 2x2 max-pool) stages, a
    flatten, dropout, a 400->120 fully connected layer with ReLU and dropout,
    a 120->84 fully connected layer with ReLU, and a final 84->n_classes
    linear layer.

    Parameters
    ----------
    x : tensor
        Batch of input images. The first convolution has one input channel and
        the first fully connected layer expects 400 flattened features, which
        corresponds to inputs of shape (batch, 32, 32, 1).
    keep_prob_conv : float or scalar tensor, optional
        Keep probability of the dropout applied to the flattened convolutional
        features. Defaults to 0.80, the previously hard-coded value.
    keep_prob_fc : float or scalar tensor, optional
        Keep probability of the dropout applied after the first fully
        connected layer. Defaults to 0.70, the previously hard-coded value.
    n_classes : int, optional
        Number of output logits. Defaults to 43, the previously hard-coded
        value.

    Returns
    -------
    tensor
        Unnormalized class scores (logits), one row per input image and one
        column per class.

    Notes
    -----
    With the default float keep probabilities dropout is baked into the graph,
    so it is also active whenever the returned logits are used for evaluation.
    To disable it at evaluation time, pass scalar placeholders for the two
    keep probabilities and feed 1.0 when evaluating.
    """
    # Arguments used for tf.truncated_normal, which randomly initialises the weights of each layer
    mu = 0
    sigma = 0.1

    # Layer 1: Convolution. Input = 32x32x1. Output = 28x28x6.
    conv1_w = tf.Variable(tf.truncated_normal(shape = (5, 5, 1, 6), mean = mu, stddev = sigma))
    conv1_b = tf.Variable(tf.zeros(6))
    conv1 = tf.nn.conv2d(x, conv1_w, strides=[1, 1, 1, 1], padding='VALID') + conv1_b

    # Activation
    conv1 = tf.nn.relu(conv1)

    # Pooling. Input = 28x28x6. Output = 14x14x6.
    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Layer 2: Convolution. Output = 10x10x16.
    conv2_w = tf.Variable(tf.truncated_normal([5, 5, 6, 16], mean = mu, stddev = sigma))
    conv2_b = tf.Variable(tf.zeros(16))
    conv2 = tf.nn.conv2d(conv1, conv2_w, strides=[1, 1, 1, 1], padding='VALID') + conv2_b

    # Activation
    conv2 = tf.nn.relu(conv2)

    # Pooling. Input = 10x10x16. Output = 5x5x16.
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')

    # Flatten. Input = 5x5x16. Output = 400.
    conv2 = tf.layers.flatten(conv2)

    # Dropout layer 1 (keep probability is configurable, see docstring)
    conv2 = tf.nn.dropout(conv2, keep_prob=keep_prob_conv)

    # Layer 3: Fully Connected. Input = 400. Output = 120.
    fconnected1_w = tf.Variable(tf.truncated_normal([400, 120], mean = mu, stddev = sigma))
    fconnected1_b = tf.Variable(tf.zeros(120))
    fconnected_1 = tf.add(tf.matmul(conv2, fconnected1_w), fconnected1_b)

    # Activation.
    fconnected_1 = tf.nn.relu(fconnected_1)

    # Dropout layer 2 (keep probability is configurable, see docstring)
    fconnected_1 = tf.nn.dropout(fconnected_1, keep_prob=keep_prob_fc)

    # Layer 4: Fully Connected. Input = 120. Output = 84.
    fconnected2_w = tf.Variable(tf.truncated_normal([120, 84], mean = mu, stddev = sigma))
    fconnected2_b = tf.Variable(tf.zeros(84))
    fconnected_2 = tf.add(tf.matmul(fconnected_1, fconnected2_w), fconnected2_b)

    # Activation.
    fconnected_2 = tf.nn.relu(fconnected_2)

    # Layer 5: Fully Connected. Input = 84. Output = n_classes (43 by default).
    fconnected3_w = tf.Variable(tf.truncated_normal([84, n_classes], mean = mu, stddev = sigma))
    fconnected3_b = tf.Variable(tf.zeros(n_classes))
    logits = tf.add(tf.matmul(fconnected_2, fconnected3_w), fconnected3_b)

    return logits

In [0]:

# Input batch of images: any batch size, 32x32 pixels, 1 channel.
x = tf.placeholder(tf.float32, (None, 32, 32, 1))
# Integer class ids, one per image. `(None)` is simply `None` (a completely
# unknown shape); the 1-tuple `(None,)` declares a 1-D vector of any length.
y = tf.placeholder(tf.int32, (None,))
# One-hot encoding of the labels over the 43 classes.
one_hot_y = tf.one_hot(y, 43)
# The learning rate `rate` is defined once, together with EPOCHS and
# BATCH_SIZE, in the hyperparameter cell that follows.

In [0]:
# Training hyperparameters: number of passes over the training set and the
# number of examples per mini-batch.
EPOCHS, BATCH_SIZE = 50, 128
# Learning rate handed to the optimizer.
rate = 1e-3

In [0]:
# Forward pass: class scores for the input batch.
logits = LeNet(x)
# Per-example softmax cross-entropy between the scores and the one-hot labels.
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
    logits=logits,
    labels=one_hot_y,
)
# Scalar loss: mean cross-entropy over the batch.
loss_operation = tf.reduce_mean(cross_entropy)
# Adam optimizer step that minimizes the loss.
optimizer = tf.train.AdamOptimizer(learning_rate=rate)
training_operation = optimizer.minimize(loss=loss_operation)

In [0]:
# Predicted class = index of the largest logit; true class = index of the
# largest entry of the one-hot label row.
predicted_class = tf.argmax(logits, axis=1)
true_class = tf.argmax(one_hot_y, axis=1)
correct_prediction = tf.equal(predicted_class, true_class)
# Accuracy = fraction of examples in the batch that were classified correctly.
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, dtype=tf.float32))
# Saver used to write the trained variables to disk.
saver = tf.train.Saver()

In [0]:

def evaluate(X_data, y_data):
    """Compute the mean loss and accuracy of the model over a dataset.

    The data is processed in mini-batches of ``BATCH_SIZE`` examples using the
    default TensorFlow session. Per-batch results are weighted by the batch
    length so that a shorter final batch is accounted for correctly.

    Parameters
    ----------
    X_data : sequence (sliceable, e.g. numpy array)
        The input examples, fed to the ``x`` placeholder batch by batch.
    y_data : sequence (sliceable, e.g. numpy array)
        The labels for the input examples, fed to the ``y`` placeholder.

    Returns
    -------
    tuple of float
        ``(loss, accuracy)``: the example-weighted mean of ``loss_operation``
        and of ``accuracy_operation`` over the whole dataset.

    Raises
    ------
    RuntimeError
        If there is no default session, i.e. the function is called outside a
        ``with tf.Session()`` block.
    ZeroDivisionError
        If ``X_data`` is empty.
    """
    sess = tf.get_default_session()
    if sess is None:
        # Fail with an explicit message instead of an AttributeError on None.
        raise RuntimeError("evaluate() must be called inside an active tf.Session")

    num_examples = len(X_data)
    total_accuracy = 0
    total_loss = 0
    for offset in range(0, num_examples, BATCH_SIZE):
        batch_x, batch_y = X_data[offset:offset+BATCH_SIZE], y_data[offset:offset+BATCH_SIZE]
        loss, accuracy = sess.run([loss_operation, accuracy_operation], feed_dict={x: batch_x, y: batch_y})
        # Weight by the batch length: the last batch may be smaller.
        total_accuracy += (accuracy * len(batch_x))
        total_loss += (loss * len(batch_x))
    return total_loss/float(num_examples), total_accuracy/float(num_examples)

In [0]:
# Per-epoch metric histories, used for plotting after training.
train_loss_history, valid_loss_history = [], []
train_accuracy_history, valid_accuracy_history = [], []

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)

    print("Training...")
    print()
    for epoch in range(1, EPOCHS + 1):
        # Reshuffle the training set so every epoch sees the batches in a new order.
        X_train, y_train = shuffle(X_train, y_train)
        for start in range(0, num_examples, BATCH_SIZE):
            batch = slice(start, start + BATCH_SIZE)
            sess.run(training_operation, feed_dict={x: X_train[batch], y: y_train[batch]})

        # Measure the validation set first, then the training set.
        validation_loss, validation_accuracy = evaluate(X_valid, y_valid)
        valid_loss_history.append(validation_loss)
        valid_accuracy_history.append(validation_accuracy)
        train_loss, train_accuracy = evaluate(X_train, y_train)
        train_loss_history.append(train_loss)
        train_accuracy_history.append(train_accuracy)

        # Report the metrics of this epoch.
        print("EPOCH {} ...".format(epoch))
        for template, value in (
            ("Validation Accuracy = {:.3f}", validation_accuracy),
            ("Training Accuracy = {:.3f}", train_accuracy),
            ("Validation loss = {:.3f}", validation_loss),
            ("Training loss = {:.3f}", train_loss),
        ):
            print(template.format(value))

        print()

    # Persist the trained variables once all epochs are done.
    saver.save(sess, './traffic-signs')
    print("Model saved")

Plotting epochs vs training/validation loss and epochs vs training/validation accuracy

In [0]:
def _plot_history(title, train_series, valid_series, train_label, valid_label, legend_loc):
    """Draw training (red) and validation (blue) curves on a new figure and return the axes."""
    plt.figure()
    axes = plt.subplot(2, 1, 1)
    axes.set_title(title)
    axes.plot(train_series, 'r', label=train_label)
    axes.plot(valid_series, 'b', label=valid_label)
    axes.set_xlim([0, EPOCHS])
    axes.legend(loc = legend_loc)
    return axes


# Loss per epoch.
loss_plot = _plot_history(
    'Loss', train_loss_history, valid_loss_history,
    'Training Loss', 'Validation Loss', 1,
)

# Accuracy per epoch.
accuracy_plot = _plot_history(
    'Accuracy', train_accuracy_history, valid_accuracy_history,
    'Training Accuracy', 'Validation Accuracy', 4,
)

#TODO: Run over test dataset, try to reduce overfitting, report exponential average over validation dataset