# Facial Anti-Spoofing using Deep Neural Network Approaches

In [5]:
import os
import shutil
from sklearn.model_selection import train_test_split

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np

## Data Splitting

In [7]:
data_dir = "/Users/jasminecjwchen/Documents/GitHub/COMS-4995-ACV-Project/preprocessed_data"
test_dir = "/Users/jasminecjwchen/Documents/GitHub/COMS-4995-ACV-Project/unseen_data"
output_dir = '/Users/jasminecjwchen/Documents/GitHub/COMS-4995-ACV-Project/split_data'

In [8]:
def create_splits(data_dir=data_dir, test_dir = test_dir, output_dir=output_dir):
    """
    Create data splits for training, validation, and testing.

    Parameters:
    - data_dir: Path to the directory containing the 'live' and 'spoof' subdirectories.
    - output_dir: Path to the directory where the splits will be saved.
    - split_ratio: A tuple indicating the split ratio for training, validation, and testing sets.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        os.makedirs(os.path.join(output_dir, 'train', 'live'))
        os.makedirs(os.path.join(output_dir, 'train', 'spoof'))
        os.makedirs(os.path.join(output_dir, 'val', 'live'))
        os.makedirs(os.path.join(output_dir, 'val', 'spoof'))
        os.makedirs(os.path.join(output_dir, 'test', 'live'))
        os.makedirs(os.path.join(output_dir, 'test', 'spoof'))

    for category in ['live', 'spoof']:
        files = os.listdir(os.path.join(data_dir, category))
        train_files, val_files = train_test_split(files, test_size=0.2, random_state=42)
        test_files = os.listdir(os.path.join(test_dir, category))

        for file in train_files:
            shutil.copy(os.path.join(data_dir, category, file), os.path.join(output_dir, 'train', category, file))
        for file in val_files:
            shutil.copy(os.path.join(data_dir, category, file), os.path.join(output_dir, 'val', category, file))
        for file in test_files:
            shutil.copy(os.path.join(data_dir, category, file), os.path.join(output_dir, 'test', category, file))

In [9]:
# WARNING: this copies all the images to a new folder so be cautious...! :(
create_splits()

## Preparing Data Loaders

In [11]:
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import transforms, datasets

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(root=os.path.join(output_dir, 'train'), transform=transform)
val_dataset = datasets.ImageFolder(root=os.path.join(output_dir, 'val'), transform=transform)
test_dataset = datasets.ImageFolder(root=os.path.join(output_dir, 'test'), transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

## Defining the basic CNN Model -- First Try, Optimizing for Recall

In [12]:
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64 * 56 * 56, 512)  
        self.fc2 = nn.Linear(512, 2)  

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 64 * 56 * 56) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = CNNModel()

## Training

In [13]:
import numpy as np

num_epochs = 10
patience = 2 
best_val_loss = np.inf
patience_counter = 0 

criterion = nn.CrossEntropyLoss()  # should experiment with other loss functions...
optimizer = optim.Adam(model.parameters(), lr=0.001)  # experiment with learning rate and more...

In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device...") 

Using cpu device...


In [15]:
import torch.nn.functional as F
import torch
import numpy as np
from sklearn.metrics import recall_score, precision_score, f1_score, roc_auc_score
from torch.nn.functional import softmax

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    
    model.eval()
    val_running_loss = 0.0
    true_labels = []
    pred_labels = []
    pred_probs = []  # Store predicted probabilities for AUC computation
    
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_running_loss += loss.item() * inputs.size(0)
            
            probs = softmax(outputs, dim=1) # softmax for ROC-AUC
            pred_probs.extend(probs[:, 1].cpu().numpy())  
            _, preds = torch.max(outputs, 1)
            true_labels.extend(labels.cpu().numpy())
            pred_labels.extend(preds.cpu().numpy())
    
    val_loss = val_running_loss / len(val_loader.dataset)
    val_recall = recall_score(true_labels, pred_labels, average='binary')
    val_precision = precision_score(true_labels, pred_labels, average='binary')
    val_f1 = f1_score(true_labels, pred_labels, average='binary')
    val_roc_auc = roc_auc_score(true_labels, pred_probs)  
    
    print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}, Recall: {val_recall:.4f}, Precision: {val_precision:.4f}, F1 Score: {val_f1:.4f}, ROC-AUC: {val_roc_auc:.4f}')
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        patience_counter += 1
    
    if patience_counter >= patience:
        print("Early stopping triggered.")
        break

Epoch 1/10, Training Loss: 0.2948, Validation Loss: 0.0875, Recall: 0.9843, Precision: 0.9570, F1 Score: 0.9705, ROC-AUC: 0.9944
Epoch 2/10, Training Loss: 0.0535, Validation Loss: 0.0711, Recall: 0.9903, Precision: 0.9629, F1 Score: 0.9764, ROC-AUC: 0.9963
Epoch 3/10, Training Loss: 0.0255, Validation Loss: 0.0816, Recall: 0.9665, Precision: 0.9833, F1 Score: 0.9749, ROC-AUC: 0.9968
Epoch 4/10, Training Loss: 0.0173, Validation Loss: 0.0992, Recall: 0.9689, Precision: 0.9757, F1 Score: 0.9723, ROC-AUC: 0.9963
Early stopping triggered.


In [16]:
model = CNNModel()
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

CNNModel(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=200704, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=2, bias=True)
)

In [23]:
with torch.no_grad():
    running_corrects = 0
    for images, labels in test_loader:
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        running_corrects += torch.sum(preds == labels.data)
    print("test accuracy", (running_corrects.double() / len(test_dataset)).item())

test accuracy 0.9722315155570425
