#### Module imports

In [1]:
import os
import random

import numpy                as np
import sqlalchemy           as sqla
import matplotlib.image     as mpimg
import matplotlib.pyplot    as plt
import api.global_variables as glb

from re                     import compile, IGNORECASE
from tqdm                   import tqdm
from uuid                   import UUID
from IFR.api                import *
from IFR.classes            import *
from IFR.functions          import *
from sklearn.cluster        import DBSCAN

#### Parameters

In [2]:
# ----- Behaviour switches -----
build_n_save = False        # gates the "build and save detectors & verifiers" cell
create_database = False     # create database
save_new_database = True    # save database
align = True                # perform face alignment

# ----- Saved model directories -----
saved_detectors = 'api/saved_models/detectors'   # face detectors
saved_verifiers = 'api/saved_models/verifiers'   # face verifiers

# ----- Data locations (relative to the project root) -----
SQLITE_DB_FP = 'api/data/database/loki_test.sqlite'   # SQLite database file
img_path = 'api/data/img'                             # image directory to be used

# ----- Models to load and models to use -----
load_detectors = ['retinaface']
load_verifiers = ['ArcFace']
use_detector = 'retinaface'
use_verifier = 'ArcFace'

# ----- Embedding options -----
normalization = 'base'
metric = 'cosine'

# ----- DBSCAN clustering -----
dbscan_eps = 0.5
dbscan_min_samples = 3

#### Initialization

In [3]:
# Optional step, only executed when 'build_n_save' is True: builds every
# known face detector / verifier and saves each one to its directory
if build_n_save:
    # Names of all face detectors and face verifiers
    detector_names = ['opencv', 'ssd', 'mtcnn', 'retinaface']
    verifier_names = ['VGG-Face', 'Facenet', 'Facenet512', 'OpenFace',
                      'DeepFace', 'DeepID' , 'ArcFace']

    # Builds the detectors first, then the verifiers
    detectors = batch_build_detectors(detector_names, show_prog_bar=True,
                                      verbose=False)
    verifiers = batch_build_verifiers(verifier_names, show_prog_bar=True,
                                      verbose=False)

    # Reports how many models of each kind were built
    print('Number of detectors built:', len(detectors))
    print('Number of verifiers built:', len(verifiers), '\n')

    # Saves the detectors and then the verifiers, each group into its own
    # directory, printing a blank line after each group
    for _models, _save_dir in ((detectors, saved_detectors),
                               (verifiers, saved_verifiers)):
        for name, obj in _models.items():
            status = save_built_model(name, obj, _save_dir, overwrite=True,
                                      verbose=True)
        print('')

In [4]:
# ------------------------------- Face detectors -------------------------------

# Obtains the face detectors named in 'load_detectors', using the
# 'saved_detectors' directory, and prints the resulting object
print('  -> Loading / creating face detectors:')
detector_models = init_load_detectors(load_detectors, saved_detectors)
print('\n> Detectors:')
print(detector_models)

  -> Loading / creating face detectors:
[load_built_model] Loading model retinaface.pickle: failed! Reason: retinaface.pickle does not exist in api/saved_models/detectors
[build_detector] Building retinaface: Metal device set to: Apple M1 Max

systemMemory: 32.00 GB
maxCacheSize: 10.67 GB

success!


> Detectors:
{'retinaface': <tensorflow.python.eager.def_function.Function object at 0x2c0bd4a30>}


In [5]:
# ------------------------------- Face verifiers -------------------------------

# Obtains the face verifiers named in 'load_verifiers', using the
# 'saved_verifiers' directory, and prints the resulting object
print('  -> Loading / creating face verifiers:')
verifier_models = init_load_verifiers(load_verifiers, saved_verifiers)
print('\n> Verifiers:')
print(verifier_models)

  -> Loading / creating face verifiers:
[load_built_model] Loading model ArcFace.pickle: success!

> Verifiers:
{'ArcFace': <keras.engine.functional.Functional object at 0x2e9473e80>}


#### Database

In [6]:
# Database engine: load_database() is given the SQLite file path and its result
# is stored globally. A None result is treated as a failure.
print('  -> Loading / creating database: ', end='')
glb.sqla_engine = load_database(SQLITE_DB_FP)
if glb.sqla_engine is None:
    raise AssertionError('Failed to load or create database!')
print('success!\n')

# Session: start_session() is given the engine obtained above and its result is
# stored globally. A None result is treated as a failure.
print('  -> Loading / creating session: ', end='')
glb.sqla_session = start_session(glb.sqla_engine)
if glb.sqla_session is None:
    raise AssertionError('Failed to create session!')
print('success!\n')

  -> Loading / creating database: success!

  -> Loading / creating session: success!



#### Staging area

In [7]:
def process_faces_from_dir(img_dir, detector_models, verifier_models, session,
                        detector_name='retinaface', verifier_names=['ArcFace'],
                        normalization='base', align=True, verbose=False):
    """
    Detects the faces in every image of a directory, calculates their
    embeddings and creates one FaceRep record per detected face. Each record is
    added to 'session' (this function does NOT commit the session).

    NOTE: besides its arguments, this function reads the following notebook
    level globals at call time, so they must be defined before calling it:
        - show_prog_bar : if False, the progress bar is disabled
        - auto_grouping : if True, the records are clustered with DBSCAN
        - eps, min_samples, metric : forwarded to DBSCAN (only used when
                                     'auto_grouping' is True)

    Inputs:
        1. img_dir         - directory forwarded to get_image_paths()
        2. detector_models - forwarded to do_face_detection()
        3. verifier_models - forwarded to calc_embeddings()
        4. session         - session object to which each record is added
        5. detector_name   - name of the face detector to use
        6. verifier_names  - names of the face verifiers to use. When auto
                             grouping is enabled, the embedding stored under
                             the FIRST name is the one used for clustering
        7. normalization   - forwarded to calc_embeddings()
        8. align           - forwarded to do_face_detection()
        9. verbose         - forwarded to do_face_detection()

    Output:
        List with the FaceRep records created, following the sorted order of
        the image paths. Records start with group_no = -1 ("no group"); when
        auto grouping is enabled, clustered records get their DBSCAN label.

    Raises:
        AssertionError if no image is found in 'img_dir'.
    """
    # Initializes records (which will be a list of FaceReps)
    records = []

    # Assuming img_dir is a directory containing images
    img_paths = get_image_paths(img_dir)
    img_paths.sort()

    # No images found - for now just raise an assertion error
    if len(img_paths) == 0:
        raise AssertionError('No images in the directory specified')

    # Creates the progress bar directly over the image paths
    pbar = tqdm(img_paths, desc='Processing face images',
                disable=(not show_prog_bar))

    # Embeddings used for clustering (only filled if auto grouping is True)
    embds = []

    # Loops through each image in the 'img_dir' directory
    for img_path in pbar:
        # Detects faces
        output = do_face_detection(img_path, detector_models=detector_models,
                                    detector_name=detector_name, align=align,
                                    verbose=verbose)

        # Calculates the deep neural embeddings for each face image in outputs
        embeddings = calc_embeddings(output['faces'], verifier_models,
                                     verifier_names=verifier_names,
                                     normalization=normalization)

        # File name of the current image (path without its directories)
        img_name = os.path.basename(img_path)

        # Loops through each (region, embedding) pair and creates a record
        # (FaceRep object). 'id' is not set here; 'image_name' and 'image_fp'
        # are currently not being used in this approach, so they are left empty
        for region, cur_embds in zip(output['regions'], embeddings):
            record = FaceRep(image_name_orig=img_name, image_name='',
                             image_fp_orig=img_path, image_fp='',
                             group_no=-1, region=region,
                             embeddings=cur_embds)

            # The record is added ONLY to the session provided by the caller.
            # Adding it to a second (global) session as well fails if the two
            # sessions are different objects.
            session.add(record)
            records.append(record)

            # If auto grouping is True, then store each calculated embedding
            if auto_grouping:
                embds.append(cur_embds[verifier_names[0]])

    # Clusters Representations together using the DBSCAN algorithm. This is
    # skipped if no face was detected, as there is nothing to cluster.
    if auto_grouping and len(embds) > 0:
        results = DBSCAN(eps=eps, min_samples=min_samples,
                         metric=metric).fit(embds)

        # Updates the 'group_no' attribute of each clustered record. A label of
        # -1 is skipped as it is already the default value ("no group"). Labels
        # are numpy integers, so they are converted to plain Python ints before
        # being stored in the record.
        for record, lbl in zip(records, results.labels_):
            if lbl != -1:
                record.group_no = int(lbl)

    # Return representation database
    return records

# ------------------------------------------------------------------------------

def process_faces_from_dir2(img_dir, detector_models, verifier_models,
                        detector_name='retinaface', verifier_names=['ArcFace'],
                        normalization='base', align=True, verbose=False):
    """
    Detects the faces in every image of a directory, calculates their
    embeddings and creates one plain tuple per detected face. No database
    object is created or touched by this function.

    Each record is a tuple with the following layout:
        (image_name_orig, image_name, image_fp_orig, image_fp, group_no,
         region, embeddings)

    NOTE: besides its arguments, this function reads the following notebook
    level globals at call time, so they must be defined before calling it:
        - show_prog_bar : if False, the progress bar is disabled
        - auto_grouping : if True, the records are clustered with DBSCAN
        - eps, min_samples, metric : forwarded to DBSCAN (only used when
                                     'auto_grouping' is True)

    Inputs:
        1. img_dir         - directory forwarded to get_image_paths()
        2. detector_models - forwarded to do_face_detection()
        3. verifier_models - forwarded to calc_embeddings()
        4. detector_name   - name of the face detector to use
        5. verifier_names  - names of the face verifiers to use. When auto
                             grouping is enabled, the embedding stored under
                             the FIRST name is the one used for clustering
        6. normalization   - forwarded to calc_embeddings()
        7. align           - forwarded to do_face_detection()
        8. verbose         - forwarded to do_face_detection()

    Output:
        List of record tuples, following the sorted order of the image paths.
        Records start with group_no = -1 ("no group"); when auto grouping is
        enabled, clustered records get their DBSCAN label.

    Raises:
        AssertionError if no image is found in 'img_dir'.
    """
    # Position of 'group_no' in each record tuple
    GROUP_NO_IDX = 4

    # Initializes records (which will be a list of tuples)
    records = []

    # Assuming img_dir is a directory containing images
    img_paths = get_image_paths(img_dir)
    img_paths.sort()

    # No images found - for now just raise an assertion error
    if len(img_paths) == 0:
        raise AssertionError('No images in the directory specified')

    # Creates the progress bar directly over the image paths
    pbar = tqdm(img_paths, desc='Processing face images',
                disable=(not show_prog_bar))

    # Embeddings used for clustering (only filled if auto grouping is True)
    embds = []

    # Loops through each image in the 'img_dir' directory
    for img_path in pbar:
        # Detects faces
        output = do_face_detection(img_path, detector_models=detector_models,
                                    detector_name=detector_name, align=align,
                                    verbose=verbose)

        # Calculates the deep neural embeddings for each face image in outputs
        embeddings = calc_embeddings(output['faces'], verifier_models,
                                     verifier_names=verifier_names,
                                     normalization=normalization)

        # File name of the current image (path without its directories)
        img_name = os.path.basename(img_path)

        # Loops through each (region, embedding) pair and creates a record.
        # 'image_name' and 'image_fp' are currently not being used in this
        # approach, so they are left empty
        for region, cur_embds in zip(output['regions'], embeddings):
            records.append((img_name, '', img_path, '', -1, region,
                            cur_embds))

            # If auto grouping is True, then store each calculated embedding
            if auto_grouping:
                embds.append(cur_embds[verifier_names[0]])

    # Clusters Representations together using the DBSCAN algorithm. This is
    # skipped if no face was detected, as there is nothing to cluster.
    if auto_grouping and len(embds) > 0:
        results = DBSCAN(eps=eps, min_samples=min_samples,
                         metric=metric).fit(embds)

        # Updates the group number of each clustered record. A label of -1 is
        # skipped as it is already the default value ("no group"). Records are
        # tuples (immutable), so a new tuple is built with the group number
        # replaced. Labels are numpy integers, so they are converted to plain
        # Python ints.
        for i, lbl in enumerate(results.labels_):
            if lbl != -1:
                rec = records[i]
                records[i] = (rec[:GROUP_NO_IDX] + (int(lbl),)
                              + rec[GROUP_NO_IDX + 1:])

    # Return representation database
    return records

# ------------------------------------------------------------------------------

# Settings used by the staging-area cells. 'align', 'normalization' and
# 'metric' are assigned again here with the same values as in the parameters
# cell.

# --- Input and models ---
test_dir = 'api/data/img'
detector_name = 'retinaface'
verifier_names = ['ArcFace']

# --- Processing options ---
align = True
normalization = 'base'
show_prog_bar = False
verbose = True

# --- Currently empty placeholders ---
tags = []
uids = []

# --- Clustering: read as globals by process_faces_from_dir / _dir2 ---
# NOTE(review): 'min_samples' is 2 here while 'dbscan_min_samples' in the
# parameters cell is 3 - confirm which value is intended.
auto_grouping = True
eps = 0.5
min_samples = 2
metric = 'cosine'

In [8]:
# Approach 1: uses 'process_faces_from_dir', which creates the FaceRep records
# and adds them to the session it is given. This cell only has to commit.
do_approach_1 = True
if do_approach_1:
    try:
        records = process_faces_from_dir(test_dir, detector_models,
                        verifier_models, glb.sqla_session,
                        detector_name=detector_name, align=align,
                        verifier_names=verifier_names,
                        normalization=normalization, verbose=verbose)
        glb.sqla_session.commit()
    except Exception:
        # Something failed while processing the images or committing: discards
        # the pending records so that the shared global session is not left
        # half-populated for the next cells, then re-raises the error
        glb.sqla_session.rollback()
        raise

[do_face_detection] Loading face detector: success!
[do_face_detection] Detecting faces: success!
[do_face_detection] Loading face detector: success!
[do_face_detection] Detecting faces: success!


In [9]:
# Approach 2: uses 'process_faces_from_dir2', which returns plain tuples. The
# FaceRep objects are created here, added to the global session and committed.
do_approach_2 = False
if do_approach_2:
    records = process_faces_from_dir2(test_dir, detector_models,
                    verifier_models, detector_name=detector_name, align=align,
                    verifier_names=verifier_names, normalization=normalization,
                    verbose=verbose)

    try:
        # Each record is a 7-element tuple, unpacked here in its stored order
        reps = [FaceRep(image_name_orig = name_orig, image_name = name,
                        image_fp_orig   = fp_orig,   image_fp   = fp,
                        group_no        = group_no,  region     = region,
                        embeddings      = embds)
                for (name_orig, name, fp_orig, fp, group_no, region, embds)
                in records]

        glb.sqla_session.add_all(reps)
        glb.sqla_session.commit()
    except Exception:
        # Something failed while creating / committing the records: discards
        # the pending changes so that the shared global session is not left
        # half-populated for the next cells, then re-raises the error
        glb.sqla_session.rollback()
        raise