In [1]:
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from pytorch_metric_learning import losses
from torchvision import models
from PIL import Image
import torch.nn.functional as F
import torch.nn as nn
import os

In [2]:
def get_device():
    """
    Get the device for PyTorch operations based on GPU availability.

    Returns:
        str: A string representing the device, either 'cuda:0' if a CUDA-compatible GPU is available,
        or 'cpu' if no GPU is available.

    Example:
    device = get_device()
    print(f"Using device: {device}")
    Using device: cuda:0 (if a GPU is available)
    Using device: cpu (if no GPU is available)
    """
    return 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [3]:
def preprocess_dataset(dataset_path:str, batch_size:int, num_workers:int):
    """
    Preprocesses a dataset of images for training and testing.

    Args:
        dataset_path (str): The path to the dataset containing image files.
        batch_size (int): The batch size for the data loaders.
        num_workers (int): The number of worker processes for data loading.

    Returns:
        tuple: A tuple containing the following elements:
            - train_loader (DataLoader): DataLoader for the training dataset.
            - test_loader (DataLoader): DataLoader for the testing dataset.
            - class_to_name (dict): A dictionary mapping class indices to class names.
            - classes (list): A list of class names in the dataset.

    This function loads the dataset from the specified path, preprocesses the images, splits them into training and testing sets,
    and creates data loaders for both sets. It also provides a mapping of class indices to class names and returns the list
    of class names in the dataset.

    The image preprocessing includes resizing, horizontal flipping, random rotation, elastic transformation, perspective transformation,
    converting to tensors, and normalization.

    Example usage:
    train_loader, test_loader, class_to_name, classes = preprocess_dataset('data/dataset', batch_size=32, num_workers=4)
    """
    transform = transforms.Compose([
        transforms.Resize(128),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        # transforms.ElasticTransform(),
        transforms.RandomPerspective(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]
        )
    ])

    # Loading the dataset
    dataset = torchvision.datasets.ImageFolder(dataset_path, transform=transform)

    # Splitting the dataset into train and test
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [int(0.8 * len(dataset)), int(0.2 * len(dataset))])

    # Creating dataloaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=num_workers)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=num_workers)

    num_classes = len(dataset.classes)
    print(f'Number of classes: {num_classes}')

    class_to_name = {i: class_name for i, class_name in enumerate(dataset.classes)}

    return train_loader, test_loader, class_to_name, dataset.classes

In [4]:
def build_model(embedding_size, lr=0.001):
    """
    Build a custom deep learning model for extracting embeddings from images.

    Args:
        embedding_size (int): The size of the embedding layer's output.
        lr (float, optional): The learning rate for the optimizer (default is 0.001).

    Returns:
        tuple: A tuple containing the following elements:
            - model (nn.Module): The custom deep learning model for image embedding extraction.
            - optimizer (torch.optim.Optimizer): The optimizer used for model training.

    This function constructs a custom deep learning model based on the ResNet-50 architecture. It removes the last dense layer
    and replaces it with a new fully connected layer with an output size specified by `embedding_size`.

    The model is specifically designed for extracting feature embeddings from images, making it suitable for various computer
    vision tasks.

    Example usage:
    model, optimizer = build_model(embedding_size=128, lr=0.001)
    """
    # getting the device
    device = get_device()

    # Defining the model
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    model.fc = nn.Linear(in_features=model.fc.in_features, out_features=embedding_size)  # Removing the last dense layer
    optimizer = Adam(model.parameters(), lr=lr)
    model.to(device)

    return model, optimizer

In [6]:
def save_embeddings_labels(embeddings, labels, path, embedding_file_name, labels_file_name):
    """
    Save feature embeddings and corresponding labels to specified files.

    Args:
        embeddings (torch.Tensor): Feature embeddings to be saved.
        labels (torch.Tensor): Corresponding labels for the feature embeddings.
        path (str): The directory path where the files will be saved.
        embedding_file_name (str): The filename for saving feature embeddings.
        labels_file_name (str): The filename for saving labels.

    This function checks if the specified directory path exists; if not, it creates the directory.
    It then saves the feature embeddings and labels to separate files in the specified directory.

    Example usage:
    save_embeddings_labels(embeddings, labels, '/path/to/save', 'embeddings.pth', 'labels.pth')
    """
    if not os.path.exists(path):
        os.makedirs(path)
    
    torch.save(embeddings, os.path.join(path, embedding_file_name))
    torch.save(labels, os.path.join(path, labels_file_name))

In [7]:
def train(model, train_loader, index_to_class_dict, loss_fn, optimizer, epochs, embeddings_save_dir, model_out_path):
    """
    Train a deep learning model using CosFace and ArcFace loss functions and save embeddings.

    Args:
        model (nn.Module): The deep learning model to be trained.
        cosface_loss (torch.nn.Module): The CosFace loss function.
        arcface_loss (torch.nn.Module): The ArcFace loss function.
        optimizer (torch.optim.Optimizer): The optimizer for model training.
        epochs (int): The number of training epochs.
        train_loader (DataLoader): DataLoader for the training dataset.
        class_to_name (dict): A dictionary mapping class indices to class names.
        embeddings_save_dir (str): The directory path to save embeddings and labels.
        model_out_path (str): The file path to save the trained model.

    This function performs the training loop for the specified number of epochs using the provided model, loss functions,
    optimizer, and data loader. During each epoch, feature embeddings are extracted, and the combined loss is calculated
    using CosFace and ArcFace losses. Embeddings and labels are saved at the end of each epoch. The trained model is saved
    to the specified file path.

    Example usage:
    train(model, cosface_loss, arcface_loss, optimizer, 10, train_loader, class_to_name, '/path/to/save', 'model.pth')
    """
    # Getting the device
    device = get_device()

    # Starting training loop
    for epoch in range(epochs):
        model.train()
        # Initialize lists to store embeddings and labels
        train_embeddings = []
        train_labels = []

        # Iterating over the batches of training data
        for i, (images, labels) in enumerate(train_loader):
            images, labels = images.to(device), labels.to(device)

            # Extracting the features from the images
            features = model(images)

            # Calculate the CosFaceLoss and ArcFaceLoss
            # loss = combined_loss_fn(features, labels, cosface_loss, arcface_loss)
            loss = loss_fn(features, labels)

            # Storing embeddings and labels for this batch
            train_embeddings.extend(features)
            train_labels.extend([index_to_class_dict[i.item()] for i in labels])

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Printing the progress
            if i % 50 == 0:
                print(f"Epoch {epoch + 1}/{epochs} - Iteration {i + 1}/{len(train_loader)} - Loss: {loss.item():.4f}")
            
        # Save embeddings and labels at the end of each epoch
        save_embeddings_labels(
            embeddings=torch.stack(train_embeddings,dim=0),
            labels=train_labels,
            path=embeddings_save_dir,
            embedding_file_name=f"train_embeddings_epoch_{epoch}.pt",
            labels_file_name=f"train_labels_epoch_{epoch}.pt"
        )

        # Clear the lists to release GPU memory
        train_embeddings.clear()
        train_labels.clear()

    # Save the model after training
    torch.save(model.state_dict(), model_out_path)

In [8]:
batch_size = 128
dataset_path = "post-processed"
num_epochs = 20
embeddings_save_dir = 'embeddings/'
embedding_size =  512

In [9]:
train_loader, test_loader, class_to_name, classes = preprocess_dataset(dataset_path=dataset_path, batch_size=batch_size, num_workers=4)
# Building the model
model, optimizer = build_model(embedding_size=embedding_size)
# Defining the CosFaceLoss
# cosface_loss = losses.CosFaceLoss(num_classes=len(classes), embedding_size=embedding_size, margin=0.1)
# Defining ArcFaceLoss
arcface_loss = losses.ArcFaceLoss(num_classes=len(classes), embedding_size=embedding_size, margin=0.1)

train(
    model=model,
    train_loader=train_loader,
    index_to_class_dict=class_to_name,
    loss_fn=arcface_loss,
    optimizer=optimizer,
    epochs=num_epochs,
    embeddings_save_dir=embeddings_save_dir,
    model_out_path="model.pth",
)

Number of classes: 2996
Epoch 1/20 - Iteration 1/75 - Loss: 12.2104
Epoch 1/20 - Iteration 51/75 - Loss: 9.3830
Epoch 2/20 - Iteration 1/75 - Loss: 8.2994
Epoch 2/20 - Iteration 51/75 - Loss: 7.9286
Epoch 3/20 - Iteration 1/75 - Loss: 5.6080
Epoch 3/20 - Iteration 51/75 - Loss: 6.0126
Epoch 4/20 - Iteration 1/75 - Loss: 3.2437
Epoch 4/20 - Iteration 51/75 - Loss: 4.6735
Epoch 5/20 - Iteration 1/75 - Loss: 2.3007
Epoch 5/20 - Iteration 51/75 - Loss: 2.9967
Epoch 6/20 - Iteration 1/75 - Loss: 0.9751
Epoch 6/20 - Iteration 51/75 - Loss: 1.6555
Epoch 7/20 - Iteration 1/75 - Loss: 0.3368
Epoch 7/20 - Iteration 51/75 - Loss: 0.4562
Epoch 8/20 - Iteration 1/75 - Loss: 0.1894
Epoch 8/20 - Iteration 51/75 - Loss: 0.2522
Epoch 9/20 - Iteration 1/75 - Loss: 0.1406
Epoch 9/20 - Iteration 51/75 - Loss: 0.1656
Epoch 10/20 - Iteration 1/75 - Loss: 0.0971
Epoch 10/20 - Iteration 51/75 - Loss: 0.1182
Epoch 11/20 - Iteration 1/75 - Loss: 0.1381
Epoch 11/20 - Iteration 51/75 - Loss: 0.0784
Epoch 12/20 - 

In [9]:
def load_embeddings_labels(path):
    """
    Load feature embeddings and corresponding labels from saved files.

    Args:
        path (str): The directory path where the embeddings and labels are saved.
        num_epochs (int): The number of epochs for which embeddings and labels were saved.

    Returns:
        tuple: A tuple containing the following elements:
            - train_embeddings (torch.Tensor): Feature embeddings loaded from saved files.
            - train_labels (list): Corresponding labels loaded from saved files.

    This function loads feature embeddings and labels from saved files for the specified number of epochs.
    The loaded embeddings are converted to a torch.Tensor and returned, along with the list of labels.

    Example usage:
    train_embeddings, train_labels = load_embeddings_labels('/path/to/embeddings', num_epochs=10)
    """
    device = get_device()
    train_embeddings, train_labels = [], []

    for epoch in range(num_epochs):
        embedding = torch.load(os.path.join(path, f'train_embeddings_epoch_{epoch}.pt')).to(device)
        label = torch.load(os.path.join(path, f'train_labels_epoch_{epoch}.pt'))

        train_embeddings.extend(embedding)
        train_labels.extend(label)

    train_embeddings = torch.stack(train_embeddings, dim=0).to(device)

    return train_embeddings, train_labels

In [10]:
def load_model(model_path, embedding_size):
    """
    Load a previously trained deep learning model and its optimizer from a saved checkpoint.

    Args:
        model_path (str): The file path to the saved model checkpoint.
        embedding_size (int): The size of the embedding layer's output.

    Returns:
        tuple: A tuple containing the following elements:
            - model (nn.Module): The loaded deep learning model.
            - optimizer (torch.optim.Optimizer): The loaded optimizer associated with the model.

    This function loads a previously trained deep learning model and its optimizer from a saved checkpoint file.
    The model architecture should match the one used when the checkpoint was saved, and it is configured for the specified
    embedding size. The loaded model and optimizer are returned as a tuple.

    Example usage:
    model, optimizer = load_model('saved_model_checkpoint.pth', embedding_size=128)
    """
    model, optimizer = build_model(
        embedding_size = embedding_size,
    )
    # Load the saved model state dictionary
    model.load_state_dict(torch.load(model_path))

    return model, optimizer

In [13]:
train_embeddings, train_labels = load_embeddings_labels(embeddings_save_dir)

model, optimizer = load_model(
    model_path="model.pth",
    embedding_size=embedding_size
)

In [14]:
def predict_face(model, image_path, embeddings, labels):
    """
    Perform face recognition prediction using a trained model and saved embeddings.

    Args:
        model (nn.Module): The trained deep learning model for feature extraction.
        image_path (str): The file path to the image to be recognized.
        embeddings (torch.Tensor): Feature embeddings of known faces.
        labels (list): Corresponding labels for the known faces.

    This function loads and preprocesses an image, extracts its feature embedding using the provided model,
    and calculates the cosine similarity between the input embedding and known embeddings. It then predicts
    the class (label) of the most similar face and prints the predicted class.

    Example usage:
    predict_face(model, 'image_to_recognize.jpg', known_embeddings, known_labels)
    """
    model.eval()
    device = get_device()

    image = Image.open(image_path)
    transform = transforms.Compose(
        [
            transforms.Resize(128),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            transforms.ElasticTransform(),
            transforms.RandomPerspective(),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406], 
                std=[0.229, 0.224, 0.225]
            )
        ]
    )
    input_image = transform(image).to(device)

    # Perform inference
    with torch.no_grad():
        input_image = input_image.unsqueeze(0).to(device)
        input_embedding = model(input_image)

    # Calcula a similaridade entres os vetores de embeddings
    similarities = F.cosine_similarity(input_embedding, embeddings, dim=1)
    most_similar_index = torch.argmax(similarities)

    # Recupera a classe predita
    predicted_class = labels[most_similar_index]

    # Printa o nome da classe
    print(f'A classe predita é: {predicted_class} --> {max(similarities)}')

predict_face(
    model=model,
    image_path='post-processed/Alvaro_Uribe/Alvaro_Uribe_0002_0000.jpg',
    embeddings=train_embeddings,
    labels=train_labels
)

A classe predita é: Alvaro_Uribe --> 0.5723316669464111


In [15]:
def add_new_class(model, embeddings, labels, dataset_path, epochs, num_works=4):
    """
    Add a new class to an existing face recognition model.

    Args:
        model (nn.Module): The trained deep learning model for feature extraction.
        embeddings (torch.Tensor): Feature embeddings of known faces.
        labels (list): Corresponding labels for the known faces.
        dataset_path (str): The directory path to the dataset containing images of the new class.
        epochs (int): The number of training epochs on the new class data.
        num_workers (int, optional): The number of worker processes for data loading (default is 4).

    Returns:
        tuple: A tuple containing the following elements:
            - embeddings (torch.Tensor): Updated feature embeddings, including the new class.
            - labels (list): Updated list of labels, including the new class.

    This function extends an existing face recognition model to include a new class by loading images of the new class,
    extracting feature embeddings, and updating the existing embeddings and labels. The training process is repeated for
    the specified number of epochs on the new class data.

    Example usage:
    updated_embeddings, updated_labels = add_new_class(model, known_embeddings, known_labels, 'new_class_data', 10)
    """
    device=get_device()
    model.eval()

    # Transformations
    transform = transforms.Compose([
        transforms.Resize(128),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        # transforms.ElasticTransform(),
        transforms.RandomPerspective(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406], 
            std=[0.229, 0.224, 0.225]
        )
    ])

    # Loading the dataset
    new_dataset = torchvision.datasets.ImageFolder(dataset_path, transform=transform)

    print(f"Classes found: {', '.join(new_dataset.classes)}")

    # Criando os dataloaders
    train_loader = DataLoader(new_dataset, batch_size=batch_size, shuffle=True, pin_memory=True, num_workers=num_works)

    for i, (images, _) in enumerate(train_loader):
        for _ in range(epochs):
            # Inference on new class data
            with torch.no_grad():
                images = images.to(device)
                new_embeddings = model(images)

            # Update embeddings and labels
            embeddings = torch.cat([embeddings, new_embeddings], dim=0)
            labels.extend([new_dataset.classes[i]] * new_embeddings.size(0))

    return embeddings, labels

train_embeddings, train_labels = add_new_class(
    model=model,
    embeddings=train_embeddings,
    labels=train_labels,
    dataset_path='new_class',
    epochs=num_epochs*5
)

Classes found: new_person


In [17]:
predict_face(
    model=model,
    image_path='new_class/new_person/new_person_db.jpg',
    embeddings=train_embeddings,
    labels=train_labels
)

predict_face(
    model=model,
    image_path='new_person_infer.jpg',
    embeddings=train_embeddings,
    labels=train_labels
)

A classe predita é: new_person --> 0.8516544103622437
A classe predita é: new_person --> 0.5857816934585571
