# Statistical Summary/Analysis
Use this notebook to calculate summary stats, correlation analyses
and other useful metrics for training and evaluation data.

To use:
1. Indicate which data file(s) to analyze by setting _DATA_FILES_PATTERN_
2. Review/change _COLS_TO_DROP_, _COLS_TO_ANALYZE_, _TARGET_PATTERN_, _ANALYSES_ and other settings

### Error Analysis
Visualize the error between prediction and truth
- For error analysis create the error dataset using _dumpdata.sc_ with the _--model_ argument.
- (notebook) Add 'error' to the _ANALYSES_ list
- In the notebook set _ERROR_COLS_ to a dict with _truth_ and _predict_ values for the columns containing the true and predicted values

### Summary
Basic summary statistics and correlation analysis of the loaded dataframe

### Histogram
Histograms showing distribution of statistics

In [None]:
import os
from typing import Literal

# The analysis names that the main loop checks for in ANALYSES
AnalysisType = Literal["summary", "error", "histogram"]

# Annotation only, no value is bound here: a later cell must assign ANALYSES
# before the analysis loop runs
ANALYSES: list[AnalysisType]
"""which analyses should be done"""
# Regex; load() drops every column whose name matches it (re.match, i.e. anchored at the start)
COLS_TO_DROP = ".*recent-[1-9].*"
# Regex; when set, load() keeps only the columns whose names match it. None keeps all columns
COLS_TO_ANALYZE = None
# Expression handed to DataFrame.query by load(); rows not matching are dropped. None disables filtering
FILTER_QUERY = None
PREVIEW_DATA = False
"""show a preview of data before analysis"""
TARGET_PATTERN = None
"""regexp of the target columns"""

# histogram visualization settings
# HISTOGRAM_COLS is the number of subplot columns in the histogram grid,
# HISTOGRAM_BINS is passed to DataFrame.hist as bins, and the INCHES settings
# scale the figure size per grid row/column
HISTOGRAM_COLS = 3
HISTOGRAM_BINS = 10
HISTOGRAM_INCHES_PER_ROW = 3
HISTOGRAM_INCHES_PER_COL = 5

# load() keeps only the first DATA_LIMIT rows of each file; None disables the limit
DATA_LIMIT = 100000
"""limit data to speed up analysis"""

# For the 'error' analysis: dict with 'truth' and 'predict' keys naming the columns to compare
ERROR_COLS = None
# Optional per-row function whose expanded result load() appends as extra columns
ADDL_FEATURE_FUNC = None
# When true, rows with outlying target values are removed before any analysis runs
DROP_TARGET_OUTLIERS = True

DATA_FILES_DIR = os.path.join("/", "fantasy-isync", "fantasy-modeling", "2025.03", "data")
"""the path to data files"""

# Annotation only, no value is bound here: a later cell must assign DATA_FILES_PATTERN
DATA_FILES_PATTERN: str
"""Pattern to glob for data files to analyze"""

print("Defaults set")

In [None]:
# Define what analysis will be done

from glob import glob

import pandas as pd

# Pick the data file(s) to analyze: leave exactly one DATA_FILES_PATTERN uncommented.
# The value is joined onto DATA_FILES_DIR and expanded with glob, so wildcards are allowed.
# DATA_FILES_PATTERN  = "nfl_team.csv"
# DATA_FILES_PATTERN  = "nfl_WRTE.csv"
# DATA_FILES_PATTERN  = "nfl_QB.csv"
# DATA_FILES_PATTERN  = "nfl_K.csv"
# DATA_FILES_PATTERN  = "mlb_pitcher.parquet"
DATA_FILES_PATTERN  = "mlb_hitter.parquet"
# DATA_FILES_PATTERN  = "mlb_team.parquet"
# DATA_FILES_PATTERN  = "nhl_goalie.parquet"


# Resolve the pattern to the concrete list of files the analysis loop will iterate over
GLOB_PATTERN= os.path.join(DATA_FILES_DIR, DATA_FILES_PATTERN)
DATA_FILES = glob(GLOB_PATTERN)

# ANALYSES = ["error"]
# Columns compared by the 'error' analysis (only used when 'error' is in ANALYSES, or as the
# outlier column when no TARGET_PATTERN is set)
ERROR_COLS = {
    "truth": "calc:dk_score",
    "predict": "calc:dk_score:std-mean",  # alternative: "prediction:calc:dk_score"
}
# Applied with re.match, so it selects columns named exactly 'calc:<name>' or 'stat:<name>'
# (two ':'-separated segments, nothing after the second)
TARGET_PATTERN = "(calc:[^:]+|stat:[^:]+)$"
ANALYSES = ["histogram", "summary"]
# PREVIEW_DATA = True

# Keep only stat:/extra:/calc: columns, excluding those whose second segment starts with
# 'venue' or 'y_score'
COLS_TO_ANALYZE = r"^(stat|extra|calc):(?!venue|y_score).*$"

# Stop early when the glob matched nothing, there would be nothing to analyze
assert len(DATA_FILES) > 0, "No datafiles found to process"
print(f"Found {len(DATA_FILES)} files at '{GLOB_PATTERN}'")

In [None]:
import re
from math import sqrt
from typing import Callable, Pattern

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
from sklearn import metrics
from tqdm import tqdm


def load(
    path: str,
    cols_to_drop: Pattern[str] | None = None,
    cols_to_analyze: Pattern[str] | None = None,
    filter_query: str | None = None,
    addl_features_func: Callable | None = None,
):
    """
    filter_query: Rows not matching this query will be dropped
    cols_to_drop: list of cols drop, strings or regexs
    cols_to_analyze: list of cols to analyze. cols are names or regexs
    """
    extension = path.rsplit('.', 1)[1]
    if extension == "csv":
        df = pd.read_csv(path)
    elif extension in ['pq', 'parquet']:
        df = pd.read_parquet(path)
    else:
        raise ValueError(f"Unknown file type for path '{path}'")
    print(f"Loaded {len(df)=} {len(df.columns)=}")
    if DATA_LIMIT is not None:
        df = df.head(DATA_LIMIT)
    # if df.isna().any().any():
    #     print("NAs found. filling na->0.0")
    #     df = df.fillna(0)

    if ADDL_FEATURE_FUNC is not None:
        cols = sorted(
            {":".join(col.split(":", 2)[:2]) for col in df.columns if col[:4] in ("stat", "calc")}
        )
        print(f"{cols=}")

        tqdm.pandas(desc="addl_features")
        addl_df = df.progress_apply(addl_features_func, result_type="expand", axis=1)
        addl_cols = list(addl_df.columns)
        print(f"applied addl_func. addl_cols = {addl_cols}")
        df = pd.concat([df, addl_df], axis=1)
    else:
        addl_cols = []

    file_len = len(df)
    if filter_query:
        df = df.query(filter_query)
        print(f"Filter query dropped {file_len - len(df)} rows, {len(df)} remaining")
    if cols_to_drop is not None:
        cols = [col for col in df.columns if re.match(cols_to_drop, col)]
        print(f"Dropping columns from {cols_to_drop=}: {cols}")
        df = df.drop(columns=cols)
    if cols_to_analyze is not None:
        cols = [col for col in df.columns if re.match(cols_to_analyze, col)]
        if (cols_dropped := len(df.columns) - len(cols)) > 0:
            print(
                f"Dropping {cols_dropped} columns, {len(cols)} columns remaining in analysis. "
                f"Dropped cols: {set(df.columns) - set(cols)}"
            )
            df = df[cols]
    print(f"Loaded features: {sorted(list(df.columns))}")
    return df


def summarize(df: pd.DataFrame, targets: list[str] | None):
    """stat summarization and correlation analysis"""
    if targets is not None:
        drop_cols = targets + [col for col in df.columns if ":" not in col]
        no_target_df = df.drop(columns=drop_cols)
        print(
            f"Running correlation of {targets=} against numeric features in {sorted(no_target_df.columns)}"
        )
        corr_df = pd.DataFrame(
            {target: no_target_df.corrwith(df[target], numeric_only=True) for target in targets}
        ).sort_values(targets, ascending=False, key=abs)
    else:
        print("Running full cross correlation")
        corr_df = df.corr()

    # use a color map that cycles from intense to grey to intense cause highly negative correlation
    # is as good as highly positive
    cmap = plt.cm.get_cmap("PRGn")  # Choose a diverging colormap

    summary = {
        "summary": df.describe().transpose().drop(columns="count"),
        "correlation": corr_df.style.background_gradient(axis=None, cmap=cmap, vmin=-1, vmax=1),
    }
    return summary


def drop_target_outliers(df: pd.DataFrame, targets: set[str], outlier_range: float = 2.0):
    """
    drop rows with target outliers

    A row is kept only if, for every target column, its value lies within
    mean +/- outlier_range * std of that column (bounds inclusive). Rows with a
    missing value in any target column are dropped as well, because a NaN never
    compares as inside the bounds.

    The mean and std of every target are computed once, from the unfiltered data,
    so the result does not depend on the iteration order of the targets set.

    df: the data to filter
    targets: names of the columns to test for outliers
    outlier_range: number of standard deviations from the mean that is still accepted

    returns: the dataframe without the outlier rows
    """
    target_cols = list(targets)
    if not target_cols:
        return df

    values = df[target_cols]
    center = values.mean()
    spread = values.std()
    lower_bound = center - outlier_range * spread
    upper_bound = center + outlier_range * spread

    # compare every column against its own bounds, then require all targets to be in range
    in_range = values.ge(lower_bound, axis="columns") & values.le(upper_bound, axis="columns")
    # positional boolean mask, so a non-unique index cannot disturb the row selection
    kept = df[in_range.all(axis="columns").to_numpy()]

    if len(kept) < len(df):
        print(f"Dropped {len(df) - len(kept)} rows due to outliers")
    return kept


def error_analysis(df: pd.DataFrame, desc, truth_col, pred_col):
    """
    Plot prediction vs truth and error vs truth, with r2/rmse/mae in the figure title.

    Rows with a missing truth or prediction value are ignored. The error is
    truth - prediction.

    df: data containing the truth and prediction columns
    desc: label for the figure title; 'unknown model' is used when it is empty/None
    truth_col: name of the column holding the true values
    pred_col: name of the column holding the predicted values
    """
    assert truth_col in df.columns and pred_col in df.columns, "truth or prediction col not found"
    # rename returns a new dataframe, so adding the error column below cannot write into
    # a slice of the caller's dataframe
    error_df = (
        df[[truth_col, pred_col]]
        .dropna()
        .rename(columns={truth_col: "truth", pred_col: "prediction"})
    )
    error_df["error"] = error_df.truth - error_df.prediction

    r2 = round(metrics.r2_score(error_df.truth, error_df.prediction), 4)
    # only the mean squared error needs a square root (to get RMSE); MAE is already in
    # the units of the target
    rmse = round(sqrt(metrics.mean_squared_error(error_df.truth, error_df.prediction)), 4)
    mae = round(metrics.mean_absolute_error(error_df.truth, error_df.prediction), 4)

    fig, axs = plt.subplots(1, 2, figsize=(10, 5))
    # n is the number of rows the metrics were computed on, i.e. after dropping missing values
    fig.suptitle(f"{desc or 'unknown model'} : n={len(error_df)} {r2=} {rmse=} {mae=}")
    for ax in axs:
        ax.axis("equal")

    min_v = min(error_df.truth.min(), error_df.prediction.min())
    max_v = max(error_df.truth.max(), error_df.prediction.max())

    # left plot: prediction vs truth, with the diagonal marking a perfect prediction
    axs[0].plot((min_v, max_v), (min_v, max_v), "-g", linewidth=1)
    error_df.plot(kind="scatter", x="truth", y="prediction", ax=axs[0])

    # right plot: error vs truth, with the horizontal line marking zero error
    axs[1].yaxis.set_label_position("right")
    axs[1].plot((min_v, max_v), (0, 0), "-g", linewidth=1)
    error_df.plot(kind="scatter", x="truth", y="error", ax=axs[1])

In [None]:
# Run the configured analyses on every data file matched by the glob
for filepath in DATA_FILES:
    print(f"Analyzing '{filepath}'")
    df = load(
        filepath,
        filter_query=FILTER_QUERY,
        cols_to_drop=COLS_TO_DROP,
        cols_to_analyze=COLS_TO_ANALYZE,
        addl_features_func=ADDL_FEATURE_FUNC,
    )

    # target columns are the loaded columns matching TARGET_PATTERN (None when no pattern is set)
    targets = (
        [col for col in df.columns if re.match(TARGET_PATTERN, col)] if TARGET_PATTERN else None
    )

    if DROP_TARGET_OUTLIERS:
        assert (
            targets is not None or ERROR_COLS is not None
        ), f"must specify TARGET_PATTERN or ERROR_COLS to drop outliers {targets=} {ERROR_COLS=}"
        # without targets, fall back to the truth column of the error analysis
        outlier_cols = set(targets) if targets is not None else {ERROR_COLS["truth"]}
        assert outlier_cols <= set(df.columns), f"{outlier_cols=} must be a subset of {df.columns=}"
        df = drop_target_outliers(df, outlier_cols)

    if PREVIEW_DATA:
        display(f"data preview total-n={len(df)}", df.head(5).style.hide())

    if "summary" in ANALYSES:
        print(f"{TARGET_PATTERN=} => {targets=}")
        # a configured pattern must match at least one column
        assert (targets is not None and len(targets) > 0) == (TARGET_PATTERN is not None)
        summary = summarize(df, targets)
        # lift the display limits so the full tables are rendered
        with pd.option_context(
            "display.max_rows", None, "display.max_columns", None, "display.max_colwidth", None
        ):
            for name, summary_df in summary.items():
                display(name)
                display(summary_df)

    if "error" in ANALYSES:
        assert ERROR_COLS is not None
        assert ERROR_COLS['truth'] in df.columns, f"{ERROR_COLS['truth']=}"
        assert ERROR_COLS['predict'] in df.columns, f"{ERROR_COLS['predict']} not found in df"
        error_analysis(df, os.path.basename(filepath), ERROR_COLS['truth'], ERROR_COLS['predict'])

    if "histogram" in ANALYSES:
        # ceiling division: just enough rows to give every column a cell, without the
        # extra empty row that appeared when the column count was a multiple of HISTOGRAM_COLS
        rows = max(1, -(-len(df.columns) // HISTOGRAM_COLS))
        layout = rows, HISTOGRAM_COLS
        figsize = (HISTOGRAM_COLS * HISTOGRAM_INCHES_PER_COL), (rows * HISTOGRAM_INCHES_PER_ROW)
        print(f"{layout=} {figsize=}")
        df.hist(bins=HISTOGRAM_BINS, layout=layout, figsize=figsize)
        plt.tight_layout()
        plt.show()