In [None]:
# Environment setup: install OpenCLIP (the `open_clip` module imported further down).
!pip install open_clip_torch



In [None]:
from openai import OpenAI
import os
from getpass import getpass

# Prompt for the OpenAI API key at runtime (getpass does not echo the input),
# so the key is never written into the notebook source.
# Keys can be obtained from https://openai.com/api/
api_key = getpass("Enter your OpenAI API key: ")

# Shared OpenAI client, passed to the GPT helper functions by predict().
gpt_client = OpenAI(api_key=api_key)


Enter your OpenAI API key: ··········


In [None]:
import numpy as np
import torch

# CLIP Ranking

def clip_image_embed(pil_img, clip_model, clip_preprocess):
    """
    Convert an image into an L2-normalized CLIP image embedding.

    Args:
        pil_img: Image accepted by ``clip_preprocess`` (a PIL image in this notebook).
        clip_model: Model exposing ``encode_image``.
        clip_preprocess: Callable that turns ``pil_img`` into a tensor; a batch
            dimension is added in front of whatever it returns.

    Returns:
        The output of ``clip_model.encode_image`` for a batch of one image,
        divided by its L2 norm along the last dimension.
    """
    # Run on the device the model lives on instead of assuming CPU, so the
    # function keeps working if the model is moved to an accelerator.
    # Objects without parameters fall back to CPU, the previous behaviour.
    first_param = None
    if hasattr(clip_model, "parameters"):
        first_param = next(iter(clip_model.parameters()), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Preprocess, add the batch dimension, and move to the model's device
    img_in = clip_preprocess(pil_img).unsqueeze(0).to(device)

    with torch.no_grad():  # inference only: no autograd graph needed
        img_emb = clip_model.encode_image(img_in)
        # Unit-normalize so that dot products with normalized text embeddings are cosine similarities
        return img_emb / img_emb.norm(dim=-1, keepdim=True)


def rank_items_from_embedding(img_emb, text_emb, menu_items, item_pointers):
    """
    Rank menu items by similarity between an image embedding and text embeddings.

    Each menu item may have several text variants; an item's score is the
    maximum similarity over its variants. Scores are z-score standardized
    and passed through a softmax.

    Args:
        img_emb: Torch tensor representing the image embedding (1 x D).
        text_emb: Torch tensor representing all text embeddings (N x D).
        menu_items: List of menu item names.
        item_pointers: Sequence/array of length N mapping each text embedding
            back to its index in ``menu_items``.

    Returns:
        A tuple ``(sorted_indices, sorted_probs)`` where:
        - sorted_indices: indices into ``menu_items``, most similar first.
        - sorted_probs: probabilities in that same sorted order, i.e.
          ``sorted_probs[k]`` belongs to ``menu_items[sorted_indices[k]]``.
        Items that have no text variant get probability 0 and sort last.
        An empty ``menu_items`` yields two empty arrays.
    """
    n_items = len(menu_items)
    if n_items == 0:
        return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.float32)

    # Similarity between the image and every text embedding
    # (cosine similarity when both inputs are unit-normalized)
    sims = (img_emb @ text_emb.T).squeeze(0)

    # Move tensor to CPU and convert to numpy for easy indexing
    sims_cpu = sims.detach().float().cpu().numpy()

    # Accept plain lists as well as arrays; `list == int` would not broadcast
    pointers = np.asarray(item_pointers)

    # For each menu item, keep the maximum similarity over its text variants
    per_item = np.zeros(n_items, dtype=np.float32)
    has_variant = np.zeros(n_items, dtype=bool)
    for i in range(n_items):
        variant_sims = sims_cpu[pointers == i]
        if variant_sims.size:  # .max() on an empty selection would raise
            per_item[i] = variant_sims.max()
            has_variant[i] = True

    probs = np.zeros(n_items, dtype=np.float32)
    if has_variant.any():
        scores = per_item[has_variant]
        # Standardize scores (z-score normalization) to stabilize scaling
        z = (scores - scores.mean()) / (scores.std() + 1e-6)
        # Softmax; subtracting the max leaves the result unchanged but avoids overflow
        exp_z = np.exp(z - z.max())
        probs[has_variant] = exp_z / exp_z.sum()

    # Stable sort keeps the original menu order among ties
    sorted_indices = np.argsort(-probs, kind="stable")
    sorted_probs = probs[sorted_indices]

    return sorted_indices, sorted_probs


In [None]:
import base64
from io import BytesIO
import json

from typing import List
from pydantic import BaseModel, confloat

# Structured-output schema for one menu item that the classifier reports as
# present on the plate.
class ClassifiedItem(BaseModel):
    id: int  # menu item id; expected to match an id from the candidate list sent to the model
    name: str  # menu item name
    confidence: confloat(ge=0.0, le=1.0)  # constrained to the range [0, 1]

# Top-level schema passed as `text_format` in gpt_item_classification.
class ClassificationResult(BaseModel):
    items: List[ClassifiedItem]   # chosen items: [{id, name, confidence}, ...]
    explanation: str  # short rationale requested by the prompt

def image_to_base64(img):
    """
    Convert a PIL Image to a base64 JPEG data URI for the OpenAI API.

    Args:
        img: PIL image. Modes that the JPEG writer cannot store (for example
            RGBA, LA or palette images) are converted to RGB first; the
            caller's image object is not modified.

    Returns:
        A string of the form ``data:image/jpeg;base64,<payload>``.
    """
    # JPEG has no alpha channel or palette; saving such an image raises an
    # error in Pillow, so convert it. Modes the writer already accepts are
    # left untouched to keep the encoded bytes unchanged for them.
    if img.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        img = img.convert("RGB")

    buffer = BytesIO()
    img.save(buffer, format="JPEG")
    b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{b64}"

# gpt model 1, item classification
def gpt_item_classification(pil_img, items_ranked, client, model="gpt-5-mini"):
    """
    Ask a GPT model which of the ranked candidate menu items are on the plate.

    Args:
        pil_img: Plate image; sent to the model as a base64 data URI.
        items_ranked: JSON-serializable list of candidate items (sent verbatim).
        client: OpenAI client exposing ``responses.parse``.
        model: Name of the model to query.

    Returns:
        The parsed ``ClassificationResult`` as a plain dict (``model_dump()``).
    """
    # Encode the image and the candidate list for the request
    encoded_image = image_to_base64(pil_img)
    candidates_json = json.dumps(items_ranked)

    # Instruction text, assembled from its individual lines
    instruction_lines = (
        "You are a food identification expert specializing in dining hall meals.\n\n",
        "You will be shown an image of a plate and a list of candidate menu items ranked ",
        "by visual similarity scores (highest to lowest).\n",
        "Your task: determine which of these items are actually present on the plate.\n\n",
        "Guidelines:\n",
        "- Use both the image and the ranking scores; scores are hints, not ground truth.\n",
        "- Focus only on visible foods; ignore background objects like trays or utensils.\n",
        "- Only choose from the provided menu item list — do not invent new items.\n\n",
        "Output Format (strict JSON):\n",
        '- "items": an array of objects, each with fields {"id", "name", "confidence"}\n',
        '- "confidence": a float in [0,1] for each chosen item\n',
        '- "explanation": a brief 1–2 sentence rationale describing your reasoning.\n',
    )
    instructions = "".join(instruction_lines)

    # Single user message: instructions, candidate list, then the image
    user_message = {
        "role": "user",
        "content": [
            {"type": "input_text", "text": instructions},
            {"type": "input_text", "text": candidates_json},
            {"type": "input_image", "image_url": encoded_image},
        ],
    }

    # Structured-output call: the reply is parsed into ClassificationResult
    parsed_response = client.responses.parse(
        model=model,
        input=[user_message],
        text_format=ClassificationResult,
    )
    return parsed_response.output_parsed.model_dump()



In [None]:
from typing import List
from pydantic import BaseModel, confloat

# Structured-output schema for the portion estimate of one detected item.
class PortionEstimate(BaseModel):
    id: int  # menu item id
    name: str  # menu item name
    num_servings: confloat(ge=0.0, le=10.0)  # number of servings, constrained to [0, 10]

# Top-level schema passed as `text_format` in gpt_portion_estimation.
class PortionEstimationResult(BaseModel):
    servings: List[PortionEstimate]  # one entry per detected item
    explanation: str  # short rationale requested by the prompt

# gpt model 2, portion estimation
def gpt_portion_estimation(pil_img, classification_result, client, model="gpt-5-mini"):
    """
    Ask a GPT model how many servings of each detected item are on the plate.

    Args:
        pil_img: Plate image; sent to the model as a base64 data URI.
        classification_result: JSON-serializable description of the detected
            items (in this notebook: the classification output with a
            ``nutrition`` entry added per item). Sent verbatim as JSON.
        client: OpenAI client exposing ``responses.parse``.
        model: Name of the model to query.

    Returns:
        The parsed ``PortionEstimationResult`` as a plain dict (``model_dump()``).
    """
    image_url = image_to_base64(pil_img)
    payload = json.dumps(classification_result, indent=2)

    # The field name in the FORMAT section must match the schema enforced via
    # `text_format` (PortionEstimate.num_servings); the prompt previously said
    # "estimated_servings", contradicting the schema the model has to follow.
    prompt_text = (
        "You are a nutrition analyst estimating portion sizes.\n\n"
        "INPUTS:\n"
        "1) An image of a single plate.\n"
        "2) A JSON of detected items with nutrition info, including serving size.\n\n"
        "TASK:\n"
        "- For each detected item, output ONLY the number of SERVINGS on the plate, as a decimal if needed.\n"
        "- If serving size uses EACH and shows a number N (e.g., '4 EACH'), then 1 serving = N pieces. "
        "  If you estimate P pieces on the plate, report servings = P / N.\n"
        "- For non-EACH units (g/oz/cup), estimate servings by dividing the visible amount by the serving size.\n"
        "- Do not output the piece count (P). Do not output words like 'tenders' or any free text other than the explanation field.\n"
        "- Do not invent items. Only return items present in the provided JSON.\n"
        "- Include the count in explanation for EACH type items.\n"
        "- If unsure, return your best conservative estimate.\n\n"
        "FORMAT (strict JSON):\n"
        "{\n"
        '  "servings": [ { "id": <int>, "name": "<str>", "num_servings": <float> }, ... ],\n'
        '  "explanation": "<1-2 short sentences>"\n'
        "}\n"
    )

    # Structured-output call: the reply is parsed into PortionEstimationResult
    response = client.responses.parse(
        model=model,
        input=[
            {
                "role": "user",
                "content": [
                    {"type": "input_text", "text": prompt_text},
                    {"type": "input_text", "text": payload},
                    {"type": "input_image", "image_url": image_url}
                ]
            }
        ],
        text_format=PortionEstimationResult
    )

    return response.output_parsed.model_dump()

In [None]:
import open_clip
# Load the CLIP model used for ranking once and reuse the same instance (~6 seconds on CPU)
model, _, preprocess = open_clip.create_model_and_transforms(
  "ViT-B-32", pretrained="laion2b_s34b_b79k", device='cpu'
)
# The model is only used for inference in this notebook; put it in eval mode so
# train-time layers (dropout, stochastic depth, batch norm) cannot affect embeddings.
model.eval()

# Tokenizer for the same architecture; turns the text prompts into token tensors in predict()
tokenizer = open_clip.get_tokenizer("ViT-B-32")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


open_clip_model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]

In [None]:
# HuskyEats REST API helpers: menu listing and per-item nutrition lookup
import requests

def get_menu_items(hallid, meal, date):
    """
    Fetch the menu for one dining hall, meal and date from the HuskyEats API.

    Raises an HTTP error (via ``raise_for_status``) on a non-success response.

    Returns:
        A list of ``{"name": ..., "id": ...}`` dicts; entries of the response
        that lack either key are dropped.
    """
    query = {"hallid": hallid, "meal": meal, "date": date}
    response = requests.get("https://husky-eats.onrender.com/api/menu", params=query)
    response.raise_for_status()

    menu = []
    for entry in response.json():
        if "name" in entry and "id" in entry:
            menu.append({"name": entry["name"], "id": entry["id"]})
    return menu

def get_nutrition_info(item_id):
    """
    Fetch the raw HuskyEats record for one menu item.

    Raises an HTTP error (via ``raise_for_status``) on a non-success response.

    Returns:
        The decoded JSON body of ``/api/menuitem/<item_id>``, unmodified.
    """
    endpoint = f"https://husky-eats.onrender.com/api/menuitem/{item_id!s}"
    response = requests.get(endpoint)
    response.raise_for_status()
    return response.json()

In [None]:
import time
import PIL.Image as Image
import open_clip
import numpy as np

# full pipeline
def predict(pil_image, dining_hall_id, meal, date):
  """
  Run the full MenuMatch pipeline on one plate image.

  Steps:
  1. Fetch the menu for the given dining hall, meal and date.
  2. Rank the menu items against the image with CLIP.
  3. Ask GPT which ranked items are actually on the plate.
  4. Fetch nutrition info for the chosen items.
  5. Ask GPT to estimate the number of servings per item.

  Relies on the notebook-level globals `model`, `preprocess`, `tokenizer`
  and `gpt_client`. Prints timing and intermediate results for each stage.

  Returns:
      The `servings` list of the portion-estimation result (dicts with
      `id`, `name`, `num_servings`), or an empty list when the menu is empty.
  """
  # Local import so this function does not depend on an import made in another cell.
  import torch

  start_total = time.time()
  print(f"\n=== Running menumatch for new image ({meal}, {date}) ===")

  # The image is passed in already loaded, so this stage is effectively free.
  t0 = time.time()
  print(f"[Loaded image] {time.time() - t0:.3f}s")

  # call huskyeats api for menu items
  t0 = time.time()
  menu_items = get_menu_items(dining_hall_id, meal, date)

  menu_items_names = [item["name"] for item in menu_items]
  print(f"[Loaded menu items] {time.time() - t0:.3f}s")

  # Nothing to rank or classify: skip the CLIP and GPT stages instead of
  # failing on an empty prompt list.
  if not menu_items:
    print("[No menu items found] returning no predictions")
    print(f"=== Total runtime: {time.time() - start_total:.3f}s ===\n")
    return []

  clip_prompts = [
    "a photo of {}",
    "a plate of {}",
    "{} on a plate",
    "cafeteria serving of {}",
    "dining hall style {}",
  ]

  t0 = time.time()
  # create prompt variants of each menu item; variant_pointers[j] is the index
  # of the menu item that item_variants[j] was generated from
  item_variants, variant_pointers = [], []
  for idx, item in enumerate(menu_items_names):
    for p in clip_prompts:
      item_variants.append(p.format(item))
      variant_pointers.append(idx)

  variant_pointers = np.array(variant_pointers)
  print(f"[Prepared prompt variants] {time.time() - t0:.3f}s")

  t0 = time.time()
  # generate text embeddings
  # TODO: use redis cache in prod to prevent recomputation of exact same prompts
  text_tokens = tokenizer(item_variants).to('cpu')
  # Inference only: without no_grad the encoder records an autograd graph for
  # every prompt, which costs memory and time for no benefit.
  with torch.no_grad():
    text_emb = model.encode_text(text_tokens)
    text_emb = text_emb / text_emb.norm(dim=-1, keepdim=True)
  print(f"[Generated text embeddings] {time.time() - t0:.3f}s")

  t0 = time.time()
  # embed image
  img_emb = clip_image_embed(pil_image, model, preprocess)
  print(f"[Generated image embedding] {time.time() - t0:.3f}s")

  t0 = time.time()
  # rank text embeddings to image embedding
  items_ranked_idxs, items_scores = rank_items_from_embedding(img_emb, text_emb, menu_items_names, variant_pointers)
  # items_scores is already in ranked order, so it is indexed by rank k, not by menu index i
  items_ranked = [
    {
        "name": menu_items[i]["name"],
        "id": menu_items[i]["id"],
        "score": float(items_scores[k])
    }
    for k, i in enumerate(items_ranked_idxs)
  ]

  print(f"[CLIP Ranking items] {time.time() - t0:.3f}s")
  print(items_ranked)
  print()

  # release the embedding tensors before the (slow) GPT calls
  del text_tokens, text_emb, img_emb

  t0 = time.time()
  # item classification
  classification_result = gpt_item_classification(pil_image, items_ranked, gpt_client)
  print(classification_result)
  print(f"[GPT Item classification] {time.time() - t0:.3f}s")

  # use classification result and pull menu data
  t0 = time.time()
  for item in classification_result["items"]:
    item["nutrition"] = get_nutrition_info(item["id"])
  print(f"[Pulled nutrition info] {time.time() - t0:.3f}s")

  # portion estimation
  t0 = time.time()
  portion_result = gpt_portion_estimation(pil_image, classification_result, gpt_client)
  print(portion_result)
  print(f"[GPT Portion estimation] {time.time() - t0:.3f}s")

  print(f"=== Total runtime: {time.time() - start_total:.3f}s ===\n")
  return portion_result['servings']


In [None]:
# ---- CONFIG ----
from getpass import getpass

# Base URL of the MenuMatch dataset API (dataset metadata and presigned download URLs).
API_BASE_URL = "https://3vw53n9900.execute-api.us-east-1.amazonaws.com/dev"
# Prompted at runtime without echo; sent as the X-Api-Key header by _auth_headers().
API_TOKEN = getpass("Enter MenuMatch API token: ")

# Base URL of the HuskyEats API, used by get_nutrition_for_id().
HUSKYEATS_BASE_URL = "https://husky-eats.onrender.com/api"

# ---- IMPORTS ----
import requests
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Iterable, Tuple
from io import BytesIO
import math

from PIL import Image


# ---- LOW-LEVEL API HELPERS ----

def _auth_headers() -> Dict[str, str]:
    """Build the request headers that authenticate against the dataset API."""
    headers: Dict[str, str] = {}
    headers["X-Api-Key"] = API_TOKEN
    return headers


def fetch_dataset_metadata() -> List[Dict[str, Any]]:
    """
    Fetch the dataset metadata records from the dataset API.

    Raises an HTTP error (via ``raise_for_status``) on a non-success response.

    Returns:
        The value under ``"items"`` when the response is a JSON object that
        has that key; otherwise the decoded response body itself (for
        example when the endpoint returns a bare JSON list).
    """
    url = f"{API_BASE_URL}/dataset"
    resp = requests.get(url, headers=_auth_headers())
    resp.raise_for_status()
    data = resp.json()
    # Only JSON objects have .get(); a bare list (the declared return type)
    # used to raise AttributeError here instead of being returned.
    if isinstance(data, dict):
        return data.get("items", data)
    return data


def get_download_url(object_key: str, bucket: Optional[str] = None) -> str:
    """
    Request a presigned download URL for a stored object.

    Args:
        object_key: Key of the object to download.
        bucket: Optional bucket name; only included in the request when truthy.

    Returns:
        The ``downloadUrl`` field of the API response.

    Raises an HTTP error (via ``raise_for_status``) on a non-success response.
    """
    request_body: Dict[str, Any] = {"objectKey": object_key}
    if bucket:
        request_body["bucket"] = bucket

    response = requests.post(
        f"{API_BASE_URL}/downloads/presign",
        headers=_auth_headers(),
        json=request_body,
    )
    response.raise_for_status()
    return response.json()["downloadUrl"]


def load_image(object_key: str, bucket: Optional[str] = None) -> Image.Image:
    """
    Download a stored image and return it as an RGB PIL image.

    A presigned URL is requested first, then the image bytes are fetched
    from that URL. Raises an HTTP error (via ``raise_for_status``) when the
    download fails.
    """
    presigned_url = get_download_url(object_key, bucket=bucket)
    response = requests.get(presigned_url)
    response.raise_for_status()
    raw_bytes = BytesIO(response.content)
    return Image.open(raw_bytes).convert("RGB")


def get_nutrition_for_id(menu_item_id):
    """
    Fetch the macro-nutrient values of one menu item from HuskyEats.

    Returns:
        A dict with float values under the keys ``kcal``, ``protein_g``,
        ``carb_g`` and ``fat_g``.

    Raises an HTTP error (via ``raise_for_status``) on a non-success
    response, and ``KeyError`` if an expected field is missing.
    """
    # HuskyEats: GET /menuitem/{id}
    response = requests.get(f"{HUSKYEATS_BASE_URL}/menuitem/{menu_item_id}")
    response.raise_for_status()
    record = response.json()

    # (output key, field name in the HuskyEats record)
    field_map = (
        ("kcal", "calories"),
        ("protein_g", "protein_g"),
        ("carb_g", "totalcarbohydrate_g"),
        ("fat_g", "totalfat_g"),
    )
    return {out_key: float(record[source_key]) for out_key, source_key in field_map}


# ---- DATA STRUCTURES ----

@dataclass
class GroundTruthItem:
    """One labelled menu item of a dataset sample."""
    id: str  # menu item id, stored as a string (see iter_samples)
    num_servings: float  # labelled number of servings on the plate


@dataclass
class Sample:
    """One dataset sample: the plate image plus its metadata and labels."""
    object_key: str  # storage key the image was downloaded from
    image: Image.Image  # decoded plate image
    dining_hall_id: str  # passed to the predict function as the dining hall id
    meal_time: str  # passed to the predict function as the meal
    date: str  # passed to the predict function as the menu date
    difficulty: Optional[str]  # taken from the metadata record; None when absent
    ground_truth: List[GroundTruthItem]  # labelled items for this plate


def iter_samples(limit: Optional[int] = None) -> Iterable[Sample]:
    """
    Lazily yield dataset samples, downloading each image as it is reached.

    Args:
        limit: When given, only the first ``limit`` metadata records are used.

    Yields:
        One ``Sample`` per metadata record, with ids and metadata fields
        converted to strings and the labelled items as ``GroundTruthItem``s.
    """
    records = fetch_dataset_metadata()
    if limit is not None:
        records = records[:limit]

    for record in records:
        key = record["objectKey"]
        picture = load_image(key, bucket=record.get("bucket"))

        # Labelled items for this plate; a record without "items" has none
        labelled = []
        for entry in record.get("items", []):
            labelled.append(
                GroundTruthItem(
                    id=str(entry["menuItemId"]),
                    num_servings=float(entry["servings"]),
                )
            )

        yield Sample(
            object_key=key,
            image=picture,
            dining_hall_id=str(record.get("diningHallId")),
            meal_time=str(record.get("mealtime")),
            date=str(record.get("mealDate")),
            difficulty=record.get("difficulty"),
            ground_truth=labelled,
        )


# ---- RUNNER ----

def run_model_on_dataset(
    predict_fn=None,
    limit: Optional[int] = None,
):
    """
    Run a prediction function over the dataset and collect its outputs.

    Args:
        predict_fn: Callable invoked as
            ``predict_fn(image, dining_hall_id, meal_time, date)``. When
            omitted, the module-level ``predict`` is used.
        limit: Forwarded to ``iter_samples`` to cap the number of samples.

    Returns:
        One dict per sample with the sample metadata, its ground truth and
        the value returned by ``predict_fn`` under ``"predictions"``.

    Raises:
        ValueError: If no ``predict_fn`` is given and no global ``predict`` exists.
    """
    chosen_fn = predict_fn
    if chosen_fn is None:
        try:
            chosen_fn = globals()["predict"]
        except KeyError:
            raise ValueError("No predict_fn provided and no global `predict` defined.")

    def evaluate(sample):
        # Predict first, then bundle the output with the sample's metadata
        predicted = chosen_fn(
            sample.image,
            sample.dining_hall_id,
            sample.meal_time,
            sample.date,
        )
        return {
            "object_key": sample.object_key,
            "dining_hall_id": sample.dining_hall_id,
            "meal_time": sample.meal_time,
            "date": sample.date,
            "ground_truth": sample.ground_truth,
            "predictions": predicted,
        }

    return [evaluate(sample) for sample in iter_samples(limit=limit)]


# ---- METRICS ----

def _items_to_dict(items: Iterable[Any]) -> Dict[str, float]:
    """
    Map item ids (as strings) to servings (as floats).

    Accepts objects exposing ``id`` / ``num_servings`` attributes as well as
    mappings with an ``"id"`` key and an optional ``"num_servings"`` key
    (defaulting to 0.0). If an id occurs more than once, the last entry wins.
    """
    servings_by_id: Dict[str, float] = {}
    for entry in items:
        if not hasattr(entry, "id"):
            # mapping-style item, e.g. a prediction dict
            key = str(entry["id"])
            amount = float(entry.get("num_servings", 0.0))
        else:
            # attribute-style item, e.g. a ground-truth record
            key = str(entry.id)
            amount = float(entry.num_servings)
        servings_by_id[key] = amount
    return servings_by_id


def compute_all_metrics(
    results: List[Dict[str, Any]],
    get_nutrition_for_id: Optional[callable] = None,
    macro_nutrients: Tuple[str, ...] = ("kcal", "protein_g", "carb_g", "fat_g"),
) -> Dict[str, float]:
    """
    Compute classification, portion and (optionally) macro-nutrient metrics.

    Args:
        results: Records with ``"ground_truth"`` and ``"predictions"`` item
            collections, as produced by ``run_model_on_dataset``.
        get_nutrition_for_id: Optional callable mapping an item id (string)
            to a dict of nutrient values. When omitted, the ``macro_*``
            metrics are not computed. Each id is looked up at most once.
        macro_nutrients: Nutrient keys to aggregate per plate.

    Returns:
        A dict with:
        - ``cls_*``: micro-averaged precision/recall/F1 over item ids with
          servings > 0, mean per-plate Jaccard index and exact-match rate.
        - ``portion_*``: MAE, RMSE and mean relative error of servings over
          every id appearing in ground truth or prediction (a missing side
          counts as 0 servings; relative error only where ground truth > 0).
        - ``macro_mae_<n>`` / ``macro_pmae_<n>``: absolute and relative error
          of the per-plate nutrient totals, averaged over plates.
        Every average over an empty collection is reported as 0.0.
    """
    def mean_or_zero(values: List[float]) -> float:
        return sum(values) / len(values) if values else 0.0

    true_pos = false_pos = false_neg = 0
    exact_matches = 0
    jaccard_scores: List[float] = []

    serving_abs: List[float] = []
    serving_sq: List[float] = []
    serving_rel: List[float] = []

    for record in results:
        truth = _items_to_dict(record["ground_truth"])
        predicted = _items_to_dict(record["predictions"])

        # An item counts as present only when its servings are positive
        truth_ids = {item_id for item_id, amount in truth.items() if amount > 0}
        predicted_ids = {item_id for item_id, amount in predicted.items() if amount > 0}

        shared = truth_ids & predicted_ids
        either = truth_ids | predicted_ids

        true_pos += len(shared)
        false_pos += len(predicted_ids - truth_ids)
        false_neg += len(truth_ids - predicted_ids)

        # Two empty plates agree perfectly
        jaccard_scores.append(len(shared) / len(either) if either else 1.0)

        if truth_ids == predicted_ids:
            exact_matches += 1

        # Portion errors over every id seen on either side
        for item_id in set(truth.keys()) | set(predicted.keys()):
            expected = truth.get(item_id, 0.0)
            diff = predicted.get(item_id, 0.0) - expected
            magnitude = abs(diff)
            serving_abs.append(magnitude)
            serving_sq.append(diff * diff)
            if expected > 0:
                serving_rel.append(magnitude / expected)

    precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) > 0 else 0.0
    recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) > 0 else 0.0
    if (precision + recall) > 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    metrics: Dict[str, float] = {
        "cls_precision": precision,
        "cls_recall": recall,
        "cls_f1": f1,
        "cls_avg_jaccard": mean_or_zero(jaccard_scores),
        "cls_exact_match": exact_matches / len(results) if results else 0.0,
        "portion_mae_servings": mean_or_zero(serving_abs),
        "portion_rmse_servings": math.sqrt(sum(serving_sq) / len(serving_sq)) if serving_sq else 0.0,
        "portion_pmae_servings": mean_or_zero(serving_rel),
    }

    if get_nutrition_for_id is None:
        return metrics

    # Nutrition lookups are cached so each id is fetched at most once
    facts_cache: Dict[str, Dict[str, float]] = {}

    def plate_totals(servings_by_id: Dict[str, float]) -> Dict[str, float]:
        totals = {name: 0.0 for name in macro_nutrients}
        for item_id, amount in servings_by_id.items():
            if item_id not in facts_cache:
                facts_cache[item_id] = get_nutrition_for_id(item_id)
            facts = facts_cache[item_id]
            for name in macro_nutrients:
                totals[name] += amount * float(facts[name])
        return totals

    nutrient_abs: Dict[str, List[float]] = {name: [] for name in macro_nutrients}
    nutrient_rel: Dict[str, List[float]] = {name: [] for name in macro_nutrients}

    for record in results:
        truth_totals = plate_totals(_items_to_dict(record["ground_truth"]))
        predicted_totals = plate_totals(_items_to_dict(record["predictions"]))

        for name in macro_nutrients:
            expected = truth_totals[name]
            magnitude = abs(predicted_totals[name] - expected)
            nutrient_abs[name].append(magnitude)
            if expected > 0:
                nutrient_rel[name].append(magnitude / expected)

    for name in macro_nutrients:
        metrics[f"macro_mae_{name}"] = mean_or_zero(nutrient_abs[name])
        metrics[f"macro_pmae_{name}"] = mean_or_zero(nutrient_rel[name])

    return metrics


# ---- TEMPLATE PREDICT FUNCTION (DELETE/EDIT IN EACH NOTEBOOK) ----
# def predict(image, dining_hall_id, meal_time, date):
#     return []


Enter MenuMatch API token: ··········


In [None]:
# Evaluate the pipeline on the first 30 dataset records; with no predict_fn
# given, run_model_on_dataset falls back to the global `predict`.
results = run_model_on_dataset(limit=30)
# Passing get_nutrition_for_id enables the macro-nutrient (macro_*) metrics.
metrics = compute_all_metrics(results, get_nutrition_for_id=get_nutrition_for_id)

# One metric per line: name padded to 25 characters, value with 4 decimals.
for k, v in metrics.items():
    print(f"{k:25s} {v:.4f}")


=== Running menumatch for new image (dinner, 2025-10-22) ===
[Loaded image] 0.000s
[Loaded menu items] 0.330s
[Prepared prompt variants] 0.000s
[Generated text embeddings] 6.600s
[Generated image embedding] 0.288s
[CLIP Ranking items] 0.009s
[{'name': "General Tso's Chicken", 'id': 111083, 'score': 0.3230091631412506}, {'name': 'Shrimp Egg Foo Yung', 'id': 300656, 'score': 0.2066836804151535}, {'name': 'Jasmine Rice', 'id': 400711, 'score': 0.1253126710653305}, {'name': 'Veggie Strip Stir Fry', 'id': 300685, 'score': 0.06751075387001038}, {'name': 'Crinkle Cut Fries', 'id': 161028, 'score': 0.040266264230012894}, {'name': 'Fortune Cookies', 'id': 601156, 'score': 0.03612662851810455}, {'name': 'Chefs Choice Lo Mein', 'id': 142046, 'score': 0.03413303568959236}, {'name': 'Summer Vegetable Medley', 'id': 300974, 'score': 0.031123332679271698}, {'name': 'Mini Vegetable Egg Rolls', 'id': 142220, 'score': 0.029234956949949265}, {'name': 'Corn & Bacon Chowder', 'id': 300876, 'score': 0.0221