In [None]:
# Cell 1 - Install / imports + env load
# Fresh venv? Uncomment once:
# %pip install -r requirements.txt

import os
import json
import hashlib
import sqlite3
import re
from dataclasses import dataclass
from typing import Any, Dict, Optional, List, Tuple
from datetime import datetime, timezone

import requests
import pandas as pd
from dotenv import load_dotenv

load_dotenv()  # loads .env from the notebook working directory

# Both settings are whitespace-trimmed; an unset variable behaves like "" / "0".
EXA_API_KEY = (os.environ.get("EXA_API_KEY") or "").strip()
EXA_SMOKE_NO_NETWORK = "1" == (os.environ.get("EXA_SMOKE_NO_NETWORK") or "0").strip()

# An API key is mandatory unless smoke mode was explicitly requested.
if not (EXA_API_KEY or EXA_SMOKE_NO_NETWORK):
    raise RuntimeError(
        "Missing EXA_API_KEY. Create .env from .env.example and set EXA_API_KEY=...\n"
        "For local smoke runs without network/API billing, set EXA_SMOKE_NO_NETWORK=1."
    )

# Report the active mode; the key itself is never echoed.
print(
    "Smoke mode enabled (no network): using local mock Exa responses."
    if EXA_SMOKE_NO_NETWORK
    else "Loaded EXA_API_KEY from .env (not printing it)."
)


In [None]:
# Cell 2 - Config
#
# CONFIG is read at call time by the helpers defined in later cells
# (build_exa_payload, redact_text, cache_lookup, enforce_budget, _db, exa_http_call),
# so editing a value here and re-running this cell affects subsequent calls.

CONFIG = {
    # Exa search request settings
    "exa_endpoint": "https://api.exa.ai/search",
    "search_type": "auto",        # auto | neural | fast | deep
    "category": "people",         # keep focused on professional people profiles
    "num_results": 5,               # cheap default
    "user_location": "US",

    # Contents toggles (each enabled content type can add per-result cost)
    "use_text": False,              # expensive; leave off for first pass
    "use_highlights": True,         # recommended cheap default
    "highlights_per_url": 1,
    "highlight_num_sentences": 2,
    "use_summary": False,           # keep off unless needed

    # Optional domain controls (leave empty unless you want stricter scoping)
    # Empty lists are omitted from the request payload entirely.
    "include_domains": [],
    "exclude_domains": [],

    # Safety / moderation
    "moderation": True,
    # Despite the name, this flag also gates street-address redaction in redact_text.
    "redact_emails_phones": True,

    # Budget (hard stop for uncached calls only)
    # enforce_budget compares ledger spend + the next call's estimate against this cap.
    "budget_cap_usd": 7.50,

    # Cache (prevents re-billing on repeat runs)
    "sqlite_path": "exa_cache.sqlite",
    # Entries older than this are treated as cache misses by cache_lookup.
    "cache_ttl_hours": 24 * 30,  # 30 days
}

# Pricing assumptions (keep aligned with Exa pricing page before production use)
# Used only by _estimate_cost_from_pricing to produce the pre-call estimate.
PRICING = {
    # Search request pricing tier by requested result count
    # (<= 25 results uses the first tier; anything larger uses the second)
    "search_1_25": 0.005,
    "search_26_100": 0.025,

    # Content extraction pricing (per page, per content type)
    "content_text_per_page": 0.001,
    "content_highlights_per_page": 0.001,
    "content_summary_per_page": 0.001,
}

print("CONFIG loaded")
print(json.dumps(CONFIG, indent=2))


In [None]:
# Cell 3 - Exa call wrapper

def _canonical_json(obj: Any) -> str:
    return json.dumps(obj, sort_keys=True, ensure_ascii=False, separators=(",", ":"))


def _sha256(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


def _estimate_cost_from_pricing(payload: Dict[str, Any], num_results: int) -> float:
    """Estimate the USD cost of one search request from the PRICING table.

    The estimate is one search fee (tier chosen by ``num_results``) plus
    ``num_results`` times the per-page rate of every content type enabled in
    ``payload["contents"]``. The result is rounded to 6 decimal places.
    """
    tier_key = "search_1_25" if num_results <= 25 else "search_26_100"

    requested = payload.get("contents") or {}
    per_page_rates: List[float] = []
    # Same enablement rules the payload builder produces: text is a literal True,
    # highlights/summary are option dicts.
    if requested.get("text") is True:
        per_page_rates.append(PRICING["content_text_per_page"])
    if isinstance(requested.get("highlights"), dict):
        per_page_rates.append(PRICING["content_highlights_per_page"])
    if isinstance(requested.get("summary"), dict):
        per_page_rates.append(PRICING["content_summary_per_page"])

    contents_cost = 0.0
    for rate in per_page_rates:
        contents_cost += num_results * rate

    return round(PRICING[tier_key] + contents_cost, 6)


# Patterns used by redact_text to scrub contact details from displayed snippets.

# Email: local-part@domain.tld, case-insensitive, TLD of at least two letters.
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
# Phone-like run: optional "+", a digit, at least 7 digits/hyphens/whitespace/parens/dots,
# then a final digit. NOTE: this is deliberately loose and also matches other
# digit runs of that shape (e.g. a year range such as "2019 - 2023").
PHONE_RE = re.compile(r"(\+?\d[\d\-\s().]{7,}\d)")
# Street-address-like text: 1-5 digit number, a single word, then a street suffix.
# Multi-word street names are not matched by this pattern.
STREETISH_RE = re.compile(
    r"\b\d{1,5}\s+[A-Za-z0-9.\-]+\s+(Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Circle|Cir|Way)\b",
    re.IGNORECASE,
)


def redact_text(s: Optional[str]) -> Optional[str]:
    """Replace emails, phone-like numbers and street-like addresses with placeholders.

    Falsy input (``None`` or ``""``) is returned unchanged, as is any input when
    ``CONFIG["redact_emails_phones"]`` is switched off (it defaults to on).
    """
    if not s:
        return s
    if not CONFIG.get("redact_emails_phones", True):
        return s

    # Order matters: emails first, then phone-like runs, then street-like text.
    substitutions = (
        (EMAIL_RE, "[REDACTED_EMAIL]"),
        (PHONE_RE, "[REDACTED_PHONE]"),
        (STREETISH_RE, "[REDACTED_ADDRESS]"),
    )
    for pattern, placeholder in substitutions:
        s = pattern.sub(placeholder, s)
    return s


def extract_preview(result: Dict[str, Any], max_chars: int = 280) -> str:
    """Return a short, redacted snippet for one search result.

    Source preference: a non-empty ``highlights`` list (joined with " | "),
    then non-empty ``text``, then ``summary``. Returns "" when none is usable.

    Args:
        result: One result dict from an Exa response.
        max_chars: Maximum length of the snippet when it comes from ``text``
            or ``summary``. Highlights are returned in full.

    Returns:
        The redacted preview string (never ``None``).
    """
    highlights = result.get("highlights")
    if isinstance(highlights, list) and highlights:
        return redact_text(" | ".join(str(x) for x in highlights)) or ""

    text = result.get("text")
    summary = result.get("summary")
    if isinstance(text, str) and text:
        source = text
    elif isinstance(summary, str):
        source = summary
    else:
        return ""

    # Redact BEFORE truncating: cutting first can split an email/phone/address
    # at the boundary so the fragment no longer matches its pattern and leaks.
    return (redact_text(source) or "")[:max_chars]


@dataclass
class ExaCallMeta:
    """Bookkeeping record returned by ``exa_search_people`` alongside the response."""

    # True when the response was served from the local cache rather than a new call.
    cache_hit: bool
    # SHA-256 hex digest of the canonical-JSON request payload.
    request_hash: str
    # The request payload as built by build_exa_payload.
    request_payload: Dict[str, Any]
    # Pre-call estimate derived from the PRICING table.
    estimated_cost_usd: float
    # ``costDollars.total`` from the response when present and numeric, else None.
    actual_cost_usd: Optional[float]
    # ``requestId`` from the response, or None for a non-dict response.
    request_id: Optional[str]
    # ``resolvedSearchType`` from the response, or None for a non-dict response.
    resolved_search_type: Optional[str]
    # ISO-8601 UTC timestamp taken when this record was created.
    created_at_utc: str


def build_exa_payload(query: str, *, num_results: Optional[int] = None) -> Dict[str, Any]:
    """Assemble the JSON body for an Exa search request from CONFIG.

    Args:
        query: The search query string.
        num_results: Requested result count; a falsy value (``None`` or 0)
            falls back to ``CONFIG["num_results"]``.

    Returns:
        The request payload. Domain filters and the ``contents`` section are
        included only when the corresponding CONFIG settings enable them.
    """
    num_results = int(num_results or CONFIG["num_results"])

    payload: Dict[str, Any] = dict(
        query=query,
        type=CONFIG["search_type"],
        category=CONFIG["category"],
        numResults=num_results,
        userLocation=CONFIG["user_location"],
        moderation=CONFIG["moderation"],
    )

    # Domain filters are only sent when non-empty.
    domain_settings = (
        ("includeDomains", "include_domains"),
        ("excludeDomains", "exclude_domains"),
    )
    for payload_key, config_key in domain_settings:
        domains = CONFIG[config_key]
        if domains:
            payload[payload_key] = domains

    # Each enabled content type adds its own entry under "contents".
    contents: Dict[str, Any] = {}
    if CONFIG["use_text"]:
        contents["text"] = True
    if CONFIG["use_highlights"]:
        highlight_options = {
            "highlightsPerUrl": CONFIG["highlights_per_url"],
            "numSentences": CONFIG["highlight_num_sentences"],
        }
        contents["highlights"] = highlight_options
    if CONFIG["use_summary"]:
        summary_options = {
            "query": "Summarize the person's professional background and insurance/CAT relevance."
        }
        contents["summary"] = summary_options

    if contents:
        payload["contents"] = contents

    return payload


def exa_search_people(query: str, *, num_results: Optional[int] = None) -> Tuple[Dict[str, Any], ExaCallMeta]:
    """Run one people search through the cache/budget layer and describe the call.

    Args:
        query: The search query string.
        num_results: Optional override for the configured result count.

    Returns:
        ``(response_json, meta)`` where ``meta`` records cache status, the
        request hash/payload, estimated and reported cost, and the identifiers
        found in the response.
    """
    payload = build_exa_payload(query, num_results=num_results)
    estimated_cost = _estimate_cost_from_pricing(payload, int(payload["numResults"]))

    response_json, cache_hit = cache_get_or_set(payload, estimated_cost)  # defined in Cell 4

    # Everything below is read from the response only when it is a dict.
    actual_cost = None
    request_id = None
    resolved_search_type = None
    if isinstance(response_json, dict):
        cost = response_json.get("costDollars")
        if isinstance(cost, dict):
            total = cost.get("total")
            if isinstance(total, (int, float)):
                actual_cost = float(total)
        request_id = response_json.get("requestId")
        resolved_search_type = response_json.get("resolvedSearchType")

    request_hash = _sha256(_canonical_json(payload))
    meta = ExaCallMeta(
        cache_hit=cache_hit,
        request_hash=request_hash,
        request_payload=payload,
        estimated_cost_usd=estimated_cost,
        actual_cost_usd=actual_cost,
        request_id=request_id,
        resolved_search_type=resolved_search_type,
        created_at_utc=datetime.now(timezone.utc).isoformat(),
    )
    return response_json, meta


In [None]:
# Cell 4 - Cache wrapper (sqlite) + budget enforcement

def _db() -> sqlite3.Connection:
    """Open the SQLite file at ``CONFIG["sqlite_path"]`` and ensure the schema exists.

    Creates the ``exa_cache`` and ``exa_ledger`` tables when missing and commits.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If connecting or creating the schema fails. The
            connection is closed before the error propagates.
    """
    cache_ddl = """
        CREATE TABLE IF NOT EXISTS exa_cache (
            request_hash TEXT PRIMARY KEY,
            request_json TEXT NOT NULL,
            response_json TEXT NOT NULL,
            estimated_cost_usd REAL NOT NULL,
            actual_cost_usd REAL,
            created_at_utc TEXT NOT NULL
        )
        """
    ledger_ddl = """
        CREATE TABLE IF NOT EXISTS exa_ledger (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_hash TEXT NOT NULL,
            query TEXT NOT NULL,
            cache_hit INTEGER NOT NULL,
            estimated_cost_usd REAL NOT NULL,
            actual_cost_usd REAL,
            created_at_utc TEXT NOT NULL
        )
        """

    conn = sqlite3.connect(CONFIG["sqlite_path"])
    try:
        conn.execute(cache_ddl)
        conn.execute(ledger_ddl)
        conn.commit()
    except BaseException:
        # Do not leak the handle when schema setup fails (e.g. a locked database).
        conn.close()
        raise
    return conn


def cache_lookup(request_hash: str) -> Optional[Dict[str, Any]]:
    """Return the cached response for ``request_hash`` if present and still fresh.

    Args:
        request_hash: SHA-256 hex digest of the canonical request payload.

    Returns:
        The decoded response, or ``None`` on a cache miss. A miss is reported
        when no row exists, the stored timestamp cannot be parsed, the entry is
        older than ``CONFIG["cache_ttl_hours"]``, or the stored JSON cannot be
        decoded.
    """
    conn = _db()
    try:
        row = conn.execute(
            "SELECT response_json, created_at_utc FROM exa_cache WHERE request_hash = ?",
            (request_hash,),
        ).fetchone()
    finally:
        conn.close()

    if not row:
        return None

    response_json, created_at_utc = row
    try:
        # fromisoformat on older Pythons rejects a trailing "Z", so normalise it.
        created = datetime.fromisoformat(str(created_at_utc).replace("Z", "+00:00"))
    except ValueError:
        return None

    # The column stores UTC. A timestamp without an offset would make the
    # aware-minus-naive subtraction below raise TypeError, so assume UTC.
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)

    age_hours = (datetime.now(timezone.utc) - created).total_seconds() / 3600
    if age_hours > float(CONFIG["cache_ttl_hours"]):
        return None

    try:
        return json.loads(response_json)
    except (TypeError, ValueError):
        # A corrupt entry is unusable; report a miss so it gets refreshed.
        return None


def cache_store(request_hash: str, request_payload: Dict[str, Any], response_json: Dict[str, Any], estimated_cost: float) -> None:
    """Insert or overwrite the cache row for ``request_hash``.

    Stores the canonical JSON of the request and response, the estimated cost,
    the response's reported ``costDollars.total`` (when numeric), and the
    current UTC time as the creation timestamp.
    """
    reported_total = None
    cost_block = response_json.get("costDollars") if isinstance(response_json, dict) else None
    if isinstance(cost_block, dict):
        total = cost_block.get("total")
        if isinstance(total, (int, float)):
            reported_total = float(total)

    conn = _db()
    try:
        row_values = (
            request_hash,
            _canonical_json(request_payload),
            _canonical_json(response_json),
            float(estimated_cost),
            reported_total,
            datetime.now(timezone.utc).isoformat(),
        )
        conn.execute(
            "INSERT OR REPLACE INTO exa_cache (request_hash, request_json, response_json, estimated_cost_usd, actual_cost_usd, created_at_utc) VALUES (?, ?, ?, ?, ?, ?)",
            row_values,
        )
        conn.commit()
    finally:
        conn.close()


def ledger_add(request_hash: str, query: str, cache_hit: bool, estimated_cost: float, actual_cost: Optional[float]) -> None:
    """Append one row to the ``exa_ledger`` table for a search call.

    ``cache_hit`` is stored as 1/0, ``actual_cost`` may be ``None``, and the
    row is timestamped with the current UTC time.
    """
    hit_flag = 1 if cache_hit else 0
    recorded_actual = float(actual_cost) if actual_cost is not None else None

    conn = _db()
    try:
        row_values = (
            request_hash,
            query,
            hit_flag,
            float(estimated_cost),
            recorded_actual,
            datetime.now(timezone.utc).isoformat(),
        )
        conn.execute(
            "INSERT INTO exa_ledger (request_hash, query, cache_hit, estimated_cost_usd, actual_cost_usd, created_at_utc) VALUES (?, ?, ?, ?, ?, ?)",
            row_values,
        )
        conn.commit()
    finally:
        conn.close()


def ledger_summary() -> pd.DataFrame:
    """Load the whole ``exa_ledger`` table, ordered by ``id``, as a DataFrame.

    When the ledger has no rows, an empty frame carrying the expected column
    names is returned instead.
    """
    conn = _db()
    try:
        ledger = pd.read_sql_query("SELECT * FROM exa_ledger ORDER BY id ASC", conn)
    finally:
        conn.close()

    if not ledger.empty:
        return ledger

    return pd.DataFrame(
        columns=[
            "id",
            "request_hash",
            "query",
            "cache_hit",
            "estimated_cost_usd",
            "actual_cost_usd",
            "created_at_utc",
        ]
    )


def spend_so_far() -> Dict[str, float]:
    """Aggregate the ledger into request counts and billable spend.

    Billable cost per ledger row: 0 for cache hits; otherwise the recorded
    actual cost, falling back to the estimate when no actual cost was stored.

    Returns:
        A dict with ``request_count``, ``cache_hits``, ``uncached_calls``,
        ``spent_usd`` and ``avg_cost_per_uncached_query`` (monetary values
        rounded to 6 decimal places). All values are zero for an empty ledger.
    """
    ledger = ledger_summary()
    if ledger.empty:
        return {
            "request_count": 0,
            "cache_hits": 0,
            "uncached_calls": 0,
            "spent_usd": 0.0,
            "avg_cost_per_uncached_query": 0.0,
        }

    hit_flags = ledger["cache_hit"].astype(int)
    is_hit = hit_flags == 1
    is_uncached = hit_flags == 0

    # Prefer the reported cost, fall back to the estimate, and zero out cache hits.
    actual = ledger["actual_cost_usd"].astype(float)
    estimated = ledger["estimated_cost_usd"].astype(float)
    billable = actual.fillna(estimated).mask(is_hit, 0.0)

    uncached_count = int(is_uncached.sum())
    if uncached_count:
        avg_uncached = float(billable[is_uncached].mean())
    else:
        avg_uncached = 0.0

    return {
        "request_count": int(len(ledger)),
        "cache_hits": int(is_hit.sum()),
        "uncached_calls": uncached_count,
        "spent_usd": round(float(billable.sum()), 6),
        "avg_cost_per_uncached_query": round(avg_uncached, 6),
    }


def enforce_budget(next_estimated_cost: float) -> None:
    """Raise if ledger spend plus the next call's estimate would exceed the cap.

    Args:
        next_estimated_cost: Estimated USD cost of the call about to be made.

    Raises:
        RuntimeError: When projected spend is greater than
            ``CONFIG["budget_cap_usd"]``.
    """
    metrics = spend_so_far()
    budget_cap = float(CONFIG["budget_cap_usd"])
    projected_spend = float(metrics["spent_usd"]) + float(next_estimated_cost)

    over_budget = projected_spend > budget_cap
    if not over_budget:
        return

    message_lines = [
        "Budget cap exceeded before uncached Exa call.\n",
        f"Spent so far: ${metrics['spent_usd']:.4f}\n",
        f"Next call estimate: ${float(next_estimated_cost):.4f}\n",
        f"Cap: ${float(CONFIG['budget_cap_usd']):.2f}\n",
        "Lower num_results and/or disable text/summary, or rerun cached queries.",
    ]
    raise RuntimeError("".join(message_lines))


def _mock_exa_response(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build a deterministic fake Exa response for smoke runs.

    The response mirrors the requested ``numResults`` (default 5) and includes
    only the content fields the payload asked for. URLs and ids are derived
    from a hash of the query, all costs are zero, and ``_smokeMode`` is set.
    """
    query = str(payload.get("query") or "")
    num_results = int(payload.get("numResults") or 5)

    requested = payload.get("contents") or {}
    include_highlights = isinstance(requested.get("highlights"), dict)
    include_text = requested.get("text") is True
    include_summary = isinstance(requested.get("summary"), dict)

    slug = _sha256(query)[:8]

    def _make_item(rank: int) -> Dict[str, Any]:
        # One mock result; ``rank`` is 1-based.
        item: Dict[str, Any] = {
            "id": f"mock-{slug}-{rank}",
            "title": f"Mock Professional Result {rank} - CAT loss / insurance expert",
            "url": f"https://www.linkedin.com/in/mock-{slug}-{rank}",
        }
        if include_highlights:
            item["highlights"] = [
                f"Mock highlight for query: {query}. Public professional profile for insurance litigation and expert witness context."
            ]
            item["highlightScores"] = [0.99]
        if include_text:
            item["text"] = (
                "Mock text body. Public/professional info only. No personal addresses or contact harvesting. "
                f"Query context: {query}."
            )
        if include_summary:
            item["summary"] = "Mock summary: relevant professional background for insurance/CAT-loss workflow evaluation."
        return item

    results: List[Dict[str, Any]] = [_make_item(rank) for rank in range(1, num_results + 1)]

    zero_cost = {
        "search": 0.0,
        "contents": 0.0,
        "total": 0.0,
    }
    return {
        "requestId": f"smoke-{slug}",
        "resolvedSearchType": str(payload.get("type") or "auto"),
        "results": results,
        "costDollars": zero_cost,
        "_smokeMode": True,
    }


def exa_http_call(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Send ``payload`` to the Exa endpoint, or return a mock in smoke mode.

    Raises:
        RuntimeError: If no API key is available for a live request.
        requests.HTTPError: If the response has an error status.
    """
    # Smoke mode short-circuits before any credential check or network access.
    if EXA_SMOKE_NO_NETWORK:
        return _mock_exa_response(payload)

    if not EXA_API_KEY:
        raise RuntimeError("Missing EXA_API_KEY for live Exa request.")

    response = requests.post(
        CONFIG["exa_endpoint"],
        headers={
            "x-api-key": EXA_API_KEY,
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=60,
    )
    response.raise_for_status()
    return response.json()


def _response_total_cost(response_json: Any) -> Optional[float]:
    """Return ``costDollars.total`` from a response as a float, or None if absent/non-numeric."""
    if not isinstance(response_json, dict):
        return None
    cost = response_json.get("costDollars")
    if isinstance(cost, dict) and isinstance(cost.get("total"), (int, float)):
        return float(cost["total"])
    return None


def cache_get_or_set(payload: Dict[str, Any], estimated_cost: float) -> Tuple[Dict[str, Any], bool]:
    """Return the response for ``payload``, using the cache when possible.

    On a cache hit the cached response is returned and a ledger row is written
    with ``cache_hit=True``. On a miss the budget is enforced, the request is
    executed, and the response is cached and logged with ``cache_hit=False``.

    Args:
        payload: The request body; its canonical-JSON hash is the cache key.
        estimated_cost: Pre-call cost estimate used for the budget check and ledger.

    Returns:
        ``(response_json, cache_hit)``.

    Raises:
        RuntimeError: If the budget cap would be exceeded by an uncached call.
    """
    request_hash = _sha256(_canonical_json(payload))
    query = str(payload.get("query") or "")

    cached = cache_lookup(request_hash)

    # Mock responses written during a smoke run share the cache key with the
    # real request. When running live they must not be served as real results,
    # so treat them as a miss; the fresh response then overwrites the mock row.
    if (
        cached is not None
        and not EXA_SMOKE_NO_NETWORK
        and isinstance(cached, dict)
        and cached.get("_smokeMode")
    ):
        cached = None

    if cached is not None:
        ledger_add(
            request_hash=request_hash,
            query=query,
            cache_hit=True,
            estimated_cost=estimated_cost,
            actual_cost=_response_total_cost(cached),
        )
        return cached, True

    enforce_budget(estimated_cost)
    response_json = exa_http_call(payload)
    cache_store(request_hash, payload, response_json, estimated_cost)

    ledger_add(
        request_hash=request_hash,
        query=query,
        cache_hit=False,
        estimated_cost=estimated_cost,
        actual_cost=_response_total_cost(response_json),
    )
    return response_json, False


# Runs when the cell executes. spend_so_far() opens the database through _db(),
# which connects to the SQLite file and creates the tables if they are missing.
print("Cache ready:", CONFIG["sqlite_path"])
print("Ledger metrics:", spend_so_far())


In [None]:
# Cell 5 - Single query demo

demo_query = "Florida property insurance attorney hurricane Ian appraisal dispute site:linkedin.com"
resp_demo, meta_demo = exa_search_people(demo_query, num_results=CONFIG["num_results"])

# Call metadata: cache status, cost figures, and identifiers read from the response.
print("Single-query demo")
print("  cache_hit:", meta_demo.cache_hit)
print("  estimated_cost_usd:", meta_demo.estimated_cost_usd)
print("  actual_cost_usd:", meta_demo.actual_cost_usd)
print("  request_id:", meta_demo.request_id)
print("  resolved_search_type:", meta_demo.resolved_search_type)

# A non-dict response is treated as having no results.
if isinstance(resp_demo, dict):
    results_demo = resp_demo.get("results", [])
else:
    results_demo = []

# One display row per result, capped at the configured result count;
# title and preview pass through redaction before display.
rows_demo = []
for result in results_demo[: int(CONFIG["num_results"])]:
    rows_demo += [
        {
            "title": redact_text(result.get("title")),
            "url": result.get("url"),
            "preview": extract_preview(result, max_chars=280),
            "highlightScores": result.get("highlightScores"),
        }
    ]

df_demo = pd.DataFrame(rows_demo)
print("Rows returned:", len(df_demo))
df_demo


In [None]:
# Cell 6 - Batch query test suite (insurance / CAT)

# Swap this list to evaluate other workflows while keeping the rest of the notebook unchanged.
# Each entry is sent as one search request by the batch loop below; a query
# that is not already cached is checked against the budget cap before it runs.
TEST_QUERIES = [
    "forensic engineer wind damage expert witness Florida property insurance site:linkedin.com",
    "building envelope consultant moisture intrusion expert witness Florida site:linkedin.com",
    "forensic accountant business interruption insurance claim expert witness site:linkedin.com",
    "meteorologist hurricane wind field expert witness litigation site:linkedin.com",
    "fire origin and cause investigator expert witness insurance litigation site:linkedin.com",
    "policyholder attorney bad faith property insurance Florida site:linkedin.com",
    "Texas hail damage property insurance attorney appraisal dispute site:linkedin.com",
    "insurance appraisal umpire property claims Florida site:linkedin.com",
    "licensed public adjuster large loss hurricane Florida commercial property site:linkedin.com",
    "Xactimate trainer estimator large loss consultant site:linkedin.com",
    "claims consultant catastrophe response litigation support property insurance site:linkedin.com",
    "insurance coverage expert witness former adjuster property claims site:linkedin.com",
    "water mitigation IICRC expert witness insurance dispute site:linkedin.com",
    "roofing consultant wind uplift tile roof expert witness Florida site:linkedin.com",
    "civil engineer structural damage assessment hurricane expert witness site:linkedin.com",
]

# Lower-case substrings searched for in each result's title, highlights and
# leading text; one match in any result marks the query as relevant.
RELEVANCE_KEYWORDS = [
    "expert witness",
    "forensic",
    "insurance",
    "appraisal",
    "adjuster",
    "coverage",
    "litigation",
    "catastrophe",
]

# Run every test query through exa_search_people (cache + budget guard) and
# record one summary row per query. Queries run sequentially; an exception
# (for example the budget cap) stops the loop before df_batch is built.
batch_rows = []
for query in TEST_QUERIES:
    resp, meta = exa_search_people(query, num_results=CONFIG["num_results"])
    # A non-dict response is treated as having no results.
    results = resp.get("results", []) if isinstance(resp, dict) else []
    top = results[0] if results else {}

    # Signals are computed over at most the configured number of results.
    top_n = results[: int(CONFIG["num_results"])]
    linkedin_present = any("linkedin.com" in str(r.get("url") or "").lower() for r in top_n)

    # Build one lower-cased text blob per result from its title, highlights,
    # and the first 200 characters of its text (when present).
    relevance_text = []
    for r in top_n:
        parts = [str(r.get("title") or "")]
        highlights = r.get("highlights")
        if isinstance(highlights, list):
            parts.extend(str(x) for x in highlights)
        text = r.get("text")
        if isinstance(text, str):
            parts.append(text[:200])
        relevance_text.append(" ".join(parts).lower())
    # True when any keyword appears in any result's blob.
    relevance_keywords_present = any(any(k in blob for k in RELEVANCE_KEYWORDS) for blob in relevance_text)

    batch_rows.append(
        {
            "query": query,
            "cache_hit": meta.cache_hit,
            "est_cost_usd": meta.estimated_cost_usd,
            "actual_cost_usd": meta.actual_cost_usd,
            # Title and preview of the first result are redacted before storage.
            "top_title": redact_text(top.get("title")) if isinstance(top, dict) else None,
            "top_url": top.get("url") if isinstance(top, dict) else None,
            "top_preview": extract_preview(top, max_chars=220) if isinstance(top, dict) else "",
            "linkedin_present": linkedin_present,
            "relevance_keywords_present": relevance_keywords_present,
            # Count of all returned results, not just the top_n slice.
            "result_count": len(results),
        }
    )

df_batch = pd.DataFrame(batch_rows)
print(f"Batch queries executed: {len(df_batch)}")
df_batch


In [None]:
# Cell 7 - Summary table + qualitative notes

# Snapshot of the ledger and the aggregate spend metrics derived from it.
df_ledger = ledger_summary()
summary = spend_so_far()

print(f"Request count: {summary['request_count']}")
print(f"Cache hits vs uncached calls: {summary['cache_hits']} vs {summary['uncached_calls']}")
print(f"Spent so far (USD): ${summary['spent_usd']:.4f}")
print(f"Avg cost per uncached query (USD): ${summary['avg_cost_per_uncached_query']:.4f}")

summary_table = pd.DataFrame([summary])
print("\nSummary table")
# Best effort: fall back to the plain repr if markdown rendering fails
# (for example when its optional dependency is not installed).
try:
    print(summary_table.to_markdown(index=False))
except Exception:
    print(summary_table)

# Notes derived from the batch results and the current configuration.
qualitative_notes: List[str] = []
if not df_batch.empty:
    relevance_rate = float(df_batch["relevance_keywords_present"].mean())
    linkedin_rate = float(df_batch["linkedin_present"].mean())
    avg_results = float(df_batch["result_count"].mean())
    qualitative_notes.append(f"Observed relevance-keyword signal rate: {relevance_rate:.0%} across batch queries.")
    qualitative_notes.append(f"LinkedIn/public professional profile signal appeared in {linkedin_rate:.0%} of queries.")
    qualitative_notes.append(f"Average result count returned per query: {avg_results:.1f} (configured num_results={CONFIG['num_results']}).")
else:
    qualitative_notes.append("No batch results yet. Run Cell 6 first.")

if CONFIG["use_text"]:
    qualitative_notes.append("Text is enabled: higher evidence quality, higher cost. Consider disabling for initial screening.")
else:
    qualitative_notes.append("Text is disabled: cheaper baseline. Highlights should usually be enough for triage.")

if CONFIG["use_summary"]:
    qualitative_notes.append("Summary is enabled: validate value before scaling because it adds per-result cost.")
else:
    qualitative_notes.append("Summary is disabled (recommended for cost-sensitive baseline testing).")

if EXA_SMOKE_NO_NETWORK:
    qualitative_notes.append("Smoke mode is on: results are mocked and costs are zero; use a real API key for live quality/cost evaluation.")

print("\nQualitative notes")
for note in qualitative_notes:
    print("-", note)

review_cols = [
    "query",
    "top_title",
    "top_url",
    "top_preview",
    "linkedin_present",
    "relevance_keywords_present",
    "cache_hit",
    "actual_cost_usd",
    "est_cost_usd",
]
# reindex rather than df_batch[review_cols]: an empty batch frame has no
# columns, so plain column selection would raise KeyError even though the
# empty case is handled in the notes above. For a populated frame the
# displayed table is the same.
df_batch.reindex(columns=review_cols)


In [None]:
# Cell 8 - Cost estimate + projections

observed_avg_uncached = float(summary.get("avg_cost_per_uncached_query", 0.0))
observed_spent = float(summary.get("spent_usd", 0.0))

# Pick the per-query unit cost. When the observed average is not positive
# (e.g. a fully cached rerun), fall back to an estimate built from the
# current CONFIG so the projections stay useful.
if observed_avg_uncached <= 0:
    baseline_payload = build_exa_payload("baseline projection query", num_results=CONFIG["num_results"])
    projection_unit_cost = _estimate_cost_from_pricing(baseline_payload, int(baseline_payload["numResults"]))
    projection_basis = "estimated_from_current_config"
else:
    projection_unit_cost = observed_avg_uncached
    projection_basis = "observed_avg_uncached"

# Scale the unit cost to a few reference volumes.
projections = {
    "projection_basis": projection_basis,
    "unit_cost_usd": round(projection_unit_cost, 6),
}
projections.update(
    {
        f"projected_{volume}_queries_usd": round(projection_unit_cost * volume, 4)
        for volume in (100, 1000, 10000)
    }
)

print(f"Spent so far (USD): ${observed_spent:.4f}")
print(f"Projection basis: {projection_basis}")
print(f"Projected cost for 100 queries:   ${projections['projected_100_queries_usd']:.4f}")
print(f"Projected cost for 1,000 queries: ${projections['projected_1000_queries_usd']:.4f}")
print(f"Projected cost for 10,000 queries:${projections['projected_10000_queries_usd']:.4f}")

cost_projection_table = pd.DataFrame([projections])
cost_projection_table


In [None]:
# Cell 9 - Decision rubric + recommendation integration points

# Evaluation criteria mapped to the question a reviewer should answer for each.
# The rubric is only printed as a checklist; nothing below scores it automatically.
RUBRIC = {
    "Search quality (people relevance)": "Do results return the right categories of professionals for CAT-loss / claim-dispute work?",
    "Coverage": "Does it find credible candidates beyond obvious first-page names?",
    "Evidence quality": "Are highlights/snippets sufficient for triage without pulling full page text?",
    "Latency": "Is response time acceptable for interactive analyst workflows?",
    "Cost": "Is unit cost predictable and within your budget at projected volumes?",
    "Safety/compliance": "Can outputs stay public/professional only (no doxxing/contact harvesting)?",
    "Repeatability": "Do cached reruns reliably reproduce demo/eval behavior without rebilling?",
}

# Emit one unchecked markdown-style checkbox line per criterion.
print("Decision rubric (mark manually after review)")
for criterion, prompt in RUBRIC.items():
    print(f"- [ ] {criterion}: {prompt}")


def recommendation(summary_metrics: Dict[str, Any], batch_df: pd.DataFrame) -> Dict[str, Any]:
    """Turn observed batch signals and cost into a headline recommendation.

    Args:
        summary_metrics: Spend metrics; only ``avg_cost_per_uncached_query`` is read.
        batch_df: Batch results with ``relevance_keywords_present`` and
            ``linkedin_present`` columns. An empty frame yields rates of 0.

    Returns:
        A dict containing the headline, the observed rates and average cost,
        the configured budget cap, and static lists of guardrails, integration
        points and tuning suggestions.
    """
    relevance_rate = 0.0
    linkedin_rate = 0.0
    if not batch_df.empty:
        relevance_rate = float(batch_df["relevance_keywords_present"].mean())
        linkedin_rate = float(batch_df["linkedin_present"].mean())

    avg_cost = float(summary_metrics.get("avg_cost_per_uncached_query", 0.0))

    # The rejection rule takes precedence over the acceptance rule.
    # In smoke mode the cost threshold for acceptance is waived.
    rejected = relevance_rate < 0.50 or avg_cost > 0.05
    accepted = relevance_rate >= 0.70 and (avg_cost <= 0.02 or EXA_SMOKE_NO_NETWORK)
    if rejected:
        headline = "Do not integrate at current settings"
    elif accepted:
        headline = "Integrate (with human review and budget guards)"
    else:
        headline = "Integrate only for scoped workflows"

    safety_guardrails = [
        "Public/professional info only",
        "No address hunting or contact harvesting",
        "Keep redaction enabled for displayed snippets",
        "Human review required before any operational use",
    ]

    integration_points = [
        {
            "workflow": "Expert / professional discovery",
            "value": "Find candidate experts and collect source URLs plus short relevance snippets for analyst review.",
            "safe_pattern": "Query by role + peril + jurisdiction; store only title/url/highlights unless deeper review is justified.",
        },
        {
            "workflow": "Consultant / witness context enrichment",
            "value": "When a report names a professional, pull public bios/publications for litigation context.",
            "safe_pattern": "Search by name + role + insurance/litigation terms; do not harvest private contact data.",
        },
        {
            "workflow": "Claim dispute research triage",
            "value": "Quickly identify likely relevant disciplines (forensic engineer, meteorologist, accountant, etc.).",
            "safe_pattern": "Use highlights-on/text-off default, then selectively expand only high-value results.",
        },
    ]

    next_tuning_moves = [
        "Keep highlights on, text off, summary off, num_results=5 for baseline cost testing.",
        "Try include_domains for stricter professional-source targeting if relevance is noisy.",
        "Enable text/summary only for a second-pass workflow on shortlisted results.",
    ]

    return {
        "headline_recommendation": headline,
        "observed_relevance_rate": round(relevance_rate, 3),
        "observed_linkedin_rate": round(linkedin_rate, 3),
        "avg_cost_per_uncached_query_usd": round(avg_cost, 4),
        "budget_cap_usd": float(CONFIG["budget_cap_usd"]),
        "safety_guardrails": safety_guardrails,
        "integration_points": integration_points,
        "next_tuning_moves": next_tuning_moves,
    }

# Build the recommendation from the Cell 7 spend summary and the Cell 6 batch
# frame, then print it as indented JSON.
rec = recommendation(summary, df_batch)
print("\nRecommendation output")
print(json.dumps(rec, indent=2))
