In [None]:
import os
# Resolve the working directory only once per kernel session.
# If `file_path` already exists (the cell was run before) it is just printed,
# so re-running the cell does not climb another directory level.
try:
    print(file_path)
except NameError:
    # First run in this kernel: remember the current directory and move to its
    # parent (presumably the project root, so that `src.*` imports and relative
    # paths such as `config/` resolve -- TODO confirm the notebook location).
    file_path = os.path.abspath('')
    os.chdir(os.path.dirname(file_path))
    print(file_path)

# Prepare dataset

In [None]:
# System libraries
import os
import torch
from torchinfo import summary
# from torchvision import transforms as T
from src.Utils.utils import *
from src.facenet_triplet.utils import *
from facenet_pytorch import InceptionResnetV1

import torch
from torch.optim import lr_scheduler
import torch.optim as optim

from src.facenet_triplet.trainer import fit

from torchvision import datasets
from torchvision.transforms import InterpolationMode , v2

from src.facenet_triplet.metrics import AverageNonzeroTripletsMetric

# Set up data loaders
from src.facenet_triplet.datasets import TripletFace, BalancedBatchSampler
from src.facenet_triplet.networks import TripletNet, FacenetEmbeddingNet
from src.facenet_triplet.losses import TripletLoss, OnlineContrastiveLoss

In [None]:
# Probe CUDA availability and derive the device string from it.
cuda = torch.cuda.is_available()
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:

# Read every hyper-parameter and path used by this notebook from the FaceNet
# YAML config, so nothing below needs to touch `facenet_config` directly.
facenet_config_path = 'config/facenet.yaml'
facenet_config = read_config(path = facenet_config_path)

# Training schedule / optimisation settings
EPOCHS = facenet_config['EPOCHS']
PATIENCE = facenet_config['PATIENCE']
BATCH_SIZE = facenet_config['BATCH_SIZE']
RANDOM_SEED = facenet_config['RANDOM_SEED']
WEIGHT_DECAY = facenet_config['WEIGHT_DECAY']
LR_WARMUP = facenet_config['LR_WARMUP']
CLIP_GRAD_NORM = facenet_config['CLIP_GRAD_NORM']

log_interval = facenet_config['log_interval']
learning_rate = facenet_config['learning_rate']
margin = facenet_config['margin']

# Model and data-loading settings
IMG_SIZE = facenet_config['IMG_SIZE']
PRETRAINED_MODEL = facenet_config['PRETRAINED_MODEL']
MODEL_DIR = facenet_config['MODEL_DIR']
PIN_MEMORY = facenet_config['PIN_MEMORY']

# The configured model path is passed through the project helper rename_model
# and the result is written back into the config dict.
# NOTE(review): presumably this produces a fresh, dated/numbered file name, in
# which case re-running this cell yields a different MODEL_DIR -- confirm.
MODEL_DIR = rename_model(model_dir = MODEL_DIR, prefix='facenet')
facenet_config['MODEL_DIR'] = MODEL_DIR

# No DataLoader worker processes on Windows (os.name == 'nt'), 8 elsewhere.
NUM_WORKERS = 0 if os.name == 'nt' else 8

# Root folders of the training and test images
train_dir = facenet_config['train_dir']
test_dir = facenet_config['test_dir']

In [None]:
def count_folders(path):
    """Return how many immediate entries of ``path`` are directories."""
    folder_total = 0
    for entry_name in os.listdir(path):
        entry_path = os.path.join(path, entry_name)
        if os.path.isdir(entry_path):
            folder_total += 1
    return folder_total


# train_dir is later read with ImageFolder, which maps each sub-folder to one
# class, so the number of sub-folders is the number of classes.
num_classes = count_folders(train_dir)
print(f'Number of classes: {num_classes}')


In [None]:
# Preprocessing shared by the train and test splits: bicubic resize and centre
# crop to IMG_SIZE, conversion to a tensor, then per-channel normalisation with
# mean 0.5 / std 0.5.
# NOTE(review): recent torchvision releases document v2.ToTensor() as
# deprecated in favour of v2.ToImage() + v2.ToDtype(torch.float32, scale=True);
# confirm against the installed torchvision version before switching.
transform_original = v2.Compose([
    v2.Resize(IMG_SIZE, interpolation=InterpolationMode.BICUBIC,),
    v2.CenterCrop(IMG_SIZE),
    v2.ToTensor(),
    v2.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])
# One class per sub-folder of each root directory (ImageFolder convention).
train_face_dataset = datasets.ImageFolder(train_dir, transform=transform_original)
test_face_dataset = datasets.ImageFolder(test_dir, transform=transform_original)
# Only the training dataset gets a `train` attribute here.
# NOTE(review): presumably TripletFace reads this flag -- confirm, and check
# whether test_face_dataset needs an explicit `train = False`.
train_face_dataset.train =True


In [None]:
# Bare expression: the notebook shows the training dataset's repr as cell output.
train_face_dataset

In [None]:
# Bare expression: the notebook shows the test dataset's repr as cell output.
test_face_dataset

# Model

In [None]:
# Build the InceptionResnetV1 backbone with classify=False (no classification
# head requested) and wrap it in the project's FacenetEmbeddingNet.
res_model = InceptionResnetV1(pretrained=PRETRAINED_MODEL, classify=False,
                               num_classes=None, device=device)
facenet_embedding_net = FacenetEmbeddingNet(res_model)
facenet_embedding_net = facenet_embedding_net.to(device)
# TripletNet receives the same embedding-net object, so presumably both names
# refer to the same parameters -- TODO confirm in src.facenet_triplet.networks.
facenet_model = TripletNet(facenet_embedding_net)
facenet_model = facenet_model.to(device)

In [None]:
# Wrap each ImageFolder split in the project's TripletFace dataset; both splits
# are given the same random seed.
triplet_train_face_dataset = TripletFace(train_face_dataset, random_seed=RANDOM_SEED) # Returns triplets of images
triplet_test_face_dataset = TripletFace(test_face_dataset, random_seed=RANDOM_SEED) # Returns triplets of images

In [None]:
# Visual sanity check: plot element 0 of training sample 10
# (presumably the anchor/positive/negative images -- TODO confirm).
plot_triplet(triplet_train_face_dataset[10][0])

In [None]:
# Worker / pinned-memory options are only passed when CUDA is available.
kwargs = {'num_workers': NUM_WORKERS, 'pin_memory': PIN_MEMORY} if cuda else {}

# Training triplets are shuffled; evaluation triplets keep a fixed order.
triplet_train_face_loader = torch.utils.data.DataLoader(triplet_train_face_dataset, batch_size=BATCH_SIZE, shuffle=True, **kwargs)
triplet_test_face_loader = torch.utils.data.DataLoader(triplet_test_face_dataset, batch_size=BATCH_SIZE, shuffle=False, **kwargs)


loss_fn = TripletLoss(margin)

optimizer = optim.Adam(facenet_model.parameters(), lr=learning_rate)
# Learning-rate schedule: linear warm-up from 1% of the base rate over
# LR_WARMUP scheduler steps, then cosine decay down to 1% of the base rate over
# the remaining steps. The hand-over point and the cosine length are derived
# from LR_WARMUP / EPOCHS so they stay consistent with the config.
# NOTE(review): assumes fit() steps the scheduler once per epoch -- confirm.
scheduler_linear = lr_scheduler.LinearLR(optimizer, start_factor=0.01, total_iters=LR_WARMUP)
scheduler_cosine = lr_scheduler.CosineAnnealingLR(optimizer, T_max=max(1, EPOCHS - LR_WARMUP), eta_min=learning_rate/100)
scheduler = lr_scheduler.SequentialLR(optimizer, [scheduler_linear,scheduler_cosine],milestones=[LR_WARMUP])

In [None]:
# fit(train_loader = triplet_train_face_loader, 
#     val_loader=triplet_test_face_loader, 
#     model= facenet_model, 
#     loss_fn=loss_fn,
#     optimizer=optimizer, 
#     scheduler = scheduler, 
#     n_epochs=EPOCHS, 
#     device=device, 
#     log_interval=log_interval,)

In [None]:
# torch.save(facenet_model.state_dict(), "models/facenet_tune/facenet.pt")
# Serialises the whole TripletNet module object (not just a state_dict) to MODEL_DIR.
# NOTE(review): the fit() call in the preceding cell is commented out, so on a
# fresh run this stores the model without triplet fine-tuning. A later cell
# also saves facenet_embedding_net to the same MODEL_DIR and would overwrite
# this file -- confirm that is intended.
torch.save(facenet_model, MODEL_DIR)

## Online Pair Selection

In [None]:
# train_face_dataset.targets

In [None]:
# We'll create mini batches by sampling labels that will be present in the mini batch and number of examples from each class
# Here: 3 classes x 2 samples per batch for both splits.
train_batch_sampler = BalancedBatchSampler(train_face_dataset, n_classes=3, n_samples=2, is_dataset=True)
test_batch_sampler = BalancedBatchSampler(test_face_dataset, n_classes=3, n_samples=2, is_dataset=True)

# Worker / pinned-memory options are only passed when CUDA is available.
kwargs = {'num_workers': NUM_WORKERS, 'pin_memory': PIN_MEMORY} if cuda else {}

# The loaders iterate the plain ImageFolder datasets; batch composition is
# delegated entirely to the balanced batch samplers.
online_train_loader = torch.utils.data.DataLoader(train_face_dataset, batch_sampler=train_batch_sampler, **kwargs)
online_test_loader = torch.utils.data.DataLoader(test_face_dataset, batch_sampler=test_batch_sampler, **kwargs)


loss_fn = OnlineContrastiveLoss(margin, HardNegativePairSelector())
optimizer = optim.Adam(facenet_model.parameters(), lr=learning_rate)
# Learning-rate schedule: linear warm-up from 1% of the base rate over
# LR_WARMUP scheduler steps, then cosine decay down to 1% of the base rate over
# the remaining steps. The hand-over point and the cosine length are derived
# from LR_WARMUP / EPOCHS so they stay consistent with the config.
# NOTE(review): assumes fit() steps the scheduler once per epoch -- confirm.
scheduler_linear = lr_scheduler.LinearLR(optimizer, start_factor=0.01, total_iters=LR_WARMUP)
scheduler_cosine = lr_scheduler.CosineAnnealingLR(optimizer, T_max=max(1, EPOCHS - LR_WARMUP), eta_min=learning_rate/100)
scheduler = lr_scheduler.SequentialLR(optimizer, [scheduler_linear,scheduler_cosine],milestones=[LR_WARMUP])


In [None]:

import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
from collections import Counter

# Assuming BalancedBatchSampler is already defined

# Parameters
n_classes = 2  # Number of classes per batch
n_samples = 3   # Number of samples per class
batch_size = n_classes * n_samples  # expected batch size; not passed to the sampler or loader below

# Create the balanced batch sampler
# sampler = BalancedBatchSampler(dataset, n_classes=n_classes, n_samples=n_samples)
sampler = BalancedBatchSampler(train_face_dataset, n_classes=n_classes, n_samples=n_samples, is_dataset=True)
# `kwargs` is the loader-options dict built in an earlier cell.
loader = DataLoader(train_face_dataset, batch_sampler=sampler, **kwargs)

def check_batch_balance(batch_labels, samples_per_class=None):
    """Print a per-class breakdown of one batch and report whether it is balanced.

    Args:
        batch_labels: Labels of a single batch. Anything with a ``tolist()``
            method (e.g. a tensor on any device) or a plain iterable of labels.
        samples_per_class: Count every class in the batch must have. When
            omitted, the notebook-level ``n_samples`` is used.

    Returns:
        True when every class present in the batch occurs exactly
        ``samples_per_class`` times (an empty batch is reported as balanced),
        False otherwise.
    """
    if samples_per_class is None:
        samples_per_class = n_samples

    # tolist() yields plain Python values, so this also works for tensors that
    # are not on the CPU and keeps the printed Counter readable.
    if hasattr(batch_labels, "tolist"):
        labels = batch_labels.tolist()
    else:
        labels = list(batch_labels)

    label_counts = Counter(labels)
    print(f"Batch size: {len(batch_labels)}")
    print(f"Unique classes in batch: {len(label_counts)}")
    print(f"Samples per class: {label_counts}")
    is_balanced = all(count == samples_per_class for count in label_counts.values())
    return is_balanced

# Check a few batches
# Only the labels of each batch are inspected; the images are discarded.
for i, (_, labels) in enumerate(loader):
    print(f"\nBatch {i + 1}:")
    is_balanced = check_batch_balance(labels)
    print(f"Is batch balanced? {is_balanced}")
    
    if i == 4:  # Check 5 batches
        break

# Report the lengths the sampler and the loader each return from len().
print("\nSampler length:", len(sampler))
print("Loader length:", len(loader))

In [None]:
# # Use facenet_embedding_net not the facenet_model
# fit(train_loader = online_train_loader, 
#                 val_loader=online_test_loader, 
#                 model= facenet_embedding_net, 
#                 loss_fn=loss_fn,
#                 optimizer=optimizer, 
#                 scheduler = scheduler, 
#                 n_epochs=EPOCHS, 
#                 device=device, 
#                 log_interval=log_interval,)

In [None]:
# Serialises the whole embedding-network object (not just a state_dict) to MODEL_DIR.
# NOTE(review): an earlier cell saved facenet_model to this same MODEL_DIR, so
# this call overwrites that file -- confirm that is intended.
torch.save(facenet_embedding_net, MODEL_DIR)

# Inference

In [None]:
# facenet_model = torch.load("models/facenet_tune/facenet_2024_07_15_1.pth")

In [None]:
# Visualise the in-memory TripletNet on the training triplet loader
# (the plot itself is produced by the project helper plot_model_result).
plot_model_result(facenet_model, triplet_train_face_loader, device)

In [None]:
# Reload a previously saved embedding network and wrap it in a new TripletNet.
# The checkpoint is a fully pickled module, so it is loaded with
# weights_only=False; only do this for checkpoint files you trust.
# map_location places the tensors on the active device even if the checkpoint
# was saved from a different one.
facenet_embedding_net = torch.load("models/facenet_tune/facenet_2024_07_16_1.pth", map_location=device, weights_only=False)
# facenet_embedding_net = facenet_embedding_net.to(device)
facenet_model_2 = TripletNet(facenet_embedding_net)
# Move the new wrapper itself to the device, so the next cell evaluates the
# reloaded weights and not the previously built `facenet_model`.
facenet_model_2 = facenet_model_2.to(device)

In [None]:
# Same visualisation as for facenet_model, now for the wrapper built around the
# reloaded checkpoint.
plot_model_result(facenet_model_2, triplet_train_face_loader, device)

In [None]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torchvision import datasets, transforms
import numpy as np
import os

In [None]:
# Folder of raw face images and folder for the cropped faces.
# In the visible notebook data_dir is only referenced by the commented-out
# MTCNN cropping cell, and output_dir is not read by any visible cell (the
# image-loading cell hard-codes 'data/NAB_faces_cropped/faces').
data_dir = './data/NAB_faces'
output_dir = 'data/NAB_faces_cropped'

In [None]:
# device ='cpu'
# mtcnn = MTCNN(
#     image_size=160, margin=0, min_face_size=20,
#     thresholds=[0.6, 0.7, 0.7], factor=0.709, post_process=True, select_largest=True,
#     device=device
# )

# dataset = datasets.ImageFolder(data_dir, transform=transforms.Resize((512,512)))
# dataset.samples = [
#     (p, p.replace(data_dir, data_dir + '_cropped'))
#         for p, _ in dataset.samples
# ]
        
# loader = DataLoader(
#     dataset,
#     num_workers=0,
#     batch_size=8,
#     collate_fn=training.collate_pil
# )

# for i, (x, y) in enumerate(loader):
#     try:
#         mtcnn(x, save_path=y)
#         print('\rBatch {} of {}'.format(i + 1, len(loader)), end='')
#     except:
#         continue
    
# # Remove mtcnn to reduce GPU memory usage
# del mtcnn

In [None]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
import pandas as pd

# Define device
# NOTE: this rebinds `device` from the plain string set near the top of the
# notebook to a torch.device object.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define image transformations
# Inference preprocessing: resize to exactly 160x160 (no crop), convert to a
# tensor, then normalise each channel with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


In [None]:
def load_images_from_folder(folder):
    """Load and preprocess every regular file directly inside ``folder``.

    Files are visited in sorted name order so that positions in the returned
    lists are reproducible across runs and platforms (``os.listdir`` gives no
    ordering guarantee). Sub-directories and other non-regular entries are
    skipped. Each image is converted to RGB and passed through the
    notebook-level ``transform``.

    Args:
        folder: Directory containing the image files.

    Returns:
        A tuple ``(images, filenames)`` of two lists of equal length: the
        transformed images and the path each one was read from.
    """
    images = []
    filenames = []
    for filename in sorted(os.listdir(folder)):
        img_path = os.path.join(folder, filename)
        if not os.path.isfile(img_path):
            # e.g. a nested directory -- not an image, so do not try to open it
            continue
        # The context manager closes the underlying file handle as soon as the
        # pixel data has been converted.
        with Image.open(img_path) as raw_image:
            image = transform(raw_image.convert('RGB'))
        images.append(image)
        filenames.append(img_path)
    return images, filenames

# Load images
# Everything in the cropped-faces folder is loaded, stacked into one tensor and
# moved to `device` in a single step. torch.stack raises if the list is empty.
images, filenames = load_images_from_folder('data/NAB_faces_cropped/faces')
images = torch.stack(images).to(device)


In [None]:
# Load the facenet model

res_model = InceptionResnetV1(pretrained=PRETRAINED_MODEL, classify=False,
                               num_classes=None, device=device)
facenet_embedding_net = FacenetEmbeddingNet(res_model)
# map_location lets a checkpoint saved on a GPU be restored on a CPU-only machine.
# NOTE(review): this expects the file to hold a state_dict, whereas the save
# cells in this notebook pickle whole modules -- confirm how this file was written.
facenet_embedding_net.load_state_dict(
    torch.load("models/facenet_tune/facenet_2024_07_16_5.pth", map_location=device)
)
facenet_embedding_net = facenet_embedding_net.to(device)
# Switch to inference mode so that layers such as dropout and batch-norm use
# their evaluation behaviour and the embeddings are deterministic.
facenet_embedding_net.eval()


# Generate embeddings for all images
with torch.no_grad():
    embeddings = facenet_embedding_net.get_embedding(images)


In [None]:
# Bare expression: show the shape of the embedding tensor as cell output.
embeddings.shape

In [None]:

def cosine_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embedding batches, rescaled to [0, 1].

    The raw cosine similarity lies in [-1, 1]; adding one and halving maps it
    onto [0, 1], with 1 meaning identical direction.
    """
    raw_similarity = torch.nn.functional.cosine_similarity(embedding1, embedding2)
    shifted = 1 + raw_similarity
    return shifted / 2

def calculate_cosine_similarity(embeddings, threshold=0.95):
    """Greedily group embeddings whose scaled cosine similarity exceeds ``threshold``.

    Similarities are cosine similarities rescaled from [-1, 1] to [0, 1]. All
    pairs are computed at once with a single matrix product instead of one
    tensor call per pair.

    Grouping is a single greedy pass: each not-yet-assigned index ``i`` opens a
    new group and pulls in every not-yet-assigned ``j`` whose similarity to
    ``i`` is strictly greater than ``threshold``. Members are compared with the
    group's first element only, so the grouping is not transitive.

    Args:
        embeddings: 2-D floating-point tensor with one embedding per row.
        threshold: Minimum (exclusive) scaled similarity for joining a group.

    Returns:
        A list of groups, each a list of row indices; every index appears in
        exactly one group. An empty input yields an empty list.
    """
    embeddings = embeddings.detach().cpu()
    n_items = len(embeddings)
    if n_items == 0:
        return []

    # Unit-normalise the rows, so the matrix product holds all pairwise cosines.
    unit_rows = torch.nn.functional.normalize(embeddings, p=2, dim=1, eps=1e-8)
    similarity = (1 + unit_rows @ unit_rows.T) / 2
    # Self-similarity is zeroed so an item never matches itself.
    similarity.fill_diagonal_(0)
    similarity_matrix = similarity.tolist()

    groups = []
    visited = set()
    for i in range(n_items):
        if i in visited:
            continue
        group = [i]
        visited.add(i)
        for j, score in enumerate(similarity_matrix[i]):
            if score > threshold and j not in visited:
                group.append(j)
                visited.add(j)
        groups.append(group)
    return groups


# Calculate cosine similarity and group images
# Images whose scaled similarity is above 0.95 end up in the same group.
groups = calculate_cosine_similarity(embeddings, threshold=0.95)


In [None]:
# groups

In [None]:
# Prepare data for CSV
# One row per image: its index into `filenames`, the index of the group it was
# assigned to, and its path.
data = []
for group_id, group in enumerate(groups):
    person_count = group_id  # the group index doubles as the person label
    for idx in group:
        data.append([idx, person_count, filenames[idx]])

# Create DataFrame and save to CSV
df = pd.DataFrame(data, columns=["id", "person", "image path"])
df.to_csv("data/grouped_faces.csv", index=False)

# NOTE(review): the message omits the "data/" directory the file is written to.
print("CSV file created: grouped_faces.csv")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import os

# Load the CSV file containing the grouped faces information
# (reads the path the previous cell wrote to and rebinds `df`)
df = pd.read_csv("data/grouped_faces.csv")

# Function to look up the image paths of a specific person
def show_images_of_person(person_id, faces_df=None):
    """Return the image paths recorded for ``person_id``.

    Despite its name this function does not display anything; it only looks
    the paths up.

    Args:
        person_id: Value to match against the ``'person'`` column.
        faces_df: DataFrame with ``'person'`` and ``'image path'`` columns.
            When omitted, the notebook-level ``df`` is used.

    Returns:
        A list with the ``'image path'`` value of every matching row, in table
        order; empty when no row matches.
    """
    if faces_df is None:
        faces_df = df

    # Select the path column of the rows belonging to this person.
    person_paths = faces_df.loc[faces_df['person'] == person_id, 'image path']
    return person_paths.tolist()

# Function to display the images
def display_images(image_paths, person_id, cols=5):
    """Show the given images in a grid, each titled with ``person_id``.

    Args:
        image_paths: Paths of the image files to show.
        person_id: Identifier written into every subplot title.
        cols: Number of grid columns (must be at least 1).

    Returns:
        None. Nothing is drawn when ``image_paths`` is empty, because a grid
        with zero rows would ask matplotlib for a zero-height figure.

    Raises:
        ValueError: If ``cols`` is smaller than 1.
    """
    if cols < 1:
        raise ValueError("cols must be at least 1")

    n_images = len(image_paths)
    if n_images == 0:
        print(f"No images to display for person ID: {person_id}")
        return

    # Ceiling division: a partially filled last row still needs a full row.
    rows = (n_images // cols) + (n_images % cols > 0)

    # Three inches of height per row keeps thumbnails a constant size.
    plt.figure(figsize=(15, 3 * rows))

    for i, image_path in enumerate(image_paths):
        img = mpimg.imread(image_path)

        # Subplot indices are 1-based.
        plt.subplot(rows, cols, i + 1)
        plt.imshow(img)
        plt.axis('off')  # Hide the axes
        plt.title(f"Person ID: {person_id}")

    plt.tight_layout()
    plt.show()
# Example: collect the image paths of a single person (ID 7 here)
person_id = 7
image_paths = show_images_of_person(person_id)

# Display the images of that person
display_images(image_paths, person_id)


In [None]:
# Every distinct person label found in the grouping table
unique_person_ids = df['person'].unique()

# Show one image grid per person; groups holding exactly one image are skipped.
for person_id in unique_person_ids:
    image_paths = show_images_of_person(person_id)
    if len(image_paths) != 1:
        display_images(image_paths, person_id)