In [5]:
import logging
import os
import time
from datetime import timedelta
from typing import Iterable, List, Optional

import pandas as pd
from nba_api.stats.endpoints import playergamelog
from nba_api.stats.static import players

# Optionally populate os.environ from a local .env file. python-dotenv is an optional
# dependency, so only its absence is tolerated; any other error raised while loading
# .env surfaces instead of being silently swallowed.
try:
    from dotenv import load_dotenv
except ImportError:
    pass
else:
    load_dotenv()

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("nba_pipeline")

# Runtime configuration, read from the environment so the notebook can run unattended.
SEASON = os.getenv("NBA_SEASON", "2024-25")
# Accept common truthy spellings ("true", "1", "yes"), ignoring case and surrounding spaces.
RUN_FULL_EXTRACT = os.getenv("NBA_RUN_FULL_EXTRACT", "false").strip().lower() in {"true", "1", "yes"}
# Number of players fetched when RUN_FULL_EXTRACT is off (sample mode).
MAX_PLAYERS = int(os.getenv("NBA_MAX_PLAYERS", "25"))


  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


# NBA Analytics Pipeline

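An end-to-end data pipeline that fetches NBA player game logs, generates daily leaderboards and trend reports, lands the raw data in Google Cloud Storage, loads it into BigQuery for SQL analytics, and produces an AI-written summary article.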

**Pipeline Flow:**
```
NBA API → Pandas (extract/clean) → GCS (land) → BigQuery (warehouse + SQL analytics) → Claude AI Summary
```

**Technologies:** Python, NBA API, Google Cloud Storage, BigQuery, Claude API

## 1. Data Collection

Fetch player game logs from the NBA API. Failed requests are retried with increasing backoff, and a fixed delay between players helps avoid API throttling.

In [6]:
# Pull the static list of active players shipped with nba_api.
active_players = players.get_active_players()
players_df = pd.DataFrame(active_players)

# Sample mode keeps API traffic small: only the first MAX_PLAYERS entries are fetched.
if RUN_FULL_EXTRACT:
    selected_players = active_players
else:
    selected_players = active_players[:MAX_PLAYERS]

logger.info("Found %s active players", len(players_df))
logger.info("Processing %s players for season %s", len(selected_players), SEASON)

players_df.head(10)


Found 530 active players


Unnamed: 0,id,full_name,first_name,last_name,is_active
0,1630173,Precious Achiuwa,Precious,Achiuwa,True
1,203500,Steven Adams,Steven,Adams,True
2,1628389,Bam Adebayo,Bam,Adebayo,True
3,1630534,Ochai Agbaji,Ochai,Agbaji,True
4,1630583,Santi Aldama,Santi,Aldama,True
5,1641725,Trey Alexander,Trey,Alexander,True
6,1629638,Nickeil Alexander-Walker,Nickeil,Alexander-Walker,True
7,1628960,Grayson Allen,Grayson,Allen,True
8,1628386,Jarrett Allen,Jarrett,Allen,True
9,1630631,Jose Alvarado,Jose,Alvarado,True


In [7]:
# Function to get game log for a player

def get_player_game_log(player_id: int, season: str = SEASON, retries: int = 3, delay: float = 0.8) -> pd.DataFrame:
    """Fetch and normalize one player's game log for a season, retrying on failure.

    Args:
        player_id: NBA player id (as returned by ``players.get_active_players``).
        season: Season string passed to ``PlayerGameLog`` (e.g. ``"2024-25"``).
        retries: Maximum number of attempts. With ``retries <= 0`` no request is made.
        delay: Base backoff in seconds; after failed attempt ``k`` the call waits
            ``delay * k`` seconds before retrying.

    Returns:
        A DataFrame with GAME_DATE (datetime), MATCHUP, WL, MIN (numeric), the counting
        stats PTS/REB/AST/STL/BLK/TOV as int64, SEASON, and INGESTED_AT_UTC (tz-aware
        UTC timestamp). Rows whose GAME_DATE cannot be parsed are dropped. An empty
        DataFrame is returned when every attempt fails (the error is logged).
    """
    cols = ["GAME_DATE", "MATCHUP", "WL", "MIN", "PTS", "REB", "AST", "STL", "BLK", "TOV"]
    count_cols = ["PTS", "REB", "AST", "STL", "BLK", "TOV"]

    for attempt in range(1, retries + 1):
        try:
            gamelog = playergamelog.PlayerGameLog(player_id=player_id, season=season)
            df = gamelog.get_data_frames()[0]

            missing_cols = [c for c in cols if c not in df.columns]
            if missing_cols:
                raise ValueError(f"Missing expected columns: {missing_cols}")

            out = df[cols].copy()
            out["GAME_DATE"] = pd.to_datetime(out["GAME_DATE"], errors="coerce")
            out = out.dropna(subset=["GAME_DATE"])

            out["MIN"] = pd.to_numeric(out["MIN"], errors="coerce").fillna(0)
            for col in count_cols:
                # Cast back to int64: a single unparsable value makes the column float64,
                # the CSV then contains values like "14.0", and the BigQuery load into
                # INTEGER columns fails.
                out[col] = pd.to_numeric(out[col], errors="coerce").fillna(0).astype("int64")

            out["SEASON"] = season
            # Timestamp.utcnow() is deprecated (pandas 2.2); now(tz="UTC") is the replacement.
            out["INGESTED_AT_UTC"] = pd.Timestamp.now(tz="UTC")
            return out
        except Exception:
            # Deliberate best-effort: one failing player must not abort the whole extract.
            if attempt == retries:
                logger.exception("Failed player_id=%s after %s attempts", player_id, retries)
                return pd.DataFrame()
            sleep_seconds = delay * attempt
            logger.warning("Retrying player_id=%s attempt=%s/%s in %.1fs", player_id, attempt, retries, sleep_seconds)
            time.sleep(sleep_seconds)

    # Only reached when retries <= 0, i.e. no attempt was made.
    return pd.DataFrame()


Player: LeBron James, ID: 2544


Unnamed: 0,GAME_DATE,MATCHUP,WL,MIN,PTS,REB,AST,STL,BLK,TOV
0,"Apr 11, 2025",LAL vs. HOU,W,22,14,4,8,1,0,1
1,"Apr 09, 2025",LAL @ DAL,W,36,27,7,3,1,0,2
2,"Apr 08, 2025",LAL @ OKC,L,35,28,7,3,1,0,6
3,"Apr 06, 2025",LAL @ OKC,W,34,19,3,7,1,0,2
4,"Apr 04, 2025",LAL vs. NOP,W,33,27,0,8,2,0,1
5,"Apr 03, 2025",LAL vs. GSW,L,40,33,5,9,1,1,4
6,"Mar 31, 2025",LAL vs. HOU,W,38,16,8,4,2,2,3
7,"Mar 29, 2025",LAL @ MEM,W,37,25,6,8,3,1,3
8,"Mar 27, 2025",LAL @ CHI,L,39,17,5,12,2,0,4
9,"Mar 26, 2025",LAL @ IND,W,38,13,13,7,1,0,1


In [8]:
# Get game logs for multiple players (with rate limiting)
def get_all_player_game_logs(player_list: Iterable[dict], season: str = SEASON, delay: float = 0.6) -> pd.DataFrame:
    """Fetch, combine, and de-duplicate game logs for several players.

    Args:
        player_list: Player dicts with at least ``"id"`` and ``"full_name"`` keys.
            The iterable is materialized once, so generators are fine.
        season: Season string forwarded to ``get_player_game_log``.
        delay: Seconds to pause between consecutive player requests (rate limiting).

    Returns:
        All fetched rows with PLAYER_ID / PLAYER_NAME appended, de-duplicated on
        (PLAYER_ID, GAME_DATE, MATCHUP), sorted by GAME_DATE descending then PLAYER_ID
        ascending, with a fresh RangeIndex. Players whose fetch failed are skipped.

    Raises:
        RuntimeError: if no player returned any rows.
        ValueError: if the combined frame lacks a required column.
    """
    all_logs = []
    player_list = list(player_list)
    total = len(player_list)

    for i, player in enumerate(player_list, start=1):
        player_id = player["id"]
        player_name = player["full_name"]
        logger.info("Fetching %s/%s: %s", i, total, player_name)

        games = get_player_game_log(player_id, season=season)
        if not games.empty:
            games["PLAYER_ID"] = player_id
            games["PLAYER_NAME"] = player_name
            all_logs.append(games)

        # Rate-limit only *between* requests; sleeping after the last one just delays the run.
        if i < total:
            time.sleep(delay)

    if not all_logs:
        raise RuntimeError("No game logs were fetched. Check API availability and season value.")

    all_game_logs = pd.concat(all_logs, ignore_index=True)
    all_game_logs = all_game_logs.drop_duplicates(subset=["PLAYER_ID", "GAME_DATE", "MATCHUP"]).copy()
    all_game_logs = all_game_logs.sort_values(["GAME_DATE", "PLAYER_ID"], ascending=[False, True])

    required = {"GAME_DATE", "PLAYER_ID", "PLAYER_NAME", "PTS", "REB", "AST"}
    missing = required - set(all_game_logs.columns)
    if missing:
        raise ValueError(f"Missing required fields in merged logs: {sorted(missing)}")

    logger.info("Fetched %s rows across %s players", len(all_game_logs), all_game_logs["PLAYER_ID"].nunique())
    return all_game_logs.reset_index(drop=True)


# Run the extract for the players chosen in the selection cell (the first MAX_PLAYERS
# unless RUN_FULL_EXTRACT is enabled) and preview the combined game logs.
all_game_logs = get_all_player_game_logs(selected_players)
all_game_logs.head(10)


Fetching 1/5: Precious Achiuwa
Fetching 2/5: Steven Adams
Fetching 3/5: Bam Adebayo
Fetching 4/5: Ochai Agbaji
Fetching 5/5: Santi Aldama


Unnamed: 0,GAME_DATE,MATCHUP,WL,MIN,PTS,REB,AST,STL,BLK,TOV,PLAYER_ID,PLAYER_NAME
0,"Apr 13, 2025",NYK @ BKN,W,33,18,9,2,2,0,2,1630173,Precious Achiuwa
1,"Apr 11, 2025",NYK vs. CLE,L,15,0,6,0,2,1,1,1630173,Precious Achiuwa
2,"Apr 10, 2025",NYK @ DET,L,40,18,10,3,2,3,0,1630173,Precious Achiuwa
3,"Apr 05, 2025",NYK @ ATL,W,25,6,3,0,0,2,1,1630173,Precious Achiuwa
4,"Apr 02, 2025",NYK @ CLE,L,21,13,6,1,1,2,0,1630173,Precious Achiuwa
...,...,...,...,...,...,...,...,...,...,...,...,...
317,"Oct 30, 2024",MEM vs. BKN,L,22,2,3,0,2,0,1,1630583,Santi Aldama
318,"Oct 28, 2024",MEM vs. CHI,L,35,11,13,7,0,1,1,1630583,Santi Aldama
319,"Oct 26, 2024",MEM vs. ORL,W,28,22,7,5,0,1,2,1630583,Santi Aldama
320,"Oct 25, 2024",MEM @ HOU,L,22,2,5,3,0,2,2,1630583,Santi Aldama


## 2. Google Cloud Setup

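Read the Google Cloud configuration (project, bucket, BigQuery dataset and location) from environment variables, and make sure the Cloud Storage landing bucket exists.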

In [None]:
from google.cloud import bigquery, storage

# Mandatory settings: the GCP project and the landing bucket.
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_NAME = os.getenv("GCS_BUCKET_NAME")
# Optional settings with defaults.
BQ_DATASET = os.getenv("BQ_DATASET", "nba_data")
BQ_LOCATION = os.getenv("BQ_LOCATION", "US")

# Fail fast, naming every missing variable at once, before any cloud client is built.
missing_cfg = [
    env_name
    for env_name, env_value in (("GCP_PROJECT_ID", PROJECT_ID), ("GCS_BUCKET_NAME", BUCKET_NAME))
    if not env_value
]
if missing_cfg:
    raise EnvironmentError("Missing required environment variables: " + ", ".join(missing_cfg))

logger.info("Configured PROJECT_ID=%s BUCKET_NAME=%s DATASET=%s", PROJECT_ID, BUCKET_NAME, BQ_DATASET)


In [12]:
# Make sure the landing bucket exists; safe to re-run because an existing bucket is reused.
client = storage.Client(project=PROJECT_ID)
bucket = client.bucket(BUCKET_NAME)

if not bucket.exists(client):
    bucket = client.create_bucket(BUCKET_NAME, location=BQ_LOCATION.lower())
    logger.info("Created bucket %s", BUCKET_NAME)
else:
    logger.info("Bucket %s already exists", BUCKET_NAME)




Bucket nba-data-485505 already exists


In [None]:
# Upload DataFrame to GCS
def upload_df_to_gcs(df: pd.DataFrame, project_id: str, bucket_name: str, destination_blob_name: str) -> str:
    """Write ``df`` as a header-bearing CSV (no index) to a GCS object.

    Args:
        df: Data to upload; must contain at least one row.
        project_id: GCP project used to build the storage client.
        bucket_name: Target bucket.
        destination_blob_name: Object path inside the bucket.

    Returns:
        The ``gs://bucket/path`` URI of the uploaded object.

    Raises:
        ValueError: if ``df`` is empty (checked before any network call).
    """
    if df.empty:
        raise ValueError(f"Refusing to upload empty DataFrame to {destination_blob_name}")

    uri = f"gs://{bucket_name}/{destination_blob_name}"
    target_blob = storage.Client(project=project_id).bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_string(df.to_csv(index=False), content_type="text/csv")
    logger.info("Uploaded %s rows to %s", len(df), uri)
    return uri


## 3. Analytics - Daily Leaderboards

Generate daily leaderboards for Points, Rebounds, and Assists. Also calculate season averages (PPG, RPG, APG).

In [None]:
# Daily Leaderboards: Points, Rebounds, Assists

def get_daily_leaderboard(df: pd.DataFrame, stat_column: str) -> pd.DataFrame:
    """Return the single top performer in ``stat_column`` for every game date.

    GAME_DATE values that cannot be parsed are dropped. Non-numeric stat values count as 0.
    Ties on the stat are broken alphabetically by PLAYER_NAME, so the result is
    deterministic. The input frame is not modified.

    Returns:
        Columns ``Date``, ``Player``, ``<stat_column>``, newest date first, with a
        fresh RangeIndex.

    Raises:
        ValueError: if GAME_DATE, PLAYER_NAME or ``stat_column`` is missing.
    """
    absent = {"GAME_DATE", "PLAYER_NAME", stat_column}.difference(df.columns)
    if absent:
        raise ValueError(f"Missing columns for leaderboard: {sorted(absent)}")

    frame = df.copy()
    frame["GAME_DATE"] = pd.to_datetime(frame["GAME_DATE"], errors="coerce")
    frame = frame.dropna(subset=["GAME_DATE"])
    frame[stat_column] = pd.to_numeric(frame[stat_column], errors="coerce").fillna(0)

    # Highest stat first within each date, then name A->Z; the first row per date wins.
    ordered = frame.sort_values(
        by=["GAME_DATE", stat_column, "PLAYER_NAME"],
        ascending=[False, False, True],
    )
    top_per_day = ordered.drop_duplicates(subset="GAME_DATE", keep="first")

    leaders = top_per_day.loc[:, ["GAME_DATE", "PLAYER_NAME", stat_column]]
    leaders = leaders.rename(columns={"GAME_DATE": "Date", "PLAYER_NAME": "Player"})
    return leaders.sort_values("Date", ascending=False).reset_index(drop=True)


# Top single-game performer per date, one table per counting stat.
pts_leaders = get_daily_leaderboard(all_game_logs, "PTS")
reb_leaders = get_daily_leaderboard(all_game_logs, "REB")
ast_leaders = get_daily_leaderboard(all_game_logs, "AST")

for heading, board in (
    ("=== POINTS LEADERS BY DAY ===", pts_leaders),
    ("\n=== REBOUNDS LEADERS BY DAY ===", reb_leaders),
):
    print(heading)
    display(board.head(10))

# Heading only: the assists table itself is displayed at the top of the next cell.
print("\n=== ASSISTS LEADERS BY DAY ===")


In [None]:
display(ast_leaders.head(10))

# Combined view: one row per date with the PTS, REB and AST leaders side by side.
# Player columns are renamed explicitly before the inner joins on Date, rather than
# relabelling merge output by position.
combined = (
    pts_leaders.rename(columns={"Player": "PTS Leader"})
    .merge(reb_leaders.rename(columns={"Player": "REB Leader"}), on="Date")
    .merge(ast_leaders.rename(columns={"Player": "AST Leader"}), on="Date")
)
combined = (
    combined.assign(Date=lambda t: pd.to_datetime(t["Date"]).dt.date)
    .sort_values("Date", ascending=False)
    .reset_index(drop=True)
)

print("\n=== COMBINED DAILY LEADERBOARD ===")
combined.head(15)


In [None]:
# Season totals leaderboard: per-player sums, games played (GP) and per-game averages.
season_totals = (
    all_game_logs.groupby(["PLAYER_ID", "PLAYER_NAME"], as_index=False)
    .agg(
        PTS=("PTS", "sum"),
        REB=("REB", "sum"),
        AST=("AST", "sum"),
        GP=("GAME_DATE", "count"),
    )
    .assign(
        PPG=lambda t: (t["PTS"] / t["GP"]).round(1),
        RPG=lambda t: (t["REB"] / t["GP"]).round(1),
        APG=lambda t: (t["AST"] / t["GP"]).round(1),
    )
)

# Leader lines require at least 5 games, unless nobody qualifies yet.
eligible = season_totals.loc[season_totals["GP"].ge(5)].copy()
if eligible.empty:
    eligible = season_totals.copy()

print("=== SEASON LEADERS (Per Game Averages, GP>=5 when available) ===")
for lead_in, metric in (("\n", "PPG"), ("", "RPG"), ("", "APG")):
    leader_name = eligible.loc[eligible[metric].idxmax(), "PLAYER_NAME"]
    print(f"{lead_in}{metric} Leader: {leader_name} - {eligible[metric].max()}")

print("\n=== FULL SEASON STATS ===")
season_totals.sort_values(["PPG", "RPG", "APG"], ascending=False).head(20)


## 4. Upload to Cloud Storage

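Export the raw game logs to Google Cloud Storage as a CSV file under `nba_data/<season>/<run date>/`. Leaderboards are not uploaded here; they are recomputed from this data in BigQuery.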

In [None]:
# Upload raw game logs to GCS for BigQuery.
# The object path is keyed by season and UTC run date, so re-running on the same day
# writes to the same object instead of creating another file.
# Timestamp.utcnow() is deprecated (pandas 2.2); now(tz="UTC") is the replacement.
run_stamp = pd.Timestamp.now(tz="UTC").strftime("%Y%m%d")
game_logs_blob = f"nba_data/{SEASON}/{run_stamp}/game_logs.csv"
game_logs_uri = upload_df_to_gcs(all_game_logs, PROJECT_ID, BUCKET_NAME, game_logs_blob)

print("\nRaw game logs uploaded to GCS!")
print(game_logs_uri)


## 5. BigQuery Integration

Load raw game logs into BigQuery with **date partitioning** and **player clustering** for cost-optimized queries.
Then perform analytics entirely in SQL: window functions, CTEs, conditional aggregation, and reusable views.

In [None]:
# BigQuery client plus an idempotent "create dataset if missing" in the configured location.
bq_client = bigquery.Client(project=PROJECT_ID)

dataset_id = f"{PROJECT_ID}.{BQ_DATASET}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = BQ_LOCATION
bq_client.create_dataset(dataset, exists_ok=True)

# Fully qualified table ids. The staging table is reloaded with WRITE_TRUNCATE on
# each run; the raw table receives rows via MERGE. Analytics queries read GAME_LOGS_TABLE.
RAW_STAGING_TABLE = f"{dataset_id}.stg_game_logs"
RAW_GAME_LOGS_TABLE = f"{dataset_id}.raw_game_logs"
GAME_LOGS_TABLE = RAW_GAME_LOGS_TABLE

logger.info("Dataset ready: %s", dataset_id)
logger.info("Staging table: %s", RAW_STAGING_TABLE)
logger.info("Raw table: %s", RAW_GAME_LOGS_TABLE)


In [None]:
# Load CSV from GCS to BigQuery with optional partitioning and clustering
def load_gcs_to_bigquery(
    gcs_uri: str,
    table_id: str,
    schema: List[bigquery.SchemaField],
    partition_field: Optional[str] = None,
    clustering_fields: Optional[List[str]] = None,
    write_disposition: str = bigquery.WriteDisposition.WRITE_APPEND,
) -> None:
    """Load a header-bearing CSV from GCS into a BigQuery table and wait for completion.

    Args:
        gcs_uri: ``gs://bucket/path`` of the CSV. Its first row is skipped as a header.
            BigQuery matches CSV columns to ``schema`` by position, so the file's column
            order must match ``schema``.
        table_id: Fully qualified ``project.dataset.table`` destination.
        schema: Explicit schema for the load.
        partition_field: Optional column to use for daily time partitioning.
        clustering_fields: Optional clustering columns.
        write_disposition: e.g. WRITE_APPEND (default) or WRITE_TRUNCATE.

    Raises:
        Whatever ``LoadJob.result()`` raises if the load job fails.
    """
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        schema=schema,
        write_disposition=write_disposition,
    )

    if partition_field:
        job_config.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field=partition_field,
        )
    if clustering_fields:
        job_config.clustering_fields = clustering_fields

    load_job = bq_client.load_table_from_uri(gcs_uri, table_id, job_config=job_config)
    load_job.result()

    # output_rows is what this job wrote; table.num_rows is the whole table, which
    # differs from the loaded count whenever WRITE_APPEND adds to existing data.
    table = bq_client.get_table(table_id)
    logger.info(
        "Loaded %s rows to %s (table total=%s, write=%s, partitioned=%s, clustered=%s)",
        load_job.output_rows,
        table_id,
        table.num_rows,
        write_disposition,
        partition_field or "none",
        clustering_fields or "none",
    )


In [None]:
# Load the incoming CSV into the staging table.
# Field order matters for CSV loads (columns are matched by position); it mirrors the
# column order produced by the extract: game-log columns, SEASON, INGESTED_AT_UTC,
# then PLAYER_ID and PLAYER_NAME.
game_logs_schema = [
    bigquery.SchemaField(field_name, field_type)
    for field_name, field_type in (
        ("GAME_DATE", "DATE"),
        ("MATCHUP", "STRING"),
        ("WL", "STRING"),
        ("MIN", "FLOAT"),
        ("PTS", "INTEGER"),
        ("REB", "INTEGER"),
        ("AST", "INTEGER"),
        ("STL", "INTEGER"),
        ("BLK", "INTEGER"),
        ("TOV", "INTEGER"),
        ("SEASON", "STRING"),
        ("INGESTED_AT_UTC", "TIMESTAMP"),
        ("PLAYER_ID", "INTEGER"),
        ("PLAYER_NAME", "STRING"),
    )
]

# WRITE_TRUNCATE: staging only ever holds the current file.
load_gcs_to_bigquery(
    gcs_uri=game_logs_uri,
    table_id=RAW_STAGING_TABLE,
    schema=game_logs_schema,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

print(f"\nLoaded incoming file to staging table: {RAW_STAGING_TABLE}")


In [None]:
# Data quality checks + MERGE from staging into partitioned raw table

# DQ gate on staging: it must be non-empty, contain no NULL business keys
# (player_id, game_date, matchup), and no duplicated business keys. Any failure
# raises before the raw table is touched.
dq_query = f"""
WITH base AS (
  SELECT *
  FROM `{RAW_STAGING_TABLE}`
),
dups AS (
  SELECT COUNT(*) AS duplicate_keys
  FROM (
    SELECT player_id, game_date, matchup, COUNT(*) AS cnt
    FROM base
    GROUP BY player_id, game_date, matchup
    HAVING COUNT(*) > 1
  )
)
SELECT
  (SELECT COUNT(*) FROM base) AS total_rows,
  (SELECT COUNT(*) FROM base WHERE player_id IS NULL OR game_date IS NULL OR matchup IS NULL) AS null_key_rows,
  (SELECT duplicate_keys FROM dups) AS duplicate_key_rows
"""

dq = bq_client.query(dq_query).to_dataframe().iloc[0].to_dict()
print("=== DATA QUALITY CHECKS (staging) ===")
print(dq)

if dq["total_rows"] == 0:
    raise ValueError("DQ failed: staging table has zero rows")
if dq["null_key_rows"] > 0:
    raise ValueError(f"DQ failed: found {dq['null_key_rows']} rows with null business keys")
if dq["duplicate_key_rows"] > 0:
    raise ValueError(f"DQ failed: found {dq['duplicate_key_rows']} duplicate business keys")

# Target table, partitioned by game_date and clustered by player. Created only if absent.
create_raw_table_ddl = f"""
CREATE TABLE IF NOT EXISTS `{RAW_GAME_LOGS_TABLE}` (
  game_date DATE,
  matchup STRING,
  wl STRING,
  min FLOAT64,
  pts INT64,
  reb INT64,
  ast INT64,
  stl INT64,
  blk INT64,
  tov INT64,
  season STRING,
  ingested_at_utc TIMESTAMP,
  player_id INT64,
  player_name STRING
)
PARTITION BY game_date
CLUSTER BY player_id, player_name
"""

# Insert-only merge keyed on (player_id, game_date, matchup): keys already present are
# left untouched, so re-loading the same file inserts nothing (and later stat
# corrections for an existing game are not applied).
merge_sql = f"""
MERGE `{RAW_GAME_LOGS_TABLE}` T
USING `{RAW_STAGING_TABLE}` S
ON T.player_id = S.player_id
AND T.game_date = S.game_date
AND T.matchup = S.matchup
WHEN NOT MATCHED THEN
  INSERT (game_date, matchup, wl, min, pts, reb, ast, stl, blk, tov, season, ingested_at_utc, player_id, player_name)
  VALUES (S.game_date, S.matchup, S.wl, S.min, S.pts, S.reb, S.ast, S.stl, S.blk, S.tov, S.season, S.ingested_at_utc, S.player_id, S.player_name)
"""

bq_client.query(create_raw_table_ddl).result()
pre_count = bq_client.query(f"SELECT COUNT(*) AS c FROM `{RAW_GAME_LOGS_TABLE}`").to_dataframe().iloc[0]["c"]
merge_job = bq_client.query(merge_sql)
merge_job.result()
post_count = bq_client.query(f"SELECT COUNT(*) AS c FROM `{RAW_GAME_LOGS_TABLE}`").to_dataframe().iloc[0]["c"]

# The MERGE job reports its own affected-row count, which is exact even if another
# writer touched the table between the two COUNT(*) queries; the diff is a fallback.
rows_inserted = merge_job.num_dml_affected_rows
if rows_inserted is None:
    rows_inserted = post_count - pre_count

print("\nMERGE completed")
print(f"Rows before merge: {pre_count}")
print(f"Rows after merge:  {post_count}")
print(f"Rows inserted:     {rows_inserted}")
print("Partitioned by: game_date | Clustered by: player_id, player_name")


In [None]:
# Daily leaderboard using ROW_NUMBER() window function.
# Every row is ranked three times within its game_date (by pts, reb and ast, all DESC,
# ties broken alphabetically by player_name); the rank-1 rows are self-joined into one
# row per date. Reads the whole raw table (no season filter) and keeps the 15 most
# recent dates.
daily_leaders_query = f"""
WITH ranked AS (
  SELECT
    game_date,
    player_name,
    pts, reb, ast,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY pts DESC, player_name
    ) AS pts_rank,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY reb DESC, player_name
    ) AS reb_rank,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY ast DESC, player_name
    ) AS ast_rank
  FROM `{GAME_LOGS_TABLE}`
)
SELECT
  p.game_date,
  p.player_name AS pts_leader, p.pts,
  r.player_name AS reb_leader, r.reb,
  a.player_name AS ast_leader, a.ast
FROM ranked p
JOIN ranked r ON p.game_date = r.game_date AND r.reb_rank = 1
JOIN ranked a ON p.game_date = a.game_date AND a.ast_rank = 1
WHERE p.pts_rank = 1
ORDER BY p.game_date DESC
LIMIT 15
"""

# `results` is also passed as daily_leaders to the AI article cell.
results = bq_client.query(daily_leaders_query).to_dataframe()
print("=== Daily Leaders (BigQuery window function: ROW_NUMBER) ===")
results

In [None]:
# Season averages using GROUP BY with HAVING filter.
# The raw table keeps rows from every run (MERGE only inserts) and NBA_SEASON is
# configurable, so filter to the configured season; otherwise averages blend seasons.
# Group by player_id too, so two players sharing a display name are never merged
# (matches the pandas season_totals grouping on PLAYER_ID, PLAYER_NAME).
season_avg_query = f"""
SELECT
  player_name,
  COUNT(*) AS gp,
  SUM(pts) AS total_pts,
  SUM(reb) AS total_reb,
  SUM(ast) AS total_ast,
  ROUND(AVG(pts), 1) AS ppg,
  ROUND(AVG(reb), 1) AS rpg,
  ROUND(AVG(ast), 1) AS apg,
  ROUND(AVG(stl), 1) AS spg,
  ROUND(AVG(blk), 1) AS bpg
FROM `{GAME_LOGS_TABLE}`
WHERE season = @season
GROUP BY player_id, player_name
HAVING COUNT(*) >= 5
ORDER BY ppg DESC
"""

# Season is passed as a query parameter rather than interpolated into the SQL text.
season_job_config = bigquery.QueryJobConfig(
    query_parameters=[bigquery.ScalarQueryParameter("season", "STRING", SEASON)]
)
season_avg_df = bq_client.query(season_avg_query, job_config=season_job_config).to_dataframe()
print("=== Season Averages (BigQuery GROUP BY + HAVING) ===")
season_avg_df

In [None]:
# Create (or replace) a reusable view with the daily leaderboard, then sample it.
# Same ranking as the ad-hoc daily leaders query, but without ORDER BY / LIMIT so
# consumers can filter and sort the view themselves.
view_id = f"{PROJECT_ID}.{BQ_DATASET}.v_daily_leaderboard"

view_ddl = f"""
CREATE OR REPLACE VIEW `{view_id}` AS
WITH ranked AS (
  SELECT
    game_date,
    player_name,
    pts, reb, ast,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY pts DESC, player_name
    ) AS pts_rank,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY reb DESC, player_name
    ) AS reb_rank,
    ROW_NUMBER() OVER (
      PARTITION BY game_date ORDER BY ast DESC, player_name
    ) AS ast_rank
  FROM `{GAME_LOGS_TABLE}`
)
SELECT
  p.game_date,
  p.player_name AS pts_leader, p.pts,
  r.player_name AS reb_leader, r.reb,
  a.player_name AS ast_leader, a.ast
FROM ranked p
JOIN ranked r ON p.game_date = r.game_date AND r.reb_rank = 1
JOIN ranked a ON p.game_date = a.game_date AND a.ast_rank = 1
WHERE p.pts_rank = 1
"""

bq_client.query(view_ddl).result()
logger.info("Created view: %s.%s.v_daily_leaderboard", PROJECT_ID, BQ_DATASET)

# Smoke-test the view with its five most recent dates.
view_sample_sql = f"""
SELECT * FROM `{view_id}`
ORDER BY game_date DESC
LIMIT 5
"""
view_df = bq_client.query(view_sample_sql).to_dataframe()

print("=== View: v_daily_leaderboard (sample) ===")
view_df

In [None]:
# Trend detection: last 5 games vs prior 5 using ROW_NUMBER + conditional aggregation.
# Games are numbered newest-first per player, restricted to the configured season so the
# "prior 5" window never reaches back into an earlier season stored in the same raw table.
# Players need at least 3 games in each window to appear.
trend_query = f"""
WITH game_numbered AS (
  SELECT
    player_id,
    player_name,
    pts, reb, ast,
    ROW_NUMBER() OVER (
      PARTITION BY player_id ORDER BY game_date DESC
    ) AS game_num
  FROM `{GAME_LOGS_TABLE}`
  WHERE season = @season
),
splits AS (
  SELECT
    player_name,
    COUNT(CASE WHEN game_num <= 5 THEN 1 END) AS recent_gp,
    COUNT(CASE WHEN game_num BETWEEN 6 AND 10 THEN 1 END) AS prior_gp,
    ROUND(AVG(CASE WHEN game_num <= 5 THEN pts END), 1) AS recent_ppg,
    ROUND(AVG(CASE WHEN game_num BETWEEN 6 AND 10 THEN pts END), 1) AS prior_ppg,
    ROUND(AVG(CASE WHEN game_num <= 5 THEN reb END), 1) AS recent_rpg,
    ROUND(AVG(CASE WHEN game_num BETWEEN 6 AND 10 THEN reb END), 1) AS prior_rpg,
    ROUND(AVG(CASE WHEN game_num <= 5 THEN ast END), 1) AS recent_apg,
    ROUND(AVG(CASE WHEN game_num BETWEEN 6 AND 10 THEN ast END), 1) AS prior_apg
  FROM game_numbered
  WHERE game_num <= 10
  GROUP BY player_id, player_name
  HAVING recent_gp >= 3 AND prior_gp >= 3
)
SELECT
  player_name,
  recent_gp,
  prior_gp,
  recent_ppg,
  prior_ppg,
  ROUND(recent_ppg - prior_ppg, 1) AS pts_delta,
  recent_rpg,
  prior_rpg,
  ROUND(recent_rpg - prior_rpg, 1) AS reb_delta,
  recent_apg,
  prior_apg,
  ROUND(recent_apg - prior_apg, 1) AS ast_delta
FROM splits
ORDER BY pts_delta DESC
"""

trend_job_config = bigquery.QueryJobConfig(
    query_parameters=[bigquery.ScalarQueryParameter("season", "STRING", SEASON)]
)
bq_trends = bq_client.query(trend_query, job_config=trend_job_config).to_dataframe()
print("=== Trend Detection (BigQuery: last 5 games vs prior 5) ===")
bq_trends

## 6. Automated Recent Trend Detection

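Automatically surface the largest recent change in a player's per-game points, rebounds, or assists by comparing the most recent window of days with the window immediately before it. This is a ranking by raw change; no statistical significance test is applied.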


In [None]:
TREND_WINDOW_DAYS = int(os.getenv("TREND_WINDOW_DAYS", "7"))
TREND_MIN_GAMES = int(os.getenv("TREND_MIN_GAMES", "2"))


def detect_recent_trend(df: pd.DataFrame, window_days: int = TREND_WINDOW_DAYS, min_games: int = TREND_MIN_GAMES):
    """Rank per-player stat changes between the latest date window and the one before it.

    The recent window is the ``window_days`` calendar days ending on the latest game day
    in ``df`` (inclusive). The prior window is the ``window_days`` days immediately
    before it. For every player with at least ``min_games`` games in *both* windows,
    the per-game means of PTS, REB and AST are compared. Ranking is by raw delta only;
    no significance test is applied.

    Args:
        df: Game logs with GAME_DATE, PLAYER_ID, PLAYER_NAME, PTS, REB, AST.
            Not modified.
        window_days: Length of each window in days; must be >= 1.
        min_games: Minimum games a player needs in each window to be considered.

    Returns:
        ``(trends, summary)``. ``trends`` has one row per (player, stat) with
        recent_avg, prior_avg, delta (recent - prior, 2 dp) and pct_change (percent of
        prior_avg, 1 dp; NaN when prior_avg is 0). It is sorted by delta, then
        recent_avg, both descending. ``summary`` describes the top row. When no player
        qualifies, ``trends`` is an empty DataFrame and ``summary`` explains why.

    Raises:
        ValueError: if a required column is missing or ``window_days < 1``.
    """
    required = {"GAME_DATE", "PLAYER_ID", "PLAYER_NAME", "PTS", "REB", "AST"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns for trend detection: {sorted(missing)}")
    if window_days < 1:
        raise ValueError(f"window_days must be >= 1, got {window_days}")

    working = df.copy()
    working["GAME_DATE"] = pd.to_datetime(working["GAME_DATE"], errors="coerce")
    working = working.dropna(subset=["GAME_DATE"]).copy()
    if working.empty:
        return pd.DataFrame(), "No trend detected: no games with a valid GAME_DATE."

    # Compare calendar days, so a timestamp later in the day on a window boundary is
    # not excluded by a midnight cutoff.
    game_day = working["GAME_DATE"].dt.normalize()
    latest_date = game_day.max()
    recent_start = latest_date - timedelta(days=window_days - 1)
    prior_start = recent_start - timedelta(days=window_days)

    recent = working[(game_day >= recent_start) & (game_day <= latest_date)]
    prior = working[(game_day >= prior_start) & (game_day < recent_start)]

    stat_cols = ["PTS", "REB", "AST"]

    recent_agg = (
        recent.groupby(["PLAYER_ID", "PLAYER_NAME"], as_index=False)
        .agg({**{c: "mean" for c in stat_cols}, "GAME_DATE": "count"})
        .rename(columns={"GAME_DATE": "recent_games", **{c: f"recent_{c}" for c in stat_cols}})
    )
    prior_agg = (
        prior.groupby(["PLAYER_ID", "PLAYER_NAME"], as_index=False)
        .agg({**{c: "mean" for c in stat_cols}, "GAME_DATE": "count"})
        .rename(columns={"GAME_DATE": "prior_games", **{c: f"prior_{c}" for c in stat_cols}})
    )

    merged = recent_agg.merge(prior_agg, on=["PLAYER_ID", "PLAYER_NAME"], how="inner")
    merged = merged[(merged["recent_games"] >= min_games) & (merged["prior_games"] >= min_games)].copy()

    if merged.empty:
        return pd.DataFrame(), "No trend detected: insufficient overlapping games in recent and prior windows."

    long_rows = []
    for stat in stat_cols:
        tmp = merged[["PLAYER_ID", "PLAYER_NAME", "recent_games", "prior_games", f"recent_{stat}", f"prior_{stat}"]].copy()
        tmp = tmp.rename(columns={f"recent_{stat}": "recent_avg", f"prior_{stat}": "prior_avg"})
        tmp["stat"] = stat
        tmp["delta"] = (tmp["recent_avg"] - tmp["prior_avg"]).round(2)
        # Turn a zero baseline into NaN (keeping float64) so the percentage is NaN rather
        # than +/-inf. NOTE(review): replace(0, pd.NA) can upcast to object dtype, where
        # .round() is not reliably applied.
        prior_nonzero = tmp["prior_avg"].where(tmp["prior_avg"] != 0)
        tmp["pct_change"] = (tmp["delta"] / prior_nonzero * 100).round(1)
        long_rows.append(tmp)

    trends = pd.concat(long_rows, ignore_index=True)
    trends = trends.sort_values(["delta", "recent_avg"], ascending=[False, False]).reset_index(drop=True)

    top = trends.iloc[0]
    # The top row can be flat or negative when nobody improved; describe it accurately.
    delta = float(top["delta"])
    if delta > 0:
        movement = f"increased {top['stat']} by {delta}"
    elif delta < 0:
        movement = f"decreased {top['stat']} by {abs(delta)}"
    else:
        movement = f"held {top['stat']} steady"
    summary = (
        f"Top recent trend through {latest_date.date()}: {top['PLAYER_NAME']} {movement} "
        f"(from {top['prior_avg']:.1f} to {top['recent_avg']:.1f}) "
        f"over the last {window_days} days vs the previous {window_days} days."
    )

    return trends, summary


# Run date-window trend detection on the current extract (all_game_logs) and show the
# headline plus the ten largest deltas. trend_summary is reused as the highlight in the
# AI article prompt.
trend_candidates, trend_summary = detect_recent_trend(all_game_logs)
print("=== AUTOMATED RECENT TREND HIGHLIGHT ===")
print(trend_summary)
print("\n=== TOP 10 TREND CANDIDATES ===")
display(trend_candidates.head(10))


## 7. AI-Powered Analysis Article

Use the Claude API to synthesize leaderboard data, season averages, and player trends into a full analysis article with actionable insights and a TLDR.

In [None]:
import anthropic
from pathlib import Path

# Initialize Claude client (uses ANTHROPIC_API_KEY environment variable).
# Without a key the client stays None, and article generation returns a skip message.
ant_api_key = os.getenv("ANTHROPIC_API_KEY")
claude_client = anthropic.Anthropic(api_key=ant_api_key) if ant_api_key else None
ANTHROPIC_MODEL = os.getenv("ANTHROPIC_MODEL", "claude-sonnet-4-5-20250929")

# parents=True so a nested NBA_OUTPUT_DIR (e.g. "out/reports") works on a fresh checkout.
OUTPUT_DIR = Path(os.getenv("NBA_OUTPUT_DIR", "reports"))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def generate_analysis_article(
    daily_leaders: pd.DataFrame,
    season_averages: pd.DataFrame,
    trends: pd.DataFrame,
    trend_highlight: str = "",
    season: str = SEASON,
) -> str:
    """Ask Claude for a blog-style NBA article built only from the supplied tables.

    Args:
        daily_leaders: Per-date PTS/REB/AST leaders; the first 15 rows go into the prompt.
        season_averages: Per-player season averages; the first 15 rows are used.
        trends: Recent-vs-prior trend table; the first 10 rows are used.
        trend_highlight: One-line summary from the automated trend detection.
        season: Season label included in the prompt.

    Returns:
        The article text (all text blocks of the response, concatenated). Returns a
        "Skipping AI analysis: ..." message instead when no API client is configured
        or all three tables are empty. API errors propagate to the caller.
    """
    if claude_client is None:
        return "Skipping AI analysis: ANTHROPIC_API_KEY is not set."
    if daily_leaders.empty and season_averages.empty and trends.empty:
        return "Skipping AI analysis: no data available."

    leaders_preview = daily_leaders.head(15).to_string(index=False) if not daily_leaders.empty else "N/A"
    season_preview = season_averages.head(15).to_string(index=False) if not season_averages.empty else "N/A"
    trends_preview = trends.head(10).to_string(index=False) if not trends.empty else "N/A"
    # Timestamp.utcnow() is deprecated (pandas 2.2); now(tz="UTC") is the replacement.
    today = pd.Timestamp.now(tz="UTC").strftime("%B %d, %Y")

    max_tokens = 2500
    message = claude_client.messages.create(
        model=ANTHROPIC_MODEL,
        max_tokens=max_tokens,
        messages=[
            {
                "role": "user",
                "content": f"""You are an NBA beat writer for a popular sports blog. Write an engaging,
editorial-style article that reads like something you'd find on The Ringer or ESPN.
Use ONLY the data below. Do not invent stats or reference players not in the data.

--- DAILY LEADERBOARD (recent game-day leaders for PTS, REB, AST) ---
{leaders_preview}

--- SEASON AVERAGES (per-game stats, minimum 5 games played) ---
{season_preview}

--- PLAYER TRENDS (last 5 games vs prior 5 games, delta = recent minus prior) ---
{trends_preview}

Trend highlight from automated detection:
{trend_highlight}

Today's date: {today}
Season: {season}

Write a blog article (strictly 400-500 words, do NOT exceed 500 words) with this structure:

1. A compelling headline

2. A bold TLDR at the top (2-3 sentences, italicized) that hooks the reader with the
   single biggest storyline from the data.

3. A narrative opening paragraph that sets the scene. Don't list stats yet — tell the
   reader what the story is. What's the headline? Who should they be paying attention to?

4. A "Who's Running the Show" section written in flowing paragraphs (NOT bullet points).
   Weave together the leaderboard dominance and season averages into a narrative. Compare
   players, describe what makes their recent stretch impressive, and reference specific
   game dates and stat lines from the leaderboard data.

5. A "The Hot Hand" section (also flowing prose, NOT bullets) covering players trending
   up. Describe the shift in their game — what changed in the last 5 games vs the prior
   5? Use the delta numbers naturally in sentences, not as a list.

6. A "Cooling Off" section (prose) for players trending down. Frame it as a narrative —
   is it a slump, a role change, fatigue? Use the delta numbers in context.

7. A "What to Watch" closing section with 2-3 forward-looking storylines written as
   short editorial paragraphs. Frame these as questions or scenarios that fans should
   track. End with a sentence that leaves the reader wanting to check back.

Style rules:
- Write like a journalist, not a data analyst. Stats support the story, not the other way around.
- Use transitions between sections. The article should flow, not feel like five disconnected blocks.
- Vary sentence length. Mix short punchy lines with longer analytical ones.
- No bullet points or numbered lists anywhere in the article.
- Reference specific game dates and matchups from the leaderboard when possible.
- Every stat claim must come from the provided data.
- IMPORTANT: Keep it tight. 400-500 words total. Be concise and selective with stats.""",
            }
        ],
    )

    # The response is a list of content blocks; concatenate the text blocks instead of
    # assuming block 0 exists and is text.
    article_text = "".join(
        block.text for block in message.content if getattr(block, "type", None) == "text"
    )
    if message.stop_reason == "max_tokens":
        logger.warning("Claude response hit max_tokens=%s; the article may be truncated", max_tokens)
    return article_text


# Generate the analysis article from the BigQuery results (daily leaders, season
# averages, game-based trends) plus the pandas trend highlight.
article = generate_analysis_article(
    daily_leaders=results,
    season_averages=season_avg_df,
    trends=bq_trends,
    trend_highlight=trend_summary,
)

# Save the article to a text file named by UTC run date; a re-run on the same day
# overwrites that file. Timestamp.utcnow() is deprecated (pandas 2.2).
run_date = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%d")
output_file = OUTPUT_DIR / f"nba_analysis_{run_date}.txt"
output_file.write_text(article, encoding="utf-8")

print(f"Article saved to: {output_file}")
print(f"{'=' * 60}\n")
print(article)