# Face Recognition (FR) - DLIB ResNET Approximation with Genetic Programming

In [None]:
import pandas as pd
import json
import matplotlib.pyplot as plt
import numpy as np
from math import inf
from pathlib import Path
import seaborn as sns

from random import random, seed, randint
import operator
from time import time

from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp

from util._telegram import send_simple_message

### Prepare data and generate extra information

In [None]:
# Every file referenced by this notebook lives under the "fr" directory.
_FR_DIR = Path("fr")
_BEST_RUNS_DIR = _FR_DIR / "best_combination_runs"

# JSON files holding distances
DLIB_DISTANCES_FILE = _FR_DIR / "distances_dlib.json"
RESNET_DISTANCES_FILE = _FR_DIR / "distances_resnet.json"
RESNET_FACEPARTS_DISTANCES_FILE = _FR_DIR / "distances_resnet_faceparts.json"

# Files for the best individual / best individuals of run "0002"
BEST_INDIVIDUAL_FILE = _BEST_RUNS_DIR / "gp_dlib_resnet_best_comb_0002.json"
BEST_INDIVIDUALS_FILE = _BEST_RUNS_DIR / "gp_dlib_resnet_best_combs_0002.json"

In [None]:
# Load distances from raw files into dataframes

# DLIB Distances ( {<pair>: {'dlib': distance}} )
# The context manager closes the file handle as soon as parsing is done
# instead of leaving it open until garbage collection.
with open(DLIB_DISTANCES_FILE, "r") as dlib_file:
    tmp_raw_data = json.load(dlib_file)

# One row per pair: the pair identifier and its 'dlib' distance
dlib_distances = pd.DataFrame({
    "pair": list(tmp_raw_data),
    "dlib_distance": [d['dlib'] for d in tmp_raw_data.values()],
})
del tmp_raw_data

# ResNET Faceparts Distances
def rows_generator(resnet_faceparts_raw_data):
    """Yield one flat row per pair, suitable for building a DataFrame.

    Args:
        resnet_faceparts_raw_data: Mapping of pair identifier to a dict of
            distances for that pair.

    Yields:
        A new dict containing the pair's distances plus a ``'pair'`` entry
        holding the pair identifier. The input dicts are left unmodified.
    """
    for pair, distances in resnet_faceparts_raw_data.items():
        # Build a fresh dict rather than updating in place, so the caller's
        # data is not mutated as a side effect of iterating.
        yield {**distances, 'pair': pair}

# The context manager closes the file handle as soon as parsing is done
# instead of leaving it open until garbage collection.
with open(RESNET_FACEPARTS_DISTANCES_FILE, "r") as faceparts_file:
    tmp_raw_data = json.load(faceparts_file)

# The generator keeps its own reference to the parsed data, so the temporary
# name can be dropped before the rows are consumed by the DataFrame constructor.
generator = rows_generator(tmp_raw_data)
del tmp_raw_data

resnet_faceparts_distances = pd.DataFrame(generator)

# Join distances into a single dataframe
distances = dlib_distances.merge(resnet_faceparts_distances, on='pair', how='outer')

del dlib_distances
del resnet_faceparts_distances

# Filter only images with "n" (from VGGFACE2)
# NOTE(review): this keeps every pair whose identifier contains the letter "n"
# anywhere - confirm that this is selective enough for the dataset.
# .copy() makes the filtered result an independent frame, so the column
# assignments below write to it unambiguously (no SettingWithCopyWarning).
distances = distances[distances.pair.apply(lambda p: "n" in p)].copy()

# Generate extra columns
# A pair identifier has the form "<img1> x <img2>"; the text before the first
# "_" of an image name is used as the person identifier.
distances["img1"] = distances.pair.apply(lambda p: p.split(" x ")[0])
distances["img2"] = distances.pair.apply(lambda p: p.split(" x ")[1])
distances["person1"] = distances.img1.apply(lambda p: p.split("_")[0])
distances["person2"] = distances.img2.apply(lambda p: p.split("_")[0])
distances["same_person"] = (distances.person1 == distances.person2).apply(lambda s: "same" if s else "different")

# Delete unnecessary columns
distances.drop(columns='pair', inplace=True)

### Genetic Programming (GP) Search

In [None]:
# ResNET columns that are left out of the search
RESNET_COLS_TO_IGNORE = ["resnet_left_ear", "resnet_right_ear", "resnet_ears", "resnet_full_face"]

# Individuals representation: every remaining column whose name contains
# 'resnet' becomes one input argument of a GP individual
resnet_cols = [
    col for col in distances.columns
    if 'resnet' in col and col not in RESNET_COLS_TO_IGNORE
]

IND_SIZE = len(resnet_cols)

In [None]:
# --- Data subset ---
SUB_SET_SIZE = 1_000_000  # Number of distances to consider

# --- Variation probabilities ---
CXPB = 0.5  # Probability with which two individuals are crossed
MUTPB = 0.25  # Probability for mutating an individual

# --- Evolution size ---
POP_SIZE = 200  # Individuals in the population
HALL_OF_FAME_SIZE = 10  # Individuals kept in the hall of fame
MAX_GENERATIONS = 30  # Generations to run

In [None]:
# Drop rows holding infinite (either sign) or missing values. Non-inplace calls
# keep every intermediate result an independent frame, which avoids modifying
# (or being warned about modifying) a slice of another frame.
cleared_distances = distances.replace([inf, -inf], np.nan).dropna()
# cleared_distances = cleared_distances[cleared_distances.dlib_distance > 0.01].reset_index(drop=True)
cleared_distances = cleared_distances[cleared_distances.img1 != cleared_distances.img2] # Remove same image pairs
cleared_distances = cleared_distances.sort_values(by='dlib_distance', ascending=True)

# TODO - Change the subsetting to be more intelligent
# Currently: the SUB_SET_SIZE rows with the smallest dlib distance
sub_df = cleared_distances.iloc[:SUB_SET_SIZE]

# Normalize distances (per-column min-max scaling)
sub_df = sub_df.loc[:, resnet_cols + ["dlib_distance"]] # Get numerical columns to normalize
col_min = sub_df.min()
# A constant column has a zero range; dividing by 1 instead maps it to 0
# rather than producing NaN from 0/0.
col_range = (sub_df.max() - col_min).replace(0, 1)
sub_df = (sub_df - col_min) / col_range

resnet_distances_norm = sub_df.loc[:, resnet_cols].copy()

In [None]:
# Define new functions
def protectedDiv(left, right):
    """Divide ``left`` by ``right``, returning 1 when the divisor is zero.

    The divisor is tested explicitly instead of catching
    ``ZeroDivisionError``: NumPy floating-point scalars (such as the values
    unpacked from an ndarray row) do not raise on division by zero, they
    return ``inf``/``nan`` and emit a RuntimeWarning, so an ``except`` clause
    alone never protects those operands.

    Args:
        left: Scalar numerator.
        right: Scalar denominator.

    Returns:
        ``left / right``, or ``1`` if ``right`` equals zero.
    """
    if right == 0:
        return 1
    return left / right

def rand101_constant():
    """Return a random integer constant drawn from -1, 0 or 1."""
    return randint(-1, 1)


# Primitive set of the GP trees: IND_SIZE input arguments combined through
# basic arithmetic operators
pset = gp.PrimitiveSet("MAIN", IND_SIZE)
pset.addPrimitive(operator.add, 2)
pset.addPrimitive(operator.sub, 2)
pset.addPrimitive(operator.mul, 2)
pset.addPrimitive(protectedDiv, 2)
pset.addPrimitive(operator.neg, 1)
# TODO - Add more operators

# pset.addPrimitive(math.cos, 1)
# pset.addPrimitive(math.sin, 1)
# The generator is a named module-level function rather than a lambda so that
# individuals containing this constant can be pickled.
pset.addEphemeralConstant("rand101", rand101_constant)

In [None]:
# Single-objective fitness; the negative weight means the value is minimized
creator.create("FitnessMin", base.Fitness, weights=(-1.0,)) # Error (minimize)
# An individual is a GP expression tree carrying the fitness defined above
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
# Random expression generator over `pset`
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=2)
# NOTE: `toolbox.expr` and `toolbox.individual` are looked up when these lines
# run, so the registrations must stay in this order.
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
# Turns an expression tree into a callable (used by the fitness evaluation)
toolbox.register("compile", gp.compile, pset=pset)

def eval_individual_error_gp(individual):
    """Return the sum of squared errors of an individual against dlib.

    The individual is compiled to a function, applied to every row of
    ``resnet_distances_norm`` and compared with ``sub_df.dlib_distance``.

    Side effect: the ``'combination'``, ``'error'`` and ``'sqr_error'``
    columns of the module-level ``sub_df`` are overwritten with the values
    computed for this individual.

    Args:
        individual: GP expression tree to evaluate.

    Returns:
        A 1-tuple (as DEAP expects) with the sum of squared errors, or
        ``(inf,)`` when the individual yields a non-finite value on any row.
    """
    func = toolbox.compile(expr=individual)

    # raw=True hands each row to the function as a plain ndarray
    combination = resnet_distances_norm.apply(lambda row: func(*row), axis=1, raw=True)
    error = combination - sub_df.dlib_distance
    sqr_error = error**2

    sub_df.loc[:, 'combination'] = combination
    sub_df.loc[:, 'error'] = error
    sub_df.loc[:, 'sqr_error'] = sqr_error

    # Rows with inf/NaN must not simply be left out of the sum: that would give
    # an individual producing non-finite output everywhere an error of 0, i.e.
    # the best possible fitness. Such individuals get the worst fitness instead.
    if not np.isfinite(sqr_error).all():
        return (inf,)

    # return (sqr_error.mean(),) # Shall return a tuple for compatibility with DEAP
    return (sqr_error.sum(),) # Shall return a tuple for compatibility with DEAP

# Evaluation, selection and variation operators
toolbox.register("evaluate", eval_individual_error_gp)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

# Both variation operators are limited to trees of height 17 at most
for variation_op in ("mate", "mutate"):
    toolbox.decorate(variation_op, gp.staticLimit(key=operator.attrgetter("height"), max_value=17))

# Per-generation statistics over fitness values and tree sizes
stats_fit = tools.Statistics(operator.attrgetter("fitness.values"))
stats_size = tools.Statistics(len)
mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
for stat_name, stat_func in (("avg", np.mean), ("std", np.std), ("min", np.min), ("max", np.max)):
    mstats.register(stat_name, stat_func)

# Fixed seed, set right before the first random draw (population creation)
seed(318)

pop = toolbox.population(n=POP_SIZE)
hof = tools.HallOfFame(HALL_OF_FAME_SIZE)

start_time = time()

pop, log = algorithms.eaSimple(
    pop,
    toolbox,
    cxpb=CXPB,
    mutpb=MUTPB,
    ngen=MAX_GENERATIONS,
    stats=mstats,
    halloffame=hof,
    verbose=True,
)

end_time = time()

# Report the elapsed whole minutes on stdout and through the messaging helper
finished_message = f"GP finished in {int((end_time - start_time)/60)} minutes"
print(finished_message)
_ = send_simple_message(finished_message)

In [None]:
# First entry of the hall of fame
best = hof[0]
# Rebuild it as a plain expression tree
best_tree = gp.PrimitiveTree(best)
# Last expression of the cell: the tree rendered as a string
str(best_tree)

In [None]:
# First fitness value of every individual in the final population, plotted
# against the individual's position in the population
final_pop_scores = np.array([individual.fitness.values[0] for individual in pop])
pop_fitness = pd.DataFrame({"pop_fitness": final_pop_scores})
sns.lineplot(data=pop_fitness, x=pop_fitness.index, y='pop_fitness')

In [None]:
# First fitness value of every hall-of-fame individual, plotted against the
# individual's position in the hall of fame
hall_of_fame_scores = np.array([individual.fitness.values[0] for individual in hof])
hof_fitness = pd.DataFrame({"hof_fitness": hall_of_fame_scores})
sns.lineplot(data=hof_fitness, x=hof_fitness.index, y='hof_fitness')