# Setup

In [None]:
!pip install faiss-cpu
!pip install insightface
!pip install onnxruntime
!pip install onnxruntime-gpu
!pip install mxnet

In [2]:
# Display the installed InsightFace version so the results below can be tied to it.
import insightface
insightface.__version__

'0.4.2'

In [3]:
# Mount Google Drive (Colab only); the project paths used by this notebook live under
# the /content/drive mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Root of the project on the mounted Drive; the other two paths are derived from it.
project_path = "/content/drive/My Drive/ml/edyo/project"
# Folder handed to VideoDebugger: per-video CSVs and summary.csv are written below it.
debug_folder_path = f"{project_path}/output/mask/debug"
# Folder holding the FAISS index and the embeddings CSV loaded in the "Run" section.
data_path = f"{project_path}/data"

In [5]:
!cp -r "/content/drive/My Drive/ml/edyo/project/utils" .

In [6]:
import numpy as np
import pandas as pd
pd.set_option('display.precision', 2)
import cv2
import os
import sys
import tqdm
import glob

# from utils.recognizer import InsightFaceRecognizer
from utils.recognizer import BaseFaceRecognizer
from utils.index import FaissIndexWrapper 
from utils.video_reader import VideoReader

# Utils

In [None]:
class InsightFaceRecognizer(BaseFaceRecognizer):
    """Face recognizer backed by InsightFace's ``FaceAnalysis`` pipeline.

    Only the "detection" and "recognition" modules of ``FaceAnalysis`` are enabled.
    """

    def __init__(self, descriptor_path=None, index=None, ctx_id=0):
        """Initialise the base recognizer and prepare the InsightFace models.

        Args:
            descriptor_path: forwarded unchanged to ``BaseFaceRecognizer.__init__``.
            index: forwarded unchanged to ``BaseFaceRecognizer.__init__``.
            ctx_id: forwarded to the base class and to ``FaceAnalysis.prepare``.
        """
        # Imported locally so the class works on a fresh kernel: the notebook only does
        # `import insightface`, which never brings the name `FaceAnalysis` into scope.
        from insightface.app import FaceAnalysis

        super().__init__(descriptor_path, index, ctx_id)
        self._face_analysis = FaceAnalysis(allowed_modules=["detection", "recognition"])
        self._face_analysis.prepare(ctx_id=ctx_id)

    def find_faces(self, image):
        """Intentionally a no-op (returns ``None``); face lookup goes through `get_encodings`."""
        pass

    def get_encodings(self, image):
        """Return whatever ``FaceAnalysis.get`` yields for ``image`` (one entry per face)."""
        return self._face_analysis.get(image)

    def get_face_embedding(self, face_encodings):
        """Return the embedding of the single face in ``face_encodings``.

        Raises:
            ValueError: if ``face_encodings`` does not contain exactly one face.
        """
        if len(face_encodings) != 1:
            raise ValueError(f"{len(face_encodings)} faces found")
        return face_encodings[0].embedding

In [9]:
class VideoDebugger(object):
    """Run a face recognizer over sampled video frames and log the results as CSV.

    `debug_video` writes one CSV per video into ``<debug_folder_path>/csv`` and
    `create_summary` merges those files into ``<debug_folder_path>/summary.csv``.
    The ``videos`` and ``frames`` sub-folders are still created for compatibility,
    but this class does not write anything into them.
    """

    # Column layout of the per-video CSV written by `debug_video`.
    _FRAME_COLUMNS = ["frame", "num_faces", "person_name", "distance", "similar_people"]
    # Column layout of the merged table produced by `create_summary`.
    _SUMMARY_COLUMNS = ["frame_path", "num_faces", "person_name", "distance", "similar_people"]

    def __init__(self, recognizer, debug_folder_path):
        """Store the recognizer and create the output folders.

        Args:
            recognizer: object providing ``get_encodings``, ``recognize_face`` and
                ``get_similar_people`` (see `_process_frame`).
            debug_folder_path: root folder for all outputs of this debugger.
        """
        self.recognizer = recognizer
        self.debug_folder_path = debug_folder_path
        self.video_output_path = f"{debug_folder_path}/videos"
        self.frames_output_path = f"{debug_folder_path}/frames"
        self.csv_output_path = f"{debug_folder_path}/csv"
        for folder in (self.video_output_path, self.frames_output_path, self.csv_output_path):
            os.makedirs(folder, exist_ok=True)

    def debug_video(self, video_path, person_name, skip_frames=20):
        """Recognize faces on sampled frames of one video and save the results as CSV.

        The frame with index 1 and every frame whose index is a multiple of
        ``skip_frames`` are processed. Identity columns are filled only when exactly
        one face was found on the frame; otherwise they are left empty.

        Args:
            video_path: path of the video; its basename is used in the CSV file name.
            person_name: accepted for backward compatibility; it is not used here
                (the predicted name is what gets stored in the CSV).
            skip_frames: sampling step in frames, must be >= 1.

        Raises:
            ValueError: if ``skip_frames`` is smaller than 1.
        """
        if skip_frames < 1:
            # The sampling test below takes `frame_idx % skip_frames`.
            raise ValueError(f"skip_frames must be >= 1, got {skip_frames}")

        video_name = os.path.basename(video_path)
        vr = VideoReader(video_path)

        # Rows are collected in a list and turned into a DataFrame once at the end;
        # growing a DataFrame row by row is quadratic and `DataFrame.append` no
        # longer exists in pandas 2.x.
        records = []

        while vr.has_next():
            frame = vr.read(skip=0, rgb=True)
            if frame is None:
                continue

            frame_idx = vr.get_frame_idx()
            if frame_idx != 1 and frame_idx % skip_frames != 0:
                continue

            _, people = self._process_frame(frame)
            record = {"frame": frame_idx, "num_faces": len(people),
                      "person_name": None, "distance": None, "similar_people": None}
            if len(people) == 1:
                only_person = people[0]
                record["person_name"] = only_person["person_name"]
                record["distance"] = only_person["distance"]
                record["similar_people"] = only_person["similar_people"]
            records.append(record)

        output_df = pd.DataFrame(records, columns=self._FRAME_COLUMNS)
        output_df.to_csv(f"{self.csv_output_path}/{video_name}_{skip_frames}.csv", index=False)

    def create_summary(self):
        """Merge all per-video CSVs into one table, save it as summary.csv and return it.

        ``frame_path`` is built as ``"<csv file name>_<frame index>"``. Files are read in
        sorted order so the result is reproducible, and entries of the CSV folder that do
        not end in ``.csv`` are ignored.

        Returns:
            pandas.DataFrame with the columns listed in ``_SUMMARY_COLUMNS`` (empty when
            no CSV file is present).
        """
        per_video_frames = []
        for csv_name in sorted(os.listdir(self.csv_output_path)):
            if not csv_name.endswith(".csv"):
                continue
            video_df = pd.read_csv(f"{self.csv_output_path}/{csv_name}")
            video_df["frame_path"] = [f"{csv_name}_{frame_idx}" for frame_idx in video_df["frame"]]
            per_video_frames.append(video_df[self._SUMMARY_COLUMNS])

        if per_video_frames:
            summary_df = pd.concat(per_video_frames, ignore_index=True)
        else:
            # pd.concat raises on an empty list; return an empty table instead.
            summary_df = pd.DataFrame([], columns=self._SUMMARY_COLUMNS)

        summary_df.to_csv(f"{self.debug_folder_path}/summary.csv", index=False)
        return summary_df

    def _process_frame(self, frame):
        """Recognize every face found on ``frame``.

        Returns:
            ``(frame, people)`` where ``frame`` is the unmodified input and ``people`` is
            a list with one dict per detected face (keys: bbox, kps, det_score,
            person_name, distance, similar_people).
        """
        people = []
        for face_info in self.recognizer.get_encodings(frame):
            # The recognizer methods take a list of faces, so each face is wrapped.
            distance, person_name = self.recognizer.recognize_face([face_info])
            similar_people = self.recognizer.get_similar_people([face_info])
            people.append({
                "bbox": face_info.bbox,
                "kps": face_info.kps,
                "det_score": face_info.det_score,
                "person_name": person_name,
                "distance": distance,
                "similar_people": similar_people
            })

        return frame, people

In [36]:
def process_output(video_list, summary_df, dataset_df, title):
    output_df = pd.DataFrame([], columns=["wrong_face_detection_%", "wrong_face_recognition_%"])
    
    for video_name in video_list:
        selected_df = summary_df[summary_df["frame_path"].str.startswith(video_name)]
        processed_frames = len(selected_df)
        person_name = dataset_df[dataset_df["video_name"] == video_name]["person_name"].iloc[0]
        
        wrong_face_detection = len(selected_df[selected_df["num_faces"] != 1])
        wrong_person_detection = len(selected_df[selected_df["person_name"] != person_name])
        
        output_df = output_df.append({"wrong_face_detection_%": wrong_face_detection/processed_frames*100, 
                                        "wrong_face_recognition_%": wrong_person_detection/processed_frames*100}, 
                                       ignore_index=True)
#     output_df = output_df.agg({'wrong_face_detection_%': ['mean', 'count'],
#                                'wrong_face_recognition_%': ['mean', 'count']})
    output_df = output_df.mean().to_frame()#.describe()
    output_df.columns = ["mean"]
    return output_df.style.set_caption(f"{title}: {len(video_list)} videos")

# Run

In [None]:
# Make sure the debug output root exists before anything is written below it.
os.makedirs(debug_folder_path, exist_ok=True)

# Inputs of the recognizer: the FAISS index file and the embeddings CSV.
index_path = f"{data_path}/insightface_vector.index"
descriptor_path = f"{data_path}/insightface_embeddings.csv"

# Load the index before handing it to the recognizer.
faiss_index = FaissIndexWrapper(index_path)
faiss_index.load()

recognizer = InsightFaceRecognizer(descriptor_path, faiss_index, ctx_id=0) 

# `d` is the debugger used by the cells below to process videos and build the summary.
d = VideoDebugger(recognizer, debug_folder_path)

# Name passed to debug_video for every video, and the sorted list of videos to process.
person_name = "Hanna Shubina"
videos_list = sorted(glob.iglob(f"{project_path}/dataset/video/medium/*.mp4"))

In [22]:
# Process every video; each call writes one CSV of per-frame results.
# The progress bar is sent to stdout.
for video_path in tqdm.tqdm(videos_list, file=sys.stdout):
    d.debug_video(video_path, person_name)

100%|██████████| 245/245 [22:53<00:00,  5.61s/it]


In [23]:
# Merge the per-video CSVs, then look at the table size, the distribution of
# faces-per-frame, and the 20 most frequently predicted identities.
summary_df = d.create_summary()
print(summary_df.shape)
faces_per_frame_counts = summary_df["num_faces"].value_counts()
print(faces_per_frame_counts)
summary_df["person_name"].value_counts().head(20)

(8784, 5)
1.0    7519
0.0    1265
Name: num_faces, dtype: int64


Hanna Shubina            6570
Naomi_Watts               215
Dick_Cheney               121
James_Blake                97
John_Ashcroft              80
Kathryn_Bigelow            48
John_Eder                  48
Jonathan_Karsh             40
Don_Nickles                29
Martin_McGuinness          23
Paddy_Long                 23
Florencia_Macri            22
Alvaro_Silva_Calderon      19
James_Kelly                16
Victor_Kraatz              14
Teresa_Worbis              13
Monica_Bellucci            12
John_Negroponte            11
Jeff_Feldman               10
Michael_Wayne               7
Name: person_name, dtype: int64

# Analysis

In [25]:
# Reload the saved summary from disk so the analysis can start from this cell,
# then show the 20 most frequently predicted identities.
summary_csv_path = f"{debug_folder_path}/summary.csv"
summary_df = pd.read_csv(summary_csv_path)
summary_df["person_name"].value_counts().head(20)

Hanna Shubina            6570
Naomi_Watts               215
Dick_Cheney               121
James_Blake                97
John_Ashcroft              80
Kathryn_Bigelow            48
John_Eder                  48
Jonathan_Karsh             40
Don_Nickles                29
Martin_McGuinness          23
Paddy_Long                 23
Florencia_Macri            22
Alvaro_Silva_Calderon      19
James_Kelly                16
Victor_Kraatz              14
Teresa_Worbis              13
Monica_Bellucci            12
John_Negroponte            11
Jeff_Feldman               10
Michael_Wayne               7
Name: person_name, dtype: int64

In [26]:
# Load the annotation sheet and keep only the rows whose second column is filled in.
annotations_df = pd.read_csv(f"{project_path}/dataset/Milestone 1 - Dataset.csv")
second_column_filled = annotations_df.iloc[:, 1].notna()
annotated_df = annotations_df[second_column_filled]

In [27]:
# Drop the first annotated row, keep columns 0 and 2..12 by position, and give them
# readable names. reset_index returns a new frame, so no explicit copy is needed.
dataset_columns = [
    "video_name", "person_name",
    "no_face", "one_face", "multiple_faces",
    "dark", "normal_light", "light",
    "visible", "not_facing_camera", "mask", "occluded",
]
dataset_df = annotated_df.iloc[1:, [0, *range(2, 13)]].reset_index(drop=True)
dataset_df.columns = dataset_columns

# True for videos annotated with exactly one face: `one_face` is set while both
# `no_face` and `multiple_faces` are empty.
one_face = (
    dataset_df["one_face"].notna()
    & dataset_df[["no_face", "multiple_faces"]].isna().all(axis=1)
)

In [42]:
# Error rates over all videos annotated with exactly one face.
one_person_only_list = dataset_df.loc[one_face, "video_name"].unique()

process_output(one_person_only_list, summary_df, dataset_df, "One person")

Unnamed: 0,mean
wrong_face_detection_%,22.32
wrong_face_recognition_%,48.33


In [43]:
# Single-face videos annotated with normal light and with neither a mask nor an
# occlusion.
normal_list = dataset_df[one_face & dataset_df["normal_light"].notna() 
                         & dataset_df["mask"].isna()
                         & dataset_df["occluded"].isna()]["video_name"].unique()

# The caption used to be "One person" (copied from the previous cell), which mislabelled
# this subset; it now describes the filter applied above.
process_output(normal_list, summary_df, dataset_df, "Normal light, no mask/occlusion")

Unnamed: 0,mean
wrong_face_detection_%,0.66
wrong_face_recognition_%,3.46


In [44]:
# Videos annotated with a mask, excluding those flagged as no-face or multiple-faces.
mask_rows = (
    dataset_df["mask"].notna()
    & dataset_df["no_face"].isna()
    & dataset_df["multiple_faces"].isna()
)
mask_video_list = dataset_df.loc[mask_rows, "video_name"].unique()

process_output(mask_video_list, summary_df, dataset_df, "Mask")

Unnamed: 0,mean
wrong_face_detection_%,13.04
wrong_face_recognition_%,97.1


In [45]:
# Videos annotated as dark, excluding those flagged as no-face or multiple-faces.
dark_rows = (
    dataset_df["dark"].notna()
    & dataset_df["no_face"].isna()
    & dataset_df["multiple_faces"].isna()
)
dark_video_list = dataset_df.loc[dark_rows, "video_name"].unique()

process_output(dark_video_list, summary_df, dataset_df, "Dark")

Unnamed: 0,mean
wrong_face_detection_%,83.33
wrong_face_recognition_%,99.65
