In [20]:
from __future__ import annotations
import os
import sys
import shutil
import glob
import json
import random
import argparse
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Tuple


import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from dotenv import load_dotenv


try:
    import mediapipe as mp # tu·ª≥ ch·ªçn ƒë·ªÉ t·∫°o bbox m·∫Øt/mi·ªáng chu·∫©n h∆°n
    _HAS_MEDIAPIPE = True
except Exception:
    _HAS_MEDIAPIPE = False


from ultralytics import YOLO


SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [21]:

# ---------------------------- Utils ---------------------------- #

def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)


def clean_tmp(tmp_dir: Path):
    if tmp_dir.exists():
        for f in tmp_dir.glob('*'):
            try:
                f.unlink()
            except Exception:
                pass


def natural_key(s: str):
    import re
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', s)]

In [22]:
# ---------------------------- Config ---------------------------- #

@dataclass
class Paths:
    root: Path
    open_eyes: Path
    closed_eyes: Path
    yawns: Path
    videos: Path
    new_videos: Path
    yolo_dataset: Path
    output: Path
    model_dir: Path
    tmp: Path

    @staticmethod
    def from_env() -> 'Paths':
        load_dotenv(override=True)
        root = Path(os.getenv('DATA_ROOT', './data')).resolve()
        return Paths(
            root=root,
            open_eyes=Path(os.getenv('OPEN_EYES', root/'Open_Eyes')).resolve(),
            closed_eyes=Path(os.getenv('CLOSED_EYES', root/'Closed_Eyes')).resolve(),
            yawns=Path(os.getenv('YAWNS', root/'Yawns')).resolve(),
            videos=Path(os.getenv('VIDEOS', root/'Videos')).resolve(),
            new_videos=Path(os.getenv('NEW_VIDEOS', root/'New_Videos')).resolve(),
            yolo_dataset=Path(os.getenv('YOLO_DATASET', root/'YOLO_Dataset')).resolve(),
            output=Path(os.getenv('OUTPUT', './outputs')).resolve(),
            model_dir=Path(os.getenv('MODEL_DIR', './models')).resolve(),
            tmp=Path(os.getenv('TMP_DIR', './tmp')).resolve(),
        )


In [23]:

# ---------------------------- Label helpers ---------------------------- #

CLASS_MAP = {
    'open_eye': 0,
    'closed_eye': 1,
    # c√≥ th·ªÉ m·ªü r·ªông: 'yawn': 2
}

CLASS_NAMES = ['open_eye', 'closed_eye']


def write_yolo_label(label_path: Path, cls_id: int, xyxy: Tuple[int,int,int,int], img_w: int, img_h: int):
    x1, y1, x2, y2 = xyxy
    # clip
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(img_w-1, x2), min(img_h-1, y2)
    # convert to xywh normalized
    w = x2 - x1
    h = y2 - y1
    if w <= 1 or h <= 1:
        return False
    cx = x1 + w/2
    cy = y1 + h/2
    nx = cx / img_w
    ny = cy / img_h
    nw = w / img_w
    nh = h / img_h
    label_path.write_text(f"{cls_id} {nx:.6f} {ny:.6f} {nw:.6f} {nh:.6f}\n")
    return True

In [24]:
# ---------------------------- Detectors (for auto-label) ---------------------------- #

class ROIExtractor:
    """T·∫°o bbox m·∫Øt/mi·ªáng t·ª´ ·∫£nh khu√¥n m·∫∑t. ∆Øu ti√™n MediaPipe (·ªïn ƒë·ªãnh), fallback HaarCascade.
    """
    def __init__(self):
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        if _HAS_MEDIAPIPE:
            self.mp_face = mp.solutions.face_mesh.FaceMesh(static_image_mode=True, refine_landmarks=True)
        else:
            self.mp_face = None

    def infer_eye_boxes(self, img: np.ndarray) -> List[Tuple[int,int,int,int]]:
        h, w = img.shape[:2]
        # --- MediaPipe path ---
        if self.mp_face is not None:
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            res = self.mp_face.process(img_rgb)
            if res.multi_face_landmarks:
                boxes = []
                for lm in res.multi_face_landmarks:
                    # indices cho m·∫Øt tr√°i/ph·∫£i (FaceMesh mesh indices)
                    left_ids = [33, 133, 159, 145]
                    right_ids = [362, 263, 386, 374]
                    for ids in (left_ids, right_ids):
                        xs = [int(lm.landmark[i].x * w) for i in ids]
                        ys = [int(lm.landmark[i].y * h) for i in ids]
                        x1, y1, x2, y2 = min(xs), min(ys), max(xs), max(ys)
                        # m·ªü r·ªông nh·∫π ƒë·ªÉ ch·ª©a vi·ªÅn
                        pad = int(0.25 * max(x2-x1, y2-y1))
                        x1, y1 = max(0, x1 - pad), max(0, y1 - pad)
                        x2, y2 = min(w-1, x2 + pad), min(h-1, y2 + pad)
                        boxes.append((x1, y1, x2, y2))
                return boxes
        # --- Haar fallback ---
        faces = self.face_cascade.detectMultiScale(img, 1.2, 5)
        eye_boxes: List[Tuple[int,int,int,int]] = []
        for (x, y, fw, fh) in faces:
            roi = img[y:y+fh, x:x+fw]
            eyes = self.eye_cascade.detectMultiScale(roi)
            for (ex, ey, ew, eh) in eyes[:2]:
                eye_boxes.append((x+ex, y+ey, x+ex+ew, y+ey+eh))
        return eye_boxes


In [25]:
# ---------------------------- Dataset preparation ---------------------------- #

class YOLODatasetBuilder:
    def __init__(self, paths: Paths, img_size: int = 640):
        self.p = paths
        self.img_size = img_size
        self.roi = ROIExtractor()
        self.yolo_dirs = [
            self.p.yolo_dataset/'images'/'train',
            self.p.yolo_dataset/'images'/'val',
            self.p.yolo_dataset/'labels'/'train',
            self.p.yolo_dataset/'labels'/'val',
        ]
        for d in self.yolo_dirs:
            ensure_dir(d)
        ensure_dir(self.p.tmp)

    def _collect_images(self) -> List[Tuple[Path, int]]:
        pairs: List[Tuple[Path,int]] = []
        for img_path in sorted(self.p.open_eyes.glob('**/*.*'), key=lambda s: natural_key(str(s))):
            if img_path.suffix.lower() in {'.jpg', '.jpeg', '.png'}:
                pairs.append((img_path, CLASS_MAP['open_eye']))
        for img_path in sorted(self.p.closed_eyes.glob('**/*.*'), key=lambda s: natural_key(str(s))):
            if img_path.suffix.lower() in {'.jpg', '.jpeg', '.png'}:
                pairs.append((img_path, CLASS_MAP['closed_eye']))
        return pairs

    def _frames_from_videos(self) -> List[Tuple[Path, int]]:
        results: List[Tuple[Path,int]] = []
        for folder in [self.p.videos, self.p.new_videos]:
            if not folder.exists():
                continue
            for v in sorted(folder.glob('**/*.*')):
                if v.suffix.lower() not in {'.mp4', '.avi', '.mov', '.mkv'}:
                    continue
                name = v.stem.lower()
                label = None
                if any(k in name for k in ['open', 'mo']):
                    label = CLASS_MAP['open_eye']
                elif any(k in name for k in ['closed', 'dong', 'nhammat', 'sleep', 'drowsy']):
                    label = CLASS_MAP['closed_eye']
                if label is None:
                    continue
                cap = cv2.VideoCapture(str(v))
                if not cap.isOpened():
                    continue
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                step = max(frame_count // 40, 1)  # l·∫•y t·ªëi ƒëa 40 frame/video
                idx = 0
                saved = 0
                while True:
                    ok = cap.grab()
                    if not ok:
                        break
                    if idx % step == 0:
                        ok, frame = cap.retrieve()
                        if not ok:
                            break
                        fp = self.p.tmp / f"{v.stem}_{idx}.jpg"
                        cv2.imwrite(str(fp), frame)
                        results.append((fp, label))
                        saved += 1
                        if saved >= 40:
                            break
                    idx += 1
                cap.release()
        return results

    def _auto_label(self, img_path: Path, cls_id: int, out_img: Path, out_label: Path):
        img = cv2.imread(str(img_path))
        if img is None:
            return False
        h, w = img.shape[:2]
        # gi·ªØ nguy√™n ·∫£nh g·ªëc; YOLO s·∫Ω letterbox sau
        cv2.imwrite(str(out_img), img)
        boxes = self.roi.infer_eye_boxes(img)
        ok_any = False
        for box in boxes:
            ok_any |= write_yolo_label(out_label, cls_id, box, w, h)
        return ok_any

    def build(self, val_ratio: float = 0.2):
        print('üîÑ ƒêang gom ·∫£nh‚Ä¶')
        pairs = self._collect_images()
        pairs += self._frames_from_videos()
        if len(pairs) == 0:
            print('‚ùå Kh√¥ng t√¨m th·∫•y ·∫£nh/video!')
            return
        labels = [c for _, c in pairs]
        train_pairs, val_pairs = train_test_split(pairs, test_size=val_ratio, random_state=SEED, stratify=labels)

        for split, items in [('train', train_pairs), ('val', val_pairs)]:
            img_dir = self.p.yolo_dataset/'images'/split
            lbl_dir = self.p.yolo_dataset/'labels'/split
            for src, cls_id in items:
                out_img = img_dir / f"{src.stem}.jpg"
                out_lbl = lbl_dir / f"{src.stem}.txt"
                ensure_dir(out_img.parent)
                ensure_dir(out_lbl.parent)
                success = self._auto_label(src, cls_id, out_img, out_lbl)
                if not success:
                    # fallback: n·∫øu kh√¥ng ph√°t hi·ªán m·∫Øt, d√πng bbox m·∫∑t to√†n ·∫£nh (k√©m l√Ω t∆∞·ªüng nh∆∞ng c√≤n h∆°n b·ªè)
                    img = cv2.imread(str(src))
                    if img is None:
                        continue
                    h, w = img.shape[:2]
                    cv2.imwrite(str(out_img), img)
                    # KH√îNG khuy·∫øn ngh·ªã: ch·ªâ ƒë√≥ng vai tr√≤ d·ª± ph√≤ng
                    write_yolo_label(out_lbl, cls_id, (0, 0, w-1, h-1), w, h)

        # t·∫°o data.yaml
        data_yaml = self.p.yolo_dataset/'data.yaml'
        data_yaml.write_text(
            "\n".join([
                f"path: {self.p.yolo_dataset}",
                "train: images/train",
                "val: images/val",
                f"nc: {len(CLASS_NAMES)}",
                f"names: {CLASS_NAMES}",
            ]) + "\n"
        )
        print(f"‚úÖ Ho√†n t·∫•t build dataset ‚Üí {self.p.yolo_dataset}")
        # d·ªçn tmp
        clean_tmp(self.p.tmp)


In [26]:
# ---------------------------- Trainer ---------------------------- #

class YOLOTrainer:
    def __init__(self, paths: Paths, model: str = 'yolov8n.pt', imgsz: int = 640):
        self.p = paths
        ensure_dir(paths.model_dir)
        self.imgsz = imgsz
        print(f"üöÄ Loading model: {model}")
        self.model = YOLO(model)

    def train(self,
              epochs: int = 100,
              batch: int | str = 'auto',
              workers: int = 8,
              lr0: float = 0.002,  # h∆°i th·∫•p cho ·ªïn ƒë·ªãnh
              lrf: float = 0.12,   # cosine final lr ratio
              mosaic: float = 0.3, # ƒë·ªëi t∆∞·ª£ng nh·ªè: gi·ªØ mosaic nh·∫π
              mixup: float = 0.0,
              hsv_h: float = 0.015,
              hsv_s: float = 0.7,
              hsv_v: float = 0.4,
              degrees: float = 5.0,
              translate: float = 0.05,
              scale: float = 0.2,
              shear: float = 1.0,
              erasing: float = 0.0,
              patience: int = 30,
              device: str = None):
        data_yaml = self.p.yolo_dataset/'data.yaml'
        assert data_yaml.exists(), "data.yaml ch∆∞a t·ªìn t·∫°i, h√£y ch·∫°y --prepare tr∆∞·ªõc"

        args = dict(
            data=str(data_yaml),
            imgsz=self.imgsz,
            epochs=epochs,
            batch=batch,
            workers=workers,
            device=device or (0 if cv2.cuda.getCudaEnabledDeviceCount() > 0 else 'cpu'),
            seed=SEED,
            project=str(self.p.output),
            name='drowsy_det',
            cos_lr=True,
            lr0=lr0,
            lrf=lrf,
            optimizer='SGD',  # SGD + cosine th∆∞·ªùng b·ªÅn v·ªõi small objects; c√≥ th·ªÉ th·ª≠ AdamW
            momentum=0.937,
            weight_decay=0.0005,
            warmup_epochs=3.0,
            warmup_momentum=0.8,
            warmup_bias_lr=0.1,
            amp=True,
            patience=patience,
            cache='ram',  # tƒÉng t·ªëc IO
            # Augment (theo ƒë·∫∑c th√π m·∫Øt nh·ªè)
            mosaic=mosaic,
            mixup=mixup,
            hsv_h=hsv_h,
            hsv_s=hsv_s,
            hsv_v=hsv_v,
            degrees=degrees,
            translate=translate,
            scale=scale,
            shear=shear,
            erasing=erasing,
            box=7.5,  # tƒÉng nh·∫π loss bbox
            cls=0.5,  # gi·∫£m tr·ªçng s·ªë cls ƒë·ªÉ tr√°nh overfit label noise
            fl_gamma=1.5,  # focal loss
            iou=0.2,  # iou loss gain
            imgsz_min=self.imgsz,
            imgsz_max=self.imgsz,
            save_json=False,
            val=True,
        )
        print('üìä Training args:', json.dumps({k:v for k,v in args.items() if k not in {'data'}}, indent=2))
        results = self.model.train(**args)
        save_dir = Path(results.save_dir)
        best = save_dir/'weights'/'best.pt'
        if best.exists():
            dst = self.p.model_dir/'best_drowsy.pt'
            shutil.copy2(best, dst)
            print(f"‚úÖ Best model ‚Üí {dst}")
        else:
            print("‚ö†Ô∏è Kh√¥ng t√¨m th·∫•y best.pt!")
        return str(best)

    def validate(self, weights: Path | None = None):
        w = Path(weights) if weights else (self.p.model_dir/'best_drowsy.pt')
        print(f"üîé ƒê√°nh gi√°: {w}")
        m = YOLO(str(w))
        data_yaml = self.p.yolo_dataset/'data.yaml'
        return m.val(data=str(data_yaml), imgsz=self.imgsz, iou=0.6, conf=0.25)

    def export(self, weights: Path | None = None, fmt: str = 'onnx'):
        w = Path(weights) if weights else (self.p.model_dir/'best_drowsy.pt')
        m = YOLO(str(w))
        file = m.export(format=fmt)
        dst = self.p.model_dir/f'drowsy_export.{fmt}'
        shutil.copy2(file, dst)
        print(f"‚úÖ Exported ‚Üí {dst}")
        return str(dst)

In [27]:

# ---------------------------- Realtime tester ---------------------------- #

class Realtime:
    def __init__(self, weights: Path, device: str | int | None = None):
        self.model = YOLO(str(weights))
        self.device = device or (0 if cv2.cuda.getCudaEnabledDeviceCount()>0 else 'cpu')

    def run(self, src=0, conf=0.4):
        cap = cv2.VideoCapture(src)
        if not cap.isOpened():
            print('‚ùå Kh√¥ng m·ªü ƒë∆∞·ª£c camera/video')
            return
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            res = self.model.predict(frame, conf=conf, verbose=False, device=self.device)
            for r in res:
                for b in r.boxes:
                    x1, y1, x2, y2 = map(int, b.xyxy[0].tolist())
                    cls = int(b.cls[0])
                    cf = float(b.conf[0])
                    label = f"{CLASS_NAMES[cls]} {cf:.2f}"
                    cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)
                    cv2.putText(frame, label, (x1, max(20, y1-10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)
            cv2.imshow('Drowsy Detect ‚Äì YOLO', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        cap.release()
        cv2.destroyAllWindows()

In [28]:
# --- thay th·∫ø ƒëo·∫°n parse_args + main c≈© ---

def parse_args(argv=None):
    ap = argparse.ArgumentParser(description='Drowsy YOLO pipeline')
    ap.add_argument('--prepare', action='store_true', help='Chu·∫©n ho√° & auto-label dataset YOLO')
    ap.add_argument('--train', action='store_true', help='Hu·∫•n luy·ªán m√¥ h√¨nh')
    ap.add_argument('--val', action='store_true', help='ƒê√°nh gi√° m√¥ h√¨nh')
    ap.add_argument('--export', action='store_true', help='Xu·∫•t ONNX/TorchScript')
    ap.add_argument('--realtime', action='store_true', help='Ch·∫°y realtime')
    ap.add_argument('--weights', type=str, default='yolov8n.pt', help='Kh·ªüi t·∫°o/ƒë·ªçc weights')
    ap.add_argument('--imgsz', type=int, default=640, help='K√≠ch th∆∞·ªõc ·∫£nh train/infer')
    ap.add_argument('--fmt', type=str, default='onnx', choices=['onnx','torchscript','pt'])
    ap.add_argument('--val_ratio', type=float, default=0.2)

    # Khi ch·∫°y trong Jupyter/Spyder, b·ªè h·∫øt argv ‚Äúl·∫°‚Äù ƒë·ªÉ tr√°nh nu·ªët nh·∫ßm
    if argv is None:
        if any(m in sys.modules for m in ('ipykernel', 'spyder_kernels')):
            argv = []
        else:
            argv = sys.argv[1:]

    args, unknown = ap.parse_known_args(argv)
    if unknown:
        print(f"‚ö†Ô∏è Ignoring unknown args from environment: {unknown}")
    return args

def main(argv=None):
    args = parse_args(argv)
    p = Paths.from_env()
    for d in [p.open_eyes, p.closed_eyes, p.videos, p.new_videos, p.yolo_dataset, p.output, p.model_dir, p.tmp]:
        ensure_dir(d)

    if args.prepare:
        YOLODatasetBuilder(p, img_size=args.imgsz).build(val_ratio=args.val_ratio)

    trainer = YOLOTrainer(p, model=args.weights, imgsz=args.imgsz)

    if args.train:
        trainer.train()
    if args.val:
        trainer.validate()
    if args.export:
        trainer.export(fmt=args.fmt)
    if args.realtime:
        weights = p.model_dir/'best_drowsy.pt'
        if not weights.exists():
            print('‚ö†Ô∏è Ch∆∞a th·∫•y best_drowsy.pt, d√πng weights ƒë√£ cung c·∫•p')
            weights = Path(args.weights)
        Realtime(weights).run()

if __name__ == '__main__':
    main()


üöÄ Loading model: yolov8n.pt
