In [None]:
import os
import cv2
import shutil
import insightface
import numpy as np
from tqdm import tqdm
import matplotlib as mpl
from sklearn import preprocessing
import matplotlib.pyplot as plt

In [None]:
def decoder(image_name):
    """Split a '#'-separated image name into its class and video parts.

    Only the first two '#'-delimited fields are used; anything after the
    second field is ignored.

    Args:
        image_name: Name of the form '<class>#<video>[#...]'.

    Returns:
        A (class_name, video_name) tuple.

    Raises:
        IndexError: If image_name contains no '#'.
    """
    fields = image_name.split('#')
    return fields[0], fields[1]
# Lookup tables built from the index file:
#   video2index maps a video name to the index string found on its line;
#   index2video collects the video name of every indexed line, in file order.
video2index = {}
index2video = []
index_file = 'index.txt'
with open(index_file, 'r') as f:
    lines = f.readlines()
for line in lines:
    # split() with no argument strips the trailing newline, so it cannot end
    # up inside the video name, and it tolerates repeated whitespace.
    fields = line.split()
    if len(fields) < 2:
        # Blank or truncated line: nothing to index.
        continue
    index, image = fields[0], fields[1]
    _, video = decoder(image)
    video2index[video] = index
    index2video.append(video)

In [None]:
# --- Detection / matching settings ---
gpu_id = 0
# Squared-distance bound that register() passes to feature_compare().
threshold = 1.44
det_thresh = 0.5
det_size = (640, 640)

# --- Face database folder: wiped and recreated empty every time this cell runs ---
face_db = 'face_db'
if os.path.isdir(face_db):
    shutil.rmtree(face_db)
os.makedirs(face_db)

# --- Face analysis model ---
model = insightface.app.FaceAnalysis(
    root='./',
    allowed_modules=None,
    providers=['CPUExecutionProvider'],
)
model.prepare(ctx_id=gpu_id, det_thresh=det_thresh, det_size=det_size)

# In-memory gallery of known faces; each entry is a dict with
# "user_name" and "feature" keys.
faces_embedding = []

In [None]:
def feature_compare(feature1, feature2, threshold):
    """Tell whether two feature rows are closer than a squared-distance bound.

    The squared Euclidean distance is summed along axis 1, so both inputs
    must be 2-D; the comparison is only well defined when that produces a
    single distance (one row).

    Args:
        feature1: First feature array.
        feature2: Second feature array.
        threshold: Exclusive upper bound on the squared distance.

    Returns:
        True if the squared distance is strictly below threshold, else False.
    """
    squared_distance = np.square(np.subtract(feature1, feature2)).sum(axis=1)
    return bool(squared_distance < threshold)

In [None]:
def register(image, user_name):
    """Add the first face detected in an image to the gallery as user_name.

    On success the image is also written to the face database folder as
    '<user_name>.png'.

    Args:
        image: Image array passed to the face model.
        user_name: Label stored with the face; also used as the file stem.

    Returns:
        'no face' if nothing was detected, 'already exsit' if the face matches
        an existing gallery entry under the global threshold, otherwise
        "success".
    """
    detected = model.get(image)
    if len(detected) == 0:
        return 'no face'

    # Only the first detection is used; its embedding is normalised as a single row.
    embedding = preprocessing.normalize(np.array(detected[0].embedding).reshape((1, -1)))

    # Every gallery entry is compared (no early exit).
    matches = [
        feature_compare(embedding, known["feature"], threshold)
        for known in faces_embedding
    ]
    if any(matches):
        return 'already exsit'

    target_path = os.path.join(face_db, '%s.png' % user_name)
    # imencode + tofile instead of cv2.imwrite: the encoded buffer is written by NumPy.
    cv2.imencode('.png', image)[1].tofile(target_path)
    faces_embedding.append({"user_name": user_name, "feature": embedding})
    return "success"

In [None]:
def recognition(image):
    """Label every detected face with the name of its nearest gallery entry.

    No distance threshold is applied: whenever the gallery is non-empty the
    closest entry's name is returned, and 'unknown' is used only when the
    gallery is empty.

    Args:
        image: Image array passed to the face model.

    Returns:
        A list with one name per detected face, in detection order.
    """
    labels = []
    for face in model.get(image):
        query = preprocessing.normalize(np.array(face.embedding).reshape((1, -1)))
        # Squared Euclidean distance to each gallery entry, keyed by user name;
        # a later entry with the same name overwrites the earlier distance.
        distances = {
            known["user_name"]: np.sum(np.square(np.subtract(query, known["feature"])), 1)[0]
            for known in faces_embedding
        }
        ranked = sorted(distances, key=distances.get)
        labels.append(ranked[0] if ranked else 'unknown')
    return labels

In [None]:
def detect(image):
    """Run the face model on an image and return the attributes of each face.

    Args:
        image: Image array passed to the face model.

    Returns:
        A list of dicts, one per detected face, with keys "bbox", "kps",
        "landmark_3d_68", "landmark_2d_106" and "pose" (cast to int32 and
        converted to plain lists), "age", "gender" ('Female' when the model
        reports 0, otherwise 'Male') and "embedding" (normalised, one row).
    """
    def to_int_list(values):
        # Cast to int32 and convert to (nested) Python lists.
        return np.array(values).astype(np.int32).tolist()

    return [
        {
            "bbox": to_int_list(face.bbox),
            "kps": to_int_list(face.kps),
            "landmark_3d_68": to_int_list(face.landmark_3d_68),
            "landmark_2d_106": to_int_list(face.landmark_2d_106),
            "pose": to_int_list(face.pose),
            "age": face.age,
            "gender": 'Female' if face.gender == 0 else 'Male',
            "embedding": preprocessing.normalize(np.array(face.embedding).reshape((1, -1))),
        }
        for face in model.get(image)
    ]

In [None]:
def load_faces(face_db_path):
    """Load the images under face_db_path into the global faces_embedding gallery.

    The folder is created if it does not exist. For each file, the name
    without its extension becomes the user name and the first face detected
    in the image supplies the (normalised) embedding.

    Skipped without error: '.DS_Store', empty or undecodable files, images in
    which no face is detected, and user names already present in the gallery.
    The last rule makes repeated calls idempotent instead of appending
    duplicate entries.

    Args:
        face_db_path: Directory that is walked recursively for face images.
    """
    if not os.path.exists(face_db_path):
        os.makedirs(face_db_path)
    known_names = {entry["user_name"] for entry in faces_embedding}
    for root, dirs, files in os.walk(face_db_path):
        for file in files:
            if file == ".DS_Store":
                continue
            # splitext rather than split('.') so that a user name containing
            # dots survives the round trip through register()'s
            # '<user_name>.png' file name.
            user_name = os.path.splitext(file)[0]
            if user_name in known_names:
                continue
            raw = np.fromfile(os.path.join(root, file), dtype=np.uint8)
            if raw.size == 0:
                # imdecode rejects an empty buffer.
                continue
            input_image = cv2.imdecode(raw, 1)
            if input_image is None:
                # Not a decodable image.
                continue
            faces = model.get(input_image)
            if len(faces) < 1:
                # Nothing to enrol from this image.
                continue
            embedding = np.array(faces[0].embedding).reshape((1, -1))
            embedding = preprocessing.normalize(embedding)
            faces_embedding.append({
                "user_name": user_name,
                "feature": embedding
            })
            known_names.add(user_name)

# Seed the in-memory gallery from any images already stored in the face database folder.
load_faces('./face_db/')

In [None]:
# Enrol gallery faces from the resolution-240 test folder: every picture is
# offered to register() under the video name decoded from its file name.
register_folder = 'test/video_test_nearest_240/'
load_faces('./face_db/')
for video_folder in os.listdir(register_folder):
    video_path = os.path.join(register_folder, video_folder)
    if not os.path.isdir(video_path):
        # Covers '.DS_Store' and any other stray file at the top level.
        continue
    for picture_name in os.listdir(video_path):
        raw = np.fromfile(os.path.join(video_path, picture_name), dtype=np.uint8)
        # imdecode rejects an empty buffer and returns None for non-images.
        img = cv2.imdecode(raw, -1) if raw.size else None
        if img is None:
            continue
        _, video_name = decoder(picture_name)
        # register() runs the model itself; no separate recognition pass is needed.
        register_result = register(img, user_name=video_name)

In [None]:
# Run recognition over each resolution-specific test folder and write one
# result file per resolution. Each output line is '<picture_name>#' followed by
# '<predicted_name>@' for every face detected in that picture.
resolutions = [15, 20, 30, 50, 100, 160, 240]
load_faces('./face_db/')

result_folder = './result/'
# open() does not create missing directories.
os.makedirs(result_folder, exist_ok=True)

for resolution in tqdm(resolutions):
    test_folder = 'test/video_test_nearest_' + str(resolution)
    total_results = {}
    for video_folder in os.listdir(test_folder):
        video_path = os.path.join(test_folder, video_folder)
        if not os.path.isdir(video_path):
            # Covers '.DS_Store' and any other stray file at the top level.
            continue
        for picture_name in os.listdir(video_path):
            raw = np.fromfile(os.path.join(video_path, picture_name), dtype=np.uint8)
            # imdecode rejects an empty buffer and returns None for non-images.
            img = cv2.imdecode(raw, -1) if raw.size else None
            if img is None:
                continue
            total_results[picture_name] = recognition(img)
    with open(os.path.join(result_folder, str(resolution) + '.txt'), 'w') as f:
        for picture_name, predicted_names in total_results.items():
            f.write(f'{picture_name}#')
            f.write(''.join(f'{face}@' for face in predicted_names))
            f.write('\n')