Some snippets inspired by GavinNg tutorial: https://www.kaggle.com/code/gavinjpng/fpl-prediction-and-selection/notebook

# Cleaning Codes

In [23]:
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, GridSearchCV
import pulp
from collections import Counter
import numpy as np
import math

In [3]:
def combine_names(name1, name2):
    """Build one underscore-separated name from a first and a last name.

    The two parts are joined with '_' and any spaces or hyphens inside the
    result are replaced by '_' too, e.g. ("Heung-Min", "Son") gives
    "Heung_Min_Son".
    """
    joined = name1 + '_' + name2
    #earlier gw dfs list the name differently to later ones, so normalise
    #both spaces and hyphens to the same separator
    for separator in (" ", "-"):
        joined = joined.replace(separator, "_")

    return joined

In [4]:
def cleaned(players, gws, team_ids):
    """Attach position, team name and opponent team name to the gameweek rows.

    players :: df with 'full_name', 'position', 'player_team_name', 'season'
    gws :: df with at least 'full_name', 'season', 'opponent_team'
    team_ids :: df with a 'team' column (team name) and one 'team_<season>'
                column per season holding that season's team code

    Returns the merge of gws and players (on full_name + season) with an extra
    'opponent_team_name' column. Neither input frame is modified.
    """
    players_df = players[['full_name', 'position', 'player_team_name', 'season']]
    merged_gws = gws.merge(players_df, on=['full_name', 'season'])

    #team codes change from season to season, so map each season separately
    opponent_names = []
    for season_key, season_rows in merged_gws.groupby('season'):
        #f-string so that integer season keys (e.g. 2122) work as well as strings
        code_col = f'team_{season_key}'
        code_to_name = team_ids[['team', code_col]].dropna().set_index(code_col)['team']
        opponent_names.append(season_rows['opponent_team'].map(code_to_name))

    if opponent_names:
        #assignment aligns on the index of merged_gws
        merged_gws['opponent_team_name'] = pd.concat(opponent_names)
    else:
        #no rows survived the merge: still return the expected column
        merged_gws['opponent_team_name'] = pd.Series(dtype=object)

    return merged_gws

In [5]:
def get_team_points(was_home, h_score, a_score):
    """Return the league points (3 win / 1 draw / 0 loss) the player's team took.

    was_home :: truthy if the player's team was the home side
    h_score, a_score :: home and away goals

    Returns None when the scores cannot be ordered (e.g. NaN).
    """
    if h_score == a_score: #draw
        return 1
    if h_score > a_score: #home side won
        return 3 if was_home else 0
    if h_score < a_score: #away side won
        return 0 if was_home else 3

In [6]:
def get_opponent_points(team_points):
    """Return the points the opposition took, given the player's team points.

    1 (draw) -> 1, 3 (team won) -> 0, 0 (team lost) -> 3.
    Any other value gives None.
    """
    #(team points, opponent points) pairs, checked in this order
    outcomes = ((1, 1), (3, 0), (0, 3))
    for own, theirs in outcomes:
        if team_points == own:
            return theirs
    return None

In [8]:
def historical_data(gws, features, history):
    """Add per-player 'last_<n>_<feature>' columns built from earlier matches.

    gws :: df with 'full_name', 'season', 'round' and every column in features
    features :: column names to build history for
    history :: iterable of window sizes; an int n means "sum over the previous
               n matches", the string 'all' means "sum over every earlier
               match of that season"

    The values are sums (not averages) and never include the current match.
    Players are tracked per season, ordered by 'round'. Returns a new df; the
    input is not modified.
    """
    df = gws.copy()
    group_keys = ['full_name', 'season']

    for feature in features:
        #transform (not apply) keeps the original row index, so the result can
        #be assigned straight back to df whatever the pandas version
        ordered = df.sort_values('round').groupby(group_keys)[feature]

        for hist in history:
            hist_feature = 'last_' + str(hist) + '_' + feature #create column name

            if hist == 'all': #everything earlier in the season
                df[hist_feature] = ordered.transform(lambda x: x.cumsum() - x)
            else: #previous 1, 3, 5, n games
                #window covers the current game plus hist earlier ones;
                #subtracting x removes the current game again
                df[hist_feature] = ordered.transform(
                    lambda x, window=hist + 1: x.rolling(min_periods=1, window=window).sum() - x)
    return df

In [9]:
def historical_team(historical_players, features, history):
    """Add team-level and opponent-level history columns for each feature.

    For every feature the per-match team value is the max of that feature over
    the team's rows for the match. From it the function builds, per history
    entry (int n or 'all'):
      'last_<n>_<feature>_team'      sum over the team's earlier matches
      'opponent_last_<n>_<feature>'  sum of what the team's earlier opponents
                                     had in those matches
    The current match is never included in the sums.

    historical_players :: df with 'player_team_name', 'season', 'round',
                          'kickoff_time', 'opponent_team_name' and the features
    Returns a new df with the columns of every feature merged in; the input is
    not modified. With no features a plain copy is returned.
    """
    df = historical_players.copy()
    match_keys = ['player_team_name', 'season', 'round', 'kickoff_time', 'opponent_team_name']
    #same match seen from the other side: team and opponent swapped
    mirrored_keys = ['opponent_team_name', 'season', 'round', 'kickoff_time', 'player_team_name']
    #by-products of the self-merge, identical for every feature
    helper_cols = ['player_team_name_opponent', 'opponent_team_name_opponent']

    result = df
    for feature in features:
        feat = feature + '_team'
        opp_feat = feat + '_opponent'

        ft = df.groupby(match_keys)[feature].max().rename(feat).reset_index()
        ft = ft.merge(ft, left_on=match_keys, right_on=mirrored_keys,
                      how='left', suffixes=('', '_opponent'))

        #NOTE(review): history is grouped by team only, so it runs across
        #seasons, and the team series is ordered by 'round' alone - confirm
        #whether a per-season grouping was intended
        team_series = ft.sort_values('round').groupby('player_team_name')[feat]
        opponent_series = ft.groupby('player_team_name')[opp_feat]

        for hist in history:
            hist_feature = 'last_' + str(hist) + '_' + feat
            opp_feature = 'opponent_last_' + str(hist) + '_' + feature

            #transform keeps the row index so results align with ft
            if hist == 'all':
                ft[hist_feature] = team_series.transform(lambda x: x.cumsum() - x)
                ft[opp_feature] = opponent_series.transform(lambda x: x.cumsum() - x)
            else:
                ft[hist_feature] = team_series.transform(
                    lambda x, window=hist + 1: x.rolling(min_periods=1, window=window).sum() - x)
                ft[opp_feature] = opponent_series.transform(
                    lambda x, window=hist + 1: x.rolling(min_periods=1, window=window).sum() - x)

        #merge every feature (not just the last one); drop the helper columns
        #once they are present so later merges do not create _x/_y duplicates
        already_present = [c for c in helper_cols if c in result.columns]
        result = result.merge(ft.drop(columns=already_present), on=match_keys, how='left')

    return result

# Modelling Codes

In [10]:
def get_categorical_labels(df, categoric, numeric):
    """Build a model matrix: one-hot encoded categoricals followed by numerics.

    categoric :: columns cast to str and expanded with pd.get_dummies
    numeric :: columns passed through unchanged
    """
    one_hot = pd.get_dummies(df[categoric].astype(str))
    return pd.concat([one_hot, df[numeric]], axis = 1)

In [11]:
def get_predictions(y_test, y_pred, df):
    """Return df with 'actual' and 'predicted' columns appended.

    y_test, y_pred :: objects with .tolist() (Series / arrays), paired
                      positionally; the shorter one sets the length
    df :: the rows the predictions belong to, in the same order

    The old index of df is kept as an 'index' column in the output. The input
    df is left untouched.
    """
    #make df of predictions vs actual
    pred_df = pd.DataFrame(list(zip(y_test.tolist(), y_pred.tolist())), columns = ["actual", "predicted"])

    #reset on a copy so both frames share a 0..n-1 index without mutating the caller's df
    features_df = df.reset_index()

    return pd.concat([features_df, pred_df], axis = 1)

In [None]:
def get_position_model(df,position, params, model_features, 
                       categorical_features, numerical_features, test_season = 2122):
    
    """Given a position, train a tuned XGBoost model and return its predictions
    for either all players, or all players of that position.
    
    position :: a str e.g. 'FWD', 'MID', 'DEF', or 'all' for every player
    params :: a dict of parameter distributions for the randomised search
    model_features :: columns XGBoost may use as features
    categorical_features :: non numerical features e.g. True/False (one-hot encoded)
    numerical_features :: numeric features 
    test_season :: season held out for testing; every other season is training data
    
    Returns (tuned model fitted on the training data,
             mean 10-fold CV RMSE on the training data,
             df of test rows with 'actual' and 'predicted' columns).
    The first item is for plotting feature importance, the second to double
    check for overfitting, the third is the actual results.
    """
    
    held_out = df.season == test_season
    if position == 'all': #consider the model wrt to all players
        train, test = df[~held_out], df[held_out]
    else: #consider modelling each individual position
        in_position = df.position == position
        train, test = df[~held_out & in_position], df[held_out & in_position]
    
    X_train = get_categorical_labels(train[model_features], categorical_features, numerical_features)
    #columns come out duplicated when a feature name is selected more than once
    #(presumably listed twice in the feature lists - verify against caller)
    X_train = X_train.loc[:,~X_train.columns.duplicated()].copy() 
    y_train = train.total_points #thing to predict

    X_test = get_categorical_labels(test[model_features], categorical_features, numerical_features)
    X_test = X_test.loc[:,~X_test.columns.duplicated()]
    #one-hot columns depend on the values present, so force the test matrix
    #onto exactly the training columns (unseen categories dropped, missing ones 0)
    X_test = X_test.reindex(columns = X_train.columns, fill_value = 0)
    y_test = test.total_points
    
    #perform a search for the best param combinations as defined by -rmse score
    base_model = xgb.XGBRegressor(objective='reg:squarederror')
    xgb_cv = RandomizedSearchCV(base_model, params, cv=3, scoring='neg_root_mean_squared_error', random_state=999,
                                n_iter = 25) 
    xgb_cv.fit(X_train, y_train) #fit on each combination
    
    #new best model initialisation
    xgb_best = xgb.XGBRegressor(objective='reg:squarederror')
    xgb_best.set_params(**xgb_cv.best_params_)
    
    #cross validate on training data (results are found in the return)
    cv_results = -cross_val_score(xgb_best, X_train, y_train, cv=10, scoring='neg_root_mean_squared_error')
    
    #fit best model and predict test data
    xgb_best.fit(X_train, y_train)
    xgb_predictions = xgb_best.predict(X_test)
    predicted_df_xgb = get_predictions(y_test, xgb_predictions, test[model_features + ['full_name', 'GW', 'player_team_name', 'total_points']])
    
    #return the tuned model (the one that made the predictions), not the untuned baseline
    return xgb_best,  cv_results.mean(), predicted_df_xgb

# Linear Programming Scripts

In [None]:
def linear_prog(df, max_budget = 1000, gk_constraint = 2, def_constraint = 5, fwd_constraint = 3,
                mid_constraint = 5):
    """Set up and solve the squad selection as a binary linear program.

    df :: one row per player with 'position', 'player_team_name', 'value'
          and 'predicted'
    max_budget :: upper bound on the summed 'value' of the picked players
    *_constraint :: maximum number of players per position

    Maximises total predicted points subject to the budget, the position
    limits and at most 3 players per team. Variables are named 'x_<index label>'.
    Returns the solved pulp problem.
    """
    squad_limits = {'GK': gk_constraint, 'DEF': def_constraint, 'MID': mid_constraint, 'FWD': fwd_constraint}
    
    #per-player data in row order, plus one binary pick variable per player
    labels = list(df.index)
    player_pos = [df.position[label] for label in labels]
    player_club = [df.player_team_name[label] for label in labels]
    player_cost = [df.value[label] for label in labels]
    player_pts = [df.predicted[label] for label in labels]
    picks = [pulp.LpVariable("x_" + str(label), cat = 'Binary') for label in labels]
      
    prob = pulp.LpProblem("Initialisation", pulp.LpMaximize) #maximisation problem
    prob += pulp.lpSum(pick * pts for pick, pts in zip(picks, player_pts)) #objective to maximise
    
    #budget
    prob += pulp.lpSum(pick * cost for pick, cost in zip(picks, player_cost)) <= max_budget
    
    #position limits
    for p in df.position.unique():
        prob += pulp.lpSum(pick for pick, pp in zip(picks, player_pos) if pp == p) <= squad_limits[p]

    #no more than 3 players from any one team
    for t in df.player_team_name.unique():
        prob += pulp.lpSum(pick for pick, club in zip(picks, player_club) if club == t) <= 3 

    prob.solve()
    
    return prob

In [12]:
def construct_best_team(df, prob):
    """Turn a solved selection problem into a table of the chosen players.

    df :: the player df the problem was built from; needs 'full_name',
          'player_team_name', 'position', 'value', 'predicted', 'actual'
    prob :: solved problem whose variables are named 'x_<index label of df>'

    Returns a df with columns Name, Team, Position, Value, Predicted Points,
    Actual Points - one row per selected player (empty if none selected).
    """
    columns = ['Name', 'Team', 'Position', 'Value', 'Predicted Points', 'Actual Points']
    rows = []

    for v in prob.variables():
        #binary variable: treat it as selected only when it is (close to) 1;
        #this also skips variables that have no value at all
        if v.varValue is None or v.varValue <= 0.5:
            continue
        try:
            label = int(v.name.split("_")[1])
        except (IndexError, ValueError):
            continue #not one of the x_<label> player variables

        player = df.loc[label]
        rows.append({'Name': player['full_name'], 'Team': player['player_team_name'],
                     'Position': player['position'], 'Value': player['value'],
                     'Predicted Points': player['predicted'], 'Actual Points': player['actual']})

    #build the frame in one go (DataFrame.append no longer exists in pandas 2)
    return pd.DataFrame(rows, columns = columns)

In [13]:
def modify_team_constraints(bench_df, df):
    """Split the teams by how many squad places they still have after the bench.

    Needed when the bench is picked manually first: a team with bench players
    can no longer supply 3 more, only 2, 1 or 0.

    Returns (teams_3, teams_2, teams_1, teams_0): teams with no bench player
    (numpy array), then lists of teams with 1, 2 and 3 bench players. A team
    with more than 3 bench players only triggers a printed warning and is
    left out of every group.
    """
    all_teams = df.player_team_name.unique()
    bench_teams = bench_df.player_team_name.unique()
    #teams without a bench player can still have all 3 places
    teams_3 = np.setdiff1d(all_teams, bench_teams)

    #bench players per team -> teams with that many
    by_bench_count = {1: [], 2: [], 3: []}
    for team, on_bench in Counter(bench_df.player_team_name).items():
        if on_bench in by_bench_count:
            by_bench_count[on_bench].append(team)
        else:
            #4 or more from the same team is not a legal bench
            print('Illegal bench selection, update required')

    return teams_3, by_bench_count[1], by_bench_count[2], by_bench_count[3]

In [15]:
#TODO: this largely duplicates linear_prog; the two should probably be merged into one function

def linear_prog_with_bench(bench_df, df, max_budget = 1000, gk_constraint = 2, def_constraint = 5, fwd_constraint = 3,
                mid_constraint = 5):
    """Solve the squad selection when the bench has already been chosen.

    Same program as the plain selection (maximise predicted points under the
    budget and position limits), except that the per-team limit is lowered by
    the number of bench players each team already has, as grouped by
    modify_team_constraints. Variables are named 'x_<index label>'.
    Returns the solved pulp problem.
    """
    squad_limits = {'GK': gk_constraint, 'DEF': def_constraint, 'MID': mid_constraint, 'FWD': fwd_constraint}
    teams_3, teams_2, teams_1, teams_0 = modify_team_constraints(bench_df,df)
    
    #per-player data in row order, plus one binary pick variable per player
    labels = list(df.index)
    player_pos = [df.position[label] for label in labels]
    player_club = [df.player_team_name[label] for label in labels]
    player_cost = [df.value[label] for label in labels]
    player_pts = [df.predicted[label] for label in labels]
    picks = [pulp.LpVariable("x_" + str(label), cat = 'Binary') for label in labels]
      
    prob = pulp.LpProblem("Initialisation", pulp.LpMaximize) #maximisation problem
    prob += pulp.lpSum(pick * pts for pick, pts in zip(picks, player_pts)) #objective to maximise
    
    #budget
    prob += pulp.lpSum(pick * cost for pick, cost in zip(picks, player_cost)) <= max_budget
    
    #position limits
    for p in df.position.unique():
        prob += pulp.lpSum(pick for pick, pp in zip(picks, player_pos) if pp == p) <= squad_limits[p]

    #team limits: remaining places per group of teams
    for group, places_left in ((teams_3, 3), (teams_2, 2), (teams_1, 1), (teams_0, 0)):
        for t in group:
            from_team = pulp.lpSum(pick for pick, club in zip(picks, player_club) if club == t)
            if places_left == 0:
                prob += from_team == 0 #team is already full
            else:
                prob += from_team <= places_left

    prob.solve()
    
    return prob

In [24]:
#it was quite difficult to code the logic for the combinations of 
#the different bench options so just chose one from each
def pick_bench(output_df):
    """Pick one bench player per position: the one with the lowest predicted points.

    output_df :: squad df with 'Name', 'Position' and 'Predicted Points'
    Returns a dict keyed 'GK', 'FWD', 'MID', 'DEF' (in that order) of names.
    """
    def lowest_predicted(position):
        ranked = output_df[output_df.Position == position].sort_values('Predicted Points')
        return ranked.Name.tolist()[0]

    #key order is GK, FWD, MID, DEF: per the original author the fwd option
    #has the greatest chance of scoring highly
    return {slot: lowest_predicted(slot) for slot in ('GK', 'FWD', 'MID', 'DEF')}

In [26]:
def make_subs(output_df, original_df,gw, bench_dict):
    """Replace starters who did not play in a gameweek with bench players.

    output_df :: squad df with a 'Name' column (starters and bench)
    original_df :: player/gameweek rows with 'full_name', 'GW', 'season',
                   'minutes' and 'position' (season 2122 rows are used)
    gw :: gameweek to process
    bench_dict :: {'GK': name, 'FWD': name, 'MID': name, 'DEF': name}

    A starter counts as not having played when their minutes in this gameweek
    add up to 0. A goalkeeper is replaced by the bench goalkeeper. An outfield
    player is replaced by the first bench player, in the order FWD, MID, DEF,
    who is still unused and played in this gameweek. Each bench player is used
    at most once.

    Returns (team_list, names): the starters before substitutions (needed to
    revert the next week) and the names after substitutions, taken from the
    starters' rows for this gameweek sorted by name.
    """
    bench_names = list(bench_dict.values()) #bench
    team_list = [x for x in output_df.Name if x not in bench_names] #team without the bench
    
    #minutes are not in output_df, so look them up in the original data
    gw_rows = original_df[(original_df.GW == gw) & (original_df.season == 2122)]
    out_with_mins = gw_rows[gw_rows.full_name.isin(team_list)].sort_values('full_name')
    names = list(out_with_mins.full_name)
    
    #total minutes this gameweek; a double gameweek gives a player several rows
    minutes_played = gw_rows.groupby('full_name')['minutes'].sum()
    
    def played(player):
        return minutes_played.get(player, 0) != 0
    
    bench_unused = {'GK': True, 'FWD': True, 'MID': True, 'DEF': True}
    
    starters = out_with_mins.drop_duplicates('full_name')
    for player, position in zip(starters.full_name, starters.position):
        if played(player):
            continue
        
        if position == 'GK': 
            #a goalkeeper can only be replaced by the bench goalkeeper
            options = ['GK'] if bench_unused['GK'] else []
        else:
            #outfield: try fwd, mid, def in that order
            options = [slot for slot in ('FWD', 'MID', 'DEF')
                       if bench_unused[slot] and played(bench_dict[slot])]
        
        if not options:
            continue #nobody available, the starter stays in
        
        slot = options[0]
        names = [bench_dict[slot] if n == player else n for n in names]
        bench_unused[slot] = False
    
    return  team_list, names
                       

# Transfer Algorithm

In [16]:
def get_rolling_points_avg(overall):
    """Add running averages of actual vs predicted points per player.

    overall :: df with 'full_name', 'GW', 'total_points' and 'predicted'

    Adds three columns, computed per player in gameweek order:
      'avg_actuals' mean of total_points over a 39-row window ending at the row
      'avg_preds'   the same for predicted
      'avg_error'   avg_actuals - avg_preds (positive = doing better than predicted)
    The columns are added to the passed df itself, which is also returned.
    """
    player_df = overall #same object: the caller's df receives the new columns
    by_player = player_df.sort_values('GW').groupby('full_name')
    
    def running_mean(points):
        return points.rolling(min_periods = 1, window =  38 + 1).mean()
    
    #transform keeps the row index, so the result aligns with player_df on assignment
    player_df['avg_actuals'] = by_player['total_points'].transform(running_mean)
    player_df['avg_preds'] = by_player['predicted'].transform(running_mean)
    player_df['avg_error'] = player_df.avg_actuals - player_df.avg_preds
    
    return player_df

In [17]:
def get_player_out(overall, team_list, gw):
    """Return the name of the current player with the lowest avg_error last week.

    overall :: df with 'full_name', 'GW' and 'avg_error'
    team_list :: names of the current team
    gw :: upcoming gameweek; rows of gameweek gw-1 are inspected
    """
    in_team = overall.full_name.isin(team_list)
    last_week = overall.GW == gw - 1
    
    #ascending sort: the biggest under-performer comes first
    ranked = overall[in_team & last_week].sort_values('avg_error')
    
    return ranked.full_name.tolist()[0]

In [18]:
def get_available_cost(player_to_transfer, overall_df, gw, purchase_week):
    """Return the money received for selling a player before gameweek gw.

    purchase_week must be given because the sale price depends on the price
    the player was bought at, which is not necessarily the gameweek 1 price.

    The current price is the player's 'value' in gameweek gw-1. If it is above
    the purchase price only half of the profit (rounded down) is kept;
    otherwise the current price is returned.
    """
    player_rows = overall_df[overall_df.full_name == player_to_transfer]
    bought_at = player_rows[player_rows.GW == purchase_week].value.tolist()
    worth_now = player_rows[player_rows.GW == gw - 1].value.tolist()
    
    profit = worth_now[0] - bought_at[0]
    
    if profit > 0:
        #fpl only pays out half of the profit when selling
        return bought_at[0] + math.floor(0.5*profit)
    
    return worth_now[0]

In [19]:
def get_best_predicted_swap(player_out, overall_df , gw, cost_available, team_dict,purchase_dict, team_sheet):
    """Pick the replacement for player_out and update the bookkeeping dicts.

    Candidates are the gameweek gw rows with the same position as player_out,
    a 'value' within cost_available and a name not in team_sheet. They are
    tried from highest to lowest 'avg_error'; the first one whose team has
    fewer than 3 players (after player_out has left) is taken.

    team_dict :: team name -> number of players owned; updated in place
    purchase_dict :: player name -> gameweek bought; updated in place

    Returns (player in, cost of player in, team_dict, purchase_dict).
    Raises IndexError if player_out is unknown or no candidate qualifies; in
    the latter case team_dict is restored to its state before the call.
    """
    out_records = overall_df[(overall_df.full_name == player_out)] #search player in df
    position = out_records['position'].tolist()[0] #get their position
    out_team = str(out_records['player_team_name'].tolist()[0]) #get their team
    
    #find players which are a) available in next gw, b) same position and c) within budget
    next_week_db = overall_df[(overall_df.GW == gw) & (overall_df.position == str(position)) & (overall_df.value <= cost_available)]
    next_week_db = next_week_db[~next_week_db.full_name.isin(team_sheet)]
    #predictions for one week may be lower than previous due to double gws, so there is no
    #"better than last week" condition; instead we just search through top performers
    
    team_dict[out_team] -= 1 #player_out frees a place in his team
    
    for _, row in next_week_db.sort_values('avg_error', ascending = False).iterrows():
        club = str(row['player_team_name'])
        owned = team_dict.get(club) or 0
        if owned >= 3:
            continue #team is full
        
        team_dict[club] = owned + 1
        purchase_dict[row['full_name']] = gw
        purchase_dict.pop(player_out, None)
        return row['full_name'], row['value'], team_dict, purchase_dict
    
    #nothing found: undo the change so the caller's dict stays consistent
    team_dict[out_team] += 1
    raise IndexError('no eligible replacement for ' + str(player_out) + ' in gameweek ' + str(gw))

In [20]:
def get_team_sheet(old_team, player_out, player_in):
    """Return a new team sheet with player_out replaced by player_in.

    old_team is not modified, so a stored history of earlier team sheets
    stays intact when the result is appended to it.
    """
    return [player_in if player == player_out else player for player in old_team]

In [21]:
#if re-running, reinitialise the dataframes in the cells above first, as the initial team won't be the same

def transfer_algorithm(overall, initial_team, team_and_bench, gw_start = 2, gw_end = 39):
    """Make one transfer per gameweek from gw_start up to (not including) gw_end.

    overall :: player/gameweek df used by the helper functions
    initial_team :: df with a 'Name' column, the starting team sheet
    team_and_bench :: df with 'Name', 'Team' and 'Value' for the whole squad

    Each week the worst over-predicted player is sold, the proceeds are added
    to the bank and the best affordable replacement is bought. The player and
    bank balance are printed after the sale and after the purchase.

    Returns (players in, players out, team_list) where team_list[0] is the
    initial team and team_list[k] the team after the k-th transfer.
    """
    team_list = [list(initial_team.Name)]
    team_dict = Counter(team_and_bench.Team)
    #in our case this will always be 0 as we used the full 1000,
    #but it is good to set the algorithm up to handle any team
    money_banked = 1000 - team_and_bench.Value.sum() 
    
    #every initial squad member counts as bought in gameweek 1
    purchase_dict = {name: 1 for name in team_and_bench.Name}
     
    options_in, options_out = [], []
    
    for gw in range(gw_start, gw_end):
        current_team = team_list[-1]
        
        player_out = get_player_out(overall, current_team, gw)
        cost_player_out = get_available_cost(player_out, overall, gw, purchase_dict[player_out])
        
        money_banked = money_banked + cost_player_out
        print(player_out,money_banked)
        
        team_sheet = list(current_team)
        player_in,  player_in_cost,  team_dict, purchase_dict = get_best_predicted_swap(player_out, overall, gw, money_banked, team_dict, purchase_dict, team_sheet)
        money_banked = money_banked - player_in_cost
        print(player_in, money_banked)
        
        #hand over a copy so the stored team of the previous week is not overwritten
        new_team = get_team_sheet(list(current_team), player_out, player_in)
        team_list.append(list(new_team))
        options_in.append(player_in)
        options_out.append(player_out)
        
    return options_in, options_out, team_list

In [22]:
def create_output_table(overall, players_in, players_out, teams, gw_start = 2, gw_end = 39):
    """Format the transfers into one table row per gameweek.

    overall :: df with 'full_name', 'GW' and 'actual'
    players_in, players_out :: players transferred in / out, one per gameweek
                               from gw_start onwards (not modified)
    teams :: accepted for compatibility, currently unused

    The table starts at gameweek gw_start-1, which has no transfer: its players
    and in_points are the string 'None'. Points are the summed 'actual' of the
    player in that gameweek (0 when there is no matching row).

    Returns a df with columns GW, transferred_in, transferred_out,
    out_points, in_points.
    """
    gameweeks = np.arange(gw_start-1,gw_end)
    first_gw = gw_start - 1
    
    #placeholder for the gameweek before the first transfer; built as new
    #lists so the caller's lists are left alone
    transferred_in = ['None'] + list(players_in)
    transferred_out = ['None'] + list(players_out)
    
    def actual_points(player, gw):
        return overall[(overall.full_name == player) & (overall.GW == gw)]['actual'].sum()
    
    player_in_points, player_out_points = [], []
    for gw, came_in, went_out in zip(gameweeks, transferred_in, transferred_out):
        #there is no player to transfer in for the placeholder row
        player_in_points.append('None' if gw == first_gw else actual_points(came_in, gw))
        player_out_points.append(actual_points(went_out, gw))
    
    data = {'GW': gameweeks, 'transferred_in': transferred_in, 'transferred_out': transferred_out, 
            'out_points': player_out_points, 'in_points': player_in_points}
    
    return pd.DataFrame(data)