<a href="https://colab.research.google.com/github/kbsmd-sportsmusicdata/Basketball-Analytics-Course/blob/main/GetTotals_pbpstats.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from __future__ import annotations

import json
from pathlib import Path
from datetime import datetime, timezone
import requests

def fetch_pbpstats_totals_wnba(
    seasons: str = "2023,2024,2025",
    season_type: str = "Regular Season",
    data_type: str = "Player",
    timeout: int = 60
) -> dict:
    url = "https://api.pbpstats.com/get-totals/wnba"
    params = {
        "Season": seasons,
        "SeasonType": season_type,
        "Type": data_type
    }
    r = requests.get(url, params=params, timeout=timeout)
    r.raise_for_status()
    return r.json()

def save_raw_json(payload: dict, output_dir: str | Path) -> tuple[Path, Path]:
    """
    Saves:
      1) timestamped file (immutable snapshot)
      2) latest.json (overwritten each run)
    Returns (snapshot_path, latest_path)
    """
    output_dir = Path(output_dir).expanduser().resolve()
    output_dir.mkdir(parents=True, exist_ok=True)

    ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ")
    snapshot_path = output_dir / f"pbpstats_totals_wnba_player_{ts}.json"
    latest_path = output_dir / "pbpstats_totals_wnba_player_latest.json"

    # write snapshot
    snapshot_path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8"
    )

    # write latest
    latest_path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8"
    )

    return snapshot_path, latest_path

if __name__ == "__main__":
    payload = fetch_pbpstats_totals_wnba(
        seasons="2023,2024,2025",
        season_type="Regular Season",
        data_type="Player"
    )

    # ✅ Option A: if you run this *from within the repo*, use a relative path:
    # repo_relative_dir = Path("scripts/wnba_scripts/pbpstats/raw_json_data")

    # ✅ Option B: hard-point to your local repo clone (edit this once):
    repo_relative_dir = Path("/content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/raw_json_data")

    snap, latest = save_raw_json(payload, repo_relative_dir)

    # Access the table data if you want it after saving:
    player_stats = payload.get("multi_row_table_data", [])
    print(f"Saved raw JSON snapshot: {snap}")
    print(f"Saved raw JSON latest:   {latest}")
    print(f"multi_row_table_data rows: {len(player_stats)}")


Saved raw JSON snapshot: /content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/raw_json_data/pbpstats_totals_wnba_player_20251225_194555Z.json
Saved raw JSON latest:   /content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/raw_json_data/pbpstats_totals_wnba_player_latest.json
multi_row_table_data rows: 257


In [None]:
import json
from pathlib import Path
import pandas as pd

RAW_JSON_PATH = Path("/content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/raw_json_data/pbpstats_totals_wnba_player_latest.json")
OUT_DIR = Path("/content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

payload = json.loads(RAW_JSON_PATH.read_text(encoding="utf-8"))

# Core table
rows = payload.get("multi_row_table_data", [])
df = pd.DataFrame(rows)

print("✅ df shape:", df.shape)
print("✅ first 25 columns:", df.columns[:25].tolist())
display(df.head(10))

# Save for inspection
out_csv = OUT_DIR / "pbpstats_totals_player_raw_from_json.csv"
df.to_csv(out_csv, index=False)
print("✅ wrote:", out_csv)


✅ df shape: (257, 249)
✅ first 25 columns: ['EntityId', 'TeamId', 'Name', 'ShortName', 'RowId', 'TeamAbbreviation', 'SecondsPlayed', 'GamesPlayed', 'Minutes', 'PlusMinus', 'OffPoss', 'DefPoss', 'PenaltyOffPoss', 'PenaltyDefPoss', 'SecondChanceOffPoss', 'TotalPoss', 'AtRimFGM', 'AtRimFGA', 'SecondChanceAtRimFGM', 'SecondChanceAtRimFGA', 'PenaltyAtRimFGM', 'PenaltyAtRimFGA', 'ShortMidRangeFGM', 'ShortMidRangeFGA', 'LongMidRangeFGM']


Unnamed: 0,EntityId,TeamId,Name,ShortName,RowId,TeamAbbreviation,SecondsPlayed,GamesPlayed,Minutes,PlusMinus,...,BlockedLongMidRange,Defensive 3 Seconds Violations,Period2Fouls3Minutes,Clear Path Fouls,3SecondViolations,Period3Fouls5Minutes,OffensiveGoaltends,Period1Fouls3Minutes,DefensiveGoaltends,Period2Fouls4Minutes
0,1628276,1611661320,Kelsey Plum,Kelsey Plum,1628276,LAS,241920.0,119,4032.0,687.0,...,,,,,,,,,,
1,1628277,1611661330,Allisha Gray,Allisha Gray,1628277,ATL,239667.0,120,3994.0,207.0,...,5.0,1.0,7.0,,,,,,,
2,1628909,1611661325,Kelsey Mitchell,Kelsey Mitchell,1628909,IND,238787.0,124,3980.0,-12.0,...,3.0,1.0,,1.0,,,,,,
3,203826,1611661317,Alyssa Thomas,Alyssa Thomas,203826,PHX,236590.9,119,3943.0,610.0,...,4.0,2.0,11.0,,2.0,,,,,
4,1629481,1611661321,Arike Ogunbowale,Arike Ogunbowale,1629481,DAL,233447.0,107,3891.0,-219.0,...,5.0,,5.0,1.0,,1.0,,,,
5,204319,1611661319,Jewell Loyd,Jewell Loyd,204319,LVA,229481.9,119,3825.0,161.0,...,1.0,1.0,1.0,,1.0,,,,,
6,1641648,1611661325,Aliyah Boston,Aliyah Boston,1641648,IND,228116.0,124,3802.0,101.0,...,2.0,5.0,5.0,,5.0,,,,,
7,1629498,1611661319,Jackie Young,Jackie Young,1629498,LVA,225993.0,120,3767.0,891.0,...,1.0,,5.0,,1.0,,,,,
8,1628932,1611661319,A'ja Wilson,A'ja Wilson,1628932,LVA,225519.0,117,3759.0,961.0,...,11.0,1.0,,1.0,5.0,,,,,
9,204324,1611661320,Dearica Hamby,Dearica Hamby,204324,LAS,220821.4,124,3680.0,-296.0,...,1.0,4.0,1.0,,4.0,,,,,


✅ wrote: /content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/processed/pbpstats_totals_player_raw_from_json.csv


In [None]:
import requests
import pandas as pd

def fetch_totals_for_season(season: int, season_type="Regular Season", data_type="Player") -> pd.DataFrame:
    url = "https://api.pbpstats.com/get-totals/wnba"
    params = {"Season": str(season), "SeasonType": season_type, "Type": data_type}
    r = requests.get(url, params=params, timeout=60)
    r.raise_for_status()
    payload = r.json()
    df_season = pd.DataFrame(payload.get("multi_row_table_data", []))
    df_season["Season"] = season
    return df_season

seasons = [2023, 2024, 2025]
dfs = [fetch_totals_for_season(s) for s in seasons]
df_all = pd.concat(dfs, ignore_index=True)

print("✅ combined shape:", df_all.shape)
display(df_all.head(10))

out_csv = OUT_DIR / "pbpstats_totals_player_by_season.csv"
df_all.to_csv(out_csv, index=False)
print("✅ wrote:", out_csv)


✅ combined shape: (495, 250)


Unnamed: 0,EntityId,TeamId,Name,ShortName,RowId,TeamAbbreviation,SecondsPlayed,GamesPlayed,Minutes,PlusMinus,...,OffensiveGoaltends,PeriodOTFouls4Minutes,PeriodOTFouls5Minutes,Corner3PctBlocked,Clear Path Fouls,Period3Fouls5Minutes,Period1Fouls3Minutes,Period2Fouls4Minutes,Season,DefensiveGoaltends
0,1629481,1611661321,Arike Ogunbowale,Arike Ogunbowale,1629481,DAL,89341.0,40,1489.0,141.0,...,,,,,,,,,2023,
1,203826,1611661323,Alyssa Thomas,Alyssa Thomas,203826,CON,86738.0,40,1446.0,224.0,...,,,,,,,,,2023,
2,1627668,1611661313,Breanna Stewart,Breanna Stewart,1627668,NYL,81920.0,40,1365.0,357.0,...,1.0,1.0,9.0,0.02381,,,,,2023,
3,1628909,1611661325,Kelsey Mitchell,Kelsey Mitchell,1628909,IND,80912.0,40,1349.0,-95.0,...,,5.0,,,1.0,,,,2023,
4,204319,1611661328,Jewell Loyd,Jewell Loyd,204319,SEA,80612.9,38,1344.0,-146.0,...,,,,,,,,,2023,
5,1629496,1611661328,Ezi Magbegor,Ezi Magbegor,1629496,SEA,78232.8,40,1304.0,-120.0,...,,2.0,1.0,0.083333,,,,,2023,
6,203827,1611661321,Natasha Howard,Natasha Howard,203827,DAL,77327.0,39,1289.0,115.0,...,,1.0,1.0,,,,,,2023,
7,203833,1611661319,Chelsea Gray,Chelsea Gray,203833,LVA,77260.0,40,1288.0,436.0,...,,,,,,,,,2023,
8,1631009,1611661330,Rhyne Howard,Rhyne Howard,1631009,ATL,76972.0,39,1283.0,-13.0,...,,1.0,1.0,0.022222,1.0,,,,2023,
9,1628276,1611661319,Kelsey Plum,Kelsey Plum,1628276,LVA,75892.0,39,1265.0,554.0,...,,,,,,,,,2023,


✅ wrote: /content/processed_positions/pbpstats_totals_player_by_season.csv


In [None]:
import pandas as pd
import re
from pathlib import Path

def norm_name(s: str) -> str:
    s = str(s).strip().lower()
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def norm_team_abbr(s: str) -> str:
    return str(s).strip().upper()

# ✅ Update this path to your position-mapping CSV (must contain Name + Position + Team_Abbreviation)
POS_MAP_PATH = Path("import pandas as pd
import re
import unicodedata
from pathlib import Path

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    # strip accents safely: Koné -> kone
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    # keep letters, spaces, hyphen, apostrophe
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def norm_team_abbr(s: str) -> str:
    return "" if pd.isna(s) else str(s).strip().upper()

# ---- paths ----
POS_MAP_PATH = Path("/content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/mapping_player_team_id_seasons.csv")

pos = pd.read_csv(POS_MAP_PATH)
pos.columns = [c.strip() for c in pos.columns]

# required mapping columns
required = {"Name", "Team_Abbreviation"}
missing = required - set(pos.columns)
if missing:
    raise ValueError(f"Mapping CSV missing required columns: {missing}. Found: {pos.columns.tolist()}")

pos_col = next((c for c in pos.columns if c.lower() in {"position", "pos"}), None)
if not pos_col:
    raise ValueError(f"Mapping CSV must include Position or Pos. Found: {pos.columns.tolist()}")

# choose totals dataframe
df_use = df_all if "df_all" in globals() and isinstance(df_all, pd.DataFrame) else df

if "TeamAbbreviation" not in df_use.columns:
    raise ValueError(f"PBP DF missing TeamAbbreviation. Found: {df_use.columns.tolist()[:50]}")

# normalize keys
df_use = df_use.copy()
df_use["Name_norm"] = df_use["Name"].apply(norm_name)
df_use["TeamAbbr_norm"] = df_use["TeamAbbreviation"].apply(norm_team_abbr)

pos = pos.copy()
pos["Name_norm"] = pos["Name"].apply(norm_name)
pos["TeamAbbr_norm"] = pos["Team_Abbreviation"].apply(norm_team_abbr)

# --- 1) Merge on Name + Team ---
pos_team = (
    pos[["Name_norm", "TeamAbbr_norm", pos_col]]
    .dropna(subset=["Name_norm"])
    .drop_duplicates(subset=["Name_norm", "TeamAbbr_norm"])
)
m1 = df_use.merge(pos_team, on=["Name_norm", "TeamAbbr_norm"], how="left").rename(columns={pos_col: "Position_team"})

# --- 2) Fallback merge on Name only (mode position per player) ---
pos_name = (
    pos.dropna(subset=["Name_norm"])
      .groupby("Name_norm")[pos_col]
      .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[0])
      .reset_index()
      .rename(columns={pos_col: "Position_name"})
)

m2 = m1.merge(pos_name, on="Name_norm", how="left")

# combine: prefer team-specific, else name-only
m2["Position"] = m2["Position_team"].combine_first(m2["Position_name"])

print("Missing after Name+Team:", m1["Position_team"].isna().mean())
print("Missing after fallback Name-only:", m2["Position"].isna().mean())

# sanity display
display(m2[["Name", "TeamAbbreviation", "Position", "Position_team", "Position_name"]].head(20))

# show a sample of still-missing
display(
    m2.loc[m2["Position"].isna(), ["Name", "TeamAbbreviation"]]
      .drop_duplicates()
      .h
")

pos = pd.read_csv(POS_MAP_PATH)
pos.columns = [c.strip() for c in pos.columns]  # trims column name whitespace

# --- Required columns ---
required = {"Name", "Team_Abbreviation"}
missing = required - set(pos.columns)
if missing:
    raise ValueError(f"Mapping CSV missing required columns: {missing}. Found: {pos.columns.tolist()}")

# Find a Position column (flexible)
pos_col = next((c for c in pos.columns if c.strip().lower() in {"position", "pos"}), None)
if not pos_col:
    raise ValueError(
        f"Mapping CSV must include a Position column (Position or Pos). Found: {pos.columns.tolist()}"
    )

# Use df_all if it exists (by-season), else df (raw)
df_use = df_all if "df_all" in globals() and isinstance(df_all, pd.DataFrame) else df

# Confirm pbp column exists
if "TeamAbbreviation" not in df_use.columns:
    raise ValueError(f"PBP DF missing TeamAbbreviation. Found columns: {df_use.columns.tolist()[:50]}")

# Normalize join keys
df_use = df_use.copy()
df_use["Name_norm"] = df_use["Name"].apply(norm_name)
df_use["TeamAbbr_norm"] = df_use["TeamAbbreviation"].apply(norm_team_abbr)

pos = pos.copy()
pos["Name_norm"] = pos["Name"].apply(norm_name)
pos["TeamAbbr_norm"] = pos["Team_Abbreviation"].apply(norm_team_abbr)

# Merge
merged = df_use.merge(
    pos[["Name_norm", "TeamAbbr_norm", pos_col]],
    on=["Name_norm", "TeamAbbr_norm"],
    how="left"
).rename(columns={pos_col: "Position"})

print("✅ Missing Position %:", merged["Position"].isna().mean())
display(
    merged.loc[merged["Position"].isna(), ["Name", "TeamAbbreviation"]]
      .drop_duplicates()
      .head(25)
)

# Save
OUT_DIR = Path("/content/Basketball-Analytics-Course/scripts/wnba_scripts/pbpstats/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

out_csv = OUT_DIR / "pbpstats_totals_player_with_position.csv"
merged.to_csv(out_csv, index=False)
print("✅ wrote:", out_csv)


SyntaxError: unterminated string literal (detected at line 15) (ipython-input-4052203312.py, line 15)

In [None]:
import pandas as pd
import re
import unicodedata
from pathlib import Path

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    # strip accents safely: Koné -> kone
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    # keep letters, spaces, hyphen, apostrophe
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def norm_team_abbr(s: str) -> str:
    return "" if pd.isna(s) else str(s).strip().upper()

# ---- paths ----
POS_MAP_PATH = Path("/content/Basketball-Analytics-Course/mapping_player_team_id_seasons.csv")

pos = pd.read_csv(POS_MAP_PATH)
pos.columns = [c.strip() for c in pos.columns]

# required mapping columns
required = {"Name", "Team_Abbreviation"}
missing = required - set(pos.columns)
if missing:
    raise ValueError(f"Mapping CSV missing required columns: {missing}. Found: {pos.columns.tolist()}")

pos_col = next((c for c in pos.columns if c.lower() in {"position", "pos"}), None)
if not pos_col:
    raise ValueError(f"Mapping CSV must include Position or Pos. Found: {pos.columns.tolist()}")

# choose totals dataframe
df_use = df_all if "df_all" in globals() and isinstance(df_all, pd.DataFrame) else df

if "TeamAbbreviation" not in df_use.columns:
    raise ValueError(f"PBP DF missing TeamAbbreviation. Found: {df_use.columns.tolist()[:50]}")

# normalize keys
df_use = df_use.copy()
df_use["Name_norm"] = df_use["Name"].apply(norm_name)
df_use["TeamAbbr_norm"] = df_use["TeamAbbreviation"].apply(norm_team_abbr)

pos = pos.copy()
pos["Name_norm"] = pos["Name"].apply(norm_name)
pos["TeamAbbr_norm"] = pos["Team_Abbreviation"].apply(norm_team_abbr)

# --- 1) Merge on Name + Team ---
pos_team = (
    pos[["Name_norm", "TeamAbbr_norm", pos_col]]
    .dropna(subset=["Name_norm"])
    .drop_duplicates(subset=["Name_norm", "TeamAbbr_norm"])
)
m1 = df_use.merge(pos_team, on=["Name_norm", "TeamAbbr_norm"], how="left").rename(columns={pos_col: "Position_team"})

# --- 2) Fallback merge on Name only (mode position per player) ---
pos_name = (
    pos.dropna(subset=["Name_norm"])
      .groupby("Name_norm")[pos_col]
      .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[0])
      .reset_index()
      .rename(columns={pos_col: "Position_name"})
)

m2 = m1.merge(pos_name, on="Name_norm", how="left")

# combine: prefer team-specific, else name-only
m2["Position"] = m2["Position_team"].combine_first(m2["Position_name"])

print("Missing after Name+Team:", m1["Position_team"].isna().mean())
print("Missing after fallback Name-only:", m2["Position"].isna().mean())

# sanity display
display(m2[["Name", "TeamAbbreviation", "Position", "Position_team", "Position_name"]].head(20))

# show a sample of still-missing
display(
    m2.loc[m2["Position"].isna(), ["Name", "TeamAbbreviation"]]
      .drop_duplicates()
      .head(25)
)

Missing after Name+Team: 0.46060606060606063
Missing after fallback Name-only: 0.1898989898989899


Unnamed: 0,Name,TeamAbbreviation,Position,Position_team,Position_name
0,Arike Ogunbowale,DAL,G,G,G
1,Alyssa Thomas,CON,F,,F
2,Breanna Stewart,NYL,F,F,F
3,Kelsey Mitchell,IND,G,G,G
4,Jewell Loyd,SEA,G,,G
5,Ezi Magbegor,SEA,"F, C","F, C","F, C"
6,Natasha Howard,DAL,F,,F
7,Chelsea Gray,LVA,G,G,G
8,Rhyne Howard,ATL,G,G,G
9,Kelsey Plum,LVA,G,,G


Unnamed: 0,Name,TeamAbbreviation
21,Betnijah Laney,NYL
32,Cheyenne Parker,ATL
40,Crystal Dangerfield,DAL
43,Dorka Juhász,MIN
44,Tianna Hawkins,WAS
54,Jordan Horston,SEA
59,Kristy Wallace,IND
60,Diana Taurasi,PHO
64,Danielle Robinson,ATL
65,Layshia Clarendon,LAS


In [None]:
cols = [c for c in ["Name", "TeamAbbreviation", "Season"] if c in m2.columns]

missing_rows = (
    m2.loc[m2["Position"].isna(), cols]
      .dropna(subset=["Name"])
      .drop_duplicates()
      .sort_values(cols)
      .reset_index(drop=True)
)

print(f"Missing position rows: {len(missing_rows)}")
display(missing_rows.head(50))


Missing position rows: 94


Unnamed: 0,Name,TeamAbbreviation,Season
0,Abby Meyers,WAS,2023
1,Alaina Coates,LVA,2023
2,Amanda Zahui B,IND,2023
3,Amy Atwell,PHO,2024
4,Arella Guirantes,SEA,2023
5,Ashley Joens,PHO,2023
6,Asia (AD) Durr,ATL,2023
7,Astou Ndour-Fall,CON,2024
8,Awak Kuier,DAL,2023
9,Bernadett Hatar,CON,2023


In [None]:
import pandas as pd
import re
import unicodedata
from pathlib import Path

BOX_PATH = Path("/content/player_box_2023.csv")  # <- you said this exists in Colab

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

box = pd.read_csv(BOX_PATH)

# Robust column detection (in case casing differs)
cols = {c.lower(): c for c in box.columns}
name_col = cols.get("athlete_display_name")
pos_col  = cols.get("athlete_position_abbreviation")

if not name_col or not pos_col:
    raise ValueError(f"Couldn't find required columns. Found columns: {box.columns.tolist()[:50]}")

# Keep only what we need
box_small = box[[name_col, pos_col]].copy()
box_small.columns = ["Name", "PosAbbr"]

# Normalize
box_small["Name_norm"] = box_small["Name"].apply(norm_name)
box_small["PosAbbr"] = box_small["PosAbbr"].astype(str).str.strip().str.upper().replace({"": pd.NA, "NAN": pd.NA})

# Build a single “best” position per player (mode / most frequent)
pos_lookup_2023 = (
    box_small.dropna(subset=["Name_norm", "PosAbbr"])
             .groupby("Name_norm")["PosAbbr"]
             .agg(lambda s: s.value_counts().index[0])   # top frequency
             .reset_index()
             .rename(columns={"PosAbbr": "Position_from_2023_box"})
)

print("✅ Unique players in 2023 lookup:", len(pos_lookup_2023))
display(pos_lookup_2023.head(10))


✅ Unique players in 2023 lookup: 164


Unnamed: 0,Name_norm,Position_from_2023_box
0,a'ja wilson,F
1,aari mcdonald,G
2,abby meyers,G
3,ad durr,G
4,aerial powers,G
5,alaina coates,C
6,alanna smith,F
7,aliyah boston,F
8,allisha gray,G
9,alysha clark,F


In [None]:
import pandas as pd
import re
import unicodedata
from pathlib import Path

# ----------------------------
# Paths (as you specified)
# ----------------------------
TOTALS_PATH = Path("/content/pbpstats_totals_cleaned.csv")
BOX2023_PATH = Path("/content/player_box_2023.csv")
MAP2025_PATH = Path("/content/mapping_player_team_id_seasons.csv")

OUT_DIR = Path("/content/processed_positions")
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_TOTALS = OUT_DIR / "pbpstats_totals_cleaned_with_positions.csv"
OUT_MASTER = OUT_DIR / "position_mapping_master.csv"


# ----------------------------
# Normalizers
# ----------------------------
def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def pick_first_existing_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None


# ----------------------------
# 1) Load totals
# ----------------------------
totals = pd.read_csv(TOTALS_PATH)
totals.columns = [c.strip() for c in totals.columns]

name_col_totals = pick_first_existing_col(totals, ["Name", "player", "Player", "athlete_display_name"])
if not name_col_totals:
    raise ValueError(f"Couldn't find a Name column in totals. Columns: {totals.columns.tolist()[:50]}")

totals = totals.rename(columns={name_col_totals: "Name"})
totals["Name_norm"] = totals["Name"].apply(norm_name)

# If totals already has Position, keep it
pos_col_totals = pick_first_existing_col(totals, ["Position", "Pos"])
if pos_col_totals and pos_col_totals != "Position":
    totals = totals.rename(columns={pos_col_totals: "Position"})


# ----------------------------
# 2) Build lookup from 2023 box scores
# ----------------------------
box = pd.read_csv(BOX2023_PATH)
box.columns = [c.strip() for c in box.columns]

name_col_box = pick_first_existing_col(box, ["athlete_display_name"])
pos_col_box  = pick_first_existing_col(box, ["athlete_position_abbreviation"])

if not name_col_box or not pos_col_box:
    raise ValueError(
        "player_box_2023.csv must have athlete_display_name and athlete_position_abbreviation. "
        f"Found: {box.columns.tolist()[:50]}"
    )

box_small = box[[name_col_box, pos_col_box]].copy()
box_small.columns = ["Name", "PosAbbr"]
box_small["Name_norm"] = box_small["Name"].apply(norm_name)
box_small["PosAbbr"] = (
    box_small["PosAbbr"].astype(str).str.strip().str.upper().replace({"": pd.NA, "NAN": pd.NA})
)

# Most frequent position per player
lookup_2023 = (
    box_small.dropna(subset=["Name_norm", "PosAbbr"])
             .groupby("Name_norm")["PosAbbr"]
             .agg(lambda s: s.value_counts().index[0])
             .reset_index()
             .rename(columns={"PosAbbr": "Position_from_2023"})
)


# ----------------------------
# 3) Build lookup from 2025 mapping file
# ----------------------------
mp = pd.read_csv(MAP2025_PATH)
mp.columns = [c.strip() for c in mp.columns]

name_col_map = pick_first_existing_col(mp, ["Name", "player", "Player", "athlete_display_name"])
if not name_col_map:
    raise ValueError(f"Couldn't find Name column in mapping file. Columns: {mp.columns.tolist()[:50]}")

pos_col_map = pick_first_existing_col(mp, ["Position", "Pos"])
if not pos_col_map:
    raise ValueError(
        f"Couldn't find Position/Pos column in mapping file. Columns: {mp.columns.tolist()[:50]}"
    )

mp_small = mp[[name_col_map, pos_col_map]].copy()
mp_small.columns = ["Name", "Position_from_2025"]
mp_small["Name_norm"] = mp_small["Name"].apply(norm_name)

# If multiple entries per player, use mode
lookup_2025 = (
    mp_small.dropna(subset=["Name_norm", "Position_from_2025"])
            .groupby("Name_norm")["Position_from_2025"]
            .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[0])
            .reset_index()
)


# ----------------------------
# 4) Combine lookups -> master
# Priority: 2025 mapping overrides 2023 if conflict
# ----------------------------
master = lookup_2023.merge(lookup_2025, on="Name_norm", how="outer")

master["Position_master"] = master["Position_from_2025"].combine_first(master["Position_from_2023"])

# Optional: keep a representative display Name for readability (prefer 2025)
name_ref_2025 = mp_small.dropna(subset=["Name_norm"]).drop_duplicates("Name_norm")[["Name_norm", "Name"]].rename(columns={"Name": "Name_ref_2025"})
name_ref_2023 = box_small.dropna(subset=["Name_norm"]).drop_duplicates("Name_norm")[["Name_norm", "Name"]].rename(columns={"Name": "Name_ref_2023"})

master = master.merge(name_ref_2025, on="Name_norm", how="left").merge(name_ref_2023, on="Name_norm", how="left")
master["Name"] = master["Name_ref_2025"].combine_first(master["Name_ref_2023"])

master_export = master[["Name_norm", "Name", "Position_master", "Position_from_2025", "Position_from_2023"]].copy()
master_export = master_export.sort_values(["Position_master", "Name"]).reset_index(drop=True)

# Export master mapping
master_export.to_csv(OUT_MASTER, index=False)
print("✅ Wrote master mapping:", OUT_MASTER)
print("Master mapping coverage:", master_export["Position_master"].notna().mean())


# ----------------------------
# 5) Apply master mapping to totals
# Priority: existing totals Position > master mapping
# ----------------------------
totals2 = totals.merge(
    master_export[["Name_norm", "Position_master"]],
    on="Name_norm",
    how="left"
)

# If totals already had Position, keep it; else fill from master
if "Position" in totals2.columns:
    totals2["Position"] = totals2["Position"].combine_first(totals2["Position_master"])
else:
    totals2["Position"] = totals2["Position_master"]

totals2 = totals2.drop(columns=["Position_master"])

# Report missing after fill
missing_pct = totals2["Position"].isna().mean()
missing_n = totals2["Position"].isna().sum()
print(f"✅ Totals rows: {len(totals2)}")
print(f"✅ Missing Position after fill: {missing_n} ({missing_pct:.1%})")

# Export updated totals
totals2.to_csv(OUT_TOTALS, index=False)
print("✅ Wrote updated totals:", OUT_TOTALS)

# Also give you the remaining missing names to patch later
missing_names = (
    totals2.loc[totals2["Position"].isna(), "Name"]
           .dropna()
           .drop_duplicates()
           .sort_values()
           .tolist()
)
print(f"\nPlayers still missing Position: {len(missing_names)}")
print(missing_names[:50])  # preview first 50


✅ Wrote master mapping: /content/processed_positions/position_mapping_master.csv
Master mapping coverage: 1.0
✅ Totals rows: 257
✅ Missing Position after fill: 20 (7.8%)
✅ Wrote updated totals: /content/processed_positions/pbpstats_totals_cleaned_with_positions.csv

Players still missing Position: 20
['Amy Atwell', 'Asia (AD) Durr', 'Astou Ndour-Fall', 'Betnijah Laney-Hamilton', 'Caitlin Bickle', 'Celeste Taylor', 'Charisma Osborne', 'Chennedy Carter', 'Cyesha Goree', 'DiDi Richards', 'Dyaisha Fair', 'Ezinne Kalu', 'Jakia Brown-Turner', 'Jessika Carter', 'Kaela Davis', 'Kysre Gondrezick', 'Mikiah Herbert Harrigan', 'Nika Mühl', 'Olivia Époupa', 'Stephanie Soares']


In [None]:
import pandas as pd
import re
import unicodedata
from pathlib import Path

TOTALS_BY_SEASON_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season.csv")
MASTER_MAP_PATH       = Path("/content/processed_positions/position_mapping_master.csv")

OUT_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv")

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

# ----------------------------
# Load data
# ----------------------------
df = pd.read_csv(TOTALS_BY_SEASON_PATH)
df.columns = [c.strip() for c in df.columns]

mp = pd.read_csv(MASTER_MAP_PATH)
mp.columns = [c.strip() for c in mp.columns]

# Ensure expected columns
if "Name" not in df.columns:
    raise ValueError(f"Totals file missing Name. Found: {df.columns.tolist()[:50]}")
if "Season" not in df.columns:
    raise ValueError(f"Totals file missing Season. Found: {df.columns.tolist()[:50]}")

# Normalize join keys
df = df.copy()
df["Name_norm"] = df["Name"].apply(norm_name)

mp = mp.copy()
if "Name_norm" not in mp.columns:
    if "Name" not in mp.columns:
        raise ValueError(f"Mapping missing Name/Name_norm. Found: {mp.columns.tolist()[:50]}")
    mp["Name_norm"] = mp["Name"].apply(norm_name)

# Identify which position column exists in the mapping
pos_col = None
for cand in ["Position_master", "Position", "Pos"]:
    if cand in mp.columns:
        pos_col = cand
        break
if not pos_col:
    raise ValueError(f"Mapping missing a position column (Position_master/Position/Pos). Found: {mp.columns.tolist()[:50]}")

# ----------------------------
# Season-aware merge if possible
# (only if mapping has Season)
# ----------------------------
if "Season" in mp.columns:
    # Best case: map by Name_norm + Season
    mp_season = (
        mp.dropna(subset=["Name_norm", pos_col, "Season"])
          .drop_duplicates(subset=["Name_norm", "Season"])
          [["Name_norm", "Season", pos_col]]
          .rename(columns={pos_col: "Position_from_map"})
    )

    merged = df.merge(mp_season, on=["Name_norm", "Season"], how="left")

    # Fallback to Name-only (in case season-specific is missing)
    mp_name = (
        mp.dropna(subset=["Name_norm", pos_col])
          .groupby("Name_norm")[pos_col]
          .agg(lambda s: s.mode().iloc[0] if not s.mode().empty else s.iloc[0])
          .reset_index()
          .rename(columns={pos_col: "Position_from_map_name"})
    )

    merged = merged.merge(mp_name, on="Name_norm", how="left")
    merged["Position"] = merged["Position_from_map"].combine_first(merged["Position_from_map_name"])
    merged = merged.drop(columns=["Position_from_map", "Position_from_map_name"])

else:
    # Map by Name_norm only (your current master mapping)
    mp_name = (
        mp.dropna(subset=["Name_norm", pos_col])
          .drop_duplicates(subset=["Name_norm"])
          [["Name_norm", pos_col]]
          .rename(columns={pos_col: "Position"})
    )
    merged = df.merge(mp_name, on="Name_norm", how="left")

# ----------------------------
# Reporting
# ----------------------------
missing_n = merged["Position"].isna().sum()
missing_pct = merged["Position"].isna().mean()

print(f"✅ Rows: {len(merged)}")
print(f"✅ Missing Position: {missing_n} ({missing_pct:.1%})")

# Who is still missing? (unique names + how many seasons affected)
missing_summary = (
    merged.loc[merged["Position"].isna(), ["Name", "Season"]]
          .drop_duplicates()
          .groupby("Name")["Season"]
          .agg(["count", lambda s: sorted(s.unique())])
          .reset_index()
)
missing_summary.columns = ["Name", "Seasons_missing_count", "Seasons_missing_list"]

display(missing_summary.sort_values(["Seasons_missing_count", "Name"], ascending=[False, True]).head(50))

# Save output
merged.to_csv(OUT_PATH, index=False)
print("✅ Wrote:", OUT_PATH)


✅ Rows: 495
✅ Missing Position: 20 (4.0%)


Unnamed: 0,Name,Seasons_missing_count,Seasons_missing_list
0,Amy Atwell,1,[2024]
1,Asia (AD) Durr,1,[2023]
2,Astou Ndour-Fall,1,[2024]
3,Betnijah Laney-Hamilton,1,[2024]
4,Caitlin Bickle,1,[2024]
5,Celeste Taylor,1,[2024]
6,Charisma Osborne,1,[2024]
7,Chennedy Carter,1,[2024]
8,Cyesha Goree,1,[2023]
9,DiDi Richards,1,[2024]


✅ Wrote: /content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv


In [None]:
import pandas as pd
from pathlib import Path

IN_PATH  = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv")
OUT_PATH = Path("/content/processed_positions/missing_positions_summary.csv")

df = pd.read_csv(IN_PATH)
df.columns = [c.strip() for c in df.columns]

# Detect GP column name (PBPStats sometimes varies)
gp_col = None
for cand in ["GamesPlayed", "GP", "games_played"]:
    if cand in df.columns:
        gp_col = cand
        break
if not gp_col:
    raise ValueError(f"Couldn't find a games played column. Columns include: {df.columns.tolist()[:40]}")

# Ensure Position exists
if "Position" not in df.columns:
    raise ValueError("Expected a 'Position' column in the merged file.")

# Filter missing positions
miss = df[df["Position"].isna()].copy()

# (optional) make GP numeric
miss[gp_col] = pd.to_numeric(miss[gp_col], errors="coerce")

# Table of missing rows (name + season + GP)
missing_rows = (
    miss[["Name", "Season", gp_col]]
      .drop_duplicates()
      .sort_values(["Name", "Season"])
      .rename(columns={gp_col: "GamesPlayed"})
      .reset_index(drop=True)
)

# Summary per player:
# - list of seasons missing
# - list of games played in those seasons (aligned)
# - total GP across missing seasons (nice for triage)
summary = (
    missing_rows.groupby("Name")
    .agg(
        Seasons_missing_list=("Season", lambda s: sorted(s.tolist())),
        GamesPlayed_by_season=("GamesPlayed", lambda s: s.fillna(0).astype(int).tolist()),
        TotalGamesPlayed_missing=("GamesPlayed", lambda s: int(s.fillna(0).sum())),
        Seasons_missing_count=("Season", "count")
    )
    .reset_index()
    .sort_values(["TotalGamesPlayed_missing", "Seasons_missing_count", "Name"], ascending=[True, False, True])
    .reset_index(drop=True)
)

display(summary)

# Quick “likely safe to ignore” check: total GP == 0
zero_gp = summary[summary["TotalGamesPlayed_missing"] == 0]
print(f"Players missing Position with 0 GP in missing seasons: {len(zero_gp)} / {len(summary)}")

# Export
summary.to_csv(OUT_PATH, index=False)
print("✅ wrote:", OUT_PATH)


Unnamed: 0,Name,Seasons_missing_list,GamesPlayed_by_season,TotalGamesPlayed_missing,Seasons_missing_count
0,Dyaisha Fair,[2024],[1],1,1
1,Ezinne Kalu,[2024],[1],1,1
2,Charisma Osborne,[2024],[2],2,1
3,Jakia Brown-Turner,[2024],[2],2,1
4,Jessika Carter,[2024],[2],2,1
5,Kaela Davis,[2024],[4],4,1
6,Kysre Gondrezick,[2024],[5],5,1
7,Amy Atwell,[2024],[6],6,1
8,Caitlin Bickle,[2024],[8],8,1
9,Cyesha Goree,[2023],[10],10,1


Players missing Position with 0 GP in missing seasons: 0 / 20
✅ wrote: /content/processed_positions/missing_positions_summary.csv


In [None]:
import pandas as pd
from pathlib import Path

DATA_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv")
MAP_PATH  = Path("/content/processed_positions/position_mapping_master.csv")

OUT_PATH  = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions_exactpatch.csv")

df = pd.read_csv(DATA_PATH)
mp = pd.read_csv(MAP_PATH)

df.columns = [c.strip() for c in df.columns]
mp.columns = [c.strip() for c in mp.columns]

# Detect mapping name + position columns (flexible)
name_col_map = "Name" if "Name" in mp.columns else None
if not name_col_map:
    raise ValueError(f"Mapping file doesn't have a 'Name' column. Found: {mp.columns.tolist()[:40]}")

pos_col_map = None
for cand in ["Position_master", "Position", "Pos"]:
    if cand in mp.columns:
        pos_col_map = cand
        break
if not pos_col_map:
    raise ValueError(f"Mapping file doesn't have a position column. Found: {mp.columns.tolist()[:40]}")

# 1) find missing names in df
missing_names = (
    df.loc[df["Position"].isna(), "Name"]
      .dropna()
      .drop_duplicates()
      .tolist()
)

print(f"Missing names (unique): {len(missing_names)}")

# 2) create a mapping dict using ONLY exact Name (no normalization)
mp_exact = (
    mp.dropna(subset=[name_col_map, pos_col_map])
      .drop_duplicates(subset=[name_col_map])
      [[name_col_map, pos_col_map]]
      .rename(columns={name_col_map: "Name", pos_col_map: "Position_exact"})
)

exact_map = dict(zip(mp_exact["Name"], mp_exact["Position_exact"]))

# 3) apply only to rows where Position is missing AND Name is in the missing list
mask = df["Position"].isna() & df["Name"].isin(missing_names)
before = df.loc[mask, "Position"].isna().sum()

df.loc[mask, "Position"] = df.loc[mask, "Name"].map(exact_map)

after = df.loc[mask, "Position"].isna().sum()

print(f"✅ Exact-patch filled: {before - after} rows (of {before} missing rows)")

# 4) show who is still missing (unique names)
still_missing = (
    df.loc[df["Position"].isna(), "Name"]
      .dropna()
      .drop_duplicates()
      .sort_values()
      .tolist()
)

print(f"Still missing after exact patch: {len(still_missing)}")
print(still_missing)

# Save output
df.to_csv(OUT_PATH, index=False)
print("✅ wrote:", OUT_PATH)


Missing names (unique): 20
✅ Exact-patch filled: 0 rows (of 20 missing rows)
Still missing after exact patch: 20
['Amy Atwell', 'Asia (AD) Durr', 'Astou Ndour-Fall', 'Betnijah Laney-Hamilton', 'Caitlin Bickle', 'Celeste Taylor', 'Charisma Osborne', 'Chennedy Carter', 'Cyesha Goree', 'DiDi Richards', 'Dyaisha Fair', 'Ezinne Kalu', 'Jakia Brown-Turner', 'Jessika Carter', 'Kaela Davis', 'Kysre Gondrezick', 'Mikiah Herbert Harrigan', 'Nika Mühl', 'Olivia Époupa', 'Stephanie Soares']
✅ wrote: /content/processed_positions/pbpstats_totals_player_by_season_with_positions_exactpatch.csv


In [None]:
import pandas as pd
import numpy as np
import re
from pathlib import Path

IN_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv")
OUT_DIR = Path("/content/processed_positions")
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_CLEAN = OUT_DIR / "pbpstats_totals_player_by_season_cleaned_pre_percentiles.csv"
OUT_RENAME_MAP = OUT_DIR / "pbpstats_column_rename_map.csv"

# -------------------------
# Helpers
# -------------------------
def to_snake(name: str) -> str:
    s = str(name).strip().lower()
    s = re.sub(r"[^\w]+", "_", s)          # non-word -> underscore
    s = re.sub(r"_+", "_", s).strip("_")  # collapse underscores
    if re.match(r"^\d", s):               # can't start with a digit in some SQL engines
        s = f"c_{s}"
    return s

def make_unique(cols):
    seen = {}
    out = []
    for c in cols:
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f"{c}_{seen[c]}")
    return out

def first_existing(df, candidates):
    lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in lower:
            return lower[cand.lower()]
    return None

# -------------------------
# Load
# -------------------------
df = pd.read_csv(IN_PATH)
df.columns = [c.strip() for c in df.columns]

# Basic required columns (flexible)
name_col   = first_existing(df, ["Name", "Player", "player"])
season_col = first_existing(df, ["Season"])
team_col   = first_existing(df, ["TeamAbbreviation", "Team_Abbreviation", "Team"])
gp_col     = first_existing(df, ["GamesPlayed", "GP"])

if not name_col or not season_col:
    raise ValueError(f"Missing required columns. Found columns: {df.columns.tolist()[:40]}")

# Normalize core column names (keep originals for now)
rename_core = {name_col: "Name", season_col: "Season"}
if team_col: rename_core[team_col] = "TeamAbbreviation"
if gp_col: rename_core[gp_col] = "GamesPlayed"
df = df.rename(columns=rename_core)

# Ensure Season numeric
df["Season"] = pd.to_numeric(df["Season"], errors="coerce").astype("Int64")

# Strip string columns
for c in df.columns:
    if df[c].dtype == "object":
        df[c] = df[c].astype(str).str.strip().replace({"": pd.NA, "nan": pd.NA, "None": pd.NA})

# -------------------------
# Convert numerics
# -------------------------
# Keep these as non-numeric explicitly
non_numeric = {"Name", "TeamAbbreviation", "Position", "Name_norm"}
for c in df.columns:
    if c in non_numeric:
        continue
    # Try numeric conversion; if it creates a lot of NaNs, keep original
    if df[c].dtype == "object":
        converted = pd.to_numeric(df[c], errors="coerce")
        # If at least 70% of non-null values become numeric, accept it
        non_null = df[c].notna().sum()
        became_num = converted.notna().sum()
        if non_null == 0 or (became_num / max(non_null, 1) >= 0.70):
            df[c] = converted

# -------------------------
# Dedupe (optional but helpful)
# -------------------------
dedupe_keys = ["Name", "Season"]
if "TeamAbbreviation" in df.columns:
    dedupe_keys.append("TeamAbbreviation")

before = len(df)
df = df.drop_duplicates(subset=dedupe_keys, keep="first")
after = len(df)

# -------------------------
# “Standard” helper calcs (safe + low drama)
# -------------------------
if "Minutes" in df.columns and "GamesPlayed" in df.columns:
    df["minutes_per_game"] = np.where(
        (df["GamesPlayed"].fillna(0) > 0),
        df["Minutes"] / df["GamesPlayed"],
        np.nan
    )

# -------------------------
# Percent / rate rounding (3 decimals)
# Also convert 0–100 -> 0–1 when it looks like percentages
# -------------------------
pct_keywords = ("pct", "percent", "percentage")
rate_keywords = ("rate", "ppp", "per_poss", "perposs", "per100", "per_100")

def is_pct_like(colname: str) -> bool:
    s = colname.lower()
    return any(k in s for k in pct_keywords) or s.endswith("%")

def is_rate_like(colname: str) -> bool:
    s = colname.lower()
    return any(k in s for k in rate_keywords)

for c in df.columns:
    if c in non_numeric:
        continue
    if not (is_pct_like(c) or is_rate_like(c)):
        continue
    if not pd.api.types.is_numeric_dtype(df[c]):
        continue

    s = df[c].astype(float)

    # If it looks like 0–100 percentages, convert to 0–1
    mx = s.dropna().max() if s.notna().any() else np.nan
    mn = s.dropna().min() if s.notna().any() else np.nan
    if pd.notna(mx) and pd.notna(mn) and mn >= 0 and mx <= 100 and mx > 1.5:
        s = s / 100.0

    df[c] = s.round(3)

# -------------------------
# BigQuery-friendly column names (snake_case)
# -------------------------
snake_cols = [to_snake(c) for c in df.columns]
snake_cols = make_unique(snake_cols)

rename_map = pd.DataFrame({"original_column": df.columns.tolist(), "clean_column": snake_cols})
df.columns = snake_cols

# -------------------------
# Save outputs
# -------------------------
df.to_csv(OUT_CLEAN, index=False)
rename_map.to_csv(OUT_RENAME_MAP, index=False)

print("✅ Input rows:", before)
print("✅ Output rows (after dedupe):", after)
print("✅ Wrote cleaned file:", OUT_CLEAN)
print("✅ Wrote rename map:", OUT_RENAME_MAP)

# Quick sanity checks
pos_col = "position" if "position" in df.columns else None
if pos_col:
    print("✅ Missing position %:", float(pd.isna(df[pos_col]).mean()))
if "gamesplayed" in df.columns:
    print("✅ Rows with 0 games:", int((df["gamesplayed"].fillna(0) == 0).sum()))


✅ Input rows: 495
✅ Output rows (after dedupe): 495
✅ Wrote cleaned file: /content/processed_positions/pbpstats_totals_player_by_season_cleaned_pre_percentiles.csv
✅ Wrote rename map: /content/processed_positions/pbpstats_column_rename_map.csv
✅ Missing position %: 0.04040404040404041
✅ Rows with 0 games: 0


###############################################
# 2. Aggregation / Cleaning
###############################################

################ OKAY TO OMIT THIS SECTION below WHEN USING PBPSTATS DATA? ###########

# We aggregate game-log level data into season totals for each player
player_season_stats <- raw_box %>%
  # Ensure minutes are numeric (sometimes they come as strings)
  mutate(minutes = as.numeric(minutes)) %>%
  group_by(athlete_id, athlete_display_name, team_short_display_name, athlete_position_abbreviation, season) %>% # Added 'season' here
  summarise(
    games_played  = n(),
    games_started = sum(starter, na.rm = TRUE),
    minutes_total = sum(minutes, na.rm = TRUE),

    # Counting stats
    pts_total     = sum(points, na.rm = TRUE),
    reb_total     = sum(rebounds, na.rm = TRUE),
    oreb_total    = sum(offensive_rebounds, na.rm = TRUE),
    dreb_total    = sum(defensive_rebounds, na.rm = TRUE),
    ast_total     = sum(assists, na.rm = TRUE),
    stl_total     = sum(steals, na.rm = TRUE),
    blk_total     = sum(blocks, na.rm = TRUE),
    tov_total     = sum(turnovers, na.rm = TRUE),

    # Shooting totals
    fgm  = sum(field_goals_made, na.rm = TRUE),
    fga  = sum(field_goals_attempted, na.rm = TRUE),
    fg3m = sum(three_point_field_goals_made, na.rm = TRUE),
    fg3a = sum(three_point_goals_attempted, na.rm = TRUE),
    ftm  = sum(free_throws_made, na.rm = TRUE),
    fta  = sum(free_throws_attempted, na.rm = TRUE),

    .groups = "drop"
  ) %>%

################ OKAY TO OMIT THIS SECTION above WHEN USING PBPSTATS DATA? ###########

# Basic per-game and per-40 calculations
  mutate(
    mpg = minutes_total / games_played,
    ppg = pts_total / games_played,
    rpg = reb_total / games_played,
    apg = ast_total / games_played,
    spg = stl_total / games_played,
    bpg = blk_total / games_played,
    tovpg = tov_total / games_played,

    # Per 40 Mins (Normalize for playing time)
    # Avoid division by zero with pmax
    pts_per_40  = (pts_total / pmax(minutes_total, 1)) * 40,
    reb_per_40  = (reb_total / pmax(minutes_total, 1)) * 40,
    oreb_per_40 = (oreb_total / pmax(minutes_total, 1)) * 40,
    dreb_per_40 = (dreb_total / pmax(minutes_total, 1)) * 40,
    ast_per_40  = (ast_total / pmax(minutes_total, 1)) * 40,
    stl_per_40  = (stl_total / pmax(minutes_total, 1)) * 40,
    blk_per_40  = (blk_total / pmax(minutes_total, 1)) * 40,
    tov_per_40  = (tov_total / pmax(minutes_total, 1)) * 40
  ) %>%n  # Efficiency & Advanced Metrics
  mutate(
    # Shooting Percentages
    fg_pct  = ifelse(fga > 0, fgm / fga, 0),
    fg3_pct = ifelse(fg3a > 0, fg3m / fg3a, 0),
    ft_pct  = ifelse(fta > 0, ftm / fta, 0),

    # Three Point Attempt Rate
    threepar = ifelse(fga > 0, fg3a / fga, 0),

    # Free Throw Attempt Rate
    fta_rate = ifelse(fga > 0, fta / fga, 0),

    # Effective Field Goal %
    efg_pct = ifelse(fga > 0, (fgm + 0.5 * fg3m) / fga, 0),

    # True Shooting %
    # Approximation: TSA = FGA + 0.44 * FTA
    ts_pct = ifelse((fga + 0.44 * fta) > 0,
                    pts_total / (2 * (fga + 0.44 * fta)), 0),

    # Usage Rate (Approximate Version)
    # Basic Formula: (FGA + 0.44*FTA + TOV) / (Minutes) * (Team Minutes / 5)
    # Since we don't have team totals here, we calculate "Usage Load"
    # and will treat it as a proxy or raw usage volume.
    usage_load = (fga + 0.44 * fta + tov_total),
    usage      = usage_load / pmax(minutes_total, 1), # possessions used per minute

    # Ratio Stats
    ast_to_tov = ifelse(tov_total > 0, ast_total / tov_total, ast_total),

    # Estimated percentages (Simplified without team totals)
    # e.g., ast_pct ~ Ast / (FGA + 0.44*FTA + Ast + TOV)
    # This is a 'player-based' approximation often used when team totals aren't joined.
    possessions_estimated = fga + 0.44 * fta + tov_total + ast_total,
    ast_pct  = ifelse(possessions_estimated > 0, ast_total / possessions_estimated, 0),
    tov_pct  = ifelse(possessions_estimated > 0, tov_total / possessions_estimated, 0),

    # Rebounding Shares (Approximation using per-40/position baselines is common if team totals missing)
    # Here we will just stick to the per-40 or total counts unless we join team data.
    # For the lab, we'll create simple ratios:
    oreb_pct = oreb_total / pmax(reb_total, 1), # % of player's rebs that were offensive
    dreb_pct = dreb_total / pmax(reb_total, 1)  # % of player's rebs that were defensive
  )



In [12]:
# ---------------------------------------------
# PBPStats Totals (by season) -> Derived Metrics
#   - per game
#   - per 40 minutes
#   - per 100 possessions (if possessions exists; else uses an estimated possessions proxy)
#   - shooting %s, 3PAr, FTr, eFG%, TS%, AST:TOV
#
# Input (you specified):
#   /content/processed_positions/pbpstats_totals_player_by_season_cleaned_pre_percentiles.csv
# Outputs:
#   /content/processed_positions/pbpstats_totals_player_by_season_derived_pre_percentiles.csv
#   /content/processed_positions/pbpstats_derived_column_rename_map.csv
# ---------------------------------------------

from __future__ import annotations

import re
import numpy as np
import pandas as pd
from pathlib import Path

IN_PATH  = Path("/content/processed_positions/pbpstats_totals_player_by_season_cleaned_pre_percentiles.csv")
OUT_DIR  = Path("/content/processed_positions")
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_DATA = OUT_DIR / "pbpstats_totals_player_by_season_derived_pre_percentiles.csv"
OUT_RENAME_MAP = OUT_DIR / "pbpstats_derived_column_rename_map.csv"

# -------------------------
# Helpers
# -------------------------
def to_snake(name: str) -> str:
    s = str(name).strip()
    s = re.sub(r"[^\w]+", "_", s)        # non-word -> underscore
    s = re.sub(r"_+", "_", s).strip("_")
    s = s.lower()
    if re.match(r"^\d", s):
        s = f"c_{s}"
    return s

def make_unique(cols):
    seen = {}
    out = []
    for c in cols:
        if c not in seen:
            seen[c] = 0
            out.append(c)
        else:
            seen[c] += 1
            out.append(f"{c}_{seen[c]}")
    return out

def first_existing(df: pd.DataFrame, candidates: list[str]) -> str | None:
    lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in lower:
            return lower[cand.lower()]
    return None

def safe_div(numer, denom):
    denom = denom.replace(0, np.nan) if isinstance(denom, pd.Series) else (np.nan if denom == 0 else denom)
    return numer / denom

# -------------------------
# 1) Load
# -------------------------
df_raw = pd.read_csv(IN_PATH)
df_raw.columns = [c.strip() for c in df_raw.columns]

# -------------------------
# 2) Normalize column names to snake_case (BigQuery-friendly)
# -------------------------
rename_map = pd.DataFrame({
    "original_column": df_raw.columns,
    "clean_column": make_unique([to_snake(c) for c in df_raw.columns])
})
df = df_raw.copy()
df.columns = rename_map["clean_column"].tolist()

rename_map.to_csv(OUT_RENAME_MAP, index=False)
print("✅ wrote rename map:", OUT_RENAME_MAP)

# -------------------------
# 3) Identify core columns (robust/flexible)
# -------------------------
# Required-ish identifiers
col_name   = first_existing(df, ["name"])
col_season = first_existing(df, ["season"])
col_team   = first_existing(df, ["teamabbreviation", "team_abbreviation", "team"])
col_pos    = first_existing(df, ["position", "pos"])

# Common totals columns (best-effort matching)
col_gp     = first_existing(df, ["gamesplayed", "gp", "games_played"])
col_min    = first_existing(df, ["minutes", "min", "minutes_total"])

# Box-style totals (some PBPStats exports use these; others use different labels)
col_pts    = first_existing(df, ["points", "pts"])
col_reb    = first_existing(df, ["rebounds", "reb", "totalrebounds", "trb"])
col_oreb   = first_existing(df, ["offensiverebounds", "oreb"])
col_dreb   = first_existing(df, ["defensiverebounds", "dreb"])
col_ast    = first_existing(df, ["assists", "ast"])
col_stl    = first_existing(df, ["steals", "stl"])
col_blk    = first_existing(df, ["blocks", "blk"])
col_tov    = first_existing(df, ["turnovers", "tov"])
col_pf     = first_existing(df, ["personalfouls", "pf", "fouls"])

# Shooting totals
col_fgm    = first_existing(df, ["fieldgoalsmade", "fgm"])
col_fga    = first_existing(df, ["fieldgoalsattempted", "fga"])
col_fg3m   = first_existing(df, ["threepointfieldgoalsmade", "fg3m", "3pm"])
col_fg3a   = first_existing(df, ["threepointgoalsattempted", "fg3a", "3pa"])
col_ftm    = first_existing(df, ["freethrowsmade", "ftm"])
col_fta    = first_existing(df, ["freethrowsattempted", "fta"])

# Possessions (may or may not exist in your PBP totals export)
col_poss   = first_existing(df, ["possessions", "poss", "playerpossessions", "offensivepossessions"])

core_report = {
    "name": col_name, "season": col_season, "team": col_team, "position": col_pos,
    "games_played": col_gp, "minutes": col_min, "possessions": col_poss,
    "pts": col_pts, "reb": col_reb, "oreb": col_oreb, "dreb": col_dreb,
    "ast": col_ast, "stl": col_stl, "blk": col_blk, "tov": col_tov, "pf": col_pf,
    "fgm": col_fgm, "fga": col_fga, "fg3m": col_fg3m, "fg3a": col_fg3a, "ftm": col_ftm, "fta": col_fta
}
missing_core = [k for k,v in core_report.items() if v is None and k in ("name","season")]
if missing_core:
    raise ValueError(f"Missing required columns after snake_case: {missing_core}. "
                     f"Available columns (sample): {df.columns.tolist()[:40]}")

print("✅ column matching (None means not found):")
for k,v in core_report.items():
    print(f"  {k:>12}: {v}")

# -------------------------
# 4) Coerce numerics (best-effort)
# -------------------------
id_like = {c for c in [col_name, col_team, col_pos] if c}
for c in df.columns:
    if c in id_like:
        continue
    if df[c].dtype == "object":
        df[c] = pd.to_numeric(df[c], errors="ignore")

# Force key numeric columns if present
for c in [col_gp, col_min, col_poss, col_pts, col_reb, col_oreb, col_dreb, col_ast, col_stl, col_blk, col_tov,
          col_fgm, col_fga, col_fg3m, col_fg3a, col_ftm, col_fta, col_pf]:
    if c and c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

# -------------------------
# 5) Per-game
# -------------------------
if col_gp and col_gp in df.columns:
    gp = df[col_gp].fillna(0)
    if col_min and col_min in df.columns:
        df["mpg"] = np.where(gp > 0, df[col_min] / gp, np.nan)

    per_game_stats = {
        "ppg": col_pts, "rpg": col_reb, "orpg": col_oreb, "drpg": col_dreb,
        "apg": col_ast, "spg": col_stl, "bpg": col_blk, "tovpg": col_tov, "pfpg": col_pf
    }
    for newc, src in per_game_stats.items():
        if src and src in df.columns:
            df[newc] = np.where(gp > 0, df[src] / gp, np.nan)

# -------------------------
# 6) Per-40 minutes
# -------------------------
if col_min and col_min in df.columns:
    mins = df[col_min].fillna(0)
    denom = mins.replace(0, np.nan)

    per40_stats = {
        "pts_per_40": col_pts, "reb_per_40": col_reb, "oreb_per_40": col_oreb, "dreb_per_40": col_dreb,
        "ast_per_40": col_ast, "stl_per_40": col_stl, "blk_per_40": col_blk, "tov_per_40": col_tov, "pf_per_40": col_pf,
        "fgm_per_40": col_fgm, "fga_per_40": col_fga, "fg3m_per_40": col_fg3m, "fg3a_per_40": col_fg3a,
        "ftm_per_40": col_ftm, "fta_per_40": col_fta
    }
    for newc, src in per40_stats.items():
        if src and src in df.columns:
            df[newc] = (df[src] / denom) * 40

# -------------------------
# 7) Per-100 possessions
# -------------------------
# If possessions isn't available, we create a proxy estimate:
#   poss_est = FGA + 0.44*FTA + TOV
# (This is a common approximation; when you have true possessions, prefer those.)
if col_poss and col_poss in df.columns:
    poss = df[col_poss].fillna(0).replace(0, np.nan)
    df["possessions_used_for_per100"] = df[col_poss]
else:
    # build poss_est only if the ingredients exist
    if (col_fga and col_fga in df.columns) or (col_fta and col_fta in df.columns) or (col_tov and col_tov in df.columns):
        fga = df[col_fga] if col_fga and col_fga in df.columns else 0
        fta = df[col_fta] if col_fta and col_fta in df.columns else 0
        tov = df[col_tov] if col_tov and col_tov in df.columns else 0
        df["poss_est"] = (fga.fillna(0) + 0.44 * fta.fillna(0) + tov.fillna(0))
        poss = df["poss_est"].replace(0, np.nan)
        df["possessions_used_for_per100"] = df["poss_est"]
        print("⚠️ possessions column not found; using poss_est = FGA + 0.44*FTA + TOV for per-100 rates.")
    else:
        poss = None
        print("⚠️ possessions column not found and cannot estimate (missing FGA/FTA/TOV). Skipping per-100 rates.")

if poss is not None:
    per100_stats = {
        "pts_per_100": col_pts, "reb_per_100": col_reb, "oreb_per_100": col_oreb, "dreb_per_100": col_dreb,
        "ast_per_100": col_ast, "stl_per_100": col_stl, "blk_per_100": col_blk, "tov_per_100": col_tov, "pf_per_100": col_pf,
        "fgm_per_100": col_fgm, "fga_per_100": col_fga, "fg3m_per_100": col_fg3m, "fg3a_per_100": col_fg3a,
        "ftm_per_100": col_ftm, "fta_per_100": col_fta
    }
    for newc, src in per100_stats.items():
        if src and src in df.columns:
            df[newc] = (df[src] / poss) * 100

# -------------------------
# 8) Shooting / efficiency metrics
# -------------------------
# Shooting %
if col_fga and col_fgm and col_fga in df.columns and col_fgm in df.columns:
    df["fg_pct"] = np.where(df[col_fga] > 0, df[col_fgm] / df[col_fga], np.nan)

if col_fg3a and col_fg3m and col_fg3a in df.columns and col_fg3m in df.columns:
    df["fg3_pct"] = np.where(df[col_fg3a] > 0, df[col_fg3m] / df[col_fg3a], np.nan)

if col_fta and col_ftm and col_fta in df.columns and col_ftm in df.columns:
    df["ft_pct"] = np.where(df[col_fta] > 0, df[col_ftm] / df[col_fta], np.nan)

# Attempt rates
if col_fga and col_fg3a and col_fga in df.columns and col_fg3a in df.columns:
    df["threepar"] = np.where(df[col_fga] > 0, df[col_fg3a] / df[col_fga], np.nan)  # 3PA rate

if col_fga and col_fta and col_fga in df.columns and col_fta in df.columns:
    df["fta_rate"] = np.where(df[col_fga] > 0, df[col_fta] / df[col_fga], np.nan)   # FT attempt rate

# eFG% and TS%
if col_fga and col_fgm and col_fg3m and col_fga in df.columns and col_fgm in df.columns and col_fg3m in df.columns:
    df["efg_pct"] = np.where(df[col_fga] > 0, (df[col_fgm] + 0.5 * df[col_fg3m]) / df[col_fga], np.nan)

if col_pts and col_fga and col_fta and col_pts in df.columns and col_fga in df.columns and col_fta in df.columns:
    tsa = (df[col_fga].fillna(0) + 0.44 * df[col_fta].fillna(0))
    df["ts_pct"] = np.where(tsa > 0, df[col_pts] / (2 * tsa), np.nan)

# AST:TOV (simple)
if col_ast and col_tov and col_ast in df.columns and col_tov in df.columns:
    df["ast_to_tov"] = np.where(df[col_tov] > 0, df[col_ast] / df[col_tov], np.nan)

# Usage proxy (volume, not true USG% without team totals)
if col_fga and col_fta and col_tov:
    if col_fga in df.columns and col_fta in df.columns and col_tov in df.columns:
        df["usage_load"] = (df[col_fga].fillna(0) + 0.44 * df[col_fta].fillna(0) + df[col_tov].fillna(0))
        if col_min and col_min in df.columns:
            df["usage_per_min"] = np.where(df[col_min] > 0, df["usage_load"] / df[col_min], np.nan)

# OREB / DREB shares (within-player rebound mix)
if col_reb and col_oreb and col_dreb and col_reb in df.columns and col_oreb in df.columns and col_dreb in df.columns:
    df["oreb_share_of_reb"] = np.where(df[col_reb] > 0, df[col_oreb] / df[col_reb], np.nan)
    df["dreb_share_of_reb"] = np.where(df[col_reb] > 0, df[col_dreb] / df[col_reb], np.nan)

# -------------------------
# 9) Round “rate-like” columns to 3 decimals (keep counts untouched)
# -------------------------
rate_cols = [c for c in df.columns if any(
    c.endswith(suf) for suf in ("_pct", "_rate")
) or c in (
    "threepar", "efg_pct", "ts_pct", "ast_to_tov", "usage_per_min",
    "oreb_share_of_reb", "dreb_share_of_reb"
)]
for c in rate_cols:
    if pd.api.types.is_numeric_dtype(df[c]):
        df[c] = df[c].round(3)

# per-game / per-40 / per-100: nice to keep 2–3 decimals
derived_prefixes = ("ppg","rpg","apg","spg","bpg","tovpg","pfpg","mpg")
for c in df.columns:
    if c.startswith(derived_prefixes) and pd.api.types.is_numeric_dtype(df[c]):
        df[c] = df[c].round(3)
    if c.endswith("_per_40") and pd.api.types.is_numeric_dtype(df[c]):
        df[c] = df[c].round(3)
    if c.endswith("_per_100") and pd.api.types.is_numeric_dtype(df[c]):
        df[c] = df[c].round(3)

# -------------------------
# 10) Save
# -------------------------
df.to_csv(OUT_DATA, index=False)
print("✅ wrote derived dataset:", OUT_DATA)

# Quick sanity
if col_pos and col_pos in df.columns:
    print("✅ missing position %:", float(df[col_pos].isna().mean()))


✅ wrote rename map: /content/processed_positions/pbpstats_derived_column_rename_map.csv
✅ column matching (None means not found):
          name: name
        season: season
          team: teamabbreviation
      position: position
  games_played: gamesplayed
       minutes: minutes
   possessions: None
           pts: points
           reb: rebounds
          oreb: None
          dreb: None
           ast: assists
           stl: steals
           blk: blocks
           tov: turnovers
            pf: fouls
           fgm: None
           fga: None
          fg3m: fg3m
          fg3a: fg3a
           ftm: None
           fta: fta


  df[c] = pd.to_numeric(df[c], errors="ignore")


AttributeError: 'int' object has no attribute 'fillna'

In [14]:
import pandas as pd
import numpy as np
from pathlib import Path

IN_PATH  = Path("/content/processed_positions/pbpstats_totals_player_by_season_cleaned_pre_percentiles.csv")
OUT_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_derived_pre_percentiles.csv")

df = pd.read_csv(IN_PATH)
df.columns = [c.strip() for c in df.columns]

# -----------------------------
# helpers
# -----------------------------
def pick_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def series_or_zeros(df: pd.DataFrame, col: str | None, dtype=float) -> pd.Series:
    """Always returns a Series aligned to df.index (fixes the int.fillna() issue)."""
    if col is None or col not in df.columns:
        return pd.Series(0, index=df.index, dtype=dtype)
    s = pd.to_numeric(df[col], errors="coerce")
    return s.fillna(0).astype(dtype)

def safe_div(num: pd.Series, den: pd.Series, default=0.0) -> pd.Series:
    den0 = den.replace(0, np.nan)
    out = num / den0
    return out.fillna(default)

# -----------------------------
# locate key columns (flexible)
# -----------------------------
name_col   = pick_col(df, ["Name"])
season_col = pick_col(df, ["Season"])
team_col   = pick_col(df, ["TeamAbbreviation", "Team_Abbreviation", "Team"])
pos_col    = pick_col(df, ["Position", "Pos"])
gp_col     = pick_col(df, ["GamesPlayed", "GP"])
min_col    = pick_col(df, ["Minutes", "minutes_total", "Min"])

pts_col = pick_col(df, ["Points", "PTS", "pts_total"])
reb_col = pick_col(df, ["Rebounds", "REB", "reb_total"])
ast_col = pick_col(df, ["Assists", "AST", "ast_total"])
stl_col = pick_col(df, ["Steals", "STL", "stl_total"])
blk_col = pick_col(df, ["Blocks", "BLK", "blk_total"])
tov_col = pick_col(df, ["Turnovers", "TOV", "tov_total"])

# your provided mappings + common variants
poss_col = pick_col(df, ["TotalPoss", "total_poss", "possessions", "poss"])
oreb_col = pick_col(df, ["OffRebounds", "OReb", "oreb_total", "off_rebounds"])
dreb_col = pick_col(df, ["DefRebounds", "DReb", "dreb_total", "def_rebounds"])

fg2m_col = pick_col(df, ["FG2M", "fg2m"])
fg2a_col = pick_col(df, ["FG2A", "fg2a"])
fg3m_col = pick_col(df, ["FG3M", "3PM", "fg3m"])
fg3a_col = pick_col(df, ["FG3A", "3PA", "fg3a"])
ftm_col  = pick_col(df, ["FTM", "ftm"])
fta_col  = pick_col(df, ["FTA", "fta"])

# -----------------------------
# build Series (ALWAYS Series)
# -----------------------------
gp   = series_or_zeros(df, gp_col, dtype=float)
mins = series_or_zeros(df, min_col, dtype=float)

pts = series_or_zeros(df, pts_col, dtype=float)
reb = series_or_zeros(df, reb_col, dtype=float)
ast = series_or_zeros(df, ast_col, dtype=float)
stl = series_or_zeros(df, stl_col, dtype=float)
blk = series_or_zeros(df, blk_col, dtype=float)
tov = series_or_zeros(df, tov_col, dtype=float)

poss = series_or_zeros(df, poss_col, dtype=float)
oreb = series_or_zeros(df, oreb_col, dtype=float)
dreb = series_or_zeros(df, dreb_col, dtype=float)

fg2m = series_or_zeros(df, fg2m_col, dtype=float)
fg2a = series_or_zeros(df, fg2a_col, dtype=float)
fg3m = series_or_zeros(df, fg3m_col, dtype=float)
fg3a = series_or_zeros(df, fg3a_col, dtype=float)
ftm  = series_or_zeros(df, ftm_col,  dtype=float)
fta  = series_or_zeros(df, fta_col,  dtype=float)

fga = fg2a + fg3a
fgm = fg2m + fg3m

# -----------------------------
# derived stats (teacher R logic adapted)
# -----------------------------
out = df.copy()

# per-game
out["mpg"]   = safe_div(mins, gp, default=np.nan)
out["ppg"]   = safe_div(pts,  gp, default=np.nan)
out["rpg"]   = safe_div(reb,  gp, default=np.nan)
out["apg"]   = safe_div(ast,  gp, default=np.nan)
out["spg"]   = safe_div(stl,  gp, default=np.nan)
out["bpg"]   = safe_div(blk,  gp, default=np.nan)
out["tovpg"] = safe_div(tov,  gp, default=np.nan)

# per-40 mins
min_den = mins.replace(0, np.nan)
out["pts_per_40"]  = (pts  / min_den) * 40
out["reb_per_40"]  = (reb  / min_den) * 40
out["oreb_per_40"] = (oreb / min_den) * 40
out["dreb_per_40"] = (dreb / min_den) * 40
out["ast_per_40"]  = (ast  / min_den) * 40
out["stl_per_40"]  = (stl  / min_den) * 40
out["blk_per_40"]  = (blk  / min_den) * 40
out["tov_per_40"]  = (tov  / min_den) * 40

# per-100 possessions (use PBPStats TotalPoss if present)
poss_den = poss.replace(0, np.nan)
out["pts_per_100"]  = (pts  / poss_den) * 100
out["reb_per_100"]  = (reb  / poss_den) * 100
out["oreb_per_100"] = (oreb / poss_den) * 100
out["dreb_per_100"] = (dreb / poss_den) * 100
out["ast_per_100"]  = (ast  / poss_den) * 100
out["stl_per_100"]  = (stl  / poss_den) * 100
out["blk_per_100"]  = (blk  / poss_den) * 100
out["tov_per_100"]  = (tov  / poss_den) * 100

# shooting %
out["fg2_pct"] = safe_div(fg2m, fg2a, default=0.0)
out["fg3_pct"] = safe_div(fg3m, fg3a, default=0.0)
out["ft_pct"]  = safe_div(ftm,  fta,  default=0.0)
out["fg_pct"]  = safe_div(fgm,  fga,  default=0.0)

# rate stats / advanced
out["threepar"]  = safe_div(fg3a, fga, default=0.0)              # 3PA rate
out["fta_rate"]  = safe_div(fta,  fga, default=0.0)              # FTA per FGA
out["efg_pct"]   = safe_div((fgm + 0.5 * fg3m), fga, default=0.0)

tsa = fga + 0.44 * fta
out["ts_pct"]    = safe_div(pts, 2 * tsa, default=0.0)

out["usage_load"] = fga + 0.44 * fta + tov
out["usage"]      = safe_div(out["usage_load"], mins, default=0.0)  # possessions-used per minute proxy

out["ast_to_tov"] = np.where(tov > 0, safe_div(ast, tov, default=0.0), ast)

# teacher's player-only poss estimate for ast% / tov% (not true team-based AST%)
poss_est = fga + 0.44 * fta + tov + ast
out["possessions_estimated"] = poss_est
out["ast_pct"] = safe_div(ast, poss_est, default=0.0)
out["tov_pct"] = safe_div(tov, poss_est, default=0.0)

out["oreb_pct"] = safe_div(oreb, reb.replace(0, np.nan), default=0.0)
out["dreb_pct"] = safe_div(dreb, reb.replace(0, np.nan), default=0.0)

# -----------------------------
# rounding (3 decimals for rates/%)
# -----------------------------
rate_like = [c for c in out.columns if any(k in c.lower() for k in ["_pct", "rate", "threepar", "usage", "_per_40", "_per_100"])]
for c in rate_like:
    if pd.api.types.is_numeric_dtype(out[c]):
        out[c] = pd.to_numeric(out[c], errors="coerce").round(3)

out.to_csv(OUT_PATH, index=False)
print("✅ wrote:", OUT_PATH)

# quick sanity: show what columns were found for your key mappings
print("\nDetected columns:")
print("possessions:", poss_col, "| oreb:", oreb_col, "| dreb:", dreb_col, "| fg2m:", fg2m_col, "| fg2a:", fg2a_col)
print("fg3m:", fg3m_col, "| fg3a:", fg3a_col, "| ftm:", ftm_col, "| fta:", fta_col)


✅ wrote: /content/processed_positions/pbpstats_totals_player_by_season_derived_pre_percentiles.csv

Detected columns:
possessions: totalposs | oreb: offrebounds | dreb: defrebounds | fg2m: fg2m | fg2a: fg2a
fg3m: fg3m | fg3a: fg3a | ftm: None | fta: fta


In [15]:
import pandas as pd
import re
import unicodedata
from pathlib import Path

# ----------------------------
# Paths
# ----------------------------
MASTER_MAP_PATH = Path("/content/processed_positions/position_mapping_master.csv")
TOTALS_PATH     = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions.csv")

OUT_MAP_PATH    = Path("/content/processed_positions/position_mapping_master_v2_manualpatch.csv")
OUT_TOTALS_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions_manualpatch.csv")

# ----------------------------
# Normalizer (same idea as before)
# ----------------------------
def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

# ----------------------------
# Manual position list (tab-separated)
# ----------------------------
manual_txt = """Name\tPosition
Amy Atwell\tF
Asia (AD) Durr\tG
Astou Ndour-Fall\tC
Betnijah Laney-Hamilton\tF
Caitlin Bickle\tF
Celeste Taylor\tG
Charisma Osborne\tG
Chennedy Carter\tG
Cyesha Goree\tF
DiDi Richards\tG
Dyaisha Fair\tG
Ezinne Kalu\tG
Jakia Brown-Turner\tG
Jessika Carter\tF
Kaela Davis\tF
Kysre Gondrezick\tG
Mikiah Herbert Harrigan\tF
Nika Mühl\tG
Olivia Époupa\tG
Stephanie Soares\tC
"""

manual = pd.read_csv(pd.io.common.StringIO(manual_txt), sep="\t")
manual["Name"] = manual["Name"].astype(str).str.strip()
manual["Position"] = manual["Position"].astype(str).str.strip().str.upper()
manual["Name_norm"] = manual["Name"].apply(norm_name)

# ----------------------------
# 1) Update the master mapping (reusable asset)
# ----------------------------
mp = pd.read_csv(MASTER_MAP_PATH)
mp.columns = [c.strip() for c in mp.columns]

# Ensure Name_norm exists in mapping
if "Name_norm" not in mp.columns:
    if "Name" not in mp.columns:
        raise ValueError(f"Mapping file missing Name and Name_norm. Columns: {mp.columns.tolist()[:50]}")
    mp["Name_norm"] = mp["Name"].apply(norm_name)

# Choose / create a "Position_master" column
pos_master_col = "Position_master" if "Position_master" in mp.columns else None
if not pos_master_col:
    # Fall back: if mapping already has Position, use it as base
    if "Position" in mp.columns:
        mp["Position_master"] = mp["Position"]
    else:
        mp["Position_master"] = pd.NA
    pos_master_col = "Position_master"

# Upsert the manual positions into mapping (by Name_norm)
mp = mp.copy()
mp["_manual_pos"] = mp["Name_norm"].map(dict(zip(manual["Name_norm"], manual["Position"])))
mp[pos_master_col] = mp[pos_master_col].combine_first(mp["_manual_pos"])

# Optional: store provenance
if "Position_manual" not in mp.columns:
    mp["Position_manual"] = pd.NA
mp["Position_manual"] = mp["Position_manual"].combine_first(mp["_manual_pos"])

mp = mp.drop(columns=["_manual_pos"])

mp.to_csv(OUT_MAP_PATH, index=False)
print(f"✅ Wrote updated master mapping: {OUT_MAP_PATH}")
print(f"✅ Master mapping missing Position_master %: {mp[pos_master_col].isna().mean():.2%}")

# ----------------------------
# 2) Patch the totals-by-season file
# ----------------------------
df = pd.read_csv(TOTALS_PATH)
df.columns = [c.strip() for c in df.columns]

if "Name" not in df.columns:
    raise ValueError(f"Totals file missing Name. Columns: {df.columns.tolist()[:50]}")

# Make sure Position column exists
if "Position" not in df.columns:
    df["Position"] = pd.NA

# Exact-name patch first (keeps diacritics/parentheses as-is)
exact_map = dict(zip(manual["Name"], manual["Position"]))
mask_missing = df["Position"].isna()
df.loc[mask_missing, "Position"] = df.loc[mask_missing, "Name"].map(exact_map).combine_first(df.loc[mask_missing, "Position"])

# Norm-name patch second (catches name formatting differences)
df["Name_norm"] = df["Name"].apply(norm_name)
norm_map = dict(zip(manual["Name_norm"], manual["Position"]))
mask_missing = df["Position"].isna()
df.loc[mask_missing, "Position"] = df.loc[mask_missing, "Name_norm"].map(norm_map)

df.to_csv(OUT_TOTALS_PATH, index=False)
print(f"✅ Wrote patched totals file: {OUT_TOTALS_PATH}")
print(f"✅ Remaining missing Position rows: {df['Position'].isna().sum()} (of {len(df)})")

# If anything is still missing, list unique names
if df["Position"].isna().any():
    still = (df.loc[df["Position"].isna(), "Name"].dropna().drop_duplicates().sort_values().tolist())
    print("Still missing names:", still)



✅ Wrote updated master mapping: /content/processed_positions/position_mapping_master_v2_manualpatch.csv
✅ Master mapping missing Position_master %: 0.00%
✅ Wrote patched totals file: /content/processed_positions/pbpstats_totals_player_by_season_with_positions_manualpatch.csv
✅ Remaining missing Position rows: 0 (of 495)


In [16]:
import pandas as pd
import numpy as np
import re
import unicodedata
from pathlib import Path

# -----------------------------
# Paths (edit if you want)
# -----------------------------
IN_DATA = Path("/content/processed_positions/pbpstats_totals_player_by_season_derived_pre_percentiles.csv")

# Optional: re-fill Position from this mapping (recommended)
# Uses Name_norm join, so it works even if names have accents/parentheses
MASTER_MAP = Path("/content/processed_positions/position_mapping_master_v2_manualpatch.csv")

OUT_ALL = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_ALL.csv")
OUT_FINAL = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_MIN50.csv")

# -----------------------------
# Helpers
# -----------------------------
def pick_col(df: pd.DataFrame, candidates):
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def weighted_percentile_rank(values: pd.Series, weights: pd.Series) -> pd.Series:
    """
    Returns a percentile rank (0-100) for each value, weighted by weights.
    Ties get the same percentile using the tie group's midpoint cumulative weight.
    """
    out = pd.Series(np.nan, index=values.index, dtype=float)

    v = pd.to_numeric(values, errors="coerce")
    w = pd.to_numeric(weights, errors="coerce").fillna(0)
    mask = v.notna() & (w > 0)

    if mask.sum() == 0:
        return out

    tmp = pd.DataFrame({"v": v[mask], "w": w[mask]})

    total_w = tmp["w"].sum()
    if total_w <= 0:
        return out

    # Aggregate weights by unique value to handle ties cleanly
    agg = (
        tmp.groupby("v", as_index=False)["w"]
           .sum()
           .sort_values("v")
           .reset_index(drop=True)
    )
    agg["cum_w"] = agg["w"].cumsum()
    agg["mid_w"] = agg["cum_w"] - 0.5 * agg["w"]
    agg["pct"] = 100.0 * agg["mid_w"] / total_w

    pct_map = dict(zip(agg["v"], agg["pct"]))
    out.loc[mask] = v.loc[mask].map(pct_map)

    return out

# -----------------------------
# 1) Load
# -----------------------------
df = pd.read_csv(IN_DATA)
df.columns = [c.strip() for c in df.columns]

# Identify key columns (flexible)
name_col = pick_col(df, ["Name", "player"])
pos_col  = pick_col(df, ["Position", "athlete_position_abbreviation", "pos"])
min_col  = pick_col(df, ["Minutes", "minutes_total", "min"])
season_col = pick_col(df, ["Season", "season"])

if not name_col or not min_col:
    raise ValueError(f"Missing required columns. Found name={name_col}, minutes={min_col}. "
                     f"Columns sample: {df.columns.tolist()[:40]}")

# -----------------------------
# 2) (Optional) Re-fill Position from master mapping
# -----------------------------
if (pos_col is None) or df[pos_col].isna().any():
    if MASTER_MAP.exists():
        mp = pd.read_csv(MASTER_MAP)
        mp.columns = [c.strip() for c in mp.columns]

        # Ensure mapping has Name_norm and Position_master
        if "Name_norm" not in mp.columns:
            if "Name" not in mp.columns:
                raise ValueError("Master mapping missing Name and Name_norm.")
            mp["Name_norm"] = mp["Name"].apply(norm_name)

        pos_master_col = "Position_master" if "Position_master" in mp.columns else (
            "Position" if "Position" in mp.columns else None
        )
        if pos_master_col is None:
            raise ValueError("Master mapping missing Position_master/Position.")

        if "Name_norm" not in df.columns:
            df["Name_norm"] = df[name_col].apply(norm_name)

        df = df.merge(
            mp[["Name_norm", pos_master_col]].rename(columns={pos_master_col: "Position_from_map"}),
            on="Name_norm",
            how="left"
        )

        if pos_col is None:
            df["Position"] = df["Position_from_map"]
            pos_col = "Position"
        else:
            df[pos_col] = df[pos_col].combine_first(df["Position_from_map"])

        df = df.drop(columns=["Position_from_map"])
        print("✅ Re-filled Position from master mapping.")
    else:
        print("⚠️ MASTER_MAP not found — skipping Position refill.")

if pos_col is None:
    raise ValueError("No Position column found (and could not create one).")

# -----------------------------
# 3) Compute weighted percentiles by Position (weights = minutes)
# -----------------------------
# Metrics to percentile (R script equivalents)
metrics = {
    "usage_pctile_pos": "usage",
    "ts_pctile_pos": "ts_pct",
    "efg_pctile_pos": "efg_pct",
    "ast_pctile_pos": "ast_per_40",
    "tov_pctile_pos": "tov_per_40",
    "stl_pctile_pos": "stl_per_40",
    "blk_pctile_pos": "blk_per_40",
    "reb_pctile_pos": "reb_per_40",
}

# Verify which source cols exist (skip gracefully if missing)
available = {new: src for new, src in metrics.items() if src in df.columns}
missing = {new: src for new, src in metrics.items() if src not in df.columns}

if missing:
    print("⚠️ Skipping missing metric columns:")
    for new, src in missing.items():
        print(f"   - {new} (source '{src}' not found)")

weights = pd.to_numeric(df[min_col], errors="coerce").fillna(0)

for new_col, src_col in available.items():
    df[new_col] = (
        df.groupby(pos_col, group_keys=False)
          .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
    )

# Nice rounding for percentiles
for c in available.keys():
    df[c] = pd.to_numeric(df[c], errors="coerce").round(1)

# -----------------------------
# 4) Filter minutes_total >= 50 (same as your R flow)
# -----------------------------
mins = pd.to_numeric(df[min_col], errors="coerce").fillna(0)
df_all = df.copy()
df_min50 = df.loc[mins >= 50].copy()

print(f"✅ Rows (all): {len(df_all)}")
print(f"✅ Rows (minutes>=50): {len(df_min50)}")
print(f"✅ Missing Position % (all): {df_all[pos_col].isna().mean():.2%}")

# -----------------------------
# 5) Export
# -----------------------------
df_all.to_csv(OUT_ALL, index=False)
df_min50.to_csv(OUT_FINAL, index=False)

print("✅ Wrote:", OUT_ALL)
print("✅ Wrote:", OUT_FINAL)


✅ Re-filled Position from master mapping.


  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))
  .apply(lambda g: weighted_percentile_rank(g[src_col], g[min_col]))


✅ Rows (all): 495
✅ Rows (minutes>=50): 449
✅ Missing Position % (all): 4.04%
✅ Wrote: /content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_ALL.csv
✅ Wrote: /content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_MIN50.csv


In [17]:
import pandas as pd
import numpy as np
import re, unicodedata
from pathlib import Path

# -----------------------------
# Paths (edit if you want)
# -----------------------------
IN_DATA   = Path("/content/processed_positions/pbpstats_totals_player_by_season_derived_pre_percentiles.csv")
MASTER_MAP = Path("/content/processed_positions/position_mapping_master_v2_manualpatch.csv")

OUT_ALL   = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_ALL.csv")
OUT_MIN50 = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_MIN50.csv")

# -----------------------------
# Helpers
# -----------------------------
def pick_col(df: pd.DataFrame, candidates):
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def norm_name(s: str) -> str:
    s = "" if pd.isna(s) else str(s).strip().lower()
    s = unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii")
    s = re.sub(r"[^a-z\s\-']", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

def weighted_percentile_rank(values: pd.Series, weights: pd.Series) -> pd.Series:
    """
    Percentile rank (0-100), weighted by weights.
    Ties share the same percentile using the tie group's midpoint cumulative weight.
    """
    out = pd.Series(np.nan, index=values.index, dtype=float)
    v = pd.to_numeric(values, errors="coerce")
    w = pd.to_numeric(weights, errors="coerce").fillna(0)

    mask = v.notna() & (w > 0)
    if mask.sum() == 0:
        return out

    tmp = pd.DataFrame({"v": v[mask], "w": w[mask]})
    total_w = tmp["w"].sum()
    if total_w <= 0:
        return out

    agg = (
        tmp.groupby("v", as_index=False)["w"]
           .sum()
           .sort_values("v")
           .reset_index(drop=True)
    )
    agg["cum_w"] = agg["w"].cumsum()
    agg["mid_w"] = agg["cum_w"] - 0.5 * agg["w"]
    agg["pct"] = 100.0 * agg["mid_w"] / total_w
    pct_map = dict(zip(agg["v"], agg["pct"]))

    out.loc[mask] = v.loc[mask].map(pct_map)
    return out

def add_weighted_percentiles(df: pd.DataFrame, group_cols, metric_map, weight_col, out_suffix):
    """
    Adds columns like: {metric_name}{out_suffix}
    metric_map: {metric_name: source_col}
    group_cols: list of col names to group by
    """
    for metric_name, src_col in metric_map.items():
        if src_col not in df.columns:
            print(f"⚠️ Skipping {metric_name}: source col '{src_col}' not found.")
            continue

        out_col = f"{metric_name}{out_suffix}"
        out = pd.Series(np.nan, index=df.index, dtype=float)

        groups = df.groupby(group_cols, sort=False).groups
        for _, idx in groups.items():
            out.loc[idx] = weighted_percentile_rank(df.loc[idx, src_col], df.loc[idx, weight_col]).loc[idx]

        df[out_col] = pd.to_numeric(out, errors="coerce").round(1)

# -----------------------------
# Load
# -----------------------------
df = pd.read_csv(IN_DATA)
df.columns = [c.strip() for c in df.columns]

name_col   = pick_col(df, ["Name", "player"])
season_col = pick_col(df, ["Season", "season"])
pos_col    = pick_col(df, ["Position", "athlete_position_abbreviation", "pos"])
min_col    = pick_col(df, ["Minutes", "minutes_total", "min", "minutes"])

if not season_col or not min_col:
    raise ValueError(f"Missing required columns: season={season_col}, minutes={min_col}. "
                     f"Columns sample: {df.columns.tolist()[:40]}")

# -----------------------------
# (Optional) Re-fill Position from master mapping
# -----------------------------
if (pos_col is None) or df[pos_col].isna().any():
    if MASTER_MAP.exists():
        mp = pd.read_csv(MASTER_MAP)
        mp.columns = [c.strip() for c in mp.columns]
        if "Name_norm" not in mp.columns:
            mp["Name_norm"] = mp["Name"].apply(norm_name)

        pos_master_col = "Position_master" if "Position_master" in mp.columns else ("Position" if "Position" in mp.columns else None)
        if pos_master_col is None:
            raise ValueError("Master mapping missing Position_master/Position.")

        if "Name_norm" not in df.columns:
            df["Name_norm"] = df[name_col].apply(norm_name)

        df = df.merge(
            mp[["Name_norm", pos_master_col]].rename(columns={pos_master_col: "Position_from_map"}),
            on="Name_norm",
            how="left"
        )

        if pos_col is None:
            df["Position"] = df["Position_from_map"]
            pos_col = "Position"
        else:
            df[pos_col] = df[pos_col].combine_first(df["Position_from_map"])

        df = df.drop(columns=["Position_from_map"])
        print("✅ Re-filled Position from master mapping.")
    else:
        print("⚠️ MASTER_MAP not found — skipping Position refill.")

if pos_col is None:
    raise ValueError("No Position column found (and could not create one).")

# Ensure minutes numeric
df[min_col] = pd.to_numeric(df[min_col], errors="coerce").fillna(0)

# -----------------------------
# Metrics to percentile (your R equivalents)
# -----------------------------
metrics = {
    "usage_pctile": "usage",
    "ts_pctile":    "ts_pct",
    "efg_pctile":   "efg_pct",
    "ast_pctile":   "ast_per_40",
    "tov_pctile":   "tov_per_40",
    "stl_pctile":   "stl_per_40",
    "blk_pctile":   "blk_per_40",
    "reb_pctile":   "reb_per_40",
}

# -----------------------------
# A) Position percentiles across ALL seasons (what you already had)
#    Group: Position
# -----------------------------
add_weighted_percentiles(
    df=df,
    group_cols=[pos_col],
    metric_map=metrics,
    weight_col=min_col,
    out_suffix="_pos_all"
)

# -----------------------------
# B) Season percentiles (league-wide within season)
#    Group: Season
# -----------------------------
add_weighted_percentiles(
    df=df,
    group_cols=[season_col],
    metric_map=metrics,
    weight_col=min_col,
    out_suffix="_season"
)

# -----------------------------
# C) Season + Position percentiles (positional peers within same season)
#    Group: Season, Position
# -----------------------------
add_weighted_percentiles(
    df=df,
    group_cols=[season_col, pos_col],
    metric_map=metrics,
    weight_col=min_col,
    out_suffix="_pos_season"
)

# -----------------------------
# Filter minutes >= 50 (same as your R pipeline)
# -----------------------------
df_all = df.copy()
df_min50 = df.loc[df[min_col] >= 50].copy()

# -----------------------------
# Export
# -----------------------------
df_all.to_csv(OUT_ALL, index=False)
df_min50.to_csv(OUT_MIN50, index=False)

print("✅ Wrote ALL:  ", OUT_ALL)
print("✅ Wrote MIN50:", OUT_MIN50)
print("Rows ALL:", len(df_all), "| Rows MIN50:", len(df_min50))


✅ Re-filled Position from master mapping.
✅ Wrote ALL:   /content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_ALL.csv
✅ Wrote MIN50: /content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_MIN50.csv
Rows ALL: 495 | Rows MIN50: 449


In [18]:
import pandas as pd
import numpy as np
from pathlib import Path

IN_ALL   = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_ALL.csv")
IN_MIN50 = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_pos_and_season_percentiles_MIN50.csv")

OUT_ALL_WIDE   = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_ALL.csv")
OUT_MIN50_WIDE = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_MIN50.csv")

OUT_ALL_LONG   = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL.csv")
OUT_MIN50_LONG = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50.csv")


LENS_SUFFIXES = ["pos_all", "season", "pos_season"]

# Map percentile lens -> recommended playbook lens
LENS_TO_PLAYBOOK = {
    "season": "Contract/Valuation",
    "pos_season": "Scouting",
    "pos_all": "Lineup Optimization",  # useful “stable baseline”; feel free to rename
}

# Map metric -> stat category (customize later when you auto-tag ALL pbpstats columns)
METRIC_TO_CATEGORY = {
    "usage": "Scoring / Role",
    "ts": "Scoring / Efficiency",
    "efg": "Scoring / Efficiency",
    "ast": "Assists",
    "tov": "Turnovers",
    "stl": "Defense / Activity",
    "blk": "Defense / Activity",
    "reb": "Rebounds",
}

def find_percentile_cols(df: pd.DataFrame) -> list[str]:
    # catches: usage_pctile_season, ts_pctile_pos_season, etc.
    cols = []
    for c in df.columns:
        if "pctile" in c.lower() and any(c.lower().endswith(sfx) for sfx in LENS_SUFFIXES):
            cols.append(c)
    return cols

def thresholds_for_col(series: pd.Series):
    """Handles either 0–1 or 0–100 percentile scales safely."""
    s = pd.to_numeric(series, errors="coerce")
    mx = s.dropna().max() if s.notna().any() else np.nan
    if pd.notna(mx) and mx <= 1.00001:
        return 0.90, 0.15  # proportions
    return 90.0, 15.0     # percentile points

def parse_metric_and_lens(col: str):
    c = col.lower()
    lens = next((sfx for sfx in LENS_SUFFIXES if c.endswith(sfx)), None)
    metric = c.split("_pctile_")[0] if "_pctile_" in c else c.split("pctile")[0].rstrip("_")
    return metric, lens

def add_flags_wide(df: pd.DataFrame) -> pd.DataFrame:
    pct_cols = find_percentile_cols(df)

    for col in pct_cols:
        hi, lo = thresholds_for_col(df[col])
        df[f"{col}_elite90"] = (pd.to_numeric(df[col], errors="coerce") >= hi).astype("Int64")
        df[f"{col}_low15"]   = (pd.to_numeric(df[col], errors="coerce") <= lo).astype("Int64")

    # Optional: summary counts per lens (nice for quick filtering)
    for lens in LENS_SUFFIXES:
        lens_cols = [c for c in pct_cols if c.lower().endswith(lens)]
        elite_cols = [f"{c}_elite90" for c in lens_cols]
        low_cols   = [f"{c}_low15" for c in lens_cols]
        if elite_cols:
            df[f"elite_count_{lens}"] = df[elite_cols].sum(axis=1).astype("Int64")
        if low_cols:
            df[f"low_count_{lens}"] = df[low_cols].sum(axis=1).astype("Int64")

    return df

def build_signals_long(df: pd.DataFrame) -> pd.DataFrame:
    pct_cols = find_percentile_cols(df)

    id_cols = [c for c in ["Name", "Season", "TeamAbbreviation", "Position", "Minutes", "GamesPlayed"] if c in df.columns]
    base = df[id_cols].copy()

    rows = []
    for col in pct_cols:
        metric, lens = parse_metric_and_lens(col)
        hi, lo = thresholds_for_col(df[col])

        tmp = base.copy()
        tmp["metric"] = metric
        tmp["lens"] = lens
        tmp["percentile"] = pd.to_numeric(df[col], errors="coerce")
        tmp["elite90_flag"] = (tmp["percentile"] >= hi).astype("Int64")
        tmp["low15_flag"]   = (tmp["percentile"] <= lo).astype("Int64")

        tmp["playbook_lens"] = LENS_TO_PLAYBOOK.get(lens, "General")
        tmp["stat_category"] = METRIC_TO_CATEGORY.get(metric, "Uncategorized")

        # One more “human-friendly” label
        tmp["signal"] = np.select(
            [tmp["elite90_flag"] == 1, tmp["low15_flag"] == 1],
            ["ELITE (>=90th)", "CONCERN (<=15th)"],
            default="Neutral"
        )

        rows.append(tmp)

    long_df = pd.concat(rows, ignore_index=True)

    # Optional: keep only “signals” (reduces noise a ton for Notion views)
    # long_df = long_df[long_df["signal"].isin(["ELITE (>=90th)", "CONCERN (<=15th)"])].copy()

    return long_df


def run(in_path: Path, out_wide: Path, out_long: Path):
    df = pd.read_csv(in_path)
    df.columns = [c.strip() for c in df.columns]

    df = add_flags_wide(df)
    df.to_csv(out_wide, index=False)

    long_df = build_signals_long(df)
    long_df.to_csv(out_long, index=False)

    print("✅ Wrote WIDE:", out_wide)
    print("✅ Wrote LONG:", out_long)
    print("WIDE shape:", df.shape, "| LONG shape:", long_df.shape)


run(IN_ALL, OUT_ALL_WIDE, OUT_ALL_LONG)
run(IN_MIN50, OUT_MIN50_WIDE, OUT_MIN50_LONG)


✅ Wrote WIDE: /content/processed_positions/pbpstats_player_by_season_flags_WIDE_ALL.csv
✅ Wrote LONG: /content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL.csv
WIDE shape: (495, 370) | LONG shape: (11880, 8)
✅ Wrote WIDE: /content/processed_positions/pbpstats_player_by_season_flags_WIDE_MIN50.csv
✅ Wrote LONG: /content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50.csv
WIDE shape: (449, 370) | LONG shape: (10776, 8)


In [19]:
import pandas as pd
import numpy as np
from pathlib import Path

IN_ALL   = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_ALL.csv")
IN_MIN50 = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_MIN50.csv")

OUT_ALL_LONG_FIXED   = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL_FIXED.csv")
OUT_MIN50_LONG_FIXED = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50_FIXED.csv")

LENS_SUFFIXES = ["pos_all", "season", "pos_season"]

LENS_TO_PLAYBOOK = {
    "season": "Contract/Valuation",
    "pos_season": "Scouting",
    "pos_all": "Lineup Optimization",
}

METRIC_TO_CATEGORY = {
    "usage": "Scoring / Role",
    "ts": "Scoring / Efficiency",
    "efg": "Scoring / Efficiency",
    "ast": "Assists / Creation",
    "tov": "Turnovers",
    "stl": "Defense / Activity",
    "blk": "Defense / Activity",
    "reb": "Rebounds",
    "oreb": "Second Chance",
    "dreb": "Rebounds",
}

def pick_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def find_percentile_cols(df: pd.DataFrame) -> list[str]:
    out = []
    for c in df.columns:
        cl = c.lower()
        if "pctile" in cl and any(cl.endswith(sfx) for sfx in LENS_SUFFIXES):
            out.append(c)
    return out

def thresholds_for_col(series: pd.Series):
    s = pd.to_numeric(series, errors="coerce")
    mx = s.dropna().max() if s.notna().any() else np.nan
    if pd.notna(mx) and mx <= 1.00001:
        return 0.90, 0.15
    return 90.0, 15.0

def parse_metric_and_lens(col: str):
    c = col.lower()
    lens = next((sfx for sfx in LENS_SUFFIXES if c.endswith(sfx)), None)
    # usage_pctile_pos_all -> "usage"
    metric = c.split("_pctile_")[0] if "_pctile_" in c else c.split("pctile")[0].rstrip("_")
    return metric, lens

def build_signals_long(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    pct_cols = find_percentile_cols(df)

    # Robust ID detection (handles Name vs name vs player, etc.)
    c_player = pick_col(df, ["Name", "player", "athlete_display_name"])
    c_season = pick_col(df, ["Season", "season"])
    c_team   = pick_col(df, ["TeamAbbreviation", "teamabbreviation", "team_abbreviation", "team"])
    c_pos    = pick_col(df, ["Position", "position", "athlete_position_abbreviation", "pos"])
    c_min    = pick_col(df, ["Minutes", "minutes", "minutes_total", "min"])
    c_gp     = pick_col(df, ["GamesPlayed", "gamesplayed", "gp", "games_played"])
    c_id     = pick_col(df, ["athlete_id", "player_id", "id"])

    base = pd.DataFrame(index=df.index)
    if c_id:     base["athlete_id"]   = df[c_id]
    if c_player: base["player"]       = df[c_player]
    if c_season: base["season"]       = df[c_season]
    if c_team:   base["team"]         = df[c_team]
    if c_pos:    base["position"]     = df[c_pos]
    if c_min:    base["minutes"]      = pd.to_numeric(df[c_min], errors="coerce")
    if c_gp:     base["games_played"] = pd.to_numeric(df[c_gp], errors="coerce")

    rows = []
    for col in pct_cols:
        metric, lens = parse_metric_and_lens(col)
        hi, lo = thresholds_for_col(df[col])

        tmp = base.copy()
        tmp["metric"] = metric
        tmp["lens"] = lens
        tmp["percentile"] = pd.to_numeric(df[col], errors="coerce")

        tmp["elite90_flag"] = (tmp["percentile"] >= hi).astype("Int64")
        tmp["low15_flag"]   = (tmp["percentile"] <= lo).astype("Int64")

        tmp["playbook_lens"] = LENS_TO_PLAYBOOK.get(lens, "General")
        tmp["stat_category"] = METRIC_TO_CATEGORY.get(metric, "Uncategorized")

        tmp["signal"] = np.select(
            [tmp["elite90_flag"] == 1, tmp["low15_flag"] == 1],
            ["ELITE (>=90th)", "CONCERN (<=15th)"],
            default="Neutral"
        )

        rows.append(tmp)

    long_df = pd.concat(rows, ignore_index=True)

    # OPTIONAL: uncomment to keep only “signals” (makes Notion views cleaner)
    # long_df = long_df[long_df["signal"].isin(["ELITE (>=90th)", "CONCERN (<=15th)"])].copy()

    return long_df

def run(in_path: Path, out_long: Path):
    df = pd.read_csv(in_path)
    long_df = build_signals_long(df)
    long_df.to_csv(out_long, index=False)
    print("✅ Wrote LONG (fixed):", out_long)
    print("LONG shape:", long_df.shape)
    print("Columns:", long_df.columns.tolist())

run(IN_ALL, OUT_ALL_LONG_FIXED)
run(IN_MIN50, OUT_MIN50_LONG_FIXED)


✅ Wrote LONG (fixed): /content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL_FIXED.csv
LONG shape: (11880, 14)
Columns: ['player', 'season', 'team', 'position', 'minutes', 'games_played', 'metric', 'lens', 'percentile', 'elite90_flag', 'low15_flag', 'playbook_lens', 'stat_category', 'signal']
✅ Wrote LONG (fixed): /content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50_FIXED.csv
LONG shape: (10776, 14)
Columns: ['player', 'season', 'team', 'position', 'minutes', 'games_played', 'metric', 'lens', 'percentile', 'elite90_flag', 'low15_flag', 'playbook_lens', 'stat_category', 'signal']


In [26]:
import pandas as pd
import numpy as np
from pathlib import Path

# ----------------------------
# Paths
# ----------------------------
IN_PATH = Path("/content/processed_positions/pbpstats_totals_player_by_season_with_positions_manualpatch.csv")
OUT_DIR = Path("/content/processed_positions")
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_WIDE = OUT_DIR / "pbpstats_totals_player_by_season_with_pos_percentiles_v2_ALL.csv"
OUT_LONG = OUT_DIR / "pbpstats_player_by_season_signals_LONG_v2_ALL.csv"

# ----------------------------
# Helpers: robust column picking
# ----------------------------
def pick_col(df, candidates):
    cols = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols:
            return cols[cand.lower()]
    return None

def ensure_numeric(df, col):
    if col is None:
        return
    df[col] = pd.to_numeric(df[col], errors="coerce")

# ----------------------------
# Weighted percentile rank (0-1)
# ----------------------------
def weighted_percentile_rank(values: pd.Series, weights: pd.Series) -> pd.Series:
    """
    Minutes-weighted percentile rank for each row.
    Returns float in [0, 1].
    """
    v = pd.to_numeric(values, errors="coerce")
    w = pd.to_numeric(weights, errors="coerce").fillna(0)

    out = pd.Series(np.nan, index=values.index, dtype="float64")
    m = v.notna() & (w > 0)
    if m.sum() == 0:
        return out

    vv = v[m]
    ww = w[m]

    arr = vv.to_numpy()
    order = np.argsort(arr, kind="mergesort")  # stable
    vv_s = vv.iloc[order]
    ww_s = ww.iloc[order]

    cumw = ww_s.cumsum()
    totalw = ww_s.sum()

    # percentile position at mid-weight point
    pct = (cumw - 0.5 * ww_s) / totalw
    pct = pct.clip(0, 1)

    # map back to original index
    out.loc[vv_s.index] = pct.to_numpy()
    return out

def add_weighted_percentiles(df, group_cols, metric_cols, weight_col, suffix):
    for mc in metric_cols:
        if mc not in df.columns:
            print(f"⚠️ Missing metric column (skipping): {mc}")
            continue
        df[f"{mc}_pctile_{suffix}"] = (
            df.groupby(group_cols, group_keys=False)
              .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
        )
    return df

# ----------------------------
# Load
# ----------------------------
df = pd.read_csv(IN_PATH)
df.columns = [c.strip() for c in df.columns]

# Core columns (handle naming differences)
name_col   = pick_col(df, ["Name", "player", "Player"])
season_col = pick_col(df, ["Season", "season"])
team_col   = pick_col(df, ["TeamAbbreviation", "teamabbreviation", "Team_Abbreviation", "team"])
pos_col    = pick_col(df, ["Position", "athlete_position_abbreviation", "position"])
min_col    = pick_col(df, ["Minutes", "minutes", "minutes_total"])
gp_col     = pick_col(df, ["GamesPlayed", "games_played", "GP", "gp"])
poss_col   = pick_col(df, ["TotalPoss", "totalposs", "possessions", "Possessions"])

if not (name_col and season_col and pos_col and min_col):
    raise ValueError(
        "Missing required columns. Need at least Name, Season, Position, Minutes.\n"
        f"Found: name={name_col}, season={season_col}, position={pos_col}, minutes={min_col}"
    )

# Normalize key column names for downstream steps
rename = {name_col:"Name", season_col:"Season", pos_col:"Position", min_col:"Minutes"}
if team_col: rename[team_col] = "TeamAbbreviation"
if gp_col: rename[gp_col] = "GamesPlayed"
if poss_col: rename[poss_col] = "TotalPoss"

df = df.rename(columns=rename)

# numeric sanity
ensure_numeric(df, "Season")
ensure_numeric(df, "Minutes")
if "GamesPlayed" in df.columns: ensure_numeric(df, "GamesPlayed")
if "TotalPoss" in df.columns: ensure_numeric(df, "TotalPoss")

# ----------------------------
# (Optional) derive shot distribution frequencies if raw columns exist
# ----------------------------
# We try multiple likely PBPStats column names; if none exist, we skip gracefully.

def first_existing(cands):
    return pick_col(df, cands)

# total FGA candidates
fg2a = first_existing(["FG2A", "fg2a", "FGA2", "fg2a_total"])
fg3a = first_existing(["FG3A", "fg3a", "3PA", "fg3a_total"])

if fg2a and fg3a:
    df["FGA_total"] = pd.to_numeric(df[fg2a], errors="coerce") + pd.to_numeric(df[fg3a], errors="coerce")
else:
    df["FGA_total"] = np.nan

# rim attempts candidates (very site-dependent)
rim_fga = first_existing(["RimFGA", "rim_fga", "AtRimFGA", "atrim_fga", "Rim_FGA"])
corner3_fga = first_existing(["Corner3FGA", "corner3_fga", "Corner3PA", "corner_3_fga"])
arc3_fga = first_existing(["Arc3FGA", "arc3_fga", "NonCorner3FGA", "noncorner3_fga"])

# if we can, compute rim and 3 frequencies
if rim_fga and df["FGA_total"].notna().any():
    df["rim_freq"] = (pd.to_numeric(df[rim_fga], errors="coerce") / df["FGA_total"]).replace([np.inf, -np.inf], np.nan)

if fg3a and df["FGA_total"].notna().any():
    df["three_freq"] = (pd.to_numeric(df[fg3a], errors="coerce") / df["FGA_total"]).replace([np.inf, -np.inf], np.nan)

if corner3_fga and df["FGA_total"].notna().any():
    df["corner3_freq"] = (pd.to_numeric(df[corner3_fga], errors="coerce") / df["FGA_total"]).replace([np.inf, -np.inf], np.nan)

if arc3_fga and df["FGA_total"].notna().any():
    df["arc3_freq"] = (pd.to_numeric(df[arc3_fga], errors="coerce") / df["FGA_total"]).replace([np.inf, -np.inf], np.nan)

# ----------------------------
# Ensure key “basic per-game” stats exist (for league averages)
# ----------------------------
# Try common totals columns; if not found, skip.
pts = pick_col(df, ["Points", "PTS", "pts_total", "pts"])
reb = pick_col(df, ["Rebounds", "REB", "reb_total", "reb"])
ast = pick_col(df, ["Assists", "AST", "ast_total", "ast"])
tov = pick_col(df, ["Turnovers", "TOV", "tov_total", "tov"])
stl = pick_col(df, ["Steals", "STL", "stl_total", "stl"])
blk = pick_col(df, ["Blocks", "BLK", "blk_total", "blk"])

if "GamesPlayed" in df.columns:
    gp = "GamesPlayed"
    for col, outname in [(pts, "ppg"), (reb, "rpg"), (ast, "apg"), (tov, "tovpg"), (stl, "spg"), (blk, "bpg")]:
        if col and outname not in df.columns:
            df[outname] = pd.to_numeric(df[col], errors="coerce") / df[gp].replace({0: np.nan})
    if "mpg" not in df.columns:
        df["mpg"] = df["Minutes"] / df[gp].replace({0: np.nan})

# ----------------------------
# Define metrics you requested for position percentiles
# ----------------------------
# These names must exist as columns in df; we include a few derived ones above (rim_freq, three_freq).

requested_metrics = []

def add_if_exists(colname):
    if colname in df.columns:
        requested_metrics.append(colname)

# OREB%, DREB% (common names)
for c in ["oreb_pct", "dreb_pct", "reb_per_40"]:
    add_if_exists(c)

# turnover + assist rates (common names)
for c in ["tov_pct", "ast_pct", "usage", "ts_pct", "efg_pct"]:
    add_if_exists(c)

# creation-ish rates you might have
for c in ["ast_per_40", "tov_per_40", "ast_per_100", "tov_per_100"]:
    add_if_exists(c)

# shot distribution
for c in ["rim_freq", "three_freq", "corner3_freq", "arc3_freq"]:
    add_if_exists(c)

# If you expected columns but they aren't here, print a clue
print("✅ Metrics to percentile:", requested_metrics)

# ----------------------------
# Compute weighted percentiles by position
# ----------------------------
# Best practice: use Position-within-season ("pos_season") for most comparisons.
# We'll also compute Position-across-all-seasons ("pos_all") because it’s useful as a stable baseline.

df = add_weighted_percentiles(
    df, group_cols=["Position"], metric_cols=requested_metrics, weight_col="Minutes", suffix="pos_all"
)

df = add_weighted_percentiles(
    df, group_cols=["Season", "Position"], metric_cols=requested_metrics, weight_col="Minutes", suffix="pos_season"
)

# Optional: league-wide within season (not position-adjusted, but useful)
df = add_weighted_percentiles(
    df, group_cols=["Season"], metric_cols=requested_metrics, weight_col="Minutes", suffix="season"
)

# ----------------------------
# League averages by season (weighted)
# ----------------------------
def pick_weight_for_metric(metric):
    m = metric.lower()
    if "per_40" in m:
        return "Minutes"
    if ("per_100" in m or "per100" in m or "per_poss" in m) and ("TotalPoss" in df.columns):
        return "TotalPoss"
    # for % rates + frequencies: minutes weighting is usually fine in player-season tables
    return "Minutes"

league_avg_frames = []
for m in requested_metrics + [x for x in ["ppg","rpg","apg","tovpg","spg","bpg","mpg"] if x in df.columns]:
    wcol = pick_weight_for_metric(m)
    if wcol not in df.columns:
        continue
    tmp = df[["Season", m, wcol]].copy()
    tmp[m] = pd.to_numeric(tmp[m], errors="coerce")
    tmp[wcol] = pd.to_numeric(tmp[wcol], errors="coerce").fillna(0)
    tmp = tmp.dropna(subset=[m])
    if tmp.empty:
        continue

    # weighted mean by season
    agg = (
        tmp.groupby("Season")
           .apply(lambda g: np.average(g[m], weights=np.clip(g[wcol].to_numpy(), 0, None)))
           .rename(f"league_avg_{m}")
           .reset_index()
    )
    league_avg_frames.append(agg)

league_avgs = league_avg_frames[0]
for a in league_avg_frames[1:]:
    league_avgs = league_avgs.merge(a, on="Season", how="outer")

df = df.merge(league_avgs, on="Season", how="left")

# ----------------------------
# Build Signals LONG (ALL)
# ----------------------------
def thresholds_for_series(s: pd.Series):
    s = pd.to_numeric(s, errors="coerce")
    mx = s.dropna().max() if s.notna().any() else np.nan
    # handle 0–1 vs 0–100 percentiles
    if pd.notna(mx) and mx <= 1.00001:
        return 0.90, 0.15
    return 90.0, 15.0

id_cols = [c for c in ["Name","Season","TeamAbbreviation","Position","Minutes","GamesPlayed"] if c in df.columns]

rows = []
lenses = ["pos_all", "pos_season", "season"]

# We'll include both percentiles AND league average comparisons for the metric value.
for metric in requested_metrics + [x for x in ["ppg","rpg","apg","tovpg","spg","bpg","mpg"] if x in df.columns]:
    league_col = f"league_avg_{metric}" if f"league_avg_{metric}" in df.columns else None

    for lens in lenses:
        pct_col = f"{metric}_pctile_{lens}"
        if pct_col not in df.columns:
            continue

        hi, lo = thresholds_for_series(df[pct_col])

        tmp = df[id_cols].copy()
        tmp["metric"] = metric
        tmp["lens"] = lens
        tmp["value"] = pd.to_numeric(df[metric], errors="coerce")
        tmp["percentile"] = pd.to_numeric(df[pct_col], errors="coerce")

        tmp["elite90_flag"] = (tmp["percentile"] >= hi).astype("Int64")
        tmp["low15_flag"]   = (tmp["percentile"] <= lo).astype("Int64")

        if league_col:
            tmp["league_avg"] = pd.to_numeric(df[league_col], errors="coerce")
            tmp["delta_vs_league"] = tmp["value"] - tmp["league_avg"]
        else:
            tmp["league_avg"] = np.nan
            tmp["delta_vs_league"] = np.nan

        tmp["signal"] = np.select(
            [tmp["elite90_flag"] == 1, tmp["low15_flag"] == 1],
            ["ELITE (>=90th)", "CONCERN (<=15th)"],
            default="Neutral"
        )
        rows.append(tmp)

signals_long = pd.concat(rows, ignore_index=True)

# ----------------------------
# Save outputs
# ----------------------------
df.to_csv(OUT_WIDE, index=False)
signals_long.to_csv(OUT_LONG, index=False)

print("✅ Wrote WIDE:", OUT_WIDE)
print("✅ Wrote LONG:", OUT_LONG)
print("WIDE shape:", df.shape, "| LONG shape:", signals_long.shape)


import pandas as pd
import numpy as np
from pathlib import Path

IN_ALL   = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_ALL.csv")
IN_MIN50 = Path("/content/processed_positions/pbpstats_player_by_season_flags_WIDE_MIN50.csv")

OUT_ALL_LONG   = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL.csv")
OUT_MIN50_LONG = Path("/content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50.csv")

LENS_SUFFIXES = ["pos_all", "season", "pos_season"]

LENS_TO_PLAYBOOK = {
    "season": "Contract/Valuation",
    "pos_season": "Scouting",
    "pos_all": "Lineup Optimization",
}

# Optional: extend these as you expand your metric set
METRIC_TO_CATEGORY = {
    "usage": "Scoring / Role",
    "ts": "Scoring / Efficiency",
    "efg": "Scoring / Efficiency",
    "ast": "Assists / Creation",
    "tov": "Turnovers",
    "stl": "Defense / Activity",
    "blk": "Defense / Activity",
    "reb": "Rebounds",
    "oreb": "Second Chance",
    "dreb": "Rebounds",
    "rim_freq": "Shot Distribution",
    "three_freq": "Shot Distribution",
}

def pick_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def find_percentile_cols(df: pd.DataFrame) -> list[str]:
    out = []
    for c in df.columns:
        cl = c.lower()
        if "pctile" in cl and any(cl.endswith(sfx) for sfx in LENS_SUFFIXES):
            out.append(c)
    return out

def thresholds_for_col(series: pd.Series):
    """Handles both 0–1 and 0–100 percentile scales."""
    s = pd.to_numeric(series, errors="coerce")
    mx = s.dropna().max() if s.notna().any() else np.nan
    if pd.notna(mx) and mx <= 1.00001:
        return 0.90, 0.15
    return 90.0, 15.0

def parse_metric_and_lens(col: str):
    c = col.lower()
    lens = next((sfx for sfx in LENS_SUFFIXES if c.endswith(sfx)), None)
    metric = c.split("_pctile_")[0] if "_pctile_" in c else c.split("pctile")[0].rstrip("_")
    return metric, lens

def _lc_map(df: pd.DataFrame) -> dict[str, str]:
    """lowercase -> original column name"""
    return {c.lower(): c for c in df.columns}

def resolve_metric_value_col(df: pd.DataFrame, metric: str) -> str | None:
    """
    Try to find the "value" column corresponding to a metric name.
    Example: metric='ast' -> prefer ast_per_40, then ast_per_100, then ast_pg, then ast.
    """
    lower_map = _lc_map(df)

    # High-confidence candidate patterns
    candidates = [
        f"{metric}_per_40",
        f"{metric}_per40",
        f"{metric}_per_100",
        f"{metric}_per100",
        f"{metric}_per_game",
        f"{metric}_pg",
        metric,
    ]

    # Some common aliases / special cases (extend freely)
    alias_candidates = {
        "ts": ["ts_pct", "true_shooting", "true_shooting_pct", "ts"],
        "efg": ["efg_pct", "effective_fg_pct", "efg"],
        "usage": ["usage", "usg", "usg_pct", "usage_rate"],
        "tov": ["tov_per_40", "tov_per_100", "tov_pg", "tov", "turnovers"],
        "ast": ["ast_per_40", "ast_per_100", "ast_pg", "ast", "assists"],
        "reb": ["reb_per_40", "reb_per_100", "reb_pg", "reb", "rebounds"],
        "oreb": ["oreb_per_40", "oreb_pg", "oreb", "off_reb", "offreb"],
        "dreb": ["dreb_per_40", "dreb_pg", "dreb", "def_reb", "defreb"],
    }

    search_list = alias_candidates.get(metric, []) + candidates

    for cand in search_list:
        if cand.lower() in lower_map:
            return lower_map[cand.lower()]

    # Fallback: try "contains" match (careful but helpful)
    metric_l = metric.lower()
    contains = [c for c in df.columns if metric_l in c.lower() and "pctile" not in c.lower()]
    if len(contains) == 1:
        return contains[0]

    return None

def build_signals_long(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    pct_cols = find_percentile_cols(df)

    # Robust ID detection
    c_player = pick_col(df, ["Name", "player", "athlete_display_name"])
    c_season = pick_col(df, ["Season", "season"])
    c_team   = pick_col(df, ["TeamAbbreviation", "teamabbreviation", "team_abbreviation", "team"])
    c_pos    = pick_col(df, ["Position", "position", "athlete_position_abbreviation", "pos"])
    c_min    = pick_col(df, ["Minutes", "minutes", "minutes_total", "min"])
    c_gp     = pick_col(df, ["GamesPlayed", "gamesplayed", "gp", "games_played"])
    c_id     = pick_col(df, ["athlete_id", "player_id", "id"])

    base = pd.DataFrame(index=df.index)
    if c_id:     base["athlete_id"]   = df[c_id]
    if c_player: base["player"]       = df[c_player]
    if c_season: base["season"]       = df[c_season]
    if c_team:   base["team"]         = df[c_team]
    if c_pos:    base["position"]     = df[c_pos]
    if c_min:    base["minutes"]      = pd.to_numeric(df[c_min], errors="coerce")
    if c_gp:     base["games_played"] = pd.to_numeric(df[c_gp], errors="coerce")

    # Cache metric -> value column resolution
    metric_value_col_cache: dict[str, str | None] = {}

    rows = []
    for col in pct_cols:
        metric, lens = parse_metric_and_lens(col)
        hi, lo = thresholds_for_col(df[col])

        # resolve metric value column once per metric
        if metric not in metric_value_col_cache:
            metric_value_col_cache[metric] = resolve_metric_value_col(df, metric)

        value_col = metric_value_col_cache[metric]

        tmp = base.copy()
        tmp["metric"] = metric
        tmp["lens"] = lens
        tmp["percentile"] = pd.to_numeric(df[col], errors="coerce")

        # NEW: metric value alongside percentile
        tmp["metric_value"] = (
            pd.to_numeric(df[value_col], errors="coerce") if value_col else np.nan
        )
        tmp["metric_value_col"] = value_col  # helpful for debugging / trust

        tmp["elite90_flag"] = (tmp["percentile"] >= hi).astype("Int64")
        tmp["low15_flag"]   = (tmp["percentile"] <= lo).astype("Int64")

        tmp["playbook_lens"] = LENS_TO_PLAYBOOK.get(lens, "General")
        tmp["stat_category"] = METRIC_TO_CATEGORY.get(metric, "Uncategorized")

        tmp["signal"] = np.select(
            [tmp["elite90_flag"] == 1, tmp["low15_flag"] == 1],
            ["ELITE (>=90th)", "CONCERN (<=15th)"],
            default="Neutral"
        )

        rows.append(tmp)

    long_df = pd.concat(rows, ignore_index=True)

    # OPTIONAL: keep only signals (reduces noise)
    # long_df = long_df[long_df["signal"].isin(["ELITE (>=90th)", "CONCERN (<=15th)"])].copy()

    return long_df

def run(in_path: Path, out_path: Path):
    df = pd.read_csv(in_path)
    long_df = build_signals_long(df)
    long_df.to_csv(out_path, index=False)
    print("✅ Wrote LONG with metric_value:", out_path)
    print("LONG shape:", long_df.shape)

    # Quick sanity check: how many metrics failed to map a value col?
    unmapped = long_df["metric_value_col"].isna().mean()
    print(f"ℹ️ metric_value_value_col missing rate: {unmapped:.1%}")

run(IN_ALL, OUT_ALL_LONG)
run(IN_MIN50, OUT_MIN50_LONG)

✅ Metrics to percentile: ['rim_freq', 'three_freq', 'corner3_freq', 'arc3_freq']


  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: weighted_percentile_rank(g[mc], g[weight_col]))
  .apply(lambda g: np.average(g[m], weights=np.clip(g[wcol].to_numpy(), 0, None)))
  .apply(lambda g: np.average(g[m], weights=np.clip(g[wcol].to_numpy(), 0, None)))
  .apply(lambda g: np.average(

✅ Wrote WIDE: /content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_v2_ALL.csv
✅ Wrote LONG: /content/processed_positions/pbpstats_player_by_season_signals_LONG_v2_ALL.csv
WIDE shape: (495, 287) | LONG shape: (5940, 15)
✅ Wrote LONG with metric_value: /content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL.csv
LONG shape: (11880, 16)
ℹ️ metric_value_value_col missing rate: 0.0%
✅ Wrote LONG with metric_value: /content/processed_positions/pbpstats_player_by_season_signals_LONG_MIN50.csv
LONG shape: (10776, 16)
ℹ️ metric_value_value_col missing rate: 0.0%


In [30]:
from pathlib import Path
import pandas as pd
import numpy as np
import re

BASE = Path("/content/processed_positions")

# ✅ Point this at the WIDE dataset that already contains percentiles (pos/season)
IN_WIDE = BASE / "/content/processed_positions/pbpstats_totals_player_by_season_with_pos_percentiles_v2_ALL.csv"

# ✅ Output
OUT_LONG = BASE / "pbpstats_player_by_season_signals_LONG_ALL_with_metric_value.csv"

df = pd.read_csv(IN_WIDE)
df.columns = [c.strip() for c in df.columns]

def first_existing(cols, candidates):
    lower = {c.lower(): c for c in cols}
    for cand in candidates:
        if cand.lower() in lower:
            return lower[cand.lower()]
    return None

# --- core id columns (keep player name from getting dropped) ---
player_col  = first_existing(df.columns, ["player", "name"])
season_col  = first_existing(df.columns, ["season"])
team_col    = first_existing(df.columns, ["teamabbreviation", "team"])
pos_col     = first_existing(df.columns, ["position"])
min_col     = first_existing(df.columns, ["minutes_total", "minutes"])

id_vars = [c for c in [player_col, season_col, team_col, pos_col] if c is not None]
missing_ids = [x for x in ["player/name", "season"] if (x == "season" and season_col is None) or (x == "player/name" and player_col is None)]
if missing_ids:
    raise ValueError(f"Missing required id columns: {missing_ids}. Found columns sample: {df.columns[:40].tolist()}")

# --- detect percentile columns ---
pctile_cols = [c for c in df.columns if re.search(r"(pctile|percentile)", c, flags=re.I)]
if not pctile_cols:
    raise ValueError("No percentile columns found. Expected columns containing 'pctile' or 'percentile'.")

# --- parse pctile column -> (metric_stub, lens) ---
def parse_pctile_col(col: str):
    c = col.lower()
    lens = "unknown"
    stub = col
    for suffix, l in [
        ("_pctile_pos", "pos"),
        ("_pctile_position", "pos"),
        ("_pctile_season", "season"),
        ("_percentile_pos", "pos"),
        ("_percentile_season", "season"),
    ]:
        if c.endswith(suffix):
            lens = l
            stub = col[: -len(suffix)]
            return stub, lens
    # fallback: strip first occurrence of _pctile/_percentile and anything after
    stub = re.split(r"_(pctile|percentile)", col, flags=re.I)[0]
    return stub, lens

# --- map metric_stub -> metric_value_col (so LONG has the actual stat value next to the percentile) ---
SPECIAL = {
    "ast": "ast_per_40",
    "tov": "tov_per_40",
    "reb": "reb_per_40",
    "oreb": "oreb_per_40",
    "dreb": "dreb_per_40",
    "stl": "stl_per_40",
    "blk": "blk_per_40",
    "pts": "pts_per_40",
}

def resolve_value_col(metric_stub: str):
    # 1) exact match
    if metric_stub in df.columns:
        return metric_stub
    # 2) special mapping (if present)
    if metric_stub in SPECIAL and SPECIAL[metric_stub] in df.columns:
        return SPECIAL[metric_stub]
    # 3) try common suffixes
    suffixes = ["_per_40", "_per40", "_per_100", "_per100", "_rate", "_pct", "_pctg"]
    for suf in suffixes:
        cand = f"{metric_stub}{suf}"
        if cand in df.columns:
            return cand
    # 4) give up (we can still output percentile rows without metric_value)
    return None

map_rows = []
for pc in pctile_cols:
    stub, lens = parse_pctile_col(pc)
    value_col = resolve_value_col(stub)
    map_rows.append({
        "pctile_col": pc,
        "metric_stub": stub,
        "lens": lens,
        "metric_value_col": value_col
    })

map_df = pd.DataFrame(map_rows)

# --- 1) melt percentiles ---
pct_long = df[id_vars + pctile_cols].melt(
    id_vars=id_vars,
    var_name="pctile_col",
    value_name="metric_pctile"
)

pct_long = pct_long.merge(map_df, on="pctile_col", how="left")

# --- 2) melt metric values (only for resolvable metrics) ---
value_cols = sorted({c for c in map_df["metric_value_col"].dropna().unique().tolist() if c in df.columns})
if value_cols:
    val_long = df[id_vars + value_cols].melt(
        id_vars=id_vars,
        var_name="metric_value_col",
        value_name="metric_value"
    )
    long = pct_long.merge(val_long, on=id_vars + ["metric_value_col"], how="left")
else:
    long = pct_long.copy()
    long["metric_value"] = np.nan

# --- 3) league averages (by season) for metric_value ---
if value_cols:
    league_avg = (
        df.groupby(season_col)[value_cols]
          .mean(numeric_only=True)
          .reset_index()
          .melt(id_vars=[season_col], var_name="metric_value_col", value_name="league_avg_value")
    )
    long = long.merge(league_avg, on=[season_col, "metric_value_col"], how="left")
else:
    long["league_avg_value"] = np.nan

# --- 4) flags ---
long["flag_elite_90"] = long["metric_pctile"].astype(float) >= 0.90
long["flag_low_15"]   = long["metric_pctile"].astype(float) <= 0.15

# --- 5) tidy / ordering ---
# Keep a nice human-readable metric name too
long["metric"] = long["metric_stub"]

# Optional: put lens + metric up front
front = id_vars + ["lens", "metric", "metric_value", "league_avg_value", "metric_pctile", "flag_elite_90", "flag_low_15"]
rest = [c for c in long.columns if c not in front]
long = long[front + rest]

long.to_csv(OUT_LONG, index=False)
print("✅ Wrote:", OUT_LONG)
print("✅ Rows:", len(long))
print("✅ Example columns:", long.columns[:18].tolist())


✅ Wrote: /content/processed_positions/pbpstats_player_by_season_signals_LONG_ALL_with_metric_value.csv
✅ Rows: 5940
✅ Example columns: ['Name', 'Season', 'TeamAbbreviation', 'Position', 'lens', 'metric', 'metric_value', 'league_avg_value', 'metric_pctile', 'flag_elite_90', 'flag_low_15', 'pctile_col', 'metric_stub', 'metric_value_col']


In [31]:
# ============================================
# EXPORT PACK: 4 Canonical Outputs
#   1) WIDE_ALL
#   2) WIDE_MIN50
#   3) LONG_ALL
#   4) LONG_MIN50
# ============================================

from pathlib import Path
import pandas as pd
import numpy as np
import re

# ----------------------------
# Paths
# ----------------------------
BASE_DIR = Path("/content/processed_positions")
IN_WIDE_ALL = BASE_DIR / "pbpstats_totals_player_by_season_with_pos_percentiles_v2_ALL.csv"

OUT_DIR = BASE_DIR / "canonical"
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUT_WIDE_ALL  = OUT_DIR / "WIDE_ALL.csv"
OUT_WIDE_MIN50 = OUT_DIR / "WIDE_MIN50.csv"
OUT_LONG_ALL  = OUT_DIR / "LONG_ALL.csv"
OUT_LONG_MIN50 = OUT_DIR / "LONG_MIN50.csv"

# ----------------------------
# Helpers
# ----------------------------
def first_existing(df: pd.DataFrame, candidates: list[str]) -> str | None:
    lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in lower:
            return lower[cand.lower()]
    return None

def build_long(df_wide: pd.DataFrame) -> pd.DataFrame:
    """
    LONG format:
      id cols + metric + lens + percentile + metric_value (+ league_avg_value/delta if present)
    Percentile columns are detected via '*pctile*' naming.
    metric_value is pulled from the base metric column (same name as metric).
    League average is pulled if a 'league_avg_<metric>' column exists.
    """
    # --- identify ID columns (keep whatever exists)
    id_candidates = [
        "athlete_id", "player_id",
        "name", "player",
        "season",
        "teamabbreviation", "team_abbreviation", "team",
        "position", "athlete_position_abbreviation"
    ]
    id_cols = [c for c in id_candidates if c in df_wide.columns]
    if "name" not in id_cols and "Name" in df_wide.columns:
        id_cols.append("Name")
    if "season" not in id_cols and "Season" in df_wide.columns:
        id_cols.append("Season")

    # Guardrail
    if not id_cols:
        raise ValueError("Couldn't detect ID columns. Add at least ['name','season'] to id_candidates.")

    # --- percentile columns
    pctile_cols = [c for c in df_wide.columns if "pctile" in c.lower()]
    if not pctile_cols:
        raise ValueError("No percentile columns found (expected columns containing 'pctile').")

    # Melt percentiles
    long_p = df_wide[id_cols + pctile_cols].melt(
        id_vars=id_cols,
        var_name="pctile_col",
        value_name="percentile"
    )

    # lens + metric name parsing
    def parse_lens_and_metric(col: str) -> tuple[str, str]:
        s = col.lower()
        lens = "all"
        if s.endswith("_pctile_pos") or s.endswith("pctile_pos"):
            lens = "pos"
            metric = re.sub(r"_?pctile_pos$", "", s)
        elif s.endswith("_pctile_season") or s.endswith("pctile_season"):
            lens = "season"
            metric = re.sub(r"_?pctile_season$", "", s)
        else:
            metric = re.sub(r"_?pctile$", "", s)
        return lens, metric

    parsed = long_p["pctile_col"].apply(parse_lens_and_metric)
    long_p["lens"] = parsed.apply(lambda x: x[0])
    long_p["metric"] = parsed.apply(lambda x: x[1])

    # --- metric_value: melt base metric columns that exist
    base_metrics = sorted(set(long_p["metric"].unique()))
    value_cols = [m for m in base_metrics if m in df_wide.columns]

    if value_cols:
        long_v = df_wide[id_cols + value_cols].melt(
            id_vars=id_cols,
            var_name="metric",
            value_name="metric_value"
        )
        long_p = long_p.merge(long_v, on=id_cols + ["metric"], how="left")
    else:
        long_p["metric_value"] = np.nan

    # --- league averages (optional): expect columns like league_avg_<metric>
    league_cols = [c for c in df_wide.columns if c.lower().startswith("league_avg_")]
    if league_cols:
        # Normalize to long with metric extracted after prefix
        tmp = df_wide[id_cols + league_cols].copy()
        ren = {c: c.lower() for c in league_cols}
        tmp = tmp.rename(columns=ren)
        league_cols_lower = [c.lower() for c in league_cols]

        long_la = tmp[id_cols + league_cols_lower].melt(
            id_vars=id_cols,
            var_name="league_avg_col",
            value_name="league_avg_value"
        )
        long_la["metric"] = long_la["league_avg_col"].str.replace("league_avg_", "", regex=False)

        long_p = long_p.merge(long_la[id_cols + ["metric", "league_avg_value"]],
                              on=id_cols + ["metric"], how="left")
        long_p["delta_vs_league"] = long_p["metric_value"] - long_p["league_avg_value"]
    else:
        long_p["league_avg_value"] = np.nan
        long_p["delta_vs_league"] = np.nan

    # Clean up
    long_p = long_p.drop(columns=["pctile_col"])
    # Helpful ordering
    front = id_cols + ["metric", "lens", "metric_value", "percentile", "league_avg_value", "delta_vs_league"]
    rest = [c for c in long_p.columns if c not in front]
    long_p = long_p[front + rest]

    return long_p

# ----------------------------
# Load canonical WIDE_ALL
# ----------------------------
if not IN_WIDE_ALL.exists():
    raise FileNotFoundError(f"Missing input: {IN_WIDE_ALL}")

wide_all = pd.read_csv(IN_WIDE_ALL)
wide_all.columns = [c.strip() for c in wide_all.columns]

# ----------------------------
# Build WIDE_MIN50
# ----------------------------
min_col = first_existing(wide_all, ["minutes_total", "minutes", "Minutes", "min", "mp"])
if not min_col:
    raise ValueError(
        "Couldn't find a minutes column for MIN50 filter. "
        "Add your minutes column name to the candidates list."
    )

wide_min50 = wide_all.copy()
wide_min50[min_col] = pd.to_numeric(wide_min50[min_col], errors="coerce")
wide_min50 = wide_min50[wide_min50[min_col] >= 50].reset_index(drop=True)

# ----------------------------
# Build LONG datasets
# ----------------------------
# (These will include metric_value, and league avg/delta if those columns exist in WIDE)
long_all = build_long(wide_all)
long_min50 = build_long(wide_min50)

# ----------------------------
# Export exactly 4 files
# ----------------------------
wide_all.to_csv(OUT_WIDE_ALL, index=False)
wide_min50.to_csv(OUT_WIDE_MIN50, index=False)
long_all.to_csv(OUT_LONG_ALL, index=False)
long_min50.to_csv(OUT_LONG_MIN50, index=False)

print("✅ Export pack complete:")
print(" -", OUT_WIDE_ALL)
print(" -", OUT_WIDE_MIN50)
print(" -", OUT_LONG_ALL)
print(" -", OUT_LONG_MIN50)

print("\nQuick sanity:")
print("WIDE_ALL rows:", len(wide_all), "| WIDE_MIN50 rows:", len(wide_min50))
print("LONG_ALL rows:", len(long_all), "| LONG_MIN50 rows:", len(long_min50))
print("Minutes col used for MIN50:", min_col)


✅ Export pack complete:
 - /content/processed_positions/canonical/WIDE_ALL.csv
 - /content/processed_positions/canonical/WIDE_MIN50.csv
 - /content/processed_positions/canonical/LONG_ALL.csv
 - /content/processed_positions/canonical/LONG_MIN50.csv

Quick sanity:
WIDE_ALL rows: 495 | WIDE_MIN50 rows: 449
LONG_ALL rows: 5940 | LONG_MIN50 rows: 5388
Minutes col used for MIN50: Minutes
