In [3]:
import pandas as pd
import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import hopsworks
import os

# Connect to the Hopsworks project; the model-registry and feature-store
# handles created here are reused by the later cells.
proj = hopsworks.login()
mr = proj.get_model_registry()
fs = proj.get_feature_store()


2025-08-23 06:58:18,464 INFO: Python Engine initialized.

Logged in to project, explore it here https://snurran.devnet.hops.works/p/120


In [5]:
# Load CLIP from the Hopsworks Model Registry when it is registered; otherwise
# fetch it from Hugging Face once, save it locally and register it.
name = "openaiclip_vit_base_patch32"
model_mr = mr.get_model(name, version=1)
if model_mr is None:
    print("Downloading model from HF")
    model_name = "openai/clip-vit-base-patch32"
    model = CLIPModel.from_pretrained(model_name)
    processor = CLIPProcessor.from_pretrained(model_name)
    save_dir = "/tmp/clip-vit-base-patch32-local"
    os.makedirs(save_dir, exist_ok=True)
    model.save_pretrained(save_dir)
    processor.save_pretrained(save_dir)
    model_mr = mr.python.create_model(
        name=name,
        description="Image vector embedding model from OpenAI",
    )
    model_mr.save(save_dir)
else:
    print("Downloading model from Model Registry")
    # Download into a dedicated directory and load from the path that download()
    # returns. Loading from bare /tmp could pick up unrelated or stale
    # config/weight files, and breaks if the files land in a sub-directory.
    download_dir = "/tmp/clip-vit-base-patch32-registry"
    os.makedirs(download_dir, exist_ok=True)
    local_path = model_mr.download(local_path=download_dir)
    model = CLIPModel.from_pretrained(local_path)
    processor = CLIPProcessor.from_pretrained(local_path)

# Move model to GPU (only if available)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
print(f"Device: {device}")

Downloading model from Model Registry
Downloading model artifact (0 dirs, 9 files)... DONE

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Device: cuda


In [None]:
def get_image_embedding(image_path):
    """Embed one image file with the notebook's CLIP `model`/`processor` on `device`.

    Returns the L2-normalised image feature vector as a list of floats, or None
    (after printing the error) when the image cannot be loaded or embedded.
    """
    try:
        rgb_image = Image.open(image_path).convert("RGB")
        model_inputs = processor(images=rgb_image, return_tensors="pt").to(device)

        with torch.no_grad():
            features = model.get_image_features(**model_inputs)

        # Unit-length vectors: L2-distance neighbours then rank like cosine similarity.
        unit_features = features / features.norm(p=2, dim=-1, keepdim=True)

        # Single image: squeeze drops the size-1 batch dim; .cpu() before .tolist().
        return unit_features.squeeze().cpu().tolist()
    except Exception as err:
        print(f"Error processing {image_path}: {err}")
        return None


In [None]:
# Read the `wider_face_files` feature group. A distinct handle name keeps the
# later `fg` (image_embeddings) from ever silently pointing at this source group.
files_fg = fs.get_feature_group("wider_face_files", version=1)
df = files_fg.read()
df.head()

In [None]:
# Keep only the image paths. `.copy()` makes df1 an independent frame, so adding
# the `embedding` column later neither warns (SettingWithCopyWarning) nor touches `df`.
df1 = df[['file_path']].copy()

In [None]:
# Preview the first rows of the file-path frame.
df1.head(5)

In [None]:
import time

# Embedding every image is the expensive step; time it so re-runs know the cost.
start = time.perf_counter()

# `assign` returns a new frame rather than writing into a possible slice of `df`.
df1 = df1.assign(embedding=df1["file_path"].apply(get_image_embedding))

print(f"Execution time: {time.perf_counter() - start:.4f} seconds")

# get_image_embedding returns None for images it could not process; keep those
# rows out of the vector-indexed feature group so every stored row has a vector.
failed = df1["embedding"].isna()
if failed.any():
    print(f"Dropping {int(failed.sum())} image(s) whose embedding failed")
    df1 = df1[~failed]

df1.head()

In [None]:
from hopsworks import hsfs

# Width of CLIP's projected image features; the vector index must use the same size.
embedding_dim = model.config.projection_dim

# Register the `embedding` column as a vector index linked to the registered CLIP model.
embedding_index = hsfs.embedding.EmbeddingIndex()
embedding_index.add_embedding(
    "embedding",
    dimension=embedding_dim,
    model=model_mr,
)

# Get image_embeddings v1 or create it: keyed by file_path, online-enabled so
# the predictor can run nearest-neighbour lookups against it.
fg = fs.get_or_create_feature_group(
    name="image_embeddings",
    version=1,
    primary_key=["file_path"],
    online_enabled=True,
    description="image embeddings",
    embedding_index=embedding_index,
)


In [None]:
# Write the file_path/embedding rows into the image_embeddings feature group `fg`.
fg.insert(features=df1)

In [1]:
%%writefile predict_similar_images.py

import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
import hopsworks
import base64
import io
import os
import random
from typing import Dict
import kserve

def get_image_embedding(image: Image.Image, processor, model, device=None):
    """Return the L2-normalised CLIP embedding of ``image`` as a list of floats.

    Args:
        image: RGB PIL image to embed.
        processor: CLIP processor that converts the image into model inputs.
        model: CLIP model exposing ``get_image_features``.
        device: Device for the model inputs. Defaults to ``model.device`` so the
            inputs always land next to the weights (this function previously read
            an undefined module-level ``device`` and failed on every call).

    Returns:
        The embedding as a list of floats, or None (after printing the error)
        if preprocessing or inference fails.
    """
    try:
        if device is None:
            device = model.device
        inputs = processor(images=image, return_tensors="pt").to(device)
        with torch.no_grad():
            embedding = model.get_image_features(**inputs)
        # Unit-length vectors: L2-distance neighbours then rank like cosine similarity.
        embedding = embedding / embedding.norm(p=2, dim=-1, keepdim=True)
        return embedding.squeeze().cpu().tolist()
    except Exception as e:
        print(f"Error computing embedding: {e}")
        return None


class ImageEmbeddingPredictor(kserve.Model):
    """Find images similar to a query image via the `image_embeddings` feature group.

    Requests carry a base64-encoded image under ``"image"``; responses list the
    nearest stored file paths together with their base64-encoded contents.
    """

    def __init__(self, name: str = "similarimages"):
        # `name` used to be an undefined global here (NameError at startup); the
        # default matches the deployment name used in the notebook.
        super().__init__(name)
        self.image_dir = "/hopsfs/Jupyter/yolov8-face/data/widerface/train"
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # One login; keep both handles on the instance (fs/mr were previously
        # referenced as an undefined global and an unset attribute).
        project = hopsworks.login()
        self.fs = project.get_feature_store()
        self.mr = project.get_model_registry()
        self.fg = self.fs.get_feature_group("image_embeddings", version=1)

        model_dir = os.environ["MODEL_FILES_PATH"]
        # Put the weights on self.device; predict passes the same device for inputs.
        self.model = CLIPModel.from_pretrained(model_dir).to(self.device)
        self.processor = CLIPProcessor.from_pretrained(model_dir)
        self.model_mr = self.mr.get_model("openaiclip_vit_base_patch32", version=1)

        # Report ready only after every resource above has loaded.
        self.ready = True
        print("Initialization Complete")

    def predict(self, request: Dict, headers: Dict[str, str] = None) -> Dict:
        """Return the ``k`` stored images nearest to the request image.

        Args:
            request: Dict with ``"image"`` (base64-encoded image bytes) and an
                optional ``"k"`` (number of neighbours, default 3).
            headers: Request headers; not used.

        Returns:
            ``{"file_names": [...], "images": [...]}`` with base64-encoded file
            contents, or ``{"error": message}`` on any failure.
        """
        try:
            image_bytes = base64.b64decode(request["image"])
            image = Image.open(io.BytesIO(image_bytes)).convert("RGB")

            embedding = get_image_embedding(image, self.processor, self.model, self.device)
            if embedding is None:
                return {"error": "Could not compute an embedding for the input image"}

            results = self.fg.find_neighbors(embedding, k=int(request.get("k", 3)))

            returned_images = []
            returned_files = []
            for result in results:
                # result[1] holds the neighbour's feature values; index 0 is taken
                # as file_path (presumably the group's first column) — TODO confirm.
                file_path = result[1][0]
                with open(file_path, "rb") as f:
                    returned_images.append(base64.b64encode(f.read()).decode("utf-8"))
                returned_files.append(file_path)
            return {
                "file_names": returned_files,
                "images": returned_images,
            }

        except Exception as e:
            return {"error": str(e)}


# NOTE(review): Hopsworks predictor scripts are documented to expose a class named
# `Predict`; this alias lets the deployment locate the predictor. Confirm for your version.
Predict = ImageEmbeddingPredictor

Overwriting predict_similar_images.py


In [6]:
# Upload the predictor script next to the model files, replacing any older copy.
dataset_api = proj.get_dataset_api()
uploaded_file_path = dataset_api.upload(
    "predict_similar_images.py",
    model_mr.model_files_path,
    overwrite=True,
)

# Project-qualified path of the uploaded script, handed to the deployment below.
predictor_script_path = os.path.join("/Projects", proj.name, uploaded_file_path)

# Deploy the CLIP similar-image predictor.
# NOTE(review): when a deployment named "similarimages" already exists, the output
# shows the existing one is returned — a re-uploaded script may not take effect
# until that deployment is updated/restarted. Confirm with your Hopsworks version.
deployment = model_mr.deploy(
    name="similarimages",
    script_file=predictor_script_path,
)

Uploading /hopsfs/Jupyter/yolov8-face/predict_similar_images.py: 0.000%|          | 0/2510 elapsed<00:00 remai…

Deployment with the same name already exists. Getting existing deployment...
To create a new deployment choose a different name.


In [None]:
# Display the `deployment` handle (its notebook repr) for inspection.
deployment

In [None]:
import base64
import io
import os

import requests
import streamlit as st
from PIL import Image

# NOTE(review): Streamlit apps render only under `streamlit run <file>.py`; executed
# as a notebook cell these widgets do nothing useful. Consider `%%writefile app.py`.

# KServe endpoint; set the KSERVE_URL environment variable to replace the placeholder.
# NOTE(review): the path names model "image-embedder" while the notebook deploys
# "similarimages" — confirm the real URL and any auth header the endpoint requires.
KSERVE_URL = os.environ.get(
    "KSERVE_URL", "http://<KSERVE_ENDPOINT>/v1/models/image-embedder:predict"
)
# Seconds to wait for the server; without a timeout requests can block forever.
REQUEST_TIMEOUT_S = 60

st.title("🔍 Similar Image Finder")

uploaded_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])

if uploaded_file is not None:
    # Show the uploaded image
    image = Image.open(uploaded_file).convert("RGB")
    st.image(image, caption="Uploaded Image", use_column_width=True)

    # Re-encode as PNG and base64 it: the predictor reads {"image": <base64 str>}.
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    b64_image = base64.b64encode(buffered.getvalue()).decode("utf-8")

    if st.button("Find Similar Images"):
        with st.spinner("Querying KServe model..."):
            payload = {"image": b64_image}
            try:
                response = requests.post(KSERVE_URL, json=payload, timeout=REQUEST_TIMEOUT_S)
                result = response.json() if response.status_code == 200 else None
            except (requests.RequestException, ValueError) as e:
                # Network failure, timeout, or a 200 response whose body is not JSON.
                st.error(f"Request failed: {e}")
            else:
                if result is None:
                    st.error(f"Request failed: {response.status_code}")
                elif "error" in result:
                    st.error(result["error"])
                else:
                    st.success("Found similar images:")
                    for fname, b64_img in zip(result.get("file_names", []), result.get("images", [])):
                        img_bytes = base64.b64decode(b64_img)
                        sim_img = Image.open(io.BytesIO(img_bytes))
                        st.image(sim_img, caption=fname, use_column_width=True)
