In [None]:
wimport cv2
import numpy as np
from google.colab.patches import cv2_imshow
from google.colab import files
import time
# Upload and read the input video
uploaded = files.upload()
input_video = list(uploaded.keys())[0]

cap = cv2.VideoCapture(input_video)
frame_width = int(cap.get(3)) if cap.isOpened() else None
frame_height = int(cap.get(4)) if cap.isOpened() else None
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_video.mp4', fourcc, 20.0, (frame_width, frame_height))

# Initialize previous line variables
prev_left_line = None
prev_right_line = None

# Kalman Filter Setup
kalman = cv2.KalmanFilter(4, 2)  # 4 state variables, 2 measurements
kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03

# Initialize EMA parameters
alpha = 0.6  # Smoothing factor (adjust based on performance)
ema_offset = 0  # Initial EMA offset

# Variables for metrics
lane_consistencies = []
offset_differences = []
frame_times = []
prev_hybrid_offset = None

# Function to average the slopes and intercepts of lane lines
def average_slope_intercept(lines):
    global prev_left_line, prev_right_line
    left_fit = []
    right_fit = []

    if lines is None:
        return None, None  # Return None if no lines

    for line in lines:
        for x1, y1, x2, y2 in line:
            if x2 == x1:  # Skip vertical lines
                continue
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - (slope * x1)
            if abs(slope) < 0.5 or abs(slope) > 2:  # Ignore extreme slopes
                continue
            if slope < 0:  # Negative slope -> Left lane
                left_fit.append((slope, intercept))
            else:  # Positive slope -> Right lane
                right_fit.append((slope, intercept))

    left_fit_avg = np.mean(left_fit, axis=0) if len(left_fit) > 0 else None
    right_fit_avg = np.mean(right_fit, axis=0) if len(right_fit) > 0 else None

    return left_fit_avg, right_fit_avg

def make_line_points(y1, y2, line_fit):
    if line_fit is None:
        return None

    slope, intercept = line_fit

    if slope == 0 or np.isinf(slope):
        return None
    try:
        x1 = int((y1 - intercept) / slope)
        x2 = int((y2 - intercept) / slope)
    except OverflowError:
        return None

    return [x1, y1, x2, y2]

def region_of_interest(frame):
    height, width = frame.shape[:2]
    polygon = np.array([[
        #for cropped input
         (40, 480),  # Bottom-left
        (300, 288),  # Top-left
        (500, 288),  # Top-right
        (848, 400)
      ]], np.int32)
    mask = np.zeros_like(frame)
    cv2.fillPoly(mask, polygon, 255)
    masked_image = cv2.bitwise_and(frame, mask)
    return masked_image

def detect_lanes(frame, prev_left_line, prev_right_line):
    hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS)
    lower_white = np.array([0, 200, 0], dtype=np.uint8)
    upper_white = np.array([255, 255, 255], dtype=np.uint8)
    mask_white = cv2.inRange(hls, lower_white, upper_white)

    mask = cv2.GaussianBlur(mask_white, (5, 5), 0)
    edges = cv2.Canny(mask, 50, 150)
    masked_edges = region_of_interest(edges)

    lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=20, maxLineGap=150)
    left_fit, right_fit = average_slope_intercept(lines)

    y1 = frame.shape[0]
    y2 = int(y1 * 0.7)
    left_line = make_line_points(y1, y2, left_fit) if left_fit is not None else prev_left_line
    right_line = make_line_points(y1, y2, right_fit) if right_fit is not None else prev_right_line

    if left_line is None:
        left_line = prev_left_line
    if right_line is None:
        right_line = prev_right_line

    line_image = np.zeros_like(frame)

    if left_line is not None and right_line is not None:
        left_points = [(left_line[0], left_line[1]), (left_line[2], left_line[3])]
        right_points = [(right_line[0], right_line[1]), (right_line[2], right_line[3])]
        points = np.array(left_points + right_points[::-1], dtype=np.int32)
        cv2.fillPoly(line_image, [points], (0, 255, 0))

    if left_line is not None:
        cv2.line(line_image, (left_line[0], left_line[1]), (left_line[2], left_line[3]), (255, 0, 0), 10)
    if right_line is not None:
        cv2.line(line_image, (right_line[0], right_line[1]), (right_line[2], right_line[3]), (255, 0, 0), 10)

    final_frame = cv2.addWeighted(frame, 0.8, line_image, 0.5, 1)
    return final_frame, left_line, right_line

def calculate_offset(frame_width, left_line, right_line):
    if left_line is not None and right_line is not None:
        lane_center = (left_line[2] + right_line[2]) // 2
        frame_center = frame_width // 2
        offset = lane_center - frame_center
    else:
        offset = 0
    return offset

# Additional function to calculate lane width
def calculate_lane_width(left_line, right_line):
    if left_line is not None and right_line is not None:
        lane_width = abs(left_line[2] - right_line[2])  # Width at the bottom of the frame
    else:
        lane_width = 0  # No width if one lane line is missing
    return lane_width
def calculate_consistency(prev_line, curr_line):
    if prev_line is None or curr_line is None:
        return 0
    prev_slope = (prev_line[3] - prev_line[1]) / (prev_line[2] - prev_line[0])
    curr_slope = (curr_line[3] - curr_line[1]) / (curr_line[2] - curr_line[0])
    slope_diff = abs(curr_slope - prev_slope)
    return 1 - min(slope_diff / 10, 1)

def calculate_heading_angle(left_line, right_line):
    if left_line is not None and right_line is not None:
        left_slope = (left_line[3] - left_line[1]) / (left_line[2] - left_line[0])
        right_slope = (right_line[3] - right_line[1]) / (right_line[2] - right_line[0])
        average_slope = (left_slope + right_slope) / 2
        heading_angle = np.arctan(average_slope) * (180 / np.pi)  # Convert to degrees
    else:
        heading_angle = 0  # No angle if one lane line is missing
    return heading_angle

def calculate_ema_offset(measured_offset, previous_ema_offset):
    return alpha * measured_offset + (1 - alpha) * previous_ema_offset

def detect_lanes_and_estimate_offset(frame, prev_left_line, prev_right_line):
    global ema_offset
    lane_frame, left_line, right_line = detect_lanes(frame, prev_left_line, prev_right_line)

    frame_width = frame.shape[1]
    measured_offset = calculate_offset(frame_width, left_line, right_line)

    prediction = kalman.predict()
    estimated_offset = prediction[0][0]

    # Calculate additional parameters
    lane_width = calculate_lane_width(left_line, right_line)
    heading_angle = calculate_heading_angle(left_line, right_line)

    # Measurement correction
    measurement = np.array([[np.float32(measured_offset)], [0]], np.float32)
    kalman.correct(measurement)

    # Hybrid EMA and Kalman Optimization: Apply EMA on Kalman predictions
    ema_offset = calculate_ema_offset(measured_offset, ema_offset)
    hybrid_offset = (estimated_offset + ema_offset) / 2  # Hybrid approach for smoother tracking
    prev_hybrid_offset = hybrid_offset
    cv2.putText(lane_frame, f"EMA Offset: {int(ema_offset)} px", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(lane_frame, f"Kalman Offset: {int(estimated_offset)} px", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(lane_frame, f"Hybrid Offset: {int(hybrid_offset)} px", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(lane_frame, f"Lane Width: {int(lane_width)} px", (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255),1)
    cv2.putText(lane_frame, f"Heading Angle: {heading_angle:.2f} degrees", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    return lane_frame, left_line, right_line, hybrid_offset, lane_width, heading_angle
frame_count = 0
start_time = time.time()
prev_left_line = None
prev_right_line = None
prev_hybrid_offset = None
# Process the video

# Process the video
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    frame_start_time = time.time()

    # Detect lanes and calculate parameters
    lane_frame, left_line, right_line, hybrid_offset, lane_width, heading_angle = detect_lanes_and_estimate_offset(
        frame, prev_left_line, prev_right_line
    )

    # Update previous lines for consistency calculation
    if prev_left_line is not None and prev_right_line is not None:
        consistency = calculate_consistency(prev_left_line, left_line)
        lane_consistencies.append(consistency)

    if prev_hybrid_offset is not None:
        offset_diff = abs(prev_hybrid_offset - hybrid_offset)
        offset_differences.append(offset_diff)

    prev_left_line = left_line
    prev_right_line = right_line
    prev_hybrid_offset = hybrid_offset

    out.write(lane_frame)

    # Measure frame processing time
    frame_end_time = time.time()
    frame_times.append(frame_end_time - frame_start_time)

# Release resources
cap.release()
out.release()

# Calculate metrics
average_consistency = np.mean(lane_consistencies) * 100 if lane_consistencies else 0
average_offset_difference = np.mean(offset_differences) if offset_differences else 0
fps = frame_count / sum(frame_times) if frame_times else 0

# Print results
print("Lane detection and hybrid offset estimation complete. Video saved as output_video.mp4.")
print(f"Lane Detection Consistency: {average_consistency:.2f}%")
print(f"Average Offset Stability (Lower is better): {average_offset_difference:.2f} px")
print(f"Average FPS: {fps:.2f}")

Saving input.mp4 to input.mp4
Lane detection and hybrid offset estimation complete. Video saved as output_video.mp4.
Lane Detection Consistency: 0.00%
Average Offset Stability (Lower is better): 0.00 px
Average FPS: 77.07
