In [1]:
# %%
"""
Build Supabase-ready tables by joining DARKO pipeline parquet outputs.

Produces:
  1) players         — dimension, one row per player
  2) player_ratings  — fact, one row per player per date

Type conventions:
  nba_id: Int64, date: Date, season: Int32, tm_id: Int32

Uses lazy scans + semi-joins so we never eagerly load the 15.7M-row RAPM table.
"""

import polars as pl
from pathlib import Path


def find_project_root(start: Path) -> Path:
    """Locate the DARKO project root at or above ``start``.

    ``start`` is checked first, then each of its parents from nearest to
    farthest. The first directory containing both a ``fixed_data`` and a
    ``calculated_data`` entry is returned.

    Raises:
        FileNotFoundError: no checked directory contains both entries.
    """
    markers = ("fixed_data", "calculated_data")
    root = next(
        (
            folder
            for folder in (start, *start.parents)
            if all((folder / marker).exists() for marker in markers)
        ),
        None,
    )
    if root is None:
        raise FileNotFoundError(
            f"Cannot find DARKO project root (fixed_data/ + calculated_data/). start={start}"
        )
    return root


# %%
# === CONFIG ===
# Every path below hangs off the project root discovered from the working dir.
ROOT = find_project_root(Path.cwd())
OUTPUT_DIR = ROOT.joinpath("supabase_tables")
CALCULATED_DATA = ROOT.joinpath("calculated_data")
TEMP_DIR = CALCULATED_DATA.joinpath("temp")
# Make sure the output folder is there; an existing folder is left alone.
OUTPUT_DIR.mkdir(exist_ok=True)

# %%
# === LAZY SCANS WITH TYPE CASTS ===

# Join-key casts shared by every parquet scan below. nba_id is cast
# non-strictly (values that cannot be cast become null); date is cast strictly.
NBA_ID_INT64 = pl.col("nba_id").cast(pl.Int64, strict=False)
DATE_AS_DATE = pl.col("date").cast(pl.Date)

# Base table — defines the grain (1,089,331 rows). All source columns are
# kept; only the listed ones are re-typed.
spm = pl.scan_parquet(TEMP_DIR / "spm_outputs.parq").with_columns(
    NBA_ID_INT64,
    DATE_AS_DATE,
    pl.col("season").cast(pl.Int32, strict=False),
    pl.col("tm_id").cast(pl.Int32, strict=False),
    pl.col("future_game").cast(pl.Int32, strict=False),
    pl.col("active_roster").cast(pl.Int8, strict=False),
)

# Bio columns from assembled features (column-projected, not full 500+ col load)
bio = pl.scan_parquet(CALCULATED_DATA / "5_assembled_features.parq").select(
    NBA_ID_INT64,
    DATE_AS_DATE,
    "age",
    pl.col("career_game_num").cast(pl.Int32, strict=False),
    pl.col("seconds_played").cast(pl.Int32, strict=False),
    "position",
    "position_num",
    "x_position",
)

# RAPM (15.7M rows — filtered via semi-join before collecting). The rating
# columns are renamed so they do not collide with other tables after joining.
rapm = (
    pl.scan_parquet(CALCULATED_DATA / "bayes_rapm_ratings.parq")
    .select("nba_id", "date", "offense", "defense", "total", "exposure")
    .with_columns(NBA_ID_INT64, DATE_AS_DATE)
    .rename(
        {
            "offense": "bayes_rapm_off",
            "defense": "bayes_rapm_def",
            "total": "bayes_rapm_total",
            "exposure": "rapm_exposure",
        }
    )
)

# Projections (1,089,331 rows)
PROJECTION_COLUMNS = [
    "x_minutes", "x_pace",
    "x_pts_100", "x_ast_100", "x_orb_100", "x_drb_100",
    "x_stl_100", "x_blk_100", "x_tov_100",
    "x_fga_100", "x_fg3a_100", "x_fta_100",
    "x_fg_pct", "x_fg3_pct", "x_ft_pct",
    "tr_minutes", "tr_starter", "tr_fg3_pct", "tr_ft_pct",
]
projections = pl.scan_parquet(
    CALCULATED_DATA / "talent_game_predictions.parq"
).select(NBA_ID_INT64, DATE_AS_DATE, *PROJECTION_COLUMNS)

# Survivorship (1,084,268 rows, nba_id is Float64 in source).
# Source columns "1".."15" are exposed as "s1".."s15".
surv_rename = {str(t): f"s{t}" for t in range(1, 16)}
survivorship = pl.scan_parquet(TEMP_DIR / "nba_survivorship.parq").select(
    NBA_ID_INT64,
    DATE_AS_DATE,
    "projected_years_remaining",
    "projected_years_remaining_cal",
    "x_retirement_age",
    "x_retirement_age_cal",
    *[pl.col(source).alias(target) for source, target in surv_rename.items()],
)

# Crosswalk for dimension table. infer_schema_length=0 reads every CSV column
# as text, so the numeric fields are cast explicitly (non-strict: bad -> null).
crosswalk = pl.scan_csv(
    ROOT / "fixed_data" / "crosswalks" / "player_master_crosswalk.csv",
    infer_schema_length=0,
).select(
    NBA_ID_INT64,
    "player_name",
    pl.col("height").cast(pl.Float64, strict=False),
    pl.col("weight").cast(pl.Float64, strict=False),
    "dob",
    pl.col("draft_year").cast(pl.Int32, strict=False),
    pl.col("draft_slot").cast(pl.Int32, strict=False),
    "position",
    "country",
)

# %%
# =============================================================================
# TABLE 1: players (dimension)
# =============================================================================

# "Latest" snapshot per player: take max(date) per nba_id, then join back to
# pick up the team / roster flag / season recorded on that date.
spm_dim = spm.select("nba_id", "date", "team_name", "active_roster", "season", "player_name")

max_dates = spm_dim.group_by("nba_id").agg(pl.max("date").alias("max_date"))

latest = (
    max_dates
    .join(
        spm_dim.select("nba_id", "date", "team_name", "active_roster", "season"),
        left_on=["nba_id", "max_date"],
        right_on=["nba_id", "date"],
        how="left",
    )
    .select(
        "nba_id",
        pl.col("team_name").alias("current_team"),
        "active_roster",
        "season",
    )
    # Guard against more than one row on a player's latest date.
    .unique(subset="nba_id")
)

# Rookie season from survivorship (single scan, check schema first).
# The column is optional: when the file lacks it, the join below is skipped.
surv_raw = pl.scan_parquet(TEMP_DIR / "nba_survivorship.parq").with_columns(
    pl.col("nba_id").cast(pl.Int64, strict=False),
)
has_rookie = "rookie_season" in surv_raw.collect_schema().names()
rookie = (
    surv_raw.select("nba_id", "rookie_season").unique(subset="nba_id")
    if has_rookie else None
)

# Player name fallback: crosswalk may not have everyone, spm does
name_from_spm = (
    spm_dim
    .group_by("nba_id")
    .agg(pl.first("player_name").alias("player_name_spm"))
)

# Union all nba_ids from fact + crosswalk so dimension covers everyone.
# The non-strict Int64 casts turn unparsable ids into null; a null id cannot be
# a dimension key (the uploader adds PRIMARY KEY (nba_id)), so drop it here.
all_ids = (
    pl.concat([spm.select("nba_id"), crosswalk.select("nba_id")], how="vertical")
    .filter(pl.col("nba_id").is_not_null())
    .unique(subset="nba_id")
)

players_lf = (
    all_ids
    .join(crosswalk.unique(subset="nba_id"), on="nba_id", how="left")
    .join(name_from_spm, on="nba_id", how="left")
    .join(latest, on="nba_id", how="left")
)

if rookie is not None:
    players_lf = players_lf.join(rookie, on="nba_id", how="left")

# Use crosswalk name, fall back to spm name
players_lf = (
    players_lf
    .with_columns(
        pl.coalesce([pl.col("player_name"), pl.col("player_name_spm")]).alias("player_name")
    )
    .drop("player_name_spm")
)

# Only here is the lazy plan executed.
players = players_lf.collect()

print(f"players: {players.shape[0]} rows, {players.shape[1]} cols")
print(f"  columns: {players.columns}")

# %%
# =============================================================================
# TABLE 2: player_ratings (fact)
# =============================================================================

# Grain of the fact table: one row per (nba_id, date) taken from spm.
JOIN_KEYS = ["nba_id", "date"]

BASE_COLUMNS = [
    "nba_id", "date", "season", "team_name", "tm_id",
    "future_game", "active_roster", "available", "poss",
    "dpm", "o_dpm", "d_dpm",
    "box_dpm", "box_odpm", "box_ddpm",
    "on_off_dpm", "on_off_odpm", "on_off_ddpm",
]
df_base = spm.select(BASE_COLUMNS)

# Semi-join keys to filter large right-side tables before collecting
keys = df_base.select(JOIN_KEYS).unique()

bio_f = bio.join(keys, on=JOIN_KEYS, how="semi")
rapm_f = rapm.join(keys, on=JOIN_KEYS, how="semi")
proj_f = projections.join(keys, on=JOIN_KEYS, how="semi")
surv_f = survivorship.join(keys, on=JOIN_KEYS, how="semi")

# Left-join each filtered table onto the base, in a fixed order, so the base
# rows are all kept and the output column order stays stable.
df_lf = df_base
for right_side in (bio_f, rapm_f, proj_f, surv_f):
    df_lf = df_lf.join(right_side, on=JOIN_KEYS, how="left")

df = df_lf.collect()

# %%
# === VALIDATION ===

# The merged frame must still have exactly one row per (nba_id, date).
key_frame = df.select("nba_id", "date")
dupes = key_frame.is_duplicated().sum()
assert dupes == 0, f"Duplicate (nba_id, date) rows: {dupes}"

# Left joins must not change the row count of the base table.
base_rows = spm.select(pl.len()).collect().item()
assert df.shape[0] == base_rows, (
    f"Row count changed: base={base_rows}, merged={df.shape[0]}. "
    "A right-side table has duplicates on (nba_id, date)."
)


def _nn(col: str) -> int:
    """Count the non-null values of ``col`` in the merged frame ``df``."""
    return int(df.height - df.get_column(col).null_count())


print(f"\nplayer_ratings: {df.shape[0]:,} rows, {df.shape[1]} cols")
print(f"  columns: {df.columns}")
print(f"  date range: {df['date'].min()} to {df['date'].max()}")
print(f"  unique players: {df['nba_id'].n_unique()}")
print(f"  coverage — age: {_nn('age'):,}, rapm: {_nn('bayes_rapm_total'):,}, "
      f"projections: {_nn('x_pts_100'):,}, survivorship: {_nn('projected_years_remaining'):,}")

# %%
# === SAVE ===

# Write both tables as zstd-compressed parquet, then report the file sizes.
for frame, filename in ((players, "players.parq"), (df, "player_ratings.parq")):
    frame.write_parquet(OUTPUT_DIR / filename, compression="zstd")

print(f"\nSaved to {OUTPUT_DIR}/")
for filename in ("players.parq", "player_ratings.parq"):
    size_mb = (OUTPUT_DIR / filename).stat().st_size / 1e6
    print(f"  {filename}: {size_mb:.1f} MB")

players: 5347 rows, 13 cols
  columns: ['nba_id', 'player_name', 'height', 'weight', 'dob', 'draft_year', 'draft_slot', 'position', 'country', 'current_team', 'active_roster', 'season', 'rookie_season']

player_ratings: 1,089,331 rows, 66 cols
  columns: ['nba_id', 'date', 'season', 'team_name', 'tm_id', 'future_game', 'active_roster', 'available', 'poss', 'dpm', 'o_dpm', 'd_dpm', 'box_dpm', 'box_odpm', 'box_ddpm', 'on_off_dpm', 'on_off_odpm', 'on_off_ddpm', 'age', 'career_game_num', 'seconds_played', 'position', 'position_num', 'x_position', 'bayes_rapm_off', 'bayes_rapm_def', 'bayes_rapm_total', 'rapm_exposure', 'x_minutes', 'x_pace', 'x_pts_100', 'x_ast_100', 'x_orb_100', 'x_drb_100', 'x_stl_100', 'x_blk_100', 'x_tov_100', 'x_fga_100', 'x_fg3a_100', 'x_fta_100', 'x_fg_pct', 'x_fg3_pct', 'x_ft_pct', 'tr_minutes', 'tr_starter', 'tr_fg3_pct', 'tr_ft_pct', 'projected_years_remaining', 'projected_years_remaining_cal', 'x_retirement_age', 'x_retirement_age_cal', 's1', 's2', 's3', 's4', 's

In [2]:
# %%
"""
Upload DARKO tables to Supabase (Postgres) using COPY.

Auto-detects fresh vs update per table:
  - Table missing  → CREATE + bulk load + indexes
  - Table exists:
      players:          TRUNCATE + reload
      player_ratings:   DELETE current season + insert (single transaction)
      season_sim:       TRUNCATE + reload
      win_distribution: TRUNCATE + reload

Connection: set SUPABASE_PG_DSN env var, e.g.:
  export SUPABASE_PG_DSN="host=... port=5432 dbname=postgres user=... password=..."

Prereqs: run build_supabase_tables.py first.
"""

import io
import os
import json
from pathlib import Path

import polars as pl
import psycopg2
from psycopg2 import sql as pgsql
from tqdm import tqdm

SCHEMA = "public"


def find_project_root(start: Path) -> Path:
    """Return the nearest directory at or above ``start`` that is the project root.

    A directory qualifies when it contains both ``fixed_data`` and
    ``calculated_data``. ``start`` is tried first, then its parents in order.

    Raises:
        FileNotFoundError: none of the tried directories qualifies.
    """
    for folder in (start, *start.parents):
        if not (folder / "fixed_data").exists():
            continue
        if (folder / "calculated_data").exists():
            return folder
    raise FileNotFoundError(
        f"Cannot find DARKO project root (fixed_data/ + calculated_data/). start={start}"
    )


# %%
# === CONFIG ===
ROOT = find_project_root(Path.cwd())
SUPABASE_TABLES = ROOT / "supabase_tables"
CALCULATED_DATA = ROOT / "calculated_data"


def _dsn_value(value) -> str:
    """Render one libpq keyword value, single-quoted and escaped.

    libpq requires values containing spaces to be wrapped in single quotes,
    with embedded single quotes and backslashes escaped by a backslash.
    Quoting every value keeps passwords with such characters intact.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


# Prefer the environment variable; fall back to the local secrets file.
DSN = os.getenv("SUPABASE_PG_DSN")
if not DSN:
    secrets_path = ROOT / "fixed_data" / "supabase_secret.json"
    with open(secrets_path, encoding="utf-8") as f:
        _sec = json.load(f)
    DSN = " ".join(
        f"{key}={_dsn_value(_sec[key])}"
        for key in ("host", "port", "dbname", "user", "password")
    )


# %%
# === HELPERS ===

# polars dtype -> PostgreSQL column type used when creating tables.
# Dtypes missing from this map are handled by the caller's fallback.
PG_TYPE_MAP = {
    pl.Int8: "SMALLINT",
    pl.Int16: "SMALLINT",
    pl.Int32: "INTEGER",
    pl.Int64: "BIGINT",
    # Unsigned integers: PostgreSQL has no unsigned types, so each maps to the
    # next wider signed type that can hold its full range.
    pl.UInt8: "SMALLINT",
    pl.UInt16: "INTEGER",
    pl.UInt32: "BIGINT",
    pl.UInt64: "NUMERIC(20, 0)",
    pl.Float32: "REAL",
    pl.Float64: "DOUBLE PRECISION",
    pl.Boolean: "BOOLEAN",
    pl.Utf8: "TEXT",
    pl.Categorical: "TEXT",
    pl.Date: "DATE",
}


def get_conn():
    """Open a new psycopg2 connection to ``DSN`` with autocommit turned off.

    With autocommit off, changes become permanent only once the caller
    commits.
    """
    connection = psycopg2.connect(DSN)
    connection.autocommit = False
    return connection


def table_exists(conn, table_name: str) -> bool:
    """Report whether ``table_name`` is listed under ``SCHEMA``.

    The lookup goes through ``information_schema.tables`` with both the schema
    and the table name passed as bound parameters.
    """
    query = (
        "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
        "WHERE table_schema = %s AND table_name = %s)"
    )
    with conn.cursor() as cursor:
        cursor.execute(query, (SCHEMA, table_name))
        row = cursor.fetchone()
        return row[0]


def create_table(conn, df: pl.DataFrame, table_name: str):
    """Create ``SCHEMA.table_name`` with one column per entry in ``df.schema``.

    The statement is a plain ``CREATE TABLE`` (no ``IF NOT EXISTS`` and no
    ``DROP``), so callers are expected to check for the table beforehand.
    Column types come from ``PG_TYPE_MAP``; ``pl.Datetime`` columns become
    ``TIMESTAMP`` and any other unmapped dtype becomes ``TEXT``.
    """

    def _pg_type(dtype) -> str:
        # Mapped dtypes win; Datetime is tested by isinstance, not by lookup.
        if dtype in PG_TYPE_MAP:
            return PG_TYPE_MAP[dtype]
        return "TIMESTAMP" if isinstance(dtype, pl.Datetime) else "TEXT"

    # Identifiers are quoted via psycopg2.sql rather than interpolated as text.
    column_clauses = [
        pgsql.SQL("{} {}").format(
            pgsql.Identifier(column), pgsql.SQL(_pg_type(dtype))
        )
        for column, dtype in df.schema.items()
    ]
    ddl = pgsql.SQL("CREATE TABLE {}.{} ({})").format(
        pgsql.Identifier(SCHEMA),
        pgsql.Identifier(table_name),
        pgsql.SQL(", ").join(column_clauses),
    )
    with conn.cursor() as cursor:
        cursor.execute(ddl)


def copy_df(conn, df: pl.DataFrame, table_name: str, chunksize: int = 50_000):
    """Stream ``df`` into ``SCHEMA.table_name`` with ``COPY ... FROM STDIN``.

    Rows are sent as header-less CSV in slices of ``chunksize`` rows, with
    nulls written as ``\\N`` to match the COPY ``NULL`` option. Nothing is
    committed here; that is left to the caller.
    """
    quoted_columns = pgsql.SQL(", ").join(map(pgsql.Identifier, df.columns))
    template = pgsql.SQL(
        "COPY {}.{} ({}) FROM STDIN WITH (FORMAT csv, NULL '\\N')"
    )
    copy_str = template.format(
        pgsql.Identifier(SCHEMA), pgsql.Identifier(table_name), quoted_columns
    ).as_string(conn)

    total_rows = df.shape[0]
    # Ceiling division: number of slices needed to cover every row.
    n_chunks = (total_rows + chunksize - 1) // chunksize
    offsets = range(0, total_rows, chunksize)

    with conn.cursor() as cursor:
        for offset in tqdm(offsets, total=n_chunks, desc=f"  COPY {table_name}"):
            payload = io.BytesIO()
            df.slice(offset, chunksize).write_csv(
                payload, include_header=False, null_value="\\N"
            )
            # Rewind so COPY reads the slice from its first byte.
            payload.seek(0)
            cursor.copy_expert(copy_str, payload)


def execute(conn, stmt: str, params=None):
    """Run ``stmt`` (with optional ``params``) on a fresh cursor.

    Returns the cursor's ``rowcount`` after execution. No commit is issued.
    """
    with conn.cursor() as cursor:
        cursor.execute(stmt, params)
        affected = cursor.rowcount
        return affected


def qualified(table_name: str) -> str:
    """Prefix ``table_name`` with ``SCHEMA`` and a dot (no identifier quoting)."""
    return "{}.{}".format(SCHEMA, table_name)


# %%
# === TEST CONNECTION ===
# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly instead.
conn = get_conn()
try:
    execute(conn, "SELECT 1")
    conn.commit()
finally:
    conn.close()
print("Connected!")

# %%
# =============================================================================
# players
# =============================================================================
print("\n=== players ===")

players = pl.read_parquet(SUPABASE_TABLES / "players.parq")

# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly. Closing without a
# commit discards the pending work, so a failed load leaves the table as-is.
conn = get_conn()
try:
    if not table_exists(conn, "players"):
        print("  Creating fresh")
        create_table(conn, players, "players")
        copy_df(conn, players, "players")
        # Add the key after the bulk load.
        execute(conn, f"ALTER TABLE {qualified('players')} ADD PRIMARY KEY (nba_id)")
    else:
        print("  Truncate + reload")
        # TRUNCATE and the reload share one transaction (single commit below).
        execute(conn, f"TRUNCATE TABLE {qualified('players')}")
        copy_df(conn, players, "players")
    conn.commit()
finally:
    conn.close()

print(f"  {players.shape[0]} rows")

# %%
# =============================================================================
# player_ratings
# =============================================================================
print("\n=== player_ratings ===")

df = pl.read_parquet(SUPABASE_TABLES / "player_ratings.parq")
print(f"  Source: {df.shape[0]:,} rows, {df.shape[1]} cols")

# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly. Closing without a
# commit discards the pending work, which keeps each path all-or-nothing.
conn = get_conn()
try:
    if not table_exists(conn, "player_ratings"):
        print("  Creating fresh")
        create_table(conn, df, "player_ratings")
        copy_df(conn, df, "player_ratings")
        # Key and indexes are added after the bulk load.
        execute(conn, f"ALTER TABLE {qualified('player_ratings')} "
                       "ADD CONSTRAINT pk_player_ratings PRIMARY KEY (nba_id, date)")
        execute(conn, f"CREATE INDEX idx_ratings_date ON {qualified('player_ratings')} (date DESC)")
        execute(conn, f"CREATE INDEX idx_ratings_season ON {qualified('player_ratings')} (season)")
        execute(conn, f"CREATE INDEX idx_ratings_nba_id ON {qualified('player_ratings')} (nba_id)")
        conn.commit()
        print(f"  {df.shape[0]:,} rows loaded, indexes created")
    else:
        # Atomic: delete current season + insert in one transaction.
        # "Current" is the largest season value present in the source file.
        max_season = df["season"].max()
        if max_season is None:
            # Fail before touching the table rather than on int(None).
            raise ValueError(
                "player_ratings.parq has no non-null 'season' values; "
                "cannot determine the season to refresh"
            )
        current_season = int(max_season)
        current_season_df = df.filter(pl.col("season") == current_season)

        deleted = execute(conn, f"DELETE FROM {qualified('player_ratings')} WHERE season = %s",
                          (current_season,))
        print(f"  Deleted {deleted:,} rows for season {current_season}")

        copy_df(conn, current_season_df, "player_ratings")
        conn.commit()
        print(f"  Inserted {current_season_df.shape[0]:,} rows (atomic)")
finally:
    conn.close()

# %%
# =============================================================================
# season_sim
# =============================================================================
print("\n=== season_sim ===")

season_sim = pl.read_csv(CALCULATED_DATA / "season_sim.csv")

# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly. Closing without a
# commit discards the pending work, so a failed reload leaves the old rows.
conn = get_conn()
try:
    if not table_exists(conn, "season_sim"):
        print("  Creating fresh")
        create_table(conn, season_sim, "season_sim")
    else:
        print("  Truncate + reload")
        execute(conn, f"TRUNCATE TABLE {qualified('season_sim')}")
    # Both branches end with the same full load.
    copy_df(conn, season_sim, "season_sim")
    conn.commit()
finally:
    conn.close()

print(f"  {season_sim.shape[0]} rows")

# %%
# =============================================================================
# win_distribution
# =============================================================================
print("\n=== win_distribution ===")

win_dist = pl.read_parquet(CALCULATED_DATA / "win_distribution.parq")

# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly. Closing without a
# commit discards the pending work, so a failed reload leaves the old rows.
conn = get_conn()
try:
    if not table_exists(conn, "win_distribution"):
        print("  Creating fresh")
        create_table(conn, win_dist, "win_distribution")
    else:
        print("  Truncate + reload")
        execute(conn, f"TRUNCATE TABLE {qualified('win_distribution')}")
    # Both branches end with the same full load.
    copy_df(conn, win_dist, "win_distribution")
    conn.commit()
finally:
    conn.close()

print(f"  {win_dist.shape[0]} rows")

# %%
# =============================================================================
# VERIFY
# =============================================================================
print("\n=== Verification ===")

# psycopg2's `with connection:` only ends the transaction and leaves the
# session open, so the connection is closed explicitly instead.
conn = get_conn()
try:
    with conn.cursor() as cur:
        # Row counts for every uploaded table.
        for tbl in ["players", "player_ratings", "season_sim", "win_distribution"]:
            cur.execute(f"SELECT COUNT(*) FROM {qualified(tbl)}")
            print(f"  {tbl}: {cur.fetchone()[0]:,} rows")

        cur.execute(f"SELECT MAX(date) FROM {qualified('player_ratings')}")
        print(f"  Latest date: {cur.fetchone()[0]}")

        # Spot check: best active-roster DPM values on the latest date only.
        # PostgreSQL sorts NULLs first under DESC, so rows without a dpm are
        # excluded; otherwise they would lead the list and break the
        # numeric formatting below.
        cur.execute(f"""
            SELECT pr.nba_id, p.player_name, pr.team_name, pr.dpm, pr.o_dpm, pr.d_dpm
            FROM {qualified('player_ratings')} pr
            JOIN {qualified('players')} p ON pr.nba_id = p.nba_id
            WHERE pr.date = (SELECT MAX(date) FROM {qualified('player_ratings')})
              AND pr.active_roster = 1
              AND pr.dpm IS NOT NULL
            ORDER BY pr.dpm DESC
            LIMIT 5
        """)
        print("\n  Top 5 DPM:")
        for row in cur.fetchall():
            print(f"    {row[1]:25s} {row[2]:5s} DPM: {row[3]:+.2f} (O: {row[4]:+.2f}, D: {row[5]:+.2f})")

    conn.commit()
finally:
    conn.close()

print("\nDone!")

Connected!

=== players ===
  Truncate + reload


  COPY players: 100%|██████████| 1/1 [00:00<00:00,  1.68it/s]


  5347 rows

=== player_ratings ===
  Source: 1,089,331 rows, 66 cols
  Deleted 31,661 rows for season 2026


  COPY player_ratings: 100%|██████████| 1/1 [00:07<00:00,  7.05s/it]


  Inserted 31,661 rows (atomic)

=== season_sim ===
  Truncate + reload


  COPY season_sim: 100%|██████████| 1/1 [00:00<00:00,  5.83it/s]


  30 rows

=== win_distribution ===
  Truncate + reload


  COPY win_distribution: 100%|██████████| 1/1 [00:00<00:00,  3.94it/s]


  525 rows

=== Verification ===
  players: 5,347 rows
  player_ratings: 1,089,331 rows
  season_sim: 30 rows
  win_distribution: 525 rows
  Latest date: 2026-03-03 00:00:00

  Top 5 DPM:
    Devin Booker              Phoenix Suns DPM: +2.30 (O: +3.11, D: -0.82)
    Collin Gillespie          Phoenix Suns DPM: +1.46 (O: +0.84, D: +0.62)
    Grayson Allen             Phoenix Suns DPM: +0.46 (O: +1.12, D: -0.66)
    Oso Ighodaro              Phoenix Suns DPM: +0.22 (O: -1.67, D: +1.89)
    Jordan Goodwin            Phoenix Suns DPM: +0.09 (O: -0.57, D: +0.66)

Done!
