# Project - Option 1

## Task 1

In [42]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import numpy as np
import os
import sys
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
import warnings
import torch
from sklearn.metrics import r2_score
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from itertools import product
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_error, r2_score
from functools import reduce

In [43]:
import warnings
warnings.filterwarnings('ignore')

In [44]:
appName = "Big Data Analytics"
master = "local[*]"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
 .set('spark.driver.host','127.0.0.1')\
   .setAppName(appName)\
    .setMaster(master)

spark = SparkSession.builder.config(conf = conf).getOrCreate()

In [45]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']=""
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa.fifa"
db_properties['driver']="org.postgresql.Driver"

In [46]:
df_male = spark.read.csv('./Data/players_15.csv', header = True)
combined_df = df_male.withColumn("year", lit(2015))
# combined_df = combined_df.withColumn("record_id", monotonically_increasing_id()).select("record_id", *combined_df.columns)
combined_df = combined_df.withColumn("gender", lit("Male"))
folder_path = "./Data"
for file_name in os.listdir(folder_path):
    if file_name == "players_15.csv":
        continue
    year = "20" + file_name[-6:-4]
    file_path = os.path.join(folder_path, file_name)
    df_read = spark.read.csv(file_path, header = True)
    df_read = df_read.withColumn("year", lit(int(year)))
    # df_read = df_read.withColumn("record_id", monotonically_increasing_id()).select("record_id", *df_read.columns)
    if "female" in file_name:
        df_read = df_read.withColumn("gender", lit("Female"))
    else:
        df_read = df_read.withColumn("gender", lit("Male"))
    combined_df = combined_df.union(df_read)
combined_df = combined_df.withColumn("record_id", monotonically_increasing_id()).select("record_id", *combined_df.columns)

In [47]:
# Write to PostgreSQL
combined_df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

                                                                                

In [48]:
# Read from PostgreSQL to verify
df_from_postgres = spark.read \
    .format("jdbc") \
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

## Task 2

In [50]:
def read_from_spark(spark, db_properties):
    df_from_postgres = spark.read \
        .format("jdbc") \
        .option("url", db_properties['url'])\
        .option("dbtable", db_properties['table'])\
        .option("user", db_properties['username'])\
        .option("password", db_properties['password'])\
        .option("Driver", db_properties['driver'])\
        .load()
    df = df_from_postgres.filter(df_from_postgres["gender"] == "Male")
    return df

In [51]:
def get_top_clubs_with_contracts_ending(spark, db_properties, X, Y, Z):
    df = read_from_spark(spark, db_properties)
    df_filtered = df.filter(col("year") == X)
    df_expiring = df_filtered.filter(col("club_contract_valid_until").cast("int") >= Z)
    result = df_expiring.groupBy("club_name") \
        .count() \
        .orderBy(col("count").desc()) \
        .limit(Y)
    return result.collect()

In [52]:
def find_clubs_by_average_age(spark, db_properties, X, Y, highest=True):
    if X <= 0:
        return "X must be a positive integer"
    if Y < 2015 or Y > 2022:
        return "Y must be a year between 2015 and 2022 inclusively"
    df = read_from_spark(spark, db_properties)
    
    # Filter data for specified year Y
    df_filtered = df.filter(col("year") == Y)
    avg_age_per_club = df_filtered.groupBy("club_name") \
        .agg(round(avg("age").cast("float"),2).alias("average_age"))
    if highest:
        sorted_clubs = avg_age_per_club.orderBy(desc("average_age"))
    else:
        sorted_clubs = avg_age_per_club.orderBy(asc("average_age"))

    top_clubs = sorted_clubs.limit(X)
    last_club = top_clubs.collect()[-1]
    threshold_age = last_club["average_age"]
    if highest:
        result_clubs = sorted_clubs.filter(col("average_age") >= threshold_age).collect()
    else:
        result_clubs = sorted_clubs.filter(col("average_age") <= threshold_age).collect()
    return result_clubs

In [53]:
def get_most_popular_nationality(spark, db_properties):
    df = read_from_spark(spark, db_properties)
    # df_filtered = df.filter((col("year") >= 2015) & (col("year") <= 2022))
    nationality_counts = df.groupBy("year", "nationality_name") \
        .agg(count("*").alias("count"))
    # Create a window partitioned by year and ordered by count descending
    window = Window.partitionBy("year").orderBy(desc("count"))
    
    # Add row number within each year partition
    ranked_nationalities = nationality_counts.withColumn("rank", row_number().over(window))
    # Filter for the top nationality for each year
    most_popular_nationalities = ranked_nationalities.filter(col("rank") == 1) \
        .select("year", "nationality_name", "count") \
        .orderBy("year")
    
    return most_popular_nationalities.collect()

## Task-2.1

In [54]:
top_clubs = get_top_clubs_with_contracts_ending(spark=spark, db_properties=db_properties, X=2021, Y=10, Z=2023)

In [55]:
top_clubs

[Row(club_name='GwangJu FC', count=28),
 Row(club_name='Zamora Fútbol Club', count=27),
 Row(club_name='Club Plaza de Deportes Colonia', count=27),
 Row(club_name='SL Benfica', count=26),
 Row(club_name='Club Deportivo El Nacional', count=26),
 Row(club_name='Sociedad Deportiva Aucas', count=26),
 Row(club_name='Gangwon FC', count=26),
 Row(club_name='Club Atlético Nacional Potosí', count=26),
 Row(club_name='Busan IPark', count=26),
 Row(club_name='Club Sportivo Luqueño', count=25)]

<h1>Task-2.2</h1>

In [56]:
clubs_by_age = find_clubs_by_average_age(spark=spark, db_properties=db_properties, X=10, Y=2017, highest=False)

In [57]:
clubs_by_age

[Row(club_name='Sevilla Atlético', average_age=19.920000076293945),
 Row(club_name='Swindon Town', average_age=21.3700008392334),
 Row(club_name='CD Huachipato', average_age=21.40999984741211),
 Row(club_name='FC Nordsjælland', average_age=21.40999984741211),
 Row(club_name='FC Twente', average_age=21.59000015258789),
 Row(club_name='Envigado FC', average_age=21.610000610351562),
 Row(club_name='KRC Genk', average_age=21.6299991607666),
 Row(club_name='Crewe Alexandra', average_age=21.81999969482422),
 Row(club_name='Barnsley', average_age=21.8700008392334),
 Row(club_name='Ajax', average_age=21.969999313354492)]

<h1>Task-2.3</h1>

In [58]:
popular_nationalities = get_most_popular_nationality(spark=spark, db_properties=db_properties)

In [59]:
popular_nationalities

[Row(year=2015, nationality_name='England', count=1627),
 Row(year=2016, nationality_name='England', count=1519),
 Row(year=2017, nationality_name='England', count=1627),
 Row(year=2018, nationality_name='England', count=1633),
 Row(year=2019, nationality_name='England', count=1625),
 Row(year=2020, nationality_name='England', count=1670),
 Row(year=2021, nationality_name='England', count=1685),
 Row(year=2022, nationality_name='England', count=1719)]

# Task III

## Preprocessing

Refer to preprocessing.ipynb for detailed investigation on correlations and outliers

In [60]:
#Setup column categories
col_names = ['record_id',
 'sofifa_id',
 'player_url',
 'short_name',
 'long_name',
 'player_positions',
 'overall',
 'potential',
 'value_eur',
 'wage_eur',
 'age',
 'dob',
 'height_cm',
 'weight_kg',
 'club_team_id',
 'club_name',
 'league_name',
 'league_level',
 'club_position',
 'club_jersey_number',
 'club_loaned_from',
 'club_joined',
 'club_contract_valid_until',
 'nationality_id',
 'nationality_name',
 'nation_team_id',
 'nation_position',
 'nation_jersey_number',
 'preferred_foot',
 'weak_foot',
 'skill_moves',
 'international_reputation',
 'work_rate',
 'body_type',
 'real_face',
 'release_clause_eur',
 'player_tags',
 'player_traits',
 'pace',
 'shooting',
 'passing',
 'dribbling',
 'defending',
 'physic',
 'attacking_crossing',
 'attacking_finishing',
 'attacking_heading_accuracy',
 'attacking_short_passing',
 'attacking_volleys',
 'skill_dribbling',
 'skill_curve',
 'skill_fk_accuracy',
 'skill_long_passing',
 'skill_ball_control',
 'movement_acceleration',
 'movement_sprint_speed',
 'movement_agility',
 'movement_reactions',
 'movement_balance',
 'power_shot_power',
 'power_jumping',
 'power_stamina',
 'power_strength',
 'power_long_shots',
 'mentality_aggression',
 'mentality_interceptions',
 'mentality_positioning',
 'mentality_vision',
 'mentality_penalties',
 'mentality_composure',
 'defending_marking_awareness',
 'defending_standing_tackle',
 'defending_sliding_tackle',
 'goalkeeping_diving',
 'goalkeeping_handling',
 'goalkeeping_kicking',
 'goalkeeping_positioning',
 'goalkeeping_reflexes',
 'goalkeeping_speed',
 'ls',
 'st',
 'rs',
 'lw',
 'lf',
 'cf',
 'rf',
 'rw',
 'lam',
 'cam',
 'ram',
 'lm',
 'lcm',
 'cm',
 'rcm',
 'rm',
 'lwb',
 'ldm',
 'cdm',
 'rdm',
 'rwb',
 'lb',
 'lcb',
 'cb',
 'rcb',
 'rb',
 'gk',
 'player_face_url',
 'club_logo_url',
 'club_flag_url',
 'nation_logo_url',
 'nation_flag_url',
 'year',
 'gender']

ordinal_cols = ['work_rate']
nominal_cols = ['body_type']
continuous_cols = ['potential', 'value_eur', 'wage_eur', 'age', 'height_cm', 'weight_kg',
                   'weak_foot', 'skill_moves', 'international_reputation',
                   'attacking_crossing','attacking_finishing', 'attacking_heading_accuracy', 'attacking_short_passing', 'attacking_volleys',
                   'movement_sprint_speed', 'movement_agility', 'movement_reactions', 'movement_balance',
                   'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength', 'power_long_shots',
                   'mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties',
                   'defending_marking_awareness', 'defending_standing_tackle', 'defending_sliding_tackle']

main_traits_cols = ['pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic', 
                            'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking',
                            'goalkeeping_positioning', 'goalkeeping_reflexes', 'goalkeeping_speed']

skill_cols = ['skill_dribbling', 'skill_curve', 'skill_fk_accuracy', 'skill_long_passing', 'skill_ball_control']

position_cols = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram', 'lm', 'lcm', 'cm',
                   'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 'lb', 'lcb', 'cb', 'rcb', 'rb', 'gk']

needed_cols = ['record_id', 'player_positions'] #needed for preprocessing but will be dropped

columns_to_drop = ['sofifa_id', 'player_url', 'short_name', 'long_name',
                   'dob', 'club_name', 'league_name', 'club_position', 'club_jersey_number', 'club_loaned_from',
                   'club_joined','club_contract_valid_until', 'nationality_id', 'nationality_name', 'nation_team_id',
                   'nation_position', 'nation_jersey_number', 'preferred_foot', 'real_face', 'player_tags', 'player_traits',
                   'player_face_url', 'club_logo_url', 'club_flag_url', 'nation_logo_url', 'nation_flag_url',
                   'year', 'gender', 'league_level']

null_cols_drop = ['release_clause_eur']

corr_cols_drop = ['movement_acceleration']

dropped_null_rows = ['value_eur', 'wage_eur', 'trait6']

imputed_cols = ['mentality_composure', 'club_team_id']

In [61]:
class NullImputer(Transformer):
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset.withColumn("club_team_id_imputed", when(col("club_team_id").isNull(), -1).otherwise(col("club_team_id")))
        return output_df
    
class MLImputer(Transformer):
    def __init__(self):
        super().__init__()
    
    def _transform(self, dataset):
        s = dataset.select(['record_id','mentality_composure', 'mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']).toPandas()
       
        # Prepare the data
        train_data = s.dropna(subset=['mentality_composure'])
        X = train_data[['mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']]
        Y = train_data['mentality_composure']

        # Fit the linear regression model
        model = LinearRegression()
        model.fit(X, Y)

        # Predict missing values
        X_missing = s[s['mentality_composure'].isnull()][['mentality_aggression', 'mentality_interceptions', 'mentality_positioning', 'mentality_vision', 'mentality_penalties']]
        predicted_values = model.predict(X_missing)
        rounded_predicted_values = np.round(predicted_values).astype(int)
        # Impute the predicted values
        s.loc[s['mentality_composure'].isnull(), 'mentality_composure'] = rounded_predicted_values
        s = s.rename(columns = {'mentality_composure': 'mentality_composure_imputed'})
        spark_s = spark.createDataFrame(s[['record_id','mentality_composure_imputed']])
        spark_df = dataset.join(spark_s, on = 'record_id', how = 'left')
        return spark_df
    
class AverageSkillCreator(Transformer):
    def __init__(self, average_skills_cols = None):
        super().__init__()
        self.average_skills_cols = average_skills_cols

    def _transform(self, dataset):
        spark_df = dataset
        spark_df = spark_df.withColumn(
        'corr_av_skills', 
        aggregate(
            array(*[col(pos) for pos in self.average_skills_cols]),
            lit(0.0),
            lambda acc, x: acc + x,
            lambda acc: acc / size(array(*[col(pos) for pos in self.average_skills_cols]))
        )
        )

        return spark_df
        
class AveragePositionCreator(Transformer):
    def __init__(self, position_cols = None):
        super().__init__()
        self.position_cols = position_cols

    # Define UDF for safe evaluation
    def safe_eval(self, x):
        try:
            return float(eval(str(x)))
        except:
            return None
        
    def _transform(self, dataset):
        attacking_positions = ['ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw',
                           'lam', 'cam', 'ram']
        midfield_positions = ['lam', 'cam', 'ram', 'lm', 'lcm', 'cm', 'rcm', 'rm',
                            'ldm', 'cdm', 'rdm']
        defensive_positions = ['rwb', 'lb', 'lcb', 'cb', 'lwb', 'rcb', 'rb',
                            'ldm', 'cdm', 'rdm']
        gk_positions = ['gk']

        spark_df = dataset
        safe_eval_udf = udf(self.safe_eval, FloatType())

        #Apply safe_eval to each position
        for pos in position_cols:
            spark_df = spark_df.withColumn(pos, safe_eval_udf(pos))

         #Add new columns 'average_val_attacking', 'average_val_midfield', 'average_val_defensive', 'average_val_gk'
        spark_df = spark_df.withColumn(
            'average_val_attacking', 
            aggregate(
                array(*[col(pos) for pos in attacking_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in attacking_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_midfield', 
            aggregate(
                array(*[col(pos) for pos in midfield_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in midfield_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_defensive', 
            aggregate(
                array(*[col(pos) for pos in defensive_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in defensive_positions]))
            )
        )
        spark_df = spark_df.withColumn(
            'average_val_gk', 
            aggregate(
                array(*[col(pos) for pos in gk_positions]),
                lit(0.0),
                lambda acc, x: acc + x,
                lambda acc: acc / size(array(*[col(pos) for pos in gk_positions]))
            )
        )

        return spark_df

def assign_traits(player_positions, *traits):
        #Trait1 = pace/goalkeeping_diving
        #Trait2 = shooting/goalkeeping_handling
        #Trait3 = passing/goalkeeping_kicking
        #Trait4 = dribbling/goalkeeping_positioning
        #Trait5 = defending/goalkeeping_reflexes
        #Trait6 = physic/goalkeeping_speed

        traits = [float(t) if t is not None else None for t in traits]
        if player_positions and 'GK' in player_positions:
            return traits[6:12]
        else:
            return traits[:6]

class MergeMainTraits(Transformer):
    def __init__(self, main_traits_cols = None):
        super().__init__()
        self.main_traits_cols = main_traits_cols
        
    def _transform(self, dataset):
        output_df = dataset
        assign_traits_udf = udf(assign_traits, ArrayType(FloatType()))
        output_df = output_df.withColumn('traits', 
                                       assign_traits_udf(col('player_positions'), *[col(trait) for trait in self.main_traits_cols]))
        for i in range(1, 7):
            output_df = output_df.withColumn(f'trait{i}', col('traits').getItem(i-1))
        
        output_df = output_df.drop('traits')
        return output_df
    
class DropOutliers(Transformer):
    def __init__(self, continuous_cols = None):
        super().__init__()
        self.continuous_cols = continuous_cols

    def column_add(self, a,b):
        return  a.__add__(b)
    
    def find_outliers(self, df, continuous_cols):
        # Identifying the numerical columns in a spark dataframe

        # Using the `for` loop to create new columns by identifying the outliers for each feature
        for column in continuous_cols:

            less_Q1 = 'less_Q1_{}'.format(column)
            more_Q3 = 'more_Q3_{}'.format(column)
            Q1 = 'Q1_{}'.format(column)
            Q3 = 'Q3_{}'.format(column)

            # Q1 : First Quartile ., Q3 : Third Quartile
            Q1 = df.approxQuantile(column,[0.25],relativeError=0)
            Q3 = df.approxQuantile(column,[0.75],relativeError=0)
            
            # IQR : Inter Quantile Range
            # We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
            # Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
            IQR = Q3[0] - Q1[0]
            
            #selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
            less_Q1 =  Q1[0] - 1.5*IQR
            more_Q3 =  Q3[0] + 1.5*IQR
            
            isOutlierCol = 'is_outlier_{}'.format(column)
            
            df = df.withColumn(isOutlierCol,when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
        

        # Selecting the specific columns which we have added above, to check if there are any outliers
        selected_columns = [column for column in df.columns if column.startswith("is_outlier")]
        # Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
        df = df.withColumn('total_outliers',reduce(self.column_add, ( df[col] for col in  selected_columns)))

        # Dropping the extra columns created above, just to create nice dataframe., without extra columns
        df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

        return df

    def _transform(self, dataset):
        df_outliers = self.find_outliers(dataset, continuous_cols)
        df_no_outliers = df_outliers.filter(df_outliers['total_outliers']<=6) #Drop all rows with more than 6 outliers
        df_no_outliers = df_no_outliers.drop("total_outliers")
        return df_no_outliers    
     
class DropNullRows(Transformer):
    def __init__(self, dropped_null_rows = None):
        super().__init__()
        self.null_cols_drop = dropped_null_rows

    def _transform(self, dataset):
        output_df = dataset.na.drop(subset=self.null_cols_drop)
        return output_df

class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types  
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in (continuous_cols + imputed_cols):
            output_df = output_df.withColumn(col_name,col(col_name).cast(DoubleType()))
        return output_df
    
class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
        return output_df

In [62]:
def get_preprocessing_pipeline():
    #Drop all initial columns that are not needed
    stage_dropper = ColumnDropper(columns_to_drop = columns_to_drop + null_cols_drop + corr_cols_drop)

    # Stage where columns are casted as appropriate types
    stage_typecaster = FeatureTypeCaster()

    #Engineer features
    stage_maintraits = MergeMainTraits(main_traits_cols)
    trait_cols = ['trait1', 'trait2', 'trait3', 'trait4', 'trait5', 'trait6']

    stage_average_positions = AveragePositionCreator(position_cols)
    avg_position_cols = ['average_val_attacking', 'average_val_midfield', 'average_val_defensive', 'average_val_gk']
   
    stage_average_skills = AverageSkillCreator(skill_cols)
    avg_skill_cols = ["corr_av_skills"]

    #Impute features
    imputed_id_cols = [x+"_imputed" for x in imputed_cols]
    stage_imputer = NullImputer()
    stage_ml_imputer = MLImputer()
    
    #Stage where null rows are dropped based on certain features
    stage_null_dropper = DropNullRows(dropped_null_rows)

    #Stage where outlier rows are dropped for continuous features
    stage_outlier_dropper = DropOutliers(continuous_cols)

    # Stage where nominal columns are transformed to index columns using StringIndexer
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where ordinal columns are transformed to index columns using StringIndexer
    ordinal_id_cols = [x+"_index" for x in ordinal_cols]
    stage_ordinal_indexer = StringIndexer(inputCols = ordinal_cols, outputCols = ordinal_id_cols )

    # Stage where all relevant features are assembled into a vector (and dropping a few)
    feature_cols = continuous_cols + nominal_onehot_cols + ordinal_id_cols + trait_cols + avg_position_cols + avg_skill_cols + imputed_id_cols #[imputed_id_cols[1]]
    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

    # Stage where we scale the columns
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')
    
    # Removing all unnecessary columbs, only keeping the 'features' and 'outcome' columns
    stage_column_dropper = ColumnDropper(columns_to_drop = feature_cols + main_traits_cols + 
                                         position_cols + skill_cols + imputed_cols + needed_cols + ordinal_cols + nominal_cols + nominal_id_cols +['vectorized_features'])
    
    # Connect the columns into a pipeline
    '''
    pipeline = Pipeline(stages=[stage_dropper, stage_typecaster, stage_maintraits, stage_average_positions, stage_average_skills,
                                stage_imputer, stage_ml_imputer, stage_null_dropper, stage_outlier_dropper,
                                stage_nominal_indexer, stage_nominal_onehot_encoder, stage_ordinal_indexer,
                                stage_vector_assembler, stage_scaler, stage_column_dropper])
    '''

    pipeline = Pipeline(stages=[stage_dropper, stage_typecaster, stage_maintraits, stage_average_positions, stage_average_skills,
                                stage_imputer, stage_ml_imputer, stage_null_dropper, stage_nominal_indexer, stage_nominal_onehot_encoder, stage_ordinal_indexer,
                                stage_vector_assembler, stage_scaler, stage_column_dropper])
    # pipeline = Pipeline(stages=[stage_ml_imputer])

    
    return pipeline

In [63]:
df = read_from_spark(spark, db_properties)

In [64]:
pipeline = get_preprocessing_pipeline()
preprocess_model = pipeline.fit(df)
df_clean = preprocess_model.transform(df)
df_clean.show()

[Stage 1503:>                                                       (0 + 1) / 1]

+-------+--------------------+
|overall|            features|
+-------+--------------------+
|     69|[11.1535470006512...|
|     69|[11.4722197720984...|
|     64|[10.9942106149276...|
|     69|[10.9942106149276...|
|     86|[14.0216019436758...|
|     86|[14.4996111008465...|
|     66|[12.5875744721635...|
|     85|[13.5435927865050...|
|     63|[10.6755378434804...|
|     69|[11.4722197720984...|
|     64|[11.7908925435455...|
|     69|[11.4722197720984...|
|     58|[11.1535470006512...|
|     64|[12.2689017007163...|
|     65|[10.8348742292040...|
|     80|[12.7469108578871...|
|     80|[12.7469108578871...|
|     68|[10.8348742292040...|
|     78|[12.4282380864399...|
|     63|[10.0381923005861...|
+-------+--------------------+
only showing top 20 rows



                                                                                

In [65]:
df_clean = df_clean.withColumn('overall', col('overall').cast(IntegerType()))

<h1>SparkML</h1>

In [66]:
from pyspark.ml.regression import LinearRegression as spark_LR, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [67]:
#80/20 split into Train and Test
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)

In [68]:
from pyspark.ml.regression import DecisionTreeRegressor

def get_ML_pipeline(ML_model):
    if ML_model == "LR":
        ml = spark_LR(featuresCol='features', labelCol='overall')
        
        paramGrid = (ParamGridBuilder()
                .addGrid(ml.regParam, [0.01, 0.5, 2.0])
                .addGrid(ml.maxIter, [1, 5, 10])
                .build())
        
    elif ML_model == "DT":
        ml = DecisionTreeRegressor(featuresCol='features', labelCol='overall')

        paramGrid = (ParamGridBuilder()
                .addGrid(ml.minInfoGain, [0.0, 0.1, 0.2])
                .addGrid(ml.minInstancesPerNode, [1, 2])
                .build())
    
    evaluator = RegressionEvaluator(predictionCol='prediction', 
            labelCol='overall', metricName='r2')
    
    stage_ML = CrossValidator(estimator=ml, estimatorParamMaps=paramGrid, 
                           evaluator=evaluator, numFolds=5)
    
    pipeline = Pipeline(stages=[stage_ML])

    return pipeline

<h3>Code for training, hyperparametertuning, testing</h3>

In [None]:
ML_model = ["DT", "LR"]

evaluator = RegressionEvaluator(predictionCol='prediction', 
            labelCol='overall', metricName='r2')

for model in ML_model:
    ML_pipeline = get_ML_pipeline(model)
    ML_model_pipeline = ML_pipeline.fit(train_data) #training
    model_df_train = ML_model_pipeline.transform(train_data) #train predictions
    model_df_test = ML_model_pipeline.transform(test_data)  #test predictions

    r2_train = evaluator.evaluate(model_df_train)
    r2_test = evaluator.evaluate(model_df_test)
    print(f"{model} train r2 score: {r2_train}")
    print(f"{model} test r2 score: {r2_test}")

    best_model = ML_model_pipeline.stages[-1].bestModel
    best_params = best_model.extractParamMap()
    print(f"Best {model} parameters: {best_params}")


                                                                                

DT train r2 score: 93.05%
DT test r2 score: 92.87%
Best DT parameters: {Param(parent='DecisionTreeRegressor_75bf3b076d8c', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='DecisionTreeRegressor_75bf3b076d8c', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='DecisionTreeRegressor_75bf3b076d8c', name='featuresCol', doc='features column name.'): 'features', Param(parent='DecisionTreeRegressor_75bf3b076d8c', name='impurity', doc='Criterion used for information gain calculat

[Stage 2849:>                                                       (0 + 1) / 1]

LR train r2 score: 93.59%
LR test r2 score: 93.53%
Best LR parameters: {Param(parent='LinearRegression_415e5ac84707', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearRegression_415e5ac84707', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LinearRegression_415e5ac84707', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35, Param(parent='LinearRegression_415e5ac84707', name='featuresCol', doc='features column name.'): 'features', Param(parent='LinearRegression_415e5ac84707', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearRegression_415e5ac84707', name='labelCol', doc='label column name.'): 'overall', Param(parent='LinearRegression_415e5ac84707', name='loss', doc='The loss function to be 

                                                                                

<h3>Output after ~10mins</h3>

In [None]:
ML_model = ["DT", "LR"]

evaluator = RegressionEvaluator(predictionCol='prediction', 
            labelCol='overall', metricName='r2')

for model in ML_model:
    ML_pipeline = get_ML_pipeline(model)
    ML_model_pipeline = ML_pipeline.fit(train_data) #training
    model_df_train = ML_model_pipeline.transform(train_data) #train predictions
    model_df_test = ML_model_pipeline.transform(test_data)  #test predictions

    r2_train = evaluator.evaluate(model_df_train)
    r2_test = evaluator.evaluate(model_df_test)
    print(f"{model} train r2 score: {r2_train}")
    print(f"{model} test r2 score: {r2_test}")

    best_model = ML_model_pipeline.stages[-1].bestModel
    best_params = best_model.extractParamMap()
    print(f"Best {model} parameters: {best_params}")


                                                                                

DT train r2 score: 93.05%
DT test r2 score: 92.87%
Best DT parameters: {Param(parent='DecisionTreeRegressor_ab8a1d978e02', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='DecisionTreeRegressor_ab8a1d978e02', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='DecisionTreeRegressor_ab8a1d978e02', name='featuresCol', doc='features column name.'): 'features', Param(parent='DecisionTreeRegressor_ab8a1d978e02', name='impurity', doc='Criterion used for information gain calculat

24/11/14 12:53:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/11/14 12:53:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/11/14 12:53:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
[Stage 1418:>                                                       (0 + 1) / 1]

LR train r2 score: 93.59%
LR test r2 score: 93.53%
Best LR parameters: {Param(parent='LinearRegression_331661238a01', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearRegression_331661238a01', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0, Param(parent='LinearRegression_331661238a01', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35, Param(parent='LinearRegression_331661238a01', name='featuresCol', doc='features column name.'): 'features', Param(parent='LinearRegression_331661238a01', name='fitIntercept', doc='whether to fit an intercept term.'): True, Param(parent='LinearRegression_331661238a01', name='labelCol', doc='label column name.'): 'overall', Param(parent='LinearRegression_331661238a01', name='loss', doc='The loss function to be 

                                                                                

<h1>Pytorch code</h1>

In [70]:
s = df_clean.toPandas()

                                                                                

In [71]:
# Check if MPS is available
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: mps


In [72]:
# Separate features and target
X = s.drop('overall', axis=1)
y = s['overall']
# Split the data into train+validation and test sets
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Further split train+validation into train and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42)

In [73]:
X_train = torch.from_numpy(np.array(X_train['features'].values.tolist(), np.float32))
X_val = torch.from_numpy(np.array(X_val['features'].values.tolist(), np.float32))
X_test = torch.from_numpy(np.array(X_test['features'].values.tolist(), np.float32))
y_train = torch.from_numpy(np.array(y_train, np.float32))
y_val = torch.from_numpy(np.array(y_val, np.float32))
y_test = torch.from_numpy(np.array(y_test, np.float32))

In [74]:
# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train).to(device)
y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1).to(device)
X_val_tensor = torch.FloatTensor(X_val).to(device)
y_val_tensor = torch.FloatTensor(y_val).reshape(-1, 1).to(device)
X_test_tensor = torch.FloatTensor(X_test).to(device)
y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1).to(device)

In [75]:
# Create DataLoaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

In [76]:
# Model 1: Multi-layer Perceptron (MLP)
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

In [77]:
# Linear Regression Model
class LinearRegression(nn.Module):
    def __init__(self, input_size):
        super(LinearRegression, self).__init__()
        self.linear = nn.Linear(input_size, 1)

    def forward(self, x):
        return self.linear(x)

In [78]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        train_true = []
        train_pred = []
        
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            
            train_true.extend(targets.cpu().numpy())
            train_pred.extend(outputs.detach().cpu().numpy())

        model.eval()
        val_loss = 0.0
        val_true = []
        val_pred = []
        
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                val_loss += criterion(outputs, targets).item()
                
                val_true.extend(targets.cpu().numpy())
                val_pred.extend(outputs.cpu().numpy())

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        
        train_r2 = r2_score(train_true, train_pred)
        val_r2 = r2_score(val_true, val_pred)
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Train Loss: {train_loss:.4f}, Train R2: {train_r2:.4f}, '
                  f'Val Loss: {val_loss:.4f}, Val R2: {val_r2:.4f}')
    
    return val_loss, val_r2

In [79]:
from itertools import product

In [80]:
def tune_hyperparameters(model_class, param_grid):
    best_val_loss = float('inf')
    best_params = None
    best_model = None
    best_val_r2 = -float('inf')

    for params in product(*param_grid.values()):
        current_params = dict(zip(param_grid.keys(), params))
        print(f"Training with parameters: {current_params}")

        if model_class == MLP:
            model = model_class(input_size=X_train.shape[1], hidden_size=current_params['hidden_size'])
        elif model_class == LinearRegression:
            model = model_class(input_size=X_train.shape[1])
        else:
            raise ValueError("Unsupported model class")

        optimizer = optim.Adam(model.parameters(), lr=current_params['lr'])
        criterion = nn.MSELoss()
        train_loader = DataLoader(train_dataset, batch_size=current_params['batch_size'], shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=current_params['batch_size'])

        val_loss, val_r2 = train_model(model, train_loader, val_loader, criterion, optimizer)

        if val_loss < best_val_loss:
            best_val_r2 = val_r2
            best_val_loss = val_loss
            best_params = current_params
            best_model = model.state_dict()

    return best_params, best_val_loss, best_model, best_val_r2

In [81]:
def test_function(model_class):
    input_size = X_test_tensor.shape[1]
    if model_class == "LR":
        model = LinearRegression(input_size)
        model.load_state_dict(torch.load('./best_linear_model.pth'))
    elif model_class == "MLP":
        model = MLP(input_size, mlp_best_params['hidden_size'])
        model.load_state_dict(torch.load('./best_mlp_model.pth'))
    model.to(device)
    model.eval()
    with torch.no_grad():
        y_pred = model(X_test_tensor)
    y_pred_np = y_pred.cpu().numpy()
    y_test_np = y_test_tensor.cpu().numpy()
    
    r2 = r2_score(y_test_np, y_pred_np)
    mse = mean_squared_error(y_test_np, y_pred_np)
    rmse = np.sqrt(mse)
    
    print(f"R2 Score: {r2:.4f}")
    print(f"Mean Squared Error: {mse:.4f}")
    print(f"Root Mean Squared Error: {rmse:.4f}")

In [82]:
# Define hyperparameter grids
mlp_param_grid = {
    'lr': [0.001, 0.01],
    'batch_size': [32, 64],
    'hidden_size': [32, 64]
}
linear_param_grid = {
    'lr': [0.001, 0.01],
    'batch_size': [32, 64]
}

<h3>To just start hyperprameter tuning</h3>

In [None]:
# Tune and train models
print("Tuning MLP model...")
mlp_best_params, mlp_best_loss, mlp_best_model, mlp_best_r2 = tune_hyperparameters(MLP, mlp_param_grid)
print(f"Best MLP parameters: {mlp_best_params}")
print(f"Best MLP validation loss: {mlp_best_loss}")
print(f"Best MLP validation r2: {mlp_best_r2}")

<h3>These are the results you get after ~1.5hrs</h3>

In [38]:
# Tune and train models
print("Tuning MLP model...")
mlp_best_params, mlp_best_loss, mlp_best_model, mlp_best_r2 = tune_hyperparameters(MLP, mlp_param_grid)
print(f"Best MLP parameters: {mlp_best_params}")
print(f"Best MLP validation loss: {mlp_best_loss}")
print(f"Best MLP validation r2: {mlp_best_r2}")

Tuning MLP model...
Training with parameters: {'lr': 0.001, 'batch_size': 32, 'hidden_size': 32}
Epoch [10/100], Train Loss: 1.1208, Train R2: 0.9776, Val Loss: 1.0210, Val R2: 0.9794
Epoch [20/100], Train Loss: 0.9914, Train R2: 0.9802, Val Loss: 0.9165, Val R2: 0.9815
Epoch [30/100], Train Loss: 0.9536, Train R2: 0.9810, Val Loss: 1.0133, Val R2: 0.9796
Epoch [40/100], Train Loss: 0.9319, Train R2: 0.9814, Val Loss: 1.0564, Val R2: 0.9787
Epoch [50/100], Train Loss: 0.9126, Train R2: 0.9818, Val Loss: 0.8816, Val R2: 0.9822
Epoch [60/100], Train Loss: 0.8908, Train R2: 0.9822, Val Loss: 0.9425, Val R2: 0.9810
Epoch [70/100], Train Loss: 0.8801, Train R2: 0.9824, Val Loss: 0.8150, Val R2: 0.9836
Epoch [80/100], Train Loss: 0.8693, Train R2: 0.9827, Val Loss: 0.9018, Val R2: 0.9818
Epoch [90/100], Train Loss: 0.8462, Train R2: 0.9831, Val Loss: 0.8127, Val R2: 0.9836
Epoch [100/100], Train Loss: 0.8287, Train R2: 0.9835, Val Loss: 0.7972, Val R2: 0.9839
Training with parameters: {'lr':

<h3>Hyperparameter tuning</h3>

In [None]:
print("\nTuning Linear Regression model...")
linear_best_params, linear_best_loss, linear_best_model, linear_best_r2 = tune_hyperparameters(LinearRegression, linear_param_grid)
print(f"Best Linear Regression parameters: {linear_best_params}")
print(f"Best Linear Regression validation loss: {linear_best_loss}")
print(f"Best Linear Regression validation R2: {linear_best_r2}")

<h3>Output after hyperparameter tuning ~40mins</h3>

In [39]:
print("\nTuning Linear Regression model...")
linear_best_params, linear_best_loss, linear_best_model, linear_best_r2 = tune_hyperparameters(LinearRegression, linear_param_grid)
print(f"Best Linear Regression parameters: {linear_best_params}")
print(f"Best Linear Regression validation loss: {linear_best_loss}")
print(f"Best Linear Regression validation R2: {linear_best_r2}")


Tuning Linear Regression model...
Training with parameters: {'lr': 0.001, 'batch_size': 32}
Epoch [10/100], Train Loss: 3.5918, Train R2: 0.9284, Val Loss: 3.5292, Val R2: 0.9289
Epoch [20/100], Train Loss: 3.3189, Train R2: 0.9338, Val Loss: 3.2670, Val R2: 0.9341
Epoch [30/100], Train Loss: 3.2876, Train R2: 0.9344, Val Loss: 3.2376, Val R2: 0.9347
Epoch [40/100], Train Loss: 3.2846, Train R2: 0.9345, Val Loss: 3.2352, Val R2: 0.9348
Epoch [50/100], Train Loss: 3.2819, Train R2: 0.9345, Val Loss: 3.2496, Val R2: 0.9345
Epoch [60/100], Train Loss: 3.2777, Train R2: 0.9346, Val Loss: 3.2134, Val R2: 0.9352
Epoch [70/100], Train Loss: 3.2796, Train R2: 0.9346, Val Loss: 3.2179, Val R2: 0.9351
Epoch [80/100], Train Loss: 3.2796, Train R2: 0.9346, Val Loss: 3.2810, Val R2: 0.9339
Epoch [90/100], Train Loss: 3.2742, Train R2: 0.9347, Val Loss: 3.2217, Val R2: 0.9351
Epoch [100/100], Train Loss: 3.2748, Train R2: 0.9347, Val Loss: 3.2043, Val R2: 0.9354
Training with parameters: {'lr': 0.0

<h3>Save the best models (trained and tuned models are saved on github)</h3>

In [40]:
# Save best models
torch.save(mlp_best_model, 'best_mlp_model.pth')
torch.save(linear_best_model, 'best_linear_model.pth')

<h3>Test results using best models</h3>

In [41]:
test_function("MLP")

R2 Score: 0.9884
Mean Squared Error: 0.5814
Root Mean Squared Error: 0.7625


In [42]:
test_function("LR")

R2 Score: 0.9350
Mean Squared Error: 3.2618
Root Mean Squared Error: 1.8060
