In [30]:
# Environment setup for a Colab runtime: install packages, then mount Drive.
# NOTE(review): tqdm is installed here but is not imported in the cells visible
# in this notebook section — confirm it is still needed.
print("Installing required libraries...")
!pip install pandas tqdm -q

# Mount Google Drive at /content/drive; INPUT_DIR in the config cell points
# below this mount point, so this must run before any file access.
from google.colab import drive
print("Mounting Google Drive...")
drive.mount('/content/drive')

Installing required libraries...
Mounting Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
# Print the installed metadata (version, location, dependencies) for the
# `google-generativeai` distribution.
# NOTE(review): the import cell uses `from google import genai` / `genai.Client`;
# confirm this is the distribution that provides that module before relying on
# this version check.
!pip show google-generativeai

Name: google-generativeai
Version: 0.8.5
Summary: Google Generative AI High level API client library and tools.
Home-page: https://github.com/google/generative-ai-python
Author: Google LLC
Author-email: googleapis-packages@google.com
License: Apache 2.0
Location: /usr/local/lib/python3.12/dist-packages
Requires: google-ai-generativelanguage, google-api-core, google-api-python-client, google-auth, protobuf, pydantic, tqdm, typing-extensions
Required-by: 


In [43]:
from __future__ import annotations
import os
import re
import glob
import json
import time
from pathlib import Path
from typing import List, Dict
from collections import defaultdict

import pandas as pd
from google import genai
from google.genai import types
from google.colab import userdata


In [33]:
# Folder scanned for images (non-recursively, see list_images). main() also
# writes the output CSV into this same folder.
INPUT_DIR = '/content/drive/My Drive/Colab Notebooks/lera/plastic_foodware/plastic_foodware_output'

# CSV filename to write inside INPUT_DIR
OUTPUT_CSV_NAME = "output.csv"

# Model + runtime knobs (used as the default arguments of run_batch):
#   MODEL_NAME                - model identifier passed to generate_content
#   MAX_IMAGES_PER_REQUEST    - larger groups are split into chunks of this size
#   MAX_RETRIES               - attempts per group before the group is skipped
#   SLEEP_BETWEEN_GROUPS_SEC  - pause after each group, in seconds
MODEL_NAME = "gemini-2.0-flash"
MAX_IMAGES_PER_REQUEST = 16
MAX_RETRIES = 5
SLEEP_BETWEEN_GROUPS_SEC = 0.2

In [34]:
# Prompt text sent to the model with every group of images (it is the first
# part of each request, followed by the image parts). The response structure
# itself is enforced separately by the schema from build_schema(); this text
# only tells the model how to fill the fields in.
USER_QUERY = """Give me details on the item on display in these photos. This is for a field test for a lead exposure study. Items are scanned for lead toxicity and then photographed for documentation. When describing colour and material try to analyse the part of the item that would be being checked for this. Ignore masking tape labels used for documentation.
Extract the required fields from the images in this group.
- Read any visible label text verbatim where possible.
- If a field is not inferable, return an empty string (or [] for arrays) rather than guessing.
- Be consistent across the group; prefer information that appears on actual labels over inferred marketing text.
Return exactly the JSON that matches the provided schema.
"""

In [35]:
# Configure logging
# `logging` is imported in this cell because the notebook's import cell does
# not provide it; without the import this cell raises NameError on a fresh
# kernel (Restart & Run All).
import logging

# NOTE: basicConfig() is a no-op when the root logger already has handlers,
# which is common in hosted notebook runtimes. If the format below does not
# take effect, pass force=True (Python 3.8+) to replace the existing handlers.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

In [36]:
def list_images(folder: str | Path) -> List[str]:
    """Return a sorted, flat list of image paths directly inside ``folder``.

    A file is treated as an image when its name ends in .jpg, .jpeg, .png or
    .webp, compared case-insensitively so that camera-style names such as
    ``IMG_0001.JPG`` are picked up on case-sensitive filesystems. This matches
    how part_from_image() inspects extensions.

    Args:
        folder: Directory to scan (not recursive). An empty string means the
            current working directory.

    Returns:
        Paths of the form ``os.path.join(folder, name)``, sorted
        lexicographically. Empty if the folder does not exist or holds no
        matching files.
    """
    folder = str(folder)
    exts = (".jpg", ".jpeg", ".png", ".webp")
    scan_dir = folder or os.curdir
    if not os.path.isdir(scan_dir):
        return []

    files: List[str] = []
    for name in os.listdir(scan_dir):
        # Skip dotfiles (e.g. "._IMG.jpg" sidecar files); a "*.ext" glob
        # pattern never matched these either.
        if name.startswith("."):
            continue
        if not name.lower().endswith(exts):
            continue
        path = os.path.join(folder, name)
        # Only regular files can be read later; ignore directories that
        # happen to carry an image-like suffix.
        if os.path.isfile(path):
            files.append(path)
    files.sort()
    return files

def group_by_prefix(files: List[str]) -> Dict[str, List[str]]:
    """
    Bucket image paths by the part of the filename before its first underscore.

    Example:
        MWI.0.3.1.13.1.3_4.jpg  -> group "MWI.0.3.1.13.1.3"
        KEN.12.34_01.png        -> group "KEN.12.34"

    When the filename has no underscore, or starts with one, the filename
    without its extension is used as the group key instead. Groups and their
    members keep the order in which they were first encountered.
    """
    buckets: Dict[str, List[str]] = {}
    for path in files:
        filename = os.path.basename(path)
        prefix, underscore, _rest = filename.partition("_")
        if not (underscore and prefix):
            # No usable "<prefix>_" at the start: fall back to the stem.
            prefix = os.path.splitext(filename)[0]
        buckets.setdefault(prefix, []).append(path)
    return buckets

def part_from_image(path: str) -> types.Part:
    """Load the file at ``path`` and wrap its bytes in a genai Part.

    The MIME type is chosen from the (case-insensitive) filename suffix:
    ".png" -> image/png, ".webp" -> image/webp, anything else -> image/jpeg.
    """
    with open(path, "rb") as stream:
        raw = stream.read()

    suffix_to_mime = ((".png", "image/png"), (".webp", "image/webp"))
    lowered = path.lower()
    mime_type = next(
        (mime for suffix, mime in suffix_to_mime if lowered.endswith(suffix)),
        "image/jpeg",  # default for .jpg/.jpeg and any unrecognised suffix
    )
    return types.Part.from_bytes(data=raw, mime_type=mime_type)

def build_schema() -> types.Schema:
    """Build the structured-output schema the model response must follow.

    The result is an OBJECT schema whose fields are all required. Each field
    is either a plain string or an array of strings.
    """
    Schema = genai.types.Schema
    Type = genai.types.Type

    def text() -> types.Schema:
        return Schema(type=Type.STRING)

    def text_list() -> types.Schema:
        return Schema(type=Type.ARRAY, items=Schema(type=Type.STRING))

    # Insertion order here also defines the order of the `required` list.
    fields = {
        "label_text": text_list(),
        "brand": text(),
        "country_of_origin": text(),
        "materials_broad": text_list(),
        "materials_specific": text_list(),
        "colours": text_list(),
        "item_description": text(),
    }
    return Schema(
        type=Type.OBJECT,
        required=list(fields),
        properties=fields,
    )

def _parse_json_text(payload: str) -> dict:
    """Parse JSON from model text; tolerate fences like ```json ... ```."""
    cleaned = payload.strip()
    cleaned = re.sub(r"^```json\s*|\s*```$", "", cleaned, flags=re.DOTALL)
    cleaned = re.sub(r"^```\s*|\s*```$", "", cleaned, flags=re.DOTALL)
    return json.loads(cleaned)

In [44]:
def call_gemini_for_group(
    client: genai.Client,
    model: str,
    query_text: str,
    image_paths: List[str],
    max_images_per_request: int = 16,
) -> dict:
    """
    Ask the model to describe one group of images and return the parsed JSON.

    If the group fits in a single request (at most `max_images_per_request`
    images) the parsed response is returned as-is. Otherwise the images are
    sent in consecutive chunks and the per-chunk answers are combined:
    list fields are unioned (unique, first-seen order kept) and string fields
    keep the first non-empty value.
    """
    request_config = types.GenerateContentConfig(
        response_mime_type="application/json",
        response_schema=build_schema(),
    )

    def ask_model(batch: List[str]) -> dict:
        # The prompt text goes first, followed by one part per image.
        prompt_part = types.Part.from_text(text=query_text)
        image_parts = [part_from_image(p) for p in batch]
        message = types.Content(role="user", parts=[prompt_part, *image_parts])
        response = client.models.generate_content(
            model=model,
            contents=[message],
            config=request_config,
        )
        return _parse_json_text(response.text)

    # Fast path: the whole group fits into one request.
    if len(image_paths) <= max_images_per_request:
        return ask_model(image_paths)

    list_keys = ("label_text", "materials_broad", "materials_specific", "colours")
    text_keys = ("brand", "country_of_origin", "item_description")

    combined = {
        "label_text": [],
        "brand": "",
        "country_of_origin": "",
        "materials_broad": [],
        "materials_specific": [],
        "colours": [],
        "item_description": "",
    }

    def fold_in(partial: dict) -> None:
        # List fields: append only values not already collected.
        for key in list_keys:
            known = set(combined[key])
            for value in partial.get(key, []) or []:
                if value in known:
                    continue
                combined[key].append(value)
                known.add(value)
        # String fields: keep the earliest non-empty answer.
        for key in text_keys:
            if not combined.get(key) and partial.get(key):
                combined[key] = partial[key]

    for start in range(0, len(image_paths), max_images_per_request):
        chunk = image_paths[start : start + max_images_per_request]
        fold_in(ask_model(chunk))

    return combined

# Column order of the output CSV; also used to emit a header when no group succeeds.
_RESULT_COLUMNS = [
    "group_id",
    "brand",
    "country_of_origin",
    "materials_broad",
    "materials_specific",
    "colours",
    "item_description",
    "label_text",
    "num_images",
    "image_paths",
]


def _result_to_row(gid: str, result: dict, img_list: List[str]) -> dict:
    """Flatten one model result into a CSV row; list fields are joined with ' | '."""

    def joined(key: str) -> str:
        return " | ".join(result.get(key, []) or [])

    return {
        "group_id": gid,
        "brand": result.get("brand", ""),
        "country_of_origin": result.get("country_of_origin", ""),
        "materials_broad": joined("materials_broad"),
        "materials_specific": joined("materials_specific"),
        "colours": joined("colours"),
        "item_description": result.get("item_description", ""),
        "label_text": joined("label_text"),
        "num_images": len(img_list),
        "image_paths": " | ".join(img_list),
    }


def run_batch(
    image_dir: Path,
    user_query: str,
    output_csv: Path,
    model: str = MODEL_NAME,
    max_images_per_request: int = MAX_IMAGES_PER_REQUEST,
    sleep_s: float = SLEEP_BETWEEN_GROUPS_SEC,
    max_retries: int = MAX_RETRIES,
) -> pd.DataFrame:
    """Process all images in `image_dir`, grouped by filename prefix, and write CSV.

    Args:
        image_dir: Folder scanned for images (see list_images).
        user_query: Prompt text sent with every group.
        output_csv: Destination CSV path; parent folders are created if needed.
        model: Model identifier passed to the API.
        max_images_per_request: Chunk size for groups with many images.
        sleep_s: Pause after each group, in seconds.
        max_retries: Attempts per group; a group that fails every attempt is
            skipped and does not appear in the output.

    Returns:
        DataFrame with one row per successfully processed group (the same
        data that is written to `output_csv`).

    Raises:
        RuntimeError: If the API key secret is empty or no images are found.
    """
    api_key = userdata.get('GOOGLE_API_KEY') # os.getenv("GOOGLE_API_KEY")
    if not api_key:
        # The key is read from Colab secrets (userdata), not the environment.
        raise RuntimeError("GOOGLE_API_KEY is empty or not set in Colab secrets (userdata).")

    client = genai.Client(api_key=api_key)

    files = list_images(image_dir)
    if not files:
        raise RuntimeError(f"No images found in: {image_dir}")

    groups = group_by_prefix(files)
    print(f"Found {len(files)} images across {len(groups)} groups.")

    rows = []
    for gid, img_list in sorted(groups.items(), key=lambda kv: kv[0]):
        img_list = sorted(img_list)
        for attempt in range(max_retries):
            try:
                result = call_gemini_for_group(
                    client=client,
                    model=model,
                    query_text=user_query,
                    image_paths=img_list,
                    max_images_per_request=max_images_per_request,
                )
                # Row building stays inside the try so a malformed response
                # (e.g. non-string list items) triggers a retry as well.
                rows.append(_result_to_row(gid, result, img_list))
                print(f"✓ {gid}  ({len(img_list)} images)")
                break
            except Exception as e:
                # Deliberately broad: one bad group must not abort the batch.
                if attempt + 1 >= max_retries:
                    # Last attempt: no retry follows, so do not announce one
                    # and do not spend time on a pointless back-off sleep.
                    print(f"Attempt {attempt+1}/{max_retries} failed for {gid}: {e}")
                else:
                    wait = (2 ** attempt) * 0.5  # exponential back-off: 0.5s, 1s, 2s, ...
                    print(f"Attempt {attempt+1}/{max_retries} failed for {gid}: {e} — retrying in {wait:.1f}s")
                    time.sleep(wait)
        else:
            # Reached only when no attempt hit `break`, i.e. every attempt failed.
            print(f"✗ Skipping {gid} after {max_retries} failures.")
        time.sleep(sleep_s)

    # Write CSV (no JSONL) inside the input folder
    output_csv.parent.mkdir(parents=True, exist_ok=True)
    # Explicit columns keep the header present even if every group was skipped.
    df = pd.DataFrame(rows, columns=_RESULT_COLUMNS)
    df.to_csv(output_csv, index=False, encoding="utf-8")
    print(f"\nSaved CSV: {output_csv}")
    return df

In [45]:
def main() -> None:
    """Run the batch over INPUT_DIR and write OUTPUT_CSV_NAME into that same folder."""
    source_dir = Path(INPUT_DIR)
    # Ensure the folder exists so the CSV destination is valid.
    source_dir.mkdir(parents=True, exist_ok=True)
    run_batch(
        image_dir=source_dir,
        user_query=USER_QUERY,
        output_csv=source_dir / OUTPUT_CSV_NAME,
    )


if __name__ == "__main__":
    main()

Found 203 images across 67 groups.
✓ AGO.1.0  (3 images)
✓ AGO.1.1  (2 images)
✓ BFA.1.0  (7 images)
✓ BFA.1.1  (3 images)
✓ BFA.1.2  (3 images)
✓ BFA.1.3  (3 images)
✓ BFA.1.4  (3 images)
✓ BFA.1.5  (6 images)
✓ BFA.1.6  (2 images)
✓ BFA.1.7  (3 images)
✓ BFA.1.8  (4 images)
✓ BFA.1.9  (3 images)
✓ KEN.1.0  (3 images)
✓ KEN.1.1  (2 images)
✓ KEN.1.2  (2 images)
✓ KEN.1.3  (3 images)
✓ KEN.1.4  (2 images)
✓ KEN.1.5  (2 images)
✓ KEN.1.6  (5 images)
✓ KEN.1.7  (2 images)
✓ KEN.1.8  (3 images)
✓ LBR.1.1  (2 images)
✓ LBR.1.2  (2 images)
✓ LBR.1.3  (2 images)
✓ LBR.1.4  (2 images)
✓ LBR.1.5  (2 images)
✓ LBR.1.6  (2 images)
✓ LBR.1.7  (3 images)
✓ LBR.1.8  (4 images)
✓ LBR.2.1  (2 images)
✓ MWI.1.0  (3 images)
✓ MWI.1.1  (3 images)
✓ MWI.1.2  (2 images)
✓ MWI.1.3  (2 images)
✓ MWI.1.4  (3 images)
✓ MWI.1.5  (2 images)
✓ MWI.1.6  (5 images)
✓ MWI.1.7  (3 images)
✓ MWI.1.8  (5 images)
✓ MWI.1.9  (4 images)
✓ MWI.2.0  (4 images)
✓ MWI.2.1  (4 images)
✓ MWI.2.2  (3 images)
✓ MWI.2.3  (3 image