In [None]:
import numpy as np
import polars as pl
import polars.selectors as cs
import scipy.stats as stats
from numpy.typing import NDArray
from sklearn.naive_bayes import GaussianNB

# Display settings for rendered tables: limit the number of rows shown to 10 and
# allow string values to be shown up to 100 characters long.
pl.Config.set_tbl_rows(n=10)
# The trailing semicolon keeps the notebook from echoing the call's return value.
pl.Config.set_fmt_str_lengths(n=100);

In [None]:
# Lazy query over every match file: keep the match types 6 through 9 (both
# bounds included) and shrink each column to the smallest dtype that fits.
df_m = (
    pl.scan_parquet(source="data/aoe2/matches/*/*.parquet")
    .filter(pl.col("raw_match_type").is_between(lower_bound=6, upper_bound=9))
    .select(pl.all().shrink_dtype())
)
# profile() executes the query; it returns the materialised frame together
# with a frame of timing information.
df_m_c, profile = df_m.profile()
print(f"Match data is about {df_m_c.estimated_size('gb'):.2f} GB")
df_m_c

In [None]:
# Player rows are only kept when their game is present in the filtered match
# data: a semi join filters the left side without adding any columns.
df_p = pl.scan_parquet(source="data/aoe2/player/*/*.parquet")
df_p_c = (
    df_p.join(df_m_c.lazy(), how="semi", on="game_id")
    .collect()
    .select(pl.all().shrink_dtype())
)
print(f"Player data is about {df_p_c.estimated_size('gb'):.2f} GB")
df_p_c

In [None]:
# Training data for the win-chance model: one row per player and game with the
# rating difference and whether that player won.
rating_diffs = (
    df_p_c.join(
        # Only 1-vs-1 games (match type 6) are used to get a more accurate image.
        # The explicit comparison is the long form of Polars' `filter(column=value)` sugar.
        df_m_c.filter(pl.col("raw_match_type") == 6),
        how="inner",
        on="game_id",
    )
    # Only the rating difference and the winner flag are needed.
    .select(["match_rating_diff", "winner"])
    # Placement matches have players without a calculated rating; those rows are removed.
    .drop_nulls()
)
rating_diffs

In [None]:
# scikit-learn expects a 2-D feature matrix of shape (n_samples, n_features) and
# a 1-D target of shape (n_samples,). Selecting "winner" as a one-column frame
# would hand over a column vector instead, so the target is extracted as a single
# column. Plain NumPy arrays are used for both, which matches how the model is
# queried later on (predictions are requested for NumPy arrays).
X_train = rating_diffs.select("match_rating_diff").to_numpy()
y_train = rating_diffs.get_column("winner").to_numpy()

gnb = GaussianNB()
gnb.fit(X=X_train, y=y_train)
# Note: this is the mean accuracy on the training data itself, not on a hold-out set.
gnb.score(X=X_train, y=y_train)

In [None]:
# Lookup table: predicted chance of winning for rating differences from -1000
# up to and including 1000, in steps of 5.
winning_rating_diffs = pl.DataFrame(
    {
        "Rating diff": pl.int_range(-1000, 1000 + 5, step=5, eager=True).cast(
            pl.Float64
        )
    }
).with_columns(
    pl.col("Rating diff")
    .map_batches(
        # The model wants one row per sample, so the column becomes an (n, 1) matrix;
        # column 1 of predict_proba is the probability of the second class.
        lambda diffs: gnb.predict_proba(X=diffs.to_numpy().reshape(-1, 1))[:, 1]
    )
    .alias("Win chance")
)
winning_rating_diffs

In [None]:
# Visualise the lookup table: predicted win chance as a function of the rating difference.
winning_rating_diffs.plot.line(x="Rating diff", y="Win chance", width=800, height=800)

In [None]:
# Sanity check of the lookup: fetch the win chance for a rating difference of 0
# by matching it to the nearest entry of the table (both sides sorted on the key).
test = pl.DataFrame({"Rating diff": [0.0]}).sort("Rating diff")
test.join_asof(
    winning_rating_diffs.sort("Rating diff"),
    strategy="nearest",
    on="Rating diff",
).get_column("Win chance").item()

In [None]:
# Roster of the "belcup" players: display names and profile ids are paired by
# position (the n-th name belongs to the n-th id) when the frame is built below.
# fmt: off
belcup_player_names = [
    "NeutralWouter", "Jolatis", "TrCL.Welcometorapture", "[ARKP]Anthoxx", "[SMURFS] Bernoob", "BloodyNival", "Caféglacé", "Cooper", "flem.Dieterkiller", "[mPzzA] Fluxbastia", "[Argus] Grigor Grinta", "KEVAIN_25", "[Argus] Kevin Hubris", "KTT TheKiiwii", "PAP.KingBug", "KnightofSaintJohn", "[BraBros] Koen", "Koztikoz", "[TSU] Midu", "Mighty.Plumber", "AOKI_MIPS", "[Argus] Nietblom", "Pairu", "PieterDela", "The_Damn_Bugum", "SmellyLeopard", "Squatting Slav in Tracksuit", "The Barbarian_BE", "flem.TheLastBender", "the_belgian", "Tusky", "Vico", "Royal Yakuza",
]
belcup_player_ids = [
    4690424, 252907, 4460059, 1613746, 1867046, 2931610, 1784887, 443371, 2541027, 120364, 1908961, 2416466, 1140888, 1141095, 1957729, 2303174, 4016370, 333933, 2337644, 2358049, 11300774, 227894, 11662735, 3040509, 213305, 372754, 9684159, 819301, 2034401, 3379379, 238628, 5249964, 6434030,
]
# fmt: on
# One row per player; shrink_dtype() downcasts each column to the smallest dtype
# that can hold its values.
players_belcup = pl.DataFrame(
    {"name": belcup_player_names, "profile_id": belcup_player_ids}
).select(pl.all().shrink_dtype())
players_belcup

In [None]:
# Rating history of the roster players: one row per 1-vs-1 game (match type 6)
# they played, with their rating after the game and the time the game started.
df_belcup = (
    df_p_c.join(players_belcup, how="inner", on="profile_id")
    .drop("profile_id")
    .join(
        df_m_c.filter(pl.col("raw_match_type") == 6).select(
            ["game_id", "started_timestamp"]
        ),
        how="inner",
        on="game_id",
    )
    .select(["name", "new_rating", "started_timestamp"])
)
df_belcup

In [None]:
# Seeding table: per player the highest rating ever reached, the rating after
# the most recently started game, and the mean of the two as the seeding value.
belcup_ratings = (
    df_belcup.group_by("name")
    .agg(
        pl.col("new_rating").max().alias("Maximum ELO"),
        # Order the ratings chronologically and take the last one.
        pl.col("new_rating").sort_by("started_timestamp").last().alias("Current ELO"),
        pl.col("started_timestamp").sort().last().alias("Current ELO timestamp"),
    )
    .with_columns(
        pl.mean_horizontal("Maximum ELO", "Current ELO").alias("Seeding ELO")
    )
    .sort(by="Seeding ELO", descending=True)
    # Seeding spots are 1-based: the highest seeding ELO gets spot 1.
    .with_row_index(name="Seeding spot", offset=1)
)
belcup_ratings

In [None]:
# Rating assumed for a player who cannot be found in belcup_ratings
# (the left joins below leave such a player's ELO empty).
DEFAULT_ELO = 1000.0

# First-round pairings, by player name; row i of "Player 1" faces row i of "Player 2".
df_first_round = pl.DataFrame(
    {
        "Player 1": [
            "[BraBros] Koen",
            "KnightofSaintJohn",
            "SmellyLeopard",
            "Caféglacé",
            "TrCL.Welcometorapture",
            "the_belgian",
        ],
        "Player 2": [
            "Pairu",
            "Squatting Slav in Tracksuit",
            "PAP.KingBug",
            "ADCuitA",
            "NeutralWouter",
            "KEVAIN_25",
        ],
    }
)
predict_first_round = (
    # Attach the maximum ELO of each side; left joins keep pairings with unknown players.
    df_first_round.join(
        other=belcup_ratings.select(
            pl.col("name").alias("Player 1"),
            pl.col("Maximum ELO").alias("ELO 1 Maximum"),
        ),
        on="Player 1",
        how="left",
    )
    .join(
        other=belcup_ratings.select(
            pl.col("name").alias("Player 2"),
            pl.col("Maximum ELO").alias("ELO 2 Maximum"),
        ),
        on="Player 2",
        how="left",
    )
    .with_columns(
        # Convert to Float64 *before* subtracting: the ratings may be stored as an
        # unsigned integer type, and subtracting unsigned integers cannot produce
        # a negative difference (the result wraps around instead). Missing
        # ratings get the default afterwards.
        pl.col("ELO 1 Maximum", "ELO 2 Maximum")
        .cast(pl.Float64)
        .fill_null(DEFAULT_ELO)
    )
    .with_columns(
        # Positive when Player 1 has the higher maximum ELO.
        (pl.col("ELO 1 Maximum") - pl.col("ELO 2 Maximum")).alias("Rating diff"),
    )
    # Sorted on the key so that the frame can be used in an as-of join.
    .sort("Rating diff")
)
predict_first_round

In [None]:
# Look up a win chance for every pairing: each row gets the entry of
# winning_rating_diffs whose "Rating diff" is nearest to its own. Both sides of
# an as-of join have to be sorted on the key; predict_first_round was sorted on
# "Rating diff" when it was built.
# NOTE(review): this cell overwrites its own input, so re-running it without
# re-running the previous cell would presumably add a second, suffixed
# "Win chance" column — confirm.
predict_first_round = predict_first_round.join_asof(
    other=winning_rating_diffs.sort("Rating diff"),
    on="Rating diff",
    strategy="nearest",
)
predict_first_round

In [None]:
# Builds a LazyFrame with every final score that decides a given Best-Of series.
def create_bestof_records(BO: int) -> pl.LazyFrame:
    """List all final scores with which a Best-Of-``BO`` series can end.

    Args:
        BO: Maximum number of games in the series (e.g. 5 for a Best-Of-5).

    Returns:
        A LazyFrame with the columns "Wins Player 1" and "Wins Player 2". Each
        row is a score in which at least one side has the ``BO // 2 + 1`` wins
        needed to take the series and the two sides are not level.
    """
    wins_needed = (BO // 2) + 1
    win_counts = pl.DataFrame({"Wins": list(range(wins_needed + 1))}).lazy()

    wins_p1 = pl.col("Wins Player 1")
    wins_p2 = pl.col("Wins Player 2")
    series_decided = (wins_p1 >= wins_needed) | (wins_p2 >= wins_needed)
    not_level = wins_p1 != wins_p2

    # Pair every win count with every other one, then keep the deciding scores.
    all_scores = win_counts.join(other=win_counts, how="cross").rename(
        {"Wins": "Wins Player 1", "Wins_right": "Wins Player 2"}
    )
    return all_scores.filter(series_decided & not_level)


# Another UDF: it receives the entire struct column at once, so the calculation is vectorized.
def calculate_win_chance(df: pl.Series) -> NDArray:
    """Probability of each final score, given a single-game win chance.

    Args:
        df: Struct column with the fields "Wins Player 1", "Wins Player 2" and
            "Win chance". "Win chance" is used as the single-game chance of the
            winner when Player 1 has at least as many wins as Player 2, and its
            complement is used when Player 2 has more wins.

    Returns:
        One probability per row: the binomial chance that the series winner
        collected all but one of their wins in the games before the last one,
        multiplied by the chance that they also win that last game.
    """
    fields = df.struct
    wins_p1 = fields.field("Wins Player 1").to_numpy()
    wins_p2 = fields.field("Wins Player 2").to_numpy()
    game_win_chance = fields.field("Win chance").to_numpy()

    player_2_ahead = wins_p2 > wins_p1
    # Single-game win chance from the point of view of whoever takes the series.
    winner_chance = np.where(player_2_ahead, 1 - game_win_chance, game_win_chance)
    # The deciding game is always won by the series winner, so it is set apart
    # from both the winner's win count and the number of games.
    winner_wins_before_last = np.where(player_2_ahead, wins_p2, wins_p1) - 1
    games_before_last = (wins_p1 + wins_p2) - 1

    chance_before_last = stats.binom.pmf(
        winner_wins_before_last, games_before_last, winner_chance
    )
    return chance_before_last * winner_chance


# Series format: a Best-Of-BO series is won by the first player to reach N wins.
BO = 5
N = (BO // 2) + 1

prediction = (
    predict_first_round.lazy()
    # Every pairing is combined with every possible final score of the series.
    .join(other=create_bestof_records(BO=BO), how="cross")
    .with_columns(
        # Passing multiple columns at once to a UDF can be done by creating a struct column.
        # The UDF turns a struct column into floats, so the output dtype is stated explicitly.
        pl.struct("Wins Player 1", "Wins Player 2", "Win chance")
        .map_batches(calculate_win_chance, return_dtype=pl.Float64)
        .alias("Score probability"),
        # "Winner" holds the *name* of the player who takes the series with this
        # score, hence column references rather than string literals.
        pl.when(pl.col("Wins Player 1") == N)
        .then(pl.col("Player 1"))
        .otherwise(pl.col("Player 2"))
        .alias("Winner"),
    )
    .with_columns(
        # Total chance that this winner takes the series. The window includes the
        # pairing itself so that a name occurring in more than one pairing does
        # not get its probabilities added up across pairings.
        pl.col("Score probability")
        .sum()
        .over("Player 1", "Player 2", "Winner")
        .alias("Winner probability")
    )
    .filter(
        # Keep the most likely final score of each pairing. The combination of
        # Player 1 and Player 2 determines each match-up uniquely.
        pl.col("Score probability")
        == pl.col("Score probability").max().over("Player 1", "Player 2")
    )
)
prediction.collect()