In [None]:
%pip install facenet-pytorch pandas tqdm scikit-learn opencv-python opencv-contrib-python

In [None]:
# Misc
import os
import time
import random
import numpy as np
import pandas as pd
import pickle

from tqdm import tqdm
import ipywidgets as widgets
import matplotlib.pyplot as plt 
from IPython.display import Image, display, clear_output

# MTCNN and relevant packages
import torch
from torchvision import datasets
from facenet_pytorch import MTCNN
from torchvision import transforms
from torch.utils.data import DataLoader

# Statistical models and relevant packages
from sklearn.svm import SVC
from scipy.stats import mode
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score
from sklearn.mixture import GaussianMixture
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.base import BaseEstimator, ClusterMixin

DATA_DIR = "data"
RNG_SEED = 42
BATCH_SIZE = 8
BOVW_CLUSTERS = 500

random.seed(RNG_SEED)
torch.manual_seed(RNG_SEED)
np.random.seed(RNG_SEED)

# Check M1 support
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print('Running on device: {}'.format(device))

# Define Dataset

## Proccess raw images zip into a usable dataset

### DISCLAIMER
The following code is shown for posterity, but might not function as intend, as all images have been labeled and the full dataset is now on GitHub. Run at your own risk.

In [None]:
try:
    # create testing folder
    os.makedirs(DATA_DIR)

    # create label folders
    os.makedirs(f"{DATA_DIR}/face")
    os.makedirs(f"{DATA_DIR}/no_face")
except:
    print("Folders already exist.")

In [None]:
labels = pd.read_csv("labels.csv")

# Get all image names

files = []
labeled_files = set(labels["filename"].values)
for (dirpath, dirnames, filenames) in os.walk(DATA_DIR):
    files.extend(filenames)
    break

unmoved = labels[labels["filename"].isin(files)]
files = [file for file in files if file not in labeled_files]

### Labeling UI

In [None]:
face_bttn = widgets.Button(description="Face")
no_face_bttn = widgets.Button(description="No Face")
out = widgets.Output()

count = [0]

curr_file = ''

def face_bttn_clicked(_):
    d = {'filename': files[0],
                   'label': 'face'}
    files.pop(0)
    labels.loc[len(labels)] = d

    show_widgets()
        
face_bttn.on_click(face_bttn_clicked)

def no_face_clicked(_):
    d = {'filename': files[0],
                   'label': 'no face'}
    files.pop(0)
    labels.loc[len(labels)] = d

    show_widgets()

no_face_bttn.on_click(no_face_clicked)

def show_widgets():
    clear_output(wait=True)
    buttons = widgets.HBox([face_bttn, no_face_bttn])
    
    image = widgets.Image(
        value=Image(filename=f"/{DATA_DIR}/{files[0]}").data,
        format="webp",
        width=300,
        height=300
    )
    
    text = widgets.Text(f"Total labeled: {len(labels)}")
    
    display(widgets.VBox([buttons, text, image, out]))
    
    
# !!!DISCLAIMER!!!
# This line throws an error at the moment because there are no (accessable) unlabeled images.
# show_widgets()

In [None]:
def move_files(row):
    filename = row["filename"]
    label = row["label"].replace(" ", "_")
    
    os.rename(f"data/{filename}", f"{DATA_DIR}/{label}/{filename}")

faces = unmoved[unmoved["label"] == 'face']
no_faces = unmoved[unmoved["label"] == 'no face']

try:
    faces.apply(move_files, axis=1)
    no_faces.apply(move_files, axis=1)
    
    print("Moved files to relevant folders")
except:
    print("Images are already moved")

## Define Loaders

In [None]:
transform = transforms.Compose([
    transforms.Resize((160, 160)),
])

tensor_transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor()
])


def collate_fn(batch):
    images, labels = zip(*batch)
    return list(images), list(labels)

base_train = datasets.ImageFolder(f"{DATA_DIR}/train", transform=transform)
base_test = datasets.ImageFolder(f"{DATA_DIR}/test", transform=transform)
tensor_train = datasets.ImageFolder(f"{DATA_DIR}/train", transform=tensor_transform)
tensor_test = datasets.ImageFolder(f"{DATA_DIR}/test", transform=tensor_transform)

base_loader_train = DataLoader(base_train, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
base_loader_test = DataLoader(base_test, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
tensor_loader_train = DataLoader(tensor_train, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
tensor_loader_test = DataLoader(tensor_test, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True,)

In [None]:
# This cell was ran multiple times to get an equal distribution of examples from both classes

fig, axes = plt.subplots(2, 3, figsize=(8, 6))

n = 10
i = 0

axes = axes.flatten()
ids = random.sample(range(len(base_test) + 1), len(axes))
images = [base_test[i] for i in ids]

for ax, (img, label) in zip(axes, images):
    
    ax.imshow(img)
    ax.set_title('face' if label == 0 else 'no face')
    ax.axis("off")

# Define MTCNN baseline
We use the default params for now

In [None]:
mtcnn = MTCNN(
    image_size=160, margin=0, min_face_size=20,
    thresholds=[0.6, 0.7, 0.7], factor=0.709, post_process=True,
    keep_all=True, device=device
)

## Testing MTCNN accuracy with manually labelled data

In [None]:
y_pred = []
y_true = []

for X, Y in tqdm(base_loader_test):
    for i in range(0, len(X)):
        x = X[i]
        y = Y[i]

        x_aligned, probs = mtcnn(x, return_prob=True)

        y_p = 0 if x_aligned is not None else 1
        
        y_pred.append(y_p)
    y_true.extend(Y)               

accuracy_score(y_true, y_pred)

# Preprocessing

You can run the following cells to generate the necessary data or use the precomputed data provided in `/processed_data`

In [None]:
def preprocess_data(loader):
    X = []
    y = []
    for images, labels in tqdm(loader, desc="Flattening data"):
        # Convert images to numpy arrays and flatten
        images_flat = [np.array(img).flatten() for img in images]
        X.extend(images_flat)
        y.extend(labels)
    return np.array(X), np.array(y)

X_train_flat, y_train = preprocess_data(base_loader_train)
X_test_flat, y_test = preprocess_data(base_loader_test)

## PCA

In [None]:
scaler = StandardScaler()

X_train_scaled = scaler.fit_transform(X_train_flat)
X_test_scaled = scaler.fit_transform(X_test_flat)

In [None]:
pca = PCA(n_components=50)

X_train_pca = pca.fit_transform(X_train_scaled)
X_test_pca = pca.fit_transform(X_test_scaled)

In [None]:
explained_variance_ratio = np.array(pca.explained_variance_ratio_)

cumulative_variance = np.cumsum(explained_variance_ratio)

cumulative_variance

## SIFT

In [None]:
import cv2

# Create SIFT extractor
sift = cv2.SIFT_create()

In [None]:
def tensor_to_opencv_img(tensor_img):
    """
    Convert a single image from a PyTorch tensor (C,H,W) to a NumPy array (H,W) or (H,W,3).
    We'll convert to grayscale for SIFT.
    """
    # tensor_img shape: (3, H, W) if color
    # Move to CPU, convert to numpy
    img_np = tensor_img.cpu().numpy()

    # img_np shape is (3, H, W). We can convert to (H, W, 3) by transposing
    img_np = np.transpose(img_np, (1, 2, 0))  # (H, W, 3)

    # Convert to uint8 [0..255] if necessary
    img_np = (img_np * 255.0).clip(0, 255).astype(np.uint8)

    # Convert to grayscale
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    return gray

def extract_descriptors_from_dataloader(dataloader):
    """
    Loop through an entire DataLoader, extract SIFT descriptors for each image.
    """
    descriptors_per_image = []
    labels_list = []

    # Assume we already have train_loader that yields (images, labels)
    for images, labels in tqdm(dataloader):
        # images shape: (batch_size, 3, H, W)
        # labels shape: (batch_size,)
        
        batch_size = len(images)
        for i in range(batch_size):
            # Convert one image to grayscale OpenCV format
            gray_img = tensor_to_opencv_img(images[i])
            # Extract SIFT descriptors
            kp, desc = sift.detectAndCompute(gray_img, None)
            if desc is not None:
                descriptors_per_image.append(desc)
            else:
                # Some images might have no descriptors
                descriptors_per_image.append(np.zeros((0,128), dtype=np.float32))

            # We also keep the label so we can match it up later
            labels_list.append(labels[i])

    return descriptors_per_image, labels_list

def build_bovw_histogram(descriptors, kmeans_model):
    """
    Given SIFT descriptors (num_keypoints,128) for ONE image,
    assign each descriptor to the nearest cluster and build a histogram of size BOVW_CLUSTERS.
    """
    hist = np.zeros((BOVW_CLUSTERS), dtype=np.float32)
    if descriptors is None or len(descriptors) == 0:
        return hist  # no keypoints => zero histogram

    words = kmeans_model.predict(descriptors)
    for w in words:
        hist[w] += 1

    return hist

In [None]:
print("Collecting SIFT descriptors from train_loader...")
all_descriptors, all_labels = extract_descriptors_from_dataloader(tensor_loader_train)
print(f"Collected descriptors from {len(all_descriptors)} training images.")

# Stack all descriptors into one large array for K-Means (excluding empty ones)
desc_nonempty = [d for d in all_descriptors if d.shape[0] > 0]
if len(desc_nonempty) > 0:
    all_train_desc = np.vstack(desc_nonempty)
else:
    all_train_desc = np.zeros((0, 128), dtype=np.float32)

In [None]:
from sklearn.cluster import KMeans

if all_train_desc.shape[0] == 0:
    print("No descriptors found in training set! Can't build K-Means.")
    exit()

print(f"Running K-Means on {all_train_desc.shape[0]} descriptors with {BOVW_CLUSTERS} clusters...")
kmeans = KMeans(n_clusters=BOVW_CLUSTERS, random_state=RNG_SEED, verbose=1)
kmeans.fit(all_train_desc)
print("K-Means done.")

In [None]:
from sklearn.preprocessing import normalize

X_train_sift = []
y_train = []

idx = 0
print("Building BoVW histograms for training set...")
for desc in tqdm(all_descriptors):
    hist = build_bovw_histogram(desc, kmeans)
    X_train_sift.append(hist)
    y_train.append(all_labels[idx])
    idx += 1

X_train_sift = np.array(X_train_sift, dtype=np.float32)
y_train = np.array(y_train, dtype=np.int64)
X_train_sift = normalize(X_train_sift, norm='l2', axis=1)

print("Train BoVW shape:", X_train_sift.shape)  # (num_train_images, NUM_CLUSTERS)

In [None]:
print("Extracting SIFT descriptors from test_loader...")
test_descriptors_list, y_train_sift = extract_descriptors_from_dataloader(tensor_loader_test)

print("Building BoVW histograms for the test set...")
X_test_sift = []
for desc in tqdm(test_descriptors_list):
    hist = build_bovw_histogram(desc, kmeans)
    X_test_sift.append(hist)

X_test_sift = np.array(X_test_sift, dtype=np.float32)
X_test_sift = normalize(X_test_sift, norm='l2', axis=1)

# Models

In [None]:
class GMMClassifier(ClusterMixin, BaseEstimator):
    def __init__(self, n_components=2, covariance_type='full', init_params='kmeans', reg_covar=1e-06, random_state=42):
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.init_params = init_params
        self.reg_covar = reg_covar
        
        self.random_state = random_state

        self.gmm = GaussianMixture(n_components=n_components, covariance_type=covariance_type, random_state=random_state)

    def fit(self, X, y=None):
        self.gmm.fit(X)
        return self

    def predict(self, X):
        y_pred = self.gmm.predict(X)
        return y_pred

    def score(self, X, y):
        y_pred = self.predict(X)
        mapped_preds = np.zeros_like(y)

        for cluster in range(self.n_components):
            mask = y_pred == cluster
            if np.any(mask):
                mapped_preds[mask] = mode(y[mask])[0]

        return accuracy_score(y, mapped_preds)

In [None]:
def run_experiment(make_model):
    """
    Runs experiment on model with provided initialization function.
    
    Inputs:
    - make_model: function returning appropriate model
    """
    
    features = ['pca', 'sift']
    
    y_train = np.load("processed_data/y_train.npy")
    y_test = np.load("processed_data/y_test.npy")
    
    print("Starting auto-generated data experiment")

    for feature in features:
        with open(f"processed_data/X_train_{feature}.npy", "rb") as f:
            X = np.load(f)

        print(f"Training with {feature} features with shape: {X.shape}")
        model = make_model()
        scores = cross_val_score(model, X, y_train, cv=5, n_jobs=-1, scoring='accuracy', verbose=1)
        print(f"Finished Cross-validation with with {feature} features.\nCross-val Accuracy: {scores.mean():.2f} ({scores.std():.2f} std)")

        with open(f"processed_data/X_test_{feature}.npy", "rb") as f:
            X_test = np.load(f)

        model.fit(X, y_train)
        accuracy = model.score(X_test, y_test)
        print(f"Accuracy against manual labels: {accuracy:.2f}")
        
    
    print("Starting manual data experiment")
    for feature in features:
        with open(f"processed_data/X_test_{feature}.npy", "rb") as f:
            X = np.load(f)

        print(f"Training with {feature} features with shape: {X.shape}")
        model = make_model()
        scores = cross_val_score(model, X, y_test, cv=5, n_jobs=-1, scoring='accuracy', verbose=1)

        print(f"Finished Cross-validation with with {feature} features.\nCross-val Accuracy: {scores.mean():.2f} ({scores.std():.2f} std)")

In [None]:
def make_logistic():
    return LogisticRegression(random_state=RNG_SEED)

def make_svm():
    return SVC(random_state=RNG_SEED)

def make_gaussian():
    return GMMClassifier(random_state=RNG_SEED)

print("Running logistic")
run_experiment(make_logistic)
print("Running SVM")
run_experiment(make_svm)
print("Running Gaussian")
run_experiment(make_gaussian)

# Hyperparameter tuning

In [None]:
y = np.load("processed_data/y_train.npy")
y_test = np.load("processed_data/y_test.npy") 
estimators = [
    ('logi', make_logistic(), "pca"),
    # ('svm', make_svm(), "pca"),
    ('gauss', make_gaussian(), "sift"),
]

parameters = [
    # Logi
    {
        "penalty": [None, "l2", "l1", "elasticnet"],
        "C": [0.25, 0.50, 0.75, 1.0],
        "fit_intercept": [True, False],
        "l1_ratio": [0.0, 0.25, 0.50, 0.75, 1.0],
        "tol": [1e-03, 1e-04, 1e-05],
    },
    # SVM
    {},
    # Gauss
    {
        "covariance_type": ["full", "tied", "diag", "spherical"],
        "init_params": ["kmeans", "k-means++", "random", "random_from_data"],
        "reg_covar": [1e-07, 1e-06, 1e-05, 1e-04],
    },
]

In [None]:
for (name, estimator, feature), params in zip(estimators, parameters):
    print(f"Running grid search for {name} with {feature}")
    
    X = np.load(f"processed_data/X_train_{feature}.npy")
    clf = GridSearchCV(estimator, params, n_jobs=-1)
    
    clf.fit(X, y)
    
    print(f"Best params: {clf.best_params_}\n Best score: {clf.best_score_}")
    
    with open(f'{name}-grid_search.pkl', 'wb') as f:
        pickle.dump(clf, f)
        
    # Get manual accuracy
    X = np.load(f"processed_data/X_test_{feature}.npy")
    
    estimator = clf.best_estimator_
    accuracy = estimator.score(X, y_test)
    
    print(f"Accuracy on manual set with optimal parameters: {accuracy}")

# Time experiment

In [None]:
estimators = [('results/logi-grid_search.pkl', 'processed_data/X_test_pca.npy'), ('results/gauss-grid_search.pkl', 'processed_data/X_test_sift.npy')]

y = np.load("processed_data/y_test.npy")
for obj, dataset in estimators:
    print(f"Using {obj}")
    
    with open(obj, "rb") as f:
        clf: GridSearchCV = pickle.load(f)
        
    with open(dataset, "rb") as f:
        X = np.load(f)

    estimator = clf.best_estimator_
    correct = 0
    times = []
    
    for image, label in zip(X, y):
        start = time.time()
        pred = estimator.predict([image])
        end = time.time()
        
        times.append(end - start)
        
        if pred[0] == label:
            correct += 1
    
    print(f"Accuracy: {correct / len(y)} \n Time (in milliseconds) mean: {np.mean(times) * 1000} ({np.std(times) * 1000} std) ")


In [None]:
with open("results/gauss-grid_search.pkl", "rb") as f:
    clf: GridSearchCV = pickle.load(f)
    
data = pd.DataFrame(clf.cv_results_)

data.loc[data["params"] == clf.best_params_]