In [1]:
pip install -q deepface

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install -q opencv-contrib-python

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install -q image2base64

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install -q tensorflow

Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install -q confluent-kafka

Note: you may need to restart the kernel to use updated packages.


In [6]:
# imports
from deepface import DeepFace
import os
from PIL import Image
import numpy as np
import cv2
from matplotlib import pyplot as plt
import uuid
import pandas as pd
from image2base64.converters import base64_to_rgb, rgb2base64
import json
import traceback
import re
import time
import requests
from confluent_kafka import Consumer, Producer
import scipy
import multiprocessing as mp
import dask
from dask import delayed, visualize
from dask.distributed import Client

2023-06-18 14:22:24.355879: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [21]:
# models
path = r"models/EDSR_x2.pb"
sr = cv2.dnn_superres.DnnSuperResImpl_create()
sr.readModel(path)
sr.setModel("edsr", 2)

# directories
db_path = r"stored_faces_db"

# Image resizing options
crop_increase_percent = 25
standard_height = 155
standard_width = 120

# db api
URL_TABLE = "https://gc7da7be5da2e70-g833jueqvvi5nhsa.adb.eu-frankfurt-1.oraclecloudapps.com/ords/admin/persons/"
URL_MODULE = "https://gc7da7be5da2e70-g833jueqvvi5nhsa.adb.eu-frankfurt-1.oraclecloudapps.com/ords/admin/operations/"

# kafka
consumer = Consumer({
    'bootstrap.servers': '192.168.70.40:9092',
    'group.id': 'py',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['toAnalyze'])

producer = Producer({'bootstrap.servers': '192.168.70.40:9092'})

# thresholds
face_detection_conf = 0.9
face_recognition_distance = 85

In [8]:
def add_face_to_oracledb(face_dict, cropped_cv2):
    new_id = uuid.uuid4()
    upscale_img = upscale(cropped_cv2)
    
    data = {
        "in_id": str(new_id),
        "in_age": str(face_dict["age"]),
        "in_gender": face_dict["dominant_gender"],
        "in_emotion": face_dict["dominant_emotion"],
        "in_cropped_img": rgb2base64(upscale_img)
    }

    headers = {'Content-type': 'application/json'}
    response = requests.post(URL_MODULE + "insert_person", json.dumps(data), headers=headers)
    print("Add face to oracle db")

In [9]:
def refresh_face_in_oracledb(face_dict, id, cropped_cv2):
    upscale_img = upscale(cropped_cv2)
    
    data = {
        "in_id": str(id),
        "in_cropped_img": rgb2base64(upscale_img),
        "in_age": str(face_dict["age"]), 
        "in_gender": face_dict["dominant_gender"], 
        "in_emotion": face_dict["dominant_emotion"]
    }
    
    headers = {'Content-type': 'application/json'}
    response = requests.post(URL_MODULE + "update_person", json.dumps(data), headers=headers)
    print("Refresh face in oracle db")

In [10]:
def handle_db_communication(face_dict, id, cropped_cv2):
    upscale_img = upscale(cropped_cv2)
    
    data = {
        "in_id": str(id),
        "in_cropped_img": rgb2base64(upscale_img),
        "in_age": str(face_dict["age"]), 
        "in_gender": face_dict["dominant_gender"], 
        "in_emotion": face_dict["dominant_emotion"]
    }
    headers = {'Content-type': 'application/json'}
    
    if id is "NEW":
        data["in_id"] = str(uuid.uuid4())
        response = requests.post(URL_MODULE + "insert_person", json.dumps(data), headers=headers)
        print("Add face to oracle db")
    else:
        response = requests.post(URL_MODULE + "update_person", json.dumps(data), headers=headers)
        print("Refresh face in oracle db")

In [11]:
def batch_insert_faces(persons_data):
    headers = {'Content-type': 'application/json'}
    data = json.dumps(persons_data)
    print(data)
    
    try:
        response = requests.post(URL_MODULE + "insert_person", data=data, headers=headers)
        response.raise_for_status()
        print("Insert persons in Oracle DB")
    except requests.exceptions.RequestException as error:
        print("Error:", error)

In [12]:
def reset_is_present():
    response = requests.post(URL_MODULE + "reset_is_present")

In [13]:
def truncate_oracle_db():
    response = requests.post(URL_MODULE + "truncate_table")
    response.raise_for_status()

In [14]:
def get_standard_size_img(cropped):
    return cv2.resize(cropped, (standard_width, standard_height))

In [15]:
def upscale(cropped_cv2):
    sr_upscale_np = sr.upsample(cropped_cv2)
    sr_upscale_np_corrected = cv2.cvtColor(sr_upscale_np, cv2.COLOR_RGB2BGR)

    return Image.fromarray(sr_upscale_np_corrected)

In [16]:
def check_existence(face_list, face_vector):
    if len(face_list) == 0:
        face_list.append({"vector":face_vector, "id":str(uuid.uuid4()), "time":time.time()})
        return "NEW"
    else:
        for index, element in enumerate(face_list):
            dist = scipy.spatial.distance.euclidean(face_vector, element["vector"])
            if dist <= face_recognition_distance:
                element["vector"] = face_vector
                element["time"] = time.time()
                face_list.pop(index)
                face_list.insert(0, element)
                print("Similiar face found with distance: " + str(dist))
                return element["id"]
        print("New face added to list")
        face_list.insert(0, {"vector":face_vector, "id":str(uuid.uuid4()), "time":time.time()})
        return "NEW"

In [17]:
def create_new_face(analyze_features, cropped_cv2):
    upscale_img = upscale(cropped_cv2)
    return {
        "id": str(uuid.uuid4),
        "age": analyze_features["age"],
        "gender": analyze_features["dominant_gender"],
        "emotion": analyze_features["dominant_emotion"],
        "cropped_img": rgb2base64(upscale_img)
    }

In [18]:
def process_face(face):
    
    cropped = face["cropped"]
    
    analyze_start = time.time()
    analyze = DeepFace.analyze(img_path=cropped, actions=['age', 'gender','emotion'], 
                           detector_backend='skip', enforce_detection=False, silent=True)
    print("Analyzing 1 person: " + str(time.time()-analyze_start) + " secs")
    recog_start = time.time()
    emb = DeepFace.represent(img_path=cropped, detector_backend= 'skip', model_name="VGG-Face", 
                             enforce_detection=False)[0]["embedding"]
    result = check_existence(face_list, emb) 
    print("Face recognition 1 person: " + str(time.time()-recog_start) + " secs")

    oracle_start = time.time()
    if result == "NEW":
        add_face_to_oracledb(face_dict=analyze[0], cropped_cv2=cropped)
    else:
        refresh_face_in_oracledb(face_dict=analyze[0], id=result, cropped_cv2=cropped)
    print("Oracle db actions: " + str(time.time()-oracle_start) + " secs")

In [23]:
face_list = []
num_workers = 10  # Specify the desired number of workers
client = Client(n_workers=num_workers)

while True:
    new_person = False
    try:
        msg = consumer.poll(1.0)

        if msg is None:
            print("Waiting")
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        else:
            start = time.time()
            reset_is_present()
            dict = json.loads(msg.value())
            for base64 in dict["screens"]:
                img = base64_to_rgb(base64, 'cv2')
                start = time.time()
                extract_faces_start = time.time()
                faces = DeepFace.extract_faces(img_path=img, detector_backend="mtcnn", 
                                             enforce_detection=False)
                for face in faces:
                    if face["confidence"] < face_detection_conf:
                        faces.remove(face)

                    x = face['facial_area'].get('x')
                    y = face['facial_area'].get('y')
                    w = face['facial_area'].get('w')
                    h = face['facial_area'].get('h')

                    w_increase = int(w * crop_increase_percent / 100)
                    h_increase = int(h * crop_increase_percent / 100)

                    x_new = x - w_increase // 2
                    y_new = y - h_increase // 2
                    w_new = w + w_increase
                    h_new = h + h_increase

                    cropped = get_standard_size_img(
                        img[y_new:y_new + h_new, x_new:x_new + w_new, :])
                    
                    face["cropped"] = cropped
                    
                computations = [process_face(face) for face in faces]
                dask.compute(*computations)
                
            print("Time elapsed: " + str(time.time()-start))

            producer.produce(topic="update_wall", value="DB refreshed")
            producer.flush()
            producer.produce(topic="update_feeds", value="DB refreshed")
            producer.flush()
    except Exception as e:
        traceback.print_exc()
        continue
client.close()

Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Analyzing 1 person: 4.169416666030884 secs
Face recognition 1 person: 2.244384288787842 secs
Add face to oracle db
Oracle db actions: 2.5941662788391113 secs
Analyzing 1 person: 0.26305699348449707 secs
Similiar face found with distance: 39.90586472342343
Face recognition 1 person: 0.1295778751373291 secs
Refresh face in oracle db
Oracle db actions: 1.8260259628295898 secs
Time elapsed: 2.8985862731933594
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting

KeyboardInterrupt: 

Process Dask Worker process (from Nanny):
2023-06-18 17:03:11,905 - distributed.nanny - ERROR - Worker process died unexpectedly
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.10/site-packages/distributed/process.py", line 202, in _run
    target(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/distributed/nanny.py", line 999, in _run
    asyncio.run(run())
  File "/opt/conda/lib/python3.10/asyncio/runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "/opt/conda/lib/python3.10/asyncio/runners.py", line 63, in _cancel_all_tasks
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
  File "/opt/conda/lib/python3.10/asyncio/base_events.py", line 636, in run_until_complete
    self.run_forever()
 