# Lane detection using deep convolutional neural network autoencoder

I start this project with the hypothesis that a deep convolutional neural network (dCNN) will be able to find the road structure.

Then I will label real road videos and test the model on the videos from Udacity. I expect to find training videos on YouTube, and I expect that 1,000 labeled images will be enough for success. I estimate that labeling will be the largest effort in the whole project: I expect to spend ~11 hours on it, labeling 90 images per hour (2 images per minute for 45 minutes, followed by a 15-minute rest).

To find out how good the model is and how many real images I need, I will first test a simpler hypothesis: I will simulate a set of road images and train a dCNN autoencoder to recover the underlying lane model from each image.

I will simulate only one lane (in which car is currently moving) for purpose of simplicity.

Udacity challenge has three videos:
1. "project_video.mp4" has a solid yellow left line and a dash-dot white right line. The lines are either straight or curved in a single direction.
2. "challenge_video.mp4" is the same as the previous one, but with changing lighting conditions.
3. "harder_challenge_video.mp4" has a double solid yellow left line and a solid right line; lighting conditions change very fast. The lines are curved with two extrema (which means they cannot be described by a parabola, but can be described by a cubic polynomial).

In [6]:
class Point:
    """A 2-D point holding an ``x`` and a ``y`` coordinate."""

    def __init__(self, x: float, y: float):
        self.x: float = x
        self.y: float = y

    def __str__(self):
        # Rendered as "(x, y)", e.g. "(0.25, 0.75)".
        return f"({self.x}, {self.y})"

In [7]:
# Quick check of Point.__str__: build a point and print it.
point = Point(x=0.25, y=0.75)
print(point)

(0.25, 0.75)


In [39]:
class SimulatedLine:
    """A line described by polynomial coefficients, highest degree first."""

    def __init__(self, polynomial_coefficients: [float]):
        self.polynomial_coefficients: [float] = polynomial_coefficients

    def __str__(self):
        """Return the polynomial as text, e.g. ``1.0*x^3+0.5*x^2+0.25*x+0.125``.

        Terms are joined with "+" regardless of the coefficient's sign; an
        empty coefficient list yields an empty string.
        """
        coefficients = self.polynomial_coefficients
        highest_degree = len(coefficients) - 1

        terms = []
        for position, coefficient in enumerate(coefficients):
            # The first coefficient belongs to the highest power of x.
            degree = highest_degree - position
            if degree == 0:
                terms.append(f"{coefficient}")
            elif degree == 1:
                terms.append(f"{coefficient}*x")
            else:
                terms.append(f"{coefficient}*x^{degree}")

        return "+".join(terms)

In [40]:
simulated_line = SimulatedLine(polynomial_coefficients=[1.0, 0.5, 0.25, 0.125])
print(simulated_line)

1.0*x^3+0.5*x^2+0.25*x+0.125


In [None]:
class SimulatedLane(Parabola):
    """Placeholder for a simulated lane; no behaviour is implemented yet."""
    # TODO: implement the simulated lane.
    # NOTE(review): the base class `Parabola` is not defined in the visible
    # cells of this notebook - confirm where it is supposed to come from.

In [None]:
class Lane:
    """A lane made of a left line and a right line.

    The ``PolyLine`` annotations are written as strings so that defining this
    class does not require ``PolyLine`` to exist yet (it is not defined in the
    visible cells of this notebook).
    """

    def __init__(self, left_line: "PolyLine", right_line: "PolyLine"):
        self.left_line: "PolyLine" = left_line
        self.right_line: "PolyLine" = right_line

    def __str__(self):
        # Two-line description; each line relies on the line object's own str().
        return "left line: {}\nright line: {}".format(self.left_line, self.right_line)

In [None]:
# Build a lane from two lines and print its text description.
# NOTE(review): `PolyLine` is not defined in the visible cells of this
# notebook - confirm where it comes from before running this cell.
lane = Lane(
    left_line=PolyLine(a=0.5, b=0.25, c=0.125),
    right_line=PolyLine(a=0.75, b=0.5, c=0.25)
)

print(lane)

In [None]:
class LaneInput:
    """Container that stores a single image on the ``img`` attribute."""

    def __init__(self, img):
        self.img = img

In [None]:
class LaneRecognizeInteractor:
    """Runs the loaded model on a lane input (unfinished stub)."""

    def __init__(self):
        # NOTE(review): `load_model` is not defined in the visible cells and is
        # called without arguments - confirm where it comes from.
        self.model = load_model()

    def interact(self, lane_input) -> "Lane":
        """Run the model on ``lane_input.img`` and return its prediction.

        TODO: the raw prediction is returned as-is; converting it into a
        ``Lane`` object is not implemented yet.
        NOTE(review): presumably the model expects a batch of images rather
        than a single image - confirm and reshape if needed.
        """
        lane = self.model.predict(lane_input.img)
        return lane

In [None]:
class LanePresenter:
    """Holds a lane and the image it belongs to, for later presentation."""

    def __init__(self, lane: "Lane", image):
        self.lane: "Lane" = lane
        # Keep the image as well; it was previously accepted but discarded.
        self.image = image

    def present(self):
        """Present the lane. Not implemented yet; currently does nothing."""
        pass
        

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import cv2
import numpy as np
import random

class ImagePairGenerator:
    """Generates (X, y) image pairs of a random four-sided polygon.

    The X image contains the polygon outline and the y image contains the same
    polygon filled, both drawn in green on a black background.
    """

    def __init__(self, image_width: int, image_height: int):
        self.image_width: int = image_width
        self.image_height: int = image_height

    def make_blank_image(self, width: int, height: int):
        """Return an all-zero (black) ``height x width x 3`` uint8 image."""
        return np.zeros((height, width, 3), dtype=np.uint8)

    def put_value_in_boundaries(self, value: int, max_value: int):
        """Clamp ``value`` from above to ``max_value`` (no lower bound)."""
        return min(value, max_value)

    def _to_pixel_points(self, image, points: "list[Point]", thickness=None):
        """Convert relative points to an ``(N, 2)`` int32 array of pixels.

        Each point's ``x``/``y`` is scaled by the image width/height. When
        ``thickness`` is given, coordinates are clamped from above to
        ``size - thickness``; otherwise they are left unclamped.
        """
        height = image.shape[0]
        width = image.shape[1]

        pixels = []
        for point in points:
            x = int(point.x * width)
            y = int(point.y * height)
            if thickness is not None:
                x = self.put_value_in_boundaries(value=x, max_value=width - thickness)
                y = self.put_value_in_boundaries(value=y, max_value=height - thickness)
            pixels.append([x, y])

        # OpenCV drawing functions require 32-bit signed integer points; the
        # NumPy default integer type is platform dependent (often int64).
        return np.array(pixels, dtype=np.int32).reshape(-1, 2)

    def draw_poly_line(self, image, points: "list[Point]"):
        """Draw the closed outline through ``points`` onto ``image`` in place."""
        thickness = 1
        points_list = self._to_pixel_points(image=image, points=points, thickness=thickness)

        cv2.polylines(image, [points_list], isClosed=True, color=[0, 255, 0], thickness=thickness)

    def draw_filled_poly_line(self, image, points: "list[Point]"):
        """Draw the filled polygon through ``points`` onto ``image`` in place."""
        points_list = self._to_pixel_points(image=image, points=points)

        cv2.fillPoly(image, [points_list], color=[0, 255, 0])

    def generate_random_poly_line(self):
        """Return four random corner points in relative [0, 1] coordinates.

        Order: left-bottom, left-top, right-top, right-bottom. Left corners
        have x in the left half, right corners in the right half; y is 1 at
        the bottom and 0 at the top.
        """
        # The random draws are kept in this exact order so that a seeded
        # `random` produces the same polygons as before.
        left_bottom = Point(x=0.5 - random.uniform(0, 0.5), y=1)
        left_top = Point(x=0.5 - random.uniform(0, 0.5), y=0)
        right_top = Point(x=0.5 + random.uniform(0, 0.5), y=0)
        right_bottom = Point(x=0.5 + random.uniform(0, 0.5), y=1)

        return [
            left_bottom,
            left_top,
            right_top,
            right_bottom
        ]

    def generate(self):
        """Return ``(X_image, y_image)`` for one freshly generated polygon."""
        image_width = self.image_width
        image_height = self.image_height

        poly_line = self.generate_random_poly_line()

        # Input image: polygon outline only.
        X_image = self.make_blank_image(width=image_width, height=image_height)
        self.draw_poly_line(image=X_image, points=poly_line)

        # Target image: the same polygon, filled.
        y_image = self.make_blank_image(width=image_width, height=image_height)
        self.draw_filled_poly_line(image=y_image, points=poly_line)

        return X_image, y_image

# Generate one sample pair at 224x224 and display both images.
# image_width / image_height are reused by later cells.
image_width = 224
image_height = 224
image_pair_generator = ImagePairGenerator(image_width=image_width, image_height=image_height)
X_image, y_image = image_pair_generator.generate()

# X_image: polygon outline (model input).
plt.imshow(X_image)
plt.show()
# y_image: filled polygon (model target).
plt.imshow(y_image)
plt.show()

In [None]:
from PIL import Image

def save_image(image, file_name: str):
    """Convert the array ``image`` to a PIL image and save it as ``file_name``."""
    Image.fromarray(image).save(file_name)
    
# Save the current X_image / y_image pair as PNG files in the working directory.
save_image(image=X_image, file_name='X.png')
save_image(image=y_image, file_name='y.png')

In [None]:
# Shell command: recursively delete the ./data directory so that the dataset
# is regenerated from scratch by the following cells.
!rm -rf ./data

In [None]:
import os

def generate_data(image_width: int, image_height: int, image_quantity: int, folder: str, root: str = './data'):
    """Generate ``image_quantity`` image pairs and save them as PNG files.

    Inputs are written to ``<root>/<folder>/X`` and targets to
    ``<root>/<folder>/y``; both files of pair ``i`` share the zero-padded,
    five-digit name ``i`` (e.g. ``00042.png``).

    Args:
        image_width: Width of every generated image, in pixels.
        image_height: Height of every generated image, in pixels.
        image_quantity: Number of pairs to generate.
        folder: Name of the split sub-folder (e.g. ``'train'``).
        root: Directory that contains all split folders.
    """
    x_folder = '{}/{}/X'.format(root, folder)
    y_folder = '{}/{}/y'.format(root, folder)

    # os.makedirs creates missing parent directories too, so creating the two
    # leaf folders is enough.
    os.makedirs(x_folder, exist_ok=True)
    os.makedirs(y_folder, exist_ok=True)

    image_pair_generator = ImagePairGenerator(image_width=image_width, image_height=image_height)
    for i in range(image_quantity):
        X_image, y_image = image_pair_generator.generate()
        save_image(image=X_image, file_name='{}/{:05}.png'.format(x_folder, i))
        save_image(image=y_image, file_name='{}/{:05}.png'.format(y_folder, i))
        
def generate_train_data(image_width: int, image_height: int):
    """Generate the 500-pair 'train' split via ``generate_data``."""
    train_image_quantity = 500
    generate_data(
        image_width=image_width,
        image_height=image_height,
        image_quantity=train_image_quantity,
        folder='train',
    )

generate_train_data(image_width=image_width, image_height=image_height)

def generate_valid_data(image_width: int, image_height: int):
    """Generate the 100-pair 'valid' split via ``generate_data``."""
    valid_image_quantity = 100
    generate_data(
        image_width=image_width,
        image_height=image_height,
        image_quantity=valid_image_quantity,
        folder='valid',
    )

generate_valid_data(image_width=image_width, image_height=image_height)

In [None]:
def generate_test_data(image_width: int, image_height: int):
    """Generate the 100-pair 'test' split via ``generate_data``."""
    test_image_quantity = 100
    generate_data(
        image_width=image_width,
        image_height=image_height,
        image_quantity=test_image_quantity,
        folder='test',
    )

generate_test_data(image_width=image_width, image_height=image_height)

In [None]:
def load_images(folder: str):
    """Load every file in ``folder`` as an image, in sorted filename order.

    Args:
        folder: Directory whose entries are all expected to be image files.

    Returns:
        A NumPy array stacking the loaded images (empty if the folder is).

    Raises:
        ValueError: If an entry cannot be read as an image.
    """
    result = []
    for filename in sorted(os.listdir(folder)):
        path = os.path.join(folder, filename)
        # NOTE: cv2.imread returns colour images in BGR channel order.
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than later on.
            raise ValueError("Could not read image file: {}".format(path))
        result.append(image)
    return np.array(result)
    
# Load every split from disk and divide by 255.0, which turns the arrays into
# floats (presumably mapping 8-bit pixel values into [0, 1] - TODO confirm).
X_train = load_images(folder='./data/train/X') / 255.0
y_train = load_images(folder='./data/train/y') / 255.0

X_valid = load_images(folder='./data/valid/X') / 255.0
y_valid = load_images(folder='./data/valid/y') / 255.0

X_test = load_images(folder='./data/test/X') / 255.0
y_test = load_images(folder='./data/test/y') / 255.0

In [None]:
import tensorflow as tf
from tensorflow.keras import models
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras import losses

def create_model(input_shape, pool_size):
    """Build a Sequential convolutional encoder/decoder network.

    The encoder has seven 3x3 'valid' convolutions with three max-pooling
    steps; the decoder mirrors it with three up-sampling steps and seven 3x3
    'valid' transposed convolutions, the last of which ('Final') outputs 3
    channels.

    Args:
        input_shape: Shape of one input sample, e.g. ``(height, width, 3)``.
            When not None, an input layer with this shape is added so the
            model is built immediately; pass None to defer building.
        pool_size: Window used by every pooling and up-sampling layer.

    Returns:
        The (uncompiled) Sequential model.
    """
    def conv(filters, name):
        return layers.Conv2D(filters, (3, 3), padding='valid', strides=(1, 1),
                             activation='relu', name=name)

    def deconv(filters, name):
        return layers.Conv2DTranspose(filters, (3, 3), padding='valid', strides=(1, 1),
                                      activation='relu', name=name)

    def add_with_dropout(model, stage):
        # Each listed layer is followed by a Dropout(0.2) layer.
        for layer in stage:
            model.add(layer)
            model.add(layers.Dropout(0.2))

    model = models.Sequential()
    if input_shape is not None:
        model.add(layers.Input(shape=input_shape))

    # Layers are named for easier reading of the model summary.
    # Encoder, block 1 (no dropout)
    model.add(conv(8, 'Conv1'))
    model.add(conv(16, 'Conv2'))
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Encoder, block 2
    add_with_dropout(model, [conv(16, 'Conv3'), conv(32, 'Conv4'), conv(32, 'Conv5')])
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Encoder, block 3
    add_with_dropout(model, [conv(64, 'Conv6'), conv(64, 'Conv7')])
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Decoder, block 1
    model.add(layers.UpSampling2D(size=pool_size))
    add_with_dropout(model, [deconv(64, 'Deconv1'), deconv(64, 'Deconv2')])

    # Decoder, block 2
    model.add(layers.UpSampling2D(size=pool_size))
    add_with_dropout(model, [deconv(32, 'Deconv3'), deconv(32, 'Deconv4'), deconv(16, 'Deconv5')])

    # Decoder, block 3 (no dropout)
    model.add(layers.UpSampling2D(size=pool_size))
    model.add(deconv(16, 'Deconv6'))

    # Final layer: 3 filters, i.e. a 3-channel output image.
    model.add(deconv(3, 'Final'))

    return model

def create_model_fixed(input_shape, pool_size):
    """Build a smaller variant of the Sequential encoder/decoder network.

    The encoder has six 3x3 'valid' convolutions with three max-pooling steps;
    the decoder mirrors it with three up-sampling steps and six 3x3 'valid'
    transposed convolutions, the last of which ('Final') outputs 3 channels.

    Args:
        input_shape: Shape of one input sample, e.g. ``(height, width, 3)``.
            When not None, an input layer with this shape is added so the
            model is built immediately; pass None to defer building.
        pool_size: Window used by every pooling and up-sampling layer.

    Returns:
        The (uncompiled) Sequential model.
    """
    def conv(filters, name):
        return layers.Conv2D(filters, (3, 3), padding='valid', strides=(1, 1),
                             activation='relu', name=name)

    def deconv(filters, name):
        return layers.Conv2DTranspose(filters, (3, 3), padding='valid', strides=(1, 1),
                                      activation='relu', name=name)

    def add_with_dropout(model, stage):
        # Each listed layer is followed by a Dropout(0.2) layer.
        for layer in stage:
            model.add(layer)
            model.add(layers.Dropout(0.2))

    model = models.Sequential()
    if input_shape is not None:
        model.add(layers.Input(shape=input_shape))

    # Layers are named for easier reading of the model summary; the names
    # 'Conv3' and 'Deconv5' are intentionally not used in this variant.
    # Encoder, block 1 (no dropout)
    model.add(conv(16, 'Conv1'))
    model.add(conv(16, 'Conv2'))
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Encoder, block 2
    add_with_dropout(model, [conv(32, 'Conv4'), conv(32, 'Conv5')])
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Encoder, block 3
    add_with_dropout(model, [conv(64, 'Conv6'), conv(64, 'Conv7')])
    model.add(layers.MaxPooling2D(pool_size=pool_size))

    # Decoder, block 1
    model.add(layers.UpSampling2D(size=pool_size))
    add_with_dropout(model, [deconv(64, 'Deconv1'), deconv(64, 'Deconv2')])

    # Decoder, block 2
    model.add(layers.UpSampling2D(size=pool_size))
    add_with_dropout(model, [deconv(32, 'Deconv3'), deconv(32, 'Deconv4')])

    # Decoder, block 3 (no dropout)
    model.add(layers.UpSampling2D(size=pool_size))
    model.add(deconv(16, 'Deconv6'))

    # Final layer: 3 filters, i.e. a 3-channel output image.
    model.add(deconv(3, 'Final'))

    return model

def create_model_2():
    """Build a symmetric Sequential encoder/decoder from a list of filter counts.

    For each filter count (16, 32, 64) the encoder adds two 3x3 'valid'
    convolutions, each followed by Dropout(0.2), and a (1, 1) max-pooling
    layer. The decoder walks the counts in reverse with a (1, 1) up-sampling
    layer followed by two transposed convolutions with dropout. A final
    transposed convolution outputs 3 channels.
    """
    conv_options = dict(padding='valid', strides=(1, 1), activation='relu')
    stage_filters = [16, 32, 64]

    model = models.Sequential()

    # Encoder
    for n_filters in stage_filters:
        for _ in range(2):
            model.add(layers.Conv2D(n_filters, (3, 3), **conv_options))
            model.add(layers.Dropout(0.2))
        model.add(layers.MaxPooling2D(pool_size=(1, 1)))

    # Decoder
    for n_filters in reversed(stage_filters):
        model.add(layers.UpSampling2D(size=(1, 1)))
        for _ in range(2):
            model.add(layers.Conv2DTranspose(n_filters, (3, 3), **conv_options))
            model.add(layers.Dropout(0.2))

    # Output layer: 3 channels.
    model.add(layers.Conv2DTranspose(3, (3, 3), **conv_options))

    return model

class Fill(Model):
    """Convolutional autoencoder made of an ``encoder`` and a ``decoder``.

    Encoder: for 8, 16, 32 and 64 filters, three Conv/BatchNorm/ReLU groups
    followed by a 2x2 max-pooling layer. Decoder: for 64, 32, 16 and 8
    filters, three Conv/BatchNorm/ReLU groups followed by a 2x2 up-sampling
    layer, then a final 3x3 convolution with ReLU that outputs 3 channels.
    All convolutions use 'same' padding. (A further 128-filter stage existed
    only as commented-out code and is not part of the model.)
    """

    def __init__(self, image_width: int, image_height: int):
        super().__init__()
        number_of_channels = 3

        def conv_stage(filters):
            # Three Conv2D -> BatchNormalization -> ReLU groups with `filters` filters.
            stage = []
            for _ in range(3):
                stage.append(layers.Conv2D(filters, (3, 3), activation=None, padding='same', strides=(1, 1)))
                stage.append(layers.BatchNormalization())
                stage.append(layers.Activation(activation='relu'))
            return stage

        encoder_layers = [
            layers.Input(shape=(image_height, image_width, number_of_channels)),
        ]
        for filters in (8, 16, 32, 64):
            encoder_layers.extend(conv_stage(filters))
            encoder_layers.append(layers.MaxPooling2D((2, 2), padding='same'))
        self.encoder = tf.keras.Sequential(encoder_layers)

        decoder_layers = []
        for filters in (64, 32, 16, 8):
            decoder_layers.extend(conv_stage(filters))
            decoder_layers.append(layers.UpSampling2D((2, 2)))
        decoder_layers.append(
            layers.Conv2D(number_of_channels, kernel_size=(3, 3), activation='relu', padding='same')
        )
        self.decoder = tf.keras.Sequential(decoder_layers)

    def call(self, x):
        """Encode ``x`` and return the decoder's reconstruction."""
        return self.decoder(self.encoder(x))

# Active model: the Fill autoencoder sized for the generated images.
# The commented-out lines are alternative constructors kept for comparison.
fill_autoencoder = Fill(image_width=image_width, image_height=image_height)
#fill_autoencoder = create_model(input_shape=(image_height, image_width, 3, ), pool_size=(1, 1))
#fill_autoencoder = create_model_fixed(input_shape=(image_height, image_width, 3, ), pool_size=(1, 1))
#fill_autoencoder = create_model_2()

In [None]:
from tensorflow.keras import optimizers

# Compile with Adam (learning rate 1e-4) and mean squared error loss.
# NOTE(review): the 'acc' metric is reported too, but accuracy may not be
# meaningful for continuous per-pixel targets - confirm it is wanted.
optimizer = optimizers.Adam(learning_rate=1e-4)
fill_autoencoder.compile(optimizer=optimizer, loss='mean_squared_error', metrics=['acc'])

In [None]:
# Display the first training input as a sanity check of the loaded data.
# Note: this rebinds the notebook-level name X_image.
X_image = X_train[0]
plt.imshow(X_image)
plt.show()

In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Multiply the learning rate by 0.75 when val_loss has not improved by at
# least min_delta for 3 consecutive epochs.
reduce_learning_rate_callback = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.75,
    patience=3,
    min_delta=1e-5, 
    mode='min',
    verbose=1
)

# Stop training when val_loss has not improved by at least min_delta for
# 10 consecutive epochs.
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=10,
    min_delta=1e-5, 
    mode='min',
    verbose=1
)

# Train on the train split and validate on the valid split. epochs=1000 is
# an upper bound; early stopping is expected to end training sooner.
fill_autoencoder.fit(X_train, y_train,
                epochs=1000,
                shuffle=True,
                batch_size=2,
                validation_data=(X_valid, y_valid),
                verbose=1, 
                callbacks=[
                    reduce_learning_rate_callback,
                    early_stopping_callback,
                    #ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1, min_delta=1e-5, mode='min')
                ]
)

In [None]:
# Print the layer-by-layer summary of the model.
fill_autoencoder.summary()

In [None]:
# Run the model on the whole test split.
y_pred = fill_autoencoder.predict(X_test)

In [None]:
# Index of the test sample to inspect.
i = 1

# Model input.
plt.imshow(X_test[i])
plt.show()

# Expected output.
plt.imshow(y_test[i])
plt.show()

# Model output for the same sample.
prediction = y_pred[i]
plt.imshow(prediction, vmin=0.0, vmax=1.0)
plt.show()

In [None]:
# Smallest value in the expected output for sample i.
np.amin(y_test[i])

In [None]:
# Smallest value in the model output for sample i.
np.amin(prediction)

In [None]:
# Largest value in the expected output for sample i.
np.amax(y_test[i])

In [None]:
# Largest value in the model output for sample i.
np.amax(prediction)