In [1]:
"""
MLB Trade Simulator - Value Determination Module
Author: Niels Christoffersen
Version: 1.0
Last Updated: 12/23/2024

This module calculates player values based on WAR projections and contract status.
It handles data loading, cleaning, and value calculations for MLB players.
"""

# Standard library imports
import os
from pathlib import Path
from typing import List, Dict, Optional

# Third-party imports
import pandas as pd
import numpy as np
from pandas import DataFrame
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Type aliases
PathLike = str | Path

# Global constants
ROOT_DIR = Path(os.getcwd()).parent  # Changed from Path(__file__).parent.parent
DATA_DIR = ROOT_DIR / 'data'
GENERATED_DIR = DATA_DIR / 'generated'
OUTPUT_DIR = GENERATED_DIR / 'value_by_year'
HITTER_COLUMNS = [
    'Name', 'Age', 'IDfg', 'BB%', 'K%', 'AVG', 'OBP', 'SLG', 'wOBA', 
    'wRC+', 'EV', 'Off', 'BsR', 'Def', 'WAR'
]

PITCHER_COLUMNS = [
    'Name', 'Age', 'IDfg', 'FIP', 'SIERA', 'K%', 'BB%', 'GB%', 'FB%',
    'Stuff+', 'Location+', 'Pitching+', 'FBv', 'WAR'
]
# Ensure required directories exist
for directory in [DATA_DIR, GENERATED_DIR, OUTPUT_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
    logger.debug(f"Verified directory exists: {directory}")

logger.info("Initialized MLB Trade Simulator value determination module")

2025-01-02 17:06:52,409 - INFO - Initialized MLB Trade Simulator value determination module


In [2]:
# Constants
PREDICTION_YEARS = range(2025, 2040)
REQUIRED_COLUMNS = {
    'predictions': ['Name', 'IDfg', 'Age', 'WAR'],
    'salary': ['Name', 'IDfg', 'Salary', 'Contract_Status']
}

def validate_files_exist(pattern: str, years: range) -> None:
    """Validate prediction files exist for given years."""
    missing_files = [
        f"{pattern}_{year}.csv" 
        for year in years 
        if not (GENERATED_DIR / f"{pattern}_{year}.csv").exists()
    ]
    if missing_files:
        raise FileNotFoundError(f"Missing files: {', '.join(missing_files)}")

def load_prediction_files(pattern: str, years: range = PREDICTION_YEARS) -> DataFrame:
    """Load and combine prediction CSVs with validation."""
    validate_files_exist(pattern, years)
    dfs = []
    
    for year in years:
        file_path = GENERATED_DIR / f"{pattern}_{year}.csv"
        logger.info(f"Loading {file_path}")
        
        try:
            df = pd.read_csv(file_path)
            missing_cols = set(REQUIRED_COLUMNS['predictions']) - set(df.columns)
            if missing_cols:
                raise ValueError(f"Missing columns in {file_path}: {missing_cols}")
                
            df['prediction_year'] = year
            dfs.append(df)
            
        except Exception as e:
            logger.error(f"Error loading {file_path}: {str(e)}")
            raise
    
    return pd.concat(dfs, ignore_index=True)

# Main data loading
try:
    sp_data = load_prediction_files('SP_Predictions')
    rp_data = load_prediction_files('RP_Predictions')
    batter_data = load_prediction_files('Batter_Predictions')
    salary_data = pd.read_csv(DATA_DIR / 'SPORTRAC_MLB_SALARY_DATA.csv')
    
    logger.info(f"Loaded {len(sp_data)} SP, {len(rp_data)} RP, "
                f"{len(batter_data)} batter, and {len(salary_data)} salary records")
    
except Exception as e:
    logger.error(f"Critical error during data loading: {str(e)}")
    raise

2025-01-02 17:06:52,447 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2025.csv
2025-01-02 17:06:52,458 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2026.csv
2025-01-02 17:06:52,464 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2027.csv
2025-01-02 17:06:52,470 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2028.csv
2025-01-02 17:06:52,476 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2029.csv
2025-01-02 17:06:52,483 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2030.csv
2025-01-02 17:06:52,490 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2031.csv
2025-01-02 17:06:52,498 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generated\SP_Predictions_2032.csv
2025-01-02 17:06:52,506 - INFO - Loading c:\Users\User\Desktop\MLBTradeSim\data\generate

Group Positions, and merge data

In [3]:
"""
Position Grouping and Data Merging
- Groups player positions into SP/RP/POS categories
- Merges prediction datasets
- Validates data quality
- Provides summary statistics
"""

# Add position grouping
sp_data['position_group'] = 'SP'
rp_data['position_group'] = 'RP'
batter_data['position_group'] = 'POS'

def merge_prediction_data(sp_df, rp_df, batter_df):
    """Merge prediction datasets with validation."""
    required_cols = ['Name', 'IDfg', 'position_group', 'Age', 'prediction_year', 'WAR']
    
    # Validate columns exist
    for df, name in [(sp_df, 'SP'), (rp_df, 'RP'), (batter_df, 'Batter')]:
        missing = set(required_cols) - set(df.columns)
        if missing:
            raise ValueError(f"Missing columns in {name} data: {missing}")

    # Combine datasets
    player_predictions = pd.concat([
        sp_df[required_cols],
        rp_df[required_cols],
        batter_df[required_cols]
    ], ignore_index=True)
    
    return player_predictions

# Merge data and generate summary
try:
    player_predictions = merge_prediction_data(sp_data, rp_data, batter_data)
    
    # Print summary statistics
    summary = player_predictions.groupby(['position_group', 'prediction_year'])['WAR'].agg([
        'count',
        'mean',
        'std',
        'min',
        'max'
    ]).round(3)
    
    logger.info("\nPrediction Data Summary:")
    print(summary)
    
    # Validate no missing values
    missing_values = player_predictions.isnull().sum()
    if missing_values.any():
        logger.warning(f"\nMissing values found:\n{missing_values[missing_values > 0]}")
        
    logger.info(f"Successfully merged {len(player_predictions)} player predictions")
    
except Exception as e:
    logger.error(f"Error merging prediction data: {str(e)}")
    raise

2025-01-02 17:06:52,852 - INFO - 
Prediction Data Summary:
Missing values found:
WAR    15
dtype: int64
2025-01-02 17:06:52,885 - INFO - Successfully merged 15660 player predictions


                                count   mean    std    min    max
position_group prediction_year                                   
POS            2025               405  0.987  1.647 -3.557  6.374
               2026               405  0.586  1.876 -4.631  6.991
               2027               405  0.143  2.045 -5.906  7.254
               2028               405 -0.283  2.181 -6.005  7.274
               2029               405 -0.722  2.288 -6.295  7.260
               2030               405 -1.153  2.341 -6.335  7.099
               2031               405 -1.565  2.342 -6.205  6.852
               2032               405 -1.961  2.283 -6.328  6.477
               2033               405 -2.335  2.160 -6.303  5.955
               2034               405 -2.679  1.975 -6.303  5.289
               2035               405 -2.978  1.752 -6.346  4.499
               2036               405 -3.220  1.510 -6.404  3.734
               2037               405 -3.400  1.266 -6.254  2.606
          

In [4]:
"""
Salary Data Processing Module
- Cleans and standardizes salary data
- Preserves contract status information
- Handles missing and invalid values
- Validates data quality
"""

def clean_salary_data(df: DataFrame) -> DataFrame:
    """
    Clean and standardize salary data from Sportrac.
    
    Args:
        df (DataFrame): Raw salary data with Payroll and Status columns
        
    Returns:
        DataFrame: Cleaned salary data with standardized values
    """
    logger.info("Starting salary data cleaning process")
    
    cleaned_df = df.copy()
    
    try:
        # Remove non-player rows (options, buyouts, etc)
        cleaned_df = cleaned_df[~cleaned_df['Player Name'].str.contains(
            'OPT-OUT|UFA|PLAYER OPT|CLUB OPT', 
            na=False, 
            case=False
        )]
        
        # Clean Year column
        cleaned_df['Year'] = pd.to_numeric(cleaned_df['Year'], errors='coerce')
        cleaned_df = cleaned_df.dropna(subset=['Year'])
        
        # Clean Payroll column - two-step process
        payroll = (cleaned_df['Payroll']
                  .astype(str)
                  .str.replace('$', '', regex=False)
                  .str.replace(',', '', regex=False)
                  .str.replace('-', '', regex=False))
        
        cleaned_df['Payroll'] = pd.to_numeric(payroll, errors='coerce')
        
        # Status validation and cleaning
        if 'Status' not in cleaned_df.columns:
            logger.warning("Status column missing from input data")
        else:
            status_counts = cleaned_df['Status'].value_counts()
            logger.info("\nStatus distribution:")
            logger.info(status_counts)
        
        # Generate summary statistics
        stats = {
            'original_rows': len(df),
            'cleaned_rows': len(cleaned_df),
            'valid_salary_rows': cleaned_df['Payroll'].notna().sum(),
            'min_salary': cleaned_df['Payroll'].min(),
            'max_salary': cleaned_df['Payroll'].max(),
            'mean_salary': cleaned_df['Payroll'].mean()
        }
        
        logger.info("\nSalary cleaning summary:")
        for key, value in stats.items():
            logger.info(f"{key}: {value:,.2f}" if isinstance(value, float) else f"{key}: {value}")
            
        return cleaned_df[['Player Name', 'Year', 'Team', 'Payroll', 'Status']].copy()
        
    except Exception as e:
        logger.error(f"Error cleaning salary data: {str(e)}")
        raise

# Execute cleaning
try:
    salary_data_clean = clean_salary_data(salary_data)
    
    print("\nSample of cleaned salary data:")
    print(salary_data_clean.head())
    
    print("\nData validation:")
    print(f"Null values:\n{salary_data_clean.isnull().sum()}")
    
except Exception as e:
    logger.error(f"Failed to process salary data: {str(e)}")
    raise

2025-01-02 17:06:52,917 - INFO - Starting salary data cleaning process
2025-01-02 17:06:52,944 - INFO - 
Status distribution:
2025-01-02 17:06:52,945 - INFO - Status
Estimate       871
PRE-ARB        740
UFA            423
ARB 1          278
ARB 3          213
              ... 
$2,370,968       1
$36,571,428      1
$28,071,428      1
$10,015,872      1
$2,962,963       1
Name: count, Length: 134, dtype: int64
2025-01-02 17:06:52,947 - INFO - 
Salary cleaning summary:
2025-01-02 17:06:52,948 - INFO - original_rows: 3821
2025-01-02 17:06:52,949 - INFO - cleaned_rows: 3806
2025-01-02 17:06:52,949 - INFO - valid_salary_rows: 2043
2025-01-02 17:06:52,950 - INFO - min_salary: 250,000.00
2025-01-02 17:06:52,951 - INFO - max_salary: 51,875,000.00
2025-01-02 17:06:52,951 - INFO - mean_salary: 10,248,524.89



Sample of cleaned salary data:
      Player Name    Year Team     Payroll   Status
0  Corbin Carroll  2023.0  ari   1625000.0      NaN
1  Corbin Carroll  2024.0  ari   3625000.0  Pre-Arb
2  Corbin Carroll  2025.0  ari   5625000.0  Pre-Arb
3  Corbin Carroll  2026.0  ari  10625000.0    ARB 1
4  Corbin Carroll  2027.0  ari  12625000.0    ARB 2

Data validation:
Null values:
Player Name       0
Year              0
Team              0
Payroll        1763
Status          407
dtype: int64


In [5]:
"""
Player Reference and ID Integration with Enhanced Name Matching
Handles UTF-8 encoding and accent normalization
"""

import unidecode
from thefuzz import fuzz

def normalize_name(name: str) -> str:
    """Normalize player names by removing accents and standardizing format."""
    if pd.isna(name):
        return name
    return unidecode.unidecode(str(name)).upper().strip()

def create_player_reference(sp_df: pd.DataFrame, 
                          rp_df: pd.DataFrame, 
                          batter_df: pd.DataFrame) -> pd.DataFrame:
    """Create unified player reference with normalized names."""
    player_ref = pd.concat([
        sp_df[['Name', 'IDfg', 'position_group']],
        rp_df[['Name', 'IDfg', 'position_group']],
        batter_df[['Name', 'IDfg', 'position_group']]
    ]).drop_duplicates()
    
    player_ref['Name_Normalized'] = player_ref['Name'].apply(normalize_name)
    return player_ref

def merge_salary_with_ids(salary_df: pd.DataFrame, 
                         player_ref: pd.DataFrame) -> pd.DataFrame:
    """Merge salary data with player reference using normalized names."""
    # Normalize salary data names
    salary_df['Name_Normalized'] = salary_df['Player Name'].apply(normalize_name)
    
    # Perform merge
    merged_df = salary_df.merge(
        player_ref[['Name_Normalized', 'IDfg']],
        on='Name_Normalized',
        how='left'
    )
    
    # Log matching statistics
    total = len(salary_df['Player Name'].unique())
    matched = len(merged_df[merged_df['IDfg'].notna()]['Player Name'].unique())
    
    logger.info(f"\nMerge Results:")
    logger.info(f"Total players: {total}")
    logger.info(f"Matched: {matched}")
    logger.info(f"Unmatched: {total - matched}")
    
    return merged_df.drop('Name_Normalized', axis=1)

try:
    player_ref = create_player_reference(sp_data, rp_data, batter_data)
    salary_data_with_id = merge_salary_with_ids(salary_data_clean, player_ref)
    
    # Display unmatched players
    unmatched = salary_data_with_id[salary_data_with_id['IDfg'].isna()]
    if not unmatched.empty:
        print("\nSample unmatched players:")
        print(unmatched['Player Name'].unique()[:10])
        
except Exception as e:
    logger.error(f"Error in ID integration: {str(e)}")
    raise

2025-01-02 17:06:53,056 - INFO - 
Merge Results:
2025-01-02 17:06:53,057 - INFO - Total players: 1137
2025-01-02 17:06:53,057 - INFO - Matched: 695
2025-01-02 17:06:53,058 - INFO - Unmatched: 442



Sample unmatched players:
['Tim Tawa' 'Jordan Lawlar' 'Joe Elbis' 'Cristian Mena' 'Jorge Barrosa'
 'Yilber Diaz' 'Adrian Del Castillo' 'Blake Walston' 'Slade Cecconi'
 'Blaze Alexander']


In [6]:
"""
Contract Status Processing Module
- Determines Free Agency years for all players
- Handles arbitration progression
- Processes contract options and UFA designations
- Validates contract timelines
"""

def determine_fa_year(status: str, current_year: int) -> Optional[int]:
    """
    Calculate Free Agency year based on current status.
    
    Args:
        status: Player's current contract status
        current_year: Current season year
        
    Returns:
        Optional[int]: Year player reaches free agency
    """
    if not status or pd.isna(status):
        return None
        
    status = str(status).upper().strip()
    
    # Direct FA indicators
    if any(x in status for x in ['UFA', 'OPT-OUT', 'PLAYER']):
        return current_year
    
    # Service time progression
    status_to_years = {
        'ESTIMATE': 6,
        'PRE-ARB': 4,
        'ARB1': 3,
        'ARB 1': 3,
        'ARB2': 2,
        'ARB 2': 2,
        'ARB3': 1,
        'ARB 3': 1,
        'ARB4': 1,
        'ARB 4': 1
    }
    
    for key, years in status_to_years.items():
        if key in status:
            return current_year + years
            
    return None

"""
Contract Status Processing with IDfg
Determines FA years based on latest available status
"""

def process_contract_statuses(df: pd.DataFrame) -> pd.DataFrame:
    """Process latest contract status for each player."""
    logger.info("Processing contract statuses")
    
    # Get latest year for each player
    latest_status = (df[df['IDfg'].notna()]
                    .sort_values('Year', ascending=True)
                    .groupby('IDfg')
                    .last()
                    .reset_index())
    
    # Calculate FA years
    fa_years = {}
    for _, player in latest_status.iterrows():
        status = str(player.get('Status', '')).upper().strip()
        current_year = int(player['Year'])
        
        # Direct FA indicators
        if any(x in status for x in ['UFA', 'OPT-OUT', 'PLAYER']):
            fa_years[player['IDfg']] = current_year
        # Next year FA
        elif any(x in status for x in ['CLUB', 'VESTING', 'ARB3', 'ARB 3', 'ARB4', 'ARB 4']):
            fa_years[player['IDfg']] = current_year + 1
        # Service time progression
        elif any(x in status for x in ['ARB2', 'ARB 2']):
            fa_years[player['IDfg']] = current_year + 2
        elif any(x in status for x in ['ARB1', 'ARB 1']):
            fa_years[player['IDfg']] = current_year + 3
        elif 'PRE' in status and 'ARB' in status:
            fa_years[player['IDfg']] = current_year + 4
        elif status == 'ESTIMATE':
            fa_years[player['IDfg']] = current_year + 6
    
    # Map FA years back to original data
    df['FA_Year'] = df['IDfg'].map(fa_years)
    
    return df

try:
    contract_data = process_contract_statuses(salary_data_with_id)
    
    # Output validation summary
    print("\nContract Processing Summary:")
    print(f"Total players: {len(contract_data['IDfg'].unique())}")
    print(f"Players with FA years: {len(contract_data[contract_data['FA_Year'].notna()]['IDfg'].unique())}")
    print("\nFA Years by Status:")
    print(contract_data.groupby('Status')['FA_Year'].agg(['count', 'mean']).round(2))
    
except Exception as e:
    logger.error(f"Failed to process contract statuses: {str(e)}")
    raise

2025-01-02 17:06:53,095 - INFO - Processing contract statuses



Contract Processing Summary:
Total players: 699
Players with FA years: 695

FA Years by Status:
                      count     mean
Status                              
$1,250,000                1  2027.00
$1,750,000                1  2026.00
$1,800,000                1  2026.00
$1,950,000                0      NaN
$10,000,000               5  2026.80
...                     ...      ...
Pre-Arb                  33  2030.30
RFA / QO                  0      NaN
UFA                     384  2027.95
Vesting                   9  2030.11
arbitration-bypassed      2  2028.50

[134 rows x 2 columns]


In [7]:
def generate_contract_timeline(df: pd.DataFrame) -> pd.DataFrame:
    """Generate timeline until each player's FA year."""
    logger.info("Generating contract timeline")
    
    # Handle duplicates
    df_unique = (df.sort_values('Payroll', ascending=False)
                  .drop_duplicates(subset=['IDfg', 'Year'], keep='first'))
    
    # Create lookup dict
    base_data = {idfg: df[df['IDfg'] == idfg].iloc[0].to_dict() 
                 for idfg in df['IDfg'].unique() if not pd.isna(idfg)}
    
    all_rows = []
    years = range(2025, 2040)
    
    for idfg, base_row in base_data.items():
        fa_year = base_row.get('FA_Year')
        if pd.isna(fa_year):
            continue
            
        for year in years:
            # Stop at FA year
            if year > fa_year:
                continue
                
            # Check existing data
            existing = df_unique[
                (df_unique['IDfg'] == idfg) & 
                (df_unique['Year'] == year)
            ]
            
            if not existing.empty:
                all_rows.append(existing.iloc[0].to_dict())
                continue
                
            # Generate new row
            new_row = base_row.copy()
            new_row['Year'] = year
            
            # Calculate status
            years_to_fa = fa_year - year
            if years_to_fa <= 0:
                new_row['Status'] = 'FA'
            elif years_to_fa <= 1:
                new_row['Status'] = 'ARB3'
            elif years_to_fa <= 2:
                new_row['Status'] = 'ARB2'
            elif years_to_fa <= 3:
                new_row['Status'] = 'ARB1'
            else:
                new_row['Status'] = 'Pre-ARB'
                
            new_row['Payroll'] = None
            all_rows.append(new_row)
    
    result = pd.DataFrame(all_rows)
    return result[df.columns].sort_values(['IDfg', 'Year'])

# Execute timeline generation
try:
    contract_timeline = generate_contract_timeline(contract_data)
    logger.info(f"Generated {len(contract_timeline)} timeline records")
except Exception as e:
    logger.error(f"Timeline generation failed: {str(e)}")
    raise

2025-01-02 17:06:53,184 - INFO - Generating contract timeline
2025-01-02 17:06:54,937 - INFO - Generated 3387 timeline records


In [8]:
"""
WAR Value Calculation Module
Applies tiered WAR values and inflation adjustments
"""

# Constants
WAR_VALUE_TIERS = {
    'tier1': {'max': 2, 'value': 6_000_000},
    'tier2': {'max': 4, 'value': 9_000_000},
    'tier3': {'value': 12_000_000}
}
INFLATION_RATE = 0.05
BASE_YEAR = 2025

def calculate_inflation_multiplier(year: int) -> float:
    """Calculate inflation multiplier from base year."""
    return (1 + INFLATION_RATE) ** (year - BASE_YEAR)

def calculate_war_value(war: float, year: int) -> float:
    """
    Calculate WAR value using tiered system and inflation.
    
    Args:
        war (float): WAR value
        year (int): Year for inflation adjustment
    """
    if pd.isna(war) or war <= 0:
        return 0.0
        
    value = 0.0
    remaining_war = war
    
    # Tier 1: 0-2 WAR
    tier1_war = min(remaining_war, WAR_VALUE_TIERS['tier1']['max'])
    value += tier1_war * WAR_VALUE_TIERS['tier1']['value']
    remaining_war -= tier1_war
    
    if remaining_war <= 0:
        return value * calculate_inflation_multiplier(year)
        
    # Tier 2: 2-4 WAR
    tier2_war = min(remaining_war, WAR_VALUE_TIERS['tier2']['max'] - WAR_VALUE_TIERS['tier1']['max'])
    value += tier2_war * WAR_VALUE_TIERS['tier2']['value']
    remaining_war -= tier2_war
    
    if remaining_war <= 0:
        return value * calculate_inflation_multiplier(year)
        
    # Tier 3: 4+ WAR
    value += remaining_war * WAR_VALUE_TIERS['tier3']['value']
    
    return value * calculate_inflation_multiplier(year)

try:
    # Join predictions with timeline
    timeline_with_war = contract_timeline.merge(
        player_predictions[['IDfg', 'prediction_year', 'WAR']],
        left_on=['IDfg', 'Year'],
        right_on=['IDfg', 'prediction_year'],
        how='left'
    )
    
    # Calculate WAR values
    timeline_with_war['Base_Value'] = timeline_with_war.apply(
        lambda x: calculate_war_value(x['WAR'], x['Year']), 
        axis=1
    )
    
    # Clean up and validate
    timeline_with_war = timeline_with_war.drop('prediction_year', axis=1)
    
    logger.info(f"Processed {len(timeline_with_war)} rows")
    logger.info(f"Average WAR value: ${timeline_with_war['Base_Value'].mean():,.2f}")
    
except Exception as e:
    logger.error(f"Failed to calculate WAR values: {str(e)}")
    raise

2025-01-02 17:06:55,016 - INFO - Processed 3387 rows
2025-01-02 17:06:55,018 - INFO - Average WAR value: $8,745,984.29


In [9]:
"""
Contract Value Calculator
Determines player contract values based on:
1. Existing payroll data (if available)
2. Contract status (Pre-ARB, ARB1-3, FA)
3. WAR-based market value
"""

# Contract value constants
MIN_SALARY = {
    'Pre-ARB': 720000,
    'ARB1': 1000000,
    'ARB2': 2500000,
    'ARB3': 4000000
}

ARB_PERCENT = {
    'ARB1': 0.25,
    'ARB2': 0.33,
    'ARB3': 0.50
}

def normalize_status(status: str) -> str:
    """
    Normalize contract status strings for consistent processing.
    
    Args:
        status: Raw contract status string
        
    Returns:
        Normalized status string (Pre-ARB, ARB1-3, FA, or None)
    """
    if pd.isna(status):
        return None
        
    status = str(status).upper().strip()
    
    # Handle Pre-ARB variations
    if 'PRE' in status and 'ARB' in status:
        return 'Pre-ARB'
    
    # Handle ARB variations
    if 'ARB' in status:
        for i in range(1, 5):
            if str(i) in status:
                return f'ARB{min(i, 3)}'  # ARB4 counts as ARB3
                
    # Handle FA variations
    if any(x in status for x in ['UFA', 'FA']):
        return 'FA'
        
    return status

def calculate_contract_value(row: pd.Series) -> float:
    """
    Calculate player's contract value based on status and market value.
    
    Logic:
    1. Use existing Payroll if available
    2. Skip FA calculations
    3. Apply ARB percentages or minimum salary
    
    Args:
        row: DataFrame row with Status, Payroll, Base_Value, and Year
    
    Returns:
        Contract value for the given year
    """
    # 1. Check for existing Payroll
    if pd.notna(row['Payroll']):
        return row['Payroll']
    
    # 2. Get normalized status
    status = normalize_status(row['Status'])
    
    # 3. Handle status-based calculations
    if status == 'FA' or status == 'UFA':
        return None
        
    # 4. Calculate minimum salary with inflation
    year_offset = row['Year'] - BASE_YEAR
    min_salary = MIN_SALARY.get(
        status, 
        MIN_SALARY['Pre-ARB']
    ) * (1 + INFLATION_RATE) ** year_offset
    
    # 5. Handle Pre-ARB
    if status == 'Pre-ARB':
        return min_salary
        
    # 6. Handle ARB years
    if status in ARB_PERCENT:
        return max(
            min_salary,
            row['Base_Value'] * ARB_PERCENT[status]
        )
        
    # 7. If status exists but isn't handled above, use existing Payroll
    return row['Payroll']

try:
    # Calculate contract values
    timeline_with_values = timeline_with_war.copy()
    timeline_with_values['Contract_Value'] = timeline_with_values.apply(
        calculate_contract_value, 
        axis=1
    )
    
    # Calculate surplus value
    timeline_with_values['Surplus_Value'] = (
        timeline_with_values['Base_Value'] - 
        timeline_with_values['Contract_Value']
    )
    
    # Validation summary
    logger.info("\nContract Value Calculation Summary:")
    logger.info(f"Total rows processed: {len(timeline_with_values)}")
    logger.info(f"Rows with contract values: "
               f"{timeline_with_values['Contract_Value'].notna().sum()}")
    logger.info("\nContract values by status:")
    print(timeline_with_values.groupby('Status')['Contract_Value'].describe())
    
except Exception as e:
    logger.error(f"Failed to calculate contract values: {str(e)}")
    raise

2025-01-02 17:06:55,141 - INFO - 
Contract Value Calculation Summary:
2025-01-02 17:06:55,142 - INFO - Total rows processed: 3387
2025-01-02 17:06:55,143 - INFO - Rows with contract values: 2687
2025-01-02 17:06:55,144 - INFO - 
Contract values by status:


                      count          mean           std         min  \
Status                                                                
$1,250,000              1.0  1.250000e+06           NaN   1250000.0   
$1,750,000              1.0  1.750000e+06           NaN   1750000.0   
$1,800,000              1.0  1.800000e+06           NaN   1800000.0   
$10,000,000             2.0  1.750000e+07  1.060660e+07  10000000.0   
$10,750,000             1.0  1.075000e+07           NaN  10750000.0   
...                     ...           ...           ...         ...   
Pre-ARB                 4.0  7.749000e+05  2.182384e+04    756000.0   
Pre-Arb                 3.0  3.430555e+06  1.929456e+06   2000000.0   
UFA                     0.0           NaN           NaN         NaN   
Vesting                 9.0  1.792593e+07  4.912506e+06  10000000.0   
arbitration-bypassed    1.0  8.400000e+06           NaN   8400000.0   

                             25%         50%         75%         max  
Statu

In [10]:
def validate_salary_timeline(df: pd.DataFrame) -> tuple:
    """
    Validate salary timeline and generate summary statistics.
    
    Args:
        df: DataFrame with calculated values
    Returns:
        tuple: (missing_data, invalid_progression, min_salary_violations)
    """
    # Data quality checks
    validation_cols = ['IDfg', 'Year', 'Status', 'WAR', 
                      'Base_Value', 'Contract_Value', 'Surplus_Value']
    missing_data = df[validation_cols].isnull().sum()
    
    # Status progression validation
    def check_progression(group_df):
        ordered_statuses = group_df.sort_values('Year')['Status']
        status_sequence = ['Pre-ARB', 'ARB1', 'ARB2', 'ARB3', 'FA']
        current_idx = -1
        
        for status in ordered_statuses:
            if status in status_sequence:
                new_idx = status_sequence.index(status)
                if new_idx <= current_idx:
                    return True
                current_idx = new_idx
        return False
    
    invalid_progression = df.groupby('IDfg').apply(check_progression)
    
    # Value thresholds
    def check_min_salary(row):
        if row['Status'] not in MIN_SALARY:
            return False
        min_salary = MIN_SALARY[row['Status']] * (1 + INFLATION_RATE) ** (row['Year'] - BASE_YEAR)
        return row['Contract_Value'] < min_salary
    
    min_salary_violations = df[df.apply(check_min_salary, axis=1)]
    
    # Log validation results
    logger.info("\nValidation Results:")
    logger.info(f"Missing data:\n{missing_data}")
    logger.info(f"Players with invalid progression: {invalid_progression.sum()}")
    logger.info(f"Minimum salary violations: {len(min_salary_violations)}")
    
    return missing_data, invalid_progression, min_salary_violations

try:
    validation_results = validate_salary_timeline(timeline_with_values)
except Exception as e:
    logger.error(f"Validation failed: {str(e)}")
    raise

2025-01-02 17:06:55,522 - INFO - 
Validation Results:
2025-01-02 17:06:55,523 - INFO - Missing data:
IDfg                0
Year                0
Status            262
WAR                 6
Base_Value          0
Contract_Value    700
Surplus_Value     700
dtype: int64
2025-01-02 17:06:55,524 - INFO - Players with invalid progression: 2
2025-01-02 17:06:55,525 - INFO - Minimum salary violations: 0


In [11]:
"""
Clean duplicate IDfg/Year pairs from timeline data
"""

try:
    # Sort by number of non-null values (keep rows with most data)
    timeline_with_values = (timeline_with_values
        .loc[timeline_with_values
             .groupby(['IDfg', 'Year'])
             .apply(lambda x: x.isnull().sum(axis=1).idxmin())]
        .reset_index(drop=True))
    
    # Verify uniqueness
    duplicate_check = timeline_with_values.groupby(['IDfg', 'Year']).size()
    if (duplicate_check > 1).any():
        raise ValueError("Duplicates still exist after cleaning")
        
    logger.info(f"Cleaned timeline data shape: {timeline_with_values.shape}")
    
except Exception as e:
    logger.error(f"Failed to clean duplicates: {str(e)}")
    raise

2025-01-02 17:06:57,217 - INFO - Cleaned timeline data shape: (3387, 11)


In [12]:
def integrate_player_statistics(value_data, batter_data, sp_data, rp_data):
    """Simple integration of stats into value data"""
    
    # Start with value data, dropping any duplicate columns
    result = value_data.loc[:,~value_data.columns.duplicated()].copy()
    
    # Prepare batter stats (rename columns to avoid conflicts)
    batter_stats = (batter_data[['IDfg', 'prediction_year'] + [col for col in HITTER_COLUMNS if col not in ['Name', 'IDfg', 'WAR']]]
                   .rename(columns={
                       'prediction_year': 'Year',
                       'BB%': 'BB%_bat',
                       'K%': 'K%_bat',
                       'Age': 'Age_bat'
                   }))
    
    # Prepare pitcher stats (rename columns to avoid conflicts)
    pitcher_stats = (pd.concat([
        sp_data[['IDfg', 'prediction_year'] + [col for col in PITCHER_COLUMNS if col not in ['Name', 'IDfg', 'WAR']]],
        rp_data[['IDfg', 'prediction_year'] + [col for col in PITCHER_COLUMNS if col not in ['Name', 'IDfg', 'WAR']]]
    ])
    .rename(columns={
        'prediction_year': 'Year',
        'BB%': 'BB%_pit',
        'K%': 'K%_pit',
        'Age': 'Age_pit'
    })
    .drop_duplicates(subset=['IDfg', 'Year']))

    # Perform merges with renamed columns
    result = (result
             .merge(batter_stats, on=['IDfg', 'Year'], how='left')
             .merge(pitcher_stats, on=['IDfg', 'Year'], how='left'))
    
    return result

# Execute
try:
    export_data = integrate_player_statistics(
        timeline_with_values,
        batter_data,
        sp_data, 
        rp_data
    )
    print(f"Records processed: {len(export_data)}")
    print(f"Columns: {export_data.columns.tolist()}")
    
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

Records processed: 3387
Columns: ['Player Name', 'Year', 'Team', 'Payroll', 'Status', 'IDfg', 'FA_Year', 'WAR', 'Base_Value', 'Contract_Value', 'Surplus_Value', 'Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def', 'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv']


In [13]:
# Let's first verify the actual columns in export_data
print("\nACTUAL COLUMNS IN EXPORT_DATA:")
print(export_data.columns.tolist())

# Now let's verify if our export columns exist
export_cols = [
    'Player Name', 'Team', 'Status', 'WAR', 
    'Base_Value', 'Contract_Value', 'Surplus_Value'
]

hitting_stats = [
    'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 
    'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def'
]

pitching_stats = [
    'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 
    'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv'
]

# Check which columns are missing
print("\nMISSING COLUMNS:")
all_cols = export_cols + hitting_stats + pitching_stats
missing = [col for col in all_cols if col not in export_data.columns]
print(missing)


ACTUAL COLUMNS IN EXPORT_DATA:
['Player Name', 'Year', 'Team', 'Payroll', 'Status', 'IDfg', 'FA_Year', 'WAR', 'Base_Value', 'Contract_Value', 'Surplus_Value', 'Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def', 'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv']

MISSING COLUMNS:
[]


In [13]:
"""
Value Export Module
Exports yearly player valuations sorted by team and WAR
"""

def export_value_data(df: pd.DataFrame, output_dir: Path) -> None:
    """Export sorted value data by year, excluding FA players."""
    logger.info("Starting value data export")
    
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # Define all columns to export in correct order
    export_cols = [
        # Base info
        'Player Name', 'Team', 'Status', 'WAR',
        'Base_Value', 'Contract_Value', 'Surplus_Value',
        
        # Hitting stats
        'Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG',
        'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def',
        
        # Pitching stats
        'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit',
        'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv'
    ]
    
    for year in sorted(df['Year'].unique()):
        try:
            # Get year's data, excluding FA players
            year_data = df[
                (df['Year'] == year) & 
                (df['Status'] != 'FA') & 
                (~df['Status'].str.contains('FA', na=False))
            ].copy()
            
            # Sort by team and WAR
            year_data = year_data.sort_values(['Team', 'WAR'], ascending=[True, False])
            
            # Format numeric columns
            numeric_cols = ['Base_Value', 'Contract_Value', 'Surplus_Value']
            for col in numeric_cols:
                year_data[col] = year_data[col].round(2)
            
            # Export with all columns
            output_file = output_dir / f'player_values_{year}.csv'
            year_data[export_cols].to_csv(output_file, index=False)
            
            logger.info(f"Exported {len(year_data)} records for {year}")
            
        except Exception as e:
            logger.error(f"Error processing year {year}: {str(e)}")



try:
    # Filter out FA rows and missing values
    export_data = timeline_with_values[
        (timeline_with_values['Status'] != 'FA') & 
        (timeline_with_values['WAR'].notna())
    ].copy()
    
    # Execute export
    export_value_data(export_data, OUTPUT_DIR)
    
    # Print summary
    print("\nExport Summary:")
    print(export_data.groupby('Year').size())
    
except Exception as e:
    logger.error(f"Export process failed: {str(e)}")
    raise

2025-01-02 16:56:53,338 - INFO - Starting value data export
2025-01-02 16:56:53,348 - ERROR - Error processing year 2025.0: "['Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def', 'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv'] not in index"
2025-01-02 16:56:53,356 - ERROR - Error processing year 2026.0: "['Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def', 'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv'] not in index"
2025-01-02 16:56:53,366 - ERROR - Error processing year 2027.0: "['Age_bat', 'BB%_bat', 'K%_bat', 'AVG', 'OBP', 'SLG', 'wOBA', 'wRC+', 'EV', 'Off', 'BsR', 'Def', 'Age_pit', 'FIP', 'SIERA', 'K%_pit', 'BB%_pit', 'GB%', 'FB%', 'Stuff+', 'Location+', 'Pitching+', 'FBv'] not in index"
2025-01-02 16:56:53,374 - ERROR - Error processing year 2028.0: "['Age_bat', 'BB%_bat', 'K%_bat',


Export Summary:
Year
2025.0    694
2026.0    694
2027.0    612
2028.0    507
2029.0    294
2030.0    160
2031.0     38
2032.0     26
2033.0     18
2034.0     13
2035.0      5
2036.0      3
2037.0      2
2038.0      2
2039.0      1
dtype: int64
