In [None]:
%pip install openai

import os
import io
import json
import base64
import yaml
import glob
import numpy as np
from PIL import Image
from pathlib import Path
from typing import Tuple, Dict, Optional, Any, Union
%pip install openai
from openai import OpenAI
# Optional YAML file with provider API keys, model overrides and the default
# provider. The path is relative, so it resolves against the working directory.
API_KEY_PATH = Path("apikey.yaml")

def _load_api_config() -> Dict[str, Any]:
    if not API_KEY_PATH.exists():
        return {}
    with API_KEY_PATH.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle) or {}
        if not isinstance(data, dict):
            raise ValueError("apikey.yaml must contain a mapping at the root")
        return data

# Loaded once when this cell runs; the provider-key and model-name lookups
# that follow all read from this dict.
API_CONFIG = _load_api_config()

def _get_provider_key(provider_name: str, env_var: str) -> Optional[str]:
    provider_cfg = (API_CONFIG.get("providers") or {}).get(provider_name, {})
    if isinstance(provider_cfg, dict):
        key = provider_cfg.get("api_key")
        if key:
            return key
    return os.getenv(env_var)

# Provider selection precedence: the LLM_PROVIDER environment variable, then
# `default_provider` from apikey.yaml, then "gemini". Normalized to a trimmed,
# lower-case string so " OpenAI " and "openai" select the same backend.
LLM_PROVIDER = str(
    os.getenv("LLM_PROVIDER") or API_CONFIG.get("default_provider") or "gemini"
).strip().lower()

# `models:` may be present but empty in the YAML (it then parses to None), so
# fall back to an empty mapping before reading the per-provider overrides.
_MODEL_OVERRIDES = API_CONFIG.get("models") or {}
GEMINI_MODEL_NAME = _MODEL_OVERRIDES.get("gemini", "gemini-1.5-flash")
OPENAI_MODEL_NAME = _MODEL_OVERRIDES.get("openai", "gpt-4o")
CLAUDE_MODEL_NAME = _MODEL_OVERRIDES.get("anthropic", "claude-3-5-sonnet-20240620")

# Only the selected provider's client is created; the other two stay None and
# extract_legend_with_llm checks for that before use.
gemini_model = None
openai_client = None
anthropic_client = None

if LLM_PROVIDER == "gemini":
    # Imported lazily so the SDK is only required when Gemini is selected.
    import google.generativeai as genai
    from google.generativeai import GenerationConfig
    gemini_key = _get_provider_key("gemini", "GOOGLE_API_KEY")
    if not gemini_key:
        raise RuntimeError("Gemini API key missing. Provide via apikey.yaml or GOOGLE_API_KEY env var.")
    genai.configure(api_key=gemini_key)
    gemini_model = genai.GenerativeModel(GEMINI_MODEL_NAME)

elif LLM_PROVIDER == "openai":
    openai_key = _get_provider_key("openai", "OPENAI_API_KEY")
    if not openai_key:
        raise RuntimeError("OpenAI API key missing. Provide via apikey.yaml or OPENAI_API_KEY env var.")
    openai_client = OpenAI(api_key=openai_key)

elif LLM_PROVIDER == "anthropic":
    # Imported lazily so the SDK is only required when Anthropic is selected.
    import anthropic
    anthropic_key = _get_provider_key("anthropic", "ANTHROPIC_API_KEY")
    if not anthropic_key:
        raise RuntimeError("Anthropic API key missing. Provide via apikey.yaml or ANTHROPIC_API_KEY env var.")
    anthropic_client = anthropic.Anthropic(api_key=anthropic_key)

else:
    # Fail here rather than reporting a provider as configured and then
    # erroring once per image during extraction.
    raise ValueError(f"Unsupported LLM_PROVIDER: {LLM_PROVIDER}")

print(f"‚úÖ Configured {LLM_PROVIDER.upper()} provider")


def _image_to_png_base64(image_data: np.ndarray) -> Tuple[str, str]:
    """Return (media_type, base64_data) for the image payload."""
    if image_data.dtype != np.uint8:
        image_data = (image_data * 255).astype(np.uint8)
        
    img_pil = Image.fromarray(image_data)
    if img_pil.mode != 'RGB':
        img_pil = img_pil.convert('RGB')
        
    buf = io.BytesIO()
    img_pil.save(buf, format="PNG")
    b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
    return "image/png", b64


def _extract_json_from_text(text: str) -> str:
    """Best-effort extraction of a JSON object from a text response."""
    if not text:
        raise ValueError("Empty response from LLM")
    
    text = text.strip()
    
    if text.startswith("```json"):
        text = text[7:]
    if text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
        
    text = text.strip()
    
    if text.startswith("{") and text.endswith("}"):
        return text
        
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end == -1 or end <= start:
        raise ValueError(f"No JSON object found in response: {text[:200]}")
        
    return text[start:end+1]


def _legend_prompt() -> str:
    return """You are an expert GIS analyst extracting the LEGEND/KEY from a historical zoning map image. 
Return STRICT JSON only. No preamble.

REQUIREMENTS:
1. **Detection**: Identify ANY legend, key, reference box, or table that explains patterns, letters, or zone codes.
2. **Bounding Box**: Provide integer pixel coordinates relative to the image size provided.
3. **Content**: Extract every item (symbol, color, or text code) and its meaning.

Output JSON schema:
{
  "has_legend": boolean,
  "legend_bbox": {"x": int, "y": int, "width": int, "height": int},
  "legend_location": "top-left" | "top-right" | "bottom-left" | "bottom-right" | "left" | "right" | "center",
  "legend_items": [
    {
      "identifier": string,
      "symbol_pattern_type": string,
      "visual_description": string,
      "meaning": string
    }
  ],
  "map_metadata": {
      "map_title": string,
      "map_location": string,
      "map_year": string,
      "confidence_level": "high" | "medium" | "low"
  }
}"""


def extract_legend_with_llm(image_path: Union[str, Path], image_data: np.ndarray) -> Dict[str, Any]:
    """Extract legend information using the configured provider.

    Sends the legend prompt and the image to the backend selected by
    ``LLM_PROVIDER``, parses the JSON reply, and normalizes the legend
    bounding box.

    Args:
        image_path: Path of the source image; only its file name is recorded.
        image_data: Image pixels as an array whose first two dimensions are
            (height, width).

    Returns:
        On success, the model's JSON object with ``source_file`` added and
        ``legend_bbox`` rewritten to ``{"x", "y", "width", "height"}`` ints.
        When the model reports a legend but its box is missing or malformed,
        the box is estimated from ``legend_location``. ``legend_bbox`` is
        ``None`` when there is no usable box and no legend was reported.
        On failure inside the request/parse pipeline, returns
        ``{"has_legend": False, "error", "source_file", "raw_response"}``
        instead of raising.
    """
    path_obj = Path(image_path)

    def _infer_bbox_from_location(img: np.ndarray, loc: Optional[str]) -> Dict[str, int]:
        """Estimate a box covering 30% of each dimension from a coarse location label."""
        h, w = img.shape[:2]
        box_w, box_h = int(0.30 * w), int(0.30 * h)
        loc = str(loc or "").lower()

        # Default to the bottom-right corner with a 5% margin.
        x, y = max(0, w - box_w - int(0.05 * w)), max(0, h - box_h - int(0.05 * h))

        if "top" in loc:
            y = int(0.05 * h)
        if "bottom" in loc:
            y = max(0, h - box_h - int(0.05 * h))
        if "left" in loc:
            x = int(0.05 * w)
        if "right" in loc:
            x = max(0, w - box_w - int(0.05 * w))
        if "center" in loc:
            x, y = int(w / 2 - box_w / 2), int(h / 2 - box_h / 2)

        return {
            "x": int(np.clip(x, 0, w - 1)),
            "y": int(np.clip(y, 0, h - 1)),
            "width": int(min(box_w, w)),
            "height": int(min(box_h, h)),
        }

    def _to_int(value: Any) -> int:
        """Truncate a numeric value or numeric string (e.g. 12.7 or "12.7") to int."""
        return int(float(value))

    def _normalize_bbox_local(raw_bbox: Any) -> Optional[Dict[str, int]]:
        """Coerce the supported bbox shapes to x/y/width/height ints.

        Returns None for a missing, unrecognized, or malformed box. A null or
        non-numeric coordinate must not abort the whole extraction, because
        the caller can still fall back to a location-based estimate.
        """
        if not raw_bbox:
            return None

        try:
            if isinstance(raw_bbox, dict):
                if {'x', 'y', 'width', 'height'}.issubset(raw_bbox.keys()):
                    return {k: _to_int(raw_bbox[k]) for k in ['x', 'y', 'width', 'height']}
                if {'x1', 'y1', 'x2', 'y2'}.issubset(raw_bbox.keys()):
                    x1, y1, x2, y2 = [_to_int(raw_bbox[k]) for k in ['x1', 'y1', 'x2', 'y2']]
                    return {"x": x1, "y": y1, "width": max(0, x2 - x1), "height": max(0, y2 - y1)}

            if isinstance(raw_bbox, (list, tuple)) and len(raw_bbox) == 4:
                # A 4-sequence is read as [x, y, width, height].
                bx, by, bw, bh = [_to_int(v) for v in raw_bbox]
                return {"x": bx, "y": by, "width": bw, "height": bh}
        except (TypeError, ValueError, OverflowError):
            return None

        return None

    # Initialized before the try so the error payload can always report it.
    raw_text = ""

    try:
        prompt = _legend_prompt()

        if LLM_PROVIDER == "gemini":
            if not gemini_model:
                raise RuntimeError("Gemini not configured")

            img_pil = Image.fromarray(image_data)
            resp = gemini_model.generate_content(
                [prompt, img_pil],
                generation_config=GenerationConfig(
                    response_mime_type="application/json",
                    temperature=0.2,
                ),
            )
            raw_text = resp.text

        elif LLM_PROVIDER == "openai":
            if not openai_client:
                raise RuntimeError("OpenAI not configured")

            media_type, b64 = _image_to_png_base64(image_data)
            data_url = f"data:{media_type};base64,{b64}"

            resp = openai_client.chat.completions.create(
                model=OPENAI_MODEL_NAME,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {"type": "image_url", "image_url": {"url": data_url}},
                        ],
                    }
                ],
                temperature=0.2,
                response_format={"type": "json_object"},
            )
            raw_text = resp.choices[0].message.content

        elif LLM_PROVIDER == "anthropic":
            if not anthropic_client:
                raise RuntimeError("Anthropic not configured")

            media_type, b64 = _image_to_png_base64(image_data)
            resp = anthropic_client.messages.create(
                model=CLAUDE_MODEL_NAME,
                max_tokens=2000,
                temperature=0.2,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            {
                                "type": "image",
                                "source": {"type": "base64", "media_type": media_type, "data": b64},
                            },
                        ],
                    }
                ],
            )
            # Concatenate only the text blocks of the reply.
            raw_text = "".join([b.text for b in resp.content if getattr(b, "type", None) == "text"])

        else:
            raise ValueError(f"Unsupported LLM_PROVIDER: {LLM_PROVIDER}")

        clean_json = _extract_json_from_text(raw_text)
        legend_data = json.loads(clean_json)

        legend_data['source_file'] = path_obj.name

        # Accept a few alternative key names the model may use for the box.
        raw_bbox = (legend_data.get('legend_bbox') or
                    legend_data.get('legend_bounding_box') or
                    legend_data.get('bbox'))

        norm_bbox = _normalize_bbox_local(raw_bbox)

        if norm_bbox is None and legend_data.get('has_legend'):
            norm_bbox = _infer_bbox_from_location(image_data, legend_data.get('legend_location'))

        legend_data['legend_bbox'] = norm_bbox

        return legend_data

    except Exception as e:
        # Deliberate best-effort: a failed image yields an error record so a
        # batch run can continue with the remaining files.
        return {
            'has_legend': False,
            'error': str(e),
            'source_file': path_obj.name,
            'raw_response': raw_text
        }

/Users/rishi/.zshenv:.:2: no such file or directory: /Users/rishi/.cargo/env
Collecting openai
  Obtaining dependency information for openai from https://files.pythonhosted.org/packages/27/4b/7c1a00c2c3fbd004253937f7520f692a9650767aa73894d7a34f0d65d3f4/openai-2.14.0-py3-none-any.whl.metadata
  Downloading openai-2.14.0-py3-none-any.whl.metadata (29 kB)
Collecting distro<2,>=1.7.0 (from openai)
  Obtaining dependency information for distro<2,>=1.7.0 from https://files.pythonhosted.org/packages/12/b3/231ffd4ab1fc9d679809f356cebee130ac7daa00d6d6f3206dd4fd137e9e/distro-1.9.0-py3-none-any.whl.metadata
  Downloading distro-1.9.0-py3-none-any.whl.metadata (6.8 kB)
Collecting jiter<1,>=0.10.0 (from openai)
  Obtaining dependency information for jiter<1,>=0.10.0 from https://files.pythonhosted.org/packages/10/c1/40c9f7c22f5e6ff715f28113ebaba27ab85f9af2660ad6e1dd6425d14c19/jiter-0.12.0-cp311-cp311-macosx_11_0_arm64.whl.metadata
  Downloading jiter-0.12.0-cp311-cp311-macosx_11_0_arm64.whl.metadat

In [None]:
# Batch process all maps in input folder
input_folder = "input/"
output_file = "output/legend_extractions_llm.json"

# glob returns matches in arbitrary order; sort so the processing order and
# the saved JSON are stable across runs.
image_files = sorted(glob.glob(os.path.join(input_folder, "*.png")))

print(f"üîç Found {len(image_files)} maps to process\n")

all_results = []

for idx, img_path in enumerate(image_files, 1):
    print(f"[{idx}/{len(image_files)}] Processing: {os.path.basename(img_path)}...")

    try:
        # Context manager closes the underlying file once the pixels are copied.
        with Image.open(img_path) as pil_image:
            image_data = np.array(pil_image.convert('RGB'))

        result = extract_legend_with_llm(img_path, image_data)
    except Exception as e:
        print(f"  ‚ö†Ô∏è Error: {e}")
        result = {
            'source_file': os.path.basename(img_path),
            'has_legend': False,
            'error': str(e)
        }
    else:
        if result.get('has_legend'):
            # The key can be present with a None value, so guard with `or {}`.
            bbox = result.get('legend_bbox') or {}
            print(f"  ‚úÖ Legend found at: ({bbox.get('x')}, {bbox.get('y')}, {bbox.get('width')}x{bbox.get('height')})")
        elif result.get('error'):
            # extract_legend_with_llm reports failures as a result dict; show
            # them as errors rather than as "no legend".
            print(f"  ‚ö†Ô∏è Error: {result['error']}")
        else:
            print(f"  ‚ùå No legend detected")

    # Exactly one record per image, whichever path was taken above.
    all_results.append(result)

    print()

# Save all results
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(all_results, f, indent=2)

print(f"üíæ Saved {len(all_results)} results to {output_file}")

# Summary
successful = sum(1 for r in all_results if r.get('has_legend'))
print(f"\nüìä Summary: {successful}/{len(all_results)} maps had detectable legends")

# Display sample result
if all_results:
    print("\n--- Sample Result ---")
    print(json.dumps(all_results[0], indent=2))

üìÇ Loading image: input/appleton-post-crescent-oct-24-1922-p-13.png...
ü§ñ Analyzing with None...


KeyboardInterrupt: 