In [None]:
# Automatically reload all imported modules before each cell is run.
%load_ext autoreload
%autoreload 2

from time import time
import math

import numpy as np
import pandas as pd
pd.options.display.float_format = '{:,.5f}'.format
import cv2

from IPython.display import display

# Sklearn tools
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from skimage.feature import hog

# Neural Networks
import torch
import torch.nn as nn
# The multiprocessing start method can only be set once per process. Passing
# force=True keeps this cell re-runnable: without it, executing the cell a
# second time in the same kernel raises RuntimeError.
torch.multiprocessing.set_start_method('spawn', force=True)

from torch.utils.data import Dataset, DataLoader

import torchmetrics
import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers.csv_logs import CSVLogger
from pytorch_lightning.loggers import TensorBoardLogger

from ultralytics import YOLO
import warnings
warnings.filterwarnings('ignore', module='ultralytics.yolo.engine.results.Boxes')

# Plotting
%matplotlib inline
import matplotlib.pyplot as plt

# Input data files are available in the read-only '../input/' directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
from collections import OrderedDict
from glob import glob as iglob
from random import shuffle
import re

# Data module

In [None]:
from annotations import ANNOTATIONS as annotations
# Pattern searched for in each annotation key. Its three capture groups are
# unpacked by the cells below as (who, att, state):
#   1. a single digit, an underscore and letters,
#   2. an alphanumeric token,
#   3. a letters-only token.
# Keys that do not match are handled by a fallback branch in those cells.
# NOTE(review): the captured state is used directly as a dict key that only
# exists for 'Yawning' and 'Normal' — confirm no key yields another value.
ANNO_REGEX = r"(\d_[a-zA-Z]+)_([a-zA-Z0-9]+)_([a-zA-Z]+)"

In [None]:
# Group the annotations by state first ('Yawning' / 'Normal'), then by person.
annotation_by_state = {'Yawning': {}, 'Normal': {}}

for name, ann in annotations.items():
    match = re.search(ANNO_REGEX, name)
    if match:
        who, att, state = match.groups()
    else:
        # Key does not follow the naming pattern: the state is derived from
        # whether any yawns are annotated and the whole key identifies the person.
        state = 'Yawning' if len(ann['yawns']) > 0 else 'Normal'
        who = name

    person_entries = annotation_by_state[state].setdefault(who, [])

    # One label per index in [0, num + offset); 1 inside an annotated yawn.
    labels = np.zeros(ann['num'] + ann['offset'], dtype='int32')
    for start, end, *_ in ann['yawns']:
        labels[start:end] = 1

    person_entries.append({
        'offset': ann['offset'],
        'yawns': ann['yawns'],
        'name': name,
        'num': ann['num'],
        'y': labels,
        'path': ann['path'] + name,
    })

In [None]:
# Group the annotations by person first, then by state ('Yawning' / 'Normal').
annotation_by_who = {}
for k, a in annotations.items():
    m = re.search(ANNO_REGEX, k)
    if m:
        who, att, state = m.groups()
    else:
        # Key does not follow the naming pattern: the state is derived from
        # whether any yawns are annotated and the whole key identifies the person.
        state = 'Yawning' if len(a['yawns']) > 0 else 'Normal'
        who = k

    per_state = annotation_by_who.setdefault(who, {'Yawning': [], 'Normal': []})

    # One label per frame (length `num`); 1 inside an annotated yawn.
    y = np.zeros(a['num'], dtype='int32')
    for s, e, *_ in a['yawns']:
        # Yawn bounds are shifted by the sequence offset. Clamp at 0: a
        # negative bound would otherwise be interpreted by NumPy as counting
        # from the end of the array and label the wrong frames (or none).
        s = max(s - a['offset'], 0)
        e = max(e - a['offset'], 0)
        y[s:e] = 1

    new_ann = {
        'offset': a['offset'],
        'yawns': a['yawns'],
        'name': k,
        'num': a['num'],
        'y': y,
        'path': a['path'] + k + '/Jpg'
    }
    per_state[state].append(new_ann)

In [None]:
# Split at the person level: the items being split are the keys of
# annotation_by_who, so every person ends up in exactly one subset.
train, test = train_test_split(list(annotation_by_who.keys()), random_state=91)
# Carve a 10% validation subset out of the training people. It is bound to
# `val` so that the held-out `test` people from the first split are kept
# instead of being overwritten.
train, val = train_test_split(train, test_size=0.1, random_state=91)

In [None]:
# Summary of the person-level split computed above.
print(f"Number of persons: {len(annotation_by_who)}")
print(f"Train: {train}")
print(f"Test: {test}")

In [None]:
from dataset import YawningDataModule, SingleSequenceDataset

# Model

In [None]:
class LSTMClassifier(pl.LightningModule):
    '''
    LSTM-based two-class sequence classifier.

    The input sequence is run through an ``nn.LSTM`` (``batch_first=True``);
    the LSTM output at the last time step is fed to a small fully connected
    head that returns two raw logits per sequence (no softmax is applied in
    ``forward``).

    Standard PyTorch Lightning module:
    https://pytorch-lightning.readthedocs.io/en/latest/lightning_module.html

    Args:
        n_features: size of the feature vector at each time step
            (``input_size`` of the LSTM).
        hidden_size: size of the LSTM hidden state.
        seq_len: stored on the module and saved as a hyperparameter; the
            network itself does not read it.
        batch_size: stored on the module and saved as a hyperparameter; the
            network itself does not read it.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability between stacked LSTM layers.
        learning_rate: learning rate of the Adam optimizer.
        criterion: callable invoked as ``criterion(logits, targets)`` that
            returns the loss to optimize.
    '''
    def __init__(self,
                 n_features,
                 hidden_size,
                 seq_len,
                 batch_size,
                 num_layers,
                 dropout,
                 learning_rate,
                 criterion):
        super().__init__()
        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.criterion = criterion
        self.learning_rate = learning_rate

        # nn.LSTM only applies dropout *between* stacked layers. With a single
        # layer a non-zero value has no effect and only triggers a UserWarning,
        # so pass 0 in that case.
        self.lstm = nn.LSTM(input_size=n_features,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            dropout=dropout if num_layers > 1 else 0.0,
                            batch_first=True)

        # Classification head applied to the last LSTM output.
        self.model = nn.Sequential(
            nn.Linear(hidden_size, 30),
            nn.ReLU(True),
            nn.Dropout(p=0.5),
            nn.Linear(30, 2),
        )

        # One independent set of metric objects per stage.
        self.train_metrics = self._build_metrics('train')
        self.val_metrics = self._build_metrics('val')
        self.test_metrics = self._build_metrics('test')

        # Must be called from __init__: it records the constructor arguments
        # so that they are stored in checkpoints.
        self.save_hyperparameters()

    @staticmethod
    def _build_metrics(prefix):
        '''Create the accuracy/F1/precision/recall metrics logged as ``<prefix>_*``.'''
        return nn.ModuleDict({
            f'{prefix}_acc': torchmetrics.classification.MulticlassAccuracy(2),
            f'{prefix}_f1': torchmetrics.classification.F1Score(task='multiclass', num_classes=2, average='macro'),
            f'{prefix}_P': torchmetrics.classification.Precision(task='multiclass', num_classes=2, average='macro'),
            f'{prefix}_R': torchmetrics.classification.Recall(task='multiclass', num_classes=2, average='macro'),
        })

    def forward(self, x):
        '''Return the two class logits computed from the last time step of ``x``.'''
        # lstm_out = (batch_size, seq_len, hidden_size)
        lstm_out, _ = self.lstm(x)
        return self.model(lstm_out[:, -1])

    def configure_optimizers(self):
        '''Optimize all parameters with Adam at ``self.learning_rate``.'''
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

    def _shared_step(self, batch, metrics, loss_name, on_step=True):
        '''Compute the loss for ``batch``, update and log ``metrics``, and log the loss.'''
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        # The criterion receives the raw logits; the metrics receive probabilities.
        preds = nn.functional.softmax(y_hat, dim=1)

        for name, metric in metrics.items():
            metric(preds, y)
            self.log(name, metric, on_step=on_step, on_epoch=True)

        self.log(loss_name, loss, on_step=on_step, on_epoch=True, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        '''Training step: returns the loss and logs ``train_*`` metrics and ``train_loss``.'''
        return self._shared_step(batch, self.train_metrics, 'train_loss')

    def validation_step(self, batch, batch_idx):
        '''Validation step: returns the loss and logs ``val_*`` metrics and ``val_loss``.'''
        return self._shared_step(batch, self.val_metrics, 'val_loss')

    def test_step(self, batch, batch_idx):
        '''Test step: logs epoch-level ``test_*`` metrics and ``test_loss``.

        Required by ``Trainer.test``; without it the test metrics built in
        ``__init__`` are never used.
        '''
        return self._shared_step(batch, self.test_metrics, 'test_loss', on_step=False)


# Parameters

In [None]:
# Single place for every value the training cells read.
parameters = {
    # Passed to both the model and the data module.
    'seq_len': 48,
    # Passed to the data module only.
    'step': 8,
    'yawn_thr': 0.5,
    'batch_size': 32,
    # Loss the model applies to its raw logits.
    'criterion': nn.CrossEntropyLoss(),
    'max_epochs': 100,
    # Model architecture and optimizer settings.
    'n_features': 900,
    'hidden_size': 600,
    'num_layers': 1,
    'dropout': 0.2,
    'learning_rate': 0.001,
    # Handed to the data module as its `model` argument.
    'yolo_weights': '../yolov8-face/runs/pose/best_yolov8-lite-t-pose-stematt-bifpn-t/weights/best.pt',
    'num_workers': 12,
}

# Training loop

In [None]:
# Seed once, before the loop: the folds then consume the random stream in turn.
seed_everything(1)

# One entry per fold: fold index, best checkpoint path and its monitored score.
results = []
num_folds = 4
split_seed = 91

for k in range(num_folds):

    # Each fold logs to its own TensorBoard run directory.
    logger = TensorBoardLogger('runs', name=f'yawning_{num_folds}folds_{k}th')

    # Keep the 10 checkpoints with the highest validation F1.
    checkpoint_callback = ModelCheckpoint(dirpath=None,
                                          filename='{epoch}_acc-{val_acc:.2f}_f1-{val_f1:.2f}',
                                          save_top_k=10, monitor="val_f1", mode='max')

    trainer = Trainer(
        max_epochs=parameters['max_epochs'],
        logger=logger,
        log_every_n_steps=1,
        callbacks=[checkpoint_callback],
    )

    # A fresh model per fold, so no weights carry over between folds.
    model = LSTMClassifier(
        n_features = parameters['n_features'],
        hidden_size = parameters['hidden_size'],
        seq_len = parameters['seq_len'],
        batch_size = parameters['batch_size'],
        criterion = parameters['criterion'],
        num_layers = parameters['num_layers'],
        dropout = parameters['dropout'],
        learning_rate = parameters['learning_rate']
    )

    # The data module receives the fold index, fold count and split seed.
    dm = YawningDataModule(
        annotation_by_who,
        model = parameters['yolo_weights'],
        seq_len = parameters['seq_len'],
        step = parameters['step'],
        yawn_thr = parameters['yawn_thr'],
        batch_size = parameters['batch_size'],
        num_workers = parameters['num_workers'],
        k = k,
        num_folds = num_folds,
        split_seed = split_seed,
    )

    trainer.fit(model, dm)

    # Record the outcome of this fold; best_model_score is None when no
    # checkpoint was saved.
    best_score = checkpoint_callback.best_model_score
    results.append({
        'fold': k,
        'best_model_path': checkpoint_callback.best_model_path,
        'best_val_f1': float(best_score) if best_score is not None else None,
    })

    # k is zero-based; report folds as 1..num_folds.
    print(f"Model {k + 1}/{num_folds} trained")


In [None]:
# Restore a model from a checkpoint of the 'yawning_4folds_0th' run, passing
# the constructor arguments explicitly from `parameters`.
init_overrides = {
    name: parameters[name]
    for name in (
        'n_features',
        'hidden_size',
        'seq_len',
        'batch_size',
        'criterion',
        'num_layers',
        'dropout',
        'learning_rate',
    )
}
model = LSTMClassifier.load_from_checkpoint(
    './runs/yawning_4folds_0th/version_0/checkpoints/epoch=98-step=10296.ckpt',
    **init_overrides,
)

In [None]:
# Evaluate `model` with the trainer and data module currently bound in the kernel.
# NOTE(review): `trainer` and `dm` are whatever the training loop bound last
# (its final fold), while `model` was reloaded from a 'yawning_4folds_0th'
# checkpoint — confirm that evaluating this pairing is intended.
trainer.test(model, datamodule=dm)

# Inference

In [None]:
# No constructor arguments are passed here; presumably they are restored from
# the hyperparameters stored in the checkpoint (LSTMClassifier calls
# save_hyperparameters() in __init__) — verify against the checkpoint used.
model = LSTMClassifier.load_from_checkpoint('./runs/yawning_4folds_0th/version_0/checkpoints/epoch=53_acc-val_acc=0.96_f1-val_f1=0.96.ckpt')

In [None]:
# Switch to evaluation mode so the dropout layers are inactive during inference.
model.eval()