# Draft Prep

Let's prepare CSV files for the draft, and also prepare weighted CSV files for other uses.

In [None]:
import functools
from functools import partial
import os
import sys
import warnings
import re
from collections.abc import Callable, Iterable, Mapping


import pandas as pd
import numpy as np
from numpy.typing import ArrayLike
from scipy.stats import rankdata

from philosofool.data_sources.utils import read_yml  # type: ignore
from philosofool.data_science.graph import MetricGraph

from fantasy_baseball_draft.spg import position_adjusted_fwar, largest, spg_model
from fantasy_baseball_draft.utils import StatSynonyms, load_cbs_data, DataLoader
from fantasy_baseball_draft.utils import cbs_player_col_to_df
from fantasy_baseball_draft.stats import StatCalculator
from fantasy_baseball_draft.draft_prep.align import build_id_map_from_stat_associations, build_id_map

# Root of the local data files: the 'paths' -> 'local_data' entry of local/config.yml.
data_path = read_yml('local/config.yml')['paths']['local_data']


## ID Functions

Why is this so complicated? We need to join the players expected to play in the current year with Fangraphs ID data to create a unique ID column for merging.
TL;DR: we're working with disjoint lists and have to reconcile them to make it all work.
Historical data solves this problem simply for players who have already played.
But some players who are expected to play did not appear in the previous year.
For those, the player's name is what makes the merge possible.
The result is that we need (1) a dataset of historical MLB play and (2) some preseason projections.
The players 

In [None]:
# Stat categories referenced throughout the notebook, grouped by player type.
pitching_metrics = ['ERA', 'WHIP', 'W', 'S', 'K']
hitting_metrics = ['R', 'HR', 'RBI', 'SB', 'BA']
# One flat list: pitching categories first, then hitting categories.
scoring_metrics = [*pitching_metrics, *hitting_metrics]


## Add Fangraphs Player Id to CBS data

In [None]:
# Stat columns passed to build_id_map_from_stat_associations, one list per player type.
hitter_match = ['AB', 'H', 'BB', 'RBI', 'K']
# NOTE(review): 'W' is listed twice here -- confirm whether a different stat was intended.
pitcher_match = ['IP', 'W', 'G', 'K', 'H', 'W']

In [None]:
# NOTE: this re-reads the same config entry that the import cell already stored in data_path.
data_path = read_yml('local/config.yml')['paths']['local_data']
hist_path = os.path.join(data_path, 'historical')
loader = DataLoader(hist_path)

# Pitcher id map built from the 2023 CBS and Fangraphs pitcher files and the pitcher_match stats.
pitcher_ids = build_id_map_from_stat_associations(
    loader.load_cbs_csv('cbs_pitchers_2023.csv'),
    loader.load_csv('fg_pitchers_2023.csv'),
    pitcher_match
)

# Hitter id map built the same way from the 2023 hitter files and the hitter_match stats.
hitter_ids = build_id_map_from_stat_associations(
    loader.load_cbs_csv('cbs_hitters_2023.csv'),
    loader.load_csv('fg_hitters_2023.csv'),
    hitter_match
)

In [None]:
def weight_stats(df1, df2, stats: list[str]) -> pd.DataFrame:
    """Average the requested stats of two dataframes where their indexes overlap.

    Args:
        df1: Frame whose index defines the rows of the result.
        df2: Frame whose values are averaged into df1's on shared index labels.
        stats: Column names to combine; each must exist in both frames.

    Returns:
        A float frame with df1's index and only the ``stats`` columns. Rows
        whose label also appears in df2 hold the mean of the two values; the
        remaining rows keep df1's values.

    Raises:
        ValueError: If any requested stat is missing from df1 or df2.
    """
    # Name the missing columns instead of dumping a whole frame into the message.
    for label, frame in (('df1', df1), ('df2', df2)):
        missing = [stat for stat in stats if stat not in frame.columns]
        if missing:
            raise ValueError(f"Some requested stats are not in {label}: {missing}")

    shared_idx = df2.index.intersection(df1.index)
    out = df1[stats].astype({col: float for col in stats})
    # The right-hand side is aligned to the selected rows by index label.
    out.loc[shared_idx, stats] = (df1[stats] + df2[stats]) / 2
    return out


In [None]:
def test_weight_stats():
    """Check that weight_stats averages shared rows and keeps only the requested columns."""
    left = pd.DataFrame(
        {'a': [1, 3, 0], 'b': [0, 1, 2], 'c': [0, 0, 0]},
        index=[1, 4, 2],
    ).astype(float)
    right = pd.DataFrame({'a': [1, 1, 3], 'b': [0, 1, 2]}, index=[1, 2, 3]).astype(float)

    combined = weight_stats(left, right, ['a', 'b'])

    assert isinstance(combined, pd.DataFrame)
    # Label 2 exists in both frames, so 'a' is (0 + 1) / 2.
    assert combined['a'].loc[2] == 0.5
    assert 'c' not in combined


# Run the check whenever this cell executes.
test_weight_stats()


In [None]:
def as_int_safe(idx):
    """Convert a string to an int when it parses as one; otherwise return it unchanged.

    Non-string values are passed through untouched.
    """
    if not isinstance(idx, str):
        return idx
    try:
        return int(idx)
    except ValueError:
        # Not an integer literal: keep the original string.
        return idx

In [None]:
def add_missing_pitchers(df: pd.DataFrame):
    """Return a copy of ``df`` with the playerid of the row labelled 15 set by hand.

    The row labelled 15 must contain 'Edwin Diaz' in its Player value;
    otherwise the assertion fails. The input frame is not modified.
    """
    patched = df.copy()
    row_label, manual_id = 15, 14710
    # Fix Edwin Diaz
    # TODO: figure out why he wasn't found. Probably the diacritic in his last name.
    assert 'Edwin Diaz' in patched.loc[row_label].Player
    patched.loc[row_label, 'playerid'] = manual_id
    return patched

In [None]:
# Positional eligibility table and a loader rooted at the 2024 projections directory.
eligibility = DataLoader(os.path.join(data_path, 'eligibility')).load_cbs_csv('eligibility.csv')
projections = DataLoader(os.path.join(data_path, 'projections/2024'))
# Fangraphs depth-chart projections; as_int_safe is applied to every cell of each frame.
fg_hitter_proj = projections.load_csv('fg_depth_hitters.csv').map(as_int_safe)
fg_pitcher_proj = projections.load_csv('fg_depth_pitchers.csv').map(as_int_safe)

# CBS hitter projections with a playerid column, PA, eligibility and age attached.
cbs_hitters_proj = (
    projections
    .load_cbs_csv('cbs_hitters.csv')
    .drop(columns=['BA'])  # recalculate later.
    .assign(
        playerid=lambda df: build_id_map(df, fg_hitter_proj, hitter_ids),
        # PA is computed here as AB + BB.
        PA=lambda df: df.AB + df.BB
    )
    .merge(eligibility[["Player", "Eligible"]], how='left', on="Player")
    .merge(
        # One Age row per playerid.
        projections.load_csv('age_data.csv').drop_duplicates(subset='playerid').filter(['playerid', 'Age']),
        on='playerid',
        how='left'
    )
    # Players without an age after the merge get 25.
    .fillna({'Age': 25})
)

# CBS pitcher projections with a playerid column and earned runs.
cbs_pitchers_proj = (
    projections
    .load_cbs_csv('cbs_pitchers.csv')
    .assign(
        playerid=lambda df: build_id_map(df, fg_pitcher_proj, pitcher_ids),
        # ER is computed from ERA and IP before the ratio columns are dropped.
        ER=lambda df: df.ERA * df.IP / 9
    )
    .drop(columns=['ERA', 'WHIP'])  # recalculate later.
)

def _drop_dups_set_index(df, col: str|list[str]) -> pd.DataFrame:
    return df.drop_duplicates(subset=col).set_index(col)

In [None]:

def make_hitter_draft_data(cbs_data: pd.DataFrame, fangraphs_data: pd.DataFrame, eligibility: pd.DataFrame, age_data: pd.DataFrame, hitter_ids: pd.Series):
    """Build the hitter draft table from CBS and Fangraphs projections.

    Args:
        cbs_data: CBS hitter projections; must include BA, AB, BB and Player.
        fangraphs_data: Fangraphs hitter projections with a 'playerid' column.
        eligibility: Frame with 'Player' and 'Eligible' columns.
        age_data: Frame with 'playerid' and 'Age' columns.
        hitter_ids: Id map forwarded to build_id_map.

    Returns:
        The CBS rows indexed by playerid, with the counting stats replaced by
        the output of weight_stats over the CBS and Fangraphs projections.
    """
    cbs_hitters_proj = (
        cbs_data
        .drop(columns=['BA'])  # recalculate later.
        .assign(
            playerid=lambda df: build_id_map(df, fangraphs_data, hitter_ids),
            PA=lambda df: df.AB + df.BB
        )
        .merge(eligibility[["Player", "Eligible"]], how='left', on="Player")
        .merge(
            age_data.drop_duplicates(subset='playerid').filter(['playerid', 'Age']),
            on='playerid',
            how='left'
        )
        # Players without an age after the merge get 25.
        .fillna({'Age': 25})
    )

    # Use the Fangraphs frame passed in, not the notebook-level fg_hitter_proj global.
    # NOTE(review): presumably its playerid values must be comparable with the ids
    # produced by build_id_map -- confirm against the caller.
    weighted = weight_stats(
        cbs_hitters_proj.drop_duplicates(subset='playerid').set_index('playerid'),
        fangraphs_data.set_index('playerid'),
        ['PA', 'AB', 'BB', 'H', 'HR', 'K', 'RBI', 'R', 'SB'],
    )
    hitters = cbs_hitters_proj.set_index('playerid').assign(**weighted)
    return hitters

def make_pitcher_draft_data(cbs_data: pd.DataFrame, fangraphs_data: pd.DataFrame, pitcher_ids: pd.Series):
    """Build the pitcher draft table from CBS and Fangraphs projections.

    Args:
        cbs_data: CBS pitcher projections; must include ERA, WHIP and IP.
        fangraphs_data: Fangraphs pitcher projections with a 'playerid' column.
        pitcher_ids: Id map forwarded to build_id_map.

    Returns:
        The CBS rows indexed by playerid, with the counting stats replaced by
        the output of weight_stats over the CBS and Fangraphs projections.
    """
    cbs_pitchers_proj = (
        cbs_data
        .assign(
            playerid=lambda df: build_id_map(df, fangraphs_data, pitcher_ids),
            # ER is computed from ERA and IP before the ratio columns are dropped.
            ER=lambda df: df.ERA * df.IP / 9
        )
        .drop(columns=['ERA', 'WHIP'])  # recalculate later.
        .pipe(add_missing_pitchers)
    )

    # Use the Fangraphs frame passed in, not the notebook-level fg_pitcher_proj global.
    # NOTE(review): presumably its playerid values must be comparable with the ids
    # produced by build_id_map -- confirm against the caller.
    weighted = weight_stats(
        cbs_pitchers_proj.drop_duplicates('playerid').set_index('playerid'),
        fangraphs_data.set_index('playerid'),
        ['IP', 'G', 'GS', 'QS', 'W', 'L', 'S', 'K', 'BB', 'H', 'ER'],
    )
    pitchers = cbs_pitchers_proj.set_index('playerid').assign(**weighted)
    return pitchers


In [None]:
# Preview the first rows of the CBS pitcher projections.
cbs_pitchers_proj.head()

In [None]:
# NOTE(review): no visible cell assigns `hitters` or `pitchers`; presumably they are the
# results of make_hitter_draft_data / make_pitcher_draft_data -- confirm before running.
hitters
pitchers.head()

## Model Fantasy League Value

In [None]:
# Positional eligibility and positional value

def largest(arr: np.ndarray, k: int) -> np.ndarray:
    """Select the k greatest values of ``arr``.

    Only the first element of the result is positioned: it is the smallest
    of the k values. The remaining elements come back in no particular order.
    """
    split_at = arr.size - k
    partitioned = np.partition(arr, kth=split_at)
    return partitioned[-k:]

def position_value(hitter_fwar: ArrayLike, eligible: ArrayLike, position: str, n_rostered: int) -> float:
    """Return a value taken from the fWAR of the players eligible at ``position``.

    The eligible players' values are partitioned with ``kth=n_rostered`` and
    the first of the last ``n_rostered`` entries is returned.

    NOTE(review): np.partition only fixes the element at index ``kth``. With
    ``kth=n_rostered`` the returned entry (index ``size - n_rostered``) is
    generally not pinned to a sorted position, whereas ``largest`` uses
    ``kth=size - k``. Confirm which value is intended here.

    Raises:
        ValueError: From np.partition when ``n_rostered`` is out of bounds for
            the number of eligible players.
    """
    if hasattr(hitter_fwar, 'values') :
        # Unwrap objects exposing .values so the positional indexing below sees an array.
        hitter_fwar = hitter_fwar.values
    # Positions of the players whose eligibility string matches the position.
    eligible_idx = np.nonzero(matches_eligible(eligible, position))
    best = np.partition(hitter_fwar[eligible_idx], kth=n_rostered)[-n_rostered:]
    return best[0]

def matches_eligible(eligbible: ArrayLike, position: str) -> pd.Series:
    """Flag the eligibility strings that list ``position`` as one of their entries.

    Whitespace is stripped from each string first. ``position`` is spliced into
    a regular expression as-is and must appear as a whole comma-separated entry.
    """
    without_whitespace = pd.Series(eligbible).str.replace(r'\s', '', regex=True)
    # Sole entry, middle entry, first entry, last entry.
    alternatives = (f'^{position}$', f',{position},', f'^{position},', f',{position}$')
    return without_whitespace.str.contains('|'.join(alternatives), regex=True)


def position_adjusted_fwar(raw_fwar: ArrayLike, eligible: ArrayLike, position: str, pos_rostered: int, roster_depth: int) -> np.ndarray:
    """Rebase raw fantasy value, crediting players eligible at one position.

    Missing raw values are treated as -1. The baseline is the first element of
    ``largest(raw_fwar, roster_depth)`` and the positional level comes from
    ``position_value``. Players eligible at ``position`` additionally receive
    the difference between the two levels.
    """
    # TODO: Figure out how to link graphs so that positional valuation can pass through more efficiently:
    # this process won't cache positional value during the calculations, so multiple calls repeat the
    # baseline call.
    raw_fwar = raw_fwar.fillna(-1.)
    overall_level = largest(raw_fwar, roster_depth)[0]
    position_level = position_value(raw_fwar, eligible, position, pos_rostered)
    pos_value = overall_level - position_level
    at_position = matches_eligible(eligible, position)
    adjusted_at_position = raw_fwar + pos_value - overall_level
    adjusted_elsewhere = raw_fwar - overall_level
    return np.where(at_position, adjusted_at_position, adjusted_elsewhere)

# some tests

def test_matches_eligible():
    """Test of matches_eligible."""
    # Each eligibility string paired with whether it should count as catcher-eligible.
    cases = [
        ('C', True),
        ('C, CF', True),
        ('1B, C', True),
        ('1B, C, SS', True),
        ('CF', False),
        ('SS', False),
    ]
    eligible = [text for text, _ in cases]
    expected = [flag for _, flag in cases]
    assert np.array_equal(matches_eligible(eligible, 'C'), expected)

def test_postion_value():
    """Test of position_value."""
    # fwar runs from 13 down to 3.
    fwar = np.arange(11, 0, -1) + 2
    # One 'c', six '1b', then four 'c': five players are eligible at 'c'.
    eligible = np.array(('c' + ' 1b'*6 + ' c'*4).split())
    result = position_value(fwar, eligible, 'c', 4)
    assert result == 3
    result = position_value(fwar, eligible, 'c', 1)
    assert result == 13
    # Only five players are eligible at 'c', so asking for ten must fail.
    np.testing.assert_raises(ValueError, position_value, fwar, eligible, 'c', 10)

    fwar = pd.Series(fwar)  # series may not play with internals of position_value.
    result = position_value(fwar, eligible, 'c', 4)
    assert result == 3


# Run both checks whenever this cell executes.
test_matches_eligible()
test_postion_value()

In [None]:
from sklearn.linear_model import LinearRegression

def standings_html_to_df(standings: list[pd.DataFrame]) -> pd.DataFrame:
    """Combine several tables extracted from a webpage into a single DataFrame.

    Each table must contain a header row whose column 0 equals 'Team'. The rows
    below that header, restricted to columns 0 and 1, become a two-column table
    named after the header row. The per-table results are merged on 'Team' and
    every other column is cast to float.

    Raises:
        ValueError: If a table has no row with 'Team' in column 0.
    """
    stat_tables = []
    for table in standings:
        for label, row in table.iterrows():
            if row.loc[0] == 'Team':
                header_label = label
                break
        else:
            # Previously the last row was silently used as the header.
            raise ValueError("No header row with 'Team' in column 0 was found in a standings table.")
        # Label-based slices: rows after the header, columns 0 through 1.
        body = table.loc[header_label + 1:, 0:1]
        stat_tables.append(body.rename(columns=table.loc[header_label]))
    combined = functools.reduce(lambda a, b: a.merge(b, on='Team'), stat_tables)
    return combined.astype({col: float for col in combined.columns if col != 'Team'})


def weighted_ratio(x: ArrayLike, weight: ArrayLike, median: ArrayLike):
    """Return the deviation of ``x`` from ``median`` multiplied by ``weight``."""
    deviation = x - median
    return weight * deviation

def model_spg(arr: np.ndarray|pd.Series, low_better=False) -> Callable[[ArrayLike], np.ndarray]:
    """Fit rank-versus-stat with a linear regression and return a scaler built from its slope.

    The stat values are ranked (on their negation when ``low_better`` is true),
    a LinearRegression is fit from values to ranks, and the returned function
    multiplies its argument by the fitted slope.
    """
    values = arr.values if isinstance(arr, pd.Series) else arr
    column = values.reshape(-1, 1)
    if low_better:
        points = rankdata(column * -1)
    else:
        points = rankdata(column)
    slope = LinearRegression().fit(column, points).coef_[0]

    def spg_value(x: ArrayLike) -> np.ndarray:
        return x * slope

    return spg_value


# Metric graph whose entries map a metric name to (function, names of its inputs).
# NOTE: this rebinds the name `spg_model` imported from fantasy_baseball_draft.spg.
spg_model = MetricGraph.from_model({
    # 'IP': (lambda _: 1200., ('ERA',)),
    # 'AB': (lambda _: 5600., ('BA',)),
    # Medians of the three ratio stats.
    'median_ERA': (np.median, ('ERA',)),
    'median_WHIP': (np.median, ('WHIP',)),
    'median_BA': (np.median, ('BA',)),
    # Ratio stats turned into volume-weighted deviations from the median.
    'xER': (weighted_ratio, ('ERA', 'IP', 'median_ERA')),
    'xWHIP': (weighted_ratio, ('WHIP', 'IP', 'median_WHIP')),
    'xH': (weighted_ratio, ('BA', 'AB', 'median_BA')),
    # One model_spg fit per scoring category; ERA and WHIP are ranked with low_better=True.
    'W_spg': (model_spg, ('W',)),
    'S_spg': (model_spg, ('S',)),
    'K_spg': (model_spg, ('K',)),
    'ERA_spg': (partial(model_spg, low_better=True), ('xER',)),
    'WHIP_spg': (partial(model_spg, low_better=True), ('xWHIP',)),
    'R_spg': (model_spg, ('R',)),
    'HR_spg': (model_spg, ('HR',)),
    'RBI_spg': (model_spg, ('RBI',)),
    'SB_spg': (model_spg, ('SB',)),
    'BA_spg': (model_spg, ('xH',)),
})

def extract_model(df: pd.DataFrame, metric_graph: MetricGraph, metrics: Iterable) -> MetricGraph:
    """Build a MetricGraph whose functions are the values calculated from ``df``.

    For each requested metric the new graph keeps the dependency entry of
    ``metric_graph`` and uses the calculated value as that metric's function.
    """
    calculated = metric_graph.calculate_metrics(df, metrics)
    dependencies = {}
    fitted_fns = {}
    for metric in metrics:
        dependencies[metric] = metric_graph.dependency_graph[metric]
        fitted_fns[metric] = calculated[metric]
    return MetricGraph(dependencies, fitted_fns)


# Fantasy value entries, each mapping a metric name to (function, names of its inputs).
fwar_model = {
    # Raw pitcher value: reduce_sum over the five pitching *_spg metrics.
    'pitcher_raw_fWAR': (StatCalculator.reduce_sum, tuple(f'{metric}_spg' for metric in pitching_metrics)),
    # Raw value minus the smallest of the 160 largest raw values (NaN counted as -1).
    'pitcher_fWAR': (lambda fwar: fwar - largest(fwar.fillna(-1), 160)[0], ('pitcher_raw_fWAR',)),
    # Raw value scaled to 150 IP; entries with IP == 0 stay at 0. Output is float32.
    'pitcher_fWAR150': (lambda raw, ip: np.divide(raw * 150., ip, where=(ip!= 0.), out=np.zeros_like(ip, dtype=np.float32)), ('pitcher_raw_fWAR', 'IP')),
    # Raw hitter value: reduce_sum over the five hitting *_spg metrics.
    'hitter_raw_fWAR': (StatCalculator.reduce_sum, ('R_spg', 'RBI_spg', 'HR_spg', 'BA_spg', 'SB_spg')),
    # Position-adjusted value for position 'C', with pos_rostered=16 and roster_depth=176.
    'hitter_fWAR': (partial(position_adjusted_fwar, position='C', pos_rostered=16, roster_depth=176), ('hitter_raw_fWAR', 'Eligible')),
    # Raw value scaled to 600 PA; entries with PA == 0 stay at 0. Output is float32.
    'hitter_fWAR600':(lambda raw, pa: np.divide(raw * 600, pa, where=(pa!= 0.), out=np.zeros_like(pa, dtype=np.float32)), ('hitter_raw_fWAR', 'PA')),
}


In [None]:

# A bit of boilerplate with known values.
# Standings come from the tables in the saved 2023 league page, starting at the sixth table.
standings = standings_html_to_df(pd.read_html(os.path.join(data_path, 'standings/klf_2023.html'))[5:])
spg_names = [f'{metric}_spg' for metric in pitching_metrics + hitting_metrics]

# Merge, in order: the *_spg functions extracted from the standings (with constant
# AB=5600 and IP=1200 columns), the x* ratio entries, fwar_model, and the default
# StatCalculator model. With `|`, later entries win on duplicate keys.
fantasy_stat_model = StatCalculator.from_model(
    extract_model(standings.assign(AB=lambda _: 5600, IP=lambda _: 1200), spg_model, spg_names).model()
    | spg_model.model(['xER', 'xH', 'xWHIP'])
    | fwar_model
    | StatCalculator().model
)
# Medians of the standings' ratio stats, attached to the player tables in the cells below.
median_stats = spg_model.calculate_metrics(standings, ['median_BA', 'median_ERA', 'median_WHIP'])

In [None]:
# Attach the standings' median BA to every hitter row, then add BA and the hitter fWAR metrics.
hitters = fantasy_stat_model.add_metrics(hitters.assign(median_BA=lambda _: median_stats['median_BA']), metrics=['BA', 'hitter_fWAR', 'hitter_fWAR600'])

In [None]:
# Attach the standings' median ERA and WHIP to every pitcher row, then add the
# pitching *_spg metrics, ERA, WHIP and the pitcher fWAR metrics.
pitchers = fantasy_stat_model.add_metrics(
    pitchers.assign(
        median_ERA=lambda _: median_stats['median_ERA'],
        median_WHIP=lambda _: median_stats['median_WHIP']
    ),
    metrics=[f'{metric}_spg' for metric in pitching_metrics] + ['ERA', 'WHIP', 'pitcher_fWAR', 'pitcher_fWAR150']
)

## Closers

Which live in their own separate world.

In [None]:
def fix_ids(original_id) -> int:
    """Coerce an id that may be alphanumeric, numeric or missing into an int.

    Strings containing an ASCII letter have every letter removed together with
    the character that follows it before conversion; other strings and numbers
    go straight through ``int``. Missing values become -1.

    NOTE(review): presumably the letter pattern targets a two-letter prefix on
    otherwise numeric ids -- confirm against the projection files.
    """
    if isinstance(original_id, str):
        if re.search(r'[A-Za-z].', original_id):
            return int(re.sub(r'[A-Za-z].', '', original_id))
        return int(original_id)
    if pd.isna(original_id):
        return -1
    return int(original_id)


def cummulative_rank(df):
    """Rank rows by combining the 'Eno', 'Greg' and 'lenhart_rank' columns.

    Each rank of 40 or better scores ``40 - rank`` and anything worse scores 0.
    A row's strength is the Euclidean length of its three scores, and rank 1
    goes to the strongest row.
    """
    # TODO: less lazy version where we at least pass the rank column names.
    rank_columns = ('Eno', 'Greg', 'lenhart_rank')
    ranks = np.stack([df[name] for name in rank_columns], axis=1)
    scores = np.where(ranks <= 40, 40 - ranks, 0.)
    strength = np.sqrt(np.sum(scores ** 2, axis=1))
    return rankdata(-1*strength)

# Closer rankings keyed by playerid.
closer_data = (
    # The walrus keeps the raw closer file available as athlethic_projections.
    (athlethic_projections := projections.load_csv('the_athletic_closers.csv'))

    # Bring in Fangraphs columns (including playerid) by matching on Name.
    .merge(
        fg_pitcher_proj,
        how='left',
        on='Name',
    )
    # Normalise ids to ints; missing ids become -1 (see fix_ids).
    .assign(playerid=lambda df: df.playerid.apply(lambda s: fix_ids(s)))
    .filter(regex=r'Name|Eno|Greg|ADP|playerid')
    .astype({'playerid': int}).set_index('playerid')
    # Attach the pitching stats and pitcher_fWAR by playerid index.
    .merge(pitchers.filter(pitching_metrics + ['pitcher_fWAR']), right_index=True, left_index=True, how='left')
    # Rows with any missing value are dropped before ranking.
    .dropna()
    .assign(
        # Rank 1 is the highest pitcher_fWAR.
        lenhart_rank=lambda df: rankdata(df.pitcher_fWAR.values * -1),
        closer_rank=cummulative_rank
    )
    # playerid is the index at this point, so only the other three names select columns.
    .filter(['playerid', 'closer_rank', 'Eno', 'Greg'])
)

# Left-join the closer columns onto pitchers; every NaN in the merged frame becomes -1.
pitchers = pitchers.merge(closer_data, left_index=True, right_index=True, how='left').fillna(-1)#.query('closer_rank > 0')
#closer_data


## Explore Data

In [None]:
# Columns shown when browsing hitters and pitchers.
hitter_essentials = ['Avail', 'Player', 'Age', 'Eligible', 'PA', 'R', 'HR', 'RBI', 'K', 'SB', 'BA', 'hitter_fWAR', 'hitter_fWAR600']
pitcher_essentials = ['Avail', 'Player', 'IP', 'GS', 'W', 'S', 'K', 'ERA', 'WHIP', 'pitcher_fWAR', 'pitcher_fWAR150', 'closer_rank']

# Browsing tables restricted to those columns, best fWAR first.
hitter_df = hitters[hitter_essentials].sort_values('hitter_fWAR', ascending=False)
pitcher_df = pitchers[pitcher_essentials].sort_values('pitcher_fWAR', ascending=False)

def filter_avail(df, regex):
    """Keep the rows whose ``Avail`` value matches ``regex``."""
    is_match = df.Avail.str.contains(regex)
    return df.loc[is_match]

def filter_elig(df, position):
    """Keep the rows whose ``Eligible`` value lists ``position`` (see matches_eligible)."""
    is_eligible = matches_eligible(df.Eligible, position)
    return df[is_eligible]

def filter_name(df, regex):
    """Keep the rows whose ``Player`` value matches ``regex``, ignoring case.

    The second positional parameter of ``Series.str.contains`` is ``case``, so
    passing ``re.IGNORECASE`` there left the match case-sensitive. Case
    insensitivity is now requested explicitly with ``case=False``.
    """
    return df[df.Player.str.contains(regex, case=False)]


In [None]:
# Player names from the pitchers.csv file in the draft directory.
fresh_pitch = DataLoader(os.path.join(data_path, 'draft')).load_cbs_csv('pitchers.csv').Player.tolist()
# fresh_hit = DataLoader(os.path.join(data_path, 'draft')).load_cbs_csv(...).Player
# Show the browsing rows for the pitchers named in that file.
pitcher_df[pitcher_df.Player.isin(fresh_pitch)]
# hitter_df[hitter_df.Player.isin(fresh_hit)]
# fresh_pitch

In [None]:
# Display the full hitter browsing table.
hitter_df

In [None]:
# Controls for the hitter browsing expression in this cell.
position = 'CI|MI'  # spliced into a regex by matches_eligible
# Raw string: '\W' in a plain literal is an invalid escape sequence. The value is unchanged.
avail = r'W\W'
name = ''  # an empty pattern matches every player
start = 5  # first row of the 15-row window
# Filter by availability, position and name, take a 15-row window starting at `start`,
# then sort that window by hitter_fWAR.
hitter_df.pipe(filter_avail, avail).pipe(filter_elig, position).pipe(filter_name, name)[start:start+15].sort_values('hitter_fWAR', ascending=False)
# hitter_df.query('Age < 25').pipe(filter_avail, avail)[start:start+15]

In [None]:
# Controls for the pitcher browsing calls.
query = ''  # optional DataFrame.query expression; empty means no extra filter
# Raw string: '\W' in a plain literal is an invalid escape sequence. The value is unchanged.
avail = r'W\W'
name = ''  # an empty pattern matches every player
start = 28  # first row of the 15-row window
closers = False


def pitcher_filter(df, avail: str, query: str, closers: bool, name: str, start: int):
    """Return a 15-row window of pitchers after availability, query and name filters.

    Args:
        df: Pitcher table to browse. It is now actually used; previously the
            notebook-level ``pitcher_df`` was read regardless of this argument.
        avail: Regex applied to the Avail column.
        query: ``DataFrame.query`` expression; ignored when ``closers`` is true.
        closers: When true, keep rows with ``closer_rank != -1 or S > 0`` and
            sort ascending by closer_rank instead of descending by pitcher_fWAR.
        name: Pattern applied to the Player column.
        start: First row of the window taken after filtering.

    Returns:
        Rows ``start`` to ``start + 15`` of the filtered table, sorted afterwards.
    """
    stop = start + 15
    if closers:
        q_string = 'closer_rank != -1 or S > 0'
        sort_by, ascending = 'closer_rank', True
    else:
        q_string = query
        sort_by, ascending = 'pitcher_fWAR', False

    selected = df.filter(pitcher_essentials).pipe(filter_avail, avail)
    if q_string:
        selected = selected.query(q_string)
    # The window is taken before sorting, so the sort only reorders those rows.
    return selected.pipe(filter_name, name)[start:stop].sort_values(sort_by, ascending=ascending)

# Browse with closers=False: sorted by pitcher_fWAR.
pitcher_filter(pitcher_df, avail, query, False, name, start)

In [None]:
# Browse with closers=True: closer rows only, sorted by closer_rank.
pitcher_filter(pitcher_df, avail, query, True, name, start)

In [None]:

# Scratch expressions for inspecting the data.
pitchers.columns
# Hitters whose Avail value starts with 'W' followed by a character other than a lowercase letter.
hitters[hitter_essentials].pipe(filter_avail, '^W[^a-z]')
# Same availability filter for 'U'-eligible hitters: first 25 rows, then sorted by hitter_fWAR.
hitters[hitter_essentials].pipe(filter_elig, 'U').pipe(filter_avail, '^W[^a-z]').head(25)[['Avail', 'Age', 'Player', 'Eligible', 'hitter_fWAR']].sort_values('hitter_fWAR', ascending=False)

In [None]:
positions = ['C', '1B', '2B', '3B', 'SS', 'CF', 'LF', 'RF', 'U']
# NOTE: the loop variable rebinds the notebook-level `position`, and position_df is
# overwritten on every pass, so only the last position's table survives the loop.
for position in positions:
    position_df = (
        hitters
        .pipe(filter_elig, position)
        .sort_values('hitter_fWAR', ascending=False)
        .filter(hitter_essentials)
        .pipe(filter_avail, '^W[^a-z]')
        # Keep positive-value hitters, or high per-600-PA value with more than 400 PA.
        .query('hitter_fWAR > .1 or (hitter_fWAR600 > 15 and PA > 400)')
        #.drop(columns='Avail')
        .round(3)
    )


## Matching Unfound Players

Everything above is solid. What follows is work-in-progress code to find more player IDs.