<a href="https://www.kaggle.com/code/selmagrosse/drowsiness-detection-eye-classification?scriptVersionId=250598172" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import os
import numpy as np
import cv2 as cv

# Process raw data


In [None]:
# Path of the directory with raw images
raw_dir = '/kaggle/input/mrl-dataset/train'
folders = ['Open_Eyes', 'Closed_Eyes']

# Path of the directory where processed images will be stored
processed_dir = '/kaggle/working'


In [None]:
# Mean and std of ImageNet will be used to normalize the images
mean = np.array([0.485, 0.456, 0.406])
std  = np.array([0.229, 0.224, 0.225])

images = []
labels = []

def process_image(img_path):
    img = cv.imread(img_path)
    if img is None:
        print(f"Error loading {img_path}.")
        return None
    # Resize image to 224 x 224 as this image size is expected by ResNet or MobileNet which will be used for image classification
    img = cv.resize(img, (224, 224))
    # cv2 uses the BGR color format, so we convert it to the RGB format
    img = cv.cvtColor(img, cv.COLOR_BGR2RGB)
    # Normalize the image values to range [0, 1]
    img = img / 255.0
    img = (img - mean) / std
    return img


for label, folder in enumerate(folders):
    folder_path = os.path.join(raw_dir, folder)
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        img = process_image(file_path)
        if img is not None:
            images.append(img)
            labels.append(label)

# Convert images and labels to NumPy arrays
images = np.array(images)
labels = np.array(labels)

# Save processed images and labels
np.save(os.path.join(processed_dir, "images.npy"), images)
np.save(os.path.join(processed_dir, "labels.npy"), labels)

In [None]:
from sklearn.model_selection import train_test_split

# Load the processed dataset
images = np.load(os.path.join(processed_dir, "images.npy"))
labels = np.load(os.path.join(processed_dir, "labels.npy"))

'''Split the dataset into train+val (80%) and test datasets (20%). The data is automatically shuffled. 
stratify=labels: the generated splits gave the same proprotion of labels as given by parameter "labels".'''
images_train_val, images_test, labels_train_val, labels_test = train_test_split(images, labels, test_size=0.2, random_state=42, stratify=labels)

# Split the train_val dataset into train (75%) and val datasets (25%)
images_train, images_val, labels_train, labels_val = train_test_split(images_train_val, labels_train_val, test_size=0.25, random_state=42, stratify=labels_train_val)

print(f"Train set size: {images_train.shape[0]}")
print(f"Validation set size: {images_val.shape[0]}")
print(f"Test set size: {images_test.shape[0]}")

# Save split data and labels
np.save(os.path.join(processed_dir, "train_images.npy"), images_train)
np.save(os.path.join(processed_dir, "train_labels.npy"), labels_train)
np.save(os.path.join(processed_dir, "val_images.npy"), images_val)
np.save(os.path.join(processed_dir, "val_labels.npy"), labels_val)
np.save(os.path.join(processed_dir, "test_images.npy"), images_test)
np.save(os.path.join(processed_dir, "test_labels.npy"), labels_test)

# Finetune ResNet50

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import models, layers, Sequential

def get_resnet50_model(variant='base'):
    # Load the ResNet50 model pretrained on ImageNet
    base_model = ResNet50(include_top=False, input_shape=(224, 224, 3))
    train_from = 150

    if variant == 'finetune':
        base_model.trainable = True
        for layer in base_model.layers[:train_from]:
            layer.trainable = False
    else:
        # Freeze all layers
        base_model.trainable = False
    
    model_append = [layers.GlobalAveragePooling2D()]
    model_append.append(layers.Dense(1, activation='sigmoid'))
    model = Sequential([base_model] + model_append)

    return model

# Train

In [None]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import BinaryAccuracy, Precision, Recall, AUC, F1Score
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard

# Load train dataset
X_train = np.load(os.path.join(processed_dir, "train_images.npy"))
y_train = np.load(os.path.join(processed_dir, "train_labels.npy"))
# Reshape y_train (num_train_examples,) to have the same shape as model predictions in tf (num_train_examples, 1)
y_train = y_train.reshape(-1, 1)

# Load validation dataset
X_val = np.load(os.path.join(processed_dir, "val_images.npy"))
y_val = np.load(os.path.join(processed_dir, "val_labels.npy"))
# Reshape y_val (num_val_examples,) to have the same shape as model predictions in tf (num_val_examples, 1)
y_val = y_val.reshape(-1, 1)

# Choose model variant: 'base', 'finetune'
model_variant = 'finetune'
# Load ResNet50 model
model = get_resnet50_model(variant=model_variant)

# Configure model settings for training
model.compile(optimizer=Adam(learning_rate=1e-4), loss=BinaryCrossentropy(), metrics=[BinaryAccuracy(), Precision(), Recall(), AUC(), F1Score(threshold=0.5)])

# Callbacks
# Create the EarlyStopping callback
early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
checkpoint = ModelCheckpoint(f"/kaggle/working/{model_variant}.h5", save_best_only=True)

# Train modified ResNet50
history = model.fit(X_train, y_train, validation_data=[X_val, y_val], batch_size=32, epochs=15, callbacks=[early_stop, checkpoint])

# Test

In [None]:
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report, confusion_matrix

# Load test dataset
X_test = np.load("/kaggle/working/test_images.npy")
y_test = np.load("/kaggle/working/test_labels.npy")
# Reshape y_test (num_test_examples,) to have the same shape as model predictions in tf (num_test_examples, 1)
y_test = y_test.reshape(-1, 1)

# Load the updated resnet model
model = load_model("/kaggle/working/finetune.h5")

# Evaluate on the test dataset
results = model.evaluate(X_test, y_test, batch_size=32, verbose=1)

# Predict labels
y_pred_probs = model.predict(X_test)
y_pred = (y_pred_probs > 0.5).astype(int)

# Print classification report
print("\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=["Open", "Closed"]))

# Print confusion matrix
print("\nConfusion Matrix:")
print(confusion_matrix(y_test, y_pred))