## Face Recognition from Video

In [None]:
import insightface
import cv2
import dlib
from imutils.video import FileVideoStream
import numpy as np
import scipy
from sklearn.metrics.pairwise import euclidean_distances
import time

In [None]:
import mxnet as mx 
def gpu_device(gpu_number=0):
    """Return the MXNet GPU context ``gpu_number`` if it is usable, else None.

    Usability is probed by allocating a tiny array on that GPU; an
    ``mx.MXNetError`` raised by the allocation is taken to mean "no such GPU".
    """
    try:
        mx.nd.array([1, 2, 3], ctx=mx.gpu(gpu_number))
    except mx.MXNetError:
        return None
    else:
        return mx.gpu(gpu_number)

In [None]:
# Context id later passed to model.prepare(): 0 when the GPU probe succeeds,
# otherwise -1 (presumably the CPU fallback -- confirm against insightface).
CTX = 0 if gpu_device() else -1

### Loading trained model

In [None]:
# Face analysis model, prepared on the context selected in CTX.
model = insightface.app.FaceAnalysis()
model.prepare(ctx_id=CTX)

# np.load on an .npz archive returns a lazily-read NpzFile that keeps the
# underlying file open; read both arrays inside a context manager so the
# file handle is closed as soon as they are in memory.
with np.load('assets/embeddings.npz') as embeddings:
    KNOWN_EMBEDDINGS = embeddings['KNOWN_EMBEDDINGS']
    KNOWN_SUBJECTS = embeddings['KNOWN_SUBJECTS']

### Utility functions

In [None]:
def anonymize_face(image, face_bbox):
    """Blur the face region of ``image`` in place and return the image.

    Args:
        image: Frame as a NumPy array indexed ``[row, column, ...]``.
        face_bbox: ``[x1, y1, x2, y2]`` box in pixel coordinates. It may
            extend beyond the frame; it is read but never modified.

    Returns:
        The same ``image`` object. The part of the box that lies inside the
        frame is replaced by its Gaussian-blurred version; if no part of the
        box is inside the frame the image is returned untouched.
    """
    height, width = image.shape[:2]

    # Face partially (or fully) out of frame: clamp *every* corner to the
    # frame. Clamping only one side would leave a negative end index for a
    # box that is entirely left of / above the frame, and a negative slice
    # index wraps around and would blur almost the whole image.
    x1 = min(max(int(face_bbox[0]), 0), width)
    y1 = min(max(int(face_bbox[1]), 0), height)
    x2 = min(max(int(face_bbox[2]), 0), width)
    y2 = min(max(int(face_bbox[3]), 0), height)

    # Nothing of the box is visible, so there is nothing to blur (and
    # GaussianBlur must not be handed an empty region).
    if x2 <= x1 or y2 <= y1:
        return image

    image[y1:y2, x1:x2] = cv2.GaussianBlur(image[y1:y2, x1:x2], (101, 101), 0)

    return image

In [4]:
def delete_low_quality_trackers(frame, quality_threshold=5):
    """Advance every tracker to ``frame`` and drop the ones that score too low.

    Each tracker in the global ``face_trackers`` dict is updated with
    ``frame``; trackers whose ``update`` result is below
    ``quality_threshold`` are removed from the dict.
    """
    # Collect first, remove afterwards: the dict must not change size while
    # it is being iterated.
    stale_fids = [
        fid
        for fid, tracker in face_trackers.items()
        if tracker.update(frame) < quality_threshold
    ]

    for fid in stale_fids:
        face_trackers.pop(fid, None)

In [5]:
def update_face_trackers(frame, faces):
    """Re-seed the global ``face_trackers`` from freshly detected faces.

    Every detection is compared with the existing trackers: the two match
    when the centre of each box lies inside the other box. A matched
    detection reuses that tracker's id, an unmatched one takes the next
    ``current_fid`` (which is then incremented). Either way a new
    ``dlib.correlation_tracker`` is started on the detected box and stored
    under that id. Trackers that no detection claimed are removed.

    Args:
        frame: Image handed to ``start_track`` for each new tracker.
        faces: Iterable of detections exposing a ``bbox`` array laid out as
            ``[x1, y1, x2, y2]``.

    Returns:
        List of tracker ids, one per detection, in the order of ``faces``.
    """
    global current_fid

    tracker_mapping = []

    for face in faces:
        # ``np.int`` was removed from NumPy; the builtin ``int`` is the
        # equivalent dtype. Convert to plain Python ints before passing the
        # coordinates on to dlib.
        box = face.bbox.astype(int)
        x, y = int(box[0]), int(box[1])
        w, h = int(box[2]) - x, int(box[3]) - y

        # Centre point of the detection.
        x_bar = x + 0.5 * w
        y_bar = y + 0.5 * h

        # No early exit: if several trackers match, the last one wins.
        matched_fid = None
        for fid, existing in face_trackers.items():
            tracked_position = existing.get_position()

            t_x = int(tracked_position.left())
            t_y = int(tracked_position.top())
            t_w = int(tracked_position.width())
            t_h = int(tracked_position.height())

            t_x_bar = t_x + 0.5 * t_w
            t_y_bar = t_y + 0.5 * t_h

            face_centre_in_tracker = (
                t_x <= x_bar <= t_x + t_w and t_y <= y_bar <= t_y + t_h
            )
            tracker_centre_in_face = (
                x <= t_x_bar <= x + w and y <= t_y_bar <= y + h
            )
            if face_centre_in_tracker and tracker_centre_in_face:
                matched_fid = fid

        # Always restart tracking from the detector's box, slightly padded.
        tracker = dlib.correlation_tracker()
        tracker.start_track(
            frame, dlib.rectangle(x - 2, y - 1, x + w + 1, y + h + 1))

        if matched_fid is None:
            fid = current_fid
            current_fid += 1
        else:
            fid = matched_fid

        face_trackers[fid] = tracker
        tracker_mapping.append(fid)

    # Drop every tracker that was neither matched nor created in this pass.
    kept = set(tracker_mapping)
    for fid in [fid for fid in face_trackers if fid not in kept]:
        face_trackers.pop(fid, None)

    return tracker_mapping

In [6]:
def annotate_frame_with_face_recognition(frame, predicted_names, access_list, threshold=1.0):
    """Box the faces on the access list and blur every other predicted face.

    Args:
        frame: Image to annotate; it is modified in place.
        predicted_names: Mapping of tracker id to ``(name, distance)``.
        access_list: Labels whose faces are outlined instead of blurred.
        threshold: A prediction whose distance is not below this value is
            labelled ``'unknown'``.

    Returns:
        The same ``frame`` object. Trackers in the global ``face_trackers``
        that have no entry in ``predicted_names`` are left untouched.
    """
    for fid, tracker in face_trackers.items():
        if fid not in predicted_names:
            continue

        name, conf = predicted_names[fid]
        text = f'{name}' if conf < threshold else 'unknown'

        position = tracker.get_position()
        left = int(position.left())
        top = int(position.top())
        right = left + int(position.width())
        bottom = top + int(position.height())

        if text in access_list:
            cv2.rectangle(frame, (left, top), (right, bottom), (80, 236, 18), 2)
        else:
            anonymize_face(frame, face_bbox=[left, top, right, bottom])

    return frame

In [8]:
import os

In [9]:
# Input clip, the labels whose faces stay visible, and the distance cut-off
# below which a prediction counts as a recognised subject.
video_path = 'MVI_8215.MOV'
access_list = ['Rayed', 'Tanjid']
recognition_threshold = 1.0

In [13]:
# Tracking / recognition state. ``face_trackers`` and ``current_fid`` are
# read and modified by the helper functions through module globals.
num_frame_to_track = 8   # the detector runs on every 8th frame
predicted_names = {}     # tracker id -> (subject, distance of best match so far)
face_trackers = {}       # tracker id -> dlib correlation tracker
current_fid = 0          # next unused tracker id

frame_count = 0

# Throughput display: refreshed on each of the first ``fps_window`` frames
# and on every ``fps_window``-th frame after that.
fps = 0.0
fps_window = 10
frames_since_fps_update = 0

output_size = (720, 480)

fvs = FileVideoStream(video_path)
fvs.start()
time.sleep(0.5)  # presumably lets the reader pre-fill its buffer -- confirm

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
videowriter = cv2.VideoWriter('demo.mp4', fourcc, 25, output_size)

start_time = time.time()

try:
    while fvs.running():
        frame = fvs.read()
        if frame is None:
            break

        frame = cv2.resize(frame, output_size)

        # Trackers are advanced on every frame; weak ones are dropped.
        delete_low_quality_trackers(frame)

        if frame_count % num_frame_to_track == 0:
            faces = model.get(frame)
            tracker_mapping = update_face_trackers(frame, faces)

            face_embs = [face.normed_embedding for face in faces]

            # Without detections there is nothing to classify; keeping the
            # bookkeeping inside this branch means stale or undefined
            # ``predictions`` from an earlier pass can never be read.
            if face_embs:
                euclidin_dist = euclidean_distances(
                    np.asarray(face_embs), KNOWN_EMBEDDINGS)
                predictions = np.argmin(euclidin_dist, axis=1)
                pred_dist = euclidin_dist[
                    np.arange(euclidin_dist.shape[0]), predictions]

                # tracker_mapping is ordered like ``faces``, so it lines up
                # with the rows of the distance matrix.
                for f_id, best_idx, dist in zip(
                        tracker_mapping, predictions, pred_dist):
                    previous = predicted_names.get(f_id)
                    # Keep only the closest match ever seen for this id.
                    if previous is None or dist < previous[1]:
                        predicted_names[f_id] = (KNOWN_SUBJECTS[best_idx], dist)

        rec_frame = annotate_frame_with_face_recognition(
            frame, predicted_names, access_list, threshold=recognition_threshold)

        frame_count += 1
        frames_since_fps_update += 1
        if frame_count <= fps_window or frame_count % fps_window == 0:
            now = time.time()
            elapsed = now - start_time
            # Divide by the frames actually processed in this window, not a
            # fixed 10, and never by a zero-length interval.
            if elapsed > 0:
                fps = frames_since_fps_update / elapsed
            start_time = now
            frames_since_fps_update = 0
        fps_text = f'Processing {fps:0.2f} frames per second'

        cv2.putText(rec_frame, fps_text, (20, 40),
                    cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255), 1)

        videowriter.write(rec_frame)
finally:
    # Release the reader and finalise the output file even if a frame fails.
    fvs.stop()
    videowriter.release()