<a href="https://colab.research.google.com/github/samaneh-m/Modeling-association-football-scores/blob/main/Dixon_Coles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import pandas as pd
import numpy as np
from math import factorial
from scipy.optimize import minimize
import matplotlib.pyplot as plt
import seaborn as sns


In [4]:
import pandas as pd

df = pd.read_csv("bundesliga_all_seasons_clean.csv")

df.head()


Unnamed: 0,Date,HomeTeam,AwayTeam,FTHG,FTAG,B365H,B365D,B365A,Season
0,2020-09-18,Bayern Munich,Schalke 04,8,0,1.1,11.0,21.0,2020_2021
1,2020-09-19,Dortmund,M'gladbach,3,0,1.6,4.33,5.0,2020_2021
2,2020-09-19,Ein Frankfurt,Bielefeld,1,1,1.61,4.2,5.25,2020_2021
3,2020-09-19,FC Koln,Hoffenheim,2,3,2.6,3.5,2.6,2020_2021
4,2020-09-19,Stuttgart,Freiburg,2,3,2.25,3.5,3.1,2020_2021


In [5]:
LEAGUES = {
    "bundesliga": ("bundesliga_all_seasons_clean.csv", "Bundesliga"),
}


In [6]:
def load_data():
    df = pd.read_csv("bundesliga_all_seasons_clean.csv")
    df = df.sort_values("Date").reset_index(drop=True)
    print(f"Loaded Bundesliga (all seasons): {len(df)} matches")
    return df


In [7]:
def split_params(params):
    n = (len(params) - 2) // 2
    alpha = params[:n]
    beta = params[n:2*n]
    gamma = params[2*n]
    rho = params[2*n + 1]
    return alpha, beta, gamma, rho

In [8]:
def expected_goals(i, j, params):
    alpha, beta, gamma, _ = split_params(params)
    lam = np.exp(alpha[i] - beta[j] + gamma)
    mu = np.exp(alpha[j] - beta[i])
    return lam, mu


In [9]:
def tau_correction(x, y, lam, mu, rho):
    if x == 0 and y == 0:
        return 1 - lam * mu * rho
    if x == 0 and y == 1:
        return 1 + lam * rho
    if x == 1 and y == 0:
        return 1 + mu * rho
    if x == 1 and y == 1:
        return 1 - rho
    return 1


In [10]:
def dc_log_likelihood_single(x, y, i, j, params):
    lam, mu = expected_goals(i, j, params)
    _, _, _, rho = split_params(params)

    tau = tau_correction(x, y, lam, mu, rho)
    if tau <= 0:
        return -1e15  # big penalty (or return -np.inf)

    log_px = x * np.log(lam) - lam - np.log(factorial(x))
    log_py = y * np.log(mu)  - mu - np.log(factorial(y))

    return log_px + log_py + np.log(tau)


In [11]:
def dc_log_likelihood_full(params, data, team_index, xi):
    """
    TOTAL NEGATIVE weighted log-likelihood (Dixon–Coles with time decay).
    """
    n_matches = len(data)

    ll_sum = 0.0
    for k, (_, row) in enumerate(data.iterrows()):
        i = team_index[row["HomeTeam"]]
        j = team_index[row["AwayTeam"]]
        t_diff = n_matches - k - 1
        weight = np.exp(-xi * t_diff)

        ll = dc_log_likelihood_single(
            int(row["FTHG"]),
            int(row["FTAG"]),
            i,
            j,
            params
        )
        ll_sum += weight * ll

    return -ll_sum


In [12]:

def fit_dixon_coles_model(data, team_index, xi):
    """
    Fit Dixon & Coles model WITH time-decay weighting.
    """
    n_teams = len(team_index)
    initial_params = np.zeros(2 * n_teams + 2)
    n_params = 2 * n_teams + 2

    bounds = [(None, None)] * n_params
    bounds[-1] = (-0.5, 0.5)  # rho bound

    result = minimize(
        dc_log_likelihood_full,
        initial_params,
        args=(data, team_index, xi),
        method="L-BFGS-B",
        bounds=bounds
    )
    if not result.success:
        raise RuntimeError(
            f"Optimization failed: {result.message}"
    )
    params = result.x.copy()
    alpha, beta, gamma, rho = split_params(params)

    m = np.mean(alpha)
    alpha -= m
    beta += m

    params[:n_teams] = alpha
    params[n_teams:2*n_teams] = beta
    result.x = params
    return result



In [13]:
def match_score_prob_matrix(i, j, params, max_goals=10):
    alpha, beta, gamma, rho = split_params(params)
    lam = np.exp(alpha[i] - beta[j] + gamma)
    mu = np.exp(alpha[j] - beta[i])

    P = np.zeros((max_goals, max_goals))
    for x in range(max_goals):
        for y in range(max_goals):
            px = lam**x * np.exp(-lam) / factorial(x)
            py = mu**y * np.exp(-mu) / factorial(y)
            P[x, y] = px * py * tau_correction(x, y, lam, mu, rho)
    return P


In [14]:
def predict_match(i, j, params):
    P = match_score_prob_matrix(i, j, params)
    ph = np.sum(P[np.triu_indices_from(P, 1)])
    pd = np.trace(P)
    pa = np.sum(P[np.tril_indices_from(P, -1)])
    s = ph + pd + pa
    return {"H": ph/s, "D": pd/s, "A": pa/s}


In [15]:
def bookmaker_probabilities(row):
    """
    Convert Bet365 odds to implied probabilities (no margin removal).
    """
    pH = 1.0 / row["B365H"]
    pD = 1.0 / row["B365D"]
    pA = 1.0 / row["B365A"]

    s = pH + pD + pA
    return {"H": pH / s, "D": pD / s, "A": pA / s}


In [16]:
def realised_return(actual, odds):
    """
    Realised return for a 1-unit bet.
    """
    if actual:
        return odds - 1.0
    else:
        return -1.0


In [17]:
def table_outcome_probabilities_2025(test_data, team_index, params, n_matches=10):
    """
    Create a Dixon–Coles Table 6 style output:
    match outcome probabilities for selected matches.
    """
    rows = []

    for _, r in test_data.head(n_matches).iterrows():
        probs = predict_match(
            team_index[r["HomeTeam"]],
            team_index[r["AwayTeam"]],
            params
        )

        rows.append({
            "Match": f'{r["HomeTeam"]} vs {r["AwayTeam"]}',
            "Home win": probs["H"],
            "Draw": probs["D"],
            "Away win": probs["A"]
        })

    return pd.DataFrame(rows).round(3)


In [18]:
def outcome_score_S(xi, data, team_index):
    params = fit_dixon_coles_model(data, team_index, xi).x
    S = 0.0

    for _, row in data.iterrows():
        probs = predict_match(
            team_index[row["HomeTeam"]],
            team_index[row["AwayTeam"]],
            params
        )
        if row["FTHG"] > row["FTAG"]:
            S += np.log(probs["H"] + 1e-12)
        elif row["FTHG"] == row["FTAG"]:
            S += np.log(probs["D"] + 1e-12)
        else:
            S += np.log(probs["A"] + 1e-12)
    return S


In [19]:
def choose_xi(data, team_index):
    grid = np.linspace(0.0, 0.01, 21)
    scores = [outcome_score_S(xi, data, team_index) for xi in grid]
    for xi in grid:
        print(f"Evaluating xi = {xi:.5f}")
        scores.append(outcome_score_S(xi, data, team_index))

    return grid[np.argmax(scores)]


In [20]:
def fit_league(data, label):
    teams = sorted(data["HomeTeam"].unique())
    team_index = {t: i for i, t in enumerate(teams)}

    xi = choose_xi(data, team_index)
    print(f"{label}: chosen ξ = {xi:.5f}")

    res = fit_dixon_coles_model(data, team_index, xi)
    alpha, beta, gamma, rho = split_params(res.x)

    return {
        "league": label,
        "n_matches": len(data),
        "gamma": gamma,
        "gamma_exp": np.exp(gamma),
        "rho": rho,
        "xi": xi
    }, res.x, team_index


In [21]:
def actual_label(x, y):
    return "H" if x > y else "D" if x == y else "A"


In [22]:
def evaluate_model_on_test(data, team_index, params):
    brier = 0.0
    logloss = 0.0

    for _, r in data.iterrows():
        probs = predict_match(
            team_index[r["HomeTeam"]],
            team_index[r["AwayTeam"]],
            params
        )

        act = actual_label(int(r["FTHG"]), int(r["FTAG"]))

        for k in probs:
            brier += (probs[k] - (1 if act == k else 0)) ** 2

        logloss += -np.log(probs[act] + 1e-12)

    n = len(data)
    return brier / n, logloss / n



In [23]:
def evaluate_league(data, label):
    teams = sorted(data["HomeTeam"].unique())
    team_index = {t: i for i, t in enumerate(teams)}

    train = data[data["Season"] != "2025_2026"]
    test  = data[data["Season"] == "2025_2026"]


    # --------------------------------------------------
    # CHOOSE ξ ON TRAINING DATA (paper-consistent)
    # --------------------------------------------------
    xi = choose_xi(train, team_index)

    # --------------------------------------------------
    # FIT MODEL USING TRAINING DATA + ξ
    # --------------------------------------------------
    params = fit_dixon_coles_model(train, team_index, xi).x

    acc = 0
    brier = 0.0
    logloss = 0.0

    for _, r in test.iterrows():
        probs = predict_match(
            team_index[r["HomeTeam"]],
            team_index[r["AwayTeam"]],
            params
        )

        act = actual_label(int(r["FTHG"]), int(r["FTAG"]))
        pred = max(probs, key=probs.get)
        acc += int(pred == act)

        for k in probs:
            brier += (probs[k] - (1 if act == k else 0))**2

        logloss += -np.log(probs[act] + 1e-12)

    n = len(test)
    return {
        "league": label,
        "n_test": n,
        "accuracy": acc / n,
        "brier": brier / n,
        "logloss": logloss / n
    }



In [24]:
def fit_poisson_model(data, team_index, xi):
    res = fit_dixon_coles_model(data, team_index, xi)
    params = res.x.copy()
    params[-1] = 0.0  # force rho = 0
    return params


In [25]:
def evaluate_bundesliga(data):
    teams = sorted(data["HomeTeam"].unique())
    team_index = {t: i for i, t in enumerate(teams)}

    train = data[data["Season"] != "2025_2026"]
    test  = data[data["Season"] == "2025_2026"]

    xi = choose_xi(train, team_index)
    params = fit_dixon_coles_model(train, team_index, xi).x

    brier, logloss = evaluate_model_on_test(test, team_index, params)

    return {
        "league": "Bundesliga",
        "n_train": len(train),
        "n_test": len(test),
        "xi": xi,
        "brier": brier,
        "logloss": logloss
    }


In [26]:
def poisson_vs_dixon_coles_table(data, league_name):
    teams = sorted(data["HomeTeam"].unique())
    team_index = {t: i for i, t in enumerate(teams)}

    split = int(0.8 * len(data))
    train = data.iloc[:split]
    test  = data.iloc[split:]

    xi = choose_xi(train, team_index)

    dc_params = fit_dixon_coles_model(train, team_index, xi).x
    b_dc, l_dc = evaluate_model_on_test(test, team_index, dc_params)

    pois_params = fit_poisson_model(train, team_index, xi)
    b_p, l_p = evaluate_model_on_test(test, team_index, pois_params)

    table = pd.DataFrame({
        "Model": ["Poisson (ρ = 0)", "Dixon–Coles"],
        "Brier score": [b_p, b_dc],
        "Log loss": [l_p, l_dc]
    })

    return table.round(4)


In [27]:
def bookmaker_probs_and_margin(row):
    inv = np.array([
        1 / row["B365H"],
        1 / row["B365D"],
        1 / row["B365A"]
    ])
    margin = inv.sum() - 1
    probs = inv / inv.sum()
    return probs, margin


In [28]:
def diagnostic_binned_returns(df, bins=np.linspace(0.5, 1.5, 11)):
    """
    Bin probability ratios and compute observed mean return.
    """
    df = df.copy()
    df["bin"] = pd.cut(df["ratio"], bins=bins)

    grouped = df.groupby("bin", observed=True)["return"].mean().reset_index()

    return grouped


In [29]:
def diagnostic_return_data(test_data, team_index, params):
    """
    Construct data for Dixon–Coles diagnostic plot.
    """
    records = []

    for _, r in test_data.iterrows():
        model_probs = predict_match(
            team_index[r["HomeTeam"]],
            team_index[r["AwayTeam"]],
            params
        )

        book_probs = bookmaker_probabilities(r)

        actual = actual_label(int(r["FTHG"]), int(r["FTAG"]))

        odds = {
            "H": r["B365H"],
            "D": r["B365D"],
            "A": r["B365A"]
        }

        for outcome in ["H", "D", "A"]:
            ratio = model_probs[outcome] / book_probs[outcome]
            ret = realised_return(actual == outcome, odds[outcome])

            records.append({
                "ratio": ratio,
                "return": ret
            })

    return pd.DataFrame(records)


In [30]:
def plot_diagnostic_return_curve(binned_df):
    plt.figure(figsize=(8, 5))

    x = binned_df["bin"].astype(str)
    y = binned_df["return"]

    plt.plot(x, y, marker="o")
    plt.axhline(0, color="black", linestyle="--", linewidth=1)

    plt.xticks(rotation=45)
    plt.xlabel("Ratio: model probability / bookmaker probability")
    plt.ylabel("Observed mean return")
    plt.title("Diagnostic plot (Dixon–Coles style)")

    plt.tight_layout()
    plt.show()


In [31]:
def attack_defence_table(params, team_index, n_teams=20):
    alpha, beta, _, _ = split_params(params)

    # Invert team_index to get index -> team name
    index_team = {i: t for t, i in team_index.items()}

    rows = []
    for i in range(len(alpha)):
        rows.append({
            "Team": index_team[i],
            "Attack (alpha)": alpha[i],
            "Defence (beta)": beta[i]
        })

    df = pd.DataFrame(rows)

    # Sort by attack strength (descending)
    df = df.sort_values("Attack (alpha)", ascending=False)

    # Keep top n teams (e.g. 20)
    df = df.head(n_teams)

    return df.round(3)



In [None]:
if __name__ == "__main__":

    # --------------------------------------------------
    # 1. Load Bundesliga multi-season data
    # --------------------------------------------------
    df = pd.read_csv("bundesliga_all_seasons_clean.csv")
    df = df.sort_values("Date").reset_index(drop=True)

    # --------------------------------------------------
    # 2. Team index
    # --------------------------------------------------
    teams = sorted(df["HomeTeam"].unique())
    team_index = {t: i for i, t in enumerate(teams)}

    # --------------------------------------------------
    # 3. Temporal split (paper-faithful)
    # --------------------------------------------------
    train = df[df["Season"] != "2025_2026"]
    test  = df[df["Season"] == "2025_2026"]

    print(f"Training matches: {len(train)}")
    print(f"Test matches (2025/26): {len(test)}")

    # --------------------------------------------------
    # 4. Choose xi (Eq. 4.7) on training data
    # --------------------------------------------------
    xi = choose_xi(train, team_index)
    print(f"Chosen xi: {xi:.5f}")

    # --------------------------------------------------
    # 5. Fit Dixon–Coles model
    # --------------------------------------------------
    params = fit_dixon_coles_model(train, team_index, xi).x

    # --------------------------------------------------
    # Table: Attack and defence parameters (top teams)
    # --------------------------------------------------
    ad_table = attack_defence_table(
        params=params,
        team_index=team_index,
        n_teams=20
    )

    print("\nTable: Attack and defence parameters (Bundesliga)")
    print(ad_table.to_string(index=False))

    # --------------------------------------------------
    # 6. Evaluate on future season
    # --------------------------------------------------
    brier, logloss = evaluate_model_on_test(test, team_index, params)

    # --------------------------------------------------
    # Diagnostic plot
    # --------------------------------------------------
    diag_df = diagnostic_return_data(test, team_index, params)
    binned = diagnostic_binned_returns(diag_df)

    plot_diagnostic_return_curve(binned)

    print("\nEvaluation on 2025/2026 season:")
    print(f"Brier score: {brier:.4f}")
    print(f"Log loss:    {logloss:.4f}")

    # --------------------------------------------------
    # 7. Poisson vs Dixon–Coles (optional but good)
    # --------------------------------------------------
    table = poisson_vs_dixon_coles_table(df, "Bundesliga")
    print("\nPoisson vs Dixon–Coles comparison:")
    print(table.to_string(index=False))

    table6 = table_outcome_probabilities_2025(
        test_data=test,        # 2025/26 season only
        team_index=team_index,
        params=params,
        n_matches=10
    )

    print("\nTable: Outcome probabilities (2025/26 season)")
    print(table6.to_string(index=False))



Training matches: 1530
Test matches (2025/26): 126
