In [1]:
import cv2
import os
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from scipy.spatial import cKDTree
import matplotlib.pyplot as plt

from random import sample
from numpy.linalg import norm

# Input

In [2]:
def get_all_image_paths(directory):
    """
    Retrieves all image paths from subdirectories of the given directory.

    Args:
        directory (str): Path to the dataset directory.

    Returns:
        list: List of image paths.
    """
    image_paths = []

    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
                image_paths.append(os.path.join(root, file))

    return image_paths

In [19]:
# image_directory = "images/"
image_directory = "datasets/UKB/full"

# Get all image paths from subdirectories
image_paths = get_all_image_paths(image_directory)

# image_paths

In [5]:
len(image_paths) 

10200

# SIFT

In [6]:
def extract_sift_features(image_path):
    """
    Extracts local SIFT features from an image.

    Args:
        image_path (str): Path to the image.

    Returns:
        tuple: Tuple containing key points and descriptors.
    """
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    sift = cv2.SIFT_create()
    key_points, descriptors = sift.detectAndCompute(image, None)
    return key_points, descriptors

In [None]:
all_features = []
for i, path in enumerate(image_paths):
    key_points, descriptors = extract_sift_features(path)
    
    if descriptors is not None:
        all_features.append(descriptors)


In [8]:
print(len(all_features))
print(len(image_paths))

10200
10200


# VLAD

In [50]:
class VLAD:

    def __init__(self, k=256, n_vocabs=1, norming="original", lcs=False, alpha=0.2, verbose=True):
        """Initialize VLAD-object

        Notes
        -----

        Hyperparameters have to be set, even if `centers` and `qs` are set externally.
        """
        self.k = k
        self.n_vocabs = n_vocabs
        self.norming = norming
        self.vocabs = None
        self.centers = None
        self.database = None
        self.lcs = lcs
        self.alpha = alpha
        self.qs = None
        self.verbose = verbose

    def fit(self, X):
        """Fit Visual Vocabulary

        Parameters
        ----------
        X : list(array)
            List of image descriptors

        Returns
        -------
        self : VLAD
            Fitted object
        """
        X_mat = np.vstack(X)
        print(X_mat.shape[1])
        self.vocabs = []
        self.centers = []  # Is a list of `n_vocabs` np.arrays. Can be set externally without fitting
        self.qs = []  # Is a list of `n_vocabs` lists of length `k` of np.arrays. Can be set externally without fitting
        for i in range(self.n_vocabs):
            if self.verbose is True:
                print(f"Training vocab #{i+1}")
            if self.verbose is True:
                print(f"Training KMeans...")
            if len(X_mat) < int(2e5):
                self.vocabs.append(KMeans(n_clusters=self.k).fit(X_mat))
            else:
                idx = sample(range(len(X_mat)), int(2e5))
                self.vocabs.append(KMeans(n_clusters=self.k).fit(X_mat[idx]))
            self.centers.append(self.vocabs[i].cluster_centers_)
            if self.lcs is True and self.norming == "RN":
                if self.verbose is True:
                    print("Finding rotation-matrices...")
                predicted = self.vocabs[i].predict(X_mat)
                qsi = []
                for j in range(self.k):
                    q = PCA(n_components=X_mat.shape[1]).fit(X_mat[predicted == j]).components_
                    qsi.append(q)
                self.qs.append(qsi)
        self.database = self._extract_vlads(X)
        return self

    def transform(self, X):
        """Transform the input-tensor to a matrix of VLAD-descriptors

        Parameters
        ----------
        X : list(array)
            List of image-descriptors

        Returns
        -------
        vlads : array, shape (n, d * self.k)
            The transformed VLAD-descriptors
        """
        vlads = self._extract_vlads(X)
        return vlads

    def fit_transform(self, X):
        """Fit the model and transform the input-data subsequently

        Parameters
        ----------
        X : list(array)
            List of image-descriptors

        Returns
        -------
        vlads : array, shape (n, d * self.k)
            The transformed VLAD-descriptors
        """
        _ = self.fit(X)
        vlads = self.transform(X)
        return vlads

    def refit(self, X):
        """Refit the Visual Vocabulary

        Uses the already learned cluster-centers as in initial values for
        the KMeans-models

        Parameters
        ----------
        X : array
            The database used to refit the visual vocabulary

        Returns
        -------
        self : VLAD
            Refitted object
        """
        self.vocabs = []
        self.centers = []

        for i in range(self.n_vocabs):
            self.vocabs.append(KMeans(n_clusters=self.k, init=self.centers).fit(X.transpose((2, 0, 1))
                                                                                .reshape(-1, X.shape[1])))
            self.centers.append(self.vocabs[i].cluster_centers_)

        self.database = self._extract_vlads(X)
        return self

    def predict(self, desc):
        """Predict class of given descriptor-matrix

        Parameters
        ----------
        desc : array
            A descriptor-matrix (m x d)

        Returns
        -------
        ``argmax(self.predict_proba(desc))`` : array
        """
        return np.argmax(self.predict_proba(desc))

    def predict_proba(self, desc):
        """Predict class of given descriptor-matrix, return probability

        Parameters
        ----------
        desc : array
            A descriptor-matrix (m x d)

        Returns
        -------
        ``self.database @ vlad``
            The similarity for all database-classes
        """
        vlad = self._vlad(desc)  # Convert to VLAD-descriptor
        return self.database @ vlad  # Similarity between L2-normed vectors is defined as dot-product

    def _vlad(self, X):
        """Construct the actual VLAD-descriptor from a matrix of local descriptors

        Parameters
        ----------
        X : array
            Descriptor-matrix for a given image

        Returns
        -------
        ``V.flatten()`` : array
            The VLAD-descriptor
        """
        np.seterr(invalid='ignore', divide='ignore')  # Division with 0 encountered below
        vlads = []

        for j in range(self.n_vocabs):  # Compute for multiple vocabs
            # predicted = self.vocabs[j].predict(X)  # Commented out in favor of line below (No dependency on actual vocab, but only on centroids)
            predicted = norm(X - self.centers[j][:, None, :], axis=-1).argmin(axis=0)
            _, d = X.shape
            V = np.zeros((self.k, d))  # Initialize VLAD-Matrix

            # Computing residuals
            if self.norming == "RN":
                curr = X - self.centers[j][predicted]
                curr /= norm(curr, axis=1)[:, None]
                # Untenstehendes kann noch vektorisiert werden

                for i in range(self.k):
                    V[i] = np.sum(curr[predicted == i], axis=0)
                    if self.lcs is True:
                        V[i] = self.qs[j][i] @ V[i]  # Equivalent to multiplication in  summation above
            else:
                for i in range(self.k):
                    V[i] = np.sum(X[predicted == i] - self.centers[j][i], axis=0)

            # Norming
            if self.norming in ("intra", "RN"):
                V /= norm(V, axis=1)[:, None]  # L2-normalize every sum of residuals
                np.nan_to_num(V, copy=False)  # Some of the rows contain 0s. np.nan will be inserted when dividing by 0!

            if self.norming in ("original", "RN"):
                V = self._power_law_norm(V)

            V /= norm(V)  # Last L2-norming
            V = V.flatten()
            vlads.append(V)
        vlads = np.concatenate(vlads)
        vlads /= norm(vlads)  # Not on axis, because already flat
        return vlads

    def _extract_vlads(self, X):
        """Extract VLAD-descriptors for a number of images

        Parameters
        ----------
        X : list(array)
            List of image-descriptors

        Returns
        -------
        database : array
            Database of all VLAD-descriptors for the given Tensor
        """
        vlads = []
        for x in X:
            vlads.append(self._vlad(x))

        database = np.vstack(vlads)
        return database

    def _add_to_database(self, vlad):
        """Add a given VLAD-descriptor to the database

        Parameters
        ----------
        vlad : array
            The VLAD-descriptor that should be added to the database

        Returns
        -------
        ``None``
        """
        self.database = np.vstack((self.database, vlad))

    def _power_law_norm(self, X):
        """Perform power-Normalization on a given array

        Parameters
        ----------
        X : array
            Array that should be normalized

        Returns
        -------
        normed : array
            Power-normalized array
        """
        normed = np.sign(X) * np.abs(X)**self.alpha
        return normed

    def __repr__(self):
        return f"VLAD(k={self.k}, norming=\"{self.norming}\")"

    def __str__(self):
        return f"VLAD(k={self.k}, norming=\"{self.norming}\")"

## VLAD use

In [51]:
# vlad = VLAD(k=64, n_vocabs=1, norming="RN", lcs=True).fit(all_features)
vlad = VLAD(k=16, n_vocabs=1, norming="RN", lcs=True).fit(all_features)

128
Training vocab #1
Training KMeans...
Finding rotation-matrices...


In [None]:
vlad_database = vlad._extract_vlads(all_features)
# vlad_database = vlad.database

# BOF

In [27]:
from bof import BOF

In [28]:
m_bof = BOF(k=1024)
# m_bof = BOF(k=20480)
m_bof.fit(all_features)

<bof.BOF at 0x26b81f10150>

In [29]:
m_bof.databases.shape

(10200, 1024)

# Fisher Vector

In [9]:
from fisher_vector import FisherVector
fivec = FisherVector(n_components=64)
fivec.fit(all_features)



<fisher_vector.FisherVector at 0x26bb48ec610>

In [10]:
fivec.databases.shape

(10200, 8192)

# ADC

In [40]:
from adc import ADC

In [41]:
# adc = ADC(m=32, k=1024)
adc = ADC(m=16, k=256)

In [13]:
adc.fit(fivec.databases)

ADC(m=32, bs=10.0)

In [42]:
adc.fit(m_bof.databases)

<adc.ADC at 0x26b81f24910>

In [60]:
adc.fit(vlad_database)

ADC(m=16, bs=8.0)

In [43]:
adc.databases.shape

(10200, 16)

# Test 1

In [21]:
import json
with open(r'groundtruth.json', 'r') as file:
    data = json.load(file)
    for key in data:
        new_similar = []
        for img in data[key]['similar']:
            for idx, path in enumerate(image_paths):
                if img in path:
                    new_similar.append(idx)
        data[key]['similar'] = new_similar

In [22]:

outputs = []
for key in data:
    path = 'images/' + data[key]['query']
    key_points, descriptors = extract_sift_features(path)
    # query_vlad = vlad.transform([descriptors])
    # probabilities = adc.predict_proba(query_vlad)
    # query_bof = m_bof.transform([descriptors])
    # probabilities = adc.predict_proba(query_bof)
    
    query_fish = fivec.transform([descriptors])
    probabilities = adc.predict_proba(query_fish)

    sorted_indices = np.argsort(probabilities)

    output_per_query = []
    for i, index in enumerate(sorted_indices):
        print(f"Original index: {index}, Probability: {probabilities[index]}")
        if i > 0:
            output_per_query.append(index)
            # if len(output_per_query) == len(data[key]['similar']):
            #     break
            if len(output_per_query) == 30:
                break
    
    outputs.append(output_per_query)

Original index: 0, Probability: 145.84850526979474
Original index: 1, Probability: 158.3336224420527
Original index: 13, Probability: 175.20564613405574
Original index: 15, Probability: 175.20564613405574
Original index: 1392, Probability: 188.3091012468449
Original index: 222, Probability: 204.63214182892006
Original index: 1394, Probability: 206.1889099389933
Original index: 908, Probability: 274.61458645369595
Original index: 909, Probability: 284.3727469391706
Original index: 1045, Probability: 296.79814327342456
Original index: 907, Probability: 298.1676104805088
Original index: 5, Probability: 303.14163865321706
Original index: 1044, Probability: 304.18977927500123
Original index: 14, Probability: 322.3573799704841
Original index: 1364, Probability: 326.8849526979909
Original index: 69, Probability: 330.66114747068036
Original index: 1430, Probability: 370.280328420875
Original index: 326, Probability: 379.5265989307942
Original index: 12, Probability: 392.0715830728368
Original 

In [15]:
def compute_ap(similar, retrieved):
    """
    Compute Average Precision (AP) for a single query.
    """
    relevant_count = len(similar)
    if relevant_count == 0:
        return 0.0

    hits = 0
    precision_sum = 0.0

    for i, img in enumerate(retrieved, start=1):
        if img in similar:
            hits += 1
            precision_sum += hits / i

    return precision_sum / relevant_count

def compute_map(data, retrieved):
    """
    Compute mAP for all queries.
    """
    aps = []
    for i, entry in enumerate(data.values()):
        query = entry["query"]
        similar = set(entry["similar"])
        ap = compute_ap(similar, retrieved[i])
        # print(ap)
        aps.append(ap)
    return sum(aps) / len(aps) if aps else 0.0

In [None]:
# Fisher (k = 16) ADC (m=16, k=256)
compute_map(data, outputs)

0.4400938682117435

In [None]:
# Fisher (k = 64) ADC (m=64, k=1024)
compute_map(data, outputs)

0.43252457417325424

In [61]:
# VLAD (k=16) ADC (m=16, k=256) Top k : 30
compute_map(data, outputs)

0.5376465998449635

In [None]:
# VLAD (k=64) ADC (m=32, k=1024) Top k : 30
compute_map(data, outputs)

In [70]:
# BOF (k=1024) ADC (m=16, k=256) Top k : 30
compute_map(data, outputs)

0.41684722833359034

In [None]:
# BOF (k=1024) ADC (m=32, k=1024) Top k : 30
compute_map(data, outputs)

0.45380486841201123

In [83]:
# BOF (k=20480) ADC (m=32, k=1024) Top k : 30
compute_map(data, outputs)

0.3191882686872316

In [88]:
# BOF (k=20480) ADC (m=16, k=256) Top k : 30
compute_map(data, outputs)

0.35210481160541585

# Test 2

In [36]:
from metrics.score4 import calculate_gr_score

In [44]:
similarity_matrix = []
for path in image_paths:
    key_points, descriptors = extract_sift_features(path)
    
    # query_fish = fivec.transform([descriptors])
    # probabilities = adc.predict_proba(query_fish)
    query_bof = m_bof.transform([descriptors])
    probabilities = adc.predict_proba(query_bof)
    
    similarity_matrix.append(probabilities)

In [38]:
len(similarity_matrix[0]), len(similarity_matrix)

(10200, 10200)

In [39]:
# BOF k=1024 ADC m=32 k=1024
calculate_gr_score(np.array(similarity_matrix))

2.2905882352941176

In [26]:
# Fisher k=64 ADC m=32 k=1024
calculate_gr_score(np.array(similarity_matrix))

2.0431372549019606

In [None]:
# Fisher k=16 ADC m=16 k=256
calculate_gr_score(np.array(similarity_matrix))

2.241862745098039

# KDTree

In [None]:
kdtree=cKDTree(vlad.database)

In [None]:
def display_image_with_vector(image_path, vlad_vector, title):
    """
    Displays an image with its descriptor vector.

    Args:
        image_path (str): Path to the image.
        vlad_vector (numpy.ndarray): VLAD vector.
        title (str): Title for the plot.
    """
    image = cv2.imread(image_path)
    plt.figure()
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.title(title)
    plt.show()


# Function to find and display similar images
def search_similar_images(query_vector, kdtree, vlad_vectors_reduced,
                          num_results=1):
    """
    Finds and displays similar images based on a query vector.

    Args:
        query_vector (numpy.ndarray): Query vector.
        kdtree: KD-Tree built on the training set.
        vlad_vectors_reduced: Reduced VLAD vectors for the training set.
        num_results (int): Number of similar images to display.
    """
    _, indices = kdtree.query(query_vector, num_results)
    print(indices)

    if num_results == 1:
        idx = int(indices[0])  # Access the first index
        image_path = image_paths[idx]
        display_image_with_vector(image_path,
                                  vlad_vectors_reduced[idx], "Similar Image")
    else:
        for i in range(num_results):
            idx = int(indices[i])
            image_path = image_paths[idx]
            display_image_with_vector(image_path,
                                      vlad_vectors_reduced[idx], f"Similar Image {i + 1}")


# Function to find the path of a similar image
def search_similar_images_path(query_vector, kdtree, vlad_vectors_reduced,
                               num_results=1):
    """
    Finds the path of a similar image based on a query vector.

    Args:
        query_vector (numpy.ndarray): Query vector.
        kdtree: KD-Tree built on the training set.
        vlad_vectors_reduced: Reduced VLAD vectors for the training set.
        num_results (int): Number of similar images to find.

    Returns:
        str: Path of the similar image.
    """
    _, indices = kdtree.query(query_vector, num_results)

    if num_results == 1:
        idx = int(indices[0])  # Access the first index
        image_path = image_paths[idx]
        return image_path
    else:
        for i in range(num_results):
            idx = int(indices[i])
            image_path = image_paths[idx]
            return image_path

In [None]:
vlad.database.shape

In [None]:
search_similar_images(vlad.database[0], kdtree, vlad.database, 3)