In [1]:
# %pip install -U prefect

In [2]:
import os
import shap
import pandas as pd
import numpy as np
import time
from typing import Optional, List, Dict
import sqlite3
from collections import defaultdict
from prefect import task, flow
from nba_api.stats.static import teams, players
from nba_api.stats.endpoints import leaguegamefinder, boxscoresummaryv2, boxscoretraditionalv2, playerprofilev2, playerindex, playercareerstats, commonplayerinfo
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

Using `tqdm.autonotebook.tqdm` in notebook mode. Use `tqdm.tqdm` instead to force console mode (e.g. in jupyter console)


In [3]:
@task(log_prints=True)
def get_db_path() -> str:
    # TODO: add reading prefect variables
    result = '../../../data/basnya.db'
    print(f"DB path: '{result}'")
    return result

@task(retries=2, log_prints=True)
def get_latest_game(db_path: str) -> pd.DataFrame:
    """
    Retrieve the row(s) for the game(s) with the latest GAME_DATE_EST from the "GAMES" table.

    Parameters:
    - db_path (str): The path to the SQLite database file.

    Returns:
    - pd.DataFrame or None: A DataFrame containing the row(s) for the game(s) with the latest GAME_DATE_EST,
      or None if an error occurs.
    """
    # SQL query to find the game with the latest GAME_DATE_EST
    sql_query = """
    SELECT *
    FROM "GAMES"
    WHERE "GAME_DATE_EST" = (SELECT MAX("GAME_DATE_EST") FROM "GAMES");
    """

    # Use 'with' statement to automatically close the connection
    with sqlite3.connect(db_path) as connection:
        # Use pandas to execute the SQL query and read the results into a DataFrame
        result_df = pd.read_sql(sql_query, connection)
    if result_df.empty:
        print("DB seems to be empty")
    else:
        print(f"Found games on latest date of '{result_df.GAME_DATE_EST.iloc[0]}': {len(result_df)}")
    return result_df


@task(retries=2, log_prints=True)
def get_games_from_to(date_from, date_to):
    _dfs = []
    for s_t in [leaguegamefinder.SeasonTypeNullable.regular, leaguegamefinder.SeasonTypeNullable.playoffs]:
        _df = leaguegamefinder.LeagueGameFinder(
            date_from_nullable=(pd.to_datetime(date_from) + pd.Timedelta(days=1)).strftime('%m/%d/%Y'), 
            date_to_nullable=pd.to_datetime(date_to).strftime('%m/%d/%Y'), 
            # season_nullable=season, 
            season_type_nullable=s_t,
            league_id_nullable=leaguegamefinder.LeagueIDNullable.nba
        ).get_data_frames()[0]
        _dfs.append(_df)
    return pd.concat(_dfs, axis=0)

@task(retries=2, log_prints=True)
def get_games_by_ids(game_ids: List[str]) -> pd.DataFrame:
    _dfs = []
    for g_i in game_ids:
        _game_df = boxscoresummaryv2.BoxScoreSummaryV2(game_id=g_i).get_data_frames()[0]
        if _game_df.empty:
            print(f'not found game for id: {g_i}')
        _dfs.append(_game_df)
    return pd.concat(_dfs, axis=0)

def get_games_minimal_date(games_by_ids_df: pd.DataFrame) -> str:
    min_dt = pd.to_datetime(games_by_ids_df.GAME_DATE_EST.min())
    return min_dt.date().strftime('%m/%d/%Y')

@flow(retries=2, log_prints=True)
def get_games_ids_to_append(latest_games: pd.DataFrame, game_ids: List[str]) -> List[str]:
    games_by_ids_df = get_games_by_ids(game_ids=game_ids)
    latest_game_date_from_db = pd.to_datetime(latest_games.GAME_DATE_EST.max()).date().strftime('%m/%d/%Y')
    earliest_game_date_from_game_ids = get_games_minimal_date(games_by_ids_df)
    print(f"earching for games from {latest_game_date_from_db} to {earliest_game_date_from_game_ids}")
    missing_games = get_games_from_to(
        date_from=latest_game_date_from_db,
        date_to=earliest_game_date_from_game_ids)
    result = list(set(missing_games.GAME_ID.to_list() + game_ids))
    print(f"need to append games: {len(result)}")
    return result

@task(retries=2, log_prints=True)
def get_current_players(db_path: str) -> pd.DataFrame:
    """
    Retrieves players from "player_0" table.

    Parameters:
    - db_path (str): The path to the SQLite database file.

    Returns:
    - pd.DataFrame or None: A DataFrame containing all players in DB,
      or None if an error occurs.
    """
    # SQL query to find the game with the latest GAME_DATE_EST
    sql_query = """
    SELECT *
    FROM "player_0";
    """
    with sqlite3.connect(db_path) as connection:
        result_df = pd.read_sql(sql_query, connection)
    if result_df.empty:
        print("DB seems to be empty")
    else:
        print(f"Found players: {len(result_df)}")
    return result_df

@task(retries=2, log_prints=True)
def prepare_games_to_append(game_ids_to_append) -> Dict[str, pd.DataFrame]:
    result = defaultdict(list)
    for g_i in game_ids_to_append:
        for endpoint, table_prefix in zip([boxscoresummaryv2.BoxScoreSummaryV2, boxscoretraditionalv2.BoxScoreTraditionalV2],
                                          ['boxscoresummaryv2', 'boxscoretraditionalv2']):
            _frames = endpoint(game_id=g_i).get_data_frames()
            for i, _df in enumerate(_frames):
                result[f"{table_prefix}_{i}"].append(_df)
    return {k: pd.concat(v, axis=0) for k, v in result.items()}
    
@task(retries=2, log_prints=True)
def prepare_players_to_append(current_players: pd.DataFrame, games_dataframe_dict: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    players_from_games = games_dataframe_dict.get('boxscoretraditionalv2_0', pd.DataFrame({'PLAYER_ID': []}))
    players_to_append = set(players_from_games.PLAYER_ID) - set(current_players.PERSON_ID)
    if players_to_append:
        print(f"need to append players: {len(players_to_append)}")
        result = defaultdict(list)
        for pl_i in players_to_append:
            _frames = commonplayerinfo.CommonPlayerInfo(player_id=pl_i).get_data_frames()
            for i, _df in enumerate(_frames):
                result[f"player_{i}"].append(_df)
        return {k: pd.concat(v, axis=0) for k, v in result.items()}    
    else:
        print("no need to append players")
        return {}                                              


@task(retries=2, log_prints=True)
def append_tables(db_path: str, dataframe_dict: Dict[str, pd.DataFrame]):
    with sqlite3.connect(db_path) as connection:
        for table_name, _df in dataframe_dict.items():
            print(f"writing {len(_df)} records to {table_name}")
            _df.to_sql(table_name, con=connection, if_exists='append')
    return
        


@flow(log_prints=True)
def backfill_games(game_ids: List[str]) -> Dict[str, pd.DataFrame]:
    db_path = get_db_path()
    latest_game_df = get_latest_game(db_path=db_path)
    current_players = get_current_players(db_path=db_path)
    game_ids_to_append = get_games_ids_to_append(
        latest_games=latest_game_df, 
        game_ids=game_ids
    )
    games_to_append_dict = prepare_games_to_append(game_ids_to_append)
    players_to_append_dict = prepare_players_to_append(
        current_players=current_players, 
        games_dataframe_dict=games_to_append_dict
    )
    games_to_append_dict.update(players_to_append_dict)
    append_tables(
        db_path=db_path, 
        dataframe_dict=games_to_append_dict
    )
    
    return games_to_append_dict

_d = backfill_games(game_ids=['0022200002'])
len(_d)

12

In [4]:
_d['boxscoretraditionalv2_1']

Unnamed: 0,GAME_ID,TEAM_ID,TEAM_NAME,TEAM_ABBREVIATION,TEAM_CITY,MIN,FGM,FGA,FG_PCT,FG3M,FG3A,FG3_PCT,FTM,FTA,FT_PCT,OREB,DREB,REB,AST,STL,BLK,TO,PF,PTS,PLUS_MINUS
0,22200002,1610612747,Lakers,LAL,Los Angeles,240.000000:00,40,94,0.426,10,40,0.25,19,25,0.76,9,39,48,23,12,4,21,18,109,-14.0
1,22200002,1610612744,Warriors,GSW,Golden State,240.000000:00,45,99,0.455,16,45,0.356,17,23,0.739,11,37,48,31,11,4,18,23,123,14.0
0,22200001,1610612755,76ers,PHI,Philadelphia,240.000000:00,40,80,0.5,13,34,0.382,24,28,0.857,4,27,31,16,8,3,14,25,117,-9.0
1,22200001,1610612738,Celtics,BOS,Boston,240.000000:00,46,82,0.561,12,35,0.343,22,28,0.786,6,30,36,24,8,3,10,24,126,9.0
