## Initial Settings

In [1]:
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import time
import os
import sys
import math
import random
import numpy as np
import cv2
from PIL import Image
from IPython.display import display, clear_output
import ipywidgets as widgets
from pynq import Overlay, MMIO, PL, allocate
from pynq.lib.video import *
from pynq_dpu import DpuOverlay
import spidev
import colorsys
import keyboard
import threading


# 드라이빙 디렉토리 경로를 PYTHONPATH에 추가
sys.path.append(os.path.abspath('../driving'))

from yolo_utils import pre_process, evaluate
from config import KANAYAMA_CONFIG, HISTORY_CONFIG, MOTOR_ADDRESSES, ULTRASONIC_ADDRESSES, ADDRESS_RANGE, classes_path, anchors
from AutoLab_lib import init


from motor_controller import MotorController

# Initialize AutoLab library
init()

# Load the overlay
overlay = DpuOverlay("../dpu/dpu.bit")

# Load YOLOv3
overlay.load_model("../xmodel/tiny-yolov3_coco_256.xmodel")


    _     ___         ____           ____  
   / \   |_ _|       / ___|   ___   / ___| 
  / _ \   | |  _____ \___ \  / _ \ | |     
 / ___ \  | | |_____| ___) || (_) || |___  
/_/   \_\|___|       |____/  \___/  \____| 
                                           
 ____           _           _                
|  _ \   _ __  (_)__   __ (_)  _ __    __ _ 
| | | | | '__| | |\ \ / / | | | '_ \  / _` |
| |_| | | |    | | \ V /  | | | | | || (_| |
|____/  |_|    |_|  \_/   |_| |_| |_| \__, |
                                       |___/ 
  ____   _                    
 / ___| | |   __ _  ___  ___ 
| |     | |  / _` |/ __|/ __|
| |___  | | | (_| |\__ \__ \ 
 \____| |_|  \__,_||___/|___/
                           


AI-SoC 자율주행 교육과정
Sungkyunkwan University Automation Lab.

------------------Authors------------------
Gyuhyeon Hwang <rbgus7080@g.skku.edu>
Hobin Oh <hobin0676@daum.net>
Minkwan Choi <arbong97@naver.com>
Hyeonjin Sim <nufxwms@naver.com>
HyeongKeun Hong <whaihong@g.skku.edu>
EunHo K

## 모터 및 초음파 센서 초기설정

In [2]:
# Initialize SPI
spi0 = spidev.SpiDev()
spi0.open(0, 0)
spi0.max_speed_hz = 20000000
spi0.mode = 0b00

# 자율주행 모드 뒷바퀴 & 조향 속도 설정 (0 ~ 100)
speed = 5  # 안전한 속도로 설정
steering_speed = 50
motors = {}
parking_mode = False

# 주행 시스템 모터 초기화
for name, addr in MOTOR_ADDRESSES.items():
    motors[name] = MMIO(addr, ADDRESS_RANGE)

# 주차 시스템 초음파 센서 초기화
ultrasonic_sensors = {}
for name, addr in ULTRASONIC_ADDRESSES.items():
    ultrasonic_sensors[name] = MMIO(addr, ADDRESS_RANGE)

print("System initialization completed")

System initialization completed


## YOLOv3 Utility Functions

In [3]:
anchor_list = [10, 14, 23, 27, 37, 58, 81, 82, 135, 169, 344, 319]
anchors = np.array(anchor_list).reshape(-1, 2)

# 클래스 정보 가져오기 함수
def get_class(classes_path):
    with open(classes_path) as f:
        class_names = f.readlines()
    class_names = [c.strip() for c in class_names]
    return class_names
    
classes_path = "../xmodel/lane_class.txt"
class_names = get_class(classes_path)

num_classes = len(class_names)
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
colors = list(map(lambda x: 
                  (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), 
                  colors))
random.seed(0)
random.shuffle(colors)
random.seed(None)

# 이미지 비율을 유지하며 padding하여 리사이즈하는 함수
def letterbox_image(image, size):
    ih, iw, _ = image.shape
    w, h = size
    scale = min(w/iw, h/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)
    image = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_LINEAR)
    new_image = np.ones((h, w, 3), np.uint8) * 128
    h_start = (h-nh)//2
    w_start = (w-nw)//2
    new_image[h_start:h_start+nh, w_start:w_start+nw, :] = image
    return new_image

# 이미지 전처리 함수
def pre_process(image, model_image_size):
    image = image[..., ::-1]
    image_h, image_w, _ = image.shape
    if model_image_size != (None, None):
        assert model_image_size[0] % 32 == 0, 'Multiples of 32 required'
        assert model_image_size[1] % 32 == 0, 'Multiples of 32 required'
        boxed_image = letterbox_image(image, tuple(reversed(model_image_size)))
    else:
        new_image_size = (image_w - (image_w % 32), image_h - (image_h % 32))
        boxed_image = letterbox_image(image, new_image_size)
    image_data = np.array(boxed_image, dtype='float32')
    image_data /= 255.
    image_data = np.expand_dims(image_data, 0) 	
    return image_data

def _get_feats(feats, anchors, num_classes, input_shape):
    num_anchors = len(anchors)
    anchors_tensor = np.reshape(np.array(anchors, dtype=np.float32), [1, 1, 1, num_anchors, 2])
    grid_size = np.shape(feats)[1:3]
    nu = num_classes + 5
    predictions = np.reshape(feats, [-1, grid_size[0], grid_size[1], num_anchors, nu])
    grid_y = np.tile(np.reshape(np.arange(grid_size[0]), [-1, 1, 1, 1]), [1, grid_size[1], 1, 1])
    grid_x = np.tile(np.reshape(np.arange(grid_size[1]), [1, -1, 1, 1]), [grid_size[0], 1, 1, 1])
    grid = np.concatenate([grid_x, grid_y], axis=-1)
    grid = np.array(grid, dtype=np.float32)

    # Confidence score 계산 확인
    box_xy = (1 / (1 + np.exp(-predictions[..., :2])) + grid) / np.array(grid_size[::-1], dtype=np.float32)
    box_wh = np.exp(predictions[..., 2:4]) * anchors_tensor / np.array(input_shape[::-1], dtype=np.float32)
    box_confidence = 1 / (1 + np.exp(-predictions[..., 4:5]))
    box_class_probs = 1 / (1 + np.exp(-predictions[..., 5:]))

    return box_xy, box_wh, box_confidence, box_class_probs

def correct_boxes(box_xy, box_wh, input_shape, image_shape):
    box_yx = box_xy[..., ::-1]
    box_hw = box_wh[..., ::-1]
    input_shape = np.array(input_shape, dtype = np.float32)
    image_shape = np.array(image_shape, dtype = np.float32)
    new_shape = np.around(image_shape * np.min(input_shape / image_shape))
    offset = (input_shape - new_shape) / 2. / input_shape
    scale = input_shape / new_shape
    box_yx = (box_yx - offset) * scale
    box_hw *= scale

    box_mins = box_yx - (box_hw / 2.)
    box_maxes = box_yx + (box_hw / 2.)
    boxes = np.concatenate([
        box_mins[..., 0:1],
        box_mins[..., 1:2],
        box_maxes[..., 0:1],
        box_maxes[..., 1:2]
    ], axis = -1)
    boxes *= np.concatenate([image_shape, image_shape], axis = -1)
    return boxes

def boxes_and_scores(feats, anchors, classes_num, input_shape, image_shape):
    box_xy, box_wh, box_confidence, box_class_probs = _get_feats(feats, anchors, classes_num, input_shape)
    boxes = correct_boxes(box_xy, box_wh, input_shape, image_shape)
    boxes = np.reshape(boxes, [-1, 4])
    box_scores = box_confidence * box_class_probs
    box_scores = np.reshape(box_scores, [-1, classes_num])
    return boxes, box_scores

def nms_boxes(boxes, scores):
    """Suppress non-maximal boxes.

    # Arguments
        boxes: ndarray, boxes of objects.
        scores: ndarray, scores of objects.

    # Returns
        keep: ndarray, index of effective boxes.
    """
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    areas = (x2-x1+1)*(y2-y1+1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)

        xx1 = np.maximum(x1[i], x1[order[1:]])
        yy1 = np.maximum(y1[i], y1[order[1:]])
        xx2 = np.minimum(x2[i], x2[order[1:]])
        yy2 = np.minimum(y2[i], y2[order[1:]])

        w1 = np.maximum(0.0, xx2 - xx1 + 1)
        h1 = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w1 * h1

        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= 0.1)[0]  # threshold
        order = order[inds + 1]

    return keep

# 모델 평가 함수: YOLOv3-Tiny의 2-출력 레이어 구조에 맞게 anchor_mask 설정
def evaluate(yolo_outputs, image_shape, class_names, anchors, max_boxes=20):
    score_thresh = 0.3
    anchor_mask = [[3, 4, 5], [0, 1, 2]]  # YOLOv3-Tiny에 맞춘 2개의 출력 레이어용 anchor mask
    boxes = []
    box_scores = []
    input_shape = np.shape(yolo_outputs[0])[1:3] * np.array([32, 32])

    for i in range(len(yolo_outputs)):
        _boxes, _box_scores = boxes_and_scores(
            yolo_outputs[i], anchors[anchor_mask[i]], len(class_names), 
            input_shape, image_shape)
        boxes.append(_boxes)
        box_scores.append(_box_scores)
    boxes = np.concatenate(boxes, axis=0)
    box_scores = np.concatenate(box_scores, axis=0)

    mask = box_scores >= score_thresh
    boxes_ = []
    scores_ = []
    classes_ = []
    for c in range(len(class_names)):
        class_boxes_np = boxes[mask[:, c]]
        class_box_scores_np = box_scores[:, c][mask[:, c]]
        
        # Non-Max Suppression with max_boxes limit
        nms_index_np = nms_boxes(class_boxes_np, class_box_scores_np)
        nms_index_np = nms_index_np[:max_boxes]  # Limit to max_boxes
        
        class_boxes_np = class_boxes_np[nms_index_np]
        class_box_scores_np = class_box_scores_np[nms_index_np]
        classes_np = np.ones_like(class_box_scores_np, dtype=np.int32) * c
        boxes_.append(class_boxes_np)
        scores_.append(class_box_scores_np)
        classes_.append(classes_np)
        
    boxes_ = np.concatenate(boxes_, axis=0)
    scores_ = np.concatenate(scores_, axis=0)
    classes_ = np.concatenate(classes_, axis=0)

    # confidence scores 출력
    print("Confidence scores for detected boxes:", scores_)
    
    return boxes_, scores_, classes_

## DPU Setting

In [4]:
dpu = overlay.runner
inputTensors = dpu.get_input_tensors()
outputTensors = dpu.get_output_tensors()
shapeIn = tuple(inputTensors[0].dims)
shapeOut0 = (tuple(outputTensors[0].dims)) # (1, 13, 13, 75)
shapeOut1 = (tuple(outputTensors[1].dims)) # (1, 26, 26, 75)
outputSize0 = int(outputTensors[0].get_data_size() / shapeIn[0]) # 12675
outputSize1 = int(outputTensors[1].get_data_size() / shapeIn[0]) # 50700
input_data = [np.empty(shapeIn, dtype=np.float32, order="C")]
output_data = [np.empty(shapeOut0, dtype=np.float32, order="C"), 
               np.empty(shapeOut1, dtype=np.float32, order="C")]
image = input_data[0]

## Image Processor

In [5]:
import cv2
import numpy as np
import math
import os
import colorsys
import random
from PIL import Image
import time
from collections import deque
# from yolo_utils import pre_process, evaluate
# from config import KANAYAMA_CONFIG, HISTORY_CONFIG

def slide_window_in_roi(binary, box, n_win=15, margin=30, minpix=10):
    """
    debugging/visualize.py의 slide_window_search_roi와 동일하게 슬라이딩 윈도우 적용
    화면 하단 10%만 사용
    
    
    Args:
        binary: 2D np.array (전체 BEV 이진화 이미지)
        box: (y1, x1, y2, x2) – 전체 이미지 좌표계
        n_win: 윈도우 개수 (15)
        margin: 윈도우 마진 (30)
        minpix: 최소 픽셀 수 (10)
    Returns:
        fit: (slope, intercept), lane_pts: (x, y) 리스트
    """
    y1, x1, y2, x2 = box
    roi = binary[int(y1):int(y2), int(x1):int(x2)]
    if roi.size == 0:
        return None, None

    # 화면 하단 10%만 사용하기 위한 y 좌표 필터링
    roi_height = roi.shape[0]
    use_bottom_10_percent = int(roi_height * 0.8)  # 하단 10% 시작점
    
    window_height = roi.shape[0] // n_win
    nonzero = roi.nonzero()
    nonzero_y, nonzero_x = nonzero
    
    # 하단 10%의 픽셀들만 사용
    valid_mask = nonzero_y >= use_bottom_10_percent
    nonzero_y = nonzero_y[valid_mask]
    nonzero_x = nonzero_x[valid_mask]
    
    left_inds = []

    # ROI 내부에서 히스토그램으로 초기 좌표 찾기 (하단 10%만 사용)
    hist = np.sum(roi[use_bottom_10_percent:,:], axis=0)
    if np.max(hist) > 0:
        current_x = np.argmax(hist)
    else:
        current_x = roi.shape[1] // 2

    for w in range(n_win):
        y_low = roi.shape[0] - (w+1)*window_height
        y_high = roi.shape[0] - w*window_height
        
        # 하단 10% 영역만 사용
        if y_high <= use_bottom_10_percent:
            continue
            
        # y_low도 하단 10% 영역 내에 있도록 조정
        y_low = max(y_low, use_bottom_10_percent)
        
        x_low = max(0, current_x-margin)
        x_high = min(roi.shape[1], current_x+margin)

        good_inds = ((nonzero_y>=y_low)&(nonzero_y<y_high)&(nonzero_x>=x_low)&(nonzero_x<x_high)).nonzero()[0]
        if len(good_inds) > minpix:
            current_x = int(nonzero_x[good_inds].mean())
            left_inds.append(good_inds)

    if left_inds:
        left_inds = np.concatenate(left_inds)
        leftx, lefty = nonzero_x[left_inds], nonzero_y[left_inds]
        # ROI 좌표를 전체 이미지 좌표로 변환
        leftx_global = leftx + int(x1)
        lefty_global = lefty + int(y1)
        if len(leftx_global) >= 2:
            left_fit = np.polyfit(lefty_global, leftx_global, 1)
            return left_fit, (leftx_global, lefty_global)
    return None, None

class LaneInfo:
    """차선 정보를 저장하는 클래스"""
    def __init__(self, w=256):
        self.left_x = w // 2  # 왼쪽 차선 x좌표 (기본값: 영상 중앙)
        self.right_x = w // 2  # 오른쪽 차선 x좌표 (기본값: 영상 중앙)
        self.left_slope = 0.0  # 왼쪽 차선 기울기
        self.left_intercept = 0.0  # 왼쪽 차선 y절편
        self.right_slope = 0.0  # 오른쪽 차선 기울기
        self.right_intercept = 0.0  # 오른쪽 차선 y절편
        # fitLine 파라미터 추가
        self.left_params = None   # (vx, vy, x0, y0)
        self.right_params = None  # (vx, vy, x0, y0)
        self.left_points = None  # 슬라이딩 윈도우 결과 저장용
        self.right_points = None

class ImageProcessor:
    def __init__(self, dpu, classes_path, anchors, parking_mode=False):
        # 클래스 변수로 저장
        self.dpu = dpu
            
        self.classes_path = classes_path
        self.anchors = anchors
        self.class_names = self.load_classes(classes_path)    
        
        self.reference_point_x = 200
        self.reference_point_y = 240
        self.point_detection_height = 20
        
        # 주차모드 플래그 추가
        self.parking_mode = parking_mode
        
        # DPU 초기화 상태 추적 플래그
        self.initialized = False
        self.init_dpu()
        
        # Kanayama 제어기 파라미터 (config에서 불러오기)
        self.K_y = KANAYAMA_CONFIG['K_y']
        self.K_phi = KANAYAMA_CONFIG['K_phi']
        self.L = KANAYAMA_CONFIG['L']
        self.lane_width = KANAYAMA_CONFIG['lane_width']
        self.v_r = KANAYAMA_CONFIG['v_r']
        
        # 조향각 히스토리 관리 (config에서 불러오기)
        self.steering_history = deque(maxlen=HISTORY_CONFIG['max_history_size'])
        self.no_lane_detection_count = 0
        self.max_no_lane_frames = HISTORY_CONFIG['max_no_lane_frames']
        self.default_steering_angle = HISTORY_CONFIG['default_steering_angle']
        self.avg_window_size = HISTORY_CONFIG['avg_window_size']
        self.smoothing_factor = HISTORY_CONFIG['smoothing_factor']
    
    def _log(self, msg):
        if not self.parking_mode:
            print(msg)
    
    def load_classes(self, classes_path):
        """Load class names from the given path"""
        with open(classes_path, 'r') as f:
            class_names = f.read().strip().split('\n')
        return class_names 
    
    def init_dpu(self):
        """DPU 초기화 - 한 번만 실행됨"""
        if self.initialized:
            self._log("DPU 이미 초기화됨")
            return  # 이미 초기화되었으면 바로 리턴

        self._log("DPU 초기화 중...")
        inputTensors = self.dpu.get_input_tensors()
        outputTensors = self.dpu.get_output_tensors()

        self.shapeIn = tuple(inputTensors[0].dims)
        self.shapeOut0 = tuple(outputTensors[0].dims)
        self.shapeOut1 = tuple(outputTensors[1].dims)

        outputSize0 = int(outputTensors[0].get_data_size() / self.shapeIn[0])
        outputSize1 = int(outputTensors[1].get_data_size() / self.shapeIn[0])

        self.input_data = [np.empty(self.shapeIn, dtype=np.float32, order="C")]
        self.output_data = [
            np.empty(self.shapeOut0, dtype=np.float32, order="C"),
            np.empty(self.shapeOut1, dtype=np.float32, order="C")
        ]

        # 초기화 완료 플래그 설정
        self.initialized = True
        self._log("DPU 초기화 완료")

    
    def roi_rectangle_below(self, img, cutting_idx):
        return img[cutting_idx:, :]

    def warpping(self, image, srcmat, dstmat):
        h, w = image.shape[0], image.shape[1]
        transform_matrix = cv2.getPerspectiveTransform(srcmat, dstmat)
        minv = cv2.getPerspectiveTransform(dstmat, srcmat)
        _image = cv2.warpPerspective(image, transform_matrix, (w, h))
        return _image, minv
    
    def bird_convert(self, img, srcmat, dstmat):
        srcmat = np.float32(srcmat)
        dstmat = np.float32(dstmat)
        img_warpped, _ = self.warpping(img, srcmat, dstmat)
        return img_warpped

    def calculate_angle(self, x1, y1, x2, y2):
        if x1 == x2:
            return 90.0
        slope = (y2 - y1) / (x2 - x1)
        return -math.degrees(math.atan(slope))

    def color_filter(self, image):
        """참고 코드와 동일한 흰색 픽셀 검출 방식"""
        # HSV 색공간으로 변환
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        
        # 흰색 픽셀 검출 (참고 코드와 동일한 방식)
        lower = np.array([0, 255, 255])  # 흰색 필터
        upper = np.array([255, 255, 255])
        white_mask = cv2.inRange(hsv, lower, upper)
        masked = cv2.bitwise_and(image, image, mask=white_mask)
        
        return masked

    def generate_left_lane_from_right(self, right_x, right_slope, right_intercept, frame_width, lane_width_pixels=200):
        """
        오른쪽 차선을 기준으로 왼쪽 차선을 생성
        Args:
            right_x: 오른쪽 차선의 x 좌표
            right_slope: 오른쪽 차선의 기울기
            right_intercept: 오른쪽 차선의 y절편
            frame_width: 프레임 너비
            lane_width_pixels: 차선 간격 (픽셀)
        Returns:
            left_x, left_slope, left_intercept: 생성된 왼쪽 차선 정보
        """
        # 왼쪽 차선은 오른쪽 차선에서 일정 간격만큼 왼쪽에 위치
        left_x = right_x - lane_width_pixels
        
        # 기울기는 오른쪽 차선과 동일 (평행한 차선)
        left_slope = right_slope
        
        # y절편도 동일한 간격만큼 조정
        left_intercept = right_intercept - lane_width_pixels
        
        # 프레임 범위 내로 제한
        left_x = max(0, min(left_x, frame_width - 1))
        
        return left_x, left_slope, left_intercept

    def extract_lane_info_improved(self, xyxy_results, classes_results, processed_img):
        """여러 바운딩 박스의 픽셀들을 합쳐서 하나의 직선 생성 (driving_visualize.py와 동일)"""
        h, w = processed_img.shape[:2]
        lane_info = LaneInfo(w)
        
        if len(xyxy_results) == 0:
            return lane_info
            
        # 왼쪽 차선과 오른쪽 차선을 위한 픽셀 저장 리스트
        left_lane_pixels_x = []
        left_lane_pixels_y = []
        right_lane_pixels_x = []
        right_lane_pixels_y = []
        
        for i, box in enumerate(xyxy_results):
            y1, x1, y2, x2 = [int(v) for v in box]
            
            # 바운딩 박스 좌표가 이미지 범위를 벗어나는 경우 처리
            y1 = max(0, min(y1, processed_img.shape[0] - 1))
            x1 = max(0, min(x1, processed_img.shape[1] - 1))
            y2 = max(0, min(y2, processed_img.shape[0]))
            x2 = max(0, min(x2, processed_img.shape[1]))
            
            # 유효한 ROI 영역인지 확인
            if y1 >= y2 or x1 >= x2:
                continue
                
            roi = processed_img[y1:y2, x1:x2]
            
            # ROI가 비어있거나 유효하지 않은 경우 건너뛰기
            if roi is None or roi.size == 0:
                continue
                
            # ROI 크기가 너무 작은 경우도 건너뛰기 (최소 5x5 픽셀)
            if roi.shape[0] < 5 or roi.shape[1] < 5:
                continue
                
            # === driving_visualize.py의 process_roi와 동일한 전처리 파이프라인 ===
            # 1. 블러
            blurred = cv2.GaussianBlur(roi, (5,5), 1)
            
            # 2. 추천 전처리 파이프라인 (CLAHE 제외)
            # HLS 색공간 변환
            hls = cv2.cvtColor(blurred, cv2.COLOR_BGR2HLS)
            L = hls[:,:,1]  # Lightness
            S = hls[:,:,2]  # Saturation

            # 2-1. Adaptive thresholding on L (CLAHE 제외)
            binary_L = cv2.adaptiveThreshold(L, 255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY,
                blockSize=65, C=-10)

            # 2-2. HSV 색상 필터링 조합
            hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)
            mask_hsv = (hsv[:,:,2] > 10) & (hsv[:,:,1] < 180)  # V > 10, S < 180

            # 2-3. 최종 마스크 결합
            final_mask = binary_L & mask_hsv

            # 전체 이미지 크기의 이진화 마스크 생성
            full_binary_mask = np.zeros((h, w), dtype=np.uint8)
            full_binary_mask[y1:y2, x1:x2] = final_mask.astype(np.uint8) * 255

            # 슬라이딩 윈도우 적용 (전체 이미지 마스크 사용)
            fit, pts = slide_window_in_roi(full_binary_mask, (y1, x1, y2, x2), n_win=15, margin=30, minpix=10)
            
            if fit is not None and pts is not None:
                slope, intercept = fit
                xs, ys = pts
                
                # 클래스 기반 좌우 분류
                class_id = classes_results[i] if i < len(classes_results) else 0
                class_name = self.class_names[class_id] if class_id < len(self.class_names) else ""
                
                if "left" in class_name.lower():
                    # 왼쪽 차선 픽셀 추가
                    left_lane_pixels_x.extend(xs)
                    left_lane_pixels_y.extend(ys)
                elif "right" in class_name.lower():
                    # 오른쪽 차선 픽셀 추가
                    right_lane_pixels_x.extend(xs)
                    right_lane_pixels_y.extend(ys)
                else:
                    # 클래스가 명확하지 않은 경우 이미지 중앙 기준으로 분류
                    image_center_x = w / 2
                    x_bottom = slope * (h - 1) + intercept
                    if x_bottom < image_center_x:
                        left_lane_pixels_x.extend(xs)
                        left_lane_pixels_y.extend(ys)
                    else:
                        right_lane_pixels_x.extend(xs)
                        right_lane_pixels_y.extend(ys)
        
        # 왼쪽 차선 픽셀들로 직선 피팅
        if len(left_lane_pixels_x) > 10:  # 충분한 점이 있을 때만
            left_x = np.array(left_lane_pixels_x)
            left_y = np.array(left_lane_pixels_y)
            
            # 화면 하단 10%의 픽셀들만 사용
            h = processed_img.shape[0]
            use_bottom_10_percent = int(h * 0.8)
            valid_mask = left_y >= use_bottom_10_percent
            left_x = left_x[valid_mask]
            left_y = left_y[valid_mask]
            
            # 충분한 점이 남아있는지 확인
            if len(left_x) > 5:
                # 전체 픽셀들로 직선 피팅 (y = mx + b 형태로)
                left_fit = np.polyfit(left_y, left_x, 1)
                
                # 프레임 중심에서의 x 좌표 계산
                center_y = h // 2
                lane_info.left_x = left_fit[0] * center_y + left_fit[1]
                lane_info.left_slope = left_fit[0]
                lane_info.left_intercept = left_fit[1]
        
        # 오른쪽 차선 픽셀들로 직선 피팅
        if len(right_lane_pixels_x) > 10:  # 충분한 점이 있을 때만
            right_x = np.array(right_lane_pixels_x)
            right_y = np.array(right_lane_pixels_y)
            
            # 화면 하단 10%의 픽셀들만 사용
            h = processed_img.shape[0]
            use_bottom_10_percent = int(h * 0.8)
            valid_mask = right_y >= use_bottom_10_percent
            right_x = right_x[valid_mask]
            right_y = right_y[valid_mask]
            
            # 충분한 점이 남아있는지 확인
            if len(right_x) > 5:
                # 전체 픽셀들로 직선 피팅 (y = mx + b 형태로)
                right_fit = np.polyfit(right_y, right_x, 1)
                
                # 프레임 중심에서의 x 좌표 계산
                center_y = h // 2
                lane_info.right_x = right_fit[0] * center_y + right_fit[1]
                lane_info.right_slope = right_fit[0]
                lane_info.right_intercept = right_fit[1]
        
        return lane_info

    
    def kanayama_control(self, lane_info):
        """debugging/visualize.py와 동일한 Kanayama 제어기"""
        # 이미지 크기 (256x256으로 리사이즈됨)
        frame_width = 256
        
        # 1) 데이터 없으면 그대로
        if lane_info.left_x == frame_width // 2 and lane_info.right_x == frame_width // 2:
            self._log("차선을 찾을 수 없습니다.")
            # 히스토리에서 평균값 사용
            if len(self.steering_history) > 0:
                avg_steering = self.get_average_steering()
                self._log(f"히스토리 평균 조향각 사용: {avg_steering:.2f}°")
                return avg_steering, self.v_r
            else:
                return self.default_steering_angle, self.v_r
        
        lane_width_m = 0.9  # debugging/visualize.py와 동일한 차로 폭
        Fix_Speed = self.v_r
        
        # 2) 픽셀 단위 차로 폭 & 픽셀당 미터 변환 계수
        lane_pixel_width = lane_info.right_x - lane_info.left_x
        if lane_pixel_width > 0:
            pix2m = lane_pixel_width / lane_width_m
        else:
            pix2m = frame_width / lane_width_m  # fallback

        # 3) 횡방향 오차: 차량 중앙(pixel) - 차로 중앙(pixel) → m 단위
        image_cx = frame_width / 2.0
        lane_cx = (lane_info.left_x + lane_info.right_x) / 2.0
        lateral_err = (lane_cx - image_cx) / pix2m

        # 4) 헤딩 오차 (차선 기울기 평균)
        heading_err = -0.1 * (lane_info.left_slope + lane_info.right_slope)

        # 5) Kanayama 제어식 (debugging/visualize.py와 동일한 파라미터)
        K_y, K_phi, L = 0.3, 0.9, 0.5
        v_r = Fix_Speed
        v = v_r * (math.cos(heading_err))**2
        w = v_r * (K_y * lateral_err + K_phi * math.sin(heading_err))
        delta = math.atan2(w * L, v)

        # 6) 픽셀→도 단위 보정 (k_p) - debugging/visualize.py와 동일
        steering = math.degrees(delta) * (Fix_Speed/25)
        steering = max(min(steering, 30.0), -30.0)
        
        # 디버깅 정보 출력
        self._log(f"Lateral error: {lateral_err:.3f}m, Heading error: {math.degrees(heading_err):.1f}°")
        self._log(f"Lane center: {lane_cx:.1f}, Image center: {image_cx:.1f}")
        self._log(f"Steering: {steering:.2f}°, Speed: {v:.1f}")
        self._log("")  # 빈 줄 추가
        
        return steering, v

    def add_steering_to_history(self, steering_angle):
        """조향각을 히스토리에 추가"""
        self.steering_history.append(steering_angle)
        
    def get_average_steering(self, num_frames=None):
        """최근 N개 조향각의 평균 계산"""
        if len(self.steering_history) == 0:
            return self.default_steering_angle
        
        # 기본값 사용
        if num_frames is None:
            num_frames = self.avg_window_size
        
        # 최근 N개 값만 사용
        recent_values = list(self.steering_history)[-min(num_frames, len(self.steering_history)):]
        
        # 평균 계산
        average_steering = sum(recent_values) / len(recent_values)
        
        return average_steering
        
    def get_smoothed_steering(self, current_steering_angle):
        """스무딩을 적용한 조향각 계산"""
        if len(self.steering_history) == 0:
            return current_steering_angle
        
        # 이전 값과 현재 값을 가중 평균
        previous_angle = self.steering_history[-1]
        smoothed_angle = (self.smoothing_factor * previous_angle + 
                         (1 - self.smoothing_factor) * current_steering_angle)
        
        return smoothed_angle
        
    def should_use_history(self, lane_info):
        """히스토리 사용 여부 결정"""
        # 이미지 크기 (256x256으로 리사이즈됨)
        frame_width = 256
        
        # 양쪽 차선이 모두 보이지 않는 경우
        if lane_info.left_x == frame_width // 2 and lane_info.right_x == frame_width // 2:
            self.no_lane_detection_count += 1
            return True
        else:
            # 차선이 보이면 카운터 리셋
            self.no_lane_detection_count = 0
            return False
            
    def get_robust_steering_angle(self, lane_info, current_steering_angle):
        """강건한 조향각 계산 (히스토리 + 스무딩 적용)"""
        if self.should_use_history(lane_info):
            if self.no_lane_detection_count <= self.max_no_lane_frames:
                # 이전 값들의 평균 사용
                robust_angle = self.get_average_steering()
                self._log(f"차선 미검출: 이전 {min(self.avg_window_size, len(self.steering_history))}개 값 평균 사용 ({robust_angle:.2f}°)")
                return robust_angle
            else:
                # 너무 오래 차선을 못 찾으면 기본값 사용
                self._log(f"차선 미검출: 기본값 사용 ({self.default_steering_angle:.2f}°)")
                return self.default_steering_angle
        else:
            # 차선이 보이면 스무딩 적용
            smoothed_angle = self.get_smoothed_steering(current_steering_angle)
            
            # 히스토리에 추가 (스무딩된 값)
            self.add_steering_to_history(smoothed_angle)
            
            # 스무딩이 적용되었는지 출력
            if len(self.steering_history) > 1:
                self._log(f"스무딩 적용: {current_steering_angle:.2f}° → {smoothed_angle:.2f}°")
            
            return smoothed_angle

    def process_frame(self, img):
        """프레임 처리 및 조향각 계산"""
        h, w = img.shape[0], img.shape[1]
        dst_mat = [[round(w * 0.3), 0], [round(w * 0.7), 0], 
                  [round(w * 0.7), h], [round(w * 0.3), h]]
        src_mat = [[238, 276], [382, 276], [450, 436], [200, 436]]
        
        # BEV 변환
        bird_img = self.bird_convert(img, srcmat=src_mat, dstmat=dst_mat)
        
        # 원본 BEV 영상 저장 (바운딩 박스 그리기 전)
        original_bev = bird_img.copy()
        
        img = cv2.resize(bird_img, (256, 256))
        image_size = img.shape[:2]
        image_data = np.array(pre_process(img, (256, 256)), dtype=np.float32)
        
        start_time = time.time()  # 추론 시작시간

        # self를 사용하여 클래스 변수에 접근
        self.input_data[0][...] = image_data.reshape(self.shapeIn[1:])
        job_id = self.dpu.execute_async(self.input_data, self.output_data)
        self.dpu.wait(job_id)
        end_time = time.time()   # 추론 종료시간
        
        # DPU output
        conv_out0 = np.reshape(self.output_data[0], self.shapeOut0)
        conv_out1 = np.reshape(self.output_data[1], self.shapeOut1)
        yolo_outputs = [conv_out0, conv_out1]

        # YOLO 디코딩 (boxes, scores, classes)
        boxes, scores, classes = evaluate(yolo_outputs, image_size, self.class_names, self.anchors)

        # 바운딩 박스 시각화 (디버깅용)
        for i, box in enumerate(boxes):
            top_left = (int(box[1]), int(box[0]))
            bottom_right = (int(box[3]), int(box[2]))
            cv2.rectangle(img, top_left, bottom_right, (0, 255, 0), 2)

        # === 바운딩 박스 기반 차선(직선) 방정식 시각화 추가 ===
        lane_info = self.extract_lane_info_improved(boxes, classes, img)
        h, w = img.shape[:2]
        
        # 왼쪽 차선이 검출되지 않았을 때 오른쪽 차선을 기준으로 왼쪽 차선 생성
        if lane_info.left_x == w // 2 and lane_info.right_x != w // 2:
            left_x, left_slope, left_intercept = self.generate_left_lane_from_right(
                lane_info.right_x, lane_info.right_slope, lane_info.right_intercept, w, lane_width_pixels=200
            )
            lane_info.left_x = left_x
            lane_info.left_slope = left_slope
            lane_info.left_intercept = left_intercept
            self._log(f"왼쪽 차선 미검출, 오른쪽 차선 기준으로 생성: Left X={lane_info.left_x:.1f}, Slope={lane_info.left_slope:.3f}")
        
        # 오른쪽 차선이 검출되지 않았을 때 왼쪽 차선을 기준으로 오른쪽 차선 생성
        elif lane_info.right_x == w // 2 and lane_info.left_x != w // 2:
            # 왼쪽 차선을 기준으로 오른쪽 차선 생성 (180픽셀 오른쪽)
            right_x = lane_info.left_x + 200
            right_slope = lane_info.left_slope  # 기울기는 동일
            right_intercept = lane_info.left_intercept + 200  # y절편도 동일한 간격만큼 조정
            
            # 프레임 범위 내로 제한
            right_x = max(0, min(right_x, w - 1))
            
            lane_info.right_x = right_x
            lane_info.right_slope = right_slope
            lane_info.right_intercept = right_intercept
            self._log(f"오른쪽 차선 미검출, 왼쪽 차선 기준으로 생성: Right X={lane_info.right_x:.1f}, Slope={lane_info.right_slope:.3f}")
        
        # 왼쪽 차선 직선 그리기 (slope/intercept 방식)
        if lane_info.left_slope != 0.0:
            try:
                y_bot, y_top = h, 100
                x_bot = lane_info.left_slope * y_bot + lane_info.left_intercept
                x_top = lane_info.left_slope * y_top + lane_info.left_intercept
                cv2.line(img,
                         (int(round(x_bot)), y_bot),
                         (int(round(x_top)), y_top),
                         (255, 0, 0), 2)  # 파란색
            except:
                pass
                
        # 오른쪽 차선 직선 그리기 (slope/intercept 방식)
        if lane_info.right_slope != 0.0:
            try:
                y_bot, y_top = h, 100
                x_bot = lane_info.right_slope * y_bot + lane_info.right_intercept
                x_top = lane_info.right_slope * y_top + lane_info.right_intercept
                cv2.line(img,
                         (int(round(x_bot)), y_bot),
                         (int(round(x_top)), y_top),
                         (0, 0, 255), 2)  # 빨간색
            except:
                pass
        # 기울기 등 텍스트로 표시
        cv2.putText(img, f"Left slope: {lane_info.left_slope:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
        cv2.putText(img, f"Right slope: {lane_info.right_slope:.2f}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        # === 끝 ===

        # Kanayama 제어기 사용
        # 기본 조향각과 속도 계산
        base_steering_angle, calculated_speed = self.kanayama_control(lane_info)
        
        # 강건한 조향각 계산 (히스토리 적용)
        steering_angle = self.get_robust_steering_angle(lane_info, base_steering_angle)
        
        # 차선 정보 디버깅 출력
        self._log(f"Left: x={lane_info.left_x:.1f}, slope={lane_info.left_slope:.3f}")
        self._log(f"Right: x={lane_info.right_x:.1f}, slope={lane_info.right_slope:.3f}")
        self._log(f"Base steering: {base_steering_angle:.2f}°, Final: {steering_angle:.2f}°")
        self._log(f"Calculated speed: {calculated_speed:.1f} m/s")
        self._log(f"History size: {len(self.steering_history)}, No lane count: {self.no_lane_detection_count}")
        self._log("-" * 50)  # 구분선 추가
        
        # === 최종 주행각도 영상에 표시 ===
        cv2.putText(img, f"Steering Angle: {steering_angle:.2f}°", (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 128, 255), 2)
        cv2.putText(img, f"Speed: {calculated_speed:.1f} m/s", (10, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        # === 끝 ===

        # 바운딩 박스 정보를 BEV 좌표계로 변환 (이미 BEV 좌표계에 있음)
        bev_boxes = []
        if len(boxes) > 0:
            # 이미 BEV 좌표계에서 예측된 박스이므로 단순히 ROI 기준으로 좌표 조정
            for i, box in enumerate(boxes):
                x1, y1, x2, y2 = box[1], box[0], box[3], box[2]
                
                # 256x256 리사이즈된 이미지에서 원본 BEV ROI 크기로 스케일 조정
                # 원본 BEV ROI 크기: (w, h-300) = (w, h-300)
                # 리사이즈된 크기: (256, 256)
                scale_x = original_bev.shape[1] / 256.0
                scale_y = original_bev.shape[0] / 256.0
                
                # 스케일 조정된 좌표
                bev_x1 = x1 * scale_x
                bev_y1 = y1 * scale_y
                bev_x2 = x2 * scale_x
                bev_y2 = y2 * scale_y
                
                # ROI 영역 내에 있는 박스만 포함
                if bev_y2 > 0 and bev_y1 < original_bev.shape[0]:  # ROI 영역 내
                    bev_boxes.append({
                        'x1': int(bev_x1),
                        'y1': int(bev_y1),
                        'x2': int(bev_x2),
                        'y2': int(bev_y2),
                        'class': classes[i] if i < len(classes) else 'unknown',
                        'score': scores[i] if i < len(scores) else 0.0
                    })

        # 결과를 딕셔너리로 반환
        return {
            'steering_angle': steering_angle,
            'speed': calculated_speed,
            'original': img,  # 기존 처리된 이미지
            'bev': original_bev,  # BEV 변환된 원본 영상
            'bev_boxes': bev_boxes,  # BEV 좌표계의 바운딩 박스
            'lane_info': lane_info,  # 차선 정보
            'processing_time': end_time - start_time
        }


## Driving Functions

In [6]:
class DrivingSystemController:
    def __init__(self, dpu_overlay, dpu, motors, speed, steering_speed, parking_mode):
        """
        자율주행 차량 시스템 초기화
        Args:
            dpu_overlay: DPU 오버레이 객체
        """
        self.image_processor = ImageProcessor(dpu, classes_path, anchors, parking_mode)
        self.motor_controller = MotorController(motors)
        self.overlay = dpu_overlay
        
        # 제어 상태 변수
        self.is_running = False
        self.control_lock = Lock()
        self.control_mode = 1  # 1: Autonomous, 2: Manual
        
        self.speed = speed
        self.steering_speed = steering_speed
        self.parking_mode = parking_mode
        
        # 시스템 초기화
        self.init_system()
        
    def init_system(self):
        """시스템 초기화"""
        self.motor_controller.init_motors()

    def start_driving(self):
        """주행 시작"""
        with self.control_lock:
            self.is_running = True
            print("주행을 시작합니다.")
            if self.control_mode == 1:
                # 자율주행 모드 초기 설정
                self.motor_controller.left_speed = self.speed
                self.motor_controller.right_speed = self.speed
                self.motor_controller.steering_speed = self.steering_speed
            else:
                # 수동 주행 모드 초기 설정
                self.motor_controller.manual_speed = 0
                self.motor_controller.manual_steering_angle = 0

    def stop_driving(self):
        """주행 정지"""
        with self.control_lock:
            self.is_running = False
            print("주행을 정지합니다.")
            self.motor_controller.reset_motor_values()

    def switch_mode(self, new_mode):
        """
        주행 모드 전환
        Args:
            new_mode: 1(자율주행) 또는 2(수동주행)
        """
        if self.control_mode != new_mode:
            self.control_mode = new_mode
            self.is_running = False
            self.motor_controller.reset_motor_values()
            mode_str = "자율주행" if new_mode == 1 else "수동주행"
            print(f"{mode_str} 모드로 전환되었습니다.")
            print("Space 키를 눌러 주행을 시작하세요.")

    def process_and_control(self, frame):
        """
        프레임 처리 및 차량 제어
        Args:
            frame: 처리할 비디오 프레임
        Returns:
            처리된 이미지와 추가 정보
        """
        if self.control_mode == 1:  # Autonomous mode
            result = self.image_processor.process_frame(frame)
            
            # 새로운 반환값 구조에서 정보 추출
            steering_angle = result['steering_angle']
            calculated_speed = result['speed']
            processed_image = result['original']
            bev_image = result['bev']
            bev_boxes = result['bev_boxes']
            lane_info = result['lane_info']
            processing_time = result['processing_time']
            
            if self.is_running:
                # Kanayama에서 계산된 속도를 실제 모터에 적용
                # 계산된 속도를 모터 duty cycle로 변환 (0~30 m/s → 0~100%)
                # 30 m/s = 100%, 0 m/s = 0%로 선형 변환
                motor_speed_percent = (calculated_speed / 30.0) * 100.0
                motor_speed_percent = max(0, min(100, motor_speed_percent))  # 0~100 범위 제한
                
                # 모터에 적용
                self.motor_controller.left_speed = motor_speed_percent
                self.motor_controller.right_speed = motor_speed_percent
                
                print(f"Kanayama 속도 적용: {calculated_speed:.1f} m/s → {motor_speed_percent:.1f}%")
                
                # 조향 제어
                print(f"[CONTROLLER_DEBUG] 조향 제어 호출: steering_angle={steering_angle:.2f}°")
                self.motor_controller.control_motors(steering_angle, control_mode=1)
            
            # 시각화를 위한 정보 반환
            return {
                'processed_image': processed_image,
                'bev_image': bev_image,
                'bev_boxes': bev_boxes,
                'lane_info': lane_info,
                'steering_angle': steering_angle,
                'speed': calculated_speed,
                'processing_time': processing_time
            }
        else:  # Manual mode
            if self.is_running:
                self.motor_controller.handle_manual_control()
            return {
                'processed_image': frame,
                'bev_image': None,
                'bev_boxes': [],
                'lane_info': None,
                'steering_angle': 0.0,
                'speed': 0.0,
                'processing_time': 0.0
            }

    def wait_for_mode_selection(self):
        """시작 시 모드 선택 대기"""
        print("\n주행 모드를 선택하세요:")
        print("1: 자율주행 모드")
        print("2: 수동주행 모드")
        
        while True:
            if keyboard.is_pressed('1'):
                self.switch_mode(1)
                break
            elif keyboard.is_pressed('2'):
                self.switch_mode(2)
                break
            time.sleep(0.1)

    def run(self, video_path=None, camera_index=0):
        """
        메인 실행 함수
        Args:
            video_path: 비디오 파일 경로 (선택)
            camera_index: 카메라 인덱스 (기본값 0)
        """
        # 카메라 또는 비디오 초기화
        if video_path:
            cap = cv2.VideoCapture(video_path)
        else:
            cap = cv2.VideoCapture(camera_index)
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        if not cap.isOpened():
            print("카메라를 열 수 없습니다.")
            return

        image_widget = widgets.Image(format='jpeg')
        
        # 시작 시 모드 선택
        self.wait_for_mode_selection()

        # 제어 안내 출력
        print("\n키보드 제어 안내:")
        print("Space: 주행 시작/정지")
        print("1/2: 자율주행/수동주행 모드 전환")
        print("Q: 프로그램 종료\n")

        try:
            while True:
                # 키보드 입력 처리
                if keyboard.is_pressed('space'):
                    time.sleep(0.3)  # 디바운싱
                    if self.is_running:
                        self.stop_driving()
                    else:
                        self.start_driving()
                
                elif keyboard.is_pressed('1') or keyboard.is_pressed('2'):
                    prev_mode = self.control_mode
                    new_mode = 1 if keyboard.is_pressed('1') else 2
                    if prev_mode != new_mode:
                        self.switch_mode(new_mode)
                    time.sleep(0.3)  # 디바운싱
                
                if keyboard.is_pressed('q'):
                    print("\n프로그램을 종료합니다.")
                    break

                # 프레임 처리
                ret, frame = cap.read()
                if not ret:
                    print("프레임을 읽을 수 없습니다.")
                    break

                # 이미지 처리 및 차량 제어
                processed_image = self.process_and_control(frame)
                # 처리된 이미지 표시 (노트북 상에서)
                ret_jpeg, jpeg = cv2.imencode('.jpeg', processed_image)
                if ret_jpeg:
                    image_widget.value = jpeg.tobytes()
                    display(image_widget)
                    clear_output(wait=True)
                else:
                    print("이미지 인코딩 실패")
                

        except KeyboardInterrupt:
            print("\n사용자에 의해 중지되었습니다.")
        finally:
            # 리소스 정리
            cap.release()
            cv2.destroyAllWindows()
            self.stop_driving()

print("done")

done


## Driving!

In [8]:
from threading import Lock

controller = DrivingSystemController(overlay, dpu, motors, speed, steering_speed, parking_mode)
# 카메라 im
controller.run(camera_index=0)

# 비디오 영상 in
controller.run(video_path="../test_video/test_video.mp4")

DPU 초기화 중...
DPU 초기화 완료

주행 모드를 선택하세요:
1: 자율주행 모드
2: 수동주행 모드

키보드 제어 안내:
Space: 주행 시작/정지
1/2: 자율주행/수동주행 모드 전환
Q: 프로그램 종료

Confidence scores for detected boxes: [0.7987887]
차선을 찾을 수 없습니다.
차선 미검출: 이전 0개 값 평균 사용 (0.00°)
Left: x=128.0, slope=0.000
Right: x=128.0, slope=0.000
Base steering: 0.00°, Final: 0.00°
Calculated speed: 23.0 m/s
History size: 0, No lane count: 1
--------------------------------------------------
주행을 정지합니다.
set_left_speed duty 0
set_right_speed duty 0
left duty 0


error: OpenCV(4.5.4) :-1: error: (-5:Bad argument) in function 'imencode'
> Overload resolution failed:
>  - img is not a numpy array, neither a scalar
>  - Expected Ptr<cv::UMat> for argument 'img'
