In [15]:
import os
import numpy as np
from typing import Tuple
from concurrent.futures import ThreadPoolExecutor
import winsound
import mediapipe as mp
os.environ["OPENCV_VIDEOIO_MSMF_ENABLE_HW_TRANSFORMS"] = "0"
import cv2
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import urllib.request
import time
import pathlib

In [16]:

class WinSoundController:
    def __init__(self, wav_path: str):
        self.wav_path = wav_path
        self._on = False
    def set_on(self, on: bool):
        if on and not self._on:
            winsound.PlaySound(self.wav_path,
                               winsound.SND_FILENAME | winsound.SND_ASYNC | winsound.SND_LOOP)
            self._on = True
        elif not on and self._on:
            winsound.PlaySound(None, winsound.SND_PURGE)
            self._on = False

class IRIS_TASK:
    def __init__(self):
        self.face_cascPath = r"./haarcascade_frontalface_alt.xml"
        self.eye_cascPath  = r"./haarcascade_eye_tree_eyeglasses.xml"

        self.faceCascade = cv2.CascadeClassifier(self.face_cascPath)
        self.eyeCascade  = cv2.CascadeClassifier(self.eye_cascPath)

        if self.faceCascade.empty() or self.eyeCascade.empty():
            raise IOError("Error loading cascade XML files!")

    def process(self, frame: np.ndarray) -> Tuple[bool, str, np.ndarray]:

        img = frame
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = self.faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        close_eye_flag = False
        message = "eyes detected"

        for (x, y, w, h) in faces:
            cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
            face_gray  = gray[y:y+h, x:x+w]

            eyes = self.eyeCascade.detectMultiScale(face_gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

            for (ex, ey, ew, eh) in eyes:
                cv2.rectangle(img, (x+ex, y+ey), (x+ex+ew, y+ey+eh), (255, 0, 0), 2)

            if len(eyes) == 0:
                close_eye_flag = True
                message = "close eye detected"

        return close_eye_flag, message, frame

class HEAD_DIRECTION_TASK():
    def __init__(self):
        mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        # 時間管理用変数
        self.off_start_time = None
        self.warning_active = False

    def get_head_direction(self, landmarks, img_w, img_h):
        # 鼻と左右目の位置を取得
        nose = landmarks[1]        # 鼻先
        left_eye = landmarks[33]   # 左目端
        right_eye = landmarks[263] # 右目端

        # ピクセル座標に変換
        nose_point = np.array([nose.x * img_w, nose.y * img_h])
        left_point = np.array([left_eye.x * img_w, left_eye.y * img_h])
        right_point = np.array([right_eye.x * img_w, right_eye.y * img_h])

        eye_center = (left_point + right_point) / 2
        dx = nose_point[0] - eye_center[0]
        dy = nose_point[1] - eye_center[1]

        # 閾値調整
        direction = "Front"
        if dx > 40:
            direction = "Looking Right"
        elif dx < -40:
            direction = "Looking Left"
        elif dy > 30:
            direction = "Looking Down"
        elif dy < -30:
            direction = "Looking Up"

        return direction

    def process(self, frame) -> Tuple[bool, str, np.ndarray]:
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb_frame)

        h, w, _ = frame.shape
        status = False
        message = "Front"

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                direction = self.get_head_direction(face_landmarks.landmark, w, h)
                message = direction

                # 顔の枠描画
                x_coords = [lm.x * w for lm in face_landmarks.landmark]
                y_coords = [lm.y * h for lm in face_landmarks.landmark]
                x_min, x_max = int(min(x_coords)), int(max(x_coords))
                y_min, y_max = int(min(y_coords)), int(max(y_coords))
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

                # Front 以外なら時間を測定
                if direction != "Front":
                    if self.off_start_time is None:
                        self.off_start_time = time.time()
                    else:
                        elapsed = time.time() - self.off_start_time
                        if elapsed >= 3:  # 3秒以上
                            self.warning_active = True
                else:
                    # 正面を向いたらリセット
                    self.off_start_time = None
                    self.warning_active = False

                # 警告表示
                if self.warning_active:
                    status = True
                    cv2.putText(frame, "WARNING: Distracted!", (30, 100),
                                cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)

                # 現在の向きを描画
                cv2.putText(frame, f"Direction: {direction}", (30, 50),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        return status, message, frame

class HAND_TASK():
    def __init__(self):
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.mp_drawing = mp.solutions.drawing_utils


    def process(self, frame) -> Tuple[bool, str, np.ndarray]:
        h, w, _,  = frame.shape
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.hands.process(frame_rgb)

        # Define steering wheel region (adjust this based on your camera angle)
        roi_top_left = (int(w * 0.2), int(h * 0.5))
        roi_bottom_right = (int(w * 0.8), int(h * 0.98))
        cv2.rectangle(frame, roi_top_left, roi_bottom_right, (0, 255, 0), 2)

        hands_on_wheel = False

        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                self.mp_drawing.draw_landmarks(
                    frame, hand_landmarks, self.mp_hands.HAND_CONNECTIONS
                )

                for lm in hand_landmarks.landmark:
                    x, y = int(lm.x * w), int(lm.y * h)

                    # Check if landmark is inside steering wheel ROI
                    if roi_top_left[0] <= x <= roi_bottom_right[0] and roi_top_left[1] <= y <= roi_bottom_right[1]:
                        hands_on_wheel = True
                        break

        if hands_on_wheel:
            return False, "Hands on steering wheel", frame
        else:
            return True, "Warning: Hands removed from steering wheel!", frame
        

def add_label(res):
    label = res[1]
    color = (0,0,255) if res[0] else (255,0,0)  # keep your colors
    cv2.putText(res[2], label, (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
    return res[2]

In [None]:
# https://developers.google.com/mediapipe/solutions/vision/object_detector
class CELL_PHONE_TASK():
    # https://storage.googleapis.com/mediapipe-models/
    base_url = 'https://storage.googleapis.com/mediapipe-tasks/object_detector/'
    model_name = 'efficientdet_lite0_fp32.tflite'
    model_folder_path = './learned_models/mediapipe'

    H_MARGIN = 10  # pixels
    V_MARGIN = 30  # pixels
    FONT_SIZE = 1
    FONT_THICKNESS = 1
    TEXT_COLOR = (0, 255, 0)  # green

    # full list(efficientdet_lite0_fp32.tflite)
    # https://storage.googleapis.com/mediapipe-tasks/object_detector/labelmap.txt

    # https://developers.google.com/mediapipe/solutions/vision/object_detector
    def __init__(
            self,
            model_folder_path=model_folder_path,
            base_url=base_url,
            model_name=model_name,
            max_results=2,
            score_threshold=0.05,
            mode="video"
            ):

        self.mode = mode
        if self.mode == "image":
            rmode = mp.tasks.vision.RunningMode.IMAGE
        else:
            rmode = mp.tasks.vision.RunningMode.VIDEO


        model_path = self.set_model(base_url, model_folder_path, model_name)
        options = mp.tasks.vision.ObjectDetectorOptions(
            base_options=mp.tasks.BaseOptions(model_asset_path=model_path),
            max_results=max_results,
            score_threshold=score_threshold,
            running_mode=rmode
        )
        self.detector = mp.tasks.vision.ObjectDetector.create_from_options(options)

        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=2,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.mp_drawing = mp.solutions.drawing_utils

    def set_model(self, base_url, model_folder_path, model_name):
        model_path = model_folder_path+'/'+model_name
        # download model if model file does not exist
        if not os.path.exists(model_path):
            # make directory if model_folder directory does not exist
            os.makedirs(model_folder_path, exist_ok=True)
            # download model file
            url = base_url+model_name
            save_name = model_path
            urllib.request.urlretrieve(url, save_name)
        return model_path

    def process(self, frame) -> Tuple[bool, str, np.ndarray]:
        detected = False
        message = "No object detected in hand"

        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        hand_results = self.hands.process(img_rgb)

        if hand_results.multi_hand_landmarks:
            print(f"Detected {len(hand_results.multi_hand_landmarks)} hands")
            h, w, _ = frame.shape

            for hand_landmarks in hand_results.multi_hand_landmarks:
                # Get hand bounding box
                x_min = int(min([lm.x for lm in hand_landmarks.landmark]) * w)
                y_min = int(min([lm.y for lm in hand_landmarks.landmark]) * h)
                x_max = int(max([lm.x for lm in hand_landmarks.landmark]) * w)
                y_max = int(max([lm.y for lm in hand_landmarks.landmark]) * h)

                pad = 10
                x_min = max(x_min - pad, 0)
                y_min = max(y_min - pad, 0)
                x_max = min(x_max + pad, w)
                y_max = min(y_max + pad, h)

                # Crop the hand region
                hand_crop = frame[y_min:y_max, x_min:x_max]
                if hand_crop.size == 0:
                    continue

                # Convert to MediaPipe Image
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=np.ascontiguousarray(cv2.cvtColor(hand_crop, cv2.COLOR_BGR2RGB)))

                # Detect objects in hand
                detection_result = self.detector.detect_for_video(mp_image, int(time.time() * 1000))
                print(detection_result)

                for det in detection_result.detections:
                    category = det.categories[0].category_name or "object"
                    score = det.categories[0].score
                    bbox = det.bounding_box

                    # Filter out hand itself
                    if category.lower() not in ["cell phone", "cellphone", "mobile phone", "phone"]:
                        continue


                    detected = True
                    message = f"{category} ({score:.2f})"

                    # Map bbox to original frame coordinates
                    abs_x_min = x_min + int(bbox.origin_x)
                    abs_y_min = y_min + int(bbox.origin_y)
                    abs_x_max = x_min + int(bbox.origin_x + bbox.width)
                    abs_y_max = y_min + int(bbox.origin_y + bbox.height)

                    # Draw bounding box and label
                    cv2.rectangle(frame, (abs_x_min, abs_y_min), (abs_x_max, abs_y_max), (0, 0, 255), 2)
                    cv2.putText(frame, message, (abs_x_min, abs_y_min - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                # Draw hand landmarks for reference
                self.mp_drawing.draw_landmarks(frame, hand_landmarks, self.mp_hands.HAND_CONNECTIONS)
        
        print(detected, message)
        return detected, message, frame
        # return True, "cell phone in hand", frame

In [20]:
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

iris_task = IRIS_TASK()
cell_phone_task = CELL_PHONE_TASK()
head_dir_task  = HEAD_DIRECTION_TASK()
hand_task = HAND_TASK()
tasks = [iris_task.process, cell_phone_task.process, head_dir_task.process, hand_task.process]

# Use a WAV file for reliable start/stop control
sound = WinSoundController('alert.wav')
verbose = True

with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        futures = [pool.submit(func, frame.copy()) for func in tasks]
        results = [f.result() for f in futures]

        if verbose:
            concat_images = [add_label(res) for res in results]
            result_image = np.concatenate(
                (
                    np.concatenate((concat_images[0], concat_images[1]), axis=1),
                    np.concatenate((concat_images[2], concat_images[3]), axis=1)
                ), axis=0
            )
            cv2.imshow('result', result_image)

            # --- SOUND LOGIC: play once while any alert is True; stop when all False ---
            bad_flag = any(res[0] for res in results)
            sound.set_on(bad_flag)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            break
# ensure sound is stopped on exit
sound.set_on(False)
cv2.destroyAllWindows()
cap.release()


RuntimeError: Unable to open file at c:\oit\py25en\WPy64-312101\python\Lib\site-packages/C:\oit\py25en\source\projects\learned_models\mediapipe\efficientdet_lite0_fp32.tflite, errno=22