In [1]:
!pip install albumentations==1.3.0
!pip install pytorch_lightning
!pip install comet-ml

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting albumentations==1.3.0
  Downloading albumentations-1.3.0-py3-none-any.whl (123 kB)
[K     |████████████████████████████████| 123 kB 13.4 MB/s 
Installing collected packages: albumentations
  Attempting uninstall: albumentations
    Found existing installation: albumentations 1.2.1
    Uninstalling albumentations-1.2.1:
      Successfully uninstalled albumentations-1.2.1
Successfully installed albumentations-1.3.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pytorch_lightning
  Downloading pytorch_lightning-1.8.2-py3-none-any.whl (798 kB)
[K     |████████████████████████████████| 798 kB 10.2 MB/s 
Collecting torchmetrics>=0.7.0
  Downloading torchmetrics-0.10.3-py3-none-any.whl (529 kB)
[K     |████████████████████████████████| 529 kB 15.9 MB/s 
Collecting lightning-utilities==0.3.*
  Downloading lightning_utili

In [3]:
# Mount Google Drive: every path in the config cell (dataset, checkpoint, outputs)
# lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import os
import argparse
import importlib
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve
import random
import cv2
from tqdm.auto import tqdm
from torch.nn import functional as F
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from sklearn.metrics import accuracy_score
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import CSVLogger, CometLogger
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.callbacks import ModelCheckpoint
import albumentations as A 
from albumentations.pytorch.transforms import ToTensorV2
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torchvision import models
import time
import copy

## Utils

In [5]:
def seed_torch(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global RNG and PyTorch (CPU and
    every CUDA device), exports ``PYTHONHASHSEED`` and puts cuDNN into
    deterministic mode.

    Args:
        seed: Integer seed applied to all generators. Defaults to 42.
    """
    random.seed(seed)
    # Exported for any subprocess started later; the hash seed of the running
    # interpreter is fixed at start-up and is not changed by this assignment.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all visible GPUs, not only the current one (safe to call without CUDA).
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may pick different kernels between runs; disable it so
    # the deterministic flag above is actually effective.
    torch.backends.cudnn.benchmark = False

In [6]:
def eer_metric(y_truth, y_pred):
    """Compute the Equal Error Rate (EER) from ground-truth labels and scores.

    The ROC curve is evaluated with ``pos_label=1``; the EER is reported as the
    false-positive rate at the threshold where ``|FNR - FPR|`` is smallest.

    Args:
        y_truth: Ground-truth labels, forwarded to ``roc_curve``.
        y_pred: Predicted scores, forwarded to ``roc_curve``.

    Returns:
        Tuple ``(eer, fpr, fnr, eer_threshold)``: the EER value, the full
        false-positive and false-negative rate arrays, and the threshold at
        which the EER was read.
    """
    false_pos_rate, true_pos_rate, thresholds = roc_curve(y_truth, y_pred, pos_label=1)
    false_neg_rate = 1 - true_pos_rate
    # Index of the operating point where the two error rates are closest.
    rate_gap = np.absolute(false_neg_rate - false_pos_rate)
    crossing = np.nanargmin(rate_gap)
    return false_pos_rate[crossing], false_pos_rate, false_neg_rate, thresholds[crossing]

## Load data

In [7]:
class LivenessDataset(Dataset):
    """Dataset yielding one transformed frame (and a label) per video row.

    Each row of ``df`` names a video file in ``video_dir`` through its ``fname``
    column. If ``df`` has a ``frame_index`` column, that frame is decoded;
    otherwise a frame is drawn at random with NumPy's global RNG. If ``df`` has
    a ``liveness_score`` column it is returned as a float tensor label,
    otherwise the label is the integer ``-1``.

    Args:
        cfg: Configuration object; stored on the instance but not read here.
        df: DataFrame with at least a ``fname`` column. Its index is reset.
        video_dir: Directory containing the video files.
        transforms: Callable invoked as ``transforms(image=rgb_frame)`` that
            returns a mapping whose ``'image'`` entry supports ``.float()``.
    """

    def __init__(self, cfg, df, video_dir, transforms):
        self.cfg = cfg
        self.df = df.reset_index(drop=True)
        self.video_dir = video_dir
        self.transforms = transforms

    def __len__(self):
        """Return the number of videos (rows of the frame table)."""
        return len(self.df)

    def _read_frame(self, vid_path, frame_no=None):
        """Decode a single BGR frame from ``vid_path``.

        Args:
            vid_path: Path of the video file.
            frame_no: Frame position to decode; a random one is drawn when None.

        Returns:
            The decoded frame as returned by ``cv2.VideoCapture.read``.

        Raises:
            RuntimeError: If the video reports no frames or decoding fails.
        """
        cap = cv2.VideoCapture(vid_path)
        try:
            if frame_no is None:
                n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                if n_frames <= 0:
                    raise RuntimeError(f'No frames reported for video: {vid_path}')
                frame_no = np.random.choice(n_frames, 1, replace=False)[0]
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_no))
            ret, im = cap.read()
        finally:
            # Always free the decoder handle; otherwise one leaks per sample.
            cap.release()
        # A failed read returns (False, None), so check `ret` before touching `im`.
        if not ret or im is None or len(im) == 0:
            raise RuntimeError(f'Could not read frame {int(frame_no)} from video: {vid_path}')
        return im

    def __getitem__(self, item):
        """Return ``(image_tensor, label)`` for the video at position ``item``."""
        row = self.df.iloc[item]
        vid_path = os.path.join(self.video_dir, row['fname'])
        # Honour a pre-selected frame when the table provides one so that
        # repeated passes over the same table decode the same frames.
        frame_no = int(row['frame_index']) if 'frame_index' in self.df.columns else None
        im = self._read_frame(vid_path, frame_no)
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        im_ts = self.transforms(image=im)['image'].float()
        if 'liveness_score' in self.df.columns:
            label = torch.tensor(row['liveness_score']).float()
        else:
            label = -1
        return im_ts, label

## Model

In [8]:
class LivenessModel(pl.LightningModule):
    """EfficientNetV2-L backbone with a single-logit head for liveness scoring.

    The torchvision classifier's final linear layer is replaced by a 1-output
    layer and the model is trained with ``BCEWithLogitsLoss``. Validation and
    prediction return sigmoid probabilities.

    Args:
        cfg: Configuration object. ``configure_optimizers`` reads ``init_lr``,
            ``eps``, ``betas``, ``min_lr``, ``num_train_examples``,
            ``batch_size`` and ``epochs`` from it.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        # `weights=` supersedes the deprecated `pretrained=True`; IMAGENET1K_V1 is
        # the weight set the legacy flag resolved to for this architecture.
        self.backbone = models.efficientnet_v2_l(
            weights=models.EfficientNet_V2_L_Weights.IMAGENET1K_V1
        )
        clf_in_feature = self.backbone.classifier[1].in_features
        self.backbone.classifier[1] = nn.Linear(clf_in_feature, 1)
        self.criterion = nn.BCEWithLogitsLoss()

    def forward(self, X):
        """Return the backbone's raw output (one logit per sample) for batch ``X``."""
        return self.backbone(X)

    def configure_optimizers(self):
        """Build the AdamW optimizer and a cosine-annealing LR schedule.

        Returns:
            Lightning optimizer/scheduler configuration dictionary.
        """
        optimizer = AdamW(self.backbone.parameters(), lr=self.cfg.init_lr, eps=self.cfg.eps, betas=self.cfg.betas)
        num_train_steps = int(self.cfg.num_train_examples / self.cfg.batch_size * self.cfg.epochs)

        lr_scheduler = CosineAnnealingLR(optimizer, T_max=num_train_steps, eta_min=self.cfg.min_lr)

        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": lr_scheduler,
                # T_max is counted in optimizer steps, so the scheduler has to be
                # stepped per batch; Lightning's default interval is per epoch.
                "interval": "step",
                "frequency": 1,
            },
        }

    def step(self, X, y):
        """Run a forward pass and compute the loss.

        Args:
            X: Input image batch.
            y: Float target tensor with one entry per sample.

        Returns:
            Tuple ``(loss, logits)`` where ``logits`` is flattened to 1-D.
        """
        X = X.to(self.device)
        y = y.to(self.device)
        y_pred = self(X).view(-1)
        loss = self.criterion(y_pred, y)
        return loss, y_pred

    def training_step(self, train_batch, batch_idx):
        """Compute and log the training loss for one batch."""
        X, y = train_batch
        loss, y_pred = self.step(X, y)
        self.log('train_loss', loss)
        # compute_metrics thresholds at 0.5, so hand it probabilities rather than
        # raw logits; detached so the stored outputs do not hold on to the graph.
        y_prob = y_pred.detach().sigmoid()
        return {'loss': loss, 'preds': y_prob, 'labels': y}

    def validation_step(self, val_batch, batch_idx):
        """Compute and log the validation loss; return probabilities and labels."""
        X, y = val_batch
        loss, y_pred = self.step(X, y)
        self.log('val_loss', loss)
        y_prob = y_pred.sigmoid()
        return {'loss': loss, 'preds': y_prob, 'labels': y}

    def predict_step(self, test_batch, batch_idx):
        """Return sigmoid probabilities for the images in ``test_batch[0]``."""
        X = test_batch[0]
        X = X.to(self.device)
        y_pred = self(X).view(-1)
        y_prob = y_pred.sigmoid()
        return y_prob

    def compute_metrics(self, outputs):
        """Compute accuracy over an epoch's step outputs.

        Args:
            outputs: Iterable of dicts with ``'preds'`` (probabilities) and
                ``'labels'`` tensors, as returned by the step methods.

        Returns:
            Accuracy as a Python float, using a 0.5 probability threshold.
        """
        all_preds = np.concatenate([out['preds'].detach().cpu().numpy() for out in outputs])
        all_labels = np.concatenate([out['labels'].detach().cpu().numpy() for out in outputs])
        all_preds = (all_preds > 0.5).astype(int)
        acc = float(accuracy_score(y_true=all_labels, y_pred=all_preds))
        return acc

    def training_epoch_end(self, training_step_outputs):
        """Log the training accuracy accumulated over the epoch."""
        train_acc = self.compute_metrics(training_step_outputs)
        self.log('train_acc', train_acc)

    def validation_epoch_end(self, validation_step_outputs):
        """Log the validation accuracy accumulated over the epoch."""
        val_acc = self.compute_metrics(validation_step_outputs)
        self.log('val_acc', val_acc)

## Config

In [9]:
class dotdict(dict):
    """Dictionary whose items can also be read, written and deleted as attributes.

    Reading a missing attribute returns ``None`` (it delegates to ``dict.get``);
    deleting a missing attribute raises ``KeyError``.
    """

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        return self.get(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        del self[name]

In [10]:
# Central configuration for the notebook. Paths are derived from `root_folder`.
root_folder = '/content/drive/MyDrive/ZaloAI_2022/LivenessDetection'
cfg = {
    'root_folder': root_folder,
    'run_folds': [0, 1],
    'accelerator': 'gpu',
    'devices': 'cuda',
    # Secret: read from the environment instead of committing it to the notebook.
    # NOTE: the key that used to be hard-coded here has been exposed and should be rotated.
    'comet_api_key': os.environ.get('COMET_API_KEY'),
    'comet_project_name': 'Zalo22Liveness',
    'im_size': 224,
    'num_workers': 0,
    'gradient_checkpointing': False,
    'scheduler': 'cosine',
    'batch_scheduler': True,
    'num_cycles': 0.5,
    'num_warmup_steps': 0,
    'epochs': 20,
    'init_lr': 1e-4,
    'min_lr': 1e-6,
    'eps': 1e-6,
    'betas': (0.9, 0.999),
    'batch_size': 32,
    'weight_decay': 0.01,
    'gradient_accumulation_steps': 1,
    'max_grad_norm': 1000,
    'seed': 42,
    'sample': None,
    'patience': 10,
    'metadata_file': f'{root_folder}/Dataset/train/identified_metadata.csv',
    'video_dir': f'{root_folder}/Dataset/train/videos',
    # Checkpoint restored for prediction and evaluation.
    'weight': f'{root_folder}/models/efficientnet_v2_l/fold0/epoch=17-val_loss=0.029-val_acc=1.000.ckpt',
    'test_video_dir': f'{root_folder}/Dataset/public/videos',
    }

# Wrap in dotdict for attribute-style access (cfg.batch_size, ...).
cfg = dotdict(cfg)

In [12]:
# Deterministic preprocessing for validation/inference: resize to a square of
# cfg.im_size, normalise per channel, convert to a tensor.
# The mean/std values look like the usual ImageNet statistics — presumably chosen
# to match the pretrained backbone; confirm against the training notebook.
cfg.val_transforms = A.Compose(
    transforms=[
        A.Resize(height=cfg.im_size, width=cfg.im_size, always_apply=True),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(always_apply=True),
    ],
    p=1.0,
)

## Predict

In [None]:
# Restore the trained weights from the checkpoint path in cfg.weight; `cfg` is passed
# through as the constructor argument that LivenessModel.__init__ requires.
model = LivenessModel.load_from_checkpoint(cfg.weight, cfg=cfg)

In [None]:
# Choose frames at each vid to infer
# Sorted so the row order of test_df does not depend on the arbitrary order
# in which os.listdir returns directory entries.
fnames = sorted(os.listdir(cfg.test_video_dir))

vid_names = []
frame_indices = []
for fname in fnames:
    # Re-seeding for every video makes the chosen frame depend only on cfg.seed
    # and that video's frame count, not on how many videos preceded it.
    np.random.seed(cfg.seed)
    vid_path = os.path.join(cfg.test_video_dir, fname)
    cap = cv2.VideoCapture(vid_path)
    try:
        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the decoder handle; otherwise one stays open per test video.
        cap.release()
    if length <= 0:
        raise ValueError(f'Could not determine the frame count of {vid_path}')
    indices = np.random.choice(length, 1, replace=False)
    vid_names.append(fname)
    frame_indices.append(indices[0])

ind_df = pd.DataFrame({'fname': vid_names, 'frame_index': frame_indices})
# The former merge joined ind_df with a frame holding only the same `fname`
# column, so it added nothing; a copy keeps later column additions off ind_df.
test_df = ind_df.copy()

In [None]:
# Preview only the first rows instead of dumping the whole table into the output.
test_df.head()

In [None]:
# Dataset over the test videos, using the validation preprocessing pipeline.
test_ds = LivenessDataset(
    cfg=cfg,
    df=test_df,
    video_dir=cfg.test_video_dir,
    transforms=cfg.val_transforms,
)

# Sequential, complete pass over the test set: no shuffling, no dropped batch,
# so predictions line up with the rows of test_df.
batch_size = cfg.batch_size
test_loader = torch.utils.data.DataLoader(
    dataset=test_ds,
    batch_size=batch_size,
    shuffle=False,
    num_workers=cfg.num_workers,
    pin_memory=True,
    drop_last=False,
)

In [None]:
# Trainer used only for prediction here: one device of the accelerator type in cfg.
# NOTE(review): no 'output_dir' key is set in the config cell visible in this notebook,
# so cfg.output_dir evaluates to None through dotdict's dict.get-based attribute
# lookup — confirm that falling back to the Trainer's default root dir is intended.
trainer = pl.Trainer(default_root_dir=cfg.output_dir,  
                    # logger=logger,
                    accelerator=cfg.accelerator, devices=1)

In [None]:
# Run LivenessModel.predict_step (sigmoid probabilities) over the test loader; the
# result is concatenated with torch.cat in the next cell.
test_preds = trainer.predict(model, dataloaders=test_loader)

In [None]:
# Join the per-batch outputs into one NumPy vector and attach it to the frame
# table; row order matches because the loader does not shuffle.
test_preds = torch.cat(test_preds).cpu().numpy()
test_df['prob'] = test_preds

In [None]:
# Preview only the first rows (now including `prob`) instead of the whole table.
test_df.head()

In [None]:
# Submission table: file name plus the predicted probability under the
# column name `liveness_score`.
sub = test_df[['fname', 'prob']].rename(columns={'prob': 'liveness_score'})

In [None]:
# Write the submission next to the model checkpoints. The path is built from
# cfg.root_folder rather than repeating the absolute Drive path.
submission_path = os.path.join(cfg.root_folder, 'models', 'efficientnet_v2_l', 'submission_fold0_test1.csv')
sub.to_csv(submission_path, index=False)

## Evaluate

In [None]:
# Evaluate on the held-out fold of the training metadata.
cfg.fold = 0
df = pd.read_csv(cfg.metadata_file)
df = df[df['set'] == "train"].reset_index(drop=True)
# Explicit copies: the splits get new columns assigned later, and writing into a
# boolean-mask slice of `df` triggers pandas' SettingWithCopyWarning.
train_df = df[df.fold != cfg.fold].copy()
val_df = df[df.fold == cfg.fold].copy()

In [None]:
# Dataset over the validation-fold videos, using the validation preprocessing pipeline.
val_ds = LivenessDataset(
    cfg=cfg,
    df=val_df,
    video_dir=cfg.video_dir,
    transforms=cfg.val_transforms,
)

# Sequential, complete pass: no shuffling, no dropped batch, so predictions
# line up with the rows of val_df.
batch_size = cfg.batch_size
valid_loader = torch.utils.data.DataLoader(
    dataset=val_ds,
    batch_size=batch_size,
    shuffle=False,
    num_workers=cfg.num_workers,
    pin_memory=True,
    drop_last=False,
)

In [None]:
# Run LivenessModel.predict_step (sigmoid probabilities) over the validation loader;
# the result is concatenated with torch.cat in the next cell.
val_preds = trainer.predict(model, dataloaders=valid_loader)

In [None]:
# Join the per-batch outputs into a single NumPy vector of probabilities.
val_preds = torch.cat(val_preds).cpu().numpy()

In [None]:
# Attach the probabilities as a new column. `assign` builds a new frame, which
# avoids writing through `.loc` into what may be a slice of `df`
# (SettingWithCopyWarning) and keeps the cell safe to re-run.
val_df = val_df.assign(prob=val_preds)

In [None]:
# Ground-truth labels and predicted probabilities for the validation fold.
y = val_df['liveness_score']
y_pred = val_df['prob']

In [None]:
# Equal Error Rate, the FPR/FNR curves and the threshold at which the EER is read.
eer, fpr, fnr, eer_threshold = eer_metric(y, y_pred)

In [None]:
# Plot the false-rejection (FNR) and false-acceptance (FPR) rates against the
# decision threshold; their crossing point is the Equal Error Rate.
fpr, tpr, threshold = roc_curve(y, y_pred, pos_label=1)
fnr = 1 - tpr
eer_threshold = threshold[np.nanargmin(np.absolute((fnr - fpr)))]
# roc_curve prepends a sentinel threshold above every score; keeping only
# thresholds <= 1 drops it so the x-axis stays within the probability range.
_filter = threshold <= 1
fig, ax = plt.subplots()
ax.plot(threshold[_filter], fnr[_filter], label='FRR')
ax.plot(threshold[_filter], fpr[_filter], label='FAR')
ax.set_xlabel('Decision threshold')
ax.set_ylabel('Error rate')
ax.set_title(f'FAR / FRR on validation fold {cfg.fold}')
ax.legend()
plt.show()

In [None]:
# The EER is read off the FAR curve at the point where |FRR - FAR| is smallest.
eer = fpr[np.nanargmin(np.abs(fnr - fpr))]
print('Threshold at the intersection of FRR and FAR:', eer_threshold)
print(f'Equal Error Rate (EER) on valid fold {cfg.fold}:', eer)