In [22]:
import os
import cv2
import numpy as np
from insightface.app import FaceAnalysis
from datetime import datetime
import random
import string


# Build the insightface analysis pipeline from the "buffalo_l" model set and
# prepare it on context id 0 with a 640x640 detection input size.
app = FaceAnalysis(name="buffalo_l")
app.prepare(det_size=(640, 640), ctx_id=0)

# Paths
# Both folders are resolved relative to the current working directory.
# os.path.join is used instead of a hard-coded "\\" so the script builds a
# valid path on any operating system, not only on Windows.
currentDir = os.getcwd()
input_folder = os.path.join(currentDir, "faceset")  # Source images to scan
dataset_folder = os.path.join(currentDir, "dataset")  # One sub-folder per person
os.makedirs(dataset_folder, exist_ok=True)  # Create dataset folder if not exists

MAX_IMAGES_PER_FOLDER = 20  # Limit of stored images per person


# Function to compute cosine similarity
def cosine_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embedding vectors.

    The result is the dot product of the inputs divided by the product of
    their Euclidean norms.

    If either vector has zero norm the similarity is undefined; 0.0 is
    returned in that case instead of dividing by zero (which would yield
    NaN and a NumPy runtime warning).
    """
    norm_product = np.linalg.norm(embedding1) * np.linalg.norm(embedding2)
    if norm_product == 0:
        return 0.0
    return np.dot(embedding1, embedding2) / norm_product


# Function to check if an embedding already exists in dataset
def find_matching_folder(new_embedding, threshold=0.6):
    """Return the dataset folder whose stored embeddings match *new_embedding*.

    Sub-folders of ``dataset_folder`` are visited in sorted name order. For
    each one, every ``*.bin`` file in its ``embedding`` sub-directory is read
    as a flat float32 array and compared with *new_embedding* using cosine
    similarity.

    Args:
        new_embedding: Embedding vector to look up.
        threshold: A stored embedding matches when its similarity is strictly
            greater than this value.

    Returns:
        The path of the first folder containing a matching embedding, or
        ``None`` when nothing matches.
    """
    expected_size = np.size(new_embedding)

    for folder in sorted(os.listdir(dataset_folder)):
        folder_path = os.path.join(dataset_folder, folder)
        if not os.path.isdir(folder_path):
            continue

        # Check all embeddings inside the embedding folder
        emb_folder = os.path.join(folder_path, "embedding")
        if not os.path.isdir(emb_folder):
            continue

        for file in os.listdir(emb_folder):
            if not file.endswith(".bin"):
                continue

            emb_path = os.path.join(emb_folder, file)
            stored_embedding = np.fromfile(emb_path, dtype=np.float32)

            # An empty, truncated, or otherwise differently sized file cannot
            # be compared (np.dot would raise ValueError); skip it instead of
            # aborting the whole run.
            if stored_embedding.size != expected_size:
                continue

            similarity = cosine_similarity(new_embedding, stored_embedding)
            if similarity > threshold:
                return folder_path  # Return the matching folder path

    return None  # No match found


# Function to get the next available folder name (P000X)
def get_next_folder_name():
    """Return the next unused person-folder name, e.g. ``"P0001"``.

    Scans ``dataset_folder`` for entries named ``P<number>`` and returns ``P``
    followed by the highest number found plus one, zero-padded to at least
    four digits. Returns ``"P0001"`` when no such entry exists.

    The highest number is found numerically rather than by sorting names as
    strings, so numbering keeps increasing past ``P9999`` (as strings,
    ``"P10000"`` sorts before ``"P9999"``). Entries that start with ``P`` but
    have no purely numeric suffix are ignored instead of raising
    ``ValueError``.
    """
    highest = 0
    for entry in os.listdir(dataset_folder):
        if not entry.startswith("P"):
            continue
        suffix = entry[1:]
        # isdecimal() only accepts characters that int() can parse.
        if suffix.isdecimal():
            highest = max(highest, int(suffix))

    return f"P{highest + 1:04d}"


# Function to get the next sequential filename (01, 02, ..., 20)
def get_next_sequential_filename(folder):
    """Return the next numeric image stem for *folder*, or None when full.

    Only ``.jpg`` entries whose text before the first dot is all digits are
    considered. With no such entry the result is ``"01"``; otherwise it is the
    largest existing number plus one, zero-padded to two digits, or ``None``
    if that number would exceed ``MAX_IMAGES_PER_FOLDER``.
    """
    taken = []
    for entry in os.listdir(folder):
        if not entry.endswith(".jpg"):
            continue
        stem = entry.split(".")[0]
        if stem.isdigit():
            taken.append(int(stem))

    # Nothing numbered yet: start the sequence.
    if not taken:
        return "01"  # First file

    candidate = max(taken) + 1
    if candidate > MAX_IMAGES_PER_FOLDER:
        return None
    return f"{candidate:02d}"


# Function to generate a unique filename (same for appear & embedding)
def generate_unique_filename():
    """Return a file stem of the form ``YYYYMMDD_HHMMSS_xxxxx``.

    The first part is the current local date and time; ``xxxxx`` is five
    characters drawn at random from ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    suffix_chars = random.choices(alphabet, k=5)
    suffix = ''.join(suffix_chars)
    return f"{datetime.now():%Y%m%d_%H%M%S}_{suffix}"


# Process each image in the input folder
#
# For every detected face the loop:
#   1. crops the face from the image,
#   2. looks for an existing person folder with a matching embedding, or
#      creates the next "P000X" folder when none matches,
#   3. stores up to MAX_IMAGES_PER_FOLDER numbered crops directly in the
#      person folder, and
#   4. always stores the crop in "appear" and the embedding in "embedding",
#      both under the same unique file stem.
for filename in os.listdir(input_folder):
    if not filename.lower().endswith((".jpg", ".png", ".jpeg")):
        continue  # Skip non-image files

    img_path = os.path.join(input_folder, filename)
    img = cv2.imread(img_path)

    if img is None:
        print(f"Error loading image: {filename}")
        continue

    img_height, img_width = img.shape[:2]

    # Detect faces using insightface
    faces = app.get(img)

    for face in faces:
        embedding = face.normed_embedding  # Extracted embedding

        # Clamp the bounding box to the image. A negative coordinate would
        # otherwise be treated by NumPy slicing as an offset from the far
        # edge, giving an empty or wrong crop.
        startX, startY, endX, endY = face.bbox.astype(int)
        startX, startY = max(startX, 0), max(startY, 0)
        endX, endY = min(endX, img_width), min(endY, img_height)
        cropped_face = img[startY:endY, startX:endX]

        # Skip if the crop is empty
        if cropped_face.shape[0] == 0 or cropped_face.shape[1] == 0:
            continue

        # Check if the embedding exists in dataset
        target_folder = find_matching_folder(embedding)

        if target_folder is None:
            # No known person matched: create a new folder
            target_folder = os.path.join(dataset_folder, get_next_folder_name())
            os.makedirs(target_folder, exist_ok=True)

        # Create "appear" and "embedding" folders inside the target folder
        appear_folder = os.path.join(target_folder, "appear")
        embedding_folder = os.path.join(target_folder, "embedding")
        os.makedirs(appear_folder, exist_ok=True)
        os.makedirs(embedding_folder, exist_ok=True)

        # Check if we can save more images in the main folder
        stored_count = sum(
            1 for f in os.listdir(target_folder) if f.endswith(".jpg")
        )

        if stored_count < MAX_IMAGES_PER_FOLDER:
            # Get next filename (01, 02, ... 20); None means the numbering
            # has already reached the limit.
            next_filename = get_next_sequential_filename(target_folder)
            if next_filename is not None:
                face_save_path = os.path.join(target_folder, f"{next_filename}.jpg")
                cv2.imwrite(face_save_path, cropped_face)

        # Generate a unique filename for appear & embedding (same name for both)
        unique_filename = generate_unique_filename()

        # Save embedding in the embedding folder as raw float32 values, the
        # format find_matching_folder reads back with np.fromfile.
        emb_save_path = os.path.join(embedding_folder, f"{unique_filename}.bin")
        embedding.astype(np.float32).tofile(emb_save_path)

        # Save cropped face in appear folder
        appear_save_path = os.path.join(appear_folder, f"{unique_filename}.jpg")
        cv2.imwrite(appear_save_path, cropped_face)

print("Processing completed!")


Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\vohungvi/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\vohungvi/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\vohungvi/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\vohungvi/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\vohungvi/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3,