In [None]:
from keras.layers import MaxPool2D, Dense, Activation, Dropout, Flatten, Conv2D, MaxPooling2D
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from keras.models import Sequential
from keras.optimizers import SGD
from matplotlib.image import imread
from keras.utils import to_categorical
from PIL import Image, ImageFont
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import visualkeras
import random
import cv2
import os

In [None]:
df = pd.read_csv("data/Meta.csv")
num_classes = df.shape[0]
meta_path, classes = df["Path"], df["ClassLabel"]
font = ImageFont.truetype("arial.ttf", 15)
seed = 123

In [None]:
train_path = "data/Train/"
test_path = "data/Test/"

#### Traffic signs visualization

In [None]:
fig, ax = plt.subplots(7, 7, figsize=(10, 10))

for i, path in enumerate(meta_path):
    img = plt.imread("data/" + path)

    row, col = i // 7, i % 7

    ax[row, col].imshow(img)
    ax[row, col].axis("off")
    ax[row, col].set_title(f"({i}) {classes[i]}", fontdict={"fontsize": 7}, y=-0.3)

fig.suptitle("Traffic signs", fontsize=15)
fig.subplots_adjust(top=1.5)
fig.tight_layout(pad=0.75)
plt.show()

#### Plotting the distribution of images among different classes in the training set

In [None]:
folders = os.listdir(train_path)

label_amount, class_num = [], []

for label in folders:
    train_files = os.listdir(train_path + "/" + label)
    label_amount.append(len(train_files))
    class_num.append(label)

cmap = plt.get_cmap("BuPu")
normalize = plt.Normalize(vmin=min(label_amount), vmax=max(label_amount))
colors = [cmap(normalize(value)) for value in label_amount]

plt.figure(figsize=(12, 8))
plt.bar(class_num, label_amount, color=colors)
plt.xticks(class_num, rotation="vertical")
plt.title("Distribution of images among different classes", fontsize=15)
plt.xlabel("Class ID")
plt.ylabel("amount")
plt.show()

#### Displaying random selection of test images with dimensions

In [None]:
images_path = os.listdir(test_path)

fig, ax = plt.subplots(5, 5, figsize=(10, 10))

for i in range(25):
    rand_img = imread(test_path + "/" + random.choice(images_path))

    row, col = i // 5, i % 5

    ax[row, col].imshow(rand_img)
    ax[row, col].set_xlabel(rand_img.shape[1], fontsize=10)
    ax[row, col].set_ylabel(rand_img.shape[0], fontsize=10)

fig.suptitle("Random selection of test images", fontsize=15)
fig.subplots_adjust(top=1.5)
fig.tight_layout(pad=0.8)
plt.show()

#### Data loading

In [None]:
images, labels = [], []

for label in range(num_classes):
    path = os.path.join(train_path, str(label))
    imgs = os.listdir(path)
    for img in imgs:
        image = Image.open(path + "/" + img)
        image = image.resize((30, 30))
        image = np.array(image)

        images.append(image)
        labels.append(label)

In [None]:
images, labels = np.array(images), np.array(labels)

print(images.shape, labels.shape)

#### Data splitting

In [None]:
X_train, X_rem, y_train, y_rem = train_test_split(images, labels, test_size=0.3, shuffle=True, random_state=seed)
X_val, X_test, y_val, y_test = train_test_split(X_rem, y_rem, test_size=0.5, shuffle=True, random_state=seed)

print(X_train.shape, X_val.shape, X_test.shape, y_train.shape, y_val.shape, y_test.shape)

#### Preprocessing

In [None]:
def normalize(img):
    return cv2.normalize(img, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)


def grayscale(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)


def equalize(img):
    return cv2.equalizeHist(img)


def preprocess(img):
    img = normalize(img)
    img = grayscale(img)
    img = equalize(img)
    img = img / 255
    return img

#### Preprocessing steps for sample image from training dataset

In [None]:
fig, ax = plt.subplots(1, 4, figsize=(8, 8))

img_sample = X_train[16]
ax[0].imshow(img_sample)

img_sample = normalize(img_sample)
ax[1].imshow(img_sample)

img_sample = grayscale(img_sample)
ax[2].imshow(img_sample)

img_sample = equalize(img_sample)
ax[3].imshow(img_sample)

ax[0].set_title("X_train[0]")
ax[1].set_title("normalize")
ax[2].set_title("grayscale")
ax[3].set_title("equalize")

In [None]:
X_train = np.array(list(map(preprocess, X_train)))
X_test = np.array(list(map(preprocess, X_test)))
X_val = np.array(list(map(preprocess, X_val)))

In [None]:
X_train = np.expand_dims(X_train, axis=-1)
X_test = np.expand_dims(X_test, axis=-1)
X_val = np.expand_dims(X_val, axis=-1)

#### Data Augmentation

In [None]:
datagen = ImageDataGenerator(rotation_range=10,
                             width_shift_range=0.1,
                             height_shift_range=0.1,
                             shear_range=0.1,
                             zoom_range=0.1,
                             horizontal_flip=True,
                             fill_mode="nearest")

In [None]:
y_train = to_categorical(y_train, num_classes)
y_val = to_categorical(y_val, num_classes)
y_test = to_categorical(y_test, num_classes)

#### Model Initialization

In [None]:
input_shape = (30, 30, 1)

In [None]:
LeNet5_model = Sequential([
    Conv2D(filters=6, kernel_size=(5, 5), activation="relu", input_shape=input_shape),
    MaxPool2D(pool_size=(2, 2)),
    Conv2D(filters=16, kernel_size=(5, 5), activation="relu"),
    MaxPool2D(pool_size=(2, 2)),
    Flatten(),
    Dense(units=120, activation="relu"),
    Dense(units=84, activation="relu"),
    Dense(units=num_classes, activation="softmax")
])

In [None]:
AlexNet_model = Sequential([
    # 1st Convolutional Layer
    Conv2D(filters=96, kernel_size=(11, 11), strides=(4, 4), padding="same", input_shape=input_shape),
    Activation("relu"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same"),

    # 2nd Convolutional Layer
    Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1), padding="same"),
    Activation("relu"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same"),

    # 3rd Convolutional Layer
    Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), padding="same"),
    Activation("relu"),

    # 4th Convolutional Layer
    Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), padding="same"),
    Activation("relu"),

    # 5th Convolutional Layer
    Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), padding="same"),
    Activation("relu"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same"),

    # Flatten the layers
    Flatten(),

    # 1st Dense Layer
    Dense(4096, input_shape=(30 * 30 * 1,)),
    Activation("relu"),
    Dropout(0.4),

    # 2nd Dense Layer
    Dense(4096),
    Activation("relu"),
    Dropout(0.4),

    # Output Layer
    Dense(num_classes),
    Activation("softmax")
])

In [None]:
VGG16_model = Sequential([
    # Block 1
    Conv2D(filters=64, kernel_size=(3, 3), activation="relu", padding="same", input_shape=input_shape),
    Conv2D(filters=64, kernel_size=(3, 3), activation="relu", padding="same"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),

    # Block 2
    Conv2D(filters=128, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=128, kernel_size=(3, 3), activation="relu", padding="same"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),

    # Block 3
    Conv2D(filters=256, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=256, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=256, kernel_size=(3, 3), activation="relu", padding="same"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),

    # Block 4
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same"),

    # Block 5
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    Conv2D(filters=512, kernel_size=(3, 3), activation="relu", padding="same"),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding="same"),

    # Fully connected layers
    Flatten(),
    Dense(units=4096, activation="relu"),
    Dropout(0.5),
    Dense(units=4096, activation="relu"),
    Dropout(0.5),
    Dense(units=num_classes, activation="softmax")
])

#### Visualization of model architecture

In [None]:
visualkeras.layered_view(LeNet5_model, legend=True, font=font, draw_volume=False)

In [None]:
visualkeras.layered_view(AlexNet_model, legend=True, font=font, draw_volume=False)

In [None]:
visualkeras.layered_view(VGG16_model, legend=True, font=font, draw_volume=False)

In [None]:
def train_model(model, X_train, y_train, X_val, y_val, lr, momentum, batch_size, num_epochs):
    # optimizer = SGD(learning_rate=lr, momentum=momentum)
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])

    model.summary()

    history = model.fit(datagen.flow(X_train, y_train, batch_size=batch_size), epochs=num_epochs,
                        validation_data=(X_val, y_val))

    return history

In [None]:
def evaluate_model(model, X_test, y_test):
    test_loss, test_acc = model.evaluate(X_test, y_test)

    print(f"Test Loss: {test_loss}")
    print(f"Test Accuracy: {round(test_acc * 100, 3)}%")

In [None]:
LeNet5_history = train_model(LeNet5_model, X_train, y_train, X_val, y_val,
                             lr=1e-4, momentum=0.9, batch_size=32, num_epochs=30)

In [None]:
evaluate_model(LeNet5_model, X_test, y_test)

In [None]:
AlexNet_history = train_model(AlexNet_model, X_train, y_train, X_val, y_val,
                              lr=1e-4, momentum=0.9, batch_size=32, num_epochs=15)

In [None]:
evaluate_model(AlexNet_model, X_test, y_test)

In [None]:
VGG16_history = train_model(VGG16_model, X_train, y_train, X_val, y_val,
                            lr=1e-4, momentum=0.9, batch_size=32, num_epochs=15)

In [None]:
evaluate_model(VGG16_model, X_test, y_test)

#### Plotting training and validation accuracy of each model

In [None]:
def plot_history(history, model_name):
    train_acc = history.history["accuracy"]
    val_acc = history.history["val_accuracy"]
    epochs = range(1, len(train_acc) + 1)
    plt.plot(epochs, train_acc, label="Training accuracy")
    plt.plot(epochs, val_acc, label="Validation accuracy")
    plt.title(f"Training and validation accuracy for {model_name}")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

In [None]:
plot_history(LeNet5_history, "LeNet-5")

In [None]:
plot_history(AlexNet_history, "AlexNet")

In [None]:
plot_history(VGG16_history, "VGG16")

In [None]:
test_data = pd.read_csv("data/Test.csv")

test_labels, test_imgs_path = test_data["ClassId"].values, test_data["Path"].values
test_imgs = []

for img in test_imgs_path:
    image = Image.open("data/" + img)
    image = image.resize((30, 30))
    test_imgs.append(np.array(image))

test_imgs = np.array(test_imgs) / 255

In [None]:
test_pred_LeNet5 = np.argmax(LeNet5_model.predict(test_imgs), axis=1)
test_pred_AlexNet = np.argmax(AlexNet_model.predict(test_imgs), axis=1)
test_pred_VGG16 = np.argmax(VGG16_model.predict(test_imgs), axis=1)

#Accuracy with the test data
print(accuracy_score(test_labels, test_pred_LeNet5))
print(accuracy_score(test_labels, test_pred_AlexNet))
print(accuracy_score(test_labels, test_pred_VGG16))

#### Saving machine learning models

In [None]:
LeNet5_model.save("traffic_classifier_LeNet5.h5")
AlexNet_model.save("traffic_classifier_AlexNet.h5")
VGG16_model.save("traffic_classifier_VGG16.h5")