# In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import normalize
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
import matplotlib.pyplot as plt

def load_data(data_path, categories, img_size=(128, 128), test_size=0.5):
    """Load grayscale images per category and split each category into train/test.

    Args:
        data_path: Directory containing one sub-directory per category.
        categories: Category (sub-directory) names; a category's position in
            this sequence becomes its integer label.
        img_size: Target size passed to ``cv2.resize`` for every image.
        test_size: Forwarded to ``train_test_split`` for each category.

    Returns:
        ``(train_data, test_data, category_to_id)``. The two data dicts hold
        parallel ``"images"`` and ``"labels"`` lists; ``category_to_id`` maps
        each category name to its integer label.
    """
    train_data = {"images": [], "labels": []}
    test_data = {"images": [], "labels": []}
    category_to_id = {category: idx for idx, category in enumerate(categories)}

    for category in categories:
        category_path = os.path.join(data_path, category)
        category_images = []

        # os.listdir returns entries in arbitrary order; sorting makes the
        # seeded split below reproducible across machines and runs.
        for filename in sorted(os.listdir(category_path)):
            # Accept upper-case extensions (.JPG, .JPEG) as well.
            if not filename.lower().endswith(('.jpg', '.jpeg')):
                continue

            img_path = os.path.join(category_path, filename)
            try:
                img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img is not None:
                    img = cv2.resize(img, img_size)
            except cv2.error as e:
                print(f"Error loading image {img_path}: {e}")
                continue

            if img is None:
                # cv2.imread reports an unreadable file by returning None
                # rather than raising, so report the skip explicitly.
                print(f"Error loading image {img_path}: file could not be decoded")
                continue

            category_images.append(img)

        train_images, test_images = train_test_split(category_images, test_size=test_size, random_state=42)

        label = category_to_id[category]
        train_data["images"].extend(train_images)
        train_data["labels"].extend([label] * len(train_images))

        test_data["images"].extend(test_images)
        test_data["labels"].extend([label] * len(test_images))

    return train_data, test_data, category_to_id


def extract_sift_features(images):
    """Compute SIFT descriptors for every image.

    Args:
        images: Iterable of images accepted by ``sift.detectAndCompute``.

    Returns:
        A list with one descriptor array per input image, in input order.
        Images for which SIFT returns no descriptors get an empty array of
        shape ``(0, D)``, where ``D`` is the SIFT descriptor size.
    """
    sift = cv2.SIFT_create()
    descriptor_dim = sift.descriptorSize()
    all_features = []
    for img in images:
        _, descriptors = sift.detectAndCompute(img, None)
        if descriptors is None:
            # Keep a 2-D (0, D) placeholder: a 1-D empty array cannot be
            # stacked with real (n, D) descriptor matrices by np.vstack.
            descriptors = np.empty((0, descriptor_dim), dtype=np.float32)
        all_features.append(descriptors)
    return all_features


def build_visual_vocab(features, vocab_size=100):
    """Cluster all descriptors with K-Means to form the visual vocabulary.

    Args:
        features: Iterable of per-image descriptor arrays. Empty entries
            (images without descriptors) are ignored.
        vocab_size: Number of K-Means clusters (visual words).

    Returns:
        The fitted ``KMeans`` model, or ``None`` if clustering failed (the
        error is printed).

    Raises:
        ValueError: If ``features`` contains no descriptors at all.
    """
    # Drop empty placeholders: they carry no data and a 1-D empty array would
    # make np.vstack fail with a shape mismatch.
    usable = [np.asarray(d) for d in features if d is not None and np.size(d) > 0]
    if not usable:
        raise ValueError("No descriptors available to build the visual vocabulary")

    descriptors = np.vstack(usable)
    try:
        kmeans = KMeans(n_clusters=vocab_size, random_state=42, n_init=10, max_iter=300, tol=1e-4)
        kmeans.fit(descriptors)
    except Exception as e:
        print(f"Error during K-Means clustering: {e}")
        kmeans = None
    return kmeans


def compute_bow_histograms(features, kmeans):
    """Turn per-image descriptor arrays into bag-of-visual-words histograms.

    Args:
        features: Iterable of descriptor arrays, one per image.
        kmeans: Fitted clustering model exposing ``predict`` and
            ``n_clusters``.

    Returns:
        A list with one histogram of length ``kmeans.n_clusters`` per image.
        Images with an empty descriptor array get an all-zero histogram.
    """

    def _histogram(image_descriptors):
        # No descriptors means no visual words to count.
        if image_descriptors.size == 0:
            return np.zeros(kmeans.n_clusters)
        word_ids = kmeans.predict(image_descriptors)
        return np.bincount(word_ids, minlength=kmeans.n_clusters)

    return [_histogram(image_descriptors) for image_descriptors in features]


def tf_idf_similarity(hist1, hist2, idf=None):
    """Cosine similarity between two TF-IDF weighted histograms.

    Args:
        hist1: Bag-of-words histogram of the first image.
        hist2: Bag-of-words histogram of the second image.
        idf: Optional per-word inverse-document-frequency weights (same
            length as the histograms). When omitted, a single scalar weight
            derived from ``hist1`` is used; because both histograms are
            scaled by that same scalar, it cancels out in the cosine and the
            result equals the plain cosine similarity of the term
            frequencies.

    Returns:
        The cosine similarity as a float. ``0.0`` is returned when either
        histogram is all zeros (or has zero norm after weighting) instead of
        propagating NaN from a division by zero.
    """
    tf1 = np.asarray(hist1, dtype=float)
    tf2 = np.asarray(hist2, dtype=float)

    total1 = tf1.sum()
    total2 = tf2.sum()
    if total1 == 0 or total2 == 0:
        # An image without visual words has no defined term frequencies.
        return 0.0

    tf1 = tf1 / total1
    tf2 = tf2 / total2

    if idf is None:
        idf = np.log(tf1.size / (np.count_nonzero(tf1) + 1)) + 1

    weighted1 = tf1 * idf
    weighted2 = tf2 * idf

    norm_product = np.linalg.norm(weighted1) * np.linalg.norm(weighted2)
    if norm_product == 0:
        return 0.0
    return float(np.dot(weighted1, weighted2) / norm_product)


def kullback_leibler_divergence(hist1, hist2):
    """Kullback-Leibler divergence KL(hist1 || hist2) of two histograms.

    Both histograms are normalised to sum to one and clipped to
    ``[1e-10, 1]`` so that empty bins do not produce ``log(0)`` or a
    division by zero.

    Args:
        hist1: Histogram of the reference distribution.
        hist2: Histogram of the distribution compared against it.

    Returns:
        The divergence as a float. ``inf`` is returned when either histogram
        is all zeros, since it cannot be normalised into a distribution.
    """
    p = np.asarray(hist1, dtype=float)
    q = np.asarray(hist2, dtype=float)

    p_total = p.sum()
    q_total = q.sum()
    if p_total == 0 or q_total == 0:
        # 0/0 would yield NaN, which sorts to the *top* of a descending
        # argsort ranking; an infinite divergence ranks such pairs last.
        return float("inf")

    # Avoid division by zero and log(0) on empty bins
    p = np.clip(p / p_total, 1e-10, 1)
    q = np.clip(q / q_total, 1e-10, 1)
    return float(np.sum(p * np.log(p / q)))


def common_words_similarity(hist1, hist2, measure="cosine"):
    """Score how similar two bag-of-words histograms are (higher is closer).

    Args:
        hist1: First histogram.
        hist2: Second histogram.
        measure: ``"cosine"`` for plain cosine similarity, ``"tfidf"`` for
            ``tf_idf_similarity``, or ``"kl"`` for the negated
            Kullback-Leibler divergence.

    Returns:
        The similarity score for the selected measure.

    Raises:
        ValueError: If ``measure`` is not one of the supported names.
    """
    if measure == "cosine":
        return cosine_similarity([hist1], [hist2])[0][0]
    if measure == "tfidf":
        return tf_idf_similarity(hist1, hist2)
    if measure == "kl":
        # Divergence is a distance; negate it so larger still means closer.
        return -kullback_leibler_divergence(hist1, hist2)
    # Falling through would return None and fail later inside the ranking.
    raise ValueError(f"Unknown similarity measure: {measure!r}")


def compute_mrr_and_top3(query_bow, train_bows, train_labels, query_label, similarity_measure="cosine"):
    """Rank the training histograms against one query and score the top three.

    Args:
        query_bow: Histogram of the query image.
        train_bows: Histograms of the training images.
        train_labels: Labels parallel to ``train_bows``.
        query_label: True label of the query image.
        similarity_measure: Measure name passed to
            ``common_words_similarity``.

    Returns:
        ``(reciprocal_rank, top3_hit)``. The reciprocal rank is ``1 / rank``
        of the first correctly labelled result among the three best matches,
        or ``0`` if none of them is correct; ``top3_hit`` is whether the query
        label appears among those three.
    """
    scores = []
    for candidate_bow in train_bows:
        scores.append(common_words_similarity(query_bow, candidate_bow, similarity_measure))

    # argsort is ascending, so reverse it to get the best matches first.
    best_first = np.argsort(scores)[::-1]
    top3_labels = [train_labels[idx] for idx in best_first[:3]]

    first_hit_rank = next(
        (rank for rank, label in enumerate(top3_labels, start=1) if label == query_label),
        None,
    )
    reciprocal_rank = 0 if first_hit_rank is None else 1 / first_hit_rank

    return reciprocal_rank, query_label in top3_labels


def visualize_retrieval(query_image, top_3_images, top_3_labels, category_to_id):
    """Plot a query image next to its (up to three) best retrieval results.

    Args:
        query_image: Grayscale query image.
        top_3_images: Grayscale result images, best match first. Only the
            first three are shown; fewer than three are allowed.
        top_3_labels: Integer labels parallel to ``top_3_images``.
        category_to_id: Mapping from category name to integer label, used to
            title each result with its category name.
    """
    # Invert the mapping once instead of rebuilding key/value lists per title.
    # setdefault keeps the first name if two categories share an id.
    id_to_category = {}
    for name, idx in category_to_id.items():
        id_to_category.setdefault(idx, name)

    _, axes = plt.subplots(1, 4, figsize=(16, 4))

    axes[0].imshow(cv2.cvtColor(query_image, cv2.COLOR_GRAY2RGB))
    axes[0].set_title("Query Image")

    # Pair images with labels and cap at three so a short result list does
    # not raise IndexError.
    matches = list(zip(top_3_images, top_3_labels))[:3]
    for rank, (image, label) in enumerate(matches, start=1):
        axes[rank].imshow(cv2.cvtColor(image, cv2.COLOR_GRAY2RGB))
        axes[rank].set_title(f"Top {rank}: {id_to_category.get(label, label)}")

    # Hide the frames of every panel, including any left unused.
    for ax in axes:
        ax.axis('off')

    plt.show()

def image_retrieval_experiment(train_bows, train_labels, test_bows, test_labels, similarity_measure="cosine",
                               visualize=True, train_images=None, test_images=None, category_map=None):
    """Run retrieval for every test histogram and report MRR and top-3 accuracy.

    Each test histogram is compared with all training histograms; the three
    most similar training images form the result list for that query.

    Args:
        train_bows: Histograms of the training images.
        train_labels: Labels parallel to ``train_bows``.
        test_bows: Histograms of the query images.
        test_labels: Labels parallel to ``test_bows``.
        similarity_measure: Measure name passed to
            ``common_words_similarity``.
        visualize: When true (the default), plot every query with its three
            best matches via ``visualize_retrieval``.
        train_images: Training images parallel to ``train_bows``. Defaults to
            the module-level ``train_data['images']``.
        test_images: Query images parallel to ``test_bows``. Defaults to the
            module-level ``test_data['images']``.
        category_map: Category-name-to-id mapping used for plot titles.
            Defaults to the module-level ``category_to_id``.

    Returns:
        ``(mean_mrr, top3_accuracy_percent)``. The reciprocal rank of a query
        only considers its three best matches. ``(0.0, 0.0)`` is returned when
        there are no queries.
    """
    if len(test_bows) == 0:
        # np.mean of an empty list would emit a RuntimeWarning and yield NaN.
        return 0.0, 0.0

    if visualize:
        # Fall back to the module-level data for callers that do not pass the
        # images explicitly.
        if train_images is None:
            train_images = train_data['images']
        if test_images is None:
            test_images = test_data['images']
        if category_map is None:
            category_map = category_to_id

    mrrs = []
    top3_accuracies = []

    for query_idx, test_bow in enumerate(test_bows):
        query_label = test_labels[query_idx]

        similarities = [common_words_similarity(test_bow, train_bow, similarity_measure) for train_bow in train_bows]

        # argsort is ascending; reverse it and keep the three best matches.
        top3_indices = np.argsort(similarities)[::-1][:3]
        top3_labels = [train_labels[train_idx] for train_idx in top3_indices]

        mrr = 0
        for rank, label in enumerate(top3_labels, 1):
            if label == query_label:
                mrr = 1 / rank
                break

        mrrs.append(mrr)
        top3_accuracies.append(query_label in top3_labels)

        if visualize:
            top_3_images = [train_images[train_idx] for train_idx in top3_indices]
            visualize_retrieval(test_images[query_idx], top_3_images, top3_labels, category_map)

    mean_mrr = np.mean(mrrs)
    top3_accuracy_percent = np.mean(top3_accuracies) * 100
    return mean_mrr, top3_accuracy_percent

# Driver script: runs the full bag-of-visual-words retrieval pipeline.
# NOTE: train_data, test_data and category_to_id must remain module-level
# names, because image_retrieval_experiment reads them as globals when it
# visualizes results.

# Dataset root directory and the category sub-directories to load.
data_path = '101_ObjectCategories'
categories = ['beaver', 'cup', 'brain', 'elephant', 'Faces']

# Load images and split each category with load_data's default test_size (0.5).
train_data, test_data, category_to_id = load_data(data_path, categories)

# Extract SIFT features
train_features = extract_sift_features(train_data["images"])
test_features = extract_sift_features(test_data["images"])

# Build visual vocabulary (fitted on training descriptors only)
kmeans = build_visual_vocab(train_features, vocab_size=100)

# Compute BoW histograms
train_bows = compute_bow_histograms(train_features, kmeans)
test_bows = compute_bow_histograms(test_features, kmeans)

# Experiment 1: Using TF-IDF Similarity
mean_mrr_tfidf, top3_accuracy_percent_tfidf = image_retrieval_experiment(
    train_bows, train_data["labels"], test_bows, test_data["labels"], similarity_measure="tfidf"
)

print(f"\nExperiment 1 (TF-IDF Similarity):")
print(f"Mean Reciprocal Rank (MRR): {mean_mrr_tfidf:.3f}")
print(f"Top-3 Accuracy: {top3_accuracy_percent_tfidf:.2f}%")

# Experiment 2: Using KL Divergence (ranked by negated divergence)
mean_mrr_kl, top3_accuracy_percent_kl = image_retrieval_experiment(
    train_bows, train_data["labels"], test_bows, test_data["labels"], similarity_measure="kl"
)

print(f"\nExperiment 2 (KL Divergence):")
print(f"Mean Reciprocal Rank (MRR): {mean_mrr_kl:.3f}")
print(f"Top-3 Accuracy: {top3_accuracy_percent_kl:.2f}%")

