In [1]:
import cv2 
from retinaface import RetinaFace
from deepface import DeepFace
from deepface.basemodels import Facenet
from deepface.commons import functions, realtime, distance as dst
import os 
from tqdm import tqdm
import pickle
import pandas as pd
import shutil

In [2]:
# User face detect
class FaceDetect:
    """Thin wrapper around the RetinaFace detector for camera frames.

    ``detect`` annotates a frame with one rectangle per detected face, while
    ``pre_processing`` reduces a frame to a single cropped face image.
    """

    def __init__(self):
        # The detector is stored as an attribute so both methods share it.
        self.detector = RetinaFace

    def detect(self, frame):
        """Draw a green rectangle around every face found in ``frame``.

        Detection runs on a copy of the frame; the rectangles are drawn on the
        frame that was passed in.

        Returns:
            ``(frame, True)`` when at least one face was found, otherwise
            ``(frame, False)`` with the frame untouched.
        """
        detections = self.detector.detect_faces(frame.copy())

        if len(detections) == 0:
            return frame, False

        for face_key in detections:
            # facial_area layout: min_x, min_y, max_x, max_y
            box = detections[face_key]['facial_area']
            top_left = (box[0], box[1])
            bottom_right = (box[2], box[3])
            cv2.rectangle(frame, top_left, bottom_right, (0, 255, 0), 2)

        return frame, True

    def pre_processing(self, frame):
        """Crop ``frame`` down to its single face.

        Returns:
            ``(face_image, True)`` when exactly one face was extracted, with the
            crop passed through ``cv2.COLOR_RGB2BGR`` (presumably because the
            extractor yields RGB while OpenCV works in BGR - TODO confirm);
            ``(None, False)`` when zero or several faces were extracted.
        """
        extracted_faces = self.detector.extract_faces(frame)

        if len(extracted_faces) == 1:
            face_bgr = cv2.cvtColor(extracted_faces[0], cv2.COLOR_RGB2BGR)
            return face_bgr, True

        return None, False

  

In [3]:
class FaceRecognize:
    """Face recognition on top of DeepFace embeddings stored in a pickle file.

    Workflow: ``generate_representation`` embeds every image in a folder,
    ``generate_file`` persists (or appends) those embeddings to
    ``representation_<model_name>.pkl`` in the working directory, and
    ``recognize`` compares a new image against the persisted embeddings.
    """

    # Supported distance metrics mapped to the canonical name used for the
    # threshold lookup. "euclidean_12" (digit one) is a legacy misspelling of
    # "euclidean_l2" that earlier versions of this class tested for; it is
    # still accepted so existing callers keep working.
    _METRIC_ALIASES = {
        "cosine": "cosine",
        "euclidean": "euclidean",
        "euclidean_l2": "euclidean_l2",
        "euclidean_12": "euclidean_l2",
    }

    # File extensions (lower-case) treated as images when scanning a folder.
    _IMAGE_SUFFIXES = (".jpg", ".png")

    def __init__(self,model_name = "Facenet" , detector_backend = "retinaface" ,distance_metric ="cosine"):
        """Build the embedding model and remember the recognition settings.

        Args:
            model_name: DeepFace model name; also part of the pickle file name.
            detector_backend: detector backend forwarded to ``DeepFace.represent``.
            distance_metric: "cosine", "euclidean" or "euclidean_l2"
                (the legacy spelling "euclidean_12" is accepted as an alias).
        """
        self.model_name = model_name
        self.model = DeepFace.build_model(model_name)
        self.detector_backend = detector_backend
        self.distance_metric = distance_metric

    def _representation_file(self):
        """Return the relative path of the pickle that stores the embeddings."""
        return "./representation_%s.pkl" % (self.model_name)

    def _canonical_metric(self):
        """Return the canonical metric name, or raise ValueError if unsupported."""
        try:
            return self._METRIC_ALIASES[self.distance_metric]
        except (KeyError, TypeError):
            raise ValueError(
                "Unsupported distance metric %r; expected one of: cosine, euclidean, euclidean_l2"
                % (self.distance_metric,)
            )

    def _embed(self, img_path):
        """Compute the embedding of one image with the configured model."""
        return DeepFace.represent(
            img_path = img_path,
            model_name = self.model_name,
            model = self.model,
            enforce_detection = False,
            detector_backend = self.detector_backend,
            align = True,
            normalization = 'base'
        )

    @staticmethod
    def _distance(metric, source_representation, target_representation):
        """Distance between two embeddings for an already-canonical metric."""
        if metric == "cosine":
            return dst.findCosineDistance(source_representation, target_representation)
        if metric == "euclidean":
            return dst.findEuclideanDistance(source_representation, target_representation)
        # euclidean_l2: Euclidean distance between L2-normalised embeddings.
        return dst.findEuclideanDistance(
            dst.l2_normalize(source_representation),
            dst.l2_normalize(target_representation),
        )

    @staticmethod
    def _label_from_identity(identity):
        """Turn a stored image path such as ``dataset/user_2_0.jpg`` into ``"user 2"``.

        Only the file name is inspected, so the label no longer depends on how
        many directory levels precede it. A file name without an underscore
        yields its stem instead of raising IndexError.
        """
        file_name = os.path.basename(identity)
        parts = file_name.split('_')
        if len(parts) < 2:
            return os.path.splitext(file_name)[0]
        return parts[0] + " " + parts[1]

    def generate_representation(self , db_path):
        """Embed every ``.jpg``/``.png`` image found under ``db_path``.

        WARNING: ``db_path`` is deleted recursively once all images have been
        embedded, so the source pictures do not survive this call.

        Args:
            db_path: folder that is walked recursively for images.

        Returns:
            A list of ``[image_path, embedding]`` pairs.

        Raises:
            ValueError: if no image is found under ``db_path``.
        """
        employees = []
        for root, _dirs, files in os.walk(db_path):
            for file in files:
                # Match on the real extension; a substring test would also
                # accept names such as "photo.jpg.bak".
                if file.lower().endswith(self._IMAGE_SUFFIXES):
                    # Paths are stored with "/" so previously pickled
                    # identities keep the same format.
                    employees.append(root + "/" + file)

        if len(employees) == 0 :
            raise ValueError("There is no image !")

        representations = []
        for employee in tqdm(employees, desc="Calculating"):
            representations.append([employee, self._embed(employee)])

        shutil.rmtree(db_path)
        return representations

    def generate_file(self,representations,has_file=False):
        """Persist ``representations`` to the model's pickle file.

        Args:
            representations: list of ``[image_path, embedding]`` pairs.
            has_file: when True, the existing pickle is loaded first and the
                new pairs are appended to it; when False the file is
                overwritten with ``representations`` only.
        """
        file_path = self._representation_file()
        if has_file:
            # NOTE: pickle.load executes arbitrary code from the file; only
            # load representation files this notebook wrote itself.
            with open(file_path,'rb') as f :
                org_representations = pickle.load(f)

            org_representations.extend(representations)
            representations = org_representations

        with open(file_path,'wb') as f :
            pickle.dump(representations,f)

    def recognize(self,img_path):
        """Identify the face in ``img_path`` against the stored embeddings.

        Args:
            img_path: path of the image to identify.

        Returns:
            The label of the closest stored identity whose distance is within
            the model's threshold (e.g. ``"user 2"``), or ``"Other"`` when no
            stored identity is close enough.

        Raises:
            ValueError: if ``self.distance_metric`` is not a supported metric.
        """
        # Validate first so a bad metric fails clearly, before any file or
        # model work, instead of surfacing as an unbound local variable.
        metric = self._canonical_metric()

        # NOTE: pickle.load executes arbitrary code from the file; only load
        # representation files this notebook wrote itself.
        with open(self._representation_file(),'rb') as f:
            representations = pickle.load(f)

        representation_col = "%s_representation" % (self.model_name)
        distance_col = "%s_%s" % (self.model_name, metric)
        df = pd.DataFrame(representations , columns = ["identity", representation_col])

        # Single-step progress bar kept so the "Analyzing" output still shows.
        target_representation = None
        for _ in tqdm(range(0,1),desc="Analyzing"):
            target_representation = self._embed(img_path)

        df[distance_col] = [
            self._distance(metric, source_representation, target_representation)
            for source_representation in df[representation_col]
        ]

        # Acceptance threshold for this model/metric pair; the canonical
        # metric name is used so the lookup hits the real table entry.
        threshold = dst.findThreshold(self.model_name, metric)

        matches = df.drop(columns = [representation_col])
        matches = matches[matches[distance_col] <= threshold]
        matches = matches.sort_values(by = [distance_col], ascending = True).reset_index(drop=True)

        if len(matches) == 0:
            return "Other"

        return self._label_from_identity(matches['identity'].iloc[0])

In [4]:
def register_user(label):
    """Capture face pictures of a user from the webcam and store their embeddings.

    Steps: open camera 0, append ``user <label>`` to ``label.txt``, save up to
    ``get_picture_num`` single-face crops as ``dataset/user_<label>_<n>.jpg``,
    then embed the pictures and write/append them to the recognizer's pickle
    file. Pressing ``q`` in the preview window stops the capture early.

    Args:
        label: identifier of the user being registered; used in the image
            file names and in ``label.txt``.

    Returns:
        None. If the camera cannot be opened a message is printed and the
        function returns without touching any file.
    """
    face_detection = FaceDetect()
    face_recognition = FaceRecognize()

    # need num of user's data
    get_picture_num = 10

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        cap.release()
        # Return instead of exit(): inside a notebook exit() would shut the
        # kernel down rather than just abort this call.
        return

    # create user dir
    path = "dataset"

    try:
        if not os.path.isdir(path):
            os.mkdir(path)

        with open("label.txt","a+") as f:
            f.write("user %s\n" %(label))

        # cnt img_number
        img_number = 0

        while img_number < get_picture_num:

            ret , frame = cap.read()

            if not ret:
                print("Cannot receive frame")
                break

            # pre-processing the img to only face
            frame , check = face_detection.pre_processing(frame)

            # haven't detect face ,skip
            if not check :
                continue

            # show
            cv2.imshow("frame",frame)
            if cv2.waitKey(100) & 0xff == ord('q'):
                break

            # save img
            cv2.imwrite(f"{path}/user_{label}_{img_number}.jpg",frame)
            img_number += 1
    finally:
        # Always free the camera and close the preview, even if capture fails,
        # so the device is not left locked for the next cell.
        cap.release()
        cv2.destroyAllWindows()

    representations = face_recognition.generate_representation(path)

    # Derive the file name from the recognizer's model so the existence check
    # always matches the file that generate_file reads and writes.
    has_file = os.path.isfile("representation_%s.pkl" % (face_recognition.model_name))
    face_recognition.generate_file( representations ,has_file)
    

In [5]:

def recognize_user():
    """Continuously recognize the face seen by the webcam and print the result.

    Each captured frame is reduced to a single face crop, written to
    ``./detect.jpg`` and passed to ``FaceRecognize.recognize``; the returned
    label is printed as ``detect <label>``. Frames without exactly one face
    print ``There is no face``. Press ``q`` in the preview window to stop.

    Returns:
        None. If the camera cannot be opened a message is printed and the
        function returns immediately.
    """
    face_Detector = FaceDetect()
    face_recognize = FaceRecognize()

    cap = cv2.VideoCapture(0)

    if not cap.isOpened():
        print("Cannot open camera")
        cap.release()
        # Return instead of exit(): inside a notebook exit() would shut the
        # kernel down rather than just abort this call.
        return

    try:
        while True :

            ret , frame = cap.read()

            if not ret:
                print("Cannot receive frame")
                break

            face , check = face_Detector.pre_processing(frame)

            if check:
                # A HighGUI window must exist for cv2.waitKey to receive keys,
                # so the current face crop is shown as a preview.
                cv2.imshow("frame", face)
                cv2.imwrite("./detect.jpg" , face)
                result = face_recognize.recognize(r"./detect.jpg")
                print("detect %s" % (result))
            else:
                print("There is no face")
                # Keep the preview window alive so 'q' still works while no
                # face is in view.
                cv2.imshow("frame", frame)

            # Polled on every iteration, including the no-face path.
            if cv2.waitKey(100) & 0xff == ord('q'):
                break
    finally:
        # Always free the camera and close the preview window.
        cap.release()
        cv2.destroyAllWindows()


In [10]:

def main(MODE = "Register", label = "2"):
    """Run one of the notebook's two workflows.

    Args:
        MODE: ``"Register"`` captures pictures and stores embeddings for a
            user; ``"Recognize"`` starts live recognition. Any other value
            does nothing.
        label: user label passed to ``register_user`` in Register mode.
            Defaults to ``"2"``, the value that used to be hard-coded.
    """
    if MODE == "Register":

        register_user(label)

    elif MODE == "Recognize":

        recognize_user()

# Run the entry point with its default mode ("Register").
main()

Calculating: 100%|██████████| 10/10 [00:16<00:00,  1.63s/it]


In [11]:
# Check the representation size: load the persisted list from
# representation_Facenet.pkl and print how many entries it holds.
# NOTE: pickle.load can execute arbitrary code; only open files this notebook wrote.
with open(r"./representation_Facenet.pkl",'rb') as f:
    representations = pickle.load(f)
print(len(representations))


20
