In [1]:
import cv2
import numpy as np
import math

# Mask White Yellow

In [2]:
def convert_hls(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2HLS)

In [3]:
def mask_white_yellow(image):
    converted = convert_hls(image)
    lower = np.uint8([  0, 200,   0])
    upper = np.uint8([255, 255, 255])
    white_mask = cv2.inRange(converted, lower, upper)
    lower = np.uint8([ 10,   0, 100])
    upper = np.uint8([ 40, 255, 255])
    yellow_mask = cv2.inRange(converted, lower, upper)
    mask = cv2.bitwise_or(white_mask, yellow_mask)
    whiteYellowImage = cv2.bitwise_and(image, image, mask = mask)
    return whiteYellowImage

# BGR to Gray

In [4]:
def grayscale(img):
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

# Gaussian Blur

In [5]:
def gaussian_blur(img, kernel_size):
    return cv2.GaussianBlur(img, (kernel_size, kernel_size), 0)

# Canny Edge Detection

In [6]:
def canny(img, low_threshold, high_threshold):
    return cv2.Canny(img, low_threshold, high_threshold)

# ROI : 관심영역 추출

In [7]:
def region_of_interest(img, vertices):
    mask = np.zeros_like(img)   
    
    if len(img.shape) > 2:
        channel_count = img.shape[2]  # i.e. 3 or 4 depending on your image
        ignore_mask_color = (255,) * channel_count
    else:
        ignore_mask_color = 255
            
    cv2.fillPoly(mask, vertices, ignore_mask_color)
        
    masked_image = cv2.bitwise_and(img, mask)
    return masked_image

In [19]:
def weighted_img(img, initial_img, a=0.8, b=1., c=0.):
    return cv2.addWeighted(initial_img, a, img, b, c)

def draw_lines(img, line, color=[0, 0, 255], thickness=10): # 선 그리기
    #for line in lines:
    for x1,y1,x2,y2 in line:
        cv2.line(img, (x1, y1), (x2, y2), color, thickness)
            
def average_lane(lane_data):
    x1s = []
    y1s = []
    x2s = []
    y2s = []
    for data in lane_data:
        x1s.append(data[0][0])
        y1s.append(data[0][1])
        x2s.append(data[0][2])
        y2s.append(data[0][3])
    
    if np.isnan(np.mean(x1s)):
        x1 = np.nan_to_num(np.mean(x1s))
    else:
        x1 = np.mean(x1s)
    if np.isnan(np.mean(x2s)):
        x2 = np.nan_to_num(np.mean(x2s))
    else:
        x2 = np.mean(x2s)
    if np.isnan(np.mean(y1s)):
        y1 = np.nan_to_num(np.mean(y1s))
    else:
        y1 = np.mean(y1s)
    if np.isnan(np.mean(y2s)):
        y2 = np.nan_to_num(np.mean(y2s))
    else:
        y2 = np.mean(y2s)
    
    return int(x1), int(y1), int(x2), int(y2)

In [20]:
def perspective_transform(img,src,dst):
    """
    Execute perspective transform
    """
    img_size = (img.shape[1], img.shape[0])
    m = cv2.getPerspectiveTransform(src, dst)
    m_inv = cv2.getPerspectiveTransform(dst, src)

    warped = cv2.warpPerspective(img, m, img_size, flags=cv2.INTER_LINEAR)
    unwarped = cv2.warpPerspective(warped, m_inv, (warped.shape[1], warped.shape[0]), flags=cv2.INTER_LINEAR)  # DEBUG

    return warped, unwarped, m, m_inv

# Kalman

In [21]:
import cv2
import numpy as np
from scipy.linalg import block_diag


class LaneTracker:
    def __init__(self, n_lanes, proc_noise_scale, meas_noise_scale, process_cov_parallel=0, proc_noise_type='white'):
        self.n_lanes = n_lanes
        self.meas_size = 4 * self.n_lanes
        self.state_size = self.meas_size * 2
        self.contr_size = 0

        self.kf = cv2.KalmanFilter(self.state_size, self.meas_size, self.contr_size)
        self.kf.transitionMatrix = np.eye(self.state_size, dtype=np.float32)
        self.kf.measurementMatrix = np.zeros((self.meas_size, self.state_size), np.float32)
        for i in range(self.meas_size):
            self.kf.measurementMatrix[i, i*2] = 1

        if proc_noise_type == 'white':
            block = np.matrix([[0.25, 0.5],
                               [0.5, 1.]], dtype=np.float32)
            self.kf.processNoiseCov = block_diag(*([block] * self.meas_size)) * proc_noise_scale
        if proc_noise_type == 'identity':
            self.kf.processNoiseCov = np.eye(self.state_size, dtype=np.float32) * proc_noise_scale
        for i in range(0, self.meas_size, 2):
            for j in range(1, self.n_lanes):
                self.kf.processNoiseCov[i, i+(j*8)] = process_cov_parallel
                self.kf.processNoiseCov[i+(j*8), i] = process_cov_parallel

        self.kf.measurementNoiseCov = np.eye(self.meas_size, dtype=np.float32) * meas_noise_scale

        self.kf.errorCovPre = np.eye(self.state_size)

        self.meas = np.zeros((self.meas_size, 1), np.float32)
        self.state = np.zeros((self.state_size, 1), np.float32)

        self.first_detected = False

    def _update_dt(self, dt):
        for i in range(0, self.state_size, 2):
            self.kf.transitionMatrix[i, i+1] = dt

    def _first_detect(self, lanes):
        for l, i in zip(lanes, range(0, self.state_size, 8)):
            self.state[i:i+8:2, 0] = l
        self.kf.statePost = self.state
        self.first_detected = True

    def update(self, lanes):
        if self.first_detected:
            for l, i in zip(lanes, range(0, self.meas_size, 4)):
                if l is not None:
                    self.meas[i:i+4, 0] = l
            self.kf.correct(self.meas)
        else:
            if lanes.count is not None:
                self._first_detect(lanes)

    def predict(self, dt):
        if self.first_detected:
            self._update_dt(dt)
            state = self.kf.predict()
            lanes = []
            for i in range(0, len(state), 8):
                lanes.append((state[i], state[i+2], state[i+4], state[i+6]))
            return lanes
        else:
            return None

In [27]:
#lt = LaneTracker(2, 0.1, 500)
ticks = 0

capture = cv2.VideoCapture("../test/outside_clockwise.avi")
#capture = cv2.VideoCapture(0)

while True:    
    ret, img = capture.read()
    
    img_w = 720#img.shape[0]
    img_h = 380#img.shape[1]
    img = cv2.resize(img,(img_w,img_h))

    ori_img = img
    
    src = np.float32([[img_w*0.2, img_h*0.5],[0, img_h],[img_w, img_h*0.5], [img_w,img_h]])
    dst = np.float32([[0, img_h*0.5],[0, img_h],[img_w, img_h*0.5], [img_w,img_h]])
    
    img = mask_white_yellow(img)
    img = grayscale(img)
    img = gaussian_blur(img, 5)
    img = canny(img,40,80)
    
    warped, unwarped, m, m_inv = perspective_transform(ori_img,src,dst)
    
    yTopMask = img_h*0.55
    
    pts = np.array([[0, img_h], [img_w*0.2, img_h*0.6], [img_w*0.8, img_h*0.6], [img_w,img_h]])
    #pts = [[0, img_h],[img_w, 0],[img_w, 0],[img_w, img_h]]
    vertices = np.array(pts, np.int32)
    
    img = region_of_interest(img, [vertices])
    
    #houghLines = cv2.HoughLinesP(img, rho, theta, threshold, np.array([]), minLineLength=min_line_length, maxLineGap=max_line_gap)
    hough = cv2.HoughLinesP(img, 2, np.pi / 180, 100, np.array([]), minLineLength = 100, maxLineGap = 50)
    
    if hough is not None:
        line_arr = np.squeeze(hough,axis=1)
        
        slope_degree = (np.arctan2(line_arr[:,1] - line_arr[:,3], line_arr[:,0] - line_arr[:,2]) * 180) / np.pi

        # 수평 기울기 제한
        line_arr = line_arr[np.abs(slope_degree)<160]
        slope_degree = slope_degree[np.abs(slope_degree)<160]
        # 수직 기울기 제한
        line_arr = line_arr[np.abs(slope_degree)>95]
        slope_degree = slope_degree[np.abs(slope_degree)>95]
        # 필터링된 직선 버리기
        L_lines, R_lines = line_arr[(slope_degree>0),:], line_arr[(slope_degree<0),:]
        temp = np.zeros((img.shape[0], img.shape[1], 3), dtype=np.uint8)
        L_lines, R_lines = L_lines[:,None], R_lines[:,None]

        foundLinesImage = np.zeros((img_h, img_w, 3), dtype=np.uint8)
        #foundLinesImage = np.zeros((img_h, img_w), dtype=np.uint8)
        
        if L_lines is not None:
            L_line = np.array([average_lane(L_lines)])
            #draw_lines(foundLinesImage,L_line)
        if R_lines is not None:
            R_line = np.array([average_lane(R_lines)])
            #draw_lines(foundLinesImage,R_line)
        '''
        precTick = ticks
        ticks = cv2.getTickCount()
        dt = (ticks - precTick) / cv2.getTickFrequency()
        
        predicted = lt.predict(dt)
        
        if predicted is not None:
            cv2.line(foundLinesImage, (predicted[0][0], predicted[0][1]), (predicted[0][2], predicted[0][3]), (0, 0, 255), 5)
            cv2.line(foundLinesImage, (predicted[1][0], predicted[1][1]), (predicted[1][2], predicted[1][3]), (0, 0, 255), 5)

        lt.update([L_line,R_line])
        '''
        
        origWithFoundLanes = weighted_img(foundLinesImage,ori_img)
        
    else:
        origWithFoundLanes = ori_img
    
    cv2.imshow('image',origWithFoundLanes)
    cv2.imshow('warp',warped)
    cv2.imshow('unwarp',unwarped)
    
    if cv2.waitKey(33) > 0: break

capture.release()
cv2.destroyAllWindows()