In [None]:
# Pinned to the release this notebook was last run against; %pip installs into the running kernel's environment.
%pip install -q tensorflow-addons==0.16.1

Collecting tensorflow-addons
  Downloading tensorflow_addons-0.16.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
[?25l[K     |▎                               | 10 kB 32.2 MB/s eta 0:00:01[K     |▋                               | 20 kB 40.8 MB/s eta 0:00:01[K     |▉                               | 30 kB 26.4 MB/s eta 0:00:01[K     |█▏                              | 40 kB 14.6 MB/s eta 0:00:01[K     |█▌                              | 51 kB 12.6 MB/s eta 0:00:01[K     |█▊                              | 61 kB 14.8 MB/s eta 0:00:01[K     |██                              | 71 kB 12.9 MB/s eta 0:00:01[K     |██▍                             | 81 kB 14.3 MB/s eta 0:00:01[K     |██▋                             | 92 kB 15.9 MB/s eta 0:00:01[K     |███                             | 102 kB 14.1 MB/s eta 0:00:01[K     |███▏                            | 112 kB 14.1 MB/s eta 0:00:01[K     |███▌                            | 122 kB 14.1 MB/s eta 0:00:01[K

In [None]:
import sys
import os
import numpy as np
import cv2
from sklearn.utils import shuffle
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report, accuracy_score, matthews_corrcoef, balanced_accuracy_score, precision_recall_fscore_support
import glob
from random import sample
from keras.models import load_model
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint

In [None]:
def read_all_frames(video_path):
  """Decode every frame of a video and return the frames as one array.

  Each decoded frame is first shrunk to 64x64 and then enlarged to 224x224
  (both with ``cv2.INTER_AREA``), so the returned frames are 224x224 but only
  keep the detail that survives the 64x64 step.

  Args:
    video_path: Path handed to ``cv2.VideoCapture``.

  Returns:
    ``np.asarray`` of the processed frames in decoding order. When no frame
    can be read the result is the empty array ``np.asarray([])``.
  """
  frame_list = []
  video = cv2.VideoCapture(video_path)
  try:
    while True:
      success, frame = video.read()
      if not success:
        break
      # The size has to be reduced first; the frame is then scaled back up
      # to the resolution the network expects.
      frame = cv2.resize(frame, (64, 64), interpolation=cv2.INTER_AREA)
      frame = cv2.resize(frame, (224, 224), interpolation=cv2.INTER_AREA)
      frame_list.append(frame)
  finally:
    # Always free the decoder handle, even if resizing raises; this function
    # is called once per video file, so leaked handles add up quickly.
    video.release()
  return np.asarray(frame_list)

def my_frames(data_path, N = 2):
  """Randomly pick up to N frames from every ``.mp4`` file in a directory.

  Args:
    data_path: Directory that is listed for video files. Only names ending
      in ``.mp4`` are read (the extension used for the Replay-Attack and
      Replay-Mobile datasets); everything else is ignored.
    N: Number of frames drawn without replacement from each video. A video
      with fewer than N frames contributes all of its frames, and a video
      with no readable frames is skipped.

  Returns:
    float64 array of shape ``(total_selected, 224, 224, 3)``; the shape is
    ``(0, 224, 224, 3)`` when nothing was selected.
  """
  # The empty seed chunk keeps the result float64 and 4-D even when no frame
  # is selected at all.
  chunks = [np.empty((0, 224, 224, 3))]
  # Sorted so the files are visited in a stable order; os.listdir gives no
  # ordering guarantee. The frame sampling itself remains random.
  for file in sorted(os.listdir(data_path)):
    if not file.endswith('.mp4'):
      continue
    al_frames = read_all_frames(os.path.join(data_path, file))
    n_available = al_frames.shape[0]
    if n_available == 0:
      continue
    # random.sample raises ValueError when asked for more items than exist,
    # so the request is clamped to what the video actually offers.
    selected_samples = sample(range(n_available), min(N, n_available))
    chunks.append(al_frames[selected_samples])
  # One concatenation at the end instead of re-copying the growing array
  # after every video.
  return np.concatenate(chunks, axis = 0)

In [None]:
# Root directory of the Rose_Youtu dataset copy; change only this line to
# point the notebook at a different location.
DATASET_ROOT = '/content/drive/MyDrive/Anti-Spoofing_datasets/Rose_Youtu'

# Every split has a 'real' and an 'attack' sub-directory. The *_fixed names
# hold the attack directories. All paths keep their trailing slash.
data_path_train_real = f'{DATASET_ROOT}/train/real/'
data_path_train_fixed = f'{DATASET_ROOT}/train/attack/'

data_path_devel_real = f'{DATASET_ROOT}/devel/real/'
data_path_devel_fixed = f'{DATASET_ROOT}/devel/attack/'

data_path_test_real = f'{DATASET_ROOT}/test/real/'
data_path_test_fixed = f'{DATASET_ROOT}/test/attack/'

In [None]:
def _load_split(real_path, attack_path, n_real=None):
  """Load one split and label it: 0 for real frames, 1 for attack frames."""
  # Real videos use n_real when it is given; otherwise my_frames' own default
  # applies. Attack videos always use that default.
  real = my_frames(real_path) if n_real is None else my_frames(real_path, n_real)
  attack = my_frames(attack_path)
  x = np.concatenate((real, attack), axis = 0)
  y = np.concatenate((np.zeros(real.shape[0], dtype=int),
                      np.ones(attack.shape[0], dtype=int)), axis = 0)
  return x, y


def load_all_data_RM(data_path_train_real, data_path_train_fixed,
                  data_path_devel_real, data_path_devel_fixed,
                  data_path_test_real, data_path_test_fixed, Nr=2):
  """Load the train, devel and test splits as frame arrays with 0/1 labels.

  Frames are sampled with ``my_frames``. Within each split the real frames
  come first (label 0), followed by the attack frames (label 1).

  Args:
    data_path_train_real, data_path_train_fixed: Real / attack directories
      of the training split.
    data_path_devel_real, data_path_devel_fixed: Real / attack directories
      of the development (validation) split.
    data_path_test_real, data_path_test_fixed: Real / attack directories of
      the test split.
    Nr: Frames sampled per real video in the train and devel splits. Attack
      videos, and the real videos of the test split, use the default frame
      count of ``my_frames``.

  Returns:
    ``(x_train, y_train, x_val, y_val, x_test, y_test)``.
  """
  x_train, y_train = _load_split(data_path_train_real, data_path_train_fixed, Nr)
  x_val, y_val = _load_split(data_path_devel_real, data_path_devel_fixed, Nr)
  # As in the original code, Nr is not applied to the test split.
  x_test, y_test = _load_split(data_path_test_real, data_path_test_fixed)
  return x_train, y_train, x_val, y_val, x_test, y_test

In [None]:
# Shape of a single model input; used for every keras.Input in this notebook
# and matching the 224x224x3 frames produced by the frame loader.
input_shape = (224, 224, 3)
# Width of the classifier's softmax output (labels are 0 = real, 1 = attack).
num_classes = 2

In [None]:
# Read and sample frames for all three splits. This decodes every video in
# the six directories, so it is the slow step of the notebook. Frame
# selection is random and unseeded, so the sampled frames differ between runs.
x_train, y_train, x_val, y_val, x_test, y_test = load_all_data_RM(data_path_train_real, data_path_train_fixed,
                                                                  data_path_devel_real, data_path_devel_fixed,
                                                                  data_path_test_real, data_path_test_fixed, Nr=2)
# Sanity check: number of frames and frame shape per split.
print(x_train.shape, x_val.shape, x_test.shape)

(2796, 224, 224, 3) (700, 224, 224, 3) (3498, 224, 224, 3)


In [None]:
# Preprocessing + augmentation pipeline that create_encoder() places in front
# of the ResNet backbone.
data_augmentation = keras.Sequential(
    [
        # Feature normalization; its statistics are set by adapt() below.
        layers.Normalization(),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.02),
        # NOTE(review): RandomWidth/RandomHeight presumably change the spatial
        # size of training batches while the encoder declares a fixed
        # input_shape — confirm this is intended.
        layers.RandomWidth(0.2),
        layers.RandomHeight(0.2),
    ]
)

# Compute the normalization statistics from the training frames only.
data_augmentation.layers[0].adapt(x_train)

In [None]:
def create_encoder(trainable=True):
    """Build the feature encoder: augmentation pipeline followed by ResNet50V2.

    The backbone is created without its classification top, initialised with
    ImageNet weights and uses global average pooling. The module-level
    ``data_augmentation`` pipeline is applied to the input before the backbone.

    Args:
        trainable: Value assigned to ``trainable`` on every backbone layer.
            Defaults to True (fine-tune the whole backbone). Previously this
            was read from an undefined global, which raised ``NameError`` on
            a fresh kernel.

    Returns:
        A ``keras.Model`` named ``"casia-encoder"`` mapping inputs of shape
        ``input_shape`` to the pooled backbone features.
    """
    resnet = keras.applications.ResNet50V2(
        include_top=False, weights='imagenet', input_shape=input_shape, pooling="avg"
    )
    for layer in resnet.layers:
        layer.trainable = trainable

    inputs = keras.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    outputs = resnet(augmented)
    model = keras.Model(inputs=inputs, outputs=outputs, name="casia-encoder")
    return model


# Build the encoder once and print its layer summary.
encoder = create_encoder()
encoder.summary()

# Hyperparameters read by the model-building and training cells below.
# Step size passed to keras.optimizers.Adam.
learning_rate = 0.001
# Mini-batch size used by both fit() calls.
batch_size = 64
# Width of the hidden Dense layer in the classifier head.
hidden_units = 512
# Width of the Dense projection head used for contrastive pre-training.
projection_units = 128
# Number of epochs for both fit() calls.
num_epochs = 100
# Rate of both Dropout layers in the classifier head.
dropout_rate = 0.5
# Divisor applied to the similarity logits in SupervisedContrastiveLoss.
temperature = 0.05

Model: "casia-encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 sequential (Sequential)     (None, 224, 224, 3)       7         
                                                                 
 resnet50v2 (Functional)     (None, 2048)              23564800  
                                                                 
Total params: 23,564,807
Trainable params: 23,519,360
Non-trainable params: 45,447
_________________________________________________________________


In [None]:
def create_classifier(encoder, trainable=True):
    """Stack a dropout/dense classification head on an encoder and compile it.

    Args:
        encoder: Model mapping inputs of shape ``input_shape`` to features.
        trainable: Value assigned to ``trainable`` on every layer in
            ``encoder.layers``; pass False to train only the new head.

    Returns:
        A compiled ``keras.Model`` named ``"RA-classifier"`` with a softmax
        output of width ``num_classes``, trained with sparse categorical
        cross-entropy and reporting sparse categorical accuracy.
    """
    for encoder_layer in encoder.layers:
        encoder_layer.trainable = trainable

    image_input = keras.Input(shape=input_shape)
    embedding = encoder(image_input)
    # Head: Dropout -> Dense(relu) -> Dropout -> Dense(softmax)
    regularized = layers.Dropout(dropout_rate)(embedding)
    hidden = layers.Dense(hidden_units, activation="relu")(regularized)
    hidden = layers.Dropout(dropout_rate)(hidden)
    class_probabilities = layers.Dense(num_classes, activation="softmax")(hidden)

    classifier_model = keras.Model(
        inputs=image_input, outputs=class_probabilities, name="RA-classifier"
    )
    classifier_model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return classifier_model

In [None]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    """Supervised contrastive loss built on ``tfa.losses.npairs_loss``.

    The feature vectors of a batch are L2-normalised, their pairwise dot
    products are divided by ``temperature``, and the resulting similarity
    matrix is scored against the labels with the n-pairs loss.

    Args:
        temperature: Divisor applied to the similarity matrix.
        name: Optional name forwarded to ``keras.losses.Loss``.
    """

    def __init__(self, temperature=1, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Return the loss for one batch.

        Args:
            labels: Integer class labels of the batch.
            feature_vectors: 2-D batch of embeddings, one row per sample.
            sample_weight: Accepted for interface compatibility; not used.
        """
        # Normalize feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits: pairwise similarities scaled by the temperature
        logits = tf.divide(
            tf.matmul(
                feature_vectors_normalized, tf.transpose(feature_vectors_normalized)
            ),
            self.temperature,
        )
        # npairs_loss expects 1-D labels. Flatten explicitly instead of using
        # tf.squeeze, which would turn a single-sample batch into a scalar.
        return tfa.losses.npairs_loss(tf.reshape(labels, [-1]), logits)


def add_projection_head(encoder):
    """Wrap an encoder with a ReLU dense projection of width ``projection_units``.

    Args:
        encoder: Model mapping inputs of shape ``input_shape`` to features.

    Returns:
        An uncompiled ``keras.Model`` named
        ``"RA-encoder_with_projection-head"`` whose output is the projection.
    """
    image_input = keras.Input(shape=input_shape)
    projection_layer = layers.Dense(projection_units, activation="relu")
    return keras.Model(
        inputs=image_input,
        outputs=projection_layer(encoder(image_input)),
        name="RA-encoder_with_projection-head",
    )

In [None]:
# Stage 1: pre-train a fresh encoder with the supervised contrastive loss,
# using a temporary projection head on top of it.
encoder = create_encoder()

encoder_with_projection_head = add_projection_head(encoder)
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature),
)

encoder_with_projection_head.summary()

# Keeps only the model with the lowest validation loss on disk. This file is
# not reloaded in this cell, so `encoder` itself ends with the weights of the
# last epoch.
checkpoint = ModelCheckpoint('/content/drive/MyDrive/face_pad_results/RY_supervised_contrastive_encoder.h5',
                              verbose=0, monitor='val_loss',save_best_only=True, mode='auto')

# fit() documents `callbacks` as a list of callbacks, so the single
# checkpoint is wrapped in one.
history = encoder_with_projection_head.fit(
    x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs, validation_data = (x_val, y_val),
    callbacks = [checkpoint])

Model: "RA-encoder_with_projection-head"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_5 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 casia-encoder (Functional)  (None, 2048)              23564807  
                                                                 
 dense (Dense)               (None, 128)               262272    
                                                                 
Total params: 23,827,079
Trainable params: 23,781,632
Non-trainable params: 45,447
_________________________________________________________________
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch

In [None]:
# Stage 2: train only the classification head; trainable=False freezes the
# layers of the pre-trained encoder.
classifier = create_classifier(encoder, trainable=False)

# Keeps only the classifier with the lowest validation loss on disk.
checkpoint = ModelCheckpoint('/content/drive/MyDrive/face_pad_results/RY_supervised_contrastive.h5',
                              verbose=0, monitor='val_loss',save_best_only=True, mode='auto')

# fit() documents `callbacks` as a list of callbacks, so the single
# checkpoint is wrapped in one.
history = classifier.fit(x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs, validation_data = (x_val, y_val), callbacks = [checkpoint])

# Replace the in-memory model (last-epoch weights) with the best checkpoint.
classifier = load_model('/content/drive/MyDrive/face_pad_results/RY_supervised_contrastive.h5')

# evaluate() returns [loss, metric]; index 1 is the accuracy metric the
# classifier was compiled with.
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
## Define performance measures
def yoden_index(y, y_pred):
  """Return Youden's J statistic (sensitivity + specificity - 1).

  Args:
    y: True binary labels (0 or 1).
    y_pred: Predicted binary labels (0 or 1).

  Returns:
    The J statistic, or NaN when one of the two classes is absent from ``y``
    and the corresponding rate is therefore undefined.
  """
  # labels=[0, 1] forces a 2x2 matrix; without it, inputs that contain a
  # single class give a 1x1 matrix and the 4-way unpacking fails.
  tn, fp, fn, tp = confusion_matrix(y, y_pred, labels=[0, 1]).ravel()
  sensitivity = tp / (tp + fn) if (tp + fn) else float('nan')
  specificity = tn / (tn + fp) if (tn + fp) else float('nan')
  j = sensitivity + specificity - 1
  return j

def pmeasure(y_true, y_pred):
    """Compute binary classification measures from true and predicted labels.

    Args:
        y_true: True binary labels (0 or 1).
        y_pred: Predicted binary labels (0 or 1).

    Returns:
        Dict with the keys ``'Sensitivity'`` (tp / (tp + fn)),
        ``'Specificity'`` (tn / (tn + fp)), ``'F1-Score'``
        (2tp / (2tp + fp + fn)) and ``'HTER'`` (mean of FAR = fp / (fp + tn)
        and FRR = fn / (fn + tp)). A measure whose denominator is zero is
        reported as NaN.
    """
    # labels=[0, 1] forces a 2x2 matrix; without it, inputs that contain a
    # single class give a 1x1 matrix and the 4-way unpacking fails.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    def _ratio(numerator, denominator):
        # An undefined rate (empty class) becomes NaN instead of a warning or error.
        return numerator / denominator if denominator else float('nan')

    sensitivity = _ratio(tp, tp + fn)
    specificity = _ratio(tn, tn + fp)
    f1score = _ratio(2 * tp, 2 * tp + fp + fn)
    FAR = _ratio(fp, fp + tn)
    FRR = _ratio(fn, fn + tp)
    HTER = (FAR + FRR)/2
    return ({'Sensitivity': sensitivity, 'Specificity': specificity, 'F1-Score': f1score, 'HTER': HTER})

def Show_Statistics(msg,Stats):
  """Print a titled, ruled report of seven performance measures.

  Args:
    msg: Title of the report; printed in upper case.
    Stats: Indexable holding, in this order: accuracy, sensitivity,
      specificity, F1-score, HTER, balanced accuracy and Youden index.
  """
  rule = 70*'-'
  captions = ('Accuracy:', 'Sensitivity:', 'Specificity:', 'F1-Score:',
              'HTER:', 'Balance Accuracy:', 'Youden-Index:')
  print(msg.upper())
  print(rule)
  # Stats is indexed by position so a short sequence still fails loudly.
  for position, caption in enumerate(captions):
    print(caption, Stats[position])
  print(rule)

In [None]:
# Predict on every split with the reloaded classifier and round the outputs
# to 0/1. Later cells call .argmax(axis=1) on these arrays, so they are
# presumably (n_samples, num_classes) softmax scores — confirm against the
# loaded model.
# NOTE(review): np.squeeze would also drop the sample axis of a one-sample
# split.
y_train_pred = classifier.predict(x_train, batch_size=64, verbose=0)
y_train_pred = np.round(np.squeeze(y_train_pred))

y_val_pred = classifier.predict(x_val, batch_size=64, verbose=0)
y_val_pred = np.round(np.squeeze(y_val_pred))

y_test_pred = classifier.predict(x_test,batch_size=64, verbose=0)
y_test_pred = np.round(np.squeeze(y_test_pred))

In [None]:
# One-hot encode the integer labels for the metric cells below, which call
# .argmax(axis=1) on them.
# Uses the public keras.utils.to_categorical instead of the private
# keras.utils.np_utils path, which is not available in every Keras release.
to_categorical = keras.utils.to_categorical

# The ndim guard makes the cell safe to re-run: encoding labels that are
# already one-hot would add a third axis. num_classes fixes the number of
# columns even if a split happens to contain a single class.
if y_train.ndim == 1:
    y_train = to_categorical(y_train, num_classes=num_classes)
if y_val.ndim == 1:
    y_val = to_categorical(y_val, num_classes=num_classes)
if y_test.ndim == 1:
    y_test = to_categorical(y_test, num_classes=num_classes)

In [None]:
# Training-split metrics. Labels and predictions are turned back into class
# indices with argmax; class 1 (attack) is the positive class of the
# confusion matrix.
tn, fp, fn, tp = confusion_matrix(y_train.argmax(axis=1), y_train_pred.argmax(axis=1)).ravel()
print(tn, fp, fn, tp)

# Accuracy is computed on the one-hot labels and the rounded predictions.
acc_train = accuracy_score(y_train, y_train_pred)
# Youden index = sensitivity + specificity - 1
Y_I_train =(tp/(tp+fn)) + (tn/(tn+fp)) - 1
sensitivity_train = tp / (tp + fn)
specificity_train = tn / (tn + fp)
f1score_train = 2 * tp / (2 * tp + fp + fn)
# HTER is the mean of FAR = fp / (fp + tn) and FRR = fn / (fn + tp).
FAR = fp/(fp + tn)
FRR = fn/(fn + tp)
HTER_train = (FAR + FRR)/2
train_bacc = balanced_accuracy_score(y_train.argmax(axis=1), y_train_pred.argmax(axis=1))
print('Training Results')
print(70*'-')
print('Acc:', acc_train,'YI:', Y_I_train, 'Sen:', sensitivity_train, 'Spe:', specificity_train, '\n F1:', f1score_train, 'HTER:', HTER_train, 'BACC:', train_bacc)

In [None]:
# Validation-split metrics. Labels and predictions are turned back into class
# indices with argmax; class 1 (attack) is the positive class of the
# confusion matrix.
tn, fp, fn, tp = confusion_matrix(y_val.argmax(axis=1), y_val_pred.argmax(axis=1)).ravel()
print(tn, fp, fn, tp)

# Accuracy is computed on the one-hot labels and the rounded predictions.
acc_val = accuracy_score(y_val, y_val_pred)
# Youden index = sensitivity + specificity - 1
Y_I_val =(tp/(tp+fn)) + (tn/(tn+fp)) - 1
sensitivity_val = tp / (tp + fn)
specificity_val = tn / (tn + fp)
f1score_val = 2 * tp / (2 * tp + fp + fn)
# HTER is the mean of FAR = fp / (fp + tn) and FRR = fn / (fn + tp).
FAR = fp/(fp + tn)
FRR = fn/(fn + tp)
HTER_val = (FAR + FRR)/2
val_bacc = balanced_accuracy_score(y_val.argmax(axis=1), y_val_pred.argmax(axis=1))
print('Validation Results')
print(70*'-')
print('Acc:', acc_val,'YI:', Y_I_val, 'Sen:', sensitivity_val, 'Spe:', specificity_val, '\n F1:', f1score_val, 'HTER:', HTER_val, 'BACC:', val_bacc)

162 18 9 511
Validation Results
----------------------------------------------------------------------
Acc: 0.9614285714285714 YI: 0.8826923076923077 Sen: 0.9826923076923076 Spe: 0.9 
 F1: 0.9742612011439467 HTER: 0.058653846153846154 BACC: 0.9413461538461538


In [None]:
# Test-split metrics. Labels and predictions are turned back into class
# indices with argmax; class 1 (attack) is the positive class of the
# confusion matrix.
# The former y_test[0:4500] slice is gone: predictions are made for every
# test sample, so the labels must be used in full as well.
tn, fp, fn, tp = confusion_matrix(y_test.argmax(axis=1), y_test_pred.argmax(axis=1)).ravel()
print(tn, fp, fn, tp)

# Results are stored under *_test names so this cell no longer overwrites the
# validation metrics (acc_val, HTER_val, ...) computed earlier.
acc_test = accuracy_score(y_test, y_test_pred)
# Youden index = sensitivity + specificity - 1
Y_I_test =(tp/(tp+fn)) + (tn/(tn+fp)) - 1
sensitivity_test = tp / (tp + fn)
specificity_test = tn / (tn + fp)
f1score_test = 2 * tp / (2 * tp + fp + fn)
# HTER is the mean of FAR = fp / (fp + tn) and FRR = fn / (fn + tp).
FAR = fp/(fp + tn)
FRR = fn/(fn + tp)
HTER_test = (FAR + FRR)/2
test_bacc = balanced_accuracy_score(y_test.argmax(axis=1), y_test_pred.argmax(axis=1))
print('Testing Results')
print(70*'-')
print('Acc:', acc_test,'YI:', Y_I_test, 'Sen:', sensitivity_test, 'Spe:', specificity_test, '\n F1:', f1score_test, 'HTER:', HTER_test, 'BACC:', test_bacc)

682 216 68 2532
Testing Results
----------------------------------------------------------------------
Acc: 0.9188107489994283 YI: 0.7333116326880247 Sen: 0.9738461538461538 Spe: 0.7594654788418709 
 F1: 0.9468960359012715 HTER: 0.13334418365598766 BACC: 0.8666558163440123
682 216 68 2532
Testing Results
----------------------------------------------------------------------
Acc: 0.9188107489994283 YI: 0.7333116326880247 Sen: 0.9738461538461538 Spe: 0.7594654788418709 
 F1: 0.9468960359012715 HTER: 0.13334418365598766 BACC: 0.8666558163440123
