In [1]:
# One-time Kaggle bootstrap (kept disabled): clone the project and move its files into the working dir.
# !git clone https://github.com/sathishkumar67/Face-Recognition-using-Resnet.git
# !mv /kaggle/working/Face-Recognition-using-Resnet/* /kaggle/working/

# Install the pinned dependencies. %pip (rather than !pip) installs into the
# environment of the running kernel instead of whichever pip is first on PATH.
%pip install -r requirements.txt

# Optional (kept disabled): PyTorch wheels from the cu126 index.
# !pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126

Collecting tensorflow==2.19.0 (from -r requirements.txt (line 1))
  Using cached tensorflow-2.19.0-cp312-cp312-win_amd64.whl.metadata (4.1 kB)
Collecting absl-py>=1.0.0 (from tensorflow==2.19.0->-r requirements.txt (line 1))
  Using cached absl_py-2.2.2-py3-none-any.whl.metadata (2.6 kB)
Collecting astunparse>=1.6.0 (from tensorflow==2.19.0->-r requirements.txt (line 1))
  Using cached astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow==2.19.0->-r requirements.txt (line 1))
  Using cached flatbuffers-25.2.10-py2.py3-none-any.whl.metadata (875 bytes)
Collecting gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1 (from tensorflow==2.19.0->-r requirements.txt (line 1))
  Using cached gast-0.6.0-py3-none-any.whl.metadata (1.3 kB)
Collecting google-pasta>=0.1.1 (from tensorflow==2.19.0->-r requirements.txt (line 1))
  Using cached google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow==2.19.0->-r requiremen

In [2]:
from tqdm import tqdm
import os
import random
import shutil
import cv2
from mtcnn import MTCNN
import numpy as np
from huggingface_hub import hf_hub_download
from siamese_resnet import unzip_file
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# from torch.utils.data import Dataset
# import torchvision
# import torchvision.transforms as transforms


  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Where the zipped face dataset lives on the Hugging Face Hub.
DATASET_REPO_TYPE = "dataset"
DATASET_REPO_ID = "pt-sk/Face_Recognition_Dataset"
DATASET_FILENAME_IN_REPO = "Face Recognition Dataset.zip"

# Current working directory at the time this cell runs; used as the local download directory.
LOCAL_DIR = os.getcwd()

In [None]:
# Fetch the zipped dataset from the Hugging Face Hub into LOCAL_DIR.
hf_hub_download(
    repo_id=DATASET_REPO_ID,
    filename=DATASET_FILENAME_IN_REPO,
    repo_type=DATASET_REPO_TYPE,
    local_dir=LOCAL_DIR,
)

# Hand the downloaded archive to unzip_file (imported from siamese_resnet),
# passing LOCAL_DIR as its second argument.
unzip_file(os.path.join(LOCAL_DIR, DATASET_FILENAME_IN_REPO), LOCAL_DIR)

In [6]:
def split_dataset(
    root_dir: str,
    save_dir: str,
    train_ratio: float = 0.8,
    val_ratio: float = 0.1,
    test_ratio: float = 0.1,
    target_size: tuple = (224, 224),
    seed: "int | None" = None,
    remove_original: bool = True) -> None:
    """
    Splits a facial image dataset into train/val/test sets, detects and crops faces,
    and saves processed images in organized directories.

    The split is done per identity (sub-folder), so all images of one identity
    end up in the same split.

    Args:
        root_dir: Directory containing identity subfolders with images
        save_dir: Directory to save processed images
        train_ratio: Proportion of data for training
        val_ratio: Proportion of data for validation
        test_ratio: Proportion of data for testing
        target_size: Size to resize detected faces to
        seed: Optional seed for the identity shuffle. When given, the same
            directory contents always produce the same split. When None, the
            global ``random`` state is used (previous behaviour).
        remove_original: When True (default, previous behaviour) ``root_dir``
            is deleted after processing. Pass False to keep the source images.

    Raises:
        AssertionError: If the three ratios do not sum to 1.0.
    """

    # Validate input ratios
    assert abs((train_ratio + val_ratio + test_ratio) - 1.0) < 1e-9, "Ratios must sum to 1.0"

    # Initialize face detector
    face_detector = MTCNN()

    # Directory listing order is arbitrary, so sort before shuffling; otherwise
    # a fixed seed would still give different splits on different machines.
    identities = sorted(_get_valid_identities(root_dir))
    shuffler = random.Random(seed) if seed is not None else random
    shuffler.shuffle(identities)

    # Split identities into groups
    split_groups = _split_identities(identities, train_ratio, val_ratio, test_ratio)
    splits = ["train", "val", "test"]

    # Process each split group
    for split_name, identity_group in zip(splits, split_groups):
        print(f"\nProcessing {split_name} set:")
        processed_count = 0
        skipped_images = 0

        # Main progress bar for identities
        identity_pbar = tqdm(identity_group, desc="Identities", leave=False)
        for identity in identity_pbar:
            identity_path = os.path.join(root_dir, identity)
            image_paths = _get_image_paths(identity_path)

            # Update identity progress bar description
            identity_pbar.set_postfix_str(f"Processing: {identity[:15]}...")

            # Create output directory for this identity
            output_dir = os.path.join(save_dir, split_name, identity)
            os.makedirs(output_dir, exist_ok=True)

            # Images progress bar for current identity
            image_pbar = tqdm(image_paths,
                            desc=f"Images ({identity[:12]}...)",
                            leave=False,
                            dynamic_ncols=True)
            for image_path in image_pbar:
                result = _process_and_save_image(
                    image_path=image_path,
                    output_dir=output_dir,
                    detector=face_detector,
                    target_size=target_size
                )

                if result == "skipped":
                    skipped_images += 1
                else:
                    processed_count += 1

                # Counters are cumulative for the whole split, not per identity
                image_pbar.set_postfix_str(f"Saved: {processed_count} | Skipped: {skipped_images}")

            image_pbar.close()

        identity_pbar.close()

        # Print statistics
        _print_split_stats(split_name, processed_count, skipped_images, save_dir)

    if not remove_original:
        return

    # Never delete the source tree if the processed output was written inside
    # it: rmtree would wipe the freshly generated images as well.
    root_abs = os.path.abspath(root_dir)
    save_abs = os.path.abspath(save_dir)
    try:
        output_inside_root = os.path.commonpath([root_abs, save_abs]) == root_abs
    except ValueError:
        # commonpath raises when the paths are on different drives, in which
        # case one cannot contain the other.
        output_inside_root = False

    if output_inside_root:
        print("\n⚠️ Output directory is inside the original dataset directory; skipping removal.")
        return

    # Cleanup original data
    shutil.rmtree(root_dir)
    print("\n✅ Original dataset directory removed.")
def _get_valid_identities(root_dir: str) -> list:
    """Get identities with valid image directories"""
    return [
        identity for identity in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, identity))
    ]

def _split_identities(
    identities: list,
    train_ratio: float,
    val_ratio: float,
    test_ratio: float)-> tuple:
    """Split identities into train/val/test groups"""
    total = len(identities)
    train_end = int(train_ratio * total)
    val_end = train_end + int(val_ratio * total)
    
    return (
        identities[:train_end],
        identities[train_end:val_end],
        identities[val_end:]
    )

def _get_image_paths(identity_path: str) -> list:
    """Get valid image paths for an identity"""
    return [
        os.path.join(identity_path, img)
        for img in os.listdir(identity_path)
        if img.lower().endswith(('.jpg', '.png', '.jpeg'))
    ]

def _process_and_save_image(
    image_path: str,
    output_dir: str,
    detector: MTCNN,
    target_size: tuple)-> str:
    """Detect the most confident face in one image, crop, resize and save it.

    Args:
        image_path: Path of the source image.
        output_dir: Existing directory the cropped face is written to, as
            ``<original stem>_cropped.jpg``.
        detector: Face detector exposing ``detect_faces(image)``; each result
            is read through its ``'confidence'`` and ``'box'`` keys.
        target_size: Size passed to ``cv2.resize`` for the cropped face.

    Returns:
        ``"saved"`` when a face crop was written, ``"skipped"`` when the image
        could not be read, no usable face was found, the file could not be
        written, or any other error occurred (errors are printed, not raised).
    """
    try:
        # cv2.imread signals failure by returning None instead of raising
        bgr_image = cv2.imread(image_path)
        if bgr_image is None:
            print(f"⚠️ Error processing {image_path}: could not read image")
            return "skipped"
        image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

        # Detect faces
        faces = detector.detect_faces(image)
        if not faces:
            return "skipped"

        # Select best face
        best_face = max(faces, key=lambda face: face['confidence'])
        x, y, w, h = best_face['box']

        # Clamp the box to the image. The detector's box may extend past the
        # border; a negative start index would make NumPy slice from the
        # opposite edge and yield a wrong or empty crop.
        img_h, img_w = image.shape[:2]
        left, top = max(0, x), max(0, y)
        right, bottom = min(img_w, x + w), min(img_h, y + h)
        if right <= left or bottom <= top:
            return "skipped"

        # Crop and resize
        face_region = image[top:bottom, left:right]
        resized_face = cv2.resize(face_region, target_size, interpolation=cv2.INTER_LANCZOS4)

        # Save image (convert back to BGR, the channel order cv2.imwrite expects)
        filename = f"{os.path.splitext(os.path.basename(image_path))[0]}_cropped.jpg"
        output_path = os.path.join(output_dir, filename)
        written = cv2.imwrite(output_path, cv2.cvtColor(resized_face, cv2.COLOR_RGB2BGR))
        if not written:
            # imwrite reports failure through its return value
            print(f"⚠️ Error processing {image_path}: could not write {output_path}")
            return "skipped"

        return "saved"

    except Exception as e:
        # Best effort: one bad image must not abort the whole dataset run
        print(f"⚠️ Error processing {image_path}: {str(e)}")
        return "skipped"

def _print_split_stats(split_name: str, valid_count: int, skipped_count: int, save_dir: str) -> None:
    """Print statistics for processed split"""
    print(f"\n{split_name.capitalize()} Set Summary:")
    print(f"✅ Successfully saved faces: {valid_count}")
    print(f"⏭️ Skipped images (no face detected/errors): {skipped_count}")
    print(f"📊 Total images processed: {valid_count + skipped_count}")
    print(f"📁 Output directory: {os.path.abspath(save_dir)}/{split_name}\n")

# Example usage: preprocess the unzipped dataset with an 80/10/10 identity split.
# NOTE: split_dataset removes the "Face Recognition Dataset" directory once it has finished.
if __name__ == "__main__":
    split_dataset(
        "Face Recognition Dataset",
        "processed_data",
        train_ratio=0.8, val_ratio=0.1, test_ratio=0.1,
        target_size=(224, 224),
    )

I0000 00:00:1746252743.278495     161 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0



Processing train set:


                                                                                   

KeyboardInterrupt: 

In [None]:
# Triplet margin loss.
# Averages relu(d(anchor, positive) - d(anchor, negative) + margin) over the batch.
class TripletLoss(nn.Module):
    """Triplet loss built on ``F.pairwise_distance``.

    Args:
        margin: Value added to the positive/negative distance gap before the
            ReLU (default 0.5).
    """

    def __init__(self, margin=0.5):
        super().__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        """Return the mean hinge loss over all (anchor, positive, negative) triplets."""
        # Distance of each anchor to its positive and to its negative sample
        pos_dist = F.pairwise_distance(anchor, positive)
        neg_dist = F.pairwise_distance(anchor, negative)

        # Only triplets where the gap plus the margin is positive contribute
        hinge = F.relu(pos_dist - neg_dist + self.margin)
        return hinge.mean()
    
class SiameseResNet(nn.Module):
    """ResNet18 backbone whose classification head is replaced by an embedding layer.

    Args:
        embedding_dim: Size of the output embedding produced for each input
            image (default 256).
    """

    def __init__(self, embedding_dim=256):
        super().__init__()
        # Load pretrained ResNet18
        self.backbone = torchvision.models.resnet18(weights="IMAGENET1K_V1", progress=True)

        # Replace the final fully connected layer. Read the input width from
        # the layer being replaced instead of hard-coding it, so the head
        # always matches the backbone.
        in_features = self.backbone.fc.in_features
        self.backbone.fc = nn.Linear(in_features, embedding_dim)

    def forward(self, x):
        """Return the backbone output for ``x``."""
        return self.backbone(x)

    def print_parameters_count(self):
        """Print the total number of parameters, in millions, with two decimals."""
        total_params = sum(p.numel() for p in self.parameters()) / 1e6  # Convert to millions
        print(f"Total parameters: {total_params:.2f}m")
        
model = SiameseResNet(embedding_dim=256)
model.print_parameters_count()

# Smoke test: pass a random image-shaped batch through the model.
# Only the output shape matters here, so skip building the autograd graph.
dummy_input = torch.randn(1, 3, 224, 224)  # batch size of 1, 3 channels, 224x224 image
with torch.no_grad():
    output = model(dummy_input)

# Fail loudly if the embedding head does not produce the expected shape
assert output.shape == (1, 256), f"unexpected embedding shape: {tuple(output.shape)}"
output.shape  # (1, 256) since the final layer outputs 256 features

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import transforms

# Define transforms
# NOTE(review): split_dataset saves faces at 224x224 by default, while this
# pipeline resizes to 100 — confirm the smaller training resolution is intended.
transform = transforms.Compose([
    transforms.Resize(100),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Initialize dataset and dataloader
# NOTE(review): TripletFaceDataset is not defined or imported in the visible
# part of this notebook, and root_dir is still a placeholder path.
dataset = TripletFaceDataset(root_dir="path/to/dataset", transform=transform)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize model, loss, and optimizer
model = SiameseResNet(embedding_dim=256)
criterion = TripletLoss(margin=0.5)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

# Training loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(10):
    model.train()
    epoch_loss_sum = 0.0
    num_batches = 0

    for anchor, positive, negative in dataloader:
        anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

        # Forward pass
        anchor_emb = model(anchor)
        positive_emb = model(positive)
        negative_emb = model(negative)

        # Compute loss
        loss = criterion(anchor_emb, positive_emb, negative_emb)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate so the epoch log reflects every batch, not just the last one
        epoch_loss_sum += loss.item()
        num_batches += 1

    # An empty dataloader would otherwise surface as a NameError on `loss`
    if num_batches == 0:
        raise RuntimeError("dataloader produced no batches; check the dataset path")

    # Reported value is the mean loss over the epoch's batches
    print(f"Epoch {epoch+1}, Loss: {epoch_loss_sum / num_batches:.4f}")