In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from mtcnn import MTCNN
import random
from pathlib import Path

In [None]:
from tqdm import tqdm
import os
import cv2
from mtcnn import MTCNN

def detect_and_crop_faces(lfw_path, output_path, size=(100, 100)):
    detector = MTCNN()
    os.makedirs(output_path, exist_ok=True)
    
    # Iterate over each person folder with tqdm
    for person_folder in tqdm(os.listdir(lfw_path), desc="Processing persons", unit="person"):
        person_path = os.path.join(lfw_path, person_folder)
        if not os.path.isdir(person_path):
            continue
            
        output_person_path = os.path.join(output_path, person_folder)
        os.makedirs(output_person_path, exist_ok=True)
        
        # Iterate over images within each person folder with tqdm
        for img_name in tqdm(os.listdir(person_path), desc=f"Processing {person_folder}", unit="image"):
            img_path = os.path.join(person_path, img_name)
            img = cv2.imread(img_path)
            if img is None:
                continue
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            
            # Detect faces
            faces = detector.detect_faces(img_rgb)
            if len(faces) == 0:
                continue
                
            # Get the first face
            face = faces[0]
            x, y, w, h = face['box']
            
            # Crop and resize
            x = max(0, x)
            y = max(0, y)
            face_img = img_rgb[y:y+h, x:x+w]
            face_img = cv2.resize(face_img, size)
            
            # Save cropped image
            output_img_path = os.path.join(output_person_path, img_name)
            cv2.imwrite(output_img_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))

# Run face detection
lfw_path = "lfw"  # Update with your LFW dataset path
cropped_path = "lfw_cropped"
print("Detecting and cropping faces...")
detect_and_crop_faces(lfw_path, cropped_path)
print("Face detection and cropping completed.")


In [None]:
def create_triplet_data(cropped_path, min_images=2):
    triplets = []
    labels = []
    
    for person in os.listdir(cropped_path):
        person_path = os.path.join(cropped_path, person)
        images = [os.path.join(person_path, img) for img in os.listdir(person_path)]
        
        if len(images) < min_images:
            continue
            
        labels.append(person)
        
        # Create triplets
        for i in range(len(images)):
            anchor = images[i]
            positive = random.choice([img for img in images if img != anchor])
            
            # Choose negative from different person
            other_labels = [l for l in labels if l != person]
            if not other_labels:
                continue
            negative_person = random.choice(other_labels)
            negative_path = os.path.join(cropped_path, negative_person)
            negative = random.choice([os.path.join(negative_path, img) 
                                   for img in os.listdir(negative_path)])
            
            triplets.append((anchor, positive, negative))
    
    return triplets

# Create triplets
print("Creating triplet dataset...")
triplets = create_triplet_data(cropped_path)
if not triplets:
    print("No valid triplets found. Check dataset and minimum image requirements.")
else:
    print(f"Created {len(triplets)} triplets.")

In [None]:
def load_and_preprocess_image(img_path):
    img = cv2.imread(img_path)
    if img is None:
        return np.zeros((100, 100, 3))  # Return zero array for invalid images
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img / 255.0  # Normalize to [0,1]
    return img

def triplet_generator(triplets, batch_size=32):
    def gen():
        while True:
            batch_indices = np.random.choice(len(triplets), batch_size)
            anchors, positives, negatives = [], [], []
            
            for idx in batch_indices:
                anchor_path, positive_path, negative_path = triplets[idx]
                anchors.append(load_and_preprocess_image(anchor_path))
                positives.append(load_and_preprocess_image(positive_path))
                negatives.append(load_and_preprocess_image(negative_path))
            
            yield (
                (
                    np.array(anchors, dtype=np.float32),
                    np.array(positives, dtype=np.float32),
                    np.array(negatives, dtype=np.float32)
                ),
                np.zeros(batch_size, dtype=np.float32)
            )
    
    # Define output signature
    output_signature = (
        (
            tf.TensorSpec(shape=(batch_size, 100, 100, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(batch_size, 100, 100, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(batch_size, 100, 100, 3), dtype=tf.float32)
        ),
        tf.TensorSpec(shape=(batch_size,), dtype=tf.float32)
    )
    
    # Create dataset from generator
    return tf.data.Dataset.from_generator(
        gen,
        output_signature=output_signature
    )

# Split dataset
train_triplets, val_triplets = train_test_split(triplets, test_size=0.2, random_state=42)
print(f"Training triplets: {len(train_triplets)}, Validation triplets: {len(val_triplets)}")

In [None]:
def create_base_network(input_shape=(100, 100, 3)):
    model = models.Sequential([
        layers.Conv2D(64, (3, 3), activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(256, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dense(128, activation=None)  # Embedding layer
    ])
    return model

In [None]:
def triplet_loss(y_true, y_pred, alpha=0.2):
    anchor, positive, negative = y_pred[:, :128], y_pred[:, 128:256], y_pred[:, 256:]
    
    # Cosine distance
    pos_dist = 1 - tf.reduce_sum(tf.multiply(anchor, positive), axis=-1) / (
        tf.sqrt(tf.reduce_sum(tf.square(anchor), axis=-1)) * 
        tf.sqrt(tf.reduce_sum(tf.square(positive), axis=-1))
    )
    neg_dist = 1 - tf.reduce_sum(tf.multiply(anchor, negative), axis=-1) / (
        tf.sqrt(tf.reduce_sum(tf.square(anchor), axis=-1)) * 
        tf.sqrt(tf.reduce_sum(tf.square(negative), axis=-1))
    )
    
    loss = tf.maximum(pos_dist - neg_dist + alpha, 0.0)
    return tf.reduce_mean(loss)

In [None]:
def create_siamese_model(input_shape=(100, 100, 3)):
    base_network = create_base_network(input_shape)
    
    input_anchor = layers.Input(shape=input_shape)
    input_positive = layers.Input(shape=input_shape)
    input_negative = layers.Input(shape=input_shape)
    
    processed_anchor = base_network(input_anchor)
    processed_positive = base_network(input_positive)
    processed_negative = base_network(input_negative)
    
    # Normalize embeddings
    processed_anchor = layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))(processed_anchor)
    processed_positive = layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))(processed_positive)
    processed_negative = layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))(processed_negative)
    
    merged_output = layers.Concatenate(axis=-1)([processed_anchor, processed_positive, processed_negative])
    
    model = models.Model(
        inputs=[input_anchor, input_positive, input_negative],
        outputs=merged_output
    )
    
    model.compile(optimizer='adam', loss=triplet_loss)
    return model, base_network

# Create model
print("Creating model...")
model, base_network = create_siamese_model()
model.summary()

In [None]:
batch_size = 32
steps_per_epoch = len(train_triplets) // batch_size
validation_steps = len(val_triplets) // batch_size

print("Training model...")
history = model.fit(
    triplet_generator(train_triplets, batch_size),
    steps_per_epoch=steps_per_epoch,
    validation_data=triplet_generator(val_triplets, batch_size),
    validation_steps=validation_steps,
    epochs=10,
    verbose=1
)

# Save models
model.save('siamese_model.h5')
base_network.save('base_network.h5')
print("Models saved successfully.")

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import random
from pathlib import Path

In [None]:
# Load the base network
base_network = load_model('base_network.h5')
print("Base network loaded successfully.")

# Function to load and preprocess image (same as training)
def load_and_preprocess_image(img_path):
    img = cv2.imread(img_path)
    if img is None:
        return np.zeros((100, 100, 3))  # Return zero array for invalid images
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img / 255.0  # Normalize to [0,1]
    return img

In [None]:
def create_test_pairs(cropped_path, num_pairs=500):
    pairs = []
    labels = []  # 1 for same person, 0 for different person
    person_folders = [f for f in os.listdir(cropped_path) if os.path.isdir(os.path.join(cropped_path, f))]
    
    # Create positive pairs (same person)
    for _ in range(num_pairs // 2):
        person = random.choice(person_folders)
        person_path = os.path.join(cropped_path, person)
        images = [os.path.join(person_path, img) for img in os.listdir(person_path)]
        if len(images) < 2:
            continue
        img1, img2 = random.sample(images, 2)
        pairs.append((img1, img2))
        labels.append(1)
    
    # Create negative pairs (different people)
    for _ in range(num_pairs // 2):
        person1, person2 = random.sample(person_folders, 2)
        person1_path = os.path.join(cropped_path, person1)
        person2_path = os.path.join(cropped_path, person2)
        img1 = random.choice([os.path.join(person1_path, img) for img in os.listdir(person1_path)])
        img2 = random.choice([os.path.join(person2_path, img) for img in os.listdir(person2_path)])
        pairs.append((img1, img2))
        labels.append(0)
    
    return pairs, labels

# Generate test pairs
cropped_path = "lfw_cropped"
print("Creating test pairs...")
test_pairs, test_labels = create_test_pairs(cropped_path, num_pairs=500)
print(f"Created {len(test_pairs)} test pairs ({sum(test_labels)} positive, {len(test_labels) - sum(test_labels)} negative).")

In [None]:
def compute_cosine_distance(emb1, emb2):
    # Cosine distance = 1 - cosine similarity
    dot_product = np.sum(emb1 * emb2, axis=-1)
    norm1 = np.sqrt(np.sum(emb1 * emb1, axis=-1))
    norm2 = np.sqrt(np.sum(emb2 * emb2, axis=-1))
    cosine_similarity = dot_product / (norm1 * norm2 + 1e-10)  # Avoid division by zero
    return 1 - cosine_similarity

# Compute distances for test pairs
distances = []
print("Computing embeddings and distances...")
for img1_path, img2_path in test_pairs:
    # Load and preprocess images
    img1 = load_and_preprocess_image(img1_path)
    img2 = load_and_preprocess_image(img2_path)
    
    # Get embeddings
    emb1 = base_network.predict(np.expand_dims(img1, axis=0), verbose=0)
    emb2 = base_network.predict(np.expand_dims(img2, axis=0), verbose=0)
    
    # Compute cosine distance
    distance = compute_cosine_distance(emb1, emb2)
    distances.append(distance)

distances = np.array(distances)
print("Distance computation completed.")

In [None]:
def evaluate_accuracy(distances, labels, threshold=0.5):
    # Predict same person if distance < threshold
    predictions = (distances < threshold).astype(int)
    return accuracy_score(labels, predictions)

# Evaluate with a default threshold
threshold = 0.5
accuracy = evaluate_accuracy(distances, test_labels, threshold)
print(f"Accuracy with threshold {threshold}: {accuracy:.4f}")

# Try multiple thresholds to find the best
thresholds = np.arange(0.1, 1.0, 0.1)
accuracies = [evaluate_accuracy(distances, test_labels, t) for t in thresholds]
best_threshold = thresholds[np.argmax(accuracies)]
best_accuracy = max(accuracies)
print(f"Best threshold: {best_threshold:.2f}, Best accuracy: {best_accuracy:.4f}")

In [None]:
# Separate distances by label
positive_distances = distances[np.array(test_labels) == 1]
negative_distances = distances[np.array(test_labels) == 0]

# Plot histogram
plt.figure(figsize=(10, 6))
plt.hist(positive_distances, bins=30, alpha=0.5, label='Positive Pairs (Same Person)', color='blue')
plt.hist(negative_distances, bins=30, alpha=0.5, label='Negative Pairs (Different Person)', color='red')
plt.axvline(x=best_threshold, color='green', linestyle='--', label=f'Best Threshold ({best_threshold:.2f})')
plt.title('Distribution of Cosine Distances')
plt.xlabel('Cosine Distance')
plt.ylabel('Frequency')
plt.legend()
plt.show()

# test model

In [None]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from mtcnn import MTCNN
import pandas as pd
from datetime import datetime
import random
from pathlib import Path

# Load the base network
base_network = load_model('base_network.h5')
print("Base network loaded successfully.")

In [None]:
# Preprocess image
def load_and_preprocess_image(img_path):
    img = cv2.imread(img_path)
    if img is None:
        return np.zeros((100, 100, 3))  # Return zero array for invalid images
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img / 255.0  # Normalize to [0,1]
    return img

# Compute cosine distance
def compute_cosine_distance(emb1, emb2):
    dot_product = np.sum(emb1 * emb2, axis=-1)
    norm1 = np.sqrt(np.sum(emb1 * emb1, axis=-1))
    norm2 = np.sqrt(np.sum(emb2 * emb2, axis=-1))
    cosine_similarity = dot_product / (norm1 * norm2 + 1e-10)
    return (1 - cosine_similarity).item()

# Detect and crop face
def detect_and_crop_face(image, size=(100, 100)):
    detector = MTCNN()
    img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    faces = detector.detect_faces(img_rgb)
    if len(faces) == 0:
        return None
    face = faces[0]
    x, y, w, h = face['box']
    x, y = max(0, x), max(0, y)
    face_img = img_rgb[y:y+h, x:x+w]
    face_img = cv2.resize(face_img, size)
    return face_img / 255.0  # Normalize to [0,1]

# Log attendance
def log_attendance(person, csv_path="attendance.csv"):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = {"Timestamp": timestamp, "Person": person}
    df = pd.DataFrame([entry])
    if os.path.exists(csv_path):
        df.to_csv(csv_path, mode='a', header=False, index=False)
    else:
        df.to_csv(csv_path, mode='w', header=True, index=False)
    print(f"Attendance logged for {person} at {timestamp}")

In [None]:
def capture_images(person_name, output_dir="new_person_images", num_images=5):
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return
    
    print(f"Capturing {num_images} images for {person_name}. Press 'c' to capture, 'q' to quit.")
    captured = 0
    
    while captured < num_images:
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame.")
            break
        
        cv2.imshow("Capture Image", frame)
        key = cv2.waitKey(1) & 0xFF
        
        if key == ord('c'):
            img_path = os.path.join(output_dir, f"{person_name}_{captured+1}.jpg")
            cv2.imwrite(img_path, frame)
            print(f"Saved image: {img_path}")
            captured += 1
        elif key == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# Capture images for the new person
new_person_name = "amin"  # Replace with the new person's name
capture_images(new_person_name, num_images=5)

In [None]:
def crop_and_save_faces(input_dir, person_name, output_dir="face_database", size=(100, 100)):
    detector = MTCNN()
    person_output_path = os.path.join(output_dir, person_name)
    os.makedirs(person_output_path, exist_ok=True)
    
    print(f"Processing images for {person_name}...")
    for img_name in os.listdir(input_dir):
        img_path = os.path.join(input_dir, img_name)
        img = cv2.imread(img_path)
        if img is None:
            continue
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Detect faces
        faces = detector.detect_faces(img_rgb)
        if len(faces) == 0:
            print(f"No face detected in {img_name}")
            continue
        
        # Crop and resize
        face = faces[0]
        x, y, w, h = face['box']
        x, y = max(0, x), max(0, y)
        face_img = img_rgb[y:y+h, x:x+w]
        face_img = cv2.resize(face_img, size)
        
        # Save cropped image
        output_img_path = os.path.join(person_output_path, img_name)
        cv2.imwrite(output_img_path, cv2.cvtColor(face_img, cv2.COLOR_RGB2BGR))
        print(f"Saved cropped image: {output_img_path}")

# Crop images for the new person
input_dir = "new_person_images"
face_database_path = "face_database"
crop_and_save_faces(input_dir, new_person_name, face_database_path)

In [None]:
def build_embedding_database(face_db_path, max_images_per_person=5):
    database = {}
    print("Building embedding database...")
    
    for person in os.listdir(face_db_path):
        person_path = os.path.join(face_db_path, person)
        if not os.path.isdir(person_path):
            continue
        images = [os.path.join(person_path, img) for img in os.listdir(person_path)]
        if not images:
            continue
        images = random.sample(images, min(len(images), max_images_per_person))
        embeddings = []
        for img_path in images:
            img = load_and_preprocess_image(img_path)
            emb = base_network.predict(np.expand_dims(img, axis=0), verbose=0)
            embeddings.append(emb[0])
        database[person] = np.array(embeddings)
        print(f"Added {person} with {len(embeddings)} embeddings")
    
    return database

def add_person_to_database(person_name, face_db_path, database, max_images=5):
    person_path = os.path.join(face_db_path, person_name)
    if not os.path.exists(person_path):
        print(f"Error: No images found for {person_name} in {face_db_path}")
        return database
    
    images = [os.path.join(person_path, img) for img in os.listdir(person_path)]
    if not images:
        print(f"No valid images for {person_name}")
        return database
    
    images = random.sample(images, min(len(images), max_images))
    embeddings = []
    for img_path in images:
        img = load_and_preprocess_image(img_path)
        emb = base_network.predict(np.expand_dims(img, axis=0), verbose=0)
        embeddings.append(emb[0])
    database[person_name] = np.array(embeddings)
    print(f"Added {person_name} with {len(embeddings)} embeddings")
    
    return database

# Build or update database
face_database_path = "face_database"
if 'embedding_database' not in globals():
    embedding_database = build_embedding_database(face_database_path)
else:
    embedding_database = add_person_to_database(new_person_name, face_database_path, embedding_database)
print(f"Database contains {len(embedding_database)} individuals.")

In [None]:
def identify_person(image, database, threshold=0.5):
    face_img = detect_and_crop_face(image)
    if face_img is None:
        return None, None
    
    emb = base_network.predict(np.expand_dims(face_img, axis=0), verbose=0)[0]
    
    min_distance = float('inf')
    identified_person = None
    
    for person, embeddings in database.items():
        for db_emb in embeddings:
            distance = compute_cosine_distance(emb, db_emb)
            if distance < min_distance:
                min_distance = distance
                identified_person = person
    
    if min_distance > threshold:
        return None, min_distance
    return identified_person, min_distance

# Test with a new image
test_img_path = os.path.join("new_person_images", f"{new_person_name}_1.jpg")
if os.path.exists(test_img_path):
    test_img = cv2.imread(test_img_path)
    person, distance = identify_person(test_img, embedding_database, threshold=0.5)
    print(f"Test image identified as: {person}, Distance: {distance:.4f}")
    if person == new_person_name:
        log_attendance(person)
    else:
        print("Identification failed. Attendance not logged.")
else:
    print(f"Test image not found: {test_img_path}")

In [None]:
def real_time_attendance(database, threshold=0.5, csv_path="attendance.csv"):
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return
    
    print("Press 'q' to quit webcam.")
    last_logged = {}
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Error: Could not read frame.")
            break
        
        person, distance = identify_person(frame, database, threshold)
        
        if person:
            text = f"{person} (Dist: {distance:.2f})"
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            current_time = datetime.now()
            if person not in last_logged or (current_time - last_logged[person]).total_seconds() > 60:
                log_attendance(person, csv_path)
                last_logged[person] = current_time
        else:
            cv2.putText(frame, "Unknown", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        
        cv2.imshow("Attendance System", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# Test real-time recognition
real_time_attendance(embedding_database, threshold=0.5)