In [None]:
#!pip install ultralytics
#!pip install numpy==1.23.5

In [None]:
import os

import numpy as np
from ultralytics import YOLO
from sklearn.metrics import accuracy_score, recall_score 

import itertools
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

#### 시퀀스 설정

In [None]:
folder_path1 = './monitoring/images_sleep'
folder_path2 = './monitoring/images_phone'
folder_path3 = './monitoring/images_normal'

image_paths1 = sorted([os.path.join(folder_path1, f) for f in os.listdir(folder_path1) if f.startswith('SGA') and f.endswith('.jpg')])
image_paths2 = sorted([os.path.join(folder_path2, f) for f in os.listdir(folder_path2) if f.startswith('SGA') and f.endswith('.jpg')])
image_paths3 = sorted([os.path.join(folder_path3, f) for f in os.listdir(folder_path3) if f.startswith('SGA') and f.endswith('.jpg')])

In [None]:
videos = []

In [None]:
# 주어진 이미지 경로들을 기반으로 비디오 시퀀스 리스트를 생성
def making_video_list(videos, image_paths):
    current_video_sequences = []

    for image_path in image_paths:
        video_id = image_path.split('IMG')[0]

        # 현재 비디오 시퀀스와 비디오 ID가 동일한 경우 현재 시퀀스에 이미지 경로 추가
        if current_video_sequences and video_id == current_video_sequences[0].split('IMG')[0]:
            current_video_sequences.append(image_path)
        else:
            
            # 현재 비디오 시퀀스가 존재하면 이전 시퀀스를 비디오 리스트에 추가
            if current_video_sequences:
                videos.append(current_video_sequences)
            current_video_sequences = [image_path]

    # 마지막 비디오 시퀀스가 존재하면 비디오 리스트에 추가
    if current_video_sequences:
        videos.append(current_video_sequences)
    
    return videos

In [None]:
videos_sleep = making_video_list(videos, image_paths1)
videos_phone = making_video_list(videos, image_paths2)
videos_all = making_video_list(videos, image_paths3)

In [None]:
#주어진 비디오 시퀀스에 대해 특정 키워드가 있는지 탐색하는 함수 제작

def making_answer_list(videos):
    answer = []
    keywords = ['sleep', 'phone']

    for video in videos:
        found = False
        for image_path in video:
            for keyword in keywords:
                #이미지 경로에 키워드가 포함되어 있으면 발견으로 표시하고 종료
                if keyword in image_path:
                    found = True
                    break
            if found:
                break

        if found:
            answer.append(1)
        else:
            answer.append(0)

    return answer

In [None]:
result = making_answer_list(videos) # 영상 라벨링

In [None]:
def find_index(data, target):
    res = []
    lis = data
    while True:
        try:
            res.append(lis.index(target) + (res[-1]+1 if len(res)!=0 else 0))
            lis = data[res[-1]+1:]
        except:
            break     
    return res

In [None]:
abnormal_index = find_index(result,1)
normal_index = find_index(result,0)

In [None]:
use_index = abnormal_index[:1000]
use_index = use_index + abnormal_index[-1000:]
use_index = use_index + normal_index[:2000]

In [None]:
videos = [videos[index] for index in use_index]
result = [result[index] for index in use_index]

#### 이상행동 탐지 코드

In [None]:
pred = [0 for i in range(len(videos))] # 예측 결과 저장
phone_pred = [0 for i in range(len(videos))]

In [None]:
# print(model1.names) # {0: 'Leye', 1: 'Reye', 2: 'Phone'}
# print(model2.names) # {0: 'body', 1: 'face'}

In [None]:
model1 = YOLO('/home/work/VisionAI/subeen/runs/detect/train17/weights/best.pt')
model2 = YOLO('/home/work/VisionAI/monitoring/runs/detect/train13(epoch10)/weights/best.pt')

In [None]:
phone_ratio_model1 = [0 for i in range(len(videos))] 
sleep_ratio_model1 = [0 for i in range(len(videos))] 
sleep_ratio_model2_body = [0 for i in range(len(videos))] 
sleep_ratio_model2_face = [0 for i in range(len(videos))]

In [None]:
for i in range(len(videos)) :
    
    seq_lst = videos[i]
    
    phone_model1 = []  # 모델 1에서 탐지된 핸드폰의 확률 리스트 초기화
    sleep_model1 = []  # 모델 1에서 감은 눈의 확률 리스트 초기화
    sleep_model2_body = []  # 모델 2에서 감은 눈 (Body)의 BoundingBox 리스트 초기화
    sleep_model2_face = []  # 모델 2에서 감은 눈 (Face)의 BoundingBox 리스트 초기화

    # 각 비디오 시퀀스에 대한 졸음 예측 수행
    for seq_img in seq_lst : 

        # model 1 실행 코드 (epoch 10 기준)
        model1 = YOLO('/home/work/VisionAI/subeen/runs/detect/train17/weights/best.pt')
        model1_result = model1.predict(source = seq_img)

        model1_pred_class = model1_result[0].boxes.cls.cpu().detach().numpy()
        model1_pred_probs = model1_result[0].boxes.conf.cpu().detach().numpy()
        model1_dict = dict(zip(model1_pred_class, model1_pred_probs))

        # 핸드폰 탐지한 경우
        if 2 in model1_dict.keys() :
            phone_pred[i] = phone_model1.append(model1_dict[2])
            del model1_dict[2]
        
        # 감은 눈에 대한 확률
        if 2 == len(list(model1_dict.values())):
            sleep_model1.append(model1_dict[0] * model1_dict[1])
        elif 1 == len(list(model1_dict.values())):
            sleep_model1.append(list(model1_dict.values())[0]/2)
        else :
            sleep_model1.append(0)

        # model 2 실행 코드 (epoch 10 기준)
        model2 = YOLO('/home/work/VisionAI/monitoring/runs/detect/train13(epoch10)/weights/best.pt')
        model2_result = model2.predict(source = seq_img)

        model2_pred_class = model2_result[0].boxes.cls.cpu().detach().numpy()
        model2_pred_bbox = model2_result[0].boxes.xywh.cpu().detach().numpy()
        model2_dict = dict(zip(model2_pred_class, model2_pred_bbox))
        
        # 모델 2에서 감은 눈 (Body)와 감은 눈 (Face)의 BoundingBox 정보 저장
        sleep_model2_body.append(model2_dict[0])
        sleep_model2_face.append(model2_dict[1])
    
    # 결과 리스트에 각 모델의 결과 저장
    phone_ratio_model1[i] = phone_model1
    sleep_ratio_model1[i] = sleep_model1
    sleep_ratio_model2_body[i] = sleep_model2_body
    sleep_ratio_model2_face[i] = sleep_model2_face

In [None]:
len(phone_ratio_model1), len(sleep_ratio_model1), len(sleep_ratio_model2_body), len(sleep_ratio_model2_face)

#### 오차 행렬 출력 및 plotting

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
#     plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
#         print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

#     print(cm)
    
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# threshold 설정
phone_threshold = 0.85
body_face_standard = 0.2
sleep_ratio_threshold = 0.15
sleep_threshold = 0.15

sleep_pred = [0 for i in range(len(videos))] # 예측 결과 저장

for i in range(len(videos)) :
    
    seq_lst = videos[i]
    sleep_ratio_lst = []

    for j in range(len(seq_lst)) :
        
        sleep_ratio = 0 # 해당 이미지의 졸음지수 초기화
        
        try :
            sleep_ratio = sleep_ratio_model1[i][j]
            # Body BoundingBox의 중간 지점에 해당하는 Y 좌표 계산
            body_guidpoint_y = sleep_ratio_model2_body[i][j][1] + (body_face_standard * sleep_ratio_model2_body[i][j][3])
            # Face BoundingBox의 비율 계산
            face_bbox_ratio =  (sleep_ratio_model2_face[i][j][1] + sleep_ratio_model2_face[i][j][3] - body_guidpoint_y) / sleep_ratio_model2_face[i][j][3]
            # 졸음 지수를 모델 1과 Face BoundingBox 비율의 평균으로 업데이트
            sleep_ratio = (sleep_ratio_model1[i][j] + face_bbox_ratio) / 2
            
        except :
        
            try :
                # Body BoundingBox의 중간 지점에 해당하는 Y 좌표 계산
                body_guidpoint_y = sleep_ratio_model2_body[i][j][1] + (body_face_standard * sleep_ratio_model2_body[i][j][3])
                # Face BoundingBox의 비율 계산
                face_bbox_ratio =  (sleep_ratio_model2_face[i][j][1] + sleep_ratio_model2_face[i][j][3] - body_guidpoint_y) / sleep_ratio_model2_face[i][j][3]
                # 졸음 지수를 Face BoundingBox 비율로 업데이트
                sleep_ratio = face_bbox_ratio
                # 졸음 비율이 기준보다 높으면 리스트에 1 추가
                if sleep_ratio >= sleep_ratio_threshold :
                    sleep_ratio_lst.append(1)
                
                continue
                
            except :
                continue
        
        # 졸음 비율이 기준보다 높으면 리스트에 1 추가
        if sleep_ratio >= sleep_ratio_threshold :
            sleep_ratio_lst.append(1)

    #졸음 비율이 기준보다 높은 경우 1로 설정
    if ((sum(sleep_ratio_lst) / len(seq_lst)) >= sleep_threshold) :
        sleep_pred[i] = 1


# 핸드폰 예측 결과 업데이트
final_pred = sleep_pred.copy()
for i in range(len(phone_ratio_model1)) :
    if len(phone_ratio_model1[i]) > 0 :
        if max(phone_ratio_model1[i]) >= phone_threshold :
            final_pred[i] = 1

# Confusion Matrix 계산
cnf_matrix = confusion_matrix(result, final_pred)
np.set_printoptions(precision=2)

# Plot normalized confusion matrix
print('phone_threshold :', phone_threshold)
print('body_face_standard :', body_face_standard)
print('sleep_ratio_threshold :', sleep_ratio_threshold)
print('sleep_threshold :', sleep_threshold)
print('Accaracy :', accuracy_score(result, final_pred))
print('Recall :', recall_score(result, final_pred))
print('-----')

plt.figure()
plot_confusion_matrix(cnf_matrix, classes=['0', '1'], normalize=True,
                      title='Normalized confusion matrix')

plt.show()