In [None]:
import cv2
import numpy as np
import math
from sklearn.linear_model import RANSACRegressor


class LaneDetection:
    def __init__(self):
        pass

    def drawLines(self, img, lines):
        if isinstance(lines, tuple):
            x1, y1, x2, y2 = lines
            cv2.line(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            return None

        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

    def preprocess_image(self, inpImage):
        # Apply HLS color filtering to filter out white lane lines
        hls = cv2.cvtColor(inpImage, cv2.COLOR_BGR2HLS)
        lower_white = np.array([130, 100, 10])
        upper_white = np.array([255, 255, 255])
        mask = cv2.inRange(inpImage, lower_white, upper_white)
        hls_result = cv2.bitwise_and(inpImage, inpImage, mask = mask)

        # Convert image to grayscale, apply threshold, blur & extract edges
        gray = cv2.cvtColor(hls_result, cv2.COLOR_BGR2GRAY)
        ret, thresh = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY)
        blur = cv2.GaussianBlur(thresh,(5, 5), 11)
        equalized_image = cv2.equalizeHist(blur)
        canny = cv2.Canny(equalized_image, 40, 60)
        return canny


    # Depending on the camera angle, modify vertices
    def region_of_interest(self, img):
        mask = np.zeros_like(img)
        imshape=img.shape
        vertices = np.array([[(200,imshape[0]),(200, imshape[0]//3), (imshape[1], imshape[0]//4*3), (imshape[1],imshape[0])]], dtype=np.int32)
        cv2.fillPoly(mask, vertices, 255)
        masked_image = cv2.bitwise_and(img, mask)
        return masked_image


    def extend_line(self, x1, y1, x2, y2, length):
        # Compose a vector for the line segment
        vx = x2 - x1
        vy = y2 - y1

        # Normalize vector
        mag = np.sqrt(vx**2 + vy**2)
        vx = vx / mag
        vy = vy / mag

        # Extend lines
        nx1 = int(x1 - vx * length)
        ny1 = int(y1 - vy * length)
        nx2 = int(x2 + vx * length)
        ny2 = int(y2 + vy * length)

        return nx1, ny1, nx2, ny2


    # Depending on the camera angle; adjust the angle
    def detect_lines_with_hough_transform(self, edges):
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=30, minLineLength=10, maxLineGap=50)

        extended_close_lines = []  # list for lines with 50-70 degree
        extended_far_lines = []    # list for lines with 10-25 degree

        if lines is not None:
            for line in lines:
                for x1, y1, x2, y2 in line:
                    # degree -> radian
                    angle = np.arctan2(y2 - y1, x2 - x1) * 180.0 / np.pi

                    if angle < 0:
                        angle += 180
                    if 50 <= angle <= 70:
                        x1, y1, x2, y2 = self.extend_line(x1, y1, x2, y2, length=1000) # 원하는 길이로 연장
                        extended_close_lines.append([x1, y1, x2, y2])
                    elif 10 <= angle <= 25:
                        x1, y1, x2, y2 = self.extend_line(x1, y1, x2, y2, length=1000) # 원하는 길이로 연장
                        extended_far_lines.append([x1, y1, x2, y2])

            close_lines = np.array(extended_close_lines).reshape((-1,1,4))
            far_lines = np.array(extended_far_lines).reshape((-1,1,4))

        return close_lines, far_lines

    # Calculate the intersection of two line segment clusters
    def find_vanishing_point(self, close_lines_detected, far_lines_detected):
        intersections = []
        for close_line in close_lines_detected:
            for far_line in far_lines_detected:
                x1, y1, x2, y2 = close_line[0]
                x3, y3, x4, y4 = far_line[0]

                # Intersection
                denominator = ((x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4) + 1e-6)
                px = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denominator
                py = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denominator

                # Filtering to prevent unpredictable large values or errors due to the denominator being close to zero
                if not (np.isinf(px) or np.isinf(py)):
                    intersections.append((px, py))

        #  Calculate the most common vanishing point based on the average location of all intersections
        if intersections:
            vp_x, vp_y = np.mean(intersections, axis=0)
            return int(vp_x), int(vp_y)
        else:
            return None

    # Calculate the distance to filter out lines that stray too far from the vanishing point
    def remove_noise_lines(self, lines, vanishing_point, threshold=10):
        valid_lines = []
        for line in lines:
            x1, y1, x2, y2 = line[0]

            distance = np.abs((vanishing_point[0] - x1) * (y2 - y1) - (vanishing_point[1] - y1) * (x2 - x1))
            distance = distance / np.sqrt((y2 - y1)**2 + (x2 - x1)**2)

            # Distance threshold
            if distance <= threshold:
                valid_lines.append(line)

        return np.array(valid_lines)


    # Extract start and end points of segments and convert to point data for RANSAC
    def estimate_lane_with_ransac(self, cleaned_lines):
        points = np.array([(x1, y1) for line in cleaned_lines for x1, y1, x2, y2 in [line[0]]] + \
                          [(x2, y2) for line in cleaned_lines for x1, y1, x2, y2 in [line[0]]])

        X = points[:,0].reshape(-1, 1)  # transform 2D array
        y = points[:,1]

        ransac = RANSACRegressor()
        ransac.fit(X, y)

        # Predict start and end point from the RANSAC model
        line_x = np.array([[np.min(X)], [np.max(X)]])
        line_y_ransac = ransac.predict(line_x)

        return (line_x[0][0], line_y_ransac[0], line_x[1][0], line_y_ransac[1])

class VehicleDetection():
  def __init__(self):
    pass

  # The complement of the image
  def preprocessing(self, img):
    img_enhanced = cv2.equalizeHist(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
    img_complement = cv2.bitwise_not(img_enhanced) # the complement of the image
    thresh, img_binary = cv2.threshold(img_complement, 252, 255, cv2.THRESH_BINARY)

    return img_enhanced, img_complement, img_binary

  # Initialize masks, draw lines and fill polygons for lane detection
  def masking(self, img, pts):
    height, width = img.shape[:2]

    # Initialize binary masks with zeros (black image)
    lane_mask = np.zeros((height, width), dtype=np.uint8)

    # Draw the lines for close and far lanes on the binary mask
    cv2.line(lane_mask, pts[0], pts[1], 255, thickness=5)
    cv2.line(lane_mask, pts[0], pts[2], 255, thickness=5)

    cv2.fillPoly(lane_mask, [pts], 255)

    vehicles_binary = cv2.bitwise_and(img, img, mask=lane_mask)

    # Find contours in vehicles_binary which should now only contain vehicles within the lane masks
    contours, _ = cv2.findContours(vehicles_binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)


    return contours

  # If the cntr value meets certain conditions, add a red tint to the entire image
  def finding(self, img, contours):
    # Find Maximal Area
    max_area = 0
    max_contour = None
    for contour in contours:
        area = cv2.contourArea(contour)
        if area > max_area:
            max_area = area
            max_contour = contour

    MIN_AREA = 100
    cntr = []
    if max_contour is not None and max_area > MIN_AREA:
        x, y, w, h = cv2.boundingRect(max_contour)
        cntr = [x + w // 2, y + h]
        cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)
        cv2.circle(img, tuple(cntr), 5, (0, 0, 255), 2)

        # When cntr satisfies a speific condition, an redish image warns a driver.
        if cntr[0] >= 350 and cntr[1] >= 150:
          # Create a red-tinted overlay
          red_tint = np.full(img.shape, (0, 0, 255), dtype=np.uint8)

          # Apply the red tint overlay over the original image with a certain opacity
          opacity = 0.5
          cv2.addWeighted(red_tint, opacity, img, 1 - opacity, 0, img)

In [None]:
import cv2

# Video input from a file or webcam
video_input_path = '/content/sample_data/left_lane.mkv'
video_output_path = '/content/sample_data/output_video.mp4'

cap = cv2.VideoCapture(video_input_path)

frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# Generate VideoWriter object to save file
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(video_output_path, fourcc, fps, (frame_width, frame_height))

lane_detector = LaneDetection()
vehicle_detector = VehicleDetection()

prev_far_lines_detected = None
prev_close_lines_detected = None

# Read frames in a loop from the video
while cap.isOpened():
    ret, frame = cap.read()

    # 성공적으로 프레임을 읽었다면
    if ret:
        # Preprocessing for lines
        preprocessed_edges = lane_detector.preprocess_image(frame)

        # RoI
        RoI_image = lane_detector.region_of_interest(preprocessed_edges)

        # Hough transform for lines detection 
        close_lines_detected, far_lines_detected= lane_detector.detect_lines_with_hough_transform(RoI_image)

        # Use the previous vanishing point and lines if no current vanishing point is found
        vanishing_point = lane_detector.find_vanishing_point(close_lines_detected, far_lines_detected)
        
        if vanishing_point is None and prev_vanishing_point is not None:
            vanishing_point = prev_vanishing_point
            far_lines_detected = prev_far_lines_detected
            close_lines_detected = prev_close_lines_detected
            print("Previous vanishing point and lines used.")
            
        prev_vanishing_point = vanishing_point
        prev_far_lines_detected = far_lines_detected
        prev_close_lines_detected = close_lines_detected

        #  Noise removal
        cleaned_close_lines = lane_detector.remove_noise_lines(close_lines_detected, vanishing_point)
        # print(far_lines_detected)
        cleaned_far_lines = lane_detector.remove_noise_lines(far_lines_detected, vanishing_point)

        # Estimation lanes
        try:
          expected_close_lane = lane_detector.estimate_lane_with_ransac(cleaned_close_lines)
        except:
          print(f'cleaned_close_lines {cleaned_close_lines}')
        try:
          expected_far_lane = lane_detector.estimate_lane_with_ransac(cleaned_far_lines)
        except:
          print(f'cleaned_far_lines {cleaned_far_lines}')

        # Preprocessing for vehicle
        img_enhanced, img_complement, img_binary = vehicle_detector.preprocessing(frame)

        polypnts = np.array([(int(vanishing_point[0]), int(vanishing_point[1])), (int(expected_close_lane[2]), int(expected_close_lane[3])),
                            (int(expected_far_lane[2]), int(expected_far_lane[3])), (int(vanishing_point[0]-1), int(vanishing_point[1]))], dtype=np.int32)

        # Vehicle detection
        contours = vehicle_detector.masking(img_binary, polypnts)

        # Apply the red tint overlay over the original image with a certain opacity
        vehicle_detector.finding(frame, contours)

        # Save frame
        out.write(frame)
        
        if vanishing_point:
            cv2.circle(frame, vanishing_point, 5, (0, 0, 255), 2)
        else:
            print("Not found vanishing point")
        
        # Show result frame (optional)
        # cleaned_lines = frame
        # lane_detector.drawLines(cleaned_lines, far_lines_detected)
        # cv2.imshow("cleaned_lines", cleaned_lines)
        # cv2.imshow("expected_lanes", expected_lanes)
        # lane_detector.drawLines(frame, expected_close_lane)
        # lane_detector.drawLines(frame, expected_far_lane)
        # cv2.imshow(frame)
        

        # Stop loop if pressing 'q' key
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    else:
        break

cap.release()
out.release()
cv2.destroyAllWindows()