# SOEN 471 Project

The objective of our project is to train a model that classifies football players into positions (Forward, Midfielder, Defender) based on their skill and performance characteristics, using **supervised classification**.

Our training models are based on three algorithms: **Decision Tree**, **Random Forest** and **K-Nearest Neighbors (KNN)**.

## 1. Data Preparation

### Import libraries

In [11]:
import pandas as pd
import matplotlib.pyplot as pl
import pathlib
import shutil
import numpy as np
import pyspark.sql.functions as F
import findspark
import os
import sys

from data_processing import *
from datetime import datetime

from IPython.display import display

from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, when, col
from pyspark.ml.classification import DecisionTreeClassifier as DTC, RandomForestClassifier as RFC
from pyspark.sql.functions import array, col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StringIndexer,VectorAssembler

from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier

### Spark Initialization / Environment Setup

In [12]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

findspark.init()

def init_spark():
    spark = (
        SparkSession.builder.appName("SOEN 471 Project")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()
    )
    return spark

### Sample of Initial Data

<p>The initial dataset contains 10,003,590 rows and 110 features.</p>

In [13]:
data_dir = os.path.join(os.path.dirname(os.getcwd()), 'data')

spark = init_spark()

df1 = spark.read.csv(os.path.join(data_dir, 'male_players1.csv'), header=True)
df2 = spark.read.csv(os.path.join(data_dir, 'male_players2.csv'), header=True)
df3 = spark.read.csv(os.path.join(data_dir, 'male_players3.csv'), header=True)
df4 = spark.read.csv(os.path.join(data_dir, 'male_players4.csv'), header=True)
df5 = spark.read.csv(os.path.join(data_dir, 'male_players5.csv'), header=True)
df6 = spark.read.csv(os.path.join(data_dir, 'male_players6.csv'), header=True)

players = df1.union(df2).union(df3).union(df4).union(df5).union(df6)

sample = players.take(5)
sample = pd.DataFrame(sample, columns=sample[0].__fields__)
sample = sample.drop(sample.columns[0], axis=1)

display(sample)

Unnamed: 0,player_id,player_url,fifa_version,fifa_update,fifa_update_date,short_name,long_name,player_positions,overall,potential,...,cdm,rdm,rwb,lb,lcb,cb,rcb,rb,gk,player_face_url
0,158023,/player/158023/lionel-messi/230009,23,9,2023-01-13,L. Messi,Lionel Andrés Messi Cuccittini,RW,91,91,...,63+3,63+3,64+3,59+3,50+3,50+3,50+3,59+3,19+3,https://cdn.sofifa.net/players/158/023/23_120.png
1,165153,/player/165153/karim-benzema/230009,23,9,2023-01-13,K. Benzema,Karim Benzema,"CF, ST",91,91,...,64+3,64+3,64+3,60+3,55+3,55+3,55+3,60+3,18+3,https://cdn.sofifa.net/players/165/153/23_120.png
2,188545,/player/188545/robert-lewandowski/230009,23,9,2023-01-13,R. Lewandowski,Robert Lewandowski,ST,91,91,...,66+3,66+3,64+3,61+3,60+3,60+3,60+3,61+3,19+3,https://cdn.sofifa.net/players/188/545/23_120.png
3,192985,/player/192985/kevin-de-bruyne/230009,23,9,2023-01-13,K. De Bruyne,Kevin De Bruyne,"CM, CAM",91,91,...,79+3,79+3,78+3,74+3,68+3,68+3,68+3,74+3,21+3,https://cdn.sofifa.net/players/192/985/23_120.png
4,231747,/player/231747/kylian-mbappe/230009,23,9,2023-01-13,K. Mbappé,Kylian Mbappé Lottin,"ST, LW",91,95,...,63+3,63+3,67+3,63+3,54+3,54+3,54+3,63+3,18+3,https://cdn.sofifa.net/players/231/747/23_120.png


### Sample of Final Data

<p>Summary of all steps performed in the data preparation phase:</p>
<ul>
<li>Select all field-related features.</li>
<li>Discard all goalkeepers and goalkeeping attributes from the dataset (too many null values).</li>
<li>Drop features that do not discriminate between classes (the same values can easily be obtained by players of all classes).</li>
<li>Select the most preferred position for each player.</li>
<li>Convert the FIFA-defined position (e.g. RW, CF, ST) to one of three labels (Forward, Midfielder, or Defender).</li>
<li>Drop rows containing null values.</li>
<li>Cast all feature values to integers.</li>
</ul>

<p>Total number of columns: 28 (27 features plus the position label).</p>

<p>These steps reduce the row count from 10,003,590 to 8,882,438.</p>
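The two position-related steps (keep the first listed position, then map it to one of three labels) can be sketched in plain Python. The project's real `label_conversion` lives in `data_processing` and is not shown in this notebook, so the grouping in `POSITION_GROUPS` and the helper `to_label` are assumptions made for illustration only; in particular, where wing-backs and wide midfielders end up may differ in the actual project.

```python
# Hypothetical grouping of FIFA position codes; the project's real mapping
# is defined in data_processing.label_conversion and may differ.
POSITION_GROUPS = {
    "Forward": {"ST", "CF", "LW", "RW", "LF", "RF"},
    "Midfielder": {"CM", "CAM", "CDM", "LM", "RM"},
    "Defender": {"CB", "LB", "RB", "LWB", "RWB"},
}


def to_label(player_positions):
    """Map a comma-separated position string to one of three labels.

    Returns None for goalkeepers and unknown codes so that such rows
    can be dropped afterwards.
    """
    # The first listed position is treated as the preferred one.
    preferred = player_positions.split(",")[0].strip()
    for label, codes in POSITION_GROUPS.items():
        if preferred in codes:
            return label
    return None


for raw in ["RW", "CF, ST", "CM, CAM", "GK"]:
    print(raw, "->", to_label(raw))
```

Returning `None` for unmapped codes mirrors the notebook's approach of dropping null rows after the conversion.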

In [14]:
players = players.select(
        "player_positions",
        "pace",
        "shooting",
        "passing",
        "dribbling",
        "defending",
        "attacking_crossing",
        "attacking_finishing",
        "attacking_heading_accuracy",
        "attacking_short_passing",
        "attacking_volleys",
        "skill_dribbling",
        "skill_fk_accuracy",
        "skill_long_passing",
        "skill_ball_control",
        "movement_acceleration",
        "movement_sprint_speed",
        "movement_agility",
        "movement_balance",
        "power_shot_power",
        "power_stamina",
        "power_long_shots",
        "mentality_interceptions",
        "mentality_positioning",
        "mentality_penalties",
        "defending_marking_awareness",
        "defending_standing_tackle",
        "defending_sliding_tackle",
    )

players = players.withColumn(
        "player_position", split(players["player_positions"], ",")[0]
    ).drop("player_positions")

players = players.filter(players.player_position != "GK")

players = players.withColumn(
        "label_position", label_conversion(players["player_position"])
    ).drop("player_position").dropna()

for col_name in players.columns:
        if col_name != "label_position":
            players = players.withColumn(col_name, col(col_name).cast(IntegerType()))

sample = players.take(10)
sample = pd.DataFrame(sample, columns=sample[0].__fields__)
display(sample)

Unnamed: 0,pace,shooting,passing,dribbling,defending,attacking_crossing,attacking_finishing,attacking_heading_accuracy,attacking_short_passing,attacking_volleys,...,power_shot_power,power_stamina,power_long_shots,mentality_interceptions,mentality_positioning,mentality_penalties,defending_marking_awareness,defending_standing_tackle,defending_sliding_tackle,label_position
0,81,89,90,94,34,84,90,70,91,88,...,86,70,91,40,93,75,20,35,24,Forward
1,80,88,83,87,39,75,92,90,89,88,...,87,82,80,39,92,84,43,24,18,Forward
2,75,91,79,86,44,71,94,91,84,89,...,91,76,84,49,94,90,35,42,19,Forward
3,74,88,93,87,63,94,85,55,93,83,...,92,89,91,64,88,83,65,65,53,Midfielder
4,97,89,80,92,36,78,93,72,85,83,...,88,87,82,38,92,80,26,34,32,Forward
5,90,89,82,90,45,80,93,59,84,84,...,83,87,85,55,92,86,38,43,41,Forward
6,87,83,85,93,37,83,83,63,85,86,...,79,79,81,37,86,91,35,32,29,Forward
7,63,73,78,73,88,62,64,81,84,65,...,88,88,81,88,75,66,90,88,87,Midfielder
8,69,91,83,83,47,80,93,85,84,87,...,92,83,86,44,94,92,50,36,38,Forward
9,81,60,71,72,90,53,52,86,79,45,...,81,74,64,89,47,62,91,91,86,Defender


The dataset contains **3,266,866** Defenders, **3,712,577** Midfielders, and **1,902,995** Forwards.

### Data Sampling

<p>To improve results, we undersample the data to reduce class imbalance, keeping about <b>1,500,000</b> players for each of the three classes (a size that also splits cleanly into two thirds for training and one third for testing).</p>

In [15]:
defenders = players.filter(players.label_position == "Defender")
midfielders = players.filter(players.label_position == "Midfielder")
forwards = players.filter(players.label_position == "Forward")

In [16]:
defenders = defenders.sample(fraction=1500000 / defenders.count()).limit(1500000)
midfielders = midfielders.sample(fraction=1500000 / midfielders.count()).limit(1500000)
forwards = forwards.sample(fraction=1500000 / forwards.count()).limit(1500000)

sampled_players = defenders.union(midfielders).union(forwards)

print("Defenders: {}".format(defenders.count()))
print("Midfielders: {}".format(midfielders.count()))
print("Forwards: {}".format(forwards.count()))
print("Total number of sampled players: {}".format(sampled_players.count()))

Defenders: 1499176
Midfielders: 1499377
Forwards: 1499643
Total number of sampled players: 4498196
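The per-class counts above land slightly under 1,500,000 rather than exactly on it. That is consistent with how `DataFrame.sample(fraction=...)` behaves: each row is kept independently with the given probability, so the result size is random, and `limit` only caps it from above. The sketch below imitates this in plain Python on a much smaller, made-up population; `bernoulli_sample` and the sizes are assumptions for illustration, not Spark's implementation.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


population = list(range(100_000))  # stand-in for one class of players
target = 15_000                    # stand-in for the 1,500,000 target

sampled = bernoulli_sample(population, target / len(population), seed=24)
capped = sampled[:target]          # plays the role of .limit(target)

# The sampled size is close to the target but rarely equal to it.
print(len(sampled), len(capped))
```

If exact class sizes matter, one option is to oversample slightly and rely on the cap, at the cost of the cap no longer being a uniform random selection.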


## 2. Supervised Classification Algorithms

<p>The male_players.csv dataset is used to train a Decision Tree model, a Random Forest model, and a K-nearest neighbors (K-NN) model. The dataset is first preprocessed by encoding the categorical label column as numerical values. Next, the models are trained on a training set consisting of 2/3 of the dataset and evaluated on the remaining 1/3, the test set. Lastly, a classification report and a confusion matrix are generated to assess the accuracy and quality of the trained models.</p>

<p>Define a <b>printf()</b> helper that prefixes each message with a timestamp.</p>

In [None]:
def printf(*arg, **kwarg):
    timestamp = datetime.now().strftime("%H:%M:%S.%f")
    print(f'[{timestamp}]', *arg, **kwarg)

### Categorical Encoding

<p>Before splitting, fitting and evaluating the models, the label_position column is encoded with StringIndexer, which converts the textual labels to numeric indices while preserving their categorical meaning.

In [None]:
players = sampled_data()
def encode(players):
    # Apply string indexer
    indexer = StringIndexer(inputCol="label_position", outputCol="label_position_index")
    return indexer.fit(players).transform(players)
df = encode(players)

The table below displays the resulting dataset after assigning numerical values 0, 1 and 2 to the label positions <b>"Forward"</b>, <b>"Midfielder"</b> and <b>"Defender"</b>, respectively.</p>

In [None]:
unique_values = [row['label_position'] for row in df.select('label_position').distinct().orderBy('label_position').collect()]
for value in unique_values:
    sample = df[df['label_position'] == value].head(1)
    sample = pd.DataFrame(sample, columns=sample[0].__fields__)
    display(sample)
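Which label receives which index is worth checking rather than assuming. By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent label becomes 0.0. The plain-Python sketch below imitates that ordering on made-up counts; `index_labels` is a hypothetical helper, and the alphabetical tie-break reflects documented behaviour in recent Spark versions rather than anything shown in this notebook.

```python
from collections import Counter


def index_labels(labels):
    """Emulate StringIndexer's default ordering: most frequent label -> 0.0."""
    counts = Counter(labels)
    # Sort by descending count, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


labels = ["Forward"] * 5 + ["Midfielder"] * 3 + ["Defender"] * 2
print(index_labels(labels))

# With equal counts, the tie-break decides the order instead.
print(index_labels(["Forward", "Midfielder", "Defender"]))
```

Because the sampled classes have nearly equal sizes, the assignment can depend on small count differences; the fitted model's `labels` attribute shows the order actually used.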

<p>To prepare the input data for the models, the "<b>label_position</b>" and "<b>label_position_index</b>" columns are excluded, the remaining column names are collected in "<b>list_of_features</b>", and VectorAssembler combines them into a single "<b>indexed_features</b>" vector column.</p>

In [None]:
# Drop the label_position column to only get the features
def getFeatures(df):
    list_of_features = df.drop("label_position").drop("label_position_index").columns
    assembler = VectorAssembler(inputCols=list_of_features, outputCol="indexed_features")
    df = assembler.transform(df)
    return df
df = getFeatures(df)
# Display the DataFrame
sample = df.take(5)
sample = pd.DataFrame(sample, columns=sample[0].__fields__)
display(sample)

### Split the data set into a training set and test set

<p>To train the models, a training set is created from roughly 2/3 of the dataset (weight 0.67), while the remaining 1/3 (weight 0.33) is used as the test set.</p>

In [None]:
trainingSet, testSet = df.randomSplit([0.67, 0.33], 24)
# Drop labels of the test set to evaluate later
unlabeledTestSet, testSetRealLabels = testSet.drop(
        "label_position"
    ), testSet.select("label_position").withColumnRenamed(
        "label_position", "real_label"
    )

<p>After splitting, the training set and test set look as shown below:</p>

In [None]:
trainingDataSample = trainingSet.take(10)
trainingDataSample = pd.DataFrame(trainingDataSample, columns=trainingDataSample[0].__fields__)
testDataSample = testSet.take(10)
testDataSample = pd.DataFrame(testDataSample, columns=testDataSample[0].__fields__)
print("Training Data Sample")
display(trainingDataSample)
print("Test Data Sample")
display(testDataSample)

### Train a model using PySpark

#### Training Decision Tree model

<p>We'll train the model using the code below.</p>

In [None]:
dt = DTC(labelCol="label_position_index", featuresCol="indexed_features", impurity="entropy",
                                maxDepth=15)
printf("Training Decision Tree model...")
model = dt.fit(trainingSet)
printf("Finished fitting model.")
printf("Predicting test labels...")
spark_dt_predictions = model.transform(testSet)
printf("Finished predictions.")

#### Training Random Forest model

<p>We'll train the model using the code below.</p>

In [None]:
rf = RFC(labelCol="label_position_index", featuresCol="indexed_features", numTrees=10, maxDepth=5)
printf("Training Random Forest model...")
model = rf.fit(trainingSet)
printf("Finished fitting model.")
printf("Predicting test labels...")
spark_rf_predictions = model.transform(testSet)
printf("Finished predictions.")

#### K-nearest neighbors algorithm (K-NN)

<p>Since the K-NN model uses the distance between data points to label an unlabeled point, we need a function that computes this distance, as shown below.</p>

In [None]:
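def euclidean_distance(v1, v2):
    # Squared Euclidean distance; the square root is omitted because it
    # does not change which neighbors are nearest.
    return np.sum((v1 - v2) ** 2)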
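The function above sums squared differences without taking a square root. For nearest-neighbor search this is equivalent to the true Euclidean distance, because the square root is monotonic and therefore preserves the ranking of candidates. A small standard-library check on made-up two-feature points (the values are illustrative assumptions):

```python
import math


def squared_distance(v1, v2):
    """Sum of squared coordinate differences (no square root)."""
    return sum((a - b) ** 2 for a, b in zip(v1, v2))


query = (80, 40)
candidates = [(81, 35), (60, 70), (30, 90)]

# Rank the candidates once by squared distance and once by true distance.
by_squared = sorted(candidates, key=lambda c: squared_distance(query, c))
by_euclidean = sorted(candidates, key=lambda c: math.dist(query, c))

print(by_squared == by_euclidean)
```

Skipping the square root saves one operation per pair, which adds up when every test point is compared against every training point.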

<p>The K-NN prediction is implemented with the following code.</p>

In [None]:
def knnModel(trainingSet, toPredictSet, k):
    # Convert features to vectors to compute their distances 
    featuresArray = array(
        "pace",
        "shooting",
        "passing",
        "dribbling",
        "defending",
        "attacking_crossing",
        "attacking_finishing",
        "attacking_heading_accuracy",
        "attacking_short_passing",
        "attacking_volleys",
        "skill_dribbling",
        "skill_fk_accuracy",
        "skill_long_passing",
        "skill_ball_control",
        "movement_acceleration",
        "movement_sprint_speed",
        "movement_agility",
        "movement_balance",
        "power_shot_power",
        "power_stamina",
        "power_long_shots",
        "mentality_interceptions",
        "mentality_positioning",
        "mentality_penalties",
        "defending_marking_awareness",
        "defending_standing_tackle",
        "defending_sliding_tackle",
    )
    # Convert the training set into a DataFrame with two columns: 
    # one column represents feature vectors, and the other column represents a player's position.
    trainingSetDf = trainingSet.select(
        featuresArray.alias("features_in_array_training"), "label_position"
    )
    # Drop labels of the test set and convert its features into vectors
    toPredictSetDf = toPredictSet.select(
        monotonically_increasing_id().alias("id"),
        featuresArray.alias("features_in_array_predict"),
    )
    # Compute cartesian product of two Dataframes
    merged = toPredictSetDf.crossJoin(trainingSetDf)
    mergedRDD = merged.rdd
    mergedRDD = mergedRDD.map(
        lambda r: [
            r.id,
            euclidean_distance(
                np.array(r.features_in_array_predict).astype(float),
                np.array(r.features_in_array_training).astype(float),
            ).tolist(),
            r.label_position,
        ]
    )
    merged = mergedRDD.toDF(["id", "distance", "label_position"])
    
    # For each test point, keep only its k nearest training points
    windowMerged = Window.partitionBy("id").orderBy(col("distance").asc())
    merged = (
        merged.withColumn("row", row_number().over(windowMerged))
        .filter(col("row") <= k)
        .drop("row")
        .drop("distance")
    )
    # Count how many of the k nearest neighbors carry each label
    merged = merged.groupBy(["id", "label_position"]).count()
    windowMerged = Window.partitionBy("id").orderBy(col("count").desc())
    # Keep the majority label among the k nearest neighbors
    merged = (
        merged.withColumn("row", row_number().over(windowMerged))
        .filter(col("row") == 1)
        .drop("row")
        .drop("count")
    )
    merged = merged.orderBy("id").drop("id")
    merged = merged.withColumnRenamed("label_position", "prediction")
    return merged
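The same logic (rank all training points by distance, keep the k closest, take a majority vote) can be written for a single query point in a few lines of plain Python. This is a minimal sketch on made-up two-feature data, not the distributed implementation above; `knn_predict` and the toy points are assumptions for illustration.

```python
from collections import Counter


def knn_predict(training, query, k):
    """Predict a label for `query` from (feature_vector, label) pairs."""
    # Rank training points by squared Euclidean distance to the query.
    ranked = sorted(
        training,
        key=lambda item: sum((a - b) ** 2 for a, b in zip(item[0], query)),
    )
    # Majority vote among the labels of the k closest points.
    votes = Counter(label for _, label in ranked[:k])
    return votes.most_common(1)[0][0]


training = [
    ((90, 30), "Forward"), ((85, 35), "Forward"),
    ((60, 60), "Midfielder"), ((65, 70), "Midfielder"),
    ((30, 90), "Defender"), ((35, 85), "Defender"),
]

print(knn_predict(training, (88, 32), k=3))
print(knn_predict(training, (32, 88), k=3))
```

Each prediction scans the whole training set, which is the cost that the cross join in the Spark version pays for every test point at once.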

<p>K-NN has no separate fitting step, so we now predict the test labels directly, using the training set as the reference points.</p>

In [None]:
trainingData = trainingSet.drop("label_position_index").drop("indexed_features")
unlabeledTestData = unlabeledTestSet.drop("label_position_index").drop("indexed_features")
printf("Predicting test labels...")
testSetPredictions = knnModel(trainingData,unlabeledTestData,k=3)
printf("Finished predictions.")

### Model Evaluation

The model will be evaluated as shown below:

In [None]:
def evaluation(testSetLabels, predictions, features):
    # `features` holds the display labels for the confusion matrix. They must
    # follow the sorted order of the label values, which is the order
    # confusion_matrix uses when no `labels` argument is given.
    # Compute evaluation metrics
    cr = classification_report(testSetLabels, predictions, output_dict=True)
    report_df = pd.DataFrame.from_dict(cr).transpose()
    print("Classification Report:")
    print(report_df)
    cm = confusion_matrix(testSetLabels, predictions)
    print("\nConfusion Matrix")
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=features)
    disp.plot()
    pl.show()
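To make the report easier to read, it may help to see how a confusion matrix and the per-class precision and recall are derived from it. The sketch below counts them by hand on a tiny made-up example, with rows and columns in sorted label order, as scikit-learn does when no `labels` argument is passed. The helpers `confusion` and `precision_recall` are hypothetical and assume every class appears at least once among both the true and the predicted labels.

```python
def confusion(y_true, y_pred):
    """Rows are true labels, columns are predicted labels, both sorted."""
    labels = sorted(set(y_true) | set(y_pred))
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return labels, matrix


def precision_recall(matrix, i):
    """Precision and recall of class i, read off the confusion matrix."""
    tp = matrix[i][i]
    predicted = sum(row[i] for row in matrix)  # column total
    actual = sum(matrix[i])                    # row total
    return tp / predicted, tp / actual


y_true = ["Forward", "Forward", "Midfielder", "Defender", "Defender", "Midfielder"]
y_pred = ["Forward", "Midfielder", "Midfielder", "Defender", "Midfielder", "Midfielder"]

labels, matrix = confusion(y_true, y_pred)
print(labels)
for row in matrix:
    print(row)
print(precision_recall(matrix, labels.index("Midfielder")))
```

In this toy example the Midfielder class has perfect recall but only 0.5 precision, because half of the points predicted as Midfielder belong to other classes.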


#### Evaluate a model trained by Decision Tree algorithm

<p>To generate the classification report, the "<b>prediction</b>" and "<b>label_position_index</b>" columns are selected from the "<b>predictions</b>" dataframe. The resulting dataframe is then converted to a Pandas dataframe, allowing the use of the <b>classification_report()</b> function from the scikit-learn metrics module.</p>

In [None]:
# Compute evaluation metrics
printf("Evaluating predictions...\n")
dt_predictions_and_labels_pd = spark_dt_predictions.select("prediction", "label_position_index").toPandas()
evaluation(dt_predictions_and_labels_pd['label_position_index'],dt_predictions_and_labels_pd['prediction'], [0.0,1.0,2.0])

#### Evaluate a model trained by Random Forest algorithm

<p>To generate the classification report, the "<b>prediction</b>" and "<b>label_position_index</b>" columns are selected from the "<b>predictions</b>" dataframe. The resulting dataframe is then converted to a Pandas dataframe, allowing the use of the <b>classification_report()</b> function from the scikit-learn metrics module.</p>

In [None]:
# Compute evaluation metrics
printf("Evaluating predictions...\n")
rf_predictions_and_labels_pd = spark_rf_predictions.select("prediction", "label_position_index").toPandas()
evaluation(rf_predictions_and_labels_pd['label_position_index'],rf_predictions_and_labels_pd['prediction'],[0.0,1.0,2.0])

#### Evaluate a model trained by K-nearest neighbors (K-NN) algorithm 

In [None]:
# Compute evaluation metrics
printf("Evaluating predictions...\n")
# Display labels are listed in sorted order, matching how confusion_matrix orders string labels
evaluation(testSetRealLabels.toPandas(), testSetPredictions.toPandas(), ['Defender', 'Forward', 'Midfielder'])

<p><b>Observation:</b> Our models have been trained on a subsample of <b>1,476</b> data points, which is much smaller than the original dataset, because training on a larger subsample can take hours. To work with more data, we use Apache Parquet, a columnar storage format that lowers storage costs and speeds up data queries. This lets more of the data fit in memory and reduces the time spent swapping data from disk. We save the training set and test set as .parquet files by executing the following code:</p>

In [None]:
def cleanup_data_and_store_as_parquet(players):
    trainingSet, testSet = players.randomSplit([0.67, 0.33], 24)
    male_players_cleaned_training = pathlib.Path("../data/male_players_cleaned_training")
    if male_players_cleaned_training.exists():
        shutil.rmtree("../data/male_players_cleaned_training",ignore_errors=True)
    male_players_cleaned_test = pathlib.Path("../data/male_players_cleaned_test")
    if male_players_cleaned_test.exists():
        shutil.rmtree("../data/male_players_cleaned_test",ignore_errors=True)
        
    trainingSet.write.parquet('../data/male_players_cleaned_training')
    testSet.write.parquet('../data/male_players_cleaned_test') 

# Read the dataset using PySpark
players = data_preparation("../data/male_players.csv")
cleanup_data_and_store_as_parquet(players)
printf("-------Using the entire data set-------")
printf("Loading dataset...")

# Convert to Pandas Dataframe to train with Scikit-Learn later
trainingSet = pd.read_parquet('../data/male_players_cleaned_training')
testSet = pd.read_parquet('../data/male_players_cleaned_test')
printf(f'Finished loading dataset. Training set: {trainingSet.shape[0]} rows, Test set: {testSet.shape[0]} rows')

# Get labels of the training set (last column)
trainingSetLabels = trainingSet.iloc[:, -1:].values.ravel()
# Drop the label column ("label_position") of the training set
trainingSet = trainingSet.iloc[:, :-1]

# Get labels of the test set (last column)
testSetLabels = testSet.iloc[:, -1:].values.ravel()
# Drop the label column ("label_position") of the test set
testSet = testSet.iloc[:, :-1]

### Train a model using Scikit-Learn

<p>As PySpark does not implement the K-NN model, we developed our own implementation. However, our algorithm is computationally expensive because we generated a Cartesian product between the training set and test set. Fortunately, Scikit-Learn offers optimized K-NN models that use BallTree and KDTree for efficient nearest-neighbor searches.
</p>
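To give an intuition for why a tree-based search avoids comparing against every training point, here is a minimal k-d tree sketch in plain Python. It is not scikit-learn's implementation: it handles only a single nearest neighbor (k = 1), uses made-up two-feature points, and the helpers `build_kdtree` and `nearest` are hypothetical names introduced for illustration.

```python
import math


def build_kdtree(points, depth=0):
    """Build a k-d tree from (feature_vector, label) pairs."""
    if not points:
        return None
    # Cycle through the feature axes and split at the median point.
    axis = depth % len(points[0][0])
    points = sorted(points, key=lambda p: p[0][axis])
    mid = len(points) // 2
    return {
        "point": points[mid],
        "axis": axis,
        "left": build_kdtree(points[:mid], depth + 1),
        "right": build_kdtree(points[mid + 1:], depth + 1),
    }


def nearest(node, query, best=None):
    """Return (distance, label) of the stored point closest to `query`."""
    if node is None:
        return best
    vector, label = node["point"]
    dist = math.dist(vector, query)
    if best is None or dist < best[0]:
        best = (dist, label)
    # Search the side of the split that contains the query first.
    diff = query[node["axis"]] - vector[node["axis"]]
    near, far = (node["left"], node["right"]) if diff < 0 else (node["right"], node["left"])
    best = nearest(near, query, best)
    # Visit the other side only if it could hold a closer point.
    if abs(diff) < best[0]:
        best = nearest(far, query, best)
    return best


training = [
    ((90, 30), "Forward"), ((85, 35), "Forward"),
    ((60, 60), "Midfielder"), ((65, 70), "Midfielder"),
    ((30, 90), "Defender"), ((35, 85), "Defender"),
]
tree = build_kdtree(training)
print(nearest(tree, (40, 80)))
```

The pruning test on `abs(diff)` is what lets the search skip whole subtrees, in contrast to the Cartesian product used in the PySpark version.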
<p>We will now create a <b>predict()</b> method.</p>

In [None]:
def predict(model, name, trainingSet, trainingSetLabels, testSetLabels):
    # Note: `testSetLabels` is not used here, and the test features are read
    # from the notebook-level `testSet` variable defined above.
    printf(f'Fitting {name} model...')
    model.fit(trainingSet, trainingSetLabels)
    printf("Finished fitting model.")

    printf("Predicting test labels...")
    predictions = model.predict(testSet)
    printf("Finished predictions.")
    return predictions

#### Train model using K-nearest neighbors (K-NN) algorithm 

<p>We'll train the K-NN model using the code below.</p>

In [None]:
knn_model = KNeighborsClassifier(n_neighbors=3, algorithm='kd_tree', n_jobs=-1)
knn_predictions = predict(knn_model,"KNN",trainingSet, trainingSetLabels, testSetLabels)

#### Train model using Random Forest algorithm 

In [None]:
rf_model = RandomForestClassifier(max_depth=7, random_state=0)
rf_predictions = predict(rf_model,"Random Forest",trainingSet, trainingSetLabels, testSetLabels)

#### Train model using Decision Tree algorithm 

In [None]:
dt_model = DecisionTreeClassifier(criterion="entropy", max_depth=7)
dt_predictions = predict(dt_model, "Decision Tree", trainingSet, trainingSetLabels, testSetLabels)

#### Model Evaluation

#### Evaluate a model trained by K-NN algorithm

In [None]:
printf("Evaluating predictions...\n")
# Display labels are listed in sorted order, assuming the labels are the position strings
evaluation(testSetLabels,knn_predictions, ['Defender', 'Forward', 'Midfielder'])

#### Evaluate a model trained by Random Forest algorithm

In [None]:
printf("Evaluating predictions...\n")
evaluation(testSetLabels,rf_predictions, [0.0,1.0,2.0])

#### Evaluate a model trained by Decision Tree algorithm

In [None]:
printf("Evaluating predictions...\n")
evaluation(testSetLabels,dt_predictions, [0.0,1.0,2.0])