In [1]:
import cv2
import os
import matplotlib.pyplot as plt
import cachetools
import random 
import numpy as np
import tensorflow as tf


2024-07-07 12:34:30.256013: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:

@cachetools.cached(cache={})
def read_video(path: str, frame_index: int = None, min_brightness: int = 10):
    video = cv2.VideoCapture(path)
    if not video.isOpened():
        raise IOError(f"Error opening video file: {path}")
    

    
    frames = []
    def normalize(img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) 
        img = cv2.normalize(img, None, 0, 1, cv2.NORM_MINMAX, cv2.CV_8UC1)
        img = cv2.resize(img, (480, 640), None)
        return img 
    
    while video.isOpened():
        ret, frame = video.read()
        frames.append(frame)
        if not ret:
            break
        if len(frames) >= frame_index and np.average(frame) > min_brightness:
            return normalize(frame)
    if frame_index is None:
        frame_index = len(frames) // 2
        while np.average(frames[frame_index]) < min_brightness:
            frame_index += 1
        return normalize(frames[frame_index]) 
    raise ValueError("frame not found")
REAL_FILES = (
    "1.avi",
    "2.avi",
    "HR_1.avi",
    "HR_4.avi",
)
def read_dir(path: str):
    real_images = []
    spoof_images = []
    for file in os.listdir(path):
        if os.path.isdir(os.path.join(path, file)):
            r, s = read_dir(os.path.join(path, file))
            real_images.extend(r)
            spoof_images.extend(s)
        else:
            if file.endswith(".avi"):
                obj = (os.path.join(path, file))
                if file in REAL_FILES:
                    real_images.append(obj)
                    
                else:
                    spoof_images.append(obj)
    return real_images, spoof_images


def show_data(ds, rows, cols):
    random.shuffle(ds)
    for i in range(rows):
        for j in range(cols):
            ind = i * cols + j 
            plt.subplot(rows, cols, ind + 1)
            plt.imshow(ds[ind])
            plt.axis("off")


def load_data(path: str, show = False):
    @cachetools.cached(cache={})
    def inner(path):
        r, s = read_dir(path)
        real = [read_video(i, frame_index=1) for i in r]
        spoof = [read_video(i, frame_index=1) for i in s]
        return real, spoof
    real, spoof = inner(path)
    if show:
        show_data(spoof, 4, 3)
        plt.figure()
        show_data(real, 4, 3)
    ds = [(i, 0) for i in real] + [(i, 1) for i in spoof]
    random.shuffle(ds)
    x = np.array([i[0] for i in ds])
    y = np.array([i[1] for i in ds])
    return x, y

In [3]:
x_train, y_train = load_data("./datasets/train_release")
x_test, y_test = load_data("./datasets/test_release")

In [5]:
# https://openaccess.thecvf.com/content_cvpr_2018/papers/Liu_Learning_Deep_Models_CVPR_2018_paper.pdf
mobilenet = tf.keras.applications.MobileNet(
    include_top=False,
    weights='imagenet',
    input_tensor=None,
    input_shape=(640, 480, 3),
    
    pooling=None,
    classes=2,
    classifier_activation='softmax'
)
for l in mobilenet.layers[:len(mobilenet.layers)]:
    l.trainable = False

model = tf.keras.Sequential()
model.add(mobilenet)
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dropout(rate = 0.8))
model.add(tf.keras.layers.Dense(2, activation='softmax'))

model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)



  mobilenet = tf.keras.applications.MobileNet(
2024-07-07 12:35:31.843977: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-07-07 12:35:31.980980: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-07-07 12:35:31.981299: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:998] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.

In [6]:
def add_salt_pepper(img, rate = 0.2): 
    
    w, h, _ = img.shape 
    noised_image = img.copy()
    white = (255, 255, 255)
    black = (0, 0, 0)
    for i in range(w):
        for j in range(h):
            if random.random() > rate:
                continue
            noised_image[i, j] = white if random.random() > 0.5 else black 
    return noised_image

noised_image = [add_salt_pepper(i) for i in x_train]
blurred_image = [cv2.GaussianBlur(i, (13, 13), 10) for i in x_train]

augmented_x = np.concatenate([x_train, noised_image, blurred_image])
augmented_y = np.concatenate([y_train, y_train, y_train])
ind = list(range(len(augmented_x)))
random.shuffle(ind)
x_train_final = [None for _ in range(len(augmented_x))]
y_train_final = [None for _ in range(len(augmented_x))]
for i in range(len(ind)):
    x_train_final[i] = augmented_x[ind[i]]
    y_train_final[i] = augmented_y[ind[i]]


In [8]:
x_train = np.array(x_train)
y_train = np.array(y_train)
x_train_final = np.array(x_train_final)
y_train_final = np.array(y_train_final)
x_test = np.array(x_test)
y_test = np.array(y_test)
print(x_train.shape, x_train_final.shape)
print(y_train.shape, y_train_final.shape)
print(x_test.shape, y_test.shape)
model.fit(x_train_final, y_train_final, epochs=20, validation_data=(x_test,y_test), batch_size=12)


# print(len(x_train_final), len(y_train_final), len(x_train))

(240, 640, 480, 3) (720, 640, 480, 3)
(240,) (720,)
(360, 640, 480, 3) (360,)
Epoch 1/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 101ms/step - accuracy: 0.9824 - loss: 0.2598 - val_accuracy: 0.9250 - val_loss: 2.5163
Epoch 2/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 92ms/step - accuracy: 0.9742 - loss: 0.4299 - val_accuracy: 0.9083 - val_loss: 2.7739
Epoch 3/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 90ms/step - accuracy: 0.9838 - loss: 0.3909 - val_accuracy: 0.9028 - val_loss: 3.0288
Epoch 4/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 84ms/step - accuracy: 0.9861 - loss: 0.1557 - val_accuracy: 0.9222 - val_loss: 3.0507
Epoch 5/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 91ms/step - accuracy: 0.9786 - loss: 0.3279 - val_accuracy: 0.8861 - val_loss: 4.5290
Epoch 6/20
[1m60/60[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 92ms/step - accuracy: 0.9711 - loss: 0.52

<keras.src.callbacks.history.History at 0x7663ea7b8bf0>