# NBA ELT Pipeline Prototyping

For this pipeline, we will use Pandas to perform data transformation and load the data into a PostgreSQL database (Data Warehouse). We will use the nba-api package to achieve this.

### Requirements:

- Python 3.11+
- Kafka
- PostgreSQL 13.4+
- Pandas
- SQLAlchemy

### Setup

In [1]:
!pip install sqlalchemy



In [2]:
!pip install psycopg2



In [1]:
from nba_api.stats.endpoints import *
from nba_api.stats.static import players, teams
import time
import pandas as pd
import sqlalchemy
import datetime as dt

In [4]:
engine = sqlalchemy.create_engine('postgresql://postgres:postgres@localhost:5432/nba')

In [5]:
proxy_info = {'http': 'http://auto:apify_proxy_X4cFIqR8cbXXz2hq2bTusiFAQctscK3bu1lI@proxy.apify.com:8000'}

## Static Datasets

We'll load the static datasets first, such as players, teams, and games.

In [6]:
all_players = players.get_players()
all_players_df = pd.DataFrame(all_players)

In [7]:
all_players_df.to_sql(
    name='all_players',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

4815

In [2]:
all_teams = teams.get_teams()
all_teams_df = pd.DataFrame(all_teams)

In [8]:
print(all_teams_df['id'].

None


In [5]:
display(all_teams_df)

Unnamed: 0,id,full_name,abbreviation,nickname,city,state,year_founded
0,1610612737,Atlanta Hawks,ATL,Hawks,Atlanta,Georgia,1949
1,1610612738,Boston Celtics,BOS,Celtics,Boston,Massachusetts,1946
2,1610612739,Cleveland Cavaliers,CLE,Cavaliers,Cleveland,Ohio,1970
3,1610612740,New Orleans Pelicans,NOP,Pelicans,New Orleans,Louisiana,2002
4,1610612741,Chicago Bulls,CHI,Bulls,Chicago,Illinois,1966
5,1610612742,Dallas Mavericks,DAL,Mavericks,Dallas,Texas,1980
6,1610612743,Denver Nuggets,DEN,Nuggets,Denver,Colorado,1976
7,1610612744,Golden State Warriors,GSW,Warriors,Golden State,California,1946
8,1610612745,Houston Rockets,HOU,Rockets,Houston,Texas,1967
9,1610612746,Los Angeles Clippers,LAC,Clippers,Los Angeles,California,1970


In [9]:
all_teams_df.to_sql(
    name='all_teams',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

30

In [10]:
all_players_df.info()
all_teams_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4815 entries, 0 to 4814
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   id          4815 non-null   int64 
 1   full_name   4815 non-null   object
 2   first_name  4815 non-null   object
 3   last_name   4815 non-null   object
 4   is_active   4815 non-null   bool  
dtypes: bool(1), int64(1), object(3)
memory usage: 155.3+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30 entries, 0 to 29
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   id            30 non-null     int64 
 1   full_name     30 non-null     object
 2   abbreviation  30 non-null     object
 3   nickname      30 non-null     object
 4   city          30 non-null     object
 5   state         30 non-null     object
 6   year_founded  30 non-null     int64 
dtypes: int64(2), object(5)
memory usage: 1.8+ KB


## Historical Stats

### All Time Leaders

In [11]:
alltimeleadersgrids_data = alltimeleadersgrids.AllTimeLeadersGrids()

In [12]:
ast_leaders = alltimeleadersgrids_data.ast_leaders.get_data_frame()
blk_leaders = alltimeleadersgrids_data.blk_leaders.get_data_frame()
dreb_leaders = alltimeleadersgrids_data.dreb_leaders.get_data_frame()
fg3_a_leaders = alltimeleadersgrids_data.fg3_a_leaders.get_data_frame()
fg3_m_leaders = alltimeleadersgrids_data.fg3_m_leaders.get_data_frame()
fg3_pct_leaders = alltimeleadersgrids_data.fg3_pct_leaders.get_data_frame()
fga_leaders = alltimeleadersgrids_data.fga_leaders.get_data_frame()
fgm_leaders = alltimeleadersgrids_data.fgm_leaders.get_data_frame()
fg_pct_leaders = alltimeleadersgrids_data.fg_pct_leaders.get_data_frame()
fta_leaders = alltimeleadersgrids_data.fta_leaders.get_data_frame()
ftm_leaders = alltimeleadersgrids_data.ftm_leaders.get_data_frame()
ft_pct_leaders = alltimeleadersgrids_data.ft_pct_leaders.get_data_frame()
g_p_leaders = alltimeleadersgrids_data.g_p_leaders.get_data_frame()
oreb_leaders = alltimeleadersgrids_data.oreb_leaders.get_data_frame()
pf_leaders = alltimeleadersgrids_data.pf_leaders.get_data_frame()
pts_leaders = alltimeleadersgrids_data.pts_leaders.get_data_frame()
reb_leaders = alltimeleadersgrids_data.reb_leaders.get_data_frame()
stl_leaders = alltimeleadersgrids_data.stl_leaders.get_data_frame()
tov_leaders = alltimeleadersgrids_data.tov_leaders.get_data_frame()

#### Bronze

In [13]:
ast_leaders.to_sql(
    name='ast_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

10

In [14]:
blk_leaders.to_sql(
    name='blk_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

dreb_leaders.to_sql(
    name='dreb_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fg3_a_leaders.to_sql(
    name='fg3_a_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fg3_m_leaders.to_sql(
    name='fg3_m_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

10

In [15]:
fg3_pct_leaders.to_sql(
    name='fg3_pct_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fga_leaders.to_sql(
    name='fga_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fgm_leaders.to_sql(
    name='fgm_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fg_pct_leaders.to_sql(
    name='fg_pct_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fta_leaders.to_sql(
    name='fta_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

ftm_leaders.to_sql(
    name='ftm_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

ft_pct_leaders.to_sql(
    name='ft_pct_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

g_p_leaders.to_sql(
    name='g_p_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

oreb_leaders.to_sql(
    name='oreb_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

pf_leaders.to_sql(
    name='pf_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

pts_leaders.to_sql(
    name='pts_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

reb_leaders.to_sql(
    name='reb_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

stl_leaders.to_sql(
    name='stl_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

tov_leaders.to_sql(
    name='tov_leaders',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

10

### Assist Tracker

In [16]:
assist_tracker = assisttracker.AssistTracker()

global_assist_counter = assist_tracker.get_data_frames()

In [17]:
display(global_assist_counter[0])

Unnamed: 0,ASSISTS
0,57727


In [18]:
assist_counter  = global_assist_counter[0]

In [19]:
assist_counter['_ingestion_time'] = dt.datetime.now()

In [20]:
assist_counter.to_sql(
    name='assist_counter',
    con=engine,
    schema='bronze',
    if_exists='append',
    method='multi'
)

1

### Games

In [21]:
team_dataframes = []

for team in all_teams:
    print(team['full_name'])
    games = leaguegamefinder.LeagueGameFinder(team_id_nullable=team['id'])
    games_df = games.get_data_frames()
    print(f'Retrieved list of game dataframes for {team["full_name"]}')
    team_dataframes.append({'team_name': team['full_name'], 'games_df_list': games_df})
    time.sleep(45)

Atlanta Hawks
Retrieved list of game dataframes for Atlanta Hawks
Boston Celtics
Retrieved list of game dataframes for Boston Celtics
Cleveland Cavaliers
Retrieved list of game dataframes for Cleveland Cavaliers
New Orleans Pelicans
Retrieved list of game dataframes for New Orleans Pelicans
Chicago Bulls
Retrieved list of game dataframes for Chicago Bulls
Dallas Mavericks
Retrieved list of game dataframes for Dallas Mavericks
Denver Nuggets
Retrieved list of game dataframes for Denver Nuggets
Golden State Warriors
Retrieved list of game dataframes for Golden State Warriors
Houston Rockets
Retrieved list of game dataframes for Houston Rockets
Los Angeles Clippers
Retrieved list of game dataframes for Los Angeles Clippers
Los Angeles Lakers
Retrieved list of game dataframes for Los Angeles Lakers
Miami Heat
Retrieved list of game dataframes for Miami Heat
Milwaukee Bucks
Retrieved list of game dataframes for Milwaukee Bucks
Minnesota Timberwolves
Retrieved list of game dataframes for Min

In [22]:
print(team_dataframes[0])

{'team_name': 'Atlanta Hawks', 'games_df_list': [     SEASON_ID     TEAM_ID TEAM_ABBREVIATION      TEAM_NAME     GAME_ID   
0        42022  1610612737               ATL  Atlanta Hawks  0042200116  \
1        42022  1610612737               ATL  Atlanta Hawks  0042200115   
2        42022  1610612737               ATL  Atlanta Hawks  0042200114   
3        42022  1610612737               ATL  Atlanta Hawks  0042200113   
4        42022  1610612737               ATL  Atlanta Hawks  0042200112   
...        ...         ...               ...            ...         ...   
3598     21983  1610612737               ATL  Atlanta Hawks  0028300058   
3599     21983  1610612737               ATL  Atlanta Hawks  0028300041   
3600     21983  1610612737               ATL  Atlanta Hawks  0028300027   
3601     21983  1610612737               ATL  Atlanta Hawks  0028300014   
3602     21983  1610612737               ATL  Atlanta Hawks  0028300005   

       GAME_DATE      MATCHUP WL  MIN  PTS  ...  F

In [23]:
dfs = [team['games_df_list'] for team in team_dataframes]

In [24]:
print(len(dfs))

30


In [25]:
final_dfs_list = []
for df_list in dfs:
    final_dfs_list.extend(df_list)

In [26]:
for i in final_dfs_list:
    if type(i) != pd.DataFrame:
        print(i)

In [27]:
# stack all the dataframes together print()

all_games = pd.concat(final_dfs_list)

In [28]:
display(all_games)

Unnamed: 0,SEASON_ID,TEAM_ID,TEAM_ABBREVIATION,TEAM_NAME,GAME_ID,GAME_DATE,MATCHUP,WL,MIN,PTS,...,FT_PCT,OREB,DREB,REB,AST,STL,BLK,TOV,PF,PLUS_MINUS
0,42022,1610612737,ATL,Atlanta Hawks,0042200116,2023-04-27,ATL vs. BOS,L,241,120,...,0.773,12.0,33.0,45.0,28,5.0,7,10,15,-8.0
1,42022,1610612737,ATL,Atlanta Hawks,0042200115,2023-04-25,ATL @ BOS,W,242,119,...,1.000,6.0,28.0,34.0,26,5.0,4,8,16,2.0
2,42022,1610612737,ATL,Atlanta Hawks,0042200114,2023-04-23,ATL vs. BOS,L,240,121,...,0.875,11.0,31.0,42.0,25,8.0,4,12,24,-8.0
3,42022,1610612737,ATL,Atlanta Hawks,0042200113,2023-04-21,ATL vs. BOS,W,240,130,...,0.813,11.0,37.0,48.0,24,5.0,6,18,15,8.0
4,42022,1610612737,ATL,Atlanta Hawks,0042200112,2023-04-18,ATL @ BOS,L,241,106,...,0.500,19.0,30.0,49.0,21,10.0,4,15,11,-13.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2874,21988,1610612766,CHH,Charlotte Hornets,0028800062,1988-11-12,CHH @ ATL,L,238,111,...,0.813,14.0,21.0,35.0,30,12.0,2,19,26,
2875,21988,1610612766,CHH,Charlotte Hornets,0028800052,1988-11-11,CHH @ WAS,L,240,87,...,0.760,11.0,32.0,43.0,22,9.0,1,23,26,
2876,21988,1610612766,CHH,Charlotte Hornets,0028800024,1988-11-08,CHH vs. LAC,W,240,117,...,0.738,17.0,38.0,55.0,28,9.0,1,17,31,
2877,21988,1610612766,CHH,Charlotte Hornets,0028800015,1988-11-05,CHH @ DET,L,240,85,...,1.000,21.0,19.0,40.0,18,8.0,6,11,21,


In [29]:
all_games.to_sql(
    name='all_games',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

102841

#### Box Scores

In [30]:
# get the game ids as a list
try:
    game_ids = all_games['GAME_ID'].tolist()
except Exception as e:
    game_ids = pd.read_sql_query('select * from bronze.all_games', con=engine)['GAME_ID'].tolist()

In [31]:
from nba_api.stats.endpoints._base import Endpoint
import typing

def boxscore_getter(endpoint: Endpoint) -> typing.Union[pd.DataFrame, typing.List[pd.DataFrame]]:
    try:
        return endpoint.get_data_frames()
    except Exception as e:
        try:
            return endpoint.get_data_frame()
        except Exception as e:
            raise e

In [32]:
test = boxscore_getter(boxscoreadvancedv2.BoxScoreAdvancedV2(game_id=game_ids[0], proxy='http://groups-RESIDENTIAL:apify_proxy_X4cFIqR8cbXXz2hq2bTusiFAQctscK3bu1lI@proxy.apify.com:8000'))

In [33]:
test[0].info()
test[1].info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29 entries, 0 to 28
Data columns (total 32 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   GAME_ID            29 non-null     object 
 1   TEAM_ID            29 non-null     int64  
 2   TEAM_ABBREVIATION  29 non-null     object 
 3   TEAM_CITY          29 non-null     object 
 4   PLAYER_ID          29 non-null     int64  
 5   PLAYER_NAME        29 non-null     object 
 6   NICKNAME           29 non-null     object 
 7   START_POSITION     29 non-null     object 
 8   COMMENT            29 non-null     object 
 9   MIN                19 non-null     object 
 10  E_OFF_RATING       19 non-null     float64
 11  OFF_RATING         29 non-null     float64
 12  E_DEF_RATING       19 non-null     float64
 13  DEF_RATING         29 non-null     float64
 14  E_NET_RATING       19 non-null     float64
 15  NET_RATING         29 non-null     float64
 16  AST_PCT            19 non-nu

In [34]:
test_game_boxscore = pd.concat(test)

In [19]:
!pip install tqdm

Collecting tqdm
  Downloading tqdm-4.65.0-py3-none-any.whl (77 kB)
     ---------------------------------------- 77.1/77.1 kB 4.5 MB/s eta 0:00:00
Installing collected packages: tqdm
Successfully installed tqdm-4.65.0


In [42]:
boxscore_advanced_bag = []
boxscore_defensive_bag = []
boxscore_fourfactors_bag = []
boxscore_matchups_bag = []
boxscore_misc_v2_bag = []
boxscore_playertrackv2_bag = []
boxscore_scoring_v2_bag = []
boxscore_summary_v2_bag = []
boxscore_traditional_v2_bag = []
boxscore_usage_v2_bag = []

import multiprocessing as mp
import tqdm

# loop through each game, and append the results to the appropriate list
def boxscore_looper(endpoint: Endpoint, game_id_list: typing.Iterable[str]) -> typing.List[pd.DataFrame]:
    temp_list = []
    
    with tqdm.tqdm(total=len(game_id_list)) as pbar:
        for game_id in game_id_list:
            try:
                response_1 = boxscore_getter(endpoint(game_id=game_id, timeout=5, proxy='http://groups-RESIDENTIAL:apify_proxy_X4cFIqR8cbXXz2hq2bTusiFAQctscK3bu1lI@proxy.apify.com:8000'))
                if type(response_1) == pd.DataFrame:
                    temp_list.append(response_1)
                    response_1.to_sql(name='boxscore_advanced_v2', con=engine, schema='bronze', if_exists='append', method='multi')
                else:
                    temp_list.extend(response_1)
                    for i in response_1:
                        i.to_sql(name='boxscore_advanced_v2', con=engine, schema='bronze', if_exists='append', method='multi')
            except Exception as e:
                if isinstance(e, TimeoutError):
                    time.sleep(5)
                    continue
                else:
                    pass
            pbar.update(1)
    
    return temp_list

In [43]:
adv_boxscore = boxscore_looper(boxscoreadvancedv2.BoxScoreAdvancedV2, game_id_list=set(game_ids)) # converted to a set so we only do it for unique games

  0%|          | 0/51511 [00:00<?, ?it/s]

  2%|▏         | 1199/51511 [1:00:06<42:01:57,  3.01s/it]


KeyboardInterrupt: 

## Common

In [35]:
common_all_players_response = commonallplayers.CommonAllPlayers(timeout=15).get_data_frames()

In [38]:
common_all_players_response[0].to_sql(name='common_all_players', con=engine, schema='bronze', if_exists='replace', method='multi')

4820

In [None]:
common_player_info = commonplayerinfo.CommonPlayerInfo(timeout=15).get_data_frames()

In [42]:
common_playoff_series = commonplayoffseries.CommonPlayoffSeries(timeout=15).get_data_frames()
# defensehub_df = defensehub.DefenseHub(proxy= 'http://groups-RESIDENTIAL:apify_proxy_X4cFIqR8cbXXz2hq2bTusiFAQctscK3bu1lI@proxy.apify.com:8000',timeout=15).get_data_frames()
draftboard_df = draftboard.DraftBoard(proxy='http://groups-RESIDENTIAL:apify_proxy_X4cFIqR8cbXXz2hq2bTusiFAQctscK3bu1lI@proxy.apify.com:8000',timeout=15).get_data_frames()
draftcombinedrillresults_df = draftcombinedrillresults.DraftCombineDrillResults(timeout=15).get_data_frames()
draftcombine_non_stationary = draftcombinenonstationaryshooting.DraftCombineNonStationaryShooting(timeout=15).get_data_frames()
draftcombine_stats = draftcombinestats.DraftCombineStats(timeout=15).get_data_frames()
drafthistoryz = drafthistory.DraftHistory(timeout=15).get_data_frames()
fantasywidgetz = fantasywidget.FantasyWidget(timeout=15).get_data_frames()

In [44]:
common_playoff_series[0].to_sql(
    name='common_playoff_series',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

draftboard_df[0].to_sql(
    name='draftboard',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

draftcombinedrillresults_df[0].to_sql(
    name='draftcombinedrillresults',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

draftcombine_non_stationary[0].to_sql(
    name='draftcombine_non_stationary',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

draftcombine_stats[0].to_sql(
    name='draftcombine_stats',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

drafthistoryz[0].to_sql(
    name='drafthistory',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

fantasywidgetz[0].to_sql(
    name='fantasywidget',
    con=engine,
    schema='bronze',
    if_exists='replace',
    method='multi'
)

539

In [45]:
league_season_matchups = leagueseasonmatchups.LeagueSeasonMatchups(timeout=15).get_data_frames()

## Streaming Stats