# Vehicle detection

In [4]:
import numpy as np
import cv2
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
%matplotlib inline
import glob
import os.path
import pickle
import time
from sklearn.svm import LinearSVC, SVC
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.preprocessing import StandardScaler, RobustScaler
from skimage.feature import hog
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from scipy.ndimage.measurements import label

from moviepy.editor import VideoFileClip
from IPython.display import HTML

### Config

In [5]:
# NOTE: the functions defined in later cells capture these values as default
# arguments at definition time, so after editing a value here the defining
# cells must be re-run for the change to take effect.

# Color space used for feature extraction. convert_color() assumes RGB input
# and recognises HSV, LUV, HLS, YUV and YCrCb; any other value (e.g. 'RGB')
# leaves the image unchanged.
color_space = 'YCrCb' # RGB, HSV, LUV, HLS, YUV or YCrCb

## HOG (histogram of oriented gradients) features
hog_feat = True # include HOG features in the feature vector (on/off)
orient = 9 # number of orientation bins
pix_per_cell = 8 # side length of a (square) cell in pixels
cell_per_block = 2 # side length of a (square) block in cells
hog_channel = 'ALL' # image channel to run HOG on: 0, 1, 2, or "ALL" for every channel

## Spatial (down-sampled raw pixel) features
spatial_feat = True # include spatial features in the feature vector (on/off)
spatial_size = (32, 32) # target size passed to cv2.resize before flattening

## Color histogram features
hist_feat = True # include per-channel histograms in the feature vector (on/off)
hist_bins = 32    # number of bins per channel histogram

## Misc
window_size = (64, 64) # default xy_window of slide_window()
y_start_stop = [390, 670] # default min and max y coordinate searched by slide_window()

### Features

In [6]:
def bin_spatial(img, size=spatial_size):
    """
    Build the spatial-binning feature vector for an image.

    The image is resized to `size` with `cv2.resize` and the resulting
    pixel array is flattened into a 1-D vector.
    """
    resized = cv2.resize(img, size)
    return resized.ravel()

def color_hist(img, nbins=hist_bins, bins_range=(0, 256)):
    """
    Build the color-histogram feature vector for a 3-channel image.

    One `nbins`-bin histogram over `bins_range` is computed for each of the
    first three channels; the bin counts are returned joined end to end
    (channel 0, then 1, then 2).
    """
    per_channel_counts = [
        # np.histogram returns (counts, bin_edges); only the counts are kept
        np.histogram(img[:, :, channel], bins=nbins, range=bins_range)[0]
        for channel in range(3)
    ]
    return np.concatenate(per_channel_counts)

def get_hog_features(img, orient=orient, pix_per_cell=pix_per_cell,
                     cell_per_block=cell_per_block,
                     vis=False, feature_vec=True):
    """
    Compute HOG features for a single-channel image.

    Thin wrapper around `skimage.feature.hog` using square cells of
    `pix_per_cell` pixels and square blocks of `cell_per_block` cells.
    When `vis` is true the result is the `(features, hog_image)` pair
    produced by `hog`; otherwise it is the features alone.
    """
    cell_shape = (pix_per_cell, pix_per_cell)
    block_shape = (cell_per_block, cell_per_block)
    # Both the visualising and the plain call use the same arguments, so a
    # single call covers them; `hog` itself decides what to return based on
    # the visualisation flag.
    # NOTE(review): `visualise` is the keyword spelling of older scikit-image
    # releases; newer releases appear to spell it `visualize` - confirm
    # against the installed version before upgrading.
    return hog(img,
               orientations=orient,
               pixels_per_cell=cell_shape,
               cells_per_block=block_shape,
               transform_sqrt=True,
               visualise=vis,
               feature_vector=feature_vec)
    
def convert_color(img, color_space):
    """
    Convert an RGB image to `color_space`.

    Recognised values are 'HSV', 'LUV', 'HLS', 'YUV' and 'YCrCb'.
    Any other value returns `img` itself, unconverted.
    """
    # (color space name, name of the matching cv2 RGB->X conversion flag)
    conversions = (
        ('HSV', 'COLOR_RGB2HSV'),
        ('LUV', 'COLOR_RGB2LUV'),
        ('HLS', 'COLOR_RGB2HLS'),
        ('YUV', 'COLOR_RGB2YUV'),
        ('YCrCb', 'COLOR_RGB2YCrCb'),
    )
    for name, flag_name in conversions:
        if color_space == name:
            return cv2.cvtColor(img, getattr(cv2, flag_name))
    # Unrecognised color space: hand the image back untouched.
    return img

### Feature extraction

In [7]:
def extract_features(img, color_space=color_space, spatial_size=spatial_size,
                     hist_bins=hist_bins, orient=orient,
                     pix_per_cell=pix_per_cell, cell_per_block=cell_per_block,
                     hog_channel=hog_channel, spatial_feat=spatial_feat,
                     hist_feat=hist_feat, hog_feat=hog_feat, precomputed_hog_features=None):
    """
    Build the full feature vector for one image.

    `img` is either an RGB image array or a string, in which case it is
    treated as a file path and loaded with `read_img`. The image is
    converted to `color_space` and the enabled feature groups are
    concatenated in this order: spatial, color histogram, HOG.

    If `precomputed_hog_features` is given it is used as the HOG part
    instead of computing HOG here. Otherwise HOG is computed on every
    channel when `hog_channel` is 'ALL', or on the single channel index
    given by `hog_channel`.
    """
    # A string argument is a path: load the image first.
    if isinstance(img, str):
        img = read_img(img)

    converted = convert_color(img, color_space)
    parts = []

    # Spatial binning
    if spatial_feat == True:
        parts.append(bin_spatial(converted, size=spatial_size))

    # Per-channel color histograms
    if hist_feat == True:
        parts.append(color_hist(converted, nbins=hist_bins))

    # HOG: precomputed, all channels, or one channel
    if hog_feat == True:
        if precomputed_hog_features is not None:
            hog_part = precomputed_hog_features
        elif hog_channel == 'ALL':
            hog_part = [
                value
                for channel in range(converted.shape[2])
                for value in get_hog_features(converted[:, :, channel],
                                              orient, pix_per_cell, cell_per_block,
                                              vis=False, feature_vec=True)
            ]
        else:
            hog_part = get_hog_features(converted[:, :, hog_channel], orient,
                                        pix_per_cell, cell_per_block,
                                        vis=False, feature_vec=True)
        parts.append(hog_part)

    return np.concatenate(parts)

### Training data

In [8]:
# Collect every PNG below data/ and split the paths by class: a path that
# contains 'non-vehicles' is a negative example, everything else is a car.
images = glob.glob('data/**/*.png', recursive=True)
notcars = [path for path in images if 'non-vehicles' in path]
cars = [path for path in images if 'non-vehicles' not in path]

In [9]:
def read_img(fpath):
    """
    Read an image file and return it as a uint8 array.

    `matplotlib.image.imread` yields floating-point data in the 0-1 range
    for PNG files and integer data for other formats. The scaling decision
    is made from the dtype of the decoded array rather than from the file
    name, so it also holds for upper-case extensions (`.PNG`) and for paths
    that merely contain '.png' somewhere (e.g. in a directory name).
    """
    img = mpimg.imread(fpath)
    if np.issubdtype(img.dtype, np.floating):
        # 0-1 floats -> 0-255 integers
        return (img*255).astype(np.uint8)
    return img.astype(np.uint8)

In [10]:
# Split into training and test sets.
# A fixed seed makes the split and the shuffles below reproducible across
# kernel restarts. This matters because the classifier cells reuse models
# pickled on disk: with an unseeded split, a re-run could score a cached
# model on images it was trained on. Delete the cached *.p files after
# changing the seed (or the data) so the models are refitted on this split.
RANDOM_SEED = 42
cars_train, cars_test = train_test_split(cars, random_state=RANDOM_SEED)
notcars_train, notcars_test = train_test_split(notcars, random_state=RANDOM_SEED)

augmentation_factor = 0 # optionally, augment training data with randomly "jittered" examples

# Training features: cars first, then non-cars, each original image followed
# by its `augmentation_factor` augmented variants. y_train below relies on
# this ordering.
# NOTE(review): augmentation_pipeline is not defined in the cells shown
# here; it must be defined before setting augmentation_factor > 0.
X_train = []
for img_path in list(cars_train) + list(notcars_train):
    img = read_img(img_path)
    X_train.append(extract_features(img))
    for _ in range(augmentation_factor):
        X_train.append(extract_features(augmentation_pipeline(img)))

X_train = np.array(X_train)
y_train = np.concatenate((np.ones(len(cars_train)*(1+augmentation_factor), np.uint8),
                          np.zeros(len(notcars_train)*(1+augmentation_factor), np.uint8)))
X_train, y_train = shuffle(X_train, y_train, random_state=RANDOM_SEED)

# Test features: never augmented, same cars-then-non-cars ordering.
X_test = np.array([extract_features(read_img(img_path))
                   for img_path in list(cars_test) + list(notcars_test)])
y_test = np.concatenate((np.ones(len(cars_test), np.uint8),
                         np.zeros(len(notcars_test), np.uint8)))
X_test, y_test = shuffle(X_test, y_test, random_state=RANDOM_SEED)

### Data normalization

In [11]:
# Normalize the features: fit the scaler on the training set only, then
# apply that same fitted transformation to both training and test data.
scaler = RobustScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

### Classifiers

In [12]:
# Linear SVM: reuse the model cached on disk if there is one, otherwise fit
# it and cache it. Files are opened with `with` so the handles are closed.
# NOTE: pickle.load can execute arbitrary code; only load a cache file that
# this notebook wrote itself. A cached model also reflects the data split it
# was fitted on, so delete the file to force a refit.
if os.path.exists('lsvcclassifier.p'):
    with open('lsvcclassifier.p', 'rb') as lsvc_file:
        lsvc = pickle.load(lsvc_file)
else:
    lsvc = LinearSVC(C=0.25)
    lsvc.fit(X_train, y_train)
    with open('lsvcclassifier.p', 'wb') as lsvc_file:
        pickle.dump(lsvc, lsvc_file)

svc = lsvc
print("Accuracy: {:.4f}".format(lsvc.score(X_test, y_test)))

Accuracy: 0.9982


In [10]:
# AdaBoost: reuse the model cached on disk if there is one, otherwise fit
# it and cache it. Files are opened with `with` so the handles are closed.
# NOTE: pickle.load can execute arbitrary code; only load a cache file that
# this notebook wrote itself. A cached model also reflects the data split it
# was fitted on, so delete the file to force a refit.
if os.path.exists('adaboostclassifier.p'):
    with open('adaboostclassifier.p', 'rb') as booster_file:
        booster = pickle.load(booster_file)
else:
    booster = AdaBoostClassifier()
    booster.fit(X_train, y_train)
    with open('adaboostclassifier.p', 'wb') as booster_file:
        pickle.dump(booster, booster_file)

print("Accuracy: {:.4f}".format(booster.score(X_test, y_test)))

Accuracy: 0.9953


### Sliding windows

In [11]:
def slide_window(img, x_start_stop=[None, None], y_start_stop=y_start_stop,
                    xy_window=window_size, xy_overlap=(0.5, 0.5)):
    """
    Return a list of sliding windows (boxes) given x and y ranges, size of window
    and the percentage overlap between consecutive windows.

    `x_start_stop` / `y_start_stop` are `[start, stop]` pixel ranges; a
    `None` entry means "from 0" (start) or "to the image edge" (stop).
    `xy_window` is the `(width, height)` of a window and `xy_overlap` the
    fractional overlap between neighbouring windows in x and y.

    Each window is returned as `((startx, starty), (endx, endy))`, ordered
    row by row.

    The range arguments are only read, never written: the resolved bounds
    live in local variables. Writing them back would permanently alter the
    shared default list, the module-level `y_start_stop` list and any list
    passed in by the caller.
    """
    # Resolve missing start/stop positions against the image size
    x_start = 0 if x_start_stop[0] is None else x_start_stop[0]
    x_stop = img.shape[1] if x_start_stop[1] is None else x_start_stop[1]
    y_start = 0 if y_start_stop[0] is None else y_start_stop[0]
    y_stop = img.shape[0] if y_start_stop[1] is None else y_start_stop[1]

    # Span of the region to be searched
    xspan = x_stop - x_start
    yspan = y_stop - y_start

    # Number of pixels per step in x/y (builtin int: np.int was only an
    # alias for it and no longer exists in recent NumPy releases)
    nx_pix_per_step = int(xy_window[0]*(1 - xy_overlap[0]))
    ny_pix_per_step = int(xy_window[1]*(1 - xy_overlap[1]))

    # Number of windows in x/y; the buffer is the overlapping part of a
    # window, which does not count towards covering new ground
    nx_buffer = int(xy_window[0]*(xy_overlap[0]))
    ny_buffer = int(xy_window[1]*(xy_overlap[1]))
    nx_windows = int((xspan-nx_buffer)/nx_pix_per_step)
    ny_windows = int((yspan-ny_buffer)/ny_pix_per_step)

    window_list = []
    for ys in range(ny_windows):
        for xs in range(nx_windows):
            # Top-left corner of this window, then its opposite corner
            startx = xs*nx_pix_per_step + x_start
            endx = startx + xy_window[0]
            starty = ys*ny_pix_per_step + y_start
            endy = starty + xy_window[1]
            window_list.append(((startx, starty), (endx, endy)))
    return window_list

### Window prediction

In [12]:
def search_windows(img, windows, classifiers, scaler, color_space=color_space,
                    spatial_size=spatial_size, hist_bins=hist_bins,
                    hist_range=(0, 256), orient=orient,
                    pix_per_cell=pix_per_cell, cell_per_block=cell_per_block,
                    hog_channel=hog_channel, spatial_feat=spatial_feat,
                    hist_feat=hist_feat, hog_feat=hog_feat, window_size = window_size):
    """
    Run a prediction for a list of window coordinates.

    Each window `((x1, y1), (x2, y2))` is cropped from `img`, resized to
    64x64, turned into a feature vector with `extract_features`, scaled
    with `scaler` and passed to the classifiers.

    Returns the windows for which all classifiers returned a positive
    result. Classifiers are evaluated in the given order and evaluation
    stops at the first negative prediction, so later classifiers only run
    on windows every earlier classifier accepted.

    NOTE(review): `hist_range` and `window_size` are accepted but not used
    by this function; they are kept so existing callers keep working.
    """
    on_windows = []

    for window in windows:
        top_left, bottom_right = window[0], window[1]
        # Crop the window (rows = y, columns = x) and bring it to the patch
        # size used for feature extraction.
        # presumably 64x64 is the size of the training images - TODO confirm
        patch = img[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]
        test_img = cv2.resize(patch, (64, 64))

        features = extract_features(test_img, color_space=color_space,
                            spatial_size=spatial_size, hist_bins=hist_bins,
                            orient=orient, pix_per_cell=pix_per_cell,
                            cell_per_block=cell_per_block,
                            hog_channel=hog_channel, spatial_feat=spatial_feat,
                            hist_feat=hist_feat, hog_feat=hog_feat)

        # The scaler expects a 2-D (n_samples, n_features) array
        test_features = scaler.transform(np.array(features).reshape(1, -1))

        # Generator + all(): stops at the first classifier that says "no"
        if all(clf.predict(test_features)[0] for clf in classifiers):
            on_windows.append(window)

    return on_windows

In [13]:
def draw_labeled_bboxes(img, labels, min_width=100):
    """
    Draw boxes given a (car) labeled array.

    `labels` is a `(label_array, count)` pair; pixels of object `k`
    (1 <= k <= count) have the value `k` in `label_array`. For every object
    the bounding box of its pixels is computed, and the box is drawn onto
    `img` (in place) only if it is more than `min_width` pixels wide.

    Returns `img`.
    """
    label_map, n_objects = labels[0], labels[1]
    for car_number in range(1, n_objects + 1):
        # Row (y) and column (x) indices of the pixels of this object
        nonzero = (label_map == car_number).nonzero()
        nonzeroy, nonzerox = nonzero[0], nonzero[1]
        # Plain ints: some OpenCV builds reject NumPy integer scalars as
        # point coordinates.
        top_left = (int(np.min(nonzerox)), int(np.min(nonzeroy)))
        bottom_right = (int(np.max(nonzerox)), int(np.max(nonzeroy)))
        # Narrow boxes are skipped
        if bottom_right[0] - top_left[0] > min_width:
            cv2.rectangle(img, top_left, bottom_right, (0,0,255), 6)

    return img

### Video/sequence processing

In [14]:
class RunningHeatmap:
    """
    Container for a heatmap formed by a sequence of box predictions.

    Every pixel counts how many boxes from the last `history` calls to
    `add` cover it. `threshold` is the count a pixel must exceed to show
    up in the `heatmap` property.
    """
    def __init__(self, shape, history=60, threshold=3):
        self.history = history
        self.threshold = threshold
        self._heatmap = np.zeros(shape, np.uint32)
        # One entry (a list of boxes) per call to add(), oldest first
        self._boxes = []

    def add(self, boxes):
        """
        Add a list of boxes to the running heatmap.

        Each box is `((x1, y1), (x2, y2))` and raises the count of the
        pixels in rows y1:y2, columns x1:x2 by one. Once more than
        `history` box lists are stored, the oldest list is dropped and its
        boxes are subtracted again.
        """
        # Keep a private snapshot: the exact same boxes must be subtracted
        # later, even if the caller passed a generator or mutates its list.
        boxes = list(boxes)
        self._boxes.append(boxes)
        if len(self._boxes) > self.history:
            for box in self._boxes.pop(0):
                self._heatmap[box[0][1]:box[1][1], box[0][0]:box[1][0]] -= 1

        for box in boxes:
            self._heatmap[box[0][1]:box[1][1], box[0][0]:box[1][0]] += 1

    @property
    def heatmap(self):
        """
        Get the current thresholded value of the heatmap.

        Counts are clipped to 0-255 and returned as a new uint8 array in
        which every pixel whose count is not above `threshold` is zero.
        """
        # NOTE(review): an earlier version computed a history-dependent
        # threshold here but never applied it; the configured threshold has
        # always been the one in effect, and that is what is used.
        ret = np.clip(self._heatmap, 0, 255).astype(np.uint8)
        ret[ret <= self.threshold] = 0
        return ret

In [15]:
class Centroid:
    """
    Tracks one detected object through its bounding box and box center.

    A new detection whose center lies within `MAX_PIXEL_DISTANCE` of this
    object's center is treated as another sighting of the same object.
    `activations` holds one boolean per recorded time step (True = seen).
    """
    MAX_PIXEL_DISTANCE = 100

    def __init__(self, box):
        self.box = box
        self.previous_box = box
        self.draw_box = box
        # Creation counts as the first sighting
        self.activations = [True]
        self.update_center()

    @staticmethod
    def _midpoint(first, second):
        """
        Integer midpoint of two (x, y) points.
        """
        return ((first[0] + second[0])//2, (first[1] + second[1])//2)

    def update_center(self):
        """
        Recalculate the center from the current box.
        """
        self.center = self._midpoint(self.box[0], self.box[1])

    def near(self, point):
        """
        Tell whether `point` lies within range of this centroid.

        Range is measured as Euclidean distance to the current center.
        """
        dx = self.center[0] - point[0]
        dy = self.center[1] - point[1]
        return (dx**2 + dy**2)**0.5 <= self.MAX_PIXEL_DISTANCE

    def update_box(self, new_box):
        """
        Record a new observation of the object's bounding box.

        The box used for drawing is the corner-wise average of the box
        held so far and the new one; the new box then becomes current.
        """
        self.draw_box = (self._midpoint(self.box[0], new_box[0]),
                         self._midpoint(self.box[1], new_box[1]))
        self.previous_box = self.box
        self.box = new_box
        self.update_center()

    def set_active(self):
        """
        Record that the object was detected at this time step.
        """
        self.activations.append(True)

    def set_inactive(self):
        """
        Record that the object was not detected at this time step.
        """
        self.activations.append(False)

## Detection entry point

In [16]:
class Detector:
    """
    Detector is the entry class for car detection.

    It keeps a running heatmap of detections in the image,
    as well as a set of centroids for all detected objects.
    """
    MAX_INACTIVITY = 60 # max number of frames without activity
    MIN_ACTIVITY_COUNT = 3 # min number of activations before drawing

    def __init__(self, shape, classifiers, scaler):
        """
        `shape` is the shape of the running heatmap; `classifiers` and
        `scaler` are handed to `search_windows` for every frame.
        """
        self.heatmap = RunningHeatmap(shape)
        self.classifiers = classifiers
        self.scaler = scaler
        self.centroids = set()

    def process_frame(self, image):
        """
        Given the next frame in a video returns the same image with
        boxes drawn around all detected cars.

        The boxes are drawn onto `image` itself, which is also the
        returned object.
        """
        # get sliding windows
        windows = slide_window(image, x_start_stop=[400, None], xy_window=(112, 112), xy_overlap=(0.6, 0.6))
        # search all windows for cars and get the 'hot' windows with a positive detection
        hot_windows = search_windows(image, windows, self.classifiers, self.scaler)
        # update the running heatmap with the latest hot windows
        self.heatmap.add(hot_windows)

        boxes = self._label_boxes()
        found_centroids = self._match_centroids(boxes)
        self._retire_stale_centroids(found_centroids)
        return self._draw_centroids(image)

    def _label_boxes(self):
        """
        Return one bounding box per connected region of the current heatmap.
        """
        # label() merges neighboring non-zero pixels into numbered regions,
        # so overlapping hot windows end up as a single object
        labeled, n_objects = label(self.heatmap.heatmap)
        boxes = []
        for i in range(1, n_objects + 1):
            nonzeroy, nonzerox = (labeled == i).nonzero()
            # Plain ints: some OpenCV builds reject NumPy integer scalars
            # as point coordinates when the box is drawn later.
            boxes.append(((int(np.min(nonzerox)), int(np.min(nonzeroy))),
                          (int(np.max(nonzerox)), int(np.max(nonzeroy)))))
        return boxes

    def _match_centroids(self, boxes):
        """
        Assign each box to the tracked centroids in range, creating a new
        centroid for a box nobody claims; return the centroids seen.
        """
        found_centroids = set()
        for box in boxes:
            center_pt = ((box[0][0]+box[1][0])//2, (box[0][1]+box[1][1])//2)
            found = False
            # every centroid in range of this box is updated with it
            for centroid in self.centroids:
                if centroid.near(center_pt):
                    centroid.set_active()
                    centroid.update_box(box)
                    found = True
                    found_centroids.add(centroid)

            # if no capturing centroid object found, create a new one
            if not found:
                new_centroid = Centroid(box)
                self.centroids.add(new_centroid)
                found_centroids.add(new_centroid)
        return found_centroids

    def _retire_stale_centroids(self, found_centroids):
        """
        Mark centroids not seen this frame as inactive and drop those with
        no activation in their last MAX_INACTIVITY time steps.
        """
        dead_centroids = set()
        for centroid in self.centroids - found_centroids:
            centroid.set_inactive()
            recent = centroid.activations[-self.MAX_INACTIVITY:]
            if len(centroid.activations) > self.MAX_INACTIVITY and not any(recent):
                dead_centroids.add(centroid)
        self.centroids -= dead_centroids

    def _draw_centroids(self, image):
        """
        Draw the box of every centroid with at least MIN_ACTIVITY_COUNT
        positive detections onto `image` and return it.
        """
        for centroid in self.centroids:
            if np.sum(centroid.activations) >= self.MIN_ACTIVITY_COUNT:
                cv2.rectangle(image, centroid.draw_box[0], centroid.draw_box[1], (0,0,255), 6)
        return image

### Detection on project video

In [17]:
# Run the detector over the project video and write the annotated result.
# (720, 1280) is the shape of the detector's running heatmap
# (presumably the frame height x width of the input video - TODO confirm).
# Both classifiers must agree for a window to count as a detection.
detector = Detector((720, 1280), [svc, booster], scaler)
output_video = 'submission.mp4'
input_clip = VideoFileClip("project_video.mp4")
# process_frame is applied to every frame of the clip
output_clip = input_clip.fl_image(detector.process_frame)
# %time reports how long writing (and therefore processing) the video takes
%time output_clip.write_videofile(output_video, audio=False)

[MoviePy] >>>> Building video submission.mp4
[MoviePy] Writing video submission.mp4


100%|█████████▉| 1260/1261 [12:24<00:00,  1.66it/s]


[MoviePy] Done.
[MoviePy] >>>> Video ready: submission.mp4 

CPU times: user 1h 52s, sys: 1min 8s, total: 1h 2min 1s
Wall time: 12min 24s
