**Import SparkSession and the necessary PySpark modules**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, lit, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
import requests
import json
import time
from datetime import datetime, timedelta
import os
from delta.tables import DeltaTable

**Create Spark session for data processing**

In [0]:
# Build the Spark session used by every cell below (getOrCreate reuses an active one)
spark = (
    SparkSession.builder
    .appName("NBA Stats Ingestion")
    .getOrCreate()
)

**Define configuration constants and calculations for rate limiting and API control**

In [0]:
#Configuration
API_BASE_URL = 'https://api.balldontlie.io/v1'
# Read the key from the environment rather than hard-coding it in the notebook
# (on Databricks a secret scope is another option). An empty value means
# requests are sent without an Authorization header.
API_KEY = os.environ.get("BALLDONTLIE_API_KEY", "")
RATE_LIMIT = 750 #requests limit per hour
REQUESTS_BUFFER = 50 #reserved for emergency updates
# Hourly budget the pipeline may actually spend: the API limit minus the reserve.
EFFECTIVE_RATE_LIMIT = RATE_LIMIT - REQUESTS_BUFFER
# Seconds between requests so the budget is spread evenly across the hour.
REQUEST_INTERVAL = 3600/EFFECTIVE_RATE_LIMIT
# NOTE(review): not referenced by any of the cells shown here.
MAX_DATA_AGE_SECONDS = 60

**Define the DBFS file paths for Delta tables**

In [0]:
#Delta table paths: every table lives in its own folder under one DBFS base directory
DELTA_PATH_BASE = "/FileStore/nba_data"
DELTA_PATH_GAMES = DELTA_PATH_BASE + "/games"
DELTA_PATH_GAMES_HIST = DELTA_PATH_BASE + "/games_hist"
DELTA_PATH_PLAYER_STATS = DELTA_PATH_BASE + "/player_stats"
DELTA_PATH_PLAYER_STATS_HIST = DELTA_PATH_BASE + "/player_stats_hist"
DELTA_PATH_RATE_LIMIT_TRACKER = DELTA_PATH_BASE + "/rate_limit_tracker"

**Define schema structure for the "games" Delta table.**

In [0]:
#Define games schema: one (column name, Spark type, nullable) spec per column of the current games table
games_schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in (
        ("game_id", IntegerType(), False),
        ("date", StringType(), False),
        ("home_team_id", IntegerType(), False),
        ("visitor_team_id", IntegerType(), False),
        ("home_team_score", IntegerType(), True),
        ("visitor_team_score", IntegerType(), True),
        ("status", StringType(), False),
        ("period", IntegerType(), True),
        ("time_remaining", StringType(), True),
        ("last_updated", TimestampType(), False),
    )
])

**Define schema structure for the "games_hist" Delta table.**

In [0]:
#Define games_hist schema: the games columns prefixed by two per-run metadata columns
games_hist_schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in (
        # run metadata
        ("ingestion_id", StringType(), True),
        ("run_timestamp", TimestampType(), True),
        # game snapshot
        ("game_id", IntegerType(), False),
        ("date", StringType(), False),
        ("home_team_id", IntegerType(), False),
        ("visitor_team_id", IntegerType(), False),
        ("home_team_score", IntegerType(), True),
        ("visitor_team_score", IntegerType(), True),
        ("status", StringType(), False),
        ("period", IntegerType(), True),
        ("time_remaining", StringType(), True),
        ("last_updated", TimestampType(), False),
    )
])

**Define schema structure for the "player_stats" Delta table.**

In [0]:
#Define player_stats schema: one row per (player_id, game_id) box-score line
player_stats_schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in (
        ("player_id", IntegerType(), False),
        ("game_id", IntegerType(), False),
        ("points", IntegerType(), True),
        ("rebounds", IntegerType(), True),
        ("assists", IntegerType(), True),
        ("steals", IntegerType(), True),
        ("blocks", IntegerType(), True),
        ("minutes", IntegerType(), True),
        ("fg_pct", DoubleType(), True),
        ("ft_pct", DoubleType(), True),
        ("fg3_pct", DoubleType(), True),
        ("last_updated", TimestampType(), False),
    )
])

**Define schema structure for the "player_stats_hist" Delta table.**

In [0]:
#Define player_stats_hist schema: the player_stats columns prefixed by two per-run metadata columns
player_stats_hist_schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in (
        # run metadata
        ("ingestion_id", StringType(), True),
        ("run_timestamp", TimestampType(), True),
        # box-score snapshot
        ("player_id", IntegerType(), False),
        ("game_id", IntegerType(), False),
        ("points", IntegerType(), True),
        ("rebounds", IntegerType(), True),
        ("assists", IntegerType(), True),
        ("steals", IntegerType(), True),
        ("blocks", IntegerType(), True),
        ("minutes", IntegerType(), True),
        ("fg_pct", DoubleType(), True),
        ("ft_pct", DoubleType(), True),
        ("fg3_pct", DoubleType(), True),
        ("last_updated", TimestampType(), False),
    )
])

**Initialize the rate_limit_tracker table.**

In [0]:
#Initialize rate_limit_tracker
def initialize_rate_limit_tracker():
    """Ensure the rate-limit tracker Delta table exists and return a handle to it.

    Creates DELTA_PATH_BASE. If no Delta table exists yet at
    DELTA_PATH_RATE_LIMIT_TRACKER, it seeds one with a single row: the current
    hour bucket ("%Y-%m-%d-%H") and a request count of 0.

    Returns:
        DeltaTable for DELTA_PATH_RATE_LIMIT_TRACKER.
    """
    dbutils.fs.mkdirs(DELTA_PATH_BASE)

    tracker_exists = DeltaTable.isDeltaTable(spark, DELTA_PATH_RATE_LIMIT_TRACKER)
    if not tracker_exists:
        seed_row = (datetime.now().strftime("%Y-%m-%d-%H"), 0)
        (
            spark.createDataFrame([seed_row], ["hour_bucket", "request_count"])
            .write.format("Delta")
            .mode("append")
            .save(DELTA_PATH_RATE_LIMIT_TRACKER)
        )

    return DeltaTable.forPath(spark, DELTA_PATH_RATE_LIMIT_TRACKER)

**Initialize games and player_stats tables.**

In [0]:
#Initialize tables if not exist
def initialize_tables():
    """Create any missing pipeline Delta table as an empty table with its declared schema.

    Tables that already exist are left untouched. Tables are checked in the order
    games, player_stats, games_hist, player_stats_hist.
    """
    table_specs = (
        (DELTA_PATH_GAMES, games_schema),
        (DELTA_PATH_PLAYER_STATS, player_stats_schema),
        (DELTA_PATH_GAMES_HIST, games_hist_schema),
        (DELTA_PATH_PLAYER_STATS_HIST, player_stats_hist_schema),
    )
    for table_path, table_schema in table_specs:
        if DeltaTable.isDeltaTable(spark, table_path):
            continue
        empty_df = spark.createDataFrame([], table_schema)
        empty_df.write.format("Delta").mode("append").save(table_path)

**Define the API request function, which applies rate limiting**

In [0]:
#Create API request with rate limitations
def _reset_rate_limit_bucket(rate_limit_table, hour_bucket):
    """Move every tracker row not already on `hour_bucket` onto it, with a count of 0."""
    rate_limit_table.update(
        condition=col("hour_bucket") != hour_bucket,
        set={"hour_bucket": lit(hour_bucket), "request_count": lit(0)}
    )


def make_api_request(endpoint, params=None, timeout=30):
    """GET `{API_BASE_URL}/{endpoint}` while enforcing an hourly request budget.

    The budget is tracked in the Delta table at DELTA_PATH_RATE_LIMIT_TRACKER,
    which holds an `hour_bucket` ("%Y-%m-%d-%H") and a `request_count`.
    - If the count for the current hour has reached EFFECTIVE_RATE_LIMIT, the
      call sleeps until the next hour starts.
    - Every attempted request increments the count.
    - The call then sleeps REQUEST_INTERVAL seconds to pace requests.

    Args:
        endpoint: Path under API_BASE_URL, e.g. "games" or "stats".
        params: Optional query parameters passed to `requests.get`.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The decoded JSON body on HTTP 200. Otherwise None (non-200 status,
        network error or timeout), after printing the failure.
    """
    current_hour = datetime.now().strftime("%Y-%m-%d-%H")
    rate_limit_table = DeltaTable.forPath(spark, DELTA_PATH_RATE_LIMIT_TRACKER)

    # Count recorded for the current hour; no row means a new hour has started.
    current_rows = (
        rate_limit_table.toDF()
        .filter(col("hour_bucket") == current_hour)
        .select("request_count")
        .collect()
    )
    if current_rows:
        current_count = current_rows[0][0]
    else:
        _reset_rate_limit_bucket(rate_limit_table, current_hour)
        current_count = 0

    if current_count >= EFFECTIVE_RATE_LIMIT:
        print(f"Approaching rate limit for hour {current_hour}; sleeping until next hour.")
        next_hour = datetime.now().replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        time.sleep(max(0.0, (next_hour - datetime.now()).total_seconds()))

        # The tracker row still carries the previous hour, so it has to be moved onto
        # the new hour. An update filtered on the new hour would match nothing, and
        # the increment below would then be lost.
        current_hour = datetime.now().strftime("%Y-%m-%d-%H")
        _reset_rate_limit_bucket(rate_limit_table, current_hour)

    url = f"{API_BASE_URL}/{endpoint}"
    headers = {"Authorization": API_KEY} if API_KEY else {}

    # Header values are masked so the credential never lands in notebook output.
    print("URL:", url)
    print("Headers:", {name: "***" for name in headers})
    print("Params:", params)

    try:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
    except requests.RequestException as exc:
        print(f"API request failed: {exc}")
        response = None

    # Count the attempt even when it failed; the server may still have received it.
    rate_limit_table.update(
        condition=col("hour_bucket") == current_hour,
        set={"request_count": col("request_count") + 1}
    )

    #Respect API rate limits with small delay between requests
    time.sleep(REQUEST_INTERVAL)

    if response is None:
        return None
    if response.status_code == 200:
        return response.json()
    print(f"API request failed: {response.status_code} - {response.text}")
    return None

**Import the regex module and define a function that fetches today's in-progress games.**

In [0]:
import re
from datetime import datetime, timedelta

def get_active_games():
    """Fetch today's games from the API and keep only those that look in progress.

    A game is skipped when its status is "Final", or when the status starts with
    a calendar date (e.g. "2025-04-06T23:00:00Z"), which marks a scheduled game.

    Returns:
        A list of dicts keyed like `games_schema`. The list is empty when the
        request fails or no game is in progress.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    data = make_api_request("games", {"dates[]": today, "per_page": 100})
    games_data = data["data"] if data and "data" in data else []

    fetched_at = datetime.now()
    active_games = []
    for game in games_data:
        status = game["status"]

        #Include only in-progress games (not scheduled or final).
        #Match any YYYY-MM-DD prefix, not just 202x years.
        if status == "Final" or re.match(r"^\d{4}-\d{2}-\d{2}", status):
            continue

        active_games.append({
            "game_id": game["id"],
            "date": game["date"],
            "home_team_id": game["home_team"]["id"],
            "visitor_team_id": game["visitor_team"]["id"],
            "home_team_score": game["home_team_score"],
            "visitor_team_score": game["visitor_team_score"],
            "status": status,
            "period": game.get("period", None),
            "time_remaining": game.get("time", None),
            "last_updated": fetched_at
        })

    return active_games


**Define a function to fetch player stats for a single game**

In [0]:
#Get player stats for the game
def _parse_minutes(raw_minutes):
    """Convert a minutes-played value to an int, or None when it is absent or unparseable.

    Ints pass through unchanged. Strings such as "32" or "32:15" keep only the
    whole minutes.
    """
    if isinstance(raw_minutes, int):
        return raw_minutes
    if raw_minutes is None:
        return None
    text = str(raw_minutes).strip()
    if not text:
        return None
    try:
        return int(float(text.split(":", 1)[0]))
    except ValueError:
        return None


def get_player_stats_for_game(game_id):
    """Fetch box-score lines for one game as dicts keyed like `player_stats_schema`.

    Missing shooting percentages become 0.0. `minutes` is coerced to an int to
    match the IntegerType column.

    Args:
        game_id: API id of the game.

    Returns:
        A list of dicts. It is empty (never None) when the request fails, so
        callers can always take len() of the result.
    """
    data = make_api_request("stats", {"game_ids[]": game_id, "per_page": 100})
    if not data or "data" not in data:
        return []

    fetched_at = datetime.now()
    return [
        {
            "player_id": stat["player"]["id"],
            "game_id": game_id,
            "points": stat["pts"],
            "rebounds": stat["reb"],
            "assists": stat["ast"],
            "steals": stat["stl"],
            "blocks": stat["blk"],
            "minutes": _parse_minutes(stat.get("min")),
            "fg_pct": float(stat.get("fg_pct") or 0.0),
            "ft_pct": float(stat.get("ft_pct") or 0.0),
            "fg3_pct": float(stat.get("fg3_pct") or 0.0),
            "last_updated": fetched_at,
        }
        for stat in data["data"]
    ]

**Import uuid for unique ingestion IDs and define function to update games table**

In [0]:
import uuid
from datetime import datetime
from pyspark.sql.functions import lit

def update_games_table(active_games):
    """Upsert the latest in-progress game rows and append a snapshot to the history table.

    Args:
        active_games: A list of dicts with the keys of `games_schema` (as built by
            `get_active_games`). An empty list or None is a no-op.

    Side effects:
        - MERGE into the Delta table at DELTA_PATH_GAMES, keyed on game_id.
          Matched rows get scores, status, period, time_remaining and
          last_updated refreshed; unseen games are inserted.
        - Append to DELTA_PATH_GAMES_HIST, tagged with a per-call
          `ingestion_id` (UUID4) and `run_timestamp`.
    """
    if not active_games:
        return

    # Metadata shared by every history row written in this call
    run_timestamp = datetime.now()
    ingestion_id = str(uuid.uuid4())

    # Use the declared schema instead of inference. Inference types Python ints as
    # LongType, which Delta's schema enforcement can reject when appending to the
    # IntegerType history table. It also fails outright when a column (e.g.
    # time_remaining) is None in every record.
    games_df = spark.createDataFrame(active_games, schema=games_schema)

    # 1. Upsert to current table
    games_table = DeltaTable.forPath(spark, DELTA_PATH_GAMES)
    (
        games_table.alias("existing")
        .merge(games_df.alias("updates"), "existing.game_id = updates.game_id")
        .whenMatchedUpdate(
            set={
                "home_team_score": "updates.home_team_score",
                "visitor_team_score": "updates.visitor_team_score",
                "status": "updates.status",
                "period": "updates.period",
                "time_remaining": "updates.time_remaining",
                "last_updated": "updates.last_updated"
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

    # 2. Append to history table
    games_df_history = (
        games_df
        .withColumn("ingestion_id", lit(ingestion_id))
        .withColumn("run_timestamp", lit(run_timestamp))
    )
    games_df_history.write.format("delta").mode("append").save(DELTA_PATH_GAMES_HIST)


**Import uuid for unique ingestion IDs and define function to update player stats table**

In [0]:
import uuid
from datetime import datetime
from pyspark.sql.functions import lit

def update_player_stats_table(all_player_stats):
    """Upsert player box-score rows and append a snapshot to the player-stats history table.

    Args:
        all_player_stats: A list of dicts with the keys of `player_stats_schema`
            (as built by `get_player_stats_for_game`). An empty list or None is
            a no-op.

    Side effects:
        - MERGE into the Delta table at DELTA_PATH_PLAYER_STATS, keyed on
          (player_id, game_id). Matched rows get every stat and last_updated
          refreshed; new rows are inserted.
        - Append to DELTA_PATH_PLAYER_STATS_HIST, tagged with a per-call
          `ingestion_id` (UUID4) and `run_timestamp`.
    """
    if not all_player_stats:
        return

    # Metadata shared by every history row written in this call
    run_timestamp = datetime.now()
    ingestion_id = str(uuid.uuid4())

    # Align column types with player_stats_schema. Inference gives LongType for
    # Python ints (and StringType if minutes arrive as text). Delta's schema
    # enforcement can reject those types when appending to the IntegerType
    # history table.
    stats_df = spark.createDataFrame(all_player_stats).select(
        *[
            col(field.name).cast(field.dataType).alias(field.name)
            for field in player_stats_schema.fields
        ]
    )

    # 1. Upsert to current table
    stats_table = DeltaTable.forPath(spark, DELTA_PATH_PLAYER_STATS)
    (
        stats_table.alias("existing")
        .merge(
            stats_df.alias("updates"),
            "existing.player_id = updates.player_id AND existing.game_id = updates.game_id"
        )
        .whenMatchedUpdate(
            set={
                "points": "updates.points",
                "rebounds": "updates.rebounds",
                "assists": "updates.assists",
                "steals": "updates.steals",
                "blocks": "updates.blocks",
                "minutes": "updates.minutes",
                "fg_pct": "updates.fg_pct",
                "ft_pct": "updates.ft_pct",
                "fg3_pct": "updates.fg3_pct",
                "last_updated": "updates.last_updated"
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

    # 2. Append to history table with per-run metadata
    stats_df_history = (
        stats_df
        .withColumn("ingestion_id", lit(ingestion_id))
        .withColumn("run_timestamp", lit(run_timestamp))
    )
    stats_df_history.write.format("delta").mode("append").save(DELTA_PATH_PLAYER_STATS_HIST)


**Define a function that selects the games whose player stats should be refreshed (scheduled games are excluded)**

In [0]:
from pyspark.sql.functions import regexp_extract

def identify_games_to_update():
    """Return the game_ids in the games table that are neither scheduled nor final.

    A status that starts with a calendar date (e.g. "2025-04-06T23:00:00Z") marks
    a game that has not started, so those rows are skipped. Rows whose status is
    "Final" are skipped as well.

    NOTE(review): `update_games_table` is only fed the games that `get_active_games`
    kept, and that function drops "Final" games. A stored status is therefore never
    refreshed to "Final" after a game ends, so finished games keep being returned
    here, and re-fetched, on every run. Confirm whether this is intended.

    Returns:
        A list of game_id values.
    """
    games_df = DeltaTable.forPath(spark, DELTA_PATH_GAMES).toDF()

    # Match any YYYY-MM-DD prefix rather than only 202x years
    games_filtered = games_df.filter(
        ~col("status").rlike(r"^\d{4}-\d{2}-\d{2}") & (col("status") != "Final")
    )

    return [row["game_id"] for row in games_filtered.select("game_id").collect()]


**Define the function for one full pipeline run, which calls each step in order**

In [0]:
def run_pipeline(return_metrics=False):
    """Run one ingestion pass: refresh in-progress games, then pull their player stats.

    Steps:
    1. Ensure the Delta tables and the rate-limit tracker exist.
    2. Fetch today's in-progress games and upsert them.
    3. Fetch and upsert player stats for every game chosen by
       `identify_games_to_update`.

    Args:
        return_metrics: When True, return (number of active games, number of
            player-stat rows).

    Returns:
        The metrics tuple if requested, otherwise None.
    """
    print(f"Starting NBA stats ingestion at {datetime.now()}")

    initialize_tables()
    initialize_rate_limit_tracker()

    active_games = get_active_games()
    print(f"Active games: {len(active_games)}")

    update_games_table(active_games)

    games_to_update = identify_games_to_update()
    print(f"Games to update: {games_to_update}")

    all_player_stats = []
    for game_id in games_to_update:
        # A failed fetch (None) counts as "no rows" so one bad game doesn't abort the run
        player_stats = get_player_stats_for_game(game_id) or []
        print(f"Game ID {game_id} - stats pulled: {len(player_stats)}")
        all_player_stats.extend(player_stats)

    update_player_stats_table(all_player_stats)

    print(f"Pipeline run completed at {datetime.now()}")
    print(f"✅ Updated {len(active_games)} active games and {len(all_player_stats)} player stats")

    if return_metrics:
        return len(active_games), len(all_player_stats)


**Import the time module and define a function that runs the pipeline in a loop, sleeping between runs**

In [0]:
import time

def start_pipeline_loop(max_loops=None, sleep_seconds=None):
    """Call `run_pipeline` repeatedly, sleeping between runs.

    An exception from a single run is printed, with its traceback, and the loop
    continues with the next run.

    Args:
        max_loops: Stop after this many runs. None (the default) loops forever.
        sleep_seconds: Pause between runs. None (the default) uses REQUEST_INTERVAL.
    """
    import traceback

    interval = REQUEST_INTERVAL if sleep_seconds is None else sleep_seconds
    print("Starting NBA live pipeline loop...\n")
    loop_count = 1

    while max_loops is None or loop_count <= max_loops:
        try:
            print(f"🔁 Loop {loop_count} | ⏰ {datetime.now()}")

            # Run pipeline and capture metrics
            active_games, player_stats = run_pipeline(return_metrics=True)

            print(f"🏀 Active Games Updated: {active_games}")
            print(f"👤 Player Stats Pulled: {player_stats}")

        except Exception as e:
            print(f"❌ Pipeline error: {e}")
            traceback.print_exc()

        # No point sleeping after the last requested run
        if max_loops is not None and loop_count >= max_loops:
            break

        print(f"⏳ Sleeping for {interval:.2f} seconds...\n")
        time.sleep(interval)
        loop_count += 1


**Start continuous pipeline loop using the request interval**

In [0]:
# Starts the live loop with default arguments; it keeps running until the cell is interrupted.
start_pipeline_loop()

Starting NBA live pipeline loop...

🔁 Loop 1 | ⏰ 2025-04-07 01:10:23.533849
Starting NBA stats ingestion at 2025-04-07 01:10:23.533868
URL: https://api.balldontlie.io/v1/games
Headers: {'Authorization': '***'}
Params: {'dates[]': '2025-04-07', 'per_page': 100}
Active games: 0
Games to update: [15908867, 15908865, 15908869, 15908866, 15908871, 15908868, 15908870]
URL: https://api.balldontlie.io/v1/stats
Headers: {'Authorization': '***'}
Params: {'game_ids[]': 15908867, 'per_page': 100}
Game ID 15908867 - stats pulled: 36
URL: https://api.balldontlie.io/v1/stats
Headers: {'Authorization': '***'}
Params: {'game_ids[]': 15908865, 'per_page': 100}
Game ID 15908865 - stats pulled: 36
URL: https://api.balldontlie.io/v1/stats
Headers: {'Authorization': '***'}
Params: {'game_ids[]': 15908869, 'per_page': 100}
Game ID 15908869 - stats pulled: 34
URL: https://api.bal