<a href="https://www.kaggle.com/code/sharooqfarzeenak/fine-tuning-google-vivit-for-deception-detection?scriptVersionId=212417102" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Fine-tuning Google Video Vision Transformer (ViViT) using Real-life Deception Detection Dataset

## About

Dataset used - [Real-life Deception Detection Dataset](https://public.websites.umich.edu/~zmohamed/resources.html)

Model Used - [ViViT (Video Vision Transformer) - Video Classifier](https://huggingface.co/google/vivit-b-16x2)

Research Paper for the dataset - [Deception Detection using Real-life Trial Data](https://web.eecs.umich.edu/~zmohamed/PDFs/Trial.ICMI.pdf)

## Use-cases

1. Assessing job interview candidates
2. Criminal proceedings, trials

In [21]:
# Setting Global Variables

# Number of frames to read from each video
NO_OF_FRAMES = 32
# One frame from every FRAME_SAMPLE_RATE number of frames will be sampled
FRAME_SAMPLE_RATE = 8

# Installing required modules

In [22]:
!pip install -q transformers torch scikit-learn pyav datasets tqdm wandb

  pid, fd = os.forkpty()


In [23]:
# Setting device to GPU, if available
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

device

device(type='cuda')

In [24]:
# Importing required modules

import os
import numpy as np
import pandas as pd

import av # Video processing

from datasets import Dataset, load_from_disk

import torch
from transformers import TrainingArguments, Trainer # Fine-tuning
from transformers import VivitImageProcessor, VivitForVideoClassification # Model
from sklearn.metrics import accuracy_score

In [25]:
np.random.seed(0)

# Preparing the Data

## Function to read the video, get selected frames and convert it to a NumPy Array

In [26]:
import av

# Function to read the video, get selected frames and convert it to a NumPy Array
def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.
    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): List of frame indices to decode.
    Returns:
        result (np.ndarray): np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

## Function to pick frame indices from the video for training

In [27]:
def sample_frame_indices(no_of_frames, frame_sample_rate, seg_len):
    '''
    Sample a given number of frame indices from the video.
    Args:
        no_of_frames (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Maximum allowed index of sample's last frame.
    Returns:
        indices (`List[int]`): List of sampled frame indices
    '''
    # Uncomment to choose frames randomly; also comment out the next part
    # converted_len = int(clip_len * frame_sample_rate)
    # end_idx = np.random.randint(converted_len, seg_len)
    # start_idx = end_idx - converted_len
    # indices = np.linspace(start_idx, end_idx, num=clip_len)
    # indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    # return indices

    end_idx = no_of_frames * frame_sample_rate
    start_idx = 0
    indices = np.linspace(start_idx, end_idx, num=no_of_frames, dtype=int)
    return indices

## Function to parse through all videos in both folders and create Dataset Dictionary

In [28]:
import os

# Function to parse through all video in both folders and create Dataset Dictionary
def frames_convert_and_create_dataset_dictionary(video_dir):
    all_videos=[]

    # Creating list of all video file names
    video_files = [
            os.path.join(video_dir, f)
            for f in os.listdir(video_dir)
            if f.endswith(".mp4")
    ]
    
    # Parsing through each file
    for file in video_files:
        # Extracting label name from file name
        if "lie" in file.lower():
            label = 0
        elif "truth" in file.lower():
            label = 1
        # Initializing the container
        container = av.open(file)

        # # Setting number of frames required
        # no_of_frames = 32
        # frame_sample_rate = 4

        # Total frames in video
        total_frames = container.streams.video[0].frames

        # Process only if total number of frames in the video is greater than what we are seeking
        if total_frames > (NO_OF_FRAMES * FRAME_SAMPLE_RATE):
            indices = sample_frame_indices(no_of_frames=NO_OF_FRAMES, frame_sample_rate=FRAME_SAMPLE_RATE, seg_len=total_frames)
            video = read_video_pyav(container=container, indices=indices)
            all_videos.append({'video': video, 'labels': label})
    
    return all_videos

## Passing dataset through VivitImageProcessor

In [29]:
from transformers import VivitImageProcessor

# Initializing image processor
image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")

def process_example(example):
    inputs = image_processor(list(np.array(example['video'])), return_tensors='pt')
    inputs['labels'] = example['labels']
    return inputs

def create_vivit_dataset(list_of_dict):
    processed_list_of_dict = list(map(process_example,list_of_dict))
    return processed_list_of_dict

## Preparing the training and eval datasets

In [30]:
# Setting training path
train_videos_path = "/kaggle/input/real-life-deception-detection-dataset/Real-life Deception Detection Dataset With Train Test/Train"

In [31]:
import torch

# Some rows in the Dataset has frames less than 32
# Function finds such rows and removes them
def remove_bad_rows(dataset):
    bad_rows = []
    for i,row in enumerate(dataset):
     if torch.tensor(row['pixel_values']).shape[1] < 32:
         bad_rows.append(i)

    # Creating a list of indices excluding the rows to be removed
    indices_to_keep = [i for i in range(len(dataset)) if i not in bad_rows]
    
    # Select only the rows with those indices
    dataset = dataset.select(indices_to_keep)

    return dataset

In [32]:
def pre_process(path):

    # Preparing the training and eval datasets

    # Converts video files to a list of dictionaries containing keys 'video' and 'labels',
    # where 'video' contains 32 frames each from every video
    print("Creating list of dictionaries...\n")
    list_of_dictionaries = frames_convert_and_create_dataset_dictionary(video_dir=path)

    # Passing above dictionary through VivitImageProcessor
    print("Passing through VivitImageProcessor...\n")
    dataset = create_vivit_dataset(list_of_dictionaries)

    # Converting above dataset to Hugging Face Dataset for fine-tuning
    print("Converting to Hugging Face Dataset...\n")
    dataset_hf = Dataset.from_list(dataset)

    # Encoding classes to the Dataset
    print("Adding class encoding labels...\n")
    dataset_hf = dataset_hf.class_encode_column("labels")

    # Removing bad rows (rows with less than 32 frames)
    print("\nFinding and removing bad rows...\n")
    dataset_hf = remove_bad_rows(dataset_hf)


    # Squeezing; fine-tuning step will throw an errror without this step
    print("\nSqueezing pixel_values...\n")
    dataset_hf = dataset_hf.map(lambda x: {'pixel_values': torch.tensor(x['pixel_values']).to(device).squeeze()})

    print("\nSuccess.\n")

    return dataset_hf

In [33]:
# Getting training dataset
train_eval_dataset = pre_process(train_videos_path)

# Splitting to Train and Eval sets
train_eval_dataset = train_eval_dataset.train_test_split(test_size=0.1)

Creating list of dictionaries...

Passing through VivitImageProcessor...

Converting to Hugging Face Dataset...

Adding class encoding labels...



Stringifying the column:   0%|          | 0/98 [00:00<?, ? examples/s]

Casting to class labels:   0%|          | 0/98 [00:00<?, ? examples/s]


Finding and removing bad rows...



0it [00:00, ?it/s]


Squeezing pixel_values...



Map:   0%|          | 0/98 [00:00<?, ? examples/s]


Success.



In [34]:
# Saving dataset to disk
train_eval_dataset.save_to_disk("./processed_datasets/train")

Saving the dataset (0/4 shards):   0%|          | 0/88 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/10 [00:00<?, ? examples/s]

# Training the model

## Loading the dataset

In [73]:
from datasets import load_from_disk

train_dataset = load_from_disk("./processed_datasets/train/train")
eval_dataset = load_from_disk("./processed_datasets/train/test")

## Defining training arguments

In [74]:
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback

In [75]:
# Defining training arguments

training_args = TrainingArguments(
    output_dir="./results",  # Directory to save model checkpoints
    evaluation_strategy="epoch",  # Evaluate at the end of every epoch
    save_strategy="epoch",        # Save checkpoints at every epoch
    logging_dir="./logs",         # Directory to save logs
    logging_strategy="epoch",
    save_total_limit=2,           # Only keep the 2 most recent checkpoints
    load_best_model_at_end=True,  # Automatically load the best model
    metric_for_best_model="eval_loss",  # Use validation loss to determine the best model
    greater_is_better=False,      # Lower validation loss is better
    num_train_epochs=5,          # Set max epochs (early stopping will halt earlier if needed)
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    seed=42,
)



## Defining the trainer

In [76]:
# Defining the trainer

import torch
from sklearn.metrics import accuracy_score
from transformers import VivitImageProcessor, VivitForVideoClassification

# Initializing model
model = VivitForVideoClassification.from_pretrained("google/vivit-b-16x2-kinetics400")
# Initializing image processor
image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")


def compute_metrics(pred):
    logits, labels = pred
    logits = torch.tensor(logits)  # Convert logits to a PyTorch tensor
    predictions = torch.argmax(logits, dim=-1)
    acc = accuracy_score(labels, predictions)
    return {"accuracy": acc}

# Initialize the Trainer
trainer = Trainer(
    model=model,                   # Your Hugging Face model
    args=training_args,            # Training arguments
    train_dataset=train_dataset,   # Training data
    eval_dataset=eval_dataset,     # Validation data
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],  # Add early stopping
    processing_class=image_processor,
    compute_metrics=compute_metrics,
)

##### **[Get your wandb api key](https://wandb.ai/authorize); if you do not already have one. You'll need it to track your training run.**

In [77]:
# Loading wandb api key

import wandb
import getpass
from kaggle_secrets import UserSecretsClient

user_secrets = UserSecretsClient()

try:
    my_secret = user_secrets.get_secret("wandb_api_key") 
    wandb.login(key=my_secret)
except:
    my_secret = getpass.getpass("Enter your wandb API Key")
    wandb.login(key=my_secret)



## Training

In [78]:
# Training
trainer.train()

  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


Epoch,Training Loss,Validation Loss,Accuracy
1,1.6994,3.525926,0.4
2,0.9957,0.071005,1.0
3,0.3436,1.156387,0.8
4,0.3169,2.825284,0.7


  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):
  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):
  with torch.cuda.device(device), torch.cuda.stream(stream), autocast(enabled=autocast_enabled):


TrainOutput(global_step=176, training_loss=0.8388915116136725, metrics={'train_runtime': 1248.4622, 'train_samples_per_second': 0.352, 'train_steps_per_second': 0.176, 'total_flos': 9.04954336247808e+17, 'train_loss': 0.8388915116136725, 'epoch': 4.0})

## Saving the model

In [79]:
# Saving the model
trainer.save_model("./vivit_finetuned_deception_detection")

# Testing

## Creating the test dataset

In [80]:
# Setting test video path
test_videos_path = "/kaggle/input/real-life-deception-detection-dataset/Real-life Deception Detection Dataset With Train Test/Test"

In [81]:
from transformers import VivitImageProcessor
import av

def create_test_data(path):
    
    image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
    
    test_dataset=[]
    test_labels = []

    # Creating list of all video file names
    video_files = [
            os.path.join(path, f)
            for f in os.listdir(path)
            if f.endswith(".mp4")
    ] 

    for file in video_files:

        if "lie" in file.lower():
            label = 0
        elif "truth" in file.lower():
            label = 1
            
        container = av.open(file)

        # sample 32 frames
        indices = sample_frame_indices(no_of_frames=NO_OF_FRAMES, frame_sample_rate=FRAME_SAMPLE_RATE, seg_len=container.streams.video[0].frames)
        
        video = read_video_pyav(container=container, indices=indices)
        
        image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
        
        inputs = image_processor(list(video), return_tensors="pt")

        test_dataset.append(inputs)
        test_labels.append(label)
    
    return test_dataset, test_labels    

## Predicting for test data

In [82]:
test_data, test_labels = create_test_data(test_videos_path)

In [83]:
from transformers import VivitModel

def test(test_data):
    
    model = VivitForVideoClassification.from_pretrained("./vivit_finetuned_deception_detection")

    predictions = []
    
    for inputs in test_data:
        
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
        
        # model predicts one of the 400 Kinetics-400 classes
        predicted_label = logits.argmax(-1).item()

        predictions.append(predicted_label)

    return predictions

In [84]:
pred_labels = test(test_data)

## Accuracy

In [85]:
from sklearn.metrics import accuracy_score



score = accuracy_score(y_true = test_labels, y_pred=pred_labels)

score

0.25

#### Accuracy can be improved by training with more frames per video, by increasing 'NO_OF_FRAMES' and/or 'FRAME_SAMPLE_RATE' variables, given better hardware capabilities.