# Prerequisite Tasks

In [None]:
# Upload the Training dataset from it's source to Collab
!wget https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Training_Images.zip

In [None]:
# Unzip the Training dataset 
!unzip GTSRB_Final_Training_Images.zip

In [None]:
# Upload the Test dataset from it's source to Collab
!wget https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_Images.zip


In [None]:
# Unzip the Test dataset 
!unzip GTSRB_Final_Test_Images.zip

In [None]:
# Upload the information related to Test data from it's source to Collab
!wget https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB_Final_Test_GT.zip

In [None]:
# Unzip the information regarding the categories in Test dataset.
!unzip GTSRB_Final_Test_GT.zip

Necessary imports and setup: 

In [None]:
import numpy as np
import pandas as pd
import os
import os.path as path
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers 
from sklearn.metrics import accuracy_score
import random
from skimage import transform,io,color,exposure
from random import shuffle
np.random.seed(42)
import matplotlib.pyplot as plt
from matplotlib import style
from matplotlib.image import imread
import seaborn as sns
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPool2D, Dense, Flatten, Dropout
from tensorflow.keras.utils import to_categorical    
import tensorflow as tf
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
tf.config.run_functions_eagerly(True)
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [None]:
# Locations of the extracted dataset inside the Colab runtime.
# The train/test image folders are derived from the dataset root.

data_path = '/content/GTSRB'
train_path = data_path + '/Final_Training/Images'
test_path = data_path + '/Final_Test/Images'

In [None]:
# Human-readable names for each of the 43 categories, in order.
# The list position is used as the numeric ClassId when labelling plots
# (e.g. category_names[class_id]).

category_names = [
    'Speed limit (20km/h)',
    'Speed limit (30km/h)',
    'Speed limit (50km/h)',
    'Speed limit (60km/h)',
    'Speed limit (70km/h)',
    'Speed limit (80km/h)',
    'End of speed limit (80km/h)',
    'Speed limit (100km/h)',
    'Speed limit (120km/h)',
    'No passing',
    'No passing for vehicles over 3.5 metric tons',
    'Right-of-way at the next intersection',
    'Priority road',
    'Yield',
    'Stop',
    'No vehicles',
    'Vehicles over 3.5 metric tons prohibited',
    'No entry',
    'General caution',
    'Dangerous curve to the left',
    'Dangerous curve to the right',
    'Double curve',
    'Bumpy road',
    'Slippery road',
    'Road narrows on the right',
    'Road work',
    'Traffic signals',
    'Pedestrians',
    'Children crossing',
    'Bicycles crossing',
    'Beware of ice/snow',
    'Wild animals crossing',
    'End of all speed and passing limits',
    'Turn right ahead',
    'Turn left ahead',
    'Ahead only',
    'Go straight or right',
    'Go straight or left',
    'Keep right',
    'Keep left',
    'Roundabout mandatory',
    'End of no passing',
    'End of no passing by vehicles over 3.5 metric tons'
]

# Descriptive Analysis

In [None]:
# Count the number of image files in each class (folder) of the training set.

# Every sub-folder of train_path holds the images of one class.
train_folder = sorted(os.listdir(train_path))

# Maps {class folder name : number of image files in that folder}.
# Used to visualize the data.
sign_data = {}

for f in train_folder:
  class_dir = train_path + '/' + f
  if not os.path.isdir(class_dir):
    # Ignore stray files that are not class folders.
    continue
  # Count the .ppm images explicitly instead of assuming that exactly one
  # non-image (.csv) file sits next to them.
  sign_data[str(f)] = sum(1 for name in os.listdir(class_dir) if name.endswith('.ppm'))

# Convert the dict to a DataFrame (one row per class, ordered by class folder name).
df_class = pd.DataFrame.from_dict(sign_data, orient = 'index', columns = ['Total Count'])
df_class = df_class.sort_index()

1. Total number of images in Train and Test dataset

In [None]:
# Number of classes = number of entries (class folders) in the training directory.
class_len = len(os.listdir(train_path))
# Total training images = sum of the per-class counts computed earlier.
train_len = df_class['Total Count'].sum()
# Count only the .ppm image files, consistent with the '.ppm' filters used
# elsewhere in this notebook when listing the test folder; counting every
# directory entry would also include any non-image file stored there.
test_len = sum(1 for name in os.listdir(test_path) if name.endswith('.ppm'))

print(f'Total # of Sign Classfication - {class_len}.\nTrain Images - {train_len}\nTest Images - {test_len}')

2. Read the csv file in each training class and convert it into dataframe for training set.

In [None]:
# Collect the annotation CSV found in every training class folder, in sorted order.
train_csv = sorted(
    train_path + '/' + class_dir + '/' + file_name
    for class_dir in os.listdir(train_path)
    for file_name in os.listdir(train_path + '/' + class_dir)
    if file_name.endswith('.csv')
)

# Read all CSVs first and concatenate once: growing a DataFrame with
# pd.concat inside the loop copies every previously read row on each iteration.
# ignore_index=True gives the combined frame a unique 0..n-1 index instead of
# repeating each file's own row numbers.
csv_frames = [pd.read_csv(csv_file, sep = ';') for csv_file in train_csv]
df_traincsv = pd.concat(csv_frames, ignore_index = True) if csv_frames else pd.DataFrame()

print(df_traincsv.shape)
df_traincsv.head()

3. Read the csv file in test image folder and convert it into dataframe for test set.

In [None]:
# Load the ';'-separated test annotation CSV into a DataFrame.
test_csv = '/content/GT-final_test.csv'
df_testcsv = pd.read_csv(test_csv, delimiter = ';')
print(df_testcsv.shape)
df_testcsv.head()

4. (a) Analysis for Training dataset.

Barplot to show the number of images per category in the training dataset. 

In [None]:
# Count the number of pictures in each category (ascending by count).
category_counts = df_traincsv['ClassId'].value_counts().sort_values()
# Category names in the same order as the bars.
sorted_categories = [category_names[i] for i in category_counts.index]
# Create a bar plot using seaborn.
sns.set(style="darkgrid")
plt.figure(figsize=(13,13))
sns.countplot(y='ClassId', data=df_traincsv, order=category_counts.index)
# One tick per plotted category. Sizing the positions from sorted_categories
# (not from all of category_names) keeps positions and labels the same length
# even when a class is absent from the data.
plt.yticks(range(len(sorted_categories)), sorted_categories)
plt.xlabel('Number of Pictures')
plt.ylabel('Category Names')
plt.title('Number of Pictures in each Category')
# Adjust the spacing so the long category names fit on the left.
plt.subplots_adjust(left=0.25, right=0.9, top=0.9, bottom=0.1)

# Adjust the font size.
# NOTE(review): rc settings changed at this point appear to affect only figures
# created afterwards, not the figure drawn above - confirm this is intended.
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=20)
plt.rc('axes', labelsize=12)
plt.show()

Width and Height distribution of images in training set

In [None]:
# Side-by-side histograms (10-unit bins, KDE overlay) of the Width and Height
# columns of the training annotations.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))
train_size_panels = (
    (ax1, 'Width', 'orange', 'Width Distribution - Train'),
    (ax2, 'Height', 'magenta', 'Height Distribution - Train'),
)
for panel_ax, column, colour, panel_title in train_size_panels:
    sns.histplot(data=df_traincsv, x=df_traincsv[column], binwidth=10, color=colour, kde=True, ax=panel_ax)
    panel_ax.set_title(panel_title)

The width distribution of pictures per category in training set:

In [None]:
# Count the number of pictures in each category (ascending by count).
category_counts = df_traincsv['ClassId'].value_counts().sort_values()

# Lay the categories out on a grid with at most 5 columns.
nrows = (len(category_counts) - 1) // 5 + 1
ncols = min(len(category_counts), 5)
# squeeze=False keeps `axes` two-dimensional even when the grid has a single
# row or column, so the [row, col] indexing below always works.
fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(30, nrows*6), squeeze=False)

# One histogram per category that has data.
for i, category in enumerate(category_counts.index):
    ax = axes[i//ncols, i%ncols]
    # Subset of the annotations belonging to the current category.
    category_df = df_traincsv[df_traincsv['ClassId'] == category]

    # Distribution of widths: width on the x-axis, number of images on the y-axis.
    sns.histplot(data=category_df, x="Width", stat="count", ax=ax)
    ax.set_xlabel('Width')
    ax.set_ylabel('Count')
    ax.set_title(category_names[category])

# Remove the unused subplots at the end of the grid.
for j in range(len(category_counts), nrows*ncols):
    fig.delaxes(axes[j//ncols, j%ncols])

plt.tight_layout()
plt.show()

4. (b) Analysis for test dataset:

Barplot to show the number of images per category in the test dataset. 

In [None]:
# Count the number of pictures in each category (ascending by count).
category_counts = df_testcsv['ClassId'].value_counts().sort_values()
# Category names in the same order as the bars.
sorted_categories = [category_names[i] for i in category_counts.index]
# Create a bar plot using seaborn.
sns.set(style="darkgrid")
plt.figure(figsize=(13,13))
sns.countplot(y='ClassId', data=df_testcsv, order=category_counts.index)
# One tick per plotted category. Sizing the positions from sorted_categories
# (not from all of category_names) keeps positions and labels the same length
# even when a class is absent from the data.
plt.yticks(range(len(sorted_categories)), sorted_categories)
plt.xlabel('Number of Pictures')
plt.ylabel('Category Names')
plt.title('Number of Pictures in each Category - TEST')
# Adjust the spacing so the long category names fit on the left.
plt.subplots_adjust(left=0.25, right=0.9, top=0.9, bottom=0.1)

# Adjust the font size.
# NOTE(review): rc settings changed at this point appear to affect only figures
# created afterwards, not the figure drawn above - confirm this is intended.
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=20)
plt.rc('axes', labelsize=12)
plt.show()

Width and height distribution of images in the test set

In [None]:
# Side-by-side histograms (10-unit bins, KDE overlay) of the Width and Height
# columns of the test annotations.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))
test_size_panels = (
    (ax1, 'Width', 'r', 'Width Distribution - Test'),
    (ax2, 'Height', 'gold', 'Height Distribution - Test'),
)
for panel_ax, column, colour, panel_title in test_size_panels:
    sns.histplot(data=df_testcsv, x=df_testcsv[column], binwidth=10, color=colour, kde=True, ax=panel_ax)
    panel_ax.set_title(panel_title)

5. Display random images from the train set

In [None]:
# Common helper to plot a grid of images.
# flag = 'img' -> img_list holds image file paths
# flag = 'np'  -> img_list holds image arrays
# tot_img = number of images to show + 1
def plt_img(img_list,tot_img, flag = 'img'):
  """Draw the first ``tot_img - 1`` entries of ``img_list`` on a subplot grid.

  Args:
    img_list: With ``flag='img'``, a sequence of image file paths that are read
      with ``imread``. With ``flag='np'``, a sequence of image arrays drawn in
      gray scale. For backward compatibility ``None`` is accepted with
      ``flag='np'``; the images are then taken from the notebook globals as
      ``X_train[list_index[k]]``.
    tot_img: Number of images to draw plus one (the subplot index runs from 1
      to ``tot_img - 1``).
    flag: ``'img'`` or ``'np'``, see ``img_list``.

  Raises:
    ValueError: If ``flag`` is neither ``'img'`` nor ``'np'``.
  """
  if flag not in ('img', 'np'):
    raise ValueError(f"flag must be 'img' or 'np', got {flag!r}")

  # Keep the 5x5 layout for up to 25 images and add rows beyond that instead
  # of failing on a subplot index outside the grid.
  n_cols = 5
  n_rows = max(5, -(-(tot_img - 1) // n_cols))

  plt.figure(figsize=(50,50))
  for i in range(1,tot_img):
    plt.subplot(n_rows,n_cols,i)
    if flag == 'img':
      plt.imshow(imread(img_list[i-1]))
    elif img_list is not None:
      # Arrays passed explicitly by the caller.
      plt.imshow(img_list[i-1], interpolation='nearest', cmap= 'gray')
    else:
      # Legacy call style: fall back to the notebook-level globals.
      plt.imshow(X_train[list_index[i-1]], interpolation='nearest', cmap= 'gray')
    plt.grid()

In [None]:
# Sorted paths of every .ppm image found in the class folders of the training set.
train_img = sorted(
    train_path + '/' + class_dir + '/' + file_name
    for class_dir in os.listdir(train_path)
    for file_name in os.listdir(train_path + '/' + class_dir)
    if file_name.endswith('.ppm')
)

In [None]:
# Pick 25 training image paths at random (with replacement) and show them;
# plt_img expects the number of images + 1 as its second argument.
train_rimg = [random.choice(train_img) for _ in range(25)]
plt_img(train_rimg, 26)

6. Display random images from the test set

In [None]:
# Sorted paths of every .ppm image in the test folder.
test_img = sorted(
    test_path + '/' + file_name
    for file_name in os.listdir(test_path)
    if file_name.endswith('.ppm')
)

In [None]:
# Pick 25 test image paths at random (with replacement) and show them;
# plt_img expects the number of images + 1 as its second argument.
test_rimg = [random.choice(test_img) for _ in range(25)]
plt_img(test_rimg, 26)

# Preprocessing

1. Resize images to a fixed size, apply other preprocessing like grayscaling, histogram equalization and normalization for Training data.

In [None]:
# Get the folder names located in the training data
train_folder = [folder for folder in os.listdir(train_path)]
train_folder.sort()

In [None]:
# Time to run ~1m5s
# Preprocess every training image: gray scale -> histogram equalization ->
# resize to a fixed size -> normalization.
img_width = 32
img_height = 32
img_preprocessed = []  # [preprocessed image, class folder name] pairs
img_err = []           # [image path, class folder name] of images that failed

for folder_name in train_folder:
  img_folder = train_path + '/' + folder_name
  for img_file in os.listdir(img_folder):
    if not img_file.endswith('.ppm'):
      # Skip anything that is not an image file.
      continue
    img_path = img_folder + '/' + img_file
    try:
      img = io.imread(img_path)
      gray_img = color.rgb2gray(img)
      gray_img = exposure.equalize_hist(gray_img)
      img_resize = transform.resize(gray_img, (img_width, img_height))
      img = tf.keras.utils.normalize(img_resize,axis=1)
    except Exception:
      # Record the path of the failing file. The image variable itself may be
      # undefined or left over from a previous iteration at this point.
      img_err.append([img_path, folder_name])
      continue
    img_preprocessed.append([img, folder_name])

# Make failures visible instead of dropping images silently.
if img_err:
  print(f'{len(img_err)} training image(s) could not be preprocessed and were skipped.')


In [None]:
# Shuffle the preprocessed [image, class] pairs in place, then split them
# into parallel lists of images and class labels.
shuffle(img_preprocessed)

X_train = [image for image, _ in img_preprocessed]
y_train = [classid for _, classid in img_preprocessed]

2. Display the preprocessed images

In [None]:
# Show the preprocessed version of the first 10 randomly sampled training
# images (train_rimg). The samples are preprocessed again here because
# X_train has been shuffled and no longer records which file each entry came
# from, so positions in the sorted path list do not identify entries of X_train.
plt.figure(figsize=(50,50))
for i, sample_path in enumerate(train_rimg[:10], start=1):
  # Same steps as the training preprocessing: gray scale, histogram
  # equalization, resize, normalization.
  sample = color.rgb2gray(io.imread(sample_path))
  sample = exposure.equalize_hist(sample)
  sample = transform.resize(sample, (img_width, img_height))
  sample = tf.keras.utils.normalize(sample, axis=1)
  plt.subplot(5,5,i)
  plt.imshow(sample, interpolation='nearest', cmap='gray')
  plt.grid()

3. Reshape X_train to add a single (grayscale) channel dimension

In [None]:
# Stack the images into one array and append a trailing channel axis of size 1;
# the labels become a NumPy array as well.
X_array = np.array(X_train)
train_shape = (-1, img_width, img_height, 1)
X_train = X_array.reshape(train_shape)
y_train = np.array(y_train)

print(f'Shape of X {X_train.shape}.\nShape of y {y_train.shape}')


4. Preprocessing for test data to fit the model.

In [None]:
# Time to exe ~37s
# Preprocess every test image with the same steps as the training images and
# pair it with its class id from the test annotations.
img_width = 32
img_height = 32
img_test_pre = []  # [preprocessed image, zero-padded class id] pairs
img_test_err = []  # [image path, error message] of images that failed

# Build the Filename -> ClassId lookup once instead of scanning the whole
# DataFrame for every image.
test_class_by_file = dict(zip(df_testcsv['Filename'], df_testcsv['ClassId']))

for img_file in os.listdir(test_path):
  if not img_file.endswith('.ppm'):
    # Skip anything that is not an image file.
    continue
  img_path = test_path +'/' + img_file
  try:
    # Zero-pad the class id to five characters (e.g. 7 -> '00007').
    # Presumably this mirrors the class folder names used as training labels
    # - TODO confirm.
    classid = str(test_class_by_file[img_file]).zfill(5)
    img_i = io.imread(img_path)
    gray_img = color.rgb2gray(img_i)
    gray_img = exposure.equalize_hist(gray_img)
    img_resize = transform.resize(gray_img, (img_width, img_height))
    img = tf.keras.utils.normalize(img_resize,axis=1)
  except Exception as exp:
    # Record which file failed and why; the previous bookkeeping stored a
    # leftover variable from the training loop here.
    img_test_err.append([img_path, str(exp)])
    continue
  img_test_pre.append([img, classid])

# Make failures visible instead of dropping images silently.
if img_test_err:
  print(f'{len(img_test_err)} test image(s) could not be preprocessed and were skipped.')

In [None]:
# Shuffle the preprocessed [image, class] pairs in place, then split them
# into parallel lists of images and class labels.
shuffle(img_test_pre)

X_test = [image for image, _ in img_test_pre]
y_test = [classid for _, classid in img_test_pre]



5. Arranging preprocessed data so it can fit the model accurately.

In [None]:
# Stack the test images into one array and append a trailing channel axis of
# size 1; the labels become a NumPy array as well.
X_arr = np.array(X_test)
test_shape = (-1, img_width, img_height, 1)
X_test = X_arr.reshape(test_shape)
y_test = np.array(y_test)

print(f'Shape of X {X_test.shape}.\nShape of y {y_test.shape}')

In [None]:
# One-hot encode the class labels for the 43 categories.
# NOTE(review): this overwrites y_train / y_test in place, so the cell is
# meant to be run once per kernel session.
num_classes = 43
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

for encoded in (y_train, y_test):
    print(encoded.shape)

# Model Implementation

 Neural Network Implementation

In [None]:
# Convolutional network: two blocks of (Conv 32 -> Conv 64 -> MaxPool -> Dropout),
# followed by a dense layer and a 43-way softmax output.
model = Sequential([
    # Block 1
    Conv2D(filters=32, kernel_size=(5,5), activation='relu', input_shape=X_train.shape[1:]),
    Conv2D(filters=64, kernel_size=(5,5), activation='relu'),
    MaxPool2D(pool_size=(2, 2)),
    Dropout(rate=0.25),
    # Block 2
    Conv2D(filters=32, kernel_size=(5, 5), activation='relu'),
    Conv2D(filters=64, kernel_size=(5, 5), activation='relu'),
    MaxPool2D(pool_size=(2, 2)),
    Dropout(rate=0.25),
    # Classifier head
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(rate=0.5),
    Dense(43, activation='softmax'),
])

# Labels are one-hot encoded, hence categorical cross-entropy.
model.compile(optimizer="adam",
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# Print a summary of the network defined above.
model.summary()

In [None]:
# Time to exe ~7m
batch_size = 64
epoch = 10

# NOTE(review): the test set is passed as validation data here, so the
# per-epoch validation metrics and the final test evaluation are computed on
# the same images - confirm this is intended.
fit_options = dict(
    batch_size=batch_size,
    epochs=epoch,
    validation_data=(X_test, y_test),
)
history = model.fit(X_train, y_train, **fit_options)

# Result And Analysis

 Model Evaluation

In [None]:
# Training history as a DataFrame; the plots below read its 'accuracy',
# 'val_accuracy', 'loss' and 'val_loss' columns.
df_history = pd.DataFrame(history.history)

In [None]:
# Accuracy (left) and loss (right) over the epochs, training vs. validation.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))
history_panels = (
    (ax1, ['accuracy', 'val_accuracy'], 'Model Accuracy', 'Accuracy'),
    (ax2, ['loss', 'val_loss'], 'Model Loss', 'Loss'),
)
for panel_ax, metric_cols, panel_title, y_label in history_panels:
    sns.lineplot(data=df_history[metric_cols], palette='hot_r', ax=panel_ax)
    panel_ax.set_title(panel_title)
    panel_ax.set(xlabel='Epochs', ylabel=y_label)
    panel_ax.grid()

In [None]:
# All recorded history columns drawn together on a single axes.
fig,ax = plt.subplots(figsize=(6,4))
sns.lineplot(data = df_history,  palette = 'seismic', ax = ax)
ax.grid()

In [None]:
# Evaluate the trained model on the preprocessed test arrays; with the single
# 'accuracy' metric set in compile(), evaluate() yields the loss and the accuracy.
loss,accuracy = model.evaluate(X_test, y_test)
print(f'Accuracy of the model is {accuracy:.4f}.\nLoss of the model is {loss:.4f}')

Test Image - Validation

In [None]:
# Predict the class of every test image listed in the annotation file.
img_data = []
kept_labels = []
img_name = df_testcsv['Filename'].values
img_size = 32

for img, label in zip(img_name, df_testcsv['ClassId'].values):
  if not img.endswith('.ppm'):
    # Skip the image together with its label so both stay aligned.
    continue
  img_path = test_path +'/' + img
  try:
    img_i = io.imread(img_path)
    gray_img = color.rgb2gray(img_i)
    # Apply the same preprocessing the model was trained with (histogram
    # equalization and normalization); feeding un-equalized, un-normalized
    # images would give the network inputs it has never seen.
    gray_img = exposure.equalize_hist(gray_img)
    img_resize = transform.resize(gray_img, (img_size, img_size))
    img_norm = tf.keras.utils.normalize(img_resize, axis=1)
    img_arr = img_norm.reshape(img_size, img_size, 1)
  except Exception as exp:
    print(f'Error occured! Skipping {img_path}: {exp}')
    continue
  img_data.append(img_arr)
  kept_labels.append(label)

# Ground-truth labels of exactly the images that were loaded, in the same
# order as the predictions below.
img_labels = np.array(kept_labels)
X_test_pred =  np.array(img_data)
pred = np.argmax(model.predict(X_test_pred),axis= -1)


In [None]:
# Share of test images whose predicted class equals the ground truth, in percent.
test_accuracy_pct = accuracy_score(img_labels, pred) * 100
print(f'Test Data accuracy: {test_accuracy_pct: .4f}')

In [None]:
# Classification report as a DataFrame: one row per report entry, one column
# per metric.

class_report = classification_report(img_labels, pred, output_dict=True)
df_report = pd.DataFrame(class_report).T
df_report

In [None]:
# Confusion matrix (rows: ground truth, columns: prediction)

# Fix the label set explicitly so the matrix always has one row/column per
# entry of category_names, even if some class never occurs in the labels or
# the predictions.
class_ids = list(range(len(category_names)))
conf_mat = confusion_matrix(img_labels, pred, labels = class_ids)
df_conf = pd.DataFrame(conf_mat, index = category_names, columns = category_names )
plt.figure(figsize = (15,15))
# Integer format: '.2g' would print counts of 100 or more in scientific
# notation (e.g. 7.2e+02).
sns.heatmap(df_conf, cmap = 'bone', fmt = 'd', annot=True)

In [None]:
# Randomly compare the predicted value along with the ground truth.
# Title is green when the prediction is correct and red otherwise.
plt.figure(figsize=(50,50))
for i in range(1,26):
  plt.subplot(5,5,i)
  # Sample over the actual number of predictions instead of a hard-coded
  # upper bound, so every test image can be drawn and the index is always valid.
  rand = random.randrange(len(pred))
  y_pred = pred[rand]
  y = img_labels[rand]
  col = 'g' if y == y_pred else 'r'
  plt.title(f'Actual={y}  Pred={y_pred}', color = col,fontsize = 20)
  plt.imshow(X_test_pred[rand], cmap = 'gray')