In [1]:
import time
import os

import cv2
import numpy as np
from scipy.spatial import distance

import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
plt.ion()  # Switch matplotlib to interactive mode
# Select the Tk-based GUI backend. NOTE(review): this runs after pyplot has
# already been imported and interactive mode enabled; presumably it is needed
# so the Button widgets used further down open in a native window - confirm.
matplotlib.use('TkAgg')

In [2]:
def warp_img(img):
    """
    Bird's-eye warp of the lower half of ``img`` followed by a central crop.

    The perspective matrix is built from four point pairs expressed in terms
    of the full input height/width: the two bottom corners are mapped onto a
    narrower bottom edge, and the two points on row ``H // 10`` are mapped onto
    the top corners. The lower half of the image is then warped onto a
    ``(W, H)`` canvas and a fixed fractional window is cut out of the result.

    Reference:
    https://nikolasent.github.io/opencv/2017/05/07/Bird's-Eye-View-Transformation.html

    NOTE(review): the point coordinates use the un-cropped height although the
    image is cropped before warping; the fractions look hand-tuned, so they
    are reproduced unchanged.
    """
    height, width = img.shape[0], img.shape[1]
    upper_row = height // 10

    src_points = np.array(
        [[0, height], [width, height], [0, upper_row], [width, upper_row]],
        dtype=np.float32,
    )
    dst_points = np.array(
        [[width // 2.8, height], [width // 1.8, height], [0, 0], [width, 0]],
        dtype=np.float32,
    )

    # Region of interest: keep only the lower half of the frame
    lower_half = img[height // 2 : height, :]

    transform = cv2.getPerspectiveTransform(src_points, dst_points)
    warped = cv2.warpPerspective(lower_half, transform, (width, height))

    # Final crop window, given as fractions of the original size
    row_window = slice(upper_row, int(height // 1.3))
    col_window = slice(int(width // 3), int(width // 1.7))
    return warped[row_window, col_window]

def warp_img2(img):
    """
    Warp a trapezoid of ``img`` onto a fixed 300x640 (width x height) canvas.

    The source trapezoid spans the full bottom edge of the image and a short
    segment on row ``H // 1.7`` running from ``W // 2.2`` to ``W // 1.8``.
    Its corners are mapped to the corners of the output canvas.
    """
    out_w, out_h = 300, 640
    in_h, in_w = img.shape[0], img.shape[1]

    # Row of the trapezoid's top edge and the x-range it covers
    top_row = int(in_h // 1.7)
    top_left_x = int(in_w // 2.2)
    top_right_x = int(in_w // 1.8)

    src_points = np.array(
        [[0, in_h], [in_w, in_h], [top_left_x, top_row], [top_right_x, top_row]],
        dtype=np.float32,
    )
    dst_points = np.array(
        [[0, out_h], [out_w, out_h], [0, 0], [out_w, 0]],
        dtype=np.float32,
    )

    transform = cv2.getPerspectiveTransform(src_points, dst_points)
    return cv2.warpPerspective(img, transform, (out_w, out_h))

def gaussian_blur(img, kernel_size=(3, 3)):
    """Return ``img`` smoothed by ``cv2.GaussianBlur`` with the given kernel size."""
    sigma_x = 0  # passed through as the sigmaX argument
    blurred = cv2.GaussianBlur(img, kernel_size, sigma_x)
    return blurred

def canny_edge(img, low_threshold=100, high_threshold=200):
    """Return the Canny edge map of ``img`` for the two hysteresis thresholds."""
    edge_map = cv2.Canny(img, low_threshold, high_threshold)
    return edge_map

def harris_corner(img):
    """
    Return the Harris corner response of ``img``.

    Uses a 2-pixel neighbourhood, a 3x3 Sobel aperture and k = 0.04. The raw
    response map is returned as-is; no dilation or thresholding is applied.
    """
    harris_params = {"blockSize": 2, "ksize": 3, "k": 0.04}
    response = cv2.cornerHarris(img, **harris_params)
    return response

def dilation(img, kernel_size=(3, 3)):
    """Dilate ``img`` with an all-ones uint8 kernel of shape ``kernel_size``."""
    structuring_element = np.ones(kernel_size, np.uint8)
    dilated = cv2.dilate(img, structuring_element)
    return dilated

def clahe(img, kernel_size=(3, 3)):
    """
    Apply CLAHE (clip limit 2.0, 8x8 tile grid) to ``img``.

    ``kernel_size`` is accepted for signature compatibility but is not used;
    the tile grid is fixed at 8x8.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(img)

def detect_hough_lines(img):
    """
    Run the probabilistic Hough transform on ``img``.

    Returns whatever ``cv2.HoughLinesP`` returns; the callers in this notebook
    treat ``None`` as "no segments found".
    """
    hough_params = {
        "rho": 1,  # distance resolution
        "theta": np.pi / 180,  # angular resolution: one degree
        "threshold": 20,
        "minLineLength": 5,
        "maxLineGap": 10,
    }
    return cv2.HoughLinesP(img, **hough_params)


def draw_hough_lines(img, lines):
    """
    Draw every segment in ``lines`` onto ``img`` in place and return ``img``.

    Each entry of ``lines`` is iterated for ``(x1, y1, x2, y2)`` tuples; the
    segments are drawn with intensity 255 and a thickness of 3 pixels.
    """
    for segment_group in lines:
        for x_start, y_start, x_end, y_end in segment_group:
            start_pt = (x_start, y_start)
            end_pt = (x_end, y_end)
            cv2.line(img, start_pt, end_pt, 255, 3)
    return img


def detect_correct_mark(img):
    """
    Fit rotated rectangles to the external contours of ``img`` and pick the
    one closest to the bottom-centre reference point.

    The reference point is ``(width // 2, height // 1.2)``. The distance is
    measured from each rectangle's centre (``rect[0]``).

    Return
    ------
    rects: list
        one ``cv2.minAreaRect`` result per external contour
    btm_cntr_rect_idx:
        index into ``rects`` of the rectangle nearest the reference point
    """
    contours, _hierarchy = cv2.findContours(
        img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )
    rects = list(map(cv2.minAreaRect, contours))

    img_h, img_w = img.shape[0], img.shape[1]
    reference_point = (img_w // 2, img_h // 1.2)

    distances = []
    for candidate in rects:
        distances.append(distance.euclidean(candidate[0], reference_point))

    btm_cntr_rect_idx = np.argmin(distances)
    return rects, btm_cntr_rect_idx


def draw_rectangle_features(img, rect, idx):
    """
    Draw a rotated rectangle, its centre and its index label onto ``img``.

    Parameters
    ----------
    img: np.ndarray
        image that is drawn on in place
    rect: tuple
        rotated rectangle as ``(center, (d1, d2), angle)``; it is passed to
        ``cv2.boxPoints`` to obtain the four corners
    idx:
        value rendered (via ``str``) as the rectangle's label

    Return
    ------
    img: np.ndarray
        the same image object that was passed in
    """
    box = np.intp(cv2.boxPoints(rect))

    # Label anchor: the corner with the largest y (lowest on screen in image
    # coordinates), shifted 50 px upwards.
    label_anchor = box[np.argmax(box[:, 1])] - np.array([0, 50])

    cv2.drawContours(img, [box], 0, (255, 0, 0), 2)

    cv2.circle(
        img,
        center=tuple(map(int, rect[0])),
        radius=3,
        color=(255, 255, 255),
        thickness=5,
    )

    cv2.putText(
        img,
        text=str(idx),
        org=tuple(map(int, label_anchor)),
        fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=2,
        color=(255, 255, 255),
        thickness=3,
    )
    return img


def map_values(
    rect: tuple, img: np.ndarray, car_steer: float = 0
) -> tuple[float, float]:
    """
    Map a detected rectangle to a throttle in [0.4, 1] and a steer in [-1, 1].

    Parameters
    ----------
    rect: tuple
        the rectangle that is used to determine the throttle and steering
        angle, laid out as ``(center, (d1, d2), angle)``
    img: np.ndarray
        the image that is used to determine the mapping; only its width
        (``img.shape[1]``) is read
    car_steer: float
        value added to the throttle divisor (120); defaults to 0
    Return
    ------
    throttle: float
        the throttle for the car, never below 0.4 and never above 1
    steer: float
        the steering angle for the car, clamped to [-1, 1]
    """
    img_width = img.shape[1]

    # Horizontal offset of the rectangle centre from the image's centre
    # column, normalised by the image width.
    offset = (rect[0][0] - img_width // 2) / img_width

    d1 = rect[1][0]
    d2 = rect[1][1]

    # Round the short side and the angle to the nearest multiple of 5
    width = int(5 * round(min(d1, d2) / 5))
    angle = int(5 * round(rect[2] / 5))

    if angle in (0, 90, -90):
        # Axis-aligned rectangle: no angular correction
        angle = 0
    elif d1 < d2:
        angle = 90 - angle
    else:
        angle = -angle

    # 120 is a trial-and-error value; 0.4 is the throttle floor. The upper
    # clamp keeps wide rectangles from producing a throttle above 1.
    throttle = min(max(width / (120 + car_steer), 0.4), 1.0)

    steer = angle / (90 + throttle * 100) + offset
    # Angle term and offset term can add up past the valid steering range
    steer = max(-1.0, min(1.0, steer))

    return throttle, steer

In [3]:
def show_process_image(img):
    """
    Run the detection pipeline on ``img`` and build a debug visualisation.

    Steps: ``warp_img2`` -> Gaussian blur -> Canny -> dilation -> probabilistic
    Hough lines. The detected segments are drawn on a blank canvas, rotated
    rectangles are fitted to them, and every rectangle is annotated together
    with the bottom-centre reference point and the index of the selected one.

    Return
    ------
    img_warp: np.ndarray
        the output of ``warp_img2``
    img_hou: np.ndarray
        the annotated RGB canvas, or an all-zero single-channel canvas when
        no Hough segments were found
    """
    img_warp = warp_img2(img)
    edges = dilation(canny_edge(gaussian_blur(img_warp)))
    img_h, img_w = edges.shape[0], edges.shape[1]
    img_hou = np.zeros((img_h, img_w), dtype=np.uint8)

    lines = detect_hough_lines(edges)
    if lines is None:
        # Same (warped, canvas) ordering as the normal return so callers can
        # always display the first element as the warped view.
        return img_warp, img_hou

    img_hou = draw_hough_lines(img_hou, lines)

    rects, bottom_center_rect_idx = detect_correct_mark(img_hou)

    # Colour canvas so the annotations below can use colour
    img_hou = cv2.cvtColor(img_hou, cv2.COLOR_GRAY2RGB)

    # Reference point the selected rectangle is closest to
    center_bottom = (img_hou.shape[1] // 2, img_hou.shape[0] // 1.2)
    cv2.circle(
        img_hou,
        center=tuple(map(int, center_bottom)),
        radius=50,
        color=(0, 0, 255),
        thickness=10,
    )
    for i, rect in enumerate(rects):
        img_hou = draw_rectangle_features(img_hou, rect, idx=i)

    cv2.putText(
        img_hou,
        text=f"{bottom_center_rect_idx} SELECTED",
        org=(0, 50),
        fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=1,
        color=(0, 255, 255),
        thickness=3,
    )
    return img_warp, img_hou


def process_image(img):
    """
    Display-free pipeline: turn ``img`` into a ``(steer, throttle)`` pair.

    Steps: ``warp_img`` -> Gaussian blur -> Canny -> dilation -> probabilistic
    Hough lines -> rotated rectangles -> ``map_values`` on the rectangle
    nearest the bottom-centre reference point.

    Return
    ------
    steer, throttle
        note the order is the reverse of what ``map_values`` returns;
        ``(-1, 0)`` is returned when no Hough segments are found
    """
    edges = dilation(canny_edge(gaussian_blur(warp_img(img))))

    segments = detect_hough_lines(edges)
    if segments is None:
        return -1, 0

    # Blank canvas with the same height/width as the edge map
    canvas = np.zeros((edges.shape[0], edges.shape[1]), dtype=np.uint8)
    draw_hough_lines(canvas, segments)

    rects, nearest_idx = detect_correct_mark(canvas)
    throttle, steer = map_values(rects[nearest_idx], canvas)
    return steer, throttle

In [4]:
def plot_imgs(imgs, titles):
    """
    Show ``imgs`` in a three-column grid, one title per image.

    Parameters
    ----------
    imgs: sequence
        images to display; nothing is drawn when it is empty
    titles: iterable
        titles paired with ``imgs`` in order; pairing stops at the shorter
        of the two
    """
    if len(imgs) == 0:
        # plt.subplots cannot build a grid with zero rows
        return

    cols = 3
    rows = int(np.ceil(len(imgs) / cols))
    figsize = (cols * 5, rows * 6)
    # squeeze=False always yields a 2-D array of axes
    _, axs = plt.subplots(rows, cols, figsize=figsize, squeeze=False)
    axes = axs.flatten()

    used = 0
    for used, (img, title, ax) in enumerate(zip(imgs, titles, axes), start=1):
        ax.imshow(img, cmap="gray")
        ax.set_title(title)

    # Hide grid cells that did not receive an image
    for ax in axes[used:]:
        ax.axis("off")

    plt.tight_layout()
    plt.show()

### Experimentation results
1. Angle 0, Speed 0 detection is working decently
2. Angle 0, Speed 5 detection does not work correctly if the first segment is cropped at the bottom of the image.
3. Angle 0, Speed 15 detection works correctly in terms of the angle, but the speed is not detected correctly: it depends on the distance of the segment from the bottom of the image. Some form of normalization is needed.
4. Angle 0, Speed 30 detection is same as above.

5. Angle 10, Speed 5 detection seems to be working for the most part. Some rounding of the values still needs to be done.
6. Angle 10, Speed 15 detection is similar to the above. There might be an overlap with angle 20.

7. Angle 20, Speed 5 detection does not work correctly if the first segment is cropped at the bottom of the image.
8. Angle 20, Speed 15 detection is *(TODO: result not recorded)*

<br><br>

##### Notes:
- Using a smaller size for the images would greatly improve the performance.

In [5]:
# Folder holding the captured frames.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
path = r"C:\Users\medha\AppData\Local\Temp\airsim_car"

# os.listdir returns entries in arbitrary order, so sort by name to make
# Next/Previous step through the frames in a stable sequence, and keep only
# regular files (sub-directories cannot be read as images).
image_paths = [os.path.join(path, name) for name in sorted(os.listdir(path))]
image_paths = [entry for entry in image_paths if os.path.isfile(entry)]

# Index to track the current image
current_image_idx = 0

# Function to update images
def update_images(delta):
    """
    Move the viewer by ``delta`` images (wrapping around) and redraw the panels.

    Reads the module-level ``image_paths`` and ``axs`` and updates the
    module-level ``current_image_idx``.

    Raises
    ------
    ValueError
        if the selected file cannot be decoded as an image
    """
    global current_image_idx
    current_image_idx = (current_image_idx + delta) % len(image_paths)

    image_path = image_paths[current_image_idx]
    img = cv2.imread(image_path, cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imread signals an unreadable file by returning None
        raise ValueError(f"Could not read image: {image_path}")

    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_warp, img_hou = show_process_image(cv2.cvtColor(img, cv2.COLOR_RGB2GRAY))

    # (image, title, colormap) for each of the three panels
    panels = (
        (img, 'Original', None),
        (img_warp, 'Warped', 'gray'),
        (img_hou, 'Hough', 'gray'),
    )
    for ax, (panel_img, title, cmap) in zip(axs, panels):
        # Remove the previously drawn image so artists do not pile up on the
        # axes with every button click.
        ax.clear()
        ax.imshow(panel_img, cmap=cmap)
        ax.set_title(title)
        ax.axis('off')
    plt.draw()

# Button event handlers
def next_image(event):
    """Button callback: step the viewer forward by one image; ``event`` is unused."""
    step = 1
    update_images(step)

def prev_image(event):
    """Button callback: step the viewer back by one image; ``event`` is unused."""
    step = -1
    update_images(step)


# One row with three panels: original, warped and Hough views
fig, axs = plt.subplots(1, 3, figsize=(10, 5))

# Draw the panels for the current index (a delta of 0 does not move it)
update_images(0)

# Leave the bottom 20% of the figure free for the navigation buttons
fig.subplots_adjust(bottom=0.2)

# Button axes, given as [left, bottom, width, height] in figure fractions
axprev = fig.add_axes([0.1, 0.05, 0.1, 0.075])
axnext = fig.add_axes([0.8, 0.05, 0.1, 0.075])

# The Button objects stay bound to module-level names so they remain referenced
bprev = Button(axprev, 'Previous')
bnext = Button(axnext, 'Next')
bprev.on_clicked(prev_image)
bnext.on_clicked(next_image)

# Block until the viewer window is closed
plt.show(block=True)

In [10]:
%%timeit
# start_time = time.perf_counter()

folder = r"C:\Users\medha\AppData\Local\Temp\airsim_car"
images = os.listdir(folder)

for img_path in images:
    img = cv2.imread(os.path.join(folder, img_path), cv2.IMREAD_GRAYSCALE)
    steer, throttle = process_image(img)
# end_time = time.perf_counter()
# print(f"Time taken: {end_time - start_time:.2f} seconds for {len(images)} images")

29.4 s ± 382 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
# Play the recorded video through the debug pipeline; press "q" to stop.
cap = cv2.VideoCapture("imgs/output.mp4")

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was returned: end of the video or a read failure
            break

        frame_warp, frame_hou = show_process_image(frame)
        cv2.imshow("Frame", frame)
        cv2.imshow("Frame Warp", frame_warp)
        cv2.imshow("Frame Hough", frame_hou)

        # Poll the keyboard after the frames are shown; waitKey also gives
        # the windows time to refresh.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Always free the capture and close the windows, even if processing a
    # frame raised.
    cap.release()
    cv2.destroyAllWindows()