In [None]:
from os.path import dirname, abspath
import os
import cv2
from contextlib import contextmanager
import numpy as np
import logging

In [None]:
VIDEO = "test.mp4"
WEIGHTS = "YOLOFI2.weights"
CONFIG = "YOLOFI.cfg"
OBJ_NAMES = "obj.names"
SAVE_PATH = os.getcwd() + "/"

logging.basicConfig(level=logging.INFO)

In [None]:
#class define

class BeltVisible:
    # lists of frames (ids) where the belt part is supposed to be detected as closed
    # first frame has id 0
    def __init__(self, belt_frames, belt_corner_frames):
        self.belt_frames = belt_frames
        self.belt_corner_frames = belt_corner_frames


class BeltDetected:
    # list of frames (ids) where the belt part was detected as closed
    # first frame has id 0
    def __init__(self):
        self.belt_frames = []  # main part
        self.belt_corner_frames = []  # corner part

    def add_belt(self, frame):
        self.belt_frames.append(frame)

    def add_corner_belt(self, frame):
        self.belt_corner_frames.append(frame)


In [None]:
def video_capture(*args, **kwargs):
    cap = cv2.VideoCapture(*args, **kwargs)
    try:
        yield cap
    finally:
        cap.release()
        cv2.destroyAllWindows()


def get_layers(net):
    layer_names = net.getLayerNames()
    return [layer_names[layer[0] - 1] for layer in net.getUnconnectedOutLayers()]


def get_classes():
    classes = []
    with open(OBJ_NAMES, "r") as file:
        classes = [line.strip() for line in file.readlines()]
    return classes


def belt_detector(net, img, belt_detected, current_frame):
    blob = cv2.dnn.blobFromImage(img, 0.00392, (480, 480), (0, 0, 0), True, crop=False)
    net.setInput(blob)

    height, width, channels = img.shape

    outs = net.forward(get_layers(net))
    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]

            if confidence > 0.2:
                center_x = int(detection[0] * width)
                center_y = int(detection[1] * height)
                w = int(detection[2] * width)
                h = int(detection[3] * height)
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)
                cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)
                if class_id == 1:
                    belt_detected.add_corner_belt(current_frame)
                elif class_id == 0:
                    belt_detected.add_belt(current_frame)

    return belt_detected


def print_belt_report(belt_detected, total_frames):
    belt_visible = BeltVisible(
        belt_frames=[i for i in range(125)],
        belt_corner_frames=[i for i in range(125)]
    )
    success_belt_frames = set(belt_visible.belt_frames).intersection(belt_detected.belt_frames)
    success_belt_corner_frames = set(belt_visible.belt_corner_frames).intersection(
        belt_detected.belt_corner_frames
    )

    logging.info(
        "Total frames {}, successfully detected belt {} of {} times, corner belt - {} of {}".format(
            total_frames,
            len(success_belt_frames),
            len(belt_visible.belt_frames),
            len(success_belt_corner_frames),
            len(belt_visible.belt_corner_frames)
        ))
    logging.info("Non detected belt frames: {}".format(
        set(belt_visible.belt_frames).difference(belt_detected.belt_frames))
    )
    logging.info("False detected belt frames: {}".format(
        set(belt_detected.belt_frames).difference(belt_visible.belt_frames))
    )
    logging.info("Non detected belt corner frames: {}".format(
        set(belt_visible.belt_corner_frames).difference(belt_detected.belt_corner_frames))
    )
    logging.info("False detected belt corner frames: {}".format(
        set(belt_detected.belt_corner_frames).difference(belt_visible.belt_corner_frames))
    )


def apply_clahe(img, **kwargs):
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    lab = cv2.split(lab)
    clahe = cv2.createCLAHE(**kwargs)
    lab[0] = clahe.apply(lab[0])
    lab = cv2.merge((lab[0], lab[1], lab[2]))
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)


def apply_gabor(img, **kwargs):
    g_kernel = cv2.getGaborKernel(**kwargs)
    return cv2.filter2D(img, cv2.CV_8UC3, g_kernel.sum())


def increase_brightness(img):
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)
    v += 255
    final_hsv = cv2.merge((h, s, v))
    return cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)


def build_filters():
    filters = []
    ksize = 31
    for theta in np.arange(0, np.pi, np.pi / 16):
        kern = cv2.getGaborKernel((ksize, ksize), 0.3, theta, 9.0, 0.6, 50, ktype=cv2.CV_32F)
    kern /= 1.5*kern.sum()
    filters.append(kern)
    return filters

def process(img, filters):
    accum = np.zeros_like(img)
    for kern in filters:
        fimg = cv2.filter2D(img, cv2.CV_8UC3, kern)
    np.maximum(accum, fimg, accum)
    return accum

def main():
    with video_capture(VIDEO) as cap:
        net = cv2.dnn.readNet(WEIGHTS, CONFIG)
        frame_id = -1
        belt_detected = BeltDetected()
        while True:
            frame = cap.read()
            frame_id += 1
            if not frame[0]:
                break
            img = frame[1][:, 50: -50]

            # TODO: your code here
            img = increase_brightness(img)

            img = apply_clahe(img=img, clipLimit=5, tileGridSize=(17, 17))
            # better results for corner belt, slightly worse results for main part
            # img = apply_gabor(img=img, ksize=(4, 4), sigma=5, theta=89,
            #                   lambd=1, gamma=2, psi=0, ktype=cv2.CV_64F)
            # the best result for main part
            img = apply_gabor(img=img, ksize=(31, 31), sigma=2.9, theta=160,
                              lambd=14.5, gamma=35, psi=50, ktype=cv2.CV_64F)
            belt_detected = belt_detector(net, img, belt_detected, frame_id)
            cv2.imshow("Image", img)

            # to decide which frame should be assumed as belt position changing
            # chosen id=124, although it is arguable
            # if frame_id in range(120, 126):
            #     cv2.imwrite(SAVE_PATH + "{id}.png".format(id=frame_id), img)

            key = cv2.waitKey(1)
            if key == 27:
                break
        print_belt_report(belt_detected, frame_id)

    
    net =cv2.dnn.readNet("YOLOFI2.weights","YOLOFI.cfg")
    cap = cv2.VideoCapture("test.mp4")
    classes=[]  
    l=1
    with open("obj.names","r")as f:
            classes = [line.strip()for line in f.readlines()]
            layers_names = net.getLayerNames()
            outputlayers= [layers_names[i[0]-1]for i in net.getUnconnectedOutLayers()]
            colors = np.random.uniform(0,255,size =(len(classes),3))
            font = cv2.FONT_HERSHEY_PLAIN
            frame_id=0  
            dd =-1
            time_now=time.time()
            frame_id=0
            err=0

            count = 0 # for counting frames

            while True:
                _, frame = cap.read()
                frame_id += 1           
                beltcornerdetected = False
                beltdetected = False 
                height , width , channels = frame.shape


                #Type you code here

                clahe = cv2.createCLAHE(clipLimit=5.0, tileGridSize=(8,8))

                R, G, B = cv2.split(frame)


                output1_R = clahe.apply(R)
                output1_G = clahe.apply(G)
                output1_B = clahe.apply(B)

                frame = cv2.merge((output1_R, output1_G, output1_B))

                cv2.fastNlMeansDenoising(frame,frame,3,5,11)

                filters = build_filters()
                frame = process(frame, filters)

                blob = cv2.dnn.blobFromImage(frame, 0.00392, (480,480),(0,0,0),True,crop= False)
                net.setInput(blob)
                outs = net.forward(outputlayers)
                class_ids=[]
                boxes=[]
                shape=[]
                confidence=0

                for out in outs:

                    for detection in out:
                        scores = detection[5:]
                        class_id = np.argmax(scores)
                        confidence = scores[class_id]
                        
                        if confidence> 0.2:
                            center_x= int(detection[0] *width)
                            center_y=int(detection[1]* height)
                            w= int(detection[2] *width)
                            h= int(detection[3] * height)
                            x= int(center_x- w /2)
                            y= int(center_y -h /2)
                            cv2.rectangle(frame,(x,y),(x+w,y+h),(0,255,0),2)
                            if class_id== 1:
                                beltcornerdetected=True
                            elif class_id == 0:
                                beltdetected=True
                            
                print(count, ' ', beltdetected)
                count+=1
                cv2.imshow("Image",frame)
                key =cv2.waitKey(1)
                if key == 27:
                  break
           
            cap.release()    
            cv2.destroyAllWindows()
            
if __name__ == "__main__":
    main()


In [12]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
seatbelt_detection.py
---------------------------------
Real‑time driver‑seat‑belt detector (YOLOv3 / OpenCV‑DNN).

依賴：
    pip install opencv-python numpy
    # 建議先安裝和 GPU 相容版本的 OpenCV /  CUDA / cuDNN，可大幅提升 FPS

用法：
    python seatbelt_detection.py               # 讀預設攝影機 0
    python seatbelt_detection.py --video test.mp4
    python seatbelt_detection.py --video 0     # 同上，顯式指定 webcam
"""

import argparse
import contextlib
import logging
import os
import time
from pathlib import Path

import cv2
import numpy as np

# ======== 路徑設定（請視需要修改） =========================================
ROOT_DIR = Path(os.getcwd())
WEIGHTS = str(ROOT_DIR / "YOLOFI2.weights")
CONFIG  = str(ROOT_DIR / "YOLOFI.cfg")
OBJ_NAMES = str(ROOT_DIR / "obj.names")
# ===========================================================================


# ---------- 資料結構 --------------------------------------------------------
class BeltDetected:
    """累積模型偵測到安全帶的影格索引"""
    def __init__(self):
        self.belt_frames = set()
        self.belt_corner_frames = set()

    def add_belt(self, frame_id: int):
        self.belt_frames.add(frame_id)

    def add_corner_belt(self, frame_id: int):
        self.belt_corner_frames.add(frame_id)


class BeltVisible:
    """
    預期在 ground‑truth 中看得到安全帶／轉角帶的影格清單
    這裡放示範值：0‑124 frame 皆應可見；實務請改成自己的標註。
    """
    def __init__(self, belt_frames=None, belt_corner_frames=None):
        self.belt_frames = belt_frames or []
        self.belt_corner_frames = belt_corner_frames or []


# ---------- 工具函式 --------------------------------------------------------
@contextlib.contextmanager
def video_capture(src):
    """確保離開時能自動釋放資源與關閉所有 OpenCV 視窗"""
    cap = cv2.VideoCapture(src)
    try:
        yield cap
    finally:
        cap.release()
        cv2.destroyAllWindows()


def get_layers(net):
    layer_names = net.getLayerNames()
    return [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]


def get_classes():
    with open(OBJ_NAMES, "r", encoding="utf-8") as f:
        return [line.strip() for line in f.readlines()]


# ---------- 影像增強 --------------------------------------------------------
def increase_brightness(img, delta: int = 60):
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)
    v = np.clip(v.astype(np.int16) + delta, 0, 255).astype(np.uint8)
    final_hsv = cv2.merge((h, s, v))
    return cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)


def apply_clahe(img, clip_limit=5.0, grid_size=(8, 8)):
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=grid_size)
    l = clahe.apply(l)
    lab = cv2.merge((l, a, b))
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)


def build_gabor_filters(ksize=31):
    filters = []
    for theta in np.arange(0, np.pi, np.pi / 16):
        kern = cv2.getGaborKernel(
            (ksize, ksize), 0.3, theta, 9.0, 0.6, 50, ktype=cv2.CV_32F
        )
        kern /= 1.5 * kern.sum()
        filters.append(kern)
    return filters


def apply_gabor_bank(img, filters):
    accum = np.zeros_like(img)
    for kern in filters:
        fimg = cv2.filter2D(img, cv2.CV_8UC3, kern)
        np.maximum(accum, fimg, accum)  # element‑wise max
    return accum


def preprocess_frame(frame, filters):
    """可視需要刪改；此處做亮度提升 + CLAHE + Gabor 濾波"""
    frame = increase_brightness(frame, delta=60)
    frame = apply_clahe(frame, clip_limit=5.0, grid_size=(8, 8))
    frame = apply_gabor_bank(frame, filters)
    return frame


# ---------- YOLO 偵測 -------------------------------------------------------
def belt_detector(net, img, belt_detected: BeltDetected, frame_idx: int,
                  conf_thres=0.2):
    blob = cv2.dnn.blobFromImage(
        img, 0.00392, (480, 480), (0, 0, 0), swapRB=True, crop=False
    )
    net.setInput(blob)
    outs = net.forward(get_layers(net))

    h, w = img.shape[:2]

    for out in outs:
        for det in out:
            scores = det[5:]
            cls_id = int(np.argmax(scores))
            confidence = scores[cls_id]
            if confidence < conf_thres:
                continue

            # 轉回影像座標
            cx, cy, bw, bh = det[0:4] * np.array([w, h, w, h])
            x, y = int(cx - bw / 2), int(cy - bh / 2)
            bw, bh = int(bw), int(bh)

            # 畫框
            color = (0, 255, 0) if cls_id == 0 else (255, 0, 0)
            cv2.rectangle(img, (x, y), (x + bw, y + bh), color, 2)
            label = f"{cls_id}:{confidence:.2f}"
            cv2.putText(img, label, (x, y - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            # 累積偵測結果
            if cls_id == 1:          # 假設 1 = seatbelt corner
                belt_detected.add_corner_belt(frame_idx)
            elif cls_id == 0:        # 假設 0 = seatbelt
                belt_detected.add_belt(frame_idx)

    return belt_detected


# ---------- 統計報表 --------------------------------------------------------
def print_belt_report(belt_detected: BeltDetected, total_frames: int):
    # 示範：假設前 125 幀理論上都應偵測到
    belt_visible = BeltVisible(
        belt_frames=list(range(125)),
        belt_corner_frames=list(range(125)),
    )

    succ_belt = set(belt_visible.belt_frames) & belt_detected.belt_frames
    succ_corner = set(belt_visible.belt_corner_frames) & belt_detected.belt_corner_frames

    logging.info(
        "Total frames: %d | seat‑belt: %d / %d | corner: %d / %d",
        total_frames, len(succ_belt), len(belt_visible.belt_frames),
        len(succ_corner), len(belt_visible.belt_corner_frames)
    )
    logging.debug("Missed belt frames: %s",
                  set(belt_visible.belt_frames) - belt_detected.belt_frames)
    logging.debug("False‑positive belt frames: %s",
                  belt_detected.belt_frames - set(belt_visible.belt_frames))
    logging.debug("Missed corner frames: %s",
                  set(belt_visible.belt_corner_frames) - belt_detected.belt_corner_frames)
    logging.debug("False‑positive corner frames: %s",
                  belt_detected.belt_corner_frames - set(belt_visible.belt_corner_frames))


# ---------- 主程式 ----------------------------------------------------------
def parse_args():
    ap = argparse.ArgumentParser(description="Seat‑belt detector (YOLOv3 / OpenCV‑DNN)")
    ap.add_argument("--video", type=str, default="0",
                    help="影片路徑，或 camera index (預設 0)")
    ap.add_argument("--loglevel", type=str, default="info",
                    choices=["debug", "info", "warning", "error"])
    return ap.parse_args()


def main():
    args = parse_args()
    logging.basicConfig(
        format="%(asctime)s │ %(levelname)-7s │ %(message)s",
        level=getattr(logging, args.loglevel.upper(), logging.INFO),
    )

    # 讀取類別與模型
    classes = get_classes()
    net = cv2.dnn.readNet(WEIGHTS, CONFIG)

    # 若有 GPU / CUDA，可取消下行註解
    # net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
    # net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA_FP16)

    filters = build_gabor_filters()
    belt_detected = BeltDetected()

    # 相機或影片
    src = int(args.video) if args.video.isdigit() else args.video

    with video_capture(src) as cap:
        frame_idx = -1
        fps_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1

            # 可視需求裁切 ROI，例如只留中段避免誤判
            frame_roi = frame[:, 50:-50]

            # 圖像增強
            frame_proc = preprocess_frame(frame_roi, filters)

            # YOLO 偵測
            belt_detected = belt_detector(net, frame_proc, belt_detected, frame_idx)

            # 顯示 FPS
            dt = time.time() - fps_time
            fps = 1.0 / dt if dt > 0 else 0
            fps_time = time.time()
            cv2.putText(frame_proc, f"FPS:{fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)

            cv2.imshow("Seat‑belt Detection", frame_proc)
            if cv2.waitKey(1) & 0xFF == 27:    # ESC 離開
                break

    print_belt_report(belt_detected, frame_idx)


if __name__ == "__main__":
    main()


usage: ipykernel_launcher.py [-h] [--video VIDEO]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/jaylong/Library/Jupyter/runtime/kernel-v3272ae5d6da9d872b8d94c73fe3d7d7dc7a23042b.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
