In [1]:
import pandas as pd
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import cv2
import tensorflow as tf
from tensorflow import keras
import time
import os

In [2]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [3]:
# Restore the previously saved Keras model from ../Data/model_face_pair.
model = keras.models.load_model(filepath="../Data/model_face_pair")

Metal device set to: Apple M1


2022-08-08 12:38:25.460081: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-08-08 12:38:25.460535: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


<IPython.core.display.Javascript object>

In [4]:
# Every sub-directory of ../Data/test_images/ is one test subject; its name is
# later used as the subject's label.
# - Hidden entries (e.g. ".DS_Store") are skipped.
# - Plain files are skipped as well: the image-loading cell calls os.listdir
#   on each subject, which would fail on anything that is not a directory.
# - Sorting makes the subject order reproducible, because os.listdir returns
#   entries in arbitrary order.
test_subjects = sorted(
    entry
    for entry in os.listdir("../Data/test_images/")
    if not entry.startswith(".")
    and os.path.isdir(os.path.join("../Data/test_images/", entry))
)

<IPython.core.display.Javascript object>

In [5]:
# Load every readable image of every test subject as an RGB array.
# Result: {subject directory name: [RGB image, ...]}.
images_test = {}
for t in test_subjects:
    subject_dir = os.path.join("../Data/test_images/", t)
    subject_images = []
    # Sorted so that the image order inside a subject is reproducible.
    for im in sorted(os.listdir(subject_dir)):
        if im.startswith("."):
            # Hidden files such as ".DS_Store" are not images.
            continue
        image_path = os.path.join(subject_dir, im)
        # IMREAD_COLOR is the read flag intended here. The former
        # IMWRITE_JPEG_QUALITY is a *write* parameter that only worked because
        # it happens to share the same numeric value.
        bgr = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if bgr is None:
            # cv2.imread returns None instead of raising when a file cannot be
            # decoded; passing that on to cvtColor would crash the whole cell.
            print(f"Skipping unreadable image: {image_path}")
            continue
        # OpenCV decodes to BGR; convert to RGB.
        subject_images.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
    images_test[t] = subject_images

<IPython.core.display.Javascript object>

In [6]:
def prepareImg(image, size=250):
    """Pad an image to a square with zeros, then resize it to ``size`` x ``size``.

    The shorter side is padded on both ends with zero-valued pixels so the
    content keeps its aspect ratio; when the difference is odd, the extra
    pixel goes to the right (tall images) or to the bottom (wide images).

    Parameters
    ----------
    image : np.ndarray
        Array of shape ``(height, width)`` or ``(height, width, channels)``.
        The input array is not modified.
    size : int, optional
        Side length, in pixels, of the returned square image. Defaults to 250,
        the size the rest of this notebook reshapes to.

    Returns
    -------
    np.ndarray
        The output of ``cv2.resize`` for the (possibly padded) image.
        Note that a non-square input is padded with ``np.zeros`` blocks, which
        are float64, so the result is float64 in that case, whereas a square
        input is resized as-is and keeps its dtype.
    """
    height, width = image.shape[:2]
    # Trailing dimensions (the channel axis, if any) are carried over to the
    # padding blocks instead of assuming exactly three channels.
    trailing = tuple(image.shape[2:])

    if height > width:
        # Tall image: add zero columns on the left and right.
        left = (height - width) // 2
        right = height - width - left
        image = np.concatenate(
            [
                np.zeros((height, left) + trailing),
                image,
                np.zeros((height, right) + trailing),
            ],
            axis=1,
        )
    elif height < width:
        # Wide image: add zero rows on the top and bottom.
        top = (width - height) // 2
        bottom = width - height - top
        image = np.concatenate(
            [
                np.zeros((top, width) + trailing),
                image,
                np.zeros((bottom, width) + trailing),
            ],
            axis=0,
        )

    return cv2.resize(image, (size, size))

<IPython.core.display.Javascript object>

In [7]:
# Build the reference batch the live face is compared against:
#   test_batch    -- (N, 250, 250, 3) array of prepared reference images,
#                    scaled from [0, 255] to [-1, 1]
#   names_to_test -- length-N array holding the subject label of each row
# Images are collected in a list and combined once; concatenating onto a
# growing array inside the loop re-copies the whole batch for every subject.
reference_images = []
reference_labels = []
for key, subject_images in images_test.items():
    for im in subject_images:
        reference_images.append(prepareImg(im))
        reference_labels.append(key)

if not reference_images:
    # Fail here with a clear message rather than later, inside the streaming
    # loop, when the empty batch is first used.
    raise ValueError("No reference images found in ../Data/test_images/")

# The reshape doubles as a check that every prepared image is 250x250x3.
test_batch = np.stack(reference_images).reshape((-1, 250, 250, 3)) / 127.5 - 1
names_to_test = np.array(reference_labels)

<IPython.core.display.Javascript object>

In [8]:
# Real-time recognition loop.
# For every webcam frame: detect faces with a Haar cascade, crop an enlarged
# region around each face, pair it with every reference image in test_batch,
# score the pairs with the model, and label the face with the best-scoring
# subject when the score exceeds MATCH_THRESHOLD.

# Minimum best-match score required before a face is boxed and labelled.
MATCH_THRESHOLD = 0.4

video_capture = cv2.VideoCapture(0)
try:
    if not video_capture.isOpened():
        print("Unable to access the camera")
    else:
        print("Access to the camera was successfully obtained")

    print("Streaming started")
    # One row per reference image; the "pred" column is overwritten for every
    # detected face.
    res = pd.DataFrame()
    res["label"] = names_to_test

    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
    )
    while True:
        # Capture frame-by-frame
        ret, frame = video_capture.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(
            gray,
            scaleFactor=1.3,
            minNeighbors=5,
            minSize=(40, 40),
            flags=cv2.CASCADE_SCALE_IMAGE,
        )
        frame_height, frame_width = frame.shape[:2]
        # RGB copy used for the model input; taken before anything is drawn,
        # so the annotations never end up in the crops.
        frame_model = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        for (x, y, w, h) in faces:
            # Enlarge the detection by one box width/height on every side,
            # clipped to the frame borders.
            y_min = max(y - h, 0)
            y_max = min(frame_height, y + 2 * h)
            x_min = max(x - w, 0)
            x_max = min(frame_width, x + 2 * w)

            face_processed = prepareImg(frame_model[y_min:y_max, x_min:x_max, :])
            # Same [-1, 1] scaling as the reference batch, repeated once per
            # reference image -> (N, 250, 250, 3).
            face_processed = np.tile(
                face_processed.reshape((1, 250, 250, 3)) / 127.5 - 1,
                (len(names_to_test), 1, 1, 1),
            )
            # Joining along axis 1 (height) puts each reference image above
            # the live face -> (N, 500, 250, 3) pair images.
            pair_batch = np.concatenate([test_batch, face_processed], axis=1)
            results_pred = model(pair_batch)
            # Flattened model output: one score per reference image.
            res["pred"] = results_pred.numpy().reshape(-1)

            # Best score per subject, then the best subject overall.
            best_match = (
                res.groupby("label").max().sort_values("pred", ascending=False).iloc[0]
            )
            if best_match["pred"] > MATCH_THRESHOLD:
                # The frame is BGR, so (255, 0, 0) is blue and (0, 255, 0) green.
                cv2.rectangle(
                    frame,
                    (x, y),
                    (x + w, y + h),
                    (255, 0, 0),
                    2,
                )
                # "first_last" -> "First Last"
                display_name = " ".join(
                    part.capitalize() for part in best_match.name.split("_")
                )
                cv2.putText(
                    img=frame,
                    text=f'Hello {display_name} -- {best_match["pred"]}',
                    org=(x, y - 10),
                    fontFace=cv2.FONT_HERSHEY_TRIPLEX,
                    fontScale=1,
                    color=(0, 255, 0),
                    thickness=2,
                )

        cv2.imshow("Face detector - to quit press ESC", frame)
        # Exit with ESC
        key = cv2.waitKey(1)
        if key % 256 == 27:  # ESC code
            break
finally:
    # Always release the camera and close the window, including when the loop
    # is left through an exception or a kernel interrupt; otherwise the webcam
    # stays locked until the kernel is restarted.
    video_capture.release()
    cv2.destroyAllWindows()
    # Give the GUI event loop one more tick so the window is actually closed.
    cv2.waitKey(1)
print("Streaming ended")

Access to the camera was successfully obtained
Streaming started
Streaming ended


<IPython.core.display.Javascript object>

The video capture code is based on the following article:

https://towardsdatascience.com/how-to-create-real-time-face-detector-ff0e1f81925f

