Test on YawDD

In [None]:
from datasets import load_dataset
# Load the mouth-crop image folders for training, validation and testing.
# NOTE(review): every call asks for split='train', including the val/test
# folders — presumably because each directory is loaded as an independent
# dataset that only exposes a 'train' split; confirm against the data layout.
train_ds = load_dataset('../data/archive/train/mouth', split='train')
val_ds = load_dataset('../data/archive/val/mouth', split='train')
test_ds = load_dataset('../data/test/mouth', split='train')

In [None]:
# Index the row first, then the column: this touches only one example instead
# of materialising the entire 'image' column just to look at its first entry.
train_ds[0]['image']

In [None]:
# Row-first access reads a single example rather than the whole 'label' column.
train_ds[0]['label']

In [None]:
# Class index -> human-readable name, and the inverse lookup derived from it.
id2label = {0: "Awake", 1: "Yawning"}
label2id = {name: index for index, name in id2label.items()}

In [None]:
# Human-readable class name of the first training example (row-first access
# avoids loading the whole 'label' column).
id2label[train_ds[0]['label']]

In [None]:
from transformers import ViTImageProcessor

# Load the image processor that ships with the pretrained checkpoint; it is used
# here only as the source of normalisation statistics and the input resolution.
processor = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224-in21k")
image_mean = processor.image_mean
image_std = processor.image_std
# Only the height is read; the transforms below use it for both dimensions.
size = processor.size["height"]

In [None]:
from torchvision.transforms import (CenterCrop, 
                                    Compose, 
                                    Normalize, 
                                    RandomHorizontalFlip,
                                    RandomResizedCrop, 
                                    Resize, 
                                    ToTensor)

# Normalise with the per-channel statistics taken from the pretrained processor.
normalize = Normalize(mean=image_mean, std=image_std)

# Training pipeline: random resized crop and horizontal flip as augmentation,
# followed by tensor conversion and normalisation.
_train_transforms = Compose([
    RandomResizedCrop(size),
    RandomHorizontalFlip(),
    ToTensor(),
    normalize,
])

# Evaluation pipeline: deterministic resize and centre crop, followed by the
# same tensor conversion and normalisation as training.
_val_transforms = Compose([
    Resize(size),
    CenterCrop(size),
    ToTensor(),
    normalize,
])

def train_transforms(examples):
    """Add augmented training tensors to a batch of examples.

    Each entry of ``examples['image']`` is converted to RGB and run through
    ``_train_transforms``; the results are stored under ``'pixel_values'``.
    The batch is modified in place and returned.
    """
    tensors = []
    for image in examples['image']:
        tensors.append(_train_transforms(image.convert("RGB")))
    examples['pixel_values'] = tensors
    return examples

def val_transforms(examples):
    """Add evaluation tensors to a batch of examples.

    Each entry of ``examples['image']`` is converted to RGB and run through
    ``_val_transforms``; the results are stored under ``'pixel_values'``.
    The batch is modified in place and returned.
    """
    tensors = []
    for image in examples['image']:
        tensors.append(_val_transforms(image.convert("RGB")))
    examples['pixel_values'] = tensors
    return examples

In [None]:
# Register the transforms on each dataset. Only the training split gets the
# augmenting pipeline; validation and test share the evaluation pipeline.
train_ds.set_transform(train_transforms)
val_ds.set_transform(val_transforms)
test_ds.set_transform(val_transforms)

In [None]:
# Inspect the first two training examples after the transform was registered.
train_ds[:2]

In [None]:
from torch.utils.data import DataLoader
import torch

def collate_fn(examples):
    """Merge a list of per-example dicts into one batch dict.

    Args:
        examples: iterable of mappings that each provide a ``"pixel_values"``
            tensor and a ``"label"`` value.

    Returns:
        A dict with ``"pixel_values"`` (the tensors stacked along a new leading
        dimension) and ``"labels"`` (a tensor built from the label values).
    """
    images = []
    targets = []
    for example in examples:
        images.append(example["pixel_values"])
        targets.append(example["label"])
    return {
        "pixel_values": torch.stack(images),
        "labels": torch.tensor(targets),
    }

# Batch sizes shared by the loaders below and by the shape checks that follow.
train_batch_size = 8
eval_batch_size = 8

# Only the training loader shuffles; validation and test keep dataset order.
train_dataloader = DataLoader(train_ds, shuffle=True, collate_fn=collate_fn, batch_size=train_batch_size)
val_dataloader = DataLoader(val_ds, collate_fn=collate_fn, batch_size=eval_batch_size)
test_dataloader = DataLoader(test_ds, collate_fn=collate_fn, batch_size=eval_batch_size)

In [None]:
# Pull one training batch and report the shape of every tensor it contains.
batch = next(iter(train_dataloader))
for name, value in batch.items():
    if not isinstance(value, torch.Tensor):
        continue
    print(name, value.shape)

In [None]:
# Sanity-check the collated batch. The spatial size comes from `size` (read from
# the processor) rather than a hard-coded 224, so the check follows the
# transforms if a different checkpoint/resolution is used.
assert batch['pixel_values'].shape == (train_batch_size, 3, size, size)
assert batch['labels'].shape == (train_batch_size,)

In [None]:
# Shape of the image tensor in the first validation batch.
next(iter(val_dataloader))['pixel_values'].shape

In [None]:
import pytorch_lightning as pl
from transformers import ViTForImageClassification, AdamW
import torch.nn as nn

class ViTLightningModule(pl.LightningModule):
    """Lightning wrapper around ``ViTForImageClassification``.

    The wrapped model is initialised from the
    ``google/vit-base-patch16-224-in21k`` checkpoint and uses the module-level
    ``id2label`` / ``label2id`` mappings. The data-loader hooks return the
    module-level ``train_dataloader`` / ``val_dataloader`` / ``test_dataloader``.

    Args:
        num_labels: number of output classes for the classification head.
            Defaults to 2, which matches the two entries of ``id2label``.
    """

    def __init__(self, num_labels=2):
        super().__init__()
        # Honour the constructor argument instead of a hard-coded 2.
        self.vit = ViTForImageClassification.from_pretrained(
            'google/vit-base-patch16-224-in21k',
            num_labels=num_labels,
            id2label=id2label,
            label2id=label2id,
        )

    def forward(self, pixel_values):
        """Return the raw class logits for a batch of images."""
        outputs = self.vit(pixel_values=pixel_values)
        return outputs.logits

    def common_step(self, batch, batch_idx):
        """Compute cross-entropy loss and accuracy for one batch.

        Args:
            batch: dict with ``'pixel_values'`` and ``'labels'`` tensors.
            batch_idx: index of the batch (unused).

        Returns:
            ``(loss, accuracy)`` where ``loss`` is a tensor and ``accuracy`` is
            a Python float: the fraction of correct predictions in this batch.
        """
        pixel_values = batch['pixel_values']
        labels = batch['labels']
        logits = self(pixel_values)

        # Functional form: no need to build a loss module on every step.
        loss = nn.functional.cross_entropy(logits, labels)
        predictions = logits.argmax(-1)
        correct = (predictions == labels).sum().item()
        accuracy = correct / pixel_values.shape[0]

        return loss, accuracy

    def training_step(self, batch, batch_idx):
        """Run one training step, log its metrics and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        # No on_step/on_epoch flags are passed, so Lightning's defaults for
        # training_step apply (per-step logging; see the `log` documentation).
        self.log("training_loss", loss)
        self.log("training_accuracy", accuracy)

        return loss

    def validation_step(self, batch, batch_idx):
        """Run one validation step, log epoch-level metrics and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        self.log("validation_loss", loss, on_epoch=True)
        self.log("validation_accuracy", accuracy, on_epoch=True)

        return loss

    def test_step(self, batch, batch_idx):
        """Run one test step, log its metrics and return the loss."""
        loss, accuracy = self.common_step(batch, batch_idx)
        self.log("test_loss", loss)
        self.log("test_accuracy", accuracy)
        return loss

    def configure_optimizers(self):
        """Return a plain AdamW optimizer over all parameters (lr=5e-5)."""
        # No scheduler and no per-parameter weight-decay groups: AdamW
        # out-of-the-box is used as-is.
        # NOTE(review): `AdamW` here is the one imported from `transformers`;
        # confirm it is still exported by the installed transformers version.
        return AdamW(self.parameters(), lr=5e-5)

    def train_dataloader(self):
        """Return the module-level training DataLoader."""
        return train_dataloader

    def val_dataloader(self):
        """Return the module-level validation DataLoader."""
        return val_dataloader

    def test_dataloader(self):
        """Return the module-level test DataLoader."""
        return test_dataloader

In [None]:
# Restore the fine-tuned weights from a saved Lightning checkpoint.
# The path points at one specific run (version_3, epoch 6, step 938).
model = ViTLightningModule.load_from_checkpoint("./lightning_logs/version_3/checkpoints/epoch=6-step=938.ckpt")

In [None]:
from PIL import Image
# Preprocess a single test image (taken from the "Yawning" folder) exactly as
# during evaluation, then add a leading batch dimension for the model.
image_path = '../data/test/mouth/Yawning/327781720_1287182322180121_2380712887574834957_n.jpg'
image = Image.open(image_path).convert("RGB")
input_tensor = _val_transforms(image)
input_batch = input_tensor.unsqueeze(0)

In [None]:
# Prefer the GPU, but fall back to the CPU so this cell does not fail on
# machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
input_batch = input_batch.to(device)

In [None]:
# Put the model in evaluation mode before inference.
model.eval()

# Run the single-image batch through the model without tracking gradients.
with torch.no_grad():
    output = model(input_batch)

# `output` holds the logits returned by the model's forward(); argmax() without
# a dim works on the flattened tensor, which for this batch of one image is the
# predicted class index.
prediction = output.argmax()

print("Predicted class:", prediction.item())

In [None]:
# Raw logit of class index 1 ("Yawning" in id2label) for the first image.
# This is a logit as returned by forward(), not a probability.
output[0, 1].item()

In [None]:
import cv2

video_path = '../data/transformer/val/yawning/10-MaleNoGlasses-Yawning.avi'
cap = cv2.VideoCapture(video_path)

# Fail with a real exception instead of print + exit(): exit() inside a
# notebook shuts the session down rather than reporting what went wrong.
if not cap.isOpened():
    cap.release()
    raise IOError(f"Error: Could not open video: {video_path}")

# Decode every frame into memory; release the capture handle even if decoding
# raises part-way through.
frames = []
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
finally:
    cap.release()

In [None]:
from PIL import Image

# Classify every decoded frame and track the current run of consecutive
# class-1 ("Yawning") predictions.
predictions = []
consecutive = 0

# Send inputs to whichever device the model currently lives on instead of
# assuming 'cuda'.
device = model.device

with torch.no_grad():
    for frame in frames:
        # OpenCV decodes frames as BGR; the transforms expect an RGB PIL image.
        image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        input_tensor = _val_transforms(image).unsqueeze(0).to(device)
        output = model(input_tensor)
        prediction = output.argmax().item()
        if prediction == 1:
            consecutive += 1
        else:
            consecutive = 0
        # Report once, at the moment the run reaches exactly 60 frames.
        if consecutive == 60:
            print(True)
        predictions.append(prediction)

# Per-frame class indices for the whole video.
print(predictions)


In [None]:
import matplotlib.pyplot as plt

# Plot the per-frame class prediction against the 1-based frame number.
# Requires `predictions` from the frame-classification cell.
frame_numbers = range(1, len(predictions) + 1)
plt.plot(frame_numbers, predictions, marker='o')
plt.title('Predictions Over Video')
plt.xlabel('Frame Number')
plt.ylabel('Yawn Prediction')
plt.show()


In [None]:
import os

In [None]:
def vidToFrame(video_path):
    """Decode every frame of a video file into a list.

    Args:
        video_path: path of the video to open with ``cv2.VideoCapture``.

    Returns:
        List of frames in the order ``cap.read()`` returned them.

    Raises:
        IOError: if the video cannot be opened. (Previously this printed a
            message and called ``exit()``, which ends the whole session
            instead of letting the caller see or handle the failure.)
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise IOError(f"Error: Could not open video: {video_path}")

        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
        return frames
    finally:
        # Always free the capture handle, including on the error paths.
        cap.release()

In [None]:
def predict(frames, threshold=45):
    """Decide whether a clip contains a sustained run of class-1 frames.

    Each frame is classified with the module-level ``model``; the function
    returns as soon as ``threshold`` consecutive frames were predicted as
    class 1 ("Yawning" in ``id2label``).

    Args:
        frames: iterable of BGR frames as produced by OpenCV.
        threshold: number of consecutive class-1 frames required. Expected to
            be positive; defaults to 45, the value that used to be hard-coded.

    Returns:
        True if such a run was found, otherwise False (also for no frames).

    Note:
        The model's train/eval mode is not changed here; the caller is
        expected to have called ``model.eval()``.
    """
    consecutive = 0
    # Follow the model's current device instead of assuming 'cuda'.
    device = model.device

    with torch.no_grad():
        for frame in frames:
            # OpenCV frames are BGR; the transforms expect an RGB PIL image.
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            input_tensor = _val_transforms(image).unsqueeze(0).to(device)
            prediction = model(input_tensor).argmax().item()
            consecutive = consecutive + 1 if prediction == 1 else 0
            if consecutive >= threshold:
                return True

    return False

In [None]:
# Video-level evaluation: one prediction per clip, compared with the label
# implied by the folder the clip lives in.
correct = 0
total = 0
TP = 0
FP = 0
FN = 0
directory_path = '../data/transformer/combine/'
wrong = []

# Ground truth per folder: does a clip in this folder contain a yawn?
expected_by_folder = {"yawning": True, "normal": False, "talking": False}

for folder_name in os.listdir(directory_path):
    # Skip anything that is not one of the labelled folders (e.g. the
    # 'talking&yawning' folder, which is evaluated separately). Previously such
    # clips matched no branch but still incremented `total`, deflating accuracy.
    if folder_name not in expected_by_folder:
        continue
    expected = expected_by_folder[folder_name]
    folder_path = os.path.join(directory_path, folder_name)
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        frames = vidToFrame(file_path)
        prediction = predict(frames)
        if bool(prediction) == expected:
            correct += 1
            if expected:
                TP += 1
        else:
            if expected:
                FN += 1
            else:
                FP += 1
            # Keep the misclassified clips for manual inspection.
            wrong.append(file_path)
        total += 1

In [None]:
# Accuracy over all evaluated clips; NaN if nothing was evaluated.
print(correct / total if total else float("nan"))  # accuracy

In [None]:
# Precision is undefined when no clip was predicted positive; print NaN then
# instead of raising ZeroDivisionError.
print(TP / (TP + FP) if (TP + FP) else float("nan"))  # Precision

In [None]:
# Recall is undefined when there were no positive clips; print NaN then instead
# of raising ZeroDivisionError.
print(TP / (TP + FN) if (TP + FN) else float("nan"))  # Recall

In [None]:
# Paths of the clips that were misclassified (false positives and negatives).
wrong

In [None]:
# Number of clips counted so far.
total

In [None]:
# Extend the running counters with the 'talking&yawning' clips. Every clip in
# this folder is scored as a positive: a truthy prediction is a true positive,
# anything else a false negative. The counters and `wrong` are the ones
# initialised by the main evaluation cell, so that cell must have run first.
directory_path = '../data/transformer/combine/talking&yawning/'

for file_name in os.listdir(directory_path):
    file_path = os.path.join(directory_path, file_name)
    frames = vidToFrame(file_path)
    prediction = predict(frames)
    total += 1
    if not prediction:
        FN += 1
        wrong.append(file_path)
        continue
    correct += 1
    TP += 1

In [None]:
import pandas as pd
import numpy as np

In [None]:
def predict(frames):
    """Return the length of the longest run of consecutive class-1 frames.

    Each frame is classified with the module-level ``model``; class 1 is
    "Yawning" in ``id2label``. Note that this definition replaces the earlier
    boolean ``predict`` of the same name once its cell has been executed.

    Args:
        frames: iterable of BGR frames as produced by OpenCV.

    Returns:
        int: the longest streak of consecutive class-1 predictions
        (0 if there are no frames or no class-1 predictions).
    """
    longest = 0
    running = 0
    # Follow the model's current device instead of assuming 'cuda'.
    device = model.device

    with torch.no_grad():
        for frame in frames:
            # OpenCV frames are BGR; the transforms expect an RGB PIL image.
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            input_tensor = _val_transforms(image).unsqueeze(0).to(device)
            prediction = model(input_tensor).argmax().item()
            if prediction == 1:
                running += 1
                longest = max(longest, running)
            else:
                running = 0

    return longest

In [None]:
# Collect, for every clip that should NOT contain a yawn, the longest run of
# consecutive class-1 frames reported by predict().
directory_path = '../data/transformer/combine/'
columns = ['filename', 'frames']
data_list = []

# Folders whose clips do contain yawns and therefore must not end up in the
# no-yawn table. 'talking&yawning' is scored as positive elsewhere in this
# notebook, so it is excluded together with 'yawning'.
yawn_folders = {"yawning", "talking&yawning"}

for folder_name in os.listdir(directory_path):
    if folder_name in yawn_folders:
        continue
    folder_path = os.path.join(directory_path, folder_name)
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        frames = vidToFrame(file_path)
        prediction = predict(frames)
        new_row = {'filename': file_path, 'frames': prediction}
        data_list.append(new_row)

# Pass `columns` explicitly so the frame has the expected schema even when no
# clips were found (otherwise an empty list yields a column-less DataFrame).
df = pd.DataFrame(data_list, columns=columns)

In [None]:
# Persist the no-yawn run lengths. No `index` argument is passed, so pandas'
# default applies and the row index is written as an extra first column.
df.to_csv("noyawnFrames.csv")

In [None]:
# Load the per-clip run lengths for both classes.
# NOTE(review): "yawnFrames.csv" is not written anywhere in the visible cells —
# presumably produced by an earlier run of the collection loop restricted to
# the yawning clips; confirm before re-running.
df_yawn = pd.read_csv("yawnFrames.csv")
df_Noyawn = pd.read_csv("noyawnFrames.csv")

In [None]:
# Longest class-1 run per clip, as plain arrays: one for clips that should be
# detected as yawns and one for clips that should not.
yawn = df_yawn["frames"].values
noyawn = df_Noyawn["frames"].values

In [None]:
# Candidate run-length thresholds (in frames) to evaluate.
threshold_sizes = [30, 45, 60, 75]
# Finer-grained alternative: every integer threshold from 25 to 74.
#threshold_sizes = np.arange(25, 75)

In [None]:
# Grid-search the run-length threshold: a clip counts as "yawn" when its longest
# class-1 run is at least `threshold` frames. The threshold with the highest
# accuracy wins (first one on ties); its precision and recall are kept.
# NOTE(review): the threshold is selected and scored on the same clips, so the
# reported numbers are optimistic.
best_threshold = None
best_accuracy = 0
precision = 0
recall = 0

total_instances = len(yawn) + len(noyawn)

for threshold in threshold_sizes:
    # Yawn clips at/above the threshold are true positives; no-yawn clips
    # below it are true negatives.
    correct_yawn = sum(size >= threshold for size in yawn)
    correct_noyawn = sum(size < threshold for size in noyawn)
    false_positives = len(noyawn) - correct_noyawn

    # Guard against empty inputs instead of raising ZeroDivisionError.
    total_correct = correct_yawn + correct_noyawn
    accuracy = total_correct / total_instances if total_instances else 0.0

    # Update best threshold if accuracy improves
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_threshold = threshold
        predicted_positive = correct_yawn + false_positives
        # Precision is undefined when nothing is predicted positive; use 0.0.
        precision = correct_yawn / predicted_positive if predicted_positive else 0.0
        # TP + FN is simply the number of yawn clips.
        recall = correct_yawn / len(yawn) if len(yawn) else 0.0

print(f"Best Threshold: {best_threshold}, Best Accuracy: {best_accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")