In [1]:
import warnings
warnings.filterwarnings("ignore")
import torch
import os
from going_modular.model.TripletFaceRecognition import EmbeddingNetConcatV3, TripletNetConcatV3
from going_modular.dataloader.triplet import CustomExrDatasetConCatV3
from torch.utils.data import Dataset, DataLoader
import csv
import json
from mtcnn import MTCNN
import cv2
import numpy as np
import albumentations as A
import random
import shutil
import pandas as pd
import ast
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, accuracy_score
from pathlib import Path
from typing import Tuple


random.seed(42)
device = "cuda" if torch.cuda.is_available() else "cpu"

os.environ["OPENCV_IO_ENABLE_OPENEXR"]="1"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Đặt seed toàn cục
seed = 42
torch.manual_seed(seed)

CONFIGURATION = {
    # Thư mục
    'data_dir': '../3d_face_recognition_magface/Dataset',
    'checkpoint': './checkpoint/new/concat_3/models/checkpoint.pth',
    'recognition_dir': './Gallery/recognition',

    # Cấu embedding
    'embedding_size': 512,
    'batch_size': 16,
    
    # Cấu hình khác
    'image_size': 256,
    'num_workers': 4,
}

test_transform = A.Compose([
    A.Resize(height=CONFIGURATION['image_size'], width=CONFIGURATION['image_size'])
], additional_targets={
    'albedo': 'image',
    'depthmap': 'image'
})

2025-01-20 18:28:15.779718: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
class ConcatGalleryExrDatasetV3(Dataset):
    
    def __init__(self, dataset_dir:str, transform, metadata_path):
        self.metadata_path = metadata_path
        self.albedo_dir = Path(dataset_dir) / 'Albedo' / 'gallery'
        self.depth_dir = Path(dataset_dir) / 'Depth_Map' / 'gallery'
        self.normal_dir = Path(dataset_dir) / 'Normal_Map' / 'gallery'
        
        self.transform = transform
        self.classes = sorted(os.listdir(self.albedo_dir))
        
        metadata = pd.read_csv(metadata_path)

        # Tạo danh sách các đường dẫn và id tương ứng
        self.data = [
            (
                (
                    Path(self.normal_dir, str(row['id']), f"{row['session']}.exr"),  # Đường dẫn albedo
                    Path(self.albedo_dir, str(row['id']), f"{row['session']}.exr"),  # Đường dẫn normal
                    Path(self.depth_dir, str(row['id']), f"{row['session']}.exr")   # Đường dẫn depth
                ),
                row['id']  # Lớp (label_index)
            )
            for _, row in metadata.iterrows()
        ]
        
        self.paths, self.classes = zip(*self.data)
        self.transform = transform

                
    def __len__(self):
        return len(self.paths)
    
    
    # Nhận vào index mà dataloader muốn lấy
    def __getitem__(self, index:int) -> Tuple[torch.Tensor, int]:
        albedo_path, normal_path, depth_path = self.paths[index]
        numpy_normal = self.__load_numpy_image(normal_path)
        numpy_albedo = self.__load_numpy_image(albedo_path)
        numpy_depth = self.__load_numpy_image(depth_path)
        label = albedo_path.parent.name
        label_index = self.classes.index(int(label))
        
        if self.transform:
            transformed = self.transform(image=numpy_normal, albedo=numpy_albedo, depthmap=numpy_depth)
            numpy_normal = transformed['image']
            numpy_albedo = transformed['albedo']
            numpy_depth = transformed['depthmap']
        
        # Stack các tensor lại thành một tensor duy nhất
        X = torch.stack((
            torch.from_numpy(numpy_normal).permute(2, 0, 1),
            torch.from_numpy(numpy_albedo).permute(2, 0, 1), 
            torch.from_numpy(numpy_depth).permute(2, 0, 1)
        ), dim=0)
        return X, label_index
        
        
    def __load_numpy_image(self, image_path):
        image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        
        if image is None:
            raise ValueError(f"Failed to load image at {image_path}")
        elif len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        elif len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        return image


dataset = ConcatGalleryExrDatasetV3(CONFIGURATION['data_dir'], test_transform, '../3d_face_recognition_magface/test_models/multi/gallery.csv')

dataloader = DataLoader(
    dataset,
    batch_size=CONFIGURATION['batch_size'],
    shuffle=True,
    num_workers=CONFIGURATION['num_workers'],
    pin_memory=True,
)

# Generate embedding to csv file

In [3]:
embedding_net = EmbeddingNetConcatV3(conf=CONFIGURATION)

model = TripletNetConcatV3(embedding_net).to(device)
checkpoint = torch.load(CONFIGURATION['checkpoint'])
model.load_state_dict(checkpoint['model_state_dict'])

<All keys matched successfully>

In [6]:
# Chuyển sang chế độ đánh giá
model.eval()

# Lưu embedding và class vào CSV
output_csv = "gallery_db.csv"
print(f"Lưu dữ liệu vào {output_csv}...")

with open(output_csv, mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(["id", "embedding"])

    with torch.no_grad():
        for X, label_index in dataloader:
            # Đưa batch lên GPU nếu có
            X = X.to(device)

            # Sinh embedding
            embeddings = model.get_embedding(X).cpu().numpy()  # Chuyển về CPU

            # Lấy nhãn tương ứng
            labels = [dataset.classes[i] for i in label_index.numpy()]

            # Lưu vào CSV
            for embedding, label in zip(embeddings, labels):
                embedding_str = json.dumps(embedding.tolist())  # Chuyển thành chuỗi JSON
                writer.writerow([label, embedding_str])

print(f"Lưu thành công dữ liệu vào {output_csv}.")

Lưu dữ liệu vào gallery_db.csv...
Lưu thành công dữ liệu vào gallery_db.csv.


# Test face recognition

## Tính Accuracy

In [10]:
def compute_roc_auc(
    dataloader: torch.utils.data.DataLoader, 
    model: torch.nn.Module, 
    device: str
):
    
    model.eval()
    with torch.no_grad():
        embeddings_list = []
        for batch in dataloader:
            images, ids = batch
            images = images.to(device)
            embeddings = model.get_embedding(images)
            
            embeddings_list.append((ids, embeddings))
        
        # Concatenate all embeddings into one tensor
        all_ids = torch.cat([x[0] for x in embeddings_list], dim=0)
        all_embeddings = torch.cat([x[1] for x in embeddings_list], dim=0)
        
        euclidean_scores = []
        euclidean_labels = []
        cosine_scores = []
        cosine_labels = []

        # Compute pairwise Euclidean distance and cosine similarity
        all_embeddings_norm = all_embeddings / all_embeddings.norm(p=2, dim=1, keepdim=True)
        euclidean_distances = torch.cdist(all_embeddings, all_embeddings, p=2)  # Euclidean distance matrix
        cosine_similarities = torch.mm(all_embeddings_norm, all_embeddings_norm.t())  # Cosine similarity matrix
        
        # Compute labels (same id = 0, different id = 1)
        labels = (all_ids.unsqueeze(1) == all_ids.unsqueeze(0)).int().to(device)

        # Flatten and filter results
        euclidean_scores = euclidean_distances[torch.triu(torch.ones_like(labels), diagonal=1) == 1].cpu().numpy()
        euclidean_labels = labels[torch.triu(torch.ones_like(labels), diagonal=1) == 1].cpu().numpy()
        
        cosine_scores = cosine_similarities[torch.triu(torch.ones_like(labels), diagonal=1) == 1].cpu().numpy()
        cosine_labels = labels[torch.triu(torch.ones_like(labels), diagonal=1) == 1].cpu().numpy()
        
        # Compute ROC AUC for Euclidean distance
        euclidean_true_labels = 1 - np.array(euclidean_labels)
        euclidean_pred_scores = np.array(euclidean_scores)
        fpr_euclidean, tpr_euclidean, thresholds_euclidean = roc_curve(euclidean_true_labels, euclidean_pred_scores)
        roc_auc_euclidean = auc(fpr_euclidean, tpr_euclidean)

        # Compute ROC AUC for Cosine similarity
        cosine_true_labels = np.array(cosine_labels)
        cosine_pred_scores = np.array(cosine_scores)
        fpr_cosine, tpr_cosine, thresholds_cosine = roc_curve(cosine_true_labels, cosine_pred_scores)
        roc_auc_cosine = auc(fpr_cosine, tpr_cosine)
        
        # Calculate accuracy for Euclidean distance
        euclidean_optimal_idx = np.argmax(tpr_euclidean - fpr_euclidean) # Chọn ngưỡng tại điểm có giá trị tpr - fpr lớn nhất trên đường ROC, vì đây là nơi tối ưu hóa sự cân bằng giữa tỷ lệ phát hiện (TPR) và tỷ lệ báo động giả (FPR).
        euclidean_optimal_threshold = thresholds_euclidean[euclidean_optimal_idx]
        euclidean_pred_labels = (euclidean_pred_scores >= euclidean_optimal_threshold).astype(int)
        euclidean_accuracy = accuracy_score(euclidean_true_labels, euclidean_pred_labels)

        # Calculate accuracy for Cosine similarity
        cosine_optimal_idx = np.argmax(tpr_cosine - fpr_cosine)
        cosine_optimal_threshold = thresholds_cosine[cosine_optimal_idx]
        cosine_pred_labels = (cosine_pred_scores >= cosine_optimal_threshold).astype(int)
        cosine_accuracy = accuracy_score(cosine_true_labels, cosine_pred_labels)
        
    return roc_auc_euclidean, roc_auc_cosine, euclidean_accuracy, cosine_accuracy

roc_auc_euclidean, roc_auc_cosine, euclidean_accuracy, cosine_accuracy = compute_roc_auc(dataloader, model, device)


In [11]:
print(f'AUC - Cosine: {roc_auc_euclidean}')
print(f'AUC - Euclidean: {roc_auc_euclidean}')
print(f'Accuracy - Cosine: {roc_auc_euclidean}')
print(f'Accuracy - Euclidean: {roc_auc_euclidean}')

AUC - Cosine: nan
AUC - Euclidean: nan
Accuracy - Cosine: nan
Accuracy - Euclidean: nan


## Tìm embedding gần nhất trong gallery

In [18]:
# Tính khoảng cách Cosine
def calculate_cosine_distances_torch(query_embedding, embeddings):
    # Chuẩn hóa vector để tính cosine similarity
    query_embedding = query_embedding / query_embedding.norm(dim=1, keepdim=True)
    embeddings = embeddings / embeddings.norm(dim=1, keepdim=True)
    
    # Tính cosine similarity
    cosine_similarities = torch.mm(query_embedding, embeddings.T)
    # Chuyển thành khoảng cách
    return cosine_similarities

# Tính khoảng cách Euclidean
def calculate_euclidean_distances_torch(query_embedding, embeddings):
    # Tính Euclidean Distance
    euclidean_distances = torch.cdist(query_embedding, embeddings, p=2)
    return euclidean_distances

# Tìm top-k embedding gần nhất
def find_top_k_neighbors_torch(query_embedding, embeddings, labels, device, k=5, distance_metric='cosine'):
    query_embedding = torch.tensor(query_embedding, device=device, dtype=torch.float32).unsqueeze(0)
    embeddings = torch.tensor(embeddings, device=device, dtype=torch.float32)
    labels = torch.tensor(labels, device=device)

    if distance_metric == 'cosine':
        distances = calculate_cosine_distances_torch(query_embedding, embeddings).squeeze(0)
        # Lấy top-k (torch.topk trả giá trị nhỏ nhất nếu `largest=False`)
        top_k_distances, top_k_indices = torch.topk(distances, k, largest=True)
    elif distance_metric == 'euclidean':
        distances = calculate_euclidean_distances_torch(query_embedding, embeddings).squeeze(0)
        # Lấy top-k (torch.topk trả giá trị nhỏ nhất nếu `largest=False`)
        top_k_distances, top_k_indices = torch.topk(distances, k, largest=False)
    else:
        raise ValueError("Unsupported distance metric. Choose 'cosine' or 'euclidean'.")
    

    # Lấy nhãn của top-k
    top_k_labels = labels[top_k_indices]
    return top_k_distances.cpu().numpy(), top_k_labels.cpu().numpy()

df = pd.read_csv('gallery_db.csv')

# Chuyển chuỗi trong cột `Embedding` thành danh sách Python
df['embedding'] = df['embedding'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

# Chuyển các danh sách trong cột `Embedding` thành mảng numpy
embeddings = np.array(df['embedding'].tolist(), dtype=np.float32)

# Giữ nguyên nhãn
labels = df['id'].values

In [26]:
def calculate_top_k_accuracy(query_embedding, embeddings, labels, query_label, device, k=5, distance_metric='cosine'):
    query_label = query_label.to(device)
    query_label_index = dataset.classes[query_label.item()]
    # Tính top-k nhãn gần nhất
    _, top_k_labels = find_top_k_neighbors_torch(query_embedding, embeddings, labels, device, k, distance_metric)
    
    top_k_labels = torch.tensor(top_k_labels, device=device)
    # Kiểm tra nếu nhãn thực tế nằm trong top-k
    if torch.isin(query_label_index, top_k_labels).any():  # Kiểm tra nếu query_label có nằm trong top_k_labels
        return 1  # Đúng
    return 0  # Sai

def calculate_recognition_accuracy(model, dataloader, gallery_embeddings, gallery_labels, device, top_k=5, distance_metric='cosine'):
    correct_predictions_top_k = 0
    total_predictions = 0

    model.eval()  # Đảm bảo mô hình ở trạng thái eval

    with torch.no_grad():  # Tắt gradient để tiết kiệm bộ nhớ
        for query_images, query_labels in dataloader:
            query_images = query_images.to(device)
            query_labels = query_labels.to(device)

            # Lấy embedding từ mô hình
            query_embeddings = model.get_embedding(query_images)
            
            # Tính accuracy cho từng query trong batch
            for idx, query_label in enumerate(query_labels):
                query_embedding = query_embeddings[idx]  # Lấy embedding của ảnh tại index idx trong batch
                
                # Tính top-k accuracy
                correct_predictions_top_k += calculate_top_k_accuracy(query_embedding, gallery_embeddings, gallery_labels, query_label, device, k=top_k, distance_metric=distance_metric)
                total_predictions += 1

    return correct_predictions_top_k / total_predictions

In [27]:
recognition_dataset = ConcatGalleryExrDatasetV3(CONFIGURATION['data_dir'], test_transform, '../3d_face_recognition_magface/test_models/multi/gallery_remaining.csv')

recognition_dataloader = DataLoader(
    recognition_dataset,
    batch_size=CONFIGURATION['batch_size'],
    shuffle=True,
    num_workers=CONFIGURATION['num_workers'],
    pin_memory=True,
)

In [28]:
# Tính top-1 accuracy
accuracy_top_1 = calculate_recognition_accuracy(
    model=model,
    dataloader=recognition_dataloader,
    gallery_embeddings=embeddings,
    gallery_labels=labels,
    device=device,
    top_k=1,  # Top-1 accuracy
    distance_metric='cosine'
)

# Tính top-5 accuracy
accuracy_top_5 = calculate_recognition_accuracy(
    model=model,
    dataloader=recognition_dataloader,
    gallery_embeddings=embeddings,
    gallery_labels=labels,
    device=device,
    top_k=5,  # Top-5 accuracy
    distance_metric='cosine'
)

print(f"Top-1 Accuracy: {accuracy_top_1:.2%}")
print(f"Top-5 Accuracy: {accuracy_top_5:.2%}")

Top-1 Accuracy: 54.10%
Top-5 Accuracy: 76.23%
