In [1]:
import cv2
import dlib
import torch
import torch.nn as nn
import torch.nn.functional as F 
from torchvision import transforms
import matplotlib.pyplot as plt

import numpy as np
from PIL import Image, ImageOps

import math
import time
import os

In [2]:
class EyeClassifier(nn.Module):
    """Small CNN that scores a single-channel image crop against two classes.

    Two (3x3 convolution -> ReLU -> 2x2 max-pool) stages feed a two-layer
    fully connected head. The flattened feature size of 6*6*32 is what the
    layer arithmetic yields for a 32x32 input (32 -> 30 -> 15 -> 13 -> 6).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor (attribute names are part of the saved state dict)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)
        # Classifier head
        self.fc1 = nn.Linear(in_features=6 * 6 * 32, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=2)

    def forward(self, x):
        """Return the raw (un-normalised) two-class scores for `x`."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        flat = features.view(-1, 6 * 6 * 32)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

In [3]:
# --- Face detector: OpenCV DNN built from a Caffe prototxt + weights pair ---
config_file = "./Pretrained Detectors/deploy.prototxt"
model_file = "./Pretrained Detectors/res10_300x300_ssd_iter_140000.caffemodel"
face_model = cv2.dnn.readNetFromCaffe(config_file, model_file)

# --- Landmark predictor: dlib shape predictor loaded from its .dat file ---
predictor = dlib.shape_predictor("./Pretrained Detectors/shape_predictor_68_face_landmarks.dat")

# --- Eye classifier: checkpoint tensors are mapped onto the CPU device ---
# Alternative checkpoint: "./Saved Models/model2.pt"
device = torch.device("cpu")
eye_model = EyeClassifier()
eye_model.load_state_dict(
    torch.load("./Saved Models/model_DataAug3.pt", map_location=device)
)
# Switch to evaluation mode; kept as the last expression so the cell still
# echoes the module summary.
eye_model.eval()

EyeClassifier(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
  (fc1): Linear(in_features=1152, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=2, bias=True)
)

In [4]:
# Locates bounding box for a single face
def detect_face(img):
    """Run the face detector on `img` and return the most confident face.

    Returns a `(dlib.rectangle, confidence)` pair. Only detections scoring
    above 0.5 are considered; if there are none, the rectangle is all zeros
    and the confidence is 0.
    """
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb, (300, 300))
    blob = cv2.dnn.blobFromImage(resized, 1.0, (300, 300), (104.0, 177.0, 123.0))

    face_model.setInput(blob)
    detections = face_model.forward()

    # Scan every candidate and remember the strongest one above the threshold.
    best_index = None
    max_confidence = 0
    for index in range(detections.shape[2]):
        score = detections[0, 0, index, 2]
        if score > 0.5 and score > max_confidence:
            best_index = index
            max_confidence = score

    if best_index is None:
        return dlib.rectangle(0, 0, 0, 0), max_confidence

    # Columns 3..6 hold the box corners as fractions of the frame size.
    height, width = rgb.shape[:2]
    left, top, right, bottom = detections[0, 0, best_index, 3:7]
    box = dlib.rectangle(
        int(left * width),
        int(top * height),
        int(right * width),
        int(bottom * height),
    )
    return box, max_confidence


# Locates bounding box for a single eye
def detect_eye(img, face):
    """Derive an eye bounding box from facial landmarks inside `face`.

    Returns `((x1, y1, x2, y2), found)`. `found` is False (with an all-zero
    box) when the landmark predictor returns no parts. Coordinates are not
    clipped to the image, so callers must handle boxes that leave the frame.
    """
    shape = predictor(img, face)
    if shape.num_parts == 0:
        return (0, 0, 0, 0), False

    # Heuristic from the original author (described as a LEFT-eye box; feel
    # free to change): take the horizontal span between landmarks 17 and 21,
    # pad it by 15%, and hang a box of roughly that size from landmark 19.
    left = shape.part(17).x
    right = shape.part(21).x
    span = abs(right - left)
    margin = span * 0.15
    half_margin = int(margin / 2)

    top = shape.part(19).y - half_margin
    bottom = top + int(span + margin)

    return (left - half_margin, top, right + half_margin, bottom), True

In [5]:
# Prepares an image for CNN eye classifier
def preprocess(img):
    """Turn an image array into an equalized 32x32 grayscale tensor.

    Steps: wrap the array as a PIL image, convert to mode "L", apply
    histogram equalization, resize to 32x32 and convert to a tensor.
    """
    gray = Image.fromarray(img).convert("L")
    equalized = ImageOps.equalize(gray)

    to_model_input = transforms.Compose([
        transforms.Resize([32, 32]),
        transforms.ToTensor(),
    ])
    return to_model_input(equalized)

# Predicts eye state given a single 1x32x32 tensor
def predict_eye_state(img):
    """Classify one preprocessed eye crop and return the winning class index.

    Args:
        img: a single-image tensor without a batch dimension (a batch axis is
            added here). The classifier's flatten size corresponds to a
            1x32x32 input, which is what `preprocess` produces.

    Returns:
        int: index of the highest-scoring class (0 or 1). `run` treats 0 as
        "closed".
    """
    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        outputs = eye_model(img.unsqueeze(0))

    # argmax over raw scores equals argmax over softmax probabilities, so no
    # normalisation is needed to pick the class.
    return outputs.argmax(dim = 1).item()

In [6]:
def _map(x, in_min, in_max, out_min, out_max):
    return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min

def perclos_to_kss(x):
    """Convert a PERCLOS fraction into an integer KSS level from 1 to 9.

    (KSS presumably refers to the Karolinska Sleepiness Scale - confirm.)

    The PERCLOS axis is split into three bands, each covering a run of levels:

        [0,     0.075] -> levels 1..5
        (0.075, 0.15 ] -> levels 6..7
        (0.15,  1    ] -> levels 8..9

    Within a band the levels get equal-width bins. Truncating a linear map
    with `int()` instead would make the top level of each band (5, 7, 9)
    reachable only at the band's exact upper endpoint.

    Args:
        x: fraction of closed-eye samples; values outside [0, 1] are clamped.

    Returns:
        int: level in 1..9.
    """
    # (lower bound, upper bound, first level, last level)
    bands = (
        (0.0, 0.075, 1, 5),
        (0.075, 0.15, 6, 7),
        (0.15, 1.0, 8, 9),
    )

    x = min(max(x, 0.0), 1.0)

    for lower, upper, first, last in bands:
        if x <= upper:
            n_levels = last - first + 1
            step = int((x - lower) / (upper - lower) * n_levels)
            # x == upper lands exactly on n_levels; fold it into the top bin.
            return first + min(step, n_levels - 1)

    # Not reachable for real numbers after clamping; kept as a safe default.
    return bands[-1][3]

def kss_def(x):
    """Return the drowsiness category label for a KSS level `x`.

    Levels up to 5 are "Low", up to 7 are "Moderate", anything else "Severe".
    """
    thresholds = (
        (5, "Low drowsiness"),
        (7, "Moderate drowsiness"),
    )
    for upper_limit, label in thresholds:
        if x <= upper_limit:
            return label
    return "Severe drowsiness"

In [14]:
def run(video_path = None, kss = None):
    """Run the drowsiness demo on a video file or, by default, the webcam.

    For every frame: detect the most confident face, derive an eye box from
    its landmarks, classify the eye as closed (0) or open, and keep a rolling
    window of the last `sampling_t` seconds of classifications. Once the
    window is full, PERCLOS (fraction of closed samples in the window) and
    the derived KSS prediction are drawn on the frame. Press "q" in the
    window to stop.

    Args:
        video_path: path to a video file; if falsy, capture device 0 is used
            and 30 fps is assumed.
        kss: optional self-assessed KSS score, shown in the overlay only.
    """
    # If video path is not provided, use webcam
    if video_path:
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        # The backend may report 0 (or a non-finite value) when the frame rate
        # is unknown; that would break 1/fps and the ring-buffer modulo below.
        if not (fps > 0 and math.isfinite(fps)):
            fps = 30
    else:
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        fps = 30                                    # assume 30fps for webcam

    sampling_t = 60                                 # sampling interval in seconds
    sampling_f = max(1, int(sampling_t * fps))      # sampling interval in frames (never 0)

    # PERCLOS variables
    samples = []        # ring buffer of the last `sampling_f` eye states
    num_closed = 0      # number of closed (0) states currently in the buffer
    perclos = -1        # -1 means "not available yet"
    pred = -1
    counter = 0         # number of classified frames so far
    t = 0               # elapsed video time in seconds

    # try/finally so the capture device and window are released even if a
    # detector or the classifier raises mid-stream.
    try:
        while cap.isOpened():
            ret, frame = cap.read()     # return status and image
            if not ret:
                print("Can't retreive frame")
                break

            #time.sleep(0.1)             # enable to see blinks clearly

            # Face detection
            face, confidence = detect_face(frame)
            eye_state = None

            if confidence > 0:
                # Run landmark detection and take the eye crop BEFORE drawing
                # anything, so overlay pixels never reach the models.
                (x1, y1, x2, y2), eye_found = detect_eye(frame, face)

                if eye_found:
                    # The landmark-derived box can leave the frame; negative
                    # indices would wrap around and give a wrong/empty crop.
                    frame_h, frame_w = frame.shape[:2]
                    x1, x2 = max(0, x1), min(frame_w, x2)
                    y1, y2 = max(0, y1), min(frame_h, y2)

                    if x2 > x1 and y2 > y1:
                        # Eye state classification
                        eye = preprocess(frame[y1:y2, x1:x2])
                        eye_state = predict_eye_state(eye)

                # Draw face visual
                cv2.rectangle(frame, (face.left(), face.top()), (face.right(), face.bottom()), (255, 0, 0), 2)

            if eye_state is not None:
                closed = eye_state == 0

                # Circular buffer to keep track of rolling predictions and PERCLOS values over sampling interval
                if closed:
                    num_closed += 1

                if counter < sampling_f:
                    samples.append(eye_state)
                else:
                    slot = counter % sampling_f
                    # Evict the oldest sample; it only counted if it was closed.
                    if samples[slot] == 0:
                        num_closed -= 1
                    samples[slot] = eye_state
                    perclos = num_closed/sampling_f
                    pred = perclos_to_kss(perclos)

                # Draw eye visual
                color = (0, 0, 255) if closed else (0, 255, 0)
                label = "Closed" if closed else "Open"
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 1)
                cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                counter += 1

            t += 1/fps

            # Draw visuals
            cv2.putText(frame, ("Self Assessment: {} - {}".format(str(kss), kss_def(kss))) if kss else "", (0, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            cv2.putText(frame, "PERCLOS: " + ("{:.1%}".format(perclos) if perclos >= 0 else "-"), (0, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            cv2.putText(frame, "Prediction: " + ("{} - {}".format(str(pred), kss_def(pred)) if perclos >= 0 else "-"), (0, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            cv2.putText(frame, "Time: " + str(math.floor(t)), (0, frame.shape[0] - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)

            # Display frame
            cv2.imshow("img", frame)

            # Exit window using "q" key (mask to the low byte: some backends
            # set higher bits in the returned key code)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

In [24]:
# Entry point for the demo. Alternatives, kept for convenience:
#   run()                                                   -> webcam
#   run("./Datasets/DROZY/videos_i8/1-3.mp4", kss = 7)      -> another recording
run(video_path = "./Datasets/X/04/0.mp4", kss = 1)