In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Lambda, Conv2D, MaxPooling2D, Dropout, Dense, Flatten
from tensorflow.keras.models import load_model
import os

In [None]:
# Utils

# Size of the frames fed to the network (after preprocessing).
IMAGE_HEIGHT = 64
IMAGE_WIDTH = 64
IMAGE_CHANNELS = 3
# Keras-style (rows, cols, channels) input shape built from the values above.
INPUT_SHAPE = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)

def load_image(data_dir, image_file):
    """Read one image from disk.

    Args:
        data_dir: Directory the image path is relative to.
        image_file: File name / relative path; surrounding whitespace is
            stripped before it is joined to ``data_dir``.

    Returns:
        Whatever ``matplotlib.image.imread`` returns for that file.
    """
    image_path = os.path.join(data_dir, image_file.strip())
    return mpimg.imread(image_path)

def preprocess(image):
    """Crop, resize, colour-convert and blur a frame before it goes to the model.

    Args:
        image: RGB frame as a (rows, cols, 3) array.

    Returns:
        The processed frame, IMAGE_HEIGHT x IMAGE_WIDTH, in YUV colour space.

    NOTE(review): any inference-time code presumably has to apply exactly the
    same steps as this function -- confirm it is kept in sync.
    """
    image = image[60:-25, :, :] # crop: drop the top 60 and the bottom 25 rows
    # The interpolation flag must be passed by keyword: the third positional
    # parameter of cv2.resize is `dst`, so a positional cv2.INTER_AREA was not
    # being used as the interpolation mode.
    image = cv2.resize(image, (IMAGE_WIDTH, IMAGE_HEIGHT), interpolation=cv2.INTER_AREA) # resize
    #image[np.where((image > [220, 220, 220]).all(axis=2))] = [255, 255, 255] # fix noisy white pixels
    image = cv2.cvtColor(image, cv2.COLOR_RGB2YUV) # convert to YUV
    image = cv2.GaussianBlur(image, (3, 3), 0) # add blur (3x3 kernel)
    return image

def choose_image(data_dir, center, left, right, steering_angle):
    """Randomly pick the centre, left or right camera image for a sample.

    The left image is paired with ``steering_angle + 0.2`` and the right image
    with ``steering_angle - 0.2``; the centre image keeps the angle unchanged.

    Args:
        data_dir: Directory the image paths are relative to.
        center, left, right: Image paths for the three cameras. A side camera
            whose path is the literal string "no_data" is treated as missing.
        steering_angle: Steering angle recorded for the centre image.

    Returns:
        Tuple ``(image, adjusted_steering_angle)``.
    """
    missing = "no_data"
    # No side cameras at all: use the centre image without drawing a random
    # number (keeps the random stream identical to the previous behaviour).
    if (left == missing) and (right == missing):
        return load_image(data_dir, center), steering_angle
    choice = np.random.choice(3)
    # A side camera is only used when it actually has data; previously a row
    # with just one missing side could try to load a file named "no_data".
    if choice == 0 and left != missing:
        return load_image(data_dir, left), steering_angle + 0.2
    if choice == 1 and right != missing:
        return load_image(data_dir, right), steering_angle - 0.2
    return load_image(data_dir, center), steering_angle

def random_flip(image, steering_angle):
    """With probability 0.5, mirror the image horizontally and negate the angle.

    Args:
        image: Frame to (maybe) flip.
        steering_angle: Steering angle belonging to ``image``.

    Returns:
        Tuple ``(image, steering_angle)``; both are returned untouched when no
        flip is applied.
    """
    # A single uniform draw decides whether the flip happens.
    if np.random.rand() >= 0.5:
        return image, steering_angle
    # flipCode=1 flips around the vertical axis (left <-> right).
    return cv2.flip(image, 1), -steering_angle

def random_translate(image, steering_angle, range_x, range_y):
    """Shift the image by a random offset and correct the steering angle.

    The horizontal offset is drawn uniformly from ``[-range_x/2, range_x/2)``
    and the vertical one from ``[-range_y/2, range_y/2)``. The steering angle
    is adjusted by 0.002 per pixel of horizontal shift.

    Returns:
        Tuple ``(shifted_image, adjusted_steering_angle)``.
    """
    # Draw order matters for reproducibility: x first, then y.
    shift_x = (np.random.uniform() - 0.5) * range_x
    shift_y = (np.random.uniform() - 0.5) * range_y
    steering_angle += shift_x * 0.002
    # 2x3 affine matrix describing a pure translation.
    affine = np.array([[1, 0, shift_x],
                       [0, 1, shift_y]], dtype=np.float32)
    rows, cols = image.shape[0], image.shape[1]
    shifted = cv2.warpAffine(image, affine, (cols, rows))
    return shifted, steering_angle

def random_shadow(image):
    """Randomly darken one side of a random straight line across the image.

    A line is drawn from a random column on the top row to a random column on
    the bottom row. With probability 0.5 nothing is darkened; otherwise one of
    the two sides (chosen at random) has its HLS lightness multiplied by 0.4.

    The line geometry is derived from the actual image size. For 160x320
    frames this is identical to the previously hard-coded values
    (``320 * uniform`` and ``bot_x = 160``).

    Args:
        image: RGB frame as a (rows, cols, 3) array.

    Returns:
        RGB image after the HLS round trip, with or without the shadow.
    """
    height, width = image.shape[:2]
    # Random draws happen in the same order as before: top_y, then bot_y.
    top_y = width * np.random.uniform()
    top_x = 0
    bot_x = height
    bot_y = width * np.random.uniform()
    image_hls = cv2.cvtColor(image, cv2.COLOR_RGB2HLS)
    # X_m holds row indices, Y_m column indices (np.mgrid is built only once).
    X_m, Y_m = np.mgrid[0:height, 0:width]
    # True on one side of the line (top_x, top_y) -> (bot_x, bot_y).
    on_side = (X_m - top_x) * (bot_y - top_y) - (bot_x - top_x) * (Y_m - top_y) >= 0
    #random_bright = .25+.7*np.random.uniform()
    if np.random.randint(2) == 1:
        random_bright = .4
        # Second coin flip picks which side of the line gets darkened.
        region = on_side if np.random.randint(2) == 1 else ~on_side
        lightness = image_hls[:, :, 1]  # view: writes go into image_hls
        lightness[region] = lightness[region] * random_bright
    image = cv2.cvtColor(image_hls, cv2.COLOR_HLS2RGB)
    return image

def random_brightness(image):
    """Scale the HSV value channel of an RGB image by a random factor.

    The factor is ``0.5 + U[0, 1)``; scaled values are capped at 255 before the
    image is converted back to uint8 RGB.
    """
    # Work in float64 so the multiplication cannot wrap around in uint8.
    hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV).astype(np.float64)
    gain = .5 + np.random.uniform()
    hsv[:, :, 2] = np.minimum(hsv[:, :, 2] * gain, 255)
    return cv2.cvtColor(hsv.astype(np.uint8), cv2.COLOR_HSV2RGB)

def augment(data_dir, center, left, right, steering_angle, range_x=100, range_y=10):
    """Build one augmented training sample.

    Pipeline: random camera choice -> random translation -> random shadow ->
    random brightness.

    Args:
        data_dir: Directory the image paths are relative to.
        center, left, right: Image paths of the three cameras.
        steering_angle: Steering angle recorded for the sample.
        range_x, range_y: Translation ranges passed to ``random_translate``.

    Returns:
        Tuple ``(augmented_image, adjusted_steering_angle)``.
    """
    frame, angle = choose_image(data_dir, center, left, right, steering_angle)
    # random_flip is not part of this pipeline (its call was commented out).
    frame, angle = random_translate(frame, angle, range_x, range_y)
    frame = random_brightness(random_shadow(frame))
    return frame, angle

def batch_generator(data_dir, image_paths, steering_angles, batch_size, is_training):
    """Yield ``(images, steers)`` batches forever.

    Every pass shuffles the sample indices and walks through all of them in
    chunks of ``batch_size``, so each sample is seen exactly once per pass.
    The last batch of a pass is smaller when the number of samples is not a
    multiple of ``batch_size``.

    Args:
        data_dir: Directory the image paths are relative to.
        image_paths: 2-D array whose rows are ``(center, left, right)`` paths.
        steering_angles: Steering angle for each row of ``image_paths``.
        batch_size: Maximum number of samples per yielded batch.
        is_training: When true, each sample is augmented with probability 0.6;
            otherwise the plain centre image is used.

    Yields:
        Tuple ``(images, steers)`` of freshly allocated arrays with shapes
        ``(n, IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS)`` and ``(n,)``.

    Raises:
        ValueError: If ``image_paths`` contains no samples.
    """
    num_samples = image_paths.shape[0]
    if num_samples == 0:
        # Without this guard the loop below would spin forever without yielding.
        raise ValueError("batch_generator needs at least one sample")
    while True:
        order = np.random.permutation(num_samples)
        for start in range(0, num_samples, batch_size):
            batch_indices = order[start:start + batch_size]
            # New buffers for every batch: the consumer may still hold a
            # reference to the previous batch, so it must not be overwritten.
            # Sizing them to the chunk also avoids yielding uninitialised rows.
            images = np.empty((len(batch_indices), IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNELS))
            steers = np.empty(len(batch_indices))
            for i, index in enumerate(batch_indices):
                center, left, right = image_paths[index]
                steering_angle = steering_angles[index]
                # augmentation
                if is_training and np.random.rand() < 0.6:
                    image, steering_angle = augment(data_dir, center, left, right, steering_angle)
                else:
                    image = load_image(data_dir, center)
                # add the image and steering angle to the batch
                images[i] = preprocess(image)
                steers[i] = steering_angle
            yield images, steers

In [None]:
# Check GPU (skip this if GPU hardware acceleration is not chosen)
# Aborts the cell with SystemError unless TensorFlow reports '/device:GPU:0'.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

In [None]:
# Mount google drive
# Makes Google Drive available under /content/gdrive; the dataset archive and
# the model files used by the cells below live there.
# NOTE(review): google.colab is imported here rather than in the top import
# cell, presumably so the other imports still work outside Colab -- confirm.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Get data from google drive
!unzip "/content/gdrive/My Drive/data.zip" -d "/content"

In [None]:
# Global variables and hyperparameters
# Seed both random number generators used during training: NumPy drives the
# batch shuffling and augmentation, TensorFlow drives weight initialisation
# and dropout. Seeding only NumPy leaves runs non-reproducible.
np.random.seed(0)
tf.random.set_seed(0)
data_dir = "/content/data_lane_t12m123"  # folder holding driving_log.csv and the images
test_size = 0.1          # fraction of samples held out for validation
drop_rate = 0.35         # rate used by every Dropout layer of the model
learning_rate = 0.0002   # Adam learning rate
batch_size = 32          # samples per generated batch
steps_per_epoch = 2000   # training batches drawn per epoch
epochs = 80              # index of the last epoch to train
initial_epoch = 0        # epoch to start from (non-zero when resuming)

In [None]:
# Load data
# driving_log.csv supplies the three camera image paths and the steering value
# for every recorded sample.
data_df = pd.read_csv(os.path.join(data_dir, 'driving_log.csv'))

# X: one row of (center, left, right) image paths per sample; y: steering.
X, y = data_df[['center', 'left', 'right']].values, data_df['steering'].values

# Fixed random_state keeps the train/validation split the same between runs.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=test_size,
    random_state=0,
)
print(X_train.shape)

In [None]:
# Build model (Custom)
# Three blocks of two 3x3 convolutions, each followed by 2x2 max-pooling and
# dropout, then a dense head ending in a single regression output.
model = Sequential()
# Input scaling inside the model: x/255 - 0.5.
# NOTE(review): assumes pixel values in the 0..255 range -- confirm for the
# image format actually loaded.
model.add(Lambda(lambda x: x/255.0 - 0.5, input_shape=INPUT_SHAPE))
model.add(Conv2D(3, (1, 1), activation='elu'))  # 1x1 conv: learned mix of the 3 input channels
model.add(Conv2D(32, (3, 3), activation='elu'))
model.add(Conv2D(32, (3, 3), activation='elu'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(drop_rate))
model.add(Conv2D(64, (3, 3), activation='elu'))
model.add(Conv2D(64, (3, 3), activation='elu'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(drop_rate))
model.add(Conv2D(128, (3, 3), activation='elu'))
model.add(Conv2D(128, (3, 3), activation='elu'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(drop_rate))
model.add(Flatten())
model.add(Dense(512, activation='elu'))
model.add(Dense(64, activation='elu'))
model.add(Dense(16, activation='elu'))
model.add(Dense(1))  # single output: the predicted steering value
model.summary()

# `learning_rate` is the current argument name; `lr` is a deprecated alias
# that newer Keras releases no longer accept.
model.compile(loss='mse', optimizer=Adam(learning_rate=learning_rate))

In [None]:
# Train model
# A checkpoint file is written only for epochs whose val_loss improves on the
# best value seen so far (save_best_only=True); {epoch:03d} in the file name
# is filled in with the epoch number.
checkpoint = ModelCheckpoint(
    '/content/gdrive/My Drive/Colab Notebooks/UIT_custom/model-{epoch:03d}.h5',
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    mode='auto'
)

history = model.fit(
    batch_generator(data_dir, X_train, y_train, batch_size, True),
    epochs=epochs,
    initial_epoch=initial_epoch,
    steps_per_epoch=steps_per_epoch,
    validation_data=batch_generator(data_dir, X_val, y_val, batch_size, False),
    # Step counts must be integers; np.ceil returns a NumPy float.
    validation_steps=int(np.ceil(len(X_val) / batch_size)),
    callbacks=[checkpoint],
    verbose=2
)

In [None]:
# Training vs. validation loss per epoch, taken from the fit() history.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training', 'validation'])
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('MSE')  # the model is compiled with loss='mse'
plt.show()  # render the figure instead of echoing the last call's repr

In [None]:
# Save model
# Writes the current in-memory `model` to Google Drive as an .h5 file.
# NOTE(review): the "020" in the file name is hard-coded and is not derived
# from `epochs` or from the checkpoint callback -- confirm this is the intended
# target before running, since an existing file of that name is replaced.
model.save("/content/gdrive/My Drive/Colab Notebooks/model-020.h5")

In [None]:
# Load model
# Rebinds `model` to a network read from disk, replacing the one built and
# trained in the cells above.
# NOTE(review): this path points at "Colab Notebooks/", while the
# ModelCheckpoint callback in this notebook writes to
# "Colab Notebooks/UIT_custom/"; "086" is also larger than epochs = 80, so the
# file presumably comes from an earlier or resumed run -- confirm it exists.
model_path = "/content/gdrive/My Drive/Colab Notebooks/model-086.h5"
model = load_model(model_path)