<a href="https://www.kaggle.com/code/vovanquangnbk/drowsy-process?scriptVersionId=144340588" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

## Setup

In [None]:
# Install the packages that provide `facenet_pytorch` and `mediapipe`, both imported below.
!pip -q install facenet-pytorch
!pip -q install mediapipe

In [None]:
import os
import pickle
import gc
import json
import glob
import time
import threading
import queue
import itertools
import pandas as pd
import numpy as np
import math
from tqdm import tqdm
import torch
from torch import nn
import torch.nn.functional as F

# Image library
import cv2
import torchvision
from torchvision import transforms
import matplotlib.pyplot as plt
from PIL import Image

# Model
from facenet_pytorch import MTCNN, InceptionResnetV1, extract_face
import mediapipe as mp

In [None]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Running on device: {device}')

In [None]:
# Root folder of the dataset; `create_csv` scans it as data_dir/<class folder>/<file>.
data_dir = '/kaggle/input/sust-ddd/SUST Driver Drowsiness Dataset'
# Path of the JSON metadata file (a file, despite the `_dir` suffix) that is
# opened below to map video ids to folds.
meta_dir = '/kaggle/input/sust-ddd-metadata/dataset_metadata.json'

In [None]:
# Notebook-wide switches and parameters read by the cells below.
CFG = {
    'vectorize': False,  # run the cell that saves face feature vectors per video
    'extract_face': False,  # run the cell that saves stacked face crops per video
    'extract_keypoints': True,  # run the cell that saves eye keypoints per video
    'batch_size': 60,  # frames buffered by FaceVectorizer before calling the detector
    'show_examples': False,  # run the small demo/sanity-check cells
    'n_frames': 3,  # step between sampled frame indices: range(0, v_len, n_frames)
    'face_shape': (160,160),  # target size passed to cv2.resize for every face crop
}

## Create CSV

Process the metadata file so that each video_id can be mapped to its fold.

In [None]:
# Load the fold metadata. The context manager guarantees the handle is closed;
# the previous bare `f.close` only referenced the method and never called it.
with open(meta_dir, encoding="utf-8") as f:
    metadata_org = json.load(f)

# Flatten {fold: {class_name: [video_id, ...]}} into {video_id: fold}.
# If an id is listed more than once, the last fold encountered wins.
metadata = {
    video_id: fold
    for fold, videos_by_class in metadata_org.items()
    for video_ids in videos_by_class.values()
    for video_id in video_ids
}

Helper dictionaries for converting between integer labels and label names

In [None]:
# Two-way lookup between the integer class id and its human-readable name.
int2label = dict([(1, "drowsiness"), (0, "not drowsiness")])
label2int = dict(zip(int2label.values(), int2label.keys()))

Main function

In [None]:
def create_csv(data_dir, label2int):
    """Index a class-per-folder dataset into a DataFrame.

    Args:
        data_dir: Root directory laid out as ``data_dir/<class name>/<file>``.
            May be absolute or relative.
        label2int: Mapping from class folder name to integer label.

    Returns:
        A DataFrame with one row per file and the columns ``id`` (file name),
        ``label`` (integer label), ``label_name`` (class folder name) and
        ``path`` (file path as returned by ``glob``). Rows are ordered by
        class folder and then by file name.

    Raises:
        KeyError: If a class folder that contains files is not a key of
            ``label2int``.
    """
    rows = {'id': [], 'label': [], 'label_name': [], 'path': []}

    # glob already returns paths that include `data_dir`, so they are used
    # as-is; joining `data_dir` onto them again breaks for relative roots.
    # Sorting makes the row order reproducible across runs and file systems.
    for folder in sorted(glob.glob(os.path.join(data_dir, '*'))):
        label_name = os.path.basename(folder)

        for file_path in sorted(glob.glob(os.path.join(folder, '*'))):
            rows['id'].append(os.path.basename(file_path))
            rows['label'].append(label2int[label_name])
            rows['label_name'].append(label_name)
            rows['path'].append(file_path)

    return pd.DataFrame(rows).dropna()

# Build the file index and tag every row with its fold. `Series.replace`
# leaves ids that are not keys of `metadata` unchanged, so such rows keep the
# file name in the `fold` column.
df = create_csv(data_dir, label2int)
df['fold'] = df['id'].replace(metadata)
# Later cells collect `df.index` values and look rows up by position, so the
# index must stay 0..n-1 after duplicate rows are removed.
df = df.drop_duplicates().reset_index(drop=True)
df

## Extract faces

#### FastMTCNN

In [None]:
class FastMTCNN(object):
    """Fast MTCNN implementation.

    Runs MTCNN detection on every `stride`-th frame of a batch, reuses the
    detected boxes for the frames in between, and returns the cropped faces
    resized to ``CFG['face_shape']``.
    """

    def __init__(self, stride, resize=1, *args, **kwargs):
        """Constructor for FastMTCNN class.

        Arguments:
            stride (int): The detection stride. Faces will be detected every `stride` frames
                and remembered for `stride-1` frames.

        Keyword arguments:
            resize (float): Fractional frame scaling. [default: {1}]
            *args: Arguments to pass to the MTCNN constructor. See help(MTCNN).
            **kwargs: Keyword arguments to pass to the MTCNN constructor. See help(MTCNN).
        """
        self.stride = stride
        self.resize = resize
        self.mtcnn = MTCNN(*args, **kwargs)

    def __call__(self, frames):
        """Detect faces in frames using strided MTCNN.

        Arguments:
            frames: Sequence (list or array) of image arrays indexed as
                ``frame[row, column]``.

        Returns:
            list: One resized face crop per detected box, in frame order.
                Frames without detections contribute nothing.
        """
        if len(frames) == 0:
            return []

        if self.resize != 1:
            frames = [
                cv2.resize(f, (int(f.shape[1] * self.resize), int(f.shape[0] * self.resize)))
                for f in frames
            ]

        # Only every `stride`-th frame is sent to the detector.
        boxes, _ = self.mtcnn.detect(frames[::self.stride])

        faces = []
        for i, frame in enumerate(frames):
            # Frames between two detections reuse the boxes of the last detected frame.
            frame_boxes = boxes[i // self.stride]
            if frame_boxes is None:
                continue
            for box in frame_boxes:
                # Boxes may extend past the image border. A negative value would be
                # read by the slice as "count from the end" and yield a wrong or
                # empty crop, so clamp to 0. Values beyond the far edge are already
                # clipped by slicing.
                x1, y1, x2, y2 = (max(int(b), 0) for b in box)
                face = frame[y1:y2, x1:x2]
                if face.size == 0:
                    continue
                try:
                    faces.append(cv2.resize(face, CFG['face_shape'], interpolation=cv2.INTER_AREA))
                except cv2.error:
                    # Best effort: skip a crop OpenCV cannot resize.
                    continue

        return faces

In [None]:
# Shared face detector instance used by the demo and extraction cells below.
fast_mtcnn = FastMTCNN(
    stride=4,  # run detection on every 4th frame of a batch
    resize=0.5,  # scale frames to half size before detection and cropping
    margin=14,  # forwarded to the MTCNN constructor
    factor=0.6,  # forwarded to the MTCNN constructor
    keep_all=True,  # forwarded to the MTCNN constructor
    device=device  # forwarded to the MTCNN constructor
)

In [None]:
# Test FastMTCNN: show the first detected face of every sampled frame of one video.
if CFG['show_examples']:
    sample_row = 25
    v_dir = df.iloc[sample_row]['path']
    v_cap = cv2.VideoCapture(v_dir)
    try:
        success = v_cap.grab()
        v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fnos = list(range(0, v_len, CFG['n_frames']))

        idx, count = 0, 0
        if fnos:
            # set initial frame
            v_cap.set(cv2.CAP_PROP_POS_FRAMES, fnos[0])
            count = fnos[0]
        else:
            # Unreadable or empty video: nothing to show.
            print(f"No frames could be read from {v_dir}")
            success = False

        while success:
            if count == fnos[idx]:
                success, frame = v_cap.retrieve()
                if not success:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                faces = fast_mtcnn(np.expand_dims(frame, 0))
                # The detector returns an empty list when no face is found.
                if faces:
                    plt.figure()
                    plt.imshow(faces[0])
                    plt.show()

                idx += 1
                if idx >= len(fnos):
                    break
            count += 1
            success = v_cap.grab()
    finally:
        # Release the capture even if detection or plotting raised.
        v_cap.release()

#### Face Vectorizer

In [None]:
# Pretrained Inception-ResNet in eval mode on `device`; passed to `process_face`
# as the `vectorizer` in the 'vectorize' cell.
# NOTE(review): `classify=True` presumably makes the model return classification
# logits rather than face embeddings — confirm this is the intended feature vector.
resnet = InceptionResnetV1(pretrained='vggface2', classify=True).to(device).eval()

In [None]:
class FaceVectorizer:
    """Sample frames from a video and run a face detector on them in batches.

    Despite its name, this class does not compute embeddings itself: calling
    it returns whatever ``detector`` returns for the sampled frames.
    """

    def __init__(self, detector, n_frames=None, batch_size=None, resize=None):
        """Store the sampling configuration.

        Args:
            detector: Callable taking a list of RGB frames and returning a
                list of faces.
            n_frames: Step between sampled frame indices; ``None`` samples
                every frame.
            batch_size: Number of frames buffered before ``detector`` is
                called; ``None`` sends all sampled frames in a single call.
            resize: Stored for interface compatibility; not used by ``__call__``.
        """
        self.detector = detector
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize

    def __call__(self, v_dir):
        """Return the faces detected in the sampled frames of the video at ``v_dir``.

        Raises:
            ValueError: If the video reports no frames (e.g. it cannot be opened).
        """
        step = 1 if self.n_frames is None else self.n_frames
        v_cap = cv2.VideoCapture(v_dir)
        try:
            success = v_cap.grab()
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fnos = list(range(0, v_len, step))
            if not fnos:
                raise ValueError(f"No frames could be read from video: {v_dir}")

            frames = []
            faces = []

            # set initial frame
            v_cap.set(cv2.CAP_PROP_POS_FRAMES, fnos[0])

            idx, count = 0, fnos[0]
            while success:
                if count == fnos[idx]:
                    success, frame = v_cap.retrieve()
                    if not success:
                        break
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    idx += 1
                    if self.batch_size is not None and len(frames) >= self.batch_size:
                        faces.extend(self.detector(frames))
                        frames = []

                    if idx >= len(fnos):
                        break
                count += 1
                success = v_cap.grab()

            # Flush the remaining frames. This also covers videos that end
            # before the reported frame count, whose buffered frames would
            # otherwise be dropped.
            if frames:
                faces.extend(self.detector(frames))
            return faces
        finally:
            # Release the capture even if decoding or detection raised.
            v_cap.release()
    
def process_face(faces, vectorizer):
    """Turn face crops into feature vectors with ``vectorizer``.

    Args:
        faces: Iterable of NumPy image arrays whose last axis is moved to the
            front before batching (assumes height x width x channels — TODO
            confirm); ``None`` entries are skipped. All arrays must share one
            shape so they can be stacked.
        vectorizer: Callable applied to the stacked float tensor on the
            module-level ``device``; must return a tensor.

    Returns:
        numpy.ndarray: The output of ``vectorizer``, moved to the CPU.

    Raises:
        ValueError: If ``faces`` contains no usable crop.
    """
    # Filter out frames without faces
    usable = [f for f in faces if f is not None]
    if not usable:
        # torch.stack on an empty list fails with an opaque error; be explicit.
        raise ValueError("process_face received no face crops to vectorize")

    # NOTE(review): pixel values are passed on as raw floats; confirm whether
    # the vectorizer expects standardized inputs.
    batch = torch.stack(
        [torch.from_numpy(f).float().permute(2, 0, 1) for f in usable]
    ).to(device)

    # Generate facial feature vectors using a pretrained model
    return vectorizer(batch).detach().cpu().numpy()

## Extract facial landmarks

In [None]:
# Shared MediaPipe Face Mesh instance used by `extract_eye_keypoints`.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    min_detection_confidence=0.5,
    refine_landmarks=True,
    max_num_faces=1,
    static_image_mode=True,
)

# LEFT_EYE: 384, 385, 386, 387, 388, 390, 263, 362, 398, 466, 373, 374, 249, 380, 381, 382
# RIGHT_EYE: 160, 33, 161, 163, 133, 7, 173, 144, 145, 246, 153, 154, 155, 157, 158, 159

# Each FACEMESH_*_EYE entry is a tuple of landmark indices; collect the distinct
# indices of each eye. The list order is whatever the set yields (not sorted).
LEFT_EYE_INDEXES = list({i for pair in mp_face_mesh.FACEMESH_LEFT_EYE for i in pair})
RIGHT_EYE_INDEXES = list({i for pair in mp_face_mesh.FACEMESH_RIGHT_EYE for i in pair})
    
def extract_eye_keypoints(frame):
    """Collect the eye landmark coordinates of the faces found in ``frame``.

    The frame is processed with the module-level ``face_mesh``. For every
    detected face, the ``x`` and ``y`` attributes of the landmarks listed in
    ``LEFT_EYE_INDEXES`` and ``RIGHT_EYE_INDEXES`` are read.

    Returns:
        tuple: ``(left_eye_pts, right_eye_pts)``, two lists of ``(x, y)``
        tuples. Both lists are empty when no face is detected.
    """
    detected_faces = face_mesh.process(frame).multi_face_landmarks
    if not detected_faces:
        return [], []

    left_eye_pts, right_eye_pts = [], []
    for face in detected_faces:
        points = face.landmark
        left_eye_pts.extend((points[i].x, points[i].y) for i in LEFT_EYE_INDEXES)
        right_eye_pts.extend((points[i].x, points[i].y) for i in RIGHT_EYE_INDEXES)
    return left_eye_pts, right_eye_pts

class EyeKeypoints:
    """Extract eye keypoints from regularly sampled frames of a video."""

    def __init__(self, n_frames=None):
        """Store the sampling step.

        Args:
            n_frames: Step between sampled frame indices; ``None`` samples
                every frame.
        """
        self.n_frames = n_frames

    def __call__(self, v_dir):
        """Return the stacked eye keypoints of the video at ``v_dir``.

        Each sampled frame in which both eyes are found contributes one array
        made by concatenating the left-eye and right-eye ``(x, y)`` points
        along axis 1; these per-frame arrays are stacked along a new first
        axis.

        Returns:
            numpy.ndarray or None: The stacked keypoints, or ``None`` when no
            sampled frame yielded keypoints.

        Raises:
            ValueError: If the video reports no frames (e.g. it cannot be opened).
        """
        step = 1 if self.n_frames is None else self.n_frames
        v_cap = cv2.VideoCapture(v_dir)
        try:
            success = v_cap.grab()
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fnos = list(range(0, v_len, step))
            if not fnos:
                raise ValueError(f"No frames could be read from video: {v_dir}")

            eye_pts = []

            # set initial frame
            v_cap.set(cv2.CAP_PROP_POS_FRAMES, fnos[0])

            idx, count = 0, fnos[0]
            while success:
                if count == fnos[idx]:
                    success, frame = v_cap.retrieve()
                    if not success:
                        break
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    left_eye_pts, right_eye_pts = extract_eye_keypoints(frame)
                    # Frames without a detected face are skipped.
                    if left_eye_pts and right_eye_pts:
                        eye_pts.append(
                            np.concatenate(
                                (np.array(left_eye_pts), np.array(right_eye_pts)),
                                axis=1,
                            )
                        )

                    idx += 1
                    if idx >= len(fnos):
                        break
                count += 1
                success = v_cap.grab()
        finally:
            # Release the capture even if decoding or landmark extraction raised.
            v_cap.release()

        if eye_pts:
            return np.stack(eye_pts, axis=0)
        return None

In [None]:
# Sanity check: extract the eye keypoints of one video and print the array shape.
if CFG['show_examples']:
    idx = 25
    v_dir = df.iloc[idx]['path']
    extracter = EyeKeypoints(
        n_frames=CFG['n_frames']
    )
    eye_pts = extracter(v_dir)
    # The extractor returns None when no frame yielded keypoints.
    if eye_pts is None:
        print(f"No eye keypoints were detected in {v_dir}")
    else:
        print(eye_pts.shape)

## Main

In [None]:
from kaggle_secrets import UserSecretsClient

# Read the Kaggle API key from the notebook's secret store instead of embedding
# it in the source.
# NOTE(review): assumes the "KAGGLE_KEY" secret holds the API key that belongs
# to the username below — confirm.
user_secrets = UserSecretsClient()
secret_value_0 = user_secrets.get_secret("KAGGLE_KEY")

os.makedirs('/kaggle/dataset/', exist_ok=True)
os.makedirs('/root/.kaggle/', exist_ok=True)

# SECURITY: an API key was previously hard-coded on this line. Treat that key
# as leaked: revoke it in the Kaggle account settings and store the new one as
# the "KAGGLE_KEY" secret.
api_token = {"username": "vovanquangnbk", "key": secret_value_0}

with open('/root/.kaggle/kaggle.json', 'w') as file:
    json.dump(api_token, file)
# Make the credentials file readable and writable by its owner only.
os.chmod('/root/.kaggle/kaggle.json', 0o600)

In [None]:
# Create and write file to dataset folder: one feature array per video plus one
# metadata CSV per fold.
if CFG['vectorize']:
    face_vectorizer = FaceVectorizer(
        detector=fast_mtcnn,
        n_frames=CFG['n_frames'],
        batch_size=CFG['batch_size'],
        resize=1
    )

    # Folds are assumed to be named "fold1" .. "foldN" — TODO confirm against the metadata file.
    folds = np.arange(1, len(df['fold'].unique()) + 1)
    interrupted = False
    for fold in folds:
        print(f"Running fold {fold}:")
        fold_dir = f"/kaggle/dataset/fold{fold}"
        v_ids = df[df['fold'] == f"fold{fold}"].index.tolist()
        output_metadata = {'id': [], 'label': [], 'label_name': [], 'fold': []}

        with torch.no_grad():
            for idx in tqdm(v_ids, total=len(v_ids)):
                try:
                    # `v_ids` holds index labels, so look rows up by label (`loc`),
                    # not by position (`iloc`).
                    row = df.loc[idx]
                    faces = face_vectorizer(row['path'])
                    feats = process_face(faces, resnet)

                    # Save vector. splitext removes only the extension, so ids
                    # that contain dots are not truncated.
                    vec_name = f"{os.path.splitext(row['id'])[0]}.npy"
                    os.makedirs(fold_dir, exist_ok=True)
                    np.save(os.path.join(fold_dir, vec_name), feats)

                    # Add metadata
                    output_metadata['id'].append(vec_name)
                    output_metadata['label'].append(row['label'])
                    output_metadata['label_name'].append(row['label_name'])
                    output_metadata['fold'].append(row['fold'])

                except KeyboardInterrupt:
                    print('\nStopped.')
                    interrupted = True
                    break

                except Exception as e:
                    # Best effort: report which video failed and carry on.
                    print(f"Skipping video at index {idx}: {e}")

        # Write what was processed, including a partial fold after an interrupt.
        output_metadata = pd.DataFrame(output_metadata)
        output_metadata = output_metadata.dropna()
        output_metadata.to_csv(os.path.join("/kaggle/dataset", f"metadata_fold{fold}.csv"), index=False)

        if interrupted:
            # Stop the whole run rather than continuing with the next fold.
            break

In [None]:
# Extract face crops: one stacked array per video plus one metadata CSV per fold.
if CFG['extract_face']:
    # `face_detector` was never defined in this notebook, so every video failed
    # with a NameError that the handler below swallowed. FaceVectorizer returns
    # the face crops of the sampled frames, which is what this cell stacks.
    face_detector = FaceVectorizer(
        detector=fast_mtcnn,
        n_frames=CFG['n_frames'],
        batch_size=CFG['batch_size'],
        resize=1
    )

    # Folds are assumed to be named "fold1" .. "foldN" — TODO confirm against the metadata file.
    folds = np.arange(1, len(df['fold'].unique()) + 1)
    interrupted = False
    for fold in folds:
        print(f"Running fold {fold}:")
        fold_dir = f"/kaggle/dataset/face/fold{fold}"
        fold_rows = df[df['fold'] == f"fold{fold}"]
        # NOTE(review): only up to 10 random videos per fold are processed;
        # presumably a debugging limit — confirm before a full run. The size
        # is capped because `sample` raises when asked for more rows than exist.
        v_ids = fold_rows.sample(min(10, len(fold_rows))).index.tolist()
        output_metadata = {'id': [], 'label': [], 'label_name': [], 'fold': []}

        with torch.no_grad():
            for idx in tqdm(v_ids, total=len(v_ids)):
                try:
                    # `v_ids` holds index labels, so look rows up by label (`loc`),
                    # not by position (`iloc`).
                    row = df.loc[idx]
                    faces = face_detector(row['path'])
                    faces = np.stack(faces)

                    # Save face. splitext removes only the extension, so ids
                    # that contain dots are not truncated.
                    face_name = f"{os.path.splitext(row['id'])[0]}.npy"
                    os.makedirs(fold_dir, exist_ok=True)
                    np.save(os.path.join(fold_dir, face_name), faces)

                    # Add metadata
                    output_metadata['id'].append(face_name)
                    output_metadata['label'].append(row['label'])
                    output_metadata['label_name'].append(row['label_name'])
                    output_metadata['fold'].append(row['fold'])

                except KeyboardInterrupt:
                    print('\nStopped.')
                    interrupted = True
                    break

                except Exception as e:
                    # Best effort: report which video failed and carry on.
                    print(f"Skipping video at index {idx}: {e}")

        # The face folder may not exist yet if no video of this fold succeeded.
        os.makedirs("/kaggle/dataset/face", exist_ok=True)
        output_metadata = pd.DataFrame(output_metadata)
        output_metadata = output_metadata.dropna()
        output_metadata.to_csv(os.path.join("/kaggle/dataset/face", f"metadata_fold{fold}.csv"), index=False)

        if interrupted:
            # Stop the whole run rather than continuing with the next fold.
            break

In [None]:
# Extract eye keypoints: one array per video plus one metadata CSV per fold.
if CFG['extract_keypoints']:
    # Save data to Kaggle dataset
    meta = dict(
        id="vovanquangnbk/drowsy-eye-keypoints",
        title="My brand new dataset",
        isPrivate=True,
        licenses=[dict(name="other")]
    )

    with open('/kaggle/dataset/dataset-metadata.json', 'w') as f:
        json.dump(meta, f)

    # Run main
    extracter = EyeKeypoints(n_frames=CFG['n_frames'])

    # Folds are assumed to be named "fold1" .. "foldN" — TODO confirm against the metadata file.
    folds = np.arange(1, len(df['fold'].unique()) + 1)
    interrupted = False

    for fold in folds:
        fold_dir = f"/kaggle/dataset/my_dataset/fold{fold}"
        os.makedirs(fold_dir, exist_ok=True)
        print(f"Running fold {fold}:")
        v_ids = df[df['fold'] == f"fold{fold}"].index.tolist()
        output_metadata = {'id': [], 'label': [], 'label_name': [], 'fold': []}

        for idx in tqdm(v_ids, total=len(v_ids)):
            try:
                # `v_ids` holds index labels, so look rows up by label (`loc`),
                # not by position (`iloc`).
                row = df.loc[idx]
                eye_pts = extracter(row['path'])

                # The extractor returns None when no frame yielded keypoints.
                # Saving that would write an object array and list an unusable
                # file in the metadata.
                if eye_pts is None:
                    print(f"No eye keypoints found for {row['id']}; skipping.")
                    continue

                # Save file. splitext removes only the extension, so ids that
                # contain dots are not truncated.
                f_name = f"{os.path.splitext(row['id'])[0]}.npy"
                np.save(os.path.join(fold_dir, f_name), eye_pts)

                # Free up RAM
                del eye_pts
                gc.collect()

                # Add metadata
                output_metadata['id'].append(f_name)
                output_metadata['label'].append(row['label'])
                output_metadata['label_name'].append(row['label_name'])
                output_metadata['fold'].append(row['fold'])

            except KeyboardInterrupt:
                print('\nStopped.')
                interrupted = True
                break

            except Exception as e:
                # Best effort: report which video failed and carry on.
                print(f"Skipping video at index {idx}: {e}")

        # Write what was processed, including a partial fold after an interrupt.
        output_metadata = pd.DataFrame(output_metadata)
        output_metadata = output_metadata.dropna()
        output_metadata.to_csv(os.path.join("/kaggle/dataset/my_dataset", f"metadata_fold{fold}.csv"), index=False)
        gc.collect()

        if interrupted:
            # Stop the whole run rather than continuing with the next fold.
            break

In [None]:
# Upload the contents of /kaggle/dataset with the Kaggle CLI. The commented-out
# `create` command is the alternative to the `version` command that is run below.
# !kaggle datasets create -p "/kaggle/dataset" --dir-mode zip
!kaggle datasets version -p "/kaggle/dataset" -m "Updated via notebook" --dir-mode zip