In [5]:
import duckdb

import os
import time
import math
import requests
import pandas as pd
from typing import List, Dict, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# ================== USER SETTINGS ==================
# The FMP API key is read from the FMP_API_KEY environment variable so the
# secret is never stored in the notebook or its version history. Set it before
# starting the kernel (e.g. `export FMP_API_KEY=...`). Any key that was
# previously pasted into this notebook should be treated as leaked and rotated.
API_KEY = os.environ.get("FMP_API_KEY", "")
if not API_KEY:
    print("WARNING: FMP_API_KEY is not set; API requests will likely fail.")

SINGLE_DAY_MODE = False        # True -> from=to=TARGET_DATE; False -> use DATE_FROM/DATE_TO
TARGET_DATE = "2025-10-07"    # YYYY-MM-DD

# If SINGLE_DAY_MODE is False, set these:
DATE_FROM = "2016-01-01"
DATE_TO   = "2025-10-08"
# ===================================================

In [None]:
# Load the ticker universe from the `dim_ticker` table of the serving database.
db_path = "/Users/martingobbo/stock-dashboard/data/serving/analytics.duckdb"

con = duckdb.connect(db_path)
try:
    tickers_duck = con.execute("SELECT ticker FROM dim_ticker").fetchall()
finally:
    # Close even when the query fails: the kernel outlives this cell, so a
    # leaked connection would keep the database file open for later cells.
    con.close()

# fetchall() returns one-element tuples; unwrap them into plain symbols.
tickers = [t[0] for t in tickers_duck]
TICKERS: List[str] = tickers

print(len(tickers))


In [6]:
# Manual override of the ticker universe for this run (edit the list below).
# Blank / whitespace-only entries (such as a leftover '' placeholder) are
# dropped so they are never requested from the API nor inserted into
# dim_ticker by the backfill cell further down.
_manual_tickers = ['CELH', '']
tickers = [t.strip() for t in _manual_tickers if isinstance(t, str) and t.strip()]
TICKERS: list[str] = tickers

print(len(tickers))

"""
Daily OHLCV downloader (FMP) — concurrent, batched, with retries
- Reads from a Python list of tickers (or optional CSV)
- Fetches daily bars for a single date (from=to=TARGET_DATE) or a range
- Concurrency capped to ~4 in-flight requests (safe for FMP Starter)
- Short pauses between batches
- Retries 429/5xx with exponential backoff
- Prints a summary of tickers with no data for the requested day
- Produces a DataFrame `data` (no files written)
"""


# ---- Tuning knobs for the downloader (chosen to be fast yet generally safe
# ---- for the FMP Starter plan) ------------------------------------------------

# -- Retry policy (applies to 429 / transient 5xx responses) --
MAX_RETRIES = 3        # maximum attempts per symbol
BACKOFF_BASE = 1.5     # base (seconds) for the exponential backoff between attempts

# -- Per-request settings --
REQUEST_TIMEOUT = 20   # connect/read timeout handed to each HTTP request

# -- Scheduling --
MAX_WORKERS = 4               # parallel workers; ~4 concurrent requests is commonly safe on FMP
BATCH_SIZE = 100              # number of symbols scheduled per wave
SLEEP_BETWEEN_BATCHES = 2.0   # pause (seconds) between waves; kept small for end-to-end speed
# -----------------------------------------------------------------------------

def _daterange() -> Tuple[str, str]:
    if SINGLE_DAY_MODE:
        return TARGET_DATE, TARGET_DATE
    return DATE_FROM, DATE_TO

def fetch_daily(symbol: str, date_from: str, date_to: str) -> Optional[List[Dict]]:
    """
    Fetch daily bars for a symbol over [date_from, date_to].

    Retries on network errors, HTTP 429 and transient 5xx (502/503/504), up to
    MAX_RETRIES attempts with exponential backoff (BACKOFF_BASE ** attempt).
    A numeric Retry-After header is honoured; any other form (e.g. an HTTP
    date) falls back to the exponential backoff instead of raising.

    Args:
        symbol: Ticker symbol to request. Blank symbols are rejected.
        date_from: Start of the window, passed through as the `from` parameter.
        date_to: End of the window, passed through as the `to` parameter.

    Returns:
        The list stored under the "historical" key of the JSON response
        (an empty list when that key is absent), or None on hard failure:
        blank symbol, retries exhausted, non-200 status, invalid JSON, or a
        JSON payload that does not have the expected shape.
    """
    if not isinstance(symbol, str) or not symbol.strip():
        # A blank symbol would request the bare endpoint URL; fail fast instead.
        return None

    url = f"https://financialmodelingprep.com/api/v3/historical-price-full/{symbol}"
    params = {"from": date_from, "to": date_to, "apikey": API_KEY}

    for attempt in range(1, MAX_RETRIES + 1):
        backoff = BACKOFF_BASE ** attempt

        try:
            r = requests.get(url, params=params, timeout=REQUEST_TIMEOUT)
        except requests.RequestException:
            # transient network error — backoff & retry
            if attempt < MAX_RETRIES:
                time.sleep(backoff)
                continue
            return None

        # Handle rate limiting / transient server errors
        if r.status_code in (429, 502, 503, 504):
            if attempt < MAX_RETRIES:
                delay = backoff
                retry_after = r.headers.get("Retry-After")
                if retry_after:
                    try:
                        parsed = float(retry_after)
                    except (TypeError, ValueError):
                        # Retry-After may also be an HTTP date; keep the backoff.
                        parsed = float("nan")
                    if math.isfinite(parsed) and parsed >= 0:
                        delay = parsed
                time.sleep(delay)
                continue
            return None

        if r.status_code != 200:
            # Hard failure; don't retry further
            return None

        try:
            js = r.json()
        except ValueError:
            return None

        if not isinstance(js, dict):
            # Unexpected payload shape (e.g. a JSON list): report a hard failure
            # rather than raising AttributeError inside the worker thread.
            return None

        bars = js.get("historical", [])
        return bars if isinstance(bars, list) else None

    return None  # unreachable, but explicit

def chunked(lst: List[str], n: int):
    """Yield consecutive slices of `lst`, each holding at most `n` items."""
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

# ---- Download driver: fetch every symbol in TICKERS and build `data` ----------
date_from, date_to = _daterange()
all_rows: List[Dict] = []
no_data: List[str] = []       # symbols that returned zero rows in the requested window
hard_fail: List[str] = []     # symbols that errored out after retries

# Drop blank entries and duplicates (first occurrence wins, order preserved).
# Blank symbols cannot be fetched, and duplicated symbols would produce
# duplicated (ticker, date) rows in `data`.
symbols: List[str] = list(dict.fromkeys(
    s.strip() for s in TICKERS if isinstance(s, str) and s.strip()
))
skipped = len(TICKERS) - len(symbols)
if skipped:
    print(f"Skipping {skipped} blank/duplicate ticker entries")

total = len(symbols)
batches = list(chunked(symbols, BATCH_SIZE))

for bi, batch in enumerate(batches, start=1):
    print(f"Batch {bi}/{len(batches)}: {len(batch)} symbols")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = {ex.submit(fetch_daily, sym, date_from, date_to): sym for sym in batch}

        for fut in as_completed(futures):
            sym = futures[fut]
            try:
                hist = fut.result()
            except Exception as exc:
                # An unexpected error in one worker must not abort the whole
                # run and discard everything fetched so far.
                print(f"  {sym}: unexpected error {exc!r}")
                hard_fail.append(sym)
                continue

            if hist is None:
                hard_fail.append(sym)
                continue

            # Keep only rows that match the exact day in single-day mode
            if SINGLE_DAY_MODE:
                hist = [h for h in hist if h.get("date") == TARGET_DATE]

            if not hist:
                no_data.append(sym)
                continue

            for h in hist:
                all_rows.append({
                    "date": h.get("date"),
                    "ticker": sym,
                    "open": h.get("open"),
                    "high": h.get("high"),
                    "low": h.get("low"),
                    "close": h.get("close"),
                    "adjClose": h.get("adjClose"),
                    "volume": h.get("volume"),
                })

    # Short pause between waves (skipped after the last one)
    if bi < len(batches):
        time.sleep(SLEEP_BETWEEN_BATCHES)

# Build final DataFrame
data = pd.DataFrame(all_rows)
if not data.empty:
    data = data.sort_values(["date", "ticker"]).reset_index(drop=True)

# Progress / diagnostics
fetched = data["ticker"].nunique() if not data.empty else 0
print(f"\nDone. Tickers requested: {total}")
print(f"Tickers with rows returned: {fetched}")
print(f"Rows fetched: {len(data)}")

if no_data:
    print("\nNo rows returned for the requested date/window:")
    # Show a few, then count
    preview = ", ".join(no_data[:20])
    more = f" ... (+{len(no_data)-20} more)" if len(no_data) > 20 else ""
    print(preview + more)

if hard_fail:
    print("\nFailed after retries (HTTP/network errors):")
    preview = ", ".join(hard_fail[:20])
    more = f" ... (+{len(hard_fail)-20} more)" if len(hard_fail) > 20 else ""
    print(preview + more)

# Show head for quick inspection
print("\nHead:")
print(data.head())



7
Batch 1/1: 7 symbols

Done. Tickers requested: 7
Tickers with rows returned: 7
Rows fetched: 12259

Head:
         date ticker        open        high         low   close  adjClose  \
0  2016-01-04   BF-B   39.010000   39.160000   38.000000   38.88     32.81   
1  2016-01-04  BRK-B  130.160004  131.029999  128.759995  130.75    130.75   
2  2016-01-04    EME   47.280000   47.570000   46.060000   46.25     44.36   
3  2016-01-04   IBKR   10.670000   10.790000   10.560000   10.61      9.93   
4  2016-01-05   BF-B   38.830000   39.020000   38.480000   38.90     32.83   

    volume  
0  3212870  
1  6869100  
2   318400  
3  3920020  
4  2395675  


In [8]:
# === Backfill New Tickers: add to dim_ticker, load 10y OHLCV, compute + write analytics ===
import math, numpy as np, pandas as pd, duckdb
from datetime import datetime

# ================== USER INPUTS ==================
DB_PATH = "/Users/martingobbo/stock-dashboard/data/serving/analytics.duckdb"

# 1) List the tickers you just added/downloaded (exact symbols as in `data['ticker']`)
#    Blank or non-string entries are dropped here: every symbol in this list
#    that is not yet in dim_ticker gets inserted below, so a stray '' would
#    otherwise create a blank ticker row.
NEW_TICKERS = [t.strip() for t in tickers if isinstance(t, str) and t.strip()]      # <-- EDIT `tickers`
TICKER_TYPE = "Stock"             # per your note, all are Stock

# 2) Your downloaded OHLCV is expected in a DataFrame called `data`
#    Must include at least: ticker, date, open, high, low, close, adjClose, volume
assert "data" in globals(), "Please ensure your downloaded DataFrame is available as `data`."

# ================== NORMALIZE DOWNLOADED DATA ==================
# Work on a private copy with the database column names (adj_close, dt).
df_raw = data.copy().rename(columns={"adjClose":"adj_close", "date":"dt"})

# Every column below must be present; any extra columns are simply carried along.
required = ["ticker","dt","open","high","low","close","adj_close","volume"]
missing = [name for name in required if name not in df_raw.columns]
if len(missing) > 0:
    raise ValueError(f"Downloaded `data` is missing required columns: {missing}")

# Type coercion: upper-case string symbols, plain `date` objects, numeric prices/volume
# (values that cannot be parsed as numbers become NaN).
df_raw["ticker"] = df_raw["ticker"].astype(str).str.upper()
df_raw["dt"] = pd.to_datetime(df_raw["dt"]).dt.date
for c in required[2:]:
    df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce")

# Defensive filter: keep only rows belonging to the requested new tickers.
wanted_symbols = [t.upper() for t in NEW_TICKERS]
df_raw = df_raw.loc[df_raw["ticker"].isin(wanted_symbols)].copy()
if df_raw.empty:
    raise ValueError("No rows in `data` match NEW_TICKERS. Check symbols or inputs.")

# ================== CONNECT (write-enabled) ==================
# Open the DuckDB file in the default read/write mode. This single connection is reused
# by every step below and is closed only by the `con.close()` at the very end of the cell;
# there is no try/finally, so an exception raised in between leaves it open.
con = duckdb.connect(DB_PATH)  # NOT read_only

# Ensure dim_ticker exists minimally (optional safety).
# Creates the table only when it is absent; an existing table is left untouched.
con.execute("""
CREATE TABLE IF NOT EXISTS dim_ticker (
  ticker_id   INTEGER PRIMARY KEY,
  ticker      VARCHAR NOT NULL UNIQUE,
  ticker_type VARCHAR
);
""")

# ================== 1) INSERT NEW TICKERS INTO dim_ticker ==================
# Symbols already known to the dimension table (compared in upper case).
dim_t = con.execute("SELECT ticker_id, UPPER(ticker) AS ticker FROM dim_ticker").df()
existing = set() if dim_t.empty else set(dim_t["ticker"])

# Requested symbols, upper-cased and de-duplicated, minus the ones already present.
incoming = pd.DataFrame({"ticker":[t.upper() for t in NEW_TICKERS]}).drop_duplicates(ignore_index=True)
is_new = ~incoming["ticker"].isin(existing)
to_insert = incoming.loc[is_new].copy()

if len(to_insert) > 0:
    # Continue numbering right after the current maximum ticker_id (0 when the table is empty).
    max_id = con.execute("SELECT COALESCE(MAX(ticker_id), 0) FROM dim_ticker").fetchone()[0]
    to_insert["ticker_id"] = list(range(max_id + 1, max_id + 1 + len(to_insert)))
    to_insert["ticker_type"] = TICKER_TYPE

    # Expose the frame to DuckDB under a name and insert from it.
    con.register("df_new_dim_ticker", to_insert[["ticker_id","ticker","ticker_type"]])
    con.execute("""
        INSERT INTO dim_ticker (ticker_id, ticker, ticker_type)
        SELECT ticker_id, ticker, ticker_type
        FROM df_new_dim_ticker;
    """)

# Re-read the dimension so the symbol -> id map includes any rows just inserted.
dim_t = con.execute("SELECT ticker_id, UPPER(ticker) AS ticker FROM dim_ticker").df()
ticker_map = dict(zip(dim_t["ticker"], dim_t["ticker_id"]))

# Attach ticker_id to every price row; an unmapped symbol is an error.
df_prices = df_raw.copy()
df_prices["ticker_id"] = df_prices["ticker"].map(ticker_map)
unmapped = df_prices["ticker_id"].isna()
if unmapped.any():
    missing_syms = sorted(df_prices.loc[unmapped,"ticker"].unique().tolist())
    raise ValueError(f"These tickers do not exist in dim_ticker and could not be mapped: {missing_syms}")

# ================== 2) UPSERT PRICES INTO fact_price_daily ==================
# Create the price fact table when absent; rows are keyed by (ticker_id, dt).
con.execute("""
CREATE TABLE IF NOT EXISTS fact_price_daily (
  ticker_id INTEGER NOT NULL,
  dt        DATE    NOT NULL,
  open      DOUBLE,
  high      DOUBLE,
  low       DOUBLE,
  close     DOUBLE,
  adj_close DOUBLE,
  volume    DOUBLE,
  PRIMARY KEY (ticker_id, dt)
);
""")

# Expose the price DataFrame to DuckDB under the name `df_price_src`, then merge it in:
# rows whose (ticker_id, dt) already exist are overwritten, all others are inserted.
# NOTE(review): no ROLLBACK is issued here if the MERGE raises after BEGIN — confirm the
# connection state before re-running the cell after a failure.
con.register("df_price_src", df_prices[["ticker_id","dt","open","high","low","close","adj_close","volume"]])
con.execute("""
BEGIN TRANSACTION;
MERGE INTO fact_price_daily AS t
USING df_price_src AS s
ON t.ticker_id = s.ticker_id AND t.dt = s.dt
WHEN MATCHED THEN UPDATE SET
  open = s.open, high = s.high, low = s.low,
  close = s.close, adj_close = s.adj_close, volume = s.volume
WHEN NOT MATCHED THEN INSERT (ticker_id, dt, open, high, low, close, adj_close, volume)
VALUES (s.ticker_id, s.dt, s.open, s.high, s.low, s.close, s.adj_close, s.volume);
COMMIT;
""")

# ================== 3) COMPUTE ANALYTICS FOR NEW TICKERS (ALL DATES) ==================
# Annualisation factor used for the annualised volatilities.
TRADING_DAYS_PER_YEAR = 252

# Rolling-window lengths, expressed in rows (one row per stored trading day).
WIN_05 = 5
WIN_10 = 10
WIN_15 = 15
WIN_20 = 20
WIN_50 = 50
WIN_60 = 60
WIN_100 = 100
WIN_200 = 200
WIN_252 = 252
WIN_300 = 300
WIN_750 = 750

# Length of the simple moving average applied to the range-position signal.
SMA_POS_LEN = 3

def _ols_slope(arr: np.ndarray) -> float:
    m = np.isfinite(arr)
    y = arr[m]
    n = y.size
    if n < 2: return np.nan
    x = np.arange(n, dtype=float)
    xm, ym = x.mean(), y.mean()
    denom = np.sum((x-xm)**2)
    if denom == 0: return np.nan
    return float(np.sum((x-xm)*(y-ym))/denom)

def _max_dd_only(a: np.ndarray) -> float:
    a = np.asarray(a, float)
    if not np.isfinite(a).any(): return np.nan
    # start at first finite positive
    peak = np.nan; started=False
    best_dd=0.0
    for v in a:
        if not math.isfinite(v) or v<=0: continue
        if not started:
            peak=v; started=True; continue
        if v>peak: peak=v
        dd = v/peak - 1.0
        if dd < best_dd: best_dd = dd
    return float(best_dd) if started else np.nan

def _max_dd_dur_only(a: np.ndarray) -> float:
    a = np.asarray(a, float)
    if not np.isfinite(a).any(): return np.nan
    # duration since peak to trough (index distance)
    peak = -1; peak_val = -np.inf
    best_dd=0.0; best_dur=0; cur_peak_idx=None
    for i,v in enumerate(a):
        if not math.isfinite(v) or v<=0: continue
        if v>peak_val:
            peak_val=v; cur_peak_idx=i
        dd = v/peak_val - 1.0
        if dd < best_dd:
            best_dd = dd
            best_dur = i - cur_peak_idx
    return float(best_dur)

# Re-read the complete stored price history of the affected tickers from fact_price_daily,
# ordered by ticker and date, so the metrics are computed on what is actually in the table.
new_ids = tuple(sorted(set(df_prices["ticker_id"].tolist())))
ids_csv = ",".join(str(tid) for tid in new_ids)
hist_sql = f"""
SELECT t.ticker_id, d.ticker, f.dt::DATE AS dt,
       f.open, f.high, f.low, f.close, f.adj_close, f.volume
FROM fact_price_daily f
JOIN dim_ticker d USING (ticker_id)
JOIN (SELECT UNNEST([{ids_csv}]) AS ticker_id) t USING (ticker_id)
ORDER BY ticker_id, dt
"""
hist = con.execute(hist_sql).df()

# Compute metrics per ticker across ALL dates
def compute_metrics(g: pd.DataFrame) -> pd.DataFrame:
    """Compute the full set of daily metrics for one ticker's price history.

    Args:
        g: Rows for a single ticker with at least the columns `dt`, `ticker_id`,
           `open`, `high`, `low`, `adj_close` and `volume` (these are the columns
           read below). The frame is sorted by `dt` on a copy; the caller's
           frame is not modified.

    Returns:
        A new DataFrame with one row per input row: `dt`, `ticker_id`, the price
        columns, and one column per metric. Infinite values are replaced by NaN,
        and NaNs in the range-position, range z-score and 100-row drawdown
        columns are filled with 0.0.

    Notes:
        * `close` below is the *adjusted* close (`adj_close`); the raw `close`
          column is not used.
        * All rolling windows are row counts. In `rolling(w, m)` the second
          positional argument is `min_periods`.
    """
    g = g.sort_values("dt").copy()
    close = g["adj_close"].astype(float)
    open_ = g["open"].astype(float)
    high  = g["high"].astype(float)
    low   = g["low"].astype(float)
    vol   = pd.to_numeric(g["volume"], errors="coerce").astype(float)

    # logs & returns
    # Zero prices are turned into NaN so the logarithms below stay finite.
    close_safe = close.replace(0, np.nan)
    lp = np.log(close_safe)
    prev = close_safe.shift(1)
    with np.errstate(divide="ignore", invalid="ignore"):
        lr = np.where((close_safe>0) & (prev>0), np.log(close_safe/prev), np.nan)
    lr = pd.Series(lr, index=g.index)
    # The first row has no previous price; its log return is set to 0.0 instead of NaN.
    if len(g)>0 and pd.isna(lr.iloc[0]): lr.iloc[0]=0.0

    # MAs (min_periods=1, so values are produced from the first row on)
    ma20  = close.rolling(WIN_20, 1).mean()
    ma50  = close.rolling(WIN_50, 1).mean()
    ma100 = close.rolling(WIN_100,1).mean()
    ma200 = close.rolling(WIN_200,1).mean()

    # VWAP20: rolling sum of (adjusted close * volume) over rolling sum of volume;
    # a zero volume sum becomes NaN to avoid dividing by zero.
    dv = close*vol
    vwap20 = (dv.rolling(WIN_20,1).sum())/(vol.rolling(WIN_20,1).sum().replace(0,np.nan))

    # volume/dollar-volume signals
    with np.errstate(divide="ignore", invalid="ignore"):
        ln_dv = np.where((vol>0)&(close>0), np.log(vol*close), np.nan)
    ln_dv = pd.Series(ln_dv, index=g.index)
    # Change in log dollar volume versus 5 / 10 rows earlier.
    vol_accel_5  = ln_dv - ln_dv.shift(5)
    vol_accel_10 = ln_dv - ln_dv.shift(10)

    # z-score style signal: (10-row mean - 60-row mean) / 60-row std of dollar volume
    avg10_dv = dv.rolling(WIN_10,1).mean()
    avg60_dv = dv.rolling(WIN_60,1).mean()
    std60_dv = dv.rolling(WIN_60,2).std(ddof=1)
    with np.errstate(invalid="ignore", divide="ignore"):
        abn_vol_60d = (avg10_dv - avg60_dv)/std60_dv

    # Ann vol (sample std of log returns scaled by sqrt(TRADING_DAYS_PER_YEAR))
    vol20_ann  = pd.Series(lr).rolling(WIN_20,2).std(ddof=1)*np.sqrt(TRADING_DAYS_PER_YEAR)
    vol100_ann = pd.Series(lr).rolling(WIN_100,2).std(ddof=1)*np.sqrt(TRADING_DAYS_PER_YEAR)
    mean100    = pd.Series(lr).rolling(WIN_100,1).mean()

    # Range/position
    # Position of the adjusted close inside the 10-row low/high range.
    # NOTE(review): `close` is adj_close while `low`/`high` come from the low/high columns —
    # confirm both are on the same adjustment basis.
    low10  = low.rolling(WIN_10,1).min()
    high10 = high.rolling(WIN_10,1).max()
    rng10  = high10 - low10
    with np.errstate(invalid="ignore", divide="ignore"):
        pos10 = (close - low10)/rng10
    pos10 = pd.Series(pos10, index=g.index).fillna(0.0)
    # Despite the name, this is the SMA_POS_LEN-row mean of the 10-row range position.
    five_day_range_pos = pos10.rolling(SMA_POS_LEN,1).mean()

    # z-score style signal on the daily high-low range (10-row vs 60-row)
    daily_rng = (high-low)
    avg_rng10 = daily_rng.rolling(WIN_10,1).mean()
    avg_rng60 = daily_rng.rolling(WIN_60,1).mean()
    std_rng60 = daily_rng.rolling(WIN_60,2).std(ddof=1)
    with np.errstate(invalid="ignore", divide="ignore"):
        z_60_10_rng = (avg_rng10 - avg_rng60)/std_rng60
    z_60_10_rng = pd.Series(z_60_10_rng, index=g.index).fillna(0.0)

    # Log return over `lag` rows; yields 0.0 (not NaN) wherever either price is
    # missing or non-positive, which includes the first `lag` rows.
    def sret(lag): 
        with np.errstate(divide="ignore", invalid="ignore"):
            return np.where((close>0)&(close.shift(lag)>0), np.log(close/close.shift(lag)), 0.0)
    ret5   = sret(5);   ret10  = sret(10); ret20  = sret(20)
    ret40  = sret(40);  ret60  = sret(60); ret200 = sret(200); ret300 = sret(300)

    med100 = pd.Series(lr).rolling(WIN_100,1).median()

    # 100d drawdown percent/duration on calendar dates
    # The series is indexed by date so that the duration can be reported in calendar days;
    # the window itself is still WIN_100 rows.
    closes_by_date = pd.Series(close.values, index=pd.to_datetime(g["dt"]))
    # Decline from the window's maximum to the lowest value at or after it.
    def _dd_pct(w: pd.Series) -> float:
        w = w.dropna()
        if len(w)<=1: return 0.0
        dmax = w.idxmax(); maxc = w.loc[dmax]; tail = w.loc[dmax:]
        if tail.empty: return 0.0
        return float(tail.min()/maxc - 1.0)
    # Calendar days between the window's maximum and the lowest value at or after it.
    def _dd_dur(w: pd.Series) -> float:
        w = w.dropna()
        if len(w)<=1: return 0.0
        dmax = w.idxmax(); dmin = w.loc[dmax:].idxmin()
        return float((dmin - dmax).days)
    dd_pct_100 = closes_by_date.rolling(WIN_100,1).apply(_dd_pct, raw=False).values
    dd_dur_100 = closes_by_date.rolling(WIN_100,1).apply(_dd_dur, raw=False).values

    # non-annualized vols (full window required: min_periods equals the window length)
    lr_s = pd.Series(lr, index=g.index)
    vol5   = lr_s.rolling(WIN_05, WIN_05).std(ddof=1)
    vol15  = lr_s.rolling(WIN_15, WIN_15).std(ddof=1)
    vol60  = lr_s.rolling(WIN_60, WIN_60).std(ddof=1)
    vol252 = lr_s.rolling(WIN_252,WIN_252).std(ddof=1)

    # Downside / upside deviation: root mean square of the negative / positive parts
    # of the log returns over a full window.
    neg = np.minimum(lr_s,0.0); pos = np.maximum(lr_s,0.0)
    dd15  = (neg.pow(2).rolling(WIN_15,WIN_15).mean())**0.5
    dd60  = (neg.pow(2).rolling(WIN_60,WIN_60).mean())**0.5
    dd252 = (neg.pow(2).rolling(WIN_252,WIN_252).mean())**0.5
    ud15  = (pos.pow(2).rolling(WIN_15,WIN_15).mean())**0.5
    ud60  = (pos.pow(2).rolling(WIN_60,WIN_60).mean())**0.5
    ud252 = (pos.pow(2).rolling(WIN_252,WIN_252).mean())**0.5

    # Parkinson 20d: sqrt( mean(ln(high/low)^2) / (4 ln 2) ) over a full 20-row window
    with np.errstate(divide="ignore", invalid="ignore"):
        hl_log = np.log((high.replace(0,np.nan)) / (low.replace(0,np.nan)))
    k = 1.0/(4.0*math.log(2.0))
    pk20 = np.sqrt(k*(hl_log.pow(2).rolling(WIN_20,WIN_20).mean()))

    # change in 10d cum log returns (current 10-row sum minus the sum 10 rows earlier)
    sum10 = lr_s.rolling(WIN_10,WIN_10).sum()
    change10 = sum10 - sum10.shift(WIN_10)

    # slope accel of log_price over 60d: current 60-row OLS slope minus the slope 60 rows earlier
    slope60 = pd.Series(np.log(close.replace(0,np.nan))).rolling(WIN_60,WIN_60).apply(_ols_slope, raw=True)
    slope60_prev = slope60.shift(WIN_60)
    ret_accel_60 = slope60 - slope60_prev

    # vol slopes (OLS slope of a volatility series over a trailing window)
    slope_vol60_over20  = vol60.rolling(WIN_20,WIN_20).apply(_ols_slope, raw=True)
    slope_vol252_over60 = vol252.rolling(WIN_60,WIN_60).apply(_ols_slope, raw=True)

    # dollar volume long windows & correlation
    dv_sma_252 = dv.rolling(WIN_252,WIN_252).mean()
    dv_sma_60  = dv.rolling(WIN_60,WIN_60).mean()
    dv252_accel_60 = dv_sma_252.rolling(WIN_60,WIN_60).apply(_ols_slope, raw=True)
    corr_px_dv_60  = close.rolling(WIN_60,WIN_60).corr(dv)

    ema5_of_vol15 = vol15.ewm(span=5, adjust=False).mean()

    # 750-row maximum drawdown and its duration (values appear once 2 rows are available)
    mdd_750     = close.rolling(WIN_750,2).apply(_max_dd_only, raw=True)
    mdd_dur_750 = close.rolling(WIN_750,2).apply(_max_dd_dur_only, raw=True)

    # Assemble the output frame. The column names are the metric codes that are later
    # matched against dim_metric.metric_code, so they must not be changed here.
    out = pd.DataFrame({
        "dt": g["dt"].values,
        "ticker_id": g["ticker_id"].values,
        "open": open_.values,
        "high": high.values,
        "low": low.values,
        "adj_close": close.values,
        "volume": vol.values,

        "log_returns": lr_s.values,
        "volatility_20d": vol20_ann.values,
        "volatility_100d": vol100_ann.values,
        "mean_return_100d": mean100.values,
        "moving_avg_20d": ma20.values,
        "moving_avg_50d": ma50.values,
        "moving_avg_100d": ma100.values,
        "moving_avg_200d": ma200.values,
        "vwap_20d": vwap20.values,
        "vol_accel_5d": vol_accel_5.values,
        "vol_accel_10d": vol_accel_10.values,
        "abn_vol_60d": abn_vol_60d.values,
        "5_day_range_pos": five_day_range_pos.values,
        "60_10_highlowrange_zscore": z_60_10_rng.values,
        "5_day_ret": ret5,
        "10_day_ret": ret10,
        "20_day_ret": ret20,
        "40_day_ret": ret40,
        "60_day_ret": ret60,
        "200_day_ret": ret200,
        "300_day_ret": ret300,
        "median_return_100d": med100.values,
        "drawdown_percent": dd_pct_100,
        "drawdown_duration_days": dd_dur_100,

        "log_prices": lp.values,
        "change_10dayret": change10.values,
        "slope_over60_of_logprice": slope60.values,
        "prior_slope_over60_of_logprice": slope60_prev.values,
        "60d_return_accel": ret_accel_60.values,

        "750d_drawdown": mdd_750.values,
        "750d_drawdownduration": mdd_dur_750.values,

        "15d_downsidedeviation": dd15.values,
        "60d_downsidedeviation": dd60.values,
        "252d_downsidedeviation": dd252.values,

        "15d_upsidevolatility": ud15.values,
        "60d_upsidevolatility": ud60.values,
        "252d_upsidevolatility": ud252.values,

        "5d_volatility": vol5.values,
        "15d_volatility": vol15.values,
        "60d_volatility": vol60.values,
        "252d_volatility": vol252.values,

        "20d_parkinson_HL_volatility": pk20.values,
        "5d_EMA_15dayvolatility": ema5_of_vol15.values,

        "slope_over20_of_60d_volatility": slope_vol60_over20.values,
        "slope_over60_of_252d_volatility": slope_vol252_over60.values,

        "252d_dollar_volume_SMA": dv_sma_252.values,
        "60d_dollar_volume_SMA":  dv_sma_60.values,
        "252d_dollar_volume_accel": dv252_accel_60.values,
        "60d_price_dollarVolume_correlation": corr_px_dv_60.values,
    })

    # clean structural NaNs
    # Infinities (e.g. from division by a zero range/std) become NaN; the four columns
    # below are then filled with 0.0, every other column keeps its NaNs.
    out.replace([np.inf,-np.inf], np.nan, inplace=True)
    out[["5_day_range_pos","60_10_highlowrange_zscore","drawdown_percent","drawdown_duration_days"]] = \
        out[["5_day_range_pos","60_10_highlowrange_zscore","drawdown_percent","drawdown_duration_days"]].fillna(0.0)

    return out

# `hist` already carries the authoritative ticker_id returned by the query above, so it is
# only normalised to a plain integer dtype here. Re-deriving it by mapping `hist["ticker"]`
# through `ticker_map` is fragile: the map is keyed by UPPER(ticker) while `hist["ticker"]`
# holds the symbol as stored, so any non-uppercase stored symbol would map to NaN and make
# the integer cast raise.
hist["ticker_id"] = hist["ticker_id"].astype(int)

# One metrics frame per ticker, stacked into a single frame (empty frame when there is no history).
metrics_all = (
    pd.concat([compute_metrics(g) for _,g in hist.groupby("ticker_id", sort=False)], ignore_index=True)
    if not hist.empty else pd.DataFrame()
)

# ================== 4) MAP METRIC CODES -> IDs & UPSERT fact_metric_daily ==================
# Create the metric fact table when absent; rows are keyed by (metric_id, dt, ticker_id).
con.execute("""
CREATE TABLE IF NOT EXISTS fact_metric_daily (
  metric_id INTEGER NOT NULL,
  dt        DATE    NOT NULL,
  ticker_id INTEGER NOT NULL,
  value     DOUBLE,
  PRIMARY KEY (metric_id, dt, ticker_id)
);
""")

# dim_metric is read but never created in this cell, so it must already exist in the database.
dim_metric = con.execute("SELECT metric_id, metric_code FROM dim_metric").df()
metric_map = dict(zip(dim_metric["metric_code"], dim_metric["metric_id"]))

# Everything in metrics_all that is not an identifier or a raw price column is a metric.
PRICE_COLS = {"open","high","low","close","adj_close","volume"}
NON_METRIC = {"ticker_id","dt"} | PRICE_COLS
metric_cols = [c for c in metrics_all.columns if c not in NON_METRIC]

# rename metric_code -> metric_id where available
rename_dict = {c: metric_map[c] for c in metric_cols if c in metric_map}
metrics_for_unpivot = metrics_all.rename(columns=rename_dict).copy()

# any columns not present in dim_metric are dropped from write (but kept in memory if needed)
# NOTE(review): the isinstance(c, int) checks assume the metric_id values in metric_map are
# plain Python ints — confirm this holds for the dtype of dim_metric.metric_id.
kept_cols = [c for c in metrics_for_unpivot.columns if (c in NON_METRIC) or isinstance(c, int)]
metrics_for_unpivot = metrics_for_unpivot[kept_cols]

# long format: one row per (ticker_id, dt, metric_id); rows without a value are not written
long_metrics = metrics_for_unpivot.melt(
    id_vars=["ticker_id","dt"],
    value_vars=[c for c in metrics_for_unpivot.columns if isinstance(c, int)],
    var_name="metric_id",
    value_name="value"
).dropna(subset=["value"])

# Expose the long frame to DuckDB and merge it: existing keys get their value overwritten,
# new keys are inserted.
con.register("df_metrics_long", long_metrics)
con.execute("""
BEGIN TRANSACTION;
MERGE INTO fact_metric_daily t
USING df_metrics_long s
ON t.metric_id = s.metric_id AND t.dt = s.dt AND t.ticker_id = s.ticker_id
WHEN MATCHED THEN UPDATE SET value = s.value
WHEN NOT MATCHED THEN INSERT (metric_id, dt, ticker_id, value)
VALUES (s.metric_id, s.dt, s.ticker_id, s.value);
COMMIT;
""")

# ================== 5) REFRESH snapshot_metric_latest FOR JUST THESE TICKERS ==================
# Create the snapshot table when absent; it holds one row per (ticker_id, metric_id).
con.execute("""
CREATE TABLE IF NOT EXISTS snapshot_metric_latest (
  ticker_id INTEGER NOT NULL,
  metric_id INTEGER NOT NULL,
  dt        DATE    NOT NULL,
  value     DOUBLE,
  PRIMARY KEY (ticker_id, metric_id)
);
""")

# limit refresh to the affected ticker_ids for efficiency
# The ids are integers taken from `hist`, joined into a comma-separated list that is
# interpolated into the SQL text below.
id_list_sql = ",".join(map(str, sorted(set(hist["ticker_id"]))))

# Temp table holding, for every (ticker_id, metric_id) of the affected tickers, the row of
# fact_metric_daily with the greatest dt.
con.execute(f"""
CREATE OR REPLACE TEMP TABLE _latest_rows AS
WITH mx AS (
  SELECT ticker_id, metric_id, MAX(dt) AS dt
  FROM fact_metric_daily
  WHERE ticker_id IN ({id_list_sql})
  GROUP BY 1,2
)
SELECT f.ticker_id, f.metric_id, CAST(f.dt AS DATE) AS dt, f.value
FROM fact_metric_daily f
JOIN mx
  ON f.ticker_id = mx.ticker_id
 AND f.metric_id = mx.metric_id
 AND f.dt = mx.dt;
""")

# Merge the latest rows into the snapshot: an existing row is updated only when its date or
# value differs, a missing row is inserted.
con.execute("""
BEGIN TRANSACTION;
MERGE INTO snapshot_metric_latest t
USING _latest_rows s
ON t.ticker_id = s.ticker_id AND t.metric_id = s.metric_id
WHEN MATCHED AND (t.dt <> s.dt OR (t.value IS DISTINCT FROM s.value)) THEN
  UPDATE SET dt = s.dt, value = s.value
WHEN NOT MATCHED THEN
  INSERT (ticker_id, metric_id, dt, value)
  VALUES (s.ticker_id, s.metric_id, s.dt, s.value);
COMMIT;
""")

# Optional prune (only for these tickers)
# Removes snapshot rows of the affected tickers whose (ticker_id, metric_id) pair no longer
# has any row in fact_metric_daily.
con.execute(f"""
DELETE FROM snapshot_metric_latest t
WHERE t.ticker_id IN ({id_list_sql})
  AND NOT EXISTS (
    SELECT 1 FROM (
      SELECT DISTINCT ticker_id, metric_id FROM fact_metric_daily WHERE ticker_id IN ({id_list_sql})
    ) k
    WHERE k.ticker_id = t.ticker_id AND k.metric_id = t.metric_id
  );
""")

# ================== DONE ==================
# Summary of what this run wrote.
try:
    n_new = len(to_insert)
except NameError:
    n_new = 0
snapshot_count = con.execute("SELECT COUNT(*) FROM snapshot_metric_latest").fetchone()[0]

print(f"New tickers inserted into dim_ticker: {n_new}")
print("Price rows upserted:", len(df_prices))
print("Metric rows upserted:", len(long_metrics))
print("Snapshot rows now:", snapshot_count)

# (Optional) show the five most recent price and metric rows of the lowest affected ticker_id.
sample_id = None if hist.empty else min(set(hist["ticker_id"]))
if sample_id is not None:
    sample_prices = con.execute(f"""
      SELECT * FROM fact_price_daily WHERE ticker_id = {sample_id} ORDER BY dt DESC LIMIT 5
    """).df()
    print("\nSample prices:", sample_prices)
    sample_metrics = con.execute(f"""
      SELECT * FROM fact_metric_daily WHERE ticker_id = {sample_id} ORDER BY dt DESC LIMIT 5
    """).df()
    print("\nSample metrics:", sample_metrics)

con.close()


New tickers inserted into dim_ticker: 7
Price rows upserted: 12259
Metric rows upserted: 583907
Snapshot rows now: 25994

Sample prices:    ticker_id         dt   open   high    low  close  adj_close   volume
0        515 2025-10-07  26.39  26.50  25.18  25.25      25.25  1904828
1        515 2025-10-06  26.00  26.58  25.85  26.27      26.27  3511791
2        515 2025-10-03  25.35  26.35  25.27  25.95      25.95  3006900
3        515 2025-10-02  25.02  25.28  24.00  25.15      25.15  2401941
4        515 2025-10-01  23.61  25.24  23.56  25.16      25.16  1894913

Sample metrics:    ticker_id         dt  metric_id         value
0        515 2025-10-07          8  2.407260e-02
1        515 2025-10-07         28 -5.080838e-01
2        515 2025-10-07         22  4.134547e+07
3        515 2025-10-07         26  1.565875e-02
4        515 2025-10-07         19  3.379948e-02
