### Introduction
This project demonstrates identification of food items from a food tray using open-source zero shot object detection models without finetuning, open-source embeddings models without finetuning and FAISS as the vector database. All models run locally on an Intel Core platform.

Core pipeline consists of:
1. Metadata and Image extraction from menu PDF document
2. FastSAM for zero shot object detection
3. Custom filter function to reduce false positives in FastSAM
4. OpenVINO for optimized model inference on Intel platforms
5. CLIP model for main embeddings
6. Custom augmentation function to increase the number of embeddings, since the reference data is limited
7. Image identification using CLIP and FAISS (open source models without finetuning)
8. Synthesis of vector DB retrieved data using an LVM (MiniCPM)

### Requirements

In [None]:
%pip install -q faiss-cpu Pillow torch torchvision transformers supervision --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q ultralytics pdfplumber
%pip install -q git+https://github.com/openai/CLIP.git
%pip install -q ipywidgets
%pip install -q openvino openvino-genai openvino-tokenizers nncf
%pip install -q timm sentencepiece peft
%pip install -q "git+https://github.com/huggingface/optimum-intel.git" --extra-index-url https://download.pytorch.org/whl/cpu

### Extract metadata, images from PDF

This section extracts all the images from the Product List PDF file into individual image files. These are used as the main embeddings for the image search. All associated metadata is also extracted to create a `product_data.json` file, which is needed to map search results to their associated product info.

In [None]:
%%time
import json
import pdfplumber
import os

# Maps item key (page index minus one) -> {"metadata": {...}, "image_path": ...}.
data = {}

# exist_ok makes the cell safely re-runnable without a separate existence check.
os.makedirs("extracted", exist_ok=True)

# Open the PDF in a context manager so its file handle is released even if
# extraction fails part-way through.
with pdfplumber.open("product_list.pdf") as pdf:
    for i, page in enumerate(pdf.pages):
        # The first page is skipped (presumably a cover/index page - confirm
        # against the PDF).
        if i == 0:
            continue

        item_key = i - 1
        data[item_key] = {"metadata": {"product_names": [], "product_codes": []}}
        table = page.extract_table()

        # The file name ends in "-<item_key>.jpg"; later cells sort on that
        # trailing number, so the scheme must stay as is.
        # NOTE(review): every image on a page is written to the same path, so
        # only the last image of a page survives.
        image_path = f"extracted/page_{i+1}_image-{i-1}.jpg"
        for image in page.images:
            with open(image_path, "wb") as handle:
                handle.write(image["stream"].get_data())
            data[item_key]["image_path"] = image_path

        if table is None:
            continue

        # Item 42 is read from columns 1 and 2 instead of 0 and 1 (presumably
        # its table carries an extra leading column - confirm against the PDF).
        code_col = 1 if item_key == 42 else 0
        metadata = data[item_key]["metadata"]
        for row in table:
            if row:
                metadata["product_codes"].append(row[code_col])
                metadata["product_names"].append(row[code_col + 1])

# Integer keys are serialised as strings by json.dump; later cells look items
# up with str(index).
with open("product_data.json", "w", encoding="utf-8") as handle:
    json.dump(data, handle, indent=4)

### FastSAM for ROI

We use FastSAM for zero-shot object detection since it combines the benefits of YOLO and SAM. The model is converted to the OpenVINO IR representation so that it can run efficiently on an Intel GPU.

In [None]:
%%time
import ipywidgets as widgets
from pathlib import Path

import openvino as ov
import torch
from PIL import Image
from ultralytics import FastSAM

# Checkpoint name handed to the FastSAM constructor; a later cell reuses it to
# build the path of the OpenVINO export.
model_name = "FastSAM-x"
# Load FastSAM through the ultralytics wrapper.
model = FastSAM(model_name)

In [None]:
import numpy as np

def filter_boxes(all_boxes, shape, max_area_thr_percent=35, min_area_thr_percent=1.5, min_wh_thr_percent=9, max_wh_thr_percent=80):
    """
    Keep only the bounding boxes whose size is plausible for the image.

    FastSAM returns many extraneous boxes that interfere with the image search.
    A box survives only if its area, width and height all lie strictly inside
    the percentage bands given by the threshold parameters, which can be tuned
    per dataset.

    Args:
        all_boxes: Object exposing a ``data`` attribute whose rows start with
            ``x1, y1, x2, y2``. ``data`` is overwritten with a NumPy array
            holding the surviving rows.
        shape: ``(height, width)`` of the image the boxes belong to.
        max_area_thr_percent: Upper area bound, in percent of the image area.
        min_area_thr_percent: Lower area bound, in percent of the image area.
        min_wh_thr_percent: Lower width/height bound, in percent of the image
            width/height.
        max_wh_thr_percent: Upper width/height bound, in percent of the image
            width/height.

    Returns:
        The same ``all_boxes`` object that was passed in.
    """
    rows = np.array(all_boxes.data)
    x1, y1, x2, y2 = (rows[:, k] for k in range(4))
    box_w = x2 - x1
    box_h = y2 - y1
    box_area = box_w * box_h

    # Absolute limits derived from the percentage thresholds.
    image_area = shape[0] * shape[1]
    area_floor = (min_area_thr_percent / 100) * image_area
    area_ceiling = (max_area_thr_percent / 100) * image_area
    min_side_frac = min_wh_thr_percent / 100
    max_side_frac = max_wh_thr_percent / 100

    area_ok = (box_area > area_floor) & (box_area < area_ceiling)
    height_ok = (box_h < max_side_frac * shape[0]) & (box_h > min_side_frac * shape[0])
    width_ok = (box_w < max_side_frac * shape[1]) & (box_w > min_side_frac * shape[1])

    all_boxes.data = rows[area_ok & height_ok & width_ok]
    return all_boxes

Run a sample image through FastSAM to initialize the model. Notice the time inference takes without OpenVINO.

In [None]:
%%time
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image

# Sample tray image; running it once also initialises the FastSAM predictor.
image_uri = Path("img/image_20250224_135438.jpg")

# Inference on CPU without OpenVINO, timed for comparison with the optimized run.
results = model(image_uri, device="cpu", conf=0.65, iou=0.3)
first_result = results[0]
shape = first_result.orig_shape
bboxes = first_result.boxes
print(f"Total num of boxes: {len(bboxes)} with model shape {shape}")

# Drop boxes outside the size thresholds and push the survivors back into the result.
filtered_boxes = filter_boxes(bboxes, shape)
print(f"Total num of boxes after filter: {len(filtered_boxes)}")
first_result.update(boxes=filtered_boxes.data)

# Reverse the channel axis of the plotted array before handing it to PIL
# (the plot output is treated as BGR elsewhere in this notebook).
annotated = first_result.plot(masks=False)
Image.fromarray(annotated[..., ::-1])

### OpenVINO format conversion

In [None]:
%%time

### Reference: https://github.com/openvinotoolkit/openvino_notebooks/blob/latest/notebooks/fast-segment-anything/fast-segment-anything.ipynb

# OpenVINO device the converted model will be compiled for.
device = "GPU"
# Expected location of the exported IR, built from the checkpoint name.
ov_model_path = Path(f"{model_name}_openvino_model") / f"{model_name}.xml"
# Export only when the IR is not on disk yet; later runs reuse the existing files.
if not ov_model_path.exists():
    ov_model = model.export(format="openvino", dynamic=False, half=False)

class OVWrapper:
    """Callable adapter around a compiled OpenVINO model.

    An instance is assigned to ``model.predictor.model`` further down in the
    notebook, replacing the predictor's original backend.
    """

    def __init__(self, ov_model, device="CPU", stride=32, ov_config=None) -> None:
        """Compile *ov_model* for *device* using the notebook-level ``core``.

        Args:
            ov_model: Model (or path to one) accepted by ``core.compile_model``.
            device: OpenVINO device string.
            stride: Stored unchanged on the instance.
            ov_config: Optional compile properties; a falsy value means none.
        """
        compile_config = ov_config if ov_config else {}
        # ``core`` is looked up at call time, so it must exist before instantiation.
        self.model = core.compile_model(ov_model, device, compile_config)

        # Plain attributes; presumably read by the ultralytics predictor - TODO confirm.
        self.names = {0: "object"}
        self.fp16 = False
        self.pt = False
        self.stride = stride

    def __call__(self, im, **_):
        """Run inference on *im* and return the first two outputs as torch tensors."""
        outputs = self.model(im)
        first, second = outputs[0], outputs[1]
        return torch.from_numpy(first), torch.from_numpy(second)

ov_config = {}
core = ov.Core()
# With several adapters OpenVINO enumerates GPUs as "GPU.0", "GPU.1", ...,
# so an exact "GPU" membership test would miss them; match on the prefix.
gpu_available = any(name.startswith("GPU") for name in core.available_devices)
if "GPU" in device or ("AUTO" in device and gpu_available):
    # Compile property applied whenever the model may land on a GPU.
    ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

In [None]:
ov_model_path = Path(f"{model_name}_openvino_model") / f"{model_name}.xml"
print(device)

# The predictor only exists after the model has been run once; its current
# backend supplies the stride before being replaced by the OpenVINO wrapper.
predictor = model.predictor
wrapped_model = OVWrapper(ov_model_path, device=device, stride=predictor.model.stride, ov_config=ov_config)
predictor.model = wrapped_model

Now run the input image on the openVINO optimized model to see the inference speed difference

In [None]:
%%time
# Same sample image as the baseline run, now served by the OpenVINO backend.
image_uri = Path("img/image_20250224_135438.jpg")
print(device)

ov_results = model(image_uri, device=device, conf=0.65, iou=0.3)
first_result = ov_results[0]
shape = first_result.orig_shape
bboxes = first_result.boxes
print(f"Total num of boxes: {len(bboxes)} with model shape {shape}")

# Apply the same size filter as in the baseline run and write the survivors back.
filtered_boxes = filter_boxes(bboxes, shape)
print(f"Total num of boxes after filter: {len(filtered_boxes)}")
first_result.update(boxes=filtered_boxes.data)

# Reverse the channel axis of the plotted array before handing it to PIL.
annotated = first_result.plot(masks=False)
Image.fromarray(annotated[..., ::-1])

### Run OD on all sample inference images

The confidence threshold, IoU threshold can be configured as per dataset requirements.

In [None]:
import math 
import matplotlib.pyplot as plt
import os
import torch

img_folder = "img"
# Sorted so the processing and plotting order is deterministic.
ref_paths = sorted(
    os.path.join(img_folder, f)
    for f in os.listdir(img_folder)
    if f.endswith(('.png', '.jpg', '.jpeg'))
)

# One entry per input image: the list of results returned by the model call.
all_res = []
for img_path in ref_paths:
    ov_results = model(img_path, device=device, conf=0.65, iou=0.3)

    shape = ov_results[0].orig_shape
    bboxes = ov_results[0].boxes

    print(f"Total num of boxes: {len(bboxes)} with model shape {shape}")
    filtered_boxes = filter_boxes(bboxes, shape)
    print(f"Total num of boxes after filter: {len(filtered_boxes)}\n")

    # filter_boxes leaves a NumPy array in .data; hand it back as a tensor.
    ov_results[0].update(boxes=torch.from_numpy(filtered_boxes.data))
    all_res.append(ov_results)

num_images = len(all_res)
cols = 3
if num_images:
    rows = math.ceil(num_images / cols)
    # squeeze=False always returns a 2-D axes array, whatever the grid size.
    fig, ax = plt.subplots(rows, cols, figsize=(15, 5 * rows), squeeze=False)
    ax = ax.flatten()

    for i, r in enumerate(all_res):
        im_bgr = r[0].plot(masks=False)
        im_rgb = Image.fromarray(im_bgr[..., ::-1])

        ax[i].imshow(im_rgb)
        # Read the path from the result itself: indexing it again (r[0][0])
        # selects the first detection and fails when no box survived the filter.
        ax[i].set_title(f"Image {i+1}-{os.path.basename(r[0].path)}")

    # Hide the unused cells of the last grid row.
    for spare in ax[num_images:]:
        spare.axis("off")
else:
    # plt.subplots cannot build a grid with zero rows.
    print(f"No input images found in '{img_folder}'")

### Create CLIP embeddings and store in FAISS

We use a combination of CLIP (ViT-B/32) and FAISS for the image search.

In [None]:
import os
dir_name = "cropped"
os.makedirs(dir_name, exist_ok=True)

# Save the detected regions of every image under cropped/<image stem>/.
for each in all_res:
    for result in each:
        # basename/splitext cope with either path separator and with dots
        # inside the file name, unlike splitting on '.' and '/'.
        stem = os.path.splitext(os.path.basename(result.path))[0]
        result.save_crop(save_dir=f"{dir_name}/{stem}", file_name="detection")

In [None]:
import base64
import os
from io import BytesIO
import cv2
import faiss
import numpy as np
import torch
import clip
from PIL import Image
import json
import supervision as sv

### Data Augmentations

Since there are only a few main embeddings, we apply augmentations to increase their number.
This gives better search results: the quality of the search improves as the number of main embeddings grows.

In [None]:
import numpy as np
import albumentations as A
from PIL import Image
from IPython.display import display
from pathlib import Path

# Write two rotated copies (90 and -90 degrees) of every reference image back
# into the same folder, so each product has more embeddings.
# NOTE(review): re-running this cell lists the already augmented files as well
# and rotates them again.
image_folder = "extracted"
image_paths = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith((".png", ".jpg"))]
# Starts at the number of existing images and is bumped once per processed
# image, so `i + count` advances by two per iteration: the copies get the
# trailing numbers N + 2*i and N + 2*i + 1 (N = initial image count).
count = len(image_paths)

# Iterate in order of the number between the last '-' and the extension.
for i, path in enumerate(sorted(image_paths, key=lambda x: int(x.split("-")[-1].split(".")[0]))):
    try:
        image = Image.open(path)
    except IOError:
        # NOTE(review): a skipped image leaves `count` untouched while `i`
        # still advances, which leaves a gap in the numbering.
        print("Error loading:", path)
        continue

    # File stem without the trailing "-<number>" part.
    name = Path(path).stem.split("-")[0]
    rotated_90 = image.rotate(90, expand=True)
    # The leading "<i>_" records which source image this copy came from; the
    # metadata-update cell parses it back out of the file name.
    rotated_90.save(f"{image_folder}/{i}_{name}-{i + count}.jpg")

    count += 1
    rotated_270 = image.rotate(-90, expand=True)
    rotated_270.save(f"{image_folder}/{i}_{name}-{i + count}.jpg")

### Load CLIP and populate the vector database

In [None]:
%%time
# NOTE(review): this rebinds the notebook-level `device`, which the detection
# cells set to the OpenVINO device string "GPU"; re-running those cells after
# this one passes "cuda"/"cpu" to them instead.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load CLIP ViT-B/32 together with its preprocessing transform.
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)

In [None]:
def get_image_embedding(image: Image.Image) -> np.ndarray:
    """Return the CLIP image embedding of *image* as a flat NumPy vector.

    The preprocessed tensor is moved to the device ``clip_model`` actually
    lives on instead of the notebook-level ``device`` name, because other
    cells rebind ``device`` to OpenVINO strings such as ``"GPU"`` that torch
    does not accept.

    Args:
        image: PIL image to embed.

    Returns:
        1-D array with the output of ``clip_model.encode_image`` for the image.
    """
    model_device = next(clip_model.parameters()).device
    batch = clip_preprocess(image).unsqueeze(0).to(model_device)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        embedding = clip_model.encode_image(batch)
    return embedding.cpu().numpy().flatten()

In [None]:
%%time
# L2 index for 512-dimensional vectors.
clip_index = faiss.IndexFlatL2(512)

image_folder = "extracted"
image_paths = [
    os.path.join(image_folder, entry)
    for entry in os.listdir(image_folder)
    if entry.endswith((".png", ".jpg"))
]
file_names = []


def _trailing_index(file_path):
    """Return the integer between the last '-' and the following '.' of *file_path*."""
    return int(file_path.split("-")[-1].split(".")[0])


# Vectors are added in trailing-number order, so a vector's position in the
# index follows that order (as long as no image is skipped).
for i, path in enumerate(sorted(image_paths, key=_trailing_index)):
    try:
        frame = Image.open(path)
    except IOError:
        print("Error loading:", path)
    else:
        embedding = get_image_embedding(frame)
        clip_index.add(np.array([embedding]).astype(np.float32))

faiss.write_index(clip_index, "clip_index.bin")
print("FAISS Index saved!")

Update the metadata JSON file with entries for the new augmented images and their data.

In [None]:
import json

# Read with the same encoding the file is written with.
with open("product_data.json", "r", encoding="utf-8") as handle:
    product_data = json.load(handle)

image_folder = "extracted"
image_paths = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith((".png", ".jpg"))]

# Give every image that has no entry yet (the augmented copies) the metadata
# of the image it was derived from. Keys are the position of the file in
# trailing-number order, as strings.
new_data = product_data.copy()
for i, path in enumerate(sorted(image_paths, key=lambda x: int(x.split("-")[-1].split(".")[0]))):
    if new_data.get(str(i)) is None:
        # Augmented files are named "<source index>_<name>-<number>.jpg".
        # Taking the basename first works with either path separator.
        ref_path = os.path.basename(path).split("_")[0]
        new_data[str(i)] = product_data[ref_path]
    print(i, new_data[str(i)]["image_path"])

with open("product_data.json", "w", encoding="utf-8") as handle:
    json.dump(new_data, handle, indent=4)

### Inference and Image Search Test with CLIP

In [None]:
%%time
import os
import json
import pathlib
import faiss


query_folder = "cropped"
img_folder = "img"
# Rebuilt for every tray image inside the loop below.
query_paths = []

image_folder = "extracted"
image_paths = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith((".png", ".jpg"))]
# Same ordering as used when the index was built, so an index position maps
# back to its reference image.
sorted_imgs = sorted(image_paths, key=lambda x: int(x.split("-")[-1].split(".")[0]))

clip_index = faiss.read_index("clip_index.bin")

with open("product_data.json", "r", encoding="utf-8") as f:
    product_data = json.load(f)

# Matches with an L2 distance at or above this value are discarded.
max_distance_thres = 75
# Maps input image path -> metadata of the best match for each crop.
final_results = {}

for sub in sorted(os.listdir(query_folder)):
    # Only folders created for tray images are processed.
    if not sub.startswith("image_"):
        continue

    subf = pathlib.Path(os.path.join(query_folder, sub))

    # The detection step accepts .jpg, .jpeg and .png inputs, so look up the
    # extension that actually exists instead of assuming .jpg.
    input_path = next(
        (
            f"{img_folder}/{sub}{ext}"
            for ext in (".jpg", ".jpeg", ".png")
            if os.path.exists(f"{img_folder}/{sub}{ext}")
        ),
        f"{img_folder}/{sub}.jpg",
    )
    print(f"Input image: {input_path}")
    final_results[input_path] = []

    input_img = cv2.imread(input_path)
    if input_img is None:
        # cv2.imread returns None for a missing or unreadable file.
        print(f"Could not read input image: {input_path}")
    else:
        sv.plot_image(input_img, (5, 5))

    print("\n")
    print("Matched items and top item's metadata:")

    # Ignore ipynb project entries; sort for a reproducible result order.
    query_paths = sorted(str(p) for p in subf.rglob("*.jpg") if "ipynb" not in str(p))
    for query_path in query_paths:
        with Image.open(query_path) as image:
            query_embedding = get_image_embedding(image)
        dist, ind = clip_index.search(np.array([query_embedding]).astype(np.float32), 4)

        filtered_images = []
        results = []
        for idx, distance in zip(ind[0], dist[0]):
            if distance < max_distance_thres:
                filtered_images.append(cv2.imread(sorted_imgs[idx]))
                if str(idx) in product_data:
                    results.append(product_data[str(idx)])

        # Only the first (closest) match is recorded for the crop.
        if results:
            print(results[0])
            final_results[input_path].append(results[0])

        query_image = cv2.imread(query_path)
        sv.plot_image(query_image, (2, 2))

        if filtered_images:
            sv.plot_images_grid(filtered_images, grid_size=(3,4), size=(6,6))
        else:
            print("No matches found")

with open("result.json", "w", encoding="utf-8") as handle:
    json.dump(final_results, handle, indent=4)

### LVM Synthesis using MiniCPM V2.6

Here we use MiniCPM to synthesize the vector DB results. We use an openVINO optimized version for best performance on an Intel GPU. This model can be switched to another LVM of choice. 

In [None]:
#export HF_TOKEN="<insert HF token here>"
#!optimum-cli export openvino -m openbmb/MiniCPM-o-2_6 --trust-remote-code --weight-format int4 minicpm_int4

In [None]:
import openvino_genai as ov_genai
from pathlib import Path

# NOTE(review): rebinds the notebook-level `device` (and `ov_model` below),
# names that earlier cells also use.
device = "GPU"
# Placeholder - replace with the directory holding the exported OpenVINO model
# (presumably the output folder of the optimum-cli command in the previous cell).
model_dir = Path("<path to minicpm OV dir>")

# Build the OpenVINO GenAI vision-language pipeline on the chosen device.
ov_model = ov_genai.VLMPipeline(model_dir, device=device)

In [None]:
import requests
from PIL import Image
from io import BytesIO
import numpy as np
import openvino as ov

# Generation settings passed to the pipeline's generate() call further down;
# max_new_tokens is set to 500.
config = ov_genai.GenerationConfig()
config.max_new_tokens = 500

def load_image(image_file):
    """Load an image and wrap its pixels in an OpenVINO tensor.

    Args:
        image_file: Path or file object accepted by ``Image.open``.

    Returns:
        Tuple ``(image, tensor)``: the RGB PIL image and an ``ov.Tensor`` built
        from a ``(1, height, width, 3)`` uint8 array of its pixels.
    """
    image = Image.open(image_file).convert("RGB")
    # 8-bit RGB pixels are unsigned: casting to np.byte (int8) turned values of
    # 128 and above into negatives. Converting the image directly also avoids
    # the slow per-pixel getdata() round trip. [np.newaxis] adds the batch axis.
    image_data = np.array(image, dtype=np.uint8)[np.newaxis, ...]
    return image, ov.Tensor(image_data)

def streamer(subword: str) -> bool:
    """Print each generated chunk as soon as it arrives.

    Args:
        subword: Text chunk produced by the pipeline.

    Returns:
        Always ``False``, matching the declared ``bool`` return type.
        NOTE(review): by the OpenVINO GenAI streamer convention a true value
        asks the pipeline to stop generating - confirm against the installed
        version.
    """
    print(subword, end="", flush=True)
    return False

In [None]:
import json
from IPython.display import display


# Load the per-image search results written by the image-search cell.
with open("result.json", "r") as handle:
    final_results = json.load(handle)

# Prompt header; the matched items are appended below as context.
question = """
As a food analyzer, please analyze this image. Please use the context for reference and provide a summary with product names and product codes. Only use the items listed in the context. Do not make up new items.
        
Context:
"""
# Only the first entry of the results is turned into a prompt: the loop is
# left after one pass.
for image_path, entries in final_results.items():
    image, image_tensor = load_image(image_path)
    display(image)

    context_items = [f"""
        Item {position}: {entry}""" for position, entry in enumerate(entries, start=1)]
    question += "".join(context_items)
    break
print(question)


In [None]:
%%time

ov_model.start_chat()
try:
    output = ov_model.generate(question, image=image_tensor, generation_config=config, streamer=streamer)
finally:
    # Close the chat session so re-running this cell does not carry the
    # previous conversation over into the next prompt.
    ov_model.finish_chat()