In [None]:
import numpy as np
import sqlite3
import pandas as pd
import joblib
from pandas import DataFrame

from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import LabelEncoder


import os
import sys
# There is probably a cleaner way to do this: make the parent directory importable so `settings` can be found
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))


import settings


In [None]:
# Load every row of the Matches table into a DataFrame.
# sqlite3's connection context manager only commits/rolls back the transaction;
# it does not close the connection, so close it explicitly.
conn = sqlite3.connect(settings.DATABASE_PATH)
try:
    df = pd.read_sql("SELECT * FROM Matches", conn)
finally:
    conn.close()

In [None]:
# Keep only the rows that have a score; the remaining steps parse it.
df = df.dropna(subset=["score"]).copy()

# The score is stored as "<home goals>:<away goals>"; split it once and reuse the parts.
score_parts = df["score"].str.split(":")
df["home_score"] = score_parts.str[0].astype(int)
df["away_score"] = score_parts.str[1].astype(int)

# One 0/1 indicator column per possible outcome.
df["home_win"] = df["home_score"].gt(df["away_score"]).astype(int)
df["away_win"] = df["home_score"].lt(df["away_score"]).astype(int)
df["tie"] = df["home_score"].eq(df["away_score"]).astype(int)

# Absolute goal margin of the match.
df["difference_score"] = (df["home_score"] - df["away_score"]).abs()


In [None]:
def adjust_date(row):
    """
    Expand the two-digit year at the end of a match date into a full year taken from the season.

    The season is split on "-" into a start year and an end year. If the date's
    two-digit year equals the last two digits of the start year, the start year
    is used; otherwise the end year is used.

    :param row: A row from the DataFrame containing "date" (a "/"-separated string
        ending in a two-digit year) and "season" ("<start year>-<end year>").
    :return: The date string with its last two characters replaced by the chosen
        year (day/month order is left untouched), or None when the date is missing.
    """
    date = row["date"]
    # A missing date cannot be split; return None so the row can be dropped afterwards.
    if pd.isna(date):
        return None

    season_parts = row["season"].split("-")
    start_year = int(season_parts[0])
    end_year = int(season_parts[1])
    date_year = int(date.split("/")[-1])

    chosen_year = start_year if date_year == start_year % 100 else end_year
    return date[:-2] + str(chosen_year)


# Drop matches without a date BEFORE expanding the year: adjust_date works on the
# date string, so a missing value has to be removed first rather than afterwards.
df = df.dropna(subset=["date"]).copy()
df["date"] = df.apply(adjust_date, axis=1)
# The expanded strings are parsed as month/day/four-digit-year.
df["date"] = pd.to_datetime(df["date"], format="%m/%d/%Y")

In [None]:
# Rows without a score have no outcome to encode.
df = df.dropna(subset=["score"])

# Keep only the first part of the season string (before the "-") as an integer.
season_start = df["season"].str.split("-").str[0]
df["season"] = season_start.astype(int)

# Outcome indicator column -> numeric code, from the home team's point of view:
# 1 = home win, 0 = tie, -1 = away win.
outcome_codes = {"home_win": 1, "tie": 0, "away_win": -1}
conditions = [df[flag] == 1 for flag in outcome_codes]
choices = list(outcome_codes.values())

# Rows matching none of the indicators get None.
df["result"] = np.select(conditions, choices, default=None)


In [None]:
def calculate_team_results(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the standings of every division and season from a table of match results.

    Home and away totals are aggregated separately and then summed per team.
    A win is worth 3 points and a tie 1 point. Rows are sorted by division
    (ascending) and then by season, points, GD and GF (all descending). "rank"
    numbers the teams inside each season/division by points; teams level on
    points keep the order of the sorted frame (GD, then GF).

    :param df: DataFrame containing match data for all teams.
    :return: DataFrame with one row per team, season and division.
    """
    keys = ["division", "season"]

    def _side_totals(team_col, scored_col, conceded_col, won_col, lost_col):
        # Totals of every team when playing on one side (home or away).
        totals = df.groupby(keys + [team_col]).agg(
            GF=(scored_col, "sum"),
            GA=(conceded_col, "sum"),
            W=(won_col, "sum"),
            L=(lost_col, "sum"),
            T=("tie", "sum"),
        )
        return totals.reset_index().rename(columns={team_col: "team"})

    home_totals = _side_totals("home_team", "home_score", "away_score", "home_win", "away_win")
    away_totals = _side_totals("away_team", "away_score", "home_score", "away_win", "home_win")

    # Add up the home and away halves of each team's season.
    standings = (
        pd.concat([home_totals, away_totals])
        .groupby(keys + ["team"])
        .sum()
        .reset_index()
    )

    standings["GD"] = standings["GF"] - standings["GA"]
    standings["points"] = standings["W"] * 3 + standings["T"]

    sort_columns = ["division", "season", "points", "GD", "GF"]
    standings = standings.sort_values(
        by=sort_columns,
        ascending=[True, False, False, False, False],
    ).reset_index(drop=True)

    # method="first" breaks point ties by row order, i.e. by the GD/GF sort above.
    standings["rank"] = (
        standings.groupby(["season", "division"])["points"]
        .rank(method="first", ascending=False)
        .astype(int)
    )

    output_columns = [
        "season", "division", "rank", "team",
        "GF", "GA", "GD", "W", "L", "T", "points",
    ]
    return standings[output_columns]

In [None]:
def get_last_5_results(df, team, season):
    """
    Return the outcomes of a team's five most recent matches in a season.

    Each match is labelled from the team's point of view: "W" when the team won,
    "L" when the opponent won and "T" otherwise (a tie). The input DataFrame is
    not modified.

    :param df: DataFrame with "home_team", "away_team", "season", "home_win",
        "away_win" and "date" columns.
    :param team: Name of the team.
    :param season: Season to look at, compared against the "season" column.
    :return: String representation of the list of labels, most recent match first
        (at most five entries), e.g. "['W', 'T', 'L']".
    """
    involved = (df["home_team"] == team) | (df["away_team"] == team)
    games = (
        df.loc[involved & (df["season"] == season)]
        .sort_values("date", ascending=False)
        .head(5)
    )

    at_home = games["home_team"] == team
    away = games["away_team"] == team
    won = (at_home & (games["home_win"] == 1)) | (away & (games["away_win"] == 1))
    # A loss means the OPPONENT won; testing "own win flag == 0" would also catch ties.
    lost = (at_home & (games["away_win"] == 1)) | (away & (games["home_win"] == 1))

    last_5_results = np.where(won, "W", np.where(lost, "L", "T")).tolist()
    return f"{last_5_results}"


# Build the standings here: no earlier cell defines df_results at notebook level
# (it only exists as a local inside calculate_team_results), so a fresh
# Restart & Run All would otherwise stop with a NameError.
df_results = calculate_team_results(df)

# Add a "last5" column with each team's most recent results of the season.
df_results_extended = df_results.copy()
df_results_extended["last5"] = df_results_extended.apply(
    lambda row: get_last_5_results(df, row["team"], row["season"]), axis=1
)
df_results_extended

In [None]:
def team_confrontations(df, team1, team2):
    """
    Select the matches in which both teams take part, whichever side each one plays on.

    :param df: DataFrame with "home_team" and "away_team" columns.
    :param team1: Name of the first team.
    :param team2: Name of the second team.
    :return: The rows of df where team1 and team2 both appear.
    """

    def _plays(team):
        # True for the matches where the team is either the home or the away side.
        return (df["home_team"] == team) | (df["away_team"] == team)

    return df.loc[_plays(team1) & _plays(team2)]

def won_games(df, team: str):
    """
    Return the matches won by the given team, at home or away.

    :param df: DataFrame with "home_team", "away_team", "home_win" and "away_win" columns.
    :param team: Name of the team.
    :return: The rows of df where the team is the home side and home_win is 1,
        or the away side and away_win is 1.
    """
    won_at_home = df["home_team"].eq(team) & df["home_win"].eq(1)
    won_away = df["away_team"].eq(team) & df["away_win"].eq(1)
    return df.loc[won_at_home | won_away]


def lost_games(df, team: str):
    """
    Return the matches lost by the given team, at home or away.

    A match counts as lost when the team did not win it and it is not flagged as a tie.

    :param df: DataFrame with "home_team", "away_team", "home_win", "away_win" and "tie" columns.
    :param team: Name of the team.
    :return: The rows of df lost by the team.
    """
    at_home = df["home_team"] == team
    away = df["away_team"] == team

    # "Not won" still includes ties, which are removed below.
    not_won = (at_home & (df["home_win"] == 0)) | (away & (df["away_win"] == 0))
    tied = (at_home | away) & (df["tie"] == 1)

    return df[not_won & ~tied]


In [None]:
# def difference_points(df_calculate, df_results, season):
#     """
#     This function calculates the difference in points between two teams.
#     """
#     return df_calculate.apply(
#         lambda row: df_results.loc[
#             (df_results["team"] == row["home_team"]) & (df_results["season"] == season), "points"
#         ].values[0]
#         - df_results.loc[
#             (df_results["team"] == row["away_team"]) & (df_results["season"] == season), "points"
#         ].values[0],
#         axis=1,
#     )

# def inform_relatives_points(df, df_calculate, season):
#     df_season = df.loc[df["season"] == season].copy()
#     df_results = calculate_team_results(df_season)
#     df_calculate["points_relative"] = difference_points(df_calculate, df_results, season)
#     df_calculate["points_relative_index"] = df_calculate["points_relative"] * (df_calculate["matchday"]**2 / (df_calculate["matchday"]**2 + 38) ) # As the season goes on, the importance of the relative points increases
#     return df_calculate

In [None]:
# def win_lost_index(df, season, team1, team2):
#     """
#     This function returns a victory and lose punctuation between 2 teams.
#     It is given by 1 / ((currentseason - season) + 1)^2
#     This function has a maximum in currentseason and it deacreses as season increase
#     Giving more importance to recent matches.

#     It is symmetric for both teams, so win_team1 = lost_team2.
#     """
#     df_conf = team_confrontations(df, team1, team2)
    
    
#     df_won = won_games(df_conf, team1).copy()
#     df_won.loc[:, "win_punct"] = 1 / ((season - df_won["season"].astype(int))**2 + 1)
#     win_punct = df_won["win_punct"].sum()

#     df_lost = lost_games(df_conf, team1).copy()
#     df_lost.loc[:, "lost_punct"] = 1 / ((season - df_lost["season"].astype(int))**2 + 1)
#     lost_punct = df_lost["lost_punct"].sum()

#     return win_punct, lost_punct


# def inform_win_lost_index(df, df_calculate, season):
#     df_necesary = df.loc[df["season"] >= (season - 20)].copy()
#     df_calculate[["win_punct", "lost_punct"]] = df_calculate.apply(
#         lambda row: win_lost_index(df_necesary,season, row["home_team"], row["away_team"]),
#         axis=1, result_type="expand"
#     )
#     return df_calculate


In [None]:
import pandas as pd

def win_lost_index(row, df_conf_dict):
    """
    Compute recency-weighted win and loss scores of the home team against the away team.

    Every head-to-head match contributes a weight of
    1 / ((row season - match season) ** 2 + 1), so matches from the row's own
    season weigh 1 and older ones progressively less. The win score sums the
    weights of the matches the home team won, the loss score those it lost.

    When both the row and the confrontation table carry a "date", only matches
    played strictly before the row's date are counted. Otherwise the match being
    scored (and any later one) would leak its own outcome into the features.

    :param row: A row containing "season", "home_team" and "away_team" (and optionally "date").
    :param df_conf_dict: Dictionary mapping (home_team, away_team) to the DataFrame
        of matches between the two teams.
    :return: A tuple (win_punct, lost_punct); (0, 0) when there is no usable history.
    """
    season = row["season"]
    team1 = row["home_team"]
    team2 = row["away_team"]

    df_conf = df_conf_dict.get((team1, team2))
    # No table for this pair, or an empty one: nothing to score. An empty
    # DataFrame without columns cannot be passed to the helper filters.
    if df_conf is None or df_conf.empty:
        return 0, 0

    match_date = row.get("date")
    if "date" in df_conf.columns and pd.notna(match_date):
        df_conf = df_conf[df_conf["date"] < match_date]

    def _weighted_total(games):
        # Sum of the recency weights of the given matches (0 when there are none).
        if games.empty:
            return 0
        return (1 / ((season - games["season"].astype(int)) ** 2 + 1)).sum()

    win_punct = _weighted_total(won_games(df_conf, team1))
    lost_punct = _weighted_total(lost_games(df_conf, team1))

    return win_punct, lost_punct

def inform_win_lost_index(df, df_calculate):
    """
    Add the "win_punct" and "lost_punct" columns to df_calculate.

    For every distinct (home_team, away_team) pair in df_calculate, the matches
    between the two teams are looked up once among the matches of df whose season
    is at most 20 seasons older than the latest season in df_calculate. Each row
    is then scored with win_lost_index. df_calculate is modified in place and
    also returned.

    :param df: DataFrame containing match data for all teams.
    :param df_calculate: DataFrame containing the matches to calculate the win and loss punctuation.
    :return: df_calculate with the two new columns.
    """
    # With no rows, DataFrame.apply(..., result_type="expand") hands back the frame
    # itself instead of two columns and the assignment below fails; add empty columns.
    if df_calculate.empty:
        df_calculate["win_punct"] = 0.0
        df_calculate["lost_punct"] = 0.0
        return df_calculate

    max_season = df_calculate["season"].max()
    df_recent = df[df["season"] >= (max_season - 20)].copy()

    # One lookup table per ordered pair, shared by every row with that pairing.
    teams = df_calculate[["home_team", "away_team"]].drop_duplicates()
    df_conf_dict = {
        (team1, team2): team_confrontations(df_recent, team1, team2)
        for team1, team2 in zip(teams["home_team"], teams["away_team"])
    }

    df_calculate[["win_punct", "lost_punct"]] = df_calculate.apply(
        lambda row: win_lost_index(row, df_conf_dict),
        axis=1, result_type="expand"
    )

    return df_calculate


In [None]:
def difference_points(row, df_results_dict):
    """
    Return how many more points the home team has than the away team in the row's season.

    :param row: A row from the DataFrame containing "home_team", "away_team", and "season".
    :param df_results_dict: Dictionary mapping each season to its results table
        (with "team" and "points" columns).
    :return: Home team points minus away team points.
    """
    standings = df_results_dict[row["season"]]

    def _points_of(team):
        # First "points" value recorded for the team in this season's table.
        return standings.loc[standings["team"] == team, "points"].values[0]

    return _points_of(row["home_team"]) - _points_of(row["away_team"])

def inform_relatives_points(df, df_calculate):
    """
    Add the "points_relative" and "points_relative_index" columns to df_calculate.

    "points_relative" is the home team's points minus the away team's points in
    the standings of the row's season. "points_relative_index" scales it by
    matchday**2 / (matchday**2 + 38), so the difference weighs more as the
    season advances. df_calculate is modified in place and also returned.

    NOTE(review): the standings are built from every match of the season present
    in df, including matches played after the row's own match, so this feature
    can carry information from the future.

    :param df: DataFrame with match data.
    :param df_calculate: DataFrame with the matches to compute the features for.
    :return: df_calculate with the two new columns.
    """
    # With no rows, DataFrame.apply returns the frame itself and the column
    # assignment below fails; add empty columns instead.
    if df_calculate.empty:
        df_calculate["points_relative"] = 0.0
        df_calculate["points_relative_index"] = 0.0
        return df_calculate

    # Only the seasons present in df_calculate are ever looked up, so skip the rest.
    needed_seasons = set(df_calculate["season"].unique())
    df_results_dict = {
        season: calculate_team_results(df[df["season"] == season])
        for season in df["season"].unique()
        if season in needed_seasons
    }

    df_calculate["points_relative"] = df_calculate.apply(
        lambda row: difference_points(row, df_results_dict), axis=1
    )

    # 38 is presumably the number of matchdays in a season — TODO confirm.
    matchday_sq = df_calculate["matchday"] ** 2
    df_calculate["points_relative_index"] = (
        df_calculate["points_relative"] * (matchday_sq / (matchday_sq + 38))
    )

    return df_calculate



## Preprocessing

In [None]:
# Training window: the `depth` seasons ending with `train_season` (inclusive).
train_season = 2020
depth = 10  # Number of seasons to consider for training

in_training_window = (df["season"] > (train_season - depth)) & (df["season"] <= train_season)
df_train = df.loc[in_training_window].copy()

# Add the model features: relative points first, then the head-to-head scores.
df_train = inform_win_lost_index(df, inform_relatives_points(df, df_train))

## Train

In [None]:
features = ["win_punct", "lost_punct", "points_relative_index"]
target = "result"

x_train = df_train[features]
y_train = df_train[target]

# Encode the target labels as consecutive integers; `le` is kept to map
# predictions back to the original labels later.
le = LabelEncoder()
y_train = le.fit_transform(y_train)

# Hold out 10% of the matches for validation.
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=42)

# Fix the seed so that re-running the notebook reproduces the same model.
clf = GradientBoostingClassifier(random_state=42)
clf.fit(x_train, y_train)

y_val_pred = clf.predict(x_val)
validation_accuracy = (y_val_pred == y_val).mean()
print(f"Validation Accuracy: {validation_accuracy:.2%}")

# Save the model (uncomment to persist the classifier trained above)
# joblib.dump(clf, "../models/model.pkl")


## Example of usage

In [None]:
model_path = "../models/model.pkl"

# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code;
# only load model files from a trusted source.
# NOTE(review): a loaded model replaces the classifier trained above, while the
# label encoder `le` still comes from this session — confirm they match.
if os.path.exists(model_path):
    clf = joblib.load(model_path)
else:
    print(
        f"Model file not found at {model_path}; keeping the classifier trained in the previous cell "
        "(uncomment the joblib.dump line there to save it)"
    )

In [None]:
# Matches to predict: a single matchday of a single season.
season_to_predict = 2021
matchday_to_predict = 3

is_target_matchday = (df["season"] == season_to_predict) & (df["matchday"] == matchday_to_predict)
df_predict = df.loc[is_target_matchday].copy()

# Same feature pipeline as for training: relative points, then head-to-head scores.
df_predict = inform_win_lost_index(df, inform_relatives_points(df, df_predict))
x_predict = df_predict[features]

# Predict and map the encoded classes back to the original result labels.
y_predict = le.inverse_transform(clf.predict(x_predict))
df_predict["prediction"] = y_predict
df_predict["correct"] = df_predict["result"] == df_predict["prediction"]

# Percentage of matches predicted correctly.
print(df_predict["correct"].sum() / len(df_predict) * 100)

df_predict

In [None]:
df_predict
# TODO: count how many predictions match the actual result (prediction == result)
