In [1]:
# Import modules
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.utils import to_categorical
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.utils import class_weight
from skimage import io
from skimage import transform
from skimage import exposure
import cv2
%matplotlib inline
from importlib import reload
from model import TrafficClassifier


## Split images and save them

In [None]:
def get_images(basePath, csvPath):
    """Load and preprocess every image listed in a CSV index file.

    The first line of the CSV is treated as a header and skipped. For each
    remaining row that contains at least one comma, the last field is used as
    the image path (appended to ``basePath``) and the second-to-last field as
    the label. Every image is contrast-enhanced with adaptive histogram
    equalization and then resized to 32x32 pixels, ignoring aspect ratio.

    Args:
        basePath: Prefix that is concatenated in front of each image path.
        csvPath: Path of the CSV index file to read.

    Returns:
        A tuple ``(images, labels)`` of NumPy arrays. ``labels`` holds the raw
        CSV label fields, i.e. strings, in row order.
    """
    images = []
    labels = []

    # read the whole index inside a context manager so the file handle is
    # closed right away, then drop the header row
    with open(csvPath) as csv_file:
        rows = csv_file.read().split('\n')[1:]

    for (i, row) in enumerate(rows):
        if i > 0 and i % 1000 == 0:
            print("[INFO] processed {} total images".format(i))

        # split once and reuse the fields; rows without a comma (e.g. the
        # trailing empty line) carry no data and are skipped
        fields = row.split(',')
        if len(fields) <= 1:
            continue

        # label is the second-to-last column, image path the last one
        labels.append(fields[-2])
        path = basePath + fields[-1]

        # read the image
        img = io.imread(path)

        # improve image contrast by applying Adaptive Histogram Equalization
        img = exposure.equalize_adapthist(img, clip_limit=0.1)

        # resize the image to be 32x32 pixels, ignoring aspect ratio
        img = transform.resize(img, (32, 32))

        images.append(img)

    # convert the data and labels to NumPy arrays
    images = np.array(images)
    labels = np.array(labels)

    return images, labels

In [None]:
# image root folder plus the CSV index files for the two splits
basePath = 'images/'
trainPath = "images/Train.csv"
testPath = "images/Test.csv"

# read and preprocess both splits (training first, then testing)
print("[INFO] loading training and testing data...")
trainX, trainY = get_images(basePath, trainPath)
testX, testY = get_images(basePath, testPath)

In [None]:
# hold out 10% of the training data as a validation set (fixed seed keeps the split repeatable)
trainX, valX, trainY, valY = train_test_split(
    trainX,
    trainY,
    test_size=0.1,
    random_state=42,
)

In [None]:
# save every split to disk as .npy (np.save appends the extension).
# The file names must match, including case, the ones read back with np.load
# further down: the labels go to 'trainY', not 'trainy', otherwise the load
# fails on case-sensitive filesystems.
np.save('datasets/trainX', trainX)
np.save('datasets/trainY', trainY)
np.save('datasets/valX', valX)
np.save('datasets/valY', valY)
np.save('datasets/testX', testX)
np.save('datasets/testY', testY)

## Load images

In [None]:
# read the cached splits back from disk, in the same order they are unpacked
trainX, trainY, valX, valY, testX, testY = (
    np.load(npy_path)
    for npy_path in (
        'datasets/trainX.npy',
        'datasets/trainY.npy',
        'datasets/valX.npy',
        'datasets/valY.npy',
        'datasets/testX.npy',
        'datasets/testY.npy',
    )
)

In [None]:
# read the sign names CSV into a DataFrame
signNames = pd.read_csv(filepath_or_buffer='images/signnames.csv')

In [None]:
# scale data to the range of [0, 1]
#
# Images produced by get_images are already floats in [0, 1]
# (exposure.equalize_adapthist returns that range), so dividing by 255
# unconditionally would squash them into [0, 1/255]. Only rescale arrays that
# are still on the 0-255 scale; this also makes the cell safe to re-run.
def _to_unit_range(pixels):
    """Return ``pixels`` as float32, divided by 255 only if values exceed 1."""
    pixels = pixels.astype("float32")
    if pixels.size and pixels.max() > 1.0:
        pixels = pixels / 255.0
    return pixels


trainX = _to_unit_range(trainX)
valX = _to_unit_range(valX)
testX = _to_unit_range(testX)

In [None]:
# Labels coming from get_images are strings (raw CSV fields). Cast them to int
# before looking at the unique values: np.unique on strings sorts
# lexicographically ('0', '1', '10', '11', ...), which would pair each weight
# with the wrong class index.
train_class_ids = np.asarray(trainY).astype(int)
class_ids = np.unique(train_class_ids)

# number of distinct labels in the training data
num_labels = len(class_ids)

# create weighted classes to overcome the imbalance in classes
# (keyword arguments: recent scikit-learn versions reject positional ones here)
balanced_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=class_ids,
    y=train_class_ids,
)

# map each integer class id to its weight, the format Keras expects for class_weight
weight = {
    int(class_id): float(class_w)
    for class_id, class_w in zip(class_ids, balanced_weights)
}

In [None]:
# one-hot encode the labels of every split, all with the same number of classes
trainY, valY, testY = (
    to_categorical(split_labels, num_labels)
    for split_labels in (trainY, valY, testY)
)

## Model architecture

In [None]:
# data-augmentation generator: small random geometric distortions, no flipping
aug = ImageDataGenerator(
    # flips stay disabled
    horizontal_flip=False,
    vertical_flip=False,
    # random shifts, as a fraction of the image size
    width_shift_range=0.1,
    height_shift_range=0.1,
    # random rotation, zoom and shear
    rotation_range=10,
    zoom_range=0.15,
    shear_range=0.15,
    # pixels exposed by a transform are filled from the nearest valid pixel
    fill_mode="nearest",
)

In [None]:
# training hyper-parameters
epochs = 100
learning_rate = 0.0001
batch_size = 32

# initialize the optimizer and compile the model
print("[INFO] compiling model...")

# The output layer must be as wide as the one-hot labels, which are built with
# num_labels, so use num_labels here instead of a hard-coded 43.
model = TrafficClassifier.createCNN(width=32, height=32, depth=3, classes=num_labels)

# Adam's `lr` keyword is deprecated and `decay` is no longer accepted by current
# Keras optimizers. InverseTimeDecay with decay_steps=1 reproduces the old
# behaviour exactly: lr_t = learning_rate / (1 + decay_rate * step).
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    initial_learning_rate=learning_rate,
    decay_steps=1,
    decay_rate=learning_rate / epochs,
)
optimizer = Adam(learning_rate=lr_schedule)
model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"])

# train the network on augmented batches, weighting classes to offset imbalance
print("[INFO] training network...")
fit = model.fit(
    aug.flow(trainX, trainY, batch_size=batch_size),
    epochs=epochs,
    validation_data=(valX, valY),
    class_weight=weight,
    verbose=1)

In [None]:
# evaluate on the test split and report the first two returned values
score = model.evaluate(testX, testY, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print('Test Loss: ', test_loss)
print('Test Accuracy: ', test_accuracy)

In [None]:
# training vs. validation loss per epoch
for history_key in ('loss', 'val_loss'):
    plt.plot(fit.history[history_key])
plt.title('Loss')
plt.legend(['training', 'validation'])
plt.xlabel('epoch')

In [None]:
# training vs. validation accuracy per epoch
for history_key in ('accuracy', 'val_accuracy'):
    plt.plot(fit.history[history_key])
plt.title('Accuracy')
plt.legend(['training', 'validation'])
plt.xlabel('epoch')

In [None]:
# persist the trained model to disk
model.save(filepath="sign_recognition_final_model/model.pb")