In [None]:
import polars as pl
import pandas as pd
import numpy as np
import time
import datetime

from google.colab import drive
# Mount Google Drive inside the Colab runtime; FILE_PATH below points into this mount.
drive.mount('/content/drive')

# Raise polars' display limits so tables print up to 200columns / 200 rows.
pl.Config.set_tbl_cols(200)
pl.Config.set_tbl_rows(200)

# Base directory (note the trailing slash) that every parquet filename is appended to.
FILE_PATH = '/content/drive/My Drive/Betting Models/mlb/hits_model/'

Mounted at /content/drive


## Make Data Easier to Work With

In [None]:
# One Statcast parquet extract per season, listed oldest first.
files = [
    '2021_statcast_data.parquet',
    '2022_statcast_data.parquet',
    '2023_statcast_data.parquet',
    '2024_statcast_data.parquet',
]

# Read every season into its own frame, keeping the order of `files`.
df_list = [pl.read_parquet(f'{FILE_PATH}{file}') for file in files]

# Stack the seasons row-wise; 'vertical_relaxed' lets polars reconcile
# column dtypes that differ between the yearly files.
df_raw = pl.concat(df_list, how='vertical_relaxed')

In [None]:
# Columns carried forward from the raw Statcast frame. The order here is the
# column order of `df`.
cols_to_keep = [
    # plate-appearance / pitch identifiers
    "game_pk",
    "at_bat_number",
    "pitch_number",
    # participants and result
    "batter",
    "pitcher",
    "events",
    "stand",
    "p_throws",
    # game context
    "inning_topbot",
    "home_team",
    "away_team",
    "game_date",
    "game_type",
    "bb_type",
    "balls",
    "strikes",
    "outs_when_up",
    "inning",
    "game_year",
    # defensive alignment (fielder_2 .. fielder_9)
    "fielder_2",
    "fielder_3",
    "fielder_4",
    "fielder_5",
    "fielder_6",
    "fielder_7",
    "fielder_8",
    "fielder_9",
]

df = df_raw.select(cols_to_keep)

# Drop the reference to the wide raw frame so its memory can be reclaimed.
del df_raw

In [None]:
# Collapse pitch-level rows to one row per plate appearance (PA).
# A PA is identified by (game_pk, at_bat_number). Rows are ordered by
# pitch_number first so that "last" within each PA is the final pitch,
# which is the row whose `events` value is used downstream.
pa_keys = ["game_pk", "at_bat_number"]

df_pa = (
    df
    .sort([*pa_keys, "pitch_number"])
    .group_by(pa_keys)
    .agg(pl.all().last())  # last value of every non-key column per PA
)

In [None]:
outcome_col_name = "pa_outcome_category"

# Sentinel assigned to null or unmapped events; rows carrying it are dropped
# by the filter at the end of the chain.
unmapped_outcome_code = 99

# --- Map Events to Categories using pl.when().then() ---
# Category codes produced here:
#   0 = out on a ball in play     1 = single      2 = double     3 = triple
#   4 = home run                  5 = strikeout   6 = walk / catcher's interference
#   7 = hit by pitch
df_with_outcome = (
    df_pa
    .with_columns(
        pl.when(pl.col("events") == "single").then(pl.lit(1))
        .when(pl.col("events") == "double").then(pl.lit(2))
        .when(pl.col("events") == "triple").then(pl.lit(3))
        .when(pl.col("events") == "home_run").then(pl.lit(4))
        .when(pl.col("events").is_in(['strikeout', 'strikeout_double_play'])).then(pl.lit(5))
        .when(pl.col("events").is_in(['walk', 'catcher_interf'])).then(pl.lit(6))
        .when(pl.col("events") == "hit_by_pitch").then(pl.lit(7))
        .when(pl.col("events").is_in([
            # Define all events that count as an out on a ball in play
            'field_out', 'force_out', 'grounded_into_double_play',
            'double_play', 'triple_play', 'sac_fly', 'sac_bunt',
            'sac_fly_double_play', 'sac_bunt_double_play',
            'field_error', # Typically counts as an out for the model's purpose
            'fielders_choice_out',
            'fielders_choice' # Often results in an out, but needs careful review depending on goal
        ])).then(pl.lit(0))
        # Nulls compare as null (treated as False by `when`), so they land here
        # together with any event not listed above.
        .otherwise(pl.lit(unmapped_outcome_code))
        .alias(outcome_col_name)
    )
    # Filter on the same name/sentinel used above so that renaming the column
    # or changing the sentinel cannot silently break this step.
    .filter(pl.col(outcome_col_name) != unmapped_outcome_code)
)

In [None]:
# Persist the one-row-per-PA frame with outcome categories; later cells reload this file.
df_with_outcome.write_parquet(f'{FILE_PATH}clean_statcast_data.parquet')

## Create Helper Columns for Rolling Stats

In [14]:
# Reload the cleaned PA-level data from clean_statcast_data.parquet.
df_pa_full = pl.read_parquet(f'{FILE_PATH}clean_statcast_data.parquet')

In [15]:
# Event groups shared by the boolean helper flags below.
hit_events = ['single', 'double', 'triple', 'home_run']
strikeout_events = ['strikeout', 'strikeout_double_play']
walk_events = ['walk', 'catcher_interf']
# Outs on balls in play that count as an at-bat.
in_play_out_events = [
    'field_out', 'force_out', 'grounded_into_double_play',
    'double_play', 'triple_play',
    'field_error',  # Typically counts as an out for the model's purpose
    'fielders_choice_out', 'fielders_choice',
]
# Sacrifices: counted in is_out but deliberately left out of is_ab.
sacrifice_events = ['sac_fly', 'sac_bunt', 'sac_fly_double_play', 'sac_bunt_double_play']

event_col = pl.col('events')

# Add one boolean column per outcome type; these are summed later to build
# league totals and per-player daily counts.
df_pa_full = df_pa_full.with_columns(
    is_pa=event_col.is_not_null(),
    is_ab=event_col.is_in(hit_events + strikeout_events + in_play_out_events),
    is_hit=event_col.is_in(hit_events),
    is_k=event_col.is_in(strikeout_events),
    is_bb=event_col.is_in(walk_events),
    is_hbp=(event_col == 'hit_by_pitch'),
    is_1b=(event_col == 'single'),
    is_2b=(event_col == 'double'),
    is_3b=(event_col == 'triple'),
    is_hr=(event_col == 'home_run'),
    is_out=event_col.is_in(in_play_out_events + sacrifice_events),
)

In [16]:
# Helper flag -> name of the league-wide total built from it.
# The first two are denominators, the rest are event counts.
league_total_aliases = {
    "is_pa": "total_pa",
    "is_ab": "total_ab",
    "is_hit": "total_h",
    "is_k": "total_k",
    "is_bb": "total_bb",
    "is_hbp": "total_hbp",
    "is_1b": "total_1b",
    "is_2b": "total_2b",
    "is_3b": "total_3b",
    "is_hr": "total_hr",
}

# Single-row frame of league totals over the 2021 and 2022 seasons only.
league_totals = (
    df_pa_full
    .filter(pl.col('game_year').is_in([2021, 2022]))
    .select([
        pl.col(flag).sum().alias(total_name)
        for flag, total_name in league_total_aliases.items()
    ])
)

# Pull the scalars needed by the league-average calculation.
total_pa, total_ab, total_h = (
    league_totals[name][0] for name in ("total_pa", "total_ab", "total_h")
)

In [17]:
# Per-PA league rates: output key -> column of `league_totals` used as numerator.
per_pa_rate_sources = {
    'lg_k_pct': 'total_k',
    'lg_bb_pct': 'total_bb',
    'lg_hbp_pct': 'total_hbp',
    'lg_1b_pct': 'total_1b',
    'lg_2b_pct': 'total_2b',
    'lg_3b_pct': 'total_3b',
    'lg_hr_pct': 'total_hr',
}

# Guard both denominators; every rate falls back to 0.0 when its denominator
# is missing or zero.
has_ab = bool(total_ab) and total_ab > 0
has_pa = bool(total_pa) and total_pa > 0

# Batting average is per at-bat; everything else is per plate appearance.
league_averages_2122 = {'lg_avg': total_h / total_ab if has_ab else 0.0}
for rate_name, total_col in per_pa_rate_sources.items():
    league_averages_2122[rate_name] = (
        league_totals[total_col][0] / total_pa if has_pa else 0.0
    )

# lg_out_pct is the complement of the on-base outcomes. lg_k_pct is not part of
# the sum, so strikeouts are included in lg_out_pct.
total = 0
for rate_name in ('lg_bb_pct', 'lg_hbp_pct', 'lg_1b_pct', 'lg_2b_pct', 'lg_3b_pct', 'lg_hr_pct'):
    total += league_averages_2122[rate_name]

lg_out_pct = 1 - total
league_averages_2122['lg_out_pct'] = lg_out_pct

print("\nLeague Averages (based on 2021-2022 data):")
print(league_averages_2122)


League Averages (based on 2021-2022 data):
{'lg_avg': 0.2430046187210952, 'lg_k_pct': 0.22924091208889638, 'lg_bb_pct': 0.08148381370630434, 'lg_hbp_pct': 0.011473038224461158, 'lg_1b_pct': 0.1402169753195324, 'lg_2b_pct': 0.0434629195855918, 'lg_3b_pct': 0.0035882083115064234, 'lg_hr_pct': 0.030792907181573653, 'lg_out_pct': 0.6889821376710302}


In [18]:
def _daily_totals(pa_frame, player_col, suffix=""):
    """Sum the boolean outcome flags per (player, game_date).

    pa_frame   -- PA-level frame carrying the is_* helper columns
    player_col -- "batter" or "pitcher"; the column to group on
    suffix     -- appended to every output name ("_a" marks stats allowed)

    Returns one row per player and date, sorted by player then date, with
    columns daily_pa, daily_ab, daily_h, daily_k, daily_bb, daily_hbp,
    daily_1b, daily_2b, daily_3b, daily_hr (each followed by `suffix`).
    """
    flag_by_stat = {
        "pa": "is_pa",
        "ab": "is_ab",
        "h": "is_hit",
        "k": "is_k",
        "bb": "is_bb",
        "hbp": "is_hbp",
        "1b": "is_1b",
        "2b": "is_2b",
        "3b": "is_3b",
        "hr": "is_hr",
    }
    return (
        pa_frame
        .group_by(player_col, "game_date")
        .agg([
            pl.col(flag).sum().alias(f"daily_{stat}{suffix}")
            for stat, flag in flag_by_stat.items()
        ])
        .sort(player_col, "game_date")
    )


# Daily sums for batters
df_batter_daily = _daily_totals(df_pa_full, "batter")

# Daily sums for pitchers (stats allowed, hence the "_a" suffix)
df_pitcher_daily = _daily_totals(df_pa_full, "pitcher", suffix="_a")

In [19]:
def _with_prior_day_cumulatives(daily_df, player_col, suffix=""):
    """Append each player's cumulative totals as of the end of their previous game date.

    daily_df   -- one row per (player, game_date) with daily_<stat><suffix> columns
    player_col -- "batter" or "pitcher"; the column that partitions the running sums
    suffix     -- "" for batters, "_a" for pitcher stats allowed

    Returns `daily_df` sorted by (player_col, game_date) with the added columns
    cum_<stat><suffix>_prev_day for pa, ab, h, k, bb, hbp, 1b, 2b, 3b, hr.
    A player's first date gets 0 for every cumulative column.

    Two defects of the earlier implementation are fixed here:
      * `.over(player).sort_by("game_date")` re-sorted the windowed result
        across the whole frame, so values no longer lined up with their rows.
      * `.shift(1)` ran without a window, so each player's first row inherited
        the previous player's career total instead of 0.
    The running sum, the one-row shift and the null fill are now all evaluated
    inside the same per-player window.
    """
    stats = ("pa", "ab", "h", "k", "bb", "hbp", "1b", "2b", "3b", "hr")
    return (
        daily_df
        # The window relies on chronological row order within each player,
        # so establish it explicitly rather than trusting the caller.
        .sort(player_col, "game_date")
        .with_columns([
            pl.col(f"daily_{stat}{suffix}")
            .cum_sum()
            .shift(1)       # exclude the current date: only prior games count
            .fill_null(0)   # first appearance has no history
            .over(player_col)
            .alias(f"cum_{stat}{suffix}_prev_day")
            for stat in stats
        ])
    )


# Calculate shifted cumulative daily stats for batters
df_batter_daily = _with_prior_day_cumulatives(df_batter_daily, "batter")

# Calculate shifted cumulative daily stats for pitchers
df_pitcher_daily = _with_prior_day_cumulatives(df_pitcher_daily, "pitcher", suffix="_a")

In [20]:
def _ballast(rate, value, unit):
    """Build one ballast entry: rate label, stabilization sample size, and its unit."""
    return {'rate': rate, 'value': value, 'unit': unit}


# Outcome flag -> corresponding rate and stabilization point, per role.
# `value` is the number of league-average observations blended into a
# player's own record when the regressed rates are computed.
ballast_weights = {
    'batter': {
        'is_hit': _ballast('AVG', 910, 'AB'),    # Overall Hit Rate = AVG
        'is_k': _ballast('K%', 60, 'PA'),        # Strikeout Rate
        'is_bb': _ballast('BB%', 120, 'PA'),     # Walk Rate
        'is_hbp': _ballast('HBP%', 240, 'PA'),   # Hit By Pitch Rate
        'is_1b': _ballast('1B%', 290, 'PA'),     # Single Rate
        'is_2b': _ballast('2B%', 1600, 'PA'),    # No specific stabilization point found for 2B Rate alone
        'is_3b': _ballast('3B%', 1600, 'PA'),    # No specific stabilization point found for 3B Rate alone
        'is_hr': _ballast('HR%', 170, 'PA'),     # Home Run Rate
    },
    'pitcher': {
        'is_hit': _ballast('AVG_A', 630, 'BF'),  # Overall Hit Rate Allowed = AVG Against (using BF)
        'is_k': _ballast('K%_A', 70, 'BF'),      # Strikeout Rate Against
        'is_bb': _ballast('BB%_A', 170, 'BF'),   # Walk Rate Against
        'is_hbp': _ballast('HBP%_A', 640, 'BF'), # Hit By Pitch Rate Against
        'is_1b': _ballast('1B%_A', 670, 'BF'),   # Single Rate Against
        'is_2b': _ballast('2B%_A', 1450, 'BF'),  # No specific stabilization point found
        'is_3b': _ballast('3B%_A', 1450, 'BF'),  # No specific stabilization point found
        'is_hr': _ballast('HR%_A', 1320, 'BF'),  # Home Run Rate Against (Note: high stabilization)
    },
}

In [21]:
# (output stem, numerator stat, denominator stat, league-average key, ballast key)
# Batting average is regressed per at-bat; every other rate is per plate appearance.
_rate_specs = [
    ("avg", "h", "ab", "lg_avg", "is_hit"),
    ("k_pct", "k", "pa", "lg_k_pct", "is_k"),
    ("bb_pct", "bb", "pa", "lg_bb_pct", "is_bb"),
    ("hbp_pct", "hbp", "pa", "lg_hbp_pct", "is_hbp"),
    ("1b_pct", "1b", "pa", "lg_1b_pct", "is_1b"),
    ("2b_pct", "2b", "pa", "lg_2b_pct", "is_2b"),
    ("3b_pct", "3b", "pa", "lg_3b_pct", "is_3b"),
    ("hr_pct", "hr", "pa", "lg_hr_pct", "is_hr"),
]


def _add_ballasted_inputs(daily_df, role, suffix):
    """Append league-regressed rate columns for one role.

    daily_df -- per-(player, date) frame with cum_<stat><suffix>_prev_day columns
    role     -- "batter" or "pitcher"; selects the ballast table and name prefix
    suffix   -- "" for batters, "_a" for pitcher stats allowed

    Each rate is (prior count + league_rate * ballast) / (prior denominator + ballast).
    A final column, <role>_non_k_out_pct<suffix>_daily_input, is 1 minus the
    horizontal sum of the seven per-PA rates (K, BB, HBP, 1B, 2B, 3B, HR).
    """
    role_weights = ballast_weights[role]

    rate_exprs = []
    for stem, num_stat, den_stat, league_key, weight_key in _rate_specs:
        weight = role_weights[weight_key]['value']
        prior_count = pl.col(f"cum_{num_stat}{suffix}_prev_day")
        prior_denominator = pl.col(f"cum_{den_stat}{suffix}_prev_day")
        regressed = (
            (prior_count + league_averages_2122[league_key] * weight)
            / (prior_denominator + weight)
        )
        rate_exprs.append(regressed.alias(f"{role}_{stem}{suffix}_daily_input"))

    # Everything except batting average feeds the non-strikeout-out residual.
    per_pa_rate_cols = [
        f"{role}_{stem}{suffix}_daily_input"
        for stem, *_ in _rate_specs
        if stem != "avg"
    ]

    return (
        daily_df
        .with_columns(rate_exprs)
        .with_columns(
            (1 - pl.sum_horizontal(*per_pa_rate_cols))
            .alias(f"{role}_non_k_out_pct{suffix}_daily_input")
        )
    )


# Calculate daily ballasted stats for batters
df_batter_daily = _add_ballasted_inputs(df_batter_daily, "batter", "")

# Calculate daily ballasted stats for pitchers
df_pitcher_daily = _add_ballasted_inputs(df_pitcher_daily, "pitcher", "_a")

In [22]:
# Names of the regressed input columns. The "batter_" / "pitcher_" prefix
# (with underscore) skips the plain "batter" / "pitcher" id columns.
batter_cols = [name for name in df_batter_daily.columns if name.startswith('batter_')]
pitcher_cols = [name for name in df_pitcher_daily.columns if name.startswith('pitcher_')]

In [23]:
# Keep only the join keys and the regressed inputs from each daily frame.
batter_stats_to_join = df_batter_daily.select(["batter", "game_date", *batter_cols])
pitcher_stats_to_join = df_pitcher_daily.select(["pitcher", "game_date", *pitcher_cols])

# Batter and pitcher are opposite-handed (L vs R or R vs L).
opposite_handed = (
    ((pl.col('stand') == 'L') & (pl.col('p_throws') == 'R'))
    | ((pl.col('stand') == 'R') & (pl.col('p_throws') == 'L'))
)

# Bottom of the inning means the home team is batting.
bottom_half = pl.col("inning_topbot") == "Bot"

# Attach each PA's batter and pitcher inputs by (player, game_date), then add
# the two matchup-context flags.
df_pa_full = (
    df_pa_full
    .join(batter_stats_to_join, on=["batter", "game_date"], how="left")
    .join(pitcher_stats_to_join, on=["pitcher", "game_date"], how="left")
    .with_columns(
        is_platoon_adv=pl.when(opposite_handed).then(pl.lit(1)).otherwise(pl.lit(0)),
        is_batter_home=(
            pl.when(bottom_half)
            .then(pl.lit(1))
            .otherwise(pl.lit(0))  # top of the inning: away team batting
            .cast(pl.Int8)
        ),
    )
)

# Rename columns to final input names if needed
# e.g., .rename({"batter_avg_daily_input": "batter_avg_input"})

In [24]:
# Save the PA-level frame with the joined batter/pitcher inputs and context flags.
df_pa_full.write_parquet(f'{FILE_PATH}ballasted_statcast_data.parquet')

## Defensive Stats

In [None]:
# Start again from the cleaned PA-level file (this rebinds df_pa_full).
df_pa_full = pl.read_parquet(f'{FILE_PATH}clean_statcast_data.parquet')

# fielder_2 .. fielder_9
fielder_cols = [f"fielder_{pos}" for pos in range(2, 10)]

# Columns that identify a single inning of a single game.
inning_keys = ["game_pk", "game_year", "inning"]

# Reshape wide -> long: one row per (PA, fielder slot), with the slot name in
# position_num_str and the fielder's id in player_id.
df_long = df_pa_full.unpivot(
    on=fielder_cols,
    index=inning_keys,
    variable_name="position_num_str",
    value_name="player_id",
)

# One row per player per game-inning in which that player appears in any fielder slot.
df_player_innings = (
    df_long
    .drop_nulls(subset=["player_id"])
    .unique(subset=[*inning_keys, "player_id"])
)

# Number of distinct game-innings fielded, per player and season.
df_total_innings = (
    df_player_innings
    .group_by("player_id", "game_year")
    .agg(pl.len().alias("total_innings_played"))
    .sort("player_id", "game_year")
)

In [None]:
# Load the defensive stats; the following cell reads its player_id, year and
# outs_above_average columns.
# NOTE(review): presumably one row per (player_id, year) - confirm, since duplicates
# would multiply rows in the joins that follow.
df_defensive = pl.read_parquet(f'{FILE_PATH}statcast_defensive_stats.parquet')

In [None]:
unique_players = df_defensive["player_id"].unique()

df_all_players = pl.DataFrame({'player_id': unique_players})
df_all_years = pl.DataFrame({'year': list(range(2021, 2026))})

# Cross join to get every player paired with every target year, so that a
# season with no recorded data still yields a row (filled with zeros below).
df_grid = df_all_players.join(df_all_years, how='cross').sort("player_id", "year")

df_full_history = (
    df_grid
    .join(
        df_defensive,
        on=["player_id", "year"],
        how="left"
    )
    .select(
        'player_id',
        'year',
        'outs_above_average',
    )
    .join(
        df_total_innings,
        left_on=['player_id', 'year'],
        right_on=['player_id', 'game_year'],
        how='left',
    )
    .with_columns([
        pl.col("outs_above_average").fill_null(0),
        pl.col("total_innings_played").fill_null(0),
    ])
    # The running sums below depend on chronological order within each player.
    .sort("player_id", "year")
)

# For each (player, year): totals accumulated over all *earlier* years only,
# so a season's row never includes that season's own results.
df_final_cumulative = (
    df_full_history
    .with_columns(
        pl.col("outs_above_average")
        .cum_sum().shift(1).fill_null(0)
        .over("player_id")
        .alias("cumulative_oaa_prior"),
        pl.col("total_innings_played")
        .cum_sum().shift(1).fill_null(0)
        .over("player_id")
        .alias("cumulative_innings_prior"),
    )
    .with_columns(
        # Guard the denominator explicitly: fill_nan alone only covers 0/0,
        # while a non-zero OAA with zero prior innings would produce +/-inf.
        # Rows with no prior innings get a neutral rate of 0.
        outs_above_average_per_inning = (
            pl.when(pl.col("cumulative_innings_prior") > 0)
            .then(
                (pl.col("cumulative_oaa_prior") / pl.col("cumulative_innings_prior"))
                .fill_nan(0)  # still neutralise NaN carried in from the source data
            )
            .otherwise(pl.lit(0.0))
        ),
    )
)

In [None]:
# Save the per-player, per-year prior-cumulative defensive table.
df_final_cumulative.write_parquet(f'{FILE_PATH}clean_defensive_stats.parquet')

## Park Factors

In [None]:
# Load the park-factor table; the next cell reads its venue/team id columns and
# the metric_value_<year> columns.
park_factors = pl.read_parquet(f'{FILE_PATH}statcast_park_factors.parquet')

In [None]:
# Columns identifying a venue and its club; repeated on every output row.
venue_id_cols = ['venue_id', 'venue_name', 'main_team_id', 'name_display_club']

# One wide column per season, newest first (this order drives the row order
# of the unpivoted result).
yearly_metric_cols = [
    'metric_value_2025',
    'metric_value_2024',
    'metric_value_2023',
    'metric_value_2022',
    'metric_value_2021',
]

# Reshape wide -> long: one row per (venue, year) with a numeric park_factor.
clean_park_factors = (
    park_factors
    .select(venue_id_cols + yearly_metric_cols)
    .unpivot(
        on=yearly_metric_cols,
        index=venue_id_cols,
        variable_name='year',
        value_name='park_factor',
    )
    .with_columns(
        # "metric_value_2024" -> 2024
        year=pl.col('year').str.replace('metric_value_', '').cast(pl.Int64),
        park_factor=pl.col('park_factor').cast(pl.Float64),
    )
)

In [None]:
# Save the long-format (venue, year, park_factor) table.
clean_park_factors.write_parquet(f'{FILE_PATH}clean_park_factors.parquet')