In [68]:
import pandas as pd
import numpy as np
import pymc as pm
import altair as alt
import arviz as az
import pymc_extras as pmx
import pytensor.tensor as pt

# Registry of fitted traces keyed by model name; the model cells below add
# their trace to it and the comparison cells pass it to az.compare.
t_dict = {}

In [None]:
# Overall team stats: the unnamed first CSV column is relabelled as 'team'
team_stats_raw = pd.read_csv('./data/basketball_results/team_statistics.csv')
stats = team_stats_raw.rename(columns={'Unnamed: 0': 'team'})
stats

In [None]:
# Granular match results, one row per game.
# `date_time` is parsed to a datetime and `diff` is the home-minus-away margin.
results = pd.read_csv('./data/basketball_results/all_matches.csv').assign(
    date_time=lambda df: pd.to_datetime(df['date_time']),
    diff=lambda df: df['home_score'] - df['away_score'],
)
results.head()

In [None]:
# Plot home/away score differences over time.
# Left grid: one small panel per home team; right grid: one per away team.

# Encodings shared by both grids
month_x = alt.X('yearmonth(date_time):T').title(None).axis(grid=False, labelAngle=270, tickCount=2, format='%m-%y')
match_tooltip = [
    alt.Tooltip('home_team:N', title='home team:'),
    alt.Tooltip('away_team:N', title='away team:'),
    alt.Tooltip('diff:Q', title='score diff:'),  # always the home-minus-away margin
]

# Dashed reference rule at a score difference of zero. This is the same
# threshold the colour encoding uses (margin > 0 is blue, otherwise red) and
# the value the y scale is centred on via domainMid. It was previously drawn at 1.
zero_line = alt.Chart(pd.DataFrame({'y': [0]})).mark_rule(size=0.5, strokeDash=[2,2]).encode(y='y')

# Home grid: margin as seen by the home team
home_points = alt.Chart().mark_circle().encode(
    x=month_x,
    y=alt.Y('diff:Q').title('score difference').axis(grid=False).scale(domainMid=0),
    color=alt.when(alt.datum.diff>0).then(alt.value("steelblue")).otherwise(alt.value("red")),
    tooltip=match_tooltip,
).properties(height=80, width=80)

home = alt.layer(home_points, zero_line, data=results).facet('home_team:N', columns=5)

# Away grid: the margin is negated so that positive means the away team won.
# The y title is left off here; only the home grid carries it.
away_points = alt.Chart().mark_circle().transform_calculate(
    a_diff='0-datum.diff'
).encode(
    x=month_x,
    y=alt.Y('a_diff:Q').title(None).axis(grid=False).scale(domainMid=0),
    color=alt.when(alt.datum.a_diff>0).then(alt.value("steelblue")).otherwise(alt.value("red")),
    tooltip=match_tooltip,
).properties(height=80, width=80)

away = alt.layer(away_points, zero_line, data=results).facet('away_team:N', columns=5)

(home | away).configure(font='SF Compact Rounded')

In [None]:
# Get list of teams. Sorted so that the team -> index mapping, and therefore
# the order of the "teams" coordinate in every model below, is the same on
# every run; iterating a plain set of strings can change order between
# interpreter sessions.
# NOTE(review): assumes team names are all strings (no missing values) — confirm.
teams = sorted(set(results["home_team"]).union(results["away_team"]))
n_teams = len(teams)

# Encode teams as indices
team_idx = {team: i for i, team in enumerate(teams)}
results["home_idx"] = results["home_team"].map(team_idx)
results["away_idx"] = results["away_team"].map(team_idx)

# Add binary outcome variable (True when the home side outscored the away side)
results['home_win'] = results['home_score'] > results['away_score']

results[['home_team','home_idx','home_score','away_team','away_idx','away_score','diff','home_win']].head()

In [None]:
# Win/loss model with team strengths (ELO)
with pm.Model(coords={"teams": teams}) as model:
    # Latent strength for each team, one entry per "teams" coordinate
    team_strengths = pm.ZeroSumNormal("team_strengths", sigma=3, dims="teams")
    
    # Expected log-odds of home win: home strength minus away strength
    mu = (team_strengths[results["home_idx"].values] 
          - team_strengths[results["away_idx"].values])
    
    # Observed win/loss. The log-odds are passed directly through `logit_p`
    # rather than squashing them with a sigmoid first. The variable keeps the
    # name "score_diff_obs" although it holds the home_win indicator.
    pm.Bernoulli("score_diff_obs", logit_p=mu, observed=results["home_win"])
    
    # Sampling. The acceptance-rate target is spelled `target_accept`;
    # `accept` is not a documented pm.sample keyword.
    trace = pm.sample(target_accept=0.9, draws=2000) #, nuts_sampler='nutpie')
    # Attach pointwise log-likelihood and posterior predictive draws to the trace
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

print(pm.summary(trace))
pm.plot_trace(trace)


In [None]:
# Posterior analysis: forest plot of the per-team strengths, chains combined
forest_options = {"var_names": ["team_strengths"], "combined": True, "textsize": 10}
az.plot_forest(trace, **forest_options)

### 👇 Improve!

In [None]:
# Baseline score difference model with Normal likelihood

with pm.Model(coords={"teams": teams}) as model:
    # Latent strength for each team
    team_strengths = pm.Normal("team_strengths", mu=0, sigma=30, dims="teams")
    
    # Expected score difference: home strength minus away strength
    mu = (team_strengths[results["home_idx"].values] 
          - team_strengths[results["away_idx"].values])
    
    # Normal likelihood on the observed home-minus-away margin
    sigma = pm.HalfNormal("sigma", sigma=30)
    score_diff_obs = pm.Normal("score_diff_obs", 
                                mu=mu, sigma=sigma, 
                                observed=results["diff"].values)
                
    # Sampling. The acceptance-rate target is spelled `target_accept`;
    # `accept` is not a documented pm.sample keyword.
    trace = pm.sample(target_accept=0.9, draws=2000) #, nuts_sampler='nutpie')
    # Pointwise log-likelihood is stored on the trace for the az.compare cells
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['normal_model'] = trace # Save the model trace for model comparison

az.plot_forest(trace, var_names=["team_strengths"], combined=True, textsize=10)

In [None]:
# Use SkewNormal for score difference

with pm.Model(coords={"teams": teams}) as model:
    # Latent strength for each team
    team_strengths = pm.Normal("team_strengths", mu=0, sigma=20, dims="teams")
    
    # Skewness parameter for SkewNormal (a single value shared by all games)
    alpha = pm.Normal("alpha", mu=0, sigma=1)
    
    # Expected score difference: home strength minus away strength
    mu = (team_strengths[results["home_idx"].values] 
          - team_strengths[results["away_idx"].values])
    
    # SkewNormal likelihood on the observed home-minus-away margin
    sigma = pm.HalfNormal("sigma", sigma=30)
    score_diff_obs = pm.SkewNormal("score_diff_obs", 
                                    mu=mu, sigma=sigma, alpha=alpha, 
                                    observed=results["diff"].values)
    
    # Sampling. The acceptance-rate target is spelled `target_accept`;
    # `accept` is not a documented pm.sample keyword.
    trace = pm.sample(target_accept=0.9, draws=2000) #, nuts_sampler='nutpie')
    # Pointwise log-likelihood is stored on the trace for the az.compare cells
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['skewnormal_model'] = trace  # keep the trace for model comparison

az.plot_forest(trace, var_names=["team_strengths"], combined=True, textsize=10)

In [None]:
# Add home advantage
with pm.Model(coords={"teams": teams}) as model:
    # Latent strength for each team
    team_strengths = pm.Normal("team_strengths", mu=0, sigma=20, dims="teams")
    
    # Home advantage parameter: one non-negative value per team (lower=0)
    home_advantage = pm.TruncatedNormal("home_advantage", mu=1, sigma=2, lower=0, dims="teams")

    # Skewness parameter for SkewNormal
    alpha = pm.Normal("alpha", mu=0, sigma=1)
    
    # Expected score difference: strength gap plus the home team's own advantage
    mu = (team_strengths[results["home_idx"].values] 
          - team_strengths[results["away_idx"].values] 
          + home_advantage[results['home_idx'].values])
    
    # SkewNormal likelihood on the observed home-minus-away margin
    sigma = pm.HalfNormal("sigma", sigma=30)
    score_diff_obs = pm.SkewNormal("score_diff_obs", mu=mu, sigma=sigma, alpha=alpha, observed=results["diff"].values)
    
    # Sampling. The acceptance-rate target is spelled `target_accept`;
    # `accept` is not a documented pm.sample keyword.
    trace = pm.sample(target_accept=0.9, draws=2000) #, nuts_sampler='nutpie')
    # Pointwise log-likelihood is stored on the trace for the az.compare cells
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['skewnormal_hadv_model'] = trace  # keep the trace for model comparison

az.plot_forest(trace, var_names=["team_strengths"], combined=True, textsize=10)

In [None]:
# Average of home_advantage over every posterior draw (and every team),
# followed by its per-team forest plot
mean_home_advantage = trace.posterior["home_advantage"].mean().values
print(f"Mean home court advantage: {mean_home_advantage:.2f}")
pm.plot_forest(trace, var_names='home_advantage', combined=True, textsize=10)

In [None]:
# Check in: let's compare the models
# The log-likelihood variable is named explicitly, as in the final comparison
# cell, so this cell also works when re-run after the Poisson cells have added
# traces that hold several log-likelihood variables (home_score, away_score,
# score_diff_obs) to t_dict.
c = az.compare(t_dict, var_name="score_diff_obs")
print(c)
az.plot_compare(c, textsize=10)

In [None]:
# Requires the pymc_extras package (conda install pymc_extras)

from pymc_extras.distributions import Skellam

# Poisson model: home and away scores are modelled as two separate counts
home_idx = results["home_idx"].values
away_idx = results["away_idx"].values

with pm.Model(coords={"teams": teams}) as model:
    # Per-team strength, used directly as that team's scoring rate
    team_strengths = pm.Normal("team_strengths", mu=60, sigma=30, dims="teams")

    # A single non-negative home bonus shared by all teams
    home_advantage = pm.TruncatedNormal("home_advantage", mu=1, sigma=2, lower=0)#, dims="teams")

    # Poisson rates, floored at 1 so they can never be zero or negative
    home_rate = team_strengths[home_idx] + home_advantage
    away_rate = team_strengths[away_idx]
    lambda_home = pm.math.maximum(1, home_rate)
    lambda_away = pm.math.maximum(1, away_rate)

    # Likelihood of the observed scores
    home_score = pm.Poisson("home_score", mu=lambda_home, observed=results["home_score"])
    away_score = pm.Poisson("away_score", mu=lambda_away, observed=results["away_score"])

    # Track the home-minus-away difference of the two score variables
    score_diff = pm.Deterministic("score_diff", home_score - away_score)

    trace = pm.sample()

# The Skellam term (difference of two Poisson counts) is registered only AFTER
# sampling has finished, so it was not part of the model that pm.sample saw. It
# exists to give this trace a "score_diff_obs" log-likelihood that can be
# compared with the other models.
with model:
    score_diff_obs = Skellam("score_diff_obs", lambda_home, lambda_away, observed=results["diff"].values)
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['poisson_model'] = trace

az.plot_forest(trace, var_names=["team_strengths"], combined=True, textsize=10)

In [None]:
from pymc_extras.distributions import Skellam

# Poisson model for the two scores, now with a per-team defence term
home_idx = results["home_idx"].values
away_idx = results["away_idx"].values

with pm.Model(coords={"teams": teams}) as model:
    # Per-team strength, the starting point for that team's scoring rate
    team_strengths = pm.Normal("team_strengths", mu=60, sigma=30, dims="teams")

    # Per-team defence: points subtracted from the opponent's scoring rate
    team_defence = pm.HalfNormal("team_defence", sigma=30, dims="teams")

    # A single non-negative home bonus shared by all teams
    home_advantage = pm.TruncatedNormal("home_advantage", mu=1, sigma=2, lower=0)#, dims="teams")

    # Each side's rate is its own strength minus the opponent's defence
    # (plus the home bonus for the home side), floored at 1
    home_rate = team_strengths[home_idx] + home_advantage - team_defence[away_idx]
    away_rate = team_strengths[away_idx] - team_defence[home_idx]
    lambda_home = pm.math.maximum(1, home_rate)
    lambda_away = pm.math.maximum(1, away_rate)

    # Likelihood of the observed scores
    pm.Poisson("home_score", mu=lambda_home, observed=results["home_score"])
    pm.Poisson("away_score", mu=lambda_away, observed=results["away_score"])

    trace = pm.sample() #nuts_sampler='nutpie')

# The Skellam term (difference of two Poisson counts) is registered only AFTER
# sampling has finished, so it was not part of the model that pm.sample saw. It
# exists to give this trace a "score_diff_obs" log-likelihood that can be
# compared with the other models.
with model:
    score_diff_obs = Skellam("score_diff_obs", lambda_home, lambda_away, observed=results["diff"].values)
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['poisson_defence_model'] = trace

az.plot_forest(trace, var_names=["team_strengths"], combined=True, textsize=10)

In [None]:
# Trace plot for whichever model was fitted most recently into `trace`
az.plot_trace(trace)

In [None]:
# Multiplicative model for Poisson: effects are added on the log scale and
# exponentiated, so they multiply the scoring rate
home_idx = results["home_idx"].values
away_idx = results["away_idx"].values

with pm.Model(coords={"teams": teams}) as model:
    # Per-team offence and defence, both on the log-rate scale
    team_offence = pm.Normal("team_offence", mu=np.log(60), sigma=5, dims="teams")
    team_defence = pm.Normal("team_defence", sigma=3, dims="teams")

    # A single non-negative home bonus, also on the log-rate scale
    home_advantage = pm.TruncatedNormal("home_advantage", mu=1, sigma=1, lower=0)#, dims="teams")

    # Log-rates: own offence minus the opponent's defence (plus the home bonus
    # for the home side); the defence therefore scales the other team's rate
    log_rate_home = team_offence[home_idx] + home_advantage - team_defence[away_idx]
    log_rate_away = team_offence[away_idx] - team_defence[home_idx]
    lambda_home = pm.math.exp(log_rate_home)
    lambda_away = pm.math.exp(log_rate_away)

    # Likelihood of the observed scores
    pm.Poisson("home_score", mu=lambda_home, observed=results["home_score"])
    pm.Poisson("away_score", mu=lambda_away, observed=results["away_score"])

    trace = pm.sample(nuts_sampler='nutpie')

# The Skellam term is registered only after sampling has finished; it gives
# this trace a "score_diff_obs" log-likelihood for the model comparison.
# `Skellam` is the name imported from pymc_extras.distributions in an earlier cell.
with model:
    score_diff_obs = Skellam("score_diff_obs", lambda_home, lambda_away, observed=results["diff"].values)
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['poisson_mult'] = trace

In [None]:
# Model with matchup-specific effects

home_idx = results["home_idx"].values
away_idx = results["away_idx"].values

# The opponent axis gets its own dimension name (with the same team labels)
# so the matchup matrix can be declared with two distinct dims
coords = {"teams": teams, "opp_teams": teams}
with pm.Model(coords=coords) as model:
    # Latent strength for each team
    team_strengths = pm.Normal("team_strengths", mu=0, sigma=20, dims="teams")

    # Non-negative home advantage, one value per team
    home_advantage = pm.TruncatedNormal("home_advantage", mu=1, sigma=2, lower=0, dims="teams")

    # Skewness of the SkewNormal likelihood
    alpha = pm.Normal("alpha", mu=0, sigma=7)

    # One effect per (home team, away team) pair
    matchup_effect = pm.Normal("matchup_effect", mu=0, sigma=10, dims=("teams", "opp_teams"))

    # Expected score difference: strength gap + home team's advantage + pair effect
    strength_gap = team_strengths[home_idx] - team_strengths[away_idx]
    mu = (strength_gap
          + home_advantage[home_idx]
          + matchup_effect[home_idx, away_idx])

    # Likelihood on the observed home-minus-away margin
    sigma = pm.HalfNormal("sigma", sigma=10)
    score_diff_obs = pm.SkewNormal("score_diff_obs", mu=mu, sigma=sigma, alpha=alpha, observed=results["diff"].values)

    # Alternative likelihood (disabled): mixture of two SkewNormals, the second with doubled sigma
    #w = pm.Beta("w", alpha=2, beta=2)  # Mixture weight
    #component1 = pm.SkewNormal.dist(mu=mu, sigma=sigma, alpha=alpha)
    #component2 = pm.SkewNormal.dist(mu=mu, sigma=sigma*2, alpha=alpha)  # Wider variance component
    #score_diff_obs = pm.Mixture("score_diff_obs", w=[w, 1-w], comp_dists=[component1, component2], observed=results["diff"].values)

    # Sample, then attach log-likelihood and posterior predictive draws to the trace
    trace = pm.sample(draws=2000, nuts_sampler='nutpie')
    pm.compute_log_likelihood(trace, extend_inferencedata=True)
    pm.sample_posterior_predictive(trace, extend_inferencedata=True)

t_dict['interactions_model'] = trace

pm.plot_trace(trace)

In [None]:
# Final model comparison across every trace collected in t_dict, using the
# "score_diff_obs" log-likelihood that each of them carries

comparison = az.compare(compare_dict=t_dict, var_name="score_diff_obs")
print(comparison)
az.plot_compare(comp_df=comparison, textsize=10)