In [54]:
import cv2
import dlib
import numpy as np
from imutils import face_utils
from scipy.spatial import distance
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import GradientBoostingClassifier

In [23]:
predictor_path = 'shape_predictor_68_face_landmarks.dat'
detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(predictor_path)

In [24]:
EYE_AR_THRESH = 0.3
EYE_AR_CONSEC_FRAMES = 3
YAWN_THRESH = 0.5  
HEAD_MOVEMENT_THRESH = 20  

In [25]:
def eye_aspect_ratio(eye):
    A = np.linalg.norm(eye[1] - eye[5])
    B = np.linalg.norm(eye[2] - eye[4])
    C = np.linalg.norm(eye[0] - eye[3])
    ear = (A + B) / (2.0 * C)
    return ear

In [26]:
def detect_yawn(shape):
    mouth = shape[48:68]
    A = distance.euclidean(mouth[3], mouth[9])
    B = distance.euclidean(mouth[2], mouth[10])
    C = distance.euclidean(mouth[4], mouth[8])
    D = distance.euclidean(mouth[0], mouth[6])
    ratio = (A + B + C) / (3.0 * D)
    return ratio > YAWN_THRESH

In [30]:
def detect_head_movement(shape, prev_shape):
    if prev_shape is None:
        return 0
    current_head_pos = np.mean(shape[30:36], axis=0) 
    prev_head_pos = np.mean(prev_shape[30:36], axis=0)
    movement = np.linalg.norm(current_head_pos - prev_head_pos)
    return movement


In [31]:
def process_video(video_path, duration=10):
    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        print(f"Error: Unable to open video file at {video_path}")
        return {}
    
    blink_count = 0
    consecutive_closed_frames = 0
    open_eyes_durations = []
    closed_eyes_durations = []
    yawning_count = 0
    head_movement_count = 0
    prev_shape = None

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(duration * fps)
    frame_count = 0

    while cap.isOpened() and frame_count < total_frames:
        ret, frame = cap.read()
        if not ret:
            print("Warning: End of video or error reading frame")
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = detector(gray, 0)

        for rect in rects:
            shape = predictor(gray, rect)
            shape = face_utils.shape_to_np(shape)

            leftEye = shape[face_utils.FACIAL_LANDMARKS_IDXS["left_eye"][0]:face_utils.FACIAL_LANDMARKS_IDXS["left_eye"][1]]
            rightEye = shape[face_utils.FACIAL_LANDMARKS_IDXS["right_eye"][0]:face_utils.FACIAL_LANDMARKS_IDXS["right_eye"][1]]

            leftEAR = eye_aspect_ratio(leftEye)
            rightEAR = eye_aspect_ratio(rightEye)
            ear = (leftEAR + rightEAR) / 2.0

            if ear < EYE_AR_THRESH:
                consecutive_closed_frames += 1
            else:
                if consecutive_closed_frames >= EYE_AR_CONSEC_FRAMES:
                    blink_count += 1
                    closed_eyes_durations.append(consecutive_closed_frames)
                consecutive_closed_frames = 0
                open_eyes_durations.append(frame_count)

            if detect_yawn(shape):
                yawning_count += 1

            head_movement = detect_head_movement(shape, prev_shape)
            if head_movement > HEAD_MOVEMENT_THRESH:
                head_movement_count += 1

            prev_shape = shape

        frame_count += 1

    cap.release()
    cv2.destroyAllWindows()

    open_eye_avg_duration = np.mean(open_eyes_durations) if open_eyes_durations else 0
    closed_eye_avg_duration = np.mean(closed_eyes_durations) if closed_eyes_durations else 0
    eye_closure_ratio = sum(closed_eyes_durations) / frame_count if frame_count > 0 else 0

    return {
        'blink_count': blink_count,
        'closed_eye_avg_duration': closed_eye_avg_duration,
        'open_eye_avg_duration': open_eye_avg_duration,
        'eye_closure_ratio': eye_closure_ratio,
        'yawning_count': yawning_count,
        'head_movement_count': head_movement_count
    }

In [32]:
def process_videos_in_folder(folder_path, label, duration=10):
    results = []
    for filename in os.listdir(folder_path):
        if filename.endswith(".mp4"):
            video_path = os.path.join(folder_path, filename)
            features = process_video(video_path, duration)
            features['label'] = label
            features['filename'] = filename
            results.append(features)
    return results

drowsiness_folder = 'C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/drowsiness'
not_drowsiness_folder = 'C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/not drowsiness'

drowsiness_results = process_videos_in_folder(drowsiness_folder, label=1)
not_drowsiness_results = process_videos_in_folder(not_drowsiness_folder, label=0)

results_df = pd.DataFrame(drowsiness_results + not_drowsiness_results)

results_df.to_csv('C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/driver_fatigue_features.csv', index=False)

In [37]:
results_df = pd.DataFrame(drowsiness_results + not_drowsiness_results)
results_df.to_csv('C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/driver_fatigue_features_results.csv', index=False)
drowsiness_results_df = pd.DataFrame(drowsiness_results)
not_drowsiness_results_df = pd.DataFrame(not_drowsiness_results)
drowsiness_results_df.to_csv('C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/driver_fatigue_features_drowsiness_results.csv', index=False)
not_drowsiness_results_df.to_csv('C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/driver_fatigue_features_not_drowsiness_results.csv', index=False)

In [46]:
data = pd.read_csv('C:/Users/Neomi/Desktop/DL project/project/drowsiness-level/data/driver_fatigue_features_results.csv')
numeric_data = data.select_dtypes(include=['float64', 'int64'])

numeric_data['label'] = data['label']

X = numeric_data.drop('label', axis=1) 
y = numeric_data['label'] 

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)

model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

Accuracy: 0.6265060240963856
Classification Report:
               precision    recall  f1-score   support

           0       0.63      0.69      0.66       219
           1       0.62      0.56      0.58       196

    accuracy                           0.63       415
   macro avg       0.63      0.62      0.62       415
weighted avg       0.63      0.63      0.62       415

Confusion Matrix:
 [[151  68]
 [ 87 109]]


In [50]:
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)

print("Best Parameters:", grid_search.best_params_)
best_model = grid_search.best_estimator_

y_pred = best_model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

Fitting 5 folds for each of 108 candidates, totalling 540 fits
Best Parameters: {'max_depth': None, 'min_samples_leaf': 4, 'min_samples_split': 2, 'n_estimators': 100}
Accuracy: 0.6313253012048192
Classification Report:
               precision    recall  f1-score   support

           0       0.64      0.68      0.66       219
           1       0.62      0.57      0.59       196

    accuracy                           0.63       415
   macro avg       0.63      0.63      0.63       415
weighted avg       0.63      0.63      0.63       415

Confusion Matrix:
 [[150  69]
 [ 84 112]]


In [52]:
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

Accuracy: 0.6409638554216868
Classification Report:
               precision    recall  f1-score   support

           0       0.64      0.73      0.68       219
           1       0.64      0.54      0.59       196

    accuracy                           0.64       415
   macro avg       0.64      0.64      0.63       415
weighted avg       0.64      0.64      0.64       415

Confusion Matrix:
 [[160  59]
 [ 90 106]]
