# Scrape Americas Stocks and Fundamentals -- Financial Modeling Prep (FMP) API

End‑to‑end pipeline to build an Americas universe (tickers + profiles), fetch quotes and key metrics/ratios, and persist everything into DuckDB.

- Data sources (FMP stable APIs):
  - Exchanges: https://financialmodelingprep.com/stable/available-exchanges
  - Stock list/profiles (paged bulk): https://financialmodelingprep.com/stable/profile-bulk?part=0
  - EOD bulk (daily): https://financialmodelingprep.com/stable/eod-bulk?date=YYYY-MM-DD
  - Key metrics: https://financialmodelingprep.com/stable/key-metrics?symbol=AAPL&period=quarter&limit=100
  - Ratios: https://financialmodelingprep.com/stable/ratios?symbol=AAPL&period=quarter&limit=100
  - Index list: https://financialmodelingprep.com/stable/index-list
  - Index quotes (light): https://financialmodelingprep.com/stable/historical-price-eod/light

- Inputs
  - Optional environment variable FMP_API_KEY to raise rate limits: export FMP_API_KEY=your_key

- Outputs (DuckDB: americas.db)
  - exchanges, profiles, tickers, quotes, metrics (key metrics joined with ratios), indices, index_quotes

- Run order
  1. Setup and exchanges filter
  2. Profiles (paged) → tickers
  3. Load profiles
  4. EOD bulk quotes
  5. Load quotes
  6. Key metrics + ratios → load
  7. Exchanges table
  8. Indices + quotes → load

Notes
- “Americas” classification uses FMP exchange metadata (region/country) and symbol suffix mapping; U.S. tickers with no suffix are included.
- Network calls use simple retry/backoff and thread pools to balance speed and rate limits.
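
The suffix rule in the first note can be sketched as below. This is a minimal illustration rather than the notebook's own cell: the mapping and the exchange set are hypothetical sample values, and the assumption that suffix keys carry a leading dot mirrors how the lookup is written later in the notebook, not anything confirmed about the FMP payload.

```python
# Hypothetical sample data; the notebook derives both structures from the exchanges endpoint.
SAMPLE_SUFFIX_TO_EXCHANGE = {".TO": "TSX", ".SA": "SAO", ".L": "LSE"}
SAMPLE_AMERICAS_EXCHANGES = {"TSX", "SAO", "NYSE", "NASDAQ"}


def is_americas_symbol(symbol, suffix_map=SAMPLE_SUFFIX_TO_EXCHANGE,
                       americas=SAMPLE_AMERICAS_EXCHANGES):
    """True for suffix-less symbols, or symbols whose suffix maps to an Americas exchange."""
    if not isinstance(symbol, str):
        return False
    if "." not in symbol:
        return True  # no suffix: treated as a U.S. listing
    suffix = "." + symbol.rsplit(".", 1)[-1].upper()
    return suffix_map.get(suffix) in americas


print(is_americas_symbol("AAPL"), is_americas_symbol("SHOP.TO"), is_americas_symbol("VOD.L"))
```

One consequence of this rule: a dotted U.S. share class such as BRK.B is rejected unless its suffix happens to be in the map, so such symbols may need separate handling.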

## Setup

- Optionally set FMP_API_KEY in your shell to improve quotas: export FMP_API_KEY=your_key.
- All outputs are persisted into DuckDB (americas.db). No static files are read or written by this notebook.

In [None]:
import os, requests
API_KEY=os.getenv("FMP_API_KEY")
params={"apikey":API_KEY} if API_KEY else {}

# Fetch exchange data with better error handling
try:
    raw=requests.get("https://financialmodelingprep.com/stable/available-exchanges",params=params,timeout=30).json()
    print(f"API response type: {type(raw)}")
    
    # Handle both potential response formats
    if isinstance(raw, dict):
        # Extract list from dictionary if possible
        list_found = False
        for k, v in raw.items():
            if isinstance(v, list) and len(v) > 0:
                raw = v
                list_found = True
                print(f"Found list in key '{k}' with {len(v)} items")
                break
        if not list_found:
            print("No list found in response dictionary, using empty list")
            raw = []
    elif not isinstance(raw, list):
        print(f"Unexpected response type: {type(raw)}, using empty list")
        raw = []
        
    # Log response size for debugging
    if isinstance(raw, list):
        print(f"Processing list with {len(raw)} items")
except Exception as e:
    print(f"API request failed: {e}")
    raw = []

ex_data = raw
SUFFIX_TO_EXCHANGE = {}
if isinstance(ex_data, list) and len(ex_data) > 0:
    SUFFIX_TO_EXCHANGE = { 
        (i.get("symbolSuffix") or "").strip().upper(): (i.get("exchange") or "").strip().upper() 
        for i in ex_data 
        if (i.get("symbolSuffix") or "").strip() and (i.get("exchange") or "").strip() 
    }
    print(f"Created SUFFIX_TO_EXCHANGE mapping with {len(SUFFIX_TO_EXCHANGE)} entries")

# Infer Americas exchanges from API (no file reads)
CTRY={"UNITED STATES","CANADA","BRAZIL","MEXICO","ARGENTINA","CHILE","PERU","COLOMBIA","VENEZUELA","URUGUAY","PARAGUAY","BOLIVIA","ECUADOR","GUYANA","SURINAME","FRENCH GUIANA","JAMAICA","TRINIDAD AND TOBAGO","TRINIDAD & TOBAGO","BARBADOS","BAHAMAS","BERMUDA","CAYMAN ISLANDS","PANAMA","COSTA RICA","GUATEMALA","HONDURAS","EL SALVADOR","NICARAGUA","DOMINICAN REPUBLIC","HAITI","PUERTO RICO","BELIZE","CURACAO","ARUBA","SAINT LUCIA","GRENADA","ST. VINCENT AND THE GRENADINES"}
ISO2={"US","CA","BR","MX","AR","CL","PE","CO","VE","UY","PY","BO","EC","GY","SR","GF","JM","TT","BB","BS","BM","KY","PA","CR","GT","HN","SV","NI","DO","HT","PR","BZ","LC","GD","VC","CW","AW"}

def _amer_exchange(rec: dict)->bool:
    for k in ("region","continent"):
        v=rec.get(k)
        if isinstance(v,str) and "AMERICA" in v.upper(): return True
    for k in ("country","countryName","countryCode","country_code","country_iso2"):
        v=rec.get(k); vu=v.strip().upper() if isinstance(v,str) else ""
        if vu and ((len(vu)<=3 and vu in ISO2) or vu in CTRY or "UNITED STATES" in vu or "LATIN AMERICA" in vu): return True
    return False

EXCHANGES_AMERICAS=set()
for it in ex_data:
    if _amer_exchange(it):
        exch=(it.get("exchange") or "").strip(); acr=(it.get("acronym") or "").strip(); mic=(it.get("mic") or "").strip()
        if exch: EXCHANGES_AMERICAS.add(exch.upper())
        elif acr or mic: EXCHANGES_AMERICAS.add((acr or mic).upper())

# Fallback: infer from known U.S./Americas exchange acronyms in the exchange name map
if not EXCHANGES_AMERICAS:
    for _, exch in SUFFIX_TO_EXCHANGE.items():
        eu=exch.upper()
        if any(x in eu for x in ("NAS","NYS","ARC","BATS","OTC","TSX","TSXV","CSE","BOV","MEX","XBOV","XMEX")):
            EXCHANGES_AMERICAS.add(eu)

print(f"Identified {len(EXCHANGES_AMERICAS)} Americas exchanges: {', '.join(sorted(EXCHANGES_AMERICAS))}")
if not EXCHANGES_AMERICAS:
    # Use hardcoded fallback instead of raising error to allow continuing execution
    print("WARNING: Could not infer any Americas exchanges from API payload. Using hardcoded fallbacks.")
    EXCHANGES_AMERICAS = {"NYSE","NASDAQ","AMEX","ARCX","NYS","NAS","ARC","BATS","OTC","TSX","TSXV","CSE"}
    print(f"Using fallback exchanges: {', '.join(sorted(EXCHANGES_AMERICAS))}")

## Profiles (paged) → Universe

Pull paged company profiles and keep only Americas listings (via symbol suffix→exchange map). Limit the investable universe to marketCap ≥ 1B and persist to DuckDB.
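
Because the bulk endpoint's body format is handled defensively in the cell below, a simplified standard-library sketch of that tolerant parsing may help. It is an illustration only, and `parse_payload` is a hypothetical name: it tries a JSON array first, then back-to-back or line-delimited JSON objects, then CSV with a header row.

```python
import csv
import json
from io import StringIO


def parse_payload(text):
    """Parse a JSON array, concatenated/line-delimited JSON objects, or CSV into a list of dicts."""
    stripped = text.strip()
    if not stripped:
        return []
    if stripped.startswith("["):
        try:
            data = json.loads(stripped)
            return [row for row in data if isinstance(row, dict)]
        except json.JSONDecodeError:
            pass  # fall through to the line-based attempts
    # Split back-to-back objects ("}{") onto separate lines, then parse line by line.
    records = []
    for line in stripped.replace("}{", "}\n{").splitlines():
        line = line.strip()
        if line.startswith("{"):
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict):
                records.append(obj)
    if records:
        return records
    # Last resort: treat the body as CSV with a header row (all values stay strings).
    if "," in stripped and "\n" in stripped:
        return [dict(row) for row in csv.DictReader(StringIO(stripped))]
    return []


print(parse_payload('{"symbol":"A"}{"symbol":"B"}'))
```

The `}{` split is a heuristic: a string value that itself contains `}{` would be broken apart and silently dropped, which is acceptable for a sketch but worth knowing.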

In [None]:
import os, json, time, requests, polars as pl
from io import StringIO
from pathlib import Path

API_KEY=os.getenv("FMP_API_KEY")

def fetch_profiles_paged(api_key: str|None=None, start_part: int=0, max_parts: int|None=None, sleep_s: float=0.0, max_retries: int=3, verbose: bool=False)->pl.DataFrame:
    key=api_key or API_KEY; params={"apikey":key} if key else {}
    # ensure exchange context
    global SUFFIX_TO_EXCHANGE, EXCHANGES_AMERICAS
    if 'SUFFIX_TO_EXCHANGE' not in globals() or 'EXCHANGES_AMERICAS' not in globals():
        try:
            ex_data=requests.get("https://financialmodelingprep.com/stable/available-exchanges",params=params,timeout=60).json()
            ex_data = ex_data if isinstance(ex_data,list) else []
        except Exception: ex_data=[]
        SUFFIX_TO_EXCHANGE = 'SUFFIX_TO_EXCHANGE' in globals() and SUFFIX_TO_EXCHANGE or { (i.get("symbolSuffix") or "").strip().upper(): (i.get("exchange") or "").strip().upper() for i in ex_data if (i.get("symbolSuffix") or "").strip() and (i.get("exchange") or "").strip() }
        # Infer amer exchanges from payload directly (no file reads)
        CTRY={"UNITED STATES","CANADA","BRAZIL","MEXICO","ARGENTINA","CHILE","PERU","COLOMBIA","VENEZUELA","URUGUAY","PARAGUAY","BOLIVIA","ECUADOR","GUYANA","SURINAME","FRENCH GUIANA","JAMAICA","TRINIDAD AND TOBAGO","TRINIDAD & TOBAGO","BARBADOS","BAHAMAS","BERMUDA","CAYMAN ISLANDS","PANAMA","COSTA RICA","GUATEMALA","HONDURAS","EL SALVADOR","NICARAGUA","DOMINICAN REPUBLIC","HAITI","PUERTO RICO","BELIZE","CURACAO","ARUBA","SAINT LUCIA","GRENADA","ST. VINCENT AND THE GRENADINES"}
        ISO2={"US","CA","BR","MX","AR","CL","PE","CO","VE","UY","PY","BO","EC","GY","SR","GF","JM","TT","BB","BS","BM","KY","PA","CR","GT","HN","SV","NI","DO","HT","PR","BZ","LC","GD","VC","CW","AW"}
        def amer(r):
            if any(isinstance(r.get(k), str) and "AMERICA" in r[k].upper() for k in ("region","continent")): return True
            for k in ("country","countryCode","country_code","country_iso2","countryName"):
                v=r.get(k); vu=v.strip().upper() if isinstance(v,str) else ""
                if vu and ((len(vu)<=3 and vu in ISO2) or vu in CTRY or "UNITED STATES" in vu or "LATIN AMERICA" in vu): return True
            return False
        EXCHANGES_AMERICAS=set()
        for it in ex_data:
            if amer(it):
                exch=(it.get("exchange") or "").strip(); acr=(it.get("acronym") or "").strip(); mic=(it.get("mic") or "").strip()
                if exch: EXCHANGES_AMERICAS.add(exch.upper())
                elif acr or mic: EXCHANGES_AMERICAS.add((acr or mic).upper())
        if not EXCHANGES_AMERICAS:
            for _, exch in SUFFIX_TO_EXCHANGE.items():
                eu=str(exch).upper()
                if any(x in eu for x in ("NAS","NYS","ARC","BATS","OTC","TSX","TSXV","CSE","BOV","MEX","XBOV","XMEX")):
                    EXCHANGES_AMERICAS.add(eu)
    _is_amer=lambda s: isinstance(s,str) and (SUFFIX_TO_EXCHANGE.get(("."+s.rsplit(".",1)[-1]).upper()) in EXCHANGES_AMERICAS if "." in s else True)

    def _parse(txt: str)->list[dict]:
        s=txt.lstrip()
        if s.startswith('['):
            try: return json.loads(s)
            except Exception: pass
        norm=txt.replace('}{','}\n{'); rec=[]
        for ln in (l.strip() for l in norm.splitlines() if l.strip()):
            i=ln.find('{'); ln=ln[i:] if i>=0 else ln
            if ln.startswith('{'):
                try: o=json.loads(ln); rec.append(o) if isinstance(o,dict) else None
                except Exception: pass
        if rec: return rec
        if ',' in txt and '\n' in txt:
            try: return pl.read_csv(StringIO(txt), ignore_errors=True).to_dicts()
            except Exception: pass
            try:
                import pandas as pd; return pl.from_pandas(pd.read_csv(StringIO(txt), engine="python", dtype=str, on_bad_lines="skip").fillna("")).to_dicts()
            except Exception: pass
            try:
                import csv; return [dict({k:(v or "") for k,v in r.items()}) for r in csv.DictReader(StringIO(txt))]
            except Exception: return []
        return []

    def _to_pl(records: list[dict])->pl.DataFrame:
        if not records: return pl.DataFrame()
        keys=list({k for r in records for k in r.keys()}); norm=[{k:r.get(k,None) for k in keys} for r in records]
        try: return pl.DataFrame(norm, schema_overrides={k:pl.Utf8 for k in keys}, strict=False, infer_schema_length=len(norm))
        except Exception:
            try:
                import pandas as pd; return pl.from_pandas(pd.DataFrame(norm).astype("string").fillna(""))
            except Exception: return pl.DataFrame()

    sess=requests.Session(); url="https://financialmodelingprep.com/stable/profile-bulk"; frames=[]; part=start_part
    while True:
        if max_parts is not None and part>=start_part+max_parts: break
        q={"part":part,"datatype":"json",**params}; attempt=0
        while True:
            try:
                r=sess.get(url,params=q,timeout=120); status=r.status_code
                if verbose: print(f"GET {r.url[:80]}... -> {status}")
                if status in (429,) or status>=500:
                    if attempt<max_retries: time.sleep((sleep_s or 0.5)*(2**attempt)); attempt+=1; continue
                if status==403: return pl.DataFrame()
                r.raise_for_status(); data=_parse(r.text)
            except Exception:
                return pl.DataFrame() if not frames else pl.concat(frames, how="vertical_relaxed").unique(subset=["symbol"], keep="last")
            break
        if not isinstance(data,list): break
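        if not data:
            # A bare "[]" or empty body marks the end of the data; skip ahead only for non-empty, unparseable parts
            if r.text.strip() not in ("", "[]"): part+=1; time.sleep(sleep_s) if sleep_s>0 else None; continue
            break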
        df=_to_pl(data)
        if df.is_empty(): part+=1; continue
        if "Symbol" in df.columns and "symbol" not in df.columns: df=df.rename({"Symbol":"symbol"})
        df=df.filter(pl.col("symbol").map_elements(_is_amer, return_dtype=pl.Boolean)) if "symbol" in df.columns else pl.DataFrame()
        if not df.is_empty():
            casts=[]
            for c in ("price","beta","lastDiv","lastDividend","change","changes","changePercentage","changesPercentage"): casts.append(pl.col(c).cast(pl.Float64, strict=False)) if c in df.columns else None
            for c in ("marketCap","mktCap","volume","volAvg","avgVolume","averageVolume"): casts.append(pl.col(c).cast(pl.Int64, strict=False)) if c in df.columns else None
            df=df.with_columns(casts) if casts else df; frames.append(df)
        part+=1; time.sleep(sleep_s) if sleep_s>0 else None
    return pl.DataFrame() if not frames else pl.concat(frames, how="vertical_relaxed").unique(subset=["symbol"], keep="last")

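profiles_df=fetch_profiles_paged(api_key=os.getenv("FMP_API_KEY"))
# Guard: an empty frame has no columns; mktCap is accepted as an alternative name, as in the load step
cap_col=next((c for c in ("marketCap","mktCap") if c in profiles_df.columns), None)
if cap_col:
    profiles_df=profiles_df.filter(pl.col(cap_col).cast(pl.Int64, strict=False) >= 1_000_000_000)
tickers = profiles_df.select("symbol").unique().sort("symbol") if "symbol" in profiles_df.columns else pl.DataFrame(schema={"symbol": pl.Utf8})
print(profiles_df.shape)
print(profiles_df.head())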

(11796, 36)
shape: (5, 36)
┌────────┬──────────────┬─────────┬───────┬───┬──────────────┬──────────────┬──────────────┬───────┐
│ change ┆ city         ┆ volume  ┆ isAdr ┆ … ┆ image        ┆ companyName  ┆ exchangeFull ┆ price │
│ ---    ┆ ---          ┆ ---     ┆ ---   ┆   ┆ ---          ┆ ---          ┆ Name         ┆ ---   │
│ f64    ┆ str          ┆ i64     ┆ str   ┆   ┆ str          ┆ str          ┆ ---          ┆ f64   │
│        ┆              ┆         ┆       ┆   ┆              ┆              ┆ str          ┆       │
╞════════╪══════════════╪═════════╪═══════╪═══╪══════════════╪══════════════╪══════════════╪═══════╡
│ 0.04   ┆ Denver       ┆ 4694636 ┆ false ┆ … ┆ https://imag ┆ Summit       ┆ New York     ┆ 52.49 │
│        ┆              ┆         ┆       ┆   ┆ es.financial ┆ Materials,   ┆ Stock        ┆       │
│        ┆              ┆         ┆       ┆   ┆ modeli…      ┆ Inc.         ┆ Exchange     ┆       │
│ 0.02   ┆ EmeryVille   ┆ 1633929 ┆ false ┆ … ┆ https://imag ┆ D

### Save tickers → DuckDB
Persist unique investable symbols into americas.db.tickers for downstream joins.
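
The load is meant to be rerunnable: only symbols not already stored are inserted. The sketch below shows that `INSERT ... WHERE NOT EXISTS` pattern in isolation. To stay dependency-free it uses Python's built-in `sqlite3` as a stand-in for DuckDB, and the `incoming` staging table and `load` helper are hypothetical names, not part of the notebook.

```python
import sqlite3

con = sqlite3.connect(":memory:")  # stand-in for duckdb.connect('americas.db')
con.execute("CREATE TABLE tickers (symbol TEXT)")
con.execute("CREATE TABLE incoming (symbol TEXT)")  # hypothetical staging table

INSERT_NEW = """
    INSERT INTO tickers
    SELECT t.symbol FROM incoming t
    WHERE NOT EXISTS (SELECT 1 FROM tickers x WHERE x.symbol = t.symbol)
"""


def load(symbols):
    """Stage a batch, insert only unseen symbols, and return the table's row count."""
    con.execute("DELETE FROM incoming")
    con.executemany("INSERT INTO incoming VALUES (?)", [(s,) for s in symbols])
    con.execute(INSERT_NEW)
    return con.execute("SELECT COUNT(*) FROM tickers").fetchone()[0]


first = load(["AAPL", "MSFT"])
second = load(["AAPL", "MSFT", "SHOP.TO"])  # rerun with one new symbol
print(first, second)
```

The check compares against rows already in the table, so duplicates inside a single batch would still be inserted together. The notebook avoids this by calling `.unique()` on the symbols before loading.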

In [None]:
import duckdb, polars as pl
# Incremental load for tickers (unique by symbol)
if 'tickers' in globals() and isinstance(tickers, pl.DataFrame) and not tickers.is_empty():
    con = duckdb.connect('americas.db')
    # Register incoming dataframe
    con.register('tickers_view', tickers.to_pandas())
    # Create table if it does not exist (empty schema clone)
    con.sql("""
        CREATE TABLE IF NOT EXISTS tickers AS
        SELECT * FROM tickers_view LIMIT 0
    """)
    # Count symbols not yet stored; this must run before the insert, otherwise it is always 0
    new_count = con.sql("""
        SELECT COUNT(*) FROM tickers_view t
        WHERE NOT EXISTS (SELECT 1 FROM tickers x WHERE x.symbol = t.symbol)
    """).fetchone()[0]
    # Insert only new symbols
    con.sql("""
        INSERT INTO tickers
        SELECT t.*
        FROM tickers_view t
        WHERE NOT EXISTS (
            SELECT 1 FROM tickers x WHERE x.symbol = t.symbol
        )
    """)
    print(f"Inserted {new_count} new tickers")
    con.close()
else:
    print("No tickers to save; skipping DuckDB load.")

In [None]:
import duckdb, polars as pl
# Incremental load for profiles (unique by symbol) with robust dynamic casting
if 'profiles_df' in globals() and isinstance(profiles_df, pl.DataFrame) and not profiles_df.is_empty():
    cap_col = 'marketCap' if 'marketCap' in profiles_df.columns else ('mktCap' if 'mktCap' in profiles_df.columns else None)
    if cap_col:
        # Filter investable universe
        filtered = profiles_df.filter(pl.col(cap_col).cast(pl.Int64, strict=False) >= 1_000_000_000)
        if not filtered.is_empty():
            con = duckdb.connect('americas.db')
            table_exists = False
            try:
                table_exists = bool(con.execute("SELECT 1 FROM information_schema.tables WHERE table_name='profiles'").fetchone())
            except Exception:
                table_exists = False

            if not table_exists:
                # Detect boolean-like columns (string reps of true/false only)
                bool_like = []
                for c in filtered.columns:
                    try:
                        vals = filtered.select(pl.col(c).cast(pl.Utf8, strict=False).str.to_lowercase().drop_nulls().unique()).to_series().to_list()
                        if vals and all(v in ('true','false') for v in vals):
                            bool_like.append(c)
                    except Exception:
                        pass
                if bool_like:
                    casts = [
                        pl.when(pl.col(c).cast(pl.Utf8, strict=False).str.to_lowercase()=="true").then(pl.lit(1))
                          .when(pl.col(c).cast(pl.Utf8, strict=False).str.to_lowercase()=="false").then(pl.lit(0))
                          .otherwise(pl.lit(None)).alias(c)
                        for c in bool_like
                    ]
                    filtered = filtered.with_columns(casts)
                # Basic numeric casts for common fields
                float_cols = [c for c in ("price","beta","lastDiv","lastDividend","change","changes","changePercentage","changesPercentage") if c in filtered.columns]
                int_cols = [c for c in ("marketCap","mktCap","volume","volAvg","avgVolume","averageVolume") if c in filtered.columns]
                casts = [pl.col(c).cast(pl.Float64, strict=False) for c in float_cols] + [pl.col(c).cast(pl.Int64, strict=False) for c in int_cols]
                if casts:
                    filtered = filtered.with_columns(casts)
                # Create table schema clone + initial load
                con.register('profiles_incoming_initial', filtered.to_pandas())
                con.sql("""
                    CREATE TABLE IF NOT EXISTS profiles AS
                    SELECT * FROM profiles_incoming_initial LIMIT 0
                """)
                con.sql("INSERT INTO profiles SELECT * FROM profiles_incoming_initial")
                con.close()
            else:
                # Existing table: build insert aligned to destination schema while avoiding lower() on numeric sources
                con.register('profiles_incoming_raw', filtered.to_pandas())
                schema_rows = con.execute("PRAGMA table_info('profiles')").fetchall()
                # Map source column polars dtypes to simple strings
                src_type_map = {c: str(t) for c, t in zip(filtered.columns, filtered.dtypes)}
                numeric_prefixes = ("Int","UInt","Float","Decimal")
                select_exprs = []
                dest_cols = []
                for cid, name, dtype, *_ in schema_rows:
                    dest_cols.append(name)
                    upper_type = (dtype or '').upper()
                    if name == 'symbol':
                        expr = f"p.{name} AS {name}"
                    elif name in filtered.columns:
                        src_t = src_type_map.get(name, "")
                        is_src_numeric = any(src_t.startswith(pref) for pref in numeric_prefixes)
                        if 'INT' in upper_type:
                            if is_src_numeric:
                                # Source already numeric -> direct cast
                                expr = f"TRY_CAST(p.{name} AS {upper_type}) AS {name}"
                            else:
                                # Source textual -> handle boolean-like strings then try numeric cast
                                expr = (
                                    f"CASE WHEN lower(CAST(p.{name} AS VARCHAR))='true' THEN 1 "
                                    f"WHEN lower(CAST(p.{name} AS VARCHAR))='false' THEN 0 "
                                    f"ELSE TRY_CAST(p.{name} AS {upper_type}) END AS {name}"
                                )
                        elif any(t in upper_type for t in ['DOUBLE','FLOAT','REAL','DECIMAL']):
                            expr = f"TRY_CAST(p.{name} AS {upper_type}) AS {name}"
                        else:
                            # Leave as-is
                            expr = f"p.{name} AS {name}"
                    else:
                        null_cast_type = upper_type if upper_type else 'VARCHAR'
                        expr = f"CAST(NULL AS {null_cast_type}) AS {name}"
                    select_exprs.append(expr)
                insert_sql = f"""
                    INSERT INTO profiles ({','.join(dest_cols)})
                    SELECT {','.join(select_exprs)}
                    FROM profiles_incoming_raw p
                    WHERE NOT EXISTS (
                        SELECT 1 FROM profiles x WHERE x.symbol = p.symbol
                    )
                """
                try:
                    con.sql(insert_sql)
                except Exception as e:
                    print(f"Incremental insert failed: {e}")
                con.close()
        else:
            print("No profiles with marketCap >= 1,000,000,000; skipping DuckDB load")
    else:
        print("No market cap column found; skipping DuckDB load for profiles")
else:
    print("profiles_df is empty or undefined; skipping DuckDB load")

## EOD Bulk Quotes (2010‑01‑01 → today)

Fetch the daily bulk EOD files in parallel, keep only Americas symbols that are in the investable ticker set, and either stream each day into DuckDB or stage the rows in memory.
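
The retry behaviour used for each daily request follows a plain exponential backoff. Here is a minimal standalone sketch of that idea, assuming a hypothetical helper `call_with_backoff` that wraps any callable; the `sleep` parameter is injectable only so the demonstration does not actually wait.

```python
import time


def call_with_backoff(fn, max_retries=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on an exception, wait base_delay * 2**attempt and retry up to max_retries times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * (2 ** attempt))
            attempt += 1


# Demonstration with a fake call that fails twice, and recorded (not real) sleeps.
calls = {"n": 0}
waits = []


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"


result = call_with_backoff(flaky, sleep=waits.append)
print(result, waits)
```

Unlike this sketch, the notebook's fetch function logs the failure and returns an empty DataFrame once retries are exhausted, so a single bad day does not stop the run.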

In [None]:
# Parallel EOD Bulk (full or incremental range), filtered by `tickers` + logging
import os, requests, polars as pl, concurrent.futures as cf, logging, threading
from io import StringIO
from datetime import date as _date, timedelta
from requests.adapters import HTTPAdapter

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("eod")

API_KEY=os.getenv("FMP_API_KEY"); params={"apikey":API_KEY} if API_KEY else {}
SESSION = requests.Session()
# Enlarge connection pool to avoid 'Connection pool is full' warnings under concurrency
try:
    ADAPTER = HTTPAdapter(pool_connections=128, pool_maxsize=128)
    SESSION.mount('https://', ADAPTER); SESSION.mount('http://', ADAPTER)
except Exception:
    pass

# Ensure filter context exists even if setup cells weren't run
if 'SUFFIX_TO_EXCHANGE' not in globals(): SUFFIX_TO_EXCHANGE = {}
if 'EXCHANGES_AMERICAS' not in globals(): EXCHANGES_AMERICAS = set()

# business-day date range
def date_range(start: str, end: str, weekdays_only: bool=True):
    y,m,d=map(int,start.split("-")); ye,me,de=map(int,end.split("-")); dt=_date(y,m,d); end_dt=_date(ye,me,de)
    while dt<=end_dt:
        if not weekdays_only or dt.weekday()<5: yield dt.isoformat()
        dt+=timedelta(days=1)

_is_amer=lambda s: isinstance(s,str) and (SUFFIX_TO_EXCHANGE.get(("."+s.rsplit(".",1)[-1]).upper()) in EXCHANGES_AMERICAS if "." in s else True)

# daily fetch with retry/backoff
def fetch_one_day(ds:str, allowed_symbols: set[str]|None=None, max_retries:int=3)->pl.DataFrame:
    attempt=0
    while True:
        try:
            r=SESSION.get(f"https://financialmodelingprep.com/stable/eod-bulk?date={ds}", params=params, timeout=120); status=r.status_code
            if status in (429,) or status>=500:
                if attempt<max_retries:
                    import time; log.warning(f"{ds} -> {status}, retry {attempt+1}/{max_retries}")
                    time.sleep(0.5*(2**attempt)); attempt+=1; continue
            r.raise_for_status()
            # Read potentially mixed-type numeric columns as strings to avoid parse errors, then cast below
            df=pl.read_csv(
                StringIO(r.text),
                try_parse_dates=False,
                schema_overrides={"open": pl.Utf8, "high": pl.Utf8, "low": pl.Utf8, "close": pl.Utf8, "adjClose": pl.Utf8, "volume": pl.Utf8},
                infer_schema_length=1000,
            )
            if df.is_empty(): log.debug(f"{ds} -> 0 rows"); return df
            n0=len(df); df=df.filter(pl.col("symbol").map_elements(_is_amer, return_dtype=pl.Boolean)); n1=len(df)
            if df.is_empty(): log.debug(f"{ds} -> amer 0/{n0}"); return df
            if allowed_symbols: df=df.filter(pl.col("symbol").is_in(allowed_symbols)); n2=len(df)
            else: n2=n1
            log.debug(f"{ds} -> raw={n0} amer={n1} allowed={n2}")
            return df.with_columns([
                pl.col("date").cast(pl.Utf8).str.to_date("%Y-%m-%d"),
                pl.col("open").cast(pl.Float64, strict=False), pl.col("high").cast(pl.Float64, strict=False),
                pl.col("low").cast(pl.Float64, strict=False), pl.col("close").cast(pl.Float64, strict=False),
                pl.col("adjClose").cast(pl.Float64, strict=False),
                # Cast volume via Float -> rounded Int to handle occasional fractional values
                pl.col("volume").cast(pl.Float64, strict=False).round(0).cast(pl.Int64, strict=False)
            ])
        except Exception as e:
            if attempt<max_retries:
                import time; log.warning(f"{ds} -> error {e!r}, retry {attempt+1}/{max_retries}")
                time.sleep(0.5*(2**attempt)); attempt+=1; continue
            log.error(f"{ds} -> failed after {max_retries} retries: {e!r}")
            return pl.DataFrame()

# Parallel over full or supplied range, with optional streaming insert
def parallel_fetch_eod_bulk_range(start_date:str, end_date:str,
                                  max_workers:int|None=None,
                                  weekdays_only:bool=True,
                                  allowed_symbols: set[str] | None = None,
                                  days: list[str] | None = None,
                                  skip_existing: bool = True,
                                  existing_dates: set[str] | None = None,
                                  insert_into_duckdb: bool = False,
                                  duckdb_path: str = 'americas.db') -> pl.DataFrame:
    """Fetch EOD bulk data.
    Optimizations:
      - pass precomputed days list
      - skip days already in quotes table (existing_dates)
      - optionally stream each fetched day into DuckDB (idempotent insert on (date,symbol))
    """
    all_days = days if days is not None else list(date_range(start_date, end_date, weekdays_only=weekdays_only))
    if skip_existing and existing_dates:
        fetch_days=[d for d in all_days if d not in existing_dates]
    else:
        fetch_days=all_days
    if not fetch_days:
        log.info("No new days to fetch (incremental). Returning empty DataFrame.")
        return pl.DataFrame()
    if max_workers is None:
        import os as _os; max_workers=min(32, max(8, (_os.cpu_count() or 8)*2))
    log.info(f"EOD bulk {fetch_days[0]}→{fetch_days[-1]}: {len(fetch_days)} new days (of {len(all_days)} total), workers={max_workers}, symbols={'all' if not allowed_symbols else len(allowed_symbols)}")

    con = None
    lock = threading.Lock()
    if insert_into_duckdb:
        import duckdb
        con = duckdb.connect(duckdb_path)
        # Ensure table exists
        con.execute("CREATE TABLE IF NOT EXISTS quotes (date DATE, symbol VARCHAR, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, adjClose DOUBLE, volume BIGINT)")
        # No primary key or index is created on (date, symbol); duplicates are prevented by the NOT EXISTS check at insert time

    frames=[]; done=0; step=max(1, len(fetch_days)//20)
    with cf.ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs={ex.submit(fetch_one_day, ds, allowed_symbols): ds for ds in fetch_days}
        for fut in cf.as_completed(futs):
            ds=futs[fut]
            f=fut.result(); done+=1
            if isinstance(f, pl.DataFrame) and not f.is_empty():
                if insert_into_duckdb and con is not None:
                    # Insert only new (date,symbol)
                    try:
                        import duckdb
                        with lock:
                            con.register('__new_day', f.to_pandas())
                            con.execute("""
                                INSERT INTO quotes
                                SELECT n.* FROM __new_day n
                                WHERE NOT EXISTS (
                                    SELECT 1 FROM quotes q WHERE q.date = n.date AND q.symbol = n.symbol
                                )
                            """)
                            con.unregister('__new_day')
                    except Exception as e:
                        log.warning(f"DuckDB insert failed for {ds}: {e}")
                else:
                    frames.append(f)
            if done%step==0 or done==len(fetch_days): log.info(f"Progress {done}/{len(fetch_days)} days (fetched {len(frames)} non-empty frames)")
    if con is not None:
        con.close()
    out = pl.DataFrame() if not frames else pl.concat(frames, how="vertical_relaxed").unique(subset=["date","symbol"], keep="last").sort(["date","symbol"])
    log.info(f"Combined rows (not counting already inserted days): {0 if out.is_empty() else out.height}")
    return out
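
The fan-out in the function above follows the usual `ThreadPoolExecutor` plus `as_completed` pattern. A stripped-down sketch of that pattern is shown here, with a hypothetical `fake_fetch` standing in for the network call:

```python
import concurrent.futures as cf


def fetch_all(days, fetch, max_workers=4):
    """Run fetch(day) for every day in a thread pool and return {day: result}, skipping failures."""
    results = {}
    with cf.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, day): day for day in days}
        for fut in cf.as_completed(futures):
            day = futures[fut]
            try:
                results[day] = fut.result()
            except Exception as exc:  # one bad day should not abort the whole run
                print(f"{day} failed: {exc!r}")
    return results


def fake_fetch(day):
    # Hypothetical stand-in for a network call.
    if day == "2025-09-10":
        raise ValueError("simulated failure")
    return len(day)


out = fetch_all(["2025-09-09", "2025-09-10", "2025-09-11"], fake_fetch)
print(sorted(out))
```

Results arrive in completion order, not submission order, which is why the notebook sorts by date and symbol after concatenating the frames.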

### Extract Dates

In [None]:
import duckdb as db
from datetime import date  # or: from datetime import date as _date

con = db.connect('americas.db')

# list tables
tables = con.sql("SHOW TABLES").fetchall()
print("Tables in the database:", tables)

# get last quote date if table exists
max_date = None
if any(t[0] == 'quotes' for t in tables):
    max_date = con.sql("SELECT MAX(date) AS max_date FROM quotes").fetchone()[0]

# Ensure string ISO format (DuckDB may return date/datetime object)
if max_date:
    # If you want to resume AFTER last stored day uncomment next line and import timedelta:
    # from datetime import timedelta; max_date = (max_date + timedelta(days=1))
    start_date = (max_date.date().isoformat() if hasattr(max_date, "date") else max_date.isoformat()) if hasattr(max_date, "isoformat") else str(max_date)
else:
    start_date = '2010-01-01'

end_date = date.today().isoformat()
print(f"Fetching EOD bulk from {start_date} to {end_date}")

Tables in the database: [('exchanges',), ('index_quotes',), ('indices',), ('metrics',), ('profiles',), ('quotes',), ('tickers',)]
Fetching EOD bulk from 2025-09-11 to 2025-09-11


In [None]:
# Gather existing quote dates to enable incremental skipping
import duckdb as _db
_existing_dates=set()
try:
    _con=_db.connect('americas.db')
    if _con.execute("SELECT 1 FROM information_schema.tables WHERE table_name='quotes'").fetchone():
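        # Normalize to 'YYYY-MM-DD': a TIMESTAMP column would otherwise yield '2025-09-11T00:00:00', which never matches the day strings being fetched
        _existing_dates={str(r[0])[:10] for r in _con.execute("SELECT DISTINCT CAST(date AS DATE) FROM quotes").fetchall()}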
    _con.close()
except Exception as e:
    print(f"Could not load existing quote dates: {e}")
print(f"Existing quote days: {len(_existing_dates)}")

Existing quote days: 4095
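
Incremental skipping only works if stored dates and candidate days share one format. The sketch below illustrates this with the standard library only: it builds the weekday list for a range and removes days already stored, after reducing dates, datetimes, and timestamp strings to `YYYY-MM-DD`. The helper names and the sample `stored` values are hypothetical.

```python
from datetime import date, datetime, timedelta


def weekdays(start, end):
    """Yield ISO dates for Monday to Friday between start and end, inclusive."""
    day = date.fromisoformat(start)
    last = date.fromisoformat(end)
    while day <= last:
        if day.weekday() < 5:
            yield day.isoformat()
        day += timedelta(days=1)


def as_day_string(value):
    """Reduce a date, datetime, or string to 'YYYY-MM-DD'."""
    if isinstance(value, datetime):  # check datetime first: it is a subclass of date
        return value.date().isoformat()
    if isinstance(value, date):
        return value.isoformat()
    return str(value)[:10]


# Sample stored values in three shapes a database driver might return.
stored = {as_day_string(v) for v in [datetime(2025, 9, 8), date(2025, 9, 9), "2025-09-10 00:00:00"]}
to_fetch = [d for d in weekdays("2025-09-08", "2025-09-14") if d not in stored]
print(to_fetch)
```

As in the notebook, only weekends are excluded. Market holidays are still requested and simply come back empty.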


In [None]:
# Incremental EOD fetch using optimized function with skipping + streaming inserts
import polars as pl
allowed_symbols = set(tickers.get_column("symbol").to_list()) if 'tickers' in globals() else None

# Only fetch new days and stream insert directly to DuckDB (reduces memory and time for long histories)
parallel_month_df = parallel_fetch_eod_bulk_range(
    start_date,
    end_date,
    allowed_symbols=allowed_symbols,
    skip_existing=True,
    existing_dates=_existing_dates,
    insert_into_duckdb=True,  # Stream directly
)

# For quick inspection show just last few rows newly fetched (if any collected in-memory)
print(parallel_month_df.shape)
if not parallel_month_df.is_empty():
    print("Unique Symbols (new batch):", parallel_month_df.select(pl.col("symbol").n_unique()).item())
    print(parallel_month_df.tail())
else:
    print("No new in-memory rows (all inserted directly or nothing new).")

2025-09-11 17:38:41,321 INFO EOD bulk 2025-09-11→2025-09-11: 1 new days (of 1 total), workers=32, symbols=11796
2025-09-11 17:38:44,207 INFO Progress 1/1 days (fetched 0 non-empty frames)
2025-09-11 17:38:44,210 INFO Combined rows (not counting already inserted days): 0


(0, 0)
No new in-memory rows (all inserted directly or nothing new).


### Load quotes → DuckDB
Create americas.db.quotes if it does not exist, then append only the staged rows whose (date, symbol) key is not already stored.

In [None]:
import duckdb, polars as pl
if 'parallel_month_df' in globals() and isinstance(parallel_month_df, pl.DataFrame) and not parallel_month_df.is_empty():
    con = duckdb.connect('americas.db')
    con.register('new_quotes', parallel_month_df.to_pandas())

    # Create table if missing
    con.sql("""
        CREATE TABLE IF NOT EXISTS quotes AS
        SELECT * FROM new_quotes LIMIT 0
    """)

    # Insert only rows whose (date,symbol) key not present
    con.sql("""
        INSERT INTO quotes
        SELECT nq.*
        FROM new_quotes nq
        WHERE NOT EXISTS (
            SELECT 1 FROM quotes q
            WHERE q.date = nq.date
              AND q.symbol = nq.symbol
        )
    """)

    con.close()
else:
    print("No new quotes data to load.")

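# Confirm the latest date stored in quotes (uses the duckdb module imported above)
con = duckdb.connect('americas.db')
max_date = con.sql("SELECT MAX(date) AS max_date FROM quotes").fetchone()[0]
con.close()
max_date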

No new quotes data to load.


datetime.datetime(2025, 9, 11, 0, 0)

## Metrics and Ratios (quarterly)

Fetch key metrics and ratios per symbol in parallel (last 100 quarterly periods), normalize and cast them, join on (symbol, date, fiscalYear, period), and load the result into the DuckDB table metrics.
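
The fetch cell re-fetches everything from the start of the previous calendar quarter so that recent rows are pulled again. The sketch below isolates that date arithmetic. The helper `lookback_threshold` is hypothetical; it assumes the stored maximum date is ISO-formatted and may carry a time part (the recorded output shows `2025-07-05T00:00:00`), so only the first ten characters are parsed.

```python
from datetime import date, datetime

def quarter_start(d: date) -> date:
    """First day of the calendar quarter containing d."""
    return date(d.year, ((d.month - 1) // 3) * 3 + 1, 1)

def prev_quarter_start(d: date) -> date:
    """First day of the calendar quarter before the one containing d."""
    qs = quarter_start(d)
    month, year = qs.month - 3, qs.year
    if month <= 0:  # wrapped past January: move to the previous year
        month, year = month + 12, year - 1
    return date(year, month, 1)

def lookback_threshold(max_date: str) -> str:
    """Hypothetical helper: accept 'YYYY-MM-DD' or an ISO timestamp string."""
    parsed = datetime.strptime(max_date[:10], "%Y-%m-%d").date()
    return prev_quarter_start(parsed).isoformat()

threshold = lookback_threshold("2025-07-05T00:00:00")
print(threshold)  # 2025-04-01
```

A maximum date in the first quarter wraps to October of the previous year, for example `2025-01-15` gives `2024-10-01`.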

In [None]:
# Incremental fetch of Key Metrics + Ratios (quarterly) with date-based skipping
# Added: quarterly lookback logic -> always refetch previous full quarter to capture restatements.
# Example: today=2025-09-01, existing max=2025-07-05 (Q3). We set threshold to start of previous quarter (2025-04-01)
# so we re-fetch Q2 + Q3 data (rows with date >= 2025-04-01). Older quarters are skipped.

import os, time, requests, polars as pl, concurrent.futures as cf, duckdb
from typing import Any, Dict, List
from datetime import date as _date, datetime

API_KEY = os.getenv("FMP_API_KEY")
_common_params = {"apikey": API_KEY} if API_KEY else {}

# ------------------------------------------------------------------
# Helper: symbol universe
# ------------------------------------------------------------------
def load_symbol_universe()->list[str]:
    if 'tickers' in globals() and isinstance(tickers, pl.DataFrame) and 'symbol' in tickers.columns:
        return [s for s in tickers.get_column('symbol').to_list() if isinstance(s,str) and s]
    try:
        con = duckdb.connect('americas.db')
        rows = con.sql("SELECT symbol FROM tickers").fetchall(); con.close()
        return [r[0] for r in rows if isinstance(r[0], str) and r[0]]
    except Exception:
        return []

symbols = load_symbol_universe()
print(f"Key Metrics - total symbols to consider: {len(symbols)}")
if not symbols:
    print("No symbols available; aborting metrics fetch.")
    merged_df = pl.DataFrame()
else:
    # ------------------------------------------------------------------
    # Existing max date (if table exists)
    # ------------------------------------------------------------------
    existing_max_date = None; metrics_table_exists = False
    try:
        con = duckdb.connect('americas.db')
        metrics_table_exists = bool(con.execute("SELECT 1 FROM information_schema.tables WHERE table_name='metrics'").fetchone())
        if metrics_table_exists:
            existing_max_date = con.execute("SELECT max(date) FROM metrics").fetchone()[0]
        con.close()
    except Exception as e:
        print(f"Could not inspect existing metrics table: {e}")
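    if existing_max_date:
        # Keep only the YYYY-MM-DD part: a TIMESTAMP column yields 'YYYY-MM-DDT00:00:00',
        # which fails the strptime("%Y-%m-%d") parse below and silently disables the quarterly lookback
        existing_max_date = (existing_max_date.isoformat() if hasattr(existing_max_date,'isoformat') else str(existing_max_date))[:10]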
    print(f"Existing metrics max(date): {existing_max_date}")

    # ------------------------------------------------------------------
    # Quarterly lookback threshold determination
    # ------------------------------------------------------------------
    def _quarter_start(d: _date) -> _date:
        return _date(d.year, ((d.month-1)//3)*3 + 1, 1)
    def _prev_quarter_start(d: _date) -> _date:
        qs = _quarter_start(d); m = qs.month - 3; y = qs.year
        if m <= 0: m += 12; y -= 1
        return _date(y, m, 1)

    threshold_date = None
    if existing_max_date:
        try:
            emd = datetime.strptime(existing_max_date, "%Y-%m-%d").date()
            prev_q_start = _prev_quarter_start(emd)
            threshold_date = prev_q_start.isoformat()
        except Exception:
            threshold_date = existing_max_date  # fallback original
    # If no existing data we do full load (threshold_date None)
    print(f"Computed quarterly lookback threshold_date: {threshold_date}")

    # ------------------------------------------------------------------
    # HTTP helpers
    # ------------------------------------------------------------------
    SESSION = requests.Session()
    from requests.adapters import HTTPAdapter
    try: SESSION.mount('https://', HTTPAdapter(pool_connections=64, pool_maxsize=64))
    except Exception: pass

    def _fetch_json(url: str, params: dict, max_retries: int = 3, backoff: float = 0.5):
        attempt = 0
        while True:
            try:
                r = SESSION.get(url, params=params, timeout=60); s = r.status_code
                if s in (429,) or s >= 500:
                    if attempt < max_retries: time.sleep(backoff * (2 ** attempt)); attempt += 1; continue
                if s == 403: return []
                r.raise_for_status(); data = r.json()
                if isinstance(data, dict):
                    for v in data.values():
                        if isinstance(v, list): return v
                    return []
                return data if isinstance(data, list) else []
            except Exception:
                if attempt < max_retries: time.sleep(backoff * (2 ** attempt)); attempt += 1; continue
                return []

    # Probe one symbol to detect latest API date
    latest_api_date = None
    if symbols:
        probe_symbol = symbols[0]
        probe_metrics = _fetch_json("https://financialmodelingprep.com/stable/key-metrics", {"symbol": probe_symbol, "period":"quarter", "limit":1, **_common_params})
        if probe_metrics:
            md = probe_metrics[0].get('date') or probe_metrics[0].get('reportedDate')
            if md: latest_api_date = md.split('T')[0]
        if latest_api_date is None:
            probe_ratios = _fetch_json("https://financialmodelingprep.com/stable/ratios", {"symbol": probe_symbol, "period":"quarter", "limit":1, **_common_params})
            if probe_ratios:
                rd = probe_ratios[0].get('date') or probe_ratios[0].get('reportedDate')
                if rd: latest_api_date = rd.split('T')[0]
    print(f"Latest API available date (probe): {latest_api_date}")

    skip_fetch = False  # explicit flag; testing for 'merged_df' in globals() breaks when the notebook is re-run
    if metrics_table_exists and existing_max_date and latest_api_date and latest_api_date <= existing_max_date:
        # Even if 'up to date', we still might want to re-fetch previous quarter if threshold_date earlier
        if threshold_date and threshold_date < existing_max_date:
            print("Data current, but performing quarterly lookback refresh.")
        else:
            print("Metrics table already up to date; skipping fetch.")
            merged_df = pl.DataFrame()
            skip_fetch = True
    if not skip_fetch:
        # ------------------------------------------------------------------
        # Per-symbol fetch & merge
        # ------------------------------------------------------------------
        def fetch_symbol(symbol: str) -> pl.DataFrame:
            km = _fetch_json("https://financialmodelingprep.com/stable/key-metrics", {"symbol": symbol, "period":"quarter", "limit":100, **_common_params})
            rt = _fetch_json("https://financialmodelingprep.com/stable/ratios", {"symbol": symbol, "period":"quarter", "limit":100, **_common_params})
            if not km and not rt: return pl.DataFrame()
            def to_df(records: List[Dict[str, Any]]) -> pl.DataFrame:
                if not records: return pl.DataFrame()
                for r in records:
                    r['symbol'] = r.get('symbol', symbol)
                    d = r.get('date') or r.get('reportedDate')
                    if d: r['date'] = d.split('T')[0]
                try: return pl.DataFrame(records)
                except Exception:
                    import pandas as pd; return pl.from_pandas(pd.DataFrame(records))
            km_df = to_df(km); rt_df = to_df(rt)
            if threshold_date and 'date' in km_df.columns and not km_df.is_empty(): km_df = km_df.filter(pl.col('date') >= threshold_date)
            if threshold_date and 'date' in rt_df.columns and not rt_df.is_empty(): rt_df = rt_df.filter(pl.col('date') >= threshold_date)
            if km_df.is_empty() and rt_df.is_empty(): return pl.DataFrame()
            if km_df.is_empty(): out = rt_df
            elif rt_df.is_empty(): out = km_df
            else:
                join_keys = [k for k in ('symbol','date','fiscalYear','period') if k in km_df.columns and k in rt_df.columns] or [k for k in ('symbol','date') if k in km_df.columns and k in rt_df.columns]
                rt_only = [c for c in rt_df.columns if c not in km_df.columns]
                if join_keys:
                    out = km_df.join(rt_df.select(join_keys + rt_only), on=join_keys, how='full')
                else:
                    out = pl.concat([km_df, rt_df], how='vertical_relaxed')
            if 'date' in out.columns:
                out = out.with_columns(pl.col('date').cast(pl.Utf8).str.strptime(pl.Date, "%Y-%m-%d", strict=False))
            return out

        MAX_WORKERS = min(48, max(8, (os.cpu_count() or 8)*2)) if metrics_table_exists else min(32, max(8, (os.cpu_count() or 8)))
        start_ts = time.time(); frames: List[pl.DataFrame] = []; submitted = 0; last_print = 0
        print(f"Fetching metrics for {len(symbols)} symbols (threshold_date={threshold_date}) with {MAX_WORKERS} workers...")
        with cf.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
            futures = {ex.submit(fetch_symbol, s): s for s in symbols}
            for fut in cf.as_completed(futures):
                sym = futures[fut]; submitted += 1
                try:
                    df = fut.result()
                    if isinstance(df, pl.DataFrame) and not df.is_empty(): frames.append(df)
                except Exception as e:
                    print(f"{sym} failed: {e}")
                if (submitted - last_print) >= 500 or submitted == len(symbols):
                    elapsed = (time.time() - start_ts)/60
                    print(f"Progress: {submitted}/{len(symbols)} symbols, collected {len(frames)} non-empty frames, elapsed {elapsed:.1f}m")
                    last_print = submitted
        if not frames:
            print("No metrics/ratios rows fetched for lookback window.")
            merged_df = pl.DataFrame()
        else:
            # Schema alignment
            union_cols = []
            seen = set()
            for f in frames:
                for c in f.columns:
                    if c not in seen:
                        seen.add(c); union_cols.append(c)
            key_order = [c for c in ('symbol','date','fiscalYear','period') if c in union_cols]
            other_cols = [c for c in union_cols if c not in key_order]
            ordered_cols = key_order + other_cols
            aligned = []
            
            # First pass: determine common data types for numeric columns
            numeric_cols = {}
            for f in frames:
                for col_name, dtype in zip(f.columns, f.dtypes):
                    dtype_str = str(dtype)
                    if any(dtype_str.startswith(prefix) for prefix in ['Int', 'Float']):
                        if col_name not in numeric_cols:
                            numeric_cols[col_name] = dtype
                        elif str(numeric_cols[col_name]).startswith('Int') and dtype_str.startswith('Float'):
                            # Promote to Float64 if we see both Int and Float
                            numeric_cols[col_name] = pl.Float64
            
            for f in frames:
                # Add missing columns
                missing = [c for c in ordered_cols if c not in f.columns]
                if missing:
                    f = f.with_columns([pl.lit(None).alias(c) for c in missing])
                
                # Fix data types for numeric columns to ensure consistency
                casts = []
                for col_name, dtype in numeric_cols.items():
                    if col_name in f.columns:
                        # Ensure all numeric columns use the same data type (prefer Float64 for consistency)
                        if dtype == pl.Float64 or str(dtype).startswith('Float'):
                            casts.append(pl.col(col_name).cast(pl.Float64, strict=False))
                
                if casts:
                    f = f.with_columns(casts)
                
                f = f.select(ordered_cols)
                aligned.append(f)
                
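            # Relaxed concat: columns filled in with pl.lit(None) have Null dtype and must be coerced to the common type
            merged_df = pl.concat(aligned, how='vertical_relaxed')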
            
            # Filter by threshold date if provided
            if threshold_date and 'date' in merged_df.columns:
                # Clean up threshold date and convert to proper date
                threshold_date_clean = threshold_date.split('T')[0] if 'T' in threshold_date else threshold_date
                
                # Use datetime to parse the date string and convert to a Date object
                from datetime import datetime
                threshold_date_obj = datetime.strptime(threshold_date_clean, "%Y-%m-%d").date()
                
                # Convert to polars Date expression
                threshold_date_pl = pl.lit(threshold_date_obj).cast(pl.Date)
                
                # Filter with properly typed date
                merged_df = merged_df.filter(pl.col('date') >= threshold_date_pl)
                
            base_cols = [c for c in ('symbol','date','fiscalYear','period') if c in merged_df.columns]
            if base_cols:
                merged_df = merged_df.unique(subset=base_cols, keep='last')
            print(f"merged_df rows (lookback+current): {merged_df.height}")
            try:
                sample_cols = [c for c in ['symbol','date','fiscalYear','period'] if c in merged_df.columns]
                sample_cols += [c for c in merged_df.columns if c not in sample_cols][:5]
                print(merged_df.select(sample_cols).head())
            except Exception:
                print(merged_df.head())

Key Metrics - total symbols to consider: 11796
Existing metrics max(date): 2025-07-05T00:00:00
Computed quarterly lookback threshold_date: 2025-07-05T00:00:00
Latest API available date (probe): None
Fetching metrics for 11796 symbols (threshold_date=2025-07-05T00:00:00) with 32 workers...
Progress: 500/11796 symbols, collected 10 non-empty frames, elapsed 0.2m
Progress: 1000/11796 symbols, collected 19 non-empty frames, elapsed 0.3m
Progress: 1500/11796 symbols, collected 30 non-empty frames, elapsed 0.4m
Progress: 2000/11796 symbols, collected 36 non-empty frames, elapsed 0.6m
Progress: 2500/11796 symbols, collected 45 non-empty frames, elapsed 1.2m
Progress: 3000/11796 symbols, collected 59 non-empty frames, elapsed 2.4m
Progress: 3500/11796 symbols, collected 66 non-empty frames, elapsed 1.5m
Progress: 4000/11796 symbols, collected 69 non-empty frames, elapsed 1.6m
Progress: 4500/11796 symbols, collected 72 non-empty frames, elapsed 2.4m
Progress: 5000/11796 symbols, collected 82 no

In [None]:
# Load to DuckDB (incremental append on (symbol,date,fiscalYear,period))
import polars as pl
if 'merged_df' in globals() and isinstance(merged_df, pl.DataFrame) and not merged_df.is_empty():
    import duckdb
    con = duckdb.connect('americas.db')
    # Ensure base types for key columns
    casts = []
    if 'symbol' in merged_df.columns: casts.append(pl.col('symbol').cast(pl.Utf8, strict=False))
    if 'date' in merged_df.columns: casts.append(pl.col('date').cast(pl.Date, strict=False))
    if 'fiscalYear' in merged_df.columns: casts.append(pl.col('fiscalYear').cast(pl.Int64, strict=False))
    if 'period' in merged_df.columns: casts.append(pl.col('period').cast(pl.Utf8, strict=False))
    merged_df = merged_df.with_columns(casts) if casts else merged_df

    con.register('metrics_new', merged_df.to_pandas())
    # Create table if not exists
    con.execute("CREATE TABLE IF NOT EXISTS metrics AS SELECT * FROM metrics_new LIMIT 0")

    # Determine natural key columns present
    key_cols = [c for c in ('symbol','date','fiscalYear','period') if c in merged_df.columns]
    if not key_cols:  # fallback
        key_cols = [c for c in ('symbol','date') if c in merged_df.columns]

    # Build NOT EXISTS predicate
    predicate = ' AND '.join([f"m.{c} = n.{c}" for c in key_cols]) if key_cols else '1=0'

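    try:
        # The API can add fields over time, so a positional SELECT n.* breaks once
        # the column counts differ. Add columns the table lacks, then insert by name.
        table_cols = {r[0] for r in con.execute("DESCRIBE metrics").fetchall()}
        new_types = {r[0]: r[1] for r in con.execute("DESCRIBE metrics_new").fetchall()}
        for c in merged_df.columns:
            if c not in table_cols:
                con.execute(f'ALTER TABLE metrics ADD COLUMN "{c}" {new_types[c]}')
        col_list = ', '.join(f'"{c}"' for c in merged_df.columns)
        select_list = ', '.join(f'n."{c}"' for c in merged_df.columns)
        # Rows whose key already exists are skipped, not updated
        con.execute(f"""
            INSERT INTO metrics ({col_list})
            SELECT {select_list} FROM metrics_new n
            WHERE NOT EXISTS (
                SELECT 1 FROM metrics m WHERE {predicate}
            )
        """)
    except Exception as e:
        print(f"Incremental metrics insert failed: {e}")
    finally:
        con.close()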
else:
    print("No new key metrics/ratios data to load.")

Incremental metrics insert failed: Binder Error: table metrics has 107 columns but 109 values were supplied
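
The binder error in the output above is what a positional `INSERT ... SELECT *` produces when the incoming frame carries columns the target table lacks. The sketch below reproduces that class of failure in miniature and shows one way around it: add the missing columns, then insert with an explicit column list. It assumes the standard-library `sqlite3` module as a stand-in for DuckDB, and the column names `roe` and `newRatio` are hypothetical.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE metrics (symbol TEXT, date TEXT, roe REAL)")
con.execute("CREATE TABLE metrics_new (symbol TEXT, date TEXT, roe REAL, newRatio REAL)")
con.execute("INSERT INTO metrics_new VALUES ('AAPL', '2025-06-28', 0.35, 1.2)")

# A positional insert fails because the staged data has one more column
try:
    con.execute("INSERT INTO metrics SELECT * FROM metrics_new")
    positional_ok = True
except sqlite3.OperationalError:
    positional_ok = False

def column_types(table: str) -> dict:
    """Map column name -> declared type for a table in this connection."""
    return {row[1]: row[2] for row in con.execute(f"PRAGMA table_info({table})")}

# Add whatever the target lacks, then insert by explicit column list
target, staged = column_types("metrics"), column_types("metrics_new")
for name, sql_type in staged.items():
    if name not in target:
        con.execute(f'ALTER TABLE metrics ADD COLUMN "{name}" {sql_type}')

cols = ", ".join(f'"{c}"' for c in staged)
con.execute(f"INSERT INTO metrics ({cols}) SELECT {cols} FROM metrics_new")
loaded = con.execute("SELECT symbol, newRatio FROM metrics").fetchall()
print(positional_ok, loaded)
con.close()
```

Naming the columns also protects against a change in column order between runs, which a positional insert would accept silently.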


## Exchanges → DuckDB

Fetch all exchanges, keep only Americas via heuristic, and persist to americas.db.exchanges.
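
The heuristic accepts a record when a region field mentions America or when a country field matches a known code or name. Here is a trimmed-down sketch of that idea, applied to three hypothetical records. The short code and name sets are illustrative only; the notebook cell uses much longer ones and checks more field names.

```python
# Hypothetical, shortened lookup sets for illustration
AMERICAS_ISO2 = {"US", "CA", "BR", "MX", "AR"}
AMERICAS_NAMES = {"UNITED STATES", "CANADA", "BRAZIL", "MEXICO", "ARGENTINA"}

def is_americas(rec: dict) -> bool:
    """Return True when the record's region or country points to the Americas."""
    region = rec.get("region")
    if isinstance(region, str) and "AMERICA" in region.upper():
        return True
    for key in ("countryCode", "countryName"):
        value = rec.get(key)
        value = value.strip().upper() if isinstance(value, str) else ""
        if value in AMERICAS_ISO2 or value in AMERICAS_NAMES:
            return True
    return False

# Sample records are made up; only the field names follow the exchanges output
samples = [
    {"exchange": "BUE", "countryName": "Argentina", "countryCode": "AR"},
    {"exchange": "XETRA", "countryName": "Germany", "countryCode": "DE"},
    {"exchange": "SAO", "region": "South America"},
]
kept = [rec["exchange"] for rec in samples if is_americas(rec)]
print(kept)
```

Matching on exact codes keeps short strings such as `DE` from being accepted by accident, while the substring test on the region field tolerates values like "North America" or "Latin America".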

In [None]:
# Exchanges → DuckDB (incremental)
import os, requests, polars as pl
API_KEY=os.getenv("FMP_API_KEY"); _params={"apikey":API_KEY} if API_KEY else {}

try:
    r=requests.get("https://financialmodelingprep.com/stable/available-exchanges", params=_params, timeout=60); r.raise_for_status(); ex_data=r.json(); ex_data=ex_data if isinstance(ex_data,list) else []
except Exception: ex_data=[]

CTRY={"UNITED STATES","CANADA","BRAZIL","MEXICO","ARGENTINA","CHILE","PERU","COLOMBIA","VENEZUELA","URUGUAY","PARAGUAY","BOLIVIA","ECUADOR","GUYANA","SURINAME","FRENCH GUIANA","JAMAICA","TRINIDAD AND TOBAGO","TRINIDAD & TOBAGO","BARBADOS","BAHAMAS","BERMUDA","CAYMAN ISLANDS","PANAMA","COSTA RICA","GUATEMALA","HONDURAS","EL SALVADOR","NICARAGUA","DOMINICAN REPUBLIC","HAITI","PUERTO RICO","BELIZE","CURACAO","ARUBA","SAINT LUCIA","GRENADA","ST. VINCENT AND THE GRENADINES"}
ISO2={"US","CA","BR","MX","AR","CL","PE","CO","VE","UY","PY","BO","EC","GY","SR","GF","JM","TT","BB","BS","BM","KY","PA","CR","GT","HN","SV","NI","DO","HT","PR","BZ","LC","GD","VC","CW","AW"}

def is_amer_exchange(rec: dict)->bool:
    for k in ("region","continent"):
        v=rec.get(k)
        if isinstance(v,str) and "AMERICA" in v.upper(): return True
    for k in ("countryName","country","countryCode","country_code","country_iso2"):
        v=rec.get(k); vu=v.strip().upper() if isinstance(v,str) else ""
        if vu and ((len(vu)<=3 and vu in ISO2) or vu in CTRY or "UNITED STATES" in vu or "LATIN AMERICA" in vu): return True
    return False

ex_df = pl.DataFrame([rec for rec in ex_data if isinstance(rec,dict) and is_amer_exchange(rec)])
if not ex_df.is_empty():
    # Normalize the country column name when only countryName is present
    ren = {"countryName": "country"} if "countryName" in ex_df.columns and "country" not in ex_df.columns else {}
    ex_df = ex_df.rename(ren) if ren else ex_df
    ex_df = ex_df.with_columns([pl.all().cast(pl.Utf8, strict=False)])

print(ex_df.shape); print(ex_df.head())

if not ex_df.is_empty():
    import duckdb
    con=duckdb.connect('americas.db')
    con.register('exchanges_new', ex_df.to_pandas())
    con.sql("""
        CREATE TABLE IF NOT EXISTS exchanges AS
        SELECT * FROM exchanges_new LIMIT 0
    """)
    
    # Check which columns exist in the dataframe
    columns = ex_df.columns
    
    # Use exchange as the primary key since it's always present
    if 'exchange' in columns:
        con.sql("""
            INSERT INTO exchanges
            SELECT e.* FROM exchanges_new e
            WHERE NOT EXISTS (
                SELECT 1 FROM exchanges x
                WHERE x.exchange = e.exchange
            )
        """)
    con.close()
else:
    print("No Americas exchanges found from API; skipping DuckDB load.")

(14, 6)
shape: (5, 6)
┌──────────┬───────────────────────────┬──────────────────┬─────────────┬──────────────┬───────────┐
│ exchange ┆ name                      ┆ country          ┆ countryCode ┆ symbolSuffix ┆ delay     │
│ ---      ┆ ---                       ┆ ---              ┆ ---         ┆ ---          ┆ ---       │
│ str      ┆ str                       ┆ str              ┆ str         ┆ str          ┆ str       │
╞══════════╪═══════════════════════════╪══════════════════╪═════════════╪══════════════╪═══════════╡
│ AMEX     ┆ New York Stock Exchange   ┆ United States of ┆ US          ┆ N/A          ┆ Real-time │
│          ┆ Arca                      ┆ America          ┆             ┆              ┆           │
│ BUE      ┆ Buenos Aires Stock        ┆ Argentina        ┆ AR          ┆ .BA          ┆ 20 min    │
│          ┆ Exchange                  ┆                  ┆             ┆              ┆           │
│ BVC      ┆ Colombia Stock Exchange   ┆ Colombia         ┆ CO       

## Indices → DuckDB

Fetch index list, keep those on Americas exchanges, then persist as americas.db.indices.

In [None]:
# Indices → DuckDB (incremental)
import os, requests, polars as pl
API_KEY=os.getenv("FMP_API_KEY"); _params={"apikey":API_KEY} if API_KEY else {}

try:
    r=requests.get("https://financialmodelingprep.com/stable/index-list", params=_params, timeout=60); r.raise_for_status(); idx_data=r.json(); idx_data=idx_data if isinstance(idx_data,list) else []
except Exception: idx_data=[]

amer_exchanges = {e.upper() for e in EXCHANGES_AMERICAS} if 'EXCHANGES_AMERICAS' in globals() and EXCHANGES_AMERICAS else {"NYSE","NASDAQ","AMEX","ARCX","NYS","NAS","ARC","BATS","OTC","TSX","TSXV","CSE","XTSE","XTSX","CHIC","B3","BMFBOVESPA","BOV","XBOV","XMEX","BMV"}
idx_df = pl.DataFrame([rec for rec in idx_data if isinstance(rec,dict) and str(rec.get("exchange","" )).upper() in amer_exchanges])
if not idx_df.is_empty(): idx_df=idx_df.with_columns([pl.all().cast(pl.Utf8, strict=False)])

print(idx_df.shape); print(idx_df.head())

if not idx_df.is_empty():
    import duckdb
    con=duckdb.connect('americas.db')
    con.register('indices_new', idx_df.to_pandas())
    con.sql("""
        CREATE TABLE IF NOT EXISTS indices AS
        SELECT * FROM indices_new LIMIT 0
    """)
    # Natural key = symbol (assumed unique for indices list)
    con.sql("""
        INSERT INTO indices
        SELECT i.* FROM indices_new i
        WHERE NOT EXISTS (SELECT 1 FROM indices x WHERE x.symbol = i.symbol)
    """)
    con.close()
else:
    print("No Americas indices found from API; skipping DuckDB load.")

(36, 4)
shape: (5, 4)
┌────────┬─────────────────────────────────┬──────────┬──────────┐
│ symbol ┆ name                            ┆ exchange ┆ currency │
│ ---    ┆ ---                             ┆ ---      ┆ ---      │
│ str    ┆ str                             ┆ str      ┆ str      │
╞════════╪═════════════════════════════════╪══════════╪══════════╡
│ ^TTIN  ┆ S&P/TSX Capped Industrials Ind… ┆ TSX      ┆ CAD      │
│ ^NYA   ┆ NYSE Composite                  ┆ NYSE     ┆ USD      │
│ ^XAX   ┆ NYSE American Composite Index   ┆ NYSE     ┆ USD      │
│ ^NYITR ┆ NYSE International 100 Index    ┆ NYSE     ┆ USD      │
│ ^DJU   ┆ Dow Jones Utility Average       ┆ NASDAQ   ┆ USD      │
└────────┴─────────────────────────────────┴──────────┴──────────┘


## Index Quotes (historical light) → DuckDB

Parallel fetch daily light quotes for Americas indices since 2010‑01‑01 and write to americas.db.index_quotes (unique by symbol,date).
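
The fetch-and-deduplicate flow can be sketched without touching the network. In the example below, `fake_fetch` is a hypothetical stand-in for the HTTP call that fails once per symbol and then returns rows. The `with_backoff` wrapper is also hypothetical and retries on exceptions only, whereas the notebook's fetcher additionally inspects status codes. The final step keeps the last row per (symbol, date).

```python
import concurrent.futures as cf
import time

def with_backoff(fetch, symbol, max_retries=3, sleep_base=0.01):
    """Call fetch(symbol); on an exception wait sleep_base * 2**attempt and retry."""
    for attempt in range(max_retries + 1):
        try:
            return fetch(symbol)
        except Exception:
            if attempt == max_retries:
                return []  # give up quietly and return no rows
            time.sleep(sleep_base * (2 ** attempt))

# Hypothetical stand-in for the HTTP call: fails once per symbol, then succeeds
_calls = {}
def fake_fetch(symbol):
    _calls[symbol] = _calls.get(symbol, 0) + 1
    if _calls[symbol] == 1:
        raise RuntimeError("simulated rate limit")
    return [
        {"symbol": symbol, "date": "2025-09-10", "price": 1.0},
        {"symbol": symbol, "date": "2025-09-10", "price": 2.0},  # duplicate key
    ]

symbols = ["^NYA", "^DJU"]
rows = []
with cf.ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(with_backoff, fake_fetch, s) for s in symbols]
    for fut in cf.as_completed(futures):
        rows.extend(fut.result())

# Keep the last row seen per (symbol, date), then sort by key
latest = {(r["symbol"], r["date"]): r for r in rows}
unique_rows = [latest[k] for k in sorted(latest)]
print(len(rows), len(unique_rows))
```

Deduplicating after the parallel phase keeps the worker function free of shared state, so the order in which threads finish does not affect the result.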

In [None]:
# Index quotes (light) → DuckDB
import os, time, requests, polars as pl, concurrent.futures as cf
from io import StringIO

API_KEY=os.getenv("FMP_API_KEY"); _params={"apikey": API_KEY} if API_KEY else {}
BASE_URL="https://financialmodelingprep.com/stable/historical-price-eod/light"

# 1) Gather index symbols
index_symbols = sorted({s for s in idx_df.get_column('symbol').to_list() if isinstance(s,str) and s.strip()}) if 'idx_df' in globals() and isinstance(idx_df, pl.DataFrame) and not idx_df.is_empty() else []
if not index_symbols:
    try:
        import duckdb; con=duckdb.connect('americas.db'); res=con.sql("SELECT symbol FROM indices").fetchall(); con.close(); index_symbols=sorted({r[0] for r in res if isinstance(r[0],str) and r[0].strip()})
    except Exception: index_symbols=[]
print(f"Index symbols to fetch: {len(index_symbols)}")

# 2) Fetch per symbol
def fetch_index_quotes(symbol: str, start: str="2010-01-01", max_retries: int=3, sleep_base: float=0.4)->pl.DataFrame:
    q={"symbol":symbol,"from":start, **_params}; a=0
    while True:
        try:
            r=requests.get(BASE_URL, params=q, timeout=90); s=r.status_code
            if s in (429,) or s>=500:
                if a<max_retries: time.sleep(sleep_base*(2**a)); a+=1; continue
            if s==403: return pl.DataFrame()
            r.raise_for_status(); data=r.json(); data=[data] if isinstance(data,dict) else data
            if not isinstance(data,list) or not data: return pl.DataFrame()
            df=pl.DataFrame(data)
            casts=[]
            if "date" in df.columns: casts.append(pl.col("date").cast(pl.Utf8).str.to_date("%Y-%m-%d"))
            if "price" in df.columns: casts.append(pl.col("price").cast(pl.Float64, strict=False))
            if "volume" in df.columns: casts.append(pl.col("volume").cast(pl.Int64, strict=False))
            df=df.with_columns(casts) if casts else df
            if "symbol" not in df.columns: df=df.with_columns([pl.lit(symbol).alias("symbol")])
            return df
        except Exception:
            if a<max_retries: time.sleep(sleep_base*(2**a)); a+=1; continue
            return pl.DataFrame()

# 3) Parallel fetch
frames=[]
if index_symbols:
    max_workers=min(16, max(4, (os.cpu_count() or 8)))
    with cf.ThreadPoolExecutor(max_workers=max_workers) as ex:
        for fut in cf.as_completed({ex.submit(fetch_index_quotes, s): s for s in index_symbols}):
            df=fut.result()
            if isinstance(df,pl.DataFrame) and not df.is_empty(): frames.append(df)

index_quotes_df=pl.DataFrame() if not frames else pl.concat(frames, how="vertical_relaxed")

# Unique by (symbol,date)
if not index_quotes_df.is_empty():
    keep=[c for c in index_quotes_df.columns if c in {"symbol","date","price","volume"}] or index_quotes_df.columns
    index_quotes_df=index_quotes_df.select(keep).unique(subset=["symbol","date"], keep="last").sort(["symbol","date"])

print(index_quotes_df.shape); print(index_quotes_df.head())

# 4) Load to DuckDB
if not index_quotes_df.is_empty():
    import duckdb
    con=duckdb.connect('americas.db'); con.sql("CREATE OR REPLACE TABLE index_quotes AS SELECT * FROM index_quotes_df"); con.close()
else:
    print("No index quotes fetched; skipping DuckDB load.")

Index symbols to fetch: 36
(111420, 4)
shape: (5, 4)
┌─────────┬────────────┬────────────┬───────────┐
│ symbol  ┆ date       ┆ price      ┆ volume    │
│ ---     ┆ ---        ┆ ---        ┆ ---       │
│ str     ┆ date       ┆ f64        ┆ i64       │
╞═════════╪════════════╪════════════╪═══════════╡
│ TX60.TS ┆ 2020-11-03 ┆ 948.200012 ┆ 94504821  │
│ TX60.TS ┆ 2020-11-04 ┆ 952.299988 ┆ 179295563 │
│ TX60.TS ┆ 2020-11-05 ┆ 968.52002  ┆ 127428692 │
│ TX60.TS ┆ 2020-11-06 ┆ 966.869995 ┆ 118405024 │
│ TX60.TS ┆ 2020-11-09 ┆ 980.429993 ┆ 246182781 │
└─────────┴────────────┴────────────┴───────────┘


## Verify database tables and record counts

In [None]:
# Verify database tables and record counts
import duckdb
con = duckdb.connect('americas.db')
tables = con.sql("SHOW TABLES").fetchall()
print("Tables in the database:")
for table in tables:
    count = con.sql(f"SELECT COUNT(*) AS count FROM {table[0]}").fetchone()[0]
    print(f"  - {table[0]}: {count:,} rows")

# Check the latest dates in the quotes and index_quotes tables
if any(t[0] == 'quotes' for t in tables):
    last_quote_date = con.sql("SELECT MAX(date) FROM quotes").fetchone()[0]
    print(f"Latest quote date: {last_quote_date}")

if any(t[0] == 'index_quotes' for t in tables):
    last_index_date = con.sql("SELECT MAX(date) FROM index_quotes").fetchone()[0] 
    print(f"Latest index quote date: {last_index_date}")

con.close()

Tables in the database:
  - exchanges: 14 rows
  - index_quotes: 111,420 rows
  - indices: 36 rows
  - metrics: 525,189 rows
  - profiles: 11,837 rows
  - quotes: 34,063,560 rows
  - tickers: 11,837 rows
Latest quote date: 2025-09-11 00:00:00
Latest index quote date: 2025-09-11
