<a href="https://colab.research.google.com/github/neerajguleria1/driver-drowsiness-detection/blob/main/Untitled20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
%%writefile explainability.py

class ExplainabilityEngine:
    """
    Explainability Engine

    Purpose:
    - Convert raw inputs + temporal signals into human-readable explanations
    - Fully rule-based (no ML, no black box)
    - Deterministic and auditable
    """

    def explain(self, df, temporal_result):
        explanations = []

        fatigue = df["Fatigue"].iloc[0]
        alertness = df["Alertness"].iloc[0]
        heart_rate = df["Heart_rate"].iloc[0]
        speed = df["Speed"].iloc[0]


        if fatigue >= 7:
            explanations.append({
                "feature": "Fatigue",
                "value": fatigue,
                "threshold": ">= 7",
                "severity": "HIGH",
                "reason": "High fatigue reduces reaction time and attention"
            })

        if alertness <= 0.45:
            explanations.append({
                "feature": "Alertness",
                "value": alertness,
                "threshold": "<= 0.45",
                "severity": "HIGH",
                "reason": "Low alertness strongly correlates with drowsiness"
            })

        if heart_rate >= 105:
            explanations.append({
                "feature": "Heart_rate",
                "value": heart_rate,
                "threshold": ">= 105",
                "severity": "MEDIUM",
                "reason": "Elevated heart rate indicates physiological stress"
            })

        if speed <= 60:
            explanations.append({
                "feature": "Speed",
                "value": speed,
                "threshold": "<= 60",
                "severity": "MEDIUM",
                "reason": "Reduced driving speed may indicate loss of focus"
            })

        if temporal_result.get("trend") == "RISING":
            explanations.append({
                "feature": "RiskTrend",
                "value": "RISING",
                "threshold": "RISING",
                "severity": "HIGH",
                "reason": "Risk score has been consistently increasing over time"
            })

        if not explanations:
            explanations.append({
                "feature": "System",
                "value": "Stable",
                "threshold": "N/A",
                "severity": "LOW",
                "reason": "No abnormal driver behavior detected"
            })

        severity_order = {
            "HIGH": 0,
            "MEDIUM": 1,
            "LOW": 2
        }

        explanations.sort(key=lambda x: severity_order[x["severity"]])

        return explanations


Writing explainability.py


In [3]:
%%writefile cognitive_engine.py

"""
Cognitive Risk Engine

Purpose:
- Convert ML drowsiness probability into an explainable risk score
- Fuse ML output with physiological + behavioral signals
- Produce stable, decision-ready risk values
"""

import numpy as np
import pandas as pd


# ---- Design constants (no magic numbers) ----
HR_BASELINE = 90
HR_RANGE = 30
ALERTNESS_DROP_RANGE = 0.3


class CognitiveRiskEngine:
    REQUIRED_COLUMNS = [
        "Fatigue",
        "Alertness",
        "prev_alertness",
        "Heart_rate",
        "Speed",
        "speed_change"
    ]

    def __init__(self):
        self.prev_risk = 0.0

    def compute(self, df: pd.DataFrame, ml_prob: float) -> dict:
        missing = set(self.REQUIRED_COLUMNS) - set(df.columns)
        if missing:
            raise ValueError(f"Missing required input columns: {missing}")

        fatigue = df["Fatigue"].iloc[0]
        alertness = df["Alertness"].iloc[0]
        prev_alertness = df["prev_alertness"].iloc[0]
        heart_rate = df["Heart_rate"].iloc[0]
        speed = df["Speed"].iloc[0]
        speed_change = df["speed_change"].iloc[0]

        fatigue_score = np.clip(fatigue / 10, 0, 1)
        alertness_low = np.clip(1 - alertness, 0, 1)

        alertness_drop = np.clip(
            (prev_alertness - alertness) / ALERTNESS_DROP_RANGE,
            0,
            1
        )

        hr_stress = np.clip(
            (heart_rate - HR_BASELINE) / HR_RANGE,
            0,
            1
        )

        speed_instability = np.clip(speed_change / 30, 0, 1)

        raw_risk = (
            0.35 * ml_prob +
            0.20 * fatigue_score +
            0.15 * alertness_low +
            0.10 * alertness_drop +
            0.10 * hr_stress +
            0.10 * speed_instability
        )

        smoothed_risk = 0.7 * self.prev_risk + 0.3 * raw_risk
        self.prev_risk = smoothed_risk

        risk_score = int(np.clip(smoothed_risk * 100, 0, 100))

        if risk_score < 30:
            state = "SAFE"
            action = "No alert"
        elif risk_score < 60:
            state = "CAUTION"
            action = "Audio warning"
        else:
            state = "CRITICAL"
            action = "Audio + Visual + Haptic alert"

        return {
            "risk_score": risk_score,
            "state": state,
            "recommended_action": action,
            "explanation": {
                "ml_probability": round(ml_prob, 2),
                "fatigue_score": round(fatigue_score, 2),
                "alertness_low": round(alertness_low, 2),
                "alertness_drop": round(alertness_drop, 2),
                "hr_stress": round(hr_stress, 2),
                "speed_instability": round(speed_instability, 2)
            }
        }


Writing cognitive_engine.py


In [4]:
%%writefile temporal_risk.py

import numpy as np
from collections import deque

TREND_SLOPE_THRESHOLD = 2

class TemporalRiskTracker:
    """
    Temporal Risk Tracker

    Purpose:
    - Smooth noisy risk scores over time
    - Detect risk trends (RISING / FALLING / STABLE)
    - Produce a stable risk state for decision making
    """

    def __init__(self, window: int = 5, alpha: float = 0.3):
        self.window = window
        self.alpha = alpha

        self.risk_history = deque(maxlen=window)
        self.ema_risk = None
        self.current_state = "SAFE"

    def update(self, risk_score: int) -> dict:
        self.risk_history.append(risk_score)
        history = np.array(self.risk_history)

        sma = int(history.mean())

        if self.ema_risk is None:
            self.ema_risk = float(risk_score)
        else:
            self.ema_risk = (
                self.alpha * risk_score +
                (1 - self.alpha) * self.ema_risk
            )

        trend = self._compute_trend(history)

        stable_state = self._stable_state(self.ema_risk)

        return {
            "current_risk": risk_score,
            "sma_risk": sma,
            "ema_risk": int(self.ema_risk),
            "trend": trend,
            "stable_state": stable_state
        }

    def _compute_trend(self, history: np.ndarray) -> str:
        if len(history) < 3:
            return "INSUFFICIENT_DATA"

        slope = np.polyfit(range(len(history)), history, 1)[0]

        if slope > TREND_SLOPE_THRESHOLD:
            return "RISING"
        elif slope < -TREND_SLOPE_THRESHOLD:
            return "FALLING"
        else:
            return "STABLE"

    def _stable_state(self, risk: float) -> str:
        if risk < 30:
            self.current_state = "SAFE"
        elif risk < 60:
            self.current_state = "CAUTION"
        else:
            self.current_state = "CRITICAL"

        return self.current_state


Writing temporal_risk.py


In [6]:
%%writefile decision_engine.py

class DecisionEngine:
    """
    Decision Engine

    Purpose:
    - Convert stabilized risk signals into deterministic safety actions
    - Prioritize highest-risk scenarios first
    - Produce auditable, rule-based decisions
    """

    def decide(self, stable_state: str, trend: str, ema_risk: int) -> dict:
        reasons = []

        if ema_risk >= 85:
            reasons.append("EMA risk extremely high")
            return self._decision(
                action="Stop vehicle immediately",
                severity="CRITICAL",
                message="Driver in dangerous condition",
                reasons=reasons
            )

        if ema_risk >= 70 and trend == "RISING":
            reasons.extend([
                "High EMA risk",
                "Risk trending upward"
            ])
            return self._decision(
                action="Recommended immediate break",
                severity="HIGH",
                message="Driver fatigue escalating",
                reasons=reasons
            )

        if 50 <= ema_risk < 70:
            reasons.append("Moderate EMA risk")

            if trend == "RISING":
                reasons.append("Upward risk trend detected")
                return self._decision(
                    action="Recommended immediate break",
                    severity="MEDIUM",
                    message="Driver showing increasing fatigue",
                    reasons=reasons
                )

            return self._decision(
                action="Suggest short rest",
                severity="MEDIUM",
                message="Risk elevated but stable",
                reasons=reasons
            )

        if ema_risk < 50:
            reasons.append("Low EMA risk")

            if trend == "RISING":
                reasons.append("Early warning: upward trend")
                return self._decision(
                    action="Monitor closely",
                    severity="LOW",
                    message="Risk low but increasing",
                    reasons=reasons
                )

            return self._decision(
                action="Continue driving",
                severity="LOW",
                message="Driver condition stable",
                reasons=reasons
            )

        return self._decision(
            action="Monitor system",
            severity="UNKNOWN",
            message="Insufficient decision data",
            reasons=["Fallback triggered"]
        )

    def _decision(self, action: str, severity: str, message: str, reasons: list) -> dict:
        return {
            "action": action,
            "severity": severity,
            "message": message,
            "reasons": reasons
        }


Writing decision_engine.py


In [15]:
import joblib
import pandas as pd
from google.colab import files
#uploaded=files.upload()

from cognitive_engine import CognitiveRiskEngine
from temporal_risk import TemporalRiskTracker
from decision_engine import DecisionEngine
from explainability import ExplainabilityEngine

pipeline = joblib.load("final_driver_drowsiness_pipeline.pkl")

cognitive = CognitiveRiskEngine()
temporal = TemporalRiskTracker()
decision_engine = DecisionEngine()
explain_engine = ExplainabilityEngine()

sample = {
    "Speed": 55,
    "Alertness": 0.4,
    "Seatbelt": 1,
    "Heart_rate": 112,
    "Fatigue": 8,
    "speed_change": 4,
    "prev_alertness": 0.55
}

df = pd.DataFrame([sample])
df_ml = df.rename(columns={"Heart_rate": "HR"})

ml_prob = pipeline.predict_proba(df_ml)[0][1]

cog = cognitive.compute(df, ml_prob)

temp = temporal.update(cog["risk_score"])

decision = decision_engine.decide(
    stable_state=temp["stable_state"],
    trend=temp["trend"],
    ema_risk=temp["ema_risk"]
)

explanations = explain_engine.explain(df, temp)

print("\nFINAL OUTPUT")
print("Decision:", decision)
print("Explanations:")
for e in explanations:
    print("-", e)



FINAL OUTPUT
Decision: {'action': 'Continue driving', 'severity': 'LOW', 'message': 'Driver condition stable', 'reasons': ['Low EMA risk']}
Explanations:
- {'feature': 'Fatigue', 'value': np.int64(8), 'threshold': '>= 7', 'severity': 'HIGH', 'reason': 'High fatigue reduces reaction time and attention'}
- {'feature': 'Alertness', 'value': np.float64(0.4), 'threshold': '<= 0.45', 'severity': 'HIGH', 'reason': 'Low alertness strongly correlates with drowsiness'}
- {'feature': 'Heart_rate', 'value': np.int64(112), 'threshold': '>= 105', 'severity': 'MEDIUM', 'reason': 'Elevated heart rate indicates physiological stress'}
- {'feature': 'Speed', 'value': np.int64(55), 'threshold': '<= 60', 'severity': 'MEDIUM', 'reason': 'Reduced driving speed may indicate loss of focus'}
