# **정차 차량 탐지 모델: 객체탐지(YOLOv8) + 객체추적(ByteTrack)**

### 기능: 유저가 직접 영상을 하나 업로드해서 정차 차량이 존재하는지 확인

### 추가해야 할 것: UI, 종합적 사고 판단 로직, 실시간 스트리밍 판단


In [None]:
# YOLOv8 (installed through the ultralytics package)
!pip install ultralytics

# ByteTrack dependencies
!pip install cython_bbox lap
!pip install git+https://github.com/ifzhang/ByteTrack.git
!pip install loguru
!pip install python-dotenv

Collecting ultralytics
  Downloading ultralytics-8.3.154-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [2]:
from google.colab import drive
# Mount Google Drive at /content/drive; the model weights loaded later in this
# notebook are read from a path under that mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from ultralytics import YOLO
from yolox.tracker.byte_tracker import BYTETracker
from argparse import Namespace
import cv2
import numpy as np
from google.colab.patches import cv2_imshow
from google.colab import files
import math
import time

# Load the detection model from Google Drive.
# An alternative checkpoint (its file name suggests a single vehicle class) is
# kept here for reference:
# "/content/drive/MyDrive/2025_1학기_캡스톤_12조/yolo_tmp_Streaming+Upload(only 1 class, vehicle-detection).pt"
model_path = "/content/drive/MyDrive/2025_1학기_캡스톤_12조/yolo_tmp_vehicle_detection(car, bus, truck).pt"
model = YOLO(model_path)

# Build the ByteTrack settings from a mapping, then create the tracker.
tracker_args = Namespace(**{
    "track_thresh": 0.25,
    "match_thresh": 0.8,
    "track_buffer": 30,
    "aspect_ratio_thresh": 1.6,
    "min_box_area": 10,
    "mot20": False,
})
tracker = BYTETracker(tracker_args, frame_rate=30)


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [4]:
import numpy as np

# Re-create the `np.float` / `np.int` aliases as the plain Python builtins.
# NOTE(review): presumably needed because the ByteTrack code used in this
# notebook still refers to these aliases while the installed NumPy no longer
# provides them -- confirm against the installed versions.
np.float, np.int = float, int

# --- Accident-decision criteria ---------------------------------------------
# These module-level values are not referenced by the analysis functions
# visible in this notebook, which receive their thresholds as parameters.

# Number of stored centre positions used as the threshold.
STATIONARY_FRAME_THRESHOLD = 5
# A position change of at most N px is judged as "stopped".
POSITION_THRESHOLD = 5
# N consecutive detections are judged as an accident.
ACCIDENT_FRAME_THRESHOLD = 5

video_is_accident = False

# Per-frame positions: {track_id: [(x_center, y_center), ...]}
track_history = dict()

# IDs of vehicles detected as accidents.
accident_ids = set()

In [None]:
import requests
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the environment, then read the
# ITS open-API key from it. (The previous spelling `load_dotnev` raised
# NameError before any request was made.)
load_dotenv()
my_api_key = os.getenv("ITS_API_KEY")

# Request URL and query parameters for the CCTV-info endpoint
api_url = "https://openapi.its.go.kr:9443/cctvInfo"
params = {
    "apiKey": my_api_key,
    "type": "ex",  # expressway
    "cctvType": "1",  # live streaming (HLS)
    "minX": "127.100000",
    "maxX": "128.890000",
    "minY": "34.100000",
    "maxY": "39.100000",
    "getType": "json"
}

# Send the request. The timeout keeps the cell from hanging indefinitely on an
# unresponsive server, and raise_for_status() surfaces HTTP errors directly
# instead of as a confusing JSON decoding failure.
response = requests.get(api_url, params=params, timeout=10)
response.raise_for_status()
data = response.json()

# Print the name and streaming URL of every CCTV in the response
cctv_list = data.get("response", {}).get("data", [])
for cctv in cctv_list:
    print(f"CCTV 이름: {cctv.get('cctvname')}")
    print(f"스트리밍 URL: {cctv.get('cctvurl')}\n")

CCTV 이름: [수도권제1순환선] 성남
스트리밍 URL: http://cctvsec.ktict.co.kr/2/WpkGTuzVZyZg7ZPihfeVcucNYv2MCduoKm8PyFub+jly3BoaFwKPxXvDMaGPWl1/gIiuKjolsFVYSm5UuJ4BZg==

CCTV 이름: [수도권제1순환선] 성남요금소
스트리밍 URL: http://cctvsec.ktict.co.kr/3/uEaMW6fqLQzhxp12IZdX/CyQH6manYyTLQbKV7bX17SZ/bjw+8EhoMeYi8MWvHA3TKypQcE53b9y4l7VVOO+YA==

CCTV 이름: [수도권제1순환선] 송파
스트리밍 URL: http://cctvsec.ktict.co.kr/4/MJXdVNudQ6VqRqEIJgDszilUIwtDWiwV4ygUY4mChkmMTxKleSWcUcpSTo2DU9WLp5Rj2gIk8m0cIzilZOa5WQ==

CCTV 이름: [수도권제1순환선] 서하남2
스트리밍 URL: http://cctvsec.ktict.co.kr/5/SlmPC1+Y/XRB4WIG9kreeWwjWftlUPyf7kfFxKBISnAut1fnJczXxagzB+ueNyqh7Z+CO/4Y8LOCgBf9ESqjYw==

CCTV 이름: [수도권제1순환선] 광암터널2
스트리밍 URL: http://cctvsec.ktict.co.kr/6/uTf4f3sJvMQyNhpqR4YpACKtIQMup0CC21HZhGjhgRc32ThjRqPqVUOlmQyD63mTCVn+mmclBzT1e7fPq+5AzA==

CCTV 이름: [수도권제1순환선] 광암터널3
스트리밍 URL: http://cctvsec.ktict.co.kr/7/nqm7CNfzr/LsFZpjfYLn2J0KLlg2ojkXyVEtFAIVZNSQeKjqyCf+UFbVcNh5HU/ewa3Y4TBJPaf15RdrjIRHUQ==

CCTV 이름: [수도권제1순환선] 하남분기점
스트리밍 URL: http://cctvsec.ktict.co.kr/8/H+RVm6tI2GfQ

In [6]:
def analyze_stream_from_url_2(stream_url, model, tracker_args,
                               target_classes=(0, 1, 2),
                               conf_threshold=0.3,
                               frame_resize=(720, 480),
                               max_analysis_frames=300,
                               stationary_frame_threshold=5,
                               position_threshold=2,
                               accident_frame_threshold=3,
                               frame_step=3):
    """Analyse a video stream and report whether a stopped vehicle is suspected.

    Frames are read from ``stream_url``, resized, run through the detector and
    then through a fresh ``BYTETracker``. A track is marked as stopped when the
    summed centre displacement over its most recent positions is below
    ``position_threshold``; once marked, a track stays marked for the rest of
    the call. Analysis stops early as soon as ``accident_frame_threshold``
    consecutive analysed frames contain a marked track.

    Args:
        stream_url: Source handed to ``cv2.VideoCapture``.
        model: Detector whose ``predict(source=, classes=, conf=, verbose=)``
            result exposes ``.boxes`` on its first element.
        tracker_args: Settings object passed to ``BYTETracker``.
        target_classes: Class indices to detect. ``None`` is forwarded
            unchanged to the detector.
        conf_threshold: Detection confidence threshold.
        frame_resize: ``(width, height)`` every frame is resized to.
        max_analysis_frames: Maximum number of frames to analyse.
        stationary_frame_threshold: Number of recent centre positions used for
            the stationary test. Values below 2 are treated as 2, because a
            single position gives no displacement to measure.
        position_threshold: A track is stationary when its summed displacement
            (in pixels of the resized frame) is strictly below this value.
        accident_frame_threshold: Consecutive analysed frames containing a
            marked track that trigger the accident result.
        frame_step: Analyse one frame out of every ``frame_step``; values
            below 1 behave like 1.

    Returns:
        tuple: ``(frames, message)`` where ``frames`` is the list of annotated
        RGB frames that were analysed and ``message`` is the status string.
    """
    from collections import deque

    # An empty source cannot be opened; report it the same way as a failed open.
    if stream_url is None or stream_url == "":
        return [], "❌ 스트림을 열 수 없습니다."

    # UI sliders may deliver floats, but range() and deque(maxlen=) need ints.
    frame_step = max(1, int(frame_step))
    window = max(2, int(stationary_frame_threshold))
    class_filter = None if target_classes is None else list(target_classes)

    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        cap.release()
        return [], "❌ 스트림을 열 수 없습니다."

    tracker = BYTETracker(tracker_args, frame_rate=30)
    track_history = {}  # track_id -> deque of the last `window` centre points
    accident_ids = set()
    consecutive_accident_frames = 0
    output_frames = []

    # try/finally guarantees the capture is released even if detection,
    # tracking or drawing raises part-way through.
    try:
        while cap.isOpened() and len(output_frames) < max_analysis_frames:
            ret, frame = cap.read()
            if not ret:
                break

            resized_frame = cv2.resize(frame, frame_resize)
            results = model.predict(
                source=resized_frame,
                classes=class_filter,
                conf=conf_threshold,
                verbose=False
            )[0]

            # One row per detection: x1, y1, x2, y2, score
            detections = [
                [*box.xyxy[0].cpu().numpy(), float(box.conf[0])]
                for box in results.boxes
            ]
            dets = np.array(detections) if detections else np.empty((0, 5))
            online_targets = tracker.update(dets, resized_frame.shape[:2], resized_frame.shape)

            for target in online_targets:
                track_id = target.track_id
                left, top, right, bottom = map(int, target.tlbr)
                center = ((left + right) // 2, (top + bottom) // 2)

                history = track_history.get(track_id)
                if history is None:
                    # Only the last `window` points are ever inspected, so a
                    # bounded deque keeps memory flat on long streams.
                    history = track_history[track_id] = deque(maxlen=window)
                history.append(center)

                if len(history) == window:
                    points = list(history)
                    movement = sum(
                        math.hypot(x1 - x0, y1 - y0)
                        for (x0, y0), (x1, y1) in zip(points, points[1:])
                    )
                    if movement < position_threshold:
                        accident_ids.add(track_id)

                # Red for tracks marked as stopped, green otherwise (BGR order)
                color = (0, 0, 255) if track_id in accident_ids else (0, 255, 0)
                cv2.rectangle(resized_frame, (left, top), (right, bottom), color, 2)
                cv2.putText(resized_frame, f'ID {track_id}', (left, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            output_frames.append(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB))

            if any(target.track_id in accident_ids for target in online_targets):
                consecutive_accident_frames += 1
            else:
                consecutive_accident_frames = 0

            if consecutive_accident_frames >= accident_frame_threshold:
                return output_frames, "🚨 사고 상황 의심!"

            # Skip the frames between analysed ones without decoding them
            for _ in range(frame_step - 1):
                cap.grab()
    finally:
        cap.release()

    return output_frames, "✅ 정상"


# **아래 셀 실행 후 .mp4 파일 업로드 필요**

In [7]:
def analyze_uploaded_video_with_tracking_2(model, tracker_args,
                                           target_classes=(0, 1, 2),
                                           conf_threshold=0.3,
                                           frame_resize=(720, 480),
                                           stationary_frame_threshold=5,
                                           position_threshold=5,
                                           accident_frame_threshold=5,
                                           frame_step=3,
                                           video_path=None):
    """Analyse an uploaded video file and report whether an accident is suspected.

    Every ``frame_step``-th frame is resized, run through the detector and
    then through a fresh ``BYTETracker``. A track is marked as stopped when the
    summed centre displacement over its most recent positions is below
    ``position_threshold``; once marked, a track stays marked. The whole video
    is always processed so that every analysed frame is returned; the accident
    verdict is latched as soon as ``accident_frame_threshold`` consecutive
    analysed frames contain a marked track.

    Args:
        model: Detector whose ``predict(source=, classes=, conf=, verbose=)``
            result exposes ``.boxes`` on its first element.
        tracker_args: Settings object passed to ``BYTETracker``.
        target_classes: Class indices to detect. ``None`` is forwarded
            unchanged to the detector.
        conf_threshold: Detection confidence threshold.
        frame_resize: ``(width, height)`` every frame is resized to.
        stationary_frame_threshold: Number of recent centre positions used for
            the stationary test. Values below 2 are treated as 2, because a
            single position gives no displacement to measure.
        position_threshold: A track is stationary when its summed displacement
            (in pixels of the resized frame) is strictly below this value.
        accident_frame_threshold: Consecutive analysed frames containing a
            marked track that trigger the accident result.
        frame_step: Analyse one frame out of every ``frame_step``; values
            below 1 are treated as 1 (a step of 0 would otherwise never
            advance).
        video_path: Path of the video file to analyse.

    Returns:
        tuple: ``(frames, message)`` where ``frames`` is the list of annotated
        RGB frames that were analysed and ``message`` is the status string.
        When the video is missing or cannot be opened, ``frames`` is empty and
        ``message`` is an error string instead of the "normal" verdict.
    """
    from collections import deque

    if not video_path:
        return [], "❌ 비디오 파일을 열 수 없습니다."

    # UI sliders may deliver floats, but range() and deque(maxlen=) need ints.
    frame_step = max(1, int(frame_step))
    window = max(2, int(stationary_frame_threshold))
    class_filter = None if target_classes is None else list(target_classes)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Without this check an unreadable file reports a frame count of 0 and
        # would be returned as "normal".
        cap.release()
        return [], "❌ 비디오 파일을 열 수 없습니다."

    tracker = BYTETracker(tracker_args, frame_rate=30)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_index = 0
    consecutive_accident_frames = 0
    track_history = {}  # track_id -> deque of the last `window` centre points
    accident_ids = set()
    output_frames = []
    accident_detected = False

    # try/finally guarantees the capture is released even if detection,
    # tracking or drawing raises part-way through.
    try:
        while frame_index < total_frames:
            # Frames are read sequentially; the ones in between are skipped
            # with grab() below rather than by seeking before every read.
            ret, frame = cap.read()
            if not ret:
                print("❌ 프레임 읽기 실패")
                break

            resized_frame = cv2.resize(frame, frame_resize)
            results = model.predict(source=resized_frame, classes=class_filter,
                                    conf=conf_threshold, verbose=False)[0]

            # One row per detection: x1, y1, x2, y2, score
            detections = []
            for box in results.boxes:
                bbox = box.xyxy[0]
                bbox = bbox.cpu().numpy() if hasattr(bbox, 'cpu') else np.array(bbox)
                x1, y1, x2, y2 = bbox
                detections.append([x1, y1, x2, y2, float(box.conf[0])])

            dets = np.array(detections) if detections else np.empty((0, 5))
            online_targets = tracker.update(
                dets,
                (resized_frame.shape[0], resized_frame.shape[1]),
                resized_frame.shape,
            )

            for target in online_targets:
                track_id = target.track_id
                left, top, right, bottom = map(int, target.tlbr)
                center = ((left + right) // 2, (top + bottom) // 2)

                history = track_history.get(track_id)
                if history is None:
                    # Only the last `window` points are ever inspected, so a
                    # bounded deque keeps memory flat on long videos.
                    history = track_history[track_id] = deque(maxlen=window)
                history.append(center)

                if len(history) == window:
                    points = list(history)
                    movement = sum(
                        math.hypot(px1 - px0, py1 - py0)
                        for (px0, py0), (px1, py1) in zip(points, points[1:])
                    )
                    if movement < position_threshold:
                        accident_ids.add(track_id)

                # Red for tracks marked as stopped, green otherwise (BGR order)
                color = (0, 0, 255) if track_id in accident_ids else (0, 255, 0)
                cv2.rectangle(resized_frame, (left, top), (right, bottom), color, 2)
                cv2.putText(resized_frame, f'ID {track_id}', (left, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            # Store the annotated frame after converting BGR -> RGB
            output_frames.append(cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB))

            if any(target.track_id in accident_ids for target in online_targets):
                consecutive_accident_frames += 1
            else:
                consecutive_accident_frames = 0

            if consecutive_accident_frames >= accident_frame_threshold:
                accident_detected = True

            # Skip the frames between analysed ones without decoding them
            for _ in range(frame_step - 1):
                cap.grab()
            frame_index += frame_step
    finally:
        cap.release()

    if accident_detected:
        return output_frames, "🚨 사고 상황 의심!"
    return output_frames, "✅ 정상"


In [8]:
import gradio as gr
import cv2
import numpy as np
import requests
from ultralytics import YOLO
from types import SimpleNamespace as Namespace

# Load the model once up front (performance optimisation)
model_path = "/content/drive/MyDrive/2025_1학기_캡스톤_12조/yolo_tmp_vehicle_detection(car, bus, truck).pt"
model = YOLO(model_path)

# ByteTrack settings, assigned one attribute at a time on an empty namespace
tracker_args = Namespace()
tracker_args.track_thresh = 0.25
tracker_args.match_thresh = 0.8
tracker_args.track_buffer = 30
tracker_args.aspect_ratio_thresh = 1.6
tracker_args.min_box_area = 10
tracker_args.mot20 = False

# Request the CCTV list
def fetch_cctv_list(api_url):
    """Return a ``{CCTV name: streaming URL}`` mapping fetched from ``api_url``.

    The lookup is best-effort: if the request fails, the body is not valid
    JSON, or an entry lacks ``cctvname`` / ``cctvurl``, a warning is printed
    and an empty dict is returned so the caller can still start.

    Args:
        api_url: Full request URL, including its query string.

    Returns:
        dict: CCTV name -> streaming URL; empty on any failure.
    """
    try:
        # The timeout stops a stalled server from blocking the caller forever.
        resp = requests.get(api_url, timeout=10)
        resp.raise_for_status()
        entries = resp.json().get("response", {}).get("data", [])
        return {entry["cctvname"]: entry["cctvurl"] for entry in entries}
    except (requests.RequestException, ValueError, KeyError,
            TypeError, AttributeError) as exc:
        # Only the exception type is printed: request errors can embed the
        # full URL, whose query string carries the API key.
        print(f"⚠️ CCTV 목록을 가져오지 못했습니다: {type(exc).__name__}")
        return {}

import os
from urllib.parse import urlencode

from dotenv import load_dotenv

# Read the ITS API key from the environment (optionally populated from a .env
# file) instead of embedding it in the notebook, matching how the earlier
# request cell obtains ITS_API_KEY.
# NOTE(review): the key that used to be written inline here should be treated
# as exposed and rotated.
load_dotenv()
my_api_key = os.getenv("ITS_API_KEY", "")
if not my_api_key:
    print("⚠️ ITS_API_KEY 환경 변수가 설정되지 않았습니다. CCTV 목록을 불러올 수 없습니다.")

api_url = "https://openapi.its.go.kr:9443/cctvInfo?" + urlencode({
    "apiKey": my_api_key,
    "type": "ex",
    "cctvType": "1",
    "minX": "127.100000",
    "maxX": "128.890000",
    "minY": "34.100000",
    "maxY": "39.100000",
    "getType": "json",
})
name_to_url = fetch_cctv_list(api_url)

# Save a list of frames as a video file
def frames_to_video(frames, output_path, fps=10):
    """Write RGB frames to ``output_path`` as an mp4v-encoded video.

    Args:
        frames: Sequence of RGB image arrays; the first frame's height and
            width set the video size.
        output_path: Destination file path.
        fps: Frame rate of the written video.

    Returns:
        ``output_path`` when the video was written, or ``None`` when there are
        no frames or the writer could not be opened.
    """
    if not frames:
        return None

    height, width, _ = frames[0].shape
    video_writer = cv2.VideoWriter(
        output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)
    )
    # Release the writer on every path, including a failed colour conversion
    # or write.
    try:
        if not video_writer.isOpened():
            # Nothing can be written, so do not hand back a path to a file
            # that holds no video.
            return None
        for frame in frames:
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        video_writer.release()
    return output_path

# Unified analysis callback
def gradio_callback(mode, video_file, selected_cctv_name,
                    conf_threshold, stationary_threshold,
                    position_threshold, accident_threshold,
                    frame_step, max_analysis_frames):
    """Run the analysis selected in the UI and return its video and verdict.

    Args:
        mode: Selected analysis mode; ``"실시간 스트리밍 분석"`` runs the
            stream analysis, any other value runs the uploaded-video analysis.
        video_file: Uploaded video (used in upload mode).
        selected_cctv_name: Key into ``name_to_url`` (used in streaming mode).
        conf_threshold: Detection confidence threshold.
        stationary_threshold: Forwarded as ``stationary_frame_threshold``.
        position_threshold: Forwarded as ``position_threshold``.
        accident_threshold: Forwarded as ``accident_frame_threshold``.
        frame_step: Forwarded as ``frame_step``.
        max_analysis_frames: Forwarded to the stream analysis only.

    Returns:
        tuple: ``(video_path, message)``. ``video_path`` is ``None`` when no
        video was produced, so the UI never shows a missing or stale file.
    """
    # Count-like slider values are normalised to int before being forwarded.
    stationary_threshold = int(stationary_threshold)
    accident_threshold = int(accident_threshold)
    frame_step = int(frame_step)
    max_analysis_frames = int(max_analysis_frames)

    if mode == "실시간 스트리밍 분석":
        stream_url = name_to_url.get(selected_cctv_name, "")
        if not stream_url:
            return None, "❌ 선택한 CCTV의 URL을 찾을 수 없습니다."
        output_frames, result_msg = analyze_stream_from_url_2(
            stream_url=stream_url,
            model=model,
            tracker_args=tracker_args,
            conf_threshold=conf_threshold,
            stationary_frame_threshold=stationary_threshold,
            position_threshold=position_threshold,
            accident_frame_threshold=accident_threshold,
            frame_step=frame_step,
            max_analysis_frames=max_analysis_frames
        )
    else:
        if video_file is None:
            return None, "❌ 비디오 파일을 업로드하세요."
        output_frames, result_msg = analyze_uploaded_video_with_tracking_2(
            video_path=video_file,  # must be passed explicitly
            model=model,
            tracker_args=tracker_args,
            conf_threshold=conf_threshold,
            stationary_frame_threshold=stationary_threshold,
            position_threshold=position_threshold,
            accident_frame_threshold=accident_threshold,
            frame_step=frame_step,
        )

    # frames_to_video() returns None when there was nothing to write. Passing
    # that through (rather than always returning the fixed path) avoids
    # pointing the UI at a missing file or at a previous run's video.
    output_video_path = frames_to_video(output_frames, "/tmp/output_video.mp4", fps=10)
    return output_video_path, result_msg

# Gradio UI: components are created in display order inside the Blocks context
with gr.Blocks() as demo:
    gr.Markdown("## 🚦 2025-1 Capstone Design AI Accident Detection")

    # Analysis mode selector
    mode_radio = gr.Radio(
        choices=["실시간 스트리밍 분석", "영상 업로드 분석"],
        label="분석 방식 선택",
        value="실시간 스트리밍 분석"
    )

    # CCTV dropdown (visibility is toggled according to the selected mode)
    cctv_dropdown = gr.Dropdown(
        choices=list(name_to_url.keys()),
        label="CCTV 스트리밍 선택",
        visible=True
    )

    # Video upload input (hidden until upload mode is selected)
    video_input = gr.Video(label="영상 업로드", visible=False)

    # Parameter description block
    gr.Markdown("""
    ### ⚙️ 분석 파라미터 설명

    - **Confidence Threshold**: 객체 탐지의 신뢰도 임계값 (0.1 ~ 1.0). 낮을수록 더 많은 객체 탐지, 높을수록 확실한 객체만 탐지.
    - **Stationary Frame Threshold**: 차량이 몇 프레임 연속 정지해야 '정지 상태'로 판단할지 설정.
    - **Position Threshold**: 이동 거리가 이 값 이하일 경우 정지로 간주함 (픽셀 단위로 거리 계산).
    - **Accident Frame Threshold**: '정지 상태'로 판단된 프레임이 몇 번 연속되면 사고로 판단할지 설정.
    - **Frame Step**: 분석 시 몇 프레임씩 건너뛸지 설정 (1이면 모든 프레임 분석, 2면 1프레임 건너뜀).
    - **Max Analysis Frames** *(실시간 전용)*: 실시간 분석 시 최대 분석할 프레임 수 (성능 및 응답 시간 조절).
    """)

    # Parameter sliders
    with gr.Row():
        conf_slider = gr.Slider(minimum=0.1, maximum=1.0, step=0.05, value=0.3, label="Confidence Threshold")
        stationary_slider = gr.Slider(minimum=1, maximum=10, step=1, value=5, label="Stationary Frame Threshold")
        position_slider = gr.Slider(minimum=1, maximum=20, step=1, value=5, label="Position Threshold")

    with gr.Row():
        accident_slider = gr.Slider(minimum=1, maximum=10, step=1, value=3, label="Accident Frame Threshold")
        frame_step_slider = gr.Slider(minimum=1, maximum=5, step=1, value=3, label="Frame Step")
        max_analysis_frames_slider = gr.Slider(minimum=100, maximum=2000, step=100, value=500, label="Max Analysis Frames (실시간 전용)")


    # Button and output area
    start_button = gr.Button("분석 시작")
    output_video = gr.Video(label="분석 결과 영상")
    result_text = gr.Textbox(label="분석 결과")

    # Toggle the inputs when the mode changes
    def toggle_inputs(mode):
        """Return visibility updates for the CCTV dropdown and the video input."""
        return (
            gr.update(visible=(mode == "실시간 스트리밍 분석")),
            gr.update(visible=(mode == "영상 업로드 분석"))
        )

    mode_radio.change(
        fn=toggle_inputs,
        inputs=mode_radio,
        outputs=[cctv_dropdown, video_input]
    )

    # Callback run when the start button is clicked; the order of `inputs`
    # matches the positional parameters of gradio_callback
    start_button.click(
        fn=gradio_callback,
        inputs=[
            mode_radio, video_input, cctv_dropdown,
            conf_slider, stationary_slider, position_slider,
            accident_slider, frame_step_slider, max_analysis_frames_slider  # added
        ],
        outputs=[output_video, result_text]
    )


# Launch the UI
demo.launch()


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://3406377539b8fba6fb.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


