In [1]:
import cv2
import numpy as np
import os
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import pickle

In [2]:
# preprocessing of dataset

def get_base_model():
    base = MobileNetV2(weights='imagenet', include_top=False, input_tensor=Input(shape=(256, 256, 3)))
    x = GlobalAveragePooling2D()(base.output)
    model = Model(inputs=base.input, outputs=x)
    return model

def preprocess_face(frame, face_cascade, size=(256, 256)):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    processed_faces = []

    for (x, y, w, h) in faces:
        margin = int(0.2 * w)
        x1 = max(0, x - margin)
        y1 = max(0, y - margin)
        x2 = min(frame.shape[1], x + w + margin)
        y2 = min(frame.shape[0], y + h + margin)
        face_img = frame[y1:y2, x1:x2]
        face_resized = cv2.resize(face_img, size)
        processed_faces.append((face_resized, (x1, y1, x2, y2)))
    return processed_faces

In [3]:
# capturing data through webcam for creating dataset for each individual

def collect_data(data_dir="dataset", label="person"):
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
    os.makedirs(os.path.join(data_dir, label), exist_ok=True)

    cap = cv2.VideoCapture(0)
    count = 0
    print("[INFO] Press 'q' to stop capturing...")

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        faces = preprocess_face(frame, face_cascade)
        for face_img, (x1, y1, x2, y2) in faces:
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            filename = os.path.join(data_dir, label, f"{count}.jpg")
            cv2.imwrite(filename, face_img)
            count += 1

        cv2.imshow("Collecting Faces", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
    print(f"[INFO] Collected {count} images for '{label}'")

In [4]:
# creating the model for training

def create_model(num_classes):
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_tensor=Input(shape=(256, 256, 3)))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(128, activation='relu')(x)
    out = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=out)
    for layer in base_model.layers:
        layer.trainable = False
    model.compile(optimizer=Adam(0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    return model, base_model

In [5]:
# training model with data augmentation

def train_model(data_dir="dataset", model_save_path="face_model.h5"):
    datagen = ImageDataGenerator(
        preprocessing_function=preprocess_input,
        validation_split=0.2,
        rotation_range=15,
        zoom_range=0.2,
        horizontal_flip=True,
    )

    train_gen = datagen.flow_from_directory(
        data_dir, target_size=(256, 256), batch_size=16, subset="training"
    )
    val_gen = datagen.flow_from_directory(
        data_dir, target_size=(256, 256), batch_size=16, subset="validation"
    )

    model, base_model = create_model(num_classes=train_gen.num_classes)

    history = model.fit(train_gen, validation_data=val_gen, epochs=5)
    model.save(model_save_path)
    print("[INFO] Model saved as", model_save_path)
    return model, base_model, history, val_gen

In [6]:
def plot_training_history(history):
    plt.figure(figsize=(12, 5))

    # Accuracy Plot
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy', linewidth=2)
    plt.plot(history.history['val_accuracy'], label='Val Accuracy', linewidth=2)
    plt.title('Accuracy vs Epochs', fontsize=14)
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    # Loss Plot
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss', linewidth=2)
    plt.plot(history.history['val_loss'], label='Val Loss', linewidth=2)
    plt.title('Loss vs Epochs', fontsize=14)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    ######
    
    

In [7]:
# generation of embeddings

def generate_embeddings(base_model, data_dir="dataset", save_path="embeddings.pkl"):
    datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
    gen = datagen.flow_from_directory(data_dir, target_size=(256, 256), class_mode='sparse', shuffle=False)

    embeddings = base_model.predict(gen)
    if len(embeddings.shape) > 2:
        embeddings = embeddings.reshape(embeddings.shape[0], -1)
    labels = gen.classes
    label_map = {v: k for k, v in gen.class_indices.items()}

    emb_dict = {"embeddings": embeddings, "labels": labels, "label_map": label_map}
    with open(save_path, "wb") as f:
        pickle.dump(emb_dict, f)

    print("[INFO] Saved embeddings to", save_path)
    return emb_dict

In [8]:
# graph for PCA (Principle Component Analysis) of dataset

def plot_embeddings(emb_dict):
    X = emb_dict["embeddings"]
    y = emb_dict["labels"]
    label_map = emb_dict["label_map"]

    if len(X.shape) > 2:
        X = X.reshape(X.shape[0], -1)
    pca = PCA(n_components=2)
    reduced = pca.fit_transform(X)
    plt.figure(figsize=(6, 6))
    for label in np.unique(y):
         plt.scatter(reduced[y == label, 0], reduced[y == label, 1],
                    label=label_map[label], alpha=0.7)
    plt.legend()
    plt.title("PCA of Face Embeddings")
    plt.show()

In [9]:
# output screen, detecting faces

def recognize_faces(base_model, emb_dict, threshold=0.5):
    from sklearn.neighbors import NearestNeighbors
    import cv2
    import numpy as np

    embeddings = emb_dict['embeddings']
    labels = emb_dict['labels']
    label_map = emb_dict['label_map']

    # Flatten stored embeddings before fitting NearestNeighbors
    if len(embeddings.shape) > 2:
        embeddings = embeddings.reshape(embeddings.shape[0], -1)

    nbrs = NearestNeighbors(n_neighbors=1, metric='cosine').fit(embeddings)
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

    cap = cv2.VideoCapture(0)
    print("[INFO] Starting real-time recognition... Press 'q' to quit.")

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        faces = preprocess_face(frame, face_cascade)
        for face_img, (x1, y1, x2, y2) in faces:
            img_array = np.expand_dims(preprocess_input(face_img), axis=0)
            emb = base_model.predict(img_array)

            if len(emb.shape) > 2:   # flatten embedding to match stored dimension
                emb = emb.reshape(emb.shape[0], -1)

            if emb.shape[1] != embeddings.shape[1]:    # resize if needed
                emb = np.resize(emb, (emb.shape[0], embeddings.shape[1]))

            dist, idx = nbrs.kneighbors(emb)
            label = label_map[labels[idx[0][0]]]

            if dist[0][0] < threshold:
                color, text = (0, 255, 0), f"Valid ({label})"
            else:
                color, text = (0, 0, 255), "Unknown"

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, text, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        cv2.imshow('Recognition', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

In [10]:
def create_model(num_classes):
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_tensor=Input(shape=(256, 256, 3)))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(128, activation='relu')(x)
    out = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=out)
    for layer in base_model.layers:
        layer.trainable = False
    model.compile(optimizer=Adam(0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
    
    # Create embedding model (without classification head)
    embedding_model = Model(inputs=base_model.input, outputs=base_model.output)
    
    return model, embedding_model


def train_model(data_dir="dataset", model_save_path="face_model.h5", embedding_model_save_path="embedding_model.h5"):
    datagen = ImageDataGenerator(
        preprocessing_function=preprocess_input,
        validation_split=0.2,
        rotation_range=15,
        zoom_range=0.2,
        horizontal_flip=True,
    )

    train_gen = datagen.flow_from_directory(
        data_dir, target_size=(256, 256), batch_size=16, subset="training"
    )
    val_gen = datagen.flow_from_directory(
        data_dir, target_size=(256, 256), batch_size=16, subset="validation"
    )

    model, embedding_model = create_model(num_classes=train_gen.num_classes)

    history = model.fit(train_gen, validation_data=val_gen, epochs=5)
    model.save(model_save_path)
    embedding_model.save(embedding_model_save_path)
    print("[INFO] Model saved as", model_save_path)
    print("[INFO] Embedding model saved as", embedding_model_save_path)
    return model, embedding_model, history, val_gen


def generate_embeddings(embedding_model, data_dir="dataset", save_path="embeddings.pkl"):
    datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
    gen = datagen.flow_from_directory(data_dir, target_size=(256, 256), class_mode='sparse', shuffle=False)

    embeddings = embedding_model.predict(gen)
    if len(embeddings.shape) > 2:
        embeddings = embeddings.reshape(embeddings.shape[0], -1)
    labels = gen.classes
    label_map = {v: k for k, v in gen.class_indices.items()}

    emb_dict = {"embeddings": embeddings, "labels": labels, "label_map": label_map}
    with open(save_path, "wb") as f:
        pickle.dump(emb_dict, f)

    print("[INFO] Saved embeddings to", save_path)
    print(f"[INFO] Embedding shape: {embeddings.shape}")
    return emb_dict


def recognize_faces(embedding_model, emb_dict, threshold=0.5):
    from sklearn.neighbors import NearestNeighbors
    import cv2
    import numpy as np

    embeddings = emb_dict['embeddings']
    labels = emb_dict['labels']
    label_map = emb_dict['label_map']

    if len(embeddings.shape) > 2:
        embeddings = embeddings.reshape(embeddings.shape[0], -1)

    print(f"[DEBUG] Stored embeddings shape: {embeddings.shape}")
    
    nbrs = NearestNeighbors(n_neighbors=1, metric='cosine').fit(embeddings)
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

    cap = cv2.VideoCapture(0)
    print("[INFO] Starting real-time recognition... Press 'q' to quit.")

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        faces = preprocess_face(frame, face_cascade)
        for face_img, (x1, y1, x2, y2) in faces:
            img_array = np.expand_dims(preprocess_input(face_img), axis=0)
            emb = embedding_model.predict(img_array, verbose=0)

            if len(emb.shape) > 2:
                emb = emb.reshape(emb.shape[0], -1)

            print(f"[DEBUG] Live embedding shape: {emb.shape}, Stored shape: {embeddings.shape}")

            dist, idx = nbrs.kneighbors(emb)
            label = label_map[labels[idx[0][0]]]
            
            print(f"[DEBUG] Distance: {dist[0][0]:.4f}, Threshold: {threshold}, Label: {label}")

            if dist[0][0] < threshold:
                color, text = (0, 255, 0), f"Valid ({label})"
            else:
                color, text = (0, 0, 255), "Unknown"

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, text, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        cv2.imshow('Recognition', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()


def main():
    print("\n========== Facial Recognition Pipeline ==========")
    print("1. Collect dataset from webcam")
    print("2. Train model with data augmentation")
    print("3. Generate & visualize embeddings")
    print("4. Run real-time recognition")
    print("=================================================\n")

    choice = input("Select step to start from (1-4): ")

    if choice == "1":
        label = input("Enter label/name for person: ")
        collect_data(data_dir="dataset", label=label)
        proceed = input("Proceed to training? (y/n): ")
        if proceed.lower() != "y":
            return

    emb_dict = None

    if choice in ["1", "2"]:
        model, embedding_model, history, val_gen = train_model("dataset")
        emb_dict = generate_embeddings(embedding_model, "dataset")
    else:
        try:
            from tensorflow.keras.models import load_model
            print("[INFO] Loading existing models...")
            model = load_model("face_model.h5", compile=False)
            model.compile(optimizer=Adam(0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
            
            embedding_model = load_model("embedding_model.h5", compile=False)
            
            print("[INFO] Models loaded successfully")
        except Exception as e:
            print("[ERROR] Could not load models:", e)
            print("[INFO] Please run option 2 to train the model first")
            return

    try:
        if choice not in ["1", "2"]:
            import pickle
            print("[INFO] Loading saved embeddings...")
            with open("embeddings.pkl", "rb") as f:
                emb_dict = pickle.load(f)
            print("[INFO] Embeddings loaded successfully")
            print(f"[INFO] Loaded {len(emb_dict['embeddings'])} embeddings with shape {emb_dict['embeddings'].shape}")

            datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
            gen = datagen.flow_from_directory("dataset", target_size=(256, 256), 
                                            class_mode='sparse', shuffle=False, batch_size=1)
            if len(gen.class_indices) != model.layers[-1].units:
                print("[ERROR] Model classes don't match dataset. Please retrain with option 2")
                return
    except Exception as e:
        print("[ERROR] Could not load embeddings:", e)
        print("[INFO] Please run option 2 to regenerate embeddings")
        return

    if choice in ["1", "2", "3"] and emb_dict is not None:
        plot_embeddings(emb_dict)

    print("\n[INFO] Starting real-time recognition...")
    recognize_faces(embedding_model, emb_dict)

    if choice in ["1", "2"] and 'history' in locals():
        plot_training_history(history)
        
        print("[INFO] Generating confusion matrix...")
        val_gen.reset()
        Y_pred = model.predict(val_gen)
        y_pred = np.argmax(Y_pred, axis=1)

        cm = confusion_matrix(val_gen.classes, y_pred)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, 
                                    display_labels=list(val_gen.class_indices.keys()))

        plt.figure(figsize=(8, 6))
        disp.plot(cmap='Blues', values_format='d')
        plt.title("Confusion Matrix - Validation Data", fontsize=14)
        plt.grid(False)
        plt.show()

In [12]:
if __name__ == "__main__":
    main()


1. Collect dataset from webcam
2. Train model with data augmentation
3. Generate & visualize embeddings
4. Run real-time recognition



Select step to start from (1-4):  4


[INFO] Loading existing models...
[INFO] Models loaded successfully
[INFO] Loading saved embeddings...
[INFO] Embeddings loaded successfully
[INFO] Loaded 901 embeddings with shape (901, 81920)
Found 901 images belonging to 3 classes.

[INFO] Starting real-time recognition...
[DEBUG] Stored embeddings shape: (901, 81920)
[INFO] Starting real-time recognition... Press 'q' to quit.
[DEBUG] Live embedding shape: (1, 81920), Stored shape: (901, 81920)
[DEBUG] Distance: 0.6545, Threshold: 0.5, Label: Rishav
[DEBUG] Live embedding shape: (1, 81920), Stored shape: (901, 81920)
[DEBUG] Distance: 0.6872, Threshold: 0.5, Label: Rishav
[DEBUG] Live embedding shape: (1, 81920), Stored shape: (901, 81920)
[DEBUG] Distance: 0.6629, Threshold: 0.5, Label: Rishav
[DEBUG] Live embedding shape: (1, 81920), Stored shape: (901, 81920)
[DEBUG] Distance: 0.5075, Threshold: 0.5, Label: Rishav
[DEBUG] Live embedding shape: (1, 81920), Stored shape: (901, 81920)
[DEBUG] Distance: 0.5132, Threshold: 0.5, Label: