# Assessment 2: Metric Learning with Oxford-IIIT Pet Dataset

## Introduction and Setup

This notebook implements a deep metric learning approach for the Oxford-IIIT Pet Dataset, focusing on learning an embedding space where similar pet breeds are close together and dissimilar ones are far apart. We'll explore different loss functions, evaluate the model on verification, retrieval, and few-shot classification tasks, and visualize the embedding space.

### Environment Setup and Package Installation

In [None]:

# Check if running in Colab (to install dependencies and set up environment)
import sys
IN_COLAB = 'google.colab' in sys.modules

# Install required packages
if IN_COLAB:
    !pip install pytorch-metric-learning
    !pip install faiss-gpu
    !pip install umap-learn
    !pip install matplotlib seaborn scikit-learn tqdm
    !pip install gradio
    !pip install grad-cam

### Import Libraries

import os
import random
import numpy as np
import pandas as pd
from PIL import Image
from tqdm.notebook import tqdm
from collections import defaultdict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset, ConcatDataset
from torchvision import datasets, models, transforms
import torchvision.transforms.functional as TF

import pytorch_metric_learning
from pytorch_metric_learning import losses, miners, distances, reducers, testers

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score
from sklearn.model_selection import train_test_split
import umap

# Set random seeds for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # seed every visible GPU, not only the current one
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed()

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")




## Data Loading and Preprocessing

In this section, we'll load the Oxford-IIIT Pet Dataset, perform necessary preprocessing, and create appropriate data loaders for our metric learning tasks.

In [None]:

# Define transformations
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Function to load the dataset
def load_oxford_pets_dataset(root="./data", download=True):
    train_val_dataset = datasets.OxfordIIITPet(
        root=root, 
        split="trainval", 
        transform=train_transform, 
        download=download
    )
    
    test_dataset = datasets.OxfordIIITPet(
        root=root, 
        split="test", 
        transform=eval_transform, 
        download=download
    )
    
    # For evaluation, create a version of the training set with eval transforms
    eval_train_dataset = datasets.OxfordIIITPet(
        root=root, 
        split="trainval", 
        transform=eval_transform, 
        download=False
    )
    
    return train_val_dataset, test_dataset, eval_train_dataset



### Dataset Preparation for Different Tasks

In [None]:

# Split data for training, validation and few-shot evaluation
def prepare_datasets(train_val_dataset, test_dataset, eval_train_dataset, num_holdout_classes=5, val_ratio=0.2):
    # Get the class names
    class_to_idx = train_val_dataset.class_to_idx
    idx_to_class = {v: k for k, v in class_to_idx.items()}
    num_classes = len(class_to_idx)
    
    # Split classes for few-shot learning (hold out some classes for testing)
    all_class_indices = list(range(num_classes))
    holdout_class_indices = random.sample(all_class_indices, num_holdout_classes)
    training_class_indices = [i for i in all_class_indices if i not in holdout_class_indices]
    
    holdout_classes = [idx_to_class[i] for i in holdout_class_indices]
    print(f"Holdout classes for few-shot learning: {holdout_classes}")
    
    # Create datasets excluding holdout classes for main training
    train_val_indices = [i for i, (_, label) in enumerate(train_val_dataset) if label not in holdout_class_indices]
    test_indices = [i for i, (_, label) in enumerate(test_dataset) if label not in holdout_class_indices]
    eval_train_indices = [i for i, (_, label) in enumerate(eval_train_dataset) if label not in holdout_class_indices]
    
    # For few-shot learning, include only holdout classes
    few_shot_train_indices = [i for i, (_, label) in enumerate(train_val_dataset) if label in holdout_class_indices]
    few_shot_test_indices = [i for i, (_, label) in enumerate(test_dataset) if label in holdout_class_indices]
    
    # Split train/val
    train_indices, val_indices = train_test_split(
        train_val_indices, 
        test_size=val_ratio, 
        stratify=[train_val_dataset[i][1] for i in train_val_indices],
        random_state=42
    )
    
    # Create Subset datasets
    train_dataset = Subset(train_val_dataset, train_indices)
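    # Validation uses the deterministic eval transforms; eval_train_dataset is still the
    # full trainval split at this point, so val_indices refer to the same images
    val_dataset = Subset(eval_train_dataset, val_indices)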
    test_filtered_dataset = Subset(test_dataset, test_indices)
    eval_train_dataset = Subset(eval_train_dataset, eval_train_indices)
    
    # Create datasets for few-shot learning
    few_shot_train_dataset = Subset(train_val_dataset, few_shot_train_indices)
    few_shot_test_dataset = Subset(test_dataset, few_shot_test_indices)
    
    # Create dictionary for class mapping
    class_mapping = {
        'class_to_idx': class_to_idx,
        'idx_to_class': idx_to_class,
        'holdout_class_indices': holdout_class_indices,
        'training_class_indices': training_class_indices
    }
    
    return {
        'train': train_dataset,
        'val': val_dataset,
        'test': test_filtered_dataset,
        'eval_train': eval_train_dataset,
        'few_shot_train': few_shot_train_dataset,
        'few_shot_test': few_shot_test_dataset,
        'class_mapping': class_mapping
    }
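
The filtering in `prepare_datasets` decodes and transforms every image just to read its label, which can be slow. Below is a minimal sketch of the same class-disjoint split operating on a plain list of labels. `split_by_holdout` and its arguments are hypothetical names, and the sketch assumes the labels can be obtained cheaply. Recent torchvision versions appear to keep them in the private `_labels` attribute of `OxfordIIITPet`, but that is not a public API and may change.

```python
import random

def split_by_holdout(labels, num_holdout_classes=5, seed=42):
    """Return (base_indices, holdout_indices, holdout_classes) for a class-disjoint split.

    labels: sequence of integer class ids, one per sample (hypothetical input).
    """
    rng = random.Random(seed)               # local RNG, does not touch global state
    classes = sorted(set(labels))
    holdout_classes = set(rng.sample(classes, num_holdout_classes))

    base_indices, holdout_indices = [], []
    for i, label in enumerate(labels):
        if label in holdout_classes:        # set lookup is O(1)
            holdout_indices.append(i)
        else:
            base_indices.append(i)
    return base_indices, holdout_indices, holdout_classes

# Toy example: 6 classes with 4 samples each
toy_labels = [c for c in range(6) for _ in range(4)]
base_idx, holdout_idx, holdout = split_by_holdout(toy_labels, num_holdout_classes=2)
print(len(base_idx), len(holdout_idx), sorted(holdout))
```

The resulting index lists could be passed to `Subset` in the same way as the indices built above.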



### Create DataLoaders

In [None]:

def create_dataloaders(datasets_dict, batch_size=32, num_workers=2):
    dataloaders = {}
    
    for key in ['train', 'val', 'test', 'eval_train', 'few_shot_train', 'few_shot_test']:
        if key == 'train':
            shuffle = True
        else:
            shuffle = False
            
        dataloaders[key] = DataLoader(
            datasets_dict[key],
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers,
            pin_memory=True
        )
    
    return dataloaders




### Load and Prepare Data


In [None]:

# Load the dataset
train_val_dataset, test_dataset, eval_train_dataset = load_oxford_pets_dataset()
print(f"Train+Val size: {len(train_val_dataset)}")
print(f"Test size: {len(test_dataset)}")

# Prepare datasets for different tasks
datasets_dict = prepare_datasets(train_val_dataset, test_dataset, eval_train_dataset)

# Create dataloaders
batch_size = 32  # Adjust based on your GPU/memory constraints
dataloaders = create_dataloaders(datasets_dict, batch_size=batch_size)

# Print dataset statistics
print("\nDataset Statistics:")
for key, dataloader in dataloaders.items():
    print(f"{key}: {len(dataloader.dataset)} samples")

class_mapping = datasets_dict['class_mapping']
num_classes = len(class_mapping['class_to_idx'])
print(f"Total number of classes: {num_classes}")
print(f"Number of training classes: {len(class_mapping['training_class_indices'])}")
print(f"Number of few-shot classes: {len(class_mapping['holdout_class_indices'])}")




## Model Architecture

In this section, we'll define our metric learning model architecture using a CNN backbone and a projection head.


In [None]:

class EmbeddingNet(nn.Module):
    def __init__(self, backbone_name='resnet18', embedding_size=128, pretrained=True):
        super(EmbeddingNet, self).__init__()
        
        # Get backbone and its output size
        self.backbone, backbone_output_size = self._get_backbone(backbone_name, pretrained)
        
        # Projection head (MLP)
        self.projection_head = nn.Sequential(
            nn.Linear(backbone_output_size, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, embedding_size)
        )
        
    def _get_backbone(self, backbone_name, pretrained):
        """
        Create a backbone network from various architectures
        """
        # `pretrained=` is deprecated in torchvision >= 0.13; the `weights` argument replaces it.
        # "IMAGENET1K_V1" are the same weights that `pretrained=True` used to load.
        weights = "IMAGENET1K_V1" if pretrained else None
        if backbone_name == 'resnet18':
            backbone = models.resnet18(weights=weights)
            output_size = 512
        elif backbone_name == 'resnet34':
            backbone = models.resnet34(weights=weights)
            output_size = 512
        elif backbone_name == 'resnet50':
            backbone = models.resnet50(weights=weights)
            output_size = 2048
        elif backbone_name == 'efficientnet_b0':
            backbone = models.efficientnet_b0(weights=weights)
            output_size = 1280
        elif backbone_name == 'mobilenet_v2':
            backbone = models.mobilenet_v2(weights=weights)
            output_size = 1280
        elif backbone_name == 'densenet121':
            backbone = models.densenet121(weights=weights)
            output_size = 1024
        else:
            raise ValueError(f"Unsupported backbone: {backbone_name}")
        
        # For ResNet models
        if backbone_name.startswith('resnet'):
            # Remove the classification layer
            backbone = nn.Sequential(*list(backbone.children())[:-1])
        # For EfficientNet
        elif backbone_name.startswith('efficientnet'):
            backbone = nn.Sequential(*list(backbone.children())[:-1])
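        # For MobileNet: the model pools inside forward() rather than in a child module,
        # so dropping the classifier alone would leave a (B, 1280, 7, 7) feature map.
        # Add explicit global average pooling to get (B, 1280, 1, 1).
        elif backbone_name.startswith('mobilenet'):
            backbone = nn.Sequential(
                backbone.features,
                nn.AdaptiveAvgPool2d((1, 1))
            )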
        # For DenseNet
        elif backbone_name.startswith('densenet'):
            backbone = nn.Sequential(
                backbone.features,
                nn.ReLU(inplace=True),
                nn.AdaptiveAvgPool2d((1, 1))
            )
        
        return backbone, output_size
        
    def forward(self, x):
        features = self.backbone(x)
        features = features.view(features.size(0), -1)
        embeddings = self.projection_head(features)
        
        # Normalize embeddings to unit length (important for cosine distance)
        normalized_embeddings = F.normalize(embeddings, p=2, dim=1)
        return normalized_embeddings
    
    def get_embedding(self, x):
        return self.forward(x)
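
The forward pass above L2-normalizes the embeddings. As a small numerical check of why that matters, the NumPy sketch below (not part of the model; `l2_normalize` is a hypothetical helper that mirrors the clamp-by-`eps` behaviour of `F.normalize`) shows that for unit-length vectors the dot product equals the cosine similarity, and the squared Euclidean distance equals `2 - 2 * cosine`.

```python
import numpy as np

def l2_normalize(x, eps=1e-12):
    """Scale each row of x to unit L2 norm (norms are clamped below by eps)."""
    norms = np.linalg.norm(x, axis=1, keepdims=True)
    return x / np.maximum(norms, eps)

rng = np.random.default_rng(0)
raw = rng.normal(size=(4, 8))        # stand-in for projection-head outputs
emb = l2_normalize(raw)

# Dot product of normalized rows
dot = emb @ emb.T

# Cosine similarity computed directly from the raw vectors
raw_norms = np.linalg.norm(raw, axis=1)
cos = (raw @ raw.T) / (raw_norms[:, None] * raw_norms[None, :])

# Squared Euclidean distance between normalized rows
sq_dist = ((emb[:, None, :] - emb[None, :, :]) ** 2).sum(axis=-1)

print(np.allclose(dot, cos), np.allclose(sq_dist, 2 - 2 * dot))
```

Because of this equivalence, ranking neighbours by cosine similarity or by Euclidean distance gives the same order once embeddings are normalized.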


## Loss Function Implementation

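This section configures three metric learning losses from `pytorch-metric-learning`: Triplet Loss, Contrastive Loss, and ArcFace. The triplet and contrastive losses are each paired with a miner that selects informative triplets or pairs within a batch; ArcFace is used without a miner.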
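
To make the triplet objective concrete, here is a minimal NumPy sketch of a margin-based triplet loss written in terms of cosine similarity. It is an illustration under stated assumptions, not the library's implementation: the inputs are assumed to be unit-length embeddings, and `triplet_loss_cosine` and `mean_of_active` are hypothetical helper names. Averaging only over non-zero terms is meant to approximate what a threshold reducer with `low=0` does.

```python
import numpy as np

def triplet_loss_cosine(anchor, positive, negative, margin=0.2):
    """Per-triplet hinge on similarities: max(0, s(a, n) - s(a, p) + margin).

    All inputs have shape (num_triplets, dim) and are assumed to be unit length,
    so the row-wise dot product is the cosine similarity.
    """
    s_ap = np.sum(anchor * positive, axis=1)
    s_an = np.sum(anchor * negative, axis=1)
    return np.maximum(0.0, s_an - s_ap + margin)

def mean_of_active(per_triplet):
    """Average only the triplets that still violate the margin."""
    active = per_triplet[per_triplet > 0]
    return float(active.mean()) if active.size else 0.0

anchor = np.array([[1.0, 0.0], [1.0, 0.0]])
positive = np.array([[0.8, 0.6], [0.8, 0.6]])            # similarity 0.8 to the anchor
negative = np.array([[0.0, 1.0], [0.7, np.sqrt(0.51)]])  # similarities 0.0 and 0.7

per_triplet = triplet_loss_cosine(anchor, positive, negative, margin=0.2)
print(per_triplet, mean_of_active(per_triplet))
```

The first triplet already satisfies the margin and contributes nothing; the second has a negative that is nearly as similar as the positive and is therefore penalized.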


In [None]:

def create_loss_and_miner(loss_type, margin=0.2, embedding_size=128, num_classes=32):
    """
    Create loss function and miner for metric learning
    """
    if loss_type == 'triplet':
        # Triplet loss with cosine distance
        distance = distances.CosineSimilarity()
        reducer = reducers.ThresholdReducer(low=0)
        loss_func = losses.TripletMarginLoss(margin=margin, distance=distance, reducer=reducer)
        mining_func = miners.TripletMarginMiner(margin=margin, distance=distance, type_of_triplets="semihard")
        
    elif loss_type == 'contrastive':
        # Contrastive loss
        distance = distances.CosineSimilarity()
        loss_func = losses.ContrastiveLoss(pos_margin=0.8, neg_margin=0.2, distance=distance)
        mining_func = miners.PairMarginMiner(pos_margin=0.8, neg_margin=0.2, distance=distance)
        
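    elif loss_type == 'arcface':
        # ArcFace loss: pytorch-metric-learning expects num_classes first, then embedding_size.
        # Labels must lie in [0, num_classes), and the loss holds a learnable class-weight
        # matrix whose parameters also need to be passed to an optimizer.
        loss_func = losses.ArcFaceLoss(num_classes=num_classes, embedding_size=embedding_size, margin=28.6, scale=64)
        mining_func = None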
        
    else:
        raise ValueError(f"Unsupported loss type: {loss_type}")
        
    return loss_func, mining_func




### Hard Negative Mining (Bonus Implementation)

In [None]:

class HardNegativePairMiner(miners.BaseMiner):
    def __init__(self, distance, neg_margin=0.2, hardest_fraction=0.5):
        super().__init__()
        self.distance = distance
        self.neg_margin = neg_margin
        self.hardest_fraction = hardest_fraction
        
    def mine(self, embeddings, labels, ref_emb=None, ref_labels=None):
        # Fall back to the batch itself only when no reference set is supplied
        if ref_emb is None:
            ref_emb, ref_labels = embeddings, labels
        dist_mat = self.distance(embeddings, ref_emb)
        
        # Get negative pairs (different classes)
        negative_mask = labels.unsqueeze(1) != ref_labels.unsqueeze(0)
        
        # For each anchor, find all negative pairs
        anchors, negatives = torch.where(negative_mask)
        
        if len(anchors) == 0:
            return empty_tensor(0), empty_tensor(0), empty_tensor(0), empty_tensor(0)
        
        # Get distance/similarity values for all negative pairs
        # (named neg_values so the imported `distances` module is not shadowed)
        neg_values = dist_mat[anchors, negatives]
        
        # Group by anchor
        anchor_groups = defaultdict(list)
        for i in range(len(anchors)):
            anchor_groups[anchors[i].item()].append((negatives[i].item(), neg_values[i].item()))
        
        # For each anchor, select the hardest negatives
        hard_a, hard_n = [], []
        for anchor, neg_dists in anchor_groups.items():
            # Hardest negatives first: highest value for similarity measures
            # (is_inverted=True, e.g. CosineSimilarity), lowest value for true distances
            neg_dists.sort(key=lambda x: x[1], reverse=self.distance.is_inverted)
            
            # Select hardest fraction
            num_to_select = max(1, int(len(neg_dists) * self.hardest_fraction))
            selected_negs = neg_dists[:num_to_select]
            
            for neg, dist in selected_negs:
                hard_a.append(anchor)
                hard_n.append(neg)
        
        return (
            torch.tensor(hard_a, device=embeddings.device), 
            empty_tensor(0), 
            empty_tensor(0), 
            torch.tensor(hard_n, device=embeddings.device)
        )
        
def empty_tensor(size):
    return torch.tensor([], device=device, dtype=torch.long).view(size)
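
The selection rule used by the miner can be checked in isolation. The sketch below is a NumPy re-statement of that rule for a similarity matrix (higher means more similar), not the miner itself; `hardest_negative_pairs` is a hypothetical helper name. For each anchor it keeps the top `hardest_fraction` of negatives ranked by similarity, with at least one negative per anchor.

```python
import numpy as np

def hardest_negative_pairs(sim, labels, hardest_fraction=0.5):
    """Return (anchors, negatives) index arrays of the hardest negative pairs.

    sim: (N, N) similarity matrix, higher = more similar.
    labels: length-N array of class ids.
    """
    labels = np.asarray(labels)
    anchors, negatives = [], []
    for a in range(len(labels)):
        neg_idx = np.flatnonzero(labels != labels[a])   # samples from other classes
        if neg_idx.size == 0:
            continue
        k = max(1, int(neg_idx.size * hardest_fraction))
        # Most similar negatives are the hardest, so sort by descending similarity
        order = np.argsort(-sim[a, neg_idx], kind="stable")[:k]
        anchors.extend([a] * k)
        negatives.extend(neg_idx[order].tolist())
    return np.array(anchors), np.array(negatives)

labels = [0, 0, 1, 1]
sim = np.array([
    [1.0, 0.9, 0.7, 0.1],
    [0.9, 1.0, 0.2, 0.6],
    [0.7, 0.2, 1.0, 0.8],
    [0.1, 0.6, 0.8, 1.0],
])
a_idx, n_idx = hardest_negative_pairs(sim, labels, hardest_fraction=0.5)
print(list(zip(a_idx.tolist(), n_idx.tolist())))
```

With two negatives per anchor and a fraction of 0.5, each anchor keeps only its single most similar negative.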




## Hyperparameter Tuning Framework

This section implements a simple hyperparameter tuning framework to find the best combination of parameters for our model.

In [None]:

# Add a simple hyperparameter tuning framework
def hyperparameter_tuning(param_grid, n_trials=5, evaluation_metric='combined'):
    """
    Simple hyperparameter tuning framework
    
    Args:
        param_grid (dict): Dictionary where keys are parameter names and values are lists of parameter values
        n_trials (int): Number of random combinations to try
        evaluation_metric (str): Which metric to use for choosing the best model ('roc_auc', 'recall', 'precision', 'combined')
    
    Returns:
        dict: Best parameters and the corresponding performance
    """
    print(f"Starting hyperparameter tuning with {n_trials} trials")
    print(f"Parameter grid: {param_grid}")
    
    # Load data (only once)
    train_val_dataset, test_dataset, eval_train_dataset = load_oxford_pets_dataset()
    datasets_dict = prepare_datasets(train_val_dataset, test_dataset, eval_train_dataset)
    
    # Track results
    results = []
    
    for trial in range(n_trials):
        # Sample random combination of hyperparameters
        params = {}
        for param_name, param_values in param_grid.items():
            params[param_name] = random.choice(param_values)
        
        print(f"\n\nTrial {trial+1}/{n_trials}")
        print(f"Parameters: {params}")
        
        # Create dataloaders
        dataloaders = create_dataloaders(
            datasets_dict, 
            batch_size=params.get('batch_size', 32),
            num_workers=params.get('num_workers', 2)
        )
        
        # Create model
        model = EmbeddingNet(
            backbone_name=params.get('backbone_name', 'resnet18'),
            embedding_size=params.get('embedding_size', 128),
            pretrained=True
        )
        model = model.to(device)
        
        # Create optimizer
        if params.get('optimizer_name', 'adam') == 'adam':
            optimizer = optim.Adam(
                model.parameters(), 
                lr=params.get('learning_rate', 1e-4),
                weight_decay=params.get('weight_decay', 0)
            )
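        elif params.get('optimizer_name', 'adam') == 'sgd':
            optimizer = optim.SGD(
                model.parameters(), 
                lr=params.get('learning_rate', 1e-3),
                momentum=params.get('momentum', 0.9),
                weight_decay=params.get('weight_decay', 0)
            )
        else:
            # Fail loudly instead of silently reusing an optimizer from a previous trial
            raise ValueError(f"Unsupported optimizer: {params.get('optimizer_name')}")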
        
        # Create scheduler
        scheduler = None
        if params.get('use_scheduler', True):
            if params.get('scheduler_name', 'step') == 'step':
                scheduler = optim.lr_scheduler.StepLR(
                    optimizer, 
                    step_size=params.get('scheduler_step_size', 5), 
                    gamma=params.get('scheduler_gamma', 0.5)
                )
            elif params.get('scheduler_name', 'step') == 'cosine':
                scheduler = optim.lr_scheduler.CosineAnnealingLR(
                    optimizer,
                    T_max=params.get('num_epochs', 10),
                    eta_min=params.get('min_lr', 1e-6)
                )
        
        # Train model with fewer epochs for hyperparameter tuning
        num_epochs = params.get('num_epochs', 10)
        loss_type = params.get('loss_type', 'triplet')
        embedding_size = params.get('embedding_size', 128)
        
        try:
            # Train with minimal output
            model, history = train_model(
                model=model,
                dataloaders=dataloaders,
                loss_type=loss_type,
                optimizer=optimizer,
                scheduler=scheduler,
                num_epochs=num_epochs,
                embedding_size=embedding_size
            )
            
            # Simple evaluation to get a performance metric
            train_embeddings, train_labels = extract_embeddings(model, dataloaders['eval_train'])
            test_embeddings, test_labels = extract_embeddings(model, dataloaders['test'])
            
            # Verification Task
            verification_results = evaluate_verification(test_embeddings, test_labels)
            roc_auc = verification_results['roc_auc']
            
            # Retrieval Task
            retrieval_results = evaluate_retrieval(
                query_embeddings=test_embeddings,
                query_labels=test_labels,
                gallery_embeddings=train_embeddings,
                gallery_labels=train_labels,
                k_values=[1, 5]
            )
            recall_at_1 = retrieval_results['recall@1']
            precision_at_1 = retrieval_results['precision@1']
            
            # Calculate overall score based on the chosen evaluation metric
            if evaluation_metric == 'roc_auc':
                score = roc_auc
            elif evaluation_metric == 'recall':
                score = recall_at_1
            elif evaluation_metric == 'precision':
                score = precision_at_1
            else:  # combined
                score = 0.4 * roc_auc + 0.4 * recall_at_1 + 0.2 * precision_at_1
            
            # Store results
            results.append({
                'params': params,
                'roc_auc': roc_auc,
                'recall@1': recall_at_1,
                'precision@1': precision_at_1,
                'score': score,
                'val_loss': history['val_loss']  # keep the full per-epoch curve for plotting
            })
            
            print(f"Trial {trial+1} results: ROC AUC = {roc_auc:.4f}, Recall@1 = {recall_at_1:.4f}, Score = {score:.4f}")
        
        except Exception as e:
            print(f"Error in trial {trial+1}: {e}")
            continue
    
    if not results:
        print("No successful trials!")
        return None
        
    # Find best parameters
    results.sort(key=lambda x: x['score'], reverse=True)
    best_result = results[0]
    
    print("\n\n============= Hyperparameter Tuning Results ==============")
    print(f"Best score: {best_result['score']:.4f}")
    print(f"Best parameters: {best_result['params']}")
    print(f"Performance: ROC AUC = {best_result['roc_auc']:.4f}, Recall@1 = {best_result['recall@1']:.4f}")
    print("===========================================================")
    
    # Plot validation loss curves for all trials
    if len(results) > 1:
        plt.figure(figsize=(12, 8))
        
        # results is sorted by score, so the index is a rank, not the original trial number
        for rank, result in enumerate(results, start=1):
            val_losses = np.atleast_1d(result.get('val_loss', []))
            if len(val_losses) > 0:
                plt.plot(range(1, len(val_losses)+1), val_losses, label=f"Rank {rank} (score={result['score']:.4f})")
        
        plt.xlabel('Epoch')
        plt.ylabel('Validation Loss')
        plt.title('Hyperparameter Tuning - Validation Loss Curves')
        plt.legend()
        plt.grid(True)
        plt.savefig('hyperparameter_tuning_results.png')
        plt.show()
    
    return best_result

# Example usage of the hyperparameter tuning framework
def run_hyperparameter_search():
    param_grid = {
        'backbone_name': ['resnet18', 'resnet34', 'efficientnet_b0', 'mobilenet_v2'],
        'embedding_size': [64, 128, 256],
        'batch_size': [16, 32, 64],
        'loss_type': ['triplet', 'contrastive', 'arcface'],
        'learning_rate': [1e-4, 5e-4, 1e-3],
        'optimizer_name': ['adam', 'sgd'],
        'weight_decay': [0, 1e-5, 1e-4],
        'scheduler_name': ['step', 'cosine'],
        'num_epochs': [10]  # Keep this fixed for faster tuning
    }
    
    # Run hyperparameter tuning
    best_params = hyperparameter_tuning(param_grid, n_trials=5, evaluation_metric='combined')
    return best_params

# For a more focused search with fewer parameters
def focused_hyperparameter_search():
    param_grid = {
        'backbone_name': ['resnet18'],
        'embedding_size': [128, 256],
        'batch_size': [32],
        'loss_type': ['triplet', 'contrastive'],
        'learning_rate': [1e-4, 5e-4],
        'optimizer_name': ['adam'],
        'scheduler_name': ['step'],
        'num_epochs': [10]
    }
    
    # Run focused hyperparameter tuning
    best_params = hyperparameter_tuning(param_grid, n_trials=4, evaluation_metric='combined')
    return best_params

# Uncomment to run hyperparameter tuning
# best_params = run_hyperparameter_search()
# best_params = focused_hyperparameter_search()
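
The tuning loop draws each parameter independently with `random.choice`, so the same combination can be sampled more than once, which matters for small grids such as the focused search. A minimal sketch of sampling distinct combinations is shown below. `sample_param_combinations` is a hypothetical helper, and it enumerates the whole grid, so it is only practical when the grid is small.

```python
import itertools
import random

def sample_param_combinations(param_grid, n_trials, seed=42):
    """Draw up to n_trials distinct parameter combinations from a grid.

    param_grid: dict mapping parameter name -> list of candidate values.
    """
    keys = sorted(param_grid)                       # fixed key order for reproducibility
    all_combos = list(itertools.product(*(param_grid[k] for k in keys)))
    rng = random.Random(seed)                       # local RNG, independent of global seed
    chosen = rng.sample(all_combos, min(n_trials, len(all_combos)))
    return [dict(zip(keys, combo)) for combo in chosen]

grid = {
    'embedding_size': [128, 256],
    'loss_type': ['triplet', 'contrastive'],
    'learning_rate': [1e-4, 5e-4],
}
trials = sample_param_combinations(grid, n_trials=4)
for params in trials:
    print(params)
```

Each returned dictionary has the same shape as the `params` dictionary built inside the tuning loop, so it could be used in its place.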



## Main Execution


In [None]:

def evaluate_few_shot(support_embeddings, support_labels, query_embeddings, query_labels, n_way=5, k_shot=5, num_episodes=100):
    """
    Evaluate the model on n-way k-shot classification with confidence intervals
    
    Args:
        support_embeddings: Embeddings for the support set (training examples)
        support_labels: Labels for the support set
        query_embeddings: Embeddings for the query set (test examples)
        query_labels: Labels for the query set
        n_way: Number of classes in each episode
        k_shot: Number of examples per class in the support set
        num_episodes: Number of episodes to run for stable results
    
    Returns:
        dict: Dictionary containing accuracy statistics and confidence intervals
    """
    unique_labels = torch.unique(support_labels)
    if len(unique_labels) < n_way:
        print(f"Warning: Only {len(unique_labels)} classes available, but n_way={n_way}")
        n_way = len(unique_labels)
    
    accuracies = []
    
    # Progress bar for episodes
    progress_bar = tqdm(range(num_episodes), desc=f"{n_way}-way {k_shot}-shot evaluation")
    
    for episode in progress_bar:
        # Randomly select n classes for this episode
        selected_classes = np.random.choice(unique_labels.numpy(), n_way, replace=False)
        
        # Create support set (k examples per class)
        support_set_embeddings = []
        support_set_labels = []
        
        for class_idx, c in enumerate(selected_classes):
            # Get indices of examples of class c
            class_indices = torch.where(support_labels == c)[0]
            
            # Randomly select k examples
            if len(class_indices) >= k_shot:
                selected_indices = np.random.choice(class_indices.numpy(), k_shot, replace=False)
            else:
                # If not enough examples, use all and repeat some
                selected_indices = np.random.choice(class_indices.numpy(), k_shot, replace=True)
            
            for idx in selected_indices:
                support_set_embeddings.append(support_embeddings[idx])
                support_set_labels.append(class_idx)  # Use class index as the new label
        
        support_set_embeddings = torch.stack(support_set_embeddings)
        support_set_labels = torch.tensor(support_set_labels)
        
        # Create query set (all examples of the selected classes from the query set)
        query_mask = torch.isin(query_labels, torch.as_tensor(selected_classes))
        query_set_indices = torch.where(query_mask)[0]
        
        if len(query_set_indices) == 0:
            print("Warning: No query examples for selected classes")
            continue
            
        query_set_embeddings = query_embeddings[query_set_indices]
        query_set_labels = query_labels[query_set_indices]
        
        # Map original labels to new indices (0 to n_way-1)
        label_mapping = {int(c): i for i, c in enumerate(selected_classes)}
        query_set_labels = torch.tensor([label_mapping[label.item()] for label in query_set_labels])
        
        # Compute prototypes (mean embedding for each class)
        prototypes = torch.zeros(n_way, support_embeddings.size(1), device=support_embeddings.device)
        for c in range(n_way):
            prototypes[c] = support_set_embeddings[support_set_labels == c].mean(0)
        
        # The mean of unit vectors is generally not unit length, so re-normalize the
        # prototypes; the dot product below is then a true cosine similarity
        prototypes = F.normalize(prototypes, p=2, dim=1)
        
        # Compute similarities between query examples and prototypes
        # Using cosine similarity (higher means more similar)
        logits = torch.matmul(query_set_embeddings, prototypes.T)
        
        # Make predictions
        _, predictions = torch.max(logits, dim=1)
        
        # Compute accuracy
        accuracy = (predictions == query_set_labels).float().mean().item()
        accuracies.append(accuracy)
        
        # Update progress bar with current mean accuracy
        progress_bar.set_postfix(mean_acc=f"{np.mean(accuracies):.4f}")
    
    # Calculate statistics
    mean_accuracy = np.mean(accuracies)
    std_accuracy = np.std(accuracies)
    
    # Calculate 95% confidence interval using bootstrap
    bootstrap_samples = 1000
    bootstrap_means = []
    
    for _ in range(bootstrap_samples):
        # Sample with replacement from the accuracies
        bootstrap_sample = np.random.choice(accuracies, size=len(accuracies), replace=True)
        bootstrap_means.append(np.mean(bootstrap_sample))
    
    # Calculate 95% confidence interval
    conf_interval = np.percentile(bootstrap_means, [2.5, 97.5])
    
    print(f"{n_way}-way {k_shot}-shot classification:")
    print(f"  Mean accuracy: {mean_accuracy:.4f}")
    print(f"  Standard deviation: {std_accuracy:.4f}")
    print(f"  95% confidence interval: [{conf_interval[0]:.4f}, {conf_interval[1]:.4f}]")
    
    return {
        'mean_accuracy': mean_accuracy,
        'std_accuracy': std_accuracy,
        'conf_interval': conf_interval,
        'accuracies': accuracies
    }

def evaluate_multiple_few_shot_settings(support_embeddings, support_labels, query_embeddings, query_labels):
    """
    Evaluate few-shot learning across multiple n-way, k-shot configurations
    and visualize the results with error bars
    """
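    # Define the settings to evaluate.
    # Note: evaluate_few_shot caps n_way at the number of classes present in the support set,
    # so with the default 5 holdout classes the 10-way settings are effectively run as 5-way.
    settings = [
        {'n_way': 5, 'k_shot': 1, 'num_episodes': 100},  # 5-way 1-shot
        {'n_way': 5, 'k_shot': 5, 'num_episodes': 100},  # 5-way 5-shot
        {'n_way': 10, 'k_shot': 1, 'num_episodes': 100}, # 10-way 1-shot
        {'n_way': 10, 'k_shot': 5, 'num_episodes': 100}  # 10-way 5-shot
    ]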
    
    # Store results
    results = []
    
    # Run evaluation for each setting
    print("\n=== Few-Shot Learning Evaluation ===\n")
    for setting in settings:
        print(f"\nEvaluating {setting['n_way']}-way {setting['k_shot']}-shot learning...")
        result = evaluate_few_shot(
            support_embeddings=support_embeddings,
            support_labels=support_labels,
            query_embeddings=query_embeddings,
            query_labels=query_labels,
            n_way=setting['n_way'],
            k_shot=setting['k_shot'],
            num_episodes=setting['num_episodes']
        )
        
        results.append({
            'n_way': setting['n_way'],
            'k_shot': setting['k_shot'],
            'mean_accuracy': result['mean_accuracy'],
            'std_accuracy': result['std_accuracy'],
            'conf_interval': result['conf_interval']
        })
    
    # Create a visualization of the results with error bars
    plt.figure(figsize=(10, 6))
    
    # Group by n_way for bar chart
    n_way_values = sorted(list(set([r['n_way'] for r in results])))
    k_shot_values = sorted(list(set([r['k_shot'] for r in results])))
    
    # Set up bar positions
    bar_width = 0.35
    x = np.arange(len(n_way_values))
    
    # Plot bars for each k_shot value
    for i, k in enumerate(k_shot_values):
        means = []
        errors_lower = []
        errors_upper = []
        
        for n in n_way_values:
            # Find result for this n_way and k_shot
            result = next((r for r in results if r['n_way'] == n and r['k_shot'] == k), None)
            if result:
                means.append(result['mean_accuracy'])
                errors_lower.append(result['mean_accuracy'] - result['conf_interval'][0])
                errors_upper.append(result['conf_interval'][1] - result['mean_accuracy'])
            else:
                means.append(0)
                errors_lower.append(0)
                errors_upper.append(0)
        
        # Plot bars with error bars
        plt.bar(
            x + (i - 0.5*(len(k_shot_values)-1)) * bar_width, 
            means, 
            bar_width, 
            yerr=[errors_lower, errors_upper],
            label=f'{k}-shot',
            capsize=5
        )
    
    # Customize plot
    plt.xlabel('Number of Classes (N-way)')
    plt.ylabel('Accuracy')
    plt.title('Few-Shot Learning Performance')
    plt.xticks(x, [f'{n}-way' for n in n_way_values])
    plt.ylim(0, 1.0)
    plt.legend(loc='upper right')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    
    # Add value labels on bars
    for i, result in enumerate(results):
        n_way_idx = n_way_values.index(result['n_way'])
        k_shot_idx = k_shot_values.index(result['k_shot'])
        x_pos = n_way_idx + (k_shot_idx - 0.5*(len(k_shot_values)-1)) * bar_width
        y_pos = result['mean_accuracy'] + 0.02
        plt.text(x_pos, y_pos, f"{result['mean_accuracy']:.3f}", 
                 ha='center', va='bottom', fontsize=9)
    
    plt.tight_layout()
    plt.savefig('few_shot_evaluation.png', dpi=300)
    plt.show()
    
    return results
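
The core of each few-shot episode is a nearest-prototype classifier. The NumPy sketch below isolates that step on toy data; it is an illustration rather than the evaluation code above, `prototype_predict` is a hypothetical helper, and the class ids 3 and 7 are arbitrary.

```python
import numpy as np

def prototype_predict(support, support_labels, query):
    """Assign each query row to the class whose mean support embedding is most similar.

    support: (S, D) embeddings, support_labels: (S,) class ids, query: (Q, D) embeddings.
    Similarity is cosine, so prototypes and queries are normalized first.
    """
    support_labels = np.asarray(support_labels)
    classes = np.unique(support_labels)
    prototypes = np.stack([support[support_labels == c].mean(axis=0) for c in classes])
    prototypes = prototypes / np.linalg.norm(prototypes, axis=1, keepdims=True)
    query = query / np.linalg.norm(query, axis=1, keepdims=True)
    scores = query @ prototypes.T                  # (Q, num_classes) cosine similarities
    return classes[np.argmax(scores, axis=1)]      # map column index back to class id

# 2-way 2-shot toy episode
support = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]])
support_labels = np.array([3, 3, 7, 7])
query = np.array([[0.8, 0.2], [0.2, 0.8]])
query_labels = np.array([3, 7])

pred = prototype_predict(support, support_labels, query)
accuracy = float((pred == query_labels).mean())
print(pred, accuracy)
```

Averaging this accuracy over many randomly drawn episodes gives the mean and confidence interval reported by `evaluate_few_shot`.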


In [None]:

def main():
    # Uncomment to find the best parameters using hyperparameter tuning
    # best_params = focused_hyperparameter_search()
    # if best_params:
    #     backbone_name = best_params['params'].get('backbone_name', 'resnet18')
    #     embedding_size = best_params['params'].get('embedding_size', 128)
    #     batch_size = best_params['params'].get('batch_size', 32)
    #     loss_type = best_params['params'].get('loss_type', 'triplet')
    #     lr = best_params['params'].get('learning_rate', 1e-4)
    #     print(f"Using best parameters from hyperparameter tuning: {best_params['params']}")
    # else:
    #     # Use default parameters if hyperparameter tuning failed
    #     backbone_name = 'resnet18'
    #     embedding_size = 128
    #     batch_size = 32
    #     loss_type = 'triplet'
    #     lr = 1e-4
    
    # Model parameters (defaults)
    backbone_name = 'resnet18'  # Options: 'resnet18', 'resnet34', 'resnet50', 'efficientnet_b0', 'mobilenet_v2', 'densenet121'
    embedding_size = 128
    batch_size = 32  # Adjust based on your GPU
    num_workers = 2
    
    # Training parameters
    loss_type = 'triplet'  # Options: 'triplet', 'contrastive', 'arcface'
    num_epochs = 20
    lr = 1e-4

    # Load data
    train_val_dataset, test_dataset, eval_train_dataset = load_oxford_pets_dataset()
    datasets_dict = prepare_datasets(train_val_dataset, test_dataset, eval_train_dataset)
    dataloaders = create_dataloaders(datasets_dict, batch_size=batch_size, num_workers=num_workers)
    
    # Create model
    model = EmbeddingNet(backbone_name=backbone_name, embedding_size=embedding_size)
    model = model.to(device)
    
    # Create optimizer and scheduler
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
    
    # Train model
    model, history = train_model(
        model=model,
        dataloaders=dataloaders,
        loss_type=loss_type,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=num_epochs,
        embedding_size=embedding_size
    )
    
    # Plot training history
    plt.figure(figsize=(10, 6))
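    # Use the recorded history length so the plot still works if fewer than num_epochs were logged
    epochs = range(1, len(history['train_loss']) + 1)
    plt.plot(epochs, history['train_loss'], 'b-', label='Training Loss')
    plt.plot(epochs, history['val_loss'], 'r-', label='Validation Loss')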
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Training and Validation Loss ({loss_type} loss)')
    plt.legend()
    plt.grid(True)
    plt.savefig('training_history.png')
    plt.show()
    
    # Extract embeddings for evaluation
    print("\nExtracting embeddings for evaluation...")
    train_embeddings, train_labels = extract_embeddings(model, dataloaders['eval_train'])
    test_embeddings, test_labels = extract_embeddings(model, dataloaders['test'])
    few_shot_train_embeddings, few_shot_train_labels = extract_embeddings(model, dataloaders['few_shot_train'])
    few_shot_test_embeddings, few_shot_test_labels = extract_embeddings(model, dataloaders['few_shot_test'])
    
    # Evaluation tasks
    print("\n1. Verification Task:")
    verification_results = evaluate_verification(test_embeddings, test_labels)
    
    print("\nVerification Results:")
    print(f"ROC AUC: {verification_results['roc_auc']:.4f}")
    print(f"Equal Error Rate (EER): {verification_results['eer']:.4f}")
    
    print("\n2. Retrieval Task:")
    retrieval_results = evaluate_retrieval(
        query_embeddings=test_embeddings,
        query_labels=test_labels,
        gallery_embeddings=train_embeddings,
        gallery_labels=train_labels,
        k_values=[1, 5, 10]
    )
    
    print("\n3. Few-shot Classification:")
    # Run comprehensive few-shot evaluation across multiple settings
    few_shot_results = evaluate_multiple_few_shot_settings(
        support_embeddings=few_shot_train_embeddings,
        support_labels=few_shot_train_labels,
        query_embeddings=few_shot_test_embeddings,
        query_labels=few_shot_test_labels
    )
    
    # Embedding visualization
    print("\n4. Embedding Visualization:")
    test_projection = visualize_embeddings(
        embeddings=test_embeddings,
        labels=test_labels,
        class_mapping=datasets_dict['class_mapping'],
        method='tsne',
        title='t-SNE Visualization of Test Embeddings'
    )
    
    # Visualize few-shot embeddings
    print("\nVisualizing few-shot embeddings:")
    # Combine few-shot train and test embeddings for visualization
    all_few_shot_embeddings = torch.cat([few_shot_train_embeddings, few_shot_test_embeddings], dim=0)
    all_few_shot_labels = torch.cat([few_shot_train_labels, few_shot_test_labels], dim=0)
    
    few_shot_projection = visualize_embeddings(
        embeddings=all_few_shot_embeddings,
        labels=all_few_shot_labels,
        class_mapping=datasets_dict['class_mapping'],
        method='tsne',
        title='t-SNE Visualization of Few-Shot Embeddings'
    )
    
    # Bonus: Grad-CAM Visualization
    print("\n5. Grad-CAM Visualization:")
    visualize_grad_cam(model, dataloaders['test'], datasets_dict['class_mapping'], num_images=3)
    
    # Save model
    torch.save({
        'model_state_dict': model.state_dict(),
        'embedding_size': embedding_size,
        'backbone_name': backbone_name,
        'class_mapping': datasets_dict['class_mapping']
    }, f'pet_metric_learning_{backbone_name}_{loss_type}.pth')
    
    print("\nEvaluation completed!")

if __name__ == "__main__":
    main()
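
The verification step in `main()` reports an Equal Error Rate (EER): the operating point at which the fraction of different-breed pairs accepted equals the fraction of same-breed pairs rejected. How `evaluate_verification` computes it is defined elsewhere in the notebook (most likely from the ROC curve), so the sketch below is only an illustration of the idea using a plain threshold sweep. The function name `equal_error_rate` and the toy scores are hypothetical.

```python
import numpy as np


def equal_error_rate(scores, is_same):
    """Approximate the EER by sweeping a threshold over the observed scores.

    scores:  similarity per pair (higher means more likely the same breed)
    is_same: 1 if the pair shares a label, else 0
    Assumes both same-label and different-label pairs are present.
    """
    scores = np.asarray(scores, dtype=float)
    is_same = np.asarray(is_same, dtype=bool)
    best_gap, eer = np.inf, None
    for t in np.unique(scores):
        accept = scores >= t
        far = np.mean(accept[~is_same])   # different-breed pairs accepted
        frr = np.mean(~accept[is_same])   # same-breed pairs rejected
        if abs(far - frr) < best_gap:
            best_gap, eer = abs(far - frr), (far + frr) / 2
    return eer


# Toy pairs: four same-breed and four different-breed similarities
pair_scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.3, 0.2, 0.1]
pair_is_same = [1, 1, 0, 1, 1, 0, 0, 0]
eer_value = equal_error_rate(pair_scores, pair_is_same)
```

A lower EER is better, which is why it is reported alongside ROC AUC rather than on the same scale.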



## Bonus: Multiple Loss Function Comparison

In [None]:

def compare_loss_functions():
    """
    Compare different loss functions for metric learning
    """
    # Model parameters
    backbone_name = 'resnet18'
    embedding_size = 128
    batch_size = 32
    num_workers = 2
    num_epochs = 15
    
    # Loss functions to compare
    loss_types = ['triplet', 'contrastive', 'arcface']
    
    # Load data (only once)
    train_val_dataset, test_dataset, eval_train_dataset = load_oxford_pets_dataset()
    datasets_dict = prepare_datasets(train_val_dataset, test_dataset, eval_train_dataset)
    dataloaders = create_dataloaders(datasets_dict, batch_size=batch_size, num_workers=num_workers)
    
    results = {}
    
    for loss_type in loss_types:
        print(f"\n{'=' * 40}")
        print(f"Training with {loss_type} loss")
        print(f"{'=' * 40}")
        
        # Create model
        model = EmbeddingNet(backbone_name=backbone_name, embedding_size=embedding_size)
        model = model.to(device)
        
        # Create optimizer and scheduler
        optimizer = optim.Adam(model.parameters(), lr=1e-4)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
        
        # Train model
        model, history = train_model(
            model=model,
            dataloaders=dataloaders,
            loss_type=loss_type,
            optimizer=optimizer,
            scheduler=scheduler,
            num_epochs=num_epochs,
            embedding_size=embedding_size
        )
        
        # Extract embeddings for evaluation
        print("\nExtracting embeddings for evaluation...")
        train_embeddings, train_labels = extract_embeddings(model, dataloaders['eval_train'])
        test_embeddings, test_labels = extract_embeddings(model, dataloaders['test'])
        
        # Evaluation
        verification_results = evaluate_verification(test_embeddings, test_labels)
        
        retrieval_results = evaluate_retrieval(
            query_embeddings=test_embeddings,
            query_labels=test_labels,
            gallery_embeddings=train_embeddings,
            gallery_labels=train_labels,
            k_values=[1, 5]
        )
        
        # Store results
        results[loss_type] = {
            'verification': {
                'roc_auc': verification_results['roc_auc'],
                'eer': verification_results['eer']
            },
            'retrieval': {
                'recall@1': retrieval_results['recall@1'],
                'recall@5': retrieval_results['recall@5'],
                'precision@1': retrieval_results['precision@1'],
                'precision@5': retrieval_results['precision@5']
            }
        }
        
        # Save model
        torch.save({
            'model_state_dict': model.state_dict(),
            'embedding_size': embedding_size,
            'backbone_name': backbone_name,
            'class_mapping': datasets_dict['class_mapping']
        }, f'pet_metric_learning_{backbone_name}_{loss_type}_comparison.pth')
    
    # Compare results
    print(f"\n{'=' * 50}")
    print("Comparison of Loss Functions")
    print(f"{'=' * 50}")
    
    # Create a comparison table with one row per loss function.
    # DataFrame.append was removed in pandas 2.0, so collect the rows
    # first and build the DataFrame in a single call.
    comparison_rows = []
    for loss_type, metrics in results.items():
        comparison_rows.append({
            'Loss Function': loss_type,
            'ROC AUC': metrics['verification']['roc_auc'],
            'EER': metrics['verification']['eer'],
            'Recall@1': metrics['retrieval']['recall@1'],
            'Recall@5': metrics['retrieval']['recall@5'],
            'Precision@1': metrics['retrieval']['precision@1'],
            'Precision@5': metrics['retrieval']['precision@5']
        })

    comparison_df = pd.DataFrame(
        comparison_rows,
        columns=['Loss Function', 'ROC AUC', 'EER', 'Recall@1',
                 'Recall@5', 'Precision@1', 'Precision@5']
    )
    
    print(comparison_df)
    
    # Plot comparison
    plt.figure(figsize=(15, 10))
    
    metrics = ['ROC AUC', 'Recall@1', 'Recall@5', 'Precision@1', 'Precision@5']
    x = np.arange(len(metrics))
    width = 0.25
    
    for i, loss_type in enumerate(loss_types):
        values = [
            results[loss_type]['verification']['roc_auc'],
            results[loss_type]['retrieval']['recall@1'],
            results[loss_type]['retrieval']['recall@5'],
            results[loss_type]['retrieval']['precision@1'],
            results[loss_type]['retrieval']['precision@5']
        ]
        plt.bar(x + i*width, values, width, label=loss_type)
    
    plt.ylabel('Score')
    plt.title('Comparison of Loss Functions')
    plt.xticks(x + width, metrics)
    plt.legend()
    plt.grid(True, axis='y')
    plt.savefig('loss_function_comparison.png')
    plt.show()
    
    return results, comparison_df

# Uncomment to run the comparison
# loss_comparison_results, loss_comparison_df = compare_loss_functions()
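
The comparison table reports Recall@K and Precision@K for each loss function. The notebook's `evaluate_retrieval` is defined in an earlier cell, and its exact definitions may differ, so the following is a minimal sketch under one common convention in metric learning: Recall@K is the fraction of queries with at least one same-label item among the K nearest gallery items, and Precision@K is the average fraction of those K items that share the query's label. The function name `recall_precision_at_k` and the toy embeddings are hypothetical.

```python
import numpy as np


def recall_precision_at_k(query_emb, query_labels, gallery_emb, gallery_labels, k):
    """Compute (Recall@K, Precision@K) with cosine similarity.

    Assumed convention, which may differ from the notebook's own code.
    """
    q = np.asarray(query_emb, dtype=float)
    g = np.asarray(gallery_emb, dtype=float)
    # L2-normalise so the dot product equals cosine similarity
    q = q / np.linalg.norm(q, axis=1, keepdims=True)
    g = g / np.linalg.norm(g, axis=1, keepdims=True)
    sims = q @ g.T
    # Indices of the K most similar gallery items for each query
    top_k = np.argsort(-sims, axis=1)[:, :k]
    matches = np.asarray(gallery_labels)[top_k] == np.asarray(query_labels)[:, None]
    recall = matches.any(axis=1).mean()
    precision = matches.mean(axis=1).mean()
    return float(recall), float(precision)


# Toy 2-D embeddings: two gallery items per class, one query per class
gallery = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]]
gallery_y = [0, 0, 1, 1]
queries = [[1.0, 0.05], [0.6, 0.5]]
queries_y = [0, 1]

r1, p1 = recall_precision_at_k(queries, queries_y, gallery, gallery_y, k=1)
r3, p3 = recall_precision_at_k(queries, queries_y, gallery, gallery_y, k=3)
```

In this toy case the second query sits closer to class 0 than to its own class, so it misses at K=1 and is only recovered at K=3. That is the kind of behaviour the Recall@1 versus Recall@5 columns are meant to expose.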




## Bonus: Streamlit Demo

We've created a user-friendly Streamlit application that demonstrates the practical use of our trained metric learning model for pet similarity search. The application allows users to upload a pet image and find similar-looking pets based on the embeddings learned by our model.

### Running the Streamlit App

The complete Streamlit application code is available in the separate notebook file: `Charles_Watson_streamlit_pet_similarity_app.ipynb`.

To run the demo:

1. Ensure you have the necessary Python packages installed:
   ```bash
   pip install streamlit torch torchvision Pillow numpy tqdm matplotlib
   ```

2. Convert the Streamlit notebook to a Python script:
   ```bash
   jupyter nbconvert --to python Charles_Watson_streamlit_pet_similarity_app.ipynb
   ```

3. Make sure a trained model file (e.g., `pet_metric_learning_resnet18_triplet.pth`) exists in the working directory.

4. Run the Streamlit application:
   ```bash
   streamlit run Charles_Watson_streamlit_pet_similarity_app.py
   ```

### Features of the Streamlit App

- **Upload Interface**: Users can upload any pet image for similarity search
- **Configuration Options**: Customize model path, database location, and number of results
- **Interactive Results**: View similar pets with their breed labels and similarity scores
- **Database Management**: Load and reload the pet image database as needed

The app uses the same embedding model architecture (`EmbeddingNet`) as developed in this notebook (`Charles_Watson_Ndethi_Kibaki-Code.ipynb`), ensuring consistency between training and deployment.
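
The similarity search behind the "Interactive Results" feature can be pictured as a nearest-neighbour lookup over stored embeddings. The app's actual implementation is in the separate Streamlit notebook, so the sketch below is only an assumption-laden illustration: it uses cosine similarity, the function name `find_similar` is hypothetical, and it assumes `class_mapping` maps integer class indices to breed names (the direction of the mapping in the saved checkpoint is not shown here).

```python
import numpy as np


def find_similar(query_embedding, gallery_embeddings, gallery_labels, class_mapping, top_k=3):
    """Return the top_k gallery entries as (index, breed_name, cosine_similarity)."""
    q = np.asarray(query_embedding, dtype=float)
    g = np.asarray(gallery_embeddings, dtype=float)
    # Normalise both sides so the dot product is the cosine similarity
    q = q / np.linalg.norm(q)
    g = g / np.linalg.norm(g, axis=1, keepdims=True)
    scores = g @ q
    # Highest similarity first
    order = np.argsort(-scores)[:top_k]
    return [(int(i), class_mapping[int(gallery_labels[i])], float(scores[i])) for i in order]


# Toy data: 2-D embeddings and a hypothetical index-to-breed mapping
toy_gallery = [[1.0, 0.0], [0.8, 0.6], [0.0, 1.0]]
toy_labels = [0, 0, 1]
toy_mapping = {0: "Abyssinian", 1: "Beagle"}
similar = find_similar([1.0, 0.1], toy_gallery, toy_labels, toy_mapping, top_k=2)
```

In practice the query and gallery embeddings would come from the trained `EmbeddingNet`, loaded from the saved `.pth` checkpoint, rather than from hand-written vectors.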



## Conclusion

In this notebook, we have implemented a comprehensive metric learning pipeline for pet breed classification using the Oxford-IIIT Pet Dataset. We have:

1. Built a custom embedding model with a CNN backbone and projection head
2. Implemented various loss functions for metric learning (Triplet, Contrastive, ArcFace)
3. Developed evaluation methods for verification, retrieval, and few-shot classification
4. Created visualization tools for embedding spaces and feature importance (Grad-CAM)
5. Included bonus implementations for hard negative mining and loss function comparison
6. Provided a Streamlit demo in a separate notebook (`Charles_Watson_streamlit_pet_similarity_app.ipynb`)

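The code is modular: the backbone, embedding size, loss function, and learning rate can be changed by editing the settings at the top of `main()`. To run the complete training and evaluation pipeline, call `main()`; to reproduce the loss function comparison, uncomment the `compare_loss_functions()` call.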