# Perform Experiments with DeepFace on LFW dataset

In [1]:
# built-in dependencies
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import statistics

# 3rd party dependencies
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score
from sklearn.datasets import fetch_lfw_pairs
from deepface import DeepFace
import lightgbm as lgb
import xgboost
from xgboost import plot_importance
from sklearn.model_selection import KFold
from tqdm import tqdm

2024-05-09 11:59:17.587328: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-09 11:59:17.590521: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2024-05-09 11:59:17.590530: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
seed = 17
detector_backend = "retinaface"
more_train = False # append some of validation data into train

In [3]:
print(f"This experiment is done with pip package of deepface with {DeepFace.__version__} version")

This experiment is done with pip package of deepface with 0.0.90 version


### Configuration Sets

In [4]:
# all configuration alternatives for 4 dimensions of arguments
alignment = [True]

models = ["Facenet", "Facenet512", "VGG-Face", "ArcFace", "Dlib"]
detectors = ["retinaface"]

metrics = ["euclidean_l2"]
expand_percentage = 0

### Create Required Folders if necessary

In [5]:
target_paths = [
    "lfwe",
    "lfwe/test",
    "lfwe/train",
    "lfwe/10_folds",
    "dataset",
    "outputs",
    "outputs/test",
    "outputs/train",
    "outputs/10_folds",
    "results"
]
for target_path in target_paths:
    if os.path.exists(target_path) != True:
        os.mkdir(target_path)
        print(f"{target_path} is just created")

### Load LFW Datasets

In [6]:
def retrieve_lfe(task: str):
    if task == "test":
        instances = 1000
    elif task == "train":
        instances = 2200
    elif task == "10_folds":
        instances = 6000
    else:
        raise ValueError(f"unimplemented task - {task}")

    pairs_touch = f"outputs/{task}_lfwe.txt"

    target_path = f"dataset/{task}_lfw.npy"
    labels_path = f"dataset/{task}_labels.npy"

    if os.path.exists(target_path) != True:
        fetched_lfw_pairs = fetch_lfw_pairs(
            subset = task,
            color = True,
            # memory allocation problem occurs for validation set
            resize = 2 if task != "10_folds" else 1,
            funneled = False,
            slice_=None,
        )
        print("fetched")
        pairs = fetched_lfw_pairs.pairs
        labels = fetched_lfw_pairs.target
        # target_names = fetched_lfw_pairs.target_names
        np.save(target_path, pairs)
        np.save(labels_path, labels)
    else:
        if os.path.exists(pairs_touch) != True:
            # loading pairs takes some time. but if we extract these pairs as image, no need to load it anymore
            pairs = np.load(target_path)
        labels = np.load(labels_path)
    
    # store to file system
    for i in tqdm(range(0, instances)):
        img1_target = f"lfwe/{task}/{i}_1.jpg"
        img2_target = f"lfwe/{task}/{i}_2.jpg"
        
        if os.path.exists(img1_target) != True:
            img1 = pairs[i][0]
            # plt.imsave(img1_target, img1/255) #works for my mac
            plt.imsave(img1_target, img1) #works for my debian
        
        if os.path.exists(img2_target) != True:
            img2 = pairs[i][1]
            # plt.imsave(img2_target, img2/255) #works for my mac
            plt.imsave(img2_target, img2) #works for my debian
        
    if os.path.exists(pairs_touch) != True:
        open(pairs_touch,'a').close()
    

In [7]:
retrieve_lfe(task = "test")
retrieve_lfe(task = "train")
retrieve_lfe(task = "10_folds")

100%|██████████| 1000/1000 [00:00<00:00, 204430.67it/s]
100%|██████████| 2200/2200 [00:00<00:00, 277927.44it/s]
100%|██████████| 6000/6000 [00:00<00:00, 224941.89it/s]


# Perform Experiments

In [8]:
def perform_experiments():    
    for model_name in models:
        for detector_backend in detectors:
            for distance_metric in metrics:
                for align in alignment:
                    
                    if detector_backend == "skip" and align is True:
                        # Alignment is not possible for a skipped detector configuration
                        continue
                    
                    calculate_distances(
                        model_name=model_name,
                        detector_backend=detector_backend,
                        distance_metric=distance_metric,
                        align=align,
                    )
                    
def calculate_distances(
        model_name: str,
        detector_backend: str,
        distance_metric: str = "euclidean_l2",
        align: bool = True
):
    for experiment in ["test", "train", "10_folds"]:
        if experiment == "test":
            instances = 1000
        elif experiment == "train":
            instances = 2200
        elif experiment == "10_folds":
            instances = 6000
        else:
            raise ValueError(f"unimplemented experiment - {experiment}")

        labels = np.load(f"dataset/{experiment}_labels.npy")

        alignment_text = "aligned" if align is True else "unaligned"
        task = f"{experiment}/{model_name}_{detector_backend}_{distance_metric}_{alignment_text}"
        output_file = f"outputs/{task}.csv"

        # check file is already available
        if os.path.exists(output_file) is True:
            continue
        
        distances = []
        for i in tqdm(range(0, instances), desc = task):
            img1_target = f"lfwe/{experiment}/{i}_1.jpg"
            img2_target = f"lfwe/{experiment}/{i}_2.jpg"
            result = DeepFace.verify(
                img1_path=img1_target,
                img2_path=img2_target,
                model_name=model_name,
                detector_backend=detector_backend,
                distance_metric=distance_metric,
                align=align,
                enforce_detection=False,
                expand_percentage=expand_percentage,
            )
            distance = result["distance"]
            distances.append(distance)
        # -----------------------------------
        df = pd.DataFrame(list(labels), columns = ["actuals"])
        df["distances"] = distances
        df.to_csv(output_file, index=False)

In [9]:
perform_experiments()

# Data Frame

In [10]:
# pre-tuned threshold for single models
if detector_backend == "mtcnn":
    thresholds = {
        "Facenet": 1.0927487190831375,
        "Facenet512": 1.0676744382971612,
        "VGG-Face": 1.199458073887602,
        "ArcFace": 1.1853355178343647,
        "Dlib": 0.4020917206804517,
    }
elif detector_backend == "retinaface":
    thresholds = {
        "Facenet": 1.0771751259493634,
        "Facenet512": 1.080821730376328,
        "VGG-Face": 1.1952250102966764,
        "ArcFace": 1.1601818883318848,
        "Dlib": 0.4022031592966787,
    }
elif detector_backend == "yunet":
    thresholds = {
        "Facenet": 1.066751738677861,
        "Facenet512": 1.0691771483816928,
        "VGG-Face": 1.1802823845238797,
        "ArcFace": 1.1945138501899335,
        "Dlib": 0.422060409585814,
    }
else:
    raise ValueError(f"unimplemented detector - {detector_backend}")

In [11]:
tasks = ["train", "test", "10_folds"]

dfs = {}
for task in tasks:
    dfs[task] = None
    for model in models:
        current_df = pd.read_csv(
            f"outputs/{task}/{model}_{detector_backend}_euclidean_l2_aligned.csv"
        ).rename(columns = {"distances": model})

        if dfs[task] is None:
            dfs[task] = current_df.copy()
        else:
            current_df = current_df.drop(columns = ["actuals"])
            dfs[task] = pd.concat([dfs[task], current_df], axis=1)

In [12]:
dfs["train"].head()

Unnamed: 0,actuals,Facenet,Facenet512,VGG-Face,ArcFace,Dlib
0,1,0.665361,0.529663,0.72559,0.754328,0.241096
1,1,0.761471,0.851045,0.926009,0.872847,0.361268
2,1,0.929644,1.010149,0.957905,1.005725,0.32021
3,1,0.298545,0.489141,0.789548,0.649016,0.259671
4,1,0.613064,0.679703,0.991215,0.950595,0.35663


In [13]:
print(f"{more_train=}")
if more_train:
    tmp_df = dfs["train"].append(dfs["10_folds"], ignore_index = True)
    dfs["train"] = tmp_df.sample(frac = 0.7, random_state=seed)
    dfs["10_folds"] = tmp_df.drop(dfs["train"].index)
    k = 1
else:
    k = 10

more_train=False


In [14]:
def add_classification_results(df: pd.DataFrame) -> pd.DataFrame:
    for model_name in models:
        idx = df[df[model_name] < thresholds[model_name]].index
        df[f"{model_name}_clf"] = -1
        df.loc[idx, f"{model_name}_clf"] = 1
    return df

dfs["train"] = add_classification_results(dfs["train"])
dfs["10_folds"] = add_classification_results(dfs["10_folds"])
dfs["test"] = add_classification_results(dfs["test"])

In [15]:
def add_classification_sum(df: pd.DataFrame) -> pd.DataFrame:
    df["clf_sum"] = 0
    for model_name in models:
        df["clf_sum"] += df[f"{model_name}_clf"]
    return df

dfs["train"] = add_classification_sum(dfs["train"])
dfs["10_folds"] = add_classification_sum(dfs["10_folds"])
dfs["test"] = add_classification_sum(dfs["test"])

In [16]:
def add_distance_additions(df: pd.DataFrame) -> pd.DataFrame:
    df["distance_sums"] = 0
    for model_name in models:
        df["distance_sums"] += df[f"{model_name}"]
    return df

dfs["train"] = add_distance_additions(dfs["train"])
dfs["10_folds"] = add_distance_additions(dfs["10_folds"])
dfs["test"] = add_distance_additions(dfs["test"])

In [17]:
def add_distance_multiplications(df: pd.DataFrame) -> pd.DataFrame:
    df["distance_multiplications"] = 1
    for model_name in models:
        df["distance_multiplications"] *= df[f"{model_name}"]
    return df

dfs["train"] = add_distance_multiplications(dfs["train"])
dfs["10_folds"] = add_distance_multiplications(dfs["10_folds"])
dfs["test"] = add_distance_multiplications(dfs["test"])

In [18]:
dfs["train"].sample(5)

Unnamed: 0,actuals,Facenet,Facenet512,VGG-Face,ArcFace,Dlib,Facenet_clf,Facenet512_clf,VGG-Face_clf,ArcFace_clf,Dlib_clf,clf_sum,distance_sums,distance_multiplications
744,1,0.555978,0.535129,0.720907,0.774279,0.282407,1,1,1,1,1,5,2.8687,0.0469
1466,0,0.475164,0.811694,0.765939,0.168319,0.481163,1,1,1,1,-1,3,2.70228,0.023925
1638,0,1.282475,1.204147,1.282847,1.24333,0.447448,-1,-1,-1,-1,-1,-5,5.460246,1.102127
1657,0,1.238476,1.338712,1.361004,1.282596,0.463052,-1,-1,-1,-1,-1,-5,5.68384,1.340151
1101,0,1.33419,1.365896,1.315901,1.36848,0.487813,-1,-1,-1,-1,-1,-5,5.872279,1.600847


In [19]:
categorical_features = [column for column in dfs["train"].columns if column.endswith("_clf")]
feature_names = list(dfs["train"].drop(columns=["actuals"]).columns)

# XGBoost

In [20]:
# restore sets
y_train = dfs["train"]["actuals"].values
x_train = dfs["train"].drop(columns=["actuals"]).values

y_test = dfs["test"]["actuals"].values
x_test = dfs["test"].drop(columns=["actuals"]).values

y_val = dfs["10_folds"]["actuals"].values
x_val = dfs["10_folds"].drop(columns=["actuals"]).values

In [22]:
params = {
    'learning_rate': 0.01
    , 'max_depth': 5
    , 'max_leaves': pow(2, 5) - 1
    , 'n_estimators': 10000
    , 'seed': 17
    , 'nthread':  2
    , 'object':  'binary:logistic'
}

In [23]:
models = []
for k in range(0, 10):
    print(f"Training {k}-th model")
    
    model = xgboost.XGBClassifier(**params)

    valid_from = k * 600
    valid_until = valid_from + 600

    _ = model.fit(
        x_train,
        y_train,
        eval_metric='logloss',
        eval_set=[(x_val[valid_from:valid_until], y_val[valid_from:valid_until])],
        early_stopping_rounds=500,
        verbose=False,
    )

    models.append(model)

Training 0-th model
Training 1-th model
Training 2-th model
Training 3-th model
Training 4-th model
Training 5-th model
Training 6-th model
Training 7-th model
Training 8-th model
Training 9-th model


In [24]:
def analyze_results(x, y, label):
    scores = []
    for k in range(0, 10):
        model = models[k]

        if label == "validation":
            x_org  = x.copy()
            y_org = y.copy()

            valid_from = k * 600
            valid_until = valid_from + 600

            x = x[valid_from:valid_until]
            y = y[valid_from:valid_until]

        predictions = model.predict(x)

        classified = 0
        for idx, prediction in enumerate(predictions):
            actual = y[idx]
            if actual == prediction:
                classified += 1

        score = 100 * (classified / len(predictions))
        print(round(score, 2))
        scores.append(score)

        # restore
        if label == "validation":
            x = x_org.copy()
            y = y_org.copy()

    return scores

In [25]:
train_results = analyze_results(x_train, y_train, "train")

99.73
99.68
99.64
99.68
99.73
99.73
99.68
99.68
99.68
99.86


In [26]:
round(sum(train_results)/10, 2)

99.71

In [27]:
val_results = analyze_results(x_val, y_val, "validation")

97.33
97.17
96.33
97.17
98.83
98.0
97.83
97.5
98.0
98.33


In [28]:
round(sum(val_results)/10, 2)

97.65

In [29]:
test_results = analyze_results(x_test, y_test, "test")

98.9
98.9
99.0
99.0
98.9
98.8
99.0
98.9
98.9
98.8


In [30]:
round(sum(test_results)/10, 2)

98.91