<a href="https://colab.research.google.com/github/mherskovitz/Baseball/blob/main/z_score_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Baseball Player Analysis
------------------------
This script analyzes baseball player performance data, normalizing statistics by position
and calculating aggregate offensive value scores.

Functions:
- standardize_position: Converts position data to standardized format
- aggregate_player_data: Groups and aggregates statistics by player
- add_baseball_stats: Calculates OBP, SLG, and OPS
- add_position_z_scores: Calculates position-specific z-scores
- add_aggregate_z_score: Creates a composite offensive value metric

The main function reads data and executes the analysis pipeline.
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
import os
import time


def standardize_position(pos):
    """
    Standardize player positions by taking the last position played
    and combining outfield positions into 'OF'.

    Parameters:
    pos (str): Position string from raw data

    Returns:
    str: Standardized position
    """
    # Get the last position in the string
    pos = str(pos)
    last_pos = pos.split()[-1]

    # Convert outfield positions to OF
    if last_pos in ['RF', 'LF', 'CF']:
        return 'OF'
    else:
        return last_pos


def aggregate_player_data(df, position_col='Std_Pos'):
    """
    Group data by player and aggregate statistics.

    Parameters:
    df (pandas.DataFrame): DataFrame with player statistics and positions
    position_col (str): Column name containing position information

    Returns:
    pandas.DataFrame: Aggregated statistics by player
    """
    # Define columns to sum
    sum_columns = [
        'PA', 'AB', 'R', 'H', '1B', '2B', '3B', 'HR',
        'RBI', 'BB', 'SB', 'HBP', 'IBB'
    ]

    # Create aggregation dictionary
    agg_dict = {col: 'sum' for col in sum_columns if col in df.columns}
    agg_dict[position_col] = 'first'  # Take the first standardized position

    # Group by player and aggregate
    aggregated = df.groupby('Player').agg(agg_dict).reset_index()

    return aggregated


def add_baseball_stats(df):
    """
    Add OBP, SLG, and OPS columns to a baseball dataframe.

    Parameters:
    df (pandas.DataFrame): DataFrame with baseball statistics

    Returns:
    pandas.DataFrame: DataFrame with added rate statistics
    """
    # Create a copy to avoid modifying the original
    result = df.copy()

    # Calculate OBP (On Base Percentage)
    # OBP = (H + BB + HBP + IBB) / (AB + BB + HBP + IBB)
    denominator = result['AB'] + result['BB'] + result['HBP'] + result['IBB']
    result['OBP'] = np.where(
        denominator == 0,
        0,  # Handle division by zero
        (result['H'] + result['BB'] + result['HBP'] + result['IBB']) / denominator
    )

    # Calculate SLG (Slugging Percentage)
    # SLG = (1B + 2*2B + 3*3B + 4*HR) / AB
    result['SLG'] = np.where(
        result['AB'] == 0,
        0,  # Handle division by zero
        (result['1B'] + 2*result['2B'] + 3*result['3B'] + 4*result['HR']) / result['AB']
    )

    # Calculate OPS (On-base Plus Slugging)
    result['OPS'] = result['OBP'] + result['SLG']

    return result


def add_position_z_scores(df, pos_column='Std_Pos', stats_to_normalize=None, min_players=5):
    """
    Add z-scores for specified statistics grouped by position.

    Parameters:
    df (pandas.DataFrame): DataFrame with baseball statistics
    pos_column (str): Column name containing position information
    stats_to_normalize (list): List of statistics to calculate z-scores for
    min_players (int): Minimum number of players required for position to be analyzed

    Returns:
    pandas.DataFrame: DataFrame with added z-score columns
    """
    if stats_to_normalize is None:
        stats_to_normalize = ['H', 'R', 'HR', 'RBI', 'SB', 'OPS']

    # Create a copy to avoid modifying the original
    result = df.copy()

    # Get valid positions (those with enough players)
    position_counts = result[pos_column].value_counts()
    valid_positions = position_counts[position_counts >= min_players].index.tolist()

    if len(valid_positions) < len(position_counts):
        excluded = set(position_counts.index) - set(valid_positions)
        print(f"\nExcluding positions with fewer than {min_players} players: {', '.join(excluded)}")

    # Initialize z-score columns
    for stat in stats_to_normalize:
        result[f'{stat}_z'] = np.nan

    # Calculate z-scores for each valid position and each statistic
    for pos in valid_positions:
        # Get players at this position
        pos_players = result[pos_column] == pos

        # Calculate z-score for each specified statistic
        for stat in stats_to_normalize:
            # Get values for this position
            values = result.loc[pos_players, stat]

            # Calculate mean and standard deviation
            mean = values.mean()
            std = values.std()

            # Skip if standard deviation is 0 or NaN
            if std == 0 or np.isnan(std):
                print(f"Warning: Zero or NaN standard deviation for {stat} at position {pos}. Skipping z-score calculation.")
                continue

            # Calculate z-scores
            result.loc[pos_players, f'{stat}_z'] = (values - mean) / std

    return result

def add_aggregate_z_score(df, stats_to_include=None, weights=None):
    """
    Create an aggregate z-score from multiple individual z-scores.

    Parameters:
    df (pandas.DataFrame): DataFrame with calculated z-scores
    stats_to_include (list): List of z-score columns to include in the aggregate
    weights (dict): Optional dictionary of weights for each statistic

    Returns:
    pandas.DataFrame: DataFrame with added aggregate z-score column
    """
    if stats_to_include is None:
        stats_to_include = ['H_z', 'R_z', 'HR_z', 'RBI_z', 'SB_z', 'OPS_z']

    # Create a copy to avoid modifying the original
    result = df.copy()

    # Check if all required z-score columns exist
    missing_cols = [col for col in stats_to_include if col not in result.columns]
    if missing_cols:
        print(f"Warning: Missing z-score columns: {missing_cols}")
        # Only use available columns
        stats_to_include = [col for col in stats_to_include if col in result.columns]

    if not weights:
        # Simple average of z-scores
        result['Aggregate_z'] = result[stats_to_include].mean(axis=1)
    else:
        # Check for missing columns in weights
        available_weights = {col: weight for col, weight in weights.items() if col in result.columns}

        # Normalize weights to sum to 1
        total_weight = sum(available_weights.values())
        normalized_weights = {col: weight/total_weight for col, weight in available_weights.items()}

        # Calculate weighted aggregate z-score
        result['Aggregate_z'] = sum(result[col] * weight for col, weight in normalized_weights.items())

    return result


def print_top_players(df, metric='Aggregate_z', n=5, by_position=True, min_players=5):
    """
    Print top players overall or by position based on a specified metric.

    Parameters:
    df (pandas.DataFrame): DataFrame with player statistics
    metric (str): Column name of the metric to sort by
    n (int): Number of top players to display
    by_position (bool): Whether to group by position
    min_players (int): Minimum number of players required for position to be displayed
    """
    display_cols = ['Player', 'Std_Pos', 'PA', 'OPS', 'HR', 'RBI', 'SB', metric]

    if not by_position:
        # Display top n players overall
        print(f"\nTop {n} Players Overall by {metric}:")
        top_overall = df.sort_values(metric, ascending=False).head(n)
        print(top_overall[display_cols].to_string(index=False))
    else:
        # Display top n players by position
        print(f"\nTop {n} Players by {metric} for Each Position:")
        # Get positions with enough players
        position_counts = df['Std_Pos'].value_counts()
        valid_positions = position_counts[position_counts >= min_players].index.tolist()

        for pos in valid_positions:
            pos_players = df[df['Std_Pos'] == pos]
            print(f"\n{pos} Position:")
            top_players = pos_players.sort_values(metric, ascending=False).head(n)
            print(top_players[display_cols].to_string(index=False))

def print_position_top_performers(df, stats=None, min_players=5):
    """
    Print top performer for each statistic by position.

    Parameters:
    df (pandas.DataFrame): DataFrame with player statistics and z-scores
    stats (list): List of statistics to display top performers for
    min_players (int): Minimum number of players required for position to be displayed
    """
    if stats is None:
        stats = ['OPS', 'HR', 'RBI', 'R', 'SB', 'H']

    print("\nTop performers for each statistic by position:")

    # Get positions with enough players
    position_counts = df['Std_Pos'].value_counts()
    valid_positions = position_counts[position_counts >= min_players].index.tolist()
#add new code here
    for pos in valid_positions:
        pos_players = df[df['Std_Pos'] == pos]
        print(f"\n{pos} Position Top Performers:")

        # Create empty dataframe for top performers
        top_performers = pd.DataFrame(columns=['Statistic', 'Player', 'Value', 'Z-Score'])

        # Find top performer for each statistic
        for stat in stats:
            # Check if z-score column exists
            z_col = f'{stat}_z'
            if z_col in df.columns:
                top_player = pos_players.sort_values(z_col, ascending=False).iloc[0]
#end of code block being replaced
                new_row = {
                    'Statistic': stat,
                    'Player': top_player['Player'],
                    'Value': top_player[stat],
                    'Z-Score': top_player[z_col]
                }
                top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)

        print(top_performers.to_string(index=False, float_format=lambda x: f"{x:.2f}" if isinstance(x, float) else x))

def fix_encoding(name):
    """
    Fix encoding issues in player names with non-ASCII characters.

    Parameters:
    name (str): Player name that might have encoding issues

    Returns:
    str: Name with corrected encoding
    """
    try:
        # This often fixes mojibake from latin-1/utf-8 mismatches
        return name.encode('latin-1').decode('utf-8')
    except:
        return name  # If it fails, return the original
def get_date_range(df):
    """
    Extract the date range from the original dataframe.

    Parameters:
    df (pandas.DataFrame): Original dataframe with 'Date' column

    Returns:
    tuple: (start_date, end_date) as strings
    """
    try:
        # Check if the Date column exists
        if 'Date' not in df.columns:
            print("Warning: 'Date' column not found in dataframe.")
            return None, None

        # Print a sample date value to help debugging
        sample_date = df['Date'].iloc[0] if not df['Date'].empty else None
        if sample_date:
            print(f"Sample date value: {sample_date}")

        # Try to convert using a format that matches "4/2/24" (M/D/YY)
        df['Date'] = pd.to_datetime(df['Date'], format='%m/%d/%y', errors='coerce')

        # Check if conversion was successful
        if df['Date'].isna().all():
            print("Warning: Could not convert dates with '%m/%d/%y' format.")
            # Try with different format
            df['Date'] = pd.to_datetime(df['Date'], format='%m/%d/%Y', errors='coerce')

            if df['Date'].isna().all():
                print("Warning: Could not convert dates with '%m/%d/%Y' format.")
                # Final fallback with no format specified
                df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

        # Get the min and max dates
        if df['Date'].isna().all():
            print("Warning: All date values are NaN after conversion.")
            return None, None

        start_date = df['Date'].min().strftime('%Y-%m-%d')
        end_date = df['Date'].max().strftime('%Y-%m-%d')

        return start_date, end_date
    except Exception as e:
        print(f"Warning: Could not determine date range. Error: {e}")
        return None, None


def export_to_csv(df, date_range=None, filename='baseball_analysis_results.csv', top_n=10):
    """
    Export the analysis results to a CSV file with date range information.

    Parameters:
    df (pandas.DataFrame): DataFrame with analysis results
    date_range (tuple): (start_date, end_date) tuple
    filename (str): Name of the output CSV file
    top_n (int): Number of top players to include for each position/category

    Returns:
    str: Path to the saved file
    """
    try:
        # Create a copy of the DataFrame for export
        export_df = df.copy()

        # Create a temporary file with date range information
        temp_filename = 'temp_' + filename

        # Write the date range as the first line(s)
        with open(temp_filename, 'w', encoding='utf-8') as f:
            f.write("Baseball Analysis Results\n")

            if date_range and date_range[0] and date_range[1]:
                f.write(f"From {date_range[0]} to {date_range[1]}\n\n")
            else:
                f.write("\n")

            # Write info about the number of players shown
            f.write(f"Showing top {top_n} players for each category\n\n")

        # Get top players overall
        top_overall = export_df.sort_values('Aggregate_z', ascending=False).head(top_n)

        # Filter the export to include only the top players by position
        position_top_players = pd.DataFrame()
        for pos in export_df['Std_Pos'].unique():
            pos_players = export_df[export_df['Std_Pos'] == pos]
            if len(pos_players) >= 3:  # Only include positions with at least 3 players
                top_pos = pos_players.sort_values('Aggregate_z', ascending=False).head(top_n)
                position_top_players = pd.concat([position_top_players, top_pos])

        # Combine overall top with position tops (may have duplicates)
        combined_top = pd.concat([top_overall, position_top_players]).drop_duplicates()

        # Append the filtered DataFrame to the file
        combined_top.to_csv(temp_filename, index=False, mode='a')

        # Rename the file to the final filename
        import os
        if os.path.exists(filename):
            os.remove(filename)
        os.rename(temp_filename, filename)

        print(f"Results successfully exported to {filename} (showing top {top_n} players)")
        return filename
    except Exception as e:
        print(f"Error exporting to CSV: {e}")
        return None


def export_to_html(df, date_range=None, filename='baseball_analysis_results.html', top_n=10):
    """
    Export the analysis results to an HTML file with formatting.

    Parameters:
    df (pandas.DataFrame): DataFrame with analysis results
    date_range (tuple): (start_date, end_date) tuple
    filename (str): Name of the output HTML file
    top_n (int): Number of top players to include for each position/category

    Returns:
    str: Path to the saved file
    """
    try:
        # Create a copy with rounded values for better display
        display_df = df.copy()
        for col in display_df.columns:
            if display_df[col].dtype in [np.float64, np.float32]:
                display_df[col] = display_df[col].round(3)

        # Begin HTML content
        html_content = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Baseball Player Analysis</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                h1, h2, h3 { color: #333; }
                h1 { margin-bottom: 5px; }
                .date-range { font-size: 1.2em; margin-bottom: 10px; color: #555; }
                .display-info { font-size: 1em; margin-bottom: 20px; color: #555; font-style: italic; }
                table { border-collapse: collapse; margin: 15px 0; width: 100%; }
                th { background-color: #f2f2f2; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                tr:nth-child(even) { background-color: #f9f9f9; }
                .positive { color: green; }
                .negative { color: red; }
                .header { margin-bottom: 20px; }
                .section { margin-top: 30px; }
            </style>
        </head>
        <body>
            <div class="header">
                <h1>Baseball Player Analysis Results</h1>
        """

        # Add date range
        if date_range and isinstance(date_range, tuple) and len(date_range) == 2:
            start_date, end_date = date_range
            if start_date and end_date:
                html_content += f'<p class="date-range">From {start_date} to {end_date}</p>'

        # Add info about number of players shown
        html_content += f'<p class="display-info">Showing top {top_n} players for each category</p>'

        html_content += """
            </div>
        """

        # Top N overall players
        html_content += f'<div class="section"><h2>Top {top_n} Players by Aggregate Z-Score</h2>'
        top_overall = display_df.sort_values('Aggregate_z', ascending=False).head(top_n)
        display_cols = ['Player', 'Std_Pos', 'PA', 'OPS', 'HR', 'RBI', 'SB', 'Aggregate_z']
        html_content += top_overall[display_cols].to_html(index=False, classes='dataframe')
        html_content += '</div>'

        # Top players by position
        html_content += '<div class="section"><h2>Top Players by Position</h2>'

        position_counts = display_df['Std_Pos'].value_counts()
        valid_positions = position_counts[position_counts >= 5].index.tolist()

        for pos in valid_positions:
            pos_players = display_df[display_df['Std_Pos'] == pos]
            html_content += f'<div class="position-section"><h3>{pos} Position</h3>'
            top_players = pos_players.sort_values('Aggregate_z', ascending=False).head(top_n)
            html_content += top_players[display_cols].to_html(index=False, classes='dataframe')
            html_content += '</div>'

        html_content += '</div>'

        # Close the HTML tags
        html_content += """
        </body>
        </html>
        """

        # Write to file
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(html_content)

        print(f"Results successfully exported to {filename} (showing top {top_n} players)")
        return filename

    except Exception as e:
        print(f"Error exporting to HTML: {e}")
        return None


def main(display_count=10):
    """
    Main function to run the baseball analysis pipeline.

    Parameters:
    display_count (int): Number of top players to show in reports

    Returns:
    pandas.DataFrame: The final analyzed dataframe
    """
    # Setup Google Drive path if needed
    from google.colab import drive
    drive.mount('/content/drive')
    os.chdir('/content/drive/MyDrive/CSV/Baseball')

    # Read the CSV file
    print("Reading data...")
    batting = pd.read_csv('/content/drive/MyDrive/CSV/Baseball/batting_ending_may3_test.csv', encoding='latin-1')
    print(f"Raw data shape: {batting.shape}")

 # Get date range from original data with debug output
    print("Extracting date range...")

    # Check if 'Date' column exists
    if 'Date' in batting.columns:
        print(f"Date column found with {batting['Date'].nunique()} unique values")
        print(f"First few date values: {batting['Date'].head().tolist()}")
    else:
        print("Warning: 'Date' column not found in the dataframe")

    date_range = get_date_range(batting)
    print(f"Extracted date range: {date_range}")

    if date_range and date_range[0] and date_range[1]:
        print(f"\nData covers period from {date_range[0]} to {date_range[1]}")
    else:
        print("\nWarning: Could not determine date range from the data")
    # Get date range from original data
    date_range = get_date_range(batting)
    if date_range[0] and date_range[1]:
        print(f"\nData covers period from {date_range[0]} to {date_range[1]}")

    # Fix encoding in player names
    print("Fixing encoding issues in player names...")
    batting['Player'] = batting['Player'].apply(fix_encoding)


    # Apply position standardization
    print("\nStandardizing positions...")
    batting['Std_Pos'] = batting['Pos'].apply(standardize_position)

    # Display position distribution
    print("Standardized Position Distribution:")
    std_pos_counts = batting['Std_Pos'].value_counts()
    print(std_pos_counts)

    # Aggregate player data
    print("\nAggregating player statistics...")
    aggregated_batting = aggregate_player_data(batting)
    print(f"Aggregated data shape: {aggregated_batting.shape}")
    print("Sample of aggregated data:")
    print(aggregated_batting.head())

    # Calculate advanced batting statistics
    print("\nCalculating OBP, SLG, and OPS...")
    stats_batting = add_baseball_stats(aggregated_batting)
    print("Sample of calculated stats:")
    print(stats_batting[['Player', 'Std_Pos', 'OBP', 'SLG', 'OPS']].head())

 # Define minimum players per position
    min_players_threshold = 5

    # Calculate position-specific z-scores
    print("\nCalculating z-scores by position...")
    normalized_batting = add_position_z_scores(stats_batting, min_players=min_players_threshold)

    # Calculate position-specific z-scores
    print("\nCalculating z-scores by position...")
    normalized_batting = add_position_z_scores(stats_batting)
    print("Sample of normalized data:")
    print(normalized_batting[['Player', 'Std_Pos', 'OPS', 'OPS_z']].head())

    # Calculate aggregate offensive value
    print("\nCalculating aggregate offensive value...")
    # Optional: define custom weights for the aggregate score
    # weights = {'H_z': 0.1, 'R_z': 0.15, 'HR_z': 0.2, 'RBI_z': 0.2, 'SB_z': 0.05, 'OPS_z': 0.3}
    final_batting = add_aggregate_z_score(normalized_batting)
    print("Sample of final data with aggregate score:")
    print(final_batting[['Player', 'Std_Pos', 'OPS', 'Aggregate_z']].head())

   # Print results
    print_top_players(final_batting, n=20, by_position=False)
    print_top_players(final_batting, n=8, by_position=True, min_players=min_players_threshold)
    print_position_top_performers(final_batting, min_players=min_players_threshold)

   # Export results with date range and specified player count
    print(f"\nExporting results (showing top {display_count} players)...")
    export_to_csv(final_batting, date_range=date_range, top_n=display_count)
    export_to_html(final_batting, date_range=date_range, top_n=display_count)

    return final_batting

if __name__ == "__main__":
    # Define how many top players to show in exports
    players_to_display = 10  # Change this value to show more or fewer players

    # Execute the analysis pipeline
    result_df = main(display_count=players_to_display)

Mounted at /content/drive
Reading data...
Raw data shape: (9817, 31)
Extracting date range...
Date column found with 42 unique values
First few date values: [nan, '4/26/25', '4/20/25', '4/15/25', '4/19/25']
Sample date value: nan
Extracted date range: ('2025-03-18', '2025-05-03')

Data covers period from 2025-03-18 to 2025-05-03
Sample date value: NaT

Data covers period from 2025-03-18 to 2025-05-03
Fixing encoding issues in player names...

Standardizing positions...
Standardized Position Distribution:
Std_Pos
OF     3267
DH     1092
1B     1066
C      1057
3B     1056
2B     1043
SS      997
PH      189
PR       48
nan       1
P         1
Name: count, dtype: int64

Aggregating player statistics...
Aggregated data shape: (479, 15)
Sample of aggregated data:
           Player     PA     AB     R     H    1B   2B   3B    HR   RBI    BB  \
0     Aaron Judge  148.0  125.0  31.0  54.0  33.0  8.0  2.0  11.0  33.0  21.0   
1    Aaron Schunk   21.0   20.0   1.0   5.0   4.0  1.0  0.0   0.0   

  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)
  top_performers = pd.concat([top_performers, pd.DataFrame([new_row])], ignore_index=True)


Results successfully exported to baseball_analysis_results.html (showing top 10 players)
