In [1]:
import time
import pandas as pd
import polars as pl
import numpy as np
import pickle as pkl
import os
import sys
import json
import re
import pyarrow

from scipy import stats
from collections import defaultdict
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from datetime import timedelta
from IPython.display import clear_output
from pathlib import Path
from datetime import datetime as dt

from multimodal_communication import cloud_functions as cf
from mlb_simulation.build_datasets import constants
from mlb_simulation.build_datasets.utils_polars import (_correct_home_away_swap,
                   _get_wind_direction,
                   _convert_wind_direction,
                   _segregate_plays_by_pitbat_combo
)
from mlb_simulation.build_datasets.dataset_builder_polars import DatasetBuilder

In [2]:
# Load the pickled 2017 raw pitch data into `df`, which the cleaning cell further down consumes.
# NOTE: unpickling can execute arbitrary code, so only load files from a trusted source.
with Path('../../../../../../../Documents/MLB-Data/raw_pitches/pitches_2017.pkl').open('rb') as pitch_file:
    df = pkl.load(pitch_file)

In [None]:
class DatasetBuilder():
    """
    Turns raw Statcast pitch data into a cleaned, weather-enriched play-level dataset.

    NOTE: this notebook-local class shadows the ``DatasetBuilder`` imported from
    ``mlb_simulation.build_datasets.dataset_builder_polars`` in the imports cell.
    Only ``build_training_dataset`` and ``clean_raw_pitches`` are defined here;
    ``build_neutralization_coefficient_dictionaries`` and ``_make_final_dataset``
    (both called by ``build_training_dataset``) are not, so that entry point only
    works on a subclass / version of the class that supplies them.
    """

    def __init__(self, rolling_windows=None, verbose=False,  gcloud_upload=False,
                 gcloud_upload_path='', local_save=False, local_save_dir_path=''):
        """
        Args:
            rolling_windows (list[int] | None): Rolling window sizes; defaults to [75, 504].
            verbose (bool): Print progress messages while building.
            gcloud_upload (bool): Upload artefacts through ``cf.CloudHelper``.
            gcloud_upload_path (str): Stored on the instance; not read by any method in this class.
            local_save (bool): Pickle artefacts to the local filesystem.
            local_save_dir_path (str): Directory used for local saves and for the local
                weather-data lookup in ``clean_raw_pitches``.
        """
        # A fresh list per instance: a list literal in the signature would be shared
        # (and mutable) across every DatasetBuilder created with the default.
        self.rolling_windows = [75, 504] if rolling_windows is None else rolling_windows
        self.verbose = verbose
        self.gcloud_upload = gcloud_upload
        self.gcloud_upload_path = gcloud_upload_path
        self.local_save = local_save
        self.local_save_dir_path = local_save_dir_path

    def build_training_dataset(self, raw_pitches, save_coefficients=False, coef_save_path=''):
        """
        Cleans raw pitch data, generates neutralization coefficients, and builds a final
        machine readable dataset.

        Args:
            raw_pitches (pd.DataFrame): Raw pitch data, passed straight to clean_raw_pitches().
            save_coefficients (bool): Whether to save the neutralization coefficients
                (to the cloud and/or locally, depending on the instance flags).
            coef_save_path (str): Directory for the local coefficient pickle. Required when
                both ``save_coefficients`` and ``self.local_save`` are set.

        Returns:
            Whatever ``_make_final_dataset`` returns (the final training dataset).

        Raises:
            ValueError: If a local coefficient save is requested without ``coef_save_path``.

        FUNCTION CONNECTIONS:
        ----------------------
        Calls On: clean_raw_pitches()
                  build_neutralization_coefficient_dictionaries()
                  _make_final_dataset()
        """

        # Fail fast, before any expensive cleaning or cloud upload has happened
        if save_coefficients and self.local_save and not coef_save_path:
            raise ValueError('In order to save the coefficients locally, a path to a directory must be provided')

        # Clean raw pitches and return a cleaned pitches LazyFrame
        cleaned_data = self.clean_raw_pitches(raw_pitches)

        # Create a neutralization coefficients dictionary
        coef_dicts = self.build_neutralization_coefficient_dictionaries(cleaned_data)

        # Window sizes joined for file names, e.g. "75_504". Built unconditionally because the
        # final-dataset upload below needs it even when the coefficients are not being saved.
        windows = '_'.join(str(window) for window in self.rolling_windows)

        if save_coefficients:
            if self.gcloud_upload:
                cf.CloudHelper(obj=coef_dicts).upload_to_cloud(
                    'simulation_training_data', f"neutralization_coefficients_dict_{windows}")
            if self.local_save:
                self._pickle_to_dir(
                    coef_dicts, coef_save_path, f'neutralization_coefficients_dict_{windows}.pkl')

        # Build the final dataset
        final_dataset = self._make_final_dataset(cleaned_data, coef_dicts)
        if self.gcloud_upload:
            cf.CloudHelper(obj=final_dataset).upload_to_cloud(
                'simulation_training_data', f"Final Datasets/final_dataset_{windows}")
        if self.local_save:
            self._pickle_to_dir(
                final_dataset, self.local_save_dir_path,
                f'daily_stats_df_updated_{dt.today().strftime("%Y-%m-%d")}.pkl')

        return final_dataset

    @staticmethod
    def _pickle_to_dir(obj, dir_path, filename):
        """Pickle ``obj`` to ``dir_path/filename``, creating the directory if needed."""
        target_dir = Path(dir_path)
        target_dir.mkdir(parents=True, exist_ok=True)
        # Path objects are joined with "/"; "Path + str" raises TypeError
        with open(target_dir / filename, 'wb') as f:
            pkl.dump(obj, f)

    ######################################################################################
    # Clean Pitch Data
    ######################################################################################
    def _load_weather_for_year(self, year):
        """
        Return one season of weather data as a LazyFrame.

        Looks for ``<local_save_dir_path>/proreference_weather_data/weather_data_<year>.pkl``
        first and falls back to ``cf.CloudHelper().download_from_cloud`` when that file is
        missing or empty.

        Raises:
            Exception: If neither source yields any rows for ``year``.
        """
        weather_filepath = (Path(self.local_save_dir_path) / 'proreference_weather_data'
                            / f'weather_data_{year}.pkl')

        yearly_weather_df = None
        if weather_filepath.exists():
            # The file is a pickle (.pkl), so it is unpickled rather than parsed as CSV.
            # NOTE: unpickling can execute arbitrary code; only load trusted files.
            with open(weather_filepath, 'rb') as weather_file:
                yearly_weather_df = pkl.load(weather_file)

        # If not available locally, download from the cloud
        if yearly_weather_df is None or len(yearly_weather_df) == 0:
            yearly_weather_df = cf.CloudHelper().download_from_cloud(
                "proreference_weather_data/weather_data_{}".format(year))

        if yearly_weather_df is None or len(yearly_weather_df) == 0:
            raise Exception(f'No Prorefence weather data was found for {year}. Ensure that the data exists and paths are correct!')

        # Both sources are expected to hold pandas frames; convert for the polars pipeline
        if isinstance(yearly_weather_df, pd.DataFrame):
            yearly_weather_df = pl.from_pandas(yearly_weather_df)

        return yearly_weather_df.lazy()

    def clean_raw_pitches(self, raw_pitches_df: "pd.DataFrame") -> "pl.LazyFrame":
        """
        Cleans a DataFrame of raw pitch data, filtering and transforming it into a usable format
        for subsequent analyses, and attaches per-game weather information.

        Parameters:
            raw_pitches_df (DataFrame): A pandas DataFrame of uncleaned pitch data from the
                Statcast API. ``game_date`` must be a datetime column.

        Returns:
            pl.LazyFrame: One row per relevant play (regular season only), restricted to
            ``constants.RELEVANT_BATTING_COLUMNS`` plus ``play_type``, ``type_counter``, the
            joined weather columns, ``temperature`` and the four wind-speed columns
            ``in``, ``out``, ``rtl`` and ``ltr``. Nothing is executed until ``.collect()``.

        Side effects:
            Sets ``self.unique_years`` to the calendar years present in ``game_date``.

        FUNCTION CONNECTIONS:
        ----------------------
        Calls On: _load_weather_for_year()
        """

        if self.verbose:
            print("Cleaning Data")

        # Grab the years now, while the data is still an eager pandas frame; doing it later
        # would force an early collect() on the LazyFrame
        self.unique_years = raw_pitches_df.game_date.dt.year.unique().tolist()

        # Convert the raw_pitches file to a LazyFrame
        raw_pitches_df = pl.from_pandas(raw_pitches_df).lazy()

        # Filter down to only regular season games
        raw_pitches_df = raw_pitches_df.filter(pl.col('game_type') == 'R')

        # Correct home and away mistakes in the pitch data
        #raw_pitches_df = _correct_home_away_swap(raw_pitches_df)

        # Convert the datetime game_date to a string formatted as YYYY-MM-DD, and sort the df
        # on the column to make sure everything is in order
        raw_pitches_df = raw_pitches_df.with_columns(
            pl.col("game_date").dt.strftime("%Y-%m-%d").alias("game_date")
        ).sort(by=["game_date", "inning", "inning_topbot", "at_bat_number"],
               descending=[False, False, False, False])

        # Filter all pitches to only those with an event
        raw_plays = raw_pitches_df.drop_nulls(subset=['events'])

        # Filter all pitches with an event to only those types we care about
        # As well as only the columns we care about
        final_plays = raw_plays.filter(
            pl.col('events').is_in(constants.RELEVANT_PLAY_TYPES)
        ).select(
            constants.RELEVANT_BATTING_COLUMNS
        )

        # Add a new column that groups all the event types into eventual Y labels
        final_plays = final_plays.with_columns(
            pl.col('events').replace(constants.PLAY_TYPE_DICT).alias('play_type')
        )

        # Insert a new 'type counter' column that will be used repeatedly for calculating rolling stats
        final_plays = final_plays.with_columns(
            pl.lit(1).alias('type_counter')
        )

        ############ ATTACH WEATHER INFORMATION TO EACH PITCH ############

        # One LazyFrame per season, concatenated into a single weather frame
        yearly_weather_frames = [self._load_weather_for_year(year) for year in self.unique_years]
        total_weather_df = pl.concat(yearly_weather_frames, how='vertical')

        # Create a new column with the converted home team names in total_weather_df
        # (WEATHER_NAME_CONVERSIONS is inverted so the weather-side names are the keys)
        team_name_map = {v: k for k, v in constants.WEATHER_NAME_CONVERSIONS.items()}
        total_weather_df = total_weather_df.with_columns([
            pl.col('home_team').replace(team_name_map).alias('converted_home_team'),
            pl.col('away_team').replace(team_name_map).alias('converted_away_team')
        ])

        # Drop the old columns
        total_weather_df = total_weather_df.drop(['home_team', 'away_team'])

        # Attach the full str description of the weather, found by joining the pitches df and the weather df
        # Note that this will artificially add some rows due to double headers, which join with both
        # descriptions, but this can be thought of as an 'average' of the two.
        # The join uses polars' default join type, so plays without a weather match do not survive it.
        total_weather_df = total_weather_df.rename({'date':'game_date', 'converted_home_team':'home_team'})
        final_plays = final_plays.join(total_weather_df, on=['game_date', 'home_team']).drop(
            ['converted_away_team', 'url']
        )

        # Break up the full weather info into temp, wind speed, and wind direction separately
        final_plays = final_plays.with_columns(
            # Temperature: the integer between the first ': ' and the degree sign
            pl.col('weather').str.split(': ').list.get(1).str.split("°").list.get(0).cast(pl.Int64).alias('temperature'),

            # Wind direction. The first "Wind" is stripped before matching because the word
            # itself contains "in".
            # NOTE(review): "(?i)in" / "(?i)out" are substring matches, so any other word
            # containing those letters (e.g. a description mentioning "Rain") would also be
            # classified as "in" / "out" -- confirm against the real weather strings.
            pl.when(pl.col("weather").is_not_null())
            .then(
                pl.when(pl.col("weather").str.replace("Wind", "").str.contains("(?i)in"))
                    .then(pl.lit("in"))
                .when(pl.col("weather").str.replace("Wind", "").str.contains("(?i)out"))
                    .then(pl.lit("out"))
                .when(pl.col("weather").str.contains("Left|Right"))
                    .then(pl.col("weather").str.extract(r"from ([^,.]+)", 1).str.to_lowercase())
                .otherwise(None)
            )
            .otherwise(None)
            .alias("wind_direction"),

            # Wind Speed: the integer between 'Wind ' and 'mph'; 0 when no wind is reported
            pl.when(pl.col('weather').str.contains("Wind"))
            .then(
                pl.col('weather').str.split('Wind ').list.get(1).str.split('mph').list.get(0).cast(pl.Int64)
            )
            .otherwise(pl.lit(0))
            .alias('wind_speed')
        )

        # Whatever direction was parsed, a wind speed of 0 is relabelled 'zero'
        final_plays = final_plays.with_columns(
            pl.when(pl.col('wind_speed') == 0)
                .then(pl.lit('zero'))
                .otherwise(pl.col('wind_direction'))
                .alias('wind_direction')
        )

        # Create columns for each of the wind directions, with the value of the wind speed in that
        # direction (speed * boolean match), then drop the two helper columns
        final_plays = final_plays.with_columns(
            (pl.col('wind_speed') * (pl.col('wind_direction').eq(pl.lit('in')))).alias('in'),
            (pl.col('wind_speed') * (pl.col('wind_direction').eq(pl.lit('out')))).alias('out'),
            (pl.col('wind_speed') * (pl.col('wind_direction').eq(pl.lit('right to left')))).alias('rtl'),
            (pl.col('wind_speed') * (pl.col('wind_direction').eq(pl.lit('left to right')))).alias('ltr')
        ).drop('wind_direction', 'wind_speed')

        return final_plays


In [33]:
# Point the builder at the local MLB data directory (used for the weather-data lookup),
# then build the lazy cleaning query for the loaded 2017 pitches.
mlb_data_dir = '../../../../../../../Documents/MLB-Data'
builder = DatasetBuilder(local_save_dir_path=mlb_data_dir)

x = builder.clean_raw_pitches(raw_pitches_df=df)

In [34]:
# Execute the lazy query returned by clean_raw_pitches and display the resulting frame.
x.collect()

game_date,player_name,batter,pitcher,events,stand,p_throws,home_team,away_team,hit_location,bb_type,on_3b,on_2b,on_1b,outs_when_up,inning,inning_topbot,game_type,game_pk,estimated_ba_using_speedangle,launch_speed_angle,bat_score,fld_score,post_bat_score,if_fielding_alignment,of_fielding_alignment,delta_home_win_exp,play_type,type_counter,Unnamed: 29_level_0,weather,temperature,wind_direction,wind_speed,in,out,rtl,ltr
str,str,i64,i64,str,str,str,str,str,i64,str,i64,i64,i64,i64,i64,str,str,i64,f64,i64,i64,i64,i64,str,str,f64,str,i32,i64,str,i64,str,i64,i64,i64,i64,i64
"""2017-04-02""","""Tanaka, Masahiro""",572816,547888,"""single""","""L""","""R""","""TB""","""NYY""",8,"""line_drive""",,,,0,1,"""Bot""","""R""",490106,0.893,4,0,0,0,"""Standard""","""Standard""",0.036,"""single""",1,2,"""Start Time Weather: 72° F, Win…",72,"""zero""",0,0,0,0,0
"""2017-04-02""","""Lester, Jon""",451594,452657,"""field_out""","""R""","""L""","""STL""","""CHC""",8,"""line_drive""",,,,0,1,"""Bot""","""R""",490099,0.233,3,0,0,0,"""Standard""","""Standard""",-0.021,"""field_out""",1,1,"""Start Time Weather: 63° F, Win…",63,"""right to left""",8,0,0,8,0
"""2017-04-02""","""Tanaka, Masahiro""",595281,547888,"""double""","""L""","""R""","""TB""","""NYY""",7,"""line_drive""",,,572816,0,1,"""Bot""","""R""",490106,0.893,4,0,0,0,"""Standard""","""Standard""",0.103,"""double""",1,2,"""Start Time Weather: 72° F, Win…",72,"""zero""",0,0,0,0,0
"""2017-04-02""","""Bumgarner, Madison""",572041,518516,"""field_out""","""R""","""L""","""ARI""","""SF""",3,"""popup""",,,,0,1,"""Bot""","""R""",490110,0.0,3,0,0,0,"""Standard""","""Standard""",-0.021,"""field_out""",1,0,"""Start Time Weather: 75° F, Win…",75,"""left to right""",7,0,0,0,7
"""2017-04-02""","""Lester, Jon""",649557,452657,"""double""","""R""","""L""","""STL""","""CHC""",9,"""line_drive""",,,,1,1,"""Bot""","""R""",490099,0.453,5,0,0,0,"""Standard""","""Standard""",0.04,"""double""",1,1,"""Start Time Weather: 63° F, Win…",63,"""right to left""",8,0,0,8,0
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""2017-10-01""","""Skoglund, Eric""",592273,607215,"""field_out""","""R""","""L""","""KC""","""ARI""",8,"""fly_ball""",,,571757,1,9,"""Top""","""R""",492514,0.007,3,14,2,14,"""Standard""","""Standard""",0.0,"""field_out""",1,2420,"""Start Time Weather: 77° F, Win…",77,"""right to left""",11,0,0,11,0
"""2017-10-01""","""Skoglund, Eric""",518614,607215,"""field_out""","""L""","""L""","""KC""","""ARI""",8,"""line_drive""",,,571757,2,9,"""Top""","""R""",492514,0.647,4,14,2,14,"""Standard""","""Standard""",0.0,"""field_out""",1,2420,"""Start Time Weather: 77° F, Win…",77,"""right to left""",11,0,0,11,0
"""2017-10-01""","""Albers, Matt""",607471,458006,"""strikeout""","""R""","""R""","""WSH""","""PIT""",2,,,,,0,9,"""Top""","""R""",492525,,,11,6,11,"""Standard""","""Standard""",0.001,"""strikeout""",1,2429,"""Start Time Weather: 67° F, Win…",67,"""in""",5,5,0,0,0
"""2017-10-01""","""Albers, Matt""",516782,458006,"""field_out""","""R""","""R""","""WSH""","""PIT""",6,"""line_drive""",,,,1,9,"""Top""","""R""",492525,0.673,4,11,6,11,"""Standard""","""Standard""",0.0,"""field_out""",1,2429,"""Start Time Weather: 67° F, Win…",67,"""in""",5,5,0,0,0


In [None]:
# NOTE(review): stale exploration cell. It indexes `x` as if it were a pair of LazyFrames,
# but clean_raw_pitches returns a single LazyFrame, which is why this cell's recorded
# output is a TypeError. Presumably written against an earlier version that returned
# (weather, plays) -- confirm, then update or remove.
df1 = x[0].rename({'date':'game_date', 'converted_home_team':'home_team'}).collect()
df2 = x[1].collect()

TypeError: 'LazyFrame' object is not subscriptable (aside from slicing)

Use `select()` or `filter()` instead.

In [None]:
# Display df1 (bound by the debug cell above; the recorded output shows per-game weather rows).
df1

Unnamed: 0_level_0,game_date,weather,url,home_team,converted_away_team
i64,str,str,str,str,str
0,"""2017-04-02""","""Start Time Weather: 75° F, Win…","""https://www.baseball-reference…","""ARI""","""SF"""
1,"""2017-04-02""","""Start Time Weather: 63° F, Win…","""https://www.baseball-reference…","""STL""","""CHC"""
2,"""2017-04-02""","""Start Time Weather: 72° F, Win…","""https://www.baseball-reference…","""TB""","""NYY"""
3,"""2017-04-03""","""Start Time Weather: 65° F, Win…","""https://www.baseball-reference…","""BAL""","""TOR"""
4,"""2017-04-03""","""Start Time Weather: 48° F, Win…","""https://www.baseball-reference…","""BOS""","""PIT"""
…,…,…,…,…,…
2463,"""2017-10-27""","""Start Time Weather: 65° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2464,"""2017-10-28""","""Start Time Weather: 67° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2465,"""2017-10-29""","""Start Time Weather: 69° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2466,"""2017-10-31""","""Start Time Weather: 67° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""


In [None]:
# Row and column count of df2 (bound by the debug cell above).
df2.shape

(184282, 29)

In [None]:
# Left-join df2 to df1 on game date and home team, and show only the home_team column.
# Presumably a sanity check of the weather join keys -- df2 looks like the plays frame and
# df1 the weather frame, judging by the recorded outputs above.
df2.join(df1, on =['game_date', 'home_team'], how='left').select('home_team')

home_team
str
"""TB"""
"""STL"""
"""TB"""
"""ARI"""
"""STL"""
…
"""KC"""
"""KC"""
"""WSH"""
"""WSH"""


In [None]:
# NOTE(review): stale exploration cell -- subscripting `x` only works if it is a sequence of
# LazyFrames; clean_raw_pitches returns a single LazyFrame, so the recorded output below
# presumably comes from an earlier version of the method. Confirm, then update or remove.
x[0].collect()

Unnamed: 0_level_0,date,weather,url,converted_home_team,converted_away_team
i64,str,str,str,str,str
0,"""2017-04-02""","""Start Time Weather: 75° F, Win…","""https://www.baseball-reference…","""ARI""","""SF"""
1,"""2017-04-02""","""Start Time Weather: 63° F, Win…","""https://www.baseball-reference…","""STL""","""CHC"""
2,"""2017-04-02""","""Start Time Weather: 72° F, Win…","""https://www.baseball-reference…","""TB""","""NYY"""
3,"""2017-04-03""","""Start Time Weather: 65° F, Win…","""https://www.baseball-reference…","""BAL""","""TOR"""
4,"""2017-04-03""","""Start Time Weather: 48° F, Win…","""https://www.baseball-reference…","""BOS""","""PIT"""
…,…,…,…,…,…
2463,"""2017-10-27""","""Start Time Weather: 65° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2464,"""2017-10-28""","""Start Time Weather: 67° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2465,"""2017-10-29""","""Start Time Weather: 69° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
2466,"""2017-10-31""","""Start Time Weather: 67° F, Win…","""https://www.baseball-reference…","""LAD""","""HOU"""
