In [45]:
import cv2 as cv
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from scipy.stats import bernoulli
from scipy.ndimage import rotate

import tensorflow as tf
from keras.utils import Sequence
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Input, add
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras import regularizers, Model
from tensorflow.keras import optimizers

# Image Utils

In [63]:
def imgCenterCrop(img, crop_size):
    """
    Crop a window out of the source img. The window is centred
    horizontally and anchored to the bottom edge vertically (the top rows
    are the ones discarded).

    :param img: source img
        array whose first two axes are (height, width)

    :param crop_size: ints tuple
        (crop_height, crop_width) of the cropped img. A dimension larger
        than the source is clamped to the source size.

    :return: cropped img
    """
    img_height, img_width = img.shape[:2]
    crop_height, crop_width = crop_size

    # Clamp the request: an oversized crop would otherwise produce negative
    # start indices, which wrap around and silently return a wrong (or
    # empty) slice instead of the intended window.
    crop_height = min(crop_height, img_height)
    crop_width = min(crop_width, img_width)

    top = img_height - crop_height
    left = (img_width - crop_width) // 2
    return img[top:, left: left + crop_width]


def imgRandomFlip(img, steering_angle, flipping_prob=0.5):
    """
    Mirror the image left-right with probability `flipping_prob`.
    Whenever the image is mirrored, the sign of the steering angle is
    inverted as well.

    :param img: original image

    :param steering_angle: original steering angle

    :param flipping_prob:
        probability of mirroring (parameter of the Bernoulli draw)

    :return:
        (flipped) image, (flipped) steering angle
    """
    # Tails: hand the inputs back untouched.
    if not bernoulli.rvs(flipping_prob):
        return img, steering_angle

    # Heads: mirror horizontally and negate the angle.
    mirrored = np.fliplr(img)
    return mirrored, -1 * steering_angle
    

def imgRandomAdjustGamma(img):
    """
    Change the brightness of img by applying gamma correction with a
    gamma drawn uniformly from [0.4, 1.5).

    :param img: source image
        NOTE(review): the correction goes through a 256-entry lookup table,
        so this presumably expects an 8-bit image - confirm with callers.

    :return: gamma-corrected image
    """
    exponent = 1.0 / np.random.uniform(0.4, 1.5)

    # Lookup table mapping every intensity level to its corrected value:
    # level -> (level / 255) ** (1 / gamma) * 255, truncated to uint8.
    levels = np.arange(0, 256)
    lookup = np.array(
        [((level / 255.0) ** exponent) * 255 for level in levels]
    ).astype("uint8")

    return cv.LUT(img, lookup)


def imgRandomShear(img, steering_angle, shear_range=200):
    """
    Randomly shear img horizontally and correct the steering angle to match.

    Source:  https://medium.com/@ksakmann/behavioral-cloning-make-a-car-drive-like-yourself-dc6021152713#.7k8vfppvk

    The two bottom corners stay fixed while the image centre is moved
    sideways by a random integer number of pixels.

    :param img: source img

    :param steering_angle: associate steering_angle

    :param shear_range: the horizontal shift of the centre is drawn from the
        integers in [-shear_range, shear_range]

    :return:
        sheared img & sheared steering angle
    """
    rows, cols = img.shape[:2]
    dx = np.random.randint(-shear_range, shear_range + 1)

    # Affine map defined by three point pairs: bottom-left and bottom-right
    # map onto themselves, the centre is shifted horizontally by dx.
    bottom_left, bottom_right = [0, rows], [cols, rows]
    src_pts = np.float32([bottom_left, bottom_right, [cols / 2, rows / 2]])
    dst_pts = np.float32([bottom_left, bottom_right, [cols / 2 + dx, rows / 2]])
    warp = cv.getAffineTransform(src_pts, dst_pts)

    # Pixels that fall outside the source are filled by replicating the border.
    sheared = cv.warpAffine(img, warp, (cols, rows), borderMode=cv.BORDER_REPLICATE)

    # Steering correction proportional to the shift (formula from the source above).
    dsteering = dx / (rows / 2) * 360 / (2 * np.pi * 25.0) / 6.0
    steering_angle += dsteering

    return sheared, steering_angle


def imgRandomRotate(img, steering_angle, max_rotation=15):
    """
    Rotate input image by a random angle and correct the steering angle.

    :param img: source img

    :param steering_angle: assoc. steering_angle
        NOTE(review): the correction is applied in radians, which assumes
        steering_angle itself is in radians - confirm against the dataset.

    :param max_rotation: maximum absolute rotation, in degrees; the angle is
        drawn uniformly from [-max_rotation, max_rotation)

    :return:
        Rotated img & rotated steering angle
    """
    # scipy.ndimage.rotate expects DEGREES. The angle used to be converted to
    # radians before being passed in, so images were rotated by well under
    # one degree. The sampling range is also symmetric now (the former
    # `max_rotation + 1` upper bound skewed the rotations to one side).
    rot_deg = np.random.uniform(-max_rotation, max_rotation)
    rot_rad = np.deg2rad(rot_deg)

    # Image is rotated in degrees; the steering correction stays in radians.
    return rotate(img, rot_deg, reshape=False), steering_angle - rot_rad


def imgPreprocess(img, steering_angle, crop_size=(300, 400), shear_prob=0.75, resize=(128, 128)):
    """
    Apply all random treatment on source image: crop, random gamma,
    random flip, random rotation, random shear (with probability
    `shear_prob`) and finally a resize.

    :param img: source img

    :param steering_angle: steering angle associated with img

    :param crop_size: ints tuple
        (height, width) passed to imgCenterCrop

    :param shear_prob: probability of applying imgRandomShear

    :param resize: ints tuple
        target size handed to cv.resize, i.e. (width, height)

    :return:
        pre-processed img & its associate steering angle. The img is
        float32 and always has an explicit channel axis, so its shape is
        (resize[1], resize[0], channels).
    """
    # crop
    img = imgCenterCrop(img, crop_size)

    # adjust brightness
    img = imgRandomAdjustGamma(img)

    # randomly flip
    img, steering_angle = imgRandomFlip(img, steering_angle)

    # randomly rotate
    img, steering_angle = imgRandomRotate(img, steering_angle)

    # randomly shear
    if bernoulli.rvs(shear_prob):
        img, steering_angle = imgRandomShear(img, steering_angle)

    # resize
    img = np.float32(cv.resize(img, resize))

    # Grayscale images come back 2-D; add the channel axis without assuming
    # the array layout. cv.resize takes (width, height) but returns an array
    # shaped (height, width), so reshaping to (resize[0], resize[1], 1)
    # would scramble the pixels for any non-square target size.
    if img.ndim == 2:
        img = img[..., np.newaxis]

    return img, steering_angle

In [64]:
# Smoke test of the preprocessing pipeline on one centre-camera frame.
sample_path = 'data/center/1479424215880976321.png'
t_img = cv.imread(sample_path, 0)  # flag 0 -> load as grayscale

# cv.imread does not raise on a missing/unreadable file, it returns None;
# fail here with the path instead of an AttributeError on `.shape`.
if t_img is None:
    raise FileNotFoundError('Cannot read image: ' + sample_path)

print(t_img.shape)
steering_angle = 0
img, steering_angle = imgPreprocess(t_img, steering_angle)
print(img.shape)

(480, 640)
(128, 128, 1)


# Train Utils

In [65]:
# Steering-angle offset for frames recorded by a side camera: it is
# subtracted for 'left_camera' frames and added for 'right_camera' frames
# in trainGenerateBatch.
STEERING_COEFFICIENT = 0.229

In [66]:
def trainGetImgFiles(df, batch_idx, batch_size=64):
    """
    Extract name of image files & associate steering angle
    from dataframe starting from the (batch_idx * batch_size)-th row

    :param df:
        dataframe object storing csv file; must provide the columns
        'frame_id', 'filename' and 'angle'

    :param batch_idx: int
        index of the batch; rows are taken starting at
        batch_idx * batch_size

    :param batch_size:
        maximum number of rows returned (the last batch may be shorter)

    :return:
        List of tuples of frame_id(which camera), img_path & steering angle

    :raises IndexError: if batch_idx is negative
    """
    if batch_idx < 0:
        raise IndexError('batch_idx must be non-negative, got %d' % batch_idx)

    path_prefix = 'data/'
    row_0 = batch_idx * batch_size

    # Slice by position, not by label: the arithmetic above is positional,
    # so this also works when the frame's index is not 0..n-1. A slice past
    # the end simply comes back shorter (or empty).
    rows = df.iloc[row_0: row_0 + batch_size]

    return [
        (frame_id, path_prefix + filename, angle)
        for frame_id, filename, angle in zip(rows['frame_id'], rows['filename'], rows['angle'])
    ]


def trainGenerateBatch(df, batch_idx, batch_size=64, resize=(128, 128), color_mode=0):
    """
    Generate batch for training.

    :param df:
        dataframe object storing csv file

    :param batch_idx: int
        which rows to start getting img path & steering angle

    :param batch_size:

    :param resize: ints tuple
        target image size forwarded to imgPreprocess

    :param color_mode:
        1: color img, 0: grayscale img (passed to cv.imread as read flag)

    :return:
        1 array of preprocessed imgs & 1 array assoc. steering angle

    :raises FileNotFoundError: if an image listed in df cannot be read
    """
    # read csv & extract frame_id, path to img file, steering angle
    _batch = trainGetImgFiles(df, batch_idx, batch_size)

    batch_X = []  # features
    batch_y = []  # labels

    # preprocess img & steering angle
    for _frame_id, _path, _steering in _batch:
        # get raw image; cv.imread returns None (no exception) when the file
        # is missing or unreadable, so fail here with the offending path
        # rather than deep inside the preprocessing.
        _img = cv.imread(_path, color_mode)
        if _img is None:
            raise FileNotFoundError('Cannot read image: ' + _path)

        # adjust steering angle if img is obtained by side camera
        if _frame_id == 'left_camera':
            _steering -= STEERING_COEFFICIENT
        elif _frame_id == 'right_camera':
            _steering += STEERING_COEFFICIENT

        # manipulate img & steering
        img, steering = imgPreprocess(_img, _steering, resize=resize)

        batch_X.append(img)
        batch_y.append(steering)

    return np.array(batch_X), np.array(batch_y)


## Data generator

In [73]:
class DataGenerator(tf.keras.utils.Sequence):
    """
    Keras Sequence that serves batches of (images, steering angles) built
    from the rows of a csv file.
    """

    def __init__(self, data_file, batch_size=64, color_mode=0, img_size=(128, 128), shuffle=True, 
                 is_training=True):
        """
        :param data_file:
            name of csv file contains driving info
            
        :param batch_size: int

        :param color_mode:
            forwarded to the batch builders, which pass it to cv.imread
            (1: color img, 0: grayscale img)
        
        :param img_size: tuple of ints
            size of training & testing img
        
        :param shuffle: bool
            shuffle the rows of the csv on construction and after every epoch
            
        :param is_training: bool
            generate data for training or testing
        """
        super().__init__()
        self.df = pd.read_csv(data_file)  # dataframe
        self.batch_size = batch_size
        self.img_size = img_size
        self.color_mode = color_mode
        self.shuffle = shuffle
        self.is_training = is_training
        # Initial shuffle; done last so every attribute is already set.
        self.on_epoch_end()
    
    def __len__(self):
        """
        :return:
            Number of batches per epoch
        """
        # Round up so the trailing partial batch is served as well; the batch
        # builders already truncate the last batch to the remaining rows.
        # Flooring dropped those rows and gave 0 batches for a csv shorter
        # than one batch.
        return int(np.ceil(len(self.df) / float(self.batch_size)))
    
    def __getitem__(self, idx):
        """
        Generate 1 batch of data
        
        :param idx: int
            index of the batch (rows start at idx * batch_size)

        :return:
            array of images & array of steering angles
        """
        if self.is_training:
            return trainGenerateBatch(self.df, idx, self.batch_size, self.img_size, self.color_mode)
        else:
            return testGenerateBatch(self.df, idx, self.batch_size, self.img_size, self.color_mode)
    
    def on_epoch_end(self):
        """
        Reshuffle the rows (and renumber the index) when shuffling is enabled.
        """
        if self.shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)

# Model - Learn to Fly by Driving

In [68]:
def _residualBlock(x, filters):
    """
    One residual block: (BN -> ReLU -> 3x3 conv) twice on the main path,
    a 1x1 conv on the shortcut, then element-wise addition. The first conv
    of the main path and the shortcut conv both use stride 2.
    """
    y = BatchNormalization()(x)
    y = Activation('relu')(y)
    y = Conv2D(filters, (3, 3), strides=[2, 2], padding='same',
               kernel_initializer="he_normal",
               kernel_regularizer=regularizers.l2(1e-4))(y)

    y = BatchNormalization()(y)
    y = Activation('relu')(y)
    y = Conv2D(filters, (3, 3), padding='same',
               kernel_initializer="he_normal",
               kernel_regularizer=regularizers.l2(1e-4))(y)

    # 1x1 strided conv so the shortcut matches the main path's shape.
    shortcut = Conv2D(filters, (1, 1), strides=[2, 2], padding='same')(x)
    return add([shortcut, y])


def resnet8(img_width, img_height, img_channels, output_dim):
    """
    Define model architecture.
    
    # Arguments
       img_width: Target image width.
       img_height: Target image height.
       img_channels: Target image channels.
       output_dim: Dimension of model output.
       
    # Returns
       model: A Model instance.
    """

    # Input
    img_input = Input(shape=(img_height, img_width, img_channels))

    x = Conv2D(32, (5, 5), strides=[2, 2], padding='same')(img_input)
    x = MaxPooling2D(pool_size=(3, 3), strides=[2, 2])(x)

    # Three residual blocks with 32, 64 and 128 filters
    for filters in (32, 64, 128):
        x = _residualBlock(x, filters)

    x = Flatten()(x)
    x = Activation('relu')(x)
    x = Dropout(0.5)(x)

    # Steering channel
    steer = Dense(output_dim)(x)

    # Define the steering model
    model = Model(inputs=[img_input], outputs=[steer])
    # summary() prints by itself and returns None; wrapping it in print()
    # only appended a stray "None" to the output.
    model.summary()

    return model

# Training

In [69]:
# 128x128 single-channel input, one regression output (steering angle),
# trained with mean squared error.
model = resnet8(img_width=128, img_height=128, img_channels=1, output_dim=1)
model.compile(loss="mse", optimizer=optimizers.Adam(decay=1e-5))

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 128, 128, 1) 0                                            
__________________________________________________________________________________________________
conv2d_42 (Conv2D)              (None, 64, 64, 32)   832         <tensorflow.python.keras.engine.i
__________________________________________________________________________________________________
max_pooling2d_5 (MaxPooling2D)  (None, 31, 31, 32)   0           <tensorflow.python.keras.layers.c
__________________________________________________________________________________________________
batch_normalization_v1_25 (Batc (None, 31, 31, 32)   128         <tensorflow.python.keras.layers.p
__________________________________________________________________________________________________
activation

In [70]:
# Build one generator per split. Both use the default is_training=True, so
# the validation batches also go through the random augmentation pipeline.
train_gen = DataGenerator(data_file='data/ch2_training.csv')
validation_gen = DataGenerator(data_file='data/ch2_validation.csv')

# Single pass over the training set, validating at the end of the epoch.
history = model.fit_generator(generator=train_gen,
                              validation_data=validation_gen,
                              epochs=1,
                              verbose=1)




# Get Test Set

In [104]:
def testGenerateBatch(df, batch_idx, batch_size=64, resize=(128, 128), color_mode=0):
    """
    Generate batch for testing
    from dataframe starting from the (batch_idx * batch_size)-th row

    :param df:
        dataframe object storing csv file; must provide the columns
        'frame_id' and 'steering_angle'

    :param batch_idx: int
        index of the batch; rows are taken starting at
        batch_idx * batch_size

    :param batch_size:
        maximum number of rows in the batch (the last batch may be shorter)

    :param resize: ints tuple
        target size handed to cv.resize, i.e. (width, height)

    :param color_mode:
        1: color img, 0: grayscale img (passed to cv.imread as read flag)

    :return:
        1 array of resized float32 imgs & 1 array assoc. steering angle

    :raises IndexError: if batch_idx is negative
    :raises FileNotFoundError: if an image listed in df cannot be read
    """
    if batch_idx < 0:
        raise IndexError('batch_idx must be non-negative, got %d' % batch_idx)

    path_prefix = 'data_test/center/'
    path_suffix = '.jpg'

    batch_X = []  # features
    batch_y = []  # labels

    # Take image id and label from the SAME rows. The image id used to be
    # looked up at row i while the label came from row row_0 + i, so every
    # batch after the first paired first-batch images with other labels.
    row_0 = batch_idx * batch_size
    rows = df.iloc[row_0: row_0 + batch_size]

    for frame_id, steering_angle in zip(rows['frame_id'], rows['steering_angle']):
        # get image; cv.imread returns None (no exception) on failure
        img_path = path_prefix + str(frame_id) + path_suffix
        img = cv.imread(img_path, color_mode)
        if img is None:
            raise FileNotFoundError('Cannot read image: ' + img_path)

        # resize, then add the channel axis for grayscale images. cv.resize
        # returns (height, width), so a reshape to (resize[0], resize[1], 1)
        # would scramble non-square images; appending an axis is always safe.
        img = np.float32(cv.resize(img, resize))
        if img.ndim == 2:
            img = img[..., np.newaxis]

        batch_X.append(img)
        batch_y.append(steering_angle)

    return np.array(batch_X), np.array(batch_y)


In [105]:
# Generator in test mode: batches are built by testGenerateBatch
# (resize only, no random augmentation).
test_gen = DataGenerator(data_file='data_test/final_example.csv', is_training=False)

# Evaluate the trained model on the whole test generator.
test_diary = model.evaluate_generator(generator=test_gen, verbose=1)

