In [None]:
### Imports
# Modules
import numpy as np
import cv2
import csv
import matplotlib.pyplot as plt
import os
import errno

# utils
from sklearn.utils import shuffle
from imgaug import augmenters as iaa
from sklearn.model_selection import train_test_split

# Keras
from keras.models import Sequential, load_model
from keras.layers import Flatten, Dense, Lambda, Cropping2D, Conv2D, Dropout, MaxPooling2D
from keras.optimizers import Adam, SGD
from keras.regularizers import l2

# Plot in the notebook
%matplotlib inline

In [None]:
### Constants
# Data
DATA_DIR = 'data/'
CSV_FILE = 'driving_log.csv'

# Resize constants
TOP_CUT = 30
BOTTOM_CUT = 30
NEW_WIDTH = 64
NEW_HEIGHT = 64
MAX_ROTATION_ANGLE = 15
MAX_SHEAR_SHIFT = 40

# Data generation constants
BATCH_SIZE = 32
STEERING_CORRECTION = 0.23

In [None]:
# Delete previous files:
def delete_file(file_name):
    try:
        os.remove(file_name)
    except OSError as error:
        if error.errno != errno.ENOENT:
            raise

In [None]:
### Load data
# Load csv log file
csv_file = []
with open(DATA_DIR+CSV_FILE) as csvfile:
    reader = csv.reader(csvfile)
    next(reader, None)  # skip the headers
    for line in reader:
        csv_file.append(line)

In [None]:
# Split csv in train and validation sets
csv_train, csv_valid = train_test_split(csv_file, test_size = 0.2)

In [None]:
### Sample image
# Get sample image for testing
sample_idx = np.random.choice(len(csv_file))
path = DATA_DIR + csv_file[sample_idx][0].strip()
sample_image = cv2.imread(path)
sample_angle = float(csv_file[sample_idx][3])
plt.imshow(cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB))

In [None]:
### Image transformations
# Crop image
def crop(image, top_cut = TOP_CUT, bottom_cut = BOTTOM_CUT, left_cut = 0, right_cut = 0):
    """
    Crop top, bottom, left and right sides of image
    """
    height, width = image.shape[0:2]
    cropped_image = image[top_cut:height-bottom_cut, left_cut:width-right_cut, :]
    return cropped_image

# Resize image
def resize(image, new_width = NEW_WIDTH, new_height = NEW_HEIGHT):
    """
    Resize image to new size
    """
    return cv2.resize(image,(new_width, new_height), interpolation=cv2.INTER_AREA)

# Random flip
def random_flip(image, angle):
    """
    Random flip image horizontally with probability of 0.5
    """
    if np.random.random() > 0.5:
        image = cv2.flip(image,1)
        angle *= -1.  
    return image, angle
    
# Random brightnes correction
def random_brightness_correction(image, angle):
    """
    Correct brightness of image in a random factor from (0.3, 1.7) in all channels
    """
    brightness_factor = np.random.uniform(0.4, 1.6)
    image = image * brightness_factor
    image[image > 255] = 255
    image = np.array(image, dtype = np.uint8)
    return image, angle

# Random shear
def random_shear(image, angle, shear_range = MAX_SHEAR_SHIFT):
    """
    Shear (affine transformation) horizontally by a random shift in the range of shear_range
    Implies also steering angle correction
    https://docs.opencv.org/2.4/doc/tutorials/imgproc/imgtrans/warp_affine/warp_affine.html
    """
    rows, cols, channels = image.shape
    shear_shift = np.random.randint(-shear_range, shear_range+1)
    random_point = [cols/2+shear_shift, rows/2]
    pts1 = np.float32([[0, rows], [cols, rows], [cols/2, rows/2]])
    pts2 = np.float32([[0, rows], [cols, rows], random_point])
    M = cv2.getAffineTransform(pts1, pts2)
    image = cv2.warpAffine(image, M, (cols, rows))
    shear_angle = shear_shift / (rows/2) * 180 / (np.pi*25.0) / 6   

    return image, angle + shear_angle

# Random rotation
def random_rotate(image, angle, rotation_range = MAX_ROTATION_ANGLE):
    """
    Rotate image by a random angle in the range of rotation_range
    Implies also steering angle correction
    """
    rotation_angle = np.random.uniform(-rotation_range, rotation_range+1)
    rotate_fun = iaa.Affine(rotate=(-rotation_angle))
    image = rotate_fun.augment_images([image])[0]
    rad = (np.pi / 180.0) * rotation_angle
    return image, angle - rad

In [None]:
### Testing image transformation
transformation_functions = [random_flip, random_brightness_correction, random_shear, random_rotate]

### Test image transformations
# Graph paramters
plot_width, plot_height = 15, 7
grid_rows, grid_cols = 4, 5
sample_size = grid_rows * grid_cols 

# Plot 
fig = plt.figure(figsize = (plot_width, plot_height))
fig.subplots_adjust(left = 0, right = 1, bottom = 0, top = 1, hspace = 0.05, wspace = 0.05)
counter = 1
for tr_fn in transformation_functions:
    # Print non-transformed image
    imgplt = fig.add_subplot(grid_rows, grid_cols, counter)
    imgplt.imshow(cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB))
    imgplt.axis('off')
    imgplt.set_title('Original')
    counter += 1
    # Print transformed images
    print_title = True
    for idx in range(4):
        tr_image, tr_angle = tr_fn(sample_image, sample_angle)
        imgplt = fig.add_subplot(grid_rows, grid_cols, counter)
        imgplt.imshow(cv2.cvtColor(tr_image, cv2.COLOR_BGR2RGB))
        imgplt.axis('off')
        if print_title:
            imgplt.set_title(tr_fn.__name__, loc = 'left')
            print_title = False
        counter += 1

In [None]:
### Generator
def transformed_data_generator(data, data_dir = DATA_DIR,
                               batch_size = BATCH_SIZE,
                               image_load = True,
                               new_width = NEW_WIDTH, new_height = NEW_HEIGHT):
    
    # Camera parameters
    cameras = ['center', 'left', 'right']
    cameras_index = {'center':0, 'left':1, 'right':2} # 0:center, 1:left, 2:right
    cameras_steering_correction = {'center':0, 'left':STEERING_CORRECTION, 'right':-STEERING_CORRECTION}
    
    num_samples = len(data)
    
    while 1: # Loop forever so the generator never terminates
        
        data = shuffle(data)
        images = []
        angles = []
        
        for offset in range(0, num_samples, batch_size):
            batch_samples = data[offset:offset+batch_size]
            
            for line in batch_samples:

                ### Randomly choose center, left or right image
                # Get random camera 0:center, 1:left, 2:right 
                camera = np.random.choice(cameras)
                file_name = line[cameras_index[camera]].split('/')[-1]
                path = data_dir + 'IMG/' + file_name
                if image_load:
                    image = cv2.imread(path)
                else:
                    image = np.zeros((160, 320, 3), dtype = np.uint8)
                # Adjust angle
                angle = float(line[3])
                angle += cameras_steering_correction[camera]            

                ### Random transformations
                image, angle = random_flip(image, angle)
                image, angle = random_brightness_correction(image, angle)
                image, angle = random_shear(image, angle)
                # image, angle = random_rotate(image, angle)
                
                ### Resize
                image = crop(image)
                image = resize(image, new_width, new_height)
                
                images.append(image)
                angles.append(angle)
            
            yield np.array(images), np.array(angles)

In [None]:
def lenet_generator(csv_file):
    # For lenet architecture 32x32x1 grayscaled images required
    for images, angles in transformed_data_generator(csv_file, new_width = 32, new_height = 32):
        images = 0.299*images[:, :, :, 0] + 0.587*images[:, :, :, 1] + 0.114*images[:, :, :, 2]
        images = np.expand_dims(images, axis=3)
        yield images, angles

In [None]:
for images, angles in transformed_data_generator(csv_file):
    break

plot_width, plot_height = 15, 8.3
grid_rows, grid_cols = 4, 8

fig = plt.figure(figsize = (plot_width, plot_height))
fig.subplots_adjust(left = 0, right = 1, bottom = 0, top = 1, hspace = 0.05, wspace = 0.05)
counter = 1
for image, angle in zip(images[:32], angles[:32]):
    imgplt = fig.add_subplot(grid_rows, grid_cols, counter)
    counter += 1
    imgplt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    imgplt.axis('off')
    imgplt.set_title('{:f}'.format(angle))

In [None]:
for images, angles in lenet_generator(csv_file):
    break
    
plot_width, plot_height = 15, 8.3
grid_rows, grid_cols = 4, 8

fig = plt.figure(figsize = (plot_width, plot_height))
fig.subplots_adjust(left = 0, right = 1, bottom = 0, top = 1, hspace = 0.05, wspace = 0.05)
counter = 1
for image, angle in zip(images[:32], angles[:32]):
    imgplt = fig.add_subplot(grid_rows, grid_cols, counter)
    counter += 1
    imgplt.imshow(image.squeeze(), cmap='gray_r')
    imgplt.axis('off')
    imgplt.set_title('{:f}'.format(angle))

In [None]:
### Model definition
learning_rate = 0.0001
keep_prob = 0.5
def create_nv_model():
    
    # Define nvidia model
    model = Sequential()

    # Lambda layer
    model.add(Lambda(lambda x: x / 255.0 - 0.5, input_shape = (64, 64, 3)))
    
    # Convolutional layers
    model.add(Conv2D(24, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(36, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(48, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(64, (3, 3), padding = 'same', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(64, (3, 3), padding = 'valid', strides = (2, 2), activation = 'relu'))
    
    model.add(Flatten())
    
    # Fully connected layers
    model.add(Dense(1164, activation = 'relu'))
    model.add(Dropout(keep_prob))
    model.add(Dense(100, activation = 'relu'))
    model.add(Dropout(keep_prob))
    model.add(Dense(50, activation = 'relu'))
    model.add(Dropout(keep_prob))
    model.add(Dense(10, activation = 'relu'))
    model.add(Dense(1))
    
    model.compile(loss = 'mse', optimizer = Adam())
    return model

In [None]:
nv_model = create_nv_model()
nv_model.summary()

In [None]:
### Model training
nv_hist = nv_model.fit_generator(transformed_data_generator(csv_train),
                                steps_per_epoch = len(csv_train)/BATCH_SIZE*3,
                                epochs=8,
                                validation_data = transformed_data_generator(csv_valid),
                                validation_steps = len(csv_valid)/BATCH_SIZE*3)

In [None]:
### Save model
model_name = 'models/nv_model.h5'
delete_file(model_name)
nv_model.save(model_name)

In [None]:
for images, angles in transformed_data_generator(csv_valid):
    break
steering_angles = nv_model.predict(images)
real_signs = np.sign(angles)
pred_signs = np.sign(steering_angles)
accuracy = np.count(real_signs == pred_signs)/len(angles)
print('Acuracy = {:%}'.format(acurracy))

In [None]:
### Show results
### plot the training and validation loss for each epoch
plt.plot(nv_hist.history['loss'])
plt.plot(nv_hist.history['val_loss'])
plt.title('model mean squared error loss')
plt.ylabel('mean squared error loss')
plt.xlabel('epoch')
plt.legend(['training set', 'validation set'], loc='upper right')

In [None]:
def create_lenet_model():
    # Define lenet model
    model = Sequential()
    model.add(Lambda(lambda x: x / 127.5 - 1, input_shape = (32, 32, 1)))
    model.add(Conv2D(6, (5, 5), padding = 'valid', activation = 'relu'))
    model.add(MaxPooling2D((2, 2), padding='valid'))
    model.add(Conv2D(16, (5, 5), padding = 'valid', activation = 'relu'))
    model.add(MaxPooling2D((2, 2), padding='valid'))
    model.add(Flatten())
    model.add(Dropout(0.5))
    model.add(Dense(120, activation = 'relu'))
    model.add(Dropout(0.5))
    model.add(Dense(84, activation = 'relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, kernel_initializer = 'truncated_normal'))
    model.compile(loss = 'mse', optimizer = 'adam')
    return model

In [None]:
ln_model = create_lenet_model()
ln_model.summary()

In [None]:
### Model training
ln_hist_object = ln_model.fit_generator(lenet_generator(csv_train),
                                steps_per_epoch = len(csv_train)/BATCH_SIZE*3,
                                epochs=5,
                                validation_data = lenet_generator(csv_valid),
                                validation_steps = len(csv_valid)/BATCH_SIZE*3)

In [None]:
### Save model
model_name = 'models/ln_model.h5'
delete_file(model_name)
ln_model.save(model_name)

In [None]:
for images, angles in lenet_generator(csv_valid):
    break
steering_angles = ln_model.predict(images)
real_signs = np.sign(angles)
pred_signs = np.sign(steering_angles).T
accuracy = np.sum(real_signs == pred_signs)/len(angles)
print('Acuracy = {:%}'.format(accuracy))

In [None]:
def create_simple_nv_model():
    # Define nvidia model
    model = Sequential()
    model.add(Lambda(lambda x: x / 255.0 - 0.5, input_shape = (64, 64, 3)))
    model.add(Conv2D(24, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(36, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(48, (5, 5), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(64, (3, 3), padding = 'same', strides = (2, 2), activation = 'relu'))
    model.add(Conv2D(64, (3, 3), padding = 'valid', strides = (2, 2), activation = 'relu'))
    model.add(Flatten())
    model.add(Dense(256))
    model.add(Dropout(0.5))
    model.add(Dense(64))
    model.add(Dropout(0.5))
    model.add(Dense(16))
    model.add(Dropout(0.5))
    model.add(Dense(10))
    model.add(Dense(1))
    model.compile(loss = 'mse', optimizer = 'adam')
    return model

In [None]:
simple_nv_model = create_simple_nv_model()
simple_nv_model.summary()

In [None]:
### Model training
simple_nv_hist_object = simple_nv_model.fit_generator(transformed_data_generator(csv_train),
                                steps_per_epoch = len(csv_train)/BATCH_SIZE*3,
                                epochs=8,
                                validation_data = transformed_data_generator(csv_valid),
                                validation_steps = len(csv_valid)/BATCH_SIZE*3)

In [None]:
### Save model
model_name = 'models/simple_ln_model.h5'
delete_file(model_name)
simple_nv_model.save(model_name)