In [58]:
from mtcnn import MTCNN
from keras_vggface.vggface import VGGFace
from keras_vggface.utils import preprocess_input
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import cv2 as cv
import tensorflow as tf
from tqdm import tqdm
from pathlib import Path
import os
import re

# Raise TensorFlow's (v1-compat) logger threshold to ERROR so that lower-severity
# messages do not clutter the cell outputs.
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [59]:
# Folder / file names used by the cells below.
test_folder = 'test'  # presumably the images to be recognised -- TODO confirm against the recognition loop
mark_folder = 'mark'  # reference images; read in the "Pull known features" step
dump_file = 'predictions.csv'  # destination passed to dump_predictions

In [60]:
# Manipulations with files

def get_files_from_folder(folder):
    """Return the paths of all ``.jpg`` files directly inside ``folder``.

    The extension check is case-insensitive and anchored to the END of the
    file name, so ``photo.JPG`` is accepted while ``photo.jpg.txt`` is not.
    Paths are returned in sorted file-name order so that repeated runs see
    the files in the same sequence (``os.listdir`` order is arbitrary).

    Parameters
    ----------
    folder : str
        Directory to scan (not recursive).

    Returns
    -------
    list of str
        ``os.path.join(folder, name)`` for every matching entry.
    """
    return [os.path.join(folder, name)
            for name in sorted(os.listdir(folder))
            # fullmatch: the name must END with ".jpg", not merely contain it.
            if re.fullmatch(r'.*\.jpg', name, flags=re.I)]


def get_class_img_from_file(file):
    """Load one image and derive its class label from the file name.

    The label is the integer formed by the part of the file name that
    precedes the first ``.`` (e.g. ``12.3.jpg`` -> ``12``).

    Returns
    -------
    tuple
        ``(label, image)`` where ``image`` is whatever ``plt.imread`` returns.

    Raises
    ------
    ValueError
        If the leading part of the file name is not an integer.
    """
    image = plt.imread(file)
    label_text, _, _ = Path(file).stem.partition('.')
    return int(label_text), image


def get_classes_imgs_from_folder(folder):
    """Load every image found by ``get_files_from_folder(folder)``.

    Returns
    -------
    tuple
        ``(classes, imgs)`` -- two equally long tuples holding, position by
        position, the label and the image of each file.

    Raises
    ------
    ValueError
        If the folder yields no files (there is nothing to unpack).
    """
    labelled = [get_class_img_from_file(path)
                for path in get_files_from_folder(folder)]
    # Transpose [(cls, img), ...] into (cls, ...), (img, ...).
    classes, imgs = zip(*labelled)
    return classes, imgs
            
            
def get_batched_classes_imgs_from_folder(folder, batch_size):
    """Lazily load the images of ``folder`` in batches of ``batch_size``.

    Files are taken in the order returned by ``get_files_from_folder``.
    The last batch is shorter when the number of files is not a multiple of
    ``batch_size``; an empty folder yields nothing.

    Parameters
    ----------
    folder : str
        Directory containing the images.
    batch_size : int
        Maximum number of images per batch; must be at least 1.

    Yields
    ------
    tuple
        ``(classes, imgs)`` -- two tuples with the labels and the images of
        one batch.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1 (raised when iteration starts).
    """
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    files = get_files_from_folder(folder)
    # Plain slicing instead of a reshape: works for any number of files.
    for start in range(0, len(files), batch_size):
        fs_batch = files[start:start + batch_size]
        classes, imgs = zip(*[get_class_img_from_file(f) for f in fs_batch])
        yield classes, imgs
            
            
def dump_predictions(predicted_classes, questioned_classes, dump_file):
    """Append one batch of predictions to a CSV file.

    Each row holds the predicted class and the corresponding questioned
    (file-name) class. The file is created with a ``predicted,questioned``
    header row on first use; later calls only append data rows, so the file
    never has to be read back.

    Parameters
    ----------
    predicted_classes : sequence
        Predicted label for each image of the batch.
    questioned_classes : sequence
        Label taken from each image's file name, in the same order.
    dump_file : str
        Path of the CSV file to create or extend.

    Raises
    ------
    ValueError
        If the two sequences differ in length.
    """
    df_ext = pd.DataFrame({
        'predicted': list(predicted_classes),
        'questioned': list(questioned_classes),
    })
    # Emit the header only when starting a new (or still empty) file.
    is_new_file = not os.path.exists(dump_file) or os.path.getsize(dump_file) == 0
    df_ext.to_csv(dump_file, mode='a', header=is_new_file, index=False)

In [61]:
# Face recognition prerequisites

# Pretrained model for face detection
mtcnn = MTCNN()
# Pretrained model (VGGFace2) for face recognition
# Spatial size every face crop is resized to before being fed to the network;
# also passed as `resize_to` in the recognition cell.
face_spatial_shape = (224, 224)
# input_shape is the spatial size plus 3 channels, i.e. (224, 224, 3).
# NOTE(review): pooling='avg' presumably makes predict() return one pooled
# feature vector per face -- confirm against the keras_vggface documentation.
vggface2_net = VGGFace(model='resnet50', input_shape=(*face_spatial_shape, 3), pooling='avg')


def get_face(detector, img, resize_to=(224, 224)):
    """Crop the first detected face out of ``img`` and resize it.

    The box of the first entry returned by ``detector.detect_faces`` is
    used. The detector may report a box that starts at negative coordinates;
    the far edge is therefore computed from the ORIGINAL origin and only then
    is the box clamped to the image, so the crop is not shifted.

    The whole image is used instead of a crop when no face is detected or
    when the clamped box is empty.

    Parameters
    ----------
    detector : object
        Anything exposing ``detect_faces(img)`` that returns a list of dicts
        with a ``'box'`` entry of the form ``(x, y, width, height)``.
    img : numpy.ndarray
        Image to search.
    resize_to : tuple of int
        Passed unchanged as ``dsize`` to ``cv.resize`` (which OpenCV reads as
        ``(width, height)``).

    Returns
    -------
    numpy.ndarray
        The resized face (or resized full image as a fallback).
    """
    try:
        x, y, width, height = detector.detect_faces(img)[0]['box']
    except IndexError:
        # No faces found: fall back to the whole image.
        return cv.resize(img, resize_to)

    # Far edges first (from the unclamped origin), then clamp everything at 0.
    x2, y2 = max(0, x + width), max(0, y + height)
    x1, y1 = max(0, x), max(0, y)
    face = img[y1:y2, x1:x2]
    if face.size == 0:
        # Degenerate box (zero area or fully outside the image).
        face = img
    return cv.resize(face, resize_to)


def get_faces(detector, imgs, resize_to=(224, 224)):
    """Apply ``get_face`` to every image, showing a tqdm progress bar.

    Returns a list with one resized face per input image, in input order.
    """
    return [get_face(detector, img, resize_to=resize_to) for img in tqdm(imgs)]


def get_encodings(model, faces):
    """Compute the model's encodings for a collection of face images.

    Parameters
    ----------
    model : object
        Anything exposing ``predict(batch)``.
    faces : sequence of numpy.ndarray
        Equally shaped face images (e.g. the list returned by ``get_faces``).

    Returns
    -------
    The value of ``model.predict`` for the preprocessed batch.
    """
    # Stack into one float array first: preprocess_input subtracts float
    # channel means in place, which cannot be done on integer (uint8) pixels.
    batch = np.asarray(faces, dtype=np.float32)
    p_faces = preprocess_input(batch, version=2)
    return model.predict(p_faces)


def get_class(questioned_enc, known_encs, known_classes):
    """Return the class of the known encoding nearest to ``questioned_enc``.

    Distance is the Euclidean norm taken along axis 1 of
    ``known_encs - questioned_enc``; the label at the position of the
    smallest distance is returned from ``known_classes``.
    """
    offsets = known_encs - questioned_enc[np.newaxis]
    distances = np.linalg.norm(offsets, axis=1)
    nearest = np.argmin(np.squeeze(distances))
    return known_classes[nearest]


def get_classes(questioned_encs, known_encs, known_classes):
    """Classify each questioned encoding with ``get_class``.

    Returns a list of labels, one per entry of ``questioned_encs``, in order.
    """
    predicted = []
    for q_enc in questioned_encs:
        predicted.append(get_class(q_enc, known_encs, known_classes))
    return predicted

In [None]:
# Solve the actual problem

# Pull known features: labels, face crops and encodings of the reference images
known_classes, known_imgs = get_classes_imgs_from_folder(mark_folder)
known_faces = get_faces(mtcnn, known_imgs, resize_to=face_spatial_shape)
known_encodings = get_encodings(vggface2_net, known_faces)

# Recognition: process the test images batch by batch and store each batch's
# predictions right away, so partial results survive an interrupted run.
batch_size = 20
for q_classes, q_imgs in get_batched_classes_imgs_from_folder(test_folder, batch_size=batch_size):
    q_faces = get_faces(mtcnn, q_imgs, resize_to=face_spatial_shape)
    q_encodings = get_encodings(vggface2_net, q_faces)
    # Nearest known encoding decides the predicted class.
    p_classes = get_classes(q_encodings, known_encodings, known_classes)
    dump_predictions(p_classes, q_classes, dump_file)

 40%|███▉      | 198/500 [05:45<09:49,  1.95s/it]