# USING THREADING INSTEAD OF SEQUENTIAL

# Libraries

In [1]:
# %pip install keyboard
# %pip install mss
# %pip install numpy
# %pip install matplotlib
# %pip install opencv-python

import pyautogui as py
import pygetwindow as gw

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import PIL as ImageGrab

import keyboard

import os
import math
import time
import numpy
from mss import mss
import cv2
import numpy as np
import time
import pydirectinput
import threading
from datetime import datetime
# %pip install line_profiler
from line_profiler import LineProfiler

import mss.tools  # Using mss.tools for safer access

# Edge Detection

In [1]:
import cv2 # Import the OpenCV library to enable computer vision
import numpy as np # Import the NumPy scientific computing library
import matplotlib.pyplot as plt # Used for plotting and error checking
 
def binary_array(array, thresh, value=0):
  """
  Return a 2D binary array (mask) in which all pixels are either 0 or 1
     
  :param array: NumPy 2D array that we want to convert to binary values
  :param thresh: Values used for thresholding (inclusive)
  :param value: Output value when between the supplied threshold
  :return: Binary 2D array...
           number of rows x number of columns = 
           number of pixels from top to bottom x number of pixels from
             left to right 
  """
  if value == 0:
    # Create an array of ones with the same shape and type as 
    # the input 2D array.
    binary = np.ones_like(array) 
         
  else:
    # Creates an array of zeros with the same shape and type as 
    # the input 2D array.
    binary = np.zeros_like(array)  
    value = 1
 
  # If value == 0, make all values in binary equal to 0 if the 
  # corresponding value in the input array is between the threshold 
  # (inclusive). Otherwise, the value remains as 1. Therefore, the pixels 
  # with the high Sobel derivative values (i.e. sharp pixel intensity 
  # discontinuities) will have 0 in the corresponding cell of binary.
  binary[(array >= thresh[0]) & (array <= thresh[1])] = value
 
  return binary
 
def blur_gaussian(channel, ksize=3):
  """
  Implementation for Gaussian blur to reduce noise and detail in the image
     
  :param image: 2D or 3D array to be blurred
  :param ksize: Size of the small matrix (i.e. kernel) used to blur
                i.e. number of rows and number of columns
  :return: Blurred 2D image
  """
  return cv2.GaussianBlur(channel, (ksize, ksize), 0)
         
def mag_thresh(image, sobel_kernel=3, thresh=(0, 255)):
  """
  Implementation of Sobel edge detection
 
  :param image: 2D or 3D array to be blurred
  :param sobel_kernel: Size of the small matrix (i.e. kernel) 
                       i.e. number of rows and columns
  :return: Binary (black and white) 2D mask image
  """
  # Get the magnitude of the edges that are vertically aligned on the image
  sobelx = np.absolute(sobel(image, orient='x', sobel_kernel=sobel_kernel))
         
  # Get the magnitude of the edges that are horizontally aligned on the image
  sobely = np.absolute(sobel(image, orient='y', sobel_kernel=sobel_kernel))
 
  # Find areas of the image that have the strongest pixel intensity changes
  # in both the x and y directions. These have the strongest gradients and 
  # represent the strongest edges in the image (i.e. potential lane lines)
  # mag is a 2D array .. number of rows x number of columns = number of pixels
  # from top to bottom x number of pixels from left to right
  mag = np.sqrt(sobelx ** 2 + sobely ** 2)
 
  # Return a 2D array that contains 0s and 1s   
  return binary_array(mag, thresh)
 
def sobel(img_channel, orient='x', sobel_kernel=3):
  """
  Find edges that are aligned vertically and horizontally on the image
     
  :param img_channel: Channel from an image
  :param orient: Across which axis of the image are we detecting edges?
  :sobel_kernel: No. of rows and columns of the kernel (i.e. 3x3 small matrix)
  :return: Image with Sobel edge detection applied
  """
  # cv2.Sobel(input image, data type, prder of the derivative x, order of the
  # derivative y, small matrix used to calculate the derivative)
  if orient == 'x':
    # Will detect differences in pixel intensities going from 
        # left to right on the image (i.e. edges that are vertically aligned)
    sobel = cv2.Sobel(img_channel, cv2.CV_64F, 1, 0, sobel_kernel)
  if orient == 'y':
    # Will detect differences in pixel intensities going from 
    # top to bottom on the image (i.e. edges that are horizontally aligned)
    sobel = cv2.Sobel(img_channel, cv2.CV_64F, 0, 1, sobel_kernel)
 
  return sobel
 
def threshold(channel, thresh=(128,255), thresh_type=cv2.THRESH_BINARY):
  """
  Apply a threshold to the input channel
     
  :param channel: 2D array of the channel data of an image/video frame
  :param thresh: 2D tuple of min and max threshold values
  :param thresh_type: The technique of the threshold to apply
  :return: Two outputs are returned:
             ret: Threshold that was used
             thresholded_image: 2D thresholded data.
  """
  # If pixel intensity is greater than thresh[0], make that value
  # white (255), else set it to black (0)
  return cv2.threshold(channel, thresh[0], thresh[1], thresh_type)


# Sliding Window

In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Create edge detection module
class EdgeDetection:
    def threshold(self, img, thresh=(0, 255)):
        binary = np.zeros_like(img)
        binary[(img >= thresh[0]) & (img <= thresh[1])] = 255
        return True, binary

    def blur_gaussian(self, img, ksize=3):
        return cv2.GaussianBlur(img, (ksize, ksize), 0)

    def mag_thresh(self, img, sobel_kernel=3, thresh=(0, 255)):
        sobelx = cv2.Sobel(img, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
        sobely = cv2.Sobel(img, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
        gradmag = np.sqrt(sobelx**2 + sobely**2)
        scale_factor = np.max(gradmag)/255
        gradmag = (gradmag/scale_factor).astype(np.uint8)
        binary_output = np.zeros_like(gradmag)
        binary_output[(gradmag >= thresh[0]) & (gradmag <= thresh[1])] = 255
        return binary_output

edge = EdgeDetection()
filename = 'D:/Thinkin in programming/Metopen/my_game_screenshots/screenshot_4.png'
 
class Lane:
  """
  Represents a lane on a road.
  """
  def __init__(self, orig_frame):
    """
      Default constructor
         
    :param orig_frame: Original camera image (i.e. frame)
    """
    self.orig_frame = orig_frame

    # This will hold an image with the lane lines       
    self.lane_line_markings = None

    # Previous polynomial coefficients
    self.prev_left_fit = None
    self.prev_right_fit = None
 
    # This will hold the image after perspective transformation
    self.warped_frame = None
    self.transformation_matrix = None
    self.inv_transformation_matrix = None
 
    # (Width, Height) of the original video frame (or image)
    self.orig_image_size = self.orig_frame.shape[::-1][1:]
 
    width = self.orig_image_size[0]
    height = self.orig_image_size[1]
    self.width = width
    self.height = height
     
    ''' Static ROI points.
      This ROI are based on the screen dimensions of 1920 x 1080 pixels. 
      The roi must satisfy the game screen dimensions.
    
    '''
    # self.roi_points = np.float32([
    #         (634, 457),   # Top-left
    #         (1266, 457),  # Top-right
    #         (1700, 600),  # Bottom-right
    #         (200, 600)    # Bottom-left
    # ])

    # ''' for width": 1270, "height": 813 screen resolution '''
    # self.roi_points = np.float32([
    #       (419, 344),  # Top-left
    #       (837, 344),  # Top-right
    #       (1124, 452), # Bottom-right
    #       (132, 452)   # Bottom-left
    #   ])
    
    ''' for 800 x 600 screen resolution '''
    self.roi_points = np.float32([
        (266, 200),  # Top-left
        (532, 200),  # Top-right
        (700, 300),  # Bottom-right
        (100, 300)  # Bottom-left
    ])
    
    """
    Calculate ROI points based on screen dimensions
    Returns points in clockwise order: top-left, top-right, bottom-right, bottom-left
    """ 
    # # # Define points as percentages of width and height
    # self.roi_points = np.float32([
    #     (self.width * 0.25, self.height * 0.5),   # Top-left (25% from left, 50% down)
    #     (self.width * 0.75, self.height * 0.5),   # Top-right (75% from left, 50% down)
    #     (self.width * 0.85, self.height * 0.9),   # Bottom-right (85% from left, 90% down)
    #     (self.width * 0.15, self.height * 0.9)    # Bottom-left (15% from left, 90% down)
    #     ])
    
    # The desired corner locations  of the region of interest
    # after we perform perspective transformation.
    # Assume image width of 600, padding == 150.
    self.padding = int(0.25 * width) # padding from side of the image in pixels

    self.desired_roi_points = np.float32([
      [self.padding, 0], # Top-left corner
      [self.padding, self.orig_image_size[1]], # Bottom-left corner         
      [self.orig_image_size[
        0]-self.padding, self.orig_image_size[1]], # Bottom-right corner
      [self.orig_image_size[0]-self.padding, 0] # Top-right corner
    ]) 
         
    # Histogram that shows the white pixel peaks for lane line detection
    self.histogram = None
         
    # Sliding window parameters
    # no of windows
    self.no_of_windows = 10

    # window size
    self.margin = int((1/12) * width)  # Window width is +/- margin
    self.minpix = int((1/24) * width)  # Min no. of pixels to recenter window
         
    # Best fit polynomial lines for left line and right line of the lane
    self.left_fit = None
    self.right_fit = None
    self.left_lane_inds = None
    self.right_lane_inds = None
    self.ploty = None
    self.left_fitx = None
    self.right_fitx = None
    self.leftx = None
    self.rightx = None
    self.lefty = None
    self.righty = None
         
    # Pixel parameters for x and y dimensions
    self.YM_PER_PIX = 10.0 / 1000 # meters per pixel in y dimension
    self.XM_PER_PIX = 3.7 / 781 # meters per pixel in x dimension
         
    # Radii of curvature and offset
    self.left_curvem = None
    self.right_curvem = None
    self.center_offset = None
 
  def calculate_car_position(self, print_to_terminal=False):
    """
    Calculate the position of the car relative to the center
         
    :param: print_to_terminal Display data to console if True       
    :return: Offset from the center of the lane
    """
    # Assume the camera is centered in the image.
    # Get position of car in centimeters
    # In warped space (800x600), car is assumed at bottom center of ROI
    car_location = (self.desired_roi_points[2][0] + self.desired_roi_points[3][0]) / 2
 
    # Lane bottoms in warped space (green lines)
    height = self.warped_frame.shape[0]  # 600
    bottom_left = self.left_fitx[-1]  # Last point of left green line
    bottom_right = self.right_fitx[-1]  # Last point of right green line

    # center of the lane in warped space 
    center_lane = (bottom_right + bottom_left)/2

    # offset in pixels
    center_offset_px = car_location - center_lane

    # Convert to centimeters using XM_PER_PIX (adjusted for game)
    center_offset = center_offset_px * self.XM_PER_PIX * 100

    lane_width_px = bottom_right - bottom_left
    lane_width_m = lane_width_px * self.XM_PER_PIX * 100
    
    # Assume car width ~0.5m, green lines ~0.1m each
    offset_adjustment = (0.5 / lane_width_m) * (bottom_right - bottom_left) * self.XM_PER_PIX * 100 / 2
    center_offset -= offset_adjustment if center_offset > 0 else -offset_adjustment
 
    if print_to_terminal == True:
      print(str(center_offset) + 'cm')
             
    self.center_offset = center_offset
       
    return center_offset
 
  def calculate_curvature(self, print_to_terminal=False):
    """
    Calculate the road curvature in meters.
 
    :param: print_to_terminal Display data to console if True
    :return: Radii of curvature
    """
    ym_per_pix = 10/600 # meters per pixel in y dimension
    xm_per_pix = 2/800 #adjust for 800 x 600 screen resolution
    ploty = self.ploty
    left_fitx = self.left_fitx
    right_fitx = self.right_fitx

    # Define y-value where we want radius of curvature
    left_fit_cr = np.polyfit(ploty * ym_per_pix, left_fitx * xm_per_pix, 2)
    right_fit_cr = np.polyfit(ploty * ym_per_pix, right_fitx * xm_per_pix, 2)

    y_eval = np.max(ploty)
    left_curverad = ((1 + (2 * left_fit_cr[0] * y_eval * ym_per_pix + left_fit_cr[1])**2)**1.5) / np.absolute(2 * left_fit_cr[0])
    right_curverad = ((1 + (2 * right_fit_cr[0] * y_eval * ym_per_pix + right_fit_cr[1])**2)**1.5) / np.absolute(2 * right_fit_cr[0])
    
    self.left_curvem = left_curverad
    self.right_curvem = right_curverad

    return left_curverad, right_curverad     
         
  def calculate_histogram(self,frame=None,plot=True):
    """
    Calculate the image histogram to find peaks in white pixel count
         
    :param frame: The warped image
    :param plot: Create a plot if True
    """
    if frame is None:
      frame = self.warped_frame
             
    # Generate the histogram
    self.histogram = np.sum(frame[int(
              frame.shape[0]/2):,:], axis=0)
 
    if plot == True:
         
      # Draw both the image and the histogram
      figure, (ax1, ax2) = plt.subplots(2,1) # 2 row, 1 columns
      figure.set_size_inches(10, 5)
      ax1.imshow(frame, cmap='gray')
      ax1.set_title("Warped Binary Frame")
      ax2.plot(self.histogram)
      ax2.set_title("Histogram Peaks")
      plt.show()
             
    return self.histogram
 
  def display_curvature_offset(self, frame=None, plot=False):
    """
    Display curvature and offset statistics on the image
         
    :param: plot Display the plot if True
    :return: Image with lane lines and curvature
    """
    image_copy = None
    if frame is None:
      image_copy = self.orig_frame.copy()
    else:
      image_copy = frame
 
    cv2.putText(image_copy,'Curve Radius: '+str((
      self.left_curvem+self.right_curvem)/2)[:7]+' m', (int((
      5/600)*self.width), int((
      20/338)*self.height)), cv2.FONT_HERSHEY_SIMPLEX, (float((
      0.5/600)*self.width)),(
      255,255,255),2,cv2.LINE_AA)
    cv2.putText(image_copy,'Center Offset: '+str(
      self.center_offset)[:7]+' cm', (int((
      5/600)*self.width), int((
      40/338)*self.height)), cv2.FONT_HERSHEY_SIMPLEX, (float((
      0.5/600)*self.width)),(
      255,255,255),2,cv2.LINE_AA)
    # debug
  
             
    if plot==True:       
      cv2.imshow("Image with Curvature and Offset", image_copy)
 
    return image_copy
     
  def get_lane_line_previous_window(self, left_fit, right_fit, plot=False):
    """
    Use the lane line from the previous sliding window to get the parameters
    for the polynomial line for filling in the lane line
    :param: left_fit Polynomial function of the left lane line
    :param: right_fit Polynomial function of the right lane line
    :param: plot To display an image or not
    """
    # margin is a sliding window parameter
    margin = self.margin
 
    # Find the x and y coordinates of all the nonzero 
    # (i.e. white) pixels in the frame.         
    nonzero = self.warped_frame.nonzero()  
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])
         
    # Store left and right lane pixel indices
    left_lane_inds = ((nonzerox > (left_fit[0]*(
      nonzeroy**2) + left_fit[1]*nonzeroy + left_fit[2] - margin)) & (
      nonzerox < (left_fit[0]*(
      nonzeroy**2) + left_fit[1]*nonzeroy + left_fit[2] + margin))) 
    right_lane_inds = ((nonzerox > (right_fit[0]*(
      nonzeroy**2) + right_fit[1]*nonzeroy + right_fit[2] - margin)) & (
      nonzerox < (right_fit[0]*(
      nonzeroy**2) + right_fit[1]*nonzeroy + right_fit[2] + margin)))           
    self.left_lane_inds = left_lane_inds
    self.right_lane_inds = right_lane_inds
 
    # Get the left and right lane line pixel locations  
    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds] 
    rightx = nonzerox[right_lane_inds]
    righty = nonzeroy[right_lane_inds]  
 
    self.leftx = leftx
    self.rightx = rightx
    self.lefty = lefty
    self.righty = righty        
     
    # Fit a second order polynomial curve to each lane line
    left_fit = np.polyfit(lefty, leftx, 2)
    right_fit = np.polyfit(righty, rightx, 2)
    self.left_fit = left_fit
    self.right_fit = right_fit
         
    # Create the x and y values to plot on the image
    ploty = np.linspace(
      0, self.warped_frame.shape[0]-1, self.warped_frame.shape[0]) 
    left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
    right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
    self.ploty = ploty
    self.left_fitx = left_fitx
    self.right_fitx = right_fitx
         
    if plot==True:
         
      # Generate images to draw on
      out_img = np.dstack((self.warped_frame, self.warped_frame, (
                           self.warped_frame)))*255
      window_img = np.zeros_like(out_img)
             
      # Add color to the left and right line pixels
      out_img[nonzeroy[left_lane_inds], nonzerox[left_lane_inds]] = [255, 0, 0]
      out_img[nonzeroy[right_lane_inds], nonzerox[right_lane_inds]] = [
                                                                     0, 0, 255]
      # Create a polygon to show the search window area, and recast 
      # the x and y points into a usable format for cv2.fillPoly()
      margin = self.margin
      left_line_window1 = np.array([np.transpose(np.vstack([
                                    left_fitx-margin, ploty]))])
      left_line_window2 = np.array([np.flipud(np.transpose(np.vstack([
                                    left_fitx+margin, ploty])))])
      left_line_pts = np.hstack((left_line_window1, left_line_window2))
      right_line_window1 = np.array([np.transpose(np.vstack([
                                     right_fitx-margin, ploty]))])
      right_line_window2 = np.array([np.flipud(np.transpose(np.vstack([
                                     right_fitx+margin, ploty])))])
      right_line_pts = np.hstack((right_line_window1, right_line_window2))
             
      # Draw the lane onto the warped blank image
      cv2.fillPoly(window_img, np.int_([left_line_pts]), (0,255, 0))
      cv2.fillPoly(window_img, np.int_([right_line_pts]), (0,255, 0))
      result = cv2.addWeighted(out_img, 1, window_img, 0.3, 0)
       
      # Plot the figures 
      figure, (ax1, ax2, ax3) = plt.subplots(3,1) # 3 rows, 1 column
      figure.set_size_inches(10, 10)
      figure.tight_layout(pad=3.0)
      ax1.imshow(cv2.cvtColor(self.orig_frame, cv2.COLOR_BGR2RGB))
      ax2.imshow(self.warped_frame, cmap='gray')
      ax3.imshow(result)
      ax3.plot(left_fitx, ploty, color='yellow')
      ax3.plot(right_fitx, ploty, color='yellow')
      ax1.set_title("Original Frame")  
  def get_lane_line_indices_sliding_windows(self, plot=False):
    histogram = self.histogram
    midpoint = int(histogram.shape[0] / 2)
    leftx_base = np.argmax(histogram[:midpoint])
    rightx_base = np.argmax(histogram[midpoint:]) + midpoint

    # If we don't have previous fits, start with base points
    if self.prev_left_fit is None:
        leftx_current = leftx_base
        rightx_current = rightx_base
    histogram = self.histogram
    midpoint = int(histogram.shape[0] / 2)
    leftx_base = np.argmax(histogram[:midpoint])
    rightx_base = np.argmax(histogram[midpoint:]) + midpoint
    
    nwindows = 10
    window_height = int(self.warped_frame.shape[0] / nwindows)
    nonzero = self.warped_frame.nonzero()
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])
    
    leftx_current = leftx_base
    rightx_current = rightx_base
    margin = 100
    minpix = 50
    
    left_lane_inds = []
    right_lane_inds = []
    
    for window in range(nwindows):
        win_y_low = self.warped_frame.shape[0] - (window + 1) * window_height
        win_y_high = self.warped_frame.shape[0] - window * window_height
        
        if self.prev_left_fit is not None and window > 0:
            leftx_current = int(self.prev_left_fit[0] * (win_y_high**2) + self.prev_left_fit[1] * win_y_high + self.prev_left_fit[2])
            rightx_current = int(self.prev_right_fit[0] * (win_y_high**2) + self.prev_right_fit[1] * win_y_high + self.prev_right_fit[2])
        
        win_xleft_low = leftx_current - margin
        win_xleft_high = leftx_current + margin
        win_xright_low = rightx_current - margin
        win_xright_high = rightx_current + margin
        
        good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                          (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0]
        good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                           (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0]
        
        left_lane_inds.append(good_left_inds)
        right_lane_inds.append(good_right_inds)
        
        if len(good_left_inds) > minpix:
            leftx_current = int(np.mean(nonzerox[good_left_inds]))
        if len(good_right_inds) > minpix:
            rightx_current = int(np.mean(nonzerox[good_right_inds]))
    
    left_lane_inds = np.concatenate(left_lane_inds)
    right_lane_inds = np.concatenate(right_lane_inds)
    
    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds]
    rightx = nonzerox[right_lane_inds]
    righty = nonzeroy[right_lane_inds]
    
    left_fit = np.polyfit(lefty, leftx, 2)
    right_fit = np.polyfit(righty, rightx, 2)
    
    return left_fit, right_fit
 
  def get_line_markings(self, frame=None):
    if frame is None:
        frame = self.orig_frame
    # Faster edge detection (adjust thresholds as needed)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY)
    self.lane_line_markings = cv2.GaussianBlur(binary, (5, 5), 0)
    return self.lane_line_markings
         
  def histogram_peak(self):
    """
    Get the left and right peak of the histogram
 
    Return the x coordinate of the left histogram peak and the right histogram
    peak.
    """
    midpoint = int(self.histogram.shape[0]/2)
    leftx_base = np.argmax(self.histogram[:midpoint])
    rightx_base = np.argmax(self.histogram[midpoint:]) + midpoint
 
    # (x coordinate of left peak, x coordinate of right peak)
    return leftx_base, rightx_base
         
  def overlay_lane_lines(self, plot=False, left_color=(255, 0, 0),
                         right_color=(255, 0, 0), center_color=(0, 255, 0)):
    """
    Overlay detected lane lines on the original frame with custom colors and a center line.
    
    :param plot: If True, return the overlay plot (optional)
    :param left_color: BGR tuple for left lane (default green)
    :param right_color: BGR tuple for right lane (default red)
    :param center_color: BGR tuple for center line (default blue)
    :return: Frame with overlaid lanes
    """
    # Generate an image to draw the lane lines on 
    warp_zero = np.zeros_like(self.warped_frame).astype(np.uint8)
    color_warp = np.dstack((warp_zero, warp_zero, warp_zero))       
         
    # Recast the x and y points into usable format for cv2.fillPoly()
    pts_left = np.array([np.transpose(np.vstack([self.left_fitx, self.ploty]))])

    pts_right = np.array([np.flipud(np.transpose(np.vstack([self.right_fitx, self.ploty])))])

    pts = np.hstack((pts_left, pts_right))
         
    # Draw lane on the warped blank image
    cv2.fillPoly(color_warp, np.int_([pts]), (0,255, 255))

    # draw left and right lane lines
    for i in range(len(self.ploty)-1):
      cv2.line(color_warp, 
                (int(self.left_fitx[i]), int(self.ploty[i])), 
                (int(self.left_fitx[i+1]), int(self.ploty[i+1])), 
                left_color, 16)
      cv2.line(color_warp, 
                (int(self.right_fitx[i]), int(self.ploty[i])), 
                (int(self.right_fitx[i+1]), int(self.ploty[i+1])), 
                right_color, 16)
 
    # Draw center line
    center_x = (self.left_fitx + self.right_fitx) / 2
    for i in range(len(self.ploty)-1):
      cv2.line(color_warp, 
                (int(center_x[i]), int(self.ploty[i])), 
                (int(center_x[i+1]), int(self.ploty[i+1])), 
                center_color, 4)
      
    # Warp the blank back to original image space using inverse perspective 
    # matrix (Minv)
    # Warp back to original image space
    newwarp = cv2.warpPerspective(color_warp, self.inv_transformation_matrix, 
                                 (self.orig_frame.shape[1], self.orig_frame.shape[0]))
    
    # Combine with original image
    result = cv2.addWeighted(self.orig_frame, 1, newwarp, 0.3, 0)
    
    if plot:
        # Optional plotting code
        pass
    
    return result        
     
  def perspective_transform(self, frame=None, plot=False):
    """
    Perform the perspective transform.
    :param: frame Current frame
    :param: plot Plot the warped image if True
    :return: Bird's eye view of the current lane
    """
    if frame is None:
      frame = self.lane_line_markings
             
    # Calculate the transformation matrix
    self.transformation_matrix = cv2.getPerspectiveTransform(
      self.roi_points, self.desired_roi_points)
 
    # Calculate the inverse transformation matrix           
    self.inv_transformation_matrix = cv2.getPerspectiveTransform(
      self.desired_roi_points, self.roi_points)
 
    # Perform the transform using the transformation matrix
    self.warped_frame = cv2.warpPerspective(
      frame, self.transformation_matrix, self.orig_image_size, flags=(
     cv2.INTER_LINEAR)) 
 
    # Convert image to binary
    (thresh, binary_warped) = cv2.threshold(
      self.warped_frame, 127, 255, cv2.THRESH_BINARY)           
    self.warped_frame = binary_warped
 
    # Display the perspective transformed (i.e. warped) frame
    if plot == True:
      warped_copy = self.warped_frame.copy()
      warped_plot = cv2.polylines(warped_copy, np.int32([
                    self.desired_roi_points]), True, (147,20,255), 3)
 
      # Display the image
      while(1):
        cv2.imshow('Warped Image', warped_plot)
             
        # Press any key to stop
        if cv2.waitKey(0):
          break
 
      cv2.destroyAllWindows()   
             
    return self.warped_frame        
     
  def plot_roi(self, frame=None, plot=False):
    """
    Plot the region of interest on an image.
    :param: frame The current image frame
    :param: plot Plot the roi image if True
    """
    if plot == False:
      return
             
    if frame is None:
      frame = self.orig_frame.copy()
 
    # Overlay trapezoid on the frame
    this_image = cv2.polylines(frame, np.int32([
      self.roi_points]), True, (147,20,255), 3)
 
    # Display the image
    while(1):
      cv2.imshow('ROI Image', this_image)
             
      # Press any key to stop
      if cv2.waitKey(0):
        break
 
    cv2.destroyAllWindows()
     
def main():
     
  # Load a frame (or image)
  original_frame = cv2.imread(filename)
 
  # Create a Lane object
  lane_obj = Lane(orig_frame=original_frame)
 
  # Perform thresholding to isolate lane lines
  lane_line_markings = lane_obj.get_line_markings()
 
  # Plot the region of interest on the image
  lane_obj.plot_roi(plot=False)
 
  # Perform the perspective transform to generate a bird's eye view
  # If Plot == True, show image with new region of interest
  warped_frame = lane_obj.perspective_transform(plot=False)
 
  # Generate the image histogram to serve as a starting point
  # for finding lane line pixels
  histogram = lane_obj.calculate_histogram(plot=False)  
     
  # Find lane line pixels using the sliding window method 
  left_fit, right_fit = lane_obj.get_lane_line_indices_sliding_windows(
    plot=False)
 
  # Fill in the lane line
  lane_obj.get_lane_line_previous_window(left_fit, right_fit, plot=False)
     
  # Overlay lines on the original frame
  frame_with_lane_lines = lane_obj.overlay_lane_lines(plot=False)
 
  # Calculate lane line curvature (left and right lane lines)
  lane_obj.calculate_curvature(print_to_terminal=False)
 
  # Calculate center offset                                                                 
  lane_obj.calculate_car_position(print_to_terminal=False)
     
  # Display curvature and center offset on image
  frame_with_lane_lines2 = lane_obj.display_curvature_offset(
    frame=frame_with_lane_lines, plot=True)
     
  # Create the output file name by removing the '.jpg' part
  size = len(filename)
  new_filename = filename[:size - 4]
  new_filename = new_filename + '_thresholded.jpg'     
     
  # Save the new image in the working directory
  #cv2.imwrite(new_filename, lane_line_markings)
 
  # Display the image 
  #cv2.imshow("Image", lane_line_markings) 
     
  # Display the window until any key is pressed
  cv2.waitKey(0) 
     
  # Close all windows
  cv2.destroyAllWindows() 
     
main()

# Debugging screen res and roi coordinates

In [3]:
import cv2
import numpy as np
from mss import mss
import time


In [4]:
class LaneRealTime(Lane):
    def __init__(self, frame_shape):
        dummy_frame = np.zeros(frame_shape, dtype=np.uint8)
        super().__init__(dummy_frame)
        self.last_print_time = time.time()
        
        # ROI for 800x600 (scaled from your 1270x813)
        # Wider and taller ROI for better lane detection
        '''decent one'''
        # self.roi_points = np.float32([
        #     (274, 254),  # Top-left (screenshot-based)
        #     (517, 254),  # Top-right
        #     (748, 313),  # Bottom-right
        #     (13, 313)    # Bottom-left
        # ])

        '''wider roi'''
        # # Wider ROI points with same vertical position for 800 x 600
        # self.roi_points = np.float32([
        #     (200, 254),  # Top-left (moved left)
        #     (600, 254),  # Top-right (moved right)
        #     (790, 313),  # Bottom-right (moved right)
        #     (10, 313)    # Bottom-left (moved left)
        # ])

        # # Match bottom width even closer to original for 800 x  600
        # self.desired_roi_points = np.float32([
        #     [250, 0],    # Top-left (closer to 400px)
        #     [550, 0],    # Top-right
        #     [750, 600],  # Bottom-right (closer to 780px)
        #     [50, 600]    # Bottom-left
        # ])
        
        # for 400 x 300 screen resolution
        self.roi_points = np.float32([
            (137, 127),  # Top-left (274/2, 254/2)
            (258, 127),  # Top-right (517/2, 254/2)
            (374, 156),  # Bottom-right (748/2, 313/2)
            (6, 156)     # Bottom-left (13/2, 313/2)
        ])
        self.desired_roi_points = np.float32([
            [50, 0],     # Top-left (100/2)
            [350, 0],    # Top-right (700/2)
            [350, 300],  # Bottom-right (700/2, 600/2)
            [50, 300]    # Bottom-left (100/2, 600/2)
        ])

        self.XM_PER_PIX = 2 / 800
        self.YM_PER_PIX = 10 / 600
        self.transformation_matrix = cv2.getPerspectiveTransform(self.roi_points, self.desired_roi_points)
        self.inv_transformation_matrix = cv2.getPerspectiveTransform(self.desired_roi_points, self.roi_points)
        self.roi_image = None
        self.prev_left_fit = None
        self.prev_right_fit = None
        self.debug = False  # Toggle debug

    def process_frame(self, frame):
        self.orig_frame = frame

        '''Try Grayscale''' 
        # result decent
        # gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # gray = cv2.GaussianBlur(gray, (5, 5), 0)

        # use hsv to detect green
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Use Canny edge detection instead of HSV
        # gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        # edges = cv2.Canny(gray, 50, 150)
        # edges = cv2.dilate(edges, None, iterations=1)
        
        # edges = cv2.resize(edges, 
        #                    (frame.shape[1], frame.shape[0]), 
        #                    interpolation=cv2.INTER_NEAREST)
        
        # # Apply ROI mask
        # mask = np.zeros_like(edges)
        # cv2.fillPoly(mask, [np.int32(self.roi_points)], 255)
        # self.lane_line_markings = cv2.bitwise_and(edges, mask)
        
        # if self.debug:
        #     cv2.imshow("Edge Mask", self.lane_line_markings)

        # # black lane markings are in the range of 0-10
        # lower_black = np.array([0, 0, 0])
        # upper_black = np.array([10, 255, 255])
        # black_mask = cv2.inRange(hsv, lower_black, upper_black)

        # try green marking
        lower_green = np.array([40, 50, 50])
        upper_green = np.array([80, 255, 255])
        green_mask = cv2.inRange(hsv, lower_green, upper_green)

        # Ensure green = 255 (white), non-green = 0 (black)
        self.lane_line_markings = green_mask  # Already binary (0 or 255)
        
        # Debug: Show mask before warp
        # cv2.imshow("Green Mask", self.lane_line_markings)
        
        # Warp
        self.warped_frame = cv2.warpPerspective(
            self.lane_line_markings, 
            self.transformation_matrix, 
            (400, 300), 
            flags=cv2.INTER_LINEAR
        )

        self.calculate_histogram(plot=False)
        try:
            left_fit, right_fit = self.get_lane_line_indices_sliding_windows(plot=False)
            if self.prev_left_fit is not None:
                left_fit = 0.8 * self.prev_left_fit + 0.2 * left_fit
                right_fit = 0.8 * self.prev_right_fit + 0.2 * right_fit
            self.prev_left_fit = left_fit
            self.prev_right_fit = right_fit
            self.get_lane_line_previous_window(left_fit, right_fit, plot=False)
            self.ploty = np.linspace(0, self.warped_frame.shape[0] - 1, self.warped_frame.shape[0])
            self.left_fitx = np.clip(left_fit[0] * self.ploty**2 + left_fit[1] * self.ploty + left_fit[2], 50, 350)
            self.right_fitx = np.clip(right_fit[0] * self.ploty**2 + right_fit[1] * self.ploty + right_fit[2], 50, 350)
            self.curve_radius = self.calculate_curvature()
            self.calculate_car_position()
            result = self.overlay_lane_lines(plot=False)
            result = self.display_curvature_offset(result, plot=False)
        except Exception as e:
            print(f"Lane detection failed: {e}")
            result = frame.copy()
        self.roi_image = frame.copy()
        cv2.polylines(self.roi_image, [np.int32(self.roi_points)], isClosed=True, color=(0, 0, 255), thickness=2)
        # cv2.imshow("Warped Frame", self.warped_frame)
        return result
    

    def print_log(self, print_to_terminal=False, interval=1):
        # how do I skip the print statement if the lane is not detected? so that 
        # it doesn't have to print the debug everytime
        current_time = time.time()
        if current_time - self.last_print_time < interval:
            return 
        
        try:
            if hasattr(self, "curve_radius") and hasattr(self, "center_offset"):
                self.last_print_time = current_time
                
                print_curves = self.calculate_curvature()
                car_offset = self.calculate_car_position()
                return f"Offset: {car_offset:.1f} cm, Radius: {print_curves:.1f} m"
            return 

        except Exception as e:
            print(f"Error: {e}")
            return "Error: No Lane Detected"
        

def real_time_lane_detection():
    sct = mss()
    monitor = {"top": 300, "left": 100, "width": 400, "height": 300}  # 800x600 window
    lane_detector = LaneRealTime((600, 800, 3))
    
    while True:
        screenshot = sct.grab(monitor)
        frame = np.array(screenshot)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
        
        processed_frame = lane_detector.process_frame(frame)
        debug_log = lane_detector.print_log(True)
        if debug_log:
            print(debug_log)
        
        cv2.imshow("Lane Detection", processed_frame)
        cv2.imshow("ROI Image", lane_detector.roi_image)
        cv2.imshow("Warped Frame", lane_detector.warped_frame)
        # print(f"Offset: {lane_detector.center_offset:.1f} cm, Radius: {lane_detector.curve_radius:.1f} m")
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cv2.destroyAllWindows()

if __name__ == "__main__":
    real_time_lane_detection()

Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected


  right_fit = np.polyfit(righty, rightx, 2)
  right_fit = np.polyfit(righty, rightx, 2)


Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detection failed: expected non-empty vector for x
Lane detec

  left_fit = np.polyfit(lefty, leftx, 2)


Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane Detected
Error: unsupported format string passed to tuple.__format__
Error: No Lane D

# Driving Algo

In [None]:
import cv2
import numpy as np
import mss
import time
import pydirectinput
import threading

class LaneRealTime(Lane):
    def __init__(self, frame_shape):
        dummy_frame = np.zeros(frame_shape, dtype=np.uint8)
        super().__init__(dummy_frame)
        self.roi_points = np.float32([
            (137, 127),  # Top-left
            (258, 127),  # Top-right
            (374, 156),  # Bottom-right
            (6, 156)     # Bottom-left
        ])
        self.desired_roi_points = np.float32([
            [50, 0],     # Top-left
            [350, 0],    # Top-right
            [350, 300],  # Bottom-right
            [50, 300]    # Bottom-left
        ])
        self.transformation_matrix = cv2.getPerspectiveTransform(self.roi_points, self.desired_roi_points)
        self.inv_transformation_matrix = cv2.getPerspectiveTransform(self.desired_roi_points, self.roi_points)
        self.roi_image = None
        self.frame_count = 0
        self.skip_frames = 6  # Increased to reduce load
        self.processing_scale = 0.15

    def process_frame(self, frame):
        self.frame_count += 1
        if self.frame_count % self.skip_frames != 0:
            return self.roi_image.copy() if self.roi_image is not None else np.zeros((300, 400, 3), dtype=np.uint8)
        
        self.orig_frame = frame
        small_frame = cv2.resize(frame, None, 
                                 fx=self.processing_scale, 
                                 fy=self.processing_scale, 
                                 interpolation=cv2.INTER_AREA)

        hsv = cv2.cvtColor(small_frame, cv2.COLOR_BGR2HSV)
        lower_green = np.array([40, 50, 50])
        upper_green = np.array([80, 255, 255])
        green_mask = cv2.inRange(hsv, lower_green, upper_green)
        
        green_mask = cv2.resize(green_mask, 
                                (frame.shape[1], frame.shape[0]), 
                                interpolation=cv2.INTER_NEAREST)
        
        self.lane_line_markings = green_mask
        
        self.warped_frame = cv2.warpPerspective(
            self.lane_line_markings, 
            self.transformation_matrix, 
            (400, 300), 
            flags=cv2.INTER_LINEAR
        )
        
        try:
            self.calculate_histogram(plot=False)
            left_fit, right_fit = self.get_lane_line_indices_sliding_windows(plot=False)
            self.get_lane_line_previous_window(left_fit, right_fit, plot=False)
            
            self.ploty = np.linspace(0, self.warped_frame.shape[0] - 1, self.warped_frame.shape[0])
            self.left_fitx = np.clip(left_fit[0] * self.ploty**2 + left_fit[1] * self.ploty + left_fit[2], 50, 350)
            self.right_fitx = np.clip(right_fit[0] * self.ploty**2 + right_fit[1] * self.ploty + right_fit[2], 50, 350)
            
            self.calculate_curvature()
            self.calculate_car_position()
            
            result = self.overlay_lane_lines(plot=False, left_color=(128, 0, 128), right_color=(128, 0, 128), center_color=(0, 0, 255))
            result = self.display_curvature_offset(result, plot=False)
        except Exception as e:
            print(f"Lane detection failed: {e}")
            result = frame.copy()
        
        self.roi_image = frame.copy()
        cv2.polylines(self.roi_image, [np.int32(self.roi_points)], isClosed=True, color=(0, 0, 255), thickness=2)
        
        return result

class AutoDriver:
    def __init__(self, screen_region):
        self.screen_region = screen_region
        self.steering_sensitivity = 0.7  # Increased for quicker turns
        self.speed_control = 0.1
        self.center_threshold = 15
        self.is_running = False
        self.current_speed = 0
        self.current_steering = 0
        self.last_control_time = time.time()
        self.control_delay = 0.05  # Reduced for faster updates
        self.keys = {'forward': 'w', 'left': 'a', 'right': 'd', 'brake': 'space'}
        self.debug = True
        self.drive_thread = None
        self.last_center_offset = 0
        self.last_steering = 0
        self.failure_count = 0
        self.lane_detector = LaneRealTime((300, 400, 3))

    def capture_screen(self):
        try:
            with mss.mss() as sct:
                sct_img = sct.grab(self.screen_region)
                return cv2.cvtColor(np.array(sct_img), cv2.COLOR_BGRA2BGR)
        except Exception as e:
            print(f"Screen capture error: {e}")
            return np.zeros((300, 400, 3), dtype=np.uint8)

    def control_steering(self, center_offset, curve_radius):
        raw_steering = center_offset / 100.0 * self.steering_sensitivity
        steering_value = 0.7 * self.last_steering + 0.3 * raw_steering
        self.last_steering = steering_value
        if abs(center_offset) < self.center_threshold:
            steering_value = steering_value * 0.4
        if curve_radius < 500:
            curve_factor = min(1.0, 300 / max(curve_radius, 50)) ** 2
            steering_value += curve_factor * 0.3 * (-1 if center_offset < 0 else 1)
        return max(-1.0, min(1.0, steering_value))

    def control_speed(self, curve_radius, center_offset):
        speed_value = self.speed_control
        if curve_radius < 500:
            speed_factor = min(1.0, curve_radius / 300) ** 2
            speed_value *= speed_factor
        if abs(center_offset) > 50:
            speed_value *= 0.8
        return max(0.05, speed_value)

    def apply_controls(self, steering_value, speed_value):
        if time.time() - self.last_control_time < self.control_delay:
            return
        self.last_control_time = time.time()
        self.reset_controls()
        press_duration = min(0.05, abs(steering_value) * 0.4)  # Increased multiplier
        if steering_value < -0.1:
            pydirectinput.keyDown(self.keys['left'])
            time.sleep(press_duration)
            pydirectinput.keyUp(self.keys['left'])
            self.current_steering = -1
        elif steering_value > 0.1:
            pydirectinput.keyDown(self.keys['right'])
            time.sleep(press_duration)
            pydirectinput.keyUp(self.keys['right'])
            self.current_steering = 1
        else:
            self.current_steering = 0
        if speed_value > 0.05:
            pydirectinput.keyDown(self.keys['forward'])
            self.current_speed = speed_value
        elif self.failure_count > 5:
            pydirectinput.keyDown(self.keys['brake'])
            time.sleep(0.1)
            pydirectinput.keyUp(self.keys['brake'])
            self.current_speed = 0

    def reset_controls(self):
        for key in self.keys.values():
            pydirectinput.keyUp(key)

    def drive_loop(self):
        frame_count = 0
        while self.is_running:
            start_time = time.time()
            frame = self.capture_screen()
            processed_frame = self.lane_detector.process_frame(frame)
            
            try:
                center_offset = self.lane_detector.calculate_car_position()
                curve_radius = (self.lane_detector.left_curvem + self.lane_detector.right_curvem) / 2
                self.last_center_offset = center_offset
                self.failure_count = 0
            except Exception as e:
                print(f"Calculation error: {e}")
                center_offset = self.last_center_offset
                curve_radius = 500
                self.failure_count += 1

            steering_value = self.control_steering(center_offset, curve_radius)
            speed_value = self.control_speed(curve_radius, center_offset)
            self.apply_controls(steering_value, speed_value)

            if self.debug:
                cv2.putText(processed_frame, f"Steering: {steering_value:.2f}", (10, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(processed_frame, f"Speed: {speed_value:.2f}", (10, 60), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(processed_frame, f"Offset: {center_offset:.1f} cm", (10, 90), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(processed_frame, f"Curve: {curve_radius:.1f} m", (10, 120), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.imshow("Lane Detection Driver", processed_frame)
                # Disabled ROI Image to boost FPS
                # if self.lane_detector.roi_image is not None and self.lane_detector.roi_image.size > 0:
                #     cv2.imshow("ROI Image", self.lane_detector.roi_image)
                if frame_count % 10 == 0:
                    fps = 1.0 / (time.time() - start_time)
                    print(f"FPS: {fps:.2f}, Steering: {steering_value:.2f}, Speed: {speed_value:.2f}")
            if cv2.waitKey(1) & 0xFF == ord('q'):
                self.stop()
                break
            frame_count += 1

    def start(self):
        if not self.is_running:
            self.is_running = True
            self.drive_thread = threading.Thread(target=self.drive_loop)
            self.drive_thread.daemon = True
            self.drive_thread.start()
            print("Autonomous driving started")

    def stop(self):
        if self.is_running:
            self.is_running = False
            self.reset_controls()
            if self.drive_thread:
                self.drive_thread.join(timeout=1.0)
            cv2.destroyAllWindows()
            print("Autonomous driving stopped")

def main():
    screen_region = {"top": 300, "left": 100, "width": 400, "height": 300}
    driver = AutoDriver(screen_region)
    print("Starting in 3 seconds...")
    for i in range(3, 0, -1):
        print(f"{i}...")
        time.sleep(1)
    driver.start()
    try:
        while driver.is_running:
            time.sleep(0.1)
    except KeyboardInterrupt:
        driver.stop()

if __name__ == "__main__":
    main()

Starting in 3 seconds...
3...
2...
1...
Autonomous driving started
Calculation error: 'NoneType' object has no attribute 'shape'
FPS: 1.86, Steering: 0.00, Speed: 0.10
Calculation error: 'NoneType' object has no attribute 'shape'
Calculation error: 'NoneType' object has no attribute 'shape'
Lane detection failed: expected non-empty vector for x
Calculation error: 'NoneType' object is not subscriptable
Calculation error: 'NoneType' object is not subscriptable
Calculation error: 'NoneType' object is not subscriptable
Autonomous driving stopped


# Try Optimize the FPS 

In [5]:
import cv2
import numpy as np
import mss
import time
import pydirectinput
import threading
from queue import Queue, Empty
from collections import deque

class OptimizedLaneDetector:
    def __init__(self, frame_shape):
        self.roi_points = np.float32([
            (137, 127),  # Top-left
            (258, 127),  # Top-right
            (374, 156),  # Bottom-right
            (6, 156)     # Bottom-left
        ])
        self.desired_roi_points = np.float32([
            [50, 0],     # Top-left
            [350, 0],    # Top-right
            [350, 300],  # Bottom-right
            [50, 300]    # Bottom-left
        ])
        self.transformation_matrix = cv2.getPerspectiveTransform(self.roi_points, self.desired_roi_points)
        self.roi_image = None
        self.frame_count = 0
        self.processing_scale = 0.15  # Reduced from 0.25
        
        # Pre-define color ranges
        self.lower_green = np.array([40, 50, 50])
        self.upper_green = np.array([80, 255, 255])
        
        # Store recent results for smoothing
        self.recent_offsets = deque(maxlen=5)
        self.recent_curves = deque(maxlen=5)
        
        # Last valid results (for fallback)
        self.last_center_offset = 0
        self.last_curve_radius = 500
        
        # Create small output image for visualization
        self.output_width = 200
        self.output_height = 150
        self.vis_image = np.zeros((self.output_height, self.output_width, 3), dtype=np.uint8)

    def detect_lanes_minimal(self, frame):
        """Simplified lane detection optimized for speed"""
        try:
            # Downscale image immediately
            small_frame = cv2.resize(frame, None, 
                                    fx=self.processing_scale, 
                                    fy=self.processing_scale, 
                                    interpolation=cv2.INTER_AREA)
            
            # HSV conversion and green mask
            hsv = cv2.cvtColor(small_frame, cv2.COLOR_BGR2HSV)
            green_mask = cv2.inRange(hsv, self.lower_green, self.upper_green)
            
            # Perspective transform (reduced size)
            warped = cv2.warpPerspective(
                green_mask, 
                self.transformation_matrix, 
                (self.output_width, self.output_height), 
                flags=cv2.INTER_LINEAR
            )
            
            # Split into regions of interest
            bottom_half = warped[self.output_height//2:, :]
            left_side = bottom_half[:, :self.output_width//2]
            right_side = bottom_half[:, self.output_width//2:]
            
            # Find lane positions
            if np.sum(left_side) > 50:  # Check if we have enough points
                left_indices = np.where(left_side > 0)
                if len(left_indices[1]) > 0:
                    left_x = np.mean(left_indices[1])
                else:
                    left_x = 0
            else:
                left_x = 0
                
            if np.sum(right_side) > 50:
                right_indices = np.where(right_side > 0)
                if len(right_indices[1]) > 0:
                    right_x = np.mean(right_indices[1]) + self.output_width//2
                else:
                    right_x = self.output_width
            else:
                right_x = self.output_width
            
            # Calculate center position
            lane_center = (left_x + right_x) / 2
            center_offset = lane_center - self.output_width/2
            
            # Scale offset to original size
            center_offset = center_offset * (400/self.output_width)
            
            # Estimate curve based on lane positions at different heights
            top_section = warped[:self.output_height//3, :]
            middle_section = warped[self.output_height//3:2*self.output_height//3, :]
            bottom_section = warped[2*self.output_height//3:, :]
            
            # Quick curve calculation based on lane position differences
            try:
                left_top = np.mean(np.where(top_section[:, :self.output_width//2] > 0)[1])
                left_bottom = np.mean(np.where(bottom_section[:, :self.output_width//2] > 0)[1])
                curve_factor = abs(left_top - left_bottom)
                curve_radius = 1000 / (curve_factor + 1) * 5
            except:
                curve_radius = 500
            
            # Clamp values
            curve_radius = min(1000, max(100, curve_radius))
            
            # Update recent values for smoothing
            self.recent_offsets.append(center_offset)
            self.recent_curves.append(curve_radius)
            
            # Apply smoothing
            smooth_offset = sum(self.recent_offsets) / len(self.recent_offsets)
            smooth_curve = sum(self.recent_curves) / len(self.recent_curves)
            
            # Update last valid results
            self.last_center_offset = smooth_offset
            self.last_curve_radius = smooth_curve
            
            # Create visualization (only if needed)
            self.vis_image = cv2.cvtColor(warped, cv2.COLOR_GRAY2BGR)
            cv2.line(self.vis_image, 
                    (int(left_x), self.output_height-1), 
                    (int(left_x), self.output_height//2), 
                    (0, 0, 255), 2)
            cv2.line(self.vis_image, 
                    (int(right_x), self.output_height-1), 
                    (int(right_x), self.output_height//2), 
                    (0, 0, 255), 2)
            cv2.line(self.vis_image, 
                    (int(lane_center), self.output_height-1), 
                    (int(lane_center), self.output_height//2), 
                    (0, 255, 0), 2)
            
            return {
                "center_offset": smooth_offset,
                "curve_radius": smooth_curve,
                "vis_image": self.vis_image,
                "success": True
            }
            
        except Exception as e:
            print(f"Lane detection error: {e}")
            return {
                "center_offset": self.last_center_offset,
                "curve_radius": self.last_curve_radius,
                "vis_image": self.vis_image,
                "success": False
            }

class OptimizedAutoDriver:
    def __init__(self, screen_region):
        self.screen_region = screen_region
        self.steering_sensitivity = 0.8  # Increased for better responsiveness
        self.speed_control = 0.12
        self.center_threshold = 10
        self.is_running = False
        
        # Control parameters
        self.current_speed = 0
        self.current_steering = 0
        self.last_control_time = time.time()
        self.control_delay = 0.03  # Reduced for faster updates
        self.keys = {'forward': 'w', 'left': 'a', 'right': 'd', 'brake': 'space'}
        
        # Threading and communication
        self.frame_queue = Queue(maxsize=1)  # Only keep most recent frame
        self.result_queue = Queue(maxsize=1)  # Only keep most recent result
        self.control_queue = Queue(maxsize=1)  # Only keep most recent control command
        
        # Create threads
        self.capture_thread = None
        self.processing_thread = None
        self.control_thread = None
        self.visualization_thread = None
        
        # Tracking variables
        self.failure_count = 0
        self.last_steering = 0
        self.debug = True
        self.fps_capture = 0
        self.fps_processing = 0
        self.fps_control = 0
        
        # Create lane detector
        self.lane_detector = OptimizedLaneDetector((300, 400, 3))
        
        # For visualization
        self.vis_frame = None
        
    def capture_screen_loop(self):
        """Thread for capturing screen frames"""
        last_time = time.time()
        frame_count = 0
        
        with mss.mss() as sct:
            while self.is_running:
                try:
                    # Capture screen without converting immediately
                    sct_img = sct.grab(self.screen_region)
                    # Convert to BGR and put in queue (replace old frame if queue full)
                    frame = cv2.cvtColor(np.array(sct_img), cv2.COLOR_BGRA2BGR)
                    
                    if self.frame_queue.full():
                        self.frame_queue.get()
                    self.frame_queue.put(frame)
                    
                    # Calculate FPS
                    frame_count += 1
                    if frame_count % 30 == 0:
                        self.fps_capture = 30 / (time.time() - last_time)
                        last_time = time.time()
                        frame_count = 0
                    
                except Exception as e:
                    print(f"Screen capture error: {e}")
                    time.sleep(0.01)
    
    def process_frames_loop(self):
        """Thread for processing frames"""
        last_time = time.time()
        frame_count = 0
        
        while self.is_running:
            try:
                # Get frame with timeout
                frame = self.frame_queue.get(timeout=0.1)
                
                # Process frame
                results = self.lane_detector.detect_lanes_minimal(frame)
                
                # Store original frame in results for visualization
                results["original_frame"] = frame
                
                # Put results in queue (replace old results if queue full)
                if self.result_queue.full():
                    self.result_queue.get()
                self.result_queue.put(results)
                
                # Calculate FPS
                frame_count += 1
                if frame_count % 10 == 0:
                    self.fps_processing = 10 / (time.time() - last_time)
                    last_time = time.time()
                    frame_count = 0
                    
            except Empty:
                pass
            except Exception as e:
                print(f"Processing error: {e}")
    
    def control_loop(self):
        """Thread for applying controls"""
        last_time = time.time()
        frame_count = 0
        
        while self.is_running:
            try:
                # Get results with timeout
                results = self.result_queue.get(timeout=0.1)
                center_offset = results["center_offset"]
                curve_radius = results["curve_radius"]
                
                # Calculate steering and speed
                steering_value = self.calculate_steering(center_offset, curve_radius)
                speed_value = self.calculate_speed(curve_radius, center_offset)
                
                # Store control values for visualization
                control_data = {
                    "steering": steering_value,
                    "speed": speed_value,
                    "center_offset": center_offset,
                    "curve_radius": curve_radius,
                    "success": results["success"]
                }
                
                # Put control data in queue (replace old data if queue full)
                if self.control_queue.full():
                    self.control_queue.get()
                self.control_queue.put(control_data)
                
                # Apply controls if enough time has passed
                if time.time() - self.last_control_time >= self.control_delay:
                    self.apply_controls(steering_value, speed_value)
                    self.last_control_time = time.time()
                
                # Update failure count
                if not results["success"]:
                    self.failure_count += 1
                else:
                    self.failure_count = max(0, self.failure_count - 1)
                
                # Calculate FPS
                frame_count += 1
                if frame_count % 10 == 0:
                    self.fps_control = 10 / (time.time() - last_time)
                    last_time = time.time()
                    frame_count = 0
                    
            except Empty:
                pass
            except Exception as e:
                print(f"Control error: {e}")
    
    def visualization_loop(self):
        """Thread for visualization (only runs if debug is enabled)"""
        last_update = time.time()
        
        while self.is_running and self.debug:
            try:
                # Only update visualization 10 times per second
                if time.time() - last_update < 0.1:
                    time.sleep(0.01)
                    continue
                
                # Get latest results and control data
                try:
                    results = self.result_queue.get(block=False)
                    self.result_queue.put(results)  # Put it back
                    
                    control_data = self.control_queue.get(block=False)
                    self.control_queue.put(control_data)  # Put it back
                    
                    original_frame = results["original_frame"]
                    vis_image = results["vis_image"]
                    
                    # Create visualization
                    display_frame = original_frame.copy()
                    
                    # Overlay lane detection visualization
                    h, w = vis_image.shape[:2]
                    display_frame[10:10+h, 10:10+w] = vis_image
                    
                    # Add ROI outline
                    cv2.polylines(display_frame, [np.int32(self.lane_detector.roi_points)], 
                                isClosed=True, color=(0, 0, 255), thickness=2)
                    
                    # Add text info
                    cv2.putText(display_frame, f"Steering: {control_data['steering']:.2f}", 
                                (10, h+30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(display_frame, f"Speed: {control_data['speed']:.2f}", 
                                (10, h+60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(display_frame, f"Offset: {control_data['center_offset']:.1f} cm", 
                                (10, h+90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(display_frame, f"Curve: {control_data['curve_radius']:.1f} m", 
                                (10, h+120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    
                    # Add FPS info
                    cv2.putText(display_frame, f"Capture: {self.fps_capture:.1f} FPS", 
                                (w+20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(display_frame, f"Process: {self.fps_processing:.1f} FPS", 
                                (w+20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(display_frame, f"Control: {self.fps_control:.1f} FPS", 
                                (w+20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    
                    # Show the frame
                    cv2.imshow("Lane Detection Driver", display_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        self.stop()
                    
                    self.vis_frame = display_frame
                    last_update = time.time()
                    
                except Empty:
                    pass
                
            except Exception as e:
                print(f"Visualization error: {e}")
                time.sleep(0.1)
    
    def calculate_steering(self, center_offset, curve_radius):
        """Calculate steering value based on center offset and curve radius"""
        # Base steering on center offset
        raw_steering = center_offset / 100.0 * self.steering_sensitivity
        
        # Apply smoothing with previous steering value
        steering_value = 0.6 * self.last_steering + 0.4 * raw_steering
        self.last_steering = steering_value
        
        # Reduce steering when near center
        if abs(center_offset) < self.center_threshold:
            steering_value = steering_value * 0.5
            
        # Add curve compensation
        if curve_radius < 500:
            curve_factor = min(1.0, 300 / max(curve_radius, 50)) ** 2
            curve_steering = curve_factor * 0.4 * (-1 if center_offset < 0 else 1)
            steering_value += curve_steering
            
        # Limit steering value
        return max(-1.0, min(1.0, steering_value))
    
    def calculate_speed(self, curve_radius, center_offset):
        """Calculate speed value based on curve radius and center offset"""
        # Base speed
        speed_value = self.speed_control
        
        # Reduce speed in curves
        if curve_radius < 500:
            curve_factor = min(1.0, curve_radius / 300) ** 2
            speed_value *= max(0.5, curve_factor)
            
        # Reduce speed if far from center
        if abs(center_offset) > 40:
            offset_factor = max(0.6, 1.0 - (abs(center_offset) - 40) / 100)
            speed_value *= offset_factor
            
        # Limit speed value
        return max(0.05, min(self.speed_control, speed_value))
    
    def apply_controls(self, steering_value, speed_value):
        """Apply steering and speed controls"""
        # Reset all controls first
        self.reset_controls()
        
        # Apply steering
        press_duration = min(0.03, abs(steering_value) * 0.3)
        if steering_value < -0.15:
            pydirectinput.keyDown(self.keys['left'])
            time.sleep(press_duration)
            pydirectinput.keyUp(self.keys['left'])
            self.current_steering = -1
        elif steering_value > 0.15:
            pydirectinput.keyDown(self.keys['right'])
            time.sleep(press_duration)
            pydirectinput.keyUp(self.keys['right'])
            self.current_steering = 1
        else:
            self.current_steering = 0
            
        # Apply speed
        if speed_value > 0.05:
            pydirectinput.keyDown(self.keys['forward'])
            self.current_speed = speed_value
        elif self.failure_count > 5:
            pydirectinput.keyDown(self.keys['brake'])
            time.sleep(0.1)
            pydirectinput.keyUp(self.keys['brake'])
            self.current_speed = 0
    
    def reset_controls(self):
        """Reset all controls"""
        for key in self.keys.values():
            pydirectinput.keyUp(key)
    
    def start(self):
        """Start all threads"""
        if not self.is_running:
            self.is_running = True
            
            # Create and start threads
            self.capture_thread = threading.Thread(target=self.capture_screen_loop)
            self.processing_thread = threading.Thread(target=self.process_frames_loop)
            self.control_thread = threading.Thread(target=self.control_loop)
            
            self.capture_thread.daemon = True
            self.processing_thread.daemon = True
            self.control_thread.daemon = True
            
            self.capture_thread.start()
            self.processing_thread.start()
            self.control_thread.start()
            
            # Start visualization thread if debug is enabled
            if self.debug:
                self.visualization_thread = threading.Thread(target=self.visualization_loop)
                self.visualization_thread.daemon = True
                self.visualization_thread.start()
                
            print("Optimized autonomous driving started")
    
    def stop(self):
        """Stop all threads"""
        if self.is_running:
            self.is_running = False
            self.reset_controls()
            
            # Wait for threads to finish
            if self.capture_thread:
                self.capture_thread.join(timeout=1.0)
            if self.processing_thread:
                self.processing_thread.join(timeout=1.0)
            if self.control_thread:
                self.control_thread.join(timeout=1.0)
            if self.visualization_thread:
                self.visualization_thread.join(timeout=1.0)
                
            cv2.destroyAllWindows()
            print("Autonomous driving stopped")

def main():
    screen_region = {"top": 300, "left": 100, "width": 400, "height": 300}
    driver = OptimizedAutoDriver(screen_region)
    
    print("Starting in 3 seconds...")
    for i in range(3, 0, -1):
        print(f"{i}...")
        time.sleep(1)
        
    driver.start()
    
    try:
        while True:
            time.sleep(0.1)
            if not driver.is_running:
                break
    except KeyboardInterrupt:
        driver.stop()

if __name__ == "__main__":
    main()

Starting in 3 seconds...
3...
2...
1...
Optimized autonomous driving started


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Visualization error: cannot join current thread


# CUda


In [1]:
import cv2
print(cv2.getBuildInformation())


  Version control:               4.11.0

  Platform:
    Timestamp:                   2025-01-16T09:57:07Z
    Host:                        Windows 10.0.17763 AMD64
    CMake:                       3.24.2
    CMake generator:             Visual Studio 14 2015
    CMake build tool:            MSBuild.exe
    MSVC:                        1900
    Configuration:               Debug Release
    Algorithm Hint:              ALGO_HINT_ACCURATE

  CPU/HW features:
    Baseline:                    SSE SSE2 SSE3
      requested:                 SSE3
    Dispatched code generation:  SSE4_1 SSE4_2 AVX FP16 AVX2
      requested:                 SSE4_1 SSE4_2 AVX FP16 AVX2 AVX512_SKX
      SSE4_1 (16 files):         + SSSE3 SSE4_1
      SSE4_2 (1 files):          + SSSE3 SSE4_1 POPCNT SSE4_2
      AVX (8 files):             + SSSE3 SSE4_1 POPCNT SSE4_2 AVX
      FP16 (0 files):            + SSSE3 SSE4_1 POPCNT SSE4_2 AVX FP16
      AVX2 (36 files):           + SSSE3 SSE4_1 POPCNT SSE4_2 AVX FP16 A

In [None]:
# run this command in terminal
conda install -c conda-forge opencv=4.5.5 cudatoolkit=11.2 cudnn=8.1.0