In [None]:
import cv2
from PIL import Image
import os
import psutil
import time
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from utils.thresholding import *
%matplotlib inline

from pynq import Overlay
import pynq.lib.dma
import numpy as np
from pynq import allocate

In [None]:
ol = Overlay('./lane_detection15.bit') # check your path
#ol.download() # it downloads your bit to FPGA

print("Inspect all the IP names")
ol.ip_dict.keys()
dma_ip = ol.axi_dma_0
print(dma_ip)
dma_ip.register_map
print(ol.axi_dma_0)

In [None]:
WIDTH, HEIGHT = (320, 180)
M1 = allocate(shape=(HEIGHT * WIDTH * 3), dtype=np.int32)
M2 = allocate(shape=(20002), dtype=np.int32)

In [None]:
CONTROL_REGISTER = 0x0
lane_detection_ip = ol.lane_detection_0
lane_detection_ip.write(CONTROL_REGISTER, 0x00)  # Reset
lane_detection_ip.write(CONTROL_REGISTER, 0x81)  # Start
dma0 = ol.axi_dma_0
dma0_send = ol.axi_dma_0.sendchannel
dma0_recv = ol.axi_dma_0.recvchannel

In [None]:
WIDTH = 640
HEIGHT = 480

RESIZED_WIDTH = 320
RESIZED_HEIGHT = 180

y_eval = 190

In [None]:
frame_in_w, frame_in_h = WIDTH, HEIGHT

scale_factor_h = frame_in_w / 1280
scale_factor_v = frame_in_h / 720
offset = 200 * scale_factor_h
src = np.float32([
    [10, 300],
    [630, 300],
    [640, 410],
    [0, 410]
])
dst = np.float32([[offset, 0], [frame_in_w - offset, 0], [frame_in_w - offset, frame_in_h], [offset, frame_in_h]])
M = cv2.getPerspectiveTransform(src,dst)

In [None]:
def get_crop_indices(points, high, low):
    bottom_crop, top_crop = 0, len(points)
    for i in range(len(points)):
        if points[i] >= high:
            top_crop = i
            break
    
    for i in range(len(points) - 1, -1, -1):
        if points[i] <= low:
            bottom_crop = i + 1
            break
    
    return top_crop, bottom_crop

In [None]:
def detect_lane_lines_connected_components(binary_warped):
    # Ensure the input is binary (0 and 255)
    binary = np.uint8(binary_warped * 255) if binary_warped.max() <= 1 else np.uint8(binary_warped)

    # Apply connected components
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(binary, connectivity=8)

    # Image dimensions
    height, width = binary.shape
    
    # Prepare blank output for visualization or processing
    output = np.zeros_like(binary)

    # Store points
    leftx, lefty, rightx, righty = [], [], [], []

    for i in range(1, num_labels):  # Label 0 is background
        x, y, w, h, area = stats[i]
        cx, cy = centroids[i]

        # Heuristic filters for likely lane lines
        if area > 500 and h > 15:
            if cx < width // 2:
                output[labels == i] = 127
                coords = np.column_stack(np.where(labels == i))
                for pt in coords:
                    lefty.append(pt[0])
                    leftx.append(pt[1])
            else:
                output[labels == i] = 255
                coords = np.column_stack(np.where(labels == i))
                for pt in coords:
                    righty.append(pt[0])
                    rightx.append(pt[1])

    # Convert lists to numpy arrays
    leftx = np.array(leftx)
    lefty = np.array(lefty)
    rightx = np.array(rightx)
    righty = np.array(righty)

    return leftx, lefty, rightx, righty, output

In [None]:
def compute_lane_heading_angle(left_fit=None, right_fit=None, y_eval=200):
    """
    Computes the average heading angle (in degrees) between the vehicle's forward direction (image y-axis)
    and the tangent direction of the lane lines at the bottom of the image (y = y_eval).
    
    Returns a signed angle:
    - Positive → turn right
    - Negative → turn left
    """
    angles = []

    def angle_from_fit(fit):
        # Derivative of the 2nd-order polynomial at y = y_eval
        dy = 1.0  # pixel change in y
        dx = 2 * fit[0] * y_eval + fit[1]  # derivative at y_eval
        angle_rad = np.arctan(dx)  # slope to angle
        return -np.degrees(angle_rad)

    if left_fit is not None:
        angles.append(angle_from_fit(left_fit))
    if right_fit is not None:
        angles.append(angle_from_fit(right_fit))

    if angles:
        # Average heading angle from both lanes
        return np.mean(angles)
    else:
        return None

In [None]:
expected_angles = []
angle = 0
total_time1 = 0

for i in range(9):
    test_image = cv2.imread(os.path.join('test_images', 'test' + str(i) + '.jpg'))
    t1 = datetime.now()
    b_thresholded = threshold(test_image)
    binary_warped = cv2.warpPerspective(b_thresholded,M, (WIDTH, HEIGHT))
    leftx, lefty, rightx, righty, output = detect_lane_lines_connected_components(binary_warped)
    left_fit, right_fit = [0,0,0], [0,0,0]

    if leftx.size > 0 and lefty.size > 0 and rightx.size > 0 and righty.size > 0:
        # Fit a second order polynomial to each
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = np.polyfit(righty, rightx, 2)

        angle = compute_lane_heading_angle(left_fit=left_fit, right_fit=right_fit, y_eval=y_eval_org)
    elif leftx.size > 0 and lefty.size > 0:
        # Fit a second order polynomial to each
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = [0,0,0]

        angle = compute_lane_heading_angle(left_fit=left_fit, y_eval=y_eval_org)
    elif rightx.size > 0 and righty.size > 0:
        # Fit a second order polynomial to each
        left_fit = [0,0,0]
        right_fit = np.polyfit(righty, rightx, 2)
        angle = compute_lane_heading_angle(right_fit=right_fit, y_eval=y_eval_org)

    t2 = datetime.now()
    expected_angles.append(angle)
    delta = t2 - t1

    # Accumulate the times for averaging
    total_time1 += delta.microseconds

In [None]:
angles = []
angle = 0
total_time2 = 0

for i in range(9):
    test_image = cv2.imread(os.path.join('test_images', 'test' + str(i) + '.jpg'))
    t1 = datetime.now()
    resized_image = cv2.resize(test_image, dsize=(RESIZED_WIDTH, RESIZED_HEIGHT), interpolation=cv2.INTER_LINEAR)
    np.copyto(M1, resized_image.flatten())

    # DMA and lane detection
    dma0_send.transfer(M1)
    dma0_recv.transfer(M2)
    dma0_send.wait()
    dma0_recv.wait()

    leftx, lefty, rightx, righty = ([], [], [], [])
    left_count, right_count = M2[0], M2[1]
    left_fit, right_fit = [0,0,0], [0,0,0]
    
    leftx = M2[2:left_count + 2]
    lefty = M2[5002: 5002 + left_count]
    rightx = M2[10002:10002 + right_count]
    righty = M2[15002:15002 + right_count]
    
    left_crop_top, left_crop_bottom = get_crop_indices(lefty, 5, 180)
    right_crop_top, right_crop_bottom = get_crop_indices(righty, 5, 180)
    
    lefty, leftx = lefty[left_crop_top:left_crop_bottom], leftx[left_crop_top:left_crop_bottom]
    righty, rightx = righty[right_crop_top:right_crop_bottom], rightx[right_crop_top:right_crop_bottom]
    
    left_count, right_count = len(lefty), len(righty)

    if left_count != 0 and right_count != 0:
        left_fit = np.polyfit(lefty, leftx, 2)
        right_fit = np.polyfit(righty, rightx, 2)
        angle = compute_lane_heading_angle(left_fit=left_fit, right_fit=right_fit, y_eval=y_eval)
    elif right_count != 0:
        right_fit = np.polyfit(righty, rightx, 2)
        angle = compute_lane_heading_angle(right_fit=right_fit, y_eval=y_eval)
    elif left_count != 0:
        left_fit = np.polyfit(lefty, leftx, 2)
        angle = compute_lane_heading_angle(left_fit=left_fit, y_eval=y_eval)
    
    t2 = datetime.now()
    angles.append(angle)
    
    delta = t2 - t1

    # Accumulate the times for averaging
    total_time2 += delta.microseconds

In [None]:
print("PS Time: " + str(total_time1))
print("PL Time: " + str(total_time2))
print("Speedup: " + str(total_time1 / total_time2))

In [None]:
print(angles)
print(expected_angles)

In [None]:
diff = [abs(angles[i] - expected_angles[i]) for i in range(9)]
mean = sum(diff) / 9
max_diff = max(diff)

print(mean)
print(max_diff)