In [6]:
import torch
import torch.nn as nn
import cv2
import time
import numpy as np

from ultralytics import YOLO
from torchvision import transforms
from module.train import test_binary_classification

# 모델
class driverstatusModel(nn.Module):

    def __init__(self, dropout_rate):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(1,32 ,kernel_size=3 , stride=1, padding = 'same' ),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.MaxPool2d(kernel_size=2 , stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(32,64 ,kernel_size=3 , stride=1, padding = 1 ),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.MaxPool2d(kernel_size=2 , stride=2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(64,128 ,kernel_size=3 , stride=1, padding = 'same' ),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.MaxPool2d(kernel_size=2 , stride=2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(128,64 ,kernel_size=3 , stride=1, padding = 'same' ),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.MaxPool2d(kernel_size=2 , stride=2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=64*9*9 , out_features=128 ),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(128,1)
        )

    def forward(self,X):
        out = self.conv1(X)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.classifier(out)
        return torch.sigmoid(out)

# 모델불러오기
def load_model(load_model_path):
    status_model = torch.load(load_model_path ,  map_location=torch.device("cpu"))
    status_model = status_model.to(device)

    return status_model

# Stopwatch : 초시계
class Stopwatch:
    def __init__(self):
        self.start_time = None
        self.elapsed_time = 0
        self.running = False

    def start(self):
        if not self.running:
            self.start_time = time.time() - self.elapsed_time
            self.running = True

    def stop(self):
        if self.running:
            self.elapsed_time = time.time() - self.start_time
            self.running = False

    def reset(self):
        self.start_time = None
        self.elapsed_time = 0
        self.running = False

    def get_elapsed_time(self):
        if self.running:
            return time.time() - self.start_time
        else:
            return self.elapsed_time

    def __str__(self):
        elapsed = self.get_elapsed_time()
        minutes, seconds = divmod(elapsed, 60)
        return f"{int(minutes):02}:{seconds:05.2f}"

# 얼굴을 감지하는 함수 
def faceDetect():
    stopwatch = Stopwatch()
    
    
    try:
        cam = cv2.VideoCapture(0)
    except:
        print("camera loading error")
        return
    model = YOLO("models/yolov8n-pose.pt")
    while cam.isOpened():
        ret, frame = cam.read()
        if not ret:
            print("연결 종료")
            break

        # 양수: 좌우반전, 0: 상하반전, 음수: 좌우,상하 반전
        frame = cv2.flip(frame, 1)
        # Gray scale로 변환함 => 모델이 Grayscale로 학습되어있음
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # 모델로 1 프레임을 받아서 모델로 결과값 반환
        result = model(frame , show=False , verbose = False , save = False)[0]
        try:
            keypoints = result.keypoints
            # 좌 , 우 눈동자 좌표값
            right_eyes = keypoints.xy[0,1]
            left_eyes = keypoints.xy[0,2]

            # 눈동자 좌표에서 임의로 값을 구해줌
            m_right_x = int(right_eyes[0].item()) - 70
            m_right_y = int(right_eyes[1].item()) - 70
            p_right_x = int(right_eyes[0].item()) + 70
            p_right_y = int(right_eyes[1].item()) + 70
            
            m_left_x = int(left_eyes[0].item()) - 70
            m_left_y = int(left_eyes[1].item()) - 70
            p_left_x = int(left_eyes[0].item()) + 70
            p_left_y = int(left_eyes[1].item()) + 70
            
            l_eyes = frame_gray[m_left_y:p_left_y , m_left_x:p_left_x]
            r_eyes = frame_gray[m_right_y:p_right_y , m_right_x:p_right_x]

            # 추론
            tf = transforms.Compose([transforms.ToTensor(),transforms.Resize((150,150)),transforms.Normalize([0.5],[0.5])])
            l_eyes_value = tf(l_eyes).unsqueeze(dim=0)
            l_eyes_value.to('cpu')
            r_eyes_value = tf(r_eyes).unsqueeze(dim=0)
            r_eyes_value.to('cpu')
            status_model.to('cpu')
            result_l_eyes = status_model(l_eyes_value)
            result_r_eyes = status_model(r_eyes_value)
            # print(result_l_eyes , result_r_eyes , frame.shape)
            l_eyes_item , r_eyes_item = result_l_eyes.item() , result_r_eyes.item()

            # 결과
            value = "Good" if result_l_eyes.item() >=0.5 and result_r_eyes.item() >=0.5 else "Sleepy"

            # 화면에 나타낼 값 표시
            if value == "Good":
                cv2.putText(frame, value , (0,50),cv2.FONT_HERSHEY_SIMPLEX, 1,(255,0,0),3,lineType=cv2.LINE_AA)
                cv2.putText(frame, f"{l_eyes_item:0.2f}", (m_left_x,m_left_y), cv2.FONT_HERSHEY_SIMPLEX, 1,(255,0,0),2,lineType=cv2.LINE_AA)
                cv2.putText(frame, f"{r_eyes_item:0.2f}", (m_right_x,m_right_y), cv2.FONT_HERSHEY_SIMPLEX, 1,(255,0,0),2,lineType=cv2.LINE_AA)
                cv2.rectangle(frame, (m_left_x,m_left_y), (p_left_x, p_left_y), (255,0,0), 2)  #사각형 범위
                cv2.rectangle(frame, (m_right_x,m_right_y), (p_right_x, p_right_y), (255,0,0), 2)  #사각형 범위
                stopwatch.stop()
                stopwatch.reset()
                
            elif value == "Sleepy":
                stopwatch.start()  # 초시계 시작
                elapsed_time_str = str(stopwatch)
                elapsed_time = stopwatch.get_elapsed_time() 
                cv2.putText(frame, elapsed_time_str, (150, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                cv2.putText(frame, f"{l_eyes_item:0.2f}", (m_left_x,m_left_y), cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),2,lineType=cv2.LINE_AA)
                cv2.putText(frame, f"{r_eyes_item:0.2f}", (m_right_x,m_right_y), cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),2,lineType=cv2.LINE_AA)
                cv2.rectangle(frame, (m_left_x,m_left_y), (p_left_x, p_left_y), (0,0,255), 2)  #사각형 범위
                cv2.rectangle(frame, (m_right_x,m_right_y), (p_right_x, p_right_y), (0,0,255), 2)  #사각형 범위
                
                if  elapsed_time <= 2.0:
                    cv2.putText(frame, value , (0,50),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,0,255),2,lineType=cv2.LINE_AA)
                else :
                    cv2.putText(frame, "Warning", (0 , 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
   
        except:
            continue
        cv2.imshow("frame", frame)
        # cv2.imshow("leye",l_eyes)
        # cv2.imshow("reye",r_eyes)
        if cv2.waitKey(1) == 27:
                break
            
    cam.release()
    cv2.waitKey()
    cv2.destroyAllWindows()
    cv2.waitKey(1)
    return value
    
if __name__ == '__main__':

    device = "mps" if torch.backends.mps.is_available() else "cpu"
    print(device)
    load_model_path = "models/driverstatusmodels.pth"
    status_model = load_model(load_model_path)
    value = faceDetect()
    print(f"운전자의 상태 {value}")
    

mps
운전자의 상태 Good
