In [1]:
import degirum as dg, degirum_tools
import cv2, numpy as np

In [2]:
# Inference host address passed to dg.load_model() by the model-loading cells below.
# Two options are offered here: "@cloud" (active) and "@local" (commented out).
inference_host_address = "@cloud" 
# inference_host_address = "@local"

# Model zoo the detection model is loaded from: a zoo URL, or a path to a local folder
zoo_url = "degirum/models_hailort"
# zoo_url = "<path to local folder>"

# Access token obtained through degirum_tools; passed to dg.load_model() below
token = degirum_tools.get_token()
# token = '' # leave empty for local inference

# Image used by the detection cell and, later, by the recognition cell
image_source = "../assets/Friends.jpg"

**Face Detection and Keypoint Estimation with DeGirum PySDK** 

This section loads a YOLOv8 face detection model with keypoint estimation and applies it to an input image. The detected faces and their corresponding keypoints are displayed visually, providing insights into facial regions. This setup is ideal for tasks such as face analysis, landmark detection, or pose estimation.

In [3]:
face_det_kypts_model_name = "yolov8n_relu6_widerface_kpts--640x640_quant_hailort_hailo8l_1"

# Load the face detection + keypoint model from the configured zoo
face_det_kypts_model = dg.load_model(
    model_name=face_det_kypts_model_name,
    inference_host_address=inference_host_address,
    zoo_url=zoo_url,
    token=token
)

# Run the model on the configured image source
print(f" Running inference using '{face_det_kypts_model_name}' on image source '{image_source}'")
face_det_kypts_inference_result = face_det_kypts_model(image_source)

# print('Inference Results \n', face_det_kypts_inference_result)  # Detection Results with keypoints

# Print the hint *before* showing the result, so it is visible while the
# display is up rather than only after the display context has exited.
print("Press 'x' or 'q' to stop.")

# Show the inference result (image with detections drawn on it)
with degirum_tools.Display("AI Camera") as output_display:
    output_display.show_image(face_det_kypts_inference_result)

 Running inference using 'yolov8n_relu6_widerface_kpts--640x640_quant_hailort_hailo8l_1' on image source '../assets/Friends.jpg'
Press 'x' or 'q' to stop.


**Face Re-identification with DeGirum PySDK**

This script loads a YOLOv8-based face re-identification (ReID) model and applies it to the face crops obtained from the YOLOv8 face detection model with keypoint estimation. The model extracts unique features/embeddings from each cropped face.

The output of the ReID model is a 512-dimensional embedding, which encapsulates the unique features of each cropped face, enabling precise identification and comparison across different images.

In [4]:
face_reid_model_name = "arcface_mobilefacenet--112x112_quant_hailort_hailo8_2"
# The ReID model lives in a different zoo than the detection model. Keep it in
# its own variable so the `zoo_url` set in the configuration cell is not
# silently overwritten (re-running the detection cell would otherwise look for
# the detection model in the wrong zoo).
face_reid_zoo_url = "degirum/sandbox_shashi"

# Load the face re-identification model
face_reid_model = dg.load_model(
    model_name=face_reid_model_name,
    inference_host_address=inference_host_address,
    zoo_url=face_reid_zoo_url,
    token=token
)
image = face_det_kypts_inference_result.image
image_height, image_width = image.shape[:2]

cropped_faces = []
for face in face_det_kypts_inference_result.results:
    x1, y1, x2, y2 = map(int, face["bbox"])
    # Clamp the box to the image: a negative start index would wrap around
    # to the other side of the array and yield a wrong or empty crop.
    x1, y1 = max(x1, 0), max(y1, 0)
    x2, y2 = min(x2, image_width), min(y2, image_height)
    if x2 <= x1 or y2 <= y1:
        continue  # Nothing of this box lies inside the image
    # Crop the face from the image using corner coordinates
    cropped_faces.append(image[y1:y2, x1:x2])

if not cropped_faces:
    # Fail with a clear message instead of an IndexError on cropped_faces[0]
    raise RuntimeError(f"No usable face crops were found in '{image_source}'")

face_reid_inference_result = face_reid_model(cropped_faces[0]) # Embedding for a single cropped face
print ('Shape of a face embedding:', len(face_reid_inference_result.results[0]["data"][0])) # Print the length of the embedding for a single face.

Shape of a face embedding: 512


#### Display the Cropped Faces

In [5]:
# Arrange the images in 'cropped_faces' on a grid, `cols` faces per row
if not cropped_faces:
    raise ValueError("cropped_faces is empty: there are no faces to display")

num_faces = len(cropped_faces)
cols = 3  # Number of columns in the grid
rows = (num_faces + cols - 1) // cols  # Ceiling division: rows needed to hold every face

# Width and height have to be maximised independently. max() over
# (width, height) tuples compares lexicographically, so it would return the
# height of the widest face rather than the largest height.
max_width = max(img.shape[1] for img in cropped_faces)
max_height = max(img.shape[0] for img in cropped_faces)

# Create a blank canvas to hold all the images in a grid layout
canvas_width = cols * max_width
canvas_height = rows * max_height
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)  # Blank canvas

# Resize every face to one common cell size so the grid is regular
resized_faces = [cv2.resize(face, (max_width, max_height)) for face in cropped_faces]

# Place each image in its cell on the canvas
for idx, face in enumerate(resized_faces):
    row, col = divmod(idx, cols)  # Row and column of this face in the grid

    x_offset = col * max_width  # X position on canvas
    y_offset = row * max_height  # Y position on canvas

    canvas[y_offset:y_offset + max_height, x_offset:x_offset + max_width] = face

# Display the final canvas; cv2.waitKey(0) blocks until any key is pressed
cv2.imshow('Cropped Faces Grid', canvas)
cv2.waitKey(0)
cv2.destroyAllWindows()


**Align and Crop**

The `align_and_crop` function is designed to align and crop a face from an image based on a given set of landmarks. This is particularly useful in facial recognition tasks, where precise alignment of the face is necessary for accurate feature extraction.


In [6]:
from skimage import transform as trans

def align_and_crop(img, landmarks, image_size=112):
    """
    Warp a face onto a fixed set of reference keypoints and crop it to a square.

    A similarity transform is estimated from the five given landmarks to the
    reference keypoints (scaled to `image_size`), and the full image is warped
    with it into an `image_size` x `image_size` output.

    Args:
        img (np.ndarray): The full image (not the cropped bounding box).
        landmarks (List[np.ndarray]): Exactly 5 keypoints as (x, y) coordinates,
            in the same order as the reference keypoints below.
        image_size (int, optional): Side length of the output image. Must be a
            multiple of 112 or of 128. Defaults to 112.

    Returns:
        Tuple[np.ndarray, np.ndarray]: The aligned face image and the 2x3
        transformation matrix that was applied.
    """
    assert len(landmarks) == 5
    assert image_size % 112 == 0 or image_size % 128 == 0

    # Reference keypoint positions for a 112x112 output (the "arcface" reference set)
    reference_kps = np.array(
        [
            [38.2946, 51.6963],
            [73.5318, 51.5014],
            [56.0252, 71.7366],
            [41.5493, 92.3655],
            [70.7299, 92.2041],
        ],
        dtype=np.float32,
    )

    # Multiples of 112 scale the reference directly; multiples of 128 scale
    # relative to 128 and additionally shift the x coordinates.
    if image_size % 112 == 0:
        scale = float(image_size) / 112.0
        x_shift = 0
    else:
        scale = float(image_size) / 128.0
        x_shift = 8.0 * scale

    target_kps = reference_kps * scale
    target_kps[:, 0] += x_shift

    similarity = trans.SimilarityTransform()
    similarity.estimate(np.array(landmarks), target_kps)
    # Keep the top two rows of the 3x3 matrix: the 2x3 form cv2.warpAffine takes
    M = similarity.params[0:2, :]

    aligned_img = cv2.warpAffine(img, M, (image_size, image_size), borderValue=0.0)
    return aligned_img, M

#### Display the Aligned Faces

In [7]:
# Build one aligned face per detection from that detection's landmark coordinates
aligned_faces = []
for detection in face_det_kypts_inference_result.results:
    keypoints = [kp["landmark"] for kp in detection["landmarks"]]
    aligned_faces.append(align_and_crop(image, keypoints)[0])

# Optional: show the aligned faces side by side (left disabled; the next
# section displays them together with the cropped faces)

# cv2.imshow('Aligned Faces', cv2.hconcat(aligned_faces))
# cv2.waitKey(0)
# cv2.destroyAllWindows()
# print("Press 'x' or 'q' to stop.")

#### Display both the Cropped faces and the Aligned faces

In [None]:
# Arrange the images in 'cropped_faces' on a grid, `cols` faces per row
if not cropped_faces:
    raise ValueError("cropped_faces is empty: there are no faces to display")

num_faces = len(cropped_faces)
cols = 3  # Number of columns in the grid
rows = (num_faces + cols - 1) // cols  # Ceiling division: rows needed to hold every face

# Width and height have to be maximised independently. max() over
# (width, height) tuples compares lexicographically, so it would return the
# height of the widest face rather than the largest height.
max_width = max(img.shape[1] for img in cropped_faces)
max_height = max(img.shape[0] for img in cropped_faces)

# Create a blank canvas to hold all the cropped faces in a grid layout
canvas_width = cols * max_width
canvas_height = rows * max_height
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)  # Blank canvas

# Resize every cropped face to one common cell size so the grid is regular
resized_faces = [cv2.resize(face, (max_width, max_height)) for face in cropped_faces]

# Place each cropped face in its cell on the canvas
for idx, face in enumerate(resized_faces):
    row, col = divmod(idx, cols)  # Row and column of this face in the grid

    x_offset = col * max_width  # X position on canvas
    y_offset = row * max_height  # Y position on canvas

    canvas[y_offset:y_offset + max_height, x_offset:x_offset + max_width] = face

# Build the aligned version of every detected face from its landmarks
aligned_faces = []
for result in face_det_kypts_inference_result.results:
    landmarks = [landmark["landmark"] for landmark in result["landmarks"]]
    aligned_face, _ = align_and_crop(image, landmarks)
    aligned_faces.append(aligned_face)

# Resize the aligned faces to the same cell size as the cropped faces (for uniformity)
resized_aligned_faces = [cv2.resize(face, (max_width, max_height)) for face in aligned_faces]

# Concatenate all aligned faces horizontally (one single row)
aligned_faces_grid = cv2.hconcat(resized_aligned_faces)

# Stretch the cropped-faces canvas to the width of the aligned row so the two
# can be stacked. No channel conversion is needed: the canvas always has 3
# channels, and the cropped faces (taken from the same image as the aligned
# ones) were already written into it above, which only works when they have
# 3 channels as well.
canvas_resized = cv2.resize(canvas, (aligned_faces_grid.shape[1], canvas.shape[0]))

# Stack the cropped faces grid on top of the aligned faces row
final_display = cv2.vconcat([canvas_resized, aligned_faces_grid])

# Print the hint before the blocking wait so it is visible while the window is
# open. cv2.waitKey(0) returns on any key press, not only on 'x' or 'q'.
print("Press any key to close the window.")

# Show the final concatenated result
cv2.imshow('Cropped and Aligned Faces', final_display)
cv2.waitKey(0)
cv2.destroyAllWindows()


Press 'x' or 'q' to stop.


**Similarity between two embeddings**

In face recognition, embeddings are numerical representations of a person's face. These embeddings capture the unique features of a face in a high-dimensional vector space, where faces that are similar will be closer together.

To measure the similarity between two face embeddings, the most common approach is to calculate a distance metric between the two vectors. The closer the vectors are in space, the more similar the faces are.
Two widely used metrics for this purpose are: 
1. Cosine Similarity  
2. Euclidean Distance

In [9]:
from sklearn.metrics.pairwise import cosine_similarity

# Run the ReID model on the first two cropped faces
face_reid_inference_result_cropped_face1 = face_reid_model(cropped_faces[0])
face_reid_inference_result_cropped_face2 = face_reid_model(cropped_faces[1])

# The embedding is the first entry of the first result's "data" field
embedding1 = np.array(face_reid_inference_result_cropped_face1.results[0]["data"][0])
embedding2 = np.array(face_reid_inference_result_cropped_face2.results[0]["data"][0])

# cosine_similarity expects 2-D inputs (one row per sample), so each embedding
# is passed as a single-row matrix; the 1x1 result is the cell's output.
cosine_similarity(embedding1.reshape(1, -1), embedding2.reshape(1, -1))

array([[0.07326925]])

## Database Indexing

**Database indexing** is a technique used to improve the speed of data retrieval operations on a database table. 
Indexes allow for quick access to data without the need to scan every row, making data retrieval more efficient. 
In the context of modern vector databases like LanceDB, indexing is crucial for fast querying, especially when dealing with large datasets, such as embeddings for machine learning or image recognition tasks.

The components in database indexing are:

    1. Image generator
    2. Creating a new database table / Use the existing table
    3. Database Schema
    4. Adding data to the database

**Image generator** : This function is designed to iterate over a given directory or a single image file to generate paths to image files. Additionally, it can associate an entity name with each image based on its filename.

Parameters:

1. *input_path* (str or Path): This is the path to either a directory or a single image file. If a directory is provided, the function will recursively search for image files. If a single image file is provided, it will yield that file.

2. *identity_name* (str, optional): An optional name to associate with the image. If not provided, the function will extract the name from the image file's name (based on splitting the filename by underscores).

In [10]:
from pathlib import Path

def image_generator(input_path, identity_name = None):
    """Yield ``(image_path, info)`` pairs for one image file or for every image under a directory.

    Args:
        input_path (str or Path): An image file, or a directory that is searched
            recursively for image files (``.png``, ``.jpg``, ``.jpeg``,
            case-insensitive).
        identity_name (str, optional): Entity name to attach to every yielded
            image. When ``None``, the name is the part of the file stem before
            the first underscore (``"ross_1.jpg"`` -> ``"ross"``).

    Yields:
        Tuple[str, dict]: The image path as a string, and a dict with the keys
        ``"image_path"`` and ``"entity_name"``.

    Nothing is yielded when ``input_path`` does not exist, is a non-image file,
    or is a directory without images.
    """
    image_suffixes = (".png", ".jpg", ".jpeg")

    def _entry(file):
        # An explicit identity_name wins in both file and directory mode
        entity_name = identity_name if identity_name is not None else file.stem.split("_")[0]
        return str(file), {"image_path": str(file), "entity_name": entity_name}

    path = Path(input_path)
    if path.is_file():
        if path.suffix.lower() in image_suffixes:
            yield _entry(path)
    elif path.is_dir():
        # sorted() makes the order independent of the filesystem's listing order;
        # is_file() skips directories whose name happens to end in an image suffix.
        for file in sorted(path.rglob("*")):
            if file.is_file() and file.suffix.lower() in image_suffixes:
                yield _entry(file)


**Database Configuration**

This section defines the key parameters for working with the database. The database **URI** is used to establish a connection to the database.
The **table_name** is the new or existing table within the database where embeddings, identities, and associated metadata are stored. Each entry in the table corresponds to a unique face and its corresponding data. The **input_path** provides the location of the image data used to generate embeddings.

In [11]:
# URI of the database where face data is stored (passed to lancedb.connect() below).
uri = "face_database"

# Name of the table that stores the face embeddings together with their
# identities and related metadata; created below if it does not exist yet.
table_name = "face"

# Path to the directory containing the sample dataset for indexing
# (handed to image_generator() in the indexing cell).
input_path = "../assets/Friends_dataset"

The **FaceRecognitionSchema** in LanceDB defines the structure and data types for storing face recognition-related data, such as face embeddings, identities, and metadata. This schema is used to ensure consistency when storing and querying face recognition data within a LanceDB database.

In [12]:
from lancedb.pydantic import LanceModel, Vector
import uuid

# Define the Lance schema for face recognition
class FaceRecognitionSchema(LanceModel):
    # One row per detected face. The field order matters: the table-opening
    # cells compare the table's column names against these fields, in order.
    id: str  # Unique identifier for each entry
    vector: Vector(512)  # Face embedding, fixed size of 512
    image_path: str = "image_path"  # Path of the source image (placeholder default)
    entity_name: str = "default"  # Name of the person/entity (placeholder default)
    bbox: Vector(4)  # Bounding box copied from the detection result (this notebook reads it as x1, y1, x2, y2 corners)
    source: int = 0  # Source; not set by format_data, so rows built there keep the default 0

    @classmethod
    def format_data(cls, result) -> 'list[FaceRecognitionSchema]':
        """Build one schema instance per embedded face in a detection result.

        Args:
            result: A detection result object. It must provide ``info``, a
                mapping with the keys ``"image_path"`` and ``"entity_name"``
                (shared by all entries built from this result), and
                ``results``, an iterable of dicts. Only dicts that contain an
                ``"embedding"`` key are converted; they must also contain
                ``"bbox"``.

        Returns:
            A list of FaceRecognitionSchema instances (empty when no entry in
            ``result.results`` has an embedding).
        """
        image_path, entity_name = result.info["image_path"], result.info["entity_name"]

        data = [
            cls(
                id=str(uuid.uuid4()),  # Generate a unique ID for each entry
                vector=np.array(res["embedding"], dtype=np.float32),  # Convert embedding to a NumPy array with float32 dtype
                image_path=image_path,  # Image path taken from result.info
                entity_name=entity_name,  # Entity name taken from result.info
                bbox=np.array(res["bbox"], dtype=np.float32)  # Convert bounding box to a NumPy array with float32 dtype
            )
            for res in result.results if "embedding" in res  # Skip detections without an embedding
        ]
        return data

In [13]:
import lancedb

# Connect to the LanceDB database located at `uri`
db = lancedb.connect(uri=uri)

if table_name in db.table_names():
    # The table already exists: reuse it, but only if its columns match the schema
    tbl = db.open_table(table_name)
    schema_fields = [field.name for field in tbl.schema]
    if schema_fields != list(FaceRecognitionSchema.model_fields.keys()):
        raise RuntimeError(
            f"Table {table_name} has a different schema."
        )
else:
    # First run: create an empty table laid out according to the schema
    tbl = db.create_table(table_name, schema=FaceRecognitionSchema)

In [14]:
# Running count of the rows this cell adds to the table.
# NOTE: every run of this cell appends again; nothing here removes earlier rows.
num_entities = 0

# Detect faces in every image produced by image_generator, then embed each face
for det_result in face_det_kypts_model.predict_batch(image_generator(input_path, identity_name=None)):
    for detection in det_result.results:
        keypoints = [kp["landmark"] for kp in detection["landmarks"]]
        aligned_img = align_and_crop(det_result.image, keypoints)[0]
        # Store the embedding on the detection so format_data() can pick it up
        detection["embedding"] = face_reid_model(aligned_img).results[0]["data"][0]

    # Convert this image's detections into schema rows and append them
    entries = FaceRecognitionSchema.format_data(det_result)
    if entries:
        tbl.add(data=entries)
    num_entities += len(entries)

# Report how many rows were added now, and how many the table holds overall
print (f"Successfully added {num_entities} entities to the {table_name} table.")
print(f"{table_name} table contains a total of {tbl.count_rows()} entities.")

Successfully added 19 entities to the face table.
face table contains a total of 19 entities.


**Face Identification/Recognition**

This typically involves comparing a face captured in an image or video against a database of known faces. The goal is to match the query face with one in the database to either identify the person or authenticate their identity.

Search parameters: 

1. **Top-K** - The Top-K parameter defines the number of closest or most relevant results to return from a search query. In the context of face recognition, this often means retrieving the top K most similar face embeddings to a given query face embedding from the database.
2. **Field_name** - Field_name refers to the specific field or column within the database that will be searched. This could refer to attributes like facial embeddings, identity names, timestamps, or other metadata associated with the faces in the database.
3. **Metric type** - The Metric Type defines the similarity measure used to compare the face embeddings in the database during the search process. It is a critical parameter for determining how the system calculates the "closeness" or "similarity" between faces.


In [15]:
# Search parameters handed to process_face_result() in the recognition cell
top_k = 1  # Number of matches requested per query face (passed to .limit())
field_name = "vector"  # Table column that is searched (passed as vector_column_name)
metric_type = "cosine"  # Metric used for the search (passed to .metric())

In [16]:
def process_face_result(tbl, result, field_name, metric_type, top_k, threshold=0.3):
    """Label every face in `result` with its best match from the database table.

    For each entry in ``result.results`` the entry's ``"embedding"`` is searched
    in `tbl`. The entry then receives a ``"label"`` (the matched entity name, or
    ``"Unknown"``) and a ``"score"``, and its ``"landmarks"`` and ``"embedding"``
    keys are removed. Entries are modified in place.

    Args:
        tbl: Table object providing ``search(vector, vector_column_name=...)``
            whose result supports ``.metric(...).limit(...).to_list()``.
        result: Detection result whose ``results`` is a list of dicts, each
            holding an ``"embedding"``.
        field_name (str): Name of the vector column to search.
        metric_type (str): Metric passed to the search.
        top_k (int): Number of matches requested; only the first one is used.
        threshold (float, optional): Minimum score for a match to be accepted.
            Defaults to 0.3.

    Returns:
        The same `result` object, with its entries updated.
    """
    for res in result.results:
        # Perform database search
        matches = (
            tbl.search(
                np.array(res["embedding"]).astype(np.float32),
                vector_column_name=field_name,
            )
            .metric(metric_type)
            .limit(top_k)
            .to_list()
        )

        if matches:
            best_match = matches[0]
            # Score is 1 - distance, i.e. a similarity (higher is better).
            # NOTE(review): this reading fits the "cosine" metric used in this
            # notebook; confirm before using another metric_type.
            similarity = round(1 - best_match["_distance"], 2)
            if similarity >= threshold:
                res["label"] = best_match["entity_name"]
            else:
                res["label"] = "Unknown"
        else:
            # Empty table / no rows returned: report the face as unknown
            # instead of failing with an IndexError.
            similarity = 0.0
            res["label"] = "Unknown"

        res["score"] = similarity

        # Clean up fields that are no longer needed
        res.pop("landmarks", None)
        res.pop("embedding", None)
    return result

In [17]:
# db = lancedb.connect(uri=uri)
# Re-open the table (when it exists) and confirm its columns still match the schema.
# If the table is missing, `tbl` keeps whatever value it already had.
if table_name in db.table_names():
    tbl = db.open_table(table_name)
    expected_fields = list(FaceRecognitionSchema.model_fields.keys())
    schema_fields = [field.name for field in tbl.schema]
    if schema_fields != expected_fields:
        raise RuntimeError(f"Table {table_name} has a different schema.")

In [18]:
# Detect faces (with landmarks) in the query image
det_result = face_det_kypts_model.predict(image_source)
image = det_result.image

# Align each detected face and attach its embedding to the detection
for detection in det_result.results:
    keypoints = [kp["landmark"] for kp in detection["landmarks"]]
    aligned_img = align_and_crop(det_result.image, keypoints)[0]
    detection["embedding"] = face_reid_model(aligned_img).results[0]["data"][0]

# Look every embedding up in the table and label the detections accordingly
search_result = process_face_result(tbl, det_result, field_name, metric_type, top_k)
        

## Display

In [19]:
from degirum_tools.ui_support import Display

win_name = "Annotated Image"
# Deliberately not called `display`: that name would shadow IPython's built-in
# display() helper for the rest of the notebook session.
annotated_display = Display(win_name)
img = search_result.image_overlay  # Image with the labelled detections drawn on it
annotated_display.show(img)

# Tell the user how to leave the loop below before it starts blocking
print("Press 'x' or 'q' to stop.")

# Keep the window open until 'x' or 'q' is pressed
while True:
    key = cv2.waitKey(1) & 0xFF  # Low byte of the key code
    if key in (ord('x'), ord('q')):
        break

cv2.destroyAllWindows()  # Close the window