In [None]:
import cv2
import numpy as np

In [None]:
import glob
import pickle
import matplotlib.pyplot as plt
%matplotlib inline

# Camera Calibration
# first collect object and image points by running findChessboardCorners on all calibration images,
# then calibrate the camera from those points (persist_calibration_info can save the result)

# path to image dirs
# NOTE: paths are built by plain string concatenation below, so every directory
# component has to keep its trailing slash.
base_path = './training/'  # root directory, prefixed to calibration_path and output_path
calibration_path = 'camera_cal/'  # chessboard images used by calibrate_camera
output_path = 'cali_out/'  # where the undistorted sample and the calibration pickle are written
saved_calibration_file = 'calibration_output.p'  # file name used by persist_calibration_info


def read_image(image_path):
    """
        read the image at the given path and return it in RGB channel order

        cv2.imread returns BGR images, and returns None instead of raising when the
        file is missing or cannot be decoded, so the result is checked before the
        BGR -> RGB conversion.

        raises IOError if the image could not be loaded
    """
    image = cv2.imread(image_path)
    if image is None:
        # fail here with the offending path instead of an opaque cvtColor assertion
        raise IOError('could not read image: {}'.format(image_path))
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# convenience wrapper: load an image from disk (as RGB) and reduce it to one gray channel
def read_and_convert_to_gray_scale(image_path):
    return convert_to_gray_scale(read_image(image_path))

# convert an RGB image (the channel order produced by read_image) to gray scale
def convert_to_gray_scale(image):
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    return gray

def persist_calibration_info(camera_matrix, distortion_coeffs):
    """
        pickle the calibration result to base_path + output_path + saved_calibration_file

        the pickle holds a dict with the keys "camera_matrix" and "distortion_coeffs"
    """
    dist_pickle = {
        "camera_matrix": camera_matrix,
        "distortion_coeffs": distortion_coeffs,
    }
    # the context manager closes (and flushes) the file even if pickling fails
    with open(base_path + output_path + saved_calibration_file, "wb") as calibration_file:
        pickle.dump(dist_pickle, calibration_file)

# use provided calibration images to find object and image points needed for camera calibration computations
def calibrate_camera(nx=9, ny=6):
    """
        calibrate the camera from the chessboard images in base_path + calibration_path

        nx, ny: number of inner chessboard corners per row / per column

        returns (camera matrix, distortion coefficients) as computed by cv2.calibrateCamera
        raises ValueError if no image with a detectable chessboard was found
    """
    # prepare object points, like (0,0,0), (1,0,0), (2,0,0) ...., (nx-1,ny-1,0)
    objp = np.zeros((ny * nx, 3), np.float32)
    objp[:, :2] = np.mgrid[0:nx, 0:ny].T.reshape(-1, 2)

    # Arrays to store object points and image points from all the images.
    obj_points = []  # 3d points in real world space
    img_points = []  # 2d points in image plane.
    image_size = None  # (width, height) of the most recently read calibration image

    image_regex = 'calibration*.jpg'
    images = glob.glob(base_path + calibration_path + image_regex)

    for image_path in images:
        # get gray scale image
        gray = read_and_convert_to_gray_scale(image_path)
        image_size = gray.shape[::-1]

        # find chessboard corners
        found, corners = cv2.findChessboardCorners(gray, (nx, ny), None)

        # If found, add object points, image points
        if found:
            obj_points.append(objp)
            img_points.append(corners)

    if not img_points:
        # without this guard an empty directory ends in an unbound `gray`
        # and undetected boards in an opaque OpenCV assertion
        raise ValueError('no {}x{} chessboard found in {} calibration image(s) matching {}'.format(
            nx, ny, len(images), base_path + calibration_path + image_regex))

    ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(obj_points, img_points, image_size, None, None)
    return mtx, dist


# calibrate once and undistort one sample image as a visual check
# (mtx and dist stay at module level; undistort and process_image read them later)
img = read_image(base_path + calibration_path + 'calibration2.jpg')
mtx, dist = calibrate_camera()
dst = cv2.undistort(img, mtx, dist, None, None)
# read_image returns RGB while cv2.imwrite expects BGR channel order,
# so convert back before saving; otherwise red and blue are swapped in the file
cv2.imwrite(base_path + output_path + 'calibration2_undistorted.jpg',
            cv2.cvtColor(dst, cv2.COLOR_RGB2BGR))

# Visualize undistortion
f, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))
ax1.imshow(img)
ax1.set_title('Original Image', fontsize=30)
ax2.imshow(dst)
ax2.set_title('Undistorted Image', fontsize=30)




In [None]:
# transformation and thresholding functions.

# define filters/transformations
# 1 absolute sobel threshold
# 2 gradient magnitude threshold
# 3 gradient direction threshold
# 4 color space threshold(s)
# 5 maybe try laplace

def apply_threshold_to_image(image, threshold):
    """
        build a binary image: 1 where threshold[0] <= pixel <= threshold[1], 0 elsewhere

        the result has the same shape and dtype as the input image
    """
    lower, upper = threshold[0], threshold[1]
    in_range = np.logical_and(image >= lower, image <= upper)
    result = np.zeros_like(image)
    np.putmask(result, in_range, 1)
    return result

def scale_to_eight_bit(image):
    """
        linearly rescale the image so that its maximum maps to 255 and return it as uint8

        the computation is done in float64, so integer inputs cannot overflow in the
        multiplication by 255; an empty or all-zero image yields all zeros instead of
        dividing by zero.
    """
    data = np.asarray(image, dtype=np.float64)
    peak = np.max(data) if data.size else 0
    if peak == 0:
        # nothing to scale: avoid 0/0 -> NaN being cast to uint8
        return np.zeros(data.shape, dtype=np.uint8)
    return np.uint8(255 * data / peak)

def scale_and_apply_threshold(image, threshold):
    """
        rescale the image to the 8 bit range, then turn it into a binary image with the given threshold
    """
    return apply_threshold_to_image(scale_to_eight_bit(image), threshold)

def apply_x_and_y_sobel(image, sobel_kernel=3):
    """
        return the sobel derivatives of the image as the tuple (d/dx, d/dy), both as 64 bit floats
    """
    derivative_orders = ((1, 0), (0, 1))  # (dx, dy) for the x and the y gradient
    gradients = [cv2.Sobel(image, cv2.CV_64F, dx, dy, ksize=sobel_kernel)
                 for dx, dy in derivative_orders]
    return tuple(gradients)

# applies the sobel operator in one direction and thresholds the result
# expects an RGB image (as returned by read_image): the gray conversion uses COLOR_RGB2GRAY
def absolute_sobel_threshold(image, orient='x', sobel_kernel=3, threshold=(0, 255)):
    """
        transform image to a binary image using the absolute sobel gradient in one direction

        orient: 'x' takes the gradient along x, any other value takes it along y
        threshold: inclusive (min, max) applied after scaling the gradient to 0..255
    """
    if orient == 'x':
        dx, dy = 1, 0
    else:
        dx, dy = 0, 1

    # directional gradient of the gray scale image
    gradient = cv2.Sobel(convert_to_gray_scale(image), cv2.CV_64F, dx, dy, ksize=sobel_kernel)

    return scale_and_apply_threshold(np.absolute(gradient), threshold)

# finds the magnitude of the gradient and filters it with the given threshold
# expects an RGB image (as returned by read_image): the gray conversion uses COLOR_RGB2GRAY
def magnitude_threshold(image, sobel_kernel=3, threshold=(0, 255)):
    """
        transform image to a binary image using the magnitude of the gradient

        threshold: inclusive (min, max) applied after scaling the magnitude to 0..255
    """
    grad_x, grad_y = apply_x_and_y_sobel(convert_to_gray_scale(image), sobel_kernel=sobel_kernel)

    # euclidean length of the gradient vector at every pixel
    squared_length = np.square(grad_x) + np.square(grad_y)

    return scale_and_apply_threshold(np.sqrt(squared_length), threshold)

# threshold on the direction of the gradient
# expects an RGB image (as returned by read_image): the gray conversion uses COLOR_RGB2GRAY
def gradient_direction_threshold(image, sobel_kernel=3, threshold=(0, np.pi/2)):
    """
        transform image to a binary image using the direction of the gradient

        the direction is arctan2(|d/dy|, |d/dx|), i.e. an angle in [0, pi/2];
        threshold is the inclusive (min, max) angle in radians.
        the result is a float array of zeros and ones because the angle image is float.
    """
    grad_x, grad_y = apply_x_and_y_sobel(convert_to_gray_scale(image), sobel_kernel=sobel_kernel)

    # taking absolute values first folds all directions into the first quadrant
    direction = np.arctan2(np.absolute(grad_y), np.absolute(grad_x))

    return apply_threshold_to_image(direction, threshold)
    

In [None]:
# Remove distortion from images
def undistort(image):
    """
        undistort an image with the module-level calibration results mtx and dist

        mtx is also passed as the new camera matrix.
    """
    # mtx and dist are only read here, so no global declaration is needed
    return cv2.undistort(image, mtx, dist, None, mtx)

In [None]:
# compute transformation matrices
def get_perspective_transform(image, src):
    """
        compute M and Minv for image warping

        src: four source points (float32, shape (4, 2)) ordered top-left, top-right,
             bottom-right, bottom-left
        the destination rectangle is derived from the image size: x from 23% to 77%
        of the width, y from 14% of the height down to the bottom edge.

        returns (M, Minv): the src -> dst matrix and its inverse mapping dst -> src
    """
    height, width = image.shape[0], image.shape[1]
    left_x, right_x = 0.23 * width, 0.77 * width
    top_y = 0.14 * height
    dst = np.array([[left_x, top_y],
                    [right_x, top_y],
                    [right_x, height],
                    [left_x, height]], np.float32)
    forward = cv2.getPerspectiveTransform(src, dst)
    inverse = cv2.getPerspectiveTransform(dst, src)
    return forward, inverse

In [None]:
# Perform perspective transform
def warp_image(undistorted_image, M):
    """
        warp the image with the perspective matrix M; the output keeps the input's width and height
    """
    height, width = undistorted_image.shape[:2]
    # OpenCV wants the output size as (width, height)
    return cv2.warpPerspective(undistorted_image, M, (width, height), flags=cv2.INTER_LINEAR)

In [None]:
from math import atan2, ceil, cos, sin

# use hough transform to find possible base points for warping the image to bird's eye view
def find_perspective_transform_src_points(image):
    """
        use a probabilistic hough transform to find suitable src coordinates for the perspective transform

        image: single channel binary image (non-zero pixels are treated as edges)

        returns a float32 array of shape (4, 2) ordered top-left, top-right, bottom-right,
        bottom-left. If no usable pair of lane markers is found (no hough lines, only
        one side detected, or parallel fits), defaults based on the image size are returned.
    """
    # Computing perspective points automatically
    rho = 2               # distance resolution in pixels of the Hough grid
    theta = np.pi/180     # angular resolution in radians of the Hough grid
    threshold = 100       # minimum number of votes (intersections in Hough grid cell)
    min_line_length = 100 # minimum number of pixels making up a line
    max_line_gap = 25     # maximum gap in pixels between connectable line segments

    # only segments whose slope magnitude lies in this band count as lane markers
    angle_min_mag = 25 * np.pi/180
    angle_max_mag = 40 * np.pi/180

    height, width = image.shape[0], image.shape[1]

    # fallback used whenever two lane markers cannot be determined
    default_src = np.array([[0.45*width, 0.63*height],
                            [0.55*width, 0.63*height],
                            [0.88*width, height],
                            [0.15*width, height]], np.float32)

    lane_markers_x = [[], []]
    lane_markers_y = [[], []]

    # blank the upper 60% of the image so only the road area votes
    masked_image = np.copy(image)
    masked_image[:height*6//10, :] = 0

    # minLineLength/maxLineGap must be passed by keyword: the fifth positional
    # parameter of cv2.HoughLinesP is the optional output array `lines`
    lines = cv2.HoughLinesP(masked_image, rho, theta, threshold,
                            minLineLength=min_line_length, maxLineGap=max_line_gap)
    if lines is None:
        # HoughLinesP returns None when no segment was found
        return default_src

    for line in lines:
        for x1, y1, x2, y2 in line:
            # y is negated because image y grows downward, so a positive angle is a
            # segment that rises towards the right on screen
            angle = atan2(y1 - y2, x2 - x1)

            if angle_min_mag <= abs(angle) <= angle_max_mag:
                side = 0 if angle > 0 else 1  # 0: left lane marker, 1: right lane marker
                lane_markers_x[side].extend((x1, x2))
                lane_markers_y[side].extend((y1, y2))

    if len(lane_markers_x[0]) < 1 or len(lane_markers_x[1]) < 1:
        # Failed to find two lane markers, falling back to defaults based on image size.
        return default_src

    # straight line fits x = p[0]*y + p[1] for both markers
    p_left = np.polyfit(lane_markers_y[0], lane_markers_x[0], 1)
    p_right = np.polyfit(lane_markers_y[1], lane_markers_x[1], 1)

    # Find intersection of the two lines; the unknowns are (y, x)
    try:
        apex_pt = np.linalg.solve([[p_left[0], -1], [p_right[0], -1]], [-p_left[1], -p_right[1]])
    except np.linalg.LinAlgError:
        # parallel fits have no intersection
        return default_src

    # place the top edge slightly below the vanishing point
    top_y = ceil(apex_pt[0] + 0.075*height)

    bl_pt = ceil(np.polyval(p_left, height))
    tl_pt = ceil(np.polyval(p_left, top_y))

    br_pt = ceil(np.polyval(p_right, height))
    tr_pt = ceil(np.polyval(p_right, top_y))

    src_points = np.array([[tl_pt, top_y],
                           [tr_pt, top_y],
                           [br_pt, height],
                           [bl_pt, height]], np.float32)

    return src_points

In [None]:
# apply thresholds to image
# expects an RGB image; process_image passes the undistorted frame before it is warped
def apply_thresholds(image, draw_images=False):
    """
        compute binary images from several colour channels and gradient filters and
        combine them into one binary image in which the lane lines are set to 1

        a pixel is kept if the colour rule ((L and S) or B) or the edge rule
        (sobel x and sobel y and magnitude and direction) fires.

        draw_images: if True, plot the intermediate binaries next to the original
    """
    # one channel from each of three colour spaces
    saturation = cv2.cvtColor(image, cv2.COLOR_RGB2HLS)[:, :, 2]
    luv_lightness = cv2.cvtColor(image, cv2.COLOR_RGB2LUV)[:, :, 0]
    lab_b = cv2.cvtColor(image, cv2.COLOR_RGB2LAB)[:, :, 2]

    s_binary = apply_threshold_to_image(saturation, (150, 255))  # picks up yellow very well
    l_binary = apply_threshold_to_image(luv_lightness, (225, 255))  # does not pick up yellow, but white
    b_binary = apply_threshold_to_image(lab_b, (150, 200))

    colour_hits = ((l_binary == 1) & (s_binary == 1)) | (b_binary == 1)
    combined_channel_binary = np.zeros_like(s_binary)
    combined_channel_binary[colour_hits] = 1

    # gradient based filters, all computed from the same input image
    mag_binary = magnitude_threshold(image, sobel_kernel=3, threshold=(30, 100))
    direction_binary = gradient_direction_threshold(image, sobel_kernel=15, threshold=(0.7, 1.3))
    sobel_x = absolute_sobel_threshold(image, orient='x', sobel_kernel=9, threshold=(10, 100))
    sobel_y = absolute_sobel_threshold(image, orient='y', sobel_kernel=9, threshold=(10, 100))

    # an edge pixel has to pass all four gradient filters
    edge_hits = (sobel_x == 1) & (sobel_y == 1) & (mag_binary == 1) & (direction_binary == 1)
    edges_binary = np.zeros_like(sobel_x)
    edges_binary[edge_hits] = 1

    overall_binary = np.zeros_like(mag_binary)
    overall_binary[(combined_channel_binary == 1) | (edges_binary == 1)] = 1

    if draw_images:
        _, axes = plt.subplots(2, 3, figsize=(20,10))
        gray_panels = [('s channel', s_binary),
                       ('b channel', b_binary),
                       ('edges combined', edges_binary),
                       ('channels combined', combined_channel_binary),
                       ('combined', overall_binary)]
        # axes.flat walks the 2x3 grid row by row
        for axis, (title, binary) in zip(axes.flat, gray_panels):
            axis.set_title(title)
            axis.imshow(binary, cmap='gray')

        original_axis = axes.flat[5]
        original_axis.set_title('original')
        original_axis.imshow(image)

    return overall_binary

In [None]:
from scipy.signal import find_peaks_cwt

def find_lane_base_points(image, peak_threshold=25.0):
    """
        use a column histogram of the lower image half to find possible base points for the lane lines

        image: binary image (2d array)
        peak_threshold: minimum histogram height a peak needs within +/- 50 columns

        returns [leftmost peak, rightmost peak] as column indices, or None when fewer
        than two peaks are detected or no peak survives the filtering
    """
    hist = np.sum(image[int(image.shape[0]*0.5):, :], axis=0)
    idx = find_peaks_cwt(hist, [100, 125, 150], max_distances=[100, 125, 150], noise_perc=50)

    if len(idx) < 2:  # should rarely ever happen if there's lanes in the picture
        return None

    # filter peaks to avoid noise.
    width = image.shape[1]
    candidates = [i for i in idx
                  if width * 0.075 < i < width * 0.875  # neither on the far left nor the far right
                  # clamp the window start: a negative start would wrap around to the right edge
                  and max(hist[max(i - 50, 0):i + 50]) > peak_threshold]  # and has a minimum height

    if not candidates:
        # every peak was rejected; min()/max() of an empty list would raise
        return None
    return [min(candidates), max(candidates)]

In [None]:
def find_lanes(image, left_lane, right_lane, base_pts, num_bands=15, window_width=0.2):
    """
        using a sliding window, find lane pixels in the given binary image

        the image is cut into num_bands horizontal bands which are visited from bottom
        to top. In every band a window of window_width * image width is centered on the
        current base x of each lane, all non-zero pixels inside are collected, and the
        column with the most hits becomes the base x for the next band.

        base_pts: (left base x, right base x) to start from
        left_lane, right_lane: receive the collected pixels through add_lane_pixels(x, y)

        returns (left_lane, right_lane)
    """

    # setup variables
    height, width = image.shape[0], image.shape[1]
    band_height = int(1./num_bands * height)
    half_window = int(window_width * width) // 2

    # arrays to track points for left and right
    l_x, l_y, r_x, r_y = [], [], [], []

    base_left, base_right = base_pts

    # from bottom to top, go step wise through the image
    for i in reversed(range(num_bands)):
        y_start = i * band_height
        # clamp at 0: a negative slice start would wrap around to the right image
        # border, which yields an empty window and wrong x offsets
        left_start = max(base_left - half_window, 0)
        right_start = max(base_right - half_window, 0)

        w_left = image[y_start:y_start + band_height, left_start:base_left + half_window]
        w_right = image[y_start:y_start + band_height, right_start:base_right + half_window]

        left_y_pt, left_x_pt = np.nonzero(w_left)
        right_y_pt, right_x_pt = np.nonzero(w_right)

        # translate window coordinates back to image coordinates
        l_x.extend(left_x_pt + left_start)
        l_y.extend(left_y_pt + y_start)
        r_x.extend(right_x_pt + right_start)
        r_y.extend(right_y_pt + y_start)

        # Find 'x' with maximum nonzero elements as baseline for next window
        s_left = np.sum(w_left, axis=0)
        s_right = np.sum(w_right, axis=0)
        if np.any(s_left > 0):
            base_left = np.argmax(s_left) + left_start
        if np.any(s_right > 0):
            base_right = np.argmax(s_right) + right_start

    # update lane with points found
    left_lane.add_lane_pixels(l_x, l_y)
    right_lane.add_lane_pixels(r_x, r_y)

    return left_lane, right_lane


In [None]:
# run for all test images
for image_path in glob.glob('./training/test_images/test*.jpg'):
    image = read_image(image_path)
    undistorted = undistort(image)
    # threshold the undistorted frame (the raw image was used before, leaving `undistorted` unused)
    combined_binary = apply_thresholds(undistorted, draw_images=False)
    src_points = find_perspective_transform_src_points(combined_binary)
    M, _ = get_perspective_transform(combined_binary, src_points)
    warped = warp_image(combined_binary, M)

In [None]:
# define line class to keep track of lane in each frame (base idea taken from class material)
import collections
from itertools import chain, repeat
    
# conversion factors from pixel units to metres, used by Lane.compute_curvature
# (both) and process_image (xm_per_pix, for the off-center label)
# NOTE(review): presumably 30 m of road per 720 px along y and a 3.7 m lane per
# 700 px along x in the warped image; confirm against the actual warp.
ym_per_pix = 30./720.
xm_per_pix = 3.7/700.

class Lane():
    """
        tracks one lane line across video frames

        every accepted detection is fitted with a second order polynomial x = f(y);
        the fitted points of the last 10 accepted frames are fed into the next fit to
        smooth the result. A fit is only accepted when its curvature is within
        error_threshold (relative) of the last accepted one, otherwise the previous
        fit stays in use and dropped_frames is increased.
    """

    def __init__(self, base_pt, img_size, error_threshold=0.025):
        """
            base_pt: stored as given (not used by the class itself)
            img_size: (height, width) of the frames, also the shape of the search mask
            error_threshold: maximum relative curvature change for a fit to be accepted
        """
        # was the line detected in the last iteration?
        self.detected = False

        # x and y values of the last n fits of the line
        self.recent_xfitted = collections.deque(maxlen=10)
        self.recent_yfitted = collections.deque(maxlen=10)

        # polynomial coefficients of the most recent accepted fit
        # (this placeholder of length 1 means "no fit yet")
        self.current_fit = [np.array([False])]

        # radius of curvature of the last accepted fit, in metres
        self.radius_of_curvature = None

        # x values of the last accepted fit at self.yvals (for drawing)
        self.current_xfit = None

        self.img_size = img_size
        self.base_pt = base_pt

        # 101 sample rows spanning the full image height; the earlier extra factor
        # of 7.2 pushed them far beyond the bottom of the image
        self.yvals = np.linspace(0, img_size[0], 101)
        # search mask for detect_from_mask; starts out covering the whole image
        self.mask = np.ones(img_size, dtype=np.uint8)*255

        # number of consecutive frames without an accepted fit
        self.dropped_frames = 0
        self.error_threshold = error_threshold

    def add_lane_pixels(self, x, y):
        """
            fit the lane through the given pixel coordinates plus the fitted points of
            the recent accepted frames, then update state and search mask
        """
        # Use all points from previous accepted fits for the curve fit; float64
        # keeps the sub-pixel precision of the stored fits
        x_hist = np.fromiter(chain(*self.recent_xfitted, x), np.float64)
        y_hist = np.fromiter(chain(*self.recent_yfitted, y), np.float64)

        p_lane = None
        curvature = None
        try:
            p_lane = np.polyfit(y_hist, x_hist, 2)
            # compute_curvature takes (yvals, xvals)
            curvature = Lane.compute_curvature(y_hist, x_hist)
            # compare against the previous radius BEFORE it is overwritten
            self.detected = bool(self.sanity_check_lane(curvature))
        except Exception:
            # e.g. no pixels at all or a failed fit: treat the frame as dropped
            self.detected = False

        if self.detected:
            x_fit = p_lane[0] * self.yvals**2 + p_lane[1] * self.yvals + p_lane[2]

            self.current_fit = p_lane
            self.radius_of_curvature = curvature
            self.current_xfit = x_fit   # For drawing

            self.recent_xfitted.append(x_fit)
            self.recent_yfitted.append(self.yvals)

            self.dropped_frames = 0
        else:
            self.dropped_frames += 1
            if len(self.current_fit) != 3:
                # nothing was ever accepted, so there is no fit to fall back to;
                # leave the mask untouched
                return
            # use last fit as sanity check failed
            x_fit = self.current_fit[0]*self.yvals**2 + self.current_fit[1]*self.yvals + self.current_fit[2]

        self.update_mask(x_fit)

    # compute curvature in real world space as defined in class material
    @staticmethod
    def compute_curvature(yvals, xvals):
        """
            radius of curvature in metres of the parabola fitted through the pixel
            coordinates, evaluated at the largest y
        """
        yvals = np.asarray(yvals)
        xvals = np.asarray(xvals)
        fit_cr = np.polyfit(yvals * ym_per_pix, xvals * xm_per_pix, 2)
        # the fit lives in metres, so the evaluation point has to be converted too
        y_eval = np.max(yvals) * ym_per_pix
        return ((1 + (2 * fit_cr[0] * y_eval + fit_cr[1])**2)**1.5) / np.absolute(2 * fit_cr[0])

    def sanity_check_lane(self, curvature):
        """
            True if the relative change in curvature (1/radius) against the last accepted
            fit is within error_threshold, or if there is no earlier fit

            NOTE(review): since the comparison now uses the previous radius, the default
            threshold of 2.5% may reject many frames and need retuning.
        """
        # Return true if there is no prior data
        if self.radius_of_curvature is None:
            return True

        # check difference between given and previously computed curvature against the error threshold
        k = 1. / curvature
        k0 = 1. / self.radius_of_curvature
        return abs(k - k0) / k0 <= self.error_threshold

    def detect_from_mask(self, image):
        """
            take all non-zero pixels of the image that lie inside the current search
            mask as lane pixels; marks the lane as not detected if there are none
        """
        mask_lanes = cv2.bitwise_and(image, self.mask)
        all_pts = cv2.findNonZero(mask_lanes)
        if all_pts is not None:
            all_pts = all_pts.reshape((-1,2))
            self.add_lane_pixels(all_pts[:,0], all_pts[:,1])
        else:
            self.detected = False

    def update_mask(self, x_fit):
        """
            redraw the search mask as a thick stroke (thickness 80) along the fitted points
        """
        self.mask.fill(0)
        pts = np.transpose(np.vstack([x_fit, self.yvals])).reshape((-1,1,2)).astype(np.int32)
        cv2.drawContours(self.mask, pts, -1, (255, 255, 255), thickness=80)


In [None]:
def process_image(image):
    """
        annotate one RGB video frame with the detected lane area, the curvature of
        both lane lines and the vehicle's offset from the lane center

        state that has to survive between frames (lane trackers, perspective matrices,
        histogram base points) lives in the function attribute process_image.cache;
        setting it to None (as process_video does) starts from scratch.

        returns the annotated frame, or the plain undistorted frame as long as no
        lane fit is available
    """
    # getattr: the attribute does not exist until someone assigns it
    cache = getattr(process_image, 'cache', None)
    if cache is None:
        cache = {
            'mtx': mtx,
            'dist': dist,
            'M': None,
            'Minv': None,
            'left': Lane(int(0.16*image.shape[0]), image.shape[:2]),
            'right': Lane(int(0.62*image.shape[0]), image.shape[:2]),
            'base_pts': None
        }
        process_image.cache = cache

    left_lane = cache['left']
    right_lane = cache['right']

    # preprocess image and find lanes using thresholding defined above
    undistorted = undistort(image)
    combined_binary = apply_thresholds(undistorted)

    # the perspective transform is computed once and reused for all later frames
    if cache['M'] is None:
        src = find_perspective_transform_src_points(combined_binary)
        M, Minv = get_perspective_transform(image, src)
        cache['M'] = M
        cache['Minv'] = Minv
    else:
        M, Minv = cache['M'], cache['Minv']

    warped = warp_image(combined_binary, M)

    base_pts = cache['base_pts']
    if base_pts is None:
        # stays None (and is retried on the next frame) if no peaks are found
        base_pts = find_lane_base_points(warped)
        cache['base_pts'] = base_pts

    # NOTE(review): the sliding window search runs while a lane has dropped at most
    # 16 frames and the mask search only afterwards; this looks inverted, confirm
    # the intent before changing it.
    if left_lane.dropped_frames <= 16 or right_lane.dropped_frames <= 16:
        if base_pts is None:
            # no starting points for the sliding window: nothing to draw for this frame
            return undistorted
        # detect from scratch
        find_lanes(warped, left_lane, right_lane, base_pts)
    else:
        left_lane.detect_from_mask(warped)
        right_lane.detect_from_mask(warped)

    yvals = left_lane.yvals
    left_fitx = left_lane.current_xfit
    right_fitx = right_lane.current_xfit

    if left_fitx is None or right_fitx is None:
        # at least one lane has never produced an accepted fit
        return undistorted

    # create an image to draw the lines on
    color_warp = np.zeros_like(image).astype(np.uint8)

    # recast the x and y points into usable format for cv2.fillPoly()
    pts_left = np.array([np.transpose(np.vstack([left_fitx, yvals]))])
    pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, yvals])))])
    pts = np.hstack((pts_left, pts_right))

    # draw the lane onto the warped blank image
    cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))

    # warp the blank back to original image space using inverse perspective matrix (Minv)
    new_warp = warp_image(color_warp, Minv)

    # combine the result with the original image
    result = cv2.addWeighted(undistorted, 1, new_warp, 0.3, 0)

    # print position off center and curvature information in image.
    middle = (left_fitx[-1] + right_fitx[-1]) / 2.
    veh_pos = image.shape[1] / 2. # assuming the camera is placed in the middle of the car.

    deviation = (veh_pos - middle) * xm_per_pix

    label_left_curvature = 'left radius of curvature = {:.2f} m'.format(left_lane.radius_of_curvature)
    label_right_curvature = 'right radius of curvature = {:.2f} m'.format(right_lane.radius_of_curvature)
    label_off_center = 'vehicle is {:.2f}m {} off center'.format(abs(deviation), 'right' if deviation > 0 else 'left')
    cv2.putText(result, label_left_curvature, (10, result.shape[0] - 600),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)
    cv2.putText(result, label_right_curvature, (10, result.shape[0] - 550),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)
    cv2.putText(result, label_off_center, (10, result.shape[0] - 500),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

    return result

In [None]:
# import video editing classes
from moviepy.editor import VideoFileClip
from IPython.display import HTML

def process_video(output_path, input_path):
    process_image.cache = None # that's kind of a hack to initialize the cache of the process_image function
    input_file = VideoFileClip(input_path)
    standard_clip = input_file.fl_image(process_image) #NOTE: this function expects color images!!
    %time standard_clip.write_videofile(output_path, audio=False, threads=4)
    return output_path
    

In [None]:
# standard video: process it and keep the path of the written file in `output`
output = process_video(output_path='./training/project_4_standard.mp4',
                       input_path="./training/project_video.mp4")

In [None]:
# embed the processed standard video (path stored in `output`) in the notebook
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(output))

In [None]:
# challenge video: process it and keep the path of the written file in `output_challenge`
output_challenge = process_video(output_path='./training/project_4_challenge.mp4',
                                 input_path="./training/challenge_video.mp4")

In [None]:
# embed the processed challenge video (path stored in `output_challenge`) in the notebook
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(output_challenge))

In [None]:
# hard challenge video: process it and keep the path of the written file in `output_hard`
output_hard = process_video(output_path='./training/project_4_harder_challenge.mp4',
                            input_path="./training/harder_challenge_video.mp4")

In [None]:
# embed the processed hard challenge video (path stored in `output_hard`) in the notebook
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(output_hard))