## Task-I: Build and populate necessary tables (30% of course project grade)

In [1]:
# findspark.init() makes a local Spark installation's `pyspark` importable by
# adding it to sys.path; this is typically needed on a local (e.g. Windows)
# setup where pyspark is not pip-installed. These lines are active as written:
# comment them out if `import pyspark` already works (e.g. on a cloud cluster).
import findspark
findspark.init()
findspark.find()  # returns the detected Spark home; the value is not used here

import pyspark

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession

# Local Spark session using all cores.
# The PostgreSQL JDBC driver is pulled in through spark.jars.packages, with the
# same coordinates as the cloud configuration below. The Postgres read/write
# cells use "org.postgresql.Driver" and would otherwise fail unless the jar is
# already on Spark's classpath. The first run downloads the package from Maven.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SystemsToolChains") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18") \
    .getOrCreate()

In [None]:
# Use this instead of the local-session cell above when running on a cloud
# (YARN) cluster: remove the surrounding triple quotes to activate it.
# As written, the whole block is a string literal and does nothing. If the cell
# is run, Jupyter only echoes the string as the cell's output.
"""
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession

appName = "fifa"
master = "yarn"

conf = SparkConf().set('spark.driver.host', '127.0.0.1')\
                  .set("spark.jars.packages", "org.postgresql:postgresql:42.2.18")\
                  .setAppName(appName)\
                  .setMaster(master)

sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()
"""

In [3]:
# When running on GCP, uncomment the line below to copy the local "/archive"
# folder into HDFS. Replace hdfs://cluster-2ca4-m/ with your cluster's master node.
# NOTE(review): this copies to the HDFS root (/archive), but the loading cell reads
# the relative path "archive/players_YY.csv". HDFS resolves relative paths against
# the user's home directory, so confirm the two locations match.
# !hdfs dfs -put "/archive" hdfs://cluster-2ca4-m/

In [4]:
# One CSV per FIFA edition: players_15.csv ... players_22.csv.
file_paths = [f"archive/players_{yy}.csv" for yy in range(15, 23)]

In [8]:
from pyspark.sql.functions import *
from functools import reduce
import os

# Read every yearly CSV, tag its rows with the two-digit FIFA edition
# (e.g. 15 for players_15.csv), and stack them into a single DataFrame.
# The loop uses its own variable names (year_df / file_year) so that re-running
# this cell cannot overwrite the `df` used by the analytics cells.
data_frames = []

for file_path in file_paths:
    year_df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Derive the year from the file name only ("players_15.csv" -> 15). An
    # underscore elsewhere in the directory path then cannot break the parsing.
    file_stem = os.path.splitext(os.path.basename(file_path))[0]
    file_year = int(file_stem.rsplit("_", 1)[1])
    year_df = year_df.withColumn('year', lit(file_year))

    # for test
    # year_df.show(1, vertical = True)

    data_frames.append(year_df)

# unionByName matches columns by name, whereas union() is purely positional.
# A yearly file whose columns are ordered differently therefore cannot
# silently shift data into the wrong columns.
merged_df = reduce(lambda x, y: x.unionByName(y), data_frames)

# Add a column unique_id so that every record can be uniquely identified
# (monotonically_increasing_id values are unique but not consecutive).
merged_df = merged_df.withColumn("unique_id", monotonically_increasing_id())

merged_df.show(1, vertical = True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | 241.0                
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [9]:
# Ingest the data from all years (2015-2022) into one Postgres table.
# Connection settings come from environment variables (POSTGRES_URL,
# POSTGRES_USER, POSTGRES_PASSWORD), so no password is stored in the notebook.
# If POSTGRES_PASSWORD is unset, you are prompted for it. When running in the
# cloud, point POSTGRES_URL at your GCP PostgreSQL instance.
import os
from getpass import getpass

postgres_url = os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/postgres")
properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass("Postgres password: "),
    "driver": "org.postgresql.Driver"
}

# The "fifa" schema is expected to exist already.
# mode="overwrite" replaces any existing table contents.
table_name = "fifa.fifa_table"

merged_df.write.jdbc(url=postgres_url, table=table_name, mode="overwrite", properties=properties)

## Task-II: Conduct analytics on your dataset (20% of course project grade)

In [10]:
# Read the merged table (all years, 2015-2022) back from Postgres for the analytics below.
# Connection settings come from environment variables (POSTGRES_URL,
# POSTGRES_USER, POSTGRES_PASSWORD), so no password is stored in the notebook.
# If POSTGRES_PASSWORD is unset, you are prompted for it.
import os
from getpass import getpass

from pyspark.sql.functions import col, when, count, avg, desc, dense_rank, rank, row_number
from pyspark.sql.window import Window

jdbc_url = os.environ.get("POSTGRES_URL", "jdbc:postgresql://localhost:5432/postgres")
db_properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD") or getpass("Postgres password: "),
    "driver": "org.postgresql.Driver",
}
df = spark.read.jdbc(url=jdbc_url, table="fifa.fifa_table", properties=db_properties)

In [11]:
def find_x(x, data_year=22, contract_year=2023):
    """Show the `x` clubs with the most players whose contracts end in `contract_year`.

    Args:
        x: number of clubs to show.
        data_year: value of the `year` column to examine (22 is players_22.csv,
            the latest file loaded).
        contract_year: value of `club_contract_valid_until` to match.

    Players without a club are ignored. Clubs with equal counts are ordered by
    name, so the cut-off at `x` is deterministic.
    """
    expiring = df.filter(
        (col("year") == data_year)
        & (col("club_contract_valid_until") == contract_year)
        & col("club_name").isNotNull()
    )

    x_clubs = (expiring.groupBy("club_name")
               .agg(count("*").alias("player_count"))
               .orderBy(desc("player_count"), col("club_name"))
               .limit(x))
    print(f'{x} clubs that have the highest number of players with contracts ending in {contract_year}')
    # show() defaults to 20 rows; pass x so larger requests are not truncated.
    x_clubs.show(x)

find_x(5)

5 clubs that have the highest number of players with contracts ending in 2023
+--------------------+------------+
|           club_name|player_count|
+--------------------+------------+
|En Avant de Guingamp|          19|
| Club Atlético Lanús|          17|
|       Lechia Gdańsk|          17|
|            Barnsley|          16|
|        Kasimpaşa SK|          16|
+--------------------+------------+



In [12]:
def find_y(y):
    """Show the clubs ranked in the top `y` by average yearly number of players older than 27.

    For each club, the per-year count of players with age > 27 is averaged over
    the years in which the club appears in the data. Clubs missing from some
    years are therefore not penalised for those years. Ties share a rank
    (`rank()`), so more than `y` rows can be shown.
    """
    # Rows without a club cannot be attributed to any club.
    club_players = df.filter(col("club_name").isNotNull())

    # Count older players per club and year over *all* of the club's rows.
    # when() yields null for age <= 27, and count() skips nulls.
    # Filtering to age > 27 before grouping would drop club-years that have no
    # older player, so they would not pull the club's average down as a 0.
    club_per_year = club_players.groupBy("club_name", "year").agg(
        count(when(col("age") > 27, col("sofifa_id"))).alias("older_players_count"))

    # Average count per club over the years the club is present.
    avg_club_counts = club_per_year.groupBy("club_name").agg(
        avg("older_players_count").alias("average_older_players"))

    # Global (unpartitioned) window. Acceptable here because the input has one row per club.
    window_spec = Window.orderBy(desc("average_older_players"))
    ranked_clubs = avg_club_counts.withColumn("rank", rank().over(window_spec))
    top_y_clubs = ranked_clubs.filter(col("rank") <= y).orderBy("rank", "club_name")
    print('Top {} clubs with the highest average number of players older than 27 years:'.format(y))
    top_y_clubs.show()

find_y(5)

Top 5 clubs with the highest average number of players older than 27 years:
+--------------------+---------------------+----+
|           club_name|average_older_players|rank|
+--------------------+---------------------+----+
|  Dorados de Sinaloa|                 19.0|   1|
| Matsumoto Yamaga FC|                 19.0|   1|
| Shanghai Shenhua FC|                 18.5|   3|
|          Qingdao FC|                 18.0|   4|
|Club Deportivo Jo...|                 17.5|   5|
+--------------------+---------------------+----+



In [13]:
def find_frequent_nation_position(substitute):
    """Show the most frequent `nation_position` for each year.

    Args:
        substitute: if True, rows whose nation_position is "SUB" (substitute)
            are counted; if False, they are dropped first.

    Rows with no nation_position are excluded. All positions tied for the top
    count in a year are shown, ordered by year and then by position name.
    """
    # Exclude nulls explicitly. With substitute=False, the "!= SUB" comparison
    # already drops them. With substitute=True, they would otherwise form their
    # own group with a count of 0.
    positioned = df.filter(col("nation_position").isNotNull())
    if not substitute:
        # drop rows whose nation_position is SUB (substitute)
        positioned = positioned.filter(col("nation_position") != "SUB")

    # count nation_position per year
    position_freq = positioned.groupBy("year", "nation_position").agg(count("*").alias("count"))

    # rank frequencies within each year; rank() keeps every tie at rank 1
    window_spec = Window.partitionBy("year").orderBy(desc("count"))
    position_freq_ranked = position_freq.withColumn("rank", rank().over(window_spec))
    most_frequent_positions = (position_freq_ranked
                               .filter(col("rank") == 1)
                               .select("year", "nation_position", "count")
                               .orderBy("year", "nation_position"))

    print('The most frequent nation_position in the dataset for each year', end = " -- ")
    if substitute:
        print("with substitute")
    else:
        print("without substitute")
    most_frequent_positions.show(most_frequent_positions.count()) # To display the complete data

find_frequent_nation_position(True)
find_frequent_nation_position(False)

The most frequent nation_position in the dataset for each year -- with substitute
+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|  15|            SUB|  564|
|  16|            SUB|  511|
|  17|            SUB|  564|
|  18|            SUB|  600|
|  19|            SUB|  576|
|  20|            SUB|  588|
|  21|            SUB|  588|
|  22|            SUB|  396|
+----+---------------+-----+

The most frequent nation_position in the dataset for each year -- without substitute
+----+---------------+-----+
|year|nation_position|count|
+----+---------------+-----+
|  15|            LCB|   47|
|  15|            RCB|   47|
|  15|             GK|   47|
|  16|            RCB|   46|
|  16|             GK|   46|
|  17|             GK|   47|
|  17|            LCB|   47|
|  17|            RCB|   47|
|  18|            LCB|   50|
|  18|             GK|   50|
|  18|            RCB|   50|
|  19|            RCB|   48|
|  19|             GK|   48|
|  19|            LC

## Task-III: Predict a player's `overall` rating

In [14]:
# Per-position ratings. Raw values can carry a "+N"/"-N" suffix that
# FeatureTypeCaster strips before casting to int.
# NOTE(review): 'ls' is not listed although its mirror 'rs' is. Confirm whether
# that is intentional.
skill_feature1 = [
    'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw',
    'lam', 'cam', 'ram', 'lm', 'lcm', 'cm', 'rcm',
    'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb', 'lb',
    'lcb', 'cb', 'rcb', 'rb', 'gk',
]

# Headline ratings followed by the detailed attributes, grouped by their column-name prefix.
skill_feature2 = (
    ['pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic']
    + ['attacking_' + s for s in ('crossing', 'finishing', 'heading_accuracy', 'short_passing', 'volleys')]
    + ['skill_' + s for s in ('dribbling', 'curve', 'fk_accuracy', 'long_passing', 'ball_control')]
    + ['movement_' + s for s in ('acceleration', 'sprint_speed', 'agility', 'reactions', 'balance')]
    + ['power_' + s for s in ('shot_power', 'jumping', 'stamina', 'strength', 'long_shots')]
    + ['mentality_' + s for s in ('aggression', 'interceptions', 'positioning', 'vision', 'penalties')]
    + ['defending_' + s for s in ('marking_awareness', 'standing_tackle', 'sliding_tackle')]
    + ['goalkeeping_' + s for s in ('diving', 'handling', 'kicking', 'positioning', 'reflexes', 'speed')]
)

# Model inputs, then the label 'overall' appended for the working frame.
feature_list = skill_feature1 + skill_feature2
overall_list = feature_list + ['overall']
player_df = merged_df.select(*overall_list)

In [15]:
# Count the nulls in every column. F.sum is spelled out explicitly: a bare `sum`
# here only works because `from pyspark.sql.functions import *` shadows Python's builtin.
from pyspark.sql import functions as F

missing_count = player_df.select(
    *[F.sum(F.col(c).isNull().cast("int")).alias(c + "_missing") for c in player_df.columns])

missing_count.show(vertical = True)

-RECORD 0-------------------------------------
 st_missing                          | 0      
 rs_missing                          | 0      
 lw_missing                          | 0      
 lf_missing                          | 0      
 cf_missing                          | 0      
 rf_missing                          | 0      
 rw_missing                          | 0      
 lam_missing                         | 0      
 cam_missing                         | 0      
 ram_missing                         | 0      
 lm_missing                          | 0      
 lcm_missing                         | 0      
 cm_missing                          | 0      
 rcm_missing                         | 0      
 rm_missing                          | 0      
 lwb_missing                         | 0      
 ldm_missing                         | 0      
 cdm_missing                         | 0      
 rdm_missing                         | 0      
 rwb_missing                         | 0      
 lb_missing  

In [16]:
# goalkeeping_speed is dropped, presumably because it is largely missing (see
# the null counts above). The six headline ratings are mean-imputed instead.
columns_to_drop = ['goalkeeping_speed']
columns_to_fill = ['pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic']

# Rebuild the lists rather than calling list.remove(). remove() raises
# ValueError when this cell is re-run, and filtering keeps both lists in sync
# with columns_to_drop.
feature_list = [c for c in feature_list if c not in columns_to_drop]
overall_list = [c for c in overall_list if c not in columns_to_drop]

In [17]:
from pyspark.ml import Pipeline,Transformer
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler
class FeatureTypeCaster(Transformer):
    """Cast the position-rating columns listed in `skill_feature1` to int.

    For a value such as "89+3" or "89-1", only the text before the first '+',
    and then before the first '-', is kept. That text is cast to int.
    """
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        casted = dataset
        for position in skill_feature1:
            # Strip from the first "+" onwards, then from the first "-" onwards.
            base_rating = split(split(casted[position], "\\+")[0], "\\-")[0]
            casted = casted.withColumn(position, base_rating.cast("int"))
        return casted

class ColumnDropper(Transformer):
    """Pipeline stage that removes a fixed set of columns.

    Args:
        columns_to_drop: names of the columns to remove. None (the default) means
            drop nothing; names absent from the dataset are ignored by DataFrame.drop.
    """
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop = columns_to_drop

    def _transform(self, dataset):
        # Treat None as "nothing to drop". Iterating over None would raise TypeError.
        names = list(self.columns_to_drop or [])
        if not names:
            return dataset
        # A single drop call removes all columns at once.
        return dataset.drop(*names)

class ColumnFillna(Transformer):
    """Pipeline stage that fills nulls in selected columns with each column's mean.

    The means are computed from the dataset passed to `transform`.

    Args:
        columns_to_fill: names of the columns to impute. None (the default)
            means no columns are filled.
    """
    def __init__(self, columns_to_fill = None):
        super().__init__()
        self.columns_to_fill = columns_to_fill

    def _transform(self, dataset):
        # Treat None as "nothing to fill". Iterating over None would raise TypeError.
        names = list(self.columns_to_fill or [])
        if not names:
            return dataset

        # Compute every column mean in a single aggregation job instead of one job per column.
        means = dataset.select(*[mean(name).alias(name) for name in names]).first()

        output_df = dataset
        for name in names:
            mean_value = means[name]
            # An all-null column has a null mean. fillna rejects None, so leave that column as is.
            if mean_value is not None:
                output_df = output_df.fillna(mean_value, subset=[name])
        return output_df

def get_preprocess_pipeline():
    """Build the (unfitted) preprocessing Pipeline.

    Stages, in order:
      1. cast the position ratings to int (FeatureTypeCaster);
      2. drop `columns_to_drop`;
      3. mean-fill `columns_to_fill`;
      4. assemble `feature_list` into 'vectorized_features';
      5. standardize that vector into 'features'.
    """
    stages = [
        FeatureTypeCaster(),
        ColumnDropper(columns_to_drop = columns_to_drop),
        ColumnFillna(columns_to_fill = columns_to_fill),
        VectorAssembler(inputCols=feature_list, outputCol="vectorized_features"),
        # StandardScaler with its defaults: scale to unit std, no mean-centering.
        StandardScaler(inputCol= 'vectorized_features', outputCol= 'features'),
    ]
    return Pipeline(stages=stages)

In [18]:
# Fit the preprocessing stages, then keep only the model inputs: the scaled
# feature vector and the 'overall' label.
# NOTE(review): the pipeline is fit on the full dataset before the
# train/val/test split below. Imputation means and scaling statistics therefore
# also see validation/test rows. Consider fitting on the training split only.
preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(player_df)
players = preprocess_pipeline_model.transform(player_df).select('features', 'overall')
players.show(1, vertical = True)

-RECORD 0------------------------
 features | [6.58435804760700... 
 overall  | 93                   
only showing top 1 row



In [19]:
# Model input: the standardized vector column 'features' (assembled from
# feature_list by the preprocessing pipeline). Label column: 'overall'.
# 60/20/20 train/validation/test split, seeded for reproducibility.
seed = 2023
train_df, val_df, test_df = players.randomSplit([0.6, 0.2, 0.2], seed=seed)

# Every grid-search fit and validation pass below reuses these splits. Caching
# them avoids recomputing the full CSV -> union -> preprocessing lineage each time.
train_df = train_df.cache()
val_df = val_df.cache()
test_df = test_df.cache()

In [20]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

lr = LinearRegression(featuresCol='features', labelCol='overall')

# 3 x 3 grid over regularization strength and iteration cap.
paramGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.01, 0.1, 1.0])
               .addGrid(lr.maxIter, [10, 30, 50])
               .build())

evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")

lr_best_rmse = float("inf")
lr_best_model = None
lr_best_params = None

# Hold-out tuning: fit on train_df, score on val_df, keep the lowest RMSE.
for params in paramGrid:
    # readable {name: value} view of the grid point, for logging
    param_dict = {param.name: val for param, val in params.items()}

    # Pass the ParamMap to fit() instead of calling lr.setParams(). fit() works
    # on a copy, so `lr` is not left holding the last grid point after the loop.
    model = lr.fit(train_df, params)

    val_predictions = model.transform(val_df)
    val_rmse = evaluator.evaluate(val_predictions)
    print(f"Params: {param_dict}，Validation RMSE: {val_rmse}")

    # update best params and model
    if val_rmse < lr_best_rmse:
        lr_best_rmse = val_rmse
        lr_best_model = model
        lr_best_params = param_dict

print(f"Linear Regression Best Params: {lr_best_params}, Linear Regression Best Validation RMSE: {lr_best_rmse}")

Params: {'regParam': 0.01, 'maxIter': 10}，Validation RMSE: 2.7936499275036044
Params: {'regParam': 0.01, 'maxIter': 30}，Validation RMSE: 2.793649927503598
Params: {'regParam': 0.01, 'maxIter': 50}，Validation RMSE: 2.7936499275035835
Params: {'regParam': 0.1, 'maxIter': 10}，Validation RMSE: 2.8054156039487985
Params: {'regParam': 0.1, 'maxIter': 30}，Validation RMSE: 2.8054156039488265
Params: {'regParam': 0.1, 'maxIter': 50}，Validation RMSE: 2.805415603948817
Params: {'regParam': 1.0, 'maxIter': 10}，Validation RMSE: 2.913144924746847
Params: {'regParam': 1.0, 'maxIter': 30}，Validation RMSE: 2.9131449247468506
Params: {'regParam': 1.0, 'maxIter': 50}，Validation RMSE: 2.9131449247468195
Linear Regression Best Params: {'regParam': 0.01, 'maxIter': 50}, Linear Regression Best Validation RMSE: 2.7936499275035835


In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder

gbt = GBTRegressor(featuresCol='features', labelCol='overall')

# 3 x 3 grid over tree depth and number of boosting iterations.
paramGrid = (ParamGridBuilder()
               .addGrid(gbt.maxDepth, [4, 5, 6])
               .addGrid(gbt.maxIter, [10, 20, 30])
               .build())

evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")

gbt_best_rmse = float("inf")
gbt_best_model = None
gbt_best_params = None

# Hold-out tuning: fit on train_df, score on val_df, keep the lowest RMSE.
for params in paramGrid:
    # readable {name: value} view of the grid point, for logging
    param_dict = {param.name: val for param, val in params.items()}

    # Pass the ParamMap to fit() instead of calling gbt.setParams(). fit() works
    # on a copy, so `gbt` is not left holding the last grid point after the loop.
    model = gbt.fit(train_df, params)

    val_predictions = model.transform(val_df)
    val_rmse = evaluator.evaluate(val_predictions)
    print(f"Params: {param_dict}，Validation RMSE: {val_rmse}")

    # update best params and model
    if val_rmse < gbt_best_rmse:
        gbt_best_rmse = val_rmse
        gbt_best_model = model
        gbt_best_params = param_dict

print(f"GBT Regression Best Params: {gbt_best_params}, GBT Regression Best Validation RMSE: {gbt_best_rmse}")

Params: {'maxDepth': 4, 'maxIter': 10}，Validation RMSE: 2.389552564880566
Params: {'maxDepth': 4, 'maxIter': 20}，Validation RMSE: 2.075372082819166
Params: {'maxDepth': 4, 'maxIter': 30}，Validation RMSE: 1.8598984942204393
Params: {'maxDepth': 5, 'maxIter': 10}，Validation RMSE: 2.0075485757555813
Params: {'maxDepth': 5, 'maxIter': 20}，Validation RMSE: 1.7089747401330297
Params: {'maxDepth': 5, 'maxIter': 30}，Validation RMSE: 1.5707517706392125
Params: {'maxDepth': 6, 'maxIter': 10}，Validation RMSE: 1.7870962238669803
Params: {'maxDepth': 6, 'maxIter': 20}，Validation RMSE: 1.5715816121622939


In [None]:
# Final check: score the GBT model with the lowest validation RMSE once on the
# held-out test split, which was not used during tuning.
test_predictions = gbt_best_model.transform(test_df)
test_evaluator = RegressionEvaluator(labelCol="overall", predictionCol="prediction", metricName="rmse")
test_rmse = test_evaluator.evaluate(test_predictions)
print(f"Test RMSE: {test_rmse}")