In [4]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [5]:
import cv2
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from PIL import Image
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import accuracy_score
# Seed every random number generator the notebook relies on.
# NumPy drives the explicit shuffle of the training data; TensorFlow has its
# own generator (weight initialisation, dropout), which the NumPy seed does
# not touch.
# NOTE(review): bit-exact reproducibility on GPU may still need extra settings.
np.random.seed(42)
tf.random.set_seed(42)

from matplotlib import style
style.use('fivethirtyeight')

In [6]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

In [7]:
!python --version

In [8]:
# Root of the GTSRB dataset, plus the Train/Test sub-folders derived from it.
data_dir = '../input/gtsrb-german-traffic-sign'
train_dir = f'{data_dir}/Train'
test_path = f'{data_dir}/Test'

In [9]:
# Every image is resized to IMG_HEIGHT x IMG_WIDTH pixels; `channels` is the
# number of colour channels fed to the network's input layer.
IMG_HEIGHT = IMG_WIDTH = 30
channels = 3

In [10]:
# Number of classes = number of class sub-folders inside the training directory.
# Only directories are counted, so a stray file in the folder (archive,
# checkpoint, OS metadata, ...) cannot inflate the class count.
NUM_CATEGORIES = sum(
    1
    for entry_name in os.listdir(train_dir)
    if os.path.isdir(os.path.join(train_dir, entry_name))
)
NUM_CATEGORIES

In [11]:
# Label overview: maps each integer class id (0-42) to its human-readable
# sign name. The list position of a name is its class id.
classes = dict(enumerate([
    'Speed limit (20km/h)',
    'Speed limit (30km/h)',
    'Speed limit (50km/h)',
    'Speed limit (60km/h)',
    'Speed limit (70km/h)',
    'Speed limit (80km/h)',
    'End of speed limit (80km/h)',
    'Speed limit (100km/h)',
    'Speed limit (120km/h)',
    'No passing',
    'No passing veh over 3.5 tons',
    'Right-of-way at intersection',
    'Priority road',
    'Yield',
    'Stop',
    'No vehicles',
    'Veh > 3.5 tons prohibited',
    'No entry',
    'General caution',
    'Dangerous curve left',
    'Dangerous curve right',
    'Double curve',
    'Bumpy road',
    'Slippery road',
    'Road narrows on the right',
    'Road work',
    'Traffic signals',
    'Pedestrians',
    'Children crossing',
    'Bicycles crossing',
    'Beware of ice/snow',
    'Wild animals crossing',
    'End speed + passing limits',
    'Turn right ahead',
    'Turn left ahead',
    'Ahead only',
    'Go straight or right',
    'Go straight or left',
    'Keep right',
    'Keep left',
    'Roundabout mandatory',
    'End of no passing',
    'End no passing veh > 3.5 tons',
]))

## Visualize dataset

In [12]:
# Count the training images in every class folder and plot the distribution.
folders = os.listdir(train_dir)

train_number = []
class_num = []

for folder in folders:
    train_files = os.listdir(os.path.join(train_dir, folder))
    train_number.append(len(train_files))
    # Folder names are the integer class ids; translate them to sign names.
    class_num.append(classes[int(folder)])

# Sort the (count, class name) pairs together so that every name stays
# attached to its own count.
sorted_pairs = sorted(zip(train_number, class_num))
train_number = [count for count, _ in sorted_pairs]
class_number = [name for _, name in sorted_pairs]

# Plotting the number of images in each class.
# The bars must be labelled with the *sorted* names (class_number): class_num
# is still in folder order and would pair each bar with the wrong class.
plt.figure(figsize=(20,10))
plt.bar(x=class_number, height=train_number)
plt.xticks(rotation='vertical')
plt.ylabel('Number of training images')
plt.title('Training images per class')
plt.show()

In [13]:
# Show a 5x5 grid of randomly picked test images; the axis labels carry each
# image's width (x) and height (y) in pixels.
import random
from matplotlib.image import imread

test = pd.read_csv(os.path.join(data_dir, 'Test.csv'))
images = test['Path'].values

plt.figure(figsize=(25,25))

for cell in range(25):
    plt.subplot(5, 5, cell + 1)
    sample = imread(os.path.join(data_dir, random.choice(images)))
    sample_height, sample_width = sample.shape[0], sample.shape[1]
    plt.imshow(sample)
    plt.grid()
    plt.xlabel(sample_width, fontsize=20)
    plt.ylabel(sample_height, fontsize=20)

## Collecting the training dataset

In [14]:
# Read every training image, resize it to the network input size and record
# the class id taken from the name of the folder it lives in.
image_data = []
image_labels = []

for i in range(NUM_CATEGORIES):
    path = os.path.join(train_dir, str(i))
    images = os.listdir(path)
    for img in images:
        try:
            image = cv2.imread(os.path.join(path, img))
            if image is None:
                # cv2.imread signals an unreadable file by returning None.
                raise ValueError("cv2.imread could not read the file")
            # cv2.imread yields BGR pixels and Image.fromarray does not reorder
            # them, so the stored arrays stay in BGR order.
            # NOTE(review): the test-loading cell uses the same pipeline, so
            # train and test agree; keep the two in sync if this changes.
            image_fromarray = Image.fromarray(obj=image, mode='RGB')
            # PIL's resize expects (width, height).
            resize_image = image_fromarray.resize((IMG_WIDTH, IMG_HEIGHT))
            pixels = np.array(resize_image)
        except Exception as exc:
            # Best effort: report the offending file and carry on.
            print("Error in: ", img, exc)
        else:
            # Append image and label together so the two lists stay aligned.
            image_data.append(pixels)
            image_labels.append(i)

In [15]:
# Stack the list of per-image arrays into a single NumPy array.
image_data = np.array(image_data)

In [16]:
# Convert the list of integer class ids into a NumPy array.
image_labels = np.array(image_labels)

In [17]:
# Sanity check: print the shapes of the image array and the label array.
print(image_data.shape, image_labels.shape)

## Shuffling the training data
(Note: this step is redundant here, because `train_test_split` already shuffles the data by default.)

In [18]:
# Draw one random permutation of the row indices and apply it to images and
# labels alike, so every image keeps its own label.
shuffle_indexes = np.arange(len(image_data))
np.random.shuffle(shuffle_indexes)
image_data, image_labels = image_data[shuffle_indexes], image_labels[shuffle_indexes]

## Split training dataset => train and validation set

In [19]:
# Hold out 25% of the samples for validation (fixed random_state for a
# repeatable split), then scale pixel values by 1/255.
X_train, X_val, y_train, y_val = train_test_split(
    image_data,
    image_labels,
    test_size=0.25,
    random_state=42,
)

X_train, X_val = X_train / 255.0, X_val / 255.0

# Report the shape of each split.
for caption, split in (
    ("X_train.shape", X_train),
    ("X_valid.shape", X_val),
    ("y_train.shape", y_train),
    ("y_valid.shape", y_val),
):
    print(caption, split.shape)

In [20]:
# One-hot encode the integer class ids so they match the
# categorical cross-entropy loss used when the model is compiled.
# The labels are passed positionally on purpose: the first parameter is named
# `y` in older tf.keras but `x` in Keras 3, so the keyword form `y=` breaks on
# newer releases while the positional form works on both.
y_train = keras.utils.to_categorical(y_train, num_classes=NUM_CATEGORIES)
y_val = keras.utils.to_categorical(y_val, num_classes=NUM_CATEGORIES)
print(y_train.shape)
print(y_val.shape)

## Create the model

In [21]:
# Small CNN classifier: two convolutional stages (conv -> conv -> max-pool ->
# batch-norm), followed by a dense head with dropout and a softmax output.
model = keras.models.Sequential([
    # Stage 1: 16 and 32 filters.
    keras.layers.Conv2D(filters=16, kernel_size=(3,3), activation='relu', input_shape=(IMG_HEIGHT,IMG_WIDTH,channels)),
    keras.layers.Conv2D(filters=32, kernel_size=(3,3), activation='relu'),
    keras.layers.MaxPool2D(pool_size=(2, 2)),
    keras.layers.BatchNormalization(axis=-1),

    # Stage 2: 64 and 128 filters.
    keras.layers.Conv2D(filters=64, kernel_size=(3,3), activation='relu'),
    keras.layers.Conv2D(filters=128, kernel_size=(3,3), activation='relu'),
    keras.layers.MaxPool2D(pool_size=(2, 2)),
    keras.layers.BatchNormalization(axis=-1),

    # Classification head.
    keras.layers.Flatten(),
    keras.layers.Dense(512, activation='relu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dropout(rate=0.5),

    # One output unit per class. This uses NUM_CATEGORIES rather than a
    # literal 43 so the layer always matches the width of the one-hot labels.
    keras.layers.Dense(NUM_CATEGORIES, activation='softmax'),
])

In [22]:
# Training hyper-parameters.
lr = 0.001
epochs = 35

# Time-based learning-rate decay: lr_t = lr / (1 + decay * step).
# This is the same formula the legacy `decay=` optimizer argument applied,
# expressed as a schedule because newer Keras optimizers reject `decay=`.
lr_schedule = keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=lr,
    decay_steps=1,
    decay_rate=lr / (epochs * 0.5),
)
opt = Adam(learning_rate=lr_schedule)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

In [23]:
# Print the layer-by-layer summary of the compiled model.
model.summary()

In [24]:
# Augment training set: small random rotations, zooms, shifts and shears are
# applied on the fly; flips stay disabled.
augmentation_settings = dict(
    rotation_range=10,
    zoom_range=0.15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.15,
    horizontal_flip=False,
    vertical_flip=False,
    fill_mode="nearest",
)
aug = ImageDataGenerator(**augmentation_settings)

# Train on augmented batches of 32; validation data is passed through unaugmented.
train_batches = aug.flow(X_train, y_train, batch_size=32)
history = model.fit(train_batches, epochs=epochs, validation_data=(X_val, y_val))

## Training result

In [25]:
# Plot every metric recorded during training, one curve per metric, with the
# y-axis limited to [0, 1].
history_frame = pd.DataFrame(history.history)
history_ax = history_frame.plot(figsize=(8, 5))
history_ax.grid(True)
history_ax.set_ylim(0, 1)
plt.show()

## Loading the test data and running the predictions

In [26]:
# Load the test images listed in Test.csv, using the same preprocessing as the
# training images, and keep the ground-truth labels aligned with the images
# that were actually loaded.
test = pd.read_csv(os.path.join(data_dir, 'Test.csv'))

images = test['Path']

data = []
loaded = []  # one flag per CSV row: True when the image was read successfully

for img in images:
    try:
        image = cv2.imread(os.path.join(data_dir, img))
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            raise ValueError("cv2.imread could not read the file")
        # Pixels stay in OpenCV's BGR order, exactly as in the training
        # pipeline, so the model sees consistently ordered channels.
        image_fromarray = Image.fromarray(image, 'RGB')
        # PIL's resize expects (width, height).
        resize_image = image_fromarray.resize((IMG_WIDTH, IMG_HEIGHT))
        pixels = np.array(resize_image)
    except Exception as exc:
        print("Error in: ", img, exc)
        loaded.append(False)
    else:
        data.append(pixels)
        loaded.append(True)

# Drop the labels of images that failed to load; otherwise `labels` would be
# longer than `data` and every later comparison would be misaligned.
# When every image loads, this equals test['ClassId'].
labels = test.loc[np.array(loaded, dtype=bool), 'ClassId'].reset_index(drop=True)

In [27]:
# Stack the test images and scale the pixels by 1/255, as for the training data.
X_test = np.array(data) / 255.0
# Show the shape rather than dumping the raw pixel array into the output.
X_test.shape

In [28]:
# Run the trained model on the test images. The final layer is a softmax, so
# each row of `pred` holds per-class scores for one image.
pred = model.predict(X_test)
pred.shape

In [29]:
# Predicted class id = index of the highest score in each prediction row.
pred_classes = pred.argmax(axis=1)

In [30]:
# Fraction of test images classified correctly, reported as a percentage.
test_accuracy = accuracy_score(labels, pred_classes)
print('Test data accuracy: ', test_accuracy * 100)

## Visualizing with confusion matrix

In [31]:
from sklearn.metrics import confusion_matrix

# Rows of the matrix correspond to the true class, columns to the predicted class.
cf = confusion_matrix(y_true=labels, y_pred=pred_classes)

In [32]:
import seaborn as sns

# Label both axes of the confusion matrix with the class ids (the keys of `classes`).
df_cm = pd.DataFrame(data=cf, index=classes, columns=classes)
plt.figure(figsize = (20, 20))
# fmt='d' prints the counts as plain integers; the default '.2g' format would
# show larger counts in scientific notation (e.g. 7.2e+02).
heatmap_ax = sns.heatmap(df_cm, annot=True, fmt='d')
heatmap_ax.set_xlabel('Predicted class id')
heatmap_ax.set_ylabel('Actual class id')
plt.show()

## Classification Report

In [33]:
from sklearn.metrics import classification_report

# Per-class precision, recall and F1 on the test set.
report_text = classification_report(labels, pred_classes)
print(report_text)

## Predictions on Test Data

In [34]:
# Show the first 25 test images with their actual and predicted class ids.
# The caption is blue for a correct prediction and red for a wrong one.
plt.figure(figsize = (25, 25))

start_index = 0
for i in range(25):
    plt.subplot(5, 5, i + 1)
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    prediction = pred_classes[start_index + i]
    actual = labels[start_index + i]
    col = 'b'
    if prediction != actual:
        col = 'r'
    plt.xlabel('Actual={} || Pred={}'.format(actual, prediction), color = col)
    # The test images were read with cv2.imread and are therefore still in BGR
    # order; reverse the channel axis so matplotlib (which expects RGB) shows
    # the true colours instead of swapping red and blue.
    plt.imshow(X_test[start_index + i][..., ::-1])
plt.show()

## Save model for later inference

In [35]:
# model.save("model")

In [36]:
# model.save('my_model.h5')

In [37]:
# !zip -r model.zip model/

## Quantization aware training

In [39]:
!pip install -q tensorflow-model-optimization

In [None]:
# import tensorflow_model_optimization as tfmot

# quantize_model = tfmot.quantization.keras.quantize_model

# # q_aware stands for quantization aware.
# q_aware_model = quantize_model(model)

# #Compilation of the model
# opt = Adam(learning_rate=lr, decay=lr / (epochs * 0.5))
# q_aware_model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])