This notebook can be used to either backfill or incrementally add data to our MLB database. We pull data for all completed regular season games in the selected seasons.

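The notebook creates three tables. It also reads and updates a bookkeeping table, *mlb_game_recorded*, which lists the games that have already been loaded; the notebook does not create that table, so it must already exist.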

1. *mlb_schedule* consists of data about the games: who was playing, and when and where each game was played.
2. *mlb_pitch_results* consists of pitch-level (or, more precisely, event-level) data. For each pitch (or other event, such as a stolen base), we record details about the batter, the pitcher, the game state, and the pitch itself: speed, angle, ball/strike decision, etc.
3. *mlb_play_results* is an aggregation of the data in *mlb_pitch_results*, focusing only on the final event of each at-bat. That is when we learn the outcome of the at-bat: was it an out? A run? Etc.

In [1]:
import pandas as pd
import statsapi
import json
import requests
import datetime as dt

from multiprocessing.pool import ThreadPool
from psycopg2.extensions import register_adapter, AsIs

from db_connect import get_connection
from utils import get_schedule, clean_columns

In [2]:
# Shared database handle: every read and write in the cells below goes through `engine`.
engine = get_connection()

In [3]:
# Seasons to load. range() excludes its stop value, so this covers 2021 through 2023.
seasons = range(2021, 2024)

# Fetch one schedule frame per season, then stack them into a single frame.
season_frames = [get_schedule(season) for season in seasons]
schedule = pd.concat(season_frames)

schedule.head()

  schedule = pd.concat([get_schedule(s) for s in seasons])


Unnamed: 0,gamepk,link,gametype,season,gamedate,officialdate,istie,gamenumber,publicfacing,doubleheader,...,description,teams_away_probablepitcher_note,rescheduledate,reschedulegamedate,rescheduledfrom,rescheduledfromdate,resumedate,resumegamedate,resumedfrom,resumedfromdate
417,634642,/api/v1.1/game/634642/feed/live,R,2021,2021-04-01T17:05:00Z,2021-04-01,False,1,True,N,...,,"Ryu opens as the Blue Jays' undisputed ace, co...",,,,,,,,
418,634645,/api/v1.1/game/634645/feed/live,R,2021,2021-04-01T17:10:00Z,2021-04-01,False,1,True,N,...,Tigers home opener,"For the second straight year, Bieber will get ...",,,,,,,,
419,634638,/api/v1.1/game/634638/feed/live,R,2021,2021-04-01T18:10:00Z,2021-04-01,False,1,True,N,...,Brewers home opener,Maeda will make his first Opening Day start in...,,,,,,,,
420,634634,/api/v1.1/game/634634/feed/live,R,2021,2021-04-01T18:20:00Z,2021-04-01,False,1,True,N,...,Cubs home opener,Kuhl will make his first career Opening Day st...,,,,,,,,
421,634622,/api/v1.1/game/634622/feed/live,R,2021,2021-04-01T19:05:00Z,2021-04-01,False,1,True,N,...,Phillies home opener,Fried will be making his first career Opening ...,,,,,,,,


In [4]:
# Rebuild mlb_schedule on every run: if_exists = 'replace' drops any existing table before inserting.
schedule.to_sql(name = 'mlb_schedule', con = engine, if_exists = 'replace', index = False)

771

In [5]:
# Some entries are dictionaries, so we need an "adapter" to process those dictionaries.
def adapt_dict(dict_var):
    """Adapt a Python dict into a quoted SQL string literal holding its JSON encoding.

    Parameters
    ----------
    dict_var : dict
        A JSON-serialisable dictionary.

    Returns
    -------
    AsIs
        Wrapper around the single-quoted JSON text. AsIs makes psycopg2 insert
        the text into the statement verbatim, without any further quoting.

    Raises
    ------
    TypeError
        If `dict_var` contains a value that json.dumps cannot serialise.
    """
    # AsIs performs no escaping, so a single quote inside the JSON text
    # (for example a name such as "O'Neill") would end the SQL literal early.
    # Doubling it is the standard SQL way to embed a quote in a literal.
    json_text = json.dumps(dict_var).replace("'", "''")
    return AsIs("'" + json_text + "'")

# Global registration: psycopg2 will call adapt_dict for any dict passed as a query parameter.
register_adapter(dict, adapt_dict)

In [6]:
def process_game(gamepk):
    """Download one game's feed and process every play it contains.

    The feed is requested from the 'game' endpoint of statsapi for `gamepk`.
    Returns a list with one process_at_bat result per entry found under
    liveData -> plays -> allPlays, in feed order.
    """
    feed = statsapi.get('game', {'gamePk': gamepk})
    plays = feed.get('liveData').get('plays')['allPlays']

    processed = []
    for play in plays:
        processed.append(process_at_bat(play, gamepk))
    return processed

def process_at_bat(a, gamepk):
    """Split one play into an at-bat summary dict and a frame of its events.

    Parameters
    ----------
    a : dict
        One play. The keys read here are 'result', 'about', 'count',
        'matchup', 'playEvents' and 'atBatIndex'.
    gamepk : int
        Identifier of the game the play belongs to; stored on both outputs.

    Returns
    -------
    tuple
        (results, events). `results` is a dict merging a['result'],
        a['about'] and a['count'] with the batter and pitcher id/name, the
        gamepk and the men-on-base split. `events` is a DataFrame with one
        row per entry of a['playEvents'], plus 'atBatIndex' and 'gamepk'
        columns.

    Raises
    ------
    KeyError
        If one of the keys listed above is missing from `a`.
    """
    matchup = a['matchup']
    batter = matchup['batter']
    pitcher = matchup['pitcher']

    # Build a fresh dict rather than updating a['result'] in place, so the
    # caller's play data is left untouched. Later entries override earlier
    # ones on a key clash, exactly as successive dict.update calls would.
    results = {
        **a['result'],
        **a['about'],
        **a['count'],
        'batter_id': batter['id'],
        'batter_name': batter['fullName'],
        'pitcher_id': pitcher['id'],
        'pitcher_name': pitcher['fullName'],
        'gamepk': gamepk,
        'men_on_base': matchup['splits']['menOnBase'],
    }

    # One row per play event; atBatIndex is carried along so events can be
    # tied back to their at-bat.
    events = pd.json_normalize(a, record_path = 'playEvents', meta = ['atBatIndex'])
    events['gamepk'] = gamepk
    return results, events

In [7]:
# Reset switch. Set BACKFILL to True to start over (for example after a schema
# change, or when rows have been duplicated): the bookkeeping of loaded games
# is emptied and both result tables are dropped.
BACKFILL = False

# Executed in this order when BACKFILL is on.
RESET_STATEMENTS = (
    'DELETE FROM mlb_game_recorded where 1 = 1;',
    'DROP TABLE IF EXISTS mlb_pitch_results;',
    'DROP TABLE IF EXISTS mlb_play_results',
)

if BACKFILL:
    for statement in RESET_STATEMENTS:
        engine.execute(statement)


In [8]:
# Pool of 25 worker threads, used below to download games concurrently.
tpool = ThreadPool(processes=25)

In [9]:
# Upper bound on the number of games downloaded per batch.
BATCH_SIZE = 500
# A game whose download/processing fails this many times is skipped for the
# rest of the run; it stays unrecorded, so a later run will try it again.
MAX_ATTEMPTS = 3


def _try_process_game(gamepk):
    """Run process_game for one game, returning None instead of raising on failure.

    Handling errors per game keeps one bad game from discarding the rest of
    its batch. Only Exception is caught, so KeyboardInterrupt still stops the run.
    """
    try:
        return process_game(gamepk)
    except Exception as exc:
        print('Game {} failed: {!r}'.format(gamepk, exc))
        return None


failed_attempts = {}  # gamepk -> number of failed attempts so far in this run
games_to_write = True
grand_start = dt.datetime.now()
start = grand_start

while games_to_write:

    # Games present in the schedule that have not been marked as recorded yet.
    game_pks = pd.read_sql("SELECT DISTINCT gamepk FROM mlb_schedule \
                           WHERE gamepk NOT IN (SELECT DISTINCT gamepk FROM mlb_game_recorded)\
                           ", engine)
    # To restrict a run to one season, add e.g. `AND season::NUMERIC = 2023` to the WHERE clause.

    # Leave out games that keep failing; without this, a permanently broken
    # game would be re-queried and retried forever.
    given_up = [pk for pk, count in failed_attempts.items() if count >= MAX_ATTEMPTS]
    game_pks = game_pks[~game_pks['gamepk'].isin(given_up)]

    print('Records left: {}, time since start: {}, time since update: {}'.format(len(game_pks), dt.datetime.now() - grand_start, dt.datetime.now() - start))

    start = dt.datetime.now()

    if len(game_pks) == 0:
        games_to_write = False
        if given_up:
            print('Gave up on {} game(s) after {} failed attempts each: {}'.format(len(given_up), MAX_ATTEMPTS, given_up))
        print('done!')
        continue

    # Work on a random batch of the remaining games.
    game_pks = game_pks.sample(n = min(BATCH_SIZE, len(game_pks)), replace=False)
    batch = game_pks['gamepk'].tolist()

    # One entry per game, in batch order; None marks a game that failed.
    data = tpool.map(_try_process_game, batch)

    loaded = []  # gamepks processed successfully in this batch
    plays = []   # (results, events) pairs from every successfully processed game
    for pk, game in zip(batch, data):
        if game is None:
            failed_attempts[pk] = failed_attempts.get(pk, 0) + 1
        else:
            loaded.append(pk)
            plays.extend(game)

    if not loaded:
        continue

    # pd.concat raises on an empty list, so skip the table writes when the
    # loaded games contain no plays at all.
    if plays:
        play_results = pd.json_normalize([play[0] for play in plays])
        play_results.columns = clean_columns(play_results.columns)

        pitch_results = pd.concat([play[1] for play in plays])
        pitch_results.columns = clean_columns(pitch_results.columns)

        # NOTE(review): the two appends and the INSERT below are separate
        # statements, not one transaction. A failure between them would leave
        # rows without a "recorded" marker, and a re-run would append them again.
        play_results.to_sql('mlb_play_results', engine, if_exists = 'append', index = False)
        pitch_results.to_sql('mlb_pitch_results', engine, if_exists = 'append', index = False)

    # Mark only the games that were actually processed as recorded.
    engine.execute('INSERT INTO mlb_game_recorded VALUES ' + ', '.join(['({}, True)'.format(x) for x in loaded]))

Records left: 183, time since start: 0:00:00.020519, time since update: 0:00:00.020532
Records left: 0, time since start: 0:02:56.996222, time since update: 0:02:56.975552
done!


In [None]:
#%%timeit 
#process_game(718399)

In [None]:
#%%time 
#process_game(718397)