## Classify Traffic Signs image data using Keras CNN

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.utils.np_utils import to_categorical
from keras.layers import Dropout, Flatten
from keras.layers.convolutional import Conv2D, MaxPooling2D
import pickle
import pandas as pd
import random

In [None]:
# Seed both random number generators this notebook draws from so that runs
# repeat: NumPy's global RNG, and the standard-library `random` module that
# picks the preview images (`random.randint`), which NumPy's seed does not
# affect.
# NOTE(review): the Keras backend may keep its own RNG for weight
# initialisation; it is not seeded here, so training results can still vary.
np.random.seed(0)
random.seed(0)

In [None]:
# Clone the Bitbucket repo that holds the train, validation and test data.
# The leading "!" is notebook shell syntax, so this cell only runs in
# IPython/Jupyter, not as a plain Python script.
!git clone https://bitbucket.org/jadslim/german-traffic-signs

# Read the pickle files, one per split.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only run this against a copy of the dataset you trust.
with open('german-traffic-signs/train.p', 'rb') as f:
    train_data = pickle.load(f)

with open('german-traffic-signs/valid.p', 'rb') as f:
    val_data = pickle.load(f)

with open('german-traffic-signs/test.p', 'rb') as f:
    test_data = pickle.load(f)

# Each unpickled object is indexed like a dictionary: 'features' holds the
# images and 'labels' the class labels. Pull both out for every split.
X_train, y_train = train_data['features'], train_data['labels']
X_val, y_val = val_data['features'], val_data['labels']
X_test, y_test = test_data['features'], test_data['labels']

In [None]:
# Report the array shape of each split, in the order train, validation, test.
for split_features in (X_train, X_val, X_test):
    print(split_features.shape)

In [None]:
# Check the input data: every split must have exactly one label per image,
# and every image must be 32 x 32 pixels with 3 channels. Each message names
# the split it is checking so a failure points at the right data set.
assert X_train.shape[0] == y_train.shape[0], 'Train: The number of images are not equal to the number of labels'
assert X_val.shape[0] == y_val.shape[0], 'Validation: The number of images are not equal to the number of labels'
assert X_test.shape[0] == y_test.shape[0], 'Test: The number of images are not equal to the number of labels'
assert X_train.shape[1:] == (32, 32, 3), "Train: The dimensions of the images are not 32 x 32 x 3"
assert X_val.shape[1:] == (32, 32, 3), "Validation: The dimensions of the images are not 32 x 32 x 3"
assert X_test.shape[1:] == (32, 32, 3), "Test: The dimensions of the images are not 32 x 32 x 3"

In [None]:
# Read the csv file; its 'SignName' column supplies the row titles below.
data = pd.read_csv('german-traffic-signs/signnames.csv')
num_of_samples = []
cols = 5
num_classes = 43

# Grid of random training images: one row per class, `cols` images per row.
# squeeze=False keeps `axs` two-dimensional even when cols == 1, so the
# axs[j, i] indexing below always works.
fig, axs = plt.subplots(nrows=num_classes, ncols=cols, figsize=(5, 50), squeeze=False)
fig.tight_layout()

# The sign name is printed above the middle image of each row.
title_col = cols // 2

for j, row in data.iterrows():
    # Select this class's images once and reuse them for every column.
    # NOTE(review): assumes the CSV row index equals the class label — confirm.
    x_selected = X_train[y_train == j]
    # Record the class size once per class, independent of how many columns
    # are drawn, so the counts are complete for any value of `cols`.
    num_of_samples.append(len(x_selected))
    for i in range(cols):
        axs[j, i].imshow(x_selected[random.randint(0, (len(x_selected) - 1)), :, :], cmap=plt.get_cmap('gray'))
        axs[j, i].axis('off')
        if i == title_col:
            axs[j, i].set_title(str(j) + '-' + row['SignName'])

In [None]:
 # Print the per-class image counts, then draw them as a bar chart with one
 # bar per class number.
 # NOTE(review): this cell carries a one-space leading indent; notebooks appear
 # to tolerate it, but it would be an IndentationError in a plain script.
 print(num_of_samples)
 plt.figure(figsize=(12, 4))
 plt.bar(range(num_classes), num_of_samples)
 plt.xlabel('Class number')
 plt.ylabel('Number of images')
 plt.title('Distribution of the train dataset')
 plt.show()

In [None]:
import cv2
# Show one training image (index 1000) without axes, then print its array
# shape and its label.
sample_image, sample_label = X_train[1000], y_train[1000]
plt.imshow(sample_image)
plt.axis('off')
print(sample_image.shape)
print(sample_label)

In [None]:
def grayscale(img):
    """Convert a 3-channel colour image to a single-channel grayscale image.

    Args:
        img: Colour image array in a form accepted by ``cv2.cvtColor``.

    Returns:
        The grayscale image returned by ``cv2.cvtColor``.
    """
    # This notebook shows the same arrays directly with matplotlib, which
    # reads the channels as RGB, so the RGB conversion code is used here;
    # COLOR_BGR2GRAY would apply the red and blue weights to the wrong
    # channels.
    # NOTE(review): confirm the pickled images really are RGB-ordered.
    return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

# Convert one training image (index 1000) to grayscale and preview it.
# `img` is reused by the equalization preview in the next cell.
img = grayscale(X_train[1000])
print(img.shape)
# 'gray_r' is matplotlib's reversed gray colormap, so low pixel values are
# drawn light and high values dark.
plt.imshow(img, cmap='gray_r')
plt.axis('off')

In [None]:
def equalize(img):
    """Return ``img`` after histogram equalization via ``cv2.equalizeHist``."""
    return cv2.equalizeHist(img)

# Equalize the grayscale preview image held in `img` and show the result.
img = equalize(img)
print(img.shape)
# 'gray_r' is matplotlib's reversed gray colormap, so low pixel values are
# drawn light and high values dark.
plt.imshow(img, cmap='gray_r')
plt.axis('off')

In [None]:
def preprocessing(img):
    """Prepare one image for the network.

    The image is converted with ``grayscale``, passed through ``equalize``,
    and the result is divided by 255.

    Args:
        img: A single colour image as accepted by ``grayscale``.

    Returns:
        The equalized grayscale image divided by 255.
    """
    equalized = equalize(grayscale(img))
    return equalized / 255

# Run every image of every split through `preprocessing` and stack the
# results back into one array per split.
X_train = np.array([preprocessing(image) for image in X_train])
X_val = np.array([preprocessing(image) for image in X_val])
X_test = np.array([preprocessing(image) for image in X_test])

In [None]:
# Show one randomly chosen preprocessed training image (reversed gray
# colormap, axes hidden), then print the shape of the training array.
preview_index = random.randrange(len(X_train))
plt.imshow(X_train[preview_index], cmap='gray_r')
plt.axis('off')
print(X_train.shape)

In [None]:
# Add a trailing channel axis of size 1 so each image matches the models'
# input_shape of (32, 32, 1). The sample count is read from each array rather
# than hard-coded, so the cell keeps working if a split changes size; reshape
# still raises if an array does not hold 32 * 32 values per sample.
X_train = X_train.reshape(X_train.shape[0], 32, 32, 1)
X_val = X_val.reshape(X_val.shape[0], 32, 32, 1)
X_test = X_test.reshape(X_test.shape[0], 32, 32, 1)

In [None]:
# One-hot encode the labels. The width comes from `num_classes`, the same
# constant that sizes the models' softmax output layer, so the labels and the
# network output cannot drift apart.
y_train = to_categorical(y_train, num_classes)
y_val = to_categorical(y_val, num_classes)
y_test = to_categorical(y_test, num_classes)

In [None]:
def leNet_model():
    """Build and compile the baseline convolutional classifier.

    Layers, in order: Conv2D(60, 5x5, relu) on a (32, 32, 1) input, 2x2 max
    pooling, Conv2D(30, 3x3, relu), 2x2 max pooling, Flatten, Dense(500,
    relu), Dropout(0.5) and a softmax Dense layer with ``num_classes`` units.

    Returns:
        A ``Sequential`` model compiled with Adam (learning rate 0.001),
        categorical cross-entropy loss and the accuracy metric.
    """
    model = Sequential()
    model.add(Conv2D(60, (5, 5), input_shape=(32, 32, 1), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(30, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(500, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    # `learning_rate` replaces the deprecated `lr` keyword of the optimizer.
    model.compile(Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
# Build the baseline model and show its layer table.
model = leNet_model()
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line to the output.
model.summary()

In [None]:
# Train the baseline model for 10 epochs in batches of 400, passing the
# validation split as validation_data. `shuffle` is given as an explicit
# boolean instead of relying on the truthiness of 1.
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_val, y_val),
    batch_size=400,
    verbose=1,
    shuffle=True,
)

In [None]:
# Loss per epoch for the baseline model: training curve first, then
# validation, matching the legend order.
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve])
plt.title('Loss')
plt.xlabel('epoch')
plt.legend(['training', 'validation'])
plt.show()

In [None]:
# Accuracy per epoch for the baseline model: training curve first, then
# validation, matching the legend order.
for curve in ('accuracy', 'val_accuracy'):
    plt.plot(history.history[curve])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.legend(['training', 'validation'])
plt.show()

In [None]:
# Evaluate the baseline model on the test split. The model is compiled with
# one metric, so score[0] is the loss and score[1] the accuracy.
score = model.evaluate(X_test, y_test, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print("Test Score: ", test_loss)
print("Test Accuracy: ", test_accuracy)

In [None]:
# Modifications intended to improve accuracy over leNet_model:
# - Add a second convolutional layer before each pooling layer
# - Add a Dropout layer after the 2nd pooling layer to prevent overfitting
# NOTE(review): the original notes also list "decrease learning rate from 0.01
# to 0.001" and "increase the convolutional filters from 30 to 60 and 15 to
# 30", but leNet_model in this file already uses 0.001 and 60/30 filters.
def modified_model():
    """Build and compile the deeper variant of the baseline classifier.

    Layers, in order: Conv2D(60, 5x5, relu) on a (32, 32, 1) input,
    Conv2D(60, 3x3, relu), 2x2 max pooling, two Conv2D(30, 3x3, relu) layers,
    2x2 max pooling, Dropout(0.5), Flatten, Dense(500, relu), Dropout(0.5)
    and a softmax Dense layer with ``num_classes`` units.

    Returns:
        A ``Sequential`` model compiled with Adam (learning rate 0.001),
        categorical cross-entropy loss and the accuracy metric.
    """
    model = Sequential()
    model.add(Conv2D(60, (5, 5), input_shape=(32, 32, 1), activation='relu'))
    model.add(Conv2D(60, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(30, (3, 3), activation='relu'))
    model.add(Conv2D(30, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.5))

    model.add(Flatten())
    model.add(Dense(500, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    # `learning_rate` replaces the deprecated `lr` keyword of the optimizer.
    model.compile(Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Build the modified model and show its layer table.
model_2 = modified_model()
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line to the output.
model_2.summary()

In [None]:
# Train the modified model for 10 epochs in batches of 400, passing the
# validation split as validation_data. `shuffle` is given as an explicit
# boolean instead of relying on the truthiness of 1.
history_2 = model_2.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_val, y_val),
    batch_size=400,
    verbose=1,
    shuffle=True,
)

In [None]:
# Loss per epoch for the modified model: training curve first, then
# validation, matching the legend order.
for curve in ('loss', 'val_loss'):
    plt.plot(history_2.history[curve])
plt.title('Loss')
plt.xlabel('epoch')
plt.legend(['training', 'validation'])
plt.show()

In [None]:
# Accuracy per epoch for the modified model: training curve first, then
# validation, matching the legend order.
for curve in ('accuracy', 'val_accuracy'):
    plt.plot(history_2.history[curve])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.legend(['training', 'validation'])
plt.show()

In [None]:
# Evaluate the modified model on the test split. The model is compiled with
# one metric, so score_2[0] is the loss and score_2[1] the accuracy.
score_2 = model_2.evaluate(X_test, y_test, verbose=0)
test_loss_2, test_accuracy_2 = score_2[0], score_2[1]
print("Test Score: ", test_loss_2)
print("Test Accuracy: ", test_accuracy_2)