# Image Embedding Similarity Search

## Creating image embeddings with features extracted using PyTorch

In [4]:
import os
import torch
import shutil
import numpy as np
from tqdm import tqdm
from zipfile import ZipFile
from torch.utils.data import DataLoader
from torchvision import models, transforms, datasets

#### The following helper function is used later to combine our organized images (one folder per class) into one big, unorganized folder.

In [5]:
# Helper that merges the per-class folders of the test images into a single
# folder, creating that folder when it does not exist yet.
def merge_folders(src, dst=None) -> None:
    """Combine the files of every sub-folder of ``src`` into one flat folder.

    The merged folder is named ``<basename of src>merge_folder``. Every file
    found in a sub-folder is first renamed in place to
    ``<subfolder>_<filename>`` (so files with the same name in different
    sub-folders cannot collide) and then copied into the merged folder.

    Note that the source files themselves are renamed, so calling this
    function twice on the same ``src`` prefixes the names twice.

    Args:
        src (str): Path to the main folder holding the folders you want to
            combine.
        dst (str, optional): Directory in which the merged folder is created.
            Defaults to None, which places the merged folder next to ``src``.
    """
    # normpath drops a trailing separator so basename/dirname see the real
    # folder name instead of an empty string.
    src_root = os.path.normpath(src)
    merge_folder = os.path.basename(src_root) + "merge_folder"

    parent_folder = os.path.dirname(src_root) if dst is None else dst
    merge_folder_path = os.path.join(parent_folder, merge_folder)

    # The directory must exist before copying: shutil.copy to a missing path
    # would create a single regular file that every copy overwrites.
    os.makedirs(merge_folder_path, exist_ok=True)
    merge_folder_abs = os.path.abspath(merge_folder_path)

    for sub_dir in os.listdir(src_root):
        sub_dir_path = os.path.join(src_root, sub_dir)
        # Only sub-folders are merged; stray files directly under src are
        # left untouched.
        if not os.path.isdir(sub_dir_path):
            continue
        # When dst points at src, the merged folder lives inside src and must
        # not be merged into itself.
        if os.path.abspath(sub_dir_path) == merge_folder_abs:
            continue

        for file_name in os.listdir(sub_dir_path):
            original_path = os.path.join(sub_dir_path, file_name)
            # rename original file to have subfolder name in filename
            prefixed_path = os.path.join(sub_dir_path, sub_dir + "_" + file_name)
            os.rename(original_path, prefixed_path)
            # copy the renamed file into the merged folder
            shutil.copy(prefixed_path, merge_folder_path)



### Download a sample dataset of images (MNIST) from GitHub, then unzip and organize them.

##### We need to store them as image (jpg) files to simulate input of datasets to the dashboard

In [6]:
# Download and unpack the MNIST JPG images only once: an existing "MNIST"
# entry in the working directory means a previous run already did the work.
if "MNIST" not in os.listdir('.'):
    print("Downloading mnist jpg images from GitHub")
    # Download mnist images from this git repo. os.system returns the
    # command's exit status; stop here on failure instead of failing later
    # with a confusing "zip file not found" error.
    clone_status = os.system("git clone https://github.com/teavanist/MNIST-JPG.git")
    if clone_status != 0:
        raise RuntimeError(f"git clone failed with exit status {clone_status}")
    # Extract the zipfile containing the images
    with ZipFile('MNIST-JPG/MNIST Dataset JPG format.zip', 'r') as zipObj:
        # Extract all the contents of zip file in current directory
        zipObj.extractall()
    os.rename("MNIST Dataset JPG format", "MNIST")
    # The cloned repository is only needed for its zip archive.
    shutil.rmtree("MNIST-JPG")
    # Flatten the per-class testing folders into a single folder.
    merge_folders("MNIST/MNIST - JPG - testing")
else:
    print("Images Already Downloaded! Skipping.")

Downloading mnist jpg images from GitHub


Cloning into 'MNIST-JPG'...


In [None]:
def extract_features_paths(data_directory):
    """Extract pooled ResNet-101 features for every image under a folder.

    The images are loaded through a torchvision ``ImageFolder`` dataset,
    resized and normalized, and pushed through a pretrained ResNet-101 up to
    and including its ``avgpool`` layer. Each batch output is flattened to a
    single row and all rows are stacked into one array.

    Args:
        data_directory (str): Root folder handed to ``ImageFolder``
            (presumably one sub-folder per class - confirm against the data
            layout).

    Returns:
        tuple: ``(features, img_paths)`` where ``features`` is the NumPy array
        of stacked rows and ``img_paths`` is the list of image file paths
        taken from the dataset, in dataset order.
    """

    class ImageFolderWithPaths(datasets.ImageFolder):
        """ImageFolder variant whose samples also carry the image file path.

        Source: https://gist.github.com/andrewjong/6b02ff237533b3b2c554701fb53d5c4d
        """

        # __getitem__ is what the dataloader calls to fetch one sample
        def __getitem__(self, index):
            # regular ImageFolder sample with the file path appended
            sample = super().__getitem__(index)
            return sample + (self.imgs[index][0],)

    def pooling_output(x, model):
        # Apply the model's top-level layers in order and return as soon as
        # the "avgpool" layer has run, skipping whatever follows it.
        out = x
        for name, module in model._modules.items():
            out = module(out)
            if name == "avgpool":
                return out
        return out

    # NOTE(review): mean/std are presumably the statistics the pretrained
    # weights expect - confirm.
    resize = transforms.Resize(size=[224, 224], interpolation=2)
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )
    preprocess = transforms.Compose([resize, transforms.ToTensor(), normalize])

    # our custom dataset
    dataset = ImageFolderWithPaths(data_directory, transform=preprocess)
    # keep only the path of each (path, class index) entry
    img_paths = [image_path for image_path, _ in dataset.imgs]
    # dataloader with default settings
    dataloader = DataLoader(dataset)

    if torch.cuda.is_available():
        DEVICE = "cuda"
    else:
        DEVICE = "cpu"
    print("[INFO] Using Device:", DEVICE)

    model = models.resnet101(pretrained=True, progress=True)
    model_name = model.__class__.__name__
    print(f"[INFO] Loading model: {model_name}")

    batch_rows = []
    print(f"[INFO][STARTED] Feature Extraction using {model_name}")
    model.to(DEVICE)
    with torch.no_grad():
        model.eval()
        for inputs, _labels, _paths in tqdm(dataloader):
            pooled = pooling_output(inputs.to(DEVICE), model)
            # flatten the batch output to one row on the CPU
            row = pooled.cpu().view(1, -1).numpy()
            batch_rows.append(row)
            torch.cuda.empty_cache()

    print(f"[INFO][DONE] Feature Extraction using {model_name}")

    features = np.vstack(batch_rows)
    print("[DEBUG]: Preview Features:", features[0])
    print("[DEBUG]: Preview Image Paths:", img_paths[0])
    return features, img_paths