## Libraries and Imports

In [101]:
from datasets import load_dataset
import random
import cv2
import numpy as np
import matplotlib.pyplot as plt
import sklearn 
import pandas as pd
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics.pairwise import cosine_similarity

## Import data

In [102]:
ds_dict = load_dataset("flwrlabs/caltech101")

random.seed(47)

ds = ds_dict["train"]
class_names = ds.features["label"].names

# sample 5 random label IDs
num_of_labels = 20 # change as needed <--------------
num_classes = len(class_names)
selected_label_ids = random.sample(range(num_classes), num_of_labels) 

# filter dataset
ds = ds.filter(lambda x: x["label"] in selected_label_ids)

# We map the ids to class names for printing so we know which labels were selected
selected_labels = [class_names[i] for i in selected_label_ids]
print("Selected labels:", selected_labels)


Selected labels: ['hawksbill', 'bonsai', 'laptop', 'panda', 'lobster', 'platypus', 'gramophone', 'elephant', 'motorbikes', 'ibis', 'inline_skate', 'saxophone', 'bass', 'ketch', 'soccer_ball', 'ant', 'camera', 'accordion', 'emu', 'mayfly']


## Split data

In [103]:
ds_split = ds.train_test_split(test_size=0.5, seed=42) # define split, it puts the remaining data in "train"

train_ds = ds_split["train"] # training set #50% of data
test_ds  = ds_split["test"] # test set #50% of data

# 1 Codebook generation

## Extract features

In [104]:
sift = cv2.SIFT_create() #SIFT feature extractor, create the SIFT object once

def extract_features(pil_image):

    # Convert PIL image to numpy array
    image = np.array(pil_image)

    # Convert to grayscale
    if image.ndim == 3:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

    #extract_features
    keypoints, descriptors = sift.detectAndCompute(image, None)

    # Handle images with no keypoints
    if descriptors is None:
        return np.empty((0, 128), dtype=np.float32)

    return keypoints ,descriptors

In [105]:
train_features = []
train_labels   = []
for sample in train_ds:
    keypoints, descriptors = extract_features(sample["image"])
    train_features.append(descriptors)
    train_labels.append(sample["label"])


test_features = []
test_labels   = []
for sample in test_ds:
    keypoints, descriptors = extract_features(sample["image"])
    test_features.append(descriptors)
    test_labels.append(sample["label"])

# Convert lists to matrix
train_feature_matrix = np.vstack(train_features) # use for training k-means
test_feature_matrix = np.vstack(test_features)

train_feature_matrix.shape, test_feature_matrix.shape #check shapes

((401169, 128), (369902, 128))

The shape above tells us that we currently have 87407 (rows) features represented as vectors of length 128. Each feature describes a patch in a given image in the dataset. A feature could fx. be a pattern, blob, corner, edge or something else.

### Reduce number of features (maybe not neccesary)

In [106]:
#Sample some of the descriptors, to reduce size for k-means
num_samples = 1000  # Adjust as needed

num_dec = train_feature_matrix.shape[0]
sample_size = min(num_samples, num_dec) # so we dont sample more than we have

rng = np.random.default_rng(seed=42)  # set random seed
sample_indices = rng.choice(num_dec, size=sample_size, replace=False) # get random indices



train_sampled_descriptors = train_feature_matrix[sample_indices] # get the randomly sampled descriptors
train_sampled_descriptors.shape #check shapes again to make sure sampling worked

(1000, 128)

## K mens clustering

In [107]:
k = 500  # number of clusters / visual words
random_state = 42
KMeans_model = sklearn.cluster.KMeans(n_clusters=k, random_state=random_state, n_init= 10) # Model definition
KMeans_model.fit(train_sampled_descriptors) # Fit model, which means to train the k-means clustering
codebook = KMeans_model.cluster_centers_  # The cluster centers are our visual words (our codebook)
print("Visual words shape:", codebook.shape)




Visual words shape: (500, 128)


## Form Bag of visual words for training data
classify each training descriptor to the
closest cluster centers and form the bag of words (BoW) for each image in the
image training set. For each image, we find out which words appears, and how many times it appears. Remember that the Bag of visual words, IS the word histogram

In [108]:

# Which visual word (cluster) does each descriptor belong to?
train_word_ids = []

for descriptors in train_features:   # one image at a time
    word_ids = KMeans_model.predict(descriptors)
    train_word_ids.append(word_ids)

# Now we can build the bag-of-words histograms for each image, i.e. 
#count how many times each visual word appears in the image
train_bow = []
for word_ids in train_word_ids:
    hist = np.bincount(word_ids, minlength=k)
    train_bow.append(hist)

train_bow = np.array(train_bow)
train_bow.shape  # Check shape

(979, 500)

# 2 Indexing
For each image in the test set:

* Extract the SIFT descriptors of the feature points in the image, (DONE)
* Project the descriptors onto the codebook, i.e. for each descriptor the find the closest cluster
* Constructs the generated corresponding bag of words, i.e. word histogram

## Form the bag of words for the testing data

In [109]:

# Which visual word (cluster) does each descriptor belong to?
test_word_ids = []

for descriptors in test_features:   # one image at a time
    word_ids = KMeans_model.predict(descriptors)
    test_word_ids.append(word_ids)
# Now we can build the bag-of-words histograms for each image, i.e. 
#count how many times each visual word appears in the image
test_bow = []
for word_ids in test_word_ids:
    hist = np.bincount(word_ids, minlength=k)
    test_bow.append(hist)

test_bow = np.array(test_bow)
test_bow.shape  # Check shape

(979, 500)

## Construct table
Per row the the table contains:
* file name
* true category
* training or test set
* bag of words/word histogram

In [110]:
# Create training DataFrame
train_rows = []

for i, sample in enumerate(train_ds):
    row = {
        "filename": sample.get("filename", f"train_{i}"),
        "label": train_labels[i],
        "split": "train",
        "bow": train_bow[i]
    }
    train_rows.append(row)

train_table = pd.DataFrame(train_rows)

# Create test DataFrame
test_rows = []

for i, sample in enumerate(test_ds):
    row = {
        "filename": sample.get("filename", f"test_{i}"),
        "label": test_labels[i],
        "split": "test",
        "bow": test_bow[i]
    }
    test_rows.append(row)

test_table = pd.DataFrame(test_rows)

# Combine the two :)
Table = pd.concat([train_table, test_table], ignore_index=True)
print("Combined table shape: images x categories", Table.shape)

Combined table shape: images x categories (1958, 4)


# 3 Retrieving

Given an image, how similar is the image to other images

## Common words

In [111]:
## common words, how many words are in common between two bags of words
def common_words(bow1, bow2):
    return np.sum(np.minimum(bow1, bow2))

In [112]:
## Basic retrieval function
# compare a query bow, to every bow in the table

def query_common_words(query_bow, table):

    score =[]
    bow_q = query_bow["bow"]

    for i, row in table.iterrows():
        bow_i = row["bow"]

        common_words_score = common_words(bow_q, bow_i)  # Using common words as similarity

        score.append({
            "filename": row["filename"],
            "label": row["label"],
            "score": common_words_score,
        })
    score_df = pd.DataFrame(score)
    score_df = score_df.sort_values(by="score", ascending=False)
    return score_df


## tf-idf similarity

In [113]:
tf_idf_transformer = TfidfTransformer(norm='l2') # an object that can transform BOW to TF-IDF
tf_idf_train = tf_idf_transformer.fit_transform(train_bow) # fitting the transformer AND  transform the training BOW to TF-IDF

In [114]:
def query_tf_idf(query_bow, train_table, tf_idf_transformer, tf_idf_train):

    bow_q = query_bow["bow"].reshape(1, -1)
    bow_q_tf_idf = tf_idf_transformer.transform(bow_q) # transform query to tf-idf
    tf_idf_db = tf_idf_train[train_table.index]
    scores = cosine_similarity(bow_q_tf_idf, tf_idf_db).flatten()


    results = pd.DataFrame({
        "filename": train_table["filename"].values,
        "label": train_table["label"].values,
        "tf_idf_score": scores
    })

    results = results.sort_values(by="tf_idf_score", ascending=False)

    return results


## Bhattacharyya distance

In [115]:
def bhattacharyya(bow1, bow2):
    # Normalize histograms to get probability distributions
    p = bow1 / (np.sum(bow1) +1e-10) # avoid division by zero +1e-10
    q = bow2 / (np.sum(bow2) +1e-10) #

    # Compute Bhattacharyya coefficient
    bc = np.sum(np.sqrt(p * q))
    
    return bc

In [116]:

def query_bhattacharyya(query_bow, table):

    score =[]
    bow_q = query_bow["bow"]

    for i, row in table.iterrows():
        bow_i = row["bow"]

        bhattacharyya_score = bhattacharyya(bow_q, bow_i)  # Using Bhattacharyya similarity

        score.append({
            "filename": row["filename"],
            "label": row["label"],
            "score": bhattacharyya_score,
        })
    score_df = pd.DataFrame(score)
    score_df = score_df.sort_values(by="score", ascending=False)
    return score_df


# Kullback-Liebler divergence

In [117]:
def kullback_leibler(bow1, bow2):
    epsilon = 1e-10
    # Normalize histograms to get probability distributions
    p = bow1 / (np.sum(bow1) + epsilon) # avoid division by zero +1e-10
    q = bow2 / (np.sum(bow2) + epsilon) #

    # Compute Kullback-Leibler divergence
    kl_div = np.sum(p * np.log(p / (q + epsilon)))
    
    return kl_div

In [118]:
def query_kullback_leibler(query_bow, table):

    score =[]
    bow_q = query_bow["bow"]

    for i, row in table.iterrows():
        bow_i = row["bow"]

        kl_score = kullback_leibler(bow_q, bow_i)  # Using Kullback-Leibler divergence

        score.append({
            "filename": row["filename"],
            "label": row["label"],
            "score": kl_score,
        })
    score_df = pd.DataFrame(score)
    score_df = score_df.sort_values(by="score", ascending= True)  # lower KL divergence is better, because it is a distance
    return score_df


## Experiment metrics

In [119]:
def reciprocal_rank(results, true_label):
    for rank, (_, row) in enumerate(results.iterrows(), start=1):
        if row["label"] == true_label:
            return 1 / rank # if found, return reciprocal rank
    return 0  #if true label not found

In [120]:
def top_k_accuracy(results, true_label, k= 3):
    return true_label in results.head(k)["label"].values

## Experiment 1 - Training data

In [121]:
# get query image
# get similar images from training set using 1/4 retrieval methods
# remove the query image from the results
# compute evaluation metrics

#common words retrieval
rr_common_words_train_list = []
top3_common_words_train_list = []

#TF-IDF retrieval
rr_tf_idf_train_list = []
top3_tf_idf_train_list = []

#Bhattacharyya retrieval
rr_bhattacharyya_train_list = []
top3_bhattacharyya_train_list = []
#Kullback-Leibler retrieval
rr_kullback_leibler_train_list = []
top3_kullback_leibler_train_list = []


for i in range(len(train_table)):  # index of the query image in the train set
    query_row = train_table.iloc[i]  # select the first image in the train set as query
    true_label = query_row["label"] # the true label of the query image

    fixed_query_table = train_table.drop(i)

    results_common_words = query_common_words(query_row, fixed_query_table) # common words retrieval
    results_tf_idf = query_tf_idf(query_row, fixed_query_table, tf_idf_transformer, tf_idf_train) # TF-IDF retrieval
    results_bhattacharyya = query_bhattacharyya(query_row, fixed_query_table) # Bhattacharyya retrieval
    results_kullback_leibler = query_kullback_leibler(query_row, fixed_query_table) # Kullback-Leibler retrieval
    # add other retrieval methods here

    #compute evaluation metrics
    #common words retrieval
    rr_common_words_train_list.append(reciprocal_rank(results_common_words, true_label))
    top3_common_words_train_list.append(top_k_accuracy(results_common_words, true_label, k=3))

    #TF-IDF retrieval
    rr_tf_idf_train_list.append(reciprocal_rank(results_tf_idf, true_label))
    top3_tf_idf_train_list.append(top_k_accuracy(results_tf_idf, true_label, k=3))

    #Bhattacharyya retrieval
    rr_bhattacharyya_train_list.append(reciprocal_rank(results_bhattacharyya, true_label))
    top3_bhattacharyya_train_list.append(top_k_accuracy(results_bhattacharyya, true_label, k=3))

    #Kullback-Leibler retrieval
    rr_kullback_leibler_train_list.append(reciprocal_rank(results_kullback_leibler, true_label))
    top3_kullback_leibler_train_list.append(top_k_accuracy(results_kullback_leibler, true_label, k=3))


# compute mean metrics
mean_rr_common_words = np.mean(rr_common_words_train_list)
mean_top3_common_words = np.mean(top3_common_words_train_list)

mean_rr_tf_idf = np.mean(rr_tf_idf_train_list)
mean_top3_tf_idf = np.mean(top3_tf_idf_train_list)

mean_rr_bhattacharyya = np.mean(rr_bhattacharyya_train_list)
mean_top3_bhattacharyya = np.mean(top3_bhattacharyya_train_list)

mean_rr_kullback_leibler = np.mean(rr_kullback_leibler_train_list)
mean_top3_kullback_leibler = np.mean(top3_kullback_leibler_train_list)



  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p 

In [127]:

# print results
print("Results using Common Words:")
print("Reciprocal Rank:", mean_rr_common_words)
print("Top-3 Accuracy:", mean_top3_common_words)

print("Results using TF-IDF:")
print("Reciprocal Rank:", mean_rr_tf_idf)
print("Top-3 Accuracy:", mean_top3_tf_idf)

print("Results using Bhattacharyya:")
print("Reciprocal Rank:", mean_rr_bhattacharyya)
print("Top-3 Accuracy:", mean_top3_bhattacharyya)

print("Results using Kullback-Leibler:")
print("Reciprocal Rank:", mean_rr_kullback_leibler)
print("Top-3 Accuracy:", mean_top3_kullback_leibler)

Results using Common Words:
Reciprocal Rank: 0.12264608515537939
Top-3 Accuracy: 0.12257405515832483
Results using TF-IDF:
Reciprocal Rank: 0.47329330048474233
Top-3 Accuracy: 0.5352400408580184
Results using Bhattacharyya:
Reciprocal Rank: 0.3722372848168103
Top-3 Accuracy: 0.41470888661899896
Results using Kullback-Leibler:
Reciprocal Rank: 0.49111919275018734
Top-3 Accuracy: 0.48723186925434114


## Experiment 2 - Test data

In [123]:
# get query image
# get similar images from training set using 1/4 retrieval methods
# remove the query image from the results
# compute evaluation metrics

#common words retrieval
rr_common_words_test_list = []
top3_common_words_test_list = []

#TF-IDF retrieval
rr_tf_idf_test_list = []
top3_tf_idf_test_list = []

#Bhattacharyya retrieval
rr_bhattacharyya_test_list = []
top3_bhattacharyya_test_list = []
#Kullback-Leibler retrieval
rr_kullback_leibler_test_list = []
top3_kullback_leibler_test_list = []


for i in range(len(test_table)):  # index of the query image in the test set
    query_row = test_table.iloc[i]  # select the first image in the test set as query
    true_label = query_row["label"] # the true label of the query image

    fixed_query_table = train_table # copy of train table

    results_common_words = query_common_words(query_row, fixed_query_table) # common words retrieval
    results_tf_idf = query_tf_idf(query_row, fixed_query_table, tf_idf_transformer, tf_idf_train) # TF-IDF retrieval
    results_bhattacharyya = query_bhattacharyya(query_row, fixed_query_table) # Bhattacharyya retrieval
    results_kullback_leibler = query_kullback_leibler(query_row, fixed_query_table) # Kullback-Leibler retrieval
    # add other retrieval methods here

    #compute evaluation metrics
    #common words retrieval
    rr_common_words_test_list.append(reciprocal_rank(results_common_words, true_label))
    top3_common_words_test_list.append(top_k_accuracy(results_common_words, true_label, k=3))

    #TF-IDF retrieval
    rr_tf_idf_test_list.append(reciprocal_rank(results_tf_idf, true_label))
    top3_tf_idf_test_list.append(top_k_accuracy(results_tf_idf, true_label, k=3))

    #Bhattacharyya retrieval
    rr_bhattacharyya_test_list.append(reciprocal_rank(results_bhattacharyya, true_label))
    top3_bhattacharyya_test_list.append(top_k_accuracy(results_bhattacharyya, true_label, k=3))
    #Kullback-Leibler retrieval
    rr_kullback_leibler_test_list.append(reciprocal_rank(results_kullback_leibler, true_label))
    top3_kullback_leibler_test_list.append(top_k_accuracy(results_kullback_leibler, true_label, k=3))


# compute mean metrics
mean_rr_common_words = np.mean(rr_common_words_test_list)
mean_top3_common_words = np.mean(top3_common_words_test_list)

mean_rr_tf_idf = np.mean(rr_tf_idf_test_list)
mean_top3_tf_idf = np.mean(top3_tf_idf_test_list)

mean_rr_bhattacharyya = np.mean(rr_bhattacharyya_test_list)
mean_top3_bhattacharyya = np.mean(top3_bhattacharyya_test_list)

mean_rr_kullback_leibler = np.mean(rr_kullback_leibler_test_list)
mean_top3_kullback_leibler = np.mean(top3_kullback_leibler_test_list)


  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p * np.log(p / (q + epsilon)))
  kl_div = np.sum(p 

In [124]:

# print results
print("Results using Common Words:")
print("Reciprocal Rank:", mean_rr_common_words)
print("Top-3 Accuracy:", mean_top3_common_words)

print("Results using TF-IDF:")
print("Reciprocal Rank:", mean_rr_tf_idf)
print("Top-3 Accuracy:", mean_top3_tf_idf)

print("Results using Bhattacharyya:")
print("Reciprocal Rank:", mean_rr_bhattacharyya)
print("Top-3 Accuracy:", mean_top3_bhattacharyya)

print("Results using Kullback-Leibler:")
print("Reciprocal Rank:", mean_rr_kullback_leibler)
print("Top-3 Accuracy:", mean_top3_kullback_leibler)

Results using Common Words:
Reciprocal Rank: 0.12264608515537939
Top-3 Accuracy: 0.12257405515832483
Results using TF-IDF:
Reciprocal Rank: 0.47329330048474233
Top-3 Accuracy: 0.5352400408580184
Results using Bhattacharyya:
Reciprocal Rank: 0.3722372848168103
Top-3 Accuracy: 0.41470888661899896
Results using Kullback-Leibler:
Reciprocal Rank: 0.49111919275018734
Top-3 Accuracy: 0.48723186925434114
