# Vehicle Detection

# Stage 0: Preparation

In [1]:
# imports
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import numpy as np
import cv2
import glob
import time
from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from skimage.feature import hog
from sklearn.model_selection import train_test_split
%matplotlib inline

# Seed NumPy's global random generator. The seed has to be passed by calling
# np.random.seed; assigning to it would replace the function and seed nothing.
np.random.seed(2)

# Stage 1: Feature Extraction and Classifier Training

# I will bring order back to this chaos!

In [2]:
'''some useful functions for feature extraction'''

def bin_spatial(img, size=(32, 32)):
    """Return spatially binned colour features for ``img``.

    The image is resized to ``size`` with ``cv2.resize`` and the result is
    flattened into a one-dimensional feature vector.
    """
    resized = cv2.resize(img, size)
    return resized.ravel()

def color_hist(img, nbins=32, bins_range=(0, 256)):
    """Return the concatenated per-channel histogram counts of ``img``.

    Each of the first three channels (``img[:, :, 0..2]``) is histogrammed
    with ``nbins`` bins over ``bins_range``; only the bin counts are kept
    (bin edges are discarded) and the three count vectors are joined in
    channel order into one vector of length ``3 * nbins``.
    """
    per_channel_counts = [
        np.histogram(img[:, :, channel], bins=nbins, range=bins_range)[0]
        for channel in range(3)
    ]
    return np.concatenate(per_channel_counts)

def get_hog_features(img, orient, pix_per_cell, cell_per_block,
                        vis=False, feature_vec=True):
    """Compute HOG features for a single-channel image with skimage's ``hog``.

    Args:
        img: the image channel handed to ``hog``.
        orient: number of orientation bins.
        pix_per_cell: side length of a (square) cell, in pixels.
        cell_per_block: side length of a (square) block, in cells.
        vis: when truthy, also return the HOG visualisation image.
        feature_vec: forwarded to ``hog`` as ``feature_vector``.

    Returns:
        ``features`` when ``vis`` is falsy, otherwise the pair
        ``(features, hog_image)``.
    """
    # The keyword is spelled ``visualize``: the older ``visualise`` spelling was
    # deprecated and later removed from scikit-image, where it raises TypeError.
    # hog() already returns one value or a (features, image) pair depending on
    # this flag, so a single call covers both cases.
    # NOTE(review): transform_sqrt=True takes a square root of the input; confirm
    # the chosen colour channel cannot contain negative values.
    return hog(img, orientations=orient,
               pixels_per_cell=(pix_per_cell, pix_per_cell),
               cells_per_block=(cell_per_block, cell_per_block),
               transform_sqrt=True,
               visualize=bool(vis), feature_vector=feature_vec)

def extract_features(imgs, cspace='RGB', spatial_size=(32, 32),
                        hist_bins=32, hist_range=(0, 256), orient=9, 
                        pix_per_cell=8, cell_per_block=2, hog_channel=0):
    """Build one feature vector per image from colour and HOG descriptors.

    For every entry the image is (optionally) converted from RGB to
    ``cspace`` and three descriptors are concatenated in this order:
    ``bin_spatial`` features, ``color_hist`` features and the HOG features
    of channel ``hog_channel``.

    Args:
        imgs: iterable whose items are either file names (``str``, read with
            ``mpimg.imread``) or already-loaded image arrays.
        cspace: one of ``'RGB'``, ``'HSV'``, ``'LUV'``, ``'HLS'``, ``'YUV'``.
        spatial_size: target size passed to ``bin_spatial``.
        hist_bins, hist_range: histogram settings passed to ``color_hist``.
        orient, pix_per_cell, cell_per_block: HOG settings passed to
            ``get_hog_features``.
        hog_channel: index of the channel the HOG is computed on.

    Returns:
        A list with one 1-D feature vector per input image.

    Raises:
        ValueError: if ``cspace`` is not one of the supported names.
    """
    # Reject unknown colour spaces up front. Without this check an unsupported
    # name would leave the converted image unassigned for the first image, or
    # silently reuse the previous image's data for later ones.
    supported = ('RGB', 'HSV', 'LUV', 'HLS', 'YUV')
    if cspace not in supported:
        raise ValueError(
            "unsupported cspace %r; expected one of %s" % (cspace, ", ".join(supported)))

    if cspace == 'RGB':
        conversion = None
    else:
        conversion = {
            'HSV': cv2.COLOR_RGB2HSV,
            'LUV': cv2.COLOR_RGB2LUV,
            'HLS': cv2.COLOR_RGB2HLS,
            'YUV': cv2.COLOR_RGB2YUV,
        }[cspace]

    features = []
    for file in imgs:
        # Items may be paths or arrays; only paths are read from disk.
        # NOTE(review): matplotlib's imread is documented to return floats in
        # [0, 1] for PNG files, while hist_range defaults to (0, 256) - confirm
        # the value scale of training and inference images matches.
        image = mpimg.imread(file) if isinstance(file, str) else file

        if conversion is None:
            feature_image = np.copy(image)
        else:
            feature_image = cv2.cvtColor(image, conversion)

        spatial_features = bin_spatial(feature_image, size=spatial_size)
        hist_features = color_hist(feature_image, nbins=hist_bins, bins_range=hist_range)
        hog_features = get_hog_features(feature_image[:, :, hog_channel], orient,
                                        pix_per_cell, cell_per_block,
                                        vis=False, feature_vec=True)
        features.append(np.concatenate((spatial_features, hist_features, hog_features)))
    return features



In [3]:
'''find the best color space and hog channel for this task'''
def getModel(params):
    '''Train a LinearSVC for one colour-space / HOG-channel setting.

    Reads ``params["cspace"]`` and ``params["hog_channel"]``, extracts
    features for the module-level ``cars`` and ``notcars`` image lists,
    standardises them, holds out 20% as a test split and fits a LinearSVC.
    The training time and the test ROC AUC are printed.

    Returns:
        (X_scaler, svc): the fitted StandardScaler and the fitted classifier.
    '''
    # Imported here so the function does not depend on a later notebook cell
    # having been run first.
    from sklearn.metrics import roc_auc_score

    print("cspace:",params["cspace"])
    print("hogchannel:",params["hog_channel"])
    cspace=params["cspace"]
    hog_channel=params["hog_channel"]

    # One shared keyword set so both classes are described identically.
    feature_kwargs = dict(cspace=cspace, spatial_size=(32, 32),
                          hist_bins=32, hist_range=(0, 256), orient=9,
                          pix_per_cell=8, cell_per_block=2, hog_channel=hog_channel)
    car_features = extract_features(cars, **feature_kwargs)
    notcar_features = extract_features(notcars, **feature_kwargs)

    # Stack both classes into one matrix and standardise every column.
    # NOTE(review): the scaler is fitted on all rows before the split, so the
    # test rows influence the scaling statistics.
    X = np.vstack((car_features, notcar_features)).astype(np.float64)
    X_scaler = StandardScaler().fit(X)
    scaled_X = X_scaler.transform(X)

    # Labels: 1 for cars, 0 for non-cars, in the same order as the rows of X.
    y = np.hstack((np.ones(len(car_features)), np.zeros(len(notcar_features))))

    rand_state = 2
    X_train, X_test, y_train, y_test = train_test_split(
        scaled_X, y, test_size=0.2, random_state=rand_state)

    svc = LinearSVC()
    # Plain wall-clock timing instead of the IPython-only %time magic.
    t_start = time.time()
    svc.fit(X_train, y_train)
    print(round(time.time() - t_start, 2), 'Seconds to train SVC...')

    # ROC AUC is a ranking metric, so feed it the signed margin rather than
    # the already-thresholded 0/1 predictions.
    print('Test AUC of SVC = ', roc_auc_score(y_test, svc.decision_function(X_test)))
    print("-"*10)
    return X_scaler,svc

In [None]:
'''extract features and generates training and testing dataset for the first try'''

# Divide up into cars and notcars
cars=glob.glob("vehicles/*/*.png")
notcars=glob.glob("non-vehicles/*/*.png")

# Fail early with a clear message when the datasets are not where expected;
# otherwise the error only surfaces later, inside the stacking/scaling step.
if not cars or not notcars:
    raise FileNotFoundError(
        "no training images found under 'vehicles/' and/or 'non-vehicles/'")

# Feature hyper-parameters for this first attempt. Both extract_features calls
# below read them from here, so the two classes cannot drift apart.
cspace="HLS"
orient = 9
pix_per_cell = 8
cell_per_block = 2
hog_channel = 2

print("start feature extraction")

car_features = extract_features(cars, cspace=cspace, spatial_size=(32, 32),
                        hist_bins=32, hist_range=(0, 256), orient=orient,
                        pix_per_cell=pix_per_cell, cell_per_block=cell_per_block,
                        hog_channel=hog_channel)

print("finish feature extraction for cars")

notcar_features = extract_features(notcars, cspace=cspace, spatial_size=(32, 32),
                        hist_bins=32, hist_range=(0, 256), orient=orient,
                        pix_per_cell=pix_per_cell, cell_per_block=cell_per_block,
                        hog_channel=hog_channel)

print("finish feature extraction")

# Create an array stack of feature vectors
X = np.vstack((car_features, notcar_features)).astype(np.float64)
# Fit a per-column scaler
X_scaler = StandardScaler().fit(X)
# Apply the scaler to X
scaled_X = X_scaler.transform(X)

# Define the labels vector: 1 for cars, 0 for non-cars
y = np.hstack((np.ones(len(car_features)), np.zeros(len(notcar_features))))

# Split up data into randomized training and test sets
rand_state = 2
X_train, X_test, y_train, y_test = train_test_split(
    scaled_X, y, test_size=0.2, random_state=rand_state)



In [None]:
# Display the shapes of the training and test feature matrices.
X_train.shape,X_test.shape

4932 features? That may be too many; this looks like the curse of dimensionality.

In [None]:
'''first try: fit it with linear svc'''
# Baseline classifier: a linear SVC with its default settings.
svc = LinearSVC()

# Wall-clock time of the fit.
fit_started = time.time()
svc.fit(X_train, y_train)
fit_finished = time.time()
print(fit_finished-fit_started, 'Seconds to train SVC...')

# Accuracy on the data it was trained on and on the held-out split.
print('Train Accuracy of SVC = ', svc.score(X_train, y_train))
print('Test Accuracy of SVC = ', svc.score(X_test, y_test))

# Wall-clock time of classifying a single sample (the first test row,
# reshaped into a one-row matrix).
predict_started = time.time()
prediction = svc.predict(X_test[0].reshape(1, -1))
predict_finished = time.time()
print(predict_finished-predict_started, 'Seconds to predict with SVC')

In [None]:
'''better evaluation metric: AUC'''
from sklearn.metrics import roc_auc_score

In [None]:
# ROC AUC ranks samples by a continuous score, so use the SVC's signed margin
# (decision_function) instead of its already-thresholded 0/1 predictions.
print('Test AUC of SVC = ', roc_auc_score(y_test,svc.decision_function(X_test)))

In [None]:
# decision tree for feature reduction
from sklearn.tree import DecisionTreeClassifier
# Fixed random_state so repeated runs build the same tree.
dt=DecisionTreeClassifier(random_state=2)

In [None]:
# Fit the tree on the training split (timed by IPython's %time magic),
# then report its accuracy on the held-out split.
%time dt.fit(X_train,y_train)
print("test accuracy:",dt.score(X_test,y_test))

The decision tree performs worse than the SVM.
However, I use the decision tree for feature selection.

In [None]:
# Indices of the features the decision tree actually used (importance > 0).
importances = dt.feature_importances_
good_features = [idx for idx, weight in enumerate(importances) if weight > 0]

# Of those, keep the ones whose importance is above the mean importance of the
# used features. The mean does not change between features, so it is computed
# once instead of once per candidate.
very_good_features = []
if good_features:
    importance_cutoff = np.mean(importances[good_features])
    very_good_features = [idx for idx in good_features
                          if importances[idx] > importance_cutoff]
len(very_good_features)

In [None]:
'''try fit the svc with less features'''
def model_with_features(features):
    '''Fit a fresh LinearSVC on a subset of feature columns and report it.

    ``features`` is a list of column indices. The classifier is trained on
    those columns of the module-level ``X_train`` / ``y_train``; the training
    time and the ROC AUC on the same columns of ``X_test`` are printed.
    Nothing is returned.
    '''
    newsvc=LinearSVC()
    # Plain wall-clock timing instead of the IPython-only %time magic.
    t_start = time.time()
    newsvc.fit(X_train[:,features],y_train)
    print(round(time.time() - t_start, 2), 'Seconds to train SVC...')
    # Score with the signed margin; AUC on thresholded 0/1 labels is not a
    # ranking measure.
    print('Test AUC of SVC = ', roc_auc_score(y_test,newsvc.decision_function(X_test[:,features])))

In [None]:
# Refit using every feature the tree gave a non-zero importance.
model_with_features(good_features)

In [None]:
# Refit using only the features whose importance is above the mean of those.
model_with_features(very_good_features)

This shows that eliminating features speeds up training considerably but lowers the AUC.

In [None]:
# Sweep every colour space / HOG channel pair. getModel (defined above) prints
# the test AUC of each combination; the returned scaler and classifier are not
# kept because this loop only compares scores.
for csp in ["HSV","LUV","HLS","YUV"]:
    for hc in range(3):
        try:
            getModel({"cspace": csp, "hog_channel": hc})
        except Exception as err:
            # Keep sweeping when one combination cannot be trained, but show
            # why it failed instead of hiding the cause.
            print(csp,hc,"failed,next",err)

In [None]:
'''try random forest'''

# Rebuild the dataset for the LUV colour space with HOG on channel 0.
cspace="LUV"
hog_channel=0
car_features, notcar_features = (
    extract_features(paths, cspace=cspace, spatial_size=(32, 32),
                     hist_bins=32, hist_range=(0, 256), orient=9,
                     pix_per_cell=8, cell_per_block=2, hog_channel=hog_channel)
    for paths in (cars, notcars)
)

# Cars first, then non-cars, as one float64 matrix.
X = np.vstack((car_features, notcar_features)).astype(np.float64)
# Learn the per-column scaling and apply it to the same matrix.
X_scaler = StandardScaler()
scaled_X = X_scaler.fit_transform(X)

# Labels in row order: ones for the cars, zeros for the non-cars.
y = np.concatenate([np.ones(len(car_features)), np.zeros(len(notcar_features))])

# 80/20 split with a fixed seed.
rand_state = 2
X_train, X_test, y_train, y_test = train_test_split(
    scaled_X, y, test_size=0.2, random_state=rand_state)



In [None]:
# train classifier
# Use random forest
from sklearn.ensemble import RandomForestClassifier
rf=RandomForestClassifier(100)
# Check the training time for the SVC
%time rf.fit(X_train, y_train)
#print(t2-t, 'Seconds to train SVC...')
# Check the score of the SVC
print('Test AUC of SVC = ', roc_auc_score(y_test,rf.predict(X_test)))

In [None]:
'''grid search on SVC and RF'''
from sklearn.model_selection import GridSearchCV
# Candidate values of C for the linear SVC.
params_svc={"C":[1,10,100]}
# Candidate forest sizes; defined here but not used in this cell.
params_rf={"n_estimators":[50,100,250]}
# No `scoring` argument is passed to this search.
svc_gs=GridSearchCV(LinearSVC(),params_svc)

In [None]:
# Run the search on the training split.
svc_gs.fit(X_train,y_train)

In [None]:
# Show the best parameter setting and its score.
svc_gs.best_params_,svc_gs.best_score_

Even though I forgot to set AUC as the scoring metric, the search still shows that C=1 is the best parameter for the SVC.
The AUC was 0.983 for the SVC.

In [None]:
from sklearn.metrics import make_scorer
# Wrap roc_auc_score as a scorer object that can be passed as `scoring`.
# NOTE(review): with make_scorer's defaults the metric presumably receives the
# estimator's predict() output (hard labels), not continuous scores - confirm.
auc=make_scorer(roc_auc_score)

In [None]:
rf_gs=GridSearchCV(RandomForestClassifier(),params_rf,scoring=auc)
%timeit rf_gs.fit(X_train,y_train)

In [None]:
rf_gs.best_params_,rf_gs.best_score_

In [None]:
params["X_scaler"],params["clf"]=grid_search("HSV",2)

In [None]:
params = {
    "slides": 8,
    "basic":48,
    "width":100,
}

In [None]:
params["cspace"]="HSV"
params["hog_channel"]=2

In [None]:
def pred(img,params):
    '''Classify one image patch and return the predicted label.

    The patch is turned into a feature vector with extract_features (colour
    space and HOG channel taken from ``params``), scaled with
    ``params["X_scaler"]`` and classified with ``params["clf"]``.
    '''
    feature_rows = extract_features(
        [img],
        cspace=params["cspace"],
        spatial_size=(32, 32),
        hist_bins=32,
        hist_range=(0, 256),
        hog_channel=params["hog_channel"],
    )
    scaled_rows = params["X_scaler"].transform(feature_rows)
    labels = params["clf"].predict(scaled_rows)
    # Only one patch was passed in, so only the first label is relevant.
    return labels[0]

In [None]:
# test the pred function
# Load one image from the vehicles dataset and time a single prediction on it.
test_img=plt.imread("vehicles/GTI_Far/image0000.png")
%time pred(test_img,params)

# Stage 2: Sliding Windows and Detecting
Awesome performance for the model trained!

In [None]:
# Load a road test image into the global `roadimg` and display it.
roadimg=plt.imread("test_images/test1.jpg")
plt.imshow(roadimg)

In [None]:
def slide_window(img, basic_size=32, x_overlap=0.5, num_ver_slides=8,width_basic=50):
    '''Generate square search windows over the lower half of ``img``.

    Forked from the course material with these changes:
    1. Windows only cover the bottom half of the image.
    2. Window size and horizontal span grow linearly with the distance from
       the vertical centre of the image.
    3. The function is a generator.

    Args:
        img: array-like with a ``shape``; only ``shape[0]`` (height) and
            ``shape[1]`` (width) are read.
        basic_size: divisor for the base window size; the base size is
            ``int((height / 2) / (num_ver_slides * basic_size))`` pixels.
        x_overlap: fraction by which horizontally adjacent windows overlap.
        num_ver_slides: number of vertical bands; band 0 is skipped.
        width_basic: half-width of the searched span in band 0; band ``i``
            uses ``width_basic * (i + 1)``, capped at half the image width.

    Yields:
        ``((sx, sy), (ex, ey))`` - top-left and bottom-right corner of each
        window, as integers.

    Raises:
        ValueError: if the settings produce a horizontal step smaller than one
            pixel (e.g. a ``basic_size`` so large that the window size becomes
            0), which would otherwise end in a division by zero.
    '''
    half_height = img.shape[0]/2
    basic = int(half_height/(num_ver_slides*basic_size))
    mid = img.shape[1]//2

    # Band 0 is skipped on purpose: it produced plenty of false positives.
    for i in range(1,num_ver_slides):
        # Bottom edge of this band, measured from the vertical centre.
        dist_from_centr = (half_height/num_ver_slides) * (i+1)
        ey = int(half_height + dist_from_centr)
        win_size = basic*(i+1) # the window size grows linearly when sliding out
        sy = int(ey-win_size)

        # Horizontal span searched in this band, centred on the image and
        # never wider than the image itself.
        width = min(width_basic*(i+1), mid)
        x_start = mid-width

        nx_pix_per_step = int(win_size*(1-x_overlap))
        if nx_pix_per_step < 1:
            raise ValueError(
                "window step is %d px (window size %d, x_overlap %r); choose a "
                "smaller basic_size or x_overlap" % (nx_pix_per_step, win_size, x_overlap))
        num_hori_slides = int(2*width/nx_pix_per_step) - 1

        for j in range(num_hori_slides):
            sx = x_start+nx_pix_per_step * j
            ex = sx + win_size
            yield (sx,sy),(ex,ey)
            # much less code, right?

In [None]:
def rect(img, start, end, color=(0,0,255), thick=3):
    '''Return a copy of ``img`` with one rectangle drawn on it.

    ``start`` and ``end`` are the two corners handed to ``cv2.rectangle``;
    the input image itself is left untouched.
    '''
    canvas = np.copy(img)
    cv2.rectangle(canvas, start, end, color, thick)
    return canvas

In [None]:
# test the sliding windows
# Draw every generated window on a copy of the road image and display it.
slider = slide_window(roadimg,basic_size=1,num_ver_slides=8,width_basic=150)
drawon = np.copy(roadimg)
for start,end in slider:
    cv2.rectangle(drawon,start,end,(0,0,255),3)
plt.imshow(drawon)

In [None]:
def vehicle_detect(img,params):
    '''Yield the corners of every search window classified as a vehicle.

    Windows come from slide_window, configured by ``params["basic"]``,
    ``params["slides"]`` and ``params["width"]``. Each window is cut out of
    ``img``, resized to 64x64 and passed to pred; windows whose predicted
    label equals 1 are yielded as ``(start, end)``.
    '''
    windows = slide_window(
        img,
        basic_size=params["basic"],
        num_ver_slides=params["slides"],
        width_basic=params["width"],
    )
    for top_left, bottom_right in windows:
        # Corners are (x, y); rows of the image are indexed by y.
        patch = img[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0], :]
        patch_64 = cv2.resize(patch, (64,64))

        label = pred(patch_64, params)
        if label == 1:
            yield top_left, bottom_right

In [None]:
params["slide"]=8
params["basic"]=1
params["width"]=250

In [None]:
detector = vehicle_detect(roadimg,params=params)
detected = np.copy(roadimg)
for start,end in detector:
    cv2.rectangle(detected, start,end, (0,0,255), 5)
plt.imshow(detected)

Oh no, plenty of false positives.

It shows that AUC is not a good evaluation metric for this task.

I need a better metric: simply plot the detections and inspect the result.

In [None]:
'''new way of gridsearch'''
# For every colour space / HOG channel pair: retrain with getModel (the
# training helper defined above), then plot the detections on the road image
# so the result can be judged by eye.
for cspace in ["HSV","LUV","HLS","YUV"]:
    for hc in range(3):
        params["cspace"]=cspace
        params["hog_channel"]=hc
        try:
            params["X_scaler"],params["clf"]=getModel(params)
        except Exception as err:
            # Skip combinations that cannot be trained, but report the cause;
            # a bare except would also hide programming errors such as a
            # misspelled function name.
            print("failed", err)
            continue
        detector = vehicle_detect(roadimg,params=params)
        detected = np.copy(roadimg)
        for start,end in detector:
            cv2.rectangle(detected, start,end, (0,0,255), 5)
        plt.imshow(detected)
        plt.show()

Congratulations, the hyperparameters picked this time are:
- cspace: HSV
- hog_channel: 2
- clf: SVC

In [None]:
# Retrain with the chosen settings. getModel reads the colour space and HOG
# channel from `params` and returns the fitted scaler and classifier.
params["cspace"]="HSV"
params["hog_channel"]=2
params["X_scaler"],params["clf"]=getModel(params)

In [None]:
# Run the detector on every test image and show the positive windows.
for img in glob.glob("test_images/*.jpg"):
    # Rebinds the global `roadimg`; after the loop it holds the last image read.
    roadimg=plt.imread(img)
    detector = vehicle_detect(roadimg,params=params)
    detected = np.copy(roadimg)
    for start,end in detector:
        cv2.rectangle(detected, start,end, (0,0,255), 5)
    plt.imshow(detected)
    plt.show()

# Stage 3: Eliminating False Positives and Duplications

In [None]:
# heatmap for duplication reduction
class LastNFrames(object):
    '''Fixed-capacity FIFO holding the ``n`` most recently joined items.

    Attributes:
        queue: list of stored items, oldest first.
        n: maximum number of items kept.
    '''

    def __init__(self,n):
        self.queue=[]
        self.n=n

    def join(self,item):
        '''Append ``item`` and drop the oldest entry if the queue overflows.'''
        self.queue.append(item)
        if len(self.queue)>self.n:
            # Remove from the front. A plain pop() would remove the item that
            # was just appended, so the queue would never take in new frames.
            self.queue.pop(0)

    def init_with_default(self,default):
        '''Reset the queue to ``n`` references to ``default``.'''
        self.queue=[default for _ in range(self.n)]

In [None]:
def heatmap_valid(predictions,imgshape,lnf,params):
    '''Yield the current detections that are backed by enough recent heat.

    ``predictions`` is first joined to ``lnf``. A heatmap of shape
    ``imgshape`` is then built by adding 1 inside every box of every frame in
    ``lnf.queue``. A box from ``predictions`` is yielded as ``(start, end)``
    when the mean heat inside it is at least ``params["thre_lnf"]``.

    ``predictions`` is iterated after being stored, so it should be a
    re-iterable sequence such as a list.
    '''
    # Record this frame before scoring so it contributes to its own heat.
    lnf.join(predictions)

    heat = np.zeros(imgshape)
    for frame_boxes in lnf.queue:
        for start, end in frame_boxes:
            rows = slice(start[1], end[1])
            cols = slice(start[0], end[0])
            heat[rows, cols] += 1

    # FutureImprove: validate all boxes held in the queue, not just this frame's.
    for start, end in predictions:
        inside = heat[start[1]:end[1], start[0]:end[0]]
        if inside.mean() >= params["thre_lnf"]:
            yield start, end
    
        

In [None]:
def seperated(rect1,rect2):
    '''Return True when the two rectangles do not overlap.

    Each rectangle is ``((x_min, y_min), (x_max, y_max))``. Two rectangles are
    separated when there is a gap between them along the x axis or along the
    y axis; rectangles that merely touch count as overlapping.
    '''
    apart_in_x = rect1[1][0]<rect2[0][0] or rect1[0][0]>rect2[1][0]
    # Boxes stacked above one another share x coordinates but still do not
    # overlap, so the y axis has to be checked as well.
    apart_in_y = rect1[1][1]<rect2[0][1] or rect1[0][1]>rect2[1][1]
    return apart_in_x or apart_in_y

In [None]:
def rect_combine(rect1,rect2):
    '''Return the smallest axis-aligned box that contains both rectangles.

    Both inputs are ``((x, y), (x, y))`` corner pairs; the result has the same
    layout, with NumPy scalars as coordinates.
    '''
    corners = np.concatenate((np.array(rect1), np.array(rect2)), axis=0)
    x_low, y_low = corners.min(axis=0)
    x_high, y_high = corners.max(axis=0)
    return (x_low, y_low), (x_high, y_high)

In [None]:
def duplication_eliminate(predictions,plot=False):
    '''Merge overlapping boxes and return the merged list.

    Starting from the first box, every remaining box that is not ``seperated``
    from it is folded in with ``rect_combine`` until nothing more overlaps;
    the grown box is emitted and the process repeats on what is left.

    Args:
        predictions: list of ``((x, y), (x, y))`` boxes. It is not modified.
        plot: when True, first draw the incoming boxes on a copy of the global
            ``roadimg`` and show the figure.

    Returns:
        A new list of merged boxes (empty for empty input).
    '''
    if plot==True:
        todraw=np.copy(roadimg)
        for start,end in predictions:
            cv2.rectangle(todraw,start,end,(0,0,255),3)
        plt.imshow(todraw)
        plt.show()

    # Work on a copy so the caller's list is left intact.
    remaining = list(predictions)
    merged = []
    while remaining:
        current = remaining.pop(0)
        absorbed = True
        # Rescan after every merge: the grown box may now reach boxes that did
        # not overlap the smaller one. Each merge shrinks `remaining`, so the
        # loop always terminates and no iteration cap is needed.
        while absorbed:
            absorbed = False
            for idx, other in enumerate(remaining):
                if not seperated(other, current):
                    current = rect_combine(current, other)
                    # Delete by position; deleting by value could remove a
                    # different, equal-valued box.
                    del remaining[idx]
                    absorbed = True
                    break
        merged.append(current)
    return merged
        

In [None]:
# test the duplication elimination
# Collect all detections for the current road image.
preds=[(start,end) for start,end in vehicle_detect(roadimg,params)]
todraw=np.copy(roadimg)
todraw2=np.copy(roadimg)
# First figure: the raw detections.
for start,end in preds:
    cv2.rectangle(todraw,start,end,(0,0,255),3)
plt.imshow(todraw)
plt.show()
# Second figure: the boxes left after merging duplicates (plot=True also
# makes duplication_eliminate show its input).
for start,end in duplication_eliminate(preds,True):
    cv2.rectangle(todraw2,start,end,(0,0,255),3)
plt.imshow(todraw2)
plt.show()

# Stage 4: Test On Videos

In [None]:
# pipeline
def process_image(img):
    '''Detect vehicles in one frame and draw the results onto it.

    Reads the module-level ``params`` and ``lnf``. Raw detections are drawn
    with colour (150,150,150) and thickness 3; detections that pass
    heatmap_valid and are merged by duplication_eliminate are drawn with
    colour (0,0,255) and thickness 5. The frame is modified in place and
    returned.
    '''
    raw_boxes = list(vehicle_detect(img, params))
    for top_left, bottom_right in raw_boxes:
        cv2.rectangle(img, top_left, bottom_right, (150,150,150), 3)  # raw hits

    confirmed = list(heatmap_valid(raw_boxes, img.shape[:2], lnf, params))
    for top_left, bottom_right in duplication_eliminate(confirmed):
        cv2.rectangle(img, top_left, bottom_right, (0,0,255), 5)  # final boxes
    return img

In [None]:
# Import everything needed to edit/save/watch video clips
from moviepy.editor import VideoFileClip
from IPython.display import HTML

In [None]:
# trial init
# Counter used in the output file name; incremented after each render.
trial_i=10

In [None]:
# params tuning
# Sliding-window settings read by vehicle_detect.
params["basic"]=1
params["slides"]=8
params["width"]=250

# Number of frames kept by LastNFrames, and the mean-heat threshold read by
# heatmap_valid.
params["n_lnf"]=15
params["thre_lnf"]=1.5 * params["slides"]

In [None]:
# video generation
# Fresh frame history for this run; process_image reads it as a global.
lnf=LastNFrames(params["n_lnf"])
video_output = 'trial %d %s.mp4'%(trial_i,"a")
# Process only the 40-50 subclip of the project video.
clip1 = VideoFileClip("project_video.mp4").subclip(40,50)
white_clip = clip1.fl_image(process_image) #NOTE: this function expects color images!!
%time white_clip.write_videofile(video_output, audio=False)
trial_i+=1

In [None]:
# Embed the rendered file in the notebook.
HTML("""
<video width="960" height="540" controls>
  <source src="{0}">
</video>
""".format(video_output))