In [None]:
import os
import json
import sqlite3
import threading
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple, List

import requests


# =========================
# Constants / Globals
# =========================
TROY_OZ_TO_GRAM = 31.1034768

_FX_CACHE: Dict[Tuple[str, str], float] = {}   # (TARGET, YYYY-MM-DD) -> rate
_stop_event = threading.Event()
_worker_thread: Optional[threading.Thread] = None


# =========================
# Exceptions / Models
# =========================
class ProviderError(RuntimeError):
    """A price or FX provider could not deliver a usable answer (missing key, HTTP error, bad payload)."""


class RateLimited(ProviderError):
    """A provider signalled throttling: HTTP 429 or a payload that looks like a rate-limit notice."""


@dataclass
class Quote:
    """
    One gold price observation; save_quote() writes these fields to gold_spot_prices.
    """
    ts_utc: str    # ISO-8601 UTC timestamp taken when the fetch started
    source: str    # provider label, e.g. "gold-api.com"
    currency: str  # currency code the prices below are expressed in

    # Spot price (current)
    price_per_troy_oz: float
    price_per_gram: float  # price_per_troy_oz / TROY_OZ_TO_GRAM

    # Daily open/high/close when the provider supplies them, otherwise the
    # fetchers fall back to the spot price. No daily low is stored.
    open_per_troy_oz: Optional[float]
    high_per_troy_oz: Optional[float]
    close_per_troy_oz: Optional[float]

    fx_rate_to_target: Optional[float]  # USD->target rate when a conversion was applied, else None
    raw_json: Dict[str, Any]            # provider payload (possibly wrapped); serialized with json.dumps on save


# =========================
# Configuration
# =========================
def _env_bool(name: str, default: str = "1") -> bool:
    return os.getenv(name, default).strip().lower() in ("1", "true", "yes", "y", "on")


def load_config() -> Dict[str, Any]:
    """
    Build the collector configuration from environment variables.

    The environment is read on every call (nothing is cached), so changing
    os.environ in a notebook cell takes effect on the next call.

    Returns a dict with the keys DB_PATH, TARGET_CURRENCY, PROVIDERS,
    REQUEST_TIMEOUT, USER_AGENT, GOLDAPI_NET_KEY, METALS_API_KEY,
    METALS_DEV_KEY, INTERVAL_SEC and INCLUDE_DAILY_OHLC.

    Raises ValueError when GOLD_TIMEOUT_SEC or GOLD_INTERVAL_SEC is not numeric.
    """
    env = os.getenv

    database_file = env("GOLD_DB_PATH", "gold_prices.sqlite")
    currency = env("GOLD_CURRENCY", "USD").upper()

    # Comma-separated provider ids; blanks are dropped, the rest normalized to lower case.
    provider_ids: List[str] = []
    for chunk in env("GOLD_PROVIDERS", "gold-api,goldapi-net,metals-api,metals-dev").split(","):
        name = chunk.strip()
        if name:
            provider_ids.append(name.lower())

    return {
        "DB_PATH": database_file,
        "TARGET_CURRENCY": currency,
        "PROVIDERS": provider_ids,
        "REQUEST_TIMEOUT": float(env("GOLD_TIMEOUT_SEC", "20")),
        "USER_AGENT": env("GOLD_USER_AGENT", "gold-collector/1.0"),
        "GOLDAPI_NET_KEY": env("GOLDAPI_KEY", ""),
        "METALS_API_KEY": env("METALS_API_KEY", ""),
        "METALS_DEV_KEY": env("METALS_DEV_KEY", ""),
        "INTERVAL_SEC": int(env("GOLD_INTERVAL_SEC", "60")),
        # Try the daily OHLC endpoint of metals-api (one extra call); fallback is spot.
        "INCLUDE_DAILY_OHLC": _env_bool("GOLD_INCLUDE_DAILY_OHLC", "1"),
    }


# =========================
# Database (with migration)
# =========================
def ensure_db(path: str) -> None:
    """
    Create the gold_spot_prices table and its indexes if needed, and migrate older files.

    Safe to call repeatedly. A table created by an earlier version of this
    script gets the columns that were added later (FX rate and daily
    open/high/close).

    Args:
        path: filesystem path of the SQLite database file.

    Raises:
        sqlite3.Error: if the database cannot be opened or altered.
    """
    # `with connection:` only scopes the transaction (commit/rollback); it does
    # not close the connection. This runs on every poll, so close it explicitly.
    con = sqlite3.connect(path)
    try:
        with con:
            con.execute(
                """
                CREATE TABLE IF NOT EXISTS gold_spot_prices (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts_utc TEXT NOT NULL,
                    source TEXT NOT NULL,
                    currency TEXT NOT NULL,

                    price_per_troy_oz REAL NOT NULL,
                    price_per_gram REAL NOT NULL,

                    open_per_troy_oz REAL,
                    high_per_troy_oz REAL,
                    close_per_troy_oz REAL,

                    fx_rate_to_target REAL,
                    raw_json TEXT NOT NULL
                )
                """
            )

            # Migration: add columns that are missing from a table created by an older script.
            existing = {row[1] for row in con.execute("PRAGMA table_info(gold_spot_prices)").fetchall()}
            later_columns = (
                "fx_rate_to_target",
                "open_per_troy_oz",
                "high_per_troy_oz",
                "close_per_troy_oz",
            )
            for column in later_columns:
                if column not in existing:
                    # Column names come from the fixed tuple above, never from user input.
                    con.execute("ALTER TABLE gold_spot_prices ADD COLUMN %s REAL" % column)

            con.execute("CREATE INDEX IF NOT EXISTS idx_gold_ts ON gold_spot_prices(ts_utc)")
            con.execute("CREATE INDEX IF NOT EXISTS idx_gold_source_cur ON gold_spot_prices(source, currency)")
    finally:
        con.close()


def save_quote(path: str, q: Quote) -> None:
    """
    Append one quote as a new row of gold_spot_prices.

    Args:
        path: filesystem path of the SQLite database file.
        q: the quote to store; q.raw_json is serialized to a JSON string.

    Raises:
        sqlite3.Error: if the insert fails (for example when the table does not exist).
        TypeError: if q.raw_json contains values json.dumps cannot serialize.
    """
    row = (
        q.ts_utc,
        q.source,
        q.currency,
        q.price_per_troy_oz,
        q.price_per_gram,
        q.open_per_troy_oz,
        q.high_per_troy_oz,
        q.close_per_troy_oz,
        q.fx_rate_to_target,
        json.dumps(q.raw_json, ensure_ascii=False),
    )

    # `with connection:` commits or rolls back but leaves the connection open;
    # close it explicitly so the periodic collector does not leak a handle per poll.
    con = sqlite3.connect(path)
    try:
        with con:
            con.execute(
                """INSERT INTO gold_spot_prices
               (ts_utc, source, currency,
                price_per_troy_oz, price_per_gram,
                open_per_troy_oz, high_per_troy_oz, close_per_troy_oz,
                fx_rate_to_target, raw_json)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                row,
            )
    finally:
        con.close()


# =========================
# HTTP helpers
# =========================
def _http_get(cfg: Dict[str, Any], url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    GET `url` and return the decoded JSON object.

    Args:
        cfg: configuration dict; USER_AGENT and REQUEST_TIMEOUT are read.
        url: endpoint without query string.
        params: query parameters. These may contain API keys, so error messages
            only ever mention `url`, never the final request URL.

    Returns:
        The response body parsed as a JSON object (dict).

    Raises:
        RateLimited: on HTTP 429.
        ProviderError: on transport failures, any other HTTP status >= 400,
            a body that is not JSON, or JSON that is not an object.
    """
    headers = {"User-Agent": cfg["USER_AGENT"]}
    try:
        r = requests.get(url, params=params, headers=headers, timeout=cfg["REQUEST_TIMEOUT"])
    except requests.RequestException as e:
        # The messages of requests' own exceptions embed the full URL including the
        # query string (and so the API key). Report only the exception type, and
        # drop the chained context so the key does not show up in tracebacks.
        raise ProviderError("HTTP request failed (%s): %s" % (type(e).__name__, url)) from None

    if r.status_code == 429:
        raise RateLimited("HTTP 429 rate limited: %s" % url)
    if r.status_code in (401, 403, 402):
        raise ProviderError("HTTP %s auth/quota: %s | %s" % (r.status_code, url, r.text[:200]))
    if r.status_code >= 400:
        # Replaces r.raise_for_status(), whose HTTPError message contains r.url with the key in it.
        raise ProviderError("HTTP %s error: %s | %s" % (r.status_code, url, r.text[:200]))

    try:
        payload = r.json()
    except Exception as e:
        raise ProviderError("Non-JSON response from %s: %s" % (url, e))

    # Every caller uses payload.get(...); a list or scalar would raise AttributeError there.
    if not isinstance(payload, dict):
        raise ProviderError("Unexpected JSON %s (expected object) from %s" % (type(payload).__name__, url))
    return payload


def _looks_like_rate_limit(payload: Dict[str, Any]) -> bool:
    txt = json.dumps(payload, ensure_ascii=False).lower()
    needles = ["rate limit", "too many", "quota", "exceeded", "limit reached", "requests per", "throttle"]
    return any(n in txt for n in needles)


# =========================
# FX (only needed when converting USD->other)
# =========================
def fx_usd_to(cfg: Dict[str, Any], target: str) -> float:
    """
    Return the USD -> `target` exchange rate.

    USD maps to 1.0 without any lookup. Other currencies are fetched from
    api.frankfurter.app and cached per (currency, current UTC date).

    Raises:
        ProviderError: when the response carries no rate for the currency.
    """
    code = target.upper()
    if code == "USD":
        return 1.0

    cache_key = (code, datetime.now(timezone.utc).date().isoformat())
    cached = _FX_CACHE.get(cache_key)
    if cached is not None:
        return cached

    response = _http_get(cfg, "https://api.frankfurter.app/latest", params={"from": "USD", "to": code})
    quoted = response.get("rates", {}).get(code)
    if quoted is None:
        raise ProviderError("FX rate USD->%s missing in response: %s" % (code, response))

    rate = float(quoted)
    _FX_CACHE[cache_key] = rate
    return rate


# =========================
# Providers
# =========================
def fetch_gold_api_dot_com(
    cfg: Dict[str, Any], target_currency: str
) -> Tuple[float, str, Dict[str, Any], Optional[float], Optional[float], Optional[float], Optional[float]]:
    """
    Fetch the XAU spot price from gold-api.com (quoted in USD per troy ounce).

    The provider offers no daily OHLC, so open/high/close all equal the spot
    price. For a non-USD target the USD price is converted via fx_usd_to().

    Returns:
        (spot, currency, raw_payload, fx_rate, open, high, close)

    Raises:
        RateLimited: when the payload looks like a rate-limit notice.
        ProviderError: when the payload has no 'price' field.
    """
    payload = _http_get(cfg, "https://api.gold-api.com/price/XAU")
    if _looks_like_rate_limit(payload):
        raise RateLimited("Rate limit payload (gold-api.com)")

    quoted = payload.get("price")
    if quoted is None:
        raise ProviderError("No 'price' field (gold-api.com): %s" % payload)
    usd_per_oz = float(quoted)

    currency = target_currency.upper()
    if currency != "USD":
        rate = fx_usd_to(cfg, target_currency)
        converted = usd_per_oz * rate
        # Keep the untouched provider payload next to the conversion details.
        envelope = {"source_payload": payload, "fx_usd_to_target": rate, "target_currency": currency}
        return converted, currency, envelope, rate, converted, converted, converted

    # Already in USD: no FX rate, O/H/C fall back to spot.
    return usd_per_oz, "USD", payload, None, usd_per_oz, usd_per_oz, usd_per_oz


def fetch_goldapi_net(
    cfg: Dict[str, Any], target_currency: str
) -> Tuple[float, str, Dict[str, Any], Optional[float], Optional[float], Optional[float], Optional[float]]:
    """
    Fetch the XAU price from goldapi.net directly in the requested currency.

    Requires cfg["GOLDAPI_NET_KEY"]. No daily OHLC is available, so
    open/high/close all equal the spot price; no FX rate is involved.

    Returns:
        (spot, currency, raw_payload, None, open, high, close)

    Raises:
        ProviderError: when the key is missing or the payload has no 'price'.
        RateLimited: when the payload looks like a rate-limit notice.
    """
    api_key = cfg["GOLDAPI_NET_KEY"]
    if not api_key:
        raise ProviderError("GOLDAPI_KEY missing for goldapi.net")

    currency = target_currency.upper()
    endpoint = "https://app.goldapi.net/price/XAU/%s" % currency
    payload = _http_get(cfg, endpoint, params={"x-api-key": api_key})
    if _looks_like_rate_limit(payload):
        raise RateLimited("Rate limit payload (goldapi.net)")

    quoted = payload.get("price")
    if quoted is None:
        raise ProviderError("No 'price' field (goldapi.net): %s" % payload)

    spot = float(quoted)
    return spot, currency, payload, None, spot, spot, spot


def _metals_api_pick_ohlc_value(rates: Dict[str, Any], key: str, symbol: str) -> Optional[float]:
    """
    Metals-API OHLC endpoint formats differ in the wild.
    Try:
      - rates[key] == float
      - rates[key] == {"EUR": float, ...}
    """
    v = rates.get(key)
    if v is None:
        return None
    if isinstance(v, (int, float)):
        return float(v)
    if isinstance(v, dict):
        vv = v.get(symbol)
        return None if vv is None else float(vv)
    return None


def fetch_metals_api(
    cfg: Dict[str, Any], target_currency: str
) -> Tuple[float, str, Dict[str, Any], Optional[float], Optional[float], Optional[float], Optional[float]]:
    """
    metals-api.com: base=USD symbols=XAU; commonly rates.XAU = XAU per USD, so USD per XAU = 1/rates.XAU.
    If enabled, also tries the daily OHLC endpoint for open/high/close (fallback to spot).

    Requires cfg["METALS_API_KEY"]. For a non-USD target the USD price is
    converted via fx_usd_to().

    Returns:
        (spot, currency, wrapped_payload, fx_rate, open, high, close)

    Raises:
        ProviderError: when the key is missing, the API reports an error, or
            rates.XAU is missing, non-numeric or not a positive finite number.
        RateLimited: when the API error payload looks like a rate-limit notice.
    """
    if not cfg["METALS_API_KEY"]:
        raise ProviderError("METALS_API_KEY missing for metals-api.com")

    latest = _http_get(
        cfg,
        "https://metals-api.com/api/latest",
        params={"access_key": cfg["METALS_API_KEY"], "base": "USD", "symbols": "XAU"},
    )

    if not latest.get("success", True) and "error" in latest:
        if _looks_like_rate_limit(latest):
            raise RateLimited("Rate limit payload (metals-api.com)")
        raise ProviderError("metals-api error: %s" % latest.get("error"))

    # "rates" may be absent or null; only index into it when it really is an object.
    latest_rates = latest.get("rates")
    xau = latest_rates.get("XAU") if isinstance(latest_rates, dict) else None
    if xau is None:
        raise ProviderError("No rates.XAU (metals-api.com): %s" % latest)

    # A zero, negative or non-numeric rate must surface as ProviderError so the
    # failover moves on, instead of ZeroDivisionError/ValueError escaping.
    try:
        xau_per_usd = float(xau)
    except (TypeError, ValueError):
        raise ProviderError("Non-numeric rates.XAU (metals-api.com): %r" % (xau,))
    if not (0.0 < xau_per_usd < float("inf")):
        raise ProviderError("Unusable rates.XAU (metals-api.com): %r" % (xau,))

    usd_per_xau = 1.0 / xau_per_usd

    fx: Optional[float] = None
    symbol = target_currency.upper()
    if symbol == "USD":
        spot = usd_per_xau
    else:
        fx = fx_usd_to(cfg, symbol)
        spot = usd_per_xau * fx

    # Defaults: fallback to spot
    day_open = spot
    day_high = spot
    day_close = spot
    ohlc_payload: Optional[Dict[str, Any]] = None

    if cfg.get("INCLUDE_DAILY_OHLC", True):
        # Best effort: any failure in here keeps the spot fallback and is recorded in the payload.
        try:
            date_iso = datetime.now(timezone.utc).date().isoformat()
            ohlc_payload = _http_get(
                cfg,
                f"https://metals-api.com/api/open-high-low-close/{date_iso}",
                params={"access_key": cfg["METALS_API_KEY"], "base": "XAU", "symbols": symbol},
            )

            if not ohlc_payload.get("success", True) and "error" in ohlc_payload:
                # Do not fail hard; fall back to spot.
                ohlc_payload = {"error": ohlc_payload.get("error"), "note": "OHLC failed; used spot fallback"}

            rates = (ohlc_payload.get("rates") or {}) if isinstance(ohlc_payload, dict) else {}

            o = _metals_api_pick_ohlc_value(rates, "open", symbol)
            h = _metals_api_pick_ohlc_value(rates, "high", symbol)
            c = _metals_api_pick_ohlc_value(rates, "close", symbol)

            if o is not None:
                day_open = o
            if h is not None:
                day_high = h
            if c is not None:
                day_close = c
        except Exception as e:
            ohlc_payload = {"note": "OHLC exception; used spot fallback", "exception": str(e)}

    wrapped: Dict[str, Any] = {
        "source_payload_latest": latest,
        "source_payload_ohlc": ohlc_payload,
        "note_latest": "spot computed as 1/rates.XAU then optionally USD->target FX",
        "target_currency": symbol,
    }
    if fx is not None:
        wrapped["fx_usd_to_target"] = fx

    return spot, symbol, wrapped, fx, day_open, day_high, day_close


def fetch_metals_dev(
    cfg: Dict[str, Any], target_currency: str
) -> Tuple[float, str, Dict[str, Any], Optional[float], Optional[float], Optional[float], Optional[float]]:
    """
    metals.dev: requires key; response structure can vary, we try multiple field paths.
    Daily OHLC not provided -> fallback to spot.

    Returns:
        (spot, currency, raw_payload, None, open, high, close)

    Raises:
        ProviderError: when the key is missing or no numeric price is found.
        RateLimited: when the payload looks like a rate-limit notice.
    """
    if not cfg["METALS_DEV_KEY"]:
        raise ProviderError("METALS_DEV_KEY missing for metals.dev")

    data = _http_get(
        cfg,
        "https://api.metals.dev/v1/metal/spot",
        params={"api_key": cfg["METALS_DEV_KEY"], "metal": "XAU", "currency": target_currency.upper()},
    )

    if _looks_like_rate_limit(data):
        raise RateLimited("Rate limit payload (metals.dev)")

    # Tried in order; the first path holding a numeric value wins.
    # NOTE(review): metals.dev appears to nest the spot price under rate.price -
    # confirm against the current API documentation.
    candidates = [
        ("price",),
        ("rate",),
        ("rate", "price"),
        ("result", "price"),
        ("data", "price"),
        ("metal", "price"),
    ]

    def get_nested(d: Dict[str, Any], path: Tuple[str, ...]) -> Optional[Any]:
        """Follow `path` through nested dicts; None when any step is missing."""
        cur: Any = d
        for k in path:
            if not isinstance(cur, dict) or k not in cur:
                return None
            cur = cur[k]
        return cur

    spot: Optional[float] = None
    for path in candidates:
        v = get_nested(data, path)
        # A container (e.g. "rate" being an object) or a boolean is not a price;
        # keep looking rather than letting float() raise an uncaught TypeError.
        if v is None or isinstance(v, (dict, list, bool)):
            continue
        try:
            spot = float(v)
        except (TypeError, ValueError):
            continue
        break

    if spot is None:
        raise ProviderError("Could not find price in metals.dev response: %s" % data)

    return spot, target_currency.upper(), data, None, spot, spot, spot


# =========================
# Orchestrator / Collector
# =========================
def fetch_with_failover(cfg: Dict[str, Any]) -> Quote:
    """
    Try the configured providers in order and return the first usable quote.

    Args:
        cfg: configuration dict; TARGET_CURRENCY and PROVIDERS are read here,
            the rest is passed on to the provider fetchers.

    Returns:
        A Quote stamped with the UTC time at which this call started.

    Raises:
        RuntimeError: when no provider is configured or every provider failed.
            The message lists one line per attempted provider.
    """
    target_currency = cfg["TARGET_CURRENCY"]
    providers = cfg["PROVIDERS"]

    ts_utc = datetime.now(timezone.utc).isoformat(timespec="seconds")
    errors: List[str] = []

    for p in providers:
        try:
            if p == "gold-api":
                spot_oz, cur, raw, fx, day_open, day_high, day_close = fetch_gold_api_dot_com(cfg, target_currency)
                source = "gold-api.com"
            elif p == "goldapi-net":
                spot_oz, cur, raw, fx, day_open, day_high, day_close = fetch_goldapi_net(cfg, target_currency)
                source = "goldapi.net"
            elif p == "metals-api":
                spot_oz, cur, raw, fx, day_open, day_high, day_close = fetch_metals_api(cfg, target_currency)
                source = "metals-api.com"
            elif p == "metals-dev":
                spot_oz, cur, raw, fx, day_open, day_high, day_close = fetch_metals_dev(cfg, target_currency)
                source = "metals.dev"
            else:
                errors.append("%s: ERROR: unknown provider" % p)
                continue

            price_oz = float(spot_oz)
            # Rejects NaN, infinities, zero and negatives before they reach the database.
            if not (0.0 < price_oz < float("inf")):
                raise ProviderError("implausible spot price %r" % (spot_oz,))

            return Quote(
                ts_utc=ts_utc,
                source=source,
                currency=cur,
                price_per_troy_oz=price_oz,
                price_per_gram=price_oz / TROY_OZ_TO_GRAM,
                open_per_troy_oz=None if day_open is None else float(day_open),
                high_per_troy_oz=None if day_high is None else float(day_high),
                close_per_troy_oz=None if day_close is None else float(day_close),
                fx_rate_to_target=fx,
                raw_json=raw,
            )

        except RateLimited as e:
            errors.append("%s: RATE_LIMIT: %s" % (p, e))
            continue
        except (requests.RequestException, ProviderError) as e:
            errors.append("%s: ERROR: %s" % (p, e))
            continue
        except (ArithmeticError, AttributeError, LookupError, TypeError, ValueError) as e:
            # A payload with an unexpected shape or value (float() on junk, division
            # by a zero rate, .get() on a non-dict) is a failure of this provider
            # only; record it and fail over instead of aborting the whole fetch.
            errors.append("%s: ERROR: malformed response (%s: %s)" % (p, type(e).__name__, e))
            continue

    if not errors:
        # The loop never ran: say so instead of printing an empty bullet.
        errors.append("no providers configured (GOLD_PROVIDERS is empty)")

    raise RuntimeError("All providers failed:\n- " + "\n- ".join(errors))


def collect_once() -> Quote:
    """
    Run one collection cycle: load config, prepare the DB, fetch, store, report.

    Prints a one-line summary of the stored quote and returns it. Errors from
    the fetch or the database propagate to the caller.
    """
    settings = load_config()
    db_file = settings["DB_PATH"]

    ensure_db(db_file)
    quote = fetch_with_failover(settings)
    save_quote(db_file, quote)

    def _fmt(value: Optional[float]) -> str:
        # Missing OHLC values are shown as a dash.
        return "-" if value is None else f"{value:.2f}"

    summary = "OK %s | %s | %s spot=%.2f/oz | O=%s H=%s C=%s | %.2f/g | DB=%s" % (
        quote.ts_utc,
        quote.source,
        quote.currency,
        quote.price_per_troy_oz,
        _fmt(quote.open_per_troy_oz),
        _fmt(quote.high_per_troy_oz),
        _fmt(quote.close_per_troy_oz),
        quote.price_per_gram,
        db_file,
    )
    print(summary)
    return quote


# =========================
# Periodic runner (Jupyter-friendly)
# =========================
def start_collector(interval_sec: Optional[int] = None) -> None:
    """
    Runs in a background thread (recommended for Jupyter).
    Stop via stop_collector().

    Args:
        interval_sec: seconds to wait between collections; defaults to
            GOLD_INTERVAL_SEC from the configuration.

    Raises:
        ValueError: if the interval is not positive (it would poll the
            providers in a tight loop).

    Does nothing but print a notice when a collector is already running, or
    when a previously stopped worker has not finished shutting down in time.
    """
    global _worker_thread
    cfg = load_config()

    if interval_sec is None:
        interval_sec = int(cfg["INTERVAL_SEC"])
    if interval_sec <= 0:
        raise ValueError("interval_sec must be > 0, got %r" % (interval_sec,))

    previous = _worker_thread
    if previous is not None and previous.is_alive():
        if not _stop_event.is_set():
            print("[collector] Already running")
            return
        # A stop was requested but the old worker has not exited yet (it may be in
        # the middle of a request). Wait for it, so that stop_collector() followed
        # directly by start_collector() really restarts instead of being refused.
        previous.join(timeout=float(cfg["REQUEST_TIMEOUT"]) + 5.0)
        if previous.is_alive():
            print("[collector] Previous worker is still shutting down; try again shortly")
            return

    _stop_event.clear()

    def worker():
        while not _stop_event.is_set():
            try:
                collect_once()
            except Exception as e:
                # Keep polling; a failed cycle must not kill the collector.
                print("[collector] ERROR:", e)
            # Interruptible sleep: returns early as soon as stop_collector() sets the event.
            _stop_event.wait(interval_sec)

    _worker_thread = threading.Thread(target=worker, daemon=True)
    _worker_thread.start()
    print(
        "[collector] Started (interval=%ss, currency=%s, providers=%s)"
        % (interval_sec, cfg["TARGET_CURRENCY"], ",".join(cfg["PROVIDERS"]))
    )


def stop_collector() -> None:
    """
    Ask the background collector to stop.

    Only sets the shared stop event and returns immediately; it does not wait
    for the worker thread to exit.
    """
    _stop_event.set()
    print("[collector] Stop signal sent")


# =========================
# Optional: blocking loop (non-thread)
# =========================
def run_forever(interval_sec: Optional[int] = None) -> None:
    """
    Blocking loop (useful outside Jupyter). Stop with KeyboardInterrupt.

    Args:
        interval_sec: seconds to wait between collections; defaults to
            GOLD_INTERVAL_SEC from the configuration.

    Raises:
        ValueError: if the interval is not positive.
        KeyboardInterrupt: propagates to the caller when the user interrupts.

    A failed collection cycle is reported and the loop carries on, matching
    the background worker started by start_collector().
    """
    cfg = load_config()
    if interval_sec is None:
        interval_sec = int(cfg["INTERVAL_SEC"])
    if interval_sec <= 0:
        raise ValueError("interval_sec must be > 0, got %r" % (interval_sec,))

    print("[collector] Running forever (interval=%ss). Ctrl+C to stop." % interval_sec)

    # Never set; .wait() on it is simply an interruptible sleep.
    pause = threading.Event()
    while True:
        try:
            collect_once()
        except Exception as e:
            # KeyboardInterrupt is not an Exception subclass, so Ctrl+C still stops the loop.
            print("[collector] ERROR:", e)
        pause.wait(interval_sec)


# =========================
# Defaults for your use-case (USD)
# =========================
# You can set these in Jupyter before calling start_collector()/collect_once()
# os.environ["GOLD_CURRENCY"] = "USD"
# os.environ["GOLD_PROVIDERS"] = "gold-api,goldapi-net,metals-api,metals-dev"
# os.environ["GOLD_DB_PATH"] = "gold_prices.sqlite"
# os.environ["GOLD_INTERVAL_SEC"] = "60"
# os.environ["GOLD_INCLUDE_DAILY_OHLC"] = "1"




In [None]:
import os

# Collector settings for this session, passed through environment variables.
os.environ.update(
    {
        "GOLD_CURRENCY": "USD",
        "GOLD_PROVIDERS": "gold-api,goldapi-net,metals-api,metals-dev",
        "GOLD_DB_PATH": "gold_prices.sqlite",
        "GOLD_INTERVAL_SEC": "60",  # every minute
    }
)


OK 2025-12-22T08:01:30+00:00 | gold-api.com | USD spot=4415.50/oz | O=4415.50 H=4415.50 C=4415.50 | 141.96/g | DB=gold_prices.sqlite
OK 2025-12-22T08:02:24+00:00 | gold-api.com | USD spot=4412.00/oz | O=4412.00 H=4412.00 C=4412.00 | 141.85/g | DB=gold_prices.sqlite
OK 2025-12-22T08:02:30+00:00 | gold-api.com | USD spot=4412.00/oz | O=4412.00 H=4412.00 C=4412.00 | 141.85/g | DB=gold_prices.sqlite
OK 2025-12-22T08:03:24+00:00 | gold-api.com | USD spot=4412.00/oz | O=4412.00 H=4412.00 C=4412.00 | 141.85/g | DB=gold_prices.sqlite
OK 2025-12-22T08:03:30+00:00 | gold-api.com | USD spot=4412.00/oz | O=4412.00 H=4412.00 C=4412.00 | 141.85/g | DB=gold_prices.sqlite
OK 2025-12-22T08:04:24+00:00 | gold-api.com | USD spot=4413.80/oz | O=4413.80 H=4413.80 C=4413.80 | 141.91/g | DB=gold_prices.sqlite
OK 2025-12-22T08:04:30+00:00 | gold-api.com | USD spot=4413.00/oz | O=4413.00 H=4413.00 C=4413.00 | 141.88/g | DB=gold_prices.sqlite
OK 2025-12-22T08:05:25+00:00 | gold-api.com | USD spot=4413.00/oz | O

In [None]:
start_collector()  # no argument: the interval comes from GOLD_INTERVAL_SEC


In [None]:
# Signal the background collector to stop.
stop_collector()


In [None]:
# Restart the collector: signal the current worker, wait for it to exit, then start a fresh one.
stop_collector()
if _worker_thread is not None:
    # A worker that is in the middle of a request needs a moment to finish;
    # starting while it is still alive would be refused as "Already running".
    _worker_thread.join(timeout=30)
start_collector()