In [1]:
import PIL.Image
import dlib
import numpy as np
from PIL import ImageFile
import face_recognition_models
import cv2
import numpy
import threading
import platform
from multiprocessing import Process, Manager, cpu_count, set_start_method
import time
import os
import glob

In [2]:
# Get next worker's id
def next_id(current_id, worker_num):
    """Return the id that follows ``current_id`` in the worker ring.

    Ids count upwards by one; when ``current_id`` equals ``worker_num``
    the sequence wraps back to 1.
    """
    return 1 if current_id == worker_num else current_id + 1


# Get previous worker's id
def prev_id(current_id, worker_num):
    """Return the id that precedes ``current_id`` in the worker ring.

    Ids count downwards by one; when ``current_id`` is 1 the sequence
    wraps back to ``worker_num``.
    """
    return worker_num if current_id == 1 else current_id - 1

In [3]:
# A worker (run as a thread or subprocess) used to capture frames.
def capture(read_frame_list, Global, worker_num):
    """Grab webcam frames and publish them into the shared frame slots.

    Runs until ``Global.is_exit`` becomes true. A frame is read whenever the
    slot at ``Global.buff_num`` is free (i.e. ``buff_num`` is not the slot
    right after ``Global.read_num``); it is stored in
    ``read_frame_list[Global.buff_num]`` and ``buff_num`` is then advanced
    with ``next_id``.

    :param read_frame_list: shared mapping of slot id -> captured frame
    :param Global: shared namespace providing ``is_exit``, ``buff_num`` and ``read_num``
    :param worker_num: number of slots / workers in the ring
    """
    # Get a reference to webcam #0 (the default one)
    video_capture = cv2.VideoCapture(0)
    try:
        if not video_capture.isOpened():
            # Without a camera no frame will ever arrive; tell everyone to stop
            # instead of leaving the workers waiting forever.
            print("Error: could not open webcam #0")
            Global.is_exit = True
            return

        # video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)   # Width of the frames in the video stream.
        # video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)  # Height of the frames in the video stream.
        # video_capture.set(cv2.CAP_PROP_FPS, 30)            # Frame rate.
        print("Width: %d, Height: %d, FPS: %d" % (
            video_capture.get(cv2.CAP_PROP_FRAME_WIDTH),
            video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT),
            video_capture.get(cv2.CAP_PROP_FPS),
        ))

        while not Global.is_exit:
            # If it's time to read a frame
            if Global.buff_num != next_id(Global.read_num, worker_num):
                # Grab a single frame of video
                ret, frame = video_capture.read()
                if not ret or frame is None:
                    # The grab failed: never publish a missing frame to the
                    # workers, just back off briefly and try again.
                    time.sleep(0.01)
                    continue
                read_frame_list[Global.buff_num] = frame
                Global.buff_num = next_id(Global.buff_num, worker_num)
            else:
                time.sleep(0.01)
    finally:
        # Release webcam, even if the loop above raised
        video_capture.release()

In [4]:
# Ask Pillow to load image files whose data is truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [5]:
# Location of the 5-point pose (landmark) predictor model provided by face_recognition_models.
predictor_5_point_model = face_recognition_models.pose_predictor_five_point_model_location()
# dlib shape predictor built from that model; called as predictor(image, face_rectangle).
pose_predictor_5_point = dlib.shape_predictor(predictor_5_point_model)

# Location of the face recognition model provided by face_recognition_models.
face_recognition_model = face_recognition_models.face_recognition_model_location()
# dlib face recognition model; its compute_face_descriptor() produces the face encodings.
face_encoder = dlib.face_recognition_model_v1(face_recognition_model)

In [6]:
def load_image_file(file, mode='RGB'):
    """Load an image file into a numpy array.

    :param file: image file name or file object to load
    :param mode: Pillow mode the image is converted to before it is turned
        into an array. Defaults to 'RGB', which is what the rest of this
        notebook passes on to dlib.
    :return: the image contents as a numpy array
    """
    # The context manager releases the file handle Pillow opened once the
    # pixel data has been copied into the array.
    with PIL.Image.open(file) as im:
        return np.array(im.convert(mode))

In [7]:
def css_to_rect(css):
    """Convert a (top, right, bottom, left) sequence into a dlib rectangle.

    :param css: sequence indexed as css[0]=top, css[1]=right, css[2]=bottom, css[3]=left
    :return: ``dlib.rectangle`` built from (left, top, right, bottom)
    """
    top, right, bottom, left = css[0], css[1], css[2], css[3]
    return dlib.rectangle(left, top, right, bottom)
# Detectors are built lazily and reused: constructing one means loading a model.
_face_detector_cache = {}


def raw_face_location(img, number_of_times_to_upsample=1, model="hog"):
    """Run a dlib face detector on an image and return its raw detections.

    :param img: image as a numpy array
    :param number_of_times_to_upsample: passed straight to the detector as its
        second argument
    :param model: "cnn" selects dlib's CNN face detector (its model file is
        located through face_recognition_models); any other value selects
        dlib's frontal (HOG) face detector
    :return: whatever the selected dlib detector returns for the image
    """
    kind = "cnn" if model == "cnn" else "hog"
    detector = _face_detector_cache.get(kind)
    if detector is None:
        if kind == "cnn":
            # The CNN detector was previously referenced through an undefined
            # name; build it here from the packaged model file.
            detector = dlib.cnn_face_detection_model_v1(
                face_recognition_models.cnn_face_detector_model_location()
            )
        else:
            detector = dlib.get_frontal_face_detector()
        _face_detector_cache[kind] = detector
    return detector(img, number_of_times_to_upsample)
def raw_face_landmarks(face_image, face_locations=None, model="large"):
    """Run the 5-point pose predictor on every face of an image.

    :param face_image: image to search
    :param face_locations: optional list of (top, right, bottom, left) boxes;
        when omitted the faces are detected with ``raw_face_location``
    :param model: accepted for interface compatibility; the body does not read
        it and always uses ``pose_predictor_5_point``
    :return: list with one predictor result per face
    """
    if face_locations is not None:
        rects = [css_to_rect(css) for css in face_locations]
    else:
        rects = raw_face_location(face_image)

    landmarks = []
    for rect in rects:
        landmarks.append(pose_predictor_5_point(face_image, rect))
    return landmarks
def encode_face(face_image, known_face_locations=None, num_jitters=1, model="small"):
    """Compute one face encoding per face found in an image.

    :param face_image: image containing zero or more faces
    :param known_face_locations: optional (top, right, bottom, left) boxes,
        forwarded to ``raw_face_landmarks``
    :param num_jitters: forwarded to ``face_encoder.compute_face_descriptor``
    :param model: forwarded to ``raw_face_landmarks``
    :return: list of numpy arrays, one per face
    """
    encodings = []
    for landmark_set in raw_face_landmarks(face_image, known_face_locations, model):
        descriptor = face_encoder.compute_face_descriptor(face_image, landmark_set, num_jitters)
        encodings.append(np.array(descriptor))
    return encodings

In [8]:
# Encode every image in ./images as a known face; the file name (without
# extension) is used as that person's name.
def load_known_faces(image_dir='./images'):
    """Build the lists of known face encodings and their names.

    Each file directly inside ``image_dir`` is loaded and the first face
    encoding found in it is kept, labelled with the file's base name without
    its extension. Files in which no face is found are reported and skipped
    instead of aborting the whole load.

    :param image_dir: directory holding one image per known person
    :return: tuple ``(encodings, names)`` of two equally long lists
    """
    encodings = []
    names = []
    # Sorted so that the order of known faces (and therefore which entry a
    # "first match" picks) does not depend on directory listing order.
    for image_path in sorted(glob.glob(os.path.join(image_dir, '*'))):
        if not os.path.isfile(image_path):
            continue
        found = encode_face(load_image_file(image_path))
        if not found:
            print(f'Warning: no face found in {image_path}; skipping it')
            continue
        encodings.append(found[0])
        names.append(os.path.splitext(os.path.basename(image_path))[0])
    return encodings, names


known_face_encodings, known_face_names = load_known_faces()

In [9]:
def rect_to_css(rect):
    """Convert a rectangle object into a (top, right, bottom, left) tuple.

    :param rect: object exposing ``top()``, ``right()``, ``bottom()`` and ``left()``
    :return: tuple ``(top, right, bottom, left)``
    """
    top, right = rect.top(), rect.right()
    bottom, left = rect.bottom(), rect.left()
    return top, right, bottom, left

def trim_css_to_bounds(css, image_shape):
    """Clamp a (top, right, bottom, left) box to the bounds of an image.

    :param css: box as (top, right, bottom, left)
    :param image_shape: shape whose first entry bounds ``bottom`` and whose
        second entry bounds ``right``
    :return: the clamped box as a (top, right, bottom, left) tuple
    """
    max_bottom, max_right = image_shape[0], image_shape[1]
    top = max(css[0], 0)
    right = min(css[1], max_right)
    bottom = min(css[2], max_bottom)
    left = max(css[3], 0)
    return top, right, bottom, left

def get_face_locations(img, number_of_times_to_upsample=1, model="hog"):
    """Return the bounding boxes of the faces found in an image.

    :param img: image as a numpy array (its ``shape`` is used for clamping)
    :param number_of_times_to_upsample: forwarded to ``raw_face_location``
    :param model: forwarded to ``raw_face_location``; for "cnn" each detection
        is expected to carry its rectangle in a ``rect`` attribute
    :return: list of (top, right, bottom, left) tuples clamped to the image
    """
    # Both models go through raw_face_location (the "cnn" branch used to call
    # a name that does not exist in this notebook).
    detections = raw_face_location(img, number_of_times_to_upsample, model)
    if model == "cnn":
        rects = [detection.rect for detection in detections]
    else:
        rects = detections
    return [trim_css_to_bounds(rect_to_css(rect), img.shape) for rect in rects]

In [10]:
def face_distance(face_encodings, face_to_compare):
    """Euclidean distance between each known encoding and one probe encoding.

    :param face_encodings: collection of face encodings to compare against
    :param face_to_compare: the encoding being checked
    :return: numpy array with one distance per entry of ``face_encodings``;
        an empty array when ``face_encodings`` is empty
    """
    if len(face_encodings) > 0:
        deltas = face_encodings - face_to_compare
        return np.linalg.norm(deltas, axis=1)
    return np.empty(0)
def compare_faces(known_face_encodings, face_encoding_to_check, tolerance=0.6):
    """Tell which known encodings lie within ``tolerance`` of a probe encoding.

    :param known_face_encodings: collection of known face encodings
    :param face_encoding_to_check: the encoding being checked
    :param tolerance: largest distance (inclusive) still counted as a match
    :return: list with one truth value per known encoding
    """
    distances = face_distance(known_face_encodings, face_encoding_to_check)
    within_tolerance = distances <= tolerance
    return list(within_tolerance)

In [11]:
# Worker subprocesses (one per worker id) used to process frames.
def _annotate_faces(frame, known_face_encodings, known_face_names):
    """Draw a labelled box on ``frame`` (in place) around every detected face."""
    # Convert the image from BGR color (which OpenCV uses) to RGB color.
    # cvtColor returns a fresh, contiguous array, unlike a negative-stride
    # slice such as frame[:, :, ::-1].
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Find all the faces and face encodings in the frame of video, cost most time
    face_locations = get_face_locations(rgb_frame)
    face_encodings = encode_face(rgb_frame, face_locations)

    # Loop through each face in this frame of video
    for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
        # See if the face is a match for the known face(s)
        matches = compare_faces(known_face_encodings, face_encoding)

        name = "Unknown"

        # If a match was found in known_face_encodings, just use the first one.
        if True in matches:
            first_match_index = matches.index(True)
            name = known_face_names[first_match_index]

        # Draw a box around the face
        cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)

        # Draw a label with a name below the face
        cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
        font = cv2.FONT_HERSHEY_DUPLEX
        cv2.putText(frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)


def process(worker_id, read_frame_list, write_frame_list, Global, worker_num):
    """Worker loop: take this worker's frame, label the faces, publish it.

    The worker waits until ``Global.read_num`` equals its own id and a frame
    is available, reads ``read_frame_list[worker_id]``, hands the read turn to
    the next worker, annotates the frame, then waits for its write turn
    (``Global.write_num == worker_id``) before storing the result in
    ``write_frame_list[worker_id]`` and handing the write turn on.

    :param worker_id: this worker's id in the 1..worker_num ring
    :param read_frame_list: shared mapping of slot id -> captured frame
    :param write_frame_list: shared mapping of slot id -> annotated frame
    :param Global: shared namespace; must provide ``known_face_encodings``,
        ``known_face_names``, ``is_exit``, ``read_num``, ``buff_num``,
        ``write_num`` and ``frame_delay``
    :param worker_num: number of workers in the ring
    """
    known_face_encodings = Global.known_face_encodings
    known_face_names = Global.known_face_names
    while not Global.is_exit:

        # Wait to read
        while Global.read_num != worker_id or Global.read_num != prev_id(Global.buff_num, worker_num):
            # If the user has requested to end the app, then stop waiting for webcam frames
            if Global.is_exit:
                break

            time.sleep(0.01)

        # Exit was requested while waiting: there is no frame to process.
        if Global.is_exit:
            break

        # Delay to make the video look smoother
        time.sleep(Global.frame_delay)

        # Read a single frame from frame list
        frame_process = read_frame_list[worker_id]

        # Expect next worker to read frame
        Global.read_num = next_id(Global.read_num, worker_num)

        # A missing frame cannot be annotated; the write turn below is still
        # taken so the other workers are not blocked behind this one.
        if frame_process is not None:
            _annotate_faces(frame_process, known_face_encodings, known_face_names)

        # Wait to write (giving up if the app is shutting down)
        while Global.write_num != worker_id and not Global.is_exit:
            time.sleep(0.01)
        if Global.is_exit:
            break

        # Send frame to global
        if frame_process is not None:
            write_frame_list[worker_id] = frame_process

        # Expect next worker to write frame
        Global.write_num = next_id(Global.write_num, worker_num)


In [14]:
if __name__ == '__main__':

    # A single manager process hosts every piece of shared state.
    manager = Manager()

    # Global variables
    Global = manager.Namespace()
    Global.buff_num = 1
    Global.read_num = 1
    Global.write_num = 1
    Global.frame_delay = 0
    Global.is_exit = False
    # process() reads the known faces from the shared namespace on start-up,
    # so they have to be published before any worker is launched.
    Global.known_face_encodings = known_face_encodings
    Global.known_face_names = known_face_names
    read_frame_list = manager.dict()
    write_frame_list = manager.dict()

    # Number of workers (subprocess use to process frames)
    if cpu_count() > 2:
        worker_num = cpu_count() - 1  # 1 for capturing frames
    else:
        worker_num = 2
    print(f'worker number {worker_num}')
    # Subprocess list
    p = []

    # Create a thread to capture frames (if uses subprocess, it will crash on Mac)
    p.append(threading.Thread(target=capture, args=(read_frame_list, Global, worker_num,)))
    p[0].start()

    # Create workers; p[0] is the capture thread, so p[worker_id] is worker worker_id
    for worker_id in range(1, worker_num + 1):
        p.append(Process(target=process, args=(worker_id, read_frame_list, write_frame_list, Global, worker_num,)))
        p[worker_id].start()
    print(p)

#     # Start to show video
#     last_num = 1
#     fps_list = []
#     tmp_time = time.time()
#     while not Global.is_exit:
#         while Global.write_num != last_num:
#             last_num = int(Global.write_num)

#             # Calculate fps
#             delay = time.time() - tmp_time
#             tmp_time = time.time()
#             fps_list.append(delay)
#             if len(fps_list) > 5 * worker_num:
#                 fps_list.pop(0)
#             fps = len(fps_list) / numpy.sum(fps_list)
#             print("fps: %.2f" % fps)

#             # Calculate frame delay, in order to make the video look smoother.
#             # When fps is higher, should use a smaller ratio, or fps will be limited in a lower value.
#             # Larger ratio can make the video look smoother, but fps will hard to become higher.
#             # Smaller ratio can make fps higher, but the video looks not too smoother.
#             # The ratios below are tested many times.
#             if fps < 6:
#                 Global.frame_delay = (1 / fps) * 0.75
#             elif fps < 20:
#                 Global.frame_delay = (1 / fps) * 0.5
#             elif fps < 30:
#                 Global.frame_delay = (1 / fps) * 0.25
#             else:
#                 Global.frame_delay = 0

#             # Display the resulting image
#             cv2.imshow('Video', write_frame_list[prev_id(Global.write_num, worker_num)])

#         # Hit 'q' on the keyboard to quit!
#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             Global.is_exit = True
#             break

#         time.sleep(0.01)

#     # Quit
#     video_capture.release()
#     cv2.destroyAllWindows()

worker number 15
[<Thread(Thread-6, started 20068)>, <Process(Process-10, started)>, <Process(Process-11, started)>, <Process(Process-12, started)>, <Process(Process-13, started)>, <Process(Process-14, started)>, <Process(Process-15, started)>, <Process(Process-16, started)>, <Process(Process-17, started)>, <Process(Process-18, started)>, <Process(Process-19, started)>, <Process(Process-20, started)>, <Process(Process-21, started)>, <Process(Process-22, started)>, <Process(Process-23, started)>, <Process(Process-24, started)>]
Width: 640, Height: 480, FPS: 30


In [None]:
# video_capture = cv2.VideoCapture(0)
# while(True):
#     # Grab a single frame of video
#     ret, frame = video_capture.read()
#     frameorg= frame

#     # Convert the image from BGR color (which OpenCV uses) to RGB color (which face_recognition uses)
#     rgb_frame = frame[:, :, ::-1]

#     # Find all the faces and face encodings in the frame of video
#     face_locations = get_face_locations(rgb_frame)
#     face_encodings = encode_face(rgb_frame, face_locations)

#     # Loop through each face in this frame of video
#     for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
#         # See if the face is a match for the known face(s)
#         matches = compare_faces(known_face_encodings, face_encoding)

#         name = "Unknown"

#         # If a match was found in known_face_encodings, just use the first one.
#         # if True in matches:
#         #     first_match_index = matches.index(True)
#         #     name = known_face_names[first_match_index]

#         # Or instead, use the known face with the smallest distance to the new face
#         face_distances = face_distance(known_face_encodings, face_encoding)
#         best_match_index = np.argmin(face_distances)
#         if matches[best_match_index]:
#             name = known_face_names[best_match_index]

#         # Draw a box around the face
#         cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)

#         # Draw a label with a name below the face
#         cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
#         font = cv2.FONT_HERSHEY_DUPLEX
#         cv2.putText(frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)

#     # Display the resulting image
#     cv2.imshow('Video', frameorg)

#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break
# video_capture.release()
# cv2.destroyAllWindows() 