In [37]:
from sklearn.metrics import root_mean_squared_error, accuracy_score, precision_recall_fscore_support,mean_squared_error
from sklearn.preprocessing import MinMaxScaler, LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, chi2, RFE, mutual_info_regression
from tensorflow.keras.layers import Dense, LSTM, Dropout
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras import regularizers
from sklearn.svm import SVC, LinearSVC
from keras.utils import to_categorical
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import tensorflow as tf
import seaborn as sns
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import GridSearchCV
import networkx as nx
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error, log_loss
from sklearn.linear_model import LogisticRegression
from scipy.stats import norm
import itertools

In [54]:
data_dir = "football-prediction/epl-training.csv"
df = pd.read_csv(data_dir)

In [55]:
def compute_difference_using_home_team_as_reference():
        features_to_differentiate = [
            ('Full Time Home Goals', 'Full Time Away Goals', 'Full Time Goal Difference'),
            ('Half Time Home Goals', 'Half Time Away Goals', 'Half Time Goal Difference'),
            ('Home Shots', 'Away Shots', 'Shot Difference'),
            ('Home Shots on Target', 'Away Shots on Target', 'Shots on Target Difference'),
            ('Home Corners', 'Away Corners', 'Corner Difference'),
            ('Home Fouls', 'Away Fouls', 'Foul Difference'),
            ('Home Yellow Cards', 'Away Yellow Cards', 'Yellow Card Difference'),
            ('Home Red Cards', 'Away Red Cards', 'Red Card Difference')
        ]
        for home_feature, away_feature, new_feature in features_to_differentiate:
            df[new_feature] = df[home_feature] - df[away_feature]
    
def get_season(date):
    if date.month >= 8:  
        return (date.year)
    else:  
        return (date.year - 1)

In [56]:
df.columns = [
            'Date',               
            'Home Team',          
            'Away Team',
            'Full Time Home Goals',  
            'Full Time Away Goals',  
            'Full Time Result',       
            'Half Time Home Goals',  
            'Half Time Away Goals',  
            'Half Time Result',      
            'Referee',
            'Home Shots',             
            'Away Shots',             
            'Home Shots on Target',   
            'Away Shots on Target',   
            'Home Corners',          
            'Away Corners',           
            'Home Fouls',             
            'Away Fouls',             
            'Home Yellow Cards',     
            'Away Yellow Cards',     
            'Home Red Cards',     
            'Away Red Cards'      
        ]
        
df['Home Goal Conversion Rate'] = df['Full Time Home Goals'] / df['Home Shots on Target'].replace(0, 1)
df['Away Goal Conversion Rate'] = df['Full Time Away Goals'] / df['Away Shots on Target'].replace(0, 1)
df['Home Attacking Intensity'] = 2 * df["Home Shots on Target"] + 1 * (df['Home Shots'] - df["Home Shots on Target"]) + 0.5 * df['Home Corners']
df['Away Attacking Intensity'] = 2 * df["Away Shots on Target"] + 1 * (df['Away Shots'] - df["Away Shots on Target"]) + 0.5 * df['Away Corners']
df['Attacking Intensity Difference'] = df['Home Attacking Intensity'] - df['Away Attacking Intensity']
df['Home Disciplinary Pressure'] = df['Home Fouls'] + df['Home Yellow Cards'] + df['Home Red Cards']
df['Away Disciplinary Pressure'] = df['Away Fouls'] + df['Away Yellow Cards'] + df['Away Red Cards']
df['Disciplinary Pressure Difference'] = df['Home Disciplinary Pressure'] - df['Away Disciplinary Pressure']
df['xG Home'] = df['Home Goal Conversion Rate'] * df['Home Attacking Intensity']
df['xG Away'] = df['Away Goal Conversion Rate'] * df['Away Attacking Intensity']
df['Match Outcome'] = df['Full Time Result'].map({'H': 1, 'D': 0, 'A': 2})

df['Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y')
df = df.sort_values(by='Date')
df['Season'] = df['Date'].apply(get_season)

In [57]:
df['Home Team Goals For'] = df.groupby('Home Team')['Full Time Home Goals'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())
df['Home Team Goals Against'] = df.groupby('Home Team')['Full Time Away Goals'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())
df['Away Team Goals For'] = df.groupby('Away Team')['Full Time Away Goals'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())
df['Away Team Goals Against'] = df.groupby('Away Team')['Full Time Home Goals'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())
df['Avg Home Shots on Target'] = df.groupby('Home Team')['Home Shots on Target'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())
df['Avg Away Shots on Target'] = df.groupby('Away Team')['Away Shots on Target'].transform(lambda x: x.shift().rolling(window=5, min_periods=1).mean())

# Fill any NaN values resulting from rolling mean with the overall mean
df.dropna(inplace=True)

In [58]:
# Get the list of unique teams
teams = pd.concat([df['Home Team'], df['Away Team']]).unique()

# Initialize ratings for each team
ratings = {}
for team in teams:
    ratings[team] = {
        'home': 0.0,         # Home background rating br_H
        'away': 0.0,         # Away background rating br_A
        'provisional': 0.0,  # Provisional rating pr
        'form_counter': 0,   # φ_cx, the count of continuous over/under-performances
        'over_performing': False,
        'under_performing': False
    }

### Define the Diminishing Function \(\psi(e)\)
def psi(e, c=3, b=10):
    return c * np.log10(1 + e)

In [59]:
# Use a logistic function to map ratings to expected goals
def rating_to_goals(rating, scale=5):
    # Clamp the rating to prevent extreme values
    rating = np.clip(rating, -10, 10)  # Adjust bounds as needed
    return scale / (1 + np.exp(-rating))

# Update Ratings Function
def update_ratings(home_team, away_team, home_goals, away_goals, lambda_param, gamma, mu, phi, delta):
    # Extract current ratings
    br_xH_t_minus_1 = ratings[home_team]['home']
    br_xA_t_minus_1 = ratings[home_team]['away']
    br_yA_t_minus_1 = ratings[away_team]['away']
    br_yH_t_minus_1 = ratings[away_team]['home']
    
    # Observed goal difference
    go = home_goals - away_goals
    
    # Expected goal difference
    gp_x = rating_to_goals(br_xH_t_minus_1)
    gp_y = rating_to_goals(br_yA_t_minus_1)
    gp = gp_x - gp_y
    
    # Error between observed and expected goal difference
    e = abs(go - gp)
    
    # Diminishing function ψ(e)
    psi_e = psi(e)
    
    # Determine ψ_x(e) and ψ_y(e)
    if gp < go:
        psi_x_e = psi_e
    else:
        psi_x_e = -psi_e
    
    if gp > go:
        psi_y_e = psi_e
    else:
        psi_y_e = -psi_e
    
    # Update background ratings for home team x
    br_xH_t = br_xH_t_minus_1 + psi_x_e * lambda_param
    br_xA_t = br_xA_t_minus_1 + (br_xH_t - br_xH_t_minus_1) * gamma
    
    # Update background ratings for away team y
    br_yA_t = br_yA_t_minus_1 + psi_y_e * lambda_param
    br_yH_t = br_yH_t_minus_1 + (br_yA_t - br_yA_t_minus_1) * gamma
    
    # Update the ratings in the dictionary
    ratings[home_team]['home'] = br_xH_t
    ratings[home_team]['away'] = br_xA_t
    ratings[away_team]['away'] = br_yA_t
    ratings[away_team]['home'] = br_yH_t
    
    # Update form factors for both teams
    update_form_factor(home_team, br_xH_t, mu, phi, delta, over_performance=(go > gp))
    update_form_factor(away_team, br_yA_t, mu, phi, delta, over_performance=(go < gp))


### Update Form Factor Function
def update_form_factor(team, br_t, mu, phi, delta, over_performance):
    # Get team info
    team_info = ratings[team]
    
    if over_performance:
        if team_info['over_performing']:
            team_info['form_counter'] += 1
        else:
            team_info['over_performing'] = True
            team_info['under_performing'] = False
            team_info['form_counter'] = 1
    else:
        if team_info['under_performing']:
            team_info['form_counter'] += 1
        else:
            team_info['over_performing'] = False
            team_info['under_performing'] = True
            team_info['form_counter'] = 1
    
    # Check if form factor should be applied
    if team_info['form_counter'] > phi:
        # Calculate form impact
        form_impact = mu * (delta ** (phi - team_info['form_counter']))
        if team_info['over_performing']:
            # Overperformance: increase provisional rating
            pr_t = br_t + form_impact
        else:
            # Underperformance: decrease provisional rating
            pr_t = br_t - form_impact
    else:
        # No form factor applied
        pr_t = br_t
    
    # Update provisional rating
    team_info['provisional'] = pr_t

### Calculate Prediction Error Function
def calculate_error(df):
    errors = []
    df_sorted = df.sort_values('Date')
    for index, row in df_sorted.iterrows():
        home_team = row['Home Team']
        away_team = row['Away Team']
        observed_diff = row['Full Time Home Goals'] - row['Full Time Away Goals']
        
        # Predicted difference using provisional ratings
        pr_x = ratings[home_team]['provisional']
        pr_y = ratings[away_team]['provisional']
        predicted_diff = pr_x - pr_y
        
        errors.append(observed_diff - predicted_diff)
    mse = np.mean(np.square(errors))
    return mse

def predict_match(home_team, away_team):
    pr_x = ratings[home_team]['provisional']
    pr_y = ratings[away_team]['provisional']
    predicted_diff = pr_x - pr_y
    if predicted_diff > 0:
        prediction = 'Home Win'
    elif predicted_diff < 0:
        prediction = 'Away Win'
    else:
        prediction = 'Draw'
    return prediction, predicted_diff


In [60]:
def run_model(df, lambda_param, gamma, mu, phi, delta):
    # Reset ratings
    for team in ratings:
        ratings[team]['home'] = 0.0
        ratings[team]['away'] = 0.0
        ratings[team]['provisional'] = 0.0
        ratings[team]['form_counter'] = 0
        ratings[team]['over_performing'] = False
        ratings[team]['under_performing'] = False
    
    # Lists to store data for advanced prediction
    rating_diffs = []
    outcomes = []
    match_indices = []
    home_prov_ratings = []
    away_prov_ratings = []
    home_bg_ratings = []
    away_bg_ratings = []
    
    # Iterate over each match
    for index, row in df.iterrows():
        home_team = row['Home Team']
        away_team = row['Away Team']
        home_goals = row['Full Time Home Goals']
        away_goals = row['Full Time Away Goals']
        
        # Before updating ratings, get the current provisional ratings to use for prediction
        pr_xH = ratings[home_team]['provisional']
        pr_yA = ratings[away_team]['provisional']
        rating_diff = pr_xH - pr_yA
        
        # Record the data
        rating_diffs.append(rating_diff)
        match_indices.append(index)
        home_prov_ratings.append(pr_xH)
        away_prov_ratings.append(pr_yA)
        home_bg_ratings.append(ratings[home_team]['home'])
        away_bg_ratings.append(ratings[away_team]['away'])
        
        # Determine actual match outcome
        if home_goals > away_goals:
            outcome = 1  # Home Win
        elif home_goals < away_goals:
            outcome = 2  # Away Win
        else:
            outcome = 0  # Draw
        outcomes.append(outcome)
        
        # Update ratings after the match
        update_ratings(home_team, away_team, home_goals, away_goals, 
                       lambda_param, gamma, mu, phi, delta)
    
    # Add the collected data back to the DataFrame
    df = df.copy()  # Create a full copy to avoid warnings

    # Explicitly update columns using .loc
    df.loc[match_indices, 'Rating Difference'] = rating_diffs
    df.loc[match_indices, 'Actual Outcome'] = outcomes
    df.loc[match_indices, 'Home Provisional Rating'] = home_prov_ratings
    df.loc[match_indices, 'Away Provisional Rating'] = away_prov_ratings
    df.loc[match_indices, 'Away Background Rating'] = away_bg_ratings
    return df

In [None]:
### Parameter Optimization Using Grid Search
# Define parameter grid
param_grid = {
    'lambda_param': np.arange(0.02, 0.06, 0.01),  
    'gamma': np.arange(0.7, 0.9, 0.05),          
    'mu': [0.005, 0.0075, 0.01],                 
    'phi': [1.5, 2, 2.5],                       
    'delta': [1.5, 1.75, 2.0]                   
}

df_training = df[df['Season'] >= 2018.0]

best_params = None
best_error = float('inf')

# grid search
param_list = list(ParameterGrid(param_grid))
for params in param_list:
    print(f"Testing parameters: {params}")
    run_model(df_training, params['lambda_param'], params['gamma'], 
              params['mu'], params['phi'], params['delta'])
    error = calculate_error(df_training)
    print(f"Error (MSE): {error}")
    
    if error < best_error:
        best_error = error
        best_params = params

print(f"\nBest parameters: {best_params}")
print(f"Best error (MSE): {best_error}")

# The grid search will test various combinations of parameters and output the MSE for each. At the end, it will display the best parameters found.



Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.005, 'phi': 1.5}
Error (MSE): 3.096214742950143
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.005, 'phi': 2}
Error (MSE): 3.0968797970379933
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.005, 'phi': 2.5}
Error (MSE): 3.0969086286256435
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.0075, 'phi': 1.5}
Error (MSE): 3.0959524653664765
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.0075, 'phi': 2}
Error (MSE): 3.0969441070173165
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.02), 'mu': 0.0075, 'phi': 2.5}
Error (MSE): 3.0969878095744092
Testing parameters: {'delta': 1.5, 'gamma': np.float64(0.7), 'lambda_param': np.float64(0.

In [61]:
df = run_model(df, best_params['lambda_param'], best_params['gamma'], best_params['mu'], best_params['phi'], best_params['delta'])

In [62]:
df.columns

Index(['Date', 'Home Team', 'Away Team', 'Full Time Home Goals',
       'Full Time Away Goals', 'Full Time Result', 'Half Time Home Goals',
       'Half Time Away Goals', 'Half Time Result', 'Referee', 'Home Shots',
       'Away Shots', 'Home Shots on Target', 'Away Shots on Target',
       'Home Corners', 'Away Corners', 'Home Fouls', 'Away Fouls',
       'Home Yellow Cards', 'Away Yellow Cards', 'Home Red Cards',
       'Away Red Cards', 'Home Goal Conversion Rate',
       'Away Goal Conversion Rate', 'Home Attacking Intensity',
       'Away Attacking Intensity', 'Attacking Intensity Difference',
       'Home Disciplinary Pressure', 'Away Disciplinary Pressure',
       'Disciplinary Pressure Difference', 'xG Home', 'xG Away',
       'Match Outcome', 'Season', 'Home Team Goals For',
       'Home Team Goals Against', 'Away Team Goals For',
       'Away Team Goals Against', 'Avg Home Shots on Target',
       'Avg Away Shots on Target', 'Rating Difference', 'Actual Outcome',
       'Home

In [64]:
feature_columns = [
    'Rating Difference', 
    'Home Team Goals For', 'Home Team Goals Against', 
    'Away Team Goals For', 'Away Team Goals Against',
    'Avg Home Shots on Target', 'Avg Away Shots on Target',
]

df['Actual Outcome'] = df['Actual Outcome'].astype(int)
y = df['Actual Outcome']
X = df[feature_columns]

train_df = df[df['Season'] <= 2018.0]
test_df = df[df['Season'] > 2018.0]

X_train = train_df[feature_columns]
y_train = train_df['Actual Outcome']
X_test = test_df[feature_columns]
y_test = test_df['Actual Outcome']

In [65]:
model = LogisticRegression(multi_class='multinomial', solver='lbfgs', max_iter=1000)
model.fit(X_train, y_train)



In [66]:
#### **Make Predictions and Calculate Probabilities**
test_df['Predicted Probabilities'] = model.predict_proba(X_test).tolist()
test_df['Predicted Outcome'] = model.predict(X_test)

# Extract individual probabilities
prob_cols = ['Prob_Draw', 'Prob_HomeWin', 'Prob_AwayWin']
prob_df = pd.DataFrame(test_df['Predicted Probabilities'].to_list(), columns=prob_cols)
test_df = pd.concat([test_df.reset_index(drop=True), prob_df], axis=1)

#### **Evaluate the Model**
from sklearn.metrics import accuracy_score, log_loss

# Adjust the ordering of probabilities if necessary
# By default, LogisticRegression classes are sorted in ascending order
# Ensure the mapping matches your outcome labels (0: Draw, 1: Home Win, 2: Away Win)

accuracy = accuracy_score(y_test, test_df['Predicted Outcome'])
print(f"Accuracy: {accuracy}")

ll = log_loss(y_test, model.predict_proba(X_test))
print(f"Log Loss: {ll}")

#### **Add Ratings and Other Features to the DataFrame**
# We'll add the current ratings just before each match
def get_current_ratings(row):
    home_team = row['Home Team']
    away_team = row['Away Team']
    return pd.Series({
        'Home Provisional Rating': ratings[home_team]['provisional'],
        'Away Provisional Rating': ratings[away_team]['provisional'],
        'Home Background Rating': ratings[home_team]['home'],
        'Away Background Rating': ratings[away_team]['away'],
    })

df_ratings = df.apply(get_current_ratings, axis=1)
df = pd.concat([df, df_ratings], axis=1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['Predicted Probabilities'] = model.predict_proba(X_test).tolist()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['Predicted Outcome'] = model.predict(X_test)


Accuracy: 0.5481444332998997
Log Loss: 0.9717173214740242
