In [None]:
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import cohere
from groq import Groq

import tqdm
import time
import requests
import PIL
import io

import base64

import os
from dotenv import load_dotenv

# Load variables from a local .env file (if one exists) into the process environment.
# The API keys used further down are read with os.getenv().
load_dotenv()

In [None]:
# Folder (inside the current working directory) where downloaded images are cached.
image_folder_path = os.path.join(os.getcwd(), "img")
os.makedirs(image_folder_path, exist_ok=True)

# Remote chart image(s) used by the single-image demo.
image_paths = [
    "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fa5cd33ba-ae1a-42a8-a254-d85e690d9870_2741x1541.png",
]

In [None]:
# Download every URL in `image_paths` into `image_folder_path` (files that are already
# cached are skipped) and collect the local file paths in `local_image_paths`.
local_image_paths = []

# enumerate(..., start=1) gives each URL its own file name ("1.png", "2.png", ...).
# With a constant "1.<ext>" name every URL after the first would be reported as
# "Already exists" and silently reuse the first image.
for position, url in enumerate(tqdm.tqdm(image_paths), start=1):
    # Computed outside the f-string: re-using the same quote character inside an
    # f-string expression is a SyntaxError before Python 3.12.
    extension = url.rsplit(".", 1)[-1]
    img_path = os.path.join(image_folder_path, f"{position}.{extension}")
    if not os.path.exists(img_path):
        response = requests.get(url, timeout=10)  # do not hang forever on a stalled server
        response.raise_for_status()
        with open(img_path, "wb") as fOut:
            fOut.write(response.content)
        print(f"Downloaded: {img_path}")
    else:
        print(f"Already exists: {img_path}")

    local_image_paths.append(img_path)


In [None]:
import cohere

# Build the Cohere client from the COHERE_API_KEY environment variable.
cohere_key = os.getenv("COHERE_API_KEY")
if not cohere_key:
    # Fail here with a clear message instead of a TypeError from slicing None.
    raise RuntimeError("COHERE_API_KEY is not set; add it to your environment or .env file.")

# Deliberately print no part of the key: cell outputs are saved with the notebook.
print("COHERE_API_KEY loaded.")
co_client = cohere.ClientV2(api_key=cohere_key)

In [None]:
# Resize too large images
def resize_image(pil_image, max_pixels: int = 1568 * 1568) -> "Image.Image":
    """Downscale an image so that its pixel count does not exceed ``max_pixels``.

    The aspect ratio is kept up to integer truncation. An image that is already
    within the limit is returned unchanged (the very same object).

    Args:
        pil_image: Image object exposing ``size`` as ``(width, height)`` and a
            ``resize((width, height))`` method, e.g. a PIL image.
        max_pixels: Largest allowed ``width * height``. Defaults to
            ``1568 * 1568``, the value that used to be hard-coded here.

    Returns:
        ``pil_image`` itself when it is small enough, otherwise the object
        returned by ``pil_image.resize``.
    """
    org_width, org_height = pil_image.size
    if org_width * org_height <= max_pixels:
        return pil_image

    scale_factor = (max_pixels / (org_width * org_height)) ** 0.5
    # Clamp to 1 pixel: for extreme aspect ratios int() can truncate the short
    # side to 0, which is not a valid image size.
    new_width = max(1, int(org_width * scale_factor))
    new_height = max(1, int(org_height * scale_factor))

    # resize() hands back a new image; thumbnail() would modify the caller's
    # image in place.
    return pil_image.resize((new_width, new_height))

# Convert images to a base64 string before sending it to the API
def base64_from_image(img_path):
    """Return an image as a ``data:image/<format>;base64,<payload>`` URI.

    Args:
        img_path: Either a file path (``str`` or ``os.PathLike``) or an
            already-loaded PIL image.

    Returns:
        str: Data URI of the image after it has been passed through
        ``resize_image``. The image is re-encoded in its own ``format`` when
        that attribute is set, and as PNG otherwise.
    """
    if isinstance(img_path, (str, os.PathLike)):
        # Open inside a context manager so the file handle is released, and do
        # the encoding while it is still open.
        with Image.open(img_path) as opened_image:
            return base64_from_image(opened_image)

    pil_image = img_path
    # The format must be resolved for in-memory images as well; previously it was
    # only assigned on the path branch and was unbound here.
    img_format = getattr(pil_image, "format", None) or "PNG"

    # Capture the format before resizing: it is read from the input image, not
    # from whatever resize_image returns.
    pil_image = resize_image(pil_image)

    with io.BytesIO() as img_buffer:
        pil_image.save(img_buffer, format=img_format)
        encoded = base64.b64encode(img_buffer.getvalue()).decode("utf-8")

    return f"data:image/{img_format.lower()};base64,{encoded}"

In [None]:
def embed_image_with_cohere(co, img_path):
    """Request a Cohere embedding for one image.

    Args:
        co: Client object; only its ``embed`` method is used.
        img_path: Image reference, passed straight to ``base64_from_image``.

    Returns:
        dict: ``{"embedding": <raw embed response>, "base64": <data URI sent>}``.
    """
    print(img_path)

    # The image goes over the wire as a base64 data URI.
    data_uri = base64_from_image(img_path)
    image_document = {"content": [{"type": "image", "image": data_uri}]}

    embed_response = co.embed(
        inputs=[image_document],
        model="embed-v4.0",
        input_type="search_document",
        embedding_types=["float"],
    )
    return {"embedding": embed_response, "base64": data_uri}

In [None]:
# One {"embedding", "base64"} record per downloaded image.
image_embeddings = []
for img_path in local_image_paths:
    image_embeddings.append(embed_image_with_cohere(co=co_client, img_path=img_path))

In [None]:
# Number of values in the first image's embedding vector.
first_image_vector = image_embeddings[0]["embedding"].embeddings.float[0]
len(first_image_vector)

In [None]:
# Groq client, authenticated with the GROQ_API_KEY environment variable.
groq_api_key = os.getenv("GROQ_API_KEY")
groq_client = Groq(api_key=groq_api_key)

In [None]:
# Multimodal user message: a text instruction plus the first embedded image,
# passed as its base64 data URI.
text_part = {"type": "text", "text": "Describe this image in detail."}
image_part = {
    "type": "image_url",
    "image_url": {"url": image_embeddings[0]["base64"]},
}
content = [text_part, image_part]

chat_completion = groq_client.chat.completions.create(
    messages=[{"role": "user", "content": content}],
    model="meta-llama/llama-4-scout-17b-16e-instruct",
)

In [None]:
def print_llm_response(response: str):
    """Pretty-print an LLM reply with rich.

    The text is first rendered as JSON; if that raises any exception it is
    rendered as Markdown instead.

    Args:
        response: Text to display.
    """
    from rich.console import Console
    from rich.json import JSON
    from rich.markdown import Markdown

    out = Console()
    try:
        as_json = JSON(response)
        out.print(as_json)
    except Exception:
        as_markdown = Markdown(response)
        out.print(as_markdown)

# Display the text of the first choice returned by the model.
first_choice = chat_completion.choices[0]
response_text = first_choice.message.content
formatted_reply = f"\nResponse:\n{response_text}\n"
print_llm_response(formatted_reply)

In [None]:
# Several images from https://www.appeconomyinsights.com/
# Maps a local file name to the URL the image is downloaded from. The keys are used
# as file names inside `image_folder_path` by download_images() and later as the
# record ids stored in ChromaDB.
# NOTE(review): "tecent.png" looks like a misspelling of "tencent.png". Renaming the
# key would also change the cached file name and the stored id, so confirm first.
images = {
    "tesla.png": "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fbef936e6-3efa-43b3-88d7-7ec620cdb33b_2744x1539.png",
    "netflix.png": "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F23bd84c9-5b62-4526-b467-3088e27e4193_2744x1539.png",
    "nike.png": "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fa5cd33ba-ae1a-42a8-a254-d85e690d9870_2741x1541.png",
    "google.png": "https://substackcdn.com/image/fetch/f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F395dd3b9-b38e-4d1f-91bc-d37b642ee920_2741x1541.png",
    "accenture.png": "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F08b2227c-7dc8-49f7-b3c5-13cab5443ba6_2741x1541.png",
    "tecent.png": "https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F0ec8448c-c4d1-4aab-a8e9-2ddebe0c95fd_2741x1541.png"
}

In [None]:
import faiss
from chromadb import PersistentClient
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def download_images(image_paths):
    """Fetch each named image into ``image_folder_path`` unless it is already there.

    Args:
        image_paths: Mapping of file name -> source URL.

    Returns:
        list[dict]: One ``{"name": ..., "path": ...}`` entry per item of
        ``image_paths``, in the mapping's iteration order.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()`` when a
            download returns an error status.
    """
    fetched = []
    for name, url in tqdm.tqdm(image_paths.items()):
        target = os.path.join(image_folder_path, f"{name}")

        if os.path.exists(target):
            print(f"Already exists: {target}")
        else:
            reply = requests.get(url, timeout=10)  # bounded wait per request
            reply.raise_for_status()
            with open(target, "wb") as sink:
                sink.write(reply.content)
            print(f"Downloaded: {target}")

        fetched.append({"name": name, "path": target})

    return fetched

In [None]:
# Parallel lists with one entry per image: vector, base64 document, id and metadata.
embeddings_image, documents, ids, metadata = [], [], [], []

local_image_paths = download_images(image_paths=images)

for image_entry in local_image_paths:
    embedded = embed_image_with_cohere(co=co_client, img_path=image_entry["path"])
    image_vector = embedded["embedding"].embeddings.float[0]

    ids.append(image_entry["name"])
    embeddings_image.append(image_vector)
    documents.append(embedded["base64"])
    metadata.append({"type": "image"})

In [None]:
print(ids)
embeddings_image = np.array(embeddings_image, dtype="float32")

# Both stores live under this directory. Create it up front: faiss.write_index
# fails if the parent folder of the target file does not exist.
os.makedirs("../Store/image_db", exist_ok=True)

print("💾 Building FAISS HNSW index...")
dim = embeddings_image.shape[1]
index = faiss.IndexHNSWFlat(dim, 32)
# efConstruction has to be set before vectors are added to take effect.
index.hnsw.efConstruction = 40
index.add(embeddings_image)
faiss.write_index(index, "../Store/image_db/images_faiss_hnsw.index")

print("💾 Storing embeddings in ChromaDB...")
chroma_client = PersistentClient(path="../Store/image_db/image_chroma_store")
collection = chroma_client.get_or_create_collection("image_store")
# upsert instead of add: the store is persistent, so re-running this cell meets
# ids that already exist. upsert overwrites them with the fresh embeddings.
collection.upsert(
    embeddings=embeddings_image.tolist(),
    metadatas=metadata,
    documents=documents,
    ids=ids,
)

In [None]:
def load_or_create_stores(faiss_path="../Store/image_db/images_faiss_hnsw.index", chroma_path="../Store/image_db/image_chroma_store"):
    """Open the persisted FAISS index and the ChromaDB ``image_store`` collection.

    Args:
        faiss_path: File the FAISS index is read from.
        chroma_path: Directory handed to ``PersistentClient``.

    Returns:
        tuple: ``(faiss_index, chroma_client, collection)``. ``faiss_index`` is
        ``None`` when ``faiss_path`` does not exist. The Chroma client and
        collection are always returned: ``get_or_create_collection`` is called
        whether or not ``chroma_path`` already holds data.
    """
    faiss_index = None
    if os.path.exists(faiss_path):
        print(f"📂 Loading existing FAISS index from {faiss_path}")
        faiss_index = faiss.read_index(faiss_path)
    else:
        print("⚠️ No FAISS index found, will build a new one.")

    # Only used to choose the log message. The scandir iterator is closed via the
    # context manager instead of being left open.
    chroma_has_data = False
    if os.path.isdir(chroma_path):
        with os.scandir(chroma_path) as entries:
            chroma_has_data = any(entries)

    if chroma_has_data:
        print(f"📂 Loading existing ChromaDB store from {chroma_path}")
    else:
        print("⚠️ No Chroma store found, will build a new one.")

    chroma_client = PersistentClient(path=chroma_path)
    collection = chroma_client.get_or_create_collection("image_store")

    return faiss_index, chroma_client, collection

faiss_index, chroma_client, collection = load_or_create_stores()

In [None]:
def embed_text_with_cohere(co, text, input_type="search_query"):
    """Embed a piece of text with Cohere's ``embed-v4.0`` model.

    Args:
        co: Client object; only its ``embed`` method is used.
        text: The string to embed.
        input_type: Cohere input type. Defaults to ``"search_query"`` because this
            helper embeds the user's question that is searched against the stored
            image embeddings (which are created with ``"search_document"``).
            Pass ``"search_document"`` to embed text that will itself be stored.

    Returns:
        The first float embedding of the response (``resp.embeddings.float[0]``).
    """
    resp = co.embed(
        model="embed-v4.0",
        input_type=input_type,
        # Requested explicitly, as in embed_image_with_cohere, because the
        # float embeddings are what is read from the response below.
        embedding_types=["float"],
        texts=[text],
    )
    return resp.embeddings.float[0]

In [None]:
top_k = 1


def search_in_db(query_emb, k=None):
    """Find the nearest stored items for a query embedding and display them.

    Looks up neighbours in the module-level ``faiss_index``, translates the FAISS
    row numbers into Chroma ids through the module-level ``ids`` list, and
    fetches the matching records from ``collection``.

    Args:
        query_emb: Query vector(s) passed to ``faiss_index.search``; only the
            first row of the result is used.
        k: Number of neighbours to request. Defaults to the module-level
            ``top_k``.

    Returns:
        The dict returned by ``collection.get``, with its ``ids``, ``metadatas``
        and ``documents`` lists put into FAISS rank order (nearest first).
    """
    k = top_k if k is None else k
    distances, indices = faiss_index.search(query_emb, k)

    # FAISS fills unused slots with -1 when it finds fewer than k neighbours.
    # NOTE: `ids[row]` assumes `ids` is in the same order the vectors were added
    # to the FAISS index.
    hits = [
        (float(distance), ids[int(row)])
        for distance, row in zip(distances[0], indices[0])
        if row > -1
    ]
    results = collection.get(ids=[chroma_id for _, chroma_id in hits])

    # Do not rely on collection.get() returning records in the order the ids were
    # requested: realign every list with the FAISS ranking so that each printed
    # distance belongs to the record shown next to it.
    row_of = {chroma_id: row for row, chroma_id in enumerate(results["ids"])}
    hits = [hit for hit in hits if hit[1] in row_of]
    for field in ("ids", "metadatas", "documents"):
        if results.get(field) is not None:
            results[field] = [results[field][row_of[chroma_id]] for _, chroma_id in hits]

    for i, ((distance, _), meta) in enumerate(zip(hits, results["metadatas"])):
        item_type = meta["type"]
        print(f"{i+1}. {item_type} (Distance: {distance:.4f})")

        if item_type == "text":
            print(results["documents"][i][:300], "...")
        elif item_type == "image":
            # Image documents are data URIs; the base64 payload follows the first comma.
            img_bytes = base64.b64decode(results["documents"][i].split(",", 1)[1])
            img = Image.open(io.BytesIO(img_bytes))
            plt.imshow(img)
            plt.axis("off")
            plt.show()

    return results

In [None]:
# Question to answer, and its embedding as a (1, dim) float32 matrix for the search.
query = "What is the revenue of tesla?"
query_vector = embed_text_with_cohere(co_client, query)
query_emb = np.asarray(query_vector, dtype="float32").reshape(1, -1)

In [None]:
# Keep the hits in a variable instead of leaving the call as the cell's last
# expression: the returned dict holds the full base64 image documents, which would
# otherwise be echoed into (and saved with) the notebook output.
search_results = search_in_db(query_emb=query_emb)
search_results["ids"]

In [None]:
# Answer the query with the image that retrieval returned for it. Attaching
# image_embeddings[0] here would always send the first demo image, whatever the
# search found for the query.
retrieved = search_in_db(query_emb=query_emb)
if not retrieved["documents"]:
    raise ValueError(f"No image was retrieved for query: {query!r}")
# Stored documents are the base64 data URIs of the images.
retrieved_image_uri = retrieved["documents"][0]

# Multimodal user message: the question plus the retrieved image.
content = [
    {
        "type": "text",
        "text": query
    },
    {
        "type": "image_url",
        "image_url": {
            "url": retrieved_image_uri
        }
    }
]

chat_completion = groq_client.chat.completions.create(
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    messages=[
        {
            "role": "user",
            "content": content
        }
    ]
)

In [None]:
# Display the text of the first choice returned for the query.
first_choice = chat_completion.choices[0]
response_text = first_choice.message.content
formatted_reply = f"\nResponse:\n{response_text}\n"
print_llm_response(formatted_reply)