# Real-Time Driver Drowsiness Detection System

This script performs live drowsiness detection using a combination of eye aspect ratio (EAR) and a deep learning model:

- **Model Setup**: Loads a pretrained MobileNetV2-based model (DrowsyNet) to classify drowsiness from facial crops.
- **Face and Landmark Detection**: Uses MediaPipe Face Mesh to extract detailed facial landmarks in real-time from webcam input.
- **Eye Aspect Ratio (EAR)**: Calculates EAR from specific eye landmarks to detect eye closure as a simple heuristic.
- **Drowsiness Scoring**: Maintains rolling windows for EAR-based and CNN-based detections to smooth predictions over time.
- **Alert Logic**:
  - When sustained signs of drowsiness (eye closure or CNN prediction) are detected for over 3 seconds, a first-level alert ("Take a short break!") is shown.
  - If drowsiness persists beyond 6 seconds, a serious alert ("Pull Over Immediately!") is triggered.
- **User Interface**: The live video feed is overlaid with the detection status (green for `normal`, red for `DROWSY`) and with any active alert message.
- **Controls**: Press 'q' to quit the application.

By combining the classical EAR metric with a CNN model, this hybrid approach detects driver fatigue more robustly than either signal alone and improves road safety through timely warnings.


In [2]:
import time
import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms, models
from collections import deque
import mediapipe as mp


class DrowsyNet(nn.Module):
    """MobileNetV2 feature extractor with a small two-output classification head.

    The MobileNetV2 backbone is frozen (no gradients); only the final linear
    layer created here has trainable parameters.

    Args:
        dropout_rate: Dropout probability applied to the pooled features
            before the linear layer.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        backbone = models.mobilenet_v2(pretrained=True)
        # Disable gradients on every backbone parameter so the pretrained
        # layers are never updated.
        backbone.requires_grad_(False)

        # Attribute names and registration order are kept as-is so that
        # state-dict keys stay the same.
        self.features = backbone.features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(backbone.last_channel, 2)

    def forward(self, x):
        """Return the raw (pre-softmax) two-element output for each input image."""
        feature_maps = self.features(x)
        pooled = self.pool(feature_maps)
        # Collapse the 1x1 spatial dimensions: (N, C, 1, 1) -> (N, C).
        flat = pooled.view(feature_maps.size(0), -1)
        return self.fc(self.dropout(flat))

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the network, move it to the chosen device, restore the trained
# weights from disk and switch to evaluation mode (disables dropout).
# NOTE(review): torch.load unpickles the checkpoint file; only load
# checkpoints from a trusted source.
model = DrowsyNet()
model.to(device)
model.load_state_dict(
    torch.load("best_drowsy_model.pth", map_location=device)
)
model.eval()

# MediaPipe Face Mesh configured for a video stream with a single face.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
)

# Preprocessing applied to each face crop before it is fed to the model:
# ndarray -> PIL image -> 224x224 -> float tensor -> per-channel normalisation.
transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Calculate Eye Aspect Ratio (EAR) to estimate eye openness
def get_eye_ratio(landmarks, left=True):
    """Return the eye aspect ratio (EAR) for one eye.

    The EAR is the sum of two vertical eyelid distances divided by twice the
    horizontal eye width, computed from six landmark points.

    Args:
        landmarks: Indexable collection whose items expose ``.x`` and ``.y``
            coordinates (the inference loop passes MediaPipe Face Mesh
            landmarks).
        left: Selects which of the two hard-coded landmark index sets is used.

    Returns:
        The aspect ratio as a float; smaller values mean a more closed eye.
    """
    indices = (
        [362, 385, 387, 263, 373, 380]
        if left
        else [33, 160, 158, 133, 153, 144]
    )

    # Rows are (x, y) points ordered as: corner, upper, upper, corner,
    # lower, lower.
    points = np.array([(landmarks[i].x, landmarks[i].y) for i in indices])

    vertical_a = np.linalg.norm(points[1] - points[5])
    vertical_b = np.linalg.norm(points[2] - points[4])
    horizontal = np.linalg.norm(points[0] - points[3])
    return (vertical_a + vertical_b) / (2.0 * horizontal)

# --- Detection thresholds -------------------------------------------------
EAR_THRESH = 0.25  # averaged EAR below this counts as "eyes closed"
CNN_THRESH = 0.6   # minimum softmax confidence for a CNN "drowsy" vote

# --- Temporal smoothing -----------------------------------------------------
FPS = 30                          # assumed camera frame rate
SUSTAIN_FRAMES = int(0.5 * FPS)   # frames in half a second at that rate

# Rolling 0/1 histories of the per-frame EAR and CNN decisions.
ear_win = deque((), maxlen=SUSTAIN_FRAMES)
cnn_win = deque((), maxlen=SUSTAIN_FRAMES)

# --- Alert state ------------------------------------------------------------
eyes_closed_start = None     # timestamp at which the current episode began
first_warn = second_warn = False   # whether each alert was already raised

last_warning_time = 0        # timestamp of the most recent alert
active_warning_text = None   # alert message currently shown, if any
alert_level = 0              # 0: no alert, 1: first alert, 2: serious alert

# Open the default webcam (device index 0) for real-time video capture.
cap = cv2.VideoCapture(0)

# try/finally guarantees that the camera and the preview window are released
# even when an exception interrupts the loop.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame could be read from the capture device; stop.
            break

        # MediaPipe expects RGB input, whereas OpenCV delivers BGR frames.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        result = face_mesh.process(rgb)
        # Use the monotonic clock for durations: unlike time.time(), it
        # cannot jump when the system clock is adjusted.
        now = time.monotonic()

        ear_flag = False
        cnn_flag = False

        # The rolling windows are only updated on frames where a face was
        # found; frames without a face leave the previous history untouched.
        if result.multi_face_landmarks:
            landmarks = result.multi_face_landmarks[0].landmark
            left_ear = get_eye_ratio(landmarks, left=True)
            right_ear = get_eye_ratio(landmarks, left=False)
            ear_val = (left_ear + right_ear) / 2.0
            ear_flag = ear_val < EAR_THRESH
            ear_win.append(1 if ear_flag else 0)

            # Bounding box (plus a 10-pixel margin) around landmarks 33..132,
            # used as the CNN input region.
            h, w, _ = frame.shape
            x_coords = [int(landmarks[i].x * w) for i in range(33, 133)]
            y_coords = [int(landmarks[i].y * h) for i in range(33, 133)]
            # Clamp every edge into [0, size]. A negative upper bound would
            # otherwise be read by NumPy as an index from the end and yield
            # a crop spanning almost the whole frame.
            x1 = min(max(min(x_coords) - 10, 0), w)
            x2 = min(max(max(x_coords) + 10, 0), w)
            y1 = min(max(min(y_coords) - 10, 0), h)
            y2 = min(max(max(y_coords) + 10, 0), h)
            # NOTE(review): the crop comes from the BGR frame, while
            # ToPILImage treats 3-channel arrays as RGB - confirm this
            # matches the channel order the model was trained with.
            face_crop = frame[y1:y2, x1:x2]

            if face_crop.size:
                inp = transform(face_crop).unsqueeze(0).to(device)
                with torch.no_grad():
                    out = model(inp)
                    probs = torch.softmax(out, 1)
                conf, pred = probs.max(1)
                # Class index 1 is treated as "drowsy".
                cnn_flag = (pred.item() == 1) and (conf.item() > CNN_THRESH)
                cnn_win.append(1 if cnn_flag else 0)

        # A signal counts as sustained once its window is full of positives.
        ear_sustained = sum(ear_win) >= SUSTAIN_FRAMES
        cnn_sustained = sum(cnn_win) >= SUSTAIN_FRAMES
        combined_detect = ear_sustained or cnn_sustained

        if combined_detect:
            if eyes_closed_start is None:
                eyes_closed_start = now
        else:
            if eyes_closed_start is not None:
                # alert_level is bookkeeping only: nothing in this loop
                # reads it to decide what is displayed.
                if now - eyes_closed_start >= 3 and alert_level == 1:
                    alert_level = 2
                else:
                    alert_level = 0
            # The episode is over: clear the timer and any visible alert.
            eyes_closed_start = None
            first_warn = False
            second_warn = False
            active_warning_text = None

        # Raise each alert once per episode, based on how long it has lasted.
        # `is not None` is required here: a timestamp must never be tested
        # by truthiness.
        if eyes_closed_start is not None:
            dur = now - eyes_closed_start
            if dur >= 6 and not second_warn:
                active_warning_text = "!!! SERIOUS ALERT: Pull Over Immediately !!!"
                last_warning_time = now
                second_warn = True
                alert_level = 2
            elif dur >= 3 and not first_warn:
                active_warning_text = "Take a short break!"
                last_warning_time = now
                first_warn = True
                alert_level = 1

        # Keep the alert message on screen for 3 seconds after it was raised.
        if active_warning_text and now - last_warning_time < 3:
            cv2.putText(frame, active_warning_text, (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 165, 255), 2)

        # Status line: DROWSY after 1 second of an ongoing episode, or as
        # soon as the CNN window alone is saturated.
        status = 'normal'
        if eyes_closed_start is not None and (now - eyes_closed_start) >= 1:
            status = 'DROWSY'
        elif cnn_sustained:
            status = 'DROWSY'

        # BGR colours: red for DROWSY, green otherwise.
        color = (0, 0, 255) if status == 'DROWSY' else (0, 255, 0)
        cv2.putText(frame, f"Final: {status}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

        cv2.imshow("MediaPipe Drowsiness Detection", frame)
        # Press 'q' to quit.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()