# **SDC Traffic Sign Classifier** 
## Overview 
Develop a Deep Learning Network to classify traffic signs. To accomplish this, a Convolutional Neural Network (CNN) will be developed, focusing specifically on German traffic signs: [German Traffic Sign Dataset](http://benchmark.ini.rub.de/?section=gtsrb&subsection=dataset).
 
## Requirements

The requirements are:

- Architecture based on LeNet-5 implementation. 
- Outline any preprocessing techniques used (normalization, rgb to grayscale, etc)
- Outline any balancing techniques used on the number of examples per label (some have more than others).
- Evaluate whether the Neural Network is over- or underfitting.
- Generate fake data 
- The Neural Network needs to have a validation set accuracy >= 0.93 

## Steps

The steps involved are:
- Get the Test Data and Display Attributes
- Plot the Test Data
- Plot Histogram of Test Data
- Pre-process the data set
- TrafficSignNet Architecture
- Train, Validate and Test the Model
- Features and Labels
- Training Pipeline
- Model Evaluation
- Train the Model
- Evaluate the Model
- Test on random images
- Predict the Sign Type for Each Image
- Output Top 5 Softmax Probabilities for each image found on the web

## Import Packages

In [1]:
import tensorflow as tf
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import numpy as np
import cv2
import os
import math
import pickle
import csv
%matplotlib inline

## SDC Helper Functions
Below are the helper functions for Traffic Sign Classification!

In [2]:
# Create a Class for some useful Enums
class Enum( tuple ):
    """Minimal enumeration: every member name maps to its position in the tuple.

    Example: ``Enum( ['A', 'B'] ).A`` is ``0`` and ``.B`` is ``1``.

    ``__getattr__`` is only consulted when normal attribute lookup fails, so
    names that tuple already defines (``index``, ``count``, ...) cannot be
    used as members.
    """

    def __getattr__( self, name ):
        """Return the index of member ``name``.

        Raises:
            AttributeError: if ``name`` is not a member. Raising AttributeError
                (instead of the ValueError that ``tuple.index`` produces) keeps
                ``hasattr()`` and ``getattr( obj, name, default )`` working.
        """
        try:
            return self.index( name )
        except ValueError:
            raise AttributeError( "{!r} is not a member of {!r}".format( name, tuple( self ) ) ) from None

# Define Enum for handling Data Sets
DataSet = Enum( ['TRAIN', 'VALID', 'TEST', 'RANDOM'] )

# Define Enum for different equalization modes
EqualizeMode = Enum( ['NORMAL', 'ADAPTIVE'] )

def get_test_data( test_data_dir ):
    """Load the pickled train / validation / test splits from a directory.

    Args:
        test_data_dir: directory expected to contain ``train.p``, ``valid.p``
            and ``test.p``.

    Returns:
        Tuple ``(train, valid, test)`` holding the unpickled object of each file.

    Raises:
        FileNotFoundError: if any of the three files is missing.

    Any other ``*.p`` file in the directory is reported on stdout and skipped.
    """
    expected = ( 'train', 'valid', 'test' )
    loaded = {}
    for filename in os.listdir( test_data_dir ):
        # Match the '.p' suffix as a whole; other file types are ignored silently.
        if not filename.endswith( '.p' ):
            continue
        split = filename[:-len( '.p' )]
        if split not in expected:
            print(" unknown filename {}".format( filename ) )
            continue
        # NOTE: pickle.load can execute arbitrary code - only load trusted files.
        with open( os.path.join( test_data_dir, filename ), mode='rb' ) as f:
            loaded[split] = pickle.load( f )

    missing = [ split + '.p' for split in expected if split not in loaded ]
    if missing:
        raise FileNotFoundError(
            "missing data file(s) in {}: {}".format( test_data_dir, ', '.join( missing ) ) )
    return loaded['train'], loaded['valid'], loaded['test']

def get_sign_names( test_data_dir ):
    """Extract sign names from the ``signnames.csv`` file.

    The file is read from the current working directory when it exists there;
    otherwise it is looked up inside ``test_data_dir``.

    Args:
        test_data_dir: fallback directory searched for ``signnames.csv``.

    Returns:
        List of CSV rows (each a list of strings) with the header row removed.
        An empty file yields an empty list.
    """
    csv_path = 'signnames.csv'
    if not os.path.isfile( csv_path ):
        csv_path = os.path.join( test_data_dir, 'signnames.csv' )
    # newline='' is what the csv module asks for when it is handed a file object
    with open( csv_path, mode='r', newline='' ) as f:
        reader = csv.reader( f )
        next( reader, None )  # drop the header row; tolerate an empty file
        signs = list( reader )
    return signs

def plot_dataset_images( dataset ):
    """Plot the first image of every class of one data set in a 4-column grid.

    Reads the module-level arrays ``X_train``/``y_train``, ``X_valid``/``y_valid``
    or ``X_test``/``y_test`` as well as ``n_classes`` and ``signs``.

    Args:
        dataset: ``DataSet.TRAIN``, ``DataSet.VALID`` or ``DataSet.TEST``.
            Any other value prints " unknown dataset" and nothing is plotted.
    """
    if dataset == DataSet.TRAIN:
        images, labels = X_train, y_train
    elif dataset == DataSet.VALID:
        images, labels = X_valid, y_valid
    elif dataset == DataSet.TEST:
        images, labels = X_test, y_test
    else:
        # Bail out before plotting: there is no image array to draw from.
        print(" unknown dataset")
        return

    plot_cols = 4
    plot_rows = math.ceil( n_classes / plot_cols )
    plt.figure( figsize = ( 15, 40 ), facecolor='white' )
    for i in range( n_classes ):
        ax = plt.subplot( plot_rows, plot_cols, i + 1 )
        class_images = images[labels == i]
        # A class may have no example in this split; leave its cell empty then.
        if len( class_images ) > 0:
            plt.imshow( class_images[0, :, :, :] )
        ax.set_title( signs[i][1], fontsize = 12 )
    plt.tight_layout()

def plot_histogram( data, dataset ):
    """Draw a histogram of ``data`` using one bin per entry of ``signs``.

    Args:
        data: values to bin (the notebook passes the label array of a split).
        dataset: ``DataSet.TRAIN`` / ``VALID`` / ``TEST`` selects the title;
            any other value prints " unknown plottype" and leaves the title unset.
    """
    plt.figure( figsize = ( 20, 10 ), facecolor='white' )

    known_titles = (
        ( DataSet.TRAIN, 'Histogram of X_train across all Class Ids' ),
        ( DataSet.VALID, 'Histogram of X_valid across all Class Ids' ),
        ( DataSet.TEST,  'Histogram of X_test across all Class Ids' ),
    )
    for member, heading in known_titles:
        if dataset == member:
            plt.title( heading, fontsize = 24 )
            break
    else:
        print(" unknown plottype")

    plt.ylabel( 'Num Images per Class Id', fontsize = 16 )
    plt.xlabel( 'Class Id', fontsize = 16 )
    bin_count = len( signs )
    plt.hist( data, bin_count )

def equalize_data( img, mode ):
    """Apply histogram equalization to a copy of ``img`` to enhance contrast.

    Args:
        img: 3-channel image array; it is copied, never modified in place.
        mode: ``EqualizeMode.ADAPTIVE`` applies CLAHE (clip limit 2.0, 8x8 tiles)
            to the first plane of the LAB conversion; ``EqualizeMode.NORMAL``
            runs ``cv2.equalizeHist`` on each of the three channels separately.
            Any other value prints " unknown mode" and returns the plain copy.

    Returns:
        The equalized copy of ``img``.
    """
    img = img.copy()
    if mode == EqualizeMode.ADAPTIVE:
        # NOTE(review): the BGR<->LAB codes are kept as they were; if the images
        # are actually RGB the lightness plane is computed with R and B swapped
        # (colours still round-trip) - confirm the channel order of the data.
        lab = cv2.cvtColor( img, cv2.COLOR_BGR2LAB )
        # cv2.split may hand back an immutable tuple; a list allows replacing plane 0.
        lab_planes = list( cv2.split( lab ) )
        clahe = cv2.createCLAHE( clipLimit=2.0,tileGridSize=(8,8) )
        lab_planes[0] = clahe.apply( lab_planes[0] )
        lab = cv2.merge( lab_planes )
        img = cv2.cvtColor( lab, cv2.COLOR_LAB2BGR )
    elif mode == EqualizeMode.NORMAL:
        for channel in range( 3 ):
            img[:,:,channel] = cv2.equalizeHist( img[:,:,channel] )
    else:
        print(" unknown mode")
    return img

def normalize_data( img ):
    """Shift by 128 and scale by 1/128.

    Pixel values in 0..255 end up in [-1, 1); the mean is only zero when the
    input mean happens to be 128.
    """
    offset = 128.
    scale = 128.
    centered = img - offset
    return centered / scale

def find_images( img_dir ):
    """Find image files in a directory.

    Args:
        img_dir: directory to scan (not recursive).

    Returns:
        Sorted list of absolute paths of every entry whose extension is
        .jpg, .jpeg, .gif or .png. The match ignores case, so ``photo.JPG``
        is found as well.
    """
    image_extensions = ( '.jpg', '.jpeg', '.gif', '.png' )
    image_files = [
        os.path.abspath( os.path.join( img_dir, filename ) )
        for filename in os.listdir( img_dir )
        # str.endswith accepts a tuple of suffixes
        if filename.lower().endswith( image_extensions )
    ]
    return sorted( image_files )

## Get the Test Data and Show Attributes

The test_data is a set of pickle files in dictionary format with 4 key/value pairs:
- `features` A 4D array of raw pixel data of the traffic sign images, (num examples, width, height, channels).
- `labels` A 1D array containing the label/class id of the traffic sign. The file `signnames.csv` contains id -> name mappings for each id.
- `sizes` A list of tuples, (width, height) representing the original width and height of the image.
- `coords` A list of tuples, (x1, y1, x2, y2) representing coordinates of a bounding box around the sign in the image.


In [1]:
# Load the three pickled splits
train, valid, test = get_test_data( 'test_data' )

# Separate every split into its image array and its label array
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test,  y_test  = test['features'],  test['labels']

# Gather the summary figures
n_train, n_valid, n_test = X_train.shape[0], X_valid.shape[0], X_test.shape[0]
image_shape = X_train[0].shape
n_classes = len( set( y_train ) )  # number of distinct labels in the training split

# Print the summary, one figure per line
for caption, figure in (
    ( "Number of training examples =", n_train ),
    ( "Number of validation examples =", n_valid ),
    ( "Number of testing examples =", n_test ),
    ( "Image data shape =", image_shape ),
    ( "Number of classes =", n_classes ),
):
    print( caption, figure )

# Rows of signnames.csv, header removed
signs = get_sign_names( 'test_data' )

## Plot Training Images

Sample only - Plot the first image from each ClassId


In [None]:
# Show the first training image of every class id
plot_dataset_images( DataSet.TRAIN )

## Plot Histogram of Training Images

In [None]:
# Number of training examples per class id
plot_histogram( y_train, DataSet.TRAIN )

## Plot Validation Images

Sample only - Plot the first image from each ClassId


In [None]:
# Show the first validation image of every class id
plot_dataset_images( DataSet.VALID )

## Plot Histogram of Validation Images

In [None]:
# Number of validation examples per class id
plot_histogram( y_valid, DataSet.VALID )

## Plot Test Images

Sample only - Plot the first image from each ClassId


In [None]:
# Show the first test image of every class id
plot_dataset_images( DataSet.TEST )

## Plot Histogram of Test Images

In [None]:
# Number of test examples per class id
plot_histogram( y_test, DataSet.TEST )

## Pre-process the data sets

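The images are first contrast-equalized (adaptive histogram equalization, CLAHE, by default) and then rescaled with `(pixel - 128) / 128`, which maps the pixel values to roughly the range [-1, 1) and brings the mean close to zero.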


In [None]:
# Pre-process data
#print("Equalize each data set")

equalize_mode = EqualizeMode.ADAPTIVE

# Equalize every image of every split, one image at a time
X_train, X_valid, X_test = (
    np.array( [ equalize_data( image, equalize_mode ) for image in split ] )
    for split in ( X_train, X_valid, X_test )
)

print("Normalize each data set")
X_train, X_valid, X_test = (
    np.array( list( map( normalize_data, split ) ) )
    for split in ( X_train, X_valid, X_test )
)

print("Calculate and print the Mean of each data set. Mean should be close to 0 ")
for report_format, split in (
    ( "X_train mean (normalized) = {}", X_train ),
    ( "X_valid mean (normalized) = {}", X_valid ),
    ( "X_test  mean (normalized) = {}", X_test ),
):
    print( report_format.format( np.mean( split ) ) )

## Plot Normalized and Equalized Training Images

Sample only - Plot the first image from each ClassId

In [None]:
# Show the first (now equalized and normalized) training image of every class id.
# NOTE(review): after normalize_data the pixels are floats in roughly [-1, 1);
# imshow presumably expects floats in [0, 1] - confirm the rendering is as intended.
plot_dataset_images( DataSet.TRAIN )

## TrafficSignNet Architecture

This architecture takes LeNet as the base design and adds a Dropout layer after each pooling layer and after each hidden fully connected layer. Dropout was pioneered by Geoffrey Hinton and is known to work very well in reducing overfitting.
I have run many training sessions with Dropout and it definitely helps (consistently get >= 93%).
- Layer 1: 
   - Input = 32x32x3. Output = 28x28x6.
   - conv1_W: 
       - Uses a 5x5 filter  with input depth=3 and output depth=6.
       - The output_height = (input_height - filter_height)/vertical_stride + 1, i.e. (32-5)/1 + 1 = 28.
       - The output_width = (input_width - filter_width)/horizontal_stride + 1, i.e. (32-5)/1 + 1 = 28.
   - conv1_b: 
       - Bias vector, length = output_depth 
   - conv1: 
       - Convolve the filter over the images and add bias
       - Activate the output of conv1 with a RELU activation function
       - Pool the output with a 2x2 kernel with a 2x2 stride which gives us a pooling output of 14x14x6
       - Add a dropout layer with keep_prob < 1
- Layer 2: 
   - Input = 14x14x6. Output = 5x5x16.
   - The steps from Layer 1 are repeated to create another layer
- Layer 3: 
   - Fully Connected Layer. 
   - Input = 400. Output = 120.
- Layer 4: 
   - Fully Connected Layer. 
   - Input = 120. Output = 84.
- Layer 5: 
   - Fully Connected Layer. 
   - Input = 84. Output = 43 which is number of classes in dataset.


In [None]:
from tensorflow.contrib.layers import flatten

def _conv_pool_dropout( inputs, in_depth, out_depth, mu, sigma ):
    """5x5 VALID convolution + bias -> ReLU -> 2x2 max-pool (stride 2) -> dropout."""
    weights = tf.Variable( tf.truncated_normal( shape=(5, 5, in_depth, out_depth), mean = mu, stddev = sigma ) )
    bias    = tf.Variable( tf.zeros( out_depth ) )
    layer   = tf.nn.conv2d( inputs, weights, strides=[1, 1, 1, 1], padding='VALID' ) + bias
    layer   = tf.nn.relu( layer )
    layer   = tf.nn.max_pool( layer, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID' )
    return tf.nn.dropout( layer, keep_prob )

def _fully_connected( inputs, n_in, n_out, mu, sigma ):
    """Affine layer ``inputs @ W + b`` with truncated-normal weights and zero bias."""
    weights = tf.Variable( tf.truncated_normal( shape=(n_in, n_out), mean = mu, stddev = sigma ) )
    bias    = tf.Variable( tf.zeros( n_out ) )
    return tf.matmul( inputs, weights ) + bias

def TrafficSignNet( x, num_classes=43 ):
    """Build the LeNet-style classifier graph with dropout and return its logits.

    Dropout uses the module-level ``keep_prob`` tensor, which therefore has to
    exist before this function is called.

    Args:
        x: batch of input images, 32x32 with 3 channels.
        num_classes: width of the output layer (default 43, as before).

    Returns:
        Logits tensor with ``num_classes`` columns.
    """
    mu = 0
    sigma = 0.1

    # Variables are created in the same order as the layers below, so the
    # default variable names stay W, b per layer from input to output.

    # Layer 1: 32x32x3 -> conv 28x28x6 -> pool 14x14x6
    conv1 = _conv_pool_dropout( x, 3, 6, mu, sigma )

    # Layer 2: 14x14x6 -> conv 10x10x16 -> pool 5x5x16
    conv2 = _conv_pool_dropout( conv1, 6, 16, mu, sigma )

    # Flatten the output to create a single vector. Input = 5x5x16. Output = 400.
    fc0 = flatten( conv2 )

    # Layer 3: 400 -> 120
    fc1 = tf.nn.relu( _fully_connected( fc0, 400, 120, mu, sigma ) )
    fc1 = tf.nn.dropout( fc1, keep_prob )

    # Layer 4: 120 -> 84
    fc2 = tf.nn.relu( _fully_connected( fc1, 120, 84, mu, sigma ) )
    fc2 = tf.nn.dropout( fc2, keep_prob )

    # Layer 5: 84 -> num_classes (raw scores: no activation, no dropout)
    logits = _fully_connected( fc2, 84, num_classes, mu, sigma )

    return logits

## Train, Validate and Test the Model
A validation set can be used to assess how well the model is performing. A low accuracy on both the training and validation
sets implies underfitting. A high accuracy on the training set but a low accuracy on the validation set implies overfitting.

## Features and Labels
`x` is a placeholder for a batch of input images.
`y` is a placeholder for a batch of output labels.

In [None]:
# Batch of 32x32 images with 3 channels
x = tf.placeholder( tf.float32, ( None, 32, 32, 3 ) )
# Batch of integer class ids. The trailing comma matters: `( None )` is plain
# None (no shape constraint at all), whereas `( None, )` declares a 1-D batch.
y = tf.placeholder( tf.int32, ( None, ) )
one_hot_y = tf.one_hot( y, n_classes )
# Probability of keeping a unit in the dropout layers; fed on every session.run
keep_prob = tf.placeholder( tf.float32 )

## Training Pipeline


In [None]:
# Training hyper-parameters: passes over the training set, examples per
# optimisation step, and the learning rate handed to the Adam optimizer.
EPOCHS = 50
BATCH_SIZE = 128
rate = 0.001

# Graph: network logits -> per-example softmax cross entropy against the
# one-hot labels -> mean loss over the batch -> one Adam update step.
logits = TrafficSignNet( x )
cross_entropy = tf.nn.softmax_cross_entropy_with_logits( labels=one_hot_y, logits=logits )
loss_operation = tf.reduce_mean( cross_entropy )
optimizer = tf.train.AdamOptimizer( learning_rate = rate )
training_operation = optimizer.minimize( loss_operation )

## Model Evaluation


In [None]:
# A prediction is correct when the highest-scoring logit sits at the label's index
correct_prediction = tf.equal( tf.argmax( logits, 1 ), tf.argmax( one_hot_y, 1 ) )
# Fraction of correct predictions in the fed batch
accuracy_operation = tf.reduce_mean( tf.cast(correct_prediction, tf.float32 ) )
# Used by the later cells to save and restore the trained variables
saver = tf.train.Saver()

def evaluate( X_data, y_data ):
    """Return the accuracy of the model over a whole data set.

    The data is pushed through ``accuracy_operation`` in chunks of
    ``BATCH_SIZE`` with dropout switched off (``keep_prob`` = 1.0), using the
    current default TensorFlow session. Each chunk's accuracy is weighted by
    its size so a shorter final chunk does not skew the result.
    """
    num_examples = len( X_data )
    session = tf.get_default_session()

    def weighted_batch_accuracy( start ):
        stop = start + BATCH_SIZE
        batch_x, batch_y = X_data[ start:stop ], y_data[ start:stop ]
        feed = { x: batch_x, y: batch_y, keep_prob: 1.0 }
        return session.run( accuracy_operation, feed_dict=feed ) * len( batch_x )

    batch_starts = range( 0, num_examples, BATCH_SIZE )
    return sum( weighted_batch_accuracy( start ) for start in batch_starts ) / num_examples

## Train the Model
Run the training data through the training pipeline to train the model.
Before each epoch, shuffle the training set.
After each epoch, measure the loss and accuracy of the validation set.
Save the model after training.


In [None]:
with tf.Session() as session:
    session.run( tf.global_variables_initializer() )
    num_examples = len( X_train )

    print("Training...")
    print()
    for epoch in range( 1, EPOCHS + 1 ):
        # New sample order for every epoch
        X_train, y_train = shuffle( X_train, y_train )
        for start in range( 0, num_examples, BATCH_SIZE ):
            stop = start + BATCH_SIZE
            # Dropout is active while training: 80% of the units are kept
            train_feed = { x: X_train[ start:stop ], y: y_train[ start:stop ], keep_prob: 0.80 }
            session.run( training_operation, feed_dict=train_feed )

        # Progress report on the validation split after each epoch
        validation_accuracy = evaluate( X_valid, y_valid )
        print("EPOCH {} ...".format( epoch ) )
        print("Validation Accuracy = {:.3f}".format(validation_accuracy ) )
        print()

    # Persist the trained variables for the evaluation / prediction cells
    saver.save( session, './traffic_sign_net' )
    print("Model saved")

## Evaluate the Model
Do this once!


In [None]:
# Restore the most recent checkpoint found in the current directory and report
# the accuracy on the training, validation and test splits.
with tf.Session() as session:
    saver.restore(session, tf.train.latest_checkpoint('.'))

    training_set_accuracy = evaluate(X_train, y_train)
    print( "Training Set Accuracy = {:.3f}".format( training_set_accuracy ) )

    validation_set_accuracy = evaluate(X_valid, y_valid )
    print( "Validation Set Accuracy = {:.3f}".format( validation_set_accuracy ) )

    test_set_accuracy = evaluate(X_test, y_test)
    print( "Test Set Accuracy = {:.3f}".format( test_set_accuracy ) )

## Test on random images

In [None]:
def _load_rgb_uint8( path ):
    """Read one image file and return it as a 32x32, 3-channel uint8 array."""
    image = plt.imread( path )
    if np.issubdtype( image.dtype, np.floating ):
        # matplotlib returns PNG data as floats in [0, 1]; the OpenCV
        # equalization applied afterwards works on 8-bit data.
        image = np.round( image * 255 ).astype( np.uint8 )
    if image.ndim == 2:
        # Grayscale file: replicate the single plane into three channels
        image = np.stack( [ image ] * 3, axis=-1 )
    # Drop an alpha channel if there is one
    image = np.ascontiguousarray( image[:, :, :3] )
    return cv2.resize( image, (32, 32) )

print( "Get Random Test Set")
random_images = find_images( 'random_traffic_signs' )

print("Resize randome images to 32x32")
X_random = np.array( [ _load_rgb_uint8( img ) for img in random_images ] )
print( "X_random.shape = ", X_random.shape )

print("Equalize the data set")
X_random = np.array( [ equalize_data( img, equalize_mode ) for img in X_random] )

print("Normalize each data set")
X_random = np.array( [ normalize_data( img ) for img in X_random] )

print("Print the Mean of random data set. Mean should be close to 0 ")
print("X_random mean = {}".format( np.mean( X_random ) ) )

# Expected class id of each image, in the (sorted) order of random_images
actual_label = [ 0,7,12,24,35,40 ]

plt.figure( figsize = ( 15, 15 ), facecolor='white' )
plot_cols = 2
plot_rows = math.ceil( len(random_images) / plot_cols )
for i in range( len(random_images) ):
   plot_num = i+1
   ax = plt.subplot( plot_rows, plot_cols, plot_num )
   # The normalized pixels lie in [-1, 1); imshow clips float RGB data to
   # [0, 1], so map them back into that range for display only.
   plt.imshow( ( X_random[i, :, :, :] + 1. ) / 2. )
   ax.set_title( signs[actual_label[i]][1], fontsize = 12 )
plt.tight_layout()



## Predict the Sign Type for Each Image

In [3]:
# Predicted class id = index of the largest logit
prediction = tf.argmax( logits, 1 )

# Restore the most recent checkpoint and classify the random images with
# dropout switched off (keep_prob = 1.0)
with tf.Session() as session: 
   saver.restore( session, tf.train.latest_checkpoint('.') ) 
   predictions = session.run( prediction, feed_dict={ x: X_random, keep_prob: 1.0 } ) 

# Compare each prediction with the hand-entered expected label
for i, p in enumerate( predictions ): 
   print("prediction label = {:2d}, actual label = {:2d}".format(p, actual_label[i] ) ) 


## Output Top 5 Softmax Probabilities For Each Image Found on the Web

In [3]:
# Class probabilities and, for every image, the five largest of them
softmax_logits = tf.nn.softmax( logits )
top5 = tf.nn.top_k( softmax_logits, k=5 )
with tf.Session() as session:
    saver.restore( session, tf.train.latest_checkpoint('.') )
    # A single forward pass yields both the full distribution and its top 5
    softmax_data, top5_softmax = session.run( [ softmax_logits, top5 ], feed_dict={ x: X_random, keep_prob: 1.0 } )

# top_k returns (values, indices): the probabilities and the class ids they belong to
top5_probabilities, top5_class_ids = top5_softmax[0], top5_softmax[1]

for i in range( len(random_images) ):
    for rank in range( 5 ):
        class_id = top5_class_ids[i][rank]
        # Report which class each probability belongs to, not only its value
        print( "Image: {}, Softmax Probability {}: {:.4f}, Predicted: {} ({})".format(
            signs[ actual_label[i]][1], rank + 1, top5_probabilities[i][rank], class_id, signs[class_id][1] ) )
    print(" ")
