<a href="https://www.kaggle.com/code/vovanquangnbk/fas-train?scriptVersionId=146579980" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

## Set up

In [1]:
from glob import glob
from sklearn.model_selection import train_test_split, GroupShuffleSplit
import cv2
from skimage import io
import torch
from torch import nn
import os
from datetime import datetime
import time
import random
import cv2
import torchvision
from torchvision import transforms
import pandas as pd
import numpy as np
import math
from tqdm import tqdm

import matplotlib.pyplot as plt
from torch.utils.data import Dataset,DataLoader
from torch.utils.data.sampler import SequentialSampler, RandomSampler
from torch.cuda.amp import autocast, GradScaler
from torch.nn.modules.loss import _WeightedLoss
import torch.nn.functional as F
from torch.nn import Parameter
import torch.utils.model_zoo as model_zoo
from torch import nn
from torch import optim

import sklearn
import warnings
import joblib
from sklearn.metrics import roc_auc_score, log_loss, roc_curve, auc
from sklearn import metrics
import warnings
import cv2



In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Running on device: {device}')

Running on device: cuda


In [3]:
data_dir = '/kaggle/input/liveness-detection-zalo-2022/train/train'
metadata_dir = os.path.join(data_dir, "label.csv")
videos_dir = os.path.join(data_dir, "videos")

In [4]:
CFG = {
    'show_examples': False,
    'theta': 0.7,
    'seed': 719,
    'epochs': 15,
    'step_size': 10,
    'gamma': 0.5,
    'train_bs': 16,
    'valid_bs': 2,
    'lr': 1e-4,
    'weight_decay':5e-5,
    'num_workers': 2,
}

In [5]:
df = pd.read_csv(metadata_dir)
if CFG['show_examples']:
    print(df.head())
    print(df['liveness_score'].value_counts())

## Utils

In [6]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

## Train Dataset

In [7]:
class RandomErasing(object):
    '''
    Class that performs Random Erasing in Random Erasing Data Augmentation by Zhong et al. 
    -------------------------------------------------------------------------------------
    probability: The probability that the operation will be performed.
    sl: min erasing area
    sh: max erasing area
    r1: min aspect ratio
    mean: erasing value
    -------------------------------------------------------------------------------------
    '''
    def __init__(self, probability = 0.5, sl = 0.01, sh = 0.05, r1 = 0.5, mean=[0.4914, 0.4822, 0.4465]):
        self.probability = probability
        self.mean = mean
        self.sl = sl
        self.sh = sh
        self.r1 = r1
       
    def __call__(self, sample):
        img, binary_mask, spoofing_label = sample['image_x'], sample['binary_mask'],sample['spoofing_label']
        
        if random.uniform(0, 1) < self.probability:
            attempts = np.random.randint(1, 3)
            for attempt in range(attempts):
                area = img.shape[0] * img.shape[1]
           
                target_area = random.uniform(self.sl, self.sh) * area
                aspect_ratio = random.uniform(self.r1, 1/self.r1)
    
                h = int(round(math.sqrt(target_area * aspect_ratio)))
                w = int(round(math.sqrt(target_area / aspect_ratio)))
    
                if w < img.shape[1] and h < img.shape[0]:
                    x1 = random.randint(0, img.shape[0] - h)
                    y1 = random.randint(0, img.shape[1] - w)

                    img[x1:x1+h, y1:y1+w, 0] = self.mean[0]
                    img[x1:x1+h, y1:y1+w, 1] = self.mean[1]
                    img[x1:x1+h, y1:y1+w, 2] = self.mean[2]
                    
        return {'image_x': img, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}


# Tensor
class Cutout(object):
    def __init__(self, length=50):
        self.length = length

    def __call__(self, sample):
        img, binary_mask, spoofing_label = sample['image_x'], sample['binary_mask'],sample['spoofing_label']
        h, w = img.shape[1], img.shape[2]    # Tensor [1][2],  nparray [0][1]
        mask = np.ones((h, w), np.float32)
        y = np.random.randint(h)
        x = np.random.randint(w)
        length_new = np.random.randint(1, self.length)
        
        y1 = np.clip(y - length_new // 2, 0, h)
        y2 = np.clip(y + length_new // 2, 0, h)
        x1 = np.clip(x - length_new // 2, 0, w)
        x2 = np.clip(x + length_new // 2, 0, w)

        mask[y1: y2, x1: x2] = 0.
        mask = torch.from_numpy(mask)
        mask = mask.expand_as(img)
        img *= mask
        return {'image_x': img, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}


class Normaliztion(object):
    """
        same as mxnet, normalize into [-1, 1]
        image = (image - 127.5)/128
    """
    def __call__(self, sample):
        image_x, binary_mask, spoofing_label = sample['image_x'], sample['binary_mask'],sample['spoofing_label']
        
        new_image_x = (image_x - 127.5)/128     # [-1,1]

        return {'image_x': new_image_x, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}


class RandomHorizontalFlip(object):
    """Horizontally flip the given Image randomly with a probability of 0.5."""
    def __call__(self, sample):
        image_x, binary_mask, spoofing_label = sample['image_x'], sample['binary_mask'],sample['spoofing_label']
        
        new_image_x = np.zeros((256, 256, 3))
        new_binary_mask = np.zeros((32, 32))

        p = random.random()
        if p < 0.5:

            new_image_x = cv2.flip(image_x, 1)
            new_binary_mask = cv2.flip(binary_mask, 1)
           
                
            return {'image_x': new_image_x, 'binary_mask': new_binary_mask, 'spoofing_label': spoofing_label}
        else:
            return {'image_x': image_x, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}



class ToTensor(object):
    """
        Convert ndarrays in sample to Tensors.
        process only one batch every time
    """

    def __call__(self, sample):
        image_x, binary_mask, spoofing_label = sample['image_x'], sample['binary_mask'],sample['spoofing_label']
        
        # swap color axis because
        # numpy image: (batch_size) x H x W x C
        # torch image: (batch_size) x C X H X W
        image_x = image_x[:,:,::-1].transpose((2, 0, 1))
        image_x = np.array(image_x)
        
        binary_mask = np.array(binary_mask)

                        
        spoofing_label_np = np.array([0])
        spoofing_label_np[0] = spoofing_label
        
        
        return {'image_x': torch.from_numpy(image_x).float(), 'binary_mask': torch.from_numpy(binary_mask).float(), 'spoofing_label': torch.from_numpy(spoofing_label_np).float()}

In [8]:
class TrainDataset(Dataset):

    def __init__(self, df, root_dir,  transform=None):

        self.df = df.copy()
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    
    def __getitem__(self, idx):
        fname = str(self.df.iloc[idx, 0])
        v_dir = os.path.join(self.root_dir, fname)
    
        image_x, binary_mask = self.get_single_image_x(v_dir)
        
        spoofing_label = self.df.iloc[idx, 1]
        if spoofing_label == 1:
            spoofing_label = 1            # real
        else:
            spoofing_label = 0            # fake
            binary_mask = np.zeros((32, 32))     

        sample = {'image_x': image_x, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}

        if self.transform:
            sample = self.transform(sample)
        return sample

    
    def get_single_image_x(self, v_dir):
        image_x = np.zeros((256, 256, 3))
        binary_mask = np.zeros((32, 32))
        
        ## Pick a random frame
        v_cap = cv2.VideoCapture(v_dir)

        if v_cap.grab():
            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_idx = random.randint(0,v_len-1)
            v_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            _, image_x_temp = v_cap.retrieve()
        else:
            image_x_temp = np.zeros((256, 256, 3))
        v_cap.release()
        
        image_x_temp_gray = cv2.cvtColor(image_x_temp, cv2.COLOR_BGR2GRAY)

        image_x = cv2.resize(image_x_temp, (256, 256))
        image_x_temp_gray = cv2.resize(image_x_temp_gray, (32, 32))
        
        for i in range(32):
            for j in range(32):
                if image_x_temp_gray[i,j]>0:
                    binary_mask[i,j]=1
                else:
                    binary_mask[i,j]=0
        
        return image_x, binary_mask

In [9]:
# Show examples
if CFG['show_examples']:
    transform = transforms.Compose([RandomErasing(), RandomHorizontalFlip(),  ToTensor(), Cutout(), Normaliztion()])
    dataset = TrainDataset(df, videos_dir, transform)
    for i, sample in enumerate(dataset):

        print(sample['image_x'].shape)
        print(torch.mean(sample['image_x']))
        print(sample['binary_mask'].shape)
        print(torch.mean(sample['binary_mask']))
        print(sample['spoofing_label'])
        
        if i > 2:
            break

## Val Dataset

In [10]:
class Normaliztion_valtest(object):
    """
        same as mxnet, normalize into [-1, 1]
        image = (image - 127.5)/128
    """
    def __call__(self, sample):
        image_x, binary_mask, spoofing_label = sample['image_x'],sample['binary_mask'] ,sample['spoofing_label']
        new_image_x = (image_x - 127.5)/128     # [-1,1]
        return {'image_x': new_image_x, 'binary_mask': binary_mask , 'spoofing_label': spoofing_label}


class ToTensor_valtest(object):
    """
        Convert ndarrays in sample to Tensors.
        process only one batch every time
    """

    def __call__(self, sample):
        image_x, binary_mask, spoofing_label = sample['image_x'],sample['binary_mask'] ,sample['spoofing_label']
        
        # swap color axis because    BGR2RGB
        # numpy image: (batch_size) x T x H x W x C
        # torch image: (batch_size) x T x C X H X W
        image_x = image_x[:,:,:,::-1].transpose((0, 3, 1, 2))
        image_x = np.array(image_x)
        
        binary_mask = np.array(binary_mask)
                        
        spoofing_label_np = np.array([0])
        spoofing_label_np[0] = spoofing_label
        
        return {'image_x': torch.from_numpy(image_x).float(), 'binary_mask': torch.from_numpy(binary_mask).float(),'spoofing_label': torch.from_numpy(spoofing_label_np).long()}

In [11]:
class ValDataset(Dataset):
    
    def __init__(self, df, root_dir,  transform=None):

        self.df = df.copy()
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    
    def __getitem__(self, idx):

        fname = str(self.df.iloc[idx, 0])
        v_dir = os.path.join(self.root_dir, fname)
    
        image_x, binary_mask = self.get_single_image_x(v_dir)
        spoofing_label = self.df.iloc[idx, 1]
        if spoofing_label == 1:
            spoofing_label = 1            # real
        else:
            spoofing_label = 0            # fake
            
        sample = {'image_x': image_x, 'binary_mask': binary_mask, 'spoofing_label': spoofing_label}

        if self.transform:
            sample = self.transform(sample)
        return sample

    def get_single_image_x(self, v_dir):
        image_x = np.zeros((256, 256, 3))
        binary_mask = np.zeros((32, 32))
        
        # Extract randomly 3 frams
        frames = []
        v_cap = cv2.VideoCapture(v_dir)
        success = v_cap.grab()
        v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fnos = sorted(random.sample(range(v_len), 3))
        
        # set initial frame 
        v_cap.set(cv2.CAP_PROP_POS_FRAMES, fnos[0])

        pos, count = 0, fnos[0]
        while success:
            if count == fnos[pos]:
                success, frame = v_cap.retrieve()
                if not success:               
                    break
                    
                frames.append(frame)
                pos += 1
                if pos >= len(fnos):
                    break

            count += 1
            success = v_cap.grab()

        v_cap.release()
        
        frames_total = len(frames)
        
        image_x = np.zeros((frames_total, 256, 256, 3))
        binary_mask = np.zeros((frames_total, 32, 32))
        
        
        for ii in range(frames_total):
            image_x_temp = frames[ii]
            image_x[ii,:,:,:] = cv2.resize(image_x_temp, (256, 256))
            
            image_x_temp_gray = cv2.cvtColor(image_x_temp, cv2.COLOR_BGR2GRAY)
            image_x_temp_gray = cv2.resize(image_x_temp_gray, (32, 32))
            
            for i in range(32):
                for j in range(32):
                    if image_x_temp_gray[i,j]>0:
                        binary_mask[ii, i, j]=1.0
                    else:
                        binary_mask[ii, i, j]=0.0
            
        
        return image_x, binary_mask

In [12]:
# Show examples
if CFG['show_examples']:
    transform = transforms.Compose([Normaliztion_valtest(), ToTensor_valtest()])
    dataset = ValDataset(df, videos_dir, transform)
    for i, val_sample in enumerate(dataset):
        print(val_sample['image_x'].shape)
        print(torch.mean(val_sample['image_x']))
        print(val_sample['binary_mask'].shape)
        print(torch.mean(val_sample['binary_mask']))
        print(val_sample['spoofing_label'])
        
        if i > 1:
            break

## Model

In [13]:
############
## CDC block
############
class Conv2d_cd(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1,
                 padding=1, dilation=1, groups=1, bias=False, theta=CFG['theta']):

        super(Conv2d_cd, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding, dilation=dilation, groups=groups, bias=bias)
        self.theta = theta

    def forward(self, x):
        out_normal = self.conv(x)

        if math.fabs(self.theta - 0.0) < 1e-8:
            return out_normal 
        else:
            #pdb.set_trace()
            [C_out,C_in, kernel_size,kernel_size] = self.conv.weight.shape
            kernel_diff = self.conv.weight.sum(2).sum(2)
            kernel_diff = kernel_diff[:, :, None, None]
            out_diff = F.conv2d(input=x, weight=kernel_diff, bias=self.conv.bias, stride=self.conv.stride, padding=0, dilation=self.conv.dilation, groups=self.conv.groups)

            return out_normal - self.theta * out_diff

############
## Attention block
############
class SpatialAttention(nn.Module):
    def __init__(self, kernel = 3):
        super(SpatialAttention, self).__init__()


        self.conv1 = nn.Conv2d(2, 1, kernel_size=kernel, padding=kernel//2, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        x = torch.cat([avg_out, max_out], dim=1)
        x = self.conv1(x)
        
        return self.sigmoid(x)

############
## CDCN architecture
############

class CDCN(nn.Module):

    def __init__(self, basic_conv=Conv2d_cd, theta=CFG['theta']):   
        super(CDCN, self).__init__()
        
        
        self.conv1 = nn.Sequential(
            basic_conv(3, 80, kernel_size=3, stride=1, padding=1, bias=False, theta=theta),
            nn.BatchNorm2d(80),
            nn.ReLU(),    
            
        )
        
        self.Block1 = nn.Sequential(
            basic_conv(80, 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(),  
            
            basic_conv(160, int(160*1.6), kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(int(160*1.6)),
            nn.ReLU(),  
            basic_conv(int(160*1.6), 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(), 
            
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            
        )
        
        self.Block2 = nn.Sequential(
            basic_conv(160, int(160*1.2), kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(int(160*1.2)),
            nn.ReLU(),  
            basic_conv(int(160*1.2), 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(),  
            basic_conv(160, int(160*1.4), kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(int(160*1.4)),
            nn.ReLU(),  
            basic_conv(int(160*1.4), 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(),  
            
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        
        self.Block3 = nn.Sequential(
            basic_conv(160, 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(), 
            basic_conv(160, int(160*1.2), kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(int(160*1.2)),
            nn.ReLU(),  
            basic_conv(int(160*1.2), 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        
        # Original
        
        self.lastconv1 = nn.Sequential(
            basic_conv(160*3, 160, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.BatchNorm2d(160),
            nn.ReLU(),
            basic_conv(160, 1, kernel_size=3, stride=1, padding=1, bias=False, theta= theta),
            nn.ReLU(),    
        )
        
      
        self.sa1 = SpatialAttention(kernel = 7)
        self.sa2 = SpatialAttention(kernel = 5)
        self.sa3 = SpatialAttention(kernel = 3)
        self.downsample32x32 = nn.Upsample(size=(32, 32), mode='bilinear')

 
    def forward(self, x):	    	# x [3, 256, 256]
        
        x_input = x
        x = self.conv1(x)		   
        
        x_Block1 = self.Block1(x)	    	    	
        attention1 = self.sa1(x_Block1)
        x_Block1_SA = attention1 * x_Block1
        x_Block1_32x32 = self.downsample32x32(x_Block1_SA)   
        
        x_Block2 = self.Block2(x_Block1)	    
        attention2 = self.sa2(x_Block2)  
        x_Block2_SA = attention2 * x_Block2
        x_Block2_32x32 = self.downsample32x32(x_Block2_SA)  
        
        x_Block3 = self.Block3(x_Block2)	    
        attention3 = self.sa3(x_Block3)  
        x_Block3_SA = attention3 * x_Block3	
        x_Block3_32x32 = self.downsample32x32(x_Block3_SA)   
        
        x_concat = torch.cat((x_Block1_32x32,x_Block2_32x32,x_Block3_32x32), dim=1)    
        
        map_x = self.lastconv1(x_concat)
        
        map_x = map_x.squeeze(1)
        
        return map_x, x_concat, attention1, attention2, attention3, x_input

In [14]:
# Test dataset
if CFG['show_examples']:
    model = CDCN(basic_conv=Conv2d_cd, theta=CFG['theta'])
    inputs, binary_mask, spoof_label = sample['image_x'], sample['binary_mask'], sample['spoofing_label']
    map_x, embedding, x_Block1, x_Block2, x_Block3, x_input =  model(inputs.unsqueeze(0))
    print(map_x.shape)
    print(map_x)
    print(binary_mask.shape)
    print(binary_mask)

## Loss

In [15]:
def contrast_depth_conv(inputs):
    ''' compute contrast depth in both of (out, label) '''
    '''
        input  32x32
        output 8x32x32
    '''
    

    kernel_filter_list =[
                        [[1,0,0],[0,-1,0],[0,0,0]], [[0,1,0],[0,-1,0],[0,0,0]], [[0,0,1],[0,-1,0],[0,0,0]],
                        [[0,0,0],[1,-1,0],[0,0,0]], [[0,0,0],[0,-1,1],[0,0,0]],
                        [[0,0,0],[0,-1,0],[1,0,0]], [[0,0,0],[0,-1,0],[0,1,0]], [[0,0,0],[0,-1,0],[0,0,1]]
                        ]
    
    kernel_filter = np.array(kernel_filter_list, np.float32)
    
    kernel_filter = torch.from_numpy(kernel_filter).float().to(device)
    # weights (in_channel, out_channel, kernel, kernel)
    kernel_filter = kernel_filter.unsqueeze(dim=1)
    
    inputs = inputs.unsqueeze(dim=1).expand(inputs.shape[0], 8, inputs.shape[1],inputs.shape[2])
    
    contrast_depth = F.conv2d(inputs, weight=kernel_filter, groups=8)  # depthwise conv
    
    return contrast_depth


class Contrast_depth_loss(nn.Module):    # Pearson range [-1, 1] so if < 0, abs|loss| ; if >0, 1- loss
    def __init__(self):
        super(Contrast_depth_loss,self).__init__()
        return
    def forward(self, out, label): 
        '''
        compute contrast depth in both of (out, label),
        then get the loss of them
        tf.atrous_convd match tf-versions: 1.4
        '''
        contrast_out = contrast_depth_conv(out)
        contrast_label = contrast_depth_conv(label)
        
        criterion_MSE = nn.MSELoss().to(device)
    
        loss = criterion_MSE(contrast_out, contrast_label)
    
        return loss

In [16]:
if CFG['show_examples']:
    criterion = Contrast_depth_loss()
    print(criterion(binary_mask.unsqueeze(0).to(device), map_x.to(device)))

## Trainer

In [17]:
## Utils
class AvgrageMeter(object):
    def __init__(self):
        self.reset()
        
    def reset(self):
        self.avg = 0
        self.sum = 0
        self.cnt = 0
        
    def update(self, val, n=1):
        self.sum += val * n
        self.cnt += n
        self.avg = self.sum / self.cnt

def accuracy(output, target, topk=(1,)):
    maxk = max(topk)
    batch_size = target.size(0)
    
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))
    
    res = []
    for k in topk:
        correct_k = correct[:k].view(-1).float().sum(0)
        res.append(correct_k.mul_(100.0/batch_size))
    return res

def get_threshold(score_file):
    with open(score_file, 'r') as file:
        lines = file.readlines()

    data = []
    count = 0.0
    num_real = 0.0
    num_fake = 0.0
    for line in lines:
        count += 1
        tokens = line.split()
        angle = float(tokens[0])
        #pdb.set_trace()
        type = int(tokens[1])
        data.append({'map_score': angle, 'label': type})
        if type==1:
            num_real += 1
        else:
            num_fake += 1

    min_error = count    # account ACER (or ACC)
    min_threshold = 0.0
    min_ACC = 0.0
    min_ACER = 0.0
    min_APCER = 0.0
    min_BPCER = 0.0
    
    
    for d in data:
        threshold = d['map_score']
        
        type1 = len([s for s in data if s['map_score'] <= threshold and s['label'] == 1])
        type2 = len([s for s in data if s['map_score'] > threshold and s['label'] == 0])
        
        ACC = 1-(type1 + type2) / count
        APCER = type2 / num_fake
        BPCER = type1 / num_real
        ACER = (APCER + BPCER) / 2.0
        
        if ACER < min_error:
            min_error = ACER
            min_threshold = threshold
            min_ACC = ACC
            min_ACER = ACER
            min_APCER = APCER
            min_BPCER = min_BPCER

    return min_threshold, min_ACC, min_APCER, min_BPCER, min_ACER



def test_threshold_based(threshold, score_file):
    with open(score_file, 'r') as file:
        lines = file.readlines()

    data = []
    count = 0.0
    num_real = 0.0
    num_fake = 0.0
    for line in lines:
        count += 1
        tokens = line.split()
        angle = float(tokens[0])
        type = int(tokens[1])
        data.append({'map_score': angle, 'label': type})
        if type==1:
            num_real += 1
        else:
            num_fake += 1
    
 
    type1 = len([s for s in data if s['map_score'] <= threshold and s['label'] == 1])
    type2 = len([s for s in data if s['map_score'] > threshold and s['label'] == 0])
    
    ACC = 1-(type1 + type2) / count
    APCER = type2 / num_fake
    BPCER = type1 / num_real
    ACER = (APCER + BPCER) / 2.0
    
    return ACC, APCER, BPCER, ACER


def get_err_threhold(fpr, tpr, threshold):
    RightIndex=(tpr+(1-fpr)-1); 
    right_index = np.argmax(RightIndex)
    best_th = threshold[right_index]
    err = fpr[right_index]

    differ_tpr_fpr_1=tpr+fpr-1.0
  
    right_index = np.argmin(np.abs(differ_tpr_fpr_1))
    best_th = threshold[right_index]
    err = fpr[right_index]    

    return err, best_th

def performances(map_score_val_filename, map_score_test_filename):

    # val 
    with open(map_score_val_filename, 'r') as file:
        lines = file.readlines()
    val_scores = []
    val_labels = []
    data = []
    count = 0.0
    num_real = 0.0
    num_fake = 0.0
    for line in lines:
        count += 1
        tokens = line.split()
        score = float(tokens[0])
        label = float(tokens[1])  #label = int(tokens[1])
        val_scores.append(score)
        val_labels.append(label)
        data.append({'map_score': score, 'label': label})
        if label==1:
            num_real += 1
        else:
            num_fake += 1
    
    fpr,tpr,threshold = roc_curve(val_labels, val_scores, pos_label=1)
    fnr = 1 - tpr
    val_err, val_threshold = get_err_threhold(fpr, tpr, threshold)
    
    type1 = len([s for s in data if s['map_score'] <= val_threshold and s['label'] == 1])
    type2 = len([s for s in data if s['map_score'] > val_threshold and s['label'] == 0])
    
    val_ACC = 1-(type1 + type2) / count
    val_APCER = type2 / num_fake
    val_BPCER = type1 / num_real
    val_ACER = (val_APCER + val_BPCER) / 2.0
    
    ## EER
    eer_threshold = threshold[np.nanargmin(np.absolute((fnr - fpr)))]
    val_EER = fpr[np.nanargmin(np.absolute((fnr - fpr)))]
    
    
    
    # test 
    with open(map_score_test_filename, 'r') as file2:
        lines = file2.readlines()
    test_scores = []
    test_labels = []
    data = []
    count = 0.0
    num_real = 0.0
    num_fake = 0.0
    for line in lines:
        count += 1
        tokens = line.split()
        score = float(tokens[0])
        label = float(tokens[1])    #label = int(tokens[1])
        test_scores.append(score)
        test_labels.append(label)
        data.append({'map_score': score, 'label': label})
        if label==1:
            num_real += 1
        else:
            num_fake += 1
    
    # test based on val_threshold     
    type1 = len([s for s in data if s['map_score'] <= val_threshold and s['label'] == 1])
    type2 = len([s for s in data if s['map_score'] > val_threshold and s['label'] == 0])
    
    test_ACC = 1-(type1 + type2) / max(count,1.0)
    test_APCER = type2 / max(num_fake,1.0)
    test_BPCER = type1 / max(num_real,1.0)
    test_ACER = (test_APCER + test_BPCER) / 2.0
    
    
    # test based on test_threshold     
    fpr_test,tpr_test,threshold_test = roc_curve(test_labels, test_scores, pos_label=1)
    fnr_test = 1 - tpr_test
    err_test, best_test_threshold = get_err_threhold(fpr_test, tpr_test, threshold_test)
    
    type1 = len([s for s in data if s['map_score'] <= best_test_threshold and s['label'] == 1])
    type2 = len([s for s in data if s['map_score'] > best_test_threshold and s['label'] == 0])
    
    test_threshold_ACC = 1-(type1 + type2) / max(count, 1.0)
    test_threshold_APCER = type2 / max(num_fake, 1.0)
    test_threshold_BPCER = type1 / max(num_real, 1.0)
    test_threshold_ACER = (test_threshold_APCER + test_threshold_BPCER) / 2.0
    
    ## EER
    eer_threshold = threshold_test[np.nanargmin(np.absolute((fnr_test - fpr_test)))]
    test_EER = fpr_test[np.nanargmin(np.absolute((fnr_test - fpr_test)))]
    
    return val_threshold, best_test_threshold, val_ACC, val_ACER, val_EER, test_ACC, test_APCER, test_BPCER, test_ACER, test_threshold_ACER, test_EER

In [18]:
def train_model():
    seed_everything(CFG['seed'])
    
    ## Log file
    log_file = open('_log.txt', 'w')

    print("Liveness Detection Zalo AI 2022:\n ")

    log_file.write('Liveness Detection Zalo AI 2022:\n ')
    log_file.flush()

    print('train from scratch!\n')
    log_file.write('train from scratch!\n')
    log_file.flush()
    
    ## train val test split - use train_test_split twice
    train, test = train_test_split(df, test_size=.20)
    val, test = train_test_split(test, test_size=.50)
    
    ## Model
    model = CDCN(basic_conv=Conv2d_cd, theta=CFG['theta'])
    model = model.to(device)
    
    ## Optimizer
    lr = CFG['lr']
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=CFG['weight_decay'])
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=CFG['step_size'], gamma=CFG['gamma'])
    
    ## Loss function
    criterion_absolute_loss = nn.MSELoss().to(device)
    criterion_contrastive_loss = Contrast_depth_loss().to(device)
    


    ACER_save = 1.0
    
    for epoch in range(CFG['epochs']):  # loop over the dataset multiple times
        if (epoch + 1) % CFG['step_size'] == 0:
            lr *= CFG['gamma']
        
        loss_absolute = AvgrageMeter()
        loss_contra =  AvgrageMeter()
        
        ###########################################
        '''                train             '''
        ###########################################
        model.train()
        
        # train dataloader
        train_transform = transforms.Compose([RandomErasing(), RandomHorizontalFlip(),  ToTensor(), Cutout(), Normaliztion()])
        train_data = TrainDataset(train, videos_dir, train_transform)
        dataloader_train = DataLoader(train_data, batch_size=CFG['train_bs'], shuffle=True, num_workers=CFG['num_workers'])

        for i, sample_batched in enumerate(dataloader_train):
            # get the inputs
            inputs, binary_mask, spoof_label = sample_batched['image_x'].to(device), sample_batched['binary_mask'].to(device), sample_batched['spoofing_label'].to(device)

            optimizer.zero_grad()

            
            # forward + backward + optimize
            map_x, embedding, x_Block1, x_Block2, x_Block3, x_input =  model(inputs)
            
            absolute_loss = criterion_absolute_loss(map_x, binary_mask)
            contrastive_loss = criterion_contrastive_loss(map_x, binary_mask)
            
            loss =  absolute_loss + contrastive_loss
             
            loss.backward()
            
            optimizer.step()
            
            n = inputs.size(0)
            loss_absolute.update(absolute_loss.data, n)
            loss_contra.update(contrastive_loss.data, n)           
        
        scheduler.step()
        
        # whole epoch average
        print('epoch:%d, Train:  Absolute_Depth_loss= %.4f, Contrastive_Depth_loss= %.4f\n' % (epoch + 1, loss_absolute.avg, loss_contra.avg))
        log_file.write('epoch:%d, Train: Absolute_Depth_loss= %.4f, Contrastive_Depth_loss= %.4f \n' % (epoch + 1, loss_absolute.avg, loss_contra.avg))
        log_file.flush()
           
            
        epoch_test = 1
        if epoch >= 0 and epoch % epoch_test == epoch_test-1:    
            model.eval()
            
            with torch.no_grad():
                ###########################################
                '''                val             '''
                ###########################################
                # val for threshold
                val_transform = transforms.Compose([Normaliztion_valtest(), ToTensor_valtest()])
                val_data = ValDataset(val, videos_dir, val_transform)
                dataloader_val = DataLoader(val_data, batch_size=CFG['valid_bs'], shuffle=False, num_workers=CFG['num_workers'])
                
                map_score_list = []
                
                for i, sample_batched in enumerate(dataloader_val):
                    # get the inputs
                    inputs, spoof_label = sample_batched['image_x'].to(device), sample_batched['spoofing_label'].to(device)
                    binary_mask = sample_batched['binary_mask'].to(device)
        
                    optimizer.zero_grad()
                    
                    
                    map_score = 0.0
                    for frame_t in range(inputs.shape[1]):
                        map_x, embedding, x_Block1, x_Block2, x_Block3, x_input =  model(inputs[:,frame_t,:,:,:].squeeze(dim=1))
                        score_norm = torch.mean(map_x)
                        map_score += score_norm
                    map_score = map_score/inputs.shape[1]
                    
                    if map_score>1:
                        map_score = 1.0
    
                    map_score_list.append('{} {}\n'.format(map_score, spoof_label[0][0]))
                    
                map_score_val_filename = '_map_score_val_%d.txt'% (epoch + 1)
                with open(map_score_val_filename, 'w') as file:
                    file.writelines(map_score_list) 
                    
                ###########################################
                '''                test             '''
                ###########################################
                # val for threshold
                test_transform = transforms.Compose([Normaliztion_valtest(), ToTensor_valtest()])
                test_data = ValDataset(test, videos_dir, val_transform)
                dataloader_test = DataLoader(test_data, batch_size=CFG['valid_bs'], shuffle=False, num_workers=CFG['num_workers'])
                
                map_score_list = []
                
                for i, sample_batched in enumerate(dataloader_test):
                    # get the inputs
                    inputs, spoof_label = sample_batched['image_x'].to(device), sample_batched['spoofing_label'].to(device)
                    binary_mask = sample_batched['binary_mask'].to(device)
        
                    optimizer.zero_grad()
                    
                    
                    map_score = 0.0
                    for frame_t in range(inputs.shape[1]):
                        map_x, embedding, x_Block1, x_Block2, x_Block3, x_input =  model(inputs[:,frame_t,:,:,:].squeeze(dim=1))
                        score_norm = torch.mean(map_x)
                        map_score += score_norm
                    map_score = map_score/inputs.shape[1]
                    
                    if map_score>1:
                        map_score = 1.0
    
                    map_score_list.append('{} {}\n'.format(map_score, spoof_label[0][0]))
                    
                map_score_test_filename = '_map_score_test_%d.txt'% (epoch + 1)
                with open(map_score_test_filename, 'w') as file:
                    file.writelines(map_score_list)
                
                #############################################################     
                #       performance measurement
                #############################################################     
                val_threshold, best_test_threshold, val_ACC, val_ACER, val_EER, test_ACC, test_APCER, test_BPCER, test_ACER, test_threshold_ACER, test_EER = performances(map_score_val_filename, map_score_test_filename)
                
                print('epoch:%d, Val:  val_threshold= %.4f, val_ACC= %.4f, val_ACER= %.4f, val_EER= %.4f' % (epoch + 1, val_threshold, val_ACC, val_ACER, val_EER))
                log_file.write('\n epoch:%d, Val:  val_threshold= %.4f, val_ACC= %.4f, val_ACER= %.4f, val_EER= %.4f \n' % (epoch + 1, val_threshold, val_ACC, val_ACER, val_EER))
              
                print('epoch:%d, Test:  ACC= %.4f, APCER= %.4f, BPCER= %.4f, ACER= %.4f, EER= %.4f' % (epoch + 1, test_ACC, test_APCER, test_BPCER, test_ACER, test_EER))
                print("="*20)
                print('')
                log_file.write('epoch:%d, Test:  ACC= %.4f, APCER= %.4f, BPCER= %.4f, ACER= %.4f, EER= %.4f \n' % (epoch + 1, test_ACC, test_APCER, test_BPCER, test_ACER, test_EER))
                
                log_file.flush()

            
            # save the model until the next improvement     
            torch.save(model.state_dict(), '_%d.pkl' % (epoch + 1))


    print('Finished Training')
    log_file.close()

## Main

In [19]:
if __name__ == "__main__":
    train_model()

Liveness Detection Zalo AI 2022:
 
train from scratch!

epoch:1, Train:  Absolute_Depth_loss= 0.3567, Contrastive_Depth_loss= 0.0331

epoch:1, Val:  val_threshold= 0.3941, val_ACC= 0.5254, val_ACER= 0.4741, val_EER= 0.4643
epoch:1, Test:  ACC= 0.5085, APCER= 0.5625, BPCER= 0.4074, ACER= 0.4850, EER= 0.5000

epoch:2, Train:  Absolute_Depth_loss= 0.2635, Contrastive_Depth_loss= 0.0188

epoch:2, Val:  val_threshold= 0.4834, val_ACC= 0.7627, val_ACER= 0.2362, val_EER= 0.2143
epoch:2, Test:  ACC= 0.6610, APCER= 0.4688, BPCER= 0.1852, ACER= 0.3270, EER= 0.4062

epoch:3, Train:  Absolute_Depth_loss= 0.2385, Contrastive_Depth_loss= 0.0088

epoch:3, Val:  val_threshold= 0.4890, val_ACC= 0.7288, val_ACER= 0.2702, val_EER= 0.2500
epoch:3, Test:  ACC= 0.6441, APCER= 0.5000, BPCER= 0.1852, ACER= 0.3426, EER= 0.3438

epoch:4, Train:  Absolute_Depth_loss= 0.2285, Contrastive_Depth_loss= 0.0074

epoch:4, Val:  val_threshold= 0.5070, val_ACC= 0.6949, val_ACER= 0.3041, val_EER= 0.3214
epoch:4, Test:  AC