In [2]:
# %pip install pyarrow

In [3]:
import sys
import os
sys.path.insert(0,'../../')
from scripts.base import *
import scripts.base as base
import pandas as pd
import numpy as np
import pyarrow as pa
import bs4
from IPython.display import display_html,clear_output
import re
from datetime import datetime
import ast
import itertools
from tqdm import tqdm,trange

# Value passed as `dtype_backend=` to `pd.read_html` by the parsing helpers below.
dtype_backend = 'pyarrow'

# Notebook-wide pandas options: show every column, cap displayed rows at 100.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
# Disable numexpr for pandas computations.
pd.set_option('compute.use_numexpr', False)
# pd.set_option('compute.default_to_pandas', False)
# Use pyarrow as the default parquet engine.
pd.set_option('io.parquet.engine', 'pyarrow')

In [4]:
def iter_tables(tables,sleep=2):
    """Show each table in the cell output for a moment, one after another.

    Parameters
    ----------
    tables : dict, list or tuple
        Tables to display. For a dict the keys label the progress bar; for a
        list/tuple the positional index is used as the label.
    sleep : int or float, default 2
        Seconds each table stays on screen before the output is cleared.

    Raises
    ------
    TypeError
        If ``tables`` is not a dict, list or tuple.
    """
    # Local import: the notebook's import cell never imports `time` explicitly.
    import time

    if isinstance(tables,dict):
        tables_list = list(tables.items())
    elif isinstance(tables,(list,tuple)):
        tables_list = list(enumerate(tables))
    else:
        # Previously this fell through and failed later with an unbound local.
        raise TypeError(f'tables must be a dict, list or tuple, got {type(tables).__name__}')

    progress = tqdm(tables_list)
    for table_id, table in progress:
        # tqdm concatenates the description with a string, so integer ids
        # (from enumerate) must be converted first.
        progress.set_description(str(table_id))
        display_html(table)
        time.sleep(sleep)
        clear_output()

def convert_time_to_minutes(time_str):
    """Convert a ``'MM:SS'`` clock string to fractional minutes.

    Returns ``pd.NA`` when ``time_str`` is null. The seconds part may carry a
    fractional component (it is parsed with ``float``).
    """
    if not pd.notnull(time_str):
        return pd.NA
    mins_part, secs_part = time_str.split(':')
    return float(mins_part) + float(secs_part) / 60.0

def convert_series_dtype(series):
    """Best-effort conversion of a Series to a more specific dtype.

    Candidates are tried in order and the first one that succeeds wins:

    1. nullable ``Int32``
    2. nullable ``Float32``
    3. datetimes parsed from ``'%Y-%m-%d %H:%M:%S'`` strings
    4. string-pattern based fallbacks (timestamp, date, numeric)

    If nothing applies, the input Series is returned unchanged.

    Parameters
    ----------
    series : pd.Series

    Returns
    -------
    pd.Series
        The converted Series, or ``series`` itself when no conversion fits.
    """
    # backend = {'numpy':np, 'pyarrow':pa}[backend]
    # Each attempt is deliberately best-effort. Only `Exception` is caught so
    # that KeyboardInterrupt / SystemExit still propagate.
    try:
        return series.astype(pd.Int32Dtype())
    except Exception:
        pass
    try:
        return series.astype(pd.Float32Dtype())
    except Exception:
        pass
    try:
        return series.astype(str).apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
    except Exception:
        pass
    try:
        # Raw strings keep `\d` a regex token instead of an invalid string escape.
        if series.str.contains(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}').all():
            return series.apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'))
        elif series.str.contains(r'\d{4}-\d{2}-\d{2}').all():
            return pd.to_datetime(series, format='%Y-%m-%d')
        elif series.str.isnumeric().all():
            return series.astype(pd.Int32Dtype())
        elif series.str.isnumeric().any():
            return series.astype(pd.Float32Dtype())
    except Exception:
        pass
    return series

def parse_html_table(html_text,**kwargs):
    """Parse every ``<table>`` in ``html_text`` and concatenate them into one DataFrame.

    Parameters
    ----------
    html_text : str
        Markup containing one or more tables. Values that do not look like
        literal markup (no leading ``<``) are handed to ``pd.read_html``
        untouched.
    **kwargs
        Forwarded to ``pd.read_html`` (e.g. ``extract_links='all'``).

    Returns
    -------
    pd.DataFrame
        Row-wise concatenation of all tables found, read with the
        module-level ``dtype_backend``.
    """
    # Local import so the block does not depend on the notebook's import cell.
    from io import StringIO

    source = html_text
    # Recent pandas versions deprecate passing literal HTML strings to
    # read_html; wrapping the markup in a file-like object is the supported form.
    if isinstance(html_text, str) and html_text.lstrip().startswith('<'):
        source = StringIO(html_text)
    return pd.concat(pd.read_html(source,flavor='bs4',dtype_backend=dtype_backend,**kwargs))

def parse_html_table_as_hrefs(html_text,columns=None):
    """Parse a table with ``extract_links='all'`` and keep only the link part of each cell.

    Every cell is run through ``ast.literal_eval`` and element ``[1]`` of the
    result is kept (presumably the href of a ``(text, href)`` pair - confirm
    against the pandas version in use).

    Parameters
    ----------
    html_text : str
        Markup of the table(s) to parse.
    columns : sequence, optional
        If given, replaces the column labels of the result.
    """
    def _second_item(cell):
        return ast.literal_eval(cell)[1]

    link_frame = parse_html_table(html_text,extract_links='all')
    link_frame = link_frame.apply(lambda column: column.apply(_second_item))
    if columns is not None:
        link_frame.columns = columns
    return link_frame

def __parse_boxscores_tables_type1__(html_text):
    """Parse the ``line_score`` and ``four_factors`` tables of a box score page.

    Returns a dict keyed by table id. Each frame has lower-cased column names,
    the first (unnamed) column renamed to ``team``, and a ``team_href`` column
    inserted at position 1.
    """
    wanted_ids = ['line_score','four_factors']
    found_tables = {}
    for table_elmt in make_soup(html_text).find_all('table'):
        table_id = table_elmt.get('id')
        if table_id not in wanted_ids:
            continue
        table_markup = str(table_elmt)
        # The tables have a two-level header; only the lower level is kept.
        df = parse_html_table(table_markup).droplevel(0,axis=1)
        df.columns = df.columns.str.lower()
        df = df.rename(columns={'unnamed: 0_level_1':'team'})
        df_hrefs = parse_html_table_as_hrefs(table_markup,columns=df.columns)
        df.insert(1,'team_href',df_hrefs['team'])
        found_tables[table_id] = df
    return found_tables
            
def __parse_boxscores_tables_type2__(html_text):
    """Parse the per-team player tables (ids starting with ``box``) of a box score page.

    The team id inside each table id is replaced by ``away`` / ``home``. For
    every table two entries are returned:

    * ``<table_id>``        - one row per player, with ``team``, ``team_href``,
      ``player_href`` and a 0/1 ``played`` flag added;
    * ``<table_id>-total``  - the ``Team Totals`` row, with ``player`` /
      ``player_href`` moved to ``team`` / ``team_href``.

    Tables whose id mentions neither team are skipped.
    """
    # parse box scores that starts with box
    table_away_home_teams = __parse_game_info_tables__(html_text)['info-away-home-teams']
    away,home = table_away_home_teams['team_id']
    away_href,home_href = table_away_home_teams['team_href']

    html_soup = make_soup(html_text)
    parsed_boxscores_tables = {}
    for table_elmt in html_soup.find_all('table'):
        table_id = table_elmt.get('id')
        if not (table_id and table_id.startswith('box')):
            continue

        if away in table_id:
            table_id = re.sub(away,'away',table_id)
            team = away
            team_href = away_href
        elif home in table_id:
            table_id = re.sub(home,'home',table_id)
            team = home
            team_href = home_href
        else:
            # Without this guard `team` / `team_href` would be unbound on the
            # first table or silently reused from the previous one.
            continue

        df_players = parse_html_table(str(table_elmt)).droplevel(0,axis=1)
        df_players.columns = df_players.columns.str.lower()
        df_players.rename(columns={'starters':'player'},inplace=True)
        df_hrefs = parse_html_table_as_hrefs(str(table_elmt),columns=df_players.columns)
        df_players = df_players.replace('Did Not Play', float('nan'))
        df_players.insert(1,'player_href', df_hrefs['player'])
        # A player counts as having played when the minutes column is present.
        df_players.insert(2, 'played', df_players['mp'].notna().astype(int))

        df_total = df_players[df_players['player'].str.startswith('Team Totals')].apply(convert_series_dtype)
        df_total.insert(0,'team',df_total.pop('player'))
        df_total.insert(1,'team_href',df_total.pop('player_href'))

        # Drop the in-table separator row ('Reserves') and the totals row.
        df_players = df_players[~df_players['player'].isin(('Reserves','Team Totals'))].apply(convert_series_dtype)
        df_players.insert(0,'team',team)
        df_players.insert(1,'team_href',team_href)

        parsed_boxscores_tables[table_id] = df_players
        parsed_boxscores_tables[table_id+'-total'] = df_total
    return parsed_boxscores_tables

def __parse_game_info_tables__(html_text):
    """Extract the two team links from the page's scorebox.

    The selector must yield exactly two anchors; the first is treated as the
    away team and the second as the home team. Team id and season are read
    from hrefs of the form ``/teams/<team>/<season>.html``.

    Returns
    -------
    dict
        ``{'info-away-home-teams': DataFrame}`` with columns ``team_id``,
        ``team_href``, ``team_name``, ``season``, ``team_type``.
    """
    column_names = ['team_id', 'team_href', 'team_name', 'season', 'team_type']
    away,home = make_soup(html_text).select('div .scorebox strong a')
    rows = []
    for team_type, anchor in (('away',away),('home',home)):
        team_href = anchor['href']
        href_match = re.search(r'/teams/(?P<team>\w+)/(?P<season>\d+)\.html',team_href)
        team_id,season = href_match.groups()
        rows.append([team_id, team_href, anchor.text, season, team_type])
    return {'info-away-home-teams': pd.DataFrame(rows,columns = column_names)}
    
def parse_all_boxscores_tables(html_text):
    """Run every box score parser on ``html_text`` and merge the results into one dict.

    Parsers run in a fixed order; on a key clash the later parser wins.
    """
    parsers = (
        __parse_game_info_tables__,
        __parse_boxscores_tables_type1__,
        __parse_boxscores_tables_type2__,
    )
    merged_tables = {}
    for parser in parsers:
        merged_tables.update(parser(html_text))
    return merged_tables

# Try the box score parsers on one page, fetched with source=base.LOCAL_HOST.
# NOTE(review): the scheme is written with a single slash ('https:/'); kept
# as-is - confirm that fetch_html expects this form.
href = 'https:/www.basketball-reference.com/boxscores/202301100GSW.html'
url = href
html_text = fetch_html(url, source=base.LOCAL_HOST)
parsed_boxscores_tables = parse_all_boxscores_tables(html_text)

# iter_tables(parsed_boxscores_tables,3)

In [5]:
def pairwise(iterable):
    """Return an iterator over consecutive overlapping pairs.

    ``pairwise('ABCDEFG')`` yields ``AB BC CD DE EF FG``. Inputs with fewer
    than two items yield nothing.
    """
    leading, trailing = itertools.tee(iterable, 2)
    # Shift the second copy one step ahead so zip pairs item k with item k+1.
    next(trailing, None)
    return zip(leading, trailing)

def __parse_pbp_tables_type1__(html_text):
    """Collect the distinct links found inside the play-by-play table.

    Returns
    -------
    dict
        ``{'player-href': DataFrame}`` with columns ``player`` (link text) and
        ``player_href`` (link target), one row per distinct pair, in order of
        first appearance.
    """
    html_soup = make_soup(html_text)
    pbp_table_elmt = html_soup.find('table',{'id':'pbp'})
    # dict.fromkeys de-duplicates while keeping first-appearance order; a set
    # would make the row order vary between runs. `href=True` skips anchors
    # that carry no href attribute.
    unique_links = dict.fromkeys((a.text,a['href']) for a in pbp_table_elmt.find_all('a', href=True))
    player_href_table = pd.DataFrame(list(unique_links),columns=['player','player_href'])
    return {'player-href': player_href_table}

def __parse_pbp_tables_type2__(html_text):
    """Parse the play-by-play table (id ``pbp``) into one row per timed event.

    The result has a leading ``quarter`` column, constant ``away`` / ``home``
    columns holding the original team column labels, and lower-cased column
    names. Only rows whose ``time`` value contains ``':'`` are kept.

    Returns
    -------
    dict
        ``{'pbp': DataFrame}``
    """
    html_soup = make_soup(html_text)
    pbp_table_elmt = html_soup.find('table',{'id':'pbp'})

    pbp_table = pd.concat(pd.read_html(str(pbp_table_elmt),flavor='bs4',dtype_backend=dtype_backend))
    # Keep only the lower level of the two-level column header.
    pbp_table.columns = pbp_table.columns.droplevel(0)
    # The labels at positions 1 and 5 are taken as the away and home team columns.
    away,home = pbp_table.columns[[1,5]]
    pbp_table = pbp_table.rename({away:'away_action',home:'home_action','Unnamed: 2_level_1':'away_pts_change','Unnamed: 4_level_1':'home_pts_change'},axis=1)
    # Preserve the original team labels as constant columns (second and last).
    pbp_table.insert(1,'away',away)
    pbp_table.insert(len(pbp_table.columns),'home',home)
    pbp_table.columns = pbp_table.columns.str.lower()

    # Rows whose time cell contains 'Q' or 'OT' - presumably the period separator rows.
    time_series = pbp_table[pbp_table['time'].str.contains('Q') | pbp_table['time'].str.contains('OT')]['time']
    # Slice boundaries: start of table, each separator row, open end (None).
    quarter_marks = [0] + list(time_series.index) + [None]
    quarter_order = range(1,16) # maximum expected Qs = 15 (OT1 = 5, OT2 = 6,...)
    # Consecutive boundary pairs delimit one period each; number them from 1.
    for q,(i,j) in list(zip(quarter_order,pairwise(quarter_marks))):
        pbp_table.loc[pbp_table.index[i:j],'quarter'] = q
    # Move the new column to the front.
    pbp_table.insert(0,'quarter',pbp_table.pop('quarter'))
    # pbp_table.insert(2,'time_min',pbp_table['time'].apply(lambda x: int(x.split(':')[0]) if pd.notnull(x) and ':' in x else pd.NA))
    # pbp_table.insert(3,'time_sec',pbp_table['time'].apply(lambda x: float(x.split(':')[1]) if pd.notnull(x) and ':' in x else pd.NA))
    # Drop every row whose time is not a clock value (no ':').
    pbp_table = pbp_table[pbp_table['time'].str.contains(':')].reset_index(drop=True)
    return {'pbp': pbp_table}


def parse_all_pbp_tables(html_text):
    """Run every play-by-play parser on ``html_text`` and merge the results into one dict.

    Parsers run in a fixed order; on a key clash the later parser wins.
    """
    parsers = (
        __parse_game_info_tables__,
        __parse_pbp_tables_type1__,
        __parse_pbp_tables_type2__,
    )
    merged_tables = {}
    for parser in parsers:
        merged_tables.update(parser(html_text))
    return merged_tables

# Try the play-by-play parsers on one page, fetched with source=base.LOCAL_HOST.
# NOTE(review): the scheme is written with a single slash ('https:/'); kept
# as-is - confirm that fetch_html expects this form.
href = 'https:/www.basketball-reference.com/boxscores/pbp/202211090OKC.html'
url = href
html_text = fetch_html(url, source=base.LOCAL_HOST)
parsed_pbp_tables = parse_all_pbp_tables(html_text)

# iter_tables(parsed_pbp_tables,5)

In [6]:
def __parse_shotchart_tables_type1__(html_text):
    """Parse every id-bearing table of a shot-chart page that belongs to one of the two teams.

    The team id inside each table id is replaced by ``away`` / ``home`` and
    the frame gets leading ``team`` / ``team_href`` columns. Tables whose id
    mentions neither team are skipped.

    Returns
    -------
    dict
        Rewritten table id -> DataFrame.
    """
    html_soup = make_soup(html_text)
    table_away_home_teams = __parse_game_info_tables__(html_text)['info-away-home-teams']
    away,home = table_away_home_teams['team_id']
    away_href,home_href = table_away_home_teams['team_href']

    parsed_shot_charts = {}
    shooting_tables = {table_elmt.get('id'): table_elmt for table_elmt in html_soup.find_all('table') if table_elmt.get('id')}
    for table_id, table_elmt in shooting_tables.items():
        if away in table_id:
            team_type = 'away'
            table_id = re.sub(away,team_type,table_id)
            team = away
            team_href = away_href
        elif home in table_id:
            team_type = 'home'
            table_id = re.sub(home,team_type,table_id)
            team = home
            team_href = home_href
        else:
            # Without this guard `team` / `team_href` would be unbound on the
            # first table or silently reused from the previous one.
            continue
        shooting_table_team = parse_html_table(str(table_elmt))
        shooting_table_team.insert(0,'team',team)
        shooting_table_team.insert(1,'team_href',team_href)
        parsed_shot_charts[table_id] = shooting_table_team
    return parsed_shot_charts


def __parse_shotchart_tables_type2__(html_text):
    """Parse the individual shots drawn on each team's shot chart.

    Every ``div.shot-area`` with an id is matched to the away or home team by
    its id; areas matching neither team are skipped. Each ``div.tooltip``
    inside becomes one row, built from its CSS classes (period, player code,
    shot result), its inline ``top``/``left`` pixel position and its ``tip``
    text (clock, distance, score, leading team).

    Returns
    -------
    dict
        ``{'shots-away': DataFrame, 'shots-home': DataFrame}`` (only the keys
        for areas that were found).
    """
    html_soup = make_soup(html_text)
    table_away_home_teams = __parse_game_info_tables__(html_text)['info-away-home-teams']
    away,home = table_away_home_teams['team_id']

    parsed_shot_charts = {}
    shot_area_divs = {div_elmt.get('id'): div_elmt for div_elmt in html_soup.find_all('div', {'class': 'shot-area'}) if div_elmt.get('id')}
    for div_id, div_elmt in shot_area_divs.items():
        if away in div_id:
            team_type = 'away'
        elif home in div_id:
            team_type = 'home'
        else:
            # Without this guard `team_type` would be unbound or stale and
            # could overwrite another team's table.
            continue

        data = []
        for tooltip_div in div_elmt.find_all('div', {'class': 'tooltip'}):
            # parse quarter, player, shot result (exactly four classes expected)
            _, q_class, player_code, shot_result = tooltip_div['class']
            quarter = int(re.search(r'q-(\d)', q_class).group(1))
            # parse shot position
            px_match = re.search(r'top:(\d+)px;left:(\d+)px;', tooltip_div['style'])
            px_y = int(px_match.group(1))
            px_x = int(px_match.group(2))
            # parse detailed info from description
            tip_str = tooltip_div['tip']
            description = re.sub('<br>',', ',tip_str)
            # The dot before the tenths digit is escaped so it only matches a literal '.'.
            time_str = re.findall(r'(\d{1,2}:\d{2}\.\d{1}) remaining', tip_str)[0]  # extract time
            shot_distance = re.findall(r'from (\d+?) ft', tip_str)[0]
            score_str = re.findall(r'(\d+-\d+)', tip_str)[0]  # extract score
            team_str = re.findall(r'<br>(\w+|\w+[\s\w+]+?) (?:now|tied|leads|trails)', tip_str)[0]
            score_res_str = re.findall(r'(tied|leads|trails|now leads|now tied)', tip_str)[0]
            data.append({
                'quarter': quarter,
                'time': time_str,
                'player_code': player_code,
                'shot_result': shot_result,
                'distance': shot_distance,
                'pos_x': px_x,
                'pos_y': px_y,
                'team': team_str,
                'score_res': score_res_str,
                'score': score_str,
                'description': description,
            })

        parsed_shot_charts[f'shots-{team_type}'] = pd.DataFrame(data)

    return parsed_shot_charts

def parse_all_shotchart_tables(html_text):
    """Run every shot-chart parser on ``html_text`` and merge the results into one dict.

    Parsers run in a fixed order; on a key clash the later parser wins.
    """
    parsers = (
        __parse_game_info_tables__,
        __parse_shotchart_tables_type1__,
        __parse_shotchart_tables_type2__,
    )
    merged_tables = {}
    for parser in parsers:
        merged_tables.update(parser(html_text))
    return merged_tables

# Try the shot-chart parsers on one page, fetched with source=base.LOCAL_HOST.
# NOTE(review): the scheme is written with a single slash ('https:/'); kept
# as-is - confirm that fetch_html expects this form.
href = 'https:/www.basketball-reference.com/boxscores/shot-chart/202211090OKC.html'
url = href
html_text = fetch_html(url, source=base.LOCAL_HOST)
# The soup is kept in a global for interactive inspection.
html_soup = make_soup(html_text)

parsed_shot_charts = parse_all_shotchart_tables(html_text)
# iter_tables(parsed_shot_charts)


In [7]:
def __parse_plusminus_tables_type1__(html_text):
    """Read the header strip of the plus-minus chart.

    For every ``div`` inside the chart's ``header`` element, records its text
    and the pixel value at the end of its inline style.

    Returns
    -------
    dict
        ``{'info-quarter-width': DataFrame}`` with columns ``quarter`` and ``width``.
    """
    plusminus_elmt = make_soup(html_text).find('div',{'class':'plusminus'})
    header_cells = plusminus_elmt.find('div', class_='header').find_all('div')
    quarter_labels = []
    pixel_widths = []
    for cell in header_cells:
        quarter_labels.append(cell.text)
        # The width is whatever follows the last ':' of the style, minus 'px;'.
        pixel_widths.append(int(cell['style'].split(':')[-1].replace('px;', '')))
    df_header = pd.DataFrame({'quarter': quarter_labels, 'width': pixel_widths})
    return {'info-quarter-width': df_header}

def __parse_plusminus_tables_type2__(html_text):
    """Parse each player's On / Off / Net plus-minus totals, per team.

    Team blocks are the direct child divs of the chart's first inner div. A
    block is assigned to the away or home team by matching the team name
    against its ``<h3>`` heading; blocks matching neither team are skipped.

    Returns
    -------
    dict
        ``{'away-plusminus-player-total': DataFrame,
        'home-plusminus-player-total': DataFrame}`` with columns
        ``team_name``, ``player_name``, ``player_on``, ``player_off``,
        ``player_net`` (the three values are kept as strings).
    """
    html_soup = make_soup(html_text)
    table_away_home_teams = __parse_game_info_tables__(html_text)['info-away-home-teams']
    away_name,home_name = table_away_home_teams['team_name']

    table_type = 'plusminus-player-total'
    plusminus_elmt = html_soup.find('div',{'class':'plusminus'})
    parsed_pm_tables = {}
    for team_pm_elmt in plusminus_elmt.div.find_all('div',recursive=False):
        team_name = team_pm_elmt.find('h3').text
        if away_name in team_name:
            team_type = 'away'
        elif home_name in team_name:
            team_type = 'home'
        else:
            # Without this guard `table_name` would be unbound or stale and
            # could overwrite the previous team's table.
            continue
        table_name = f'{team_type}-{table_type}'

        team_player_divs = team_pm_elmt.find_all('div', recursive=False, class_='player')
        team_player_pm = []
        for player_div in team_player_divs:
            player_text = player_div.text
            team_player_pm.append({
                'team_name': team_name,
                'player_name': player_div.find('span').text,
                'player_on': re.findall(r"On: ([-+]*\d+)", player_text)[0],
                'player_off': re.findall(r"Off: ([-+]*\d+)", player_text)[0],
                'player_net': re.findall(r"Net: ([-+]*\d+)", player_text)[0],
            })
        parsed_pm_tables[table_name] = pd.DataFrame(team_player_pm)
    return parsed_pm_tables

def __parse_plusminus_tables_type3__(html_text):
    """Parse the per-player on/off timeline segments of the plus-minus chart.

    Within each team block, ``div.player`` elements are paired positionally
    with ``div.player-plusminus`` elements. Every inner div of the latter
    becomes one row holding its first CSS class (``'off'`` when it has none),
    its pixel width and its text (``'0'`` when blank). Team blocks whose
    heading matches neither team are skipped.

    Returns
    -------
    dict
        ``{'away-player-plusminus-on-off': DataFrame,
        'home-player-plusminus-on-off': DataFrame}``. Because of
        ``reset_index()`` each frame carries an ``index`` column holding the
        segment's position within its player.
    """
    html_soup = make_soup(html_text)
    table_away_home_teams = __parse_game_info_tables__(html_text)['info-away-home-teams']
    away_name,home_name = table_away_home_teams['team_name']

    table_type = 'player-plusminus-on-off'
    plusminus_elmt = html_soup.find('div',{'class':'plusminus'})
    parsed_pm_tables = {}
    for team_pm_elmt in plusminus_elmt.div.find_all('div',recursive=False):
        team_name = team_pm_elmt.find('h3').text
        if away_name in team_name:
            team_type = 'away'
        elif home_name in team_name:
            team_type = 'home'
        else:
            # Without this guard `table_name` would be unbound or stale and
            # could overwrite the previous team's table.
            continue
        table_name = f'{team_type}-{table_type}'

        player_divs = team_pm_elmt.find_all('div', {'class':'player'}, recursive=False)
        player_pm_divs = team_pm_elmt.find_all('div', {'class':'player-plusminus'}, recursive=False, )
        team_pm_list = []
        for player_div, player_pm_div in zip(player_divs,player_pm_divs):
            player_name = player_div.find('span').text
            player_pm_list = []
            for pm_div in player_pm_div.find_all('div'):
                pm_class = pm_div.get('class')[0] if pm_div.get('class') is not None else 'off'
                # The width is whatever follows the last ':' of the style, minus 'px;'.
                pm_width = int(pm_div['style'].split(':')[-1].replace('px;', ''))
                pm_value = pm_div.text if pm_div.text.strip() != '' else '0'
                player_pm_list.append({
                    'team_name': team_name,
                    'player_name' : player_name,
                    'pm_class': pm_class,
                    'pm_width': pm_width,
                    'pm_value': pm_value,
                })
            team_pm_list.append(pd.DataFrame(player_pm_list))
        df_team_plusminus = pd.concat(team_pm_list).reset_index()
        parsed_pm_tables[table_name] = df_team_plusminus
    return parsed_pm_tables

def parse_all_plusminus_tables(html_text):
    """Run every plus-minus parser on ``html_text`` and merge the results into one dict.

    Parsers run in a fixed order; on a key clash the later parser wins.
    """
    parsers = (
        __parse_game_info_tables__,
        __parse_plusminus_tables_type1__,
        __parse_plusminus_tables_type2__,
        __parse_plusminus_tables_type3__,
    )
    merged_tables = {}
    for parser in parsers:
        merged_tables.update(parser(html_text))
    return merged_tables

# Try the plus-minus parsers on one page, fetched with source=base.LOCAL_HOST.
# NOTE(review): the scheme is written with a single slash ('https:/'); kept
# as-is - confirm that fetch_html expects this form.
href = 'https:/www.basketball-reference.com/boxscores/plus-minus/201812220WAS.html'
url = href
html_text = fetch_html(url, source=base.LOCAL_HOST)
# The soup is kept in a global for interactive inspection.
html_soup = make_soup(html_text)

parsed_pm_tables = parse_all_plusminus_tables(html_text)
# iter_tables(parsed_pm_tables,5)

In [8]:
def extract_id(text,query):
    """Extract an identifier of the requested kind from a URL or href.

    Parameters
    ----------
    text : str
        String to search (typically a basketball-reference URL).
    query : str
        Kind of id to extract: ``'boxscore'``, ``'team'``, ``'team/season'``,
        ``'season'`` or ``'player'``.

    Returns
    -------
    str or None
        The first match of the pattern for ``query``; ``None`` when ``query``
        is unknown, ``text`` is not a string, or nothing matches.
    """
    patterns = {
        'boxscore'   : r'/(\d+\w{3})',
        'team'       : r'(\b[A-Z]{3}\b)',
        'team/season': r'(\b[A-Z]{3}/\d{4})',
        'season'     : r'(\b\d{4})',
        'player'     : r'(\b\w/\w+\d{2}\b)'
    }
    try:
        pattern = patterns.get(query)
    except TypeError:
        # Unhashable query (e.g. a list): treated like an unknown query.
        return None
    if pattern is None or not isinstance(text, str):
        return None
    found = re.search(pattern, text)
    return found.group(1) if found else None

# Sample inputs for trying extract_id by hand. `href` is reassigned on every
# line, so only the last value (the player page) reaches the call below.
href = 'https:/www.basketball-reference.com/boxscores/201812220WAS.html'
href = 'https://www.basketball-reference.com/teams/ATL/'
href = 'https://www.basketball-reference.com/teams/ATL/2021.html'
href = 'https://www.basketball-reference.com/players/p/pettibo01.html'
# Last expression of the cell: its value is shown as the cell output.
extract_id(href, 'player')


'p/pettibo01'

In [40]:
def parse_boxscores(href,boxscore_query = ['boxscores','pbp','shot-chart','plus-minus'],from_local=base.LOCAL_HOST):
    """Fetch and parse the requested pages of one game and merge all parsed tables.

    Parameters
    ----------
    href : str
        Any URL/href containing the game's boxscore id (e.g. ``201812220WAS``).
    boxscore_query : str or list, optional
        Which pages to parse. Accepted names: ``'boxscores'`` / ``'boxscore'``
        / ``None``, ``'play-by-play'`` / ``'pbp'``, ``'shot-chart'`` /
        ``'shotchart'``, ``'plus-minus'`` / ``'plusminus'``. Unknown names are
        ignored. The default list is never mutated here.
    from_local
        Passed to ``fetch_html`` as ``source=``.

    Returns
    -------
    dict
        Table name -> DataFrame. Results are merged in the fixed order box
        score, play-by-play, shot chart, plus-minus, so later page kinds
        overwrite shared keys such as ``'info-away-home-teams'``.

    Raises
    ------
    ValueError
        If a page has to be fetched but no boxscore id can be extracted from ``href``.
    """
    boxscore_id =  extract_id(href,query='boxscore')

    # (accepted query names, URL sub-path, parser) - one entry per page kind,
    # listed in the order their results are merged.
    page_kinds = (
        (('boxscores','boxscore',None), '', parse_all_boxscores_tables),
        (('play-by-play','pbp'), 'pbp/', parse_all_pbp_tables),
        (('shot-chart','shotchart'), 'shot-chart/', parse_all_shotchart_tables),
        (('plus-minus','plusminus'), 'plus-minus/', parse_all_plusminus_tables),
    )
    parsed_by_kind = [{} for _ in page_kinds]

    for query in to_list(boxscore_query):
        for slot, (accepted_names, subpath, parser) in enumerate(page_kinds):
            if query not in accepted_names:
                continue
            if boxscore_id is None:
                # Fail fast instead of requesting '.../None.html'.
                raise ValueError(f'could not extract a boxscore id from href: {href!r}')
            bxscr_href = f'https://www.basketball-reference.com/boxscores/{subpath}{boxscore_id}.html'
            html_text = fetch_html(bxscr_href,source=from_local)
            parsed_by_kind[slot] = parser(html_text)
            break

    parsed_boxscores = {}
    for parsed_tables in parsed_by_kind:
        parsed_boxscores.update(parsed_tables)
    return parsed_boxscores

# Parse every page kind for one game, then flash each resulting table for 2 seconds.
# NOTE(review): the scheme is written with a single slash ('https:/');
# parse_boxscores only uses the boxscore id extracted from it.
href = 'https:/www.basketball-reference.com/boxscores/201812220WAS.html'
parsed_boxscores = parse_boxscores(href, from_local=base.LOCAL_HOST)
iter_tables(parsed_boxscores, sleep=2)

home-player-plusminus-on-off: 100%|██████████| 58/58 [01:56<00:00,  2.02s/it]


In [10]:
# `Checkpoint` is not defined in this notebook - presumably it comes from
# `from scripts.base import *` (confirm). The cell output shows its usage example.
checkpoint = Checkpoint(levels=3)


    Example Usage:
    -------
    ```
    checkpoint = Checkpoint(levels=3)
    for i in range(10):
        if checkpoint.skip_this_point(i):
            # Skip points prior to checkpoint
            continue
        for j in range(10):
            if checkpoint.skip_this_point(i,j):
                # Skip points prior to checkpoint
                continue
            for k in range(10):
                if checkpoint.skip_this_point(i,j,k):
                    # Skip points prior to checkpoint
                    continue
                # Only executes when the checkpoint is reached
                # Do something
                print(i,j,k)
                time.sleep(0.5)
                # Update and save
                checkpoint.update(i,j,k)
                checkpoint.save()
    # Delete when task is finished
    checkpoint.delete()
    ```
    


In [None]:
class Query:
    """Placeholder class; instances carry no state yet."""

    def __init__(self) -> None:
        """Create an instance without setting any attributes."""
        return None



class Season:
    """Stub class: the original cell declared it without a colon or body.

    TODO: implement. Until then the docstring serves as the class body so the
    cell is valid Python.
    """

In [None]:
# df = parsed_pbp_tables['pbp']
# %timeit df.to_csv('test.csv')
# %timeit df = pd.read_csv('test.csv')

1.7 ms ± 60.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
1.01 ms ± 22.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [None]:
# !ls -GFlash test.csv

112 -rw-r--r--  1 jasetran  staff    55K Apr 24 19:09 test.csv
