<center><h1><font size=6> Initial Data Processing and Feature Engineering </font></h1></center>

### Load libraries and setup notebook configuration

In [1]:
# import packages
import pandas as pd 
import numpy as np
import os
from pathlib import Path
import warnings
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder



# set pandas display configuration
pd.set_option("display.precision", 2) # display floats to 2 decimal places
pd.set_option("display.max.columns", None) # no column limit, so every column of a wide frame is shown
pd.set_option('display.float_format', '{:.2f}'.format) # fixed-point formatting with 2 decimals instead of scientific notation
warnings.filterwarnings("ignore", category=pd.errors.SettingWithCopyWarning) # silence pandas SettingWithCopyWarning


# set directories
os.chdir('..') # move the working directory up one level; NOTE: this is relative, so re-running the cell moves up another level each time
DATAPATH = Path(r'data') # data directory, relative to the new working directory


# import from source directory
from src import constants

### Load training data

In [2]:
# read the training features and training targets from the processed data directory
X_train_full = pd.read_csv(f"{DATAPATH}/processed/X_train_full.csv")
y_train_full = pd.read_csv(f"{DATAPATH}/processed/y_train_full.csv")

In [3]:
# index the training features by match id; set_index returns a new frame, so X_train_full is left untouched
train = X_train_full.set_index('unique_match_id')

### Dealing with missing values

In [4]:
# define function to summarise missing values
def missing_values_summary(df):
    """Summarise the missing values of a DataFrame, one row per column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to inspect.

    Returns
    -------
    pandas.DataFrame
        Indexed by the columns of ``df`` with two columns:
        'Total Missing Values' (count of nulls) and 'Missing Percentage'
        (that count as a percentage of the number of rows).
    """
    na_counts = df.isna().sum()  # nulls per column
    n_rows = len(df)  # denominator for the percentage

    return pd.DataFrame({
        'Total Missing Values': na_counts,
        'Missing Percentage': (na_counts / n_rows) * 100,
    })

# summarise missing values per column of the training frame and print the table
summary = missing_values_summary(train)
print(summary)

                                    Total Missing Values  Missing Percentage
season                                                 0                0.00
round                                                  0                0.00
day                                                    0                0.00
team                                                   0                0.00
promoted                                               0                0.00
opponent                                               0                0.00
promoted_opponent                                      0                0.00
home                                                   0                0.00
days_since_last_game                                 434                2.43
games_played_last_21_days                            465                2.60
pl_total_points                                        4                0.02
pl_total_gf                                            4                0.02

#### Define a set of custom transformers inherited from scikit-learn base classes to transform the data

For some columns, we only have a small amount of irregular missing values. Here, it makes sense to just remove the rows associated with these observations from the data.

In [5]:
# custom transformer that removes every row with an NA value in any of the chosen columns
class DropNaRowsTransformer(BaseEstimator, TransformerMixin):
    """Drop the rows that contain a missing value in any of ``columns``.

    Parameters
    ----------
    columns : list of str
        Columns checked for missing values.
    """

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing is learned from the data; returns ``self``."""
        return self

    def transform(self, X, y=None):
        """Return a new frame without the rows that have an NA in ``self.columns``."""
        # dropna returns a new object by default, so the input frame is not modified
        return X.dropna(subset=self.columns)
    
# columns for which a missing value causes the whole row to be dropped (passed to DropNaRowsTransformer in the pipeline)
columns_to_remove_nas = ['season', 'day', 'round',  'team', 'home', 'promoted', 'opponent', 'promoted_opponent', 'pl_position', 'pl_position_opponent',
                        'pl_total_points', 'pl_total_gf', 'pl_total_ga', 'pl_total_goal_diff', 'pl_total_points_opponent', 'pl_total_gf_opponent', 'pl_total_ga_opponent', 'pl_total_goal_diff_opponent']

For other columns, it makes sense to impute the missing values with the column average. Examples of this are the days since last game and number of games in the last 21 days features. From the EDA, we saw that the missing values for these features tend to all be in the first game week due to the lack of data. It therefore makes sense to just impute them as the average over the course of the season. This means all teams/opponents will have the same value in the first gameweek, which makes sense intuitively.

In [6]:
# custom transformer that fills missing values with the mean of each chosen column
class MeanImputer(BaseEstimator, TransformerMixin):
    """Fill NAs in ``columns`` with the column means learned in ``fit``.

    Parameters
    ----------
    columns : list of str
        Columns to impute.

    Attributes
    ----------
    means : dict
        Maps each fitted column name to ``{'mean': <column mean>}``.
    """

    def __init__(self, columns):
        self.columns = columns
        self.means = {}

    def fit(self, X, y=None):
        """Record the mean of every chosen column of ``X``; returns ``self``."""
        for name in self.columns:
            column_mean = X[name].mean()
            self.means[name] = {'mean': column_mean}
        return self

    def transform(self, X, y=None):
        """Return a copy of ``X`` with NAs in the chosen columns replaced by the stored means."""
        filled = X.copy()  # work on a copy so the caller's frame is untouched
        for name in self.columns:
            stored_mean = self.means[name]['mean']
            filled[name] = filled[name].fillna(value=stored_mean)
        return filled
    
    
# columns whose missing values are filled with the column mean (passed to MeanImputer in the pipeline)
columns_to_impute_with_mean = ['days_since_last_game', 'games_played_last_21_days', 'days_since_last_game_opponent', 'games_played_last_21_days_opponent']

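For some variables, whether there is a missing value or not is highly linked to the team's promotion status. This is because promoted teams don't appear in the previous season's data, so we can't calculate some metrics. For these cases, it makes sense to impute the missing values based on promotion status: missing values for promoted teams are replaced with a low historic value (the 10th percentile of the column), while missing values for non-promoted teams are replaced with a typical historic value (the 50th percentile).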

In [7]:
# custom transformer that imputes missing values by promotion status: NAs on promoted rows get the column's 10th percentile, NAs on non-promoted rows get the 50th percentile
class PromotedImputer(BaseEstimator, TransformerMixin):
    """Impute NAs with a percentile chosen by the row's promotion flag.

    Team-side columns are keyed on the 'promoted' flag and opponent-side
    columns on the 'promoted_opponent' flag. Rows whose flag equals 1 are
    filled with the 10th percentile learned in ``fit``; rows whose flag
    equals 0 are filled with the 50th percentile.

    Parameters
    ----------
    columns : list of str
        Team-side columns to impute.
    columns_opponent : list of str
        Opponent-side columns to impute.

    Attributes
    ----------
    percentiles, percentiles_opponent : dict
        Map each fitted column name to ``{'10th': ..., '50th': ...}``.
    """

    def __init__(self, columns, columns_opponent):
        self.columns = columns
        self.columns_opponent = columns_opponent
        self.percentiles = {}
        self.percentiles_opponent = {}

    def fit(self, X, y=None):
        """Store the 10th and 50th percentile of every chosen column; returns ``self``."""
        learned = (
            (self.columns, self.percentiles),
            (self.columns_opponent, self.percentiles_opponent),
        )
        for names, store in learned:
            for name in names:
                store[name] = {
                    '10th': X[name].quantile(0.1),
                    '50th': X[name].quantile(0.5),
                }
        return self

    def transform(self, X, y=None):
        """Return a copy of ``X`` with NAs filled according to the promotion flags."""
        X_transformed = X.copy()
        # (flag column, columns to fill, fitted percentiles) for the team side and the opponent side
        groups = (
            ('promoted', self.columns, self.percentiles),
            ('promoted_opponent', self.columns_opponent, self.percentiles_opponent),
        )
        # flag value -> which stored percentile fills the NAs of those rows
        fill_rules = ((1, '10th'), (0, '50th'))
        for flag, names, store in groups:
            for name in names:
                for status, key in fill_rules:
                    rows = X_transformed[flag] == status
                    X_transformed.loc[rows, name] = X_transformed.loc[rows, name].fillna(value=store[name][key])
        return X_transformed
    
    
# columns imputed by promotion status (passed to PromotedImputer in the pipeline):
# the first list holds the team-side columns, the second the matching opponent-side columns
# NOTE: the second variable name is misspelt ("oppontent") but is referenced under that name when the pipeline is built
columns_to_impute_based_on_promotion_status = ['prev_season_points', 'prev_season_gf', 'prev_season_ga', 'prev_season_goal_diff', 'points_pl_form', 'gf_pl_form', 'ga_pl_form']
columns_to_impute_based_on_promotion_status_oppontent = ['prev_season_points_opponent', 'prev_season_gf_opponent', 'prev_season_ga_opponent', 'prev_season_goal_diff_opponent', 'points_pl_form_opponent', 'gf_pl_form_opponent', 'ga_pl_form_opponent']

For some other columns, we might want to impute the missing values in a more sophisticated way. For example, the teams' and opponents' H2H record is found to have a high correlation with the predictor (see EDA) and so could be an important determinant in our model. However, where two teams haven't played each other before, we won't have this metric. I want to impute the missing values here using a decision tree regression model based on important determinants of game results, namely the venue of the match and the team's and opponent's points achieved in the previous season. This will impute the head-to-head records based on a metric of how good the team and opponent are, as well as the venue identifier.

In [8]:
# custom transformer that imputes missing values with a decision tree trained on the rows where the target column is observed
class DecisionTreeClassifierImputer(BaseEstimator, TransformerMixin):
    """Impute NAs in each target column with the prediction of a decision tree.

    One ``DecisionTreeRegressor`` is fitted per target column (despite the
    class name, the models are regressors), using ``reference_columns`` as
    features and only the rows where that target column is not NA.

    Parameters
    ----------
    target_columns : list of str
        Columns whose missing values are imputed.
    reference_columns : list of str
        Feature columns the trees are trained and evaluated on.
    random_state : int or None, default=None
        Forwarded to every ``DecisionTreeRegressor`` so that the fitted trees,
        and therefore the imputed values, can be made reproducible.

    Attributes
    ----------
    decision_tree_classifiers : dict
        Maps each target column to its fitted tree.
    """

    def __init__(self, target_columns, reference_columns, random_state=None):
        self.target_columns = target_columns
        self.reference_columns = reference_columns
        self.random_state = random_state
        self.decision_tree_classifiers = {}

    def fit(self, X, y=None):
        """Fit one tree per target column on the rows where it is observed; returns ``self``."""
        # start from a clean slate so a refit never keeps models from a previous fit
        self.decision_tree_classifiers = {}

        for target_column in self.target_columns:
            # training rows: only those where the target column is not NA
            observed_rows = X[X[target_column].notna()]

            decision_tree = DecisionTreeRegressor(random_state=self.random_state)
            decision_tree.fit(observed_rows[self.reference_columns], observed_rows[target_column])

            self.decision_tree_classifiers[target_column] = decision_tree

        return self

    def transform(self, X, y=None):
        """Return a copy of ``X`` with NAs in the target columns replaced by tree predictions."""
        X_transformed = X.copy()

        for target_column in self.target_columns:
            missing_mask = X[target_column].isna()

            # nothing to impute: predicting on an empty frame would raise in scikit-learn
            if not missing_mask.any():
                continue

            decision_tree = self.decision_tree_classifiers[target_column]
            predicted_values = decision_tree.predict(X.loc[missing_mask, self.reference_columns])

            # assign through the boolean mask (position-aligned) rather than index labels,
            # which also stays correct if the index contains duplicate labels
            X_transformed.loc[missing_mask, target_column] = predicted_values

        return X_transformed

# target columns imputed by the decision tree imputer, and the reference (feature) columns the trees are trained on
columns_to_impute_based_on_decision_tree = ['last_h2h', 'last_h2h_form', 'last_h2h_venue', 'last_h2h_venue_form']
reference_columns_for_decision_tree = ['home', 'prev_season_points', 'prev_season_points_opponent']

#### Transform data using transformation pipeline based on pre-defined custom transformers

In [9]:
# chain the missing-value transformers; the steps run in the listed order, and the decision tree step
# uses reference columns (e.g. 'prev_season_points') that the promotion-status step fills in before it
missing_value_transformation_pipe = Pipeline(steps=[
    (
        "remove_rows_with_nas",
        DropNaRowsTransformer(columns=columns_to_remove_nas),
    ),
    (
        "impute_with_means_of_columns",
        MeanImputer(columns=columns_to_impute_with_mean),
    ),
    (
        "impute_with_mean_of_columns_by_promotion_status",
        PromotedImputer(
            columns=columns_to_impute_based_on_promotion_status,
            columns_opponent=columns_to_impute_based_on_promotion_status_oppontent,
        ),
    ),
    (
        "impute_based_on_decision_tree_algorithm",
        DecisionTreeClassifierImputer(
            target_columns=columns_to_impute_based_on_decision_tree,
            reference_columns=reference_columns_for_decision_tree,
        ),
    ),
])

In [10]:
# fit every pipeline step on the training frame and apply it, then re-check the missing values per column
transformed_training_data = missing_value_transformation_pipe.fit_transform(train)
summary = missing_values_summary(transformed_training_data)
print(summary)

                                    Total Missing Values  Missing Percentage
season                                                 0                0.00
round                                                  0                0.00
day                                                    0                0.00
team                                                   0                0.00
promoted                                               0                0.00
opponent                                               0                0.00
promoted_opponent                                      0                0.00
home                                                   0                0.00
days_since_last_game                                   0                0.00
games_played_last_21_days                              0                0.00
pl_total_points                                        0                0.00
pl_total_gf                                            0                0.00

### Feature scaling

I want to scale my numeric features using the standard scaler

In [11]:
# categorical variables that must not be scaled
categorical_columns = ['season', 'round', 'day', 'promoted', 'promoted_opponent', 'home']

# remember the row index so it can be restored after scaling (the scaler returns a bare array)
index = transformed_training_data.index

# numeric columns to scale: everything numeric that is not listed as categorical
scaling_columns = (
    transformed_training_data
    .drop(columns=categorical_columns)
    .select_dtypes(include='number')
    .columns
)
scaling_data = transformed_training_data[scaling_columns]

# fit the StandardScaler on the training data and rebuild a labelled frame from its output
scaler = StandardScaler()
scaled_data = pd.DataFrame(
    scaler.fit_transform(scaling_data),
    columns=scaling_columns,
    index=index,
)

# put the unscaled categorical columns back in front of the scaled numeric columns
scaled_training_data = pd.concat([transformed_training_data[categorical_columns], scaled_data], axis=1)

In [12]:
# generic function to scale the numeric columns of a frame
def scale_numeric_data(data, categorical_columns, scaler=None):
    """Standardise the numeric, non-categorical columns of ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame to scale.
    categorical_columns : list of str
        Columns excluded from scaling; they are returned unchanged in front of
        the scaled columns.
    scaler : fitted transformer or None, default=None
        Object exposing ``transform``. When given, it is applied as-is, so
        statistics learned on the training data can be reused on held-out data.
        When ``None`` (the original behaviour), a new ``StandardScaler`` is
        fitted on ``data`` itself.

    Returns
    -------
    pandas.DataFrame
        ``categorical_columns`` followed by the scaled numeric columns, with
        the index of ``data``. Non-numeric columns that are not listed as
        categorical are not part of the result.
    """
    # numeric columns to scale: everything numeric that is not listed as categorical
    numeric_columns = data.drop(columns=categorical_columns).select_dtypes(include='number').columns
    numeric_data = data[numeric_columns]

    if scaler is None:
        # no fitted scaler supplied: learn mean/std from this frame
        scaled_values = StandardScaler().fit_transform(numeric_data)
    else:
        # reuse the supplied scaler without refitting it
        scaled_values = scaler.transform(numeric_data)

    # the scaler output carries no labels, so restore the column names and the row index
    scaled_numeric = pd.DataFrame(scaled_values, columns=numeric_columns, index=data.index)

    return pd.concat([data[categorical_columns], scaled_numeric], axis=1)

### Feature selection

### Process target variable for training data and test data

#### Training Y

In [13]:
# collect training target data, indexed by match id
train_y = y_train_full.set_index('unique_match_id')

# encode the target labels as integers; the single-column frame is squeezed to a 1-D Series first
# because LabelEncoder expects 1-D input and warns when handed a column vector
encoder = LabelEncoder()
train_y_encoded = encoder.fit_transform(train_y.squeeze(axis=1))

# Create a DataFrame with the encoded target variable and assign the index
train_y_encoded = pd.DataFrame(train_y_encoded, index=train_y.index, columns=['target'])

# keep only the matches that are still present in the processed train X (rows may have been dropped there)
scaled_train_y = train_y_encoded[train_y_encoded.index.isin(scaled_training_data.index)]

  return f(*args, **kwargs)


#### Test X

In [14]:
# read the test features and test targets from the processed data directory
X_test_full = pd.read_csv(f"{DATAPATH}/processed/X_test_full.csv")
y_test_full = pd.read_csv(f"{DATAPATH}/processed/y_test_full.csv")

In [15]:
# index the test features by match id
test_x = X_test_full.set_index('unique_match_id')

# missing value transformation: apply the pipeline that was fitted on the training data.
# Use transform (not fit_transform) so the imputation statistics and trees are not
# re-estimated from the test set and the training-fitted pipeline is not overwritten.
transformed_test_data = missing_value_transformation_pipe.transform(test_x)

# feature scaling: reuse the scaler fitted on the training data, on the same numeric columns,
# so train and test features are on the same scale
scaled_test_numeric = pd.DataFrame(
    scaler.transform(transformed_test_data[scaling_columns]),
    columns=scaling_columns,
    index=transformed_test_data.index,
)

# put the unscaled categorical columns back in front of the scaled numeric columns
scaled_test_data = pd.concat([transformed_test_data[categorical_columns], scaled_test_numeric], axis=1)

#### Test Y

In [16]:
# collect test target data, indexed by match id
test_y = y_test_full.set_index('unique_match_id')

# encode the target labels with the encoder already fitted on the training targets, so both sets
# share one label-to-integer mapping (a label unseen in training raises instead of being remapped);
# the single-column frame is squeezed to 1-D because LabelEncoder expects 1-D input
test_y_encoded = encoder.transform(test_y.squeeze(axis=1))

# Create a DataFrame with the encoded target variable and assign the index
test_y_encoded = pd.DataFrame(test_y_encoded, index=test_y.index, columns=['target'])

# keep only the matches that are still present in the processed test X (rows may have been dropped there)
scaled_test_y = test_y_encoded[test_y_encoded.index.isin(scaled_test_data.index)]

  return f(*args, **kwargs)


In [17]:
# write the processed train and test sets to the local processed-data directory
output_dir = f"{DATAPATH}/processed/"

# file name -> frame to store; each file keeps the match-id index as its first column
processed_outputs = {
    'X_train_full_processed.csv': scaled_training_data,
    'y_train_full_processed.csv': scaled_train_y,
    'X_test_full_processed.csv': scaled_test_data,
    'y_test_full_processed.csv': scaled_test_y,
}
for output_name, output_frame in processed_outputs.items():
    output_frame.to_csv(os.path.join(output_dir, output_name), index=True)