In [None]:
from collections.abc import Callable, Mapping, Iterable, Sequence, Generator
from typing import Any, Optional

import numpy as np
import pandas as pd
import os

In [None]:
import numpy as np
import pandas as pd
import os

import tensorflow as tf, keras
from keras import layers
from keras.layers import StringLookup, IntegerLookup, Embedding, Normalization, Dense

from sklearn.model_selection import train_test_split

In [None]:
# Relative path to the input CSV, normalised for the host OS's separator style.
path = os.path.normpath("practice_baseball_data.csv")

In [None]:
# Load the raw CSV; it is cleaned by preprocess_data() further down in this cell.
df = pd.read_csv(path)

def make_label(df) -> pd.Series:
    """Flag rows whose home-run total beats the mean for the same season.

    Args:
        df: Frame with at least ``Season`` and ``HR`` columns.

    Returns:
        Integer Series aligned to ``df.index``: 1 where ``HR`` is strictly
        greater than the mean ``HR`` over all rows sharing that row's
        ``Season``, otherwise 0.
    """
    # Use the 'mean' string alias: handing the np.mean callable to
    # GroupBy.transform is deprecated in pandas 2.x and emits a FutureWarning.
    season_mean_hr = df.groupby('Season')['HR'].transform('mean')
    return (df['HR'] > season_mean_hr).astype(int)


def preprocess_data(df) -> pd.DataFrame:
    """Clean the raw frame and attach a ``label`` column.

    Not really a part of practice: this is cleaning we would normally expect
    to be finished already.

    Steps:
      1. Rename ``K%``/``BB%`` to ``K``/``BB``, strip the ``%`` sign and cast
         both to float; drop ``xwOBA``.
      2. Compute ``make_label`` on a copy whose ``Season`` is shifted by +1 and
         left-join it back on ``(Season, playerid)``.
      3. Drop every row that contains a missing value in any column.

    NOTE(review): because of the +1 shift, the label joined onto a
    ``(Season, playerid)`` row comes from that player's row one season
    *earlier*. If the goal is to predict the following season, the shift
    direction should be confirmed.
    """
    cleaned = df.rename(columns={"K%": "K", "BB%": "BB"})
    cleaned = cleaned.assign(
        BB=lambda frame: frame['BB'].str.replace('%', ""),
        K=lambda frame: frame['K'].str.replace('%', ""),
    )
    cleaned = cleaned.astype({"BB": 'float', 'K': 'float'}).drop(columns=['xwOBA'])

    # Labels keyed by the season they will be joined onto.
    shifted = cleaned.assign(Season=lambda frame: frame.Season + 1, label=make_label)
    label_lookup = shifted[['Season', 'playerid', 'label']]

    merged = cleaned.merge(label_lookup, on=['Season', 'playerid'], how='left')
    return merged.dropna()

# Clean the raw frame, then keep only the target plus the model's feature columns.
df = preprocess_data(df)
selected = ['label', "Season", "HR", "K", "BB", "PA", "Team"]
df = df[selected]

In [None]:
def split_data(df, seed=None, frac=None) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Randomly split ``df`` into a training and a validation frame.

    Args:
        df: Frame to split.
        seed: Forwarded to ``train_test_split`` as ``random_state``.
        frac: Fraction of rows to hold out for validation. ``None`` keeps the
            previous fixed behaviour of 0.2.

    Returns:
        ``(train, val)``. The validation frame never exceeds 5000 rows.

    Raises:
        ValueError: If ``frac`` is not strictly between 0 and 1, or if ``df``
            is too small for the hold-out to contain at least one row.
    """
    holdout_frac = 0.2 if frac is None else frac
    if not 0 < holdout_frac < 1:
        raise ValueError(f"frac must be strictly between 0 and 1, got {frac!r}")

    max_val_rows = 5000  # hard cap on the validation set size
    test_size = min(int(len(df) * holdout_frac), max_val_rows)
    if test_size < 1:
        raise ValueError(
            f"Cannot hold out {holdout_frac:.0%} of {len(df)} rows: validation set would be empty"
        )

    train, val = train_test_split(df, test_size=test_size, random_state=seed)
    print(len(train), len(val))
    return train, val


In [None]:
def make_dataset(df: pd.DataFrame, shuffle: bool = True, batch_size: int = 1024) -> "tf.data.Dataset":
    """Turn a dataframe into a batched ``tf.data.Dataset`` of ``(features, label)``.

    Args:
        df: Frame containing a ``label`` column plus one column per feature.
            The caller's frame is not modified.
        shuffle: Whether to shuffle rows before batching.
        batch_size: Number of rows per batch.

    Returns:
        Dataset whose elements are ``(features, labels)``, where ``features``
        is a dict mapping each column name to a ``(batch, 1)`` slice of that
        column.

    Raises:
        KeyError: If ``df`` has no ``label`` column.
    """
    features = df.copy()
    labels = features.pop('label')
    # Capture the row count while we still hold a DataFrame: taking len() of the
    # per-column dict built below would give the number of features instead,
    # which makes the shuffle buffer far too small to mix the rows.
    n_rows = len(features)
    # Add a trailing axis so every feature arrives with shape (batch, 1).
    columns = {key: value.values[:, tf.newaxis] for key, value in features.items()}
    ds = tf.data.Dataset.from_tensor_slices((columns, labels))
    if shuffle:
        # A buffer covering every row gives a full uniform shuffle; the floor
        # of 1 keeps an empty frame from requesting a zero-sized buffer.
        ds = ds.shuffle(buffer_size=max(n_rows, 1))
    ds = ds.batch(batch_size)
    # prefetch counts elements, which are whole batches at this point, so let
    # tf.data size the buffer rather than queueing `batch_size` batches.
    return ds.prefetch(tf.data.AUTOTUNE)

def extract_feature_ds(ds: "tf.data.Dataset", name) -> tuple:
    """Return a dataset of a single feature together with its dtype.

    Helpful for adapting lookup and normalization layers.

    Args:
        ds: Dataset whose elements are ``(features, label)`` pairs, where
            ``features`` can be indexed by feature name.
        name: Key of the feature to pull out of each element.

    Returns:
        ``(feature_ds, dtype)``: the mapped dataset yielding only
        ``features[name]``, and the ``dtype`` reported by its ``element_spec``.
    """
    feature_ds = ds.map(lambda features, _label: features[name])
    return feature_ds, feature_ds.element_spec.dtype

def make_embedding(data: tf.data.Dataset, name) -> Callable[..., Embedding]:
    """Build a lookup-then-embed encoder for one categorical feature.

    A ``StringLookup`` is used when the feature's dtype is ``tf.string`` and an
    ``IntegerLookup`` otherwise. The lookup is adapted on the feature taken
    from ``data``, and the embedding width is ``int(log(vocabulary size))``
    with a floor of 1.

    Returns:
        A callable that applies the adapted lookup and then the embedding.
    """
    feature_ds, feature_dtype = extract_feature_ds(data, name)

    lookup_cls = StringLookup if feature_dtype == tf.string else IntegerLookup
    index_layer = lookup_cls()
    index_layer.adapt(feature_ds)

    vocab_size = len(index_layer.get_vocabulary())
    embed_dim = max(1, int(np.log(vocab_size)))
    embed_layer = Embedding(vocab_size, embed_dim)

    def encode(x):
        return embed_layer(index_layer(x))

    return encode

def make_normalization(data, name):
    """Return a ``Normalization`` layer adapted on feature ``name`` of ``data``."""
    feature_ds, _ = extract_feature_ds(data, name)
    normalizer = Normalization()
    normalizer.adapt(feature_ds)
    return normalizer

def make_inputs_and_encoded_features(data, cat_cols) -> tuple[list, dict]:
    """Create one ``keras.Input`` per feature and run it through its encoder.

    Features named in ``cat_cols`` are encoded with ``make_embedding``; every
    other feature is encoded with ``make_normalization``.

    Args:
        data: Dataset whose ``element_spec[0]`` maps feature names to specs.
        cat_cols: Names of the features to treat as categorical.

    Returns:
        ``(inputs, encoded)``: the list of inputs in feature order, and a dict
        mapping each feature name to its encoded tensor.
    """
    feature_specs = data.element_spec[0]
    model_inputs = []
    encoded = {}
    for name, spec in feature_specs.items():
        feature_input = keras.Input(shape=(1, ), name=name, dtype=spec.dtype)
        build_encoder = make_embedding if name in cat_cols else make_normalization
        encoder = build_encoder(data, name)
        model_inputs.append(feature_input)
        encoded[name] = encoder(feature_input)
    return model_inputs, encoded


In [None]:
def test_make_inputs_and_encoded_features():
    """Smoke-test a StringLookup -> Embedding pipeline on a tiny string column.

    Despite its name, this does not call ``make_inputs_and_encoded_features``;
    it wires the lookup and embedding by hand on an eight-row frame, checks the
    eager output shape, then builds a functional model and calls ``predict``.

    NOTE(review): the original author marked the final ``predict`` call as
    failing. The cause is not visible from this code, so the statements are
    left exactly as written to keep the reproduction intact.
    """
    df = pd.DataFrame({
        'a': list('abcdefgh'),
        'label': [i % 2 for i in range(8)]
    })
    ds = make_dataset(df)

    # Adapt the lookup on the 'a' feature only.
    feature_ds = ds.map(lambda x, _: x['a'])
    lookup = StringLookup()
    lookup.adapt(feature_ds)
    input_dims = len(lookup.get_vocabulary())
    # Each call to this lambda constructs a brand-new Embedding layer.
    embedding = lambda x: Embedding(input_dim=input_dims, output_dim=2)(lookup(x))

    # The embedding works as expected on a raw numpy array.
    assert embedding(df['a'].values).shape == (8, 2), "This embedding doesn't work on raw data."

    inputs = keras.Input((1,), name='a', dtype=tf.string)
    # The functional model uses its own Embedding instance, not the lambda above.
    x = Embedding(input_dim=input_dims, output_dim=2)(lookup(inputs))
    x = keras.layers.Flatten()(x)
    out = Dense(1, 'sigmoid')(x)

    model = keras.Model(inputs, out)
    # Direct call with a dict keyed by the input's name.
    model({'a': df.a.values})
    model.summary()
    model.predict(ds)  # !! reported by the author to fail with an error


# Run the smoke test immediately so any failure surfaces when this cell executes.
test_make_inputs_and_encoded_features()

# emb(list('abczxy'))

In [None]:

# Categorical features: every object-dtype column plus the id-like columns.
cat_cols = list(df.select_dtypes('object').columns) + ['Season', 'playerid']

# Seeded split; only the training dataset is shuffled.
train, val = split_data(df, 42)
train_ds = make_dataset(train)
val_ds = make_dataset(val, shuffle=False)

In [None]:
# Build one keras.Input per feature plus its encoded tensor; the encoders are adapted on the training dataset.
all_inputs, all_features = make_inputs_and_encoded_features(train_ds, cat_cols)


In [None]:

def model_topology(all_features, cat_cols):
    """Join all encoded features and attach a single sigmoid output unit.

    Encoded categorical features (those listed in ``cat_cols`` and present in
    ``all_features``) are flattened first and placed ahead of the remaining
    features, which are concatenated as they are.

    Args:
        all_features: Mapping of feature name to encoded tensor.
        cat_cols: Names of the categorical features; names missing from
            ``all_features`` are skipped.

    Returns:
        Output tensor of a one-unit sigmoid ``Dense`` layer.
    """
    categorical_branches = []
    for col in cat_cols:
        if col in all_features:
            categorical_branches.append(keras.layers.Flatten()(all_features[col]))

    other_branches = [tensor for key, tensor in all_features.items() if key not in cat_cols]

    merged = keras.layers.Concatenate()(categorical_branches + other_branches)
    return Dense(1, activation='sigmoid')(merged)

# Assemble the functional model from the per-feature inputs and the sigmoid head.
model = keras.Model(
    inputs=all_inputs,
    outputs=model_topology(all_features, cat_cols),
)


In [None]:
# Binary classification setup: Adam optimizer, cross-entropy loss, accuracy metric.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=.0031),
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[keras.metrics.BinaryAccuracy()],
)

In [None]:
# Fit on the training split. Fitting on val_ds, as before, meant the model was
# trained and validated on the same rows, so the validation metrics were leaked.
model.fit(train_ds, epochs=30, validation_data=val_ds)

In [None]:
# Display the validation labels as a raw array.
val['label'].values

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.calibration import CalibrationDisplay

# Validation labels and predicted probabilities; val_ds was built with
# shuffle=False, so predictions come back in the same row order as `val`.
y_true = val['label'].values.astype(float)
y_hat = model.predict(val_ds).reshape(-1)

# A cell only renders its final expression, so the confusion matrix has to be
# printed explicitly or its result is computed and silently discarded.
print(confusion_matrix(y_true, y_hat > .5))
CalibrationDisplay.from_predictions(y_true, y_hat)