### MediaPipe, OpenCV & PyTorch project: Drowsiness Detection

In [1]:
# Clear user-defined names so the notebook starts from a clean namespace.
# The helper names are underscore-prefixed so that (a) the builtin `all` is not
# shadowed for the rest of the session and (b) no public loop variable is left
# behind. Names that IPython itself places in the namespace are kept.
_KEEP = frozenset({"In", "Out", "get_ipython", "exit", "quit"})
for _name in [n for n in globals() if not n.startswith("_") and n not in _KEEP]:
    del globals()[_name]

In [2]:
import cv2
import time
import numpy as np
import mediapipe as mp
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torchsummary

from torchvision import models
from torchvision import transforms
from PIL import Image

import pygame

pygame 2.5.0 (SDL 2.28.0, Python 3.9.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
# Labels returned by pred_eyeslabel(); the list position is the class index
# produced by the classifier's argmax.
Class_Names = ['Closed_Eyes', 'Open_Eyes']

# MediaPipe handles used to obtain the landmark (index) points of both eyes.
mp_drawing  = mp.solutions.drawing_utils
mp_facemesh = mp.solutions.face_mesh

# Converts normalized landmark coordinates into pixel coordinates.
# NOTE(review): this is a private MediaPipe helper (leading underscore) and may
# change between releases.
denormalize_coordinates = mp_drawing._normalized_to_pixel_coordinates

In [4]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device('cpu')

# map_location remaps the stored tensors onto `device`, so a checkpoint that
# was saved on a CUDA machine can still be loaded on a CPU-only machine.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
# NOTE(review): recent PyTorch releases default torch.load to weights_only=True,
# which rejects fully pickled models - pass weights_only=False here if this
# load fails on the installed version (confirm).
model = torch.load('drowsiness_detect.pt', map_location=device).to(device)
model.load_state_dict(
    torch.load('drowsiness_detect_state_dict.pt', map_location=device)
)

# Inference mode for layers such as dropout / batch-norm.
model.eval()

# Pre-processing applied to every eye crop before it is fed to the model.
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor()
])

In [5]:
# FaceMesh configuration

def get_facemesh(
    max_num_faces=1,
    refine_landmarks=False,
    min_detection_confidence=0.1,
    min_tracking_confidence=0.2,
):
    """Create a MediaPipe FaceMesh instance.

    Args:
        max_num_faces: number of faces to detect.
        refine_landmarks: forwarded to FaceMesh unchanged; off by default.
        min_detection_confidence: minimum confidence for a face detection to
            be considered successful.
        min_tracking_confidence: minimum confidence for the landmarks to be
            considered successfully tracked.

    Returns:
        The configured ``mp_facemesh.FaceMesh`` object.
    """
    options = dict(
        max_num_faces=max_num_faces,
        refine_landmarks=refine_landmarks,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence,
    )
    return mp_facemesh.FaceMesh(**options)
# The detected landmark points are exposed as `multi_face_landmarks` on the
# object returned by the FaceMesh `process(...)` call.

In [6]:
def distance(point_1, point_2):
    """Return the Euclidean (L2) distance between two points.

    The points are iterated pairwise with ``zip``, so any extra coordinates of
    the longer point are ignored.
    """
    squared_total = 0
    for a, b in zip(point_1, point_2):
        squared_total += (a - b) ** 2
    return squared_total ** 0.5

In [7]:
# Eye Aspect Ratio (EAR) for a single eye

def get_ear(landmarks, refer_idxs, frame_width, frame_height):
    """Compute the Eye Aspect Ratio of one eye.

    EAR = (|P2 - P6| + |P3 - P5|) / (2 * |P1 - P4|), where P1..P6 are the
    pixel coordinates of the six landmarks selected by ``refer_idxs``.

    Args:
        landmarks: indexable collection of detected landmarks; each item must
            expose normalized ``x`` and ``y`` attributes.
        refer_idxs: the six landmark indices of the eye, ordered P1..P6.
        frame_width: frame width in pixels.
        frame_height: frame height in pixels.

    Returns:
        ``(ear, coords_points)``. ``coords_points`` is the list of the six
        pixel coordinates. If the EAR cannot be computed the result is
        ``(0.0, None)``.
    """
    try:
        # Convert the selected normalized landmarks to pixel coordinates.
        # NOTE(review): the MediaPipe helper presumably yields None for points
        # outside the frame, which makes distance() fail below - confirm.
        coords_points = [
            denormalize_coordinates(
                landmarks[i].x, landmarks[i].y, frame_width, frame_height
            )
            for i in refer_idxs
        ]

        # Two vertical eyelid distances and the horizontal eye width.
        P2_P6 = distance(coords_points[1], coords_points[5])
        P3_P5 = distance(coords_points[2], coords_points[4])
        P1_P4 = distance(coords_points[0], coords_points[3])

        ear = (P2_P6 + P3_P5) / (2.0 * P1_P4)

    except Exception:
        # Best effort: unusable landmarks (missing points, zero eye width, ...)
        # yield a neutral result. `Exception` rather than a bare `except` so
        # that KeyboardInterrupt / SystemExit still stop the notebook.
        ear = 0.0
        coords_points = None

    return ear, coords_points

In [8]:
def calculate_avg_ear(landmarks, left_eye_idxs, right_eye_idxs, image_w, image_h):
    """Average the Eye Aspect Ratio of the left and right eye.

    Returns:
        ``(Avg_EAR, (left_lm_coordinates, right_lm_coordinates))`` - the mean
        of both EAR values and the landmark coordinates that ``get_ear``
        returned for each eye.
    """
    # Evaluate the left eye first, then the right eye.
    per_eye = [
        get_ear(landmarks, eye_idxs, image_w, image_h)
        for eye_idxs in (left_eye_idxs, right_eye_idxs)
    ]
    (left_ear, left_points), (right_ear, right_points) = per_eye

    # The final EAR is the mean of both eyes.
    return (left_ear + right_ear) / 2.0, (left_points, right_points)

In [9]:
def plot_eye_landmarks(frame, left_lm_coordinates, 
                       right_lm_coordinates, color
                       ):
    """Draw a filled dot on ``frame`` for every eye landmark and return it.

    Either coordinate list may be ``None`` or empty, in which case that eye
    is skipped.
    """
    for eye_points in (left_lm_coordinates, right_lm_coordinates):
        if not eye_points:
            continue
        for point in eye_points:
            # radius 2, thickness -1 => filled circle
            cv2.circle(frame, point, 2, color, -1)

    return frame

In [10]:
def plot_text(image, text, origin, 
              color, font=cv2.FONT_HERSHEY_SIMPLEX, 
              fntScale=0.8, thickness=2
              ):
    """Render ``text`` on ``image`` at ``origin`` and return the result of
    ``cv2.putText``."""
    return cv2.putText(
        img=image,
        text=text,
        org=origin,
        fontFace=font,
        fontScale=fntScale,
        color=color,
        thickness=thickness,
    )

In [11]:
def pred_eyeslabel(crop_eye):
    """Classify a cropped eye image as open or closed.

    Args:
        crop_eye: image array of the eye region; it is converted with
            ``PIL.Image.fromarray`` and pre-processed by the module-level
            ``transform``.

    Returns:
        The entry of ``Class_Names`` selected by the model's highest score.
    """
    eye_img = Image.fromarray(crop_eye)

    # Pre-process, add the batch dimension and move to the model's device.
    eye_batch = transform(eye_img).unsqueeze(0).to(device)

    # Inference only: skip autograd bookkeeping so no graph is built per frame.
    with torch.no_grad():
        test_preds = model(eye_batch)

    predicted_class_idx = torch.argmax(test_preds, dim=1)
    predicted_class_label = Class_Names[predicted_class_idx.item()]

    return predicted_class_label

In [12]:
def alarm(path):
    """Play the alarm sound stored at ``path``.

    The function is called once per frame while the driver is drowsy, so it
    must be cheap and must not restart a sound that is already playing:
    the mixer is initialised only once, and the call is a no-op while music
    is still playing.
    """
    if not pygame.mixer.get_init():
        pygame.mixer.init()

    # Let the current playback finish instead of restarting it every frame.
    if pygame.mixer.music.get_busy():
        return

    pygame.mixer.music.load(path)
    pygame.mixer.music.play()

In [13]:
# Face-mesh landmark indices of the six EAR points (P1..P6) of each eye.
_LEFT_EYE_IDXS = [362, 385, 387, 263, 373, 380]
_RIGHT_EYE_IDXS = [33, 160, 158, 133, 153, 144]

# Overlay colours as used by the original drawing calls.
_RED = (0, 0, 255)
_GREEN = (0, 255, 0)

# Pixels added around the eye landmarks when cropping the classifier input.
_EYE_CROP_MARGIN = 10

# Created lazily on first use and then shared by every process() call.
_facemesh_model = None
_state_tracker = None


def _new_state_tracker():
    """Return a fresh drowsiness state in the 'awake' condition."""
    return {
        "start_time": time.perf_counter(),
        "DROWSY_TIME": 0.0,  # Holds time passed with EAR < EAR_THRESH
        "COLOR": _GREEN,
        "play_alarm": False,
    }


def _reset_state(state):
    """Put ``state`` back into the 'awake' condition."""
    state["start_time"] = time.perf_counter()
    state["DROWSY_TIME"] = 0.0
    state["COLOR"] = _GREEN
    state["play_alarm"] = False


def _crop_eye(frame, eye_coords, margin=_EYE_CROP_MARGIN):
    """Crop one eye region from ``frame``; return None if it is not usable."""
    if not eye_coords or any(point is None for point in eye_coords):
        return None

    frame_h, frame_w = frame.shape[:2]
    # Clamp to the frame: a negative start index would wrap around and
    # silently produce an empty or wrong crop.
    top = max(eye_coords[2][1] - margin, 0)
    bottom = min(eye_coords[4][1] + margin, frame_h)
    left = max(eye_coords[0][0] - margin, 0)
    right = min(eye_coords[3][0] + margin, frame_w)

    if bottom <= top or right <= left:
        return None
    return frame[top:bottom, left:right]


def process(frame: np.ndarray, thresholds: dict, state_tracker=None):
    """Analyse one video frame for drowsiness and draw the status overlay.

    The eyes count as closed when the average EAR is below
    ``thresholds["EAR_THRESH"]`` and the classifier labels both eye crops
    ``'Closed_Eyes'``. The time spent in that condition is accumulated across
    consecutive calls; once it reaches ``thresholds["WAIT_TIME"]`` (seconds)
    the alarm is raised. Any frame without closed eyes (or without a face)
    resets the accumulated time.

    Args:
        frame: image array of shape (height, width, channels); it is drawn on
            in place.
        thresholds: dict with the keys ``"EAR_THRESH"`` and ``"WAIT_TIME"``.
        state_tracker: optional state dict (see ``_new_state_tracker``) to use
            instead of the shared module-level one, e.g. to start a new video
            with a fresh state.

    Returns:
        ``(frame, play_alarm)``.

    Side effects:
        Sets the globals ``eye_left``, ``eye_right``, ``left_pred`` and
        ``right_pred``; they are ``None`` when the value could not be
        determined for this frame.
    """
    global _facemesh_model, _state_tracker
    global eye_left, eye_right, left_pred, right_pred

    # The state must outlive a single call, otherwise DROWSY_TIME could never
    # grow beyond the processing time of one frame.
    if state_tracker is None:
        if _state_tracker is None:
            _state_tracker = _new_state_tracker()
        state_tracker = _state_tracker

    # Build the FaceMesh graph once instead of once per frame.
    if _facemesh_model is None:
        _facemesh_model = get_facemesh()

    frame_h, frame_w, _ = frame.shape

    DROWSY_TIME_txt_pos = (10, int(frame_h // 2 * 1.7))
    ALM_txt_pos = (10, int(frame_h // 2 * 1.85))

    # Read-only while MediaPipe processes the frame, writeable again
    # afterwards so the overlay can be drawn onto it.
    frame.flags.writeable = False
    try:
        results = _facemesh_model.process(frame)
    finally:
        frame.flags.writeable = True

    eye_left = eye_right = None
    left_pred = right_pred = None

    if not results.multi_face_landmarks:
        _reset_state(state_tracker)
        return frame, state_tracker["play_alarm"]

    landmarks = results.multi_face_landmarks[0].landmark

    # ---- EAR computation
    EAR, coordinates = calculate_avg_ear(landmarks,
                                         _LEFT_EYE_IDXS,
                                         _RIGHT_EYE_IDXS,
                                         frame_w,
                                         frame_h
                                         )

    # ---- Extract the eye regions and classify them with the model
    eye_left = _crop_eye(frame, coordinates[0])
    eye_right = _crop_eye(frame, coordinates[1])

    if eye_left is not None:
        left_pred = pred_eyeslabel(eye_left)
    if eye_right is not None:
        right_pred = pred_eyeslabel(eye_right)

    # ---- Drowsiness timer and alarm
    eyes_closed = (
        EAR < thresholds["EAR_THRESH"]
        and right_pred == 'Closed_Eyes'
        and left_pred == 'Closed_Eyes'
    )

    if eyes_closed:
        end_time = time.perf_counter()

        state_tracker["DROWSY_TIME"] += end_time - state_tracker["start_time"]
        state_tracker["start_time"] = end_time
        state_tracker["COLOR"] = _RED

        if state_tracker["DROWSY_TIME"] >= thresholds["WAIT_TIME"]:
            state_tracker["play_alarm"] = True
            plot_text(frame, "WAKE UP! WAKE UP",
                      ALM_txt_pos, state_tracker["COLOR"])
            alarm('alarm.wav')
    else:
        _reset_state(state_tracker)

    # ---- Status overlay
    EAR_txt = f"EAR: {round(EAR, 2)}"
    DROWSY_TIME_txt = f"DROWSY: {round(state_tracker['DROWSY_TIME'], 3)} Secs"
    plot_text(frame, EAR_txt,
              (10, 30), state_tracker["COLOR"])
    plot_text(frame, DROWSY_TIME_txt,
              DROWSY_TIME_txt_pos, state_tracker["COLOR"])

    return frame, state_tracker["play_alarm"]

In [14]:
# Video source: a file path, or 0 for the default webcam.
# VIDEO_SOURCE = 0
VIDEO_SOURCE = 'd:/drowsiness_video_data/SGA2100300S0042.mp4'

webcam = cv2.VideoCapture(VIDEO_SOURCE)

webcam.set(cv2.CAP_PROP_FRAME_WIDTH, 640) #1920, 640
webcam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) #1080, 480

thresholds = {
    "EAR_THRESH": 0.22, #0.25 (videocapture:0)
    "WAIT_TIME": 0.035, #0.02 (videocapture:0), 0.032  (seconds)
}

frame_cnt = 0

# try/finally guarantees that the capture and the window are released even
# when the loop is interrupted (e.g. Kernel -> Interrupt) or a frame fails.
try:
    if not webcam.isOpened():
        # No exit() here: it would shut down the notebook kernel.
        print("Could not open webcam")
    else:
        while True:
            status, frame = webcam.read()

            # End of the video file (or a failed read): stop instead of
            # spinning forever on an exhausted capture.
            if not status:
                break

            # OpenCV delivers BGR; the frame is handed to process() as RGB.
            # NOTE(review): cv2.imshow expects BGR, so the RGB frame is shown
            # with red and blue swapped - confirm whether that is intended.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            frame, play_alarm = process(frame, thresholds)
            # The prediction globals only exist once process() has set them.
            print(frame_cnt, globals().get("right_pred"), globals().get("left_pred"))

            # cv2.imshow("test", eye_left)
            cv2.imshow("test", frame)
            frame_cnt += 1

            # Wait up to 30 ms for a key press; 'q' quits.
            if cv2.waitKey(30) & 0xFF == ord('q'):
                break
finally:
    webcam.release()
    cv2.destroyAllWindows()

0 Open_Eyes Open_Eyes
1 Open_Eyes Open_Eyes
2 Open_Eyes Open_Eyes
3 Open_Eyes Open_Eyes
4 Open_Eyes Open_Eyes
5 Open_Eyes Open_Eyes
6 Open_Eyes Open_Eyes
7 Open_Eyes Open_Eyes
8 Open_Eyes Open_Eyes
9 Open_Eyes Open_Eyes
10 Open_Eyes Open_Eyes
11 Open_Eyes Open_Eyes
12 Open_Eyes Open_Eyes
13 Open_Eyes Open_Eyes
14 Open_Eyes Open_Eyes
15 Open_Eyes Open_Eyes
16 Open_Eyes Open_Eyes
17 Open_Eyes Open_Eyes
18 Open_Eyes Open_Eyes
19 Open_Eyes Open_Eyes
20 Open_Eyes Closed_Eyes
21 Open_Eyes Closed_Eyes
22 Open_Eyes Closed_Eyes
23 Open_Eyes Closed_Eyes
24 Open_Eyes Closed_Eyes
25 Open_Eyes Open_Eyes
26 Open_Eyes Closed_Eyes
27 Open_Eyes Open_Eyes
28 Open_Eyes Open_Eyes
29 Open_Eyes Open_Eyes
30 Open_Eyes Open_Eyes
31 Open_Eyes Open_Eyes
32 Closed_Eyes Open_Eyes
33 Open_Eyes Open_Eyes
34 Open_Eyes Closed_Eyes
35 Closed_Eyes Closed_Eyes
36 Closed_Eyes Closed_Eyes
37 Open_Eyes Open_Eyes
38 Open_Eyes Open_Eyes
39 Closed_Eyes Open_Eyes
40 Closed_Eyes Closed_Eyes
41 Closed_Eyes Closed_Eyes
42 Closed_

KeyboardInterrupt: 

In [None]:
# webcam = cv2.VideoCapture('d:/drowsiness_video_data/SGA2100300S0042.mp4')

# webcam.set(cv2.CAP_PROP_FRAME_WIDTH, 640) #1920, 640
# webcam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) #1080, 480

# if not webcam.isOpened():
#     print("Could not open webcam")
#     exit()
    

# while webcam.isOpened():
#     status, frame = webcam.read()

#     if status:
#         frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#         frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)  # set when playing an external video file

#         cv2.imshow("test", frame)

#     if cv2.waitKey(30) & 0xFF == ord('q'): # waits up to 30 ms per frame for a key press; 'q' quits
#         break

# webcam.release()
# cv2.destroyAllWindows()