In [43]:
import torch
from collections import Counter
import numpy as np
from sklearn.metrics.pairwise import cosine_distances, euclidean_distances
from collections import defaultdict
from sklearn.metrics import precision_score, recall_score
import matplotlib.pyplot as plt


In [2]:
def knn(train_embeddings, train_labels, test_embeddings, k, distance_metric):
    if distance_metric == 'euclidean':
        distances = euclidean_distances(test_embeddings, train_embeddings)
    elif distance_metric == 'cosine':
        distances = cosine_distances(test_embeddings, train_embeddings)
    
    sorted_indices = np.argsort(distances, axis=1)[:, :k]
    k_nearest_labels = train_labels[sorted_indices]
    
    # Majority voting using Counter
    mode_labels = torch.mode(k_nearest_labels, dim=1)[0]  # Most frequent label per row
    return mode_labels

def classify(train_embeddings, train_labels, test_embeddings, test_labels, k_values, distance_metrics):
    results = {}
    train_embeddings, train_labels = torch.tensor(train_embeddings), torch.tensor(train_labels)
    test_embeddings, test_labels = torch.tensor(test_embeddings), torch.tensor(test_labels)
    
    for k in k_values:
        """ for metric in distance_metrics:
            print(f'Classifying with k={k}, metric={metric}')
            predicted_labels = knn(train_embeddings, train_labels, test_embeddings, k, metric)
            accuracy = (predicted_labels == test_labels).float().mean().item()
            results[(k, metric)] = accuracy """
        
        
        for metric in distance_metrics:
            print(f'Classifying with k={k}, metric={metric}')
            predicted_labels = knn(train_embeddings, train_labels, test_embeddings, k, metric)
            accuracy = (predicted_labels == test_labels).float().mean().item()
            print(f'Accuracy using text embeddings to predict labels with k={k} and distance metric={metric}: {accuracy}')
    
    return results


In [3]:
# Example usage
# Load embeddings and labels from .pth files
train_embeddings = torch.load('./SMAI_A1/train_embeddings.pth', map_location=torch.device('cpu'))
train_labels = torch.load('./SMAI_A1/train_labels.pth', map_location=torch.device('cpu'))
test_embeddings = torch.load('./SMAI_A1/test_embeddings.pth', map_location=torch.device('cpu'))
test_labels = torch.load('./SMAI_A1/test_labels.pth', map_location=torch.device('cpu'))
text_embeddings = torch.load('./SMAI_A1/text_embedding.pth', map_location=torch.device('cpu'))

print("Train embeddings shape:", train_embeddings.shape)
print("Text embeddings shape:", text_embeddings.shape)
print("Test labels shape:", test_labels.shape)
print("Train labels shape:", train_labels.shape)
print("Test embeddings shape:", test_embeddings.shape)


Train embeddings shape: torch.Size([50000, 512])
Text embeddings shape: torch.Size([10, 512])
Test labels shape: torch.Size([10000])
Train labels shape: torch.Size([50000])
Test embeddings shape: torch.Size([10000, 512])


In [4]:
k_values = [1, 5, 10]
distance_metrics = ['euclidean', 'cosine']

results = classify(train_embeddings, train_labels, test_embeddings, test_labels, k_values, distance_metrics)
print(results)

k_values = [1, 5, 10]
distance_metrics = ['euclidean', 'cosine']

results = classify(train_embeddings, train_labels, test_embeddings, test_labels, k_values, distance_metrics)
print(results)


  train_embeddings, train_labels = torch.tensor(train_embeddings), torch.tensor(train_labels)
  test_embeddings, test_labels = torch.tensor(test_embeddings), torch.tensor(test_labels)


Classifying with k=1, metric=euclidean
Accuracy using text embeddings to predict labels with k=1 and distance metric=euclidean: 0.9047999978065491
Classifying with k=1, metric=cosine
Accuracy using text embeddings to predict labels with k=1 and distance metric=cosine: 0.9047999978065491
Classifying with k=5, metric=euclidean
Accuracy using text embeddings to predict labels with k=5 and distance metric=euclidean: 0.9182000160217285
Classifying with k=5, metric=cosine
Accuracy using text embeddings to predict labels with k=5 and distance metric=cosine: 0.9182000160217285
Classifying with k=10, metric=euclidean
Accuracy using text embeddings to predict labels with k=10 and distance metric=euclidean: 0.9193999767303467
Classifying with k=10, metric=cosine
Accuracy using text embeddings to predict labels with k=10 and distance metric=cosine: 0.9193999767303467
{}


  train_embeddings, train_labels = torch.tensor(train_embeddings), torch.tensor(train_labels)
  test_embeddings, test_labels = torch.tensor(test_embeddings), torch.tensor(test_labels)


Classifying with k=1, metric=euclidean
Accuracy using text embeddings to predict labels with k=1 and distance metric=euclidean: 0.9047999978065491
Classifying with k=1, metric=cosine
Accuracy using text embeddings to predict labels with k=1 and distance metric=cosine: 0.9047999978065491
Classifying with k=5, metric=euclidean
Accuracy using text embeddings to predict labels with k=5 and distance metric=euclidean: 0.9182000160217285
Classifying with k=5, metric=cosine
Accuracy using text embeddings to predict labels with k=5 and distance metric=cosine: 0.9182000160217285
Classifying with k=10, metric=euclidean
Accuracy using text embeddings to predict labels with k=10 and distance metric=euclidean: 0.9193999767303467
Classifying with k=10, metric=cosine
Accuracy using text embeddings to predict labels with k=10 and distance metric=cosine: 0.9193999767303467
{}


In [None]:

distances = euclidean_distances(test_embeddings, text_embeddings)
nearest_indices = np.argmin(distances, axis=1)
predicted_labels = nearest_indices
accuracyy = (predicted_labels == test_labels).float().mean().item()
print(f"Accuracy using text embeddings with k=1: {accuracyy:.4f}")

Accuracy using text embeddings with k=1: 0.8781


In [12]:
# Convert numpy arrays to torch tensors
train_embeddings = torch.tensor(train_embeddings)
train_labels = torch.tensor(train_labels)
test_embeddings = torch.tensor(test_embeddings)
test_labels = torch.tensor(test_labels)
text_embeddings = torch.tensor(text_embeddings)

  train_embeddings = torch.tensor(train_embeddings)
  train_labels = torch.tensor(train_labels)
  test_embeddings = torch.tensor(test_embeddings)
  test_labels = torch.tensor(test_labels)
  text_embeddings = torch.tensor(text_embeddings)


In [35]:
def calculate_mrr(predictions, true_labels):
    ranks = []
    for true_label, pred in zip(true_labels, predictions):
        try:
            rank = pred.tolist().index(true_label) + 1
            ranks.append(1 / rank)
        except ValueError:
            ranks.append(0)
    return np.mean(ranks)

def precision_at_k(retrieved_indices, true_indices, k):
    relevant = np.isin(retrieved_indices[:k], true_indices)
    return np.sum(relevant) / k

def hit_rate(retrieved_indices, true_indices):
    return np.any(np.isin(retrieved_indices, true_indices)).astype(int)

def retrieve_nearest_neighbors(query_embeddings, train_embeddings, k=100, distance_metric='euclidean'):
    if distance_metric == 'euclidean':
        distances = euclidean_distances(query_embeddings, train_embeddings)
    elif distance_metric == 'cosine':
        distances = cosine_distances(query_embeddings, train_embeddings)
    
    sorted_indices = np.argsort(distances, axis=1)[:, :k]
    return sorted_indices

# Text to Image Retrival

In [38]:
distances = euclidean_distances(text_embeddings, test_embeddings)

# Retrieve the 100 nearest embeddings for each test embedding
k = 100
nearest_indices = np.argsort(distances, axis=1)[:, :k]
nearest_labels = train_labels[nearest_indices]

# Calculate metrics
true_labels = np.arange(10)
mrr1 = calculate_mrr(nearest_labels, true_labels)
precision = precision_at_k(nearest_labels, true_labels, k)
hit_rate_value = hit_rate(nearest_labels, true_labels)

print(f"Mean Reciprocal Rank (MRR): {mrr1:.4f}")
print(f"Precision@{k}: {precision:.4f}")
print(f"Hit Rate: {hit_rate_value:.4f}")

Mean Reciprocal Rank (MRR): 0.1622
Precision@100: 10.0000
Hit Rate: 1.0000


# Image to Image Retrival

In [37]:
# Compute distances between test embeddings and all train embeddings
distances = euclidean_distances(test_embeddings, train_embeddings)

# Retrieve the 100 nearest embeddings for each test embedding
k = 100
nearest_indices = np.argsort(distances, axis=1)[:, :k]
nearest_labels = train_labels[nearest_indices]

# Calculate metrics
mrr_img_t_img = calculate_mrr(nearest_labels, test_labels)
precision = precision_at_k(nearest_labels, test_labels, k)
hit_rate_value = hit_rate(nearest_labels, test_labels)

print(f"Mean Reciprocal Rank (MRR): {mrr_img_t_img:.4f}")
print(f"Precision@100: {precision:.4f}")
print(f"Hit Rate: {hit_rate_value:.4f}")

Mean Reciprocal Rank (MRR): 0.9348
Precision@100: 100.0000
Hit Rate: 1.0000


In [39]:
class LSH:
    def __init__(self, num_hyperplanes, embedding_dim):
        self.num_hyperplanes = num_hyperplanes
        self.hyperplanes = np.random.randn(num_hyperplanes, embedding_dim)  # Random hyperplanes

    def compute_hash(self, embeddings):
        projections = np.dot(embeddings, self.hyperplanes.T)  # Compute dot products
        return (projections > 0).astype(int)  # Convert to binary hash

    def index_embeddings(self, embeddings):
        hash_buckets = defaultdict(list)
        hashes = self.compute_hash(embeddings)
        for idx, h in enumerate(hashes):
            hash_tuple = tuple(h)
            hash_buckets[hash_tuple].append(idx)
        return hash_buckets

    def retrieve(self, query_embedding, hash_buckets, k=5):
        query_hash = tuple(self.compute_hash(query_embedding.reshape(1, -1))[0])
        if query_hash in hash_buckets:
            candidates = hash_buckets[query_hash]
        else:
            candidates = []
        return candidates[:k]  # Return up to k nearest candidates



In [None]:
# Load embeddings (assuming already available in variables)
embedding_dim = train_embeddings.shape[1]
num_hyperplanes_list = [5, 10, 20]  # Different number of hyperplanes

for num_hyperplanes in num_hyperplanes_list:
    lsh = LSH(num_hyperplanes, embedding_dim)
    hash_buckets = lsh.index_embeddings(train_embeddings.numpy())
    
    # Plot histogram of bucket sizes
    bucket_sizes = [len(v) for v in hash_buckets.values()]
    plt.hist(bucket_sizes, bins=30, alpha=0.6, label=f'{num_hyperplanes} hyperplanes')
    plt.xlabel("Bucket Size")
    plt.ylabel("Frequency")
    plt.title("Histogram of Samples in Buckets")
    plt.legend()
    plt.show()
    
    # Perform image retrieval for k=5
    retrieved_indices = []
    actual_labels = []
    predicted_labels = []
    
    for i in range(len(test_embeddings)):
        retrieved = lsh.retrieve(test_embeddings[i], hash_buckets, k=5)
        retrieved_indices.append(retrieved)
        
        if retrieved:
            actual_labels.extend([test_labels[i]] * len(retrieved))
            predicted_labels.extend(train_labels[retrieved])
    
    # Compute precision and recall
    precision = precision_score(actual_labels, predicted_labels, average='weighted', zero_division=0)
    recall = recall_score(actual_labels, predicted_labels, average='weighted', zero_division=0)
    print(f'Hyperplanes: {num_hyperplanes} | Precision: {precision:.4f} | Recall: {recall:.4f}')
