In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import random
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import cv2
from matplotlib import pyplot as plt
from mtcnn import MTCNN
from utils import count_file_and_folder, reset_folder

In [None]:
from utils import count_file_and_folder

# Count the folders and files found under ./dataset and report both numbers.
dataset_summary = count_file_and_folder('./dataset')
folder_count, total_file_count = dataset_summary
print(f"\nNumber of folders: {folder_count}")
print(f"Total number of files: {total_file_count}")

## Preprocessing

In [None]:
from utils import reset_folder

# Reset the output folder for detected faces before running detection.
# The path is kept in one variable so the log message always names the folder
# that was actually reset (the old message said "train_detected_face").
detected_faces_dir = './train_detected_faces'
reset_folder(detected_faces_dir)
print(f"Contents of {detected_faces_dir} cleared.")

In [None]:
from services.preprocessing import Preprocessing

# Collect the image paths under the raw dataset, then run face detection on
# them and store the results under ./train_detected_faces/.
raw_dataset_dir = './dataset'
detected_faces_out = './train_detected_faces/'

image_paths = Preprocessing.get_image_paths(raw_dataset_dir)
Preprocessing.detect_and_save(image_paths, detected_faces_out)

In [None]:
from utils import count_file_and_folder

# Count the folders and files produced by the detection step and report them.
detected_summary = count_file_and_folder('./train_detected_faces')
folder_count, total_file_count = detected_summary
print(f"\nNumber of folders: {folder_count}")
print(f"Total number of files: {total_file_count}")

## Extracting embeddings

In [None]:
import shutil
import os

folder_path = './extracted_embeddings'

# Start from a clean slate: remove embeddings left over from a previous run,
# and say so either way.
if not os.path.exists(folder_path):
  print(f"The folder {folder_path} does not exist.")
else:
  shutil.rmtree(folder_path)
  print(f"The folder {folder_path} has been deleted.")

In [None]:
from facenet_pytorch import InceptionResnetV1
from PIL import Image
from torchvision import transforms
from tqdm import tqdm

# Function to extract embeddings from a single folder with data augmentation
def extract_embeddings_from_folder(folder_path, face_embedder, device, output_directory):
  """Embed every image in ``folder_path`` and save the vectors as ``.npy`` files.

  For each image two embeddings are computed and written to
  ``output_directory``: one for the image as loaded and one for a randomly
  augmented copy (horizontal flip, colour jitter, rotation of up to 10 degrees).
  A failure on one image is reported and the loop continues with the next.

  Args:
    folder_path: Directory holding the images of one label; its base name is
      used as the label and as the prefix of every output file.
    face_embedder: Callable model applied to a ``(1, C, H, W)`` float tensor.
    device: Device the input tensors are moved to before calling the model.
    output_directory: Existing directory that receives the ``.npy`` files.

  Returns:
    ``(label, embeddings)``. ``embeddings`` maps the image file name to the
    embedding of the original image and ``"<stem>_augmented_embedding.npy"``
    to the embedding of the augmented copy.
  """
  # Local import: this cell runs before the notebook's own ``import torch``.
  import torch

  embeddings = {}
  to_tensor = transforms.ToTensor()
  data_transform = transforms.Compose([
      transforms.RandomHorizontalFlip(),
      transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
      transforms.RandomRotation(10),
      transforms.ToTensor()
  ])
  # normpath drops a trailing separator, which would otherwise give an empty label.
  label = os.path.basename(os.path.normpath(folder_path))

  # NOTE(review): pixels are fed to the model in ToTensor's [0, 1] range with no
  # further standardisation -- confirm this matches what the embedder expects.
  for image_name in tqdm(sorted(os.listdir(folder_path)), desc=f"Processing {folder_path}"):
    image_path = os.path.join(folder_path, image_name)
    if not os.path.isfile(image_path):
      continue  # sub-directories are not images

    stem = os.path.splitext(image_name)[0]
    try:
      # Close the file promptly and force 3 channels so grayscale / RGBA
      # images are embedded instead of failing inside the model.
      with Image.open(image_path) as raw_img:
        img = raw_img.convert('RGB')

      # Inference only: no autograd graph is needed.
      with torch.no_grad():
        img_tensor = to_tensor(img).unsqueeze(0).float().to(device)
        embedding = face_embedder(img_tensor).squeeze().cpu().numpy()
      embeddings[image_name] = embedding

      # Save the embedding for the original image
      output_emb_path = os.path.join(output_directory, f"{label}_{stem}_embedding.npy")
      np.save(output_emb_path, embedding)

      # Embed a randomly augmented copy of the same image
      with torch.no_grad():
        img_tensor_augmented = data_transform(img).unsqueeze(0).float().to(device)
        embedding_augmented = face_embedder(img_tensor_augmented).squeeze().cpu().numpy()
      embeddings[f"{stem}_augmented_embedding.npy"] = embedding_augmented

      output_emb_path_augmented = os.path.join(output_directory, f"{label}_{stem}_augmented_embedding.npy")
      np.save(output_emb_path_augmented, embedding_augmented)
    except Exception as e:
      # Deliberately best-effort: one unreadable image must not stop the batch.
      print(f"Error processing {image_name}: {str(e)}")

  return label, embeddings

input_directory = './train_detected_faces'
output_directory = './extracted_embeddings'

# Initialize InceptionResnetV1 for face embedding (CPU only).
# To use a GPU when available, switch to:
#   device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'
face_embedder = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Loop through each label folder in the input directory. Sorted because
# os.listdir returns entries in arbitrary order.
for folder_name in sorted(os.listdir(input_directory)):
  folder_path = os.path.join(input_directory, folder_name)

  # Stray files (e.g. .DS_Store) are not label folders; listing them would raise.
  if not os.path.isdir(folder_path):
    continue

  # Create subfolder in output_directory for the current label
  label_output_directory = os.path.join(output_directory, folder_name)
  os.makedirs(label_output_directory, exist_ok=True)

  # Extract embeddings from the current folder with data augmentation
  label, embeddings = extract_embeddings_from_folder(folder_path, face_embedder, device, label_output_directory)

print("Embeddings extraction complete.")


In [None]:
import os

# Directory path
output_directory = './extracted_embeddings'

# Build (folder name, number of entries) pairs, one per entry of the output directory
folders_and_counts = []
for folder in os.listdir(output_directory):
    entries = os.listdir(os.path.join(output_directory, folder))
    folders_and_counts.append((folder, len(entries)))

# Print results
for folder, count in folders_and_counts:
    print(f"Folder: {folder}, Number of Files: {count}")

In [None]:
import shutil

folder_path = './extracted_embeddingstest'

# Remove the held-out folder from a previous run, if there is one, and say so
# either way.
if not os.path.exists(folder_path):
    print(f"The folder {folder_path} does not exist.")
else:
    shutil.rmtree(folder_path)
    print(f"The folder {folder_path} has been deleted.")

In [None]:
import shutil

source_folder = "./extracted_embeddings"
destination_folder = "./extracted_embeddingstest"
# Number of original (non-augmented) embeddings held out per label.
files_per_label = 2

# Create the destination folder if it doesn't exist
os.makedirs(destination_folder, exist_ok=True)

# Iterate through each label folder in the source directory. Sorted because
# os.listdir returns entries in arbitrary order, which would make the held-out
# set differ between runs and machines.
for folder_name in sorted(os.listdir(source_folder)):
  folder_path = os.path.join(source_folder, folder_name)

  # Only label directories take part in the split
  if not os.path.isdir(folder_path):
    continue

  # Create the corresponding folder in the destination directory
  destination_folder_path = os.path.join(destination_folder, folder_name)
  os.makedirs(destination_folder_path, exist_ok=True)

  # Embeddings of un-augmented images, in a stable order
  original_files = [
      file_name for file_name in sorted(os.listdir(folder_path))
      if "augmented" not in file_name
  ]
  if len(original_files) < files_per_label:
    print(f"Only {len(original_files)} original embedding(s) available for {folder_name}.")

  # NOTE(review): the augmented embedding of each held-out image stays in the
  # source folder, so the reference set still contains a variant of every test
  # image -- confirm this is intended.
  for file_name in original_files[:files_per_label]:
    source_file_path = os.path.join(folder_path, file_name)
    destination_file_path = os.path.join(destination_folder_path, file_name)

    # Move the file (not copy)
    shutil.move(source_file_path, destination_file_path)

print("Files moved successfully.")

## Recognizing faces using distances between embeddings

In [None]:
# Function to calculate average embedding for each label
def calculate_average_embeddings(base_folder):
    """Return the mean embedding of every label sub-folder of ``base_folder``.

    Each sub-directory is treated as one label and every ``.npy`` file in it as
    one embedding. Entries that are not directories, files that are not
    ``.npy``, and label folders without any embedding are skipped, so a stray
    file cannot crash ``np.load`` and an empty folder cannot produce a NaN mean.

    Args:
        base_folder: Directory containing one sub-directory per label.

    Returns:
        dict mapping label folder name to the element-wise mean of its
        embeddings.
    """
    average_embeddings = {}

    # Sorted for a stable result order; os.listdir order is arbitrary.
    for label_folder in sorted(os.listdir(base_folder)):
        label_path = os.path.join(base_folder, label_folder)
        if not os.path.isdir(label_path):
            continue

        label_embeddings = [
            np.load(os.path.join(label_path, file_name))
            for file_name in sorted(os.listdir(label_path))
            if file_name.endswith(".npy")
        ]
        if not label_embeddings:
            continue  # nothing to average for this label

        # Calculate average embedding for the label
        average_embeddings[label_folder] = np.mean(label_embeddings, axis=0)

    return average_embeddings

# Function to recognize faces based on existing embeddings
def recognize_faces(test_folder, average_embeddings):
    """Assign every test embedding the label of the nearest average embedding.

    ``test_folder`` is expected to contain one sub-directory per true label,
    each holding ``.npy`` embeddings. Every embedding is compared to each entry
    of ``average_embeddings`` by Euclidean distance and the closest label wins.
    Non-directory entries and non-``.npy`` files are ignored.

    Args:
        test_folder: Directory with one sub-directory per true label.
        average_embeddings: dict mapping label to reference embedding.

    Returns:
        List of ``(file_name, actual_label, predicted_label)`` tuples, ordered
        by label folder and then by file name.

    Raises:
        ValueError: If a test embedding is found but ``average_embeddings`` is
            empty, so there is nothing to compare against.
    """
    predictions = []

    # Sorted for a reproducible output order; os.listdir order is arbitrary.
    for label_folder in sorted(os.listdir(test_folder)):
        label_path = os.path.join(test_folder, label_folder)
        if not os.path.isdir(label_path):
            continue

        for file_name in sorted(os.listdir(label_path)):
            if not file_name.endswith(".npy"):
                continue  # np.load would fail on anything else

            if not average_embeddings:
                raise ValueError(
                    "average_embeddings is empty; there is no reference label to compare against"
                )

            # Load test embedding from the saved numpy file
            test_embedding = np.load(os.path.join(label_path, file_name))

            # Compare test embedding with averaged embeddings
            distances = {
                label: np.linalg.norm(test_embedding - avg_embedding)
                for label, avg_embedding in average_embeddings.items()
            }

            # Predict the label with the minimum distance
            predicted_label = min(distances, key=distances.get)
            predictions.append((file_name, label_folder, predicted_label))

    return predictions

# Locations of the reference embeddings and of the held-out test embeddings
base_folder = './extracted_embeddings'
test_folder = './extracted_embeddingstest'

# One mean embedding per label, then classify every test embedding against them
average_embeddings = calculate_average_embeddings(base_folder)
predictions = recognize_faces(test_folder, average_embeddings)

# Report each prediction next to the label folder the file came from
for prediction in predictions:
    file_name, actual_label, predicted_label = prediction
    print(f"{file_name}: Actual Label - {actual_label}, Predicted Label - {predicted_label}")

In [None]:
from PIL import Image

# Function to calculate average embedding for each label
# NOTE(review): a function with this name is also defined in the previous cell;
# this later definition is the one in effect afterwards. Keep them in sync or
# drop one.
def calculate_average_embeddings(base_folder):
    """Return the mean embedding of every label sub-folder of ``base_folder``.

    Each sub-directory is treated as one label and every ``.npy`` file in it as
    one embedding. Entries that are not directories, files that are not
    ``.npy``, and label folders without any embedding are skipped, so a stray
    file cannot crash ``np.load`` and an empty folder cannot produce a NaN mean.

    Args:
        base_folder: Directory containing one sub-directory per label.

    Returns:
        dict mapping label folder name to the element-wise mean of its
        embeddings.
    """
    average_embeddings = {}

    # Sorted for a stable result order; os.listdir order is arbitrary.
    for label_folder in sorted(os.listdir(base_folder)):
        label_path = os.path.join(base_folder, label_folder)
        if not os.path.isdir(label_path):
            continue

        label_embeddings = [
            np.load(os.path.join(label_path, file_name))
            for file_name in sorted(os.listdir(label_path))
            if file_name.endswith(".npy")
        ]
        if not label_embeddings:
            continue  # nothing to average for this label

        # Calculate average embedding for the label
        average_embeddings[label_folder] = np.mean(label_embeddings, axis=0)

    return average_embeddings

# Function to recognize faces based on existing embeddings
# NOTE(review): a function with this name is also defined in the previous cell;
# this later definition is the one in effect afterwards. Keep them in sync or
# drop one.
def recognize_faces(test_folder, average_embeddings):
    """Assign every test embedding the label of the nearest average embedding.

    ``test_folder`` is expected to contain one sub-directory per true label,
    each holding ``.npy`` embeddings. Every embedding is compared to each entry
    of ``average_embeddings`` by Euclidean distance and the closest label wins.
    Non-directory entries and non-``.npy`` files are ignored.

    Args:
        test_folder: Directory with one sub-directory per true label.
        average_embeddings: dict mapping label to reference embedding.

    Returns:
        List of ``(file_name, actual_label, predicted_label)`` tuples, ordered
        by label folder and then by file name.

    Raises:
        ValueError: If a test embedding is found but ``average_embeddings`` is
            empty, so there is nothing to compare against.
    """
    predictions = []

    # Sorted for a reproducible output order; os.listdir order is arbitrary.
    for label_folder in sorted(os.listdir(test_folder)):
        label_path = os.path.join(test_folder, label_folder)
        if not os.path.isdir(label_path):
            continue

        for file_name in sorted(os.listdir(label_path)):
            if not file_name.endswith(".npy"):
                continue  # np.load would fail on anything else

            if not average_embeddings:
                raise ValueError(
                    "average_embeddings is empty; there is no reference label to compare against"
                )

            # Load test embedding from the saved numpy file
            test_embedding = np.load(os.path.join(label_path, file_name))

            # Compare test embedding with averaged embeddings
            distances = {
                label: np.linalg.norm(test_embedding - avg_embedding)
                for label, avg_embedding in average_embeddings.items()
            }

            # Predict the label with the minimum distance
            predicted_label = min(distances, key=distances.get)
            predictions.append((file_name, label_folder, predicted_label))

    return predictions

import random

# Function to display images with actual and predicted labels
def display_random_images(predictions, image_folder, num_images=40):
    """Show a random sample of predictions as a grid of labelled face images.

    The image for a prediction is looked up as
    ``<image_folder>/<actual_label>/detected_face_<identifier>.jpg``, where
    ``identifier`` is the text that follows ``"_detected_face_"`` in the
    embedding file name, up to the next underscore.

    Args:
        predictions: Sequence of ``(file_name, actual_label, predicted_label)``.
        image_folder: Root folder holding one sub-folder of images per label.
        num_images: Maximum number of predictions to display.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    selected_predictions = random.sample(predictions, min(num_images, len(predictions)))
    if not selected_predictions:
        # plt.subplots cannot build a 0 x 0 grid.
        print("No predictions to display.")
        return

    # Size the grid from what will actually be drawn, at most 5 per row.
    num_cols = min(5, len(selected_predictions))
    num_rows = (len(selected_predictions) + num_cols - 1) // num_cols

    # squeeze=False keeps ``axes`` two-dimensional even for a single row or a
    # single cell, so the grid can always be walked the same way.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 2 * num_rows), squeeze=False)
    fig.subplots_adjust(hspace=0.5)  # Adjust the vertical spacing

    # Hide every axis up front so unused or skipped cells stay blank.
    for ax in axes.flat:
        ax.axis('off')

    marker = "_detected_face_"
    for ax, (file_name, actual_label, predicted_label) in zip(axes.flat, selected_predictions):
        # Extract identifier from the file name
        marker_pos = file_name.find(marker)
        if marker_pos == -1:
            print(f"Cannot derive an image name from {file_name}. Skipping to the next one.")
            continue
        identifier_start = marker_pos + len(marker)
        identifier_end = file_name.find("_", identifier_start)
        if identifier_end == -1:
            identifier = file_name[identifier_start:]
        else:
            identifier = file_name[identifier_start:identifier_end]

        # Construct the file path for the corresponding image
        image_name = f"detected_face_{identifier}.jpg"
        image_path = os.path.join(image_folder, actual_label, image_name)

        # Display the actual image along with labels
        try:
            with Image.open(image_path) as image:
                ax.imshow(image)
        except FileNotFoundError:
            print(f"Image not found for {identifier}. Skipping to the next one.")
            continue
        ax.set_title(f"Actual: {actual_label}\nPredicted: {predicted_label}")

    plt.show()

# folder paths
base_folder = './extracted_embeddings'
test_folder = './extracted_embeddingstest'
image_folder = './train_detected_faces'

# Load existing average embeddings
average_embeddings = calculate_average_embeddings(base_folder)

# Recognize faces in the test folder
predictions = recognize_faces(test_folder, average_embeddings)

# Display only after ``predictions`` has been computed in this cell, so the
# figure does not depend on a value left over from an earlier cell.
display_random_images(predictions, image_folder, num_images=40)

In [None]:
def load_test_embeddings(test_folder):
    """Load every ``.npy`` embedding stored under ``test_folder``.

    Each sub-directory of ``test_folder`` is treated as a label. Non-directory
    entries and files without the ``.npy`` suffix are skipped, matching the
    filter used when the training embeddings are loaded.

    Args:
        test_folder: Directory with one sub-directory per label.

    Returns:
        List of ``(file_name, label_folder, embedding)`` tuples, ordered by
        label folder and then by file name.
    """
    test_embeddings = []

    # Sorted for a reproducible order; os.listdir order is arbitrary.
    for label_folder in sorted(os.listdir(test_folder)):
        label_path = os.path.join(test_folder, label_folder)
        if not os.path.isdir(label_path):
            continue

        for file_name in sorted(os.listdir(label_path)):
            if not file_name.endswith(".npy"):
                continue  # np.load would fail on anything else

            # Load test embedding from the saved numpy file
            test_embedding = np.load(os.path.join(label_path, file_name))
            test_embeddings.append((file_name, label_folder, test_embedding))

    return test_embeddings

# folder paths
base_folder = './extracted_embeddings'
test_folder = './extracted_embeddingstest'

# Load existing average embeddings
average_embeddings = calculate_average_embeddings(base_folder)

# Load test embeddings
test_embeddings = load_test_embeddings(test_folder)

# Randomly pick two test embeddings, or fewer when the test set is smaller:
# random.sample raises ValueError if asked for more items than are available.
num_samples = min(2, len(test_embeddings))
random_test_embeddings = random.sample(test_embeddings, num_samples)

# Calculate and print distances between the random test embeddings and all average embeddings
for file_name, label, test_embedding in random_test_embeddings:
    print(f"\nDistances for {file_name} ({label}) against Average Embeddings:")
    for avg_label, avg_embedding in average_embeddings.items():
        distance = np.linalg.norm(test_embedding - avg_embedding)
        print(f"Distance to {avg_label}: {distance}")

## Recognizing faces with a classification model (multilayer neural network)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from json import JSONEncoder
import json

# Disable CUDA
# NOTE(review): this replaces torch.cuda.is_available for the whole kernel
# session. Every tensor and model in this notebook is already placed on
# ``device`` explicitly, so confirm whether the patch is still needed.
torch.cuda.is_available = lambda : False

# Set the device to CPU
device = torch.device("cpu")

# Load embeddings and labels: one sub-folder of data_dir per label, one .npy
# file per embedding.
data_dir = "./extracted_embeddings"
embeddings = []
labels = []

# Sorted so the sample order -- and with it the seeded split below -- is
# reproducible; os.listdir returns entries in arbitrary order.
for label_folder in sorted(os.listdir(data_dir)):
    label_path = os.path.join(data_dir, label_folder)
    if not os.path.isdir(label_path):
        continue

    for file_name in sorted(os.listdir(label_path)):
        if file_name.endswith(".npy"):
            embeddings.append(np.load(os.path.join(label_path, file_name)))
            labels.append(label_folder)  # the folder name is the label

if not embeddings:
    raise ValueError(f"No .npy embeddings found under {data_dir}")

# Convert data to PyTorch tensors. Stacking into a single ndarray first avoids
# the slow element-by-element path torch takes for a list of arrays.
X = torch.from_numpy(np.stack(embeddings)).to(torch.float32)
labels = np.array(labels)

# Use LabelEncoder to encode string labels into integers
label_encoder = LabelEncoder()
y = torch.tensor(label_encoder.fit_transform(labels), dtype=torch.long)

# Split the data into 80% training, 10% validation, and 10% test
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Define an updated model with more layers
class UpdatedModel(nn.Module):
    def __init__(self, input_size, num_classes):
        super(UpdatedModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(256, 128)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(128, 64)
        self.relu3 = nn.ReLU()
        self.fc4 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        x = self.relu3(x)
        x = self.fc4(x)
        return x

# Initialize the model
input_size = X_train.shape[1]
num_classes = len(set(y_train))
print(input_size, num_classes)
model = UpdatedModel(input_size, num_classes).to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 20
batch_size = 32

for epoch in range(num_epochs):
    model.train()
    for i in range(0, len(X_train), batch_size):
        inputs = X_train[i:i+batch_size].to(device)
        labels = y_train[i:i+batch_size].to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Validation
    model.eval()
    with torch.no_grad():
        val_inputs = X_val.to(device)
        val_labels = y_val.to(device)

        val_outputs = model(val_inputs)
        val_loss = criterion(val_outputs, val_labels)

    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {loss.item():.4f}, Validation Loss: {val_loss.item():.4f}")

# Evaluation on the test data
model.eval()
with torch.no_grad():
    test_inputs = X_test.to(device)
    test_labels = y_test.to(device)

    test_outputs = model(test_inputs)
    test_loss = criterion(test_outputs, test_labels)

# Decode the predicted labels using inverse_transform
predicted_labels = label_encoder.inverse_transform(torch.argmax(test_outputs, dim=1).cpu().numpy())

# Decode the true labels using inverse_transform
true_labels = label_encoder.inverse_transform(y_test.cpu().numpy())

# Display classification report and confusion matrix
print("Classification Report: ")
# print(classification_report(true_labels, predicted_labels))

torch.save(model.state_dict(), "./weights")

In [None]:
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

data_dir = "./extracted_embeddings"
embeddings = []
labels = []

for label_folder in os.listdir(data_dir):
    label_path = os.path.join(data_dir, label_folder)
    if os.path.isdir(label_path):
        label = label_folder  # Assuming the folder names are labels
        embeddings_per_label = [] 

        for file_name in os.listdir(label_path):
            file_path = os.path.join(label_path, file_name)
            if file_name.endswith(".npy"):
                embedding = np.load(file_path)
                embeddings_per_label.append(embedding)
                labels.append(label)

        embeddings.append(embeddings_per_label)

embeddings = [item for sublist in embeddings for item in sublist]
X = torch.tensor(embeddings, dtype=torch.float32)
labels = np.array(labels)

label_encoder = LabelEncoder()
y = torch.tensor(label_encoder.fit_transform(labels), dtype=torch.long)

# Split the data into 80% training, 10% validation, and 10% test
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
print("y_test", y_test)

input_size = X_train.shape[1]
num_classes = len(set(y_train))

device = torch.device("cpu")
model = UpdatedModel(input_size, num_classes).to(device)
model.load_state_dict(torch.load('./weights'))
model.eval()

test_inputs = X_test.to(device)
test_outputs = model(test_inputs)
print("test_outputs", test_outputs)

# predicted_labels = label_encoder.inverse_transform(torch.argmax(test_outputs, dim=1).cpu().numpy())
# true_labels = label_encoder.inverse_transform(y_test.cpu().numpy())
# print("predicted_labels", predicted_labels, len(predicted_labels))
# print("true_labels", true_labels, len(true_labels))

test_outputs = model(test_inputs[0:1])
print("test_outputs", test_outputs)
predicted_labels = label_encoder.inverse_transform(torch.argmax(test_outputs, dim=1).cpu().numpy())
print("predicted_labels", predicted_labels, len(predicted_labels))