# Self-Driving Car Engineer Nanodegree

## Deep Learning

## Project: Build a Traffic Sign Recognition Classifier

---
## Step 0: Load The Data

In [1]:
import time
import pickle
import numpy as np
import tensorflow as tf
import csv
import cv2
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from keras.optimizers import Adam, SGD
from keras.layers import Input, Flatten, Dense, Conv2D, Add, Lambda, Cropping2D, MaxPooling2D
from keras.layers import BatchNormalization, Activation, Dropout, Concatenate
from keras.models import Model

import matplotlib.pyplot as plt
# Visualizations will be shown in the notebook.
%matplotlib inline

# Load pickled data
training_file = './traffic-signs-data/train.p'
validation_file= './traffic-signs-data/valid.p'
testing_file = './traffic-signs-data/test.p'


def load_pickled_split(path):
    """Read one pickled dataset split from `path` and return the unpickled object.

    The file handle is closed as soon as the data has been read.
    """
    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file -- only point this at dataset files obtained from a trusted source.
    with open(path, mode='rb') as handle:
        return pickle.load(handle)


train_dict = load_pickled_split(training_file)
valid_dict = load_pickled_split(validation_file)
test_dict  = load_pickled_split(testing_file)

# Each split is indexed below by its 'features' and 'labels' entries.
X_train, y_train = train_dict['features'], train_dict['labels']
X_valid, y_valid = valid_dict['features'], valid_dict['labels']
X_test, y_test   = test_dict['features'], test_dict['labels']

Using TensorFlow backend.


In [2]:
# Image processing helper functions

def rotate_image(image, angle_range = 30):
    """Rotate `image` about its centre by a random angle.

    Args:
        image: array whose first two axes are (rows, cols); an optional third
            channel axis is passed through to OpenCV untouched.
        angle_range: total width, in degrees, of the interval the angle is
            drawn from; the angle is uniform in [-angle_range/2, +angle_range/2).

    Returns:
        The rotated image, same width and height as the input.
    """
    rows, cols = image.shape[:2]
    # np.random.uniform(angle_range) would pass angle_range as `low` (with
    # high=1.0) and skew the interval, so scale a unit sample instead.
    random_angle = angle_range*np.random.uniform() - angle_range/2
    rot_mat = cv2.getRotationMatrix2D((cols/2, rows/2), random_angle, scale = 1)
    return cv2.warpAffine(image, rot_mat, (cols, rows))

# 20
def translate_image(image, trans_range = 5):
    """Shift `image` by independent random offsets along x and y.

    Each offset is uniform in [-trans_range/2, +trans_range/2) pixels; the
    output keeps the input's width and height.
    """
    height, width, _ = image.shape
    # The x offset is sampled first, then y.
    shift_x, shift_y = (trans_range*np.random.uniform()-trans_range/2 for _ in range(2))
    translation = np.float32([[1, 0, shift_x],
                              [0, 1, shift_y]])
    return cv2.warpAffine(image, translation, (width, height))

def shear_image(image, shear_range = 5):
    """Apply a random affine shear to `image`.

    Three fixed reference points are mapped onto randomly jittered targets
    (jitter uniform in [-shear_range/2, +shear_range/2)), and the resulting
    affine transform is applied. Output has the input's width and height.
    """
    height, width, _ = image.shape
    half_range = shear_range/2
    # Two jittered coordinates; the first is drawn before the second.
    near = 5+shear_range*np.random.uniform()-half_range
    far = 20+shear_range*np.random.uniform()-half_range

    source_pts = np.float32([[5, 5], [20, 5], [5, 20]])
    target_pts = np.float32([[near, 5], [far, near], [5, far]])
    return cv2.warpAffine(image,
                          cv2.getAffineTransform(source_pts, target_pts),
                          (width, height))

def gray_scale_image(image):
    """Return a 3-channel grayscale rendition of an RGB image.

    Channel 0 holds the plain grayscale conversion; channels 1 and 2 each hold
    an independently brightness-jittered copy of that grayscale plane.

    The input array is left untouched. Writing into it would permanently
    overwrite the caller's data (e.g. a row of the training set that is
    augmented again on every epoch).
    """
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    result = image.copy()
    result[..., 0] = gray
    result[..., 1] = change_brightness_image(gray, isGrayScale = True)
    result[..., 2] = change_brightness_image(gray, isGrayScale = True)
    return result
    
    
# 0.5, 1.5
def change_brightness_image(image, brightness_range = 1.5, isGrayScale = False):
    """Scale the brightness of `image` by a random gain.

    The gain is uniform in [1 - brightness_range/2, 1 + brightness_range/2).
    Scaled values are rounded up and capped at 255 before being cast to uint8.

    Args:
        image: RGB image, or a single-channel plane when `isGrayScale` is True.
        brightness_range: total width of the interval the gain is drawn from.
        isGrayScale: when True every pixel is scaled directly; otherwise only
            the Y channel of the YCrCb representation is scaled and the result
            is converted back to RGB.

    Returns:
        A new uint8 array.
    """
    gain = 1 + brightness_range*np.random.uniform() - brightness_range/2

    if isGrayScale:
        boosted = np.ceil(image.astype('float64') * gain)
        return np.minimum(boosted, 255).astype('uint8')

    ycrcb = cv2.cvtColor(image, cv2.COLOR_RGB2YCrCb).astype('float64')
    # Only the luma plane carries brightness; chroma is left as converted.
    ycrcb[:, :, 0] = np.minimum(np.ceil(ycrcb[:, :, 0] * gain), 255)
    return cv2.cvtColor(ycrcb.astype('uint8'), cv2.COLOR_YCrCb2RGB)

def motion_blur_image(image):
    """Blur `image` with a 3x3 horizontal motion-blur kernel.

    The kernel is zero everywhere except its middle row, whose entries are
    each 1/3, so every output pixel is the mean of three horizontal neighbours.
    """
    kernel_size = 3
    middle_row = int((kernel_size-1)/2)

    kernel = np.zeros((kernel_size, kernel_size))
    kernel[middle_row, :] = np.ones(kernel_size)
    kernel = kernel / kernel_size
    # ddepth -1 keeps the output depth equal to the input depth.
    return cv2.filter2D(image, -1, kernel)
    
def random_transform_image(image):
    """Randomly augment a single image.

    With probability 1/2 the image is returned unchanged. Otherwise exactly
    one transformation is picked uniformly from: rotation, translation, shear,
    brightness change, motion blur, grayscale conversion.
    """
    # Coin flip: half of the samples pass through untouched.
    if np.random.randint(2) == 0:
        return image

    # Order matters only in that the random index selects by position.
    transformations = (
        rotate_image,             # 'rotation'
        translate_image,          # 'translation'
        shear_image,              # 'shear'
        change_brightness_image,  # 'brightness'
        motion_blur_image,        # 'blur'
        gray_scale_image,         # 'grayscale'
    )
    chosen = transformations[np.random.randint(len(transformations))]
    return chosen(image)


def generator_data(X_data):
    """Return a randomly augmented copy of a batch of 32x32 images.

    Every image goes through `random_transform_image` independently; the
    results are stacked and reshaped to (batch, 32, 32, channels).
    """
    augmented = list(map(random_transform_image, X_data))
    batch_size = X_data.shape[0]
    return np.array(augmented).reshape(batch_size, 32, 32, -1)





In [3]:
#*******************************************************************************#
# Sameer Pawar, Oct-2018                                                                  #
# LeNet implementation in TensorFlow                              
# To Do:
# 1. No data preprocessing or augmentation
# 1. accept a different initialization function
# 1. accept different activation function
# 1. Loss: only support cross-entropy, no l_2 regularizations
# 1. 

"""
1. Had to add inits for initializer, batch_norm, optimizer
1. Had to remove shape from biases when values are specified
1. Had to provide None for bool placeholder.
1. 
"""

# Usage:
# 1. Instantiate the class with n_classes
# 1. Call compile to set optimizers etc.
# 1. Call fit with X_train, y_train, X_valid, y_valid
# 1. Call get_fit_results_per_epoch
#*******************************************************************************#
import tensorflow as tf
from tensorflow.contrib.layers import flatten, batch_norm
import numpy as np
import cv2
from sklearn.model_selection import train_test_split
from cv2 import createCLAHE

class LeNet:
    """LeNet-style CNN classifier for 32x32 RGB images (TensorFlow 1.x graph mode).

    Architecture (see `get_logits`): a 1x1 convolution that collapses the three
    colour channels into one, two 5x5 conv + max-pool stages, and three fully
    connected layers ending in `n_classes` logits.

    Typical usage:
        model = LeNet(n_classes)
        model.compile()
        model.fit(x, y, batch_size=..., epochs=...)
        model.get_fit_results_per_epoch()
        model.predict(model.preprocess_data(x_new))

    The session created by `fit` is kept on `self.session` so that `evaluate`
    and `predict` can use the trained weights afterwards; call `close()` to
    release it.
    """

    # Maps the user-facing dropout keys onto slots of `keep_probabilities`.
    # Slot k is applied to the output of layer 'l_k' in `get_logits`.
    _DROPOUT_SLOTS = {
        'keep_conv_0': 1,
        'keep_conv_1': 2,
        'keep_conv_2': 3,
        'keep_fc_3': 4,
        'keep_fc_4': 5,
    }

    def __init__(self, n_classes, activation_fun='relu', use_batch_norm=False):
        """Build the whole computation graph.

        Args:
            n_classes: number of output classes (width of the logits layer).
            activation_fun: 'relu', 'elu' or 'lrelu'; any other value means a
                linear (identity) activation.
            use_batch_norm: when True, batch normalisation is applied after
                every activation.
        """
        self.n_classes = n_classes
        self.initializer = tf.contrib.layers.xavier_initializer()
        self.activation_fun = activation_fun
        self.batch_norm = use_batch_norm
        self._default_optimizer = tf.train.AdamOptimizer()
        self.optimizer = self._default_optimizer
        self.layers = {}
        self.session = None

        # A uniquified scope lets the class be instantiated more than once in
        # the same graph (e.g. when a notebook cell is re-run); bare
        # tf.get_variable names would collide on the second instantiation.
        with tf.variable_scope(None, default_name='LeNet'):
            # Define weights and biases for the graph
            self.weights = {
            'w_conv_0': tf.get_variable("w_conv_0", shape = [1, 1, 3, 1],       initializer = self.initializer),
            'w_conv_1': tf.get_variable("w_conv_1", shape = [5, 5, 1, 6],       initializer = self.initializer),
            'w_conv_2': tf.get_variable("w_conv_2", shape = [5, 5, 6, 16],      initializer = self.initializer),
            'w_fc_3':   tf.get_variable("w_fc_3",   shape = [400, 120],         initializer = self.initializer),
            'w_fc_4':   tf.get_variable("w_fc_4",   shape = [120, 84],          initializer = self.initializer),
            'w_fc_5':   tf.get_variable("w_fc_5",  shape = [84, self.n_classes],initializer = self.initializer)
            }

            self.biases = {
            'b_conv_0': tf.get_variable("b_conv_0", initializer = tf.zeros(1)),
            'b_conv_1': tf.get_variable("b_conv_1", initializer = tf.zeros(6)),
            'b_conv_2': tf.get_variable("b_conv_2", initializer = tf.zeros(16)),
            'b_fc_3':   tf.get_variable("b_fc_3",   initializer = tf.zeros(120)),
            'b_fc_4':   tf.get_variable("b_fc_4",   initializer = tf.zeros(84)),
            'b_fc_5':   tf.get_variable("b_fc_5", initializer = tf.zeros(self.n_classes))
            }

        n_slots = len(self.weights)
        # Host-side keep probabilities; 1.0 everywhere means "no dropout".
        self.keep_probabilities = np.ones(n_slots)

        self.x = tf.placeholder(tf.float32, (None, 32, 32, 3))
        self.y = tf.placeholder(tf.int32, None)
        self.training = tf.placeholder(tf.bool, ())
        # Keep probabilities are fed at run time because the graph is built
        # before `fit` learns which dropout values the caller wants. When not
        # fed (evaluation / prediction) the default of all-ones disables dropout.
        self.keep_prob = tf.placeholder_with_default(
            tf.ones([n_slots], dtype=tf.float32), shape=[n_slots])

        #********************************************************************************************
        # Define compute nodes in a graph
        #********************************************************************************************
        self.one_hot_y          = tf.one_hot(self.y, self.n_classes)
        self.logits             = self.get_logits(self.x, self.training)
        self.cross_entropy      = tf.nn.softmax_cross_entropy_with_logits(labels=self.one_hot_y, logits=self.logits)
        self.loss_operation     = tf.reduce_mean(self.cross_entropy)
        self.training_operation = self.optimizer.minimize(self.loss_operation)
        #********************************************************************************************

    def close(self):
        """Close the session kept from the last `fit`, if any."""
        if self.session is not None:
            self.session.close()
            self.session = None

    def _get_session(self):
        """Return the session to run ops in: the model's own, else the default one."""
        sess = self.session if self.session is not None else tf.get_default_session()
        if sess is None:
            raise RuntimeError("No active session: call fit() first or run inside a tf.Session.")
        return sess

    def init_dropout_probabilities(self, dropout_probabilities):
        """Store keep probabilities given as {'keep_conv_<i>' / 'keep_fc_<i>': p}.

        Unknown keys are ignored; layers not mentioned keep their current value.
        """
        for key, slot in self._DROPOUT_SLOTS.items():
            if key in dropout_probabilities:
                self.keep_probabilities[slot] = dropout_probabilities[key]
        return

    def one_hot_encoding(self, labels):
        """Return a (len(labels), n_classes) float array with a 1 at each label index."""
        b = np.zeros((len(labels), self.n_classes))
        b[np.arange(len(labels)), labels] = 1
        return b

    def maxpool2d(self, x, k=2):
        """k x k max pooling with stride k and 'SAME' padding."""
        return tf.nn.max_pool(
            x,
            ksize=[1, k, k, 1],
            strides=[1, k, k, 1],
            padding='SAME')

    def activation(self, x):
        """Apply the configured activation, then batch norm if it is enabled."""
        if self.activation_fun == 'relu':
            output = tf.nn.relu(x)
        elif self.activation_fun == 'elu':
            output = tf.nn.elu(x)
        elif self.activation_fun == 'lrelu':
            output = tf.nn.leaky_relu(x, alpha = 0.1)
        else:
            # default activation is 'linear'
            output = x

        if self.batch_norm:
            # is_training switches between batch statistics and moving
            # averages; updates_collections=None makes the moving averages
            # update in place so no extra update op has to be run.
            return batch_norm(output,
                              is_training=getattr(self, '_is_training', False),
                              updates_collections=None)
        return output

    def conv2d(self, x, W, b, strides=1):
        """'VALID' 2-D convolution of x with W, plus bias b."""
        return tf.nn.conv2d(x, W, strides=[1, strides, strides, 1], padding='VALID') + b

    #*****************************************************************************************
    def histogram_equalize_image(self, image, gridSize = (4, 4)):
        """Apply CLAHE to the Y channel of the image's YCrCb form; returns RGB."""
        clahe = createCLAHE(tileGridSize=gridSize)
        img_YCrCb = cv2.cvtColor(image, cv2.COLOR_RGB2YCrCb)
        img_YCrCb[...,0] = clahe.apply(img_YCrCb[...,0])
        image = cv2.cvtColor(img_YCrCb, cv2.COLOR_YCrCb2RGB)
        return image

    def normalize_image(self, image):
        """Min-max scale `image` into [-0.5, +0.5].

        A constant image has no range to scale by; it maps to -0.5 everywhere
        (its numerator is zero) rather than producing NaNs from 0/0.
        """
        lowest = np.min(image)
        span = np.max(image) - lowest
        if span == 0:
            return np.zeros(np.shape(image), dtype=np.float64) - 0.5
        return (image - lowest)/span - 0.5

    def preprocess_data(self, X_data):
        """Histogram-equalise and normalise every image; returns (batch, 32, 32, channels)."""
        processed = [self.normalize_image(self.histogram_equalize_image(image)) for image in X_data]
        return np.array(processed).reshape(X_data.shape[0], 32,32,-1)
    #*****************************************************************************************

    def get_logits(self, x, training = False):
        """Build the forward pass and return the logits tensor.

        Args:
            x: input batch tensor, (batch, 32, 32, 3).
            training: bool or scalar bool tensor; forwarded to batch norm as
                its `is_training` flag. It is usually a placeholder, so it
                cannot be tested with Python `==` / `and` at graph-build time.

        Dropout is wired in after every hidden layer and driven by the
        `self.keep_prob` tensor (identity unless keep probabilities are fed).
        Intermediate tensors are stored in `self.layers`.
        """
        self._is_training = training
        keep = self.keep_prob

        # conv-Layer 1 - 32*32*3 -> 32*32*1
        # maps 3 color channels to one generalized grayscale channel
        l_1 = self.conv2d(x, W = self.weights['w_conv_0'], b = self.biases['b_conv_0'])
        self.layers['l_1'] = tf.nn.dropout(l_1, keep[1])

        # conv-Layer 2 - 32*32*1 -> 28*28*6->14*14*6
        l_2 = self.conv2d(self.layers['l_1'], self.weights['w_conv_1'], self.biases['b_conv_1'])
        l_2 = self.activation(self.maxpool2d(l_2))
        self.layers['l_2'] = tf.nn.dropout(l_2, keep[2])

        # conv-Layer 3 - 14*14*6 -> 10*10*16->5*5*16
        l_3 = self.conv2d(self.layers['l_2'], self.weights['w_conv_2'], self.biases['b_conv_2'])
        l_3 = self.activation(self.maxpool2d(l_3))
        self.layers['l_3'] = tf.nn.dropout(l_3, keep[3])

        # FC-Layer 4 - 5*5*16-> 400->120
        l_4 = tf.matmul(flatten(self.layers['l_3']), self.weights['w_fc_3']) + self.biases['b_fc_3']
        self.layers['l_4'] = tf.nn.dropout(self.activation(l_4), keep[4])

        # FC-Layer 5 - 120-> 84
        l_5 = tf.matmul(self.layers['l_4'], self.weights['w_fc_4']) + self.biases['b_fc_4']
        self.layers['l_5'] = tf.nn.dropout(self.activation(l_5), keep[5])

        # FC-Layer 6 - 84 -> 43 (n_classes)
        self.layers['l_6'] = tf.matmul(self.layers['l_5'], self.weights['w_fc_5']) + self.biases['b_fc_5']

        logits = self.layers['l_6']

        return logits

    def compile(self, optimizer = None, loss=None, metrics = None):
        """
        #************************************************************************#
            - optimizer: optimizer instance (anything with a `minimize` method);
              None selects the default Adam optimizer.
            - loss: stored on `self.loss`; only cross-entropy is implemented,
              so this value does not change the graph.
            - metrics: stored on `self.metrics`; `evaluate` always computes
              loss, accuracy, precision, recall and F1 score.
        #************************************************************************#
        """
        chosen = self._default_optimizer if optimizer is None else optimizer
        if chosen is not self.optimizer:
            self.optimizer = chosen
            # The train op was wired to the previous optimizer when the graph
            # was built, so it has to be rebuilt for the new one to be used.
            # Values without `minimize` (e.g. a name string) are only stored.
            if hasattr(chosen, 'minimize'):
                self.training_operation = chosen.minimize(self.loss_operation)

        if loss is None:
            self.loss = 'cross-entropy'
        else:
            self.loss = loss

        if metrics is None:
            self.metrics = 'accuracy'
        else:
            self.metrics = metrics # dictionary of different metrics

    def _record_epoch(self, epoch, training_results, validation_results):
        """Copy one epoch's evaluation results into the per-epoch history arrays."""
        self.training_accuracy[epoch]    = training_results['accuracy']
        self.training_loss[epoch]        = training_results['loss']
        self.training_precision[:,epoch] = training_results['precision']
        self.training_recall[:,epoch]    = training_results['recall']
        self.training_F1score[:,epoch]   = training_results['F1score']

        self.validation_accuracy[epoch]    = validation_results['accuracy']
        self.validation_loss[epoch]        = validation_results['loss']
        self.validation_precision[:,epoch] = validation_results['precision']
        self.validation_recall[:,epoch]    = validation_results['recall']
        self.validation_F1score[:,epoch]   = validation_results['F1score']

    def fit(self, x=None, y=None, generator = None, batch_size=None, epochs=1, validation_split=0.0, validation_data=None, dropout_probabilities = None):
        """
        #************************************************************************#
            - x: input features (raw images; preprocessing is applied here)
            - y: labels
            - generator: function that takes a batch of features and returns a
              batch of the same size with transformed/augmented features
            - batch_size: mini-batch size; None means 32
            - epochs
            - validation_split: fraction of (x, y) held out for validation when
              validation_data is not given; values <= 0 fall back to 0.2
            - validation_data: dictionary {'features': x_valid, 'labels': y_valid}
            - dropout_probabilities: dictionary of Keep probabilities with key = 'keep_conv/fc_id'

            Variables are (re-)initialised on every call. The session is kept
            open on self.session afterwards so evaluate/predict can be used.
        #************************************************************************#
        """
        if batch_size is None:
            batch_size = 32

        self.training_loss          = np.zeros(epochs)
        self.training_accuracy      = np.zeros(epochs)
        self.training_precision     = np.zeros((self.n_classes, epochs))
        self.training_recall        = np.zeros((self.n_classes, epochs))
        self.training_F1score       = np.zeros((self.n_classes, epochs))

        self.validation_loss        = np.zeros(epochs)
        self.validation_accuracy    = np.zeros(epochs)
        self.validation_precision   = np.zeros((self.n_classes, epochs))
        self.validation_recall      = np.zeros((self.n_classes, epochs))
        self.validation_F1score     = np.zeros((self.n_classes, epochs))

        # Get training and validation data
        if validation_data is not None:
            x_valid, y_valid = validation_data['features'], validation_data['labels']
        else:
            holdout = validation_split if validation_split > 0 else 0.2
            x, x_valid, y, y_valid = train_test_split(x, y, train_size=1-holdout)

        # generate pre-processed training and validation sets for evaluations.
        x_train_preprocessed, y_train_preprocessed = (self.preprocess_data(x), y)
        x_valid_preprocessed, y_valid_preprocessed = (self.preprocess_data(x_valid), y_valid)

        # Read the keep-probabilities for dropout
        if dropout_probabilities is not None:
            self.init_dropout_probabilities(dropout_probabilities)

        # Replace any session left from a previous fit; the new one stays open
        # after training so the learned weights remain available.
        self.close()
        self.session = tf.Session()
        sess = self.session
        with sess.as_default():
            sess.run(tf.global_variables_initializer())
            for i in range(epochs):
                print("running EPOCH {}".format(i+1))
                x, y = shuffle(x, y)
                for offset in range(0, x.shape[0], batch_size):
                    end = np.minimum(offset + batch_size, x.shape[0])
                    batch_x, batch_y = x[offset:end], y[offset:end]
                    if generator is not None:
                        batch_x = generator(batch_x)
                    batch_x = self.preprocess_data(batch_x)

                    # Dropout is only active here: evaluation never feeds
                    # keep_prob and therefore runs with all-ones.
                    sess.run(self.training_operation,
                             feed_dict={self.x: batch_x,
                                        self.y: batch_y,
                                        self.training: True,
                                        self.keep_prob: self.keep_probabilities})

                # Compute the metrics for each epoch and log the results
                training_results   = self.evaluate(x_train_preprocessed, y_train_preprocessed)
                validation_results = self.evaluate(x_valid_preprocessed, y_valid_preprocessed)

                print("[training validation] accuracy is [{} {}]".format(training_results['accuracy'], validation_results['accuracy']))
                self._record_epoch(i, training_results, validation_results)
        return

    def evaluate(self, x=None, y=None):
        """Evaluate the model on already-preprocessed features `x` with labels `y`.

        Returns a dict with the mean 'loss', overall 'accuracy', per-class
        'precision' / 'recall' / 'F1score' arrays, and 'error_list' (indices
        into `x` of the misclassified samples).
        """
        batch_size = 1024
        num_examples  = x.shape[0]
        sess = self._get_session()
        total_TP = np.zeros(self.n_classes, dtype=np.float32)
        total_FP = np.zeros(self.n_classes, dtype=np.float32)
        total_FN = np.zeros(self.n_classes, dtype=np.float32)
        total_correct_predictions = 0
        total_loss = 0
        error_list = []
        for offset in range(0, num_examples, batch_size):
            end = np.minimum(offset + batch_size, num_examples)
            batch_x, batch_y = x[offset:end], y[offset:end]
            batch_logits, batch_loss = sess.run([self.logits, self.loss_operation],
                                                    feed_dict = {self.x: batch_x,
                                                            self.y: batch_y,
                                                            self.training: False}
                                                    )

            predicted_labels = np.argmax(batch_logits, axis = 1)

            actuals      = self.one_hot_encoding(batch_y)
            predictions  = self.one_hot_encoding(predicted_labels)
            # With one-hot rows: predicted & actual -> TP, predicted & not
            # actual -> FP, not predicted & actual -> FN (counted per class).
            total_TP += np.count_nonzero(predictions * actuals, axis=0)
            total_FP += np.count_nonzero(predictions * (actuals - 1), axis=0)
            total_FN += np.count_nonzero((predictions - 1) * actuals, axis=0)

            total_correct_predictions += np.sum(np.equal(predicted_labels, batch_y))
            # batch_loss is a per-batch mean; weight it by the batch size.
            total_loss                += (batch_loss * batch_x.shape[0])

            predictions = np.array(predicted_labels).flatten()
            actuals     = np.array(batch_y).flatten()
            error_idx   = np.argwhere(np.not_equal(predictions, actuals)).ravel() + offset
            error_list.append(error_idx)

        # Empty input: nothing to concatenate and nothing to divide by.
        error_list = np.concatenate(error_list) if error_list else np.array([], dtype=np.int64)
        denominator = max(num_examples, 1)
        accuracy  = total_correct_predictions/denominator
        loss      = total_loss/denominator
        # The small epsilon avoids 0/0 for classes that never occur.
        precision = total_TP/(total_TP + total_FP + 1e-6)
        recall    = total_TP/(total_TP + total_FN + 1e-6)
        F1score   = 2 * precision * recall / (precision + recall + 1e-6)
        return {"loss": loss,
                "accuracy": accuracy,
                "precision": precision,
                "recall": recall,
                "F1score": F1score,
                "error_list": error_list
            }

    def get_fit_results_per_epoch(self):
        """Return the per-epoch loss and accuracy arrays recorded by the last `fit`."""
        return {'training_loss': self.training_loss,
                'training_accuracy': self.training_accuracy,
                'validation_loss': self.validation_loss,
                'validation_accuracy': self.validation_accuracy
        }

    def predict(self, x, top_k = 1):
        """Return the `top_k` most likely class indices for each sample.

        Args:
            x: already-preprocessed features, (num_examples, 32, 32, 3).
            top_k: number of classes to return per sample.

        Returns:
            float32 array of shape (top_k, num_examples); row 0 is the most
            likely class, row 1 the runner-up, and so on. Rows beyond the
            number of classes stay at -1.
        """
        batch_size = 1024
        num_examples  = x.shape[0]
        sess = self._get_session()
        prediction_result = np.full((top_k, num_examples), -1, dtype=np.float32)
        for offset in range(0, num_examples, batch_size):
            end = np.minimum(offset + batch_size, num_examples)
            batch_x = x[offset:end]
            predicted_logits = sess.run(self.logits, feed_dict = {self.x: batch_x, self.training: False})
            # Rank classes per sample (axis 1), best first.
            ranked = np.argsort(predicted_logits, axis=1)[:, ::-1]
            k = min(top_k, ranked.shape[1])
            prediction_result[:k, offset:end] = ranked[:, :k].T

        return prediction_result
    

# Train the Model


In [None]:
# Build a 43-class LeNet and train it for 5 epochs with random augmentation,
# requesting dropout on the two hidden fully connected layers.
model = LeNet(43)
model.compile()
model.fit(
    x = X_train,
    y = y_train,
    batch_size = 64,
    epochs = 5,
    generator = generator_data,
    dropout_probabilities = {'keep_fc_3': 0.5, 'keep_fc_4': 0.5},
    validation_data = {'features': X_valid, 'labels': y_valid},
)

# Per-epoch learning curves recorded during fit.
results = model.get_fit_results_per_epoch()
training_loss, training_accuracy = results['training_loss'], results['training_accuracy']
validation_accuracy, validation_loss = results['validation_accuracy'], results['validation_loss']
print(training_accuracy)
print(validation_accuracy)



Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See @{tf.nn.softmax_cross_entropy_with_logits_v2}.

running EPOCH 1
[training validation] accuracy is [0.9291358947096181 0.8809523809523809]
running EPOCH 2
[training validation] accuracy is [0.9738498232707836 0.9328798185941043]
running EPOCH 3
