In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount and print the full path of every file found.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        print(os.path.join(root, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
!pip install pymysql

Collecting pymysql
  Downloading pymysql-1.1.2-py3-none-any.whl.metadata (4.3 kB)
Downloading pymysql-1.1.2-py3-none-any.whl (45 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.3/45.3 kB[0m [31m87.3 kB/s[0m eta [36m0:00:00[0mta [36m0:00:01[0m
[?25hInstalling collected packages: pymysql
Successfully installed pymysql-1.1.2


In [7]:
# Provenance Radar++ — AIC(3k) + Cleveland ingest, batched embeddings, robust DB retries (GPU optional)

# -------- quiet & device selection --------
import os
# Quiet Hugging Face tooling: no tokenizer parallelism, error-only transformers logging, no hub telemetry.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"

USE_GPU = os.getenv("USE_GPU", "1") == "1"  # defaults to "1" (use the GPU when available); set USE_GPU=0 to force CPU
if not USE_GPU:
    os.environ["CUDA_VISIBLE_DEVICES"] = ""   # hide every CUDA device so the CPU is used

# -------- std imports --------
import re, json, time, warnings
import certifi, pymysql, requests, torch
from datetime import datetime
from sentence_transformers import SentenceTransformer
from transformers.utils.logging import set_verbosity_error
from pymysql.err import OperationalError

# Progress bar import (FIXED to be notebook-aware)
def _is_notebook() -> bool:
    """Checks if the code is running in a Jupyter-like notebook environment."""
    try:
        from IPython import get_ipython
        shell = get_ipython().__class__.__name__
        if 'Shell' in shell:
            return shell == 'ZMQInteractiveShell' # Jupyter, Kaggle, Colab
        return False
    except (NameError, ImportError):
        return False # Not in an IPython environment

# Pick the tqdm flavour that matches the front end; HAS_TQDM records whether
# any progress-bar implementation could be imported at all.
try:
    if _is_notebook():
        from tqdm.notebook import tqdm
    else:
        from tqdm import tqdm
    HAS_TQDM = True
except ImportError:
    # Progress reporting falls back to plain print() calls elsewhere in the file.
    HAS_TQDM = False
    print("Install tqdm for better progress bars: pip install tqdm")


# NOTE(review): this silences every Python warning for the rest of the session,
# not only those raised by the ML libraries.
warnings.filterwarnings("ignore")
set_verbosity_error()

# =============================
# Config (tweak here)
# =============================
# Every setting can be overridden through the environment variable named in its getenv() call.
DB_NAME        = os.getenv("TIDB_DB", "test")
TARGET_VEC_DIM = int(os.getenv("VEC_DIM", "1536"))  # width of the VECTOR column; embeddings are zero-padded to it

# AIC: page through to target (fast + reliable)
AIC_TARGET     = int(os.getenv("AIC_TARGET", "3000"))   # objects with provenance to ingest
AIC_PAGE_SIZE  = int(os.getenv("AIC_PAGE_SIZE", "100"))  # records requested per API page

# Cleveland Museum of Art (no key)
CMA_TARGET = int(os.getenv("CMA_TARGET", "0"))  # 0 skips the CMA ingest (previous default: 1500)
CMA_PAGE_SIZE      = int(os.getenv("CMA_PAGE_SIZE", "100"))
CMA_MAX_TIME_SEC = int(os.getenv("CMA_MAX_TIME_SEC", "600"))   # stop the CMA ingest once this many seconds have elapsed

# batching controls
SENT_FLUSH_BATCH   = int(os.getenv("SENT_FLUSH_BATCH", "1200"))  # buffered sentences that trigger a flush to the DB
EMBED_BATCH_SIZE   = int(os.getenv("EMBED_BATCH_SIZE", "128"))   # batch size passed to model.encode
BULK_COMMIT_EVERY  = int(os.getenv("BULK_COMMIT_EVERY", "400"))  # flush the ingest buffer every N ingested objects

# TiDB connection settings
# NOTE(review): the user/password defaults look like placeholders — real
# credentials presumably come from the environment; confirm before running.
TIDB_HOST = os.getenv("TIDB_HOST", "gateway01.ap-northeast-1.prod.aws.tidbcloud.com")
TIDB_PORT = int(os.getenv("TIDB_PORT", "4000"))
TIDB_USER = os.getenv("TIDB_USER", "user")
TIDB_PASS = os.getenv("TIDB_PASS", "password")

# =============================
# Global connection + helpers
# =============================
# Shared connection state; both names are rebound by _connect().
conn = None
cursor = None

def _connect():
    """(Re)open the module-level TiDB connection and cursor.

    Any existing connection is closed first (errors while closing are
    ignored). The new connection uses TLS with certificate and hostname
    verification and autocommit; afterwards DB_NAME is created if missing and
    selected as the current database.
    """
    global conn, cursor
    if conn:
        try:
            conn.close()
        except Exception:
            pass  # best effort: a broken connection may fail to close cleanly
    conn = pymysql.connect(
        host=TIDB_HOST,
        port=TIDB_PORT,
        user=TIDB_USER,
        password=TIDB_PASS,
        database=DB_NAME,
        ssl={"ca": certifi.where()},
        ssl_verify_cert=True,
        ssl_verify_identity=True,
        autocommit=True,
    )
    cursor = conn.cursor()
    for statement in (f"CREATE DATABASE IF NOT EXISTS `{DB_NAME}`;", f"USE `{DB_NAME}`;"):
        cursor.execute(statement)

def _is_recoverable(op_err: OperationalError) -> bool:
    """Tell whether `op_err` is a lost-connection error worth retrying.

    Returns True only for an OperationalError whose first argument is error
    code 2006 or 2013. Always returns a real bool, including when the
    exception carries no arguments.
    """
    if not isinstance(op_err, OperationalError) or not op_err.args:
        return False
    return op_err.args[0] in (2006, 2013)

def run(sql, args=None, many=False, max_retries=3):
    """Execute one statement on the shared cursor, reconnecting on lost connections.

    Args:
        sql: statement text with `%s` placeholders.
        args: parameters for `execute`, or a sequence of parameter rows when
            `many` is true.
        many: use `executemany` instead of `execute`.
        max_retries: total number of attempts.

    Raises:
        OperationalError: when the error is not recoverable or the last
            attempt also failed.
    """
    global conn, cursor
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            conn.ping(reconnect=True)
            if not many:
                cursor.execute(sql, args or ())
            else:
                cursor.executemany(sql, args or [])
        except OperationalError as exc:
            can_retry = _is_recoverable(exc) and attempt < max_retries
            if not can_retry:
                raise
            # Back off a little longer on every attempt, then rebuild the connection.
            time.sleep(0.5 * attempt)
            _connect()
        else:
            return

def run_fetchall(sql, args=None, max_retries=3):
    """Execute `sql` through run() and return every row of its result set."""
    run(sql, args, max_retries=max_retries)
    rows = cursor.fetchall()
    return rows

def run_fetchone(sql, args=None, max_retries=3):
    """Execute `sql` through run() and return the first result row (None when there is none)."""
    run(sql, args, max_retries=max_retries)
    first_row = cursor.fetchone()
    return first_row

# Open the database connection as soon as this cell runs.
_connect()

# =============================
# Embeddings (batched) + device
# =============================
# Use CUDA only when it was requested via USE_GPU and torch can actually see a device.
device = "cuda" if (USE_GPU and torch.cuda.is_available()) else "cpu"
model = SentenceTransformer("all-MiniLM-L6-v2", device=device)  # 384-dim

def _pad(vec, dim=TARGET_VEC_DIM):
    """Return `vec` as a list of exactly `dim` items: truncated, or right-padded with 0.0."""
    shortfall = dim - len(vec)
    filler = [0.0] * shortfall if shortfall > 0 else []
    return vec[:dim] + filler

def embed_batch(sentences):
    """Embed `sentences` with the shared model and pad each vector via _pad().

    Returns an empty list for empty input, otherwise one list of floats per
    sentence, in input order.
    """
    if not sentences:
        return []
    vectors = model.encode(
        sentences,
        batch_size=EMBED_BATCH_SIZE,
        show_progress_bar=False,
        convert_to_numpy=True,
    )
    padded = []
    for vector in vectors:
        padded.append(_pad(vector.tolist()))
    return padded

# =============================
# Heuristics: events / risks
# =============================
# Event type -> regex; extract_events applies each pattern case-insensitively to a sentence.
# NOTE(review): "looted"/"spoliated" occur in both the SEIZED and LOOTED patterns, so such a
# sentence yields two events — confirm the overlap is intended.
EVENT_PATTERNS = {
    "SOLD": r"\bsold\b|\bauction\b|\bpurchased\b|\bbought\b",
    "SEIZED": r"\b(seized|confiscated|looted|spoliated)\b",
    "ACQUIRED": r"\bacquired\b|\bobtained\b|\bcollection of\b",
    "EXPORTED": r"\bexported\b|\bremoved from\b|\bsmuggled\b",
    "LOOTED": r"\blooted\b|\bspoliated\b",
    "STOLEN": r"\bstolen\b|\btheft\b|\bpilfered\b",
}
# Lower-case substrings that detect_textual_risks searches for in the lower-cased sentence.
RISK_KEYWORDS = ["seized","confiscated","looted","spoliated","missing","stolen","illicit","smuggled","exported"]
# Inclusive year bounds used by detect_event_risks for the "sold in the Nazi era" signal.
NAZI_START, NAZI_END = 1933, 1945

def extract_years(sentence: str):
    """Pull a (from, to) date pair out of the four-digit years in `sentence`.

    Years 1500-2099 are recognised. The first year becomes January 1st of the
    "from" date; the last year becomes the "to" date when more than one year
    is present. Returns (None, None) when no year is found.
    """
    found = re.findall(r"\b(1[5-9]\d{2}|20\d{2})\b", sentence)
    if len(found) == 0:
        return None, None
    start = datetime(int(found[0]), 1, 1).date()
    end = None
    if len(found) > 1:
        end = datetime(int(found[-1]), 1, 1).date()
    return start, end

def extract_events(sentence: str):
    """List `(event_type, date_from, date_to)` for every EVENT_PATTERNS entry matching `sentence`.

    Matching is case-insensitive; the dates come from extract_years() applied
    to the same sentence.
    """
    matched_types = [
        etype
        for etype, pattern in EVENT_PATTERNS.items()
        if re.search(pattern, sentence, re.IGNORECASE)
    ]
    return [(etype, *extract_years(sentence)) for etype in matched_types]

def detect_textual_risks(sentence: str):
    """Derive keyword-based risk signals from one provenance sentence.

    Returns a list of `(code, detail, weight)` tuples:
      * one `KEYWORD_<KW>` signal per RISK_KEYWORDS entry found as a substring
        of the lower-cased sentence (weight 0.9 for looted/stolen/confiscated,
        otherwise 0.6);
      * `PRIVATE_COLLECTION_UNKNOWN` when "private collection" appears and the
        sentence contains no "of ";
      * `NAZI_ERA_TRADE_WINDOW` when a trade term appears together with a year
        in 1933-1945.
    """
    risks = []
    s = sentence.lower()
    for kw in RISK_KEYWORDS:
        if kw in s:
            base = 0.9 if kw in {"looted","stolen","confiscated"} else 0.6
            risks.append((f"KEYWORD_{kw.upper()}", f"Matched keyword '{kw}'", base))
    if "private collection" in s and "of " not in s:
        risks.append(("PRIVATE_COLLECTION_UNKNOWN","'private collection' without owner",0.3))
    # "sold" is matched as a whole word (optionally "resold") so that words such
    # as "soldier" no longer count as commercial activity; "auction" and
    # "dealer" stay substring matches so "auctioned"/"dealers" still hit.
    trade_term = re.search(r"\b(?:re)?sold\b|auction|dealer", s)
    if trade_term and re.search(r"\b(193[3-9]|194[0-5])\b", s):
        risks.append(("NAZI_ERA_TRADE_WINDOW","Commercial activity in 1933–1945",0.6))
    return risks

def detect_event_risks(events):
    """Convert extracted events into `(code, detail, weight)` risk signals.

    `events` is an iterable of `(event_type, date_from, date_to)`; only the
    year of `date_from` is used. SEIZED/LOOTED/STOLEN weigh 1.0, EXPORTED
    weighs 0.8 from 1970 onwards and 0.5 otherwise, SOLD weighs 0.6 inside
    NAZI_START..NAZI_END and 0.2 otherwise. Other event types add nothing.
    """
    findings = []
    for event_type, date_from, _date_to in events:
        year = None if not date_from else date_from.year
        if event_type in ("SEIZED", "LOOTED", "STOLEN"):
            findings.append((f"EVENT_{event_type}", f"Event type {event_type}", 1.0))
            continue
        if event_type == "EXPORTED":
            weight = 0.5
            if year and year >= 1970:
                weight = 0.8
            findings.append(("EVENT_EXPORTED", f"Exported (year={year})", weight))
            continue
        if event_type == "SOLD":
            # The window constants are only consulted when a year is known.
            if year and NAZI_START <= year <= NAZI_END:
                findings.append(("EVENT_SOLD_NAZI_WINDOW","Sold 1933–1945",0.6))
            else:
                findings.append(("EVENT_SOLD","Sold (neutral)",0.2))
    return findings

# =============================
# Schema bootstrap (idempotent)
# =============================
def table_exists(schema, name):
    """Return True when `information_schema.tables` lists table `name` in `schema`."""
    lookup = "SELECT 1 FROM information_schema.tables WHERE table_schema=%s AND table_name=%s LIMIT 1"
    found = run_fetchone(lookup, (schema, name))
    return found is not None

def bootstrap_schema():
    """Create the database, tables and the `flagged_leads` view when missing.

    Safe to re-run: each table is created only when absent, `objects.image_url`
    is added only when the column is really missing (other errors are no longer
    swallowed), and the view is dropped and recreated on every call.

    The `provenance_events.event_type` ENUM includes 'STOLEN', because
    EVENT_PATTERNS produces that event type. An existing table whose ENUM
    lacks it is extended on a best-effort basis; a failure there is reported
    but does not abort the bootstrap.
    """
    def _column_type(table, column):
        """Return the declared type of `table.column`, or None when the column does not exist."""
        row = run_fetchone(
            "SELECT column_type FROM information_schema.columns "
            "WHERE table_schema=%s AND table_name=%s AND column_name=%s LIMIT 1",
            (DB_NAME, table, column),
        )
        return str(row[0]) if row else None

    # 'STOLEN' is appended at the end so that existing ENUM positions keep their meaning.
    event_enum = ("ENUM('ACQUIRED','SOLD','LOOTED','SEIZED','EXCAVATED',"
                  "'EXPORTED','ON_LOAN','UNKNOWN','STOLEN')")

    run(f"CREATE DATABASE IF NOT EXISTS `{DB_NAME}`;")
    run(f"USE `{DB_NAME}`;")

    if not table_exists(DB_NAME, "objects"):
        run("""
        CREATE TABLE objects (
          object_id BIGINT PRIMARY KEY AUTO_RANDOM,
          source VARCHAR(32) NOT NULL,
          source_object_id VARCHAR(128),
          title TEXT, creator TEXT, object_type VARCHAR(64),
          date_display VARCHAR(128), culture VARCHAR(128), country VARCHAR(128),
          wd_qid VARCHAR(32), inventory_no VARCHAR(128), collection VARCHAR(256),
          image_url TEXT,
          risk_score DECIMAL(6,3) DEFAULT 0,
          last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );""")
    elif _column_type("objects", "image_url") is None:
        # Older databases were created before image_url existed.
        run("ALTER TABLE objects ADD COLUMN image_url TEXT;")

    if not table_exists(DB_NAME, "provenance_sentences"):
        run(f"""
        CREATE TABLE provenance_sentences (
          sent_id BIGINT PRIMARY KEY AUTO_RANDOM,
          object_id BIGINT NOT NULL,
          seq INT, sentence TEXT,
          embedding VECTOR({TARGET_VEC_DIM}),
          CONSTRAINT fk_ps_object FOREIGN KEY (object_id) REFERENCES objects(object_id)
        );""")

    if not table_exists(DB_NAME, "provenance_events"):
        run(f"""
        CREATE TABLE provenance_events (
          event_id BIGINT PRIMARY KEY AUTO_RANDOM,
          object_id BIGINT NOT NULL,
          event_type {event_enum} DEFAULT 'UNKNOWN',
          date_from DATE, date_to DATE, place TEXT, actor TEXT, method TEXT, source_ref TEXT,
          CONSTRAINT fk_pe_object FOREIGN KEY (object_id) REFERENCES objects(object_id)
        );""")
    else:
        current_type = _column_type("provenance_events", "event_type")
        if current_type is not None and "'STOLEN'" not in current_type:
            try:
                run(f"ALTER TABLE provenance_events MODIFY COLUMN event_type {event_enum} DEFAULT 'UNKNOWN';")
            except pymysql.MySQLError as exc:
                # Best effort: keep bootstrapping, but make the problem visible.
                print(f"⚠️ Could not add 'STOLEN' to provenance_events.event_type: {exc}")

    if not table_exists(DB_NAME, "risk_signals"):
        run("""
        CREATE TABLE risk_signals (
          signal_id BIGINT PRIMARY KEY AUTO_RANDOM,
          object_id BIGINT NOT NULL,
          code VARCHAR(64), detail TEXT, weight DECIMAL(6,3),
          CONSTRAINT fk_rs_object FOREIGN KEY (object_id) REFERENCES objects(object_id)
        );""")

    # The view is cheap to rebuild, so it is always recreated to pick up changes.
    run("DROP VIEW IF EXISTS flagged_leads;")
    run("""
    CREATE VIEW flagged_leads AS
    SELECT
      o.object_id, o.source, o.title, o.creator, o.risk_score,
      (
        SELECT GROUP_CONCAT(code ORDER BY weight DESC SEPARATOR ', ')
        FROM (
          SELECT rs.code, rs.weight
          FROM risk_signals AS rs
          WHERE rs.object_id = o.object_id
          ORDER BY rs.weight DESC
          LIMIT 3
        ) AS top3
      ) AS top_signals
    FROM objects AS o
    WHERE o.risk_score > 0
    ORDER BY o.risk_score DESC;""")

    conn.commit()
    print("✅ Schema ready.")

# =============================
# Batched buffers for speed
# =============================
class IngestBuffer:
    """Accumulates sentence, event and risk rows and writes them to the DB in bulk.

    A flush is triggered automatically once `flush_batch` sentences are
    pending; callers may also call flush() themselves.
    """

    def __init__(self, flush_batch=SENT_FLUSH_BATCH):
        self.flush_batch = flush_batch
        self.sent_rows = []   # (object_id, seq, sentence)
        self.event_rows = []  # (object_id, etype, df, dt, place, actor, method, src)
        self.risk_rows  = []  # (object_id, code, detail, weight)

    def add_sentence(self, object_id, seq, sentence, src_label):
        """Queue one sentence together with the events and risk signals derived from it."""
        self.sent_rows.append((object_id, seq, sentence))
        events = extract_events(sentence)
        self.event_rows.extend(
            (object_id, etype, date_from, date_to, None, None, None, src_label)
            for etype, date_from, date_to in events
        )
        # Textual signals first, then event-based ones, as rows for the same object.
        signals = detect_textual_risks(sentence) + detect_event_risks(events)
        self.risk_rows.extend(
            (object_id, code, detail, float(weight)) for code, detail, weight in signals
        )

        if len(self.sent_rows) >= self.flush_batch:
            self.flush()

    def flush(self):
        """Embed and insert pending sentences, then insert pending events and risk signals."""
        if self.sent_rows:
            vectors = embed_batch([text for _, _, text in self.sent_rows])
            rows = [
                (oid, seq, text, json.dumps(vector))
                for (oid, seq, text), vector in zip(self.sent_rows, vectors)
            ]
            run("""INSERT INTO provenance_sentences (object_id, seq, sentence, embedding)
                   VALUES (%s,%s,%s,%s)""", args=rows, many=True)
            self.sent_rows.clear()

        events_sql = """INSERT INTO provenance_events
                   (object_id, event_type, date_from, date_to, place, actor, method, source_ref)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"""
        risks_sql = """INSERT INTO risk_signals (object_id, code, detail, weight)
                   VALUES (%s,%s,%s,%s)"""
        for statement, pending in ((events_sql, self.event_rows), (risks_sql, self.risk_rows)):
            if pending:
                run(statement, args=pending, many=True)
                pending.clear()

        conn.commit()

# Single shared buffer used by both ingest functions.
buf = IngestBuffer()

# =============================
# Helpers
# =============================

def get_or_create_object(source, sid, title, creator, date_display, country, image_url=None):
    """Return the `object_id` for `(source, sid)`, inserting the object when unknown.

    For an existing row, `image_url` is filled in only if one is supplied and
    the stored value is NULL or empty. New rows store empty strings for
    missing text fields.
    """
    key = (source, str(sid))
    lookup_sql = "SELECT object_id FROM objects WHERE source=%s AND source_object_id=%s"

    existing = run_fetchone(lookup_sql, key)
    if existing:
        object_id = existing[0]
        if image_url:
            # Never overwrite an image URL that is already stored.
            run("UPDATE objects SET image_url=%s WHERE object_id=%s AND (image_url IS NULL OR image_url = '')",
                (image_url, object_id))
        return object_id

    new_row = (*key, title or "", creator or "", date_display or "", country or "", image_url)
    run("""INSERT INTO objects (source, source_object_id, title, creator, date_display, country, image_url)
           VALUES (%s,%s,%s,%s,%s,%s,%s)""",
        new_row)

    # Read the generated id back through the same lookup.
    return run_fetchone(lookup_sql, key)[0]

def chunk_sentences(txt: str):
    """Split provenance text into trimmed chunks longer than six characters.

    Newlines become spaces; a chunk ends at a '.' or ';' that is followed by
    whitespace.
    """
    flattened = txt.replace("\n", " ").strip()
    chunks = []
    for piece in re.split(r"[.;]\s+", flattened):
        piece = piece.strip()
        if len(piece) > 6:
            chunks.append(piece)
    return chunks

# =============================
# AIC: paginated ingest to target (FIXED)
# =============================
def ingest_aic_paged(target=AIC_TARGET, page_size=AIC_PAGE_SIZE, start_page=1):
    """Page through the Art Institute of Chicago API and ingest objects that have provenance text.

    Args:
        target: stop after this many objects with provenance have been ingested.
        page_size: records requested per API page.
        start_page: first API page to request.

    Returns:
        Number of objects with provenance that were ingested.

    Raises:
        requests.HTTPError: when a page request returns an error status.

    The progress bar is closed, the HTTP session released and the shared buffer
    flushed even when a request or DB call fails, so buffered rows are not left
    behind in `buf` for a later run.

    NOTE(review): an object that already exists keeps its id and its sentences,
    events and risk rows are appended again, so re-running the ingest against a
    populated database duplicates those rows and inflates risk scores.
    """
    headers = {"User-Agent": "ProvenanceRadar/0.4"}
    ingested = 0
    page = start_page

    pbar = tqdm(total=target, desc="AIC Ingest", unit="obj") if HAS_TQDM else None

    try:
        with requests.Session() as s:
            while ingested < target:
                # Request only image_id - the IIIF URL is constructed below.
                url = ("https://api.artic.edu/api/v1/artworks"
                       f"?page={page}&limit={page_size}"
                       "&fields=id,title,artist_title,date_display,provenance_text,place_of_origin,image_id")

                r = s.get(url, headers=headers, timeout=60)
                r.raise_for_status()

                data = r.json().get("data", [])
                if not data:
                    break

                for obj in data:
                    prov = obj.get("provenance_text")
                    if not prov:
                        continue

                    # Construct the AIC IIIF image URL from the image id.
                    image_url = None
                    image_id = obj.get("image_id")
                    if image_id:
                        image_url = f"https://www.artic.edu/iiif/2/{image_id}/full/843,/0/default.jpg"

                    oid = get_or_create_object(
                        "AIC",
                        obj.get("id"),
                        obj.get("title"),
                        obj.get("artist_title"),
                        obj.get("date_display"),
                        obj.get("place_of_origin", ""),
                        image_url
                    )

                    for i, snt in enumerate(chunk_sentences(prov)):
                        buf.add_sentence(oid, i, snt, "AIC")
                    ingested += 1

                    if pbar is not None:
                        pbar.update(1)

                    if ingested % BULK_COMMIT_EVERY == 0:
                        buf.flush()
                        if pbar is None:
                            print(f"[AIC] committed {ingested}/{target} …")
                    if ingested >= target:
                        break
                page += 1
    finally:
        if pbar is not None:
            pbar.close()
        # Persist whatever is still buffered, on success and on failure alike.
        buf.flush()

    print(f"[AIC] total ingested with provenance: {ingested}")
    return ingested

# =============================
# CMA: resilient paginated ingest (no key)
# Tries both known endpoints and data shapes; only keeps objects with provenance text.
# =============================
# Candidate CMA base URLs; the CMA ingest tries them in list order for every page.
# NOTE(review): presumably only one of these is live — confirm against the
# Cleveland Museum of Art Open Access API documentation.
CMA_ENDPOINTS = [
    "https://openaccess-api.clevelandart.org/api/collection",
    "https://openaccess-api.clevelandart.org/api/artworks",
]

def _extract_cma_data(payload):
    # Common shapes: {"data":[...]} or {"results":[...]}
    return payload.get("data") or payload.get("results") or []

def _extract_cma_provenance(obj):
    # Try common keys
    for k in ("provenance", "provenance_text", "provenanceDescription", "provenance_display"):
        v = obj.get(k)
        if isinstance(v, str) and v.strip():
            return v.strip()
    return ""

def _extract_cma_id(obj):
    # Prefer id-like keys
    for k in ("id", "objectid", "object_id", "accession_number"):
        if obj.get(k) is not None:
            return obj.get(k)
    # fallback: try "uuid" or "inventory"
    for k in ("uuid","inventory_number"):
        if obj.get(k) is not None:
            return obj.get(k)
    return None

def _extract_cma_title(obj):
    for k in ("title","object_title","primary_title"): 
        if obj.get(k): return obj.get(k)
    return obj.get("name") or ""

def _extract_cma_creator(obj):
    for k in ("creator","artist","maker","artist_display_name"):
        if obj.get(k): return obj.get(k)
    return ""

def _extract_cma_date(obj):
    for k in ("creation_date","dated","date","display_date"):
        if obj.get(k): return obj.get(k)
    return ""

def _extract_cma_culture(obj):
    for k in ("culture","culture_name","nationality","country"):
        if obj.get(k): return obj.get(k)
    return ""

def _fetch_cma_page(sess, headers, page, page_size):
    """Return the records of one page from the first CMA endpoint that answers, else []."""
    # Try "limit/skip"; many CMA examples support this.
    params = {"limit": page_size, "skip": page * page_size}
    for ep in CMA_ENDPOINTS:
        try:
            r = sess.get(ep, params=params, headers=headers, timeout=60)
            if r.status_code != 200:
                continue
            items = _extract_cma_data(r.json())
        except (requests.RequestException, ValueError, AttributeError):
            # Network failure, non-JSON body or unexpected payload shape: try the next endpoint.
            continue
        if isinstance(items, list) and items:
            return items
    return []

def ingest_cma(target=CMA_TARGET, page_size=CMA_PAGE_SIZE, max_time_sec=CMA_MAX_TIME_SEC):
    """Ingest Cleveland Museum of Art objects that carry provenance text.

    Args:
        target: stop after this many objects have been ingested.
        page_size: records requested per page.
        max_time_sec: stop once this many seconds have elapsed.

    Returns:
        Number of objects with provenance that were ingested.

    Each page is taken from the first endpoint that returns records, so a page
    is never ingested twice when more than one endpoint answers. Only
    endpoint-level failures are skipped; database errors propagate instead of
    being swallowed. The progress bar is closed, the HTTP session released and
    the shared buffer flushed even when an error occurs.
    """
    headers = {"User-Agent": "ProvenanceRadar/0.4"}
    ingested = 0
    start_time = time.time()

    pbar = tqdm(total=target, desc="CMA Ingest", unit="obj") if HAS_TQDM else None

    def _budget_spent():
        # True once the target is met or the time budget is exhausted.
        return ingested >= target or (time.time() - start_time) >= max_time_sec

    try:
        with requests.Session() as sess:
            # Iterate pages; stop when empty, time budget exceeded, or target met.
            page = 0
            while not _budget_spent():
                items = _fetch_cma_page(sess, headers, page, page_size)
                if not items:
                    # No endpoint produced data for this page; stop.
                    break
                for obj in items:
                    if not isinstance(obj, dict):
                        continue
                    prov = _extract_cma_provenance(obj)
                    if not prov:
                        continue
                    oid = _extract_cma_id(obj)
                    if oid is None:
                        continue
                    title   = _extract_cma_title(obj)
                    creator = _extract_cma_creator(obj)
                    dated   = _extract_cma_date(obj)
                    culture = _extract_cma_culture(obj)
                    dbid = get_or_create_object("CMA", oid, title, creator, dated, culture)
                    for i, snt in enumerate(chunk_sentences(prov)):
                        buf.add_sentence(dbid, i, snt, "CMA")
                    ingested += 1

                    if pbar is not None:
                        pbar.update(1)

                    if ingested % BULK_COMMIT_EVERY == 0:
                        buf.flush()
                        if pbar is None:
                            print(f"[CMA] committed {ingested}/{target} …")
                    if _budget_spent():
                        break
                page += 1
    finally:
        if pbar is not None:
            pbar.close()
        # Persist whatever is still buffered, on success and on failure alike.
        buf.flush()

    print(f"[CMA] total ingested with provenance: {ingested} (time {int(time.time()-start_time)}s)")
    return ingested

# =============================
# Rescoring + reporting (WITH PROGRESS BAR!)
# =============================
def rescore_all_objects():
    """Recompute `objects.risk_score` for every object, one object at a time.

    The score is the sum of the object's `risk_signals.weight` values (0 when
    it has none). Progress is shown with tqdm when available, otherwise every
    500 objects; a commit is issued after each group of 100 objects.
    """
    ids = [row[0] for row in run_fetchall("SELECT object_id FROM objects")]
    total_objects = len(ids)

    if total_objects == 0:
        print("No objects to rescore.")
        return

    print(f"Rescoring {total_objects:,} objects...")

    batch_size = 100

    pbar = None
    if HAS_TQDM:
        pbar = tqdm(total=total_objects, desc="Rescoring", unit="obj",
                    bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]')

    for processed, oid in enumerate(ids, start=1):
        summed = run_fetchone(
            "SELECT COALESCE(SUM(weight),0) FROM risk_signals WHERE object_id=%s",
            (oid,)
        )
        total_risk = summed[0] or 0
        run("UPDATE objects SET risk_score=%s WHERE object_id=%s", (total_risk, oid))

        if pbar is not None:
            pbar.update(1)
        elif processed % 500 == 0:  # Fallback progress without tqdm
            print(f"  Processed {processed:,}/{total_objects:,} ({100*processed/total_objects:.1f}%)")

        # Commit at the end of every batch (and after the final partial batch)
        # to avoid long transactions.
        if processed % batch_size == 0 or processed == total_objects:
            conn.commit()

    if pbar is not None:
        pbar.close()

    print(f"✅ Rescoring complete! Updated {processed:,} objects.")

def rescore_all_objects_fast():
    """Recompute every `objects.risk_score` with a single UPDATE statement.

    Each score becomes the sum of that object's `risk_signals.weight` values
    (0 when there are none); the object count is then reported.
    """
    print("Rescoring all objects with single query...")

    # One correlated UPDATE covers the whole table.
    update_sql = """
    UPDATE objects 
    SET risk_score = (
        SELECT COALESCE(SUM(rs.weight), 0) 
        FROM risk_signals rs 
        WHERE rs.object_id = objects.object_id
    )
    """
    run(update_sql)

    # Report how many objects the table holds.
    count_row = run_fetchone("SELECT COUNT(*) FROM objects")
    total_count = count_row[0]
    print(f"✅ Fast rescoring complete! Updated {total_count:,} objects.")

    conn.commit()

def print_counts():
    """Print the row count of each of the four pipeline tables."""
    tables = ("objects", "provenance_sentences", "provenance_events", "risk_signals")
    for t in tables:
        row = run_fetchone(f'SELECT COUNT(*) FROM {t}')
        print(f"{t}: {row[0]:,}")

def print_top_leads(n=10):
    """Print the `n` objects with the highest positive risk score.

    Each line shows the object id, source, score, title (first 80 characters)
    and the codes of its three heaviest risk signals.
    """
    rows = run_fetchall(f"""
    SELECT
      o.object_id, o.source, LEFT(o.title, 80) AS title, o.risk_score,
      (
        SELECT GROUP_CONCAT(code ORDER BY weight DESC SEPARATOR ', ')
        FROM (SELECT rs.code, rs.weight FROM risk_signals rs
              WHERE rs.object_id = o.object_id ORDER BY rs.weight DESC LIMIT 3) x
      ) AS top_signals
    FROM objects o
    WHERE o.risk_score > 0
    ORDER BY o.risk_score DESC
    LIMIT {int(n)};
    """)
    if not rows:
        print("(no leads with risk_score > 0 yet)")
        return
    for object_id, source, title, score, top_signals in rows:
        print(f"#{object_id} [{source}] score={score:.2f} :: {title} || {top_signals}")

# =============================
# Test function to verify image URLs
# =============================
def test_aic_image_urls(limit=5):
    """Fetch a few AIC artworks and report whether their IIIF image URLs respond.

    Args:
        limit: number of artworks to request from the AIC API.

    Raises:
        requests.HTTPError: when the artwork listing request fails.

    Every request carries a timeout so that the check cannot hang, and a
    record whose title is null is printed as "No title" instead of raising.
    """
    url = f"https://api.artic.edu/api/v1/artworks?limit={limit}&fields=id,title,image_id"
    r = requests.get(url, timeout=30)
    r.raise_for_status()

    data = r.json().get("data", [])
    print(f"Testing {len(data)} AIC image URLs:")

    for obj in data:
        image_id = obj.get("image_id")
        if image_id:
            image_url = f"https://www.artic.edu/iiif/2/{image_id}/full/843,/0/default.jpg"
            # The API may return the key with a null value, so `or` is used rather than a .get() default.
            title = str(obj.get("title") or "No title")
            print(f"ID: {obj.get('id')} | Title: {title[:50]}")
            print(f"Image URL: {image_url}")

            # Test if URL is accessible
            try:
                resp = requests.head(image_url, timeout=10)
                status = "✅ OK" if resp.status_code == 200 else f"❌ {resp.status_code}"
                print(f"Status: {status}")
            except Exception as e:
                print(f"❌ Error: {e}")
            print("-" * 80)
        else:
            print(f"ID: {obj.get('id')} | No image_id available")

# =============================
# Main
# =============================
if __name__ == "__main__":
    # Report the runtime setup before any work starts.
    device_label = 'GPU' if device=='cuda' else 'CPU'
    tqdm_label = '✅ tqdm available' if HAS_TQDM else '❌ basic progress only'
    print(f"🖥️  device: {device_label}")
    print(f"📊 Progress bars: {tqdm_label}")

    print("🔧 Bootstrapping schema…")
    bootstrap_schema()

    # Art Institute of Chicago ingest always runs.
    print(f"▶️ AIC ingest → target {AIC_TARGET} …")
    aic_n = ingest_aic_paged(AIC_TARGET, AIC_PAGE_SIZE)
    print(f"   AIC ingested: {aic_n}")

    # Cleveland ingest is optional and disabled by a target of 0.
    if CMA_TARGET > 0:
        print(f"▶️ CMA ingest → target {CMA_TARGET} (time cap {CMA_MAX_TIME_SEC}s) …")
        cma_n = ingest_cma(CMA_TARGET, CMA_PAGE_SIZE, CMA_MAX_TIME_SEC)
        print(f"   CMA ingested: {cma_n}")
    else:
        print("▶️ CMA ingest skipped (CMA_TARGET=0)")

    print("♻️ Rescoring all objects…")
    rescore_all_objects()

    print("📊 Counts:")
    print_counts()

    print("\n🏁 Top leads:")
    print_top_leads(10)

    conn.commit()
    print("\n✅ Done. In SQL editor, run:")
    print("   SELECT * FROM flagged_leads ORDER BY risk_score DESC LIMIT 20;")

AIC Ingest:   4%|▍         | 114/3000 [10:13<4:18:41,  5.38s/obj]


🖥️  device: GPU
📊 Progress bars: ✅ tqdm available
🔧 Bootstrapping schema…
✅ Schema ready.
▶️ AIC ingest → target 3000 …


AIC Ingest:   0%|          | 0/3000 [00:00<?, ?obj/s]

[AIC] total ingested with provenance: 3000
   AIC ingested: 3000
▶️ CMA ingest skipped (CMA_TARGET=0)
♻️ Rescoring all objects…
Rescoring 3,000 objects...


Rescoring:   0%|          | 0/3000 [00:00<?, ?obj/s]

✅ Rescoring complete! Updated 3,000 objects.
📊 Counts:
objects: 3,000
provenance_sentences: 93,046
provenance_events: 20,583
risk_signals: 21,357

🏁 Top leads:
#5764607523034295818 [AIC] score=24.00 :: Man, Woman, and Bulls || EVENT_SOLD_NAZI_WINDOW, NAZI_ERA_TRADE_WINDOW, NAZI_ERA_TRADE_WINDOW
#8935141660703094447 [AIC] score=20.80 :: Zapata || NAZI_ERA_TRADE_WINDOW, EVENT_SOLD_NAZI_WINDOW, NAZI_ERA_TRADE_WINDOW
#7782220156096247252 [AIC] score=20.00 :: The Assumption of the Virgin || EVENT_SEIZED, EVENT_SEIZED, EVENT_SEIZED
#2305843009213754022 [AIC] score=18.20 :: Stack of Wheat (Thaw, Sunset) || EVENT_SOLD, EVENT_SOLD, EVENT_SOLD
#7782220156096247226 [AIC] score=16.80 :: Sheet of Studies with the Head of the Fornarina and Hands of Madame de Senonnes || EVENT_SEIZED, EVENT_SEIZED, EVENT_SEIZED
#8935141660703094297 [AIC] score=16.00 :: Cameo Portraying Emperor Claudius as Jupiter || NAZI_ERA_TRADE_WINDOW, NAZI_ERA_TRADE_WINDOW, NAZI_ERA_TRADE_WINDOW
#5188146770730811399 [AIC] score=1