Imports

In [3]:
import pandas as pd
from pybaseball import statcast

Default Variables

In [4]:
# Date window handed to statcast() when the data is loaded ('YYYY-MM-DD' strings).
start_date, end_date = '2023-04-01', '2023-09-30'

# Minimum values of the per-batter 'PA' column and the per-pitcher 'IP' column
# that a row must reach to be kept by the filter in the data-loading cell.
minPA, minIP = 900, 1150

Run the Statcast query to load the data

In [None]:
# Columns kept for the analysis; everything else statcast() returns is dropped.
selected_columns = [
    'pitch_type', 'release_speed', 'batter', 'events',
    'plate_x', 'plate_z', 'description', 'game_date', 'inning', 'pitcher',
    'balls', 'strikes'
]

# Download every pitch in the configured window (network call) and put the rows
# in the order the pitches were thrown *before* trimming columns: the sequence
# functions further down compare neighbouring rows, so row order matters.
# NOTE(review): pybaseball appears to return rows newest-first - confirm.
chronological_order = ['game_date', 'game_pk', 'at_bat_number', 'pitch_number']
pitches = (
    statcast(start_dt=start_date, end_dt=end_date)
    .sort_values(chronological_order)
    .reset_index(drop=True)[selected_columns]
)

# Per-player workload, measured in pitches (one row == one pitch):
#   PA -> pitches seen by the batter, IP -> pitches thrown by the pitcher.
# Counting distinct `inning` values per pitcher instead can never exceed the
# number of different inning numbers, so a threshold on the scale of
# minIP (1150) would reject every row and leave `data` empty.
pa_counts = pitches.groupby('batter').size().reset_index(name='PA')
ip_counts = pitches.groupby('pitcher').size().reset_index(name='IP')

# Attach both workloads to every pitch (left joins keep the row order).
data = (
    pitches
    .merge(pa_counts, on='batter', how='left')
    .merge(ip_counts, on='pitcher', how='left')
)

# Keep only pitches where the batter meets minPA and the pitcher meets minIP.
# .copy() hands later cells an independent frame, so adding columns to `data`
# does not write into a slice of the unfiltered frame.
meets_minimums = (data['PA'] >= minPA) & (data['IP'] >= minIP)
data = data[meets_minimums].copy()

  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)


In [4]:
# Values of `events` that count as a success: any hit, or a walk.
success_events = ['single', 'double', 'triple', 'home_run', 'walk']

# Flag each row with 1 when its `events` value is listed above and 0 otherwise
# (rows with a missing `events` value get 0).
is_success = data['events'].isin(success_events)
data['success'] = is_success.astype(int)

Success Calculation Functions

In [5]:
def analyze_scenario(balls, strikes):
    """Print per-pitch-type success rates for pitches thrown at one count.

    Reads the module-level ``data`` frame, keeps the rows whose ``balls`` and
    ``strikes`` columns equal the arguments, and prints one line per
    ``pitch_type`` with the number of pitches, the sum of the ``success``
    flag, and the ratio of the two.

    Args:
        balls: Value the ``balls`` column must equal.
        strikes: Value the ``strikes`` column must equal.

    Returns:
        None. The table, or a notice that no rows matched, goes to stdout.
    """
    at_count = data.loc[(data['balls'] == balls) & (data['strikes'] == strikes)]

    # Nothing was thrown at this count: say so and stop.
    if at_count.empty:
        print(f"No data available for count {balls}-{strikes}.")
        return

    # One row per pitch type: how many were thrown and how many succeeded.
    per_pitch_type = at_count.groupby('pitch_type').agg(
        total_pitches=pd.NamedAgg(column='pitch_type', aggfunc='size'),
        successes=pd.NamedAgg(column='success', aggfunc='sum'),
    )

    # Move pitch_type back to a column and append the success ratio.
    analysis = per_pitch_type.reset_index().assign(
        success_rate=lambda table: table['successes'] / table['total_pitches']
    )

    print(f"Analysis for count {balls}-{strikes}:")
    print(analysis)

In [7]:
def analyze_pitch_sequence(sequence):
    """Print success rates for the pitch that completes a run of pitch types.

    Scans the module-level ``data`` frame for places where consecutive rows
    carry the pitch types listed in ``sequence`` (in that order), keeps the
    row holding the *final* pitch of each occurrence, and prints the number
    of occurrences, the summed ``success`` flag and their ratio, grouped by
    that final pitch's ``pitch_type``.

    ``data`` is only read; no helper column is added to it.

    NOTE(review): "consecutive" means adjacent rows of ``data``. This assumes
    the rows are in the order the pitches were thrown, and a match may span
    two different batters or games because no boundary is checked - confirm
    that this is acceptable for the analysis.

    Args:
        sequence: Ordered, non-empty collection of ``pitch_type`` values
            (any iterable, e.g. a list or a tuple).

    Returns:
        None. The table, or a notice that nothing matched, goes to stdout.

    Raises:
        ValueError: If ``sequence`` is empty.
    """
    wanted = list(sequence)
    if not wanted:
        raise ValueError("sequence must contain at least one pitch type")

    pitch_types = data['pitch_type']
    last = len(wanted) - 1

    # A row ends the sequence when it holds the last wanted pitch and the
    # row `k` places above it holds the pitch `k` places earlier in `wanted`.
    # shift() pads the first rows with NaN, which never equals a pitch type,
    # so partial matches at the top of the frame are rejected automatically.
    ends_sequence = pitch_types.eq(wanted[last])
    for position, pitch in enumerate(wanted[:last]):
        ends_sequence &= pitch_types.shift(last - position).eq(pitch)

    sequence_data = data[ends_sequence]

    if sequence_data.empty:
        print(f"No data available for pitch sequence {sequence}.")
        return

    # Group by final pitch type in the sequence
    analysis = sequence_data.groupby('pitch_type').agg(
        total_pitches=('pitch_type', 'size'),
        successes=('success', 'sum')
    ).reset_index()

    # Share of sequence-ending pitches that were flagged as a success.
    analysis['success_rate'] = analysis['successes'] / analysis['total_pitches']

    print(f"Analysis for pitch sequence {sequence}:")
    print(analysis)

In [None]:
def analyze_scenario_and_sequence(balls, strikes, sequence):
    """Print success rates for a pitch sequence among pitches at one count.

    First keeps the rows of the module-level ``data`` frame whose ``balls``
    and ``strikes`` columns equal the arguments. Within that filtered frame
    it then looks for consecutive rows whose ``pitch_type`` values spell out
    ``sequence``, keeps the row holding the final pitch of each occurrence,
    and prints occurrences, summed ``success`` and their ratio per final
    ``pitch_type``.

    ``data`` is only read; the match flags live in a local Series, so no
    column is assigned on a filtered slice.

    NOTE(review): the sequence is matched between neighbouring rows of the
    count-filtered frame, i.e. between pitches that were all thrown at this
    count, not between pitches that were consecutive in the game. If the
    intent is "a sequence whose last pitch was thrown at this count", the
    match has to be computed before filtering - confirm.

    Args:
        balls: Value the ``balls`` column must equal.
        strikes: Value the ``strikes`` column must equal.
        sequence: Ordered, non-empty collection of ``pitch_type`` values
            (any iterable, e.g. a list or a tuple).

    Returns:
        None. The table, or a notice that nothing matched, goes to stdout.

    Raises:
        ValueError: If ``sequence`` is empty.
    """
    wanted = list(sequence)
    if not wanted:
        raise ValueError("sequence must contain at least one pitch type")

    # Filter by count
    scenario_data = data[(data['balls'] == balls) & (data['strikes'] == strikes)]
    if scenario_data.empty:
        print(f"No data available for count {balls}-{strikes}.")
        return

    pitch_types = scenario_data['pitch_type']
    last = len(wanted) - 1

    # Vectorised window match: a row ends the sequence when it holds the last
    # wanted pitch and the row `k` places above it holds the pitch `k` places
    # earlier in `wanted`. shift() pads with NaN, which never equals a pitch
    # type, so windows that would start before the first row are rejected.
    ends_sequence = pitch_types.eq(wanted[last])
    for position, pitch in enumerate(wanted[:last]):
        ends_sequence &= pitch_types.shift(last - position).eq(pitch)

    # Filter data for matching sequences
    sequence_data = scenario_data[ends_sequence]

    if sequence_data.empty:
        print(f"No data available for pitch sequence {sequence} at count {balls}-{strikes}.")
        return

    # Group by final pitch type in the sequence
    analysis = sequence_data.groupby('pitch_type').agg(
        total_pitches=('pitch_type', 'size'),
        successes=('success', 'sum')
    ).reset_index()

    # Share of sequence-ending pitches that were flagged as a success.
    analysis['success_rate'] = analysis['successes'] / analysis['total_pitches']

    print(f"Analysis for pitch sequence {sequence} at count {balls}-{strikes}:")
    print(analysis)

Generate the default data CSV

In [None]:
import pandas as pd
from pybaseball import statcast

def create_filtered_mlb_csv(output_csv='mlb_2024_filtered_data.csv',
                            start_date='2024-04-01', end_date='2024-10-01',
                            percentile=0.8, pitches=None):
    """Write the pitches involving the busiest batters or pitchers to a CSV.

    Steps:
      1. Fetch pitch-level data with ``statcast`` (skipped when ``pitches``
         is supplied) and keep the analysis columns.
      2. Compute a workload per player from *all* pitches:
         ``PA`` = rows of the batter that carry a non-null ``events`` value,
         ``IP`` = distinct (``game_date``, ``inning``) pairs of the pitcher.
         NOTE(review): ``PA`` assumes ``events`` is filled only on the pitch
         that ends a plate appearance; ``IP`` counts innings appeared in
         (not outs / 3) and merges doubleheader games played on one date.
      3. Take the ``percentile`` quantile of each workload across players
         (one value per player, so heavy users do not weigh more).
      4. Keep pitches of the whitelisted pitch types where the batter's PA
         or the pitcher's IP reaches its threshold, and save them.

    Each surviving pitch is written once, with both ``IP`` and ``PA``
    appended after the selected columns.

    Args:
        output_csv: Path of the CSV file to write (no index column).
        start_date: First day requested from ``statcast`` ('YYYY-MM-DD').
        end_date: Last day requested from ``statcast`` ('YYYY-MM-DD').
        percentile: Quantile in [0, 1] that defines a "top" player.
        pitches: Optional already-fetched pitch-level DataFrame. When given,
            no network call is made and the frame is left unmodified.

    Returns:
        None. The result is written to ``output_csv``; progress is printed.
    """
    if pitches is None:
        print(f"Fetching MLB data from {start_date} to {end_date}...")
        pitches = statcast(start_dt=start_date, end_dt=end_date)
        print(f"Data fetched. Shape: {pitches.shape}")

    # Select relevant columns; .copy() so the column assignment below edits
    # our own frame rather than a slice of the caller's / the raw download.
    selected_columns = [
        'pitch_type', 'release_speed', 'batter', 'events',
        'plate_x', 'plate_z', 'description', 'game_date', 'inning', 'pitcher',
        'balls', 'strikes'
    ]
    data = pitches[selected_columns].copy()
    print(f"Data after selecting relevant columns. Shape: {data.shape}")

    data['inning'] = pd.to_numeric(data['inning'], errors='coerce')  # Ensure 'inning' is numeric

    # Workloads are measured on every pitch, before the pitch-type filter,
    # so a player is not under-counted for throwing/seeing other pitch types.
    print("Calculating total innings pitched (IP) for each pitcher...")
    ip_totals = (
        data.dropna(subset=['inning'])
        .drop_duplicates(['pitcher', 'game_date', 'inning'])
        .groupby('pitcher').size().reset_index(name='IP')
    )
    print(f"Total innings pitched calculated. Example:\n{ip_totals.head()}")

    pa_counts = (
        data[data['events'].notna()]
        .groupby('batter').size().reset_index(name='PA')
    )
    print(f"Total batters with PA calculated. Shape: {pa_counts.shape}")

    # Thresholds across players (one row per player in each table).
    pa_threshold = pa_counts['PA'].quantile(percentile)
    ip_threshold = ip_totals['IP'].quantile(percentile)
    print(f"{percentile:.0%} quantile for PA: {pa_threshold}, IP: {ip_threshold}")

    top_batter_count = int((pa_counts['PA'] >= pa_threshold).sum())
    top_pitcher_count = int((ip_totals['IP'] >= ip_threshold).sum())
    print(f"Top batters: {top_batter_count}, top pitchers: {top_pitcher_count}")

    # Filter by pitch types. 'FF' is included because the notebook's examples
    # query it; 'FB' and 'CB' are kept from the earlier whitelist.
    # NOTE(review): 'FB'/'CB' do not look like pitch_type codes present in
    # this data source - confirm and drop them if so.
    valid_pitch_types = ['FF', 'FB', 'CH', 'SL', 'CB', 'SI', 'CU']
    data = data[data['pitch_type'].isin(valid_pitch_types)]
    print(f"Data after filtering by pitch types. Shape: {data.shape}")

    # One frame carrying both workloads, so every pitch appears exactly once.
    # Players without a workload entry get NaN, which fails the >= test.
    data = (
        data
        .merge(ip_totals, on='pitcher', how='left')
        .merge(pa_counts, on='batter', how='left')
    )
    print(f"Data after adding total IP and PA. Shape: {data.shape}")

    # Keep a pitch when its batter OR its pitcher is a top player.
    involves_top_player = (data['PA'] >= pa_threshold) | (data['IP'] >= ip_threshold)
    filtered_data = data[involves_top_player]
    print(f"Combined rows after filtering: {len(filtered_data)}")

    # Save the filtered data to a CSV
    filtered_data.to_csv(output_csv, index=False)
    print(f"Filtered data saved to {output_csv}")

# Build the filtered dataset and write it to 'mlb_2024_filtered_data.csv'.
create_filtered_mlb_csv('mlb_2024_filtered_data.csv')


Test Cases

In [None]:
# Example: success rates for the pitch sequence FF -> CH.
analyze_pitch_sequence(sequence=['FF', 'CH'])

In [None]:
# Example: success rates by pitch type at a 3-2 count (3 balls, 2 strikes).
analyze_scenario(balls=3, strikes=2)

In [None]:
# Example: the pitch sequence FF -> CH restricted to a 3-2 count.
analyze_scenario_and_sequence(balls=3, strikes=2, sequence=['FF', 'CH'])