In [1]:
import numpy as np
from skimage import io, color, exposure, transform
from sklearn.model_selection import train_test_split
import os
import glob
import h5py

from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential, model_from_json
from keras.layers import Dense, Dropout, Activation, Flatten, Reshape, Input
from keras.layers import Convolution2D, MaxPooling2D
from keras.optimizers import SGD
from keras.utils import np_utils
from keras.layers import Merge
from keras.callbacks import LearningRateScheduler, ModelCheckpoint

from matplotlib import pyplot as plt

Using TensorFlow backend.


In [2]:
# Number of distinct class labels; it sets the width of the one-hot label vectors.
NUM_CLASSES = 43
# Side length, in pixels, of the square every image is resized to before training.
IMG_SIZE = 48

In [3]:
def get_class(img_path):
    """Return the integer class id encoded in an image's parent directory name.

    The images are laid out as ``<root>/<class id>/<file>.ppm``, so the label
    is the name of the directory that directly contains the file.

    Args:
        img_path: Path to an image file.

    Returns:
        The parent directory name converted to ``int`` (e.g. ``'00012'`` -> 12).

    Raises:
        ValueError: If the parent directory name is not an integer literal.
    """
    # os.path understands the platform's separator(s); splitting on a literal
    # '/' breaks when glob returns paths built with a different separator.
    class_dir = os.path.basename(os.path.dirname(img_path))
    return int(class_dir)

def preprocess_img(img):
    """Normalise one RGB image into a fixed-size, channel-first array.

    Steps, in order:
      1. Equalise the histogram of the third HSV channel (index 2) and
         convert the result back to RGB.
      2. Cut a centred square whose half-side is ``min(height, width) // 2``.
      3. Resize that square to ``IMG_SIZE`` x ``IMG_SIZE``.
      4. Move the colour axis from last to first position.

    Args:
        img: Image array with the colour channels on the last axis.

    Returns:
        The processed image with the colour axis first.
    """
    # Step 1: brightness equalisation, leaving the first two HSV channels untouched.
    hsv_pixels = color.rgb2hsv(img)
    hsv_pixels[:, :, 2] = exposure.equalize_hist(hsv_pixels[:, :, 2])
    equalized = color.hsv2rgb(hsv_pixels)

    # Step 2: central crop to a square.
    half_side = min(equalized.shape[:-1]) // 2
    mid_row = equalized.shape[0] // 2
    mid_col = equalized.shape[1] // 2
    row_span = slice(mid_row - half_side, mid_row + half_side)
    col_span = slice(mid_col - half_side, mid_col + half_side)
    square = equalized[row_span, col_span, :]

    # Step 3: rescale to the standard size.
    resized = transform.resize(square, (IMG_SIZE, IMG_SIZE))

    # Step 4: roll the colour axis to axis 0.
    return np.rollaxis(resized, -1)

In [4]:
def readTrafficSigns(root_dir):
    """Load and preprocess every ``.ppm`` image one directory level below ``root_dir``.

    Each file matching ``<root_dir>/*/*.ppm`` is read, passed through
    ``preprocess_img`` and labelled with ``get_class``. Files that raise
    ``IOError``/``OSError`` are reported with a ``missed`` line and skipped.
    A progress line is printed every 1000 successfully processed images.

    Args:
        root_dir: Directory whose immediate sub-directories hold the images.

    Returns:
        A tuple ``(imgs, labels)`` of two equally long lists: the processed
        images and their integer labels, in the order glob returned the paths.
    """
    all_img_paths = glob.glob(os.path.join(root_dir, '*/*.ppm'))
    total = len(all_img_paths)

    imgs, labels = [], []
    for path in all_img_paths:
        try:
            processed = preprocess_img(io.imread(path))
            class_id = get_class(path)
            imgs.append(processed)
            labels.append(class_id)

            if not len(imgs) % 1000:
                print("Processed {}/{}".format(len(imgs), total))
        except (IOError, OSError):
            # Unreadable file: report it and carry on with the rest.
            print('missed', path)
    return imgs, labels

In [5]:
# Where the preprocessed training set is cached between runs.
CACHE_DIR = "Image_n_Labels"
TRAIN_IMAGES_CACHE = os.path.join(CACHE_DIR, "trainImagesCNN.npy")
TRAIN_LABELS_CACHE = os.path.join(CACHE_DIR, "trainLabelsCNN.npy")

# Raw dataset location. Set the GTSRB_TRAIN_DIR environment variable to point
# somewhere else without editing the notebook; the default is the original path.
GTSRB_TRAIN_DIR = os.environ.get(
    "GTSRB_TRAIN_DIR",
    "/home/nikita/Downloads/gtsrb/dataset/GTSRB_finaltraining/Final_Training/Images")

if os.path.isfile(TRAIN_IMAGES_CACHE) and os.path.isfile(TRAIN_LABELS_CACHE):
    # Fast path: both cache files exist, so skip reading the raw dataset.
    imgs = np.load(TRAIN_IMAGES_CACHE)
    labels = np.load(TRAIN_LABELS_CACHE)
    print("[INFO] Training images and labels are loaded in variables ==> X,y")

else:
    trainImages, trainLabels = readTrafficSigns(GTSRB_TRAIN_DIR)

    # Fail loudly instead of writing an empty cache that every later run
    # would then load without complaint.
    if len(trainImages) == 0:
        raise IOError("No .ppm images found under {}".format(GTSRB_TRAIN_DIR))

    # Convert once and reuse the arrays rather than re-reading the files
    # that are about to be written.
    imgs = np.asarray(trainImages)
    labels = np.asarray(trainLabels)

    # np.save does not create missing parent directories.
    if not os.path.isdir(CACHE_DIR):
        os.makedirs(CACHE_DIR)
    np.save(TRAIN_IMAGES_CACHE, imgs)
    np.save(TRAIN_LABELS_CACHE, labels)

    print("[INFO] training images and labels are read from the dataset directory")
    print("[INFO] training images saved to {} for further use".format(TRAIN_IMAGES_CACHE))
    print("[INFO] training labels saved to {} for further use".format(TRAIN_LABELS_CACHE))

[INFO] Training images and labels are loaded in variables ==> X,y


In [6]:
# One-hot encode the labels: row i of the identity matrix is the encoding of
# class i, so indexing it with the label array yields one row per sample.
Y = np.eye(NUM_CLASSES, dtype=np.uint8)[labels]
# Network input: all images as a single float32 array.
X = np.array(imgs, dtype=np.float32)

# Persist both arrays in one HDF5 file (datasets 'imgs' and 'labels').
with h5py.File('X.h5', mode='w') as hf:
    hf.create_dataset(name='imgs', data=X)
    hf.create_dataset(name='labels', data=Y)


In [7]:
def leNet():
    """Build the (uncompiled) LeNet-style classifier used in this notebook.

    The network consumes channel-first images of shape
    ``(3, IMG_SIZE, IMG_SIZE)`` and ends in a softmax over ``NUM_CLASSES``
    outputs. Every convolution *and* pooling layer is told explicitly that
    the data is channel-first, so the model does not depend on the backend's
    configured default image data format.

    With ``IMG_SIZE = 48`` the feature-map shapes are:
    ``(6, 44, 44) -> (6, 22, 22) -> (16, 18, 18) -> (16, 9, 9) -> (120, 9, 9)``.

    Returns:
        A ``Sequential`` model; the caller is responsible for compiling it.
    """
    # Must be passed to pooling layers as well: without it they fall back to
    # the backend default and may pool across the channel axis instead of
    # the two spatial axes.
    fmt = "channels_first"

    model = Sequential()

    model.add(Convolution2D(6, (5, 5), padding='valid',
                            input_shape=(3, IMG_SIZE, IMG_SIZE),
                            data_format=fmt))
    model.add(MaxPooling2D(pool_size=(2, 2), data_format=fmt))
    model.add(Activation("sigmoid"))

    model.add(Convolution2D(16, (5, 5), padding='valid', data_format=fmt))
    model.add(MaxPooling2D(pool_size=(2, 2), data_format=fmt))
    model.add(Activation("sigmoid"))
    model.add(Dropout(0.5))

    # NOTE(review): there is no activation between this 1x1 convolution and
    # the following Dense layer -- confirm that this is intentional.
    model.add(Convolution2D(120, (1, 1), padding='valid', data_format=fmt))

    model.add(Flatten())
    model.add(Dense(84))
    model.add(Activation("sigmoid"))
    # Output width follows the shared constant so it cannot drift from the
    # one-hot label encoding.
    model.add(Dense(NUM_CLASSES))
    model.add(Activation('softmax'))
    return model

In [8]:
# Base learning rate; lr_schedule also reads this global.
lr = 0.01

# Optimizer: SGD with Nesterov momentum and a small per-update decay.
sgd = SGD(lr=lr, momentum=0.9, decay=1e-6, nesterov=True)

# Build the network and attach the loss, optimizer and reported metric.
model = leNet()
model.compile(optimizer=sgd,
              loss='categorical_crossentropy',
              metrics=['accuracy'])

In [9]:
def lr_schedule(epoch):
    """Step-decay schedule: divide the base rate by 10 every 10 epochs.

    Args:
        epoch: Epoch index for which a learning rate is requested.

    Returns:
        The global base rate ``lr`` multiplied by ``0.1 ** int(epoch / 10)``;
        e.g. epochs 0-9 get ``lr`` and epochs 10-19 get ``lr * 0.1``.
    """
    completed_steps = int(epoch / 10)
    decay_factor = 0.1 ** completed_steps
    return lr * decay_factor

In [None]:

batch_size = 32
nb_epoch = 30
VALIDATION_FRACTION = 0.2   # share of the samples held out for validation
SPLIT_SEED = 42             # fixed seed so the hold-out set is reproducible

# Keras' `validation_split` takes the *last* fraction of the arrays before any
# shuffling. X/Y are still in the order the files were globbed (never
# shuffled), so that tail is not a representative sample of the classes.
# Build an explicit, seeded hold-out set instead, stratified on the class
# index so every class keeps the same share in both parts.
# Assumes every class has at least two samples -- train_test_split raises
# ValueError otherwise.
X_train, X_val, Y_train, Y_val = train_test_split(
    X, Y,
    test_size=VALIDATION_FRACTION,
    random_state=SPLIT_SEED,
    stratify=np.argmax(Y, axis=1))

# LearningRateScheduler applies lr_schedule during training; ModelCheckpoint
# writes 'leNet.h5' only when its monitored quantity improves.
model.fit(X_train, Y_train,
          batch_size=batch_size,
          epochs=nb_epoch,
          validation_data=(X_val, Y_val),
          shuffle=True,
          callbacks=[LearningRateScheduler(lr_schedule),
                     ModelCheckpoint('leNet.h5', save_best_only=True)]
          )

Train on 31367 samples, validate on 7842 samples
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30