In [None]:
import base64
import getpass
import json
import os
import random
from io import BytesIO

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
from IPython.display import display
from numpy.linalg import norm
from PIL import Image
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
from umap import UMAP

In [None]:
def get_prediction_guard_api_key():
    """Return the Prediction Guard API key, prompting for it when it is not configured.

    ``load_env()`` is called first, then the ``PREDICTION_GUARD_API_KEY``
    environment variable is read. If the variable is unset or empty, the user
    is asked to type the key.

    Returns:
        str: the key taken from the environment or entered by the user.
    """
    # NOTE(review): load_env is not defined in this notebook; presumably it loads
    # a .env file into the process environment -- confirm it is in scope.
    load_env()
    api_key = os.getenv("PREDICTION_GUARD_API_KEY")
    if not api_key:
        # getpass hides the typed secret, whereas input() would leave the key
        # visible in the cell output of a saved notebook.
        api_key = getpass.getpass("Please enter your Prediction Guard API Key: ")
    return api_key


In [None]:
# PredictionGuard endpoint: taken from DLAI_PREDICTION_GUARD_URL_ENDPOINT when that
# environment variable is set, otherwise the default host below.
# (Alternative proxy host kept for reference: "https://proxy-dl-itdc.predictionguard.com")
PREDICTION_GUARD_URL_ENDPOINT = os.getenv(
    "DLAI_PREDICTION_GUARD_URL_ENDPOINT",
    "https://dl-itdc.predictionguard.com",
)

In [None]:
def bt_embedding_from_prediction_guard(prompt, base64_image):
    """Compute the joint embedding of a prompt and an optional image through PredictionGuard.

    Args:
        prompt: text sent as the ``"text"`` field of the embedding input.
        base64_image: base64-encoded image sent as the ``"image"`` field;
            ``None`` or ``""`` means the request carries text only.

    Returns:
        The ``embedding`` entry of the first item in the response's ``data`` list.

    Raises:
        TypeError: if an image is supplied but ``isBase64`` rejects it.
    """
    # The client is created before validation, matching the original call order.
    pg_client = _getPredictionGuardClient()

    payload = {"text": prompt}
    image_supplied = not (base64_image is None or base64_image == "")
    if image_supplied and not isBase64(base64_image):
        raise TypeError("image input must be in base64 encoding!")
    if image_supplied:
        payload['image'] = base64_image

    result = pg_client.embeddings.create(
        model="bridgetower-large-itm-mlm-itc",
        input=[payload],
    )
    first_item = result['data'][0]
    return first_item['embedding']

In [None]:
# encoding image at given path or PIL Image using base64
def encode_image(image_path_or_PIL_img):
    if isinstance(image_path_or_PIL_img, PIL.Image.Image):
        # this is a PIL image
        buffered = BytesIO()
        image_path_or_PIL_img.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    else:
        # this is a image_path
        with open(image_path_or_PIL_img, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

In [None]:
# You can use your own uploaded images and captions.
# You are responsible for the legal use of any images you use.

# Seconds to wait for each image download before giving up.
DOWNLOAD_TIMEOUT_SECONDS = 30

url1='http://farm3.staticflickr.com/2519/4126738647_cc436c111b_z.jpg'
cap1='A motorcycle sits parked across from a herd of livestock'

url2='http://farm3.staticflickr.com/2046/2003879022_1b4b466d1d_z.jpg'
cap2='Motorcycle on a platform to be worked on in garage'

url3='http://farm1.staticflickr.com/133/356148800_9bf03b6116_z.jpg'
cap3='a cat lying down stretched out near a laptop'

# Each example bundles the source URL, its caption and the local download path.
img1 = {
    'flickr_url': url1,
    'caption': cap1,
    'image_path' : './shared_data/motorcycle_1.jpg'
}

img2 = {
    'flickr_url': url2,
    'caption': cap2,
    'image_path' : './shared_data/motorcycle_2.jpg'
}

img3 = {
    'flickr_url' : url3,
    'caption': cap3,
    'image_path' : './shared_data/cat_1.jpg'
}

# download images
imgs = [img1, img2, img3]
for img in imgs:
    # Create the target folder on a fresh checkout so open() below cannot fail.
    target_dir = os.path.dirname(img['image_path'])
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    # A timeout keeps the cell from hanging forever on an unresponsive host.
    response = requests.get(img['flickr_url'], timeout=DOWNLOAD_TIMEOUT_SECONDS)
    # Fail loudly on HTTP errors instead of saving an error page as a .jpg.
    response.raise_for_status()
    with open(img['image_path'], 'wb') as f:
        f.write(response.content)

# show every downloaded image followed by its caption
for img in imgs:
    image = Image.open(img['image_path'])
    caption = img['caption']
    display(image)
    display(caption)
    print()

In [None]:
# Compute one joint (caption + image) embedding per example.
# The helper defined in this notebook is `bt_embedding_from_prediction_guard`;
# the previously used name `bt_embeddings` is not defined anywhere in it.
embeddings = []
for img in [img1, img2, img3]:
    img_path = img['image_path']
    caption = img['caption']
    base64_img = encode_image(img_path)
    embedding = bt_embedding_from_prediction_guard(caption, base64_img)
    embeddings.append(embedding)

In [None]:
# Every image-text pair has now been turned into a multimodal embedding
# vector, expected to have 512 dimensions; print the length of the first one.

first_embedding = embeddings[0]
print(len(first_embedding))

In [None]:
def cosine_similarity(vec1, vec2):
    """Return the cosine of the angle between ``vec1`` and ``vec2``.

    The dot product of the two vectors is divided by the product of their
    norms, exactly as computed by ``np.dot`` and ``numpy.linalg.norm``.
    """
    dot_product = np.dot(vec1, vec2)
    magnitude_product = norm(vec1) * norm(vec2)
    return dot_product / magnitude_product



In [None]:
# Turn the three example embeddings into numpy arrays
# (per the captions: two motorcycle examples, then the cat example).
ex1_embed, ex2_embed, ex3_embed = (np.array(vector) for vector in embeddings[:3])

# Cosine similarity for every pair of examples.
sim_ex1_ex2 = cosine_similarity(ex1_embed, ex2_embed)
sim_ex2_ex3 = cosine_similarity(ex2_embed, ex3_embed)
sim_ex1_ex3 = cosine_similarity(ex1_embed, ex3_embed)

In [None]:
# Report each pairwise similarity: a header line followed by the value.
for pair_label, similarity in (
    ("ex1_embeded and ex2_embeded", sim_ex1_ex2),
    ("ex2_embeded and ex3_embeded", sim_ex2_ex3),
    ("ex1_embeded and ex3_embeded", sim_ex1_ex3),
):
    print(f"Cosine similarity between {pair_label} is:")
    display(similarity)

In [None]:
# Caption templates used when the caller does not pass any.
# NOTE(review): this notebook never defines the `templates` name that the old
# default argument referred to; these values are stand-ins -- confirm them.
DEFAULT_CAPTION_TEMPLATES = (
    'a picture of {}',
    'an image of {}',
    'a nice {}',
    'a beautiful {}',
)


def prepare_dataset_for_umap_visualization(hf_dataset, class_name, templates=None, test_size=1000, seed=None):
    """Build image-text pairs from a random sample of a Hugging Face dataset.

    The ``train`` split of ``hf_dataset`` is divided with ``train_test_split``
    and the resulting ``test`` part (``test_size`` examples, drawn by the split
    rather than taken from the start of the dataset) is turned into pairs.

    Args:
        hf_dataset: dataset identifier passed to ``load_dataset``.
        class_name: text substituted into the chosen caption template.
        templates: sequence of format strings with one ``{}`` placeholder;
            defaults to ``DEFAULT_CAPTION_TEMPLATES``.
        test_size: forwarded to ``train_test_split``.
        seed: optional seed making both the split and the template choice
            repeatable; ``None`` keeps the unseeded behaviour.

    Returns:
        list[dict]: one ``{'caption': str, 'pil_img': image}`` dict per sampled
        example, where ``pil_img`` is the example's ``'image'`` field.

    Raises:
        ValueError: if ``templates`` is empty.
    """
    if templates is None:
        templates = DEFAULT_CAPTION_TEMPLATES
    if len(templates) == 0:
        raise ValueError("templates must contain at least one caption template")

    # NOTE(review): load_dataset is not imported in this notebook; presumably
    # `from datasets import load_dataset` is required -- confirm it is in scope.
    # trust_remote_code=True lets the dataset repository run its own loading
    # script, so only use dataset ids you trust.
    dataset = load_dataset(hf_dataset, trust_remote_code=True)
    # split dataset with specific test_size
    train_test_dataset = dataset['train'].train_test_split(test_size=test_size, seed=seed)
    # get the test dataset
    test_dataset = train_test_dataset['test']

    # Without a seed, keep drawing from the global `random` state as before.
    rng = random if seed is None else random.Random(seed)
    return [
        {
            'caption' : rng.choice(templates).format(class_name),
            'pil_img' : test_dataset[i]['image']
        }
        for i in range(len(test_dataset))
    ]

In [None]:
# The helper defined in this notebook is `prepare_dataset_for_umap_visualization`;
# the previously used name `data_prep` is not defined anywhere in it.

# 50 examples sampled from the Huggingface dataset
#  "yashikota/cat-image-dataset"
cat_img_txt_pairs = prepare_dataset_for_umap_visualization(
    "yashikota/cat-image-dataset", "cat", test_size=50)

# 50 examples sampled from the Huggingface dataset
#  "tanganke/stanford_cars"
car_img_txt_pairs = prepare_dataset_for_umap_visualization(
    "tanganke/stanford_cars", "car", test_size=50)

In [None]:
# Show the first image-text pair of each class: cat first, then car.
# For every pair the caption is displayed before the image.
for sample_pairs in (cat_img_txt_pairs, car_img_txt_pairs):
    first_pair = sample_pairs[0]
    display(first_pair['caption'])
    display(first_pair['pil_img'])

In [None]:
def _embed_img_txt_pairs(img_txt_pairs):
    """Return one BridgeTower embedding per image-text pair, in input order.

    Each pair is a dict with a ``'pil_img'`` entry (encoded to base64 with
    ``encode_image``) and a ``'caption'`` entry; both are sent to
    ``bt_embedding_from_prediction_guard``. Progress is shown with tqdm.
    """
    pair_embeddings = []
    for img_txt_pair in tqdm(img_txt_pairs, total=len(img_txt_pairs)):
        base64_img = encode_image(img_txt_pair['pil_img'])
        # `bt_embedding_from_prediction_guard` is the helper defined in this
        # notebook; the old name `bt_embeddings` is not defined anywhere in it.
        pair_embeddings.append(
            bt_embedding_from_prediction_guard(img_txt_pair['caption'], base64_img)
        )
    return pair_embeddings


# compute BridgeTower embeddings for cat image-text pairs
cat_embeddings = _embed_img_txt_pairs(cat_img_txt_pairs)

# compute BridgeTower embeddings for car image-text pairs
car_embeddings = _embed_img_txt_pairs(car_img_txt_pairs)

In [None]:
def dimensionality_reduction(embed_arr, label, random_state=None, verbose=False):
    """Project high-dimensional vectors to 2D with UMAP and return them as a DataFrame.

    The input is first rescaled with ``MinMaxScaler`` and then embedded by
    ``UMAP(n_components=2, metric="cosine")``.

    Args:
        embed_arr: 2D array-like of embeddings, one row per sample.
        label: value(s) stored in the ``label`` column of the result.
        random_state: optional seed forwarded to UMAP so the projection can be
            reproduced; ``None`` keeps UMAP's unseeded behaviour.
        verbose: when True, print the scaled array and the resulting frame.
            Off by default so full arrays are not dumped into the output.

    Returns:
        pandas.DataFrame: columns ``X`` and ``Y`` (the 2D coordinates) plus ``label``.
    """
    X_scaled = MinMaxScaler().fit_transform(embed_arr)
    if verbose:
        print(X_scaled)
    mapper = UMAP(n_components=2, metric="cosine", random_state=random_state).fit(X_scaled)
    df_emb = pd.DataFrame(mapper.embedding_, columns=["X", "Y"])
    df_emb["label"] = label
    if verbose:
        print(df_emb)
    return df_emb

In [None]:
# Stack the cat embeddings on top of the car embeddings in a single numpy array.
all_embeddings = np.concatenate((cat_embeddings, car_embeddings), axis=0)

# One label per embedding, in the same order as the stacked rows.
labels = [
    class_name
    for class_name, class_embeddings in (("cat", cat_embeddings), ("car", car_embeddings))
    for _ in class_embeddings
]

# Reduce the stacked embeddings to 2D.
reduced_dim_emb = dimensionality_reduction(all_embeddings, labels)

In [None]:
# Scatter plot of the 2D UMAP projection, coloured by class label.
# (matplotlib.pyplot and seaborn are imported in the notebook's first cell.)

# The style must be set before the figure is created; applied afterwards it
# does not affect axes that already exist.
sns.set_style("whitegrid", {'axes.grid' : False})

fig, ax = plt.subplots(figsize=(8,6)) # Set figsize

# Draw on the axes created above instead of relying on the implicit current axes.
sns.scatterplot(data=reduced_dim_emb,
                x='X',
                y='Y',
                hue='label',
                palette='bright',
                ax=ax)
# Place the legend outside the plotting area, to the right of the axes.
sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))
ax.set_title('Scatter plot of images of cats and cars using UMAP')
ax.set_xlabel('X')
ax.set_ylabel('Y')
plt.show()