In [None]:
import numpy as np
from PIL import Image
import requests

from transformers import CLIPProcessor, CLIPModel

# Load the base CLIP checkpoint together with its matching processor.
# `model` must exist before the zero-shot cells below call `model(**inputs)`;
# it is left on its default device because the processor output used there is
# never moved to a GPU.
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [None]:
url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# Bound the wait and surface HTTP errors here, instead of letting PIL fail
# later on an error page or hanging forever on a stalled connection.
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
image = Image.open(response.raw)

In [None]:
# Encode the two candidate captions and the downloaded image in a single
# processor call; captions are padded to a common length.
inputs = processor(
    images=image,
    text=["a photo of a cat", "a photo of a dog"],
    padding=True,
    return_tensors="pt",
)
print(inputs)

In [ ]:
# Run the model on the combined text + image inputs prepared above.
outputs = model(**inputs)
# Image-to-text similarity scores, read from the model output.
logits_per_image = outputs.logits_per_image  # this is the image-text similarity score

In [ ]:
# Normalise the scores along dim 1 so they can be read as label probabilities.
probs = logits_per_image.softmax(dim=1)  # we can take the softmax to get the label probabilities

In [None]:
from PIL import Image
import torch
import requests
from transformers import AutoProcessor, CLIPModel
# Prefer a GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Rebinds `model` to the large-patch14 checkpoint.
# NOTE(review): `.to()` is called without an argument, so `device` above is never
# applied to the model - confirm whether `.to(device)` was intended. If it is
# changed, the processor outputs fed to the model must be moved to the same device.
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14").to()

# Disabled experiment: text-feature extraction with a tokenizer.
# # Get the text features
# tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-large-patch14")
# 
# inputs = tokenizer(["a photo of a cat", "a photo of a dog"], padding=True, return_tensors="pt")
# text_features = model.get_text_features(**inputs)
# 
# print(text_features.shape) # output shape of text features

# Get the image features
# Rebinds `processor` to the processor of the same large-patch14 checkpoint.
model_ckpt = "openai/clip-vit-large-patch14"
processor = AutoProcessor.from_pretrained(model_ckpt)
# Disabled alternatives: a generic feature extractor / AutoModel for the same checkpoint.
# extractor = AutoFeatureExtractor.from_pretrained(model_ckpt)
# model = AutoModel.from_pretrained(model_ckpt)

In [None]:
def get_embeddings(image):
    """Return the CLIP image features for one image or a list of images.

    Args:
        image: a single image or a list of images, passed to the notebook-level
            `processor` as its `images` argument.

    Returns:
        The tensor produced by `model.get_image_features` for the processed input.
    """
    inputs = processor(images=image, return_tensors="pt")
    # Keep the inputs on the same device as the model so the forward pass does
    # not fail with a device mismatch when the model lives on a GPU.
    inputs = inputs.to(model.device)
    image_features = model.get_image_features(**inputs)
    return image_features

In [None]:
def compute_scores(emb_one, emb_two):
    """Computes cosine similarity between two vectors or two batches of vectors.

    Args:
        emb_one: tensor of shape (d,) or (n, d).
        emb_two: tensor of shape (d,) or (m, d); its rows must broadcast
            against the rows of `emb_one`.

    Returns:
        A Python list of floats with one similarity score per (broadcast) row.
    """
    # A bare vector is treated as a batch of one, so the row-wise similarity
    # (computed along dim 1) is defined for it as well.
    emb_one = torch.atleast_2d(emb_one)
    emb_two = torch.atleast_2d(emb_two)

    # Bring both operands to a common dtype and device; e.g. embeddings that
    # were round-tripped through numpy may be float64 while fresh model
    # outputs are float32.
    common_dtype = torch.promote_types(emb_one.dtype, emb_two.dtype)
    emb_one = emb_one.to(dtype=common_dtype)
    emb_two = emb_two.to(device=emb_one.device, dtype=common_dtype)

    scores = torch.nn.functional.cosine_similarity(emb_one, emb_two)
    # detach + cpu makes the conversion safe for GPU tensors and for tensors
    # that still carry autograd history.
    return scores.detach().cpu().tolist()

In [None]:
# Quick check: embed two local example images and display their cosine similarity.
compute_scores(get_embeddings(Image.open("data/ex1.jpg")), get_embeddings(Image.open("data/ex2.jpg")))

# Pipeline

In [None]:
import torchvision.transforms as T


# Preprocessing applied to every image before it is embedded.
# Only a fixed 256x256 resize is active; the centre crop, tensor conversion
# and normalisation steps that once followed it are disabled.
transformation_chain = T.Compose([T.Resize(size=(256, 256))])

In [None]:
import shutil
import os

dir_pth = "data/test"

# Flatten `data/test`: move every file out of its sub-folder into `dir_pth`
# itself, then remove the emptied sub-folder.
for img_dir in os.listdir(dir_pth):
    sub_dir = os.path.join(dir_pth, img_dir)
    # Entries that are not directories (e.g. files moved up by an earlier run)
    # are left alone, so the cell can be re-run without NotADirectoryError.
    if not os.path.isdir(sub_dir):
        continue
    for img in os.listdir(sub_dir):
        # NOTE(review): two sub-folders holding the same file name will collide
        # here - confirm names are unique across folders.
        shutil.move(os.path.join(sub_dir, img), os.path.join(dir_pth, img))
    os.rmdir(sub_dir)
        

In [None]:
dir_pth = "data/my_dataset"
image_file_path = []
image = []
labels = []
# Walk `<dir_pth>/<folder>/<file>` and record, per image, its path, the opened
# image and a label of the form `folder_<folder>_img_<file>`.
# Listings are sorted because `os.listdir` returns entries in arbitrary order;
# a stable order keeps the later seeded shuffle reproducible.
for img_dir in sorted(os.listdir(dir_pth)):
    class_dir = os.path.join(dir_pth, img_dir)
    # Skip loose files at the top level; only sub-folders hold images.
    if not os.path.isdir(class_dir):
        continue
    for img in sorted(os.listdir(class_dir)):
        pp = os.path.join(class_dir, img)
        image_file_path.append(pp)
        image.append(Image.open(pp))
        labels.append(f"folder_{img_dir}_img_{img}")

# Column-oriented dict: one list per column, aligned by position.
res = {'image_file_path': image_file_path,
       'image': image,
       'labels': labels}

In [None]:
from datasets import Dataset
# Build a `Dataset` from the column dict assembled above (paths, images, labels).
dataset = Dataset.from_dict(res)

In [None]:
# Display the dataset's `features` attribute.
dataset.features

In [None]:
# from datasets import load_dataset
# 
# dataset = load_dataset("beans")
# dataset["train"].features

In [None]:
# Display the number of rows in the dataset.
len(dataset)

In [None]:
# Run configuration: shuffle seed and batch size used when mapping over the data.
seed = 42
batch_size = 16

# Every row is kept as a candidate; the seeded shuffle only fixes their order.
num_samples = len(dataset)
candidate_subset = dataset.shuffle(seed=seed).select(range(num_samples))

In [None]:
import torch

def extract_embeddings(model: torch.nn.Module):
    """Build a batch-mapping function that computes image embeddings.

    Args:
        model: kept for interface compatibility. It is not consulted here;
            the embeddings come from `get_embeddings`, which uses the
            notebook-level model.

    Returns:
        A function that takes a batch dict with an "image" list and returns
        `{"embeddings": ...}` for that batch.
    """

    def pp(batch):
        # `transformation_chain` is a composition of preprocessing
        # transformations applied to each input image before embedding.
        images = [transformation_chain(image) for image in batch["image"]]
        # Inference only: no autograd graph is needed for the embeddings.
        with torch.no_grad():
            embeddings = get_embeddings(images)
        return {"embeddings": embeddings}

    return pp

In [None]:
# Create the batch-mapping function used to embed the candidate images.
extract_fn = extract_embeddings(model)

In [None]:
# Apply the embedding function batch by batch (`batch_size` rows per call).
candidate_subset_emb = candidate_subset.map(extract_fn, batched=True, batch_size=batch_size)

In [None]:
# Gather the stored embeddings into one numpy array and wrap it as a torch tensor.
all_candidate_embeddings = torch.from_numpy(
    np.array(candidate_subset_emb["embeddings"])
)

In [None]:
from tqdm.auto import tqdm

# Build one unique identifier per candidate: "<row index>_<label>".
# The label column is read once instead of indexing the dataset row by row
# (which fetches every column of each row), and the loop variable no longer
# shadows the builtin `id`.
candidate_ids = [
    f"{idx}_{label}"
    for idx, label in enumerate(tqdm(candidate_subset_emb["labels"]))
]

In [None]:
# Look up the identifier(s) of one specific image by folder and file-name fragment.
[entry for entry in candidate_ids if "folder_1" in entry and "img_4192145" in entry]

In [None]:
def fetch_similar(image, top_k=5):
    """Return the `top_k` candidates most similar to the query `image`.

    Returns:
        A pair `(ids, labels)`: the integer row indices parsed from the
        leading part of each identifier, and the identifiers themselves,
        both ordered from most to least similar.
    """
    # Embed the query without tracking gradients.
    with torch.no_grad():
        query_embeddings = get_embeddings(image)

    # Score the query against every candidate in one call and key the
    # scores by candidate identifier.
    sim_scores = compute_scores(all_candidate_embeddings, query_embeddings)
    score_by_id = dict(zip(candidate_ids, sim_scores))

    # Rank identifiers by descending score and keep the first `top_k`.
    ranked_entries = sorted(score_by_id, key=score_by_id.get, reverse=True)
    id_entries = ranked_entries[:top_k]

    ids = [int(entry.split("_")[0]) for entry in id_entries]
    labels = list(id_entries)
    return ids, labels

In [None]:
# Display the dataset object.
dataset

In [None]:
test_idx = 118 # np.random.choice(len(dataset))
# Fetch the query row once and read both the image and ITS label from it
# (taking the whole "labels" column here would print every label).
test_row = candidate_subset_emb[test_idx]
test_sample = test_row["image"]
test_label = test_row["labels"]

k = 10
sim_ids, sim_labels = fetch_similar(test_sample, top_k=k)
print(f"Query label: {test_label}")
print(f"Top {k} candidate labels: {sim_labels}")

In [None]:
# Give every entry of `data/my_dataset` a numeric name ("0", "1", ...).
# Entries whose name is already numeric are left untouched and new numbers
# skip the ones in use, so re-running the cell never renames one folder onto
# another. Sorting makes the assignment deterministic.
# NOTE(review): paths and labels collected before this cell runs still refer
# to the old folder names - confirm this runs before the dataset is built.
dataset_root = "data/my_dataset"
entries = sorted(os.listdir(dataset_root))
used_names = {name for name in entries if name.isdigit()}
next_index = 0
for pp in entries:
    if pp in used_names:
        continue
    while str(next_index) in used_names:
        next_index += 1
    new_name = str(next_index)
    os.rename(os.path.join(dataset_root, pp), os.path.join(dataset_root, new_name))
    used_names.add(new_name)

In [None]:
# Turn each retrieved label back into the image's path on disk.
# NOTE(review): `get_path_img` is defined in a later cell of this notebook, so
# on a fresh kernel that cell has to be run before this one.
paths = [get_path_img("data/my_dataset", label) for label in sim_labels]
print(paths)

In [None]:
def get_path_img(dir_name, img):
    """Rebuild an image path from a candidate identifier.

    Args:
        dir_name: root directory of the dataset.
        img: identifier of the form `[<index>_]folder_<folder>_img_<file>`,
            as produced when the labels and candidate ids are built.

    Returns:
        `os.path.join(dir_name, <folder>, <file>)`.

    Raises:
        ValueError: if `img` does not follow the expected format.
    """
    import re

    # Splitting on every "_" would truncate file names that themselves contain
    # underscores; anchor on the literal "folder_" / "_img_" markers instead.
    # The folder part is matched lazily, so the first "_img_" ends it.
    match = re.match(r"^(?:\d+_)?folder_(?P<fold>.+?)_img_(?P<name>.+)$", img)
    if match is None:
        raise ValueError(f"Unrecognised image identifier: {img!r}")
    return os.path.join(dir_name, match.group("fold"), match.group("name"))

In [None]:
from PIL import Image

# Open every retrieved image and collect its dimensions.
images = [Image.open(path) for path in paths]
widths = tuple(picture.width for picture in images)
heights = tuple(picture.height for picture in images)

# The strip is as wide as all images side by side and as tall as the tallest.
total_width = sum(widths)
max_height = max(heights)
new_im = Image.new('RGB', (total_width, max_height))

# Paste the images left to right, each one starting where the previous ended.
x_offset = 0
for im in images:
    new_im.paste(im, (x_offset, 0))
    x_offset += im.width

new_im.save('test.jpg')