<a href="https://colab.research.google.com/github/jp3256/blink_detector/blob/main/blink_detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# connect to google drive folder
from google.colab import drive
import glob
drive.mount('/content/drive')
task1_directory = '/content/drive/My Drive/Applied Research/data/task1/final_study/'
task2_directory = '/content/drive/My Drive/Applied Research/data/task2/final_study/'
task1_output_directory = '/content/drive/My Drive/Applied Research/results/blink_detector_results/task1/results_with_timestamp_wait_3frames_with_ear/'
task2_output_directory = '/content/drive/My Drive/Applied Research/results/blink_detector_results/task2/results_with_timestamp_wait_3frames_with_ear/'
landmark_predictor_directory = '/content/drive/My Drive/Applied Research/data/shape_predictor_68_face_landmarks.dat'

# import the necessary packages
from scipy.spatial import distance as dist
from imutils import face_utils
import numpy as np
import pandas as pd
import imutils
import time
import dlib
import cv2
import re

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def eye_aspect_ratio(eye):
    '''
    description: 
        - calculate eye aspect ratio
    input:
        - eye: x,y-coordinates of detected eye 
    output:
        - eye_aspect_ratio: float; eye aspect ratio
    '''
    # vertical distance between eye landmark coordinates
    vertical1 = dist.euclidean(eye[1], eye[5])
    vertical2 = dist.euclidean(eye[2], eye[4])

    # horizontal distance between eye landmark coordinates
    horizontal = dist.euclidean(eye[0], eye[3])

    # compute the eye aspect ratio
    eye_aspect_ratio = (vertical1 + vertical2) / (2.0 * horizontal)

    return eye_aspect_ratio

In [None]:
# reference: https://www.pyimagesearch.com/2017/04/24/eye-blink-detection-opencv-python-dlib/
def get_df_blinks(video_path, ear_threshold=0.25, num_consec_frames=3):
    '''
    description: 
        - get time series data on blinks
    inputs:
        - video_path: str; path to video file
        - ear_threshold: float; eye aspect ratio threshold for detecting blinks
        - num_consec_frames: int; required number of consecutive frames below 
          EAR threshold for blink detection
    output:
        - df_blinks: dataframe; 
    '''

    # capture video frames
    vs = cv2.VideoCapture(video_path)
    fps = vs.get(cv2.CAP_PROP_FPS) # frames per sec: ~approx. 29
    print(f'frames per second: {fps}')
    time.sleep(1.0)

    # initialize values
    timestamps = []
    blinks = []
    ears = []
    frame_count = 0
    just_blinked = False
    wait_frames = 0

    # loop over frames
    while (vs.isOpened()):
        # check if there are more frames to process
        frame_exists, frame = vs.read()
        if frame_exists: 
            # record timestamp
            timestamps.append(vs.get(cv2.CAP_PROP_POS_MSEC))
            blinks.append(0)
            ears.append(None)
            # resize frame
            frame = imutils.resize(frame, width=450)
            # convert to grayscale
            grayframe = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            # detect face
            face_rectangle = face_detector(grayframe, 0)
            for rect in face_rectangle:
                # detect facial landmarks
                shape = landmark_predictor(grayframe, rect)
                # convert facial landmark (x, y)-coordinates to numpy array
                shape = face_utils.shape_to_np(shape)
                # get left and right eye coordinates
                left_eye = shape[l_start:l_end]
                right_eye = shape[r_start:r_end]
                # calculate EAR for both eyes and average them
                left_ear = eye_aspect_ratio(left_eye)
                right_ear = eye_aspect_ratio(right_eye)
                average_ear = (left_ear + right_ear) / 2.0
                ears[-1] = average_ear
                # if EAR < blink threshold, increment frame count
                if average_ear < ear_threshold:
                    # frame_count += 1
                    # if blink was just recorded, wait num_consec_frames to record next one
                    if just_blinked==True:
                        wait_frames += 1
                        if wait_frames > num_consec_frames:
                            just_blinked = False
                            wait_frames = 0
                    else:
                        frame_count += 1
                else:
                    # if eyes were closed for num_consec_frames, record blink
                    if frame_count >= num_consec_frames:
                        blinks[-1] = 1
                        just_blinked = True
                    # reset frame counter
                    frame_count = 0
        else:
            break

    cv2.destroyAllWindows()
    vs.release()

    df_blinks = pd.DataFrame()
    df_blinks['timestamp'] = timestamps
    df_blinks['blink'] = blinks
    df_blinks['ear'] = ears
    df_blinks['minute'] = np.floor(df_blinks.timestamp/60000)
    df_blinks['second'] = np.floor(df_blinks.timestamp/1000)
    df_blinks['five_second_step'] = np.floor(df_blinks.second/5)
    df_blinks['ten_second_step'] = np.floor(df_blinks.second/10)
    df_blinks['thirty_second_step'] = np.floor(df_blinks.second/30)

    return df_blinks

In [None]:
if __name__ == '__main__':
    # initialize dlib face detector (HOG-based) 
    face_detector = dlib.get_frontal_face_detector()

    # load pretrained dlib facial landmark predictor
    landmark_predictor = dlib.shape_predictor(landmark_predictor_directory)

    # get indexes of facial landmarks for left and right eye
    (l_start, l_end) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
    (r_start, r_end) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]

    # list file directory for each task
    file_directories = [task1_directory, task2_directory]

    # loop through batches
    start_time = time.time()
    for file_directory in file_directories:
        if file_directory == task1_directory:
            output_directory = task1_output_directory
        elif file_directory == task2_directory:
            output_directory = task2_output_directory
        print(f'Current file directory: {file_directory}')
        # compile results
        i = 0
        n_files = len(glob.glob(file_directory+'*.mp4'))
        for file in glob.glob(file_directory+'*.mp4'):
            group_number = re.search(r'(?<=final_study)/([0-9]+)', file).group(1)
            role = re.search(r'(?<=final_study)/([0-9]+)(-*)([A-Za-z]+)(-*)([A-Za-z]+)(-*)([A-Za-z]+)*', file).group(7)
            if type(role)==str:
                role = role.lower()
            # run blink detector if results don't already exist
            if not glob.glob(output_directory + f'{group_number}_{role}_blink_results.csv'):
                df_blinks = get_df_blinks(file, ear_threshold=0.25, num_consec_frames=3)
                df_blinks.to_csv(output_directory + f'{group_number}_{role}_blink_results.csv', index=False)
            i += 1
            print(f'Completed processing {i} of {n_files} files in directory', flush=True)

    # print time elapsed
    time_elapsed = time.time() - start_time
    print('Time elapsed (hours): ', time_elapsed/3600)
    print('Time elapsed (minutes): ', time_elapsed/60)

Current file directory: /content/drive/My Drive/Applied Research/data/task1/final_study/
Completed processing 1 of 293 files in directory
Completed processing 2 of 293 files in directory
Completed processing 3 of 293 files in directory
Completed processing 4 of 293 files in directory
Completed processing 5 of 293 files in directory
Completed processing 6 of 293 files in directory
Completed processing 7 of 293 files in directory
Completed processing 8 of 293 files in directory
Completed processing 9 of 293 files in directory
Completed processing 10 of 293 files in directory
Completed processing 11 of 293 files in directory
Completed processing 12 of 293 files in directory
Completed processing 13 of 293 files in directory
Completed processing 14 of 293 files in directory
Completed processing 15 of 293 files in directory
Completed processing 16 of 293 files in directory
Completed processing 17 of 293 files in directory
Completed processing 18 of 293 files in directory
Completed processing