In [1]:
import sys
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn import preprocessing

sys.path.append("../")

In [2]:
from skimpy import skim
from tqdm import tqdm

In [None]:
from loguru import logger

# Register a loguru file sink at <parent of the working directory>/logs/log.log.
log_dir = Path.cwd().parent.joinpath("logs")
logger.add(log_dir.joinpath("log.log"))

In [4]:
import warnings

# Silence FutureWarning and UserWarning for the rest of the session.
# Order is kept: the UserWarning filter is installed last.
for _ignored_category in (FutureWarning, UserWarning):
    warnings.simplefilter(action="ignore", category=_ignored_category)

## Prepare the data

In [None]:
# Rows are only used when their match_number exceeds MIN_MATCHES; matches that
# start after LAST_TRAIN_MATCH_DATE go to the test split.
MIN_MATCHES = 250
LAST_TRAIN_MATCH_DATE = datetime(2023, 7, 31)

# The data directory sits beside the resolved working directory.
this_dir = Path().resolve()
DATA_DIR = this_dir.parent.joinpath("data")
print(DATA_DIR)
df = pd.read_csv(DATA_DIR.joinpath("ml_rows.csv"), parse_dates=["start_date"])

#### Separate train/test sets

The test set is the more recent matches: we want to see how well its outcomes agree with those of the clusters produced from the (older) data used in clustering.

In [6]:
def _split_by_innings(date_mask):
    """Return ``{innings: rows}`` for rows matching ``date_mask`` whose match_number exceeds MIN_MATCHES."""
    enough_history = df.match_number > MIN_MATCHES
    return {innings: df[date_mask & enough_history & (df.innings == innings)] for innings in range(2)}


# Train on matches up to and including the cut-off date, test on later ones.
df_train = _split_by_innings(df.start_date <= LAST_TRAIN_MATCH_DATE)
df_test = _split_by_innings(df.start_date > LAST_TRAIN_MATCH_DATE)

# Training rows again, keyed by innings and then by over (0-19).
train_dfs = {
    innings: {over_no: frame.loc[frame["over"] == over_no] for over_no in range(20)}
    for innings, frame in df_train.items()
}

In [None]:
# Summarise the full frame as loaded (train and test rows together).
skim(df)

In [8]:
def extract_values(d, values: list[str]):
    """Return the part of ``d`` selected by indexing it with the list ``values``.

    For a DataFrame this is the sub-frame holding the named columns, in the
    order given.
    """
    subset = d[values]
    return subset


# Feature columns that are scaled and handed to the clusterers
# (selected via extract_values in prepare_training_set).
X_values = [
    "wickets_down",
    "run_rate",
    "req_rate",
    "batter_in_first_10",
    "batter_strike_rate",
    "batter_dismissal_prob",
    "batter_dismissal_vs_style",
    "bowler_economy",
    "bowler_wicket_prob",
    "bowler_wicket_vs_style",
    "bowler_wide_noball_rate",
]

# Target column(s); "outcome" is what gets tallied per cluster label later on.
y_values = [
    "outcome",
]

In [9]:
# Scaler name -> scaler *instance* (despite the name, these are not classes).
# Every lookup returns the same object, so each new fit replaces whatever the
# previous fit on that scaler learned.
SCALER_CLASSES = {
    "quantile": preprocessing.QuantileTransformer(),
    "standard": preprocessing.StandardScaler(),
    "gaussian": preprocessing.QuantileTransformer(output_distribution="normal"),
    "normalizer": preprocessing.Normalizer(),
}


def prepare_training_set(inns, over, scaler: str) -> np.ndarray:
    """Scale the training features of one innings/over with the named scaler.

    Takes the ``X_values`` columns of ``train_dfs[inns][over]``, fits the
    ``SCALER_CLASSES[scaler]`` object on them and returns the transformed rows.
    An unknown ``scaler`` name raises ``KeyError``.
    """
    features = extract_values(train_dfs[inns][over], X_values)
    return SCALER_CLASSES[scaler].fit(features).transform(features)


In [10]:
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

In [11]:
from IPython.display import Markdown, display

## Estimate the most effective number of clusters per innings and over

In [None]:
from kneed import KneeLocator


def test_n(n: int, X, random_state: int = 0) -> float:
    """Fit k-means with ``n`` clusters on ``X`` and return the model's inertia.

    Args:
        n: number of clusters to fit.
        X: feature matrix passed straight to ``KMeans.fit``.
        random_state: seed for the k-means initialisation. It is fixed by
            default so the inertia curve, and the knee read from it, is the
            same on every run; pass ``None`` for the unseeded behaviour.

    Returns:
        The fitted model's ``inertia_``.
    """
    kmeans = KMeans(n_clusters=n, random_state=random_state)
    kmeans.fit(X)
    return kmeans.inertia_


def find_knee(inns, over, min_clusters: int = 11, max_clusters: int = 30):
    """Estimate a cluster count for one innings/over from the k-means inertia curve.

    Fits k-means for every cluster count in ``min_clusters..max_clusters``
    (inclusive) on the normalizer-scaled training set and returns the knee of
    the resulting convex, decreasing inertia curve.

    Args:
        inns: innings key into the training data.
        over: over key into the training data.
        min_clusters: smallest cluster count tried.
        max_clusters: largest cluster count tried.

    Returns:
        The knee as an ``int``. When no knee is detected, ``min_clusters`` is
        returned and a warning is logged, so callers never receive ``None``.
    """
    X = prepare_training_set(inns, over, "normalizer")
    clusters_range = range(min_clusters, max_clusters + 1)
    sse = [test_n(n, X) for n in clusters_range]
    kneedle = KneeLocator(clusters_range, sse, curve="convex", direction="decreasing")
    if kneedle.knee is None:
        # KneeLocator reports a missing knee as None. Passing that on breaks the
        # mean over the knees and ends up as n_clusters=None further down.
        logger.warning(f"no knee found for innings {inns} over {over}; falling back to {min_clusters}")
        return min_clusters
    return int(kneedle.knee)


# knees[innings] is a list with one estimated cluster count per over.
knees = {}

with tqdm(total=40) as pbar:  # 2 innings x 20 overs
    for inns in range(2):
        knees[inns] = []
        for over in range(20):
            res = find_knee(inns, over)
            knees[inns] += [res]
            pbar.update()

# Per innings: the comma-separated knees, then their mean.
for inns, inns_knees in knees.items():
    print(inns, ",".join(str(v) for v in inns_knees))
    print(f"{inns} mean: {np.mean(inns_knees)}")

## Build clusters

Given a cluster count (in `knees`), fit a clusterer to each over's data.

In [None]:
from collections import Counter

from hdbscan import flat

# clfs[innings][over] is the clusterer fitted for that innings/over. The list is
# rebuilt from scratch each time this cell runs.
clfs: dict[int, list] = {0: [], 1: []}

logger.info("starting with estimated clusters...")
# NOTE: later cells index clfs[inns] by over, which only holds while exactly one
# scaler is listed here.
for scaler in [
    "normalizer",
]:  # ["standard", "quantile", "gaussian", "normalizer"]:
    for inns in range(2):
        for over in range(20):
            X_scaled = prepare_training_set(inns, over, scaler)
            n_clusters = knees[inns][over]
            # HDBSCAN_flat returns a clusterer already fitted on X_scaled, so no
            # second fit is needed before reading labels_.
            clf = flat.HDBSCAN_flat(X_scaled, n_clusters=n_clusters)
            clfs[inns].append(clf)
            labels = clf.labels_
            clustered = labels >= 0  # label -1 marks noise points
            counts = Counter(labels)
            total_clusters = np.max(labels) + 1
            coverage = np.sum(clustered) / X_scaled.shape[0]
            logger.info(
                f"{inns} {over} {n_clusters} {total_clusters} {coverage:.2%} pts:{len(labels)} noisy:{counts[-1]} {[v for k, v in counts.items() if k >= 0]}"
            )
logger.info("all done!")

## Examine clustering outcomes

Run a t-SNE plot for each over in an innings to get an impression of the kind of results we are getting. Use a sample of each over's rows to reduce execution time.

In [14]:
import seaborn as sns
from sklearn.manifold import TSNE

# Dark-grid theme with a 0.5 font scale for the figures that follow.
sns.set_theme(style="darkgrid", font_scale=0.5)

In [None]:
from sklearn.base import clone

TSNE_SAMPLE_SIZE = 10_000  # rows per over fed to t-SNE, to keep run time down
TSNE_SEED = 0  # fixed so the sample and the embedding are the same on every run

fig, axs = plt.subplots(4, 5, figsize=(15, 12))

inns = 0

for over in tqdm(range(20)):
    over_rows = train_dfs[inns][over]
    # Cap the sample at the rows available so a short over does not raise.
    data = over_rows.sample(min(TSNE_SAMPLE_SIZE, len(over_rows)), random_state=TSNE_SEED)

    X = extract_values(data, X_values)
    Xp = SCALER_CLASSES["normalizer"].fit(X).transform(X)

    tsne = TSNE(
        n_components=2,
        metric="euclidean",
        verbose=0,
        perplexity=40,
        n_iter=300,
        random_state=TSNE_SEED,
    )
    results = tsne.fit_transform(Xp)

    # Fit an unfitted copy with the same parameters on the sample. The clusterer
    # stored in clfs stays fitted on the full training data, and clfs does not
    # grow when this cell is re-run.
    clf = clone(clfs[inns][over])
    clf.fit(Xp)
    labels = clf.labels_

    df_res = pd.DataFrame({"x": results[:, 0], "y": results[:, 1], "c": labels})

    # 4 x 5 grid filled row by row: overs 0-4 on the first row, and so on.
    row, col = divmod(over, 5)
    ax = axs[row, col]
    sns.scatterplot(
        x="x",
        y="y",
        data=df_res,
        hue="c",
        # One colour per distinct label (noise included), so clusters do not share colours.
        palette=sns.color_palette("hls", df_res["c"].nunique()),
        legend=False,
        alpha=0.3,
        ax=ax,
        s=10,
    ).set(
        xticklabels=[],
        yticklabels=[],
        xlabel=None,
        ylabel=None,
        title=f"over {over}",
    )


## Apply clusterings - extract probabilities

In [16]:
from collections import defaultdict

In [None]:
# Tally, for one innings/over, how often each outcome occurs within each cluster label.
inns, over = 0, 8
clf = clfs[inns][over]
Xt = extract_values(train_dfs[inns][over], X_values)
yt = extract_values(train_dfs[inns][over], y_values)
scaled_Xt = SCALER_CLASSES["normalizer"].fit(Xt).transform(Xt)
clf.fit(scaled_Xt)
labels = clf.labels_
# Re-index positionally so outcomes line up with labels. A new Series is built
# rather than resetting in place, so the column object taken from `yt` is untouched.
outcomes = yt["outcome"].reset_index(drop=True)
print(len(labels), len(outcomes))

# res[label][outcome] -> number of rows with that cluster label and outcome.
res = {k: defaultdict(int) for k in np.unique(labels)}
for label, outcome in zip(labels, outcomes.to_numpy()):
    res[label][outcome] += 1


In [18]:
# Column headings for the per-cluster outcome table: the cluster label, the row
# total, then one column per outcome code in code order.
OUTCOME_DESCS = [
    "label",
    "tot",
    "wide",
    "noball",
    "bye",
    "legbye",  # was misspelt "lebbye" in the rendered header
    "wicket",
    "dot",
    "single",
    "two",
    "three",
    "four",
    "six",
]

In [None]:
def title():
    """Build the Markdown heading naming the innings and over held in the globals ``inns`` and ``over``."""
    heading = f"## Innings {inns}, Over {over}"
    return heading


def table_hdr():
    """Return the two header lines of the Markdown table.

    The first line lists the ``OUTCOME_DESCS`` names; the second is the
    separator row, marking every column as right-aligned.
    """
    names_row = "| " + " | ".join(OUTCOME_DESCS) + " |"
    align_row = "| " + " | ".join(["---:"] * len(OUTCOME_DESCS)) + " |"
    return names_row + "\n" + align_row


def table_line(k, v, n_outcomes=None):
    """Format one Markdown table row for cluster label ``k``.

    Args:
        k: integer cluster label.
        v: mapping of outcome code -> count for this label.
        n_outcomes: number of outcome columns to emit, for codes
            ``0..n_outcomes-1``. Defaults to the number of outcome headings in
            ``OUTCOME_DESCS`` (every entry after "label" and "tot"), so each
            row has as many cells as the header even when some outcome never
            occurs in the data being tabulated.

    Returns:
        The row text: label, total count, then each outcome's share of the
        total as a percentage. Outcomes absent from ``v`` show as 0.00%, and
        so does every column when the total is zero.
    """
    if n_outcomes is None:
        n_outcomes = len(OUTCOME_DESCS) - 2
    tot = sum(v.values())
    # .get() keeps the lookup from inserting zero entries into a defaultdict.
    shares = [(v.get(i, 0) / tot if tot else 0.0) for i in range(n_outcomes)]
    return f"| {k:-2d} | {tot:-6d} | " + " | ".join(f"{share:8.2%}" for share in shares) + " |"


# One table row per cluster label, then render heading + header + rows as Markdown.
rows = [table_line(k, v) for k, v in res.items()]
body_lines = "\n".join(rows)

display(Markdown("\n".join([title(), table_hdr(), body_lines])))