In [77]:
import findspark

# findspark is initialised before the pyspark imports below; keep this order.
findspark.init()
findspark.find()

import pyspark
# SparkContext is part of the top-level pyspark package; importing it from
# pyspark.sql.functions only worked through an incidental re-export.
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import col, count, when, corr
from pyspark.sql.types import *
import numpy as np
import pandas as pd

appName = "Big Data Analytics"
master = "local"

# Single source of truth for the Spark configuration: the context and the
# session below are both built from this object.
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(appName)
    .setMaster(master)
)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# Build the session from the same conf instead of repeating the app name,
# master and driver host as literals that could drift out of sync.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Columns removed before any modelling, grouped by the reason for dropping them.
columns_to_drop = [
    # Identifiers and URLs
    'sofifa_id', 'player_url', 'short_name', 'long_name', 'player_face_url',
    'club_logo_url', 'club_flag_url', 'nation_logo_url', 'nation_flag_url',
    'UID', 'Year',
    # Club and nation names
    'club_team_id', 'club_name', 'league_name', 'nationality_id',
    'nationality_name', 'nation_team_id',
    # Personal Details
    'dob', 'club_joined', 'club_contract_valid_until', 'club_loaned_from',
    # Positional Details
    'club_position', 'club_jersey_number', 'nation_position',
    'nation_jersey_number', 'player_positions',
    # Aesthetic Details
    'real_face', 'player_tags', 'player_traits',
    # Derived Positional Details
    'ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram',
    'lm', 'lcm', 'cm', 'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb',
    'lb', 'lcb', 'cb', 'rcb', 'rb', 'gk',
    # Highly correlated with overall
    'potential', 'value_eur', 'wage_eur',
]

def fifa_pipeline(dataset, null_drop_ratio=0.2, dropna_max_nulls=10000,
                  impute_max_nulls=20000, target_col="overall",
                  target_corr_threshold=0.4, pairwise_corr_threshold=0.8):
    """Clean a raw player DataFrame and append an assembled ``features`` vector.

    Steps, in order:

    1. Drop every column listed in the module-level ``columns_to_drop``.
    2. Handle missing values using per-column null counts taken once, up front:
       * null count > ``null_drop_ratio`` * row count  -> column is dropped;
       * 0 < null count < ``dropna_max_nulls``          -> rows with a null in
         that column are dropped;
       * ``dropna_max_nulls`` < null count < ``impute_max_nulls`` -> nulls are
         filled with 0, and an ``is_gk`` flag column is added that is 1 where
         the first such column was null (the name suggests these are
         goalkeeper rows -- assumption, confirm against the data).
       All comparisons are strict, so a column whose null count equals a
       threshold exactly, or lies between ``impute_max_nulls`` and the drop
       cutoff, is left untouched.
    3. String columns are indexed and one-hot encoded into ``<name>_index`` and
       ``<name>_vec`` columns; the raw string columns are then dropped.
    4. Among numeric columns whose absolute correlation with ``target_col``
       exceeds ``target_corr_threshold``, pairs whose absolute mutual
       correlation exceeds ``pairwise_corr_threshold`` are grouped; from each
       group only the column most correlated with the target is kept.
    5. All remaining columns except the target and the ``_index``/``_vec``
       columns are assembled into ``features``. The encoded columns stay in
       the returned frame but are not part of the vector.

    Parameters
    ----------
    dataset : pyspark.sql.DataFrame
        Raw input; must contain ``target_col``.
    null_drop_ratio : float
        Fraction of rows above which a column's null count causes it to be dropped.
    dropna_max_nulls : int
        Upper bound (exclusive) on null count for the drop-rows treatment.
    impute_max_nulls : int
        Upper bound (exclusive) on null count for the fill-with-0 treatment.
    target_col : str
        Name of the target column.
    target_corr_threshold : float
        Minimum absolute correlation with the target for a column to take part
        in the redundancy check.
    pairwise_corr_threshold : float
        Absolute correlation above which two columns are considered redundant.

    Returns
    -------
    pyspark.sql.DataFrame
        The transformed frame with an added ``features`` vector column.

    Side effects: prints the correlated groups and the kept/dropped columns.
    """
    # Use the real builtins module: `__builtins__` is only a module inside
    # `__main__` (it is a plain dict elsewhere), so `__builtins__.abs` breaks
    # as soon as this function lives in an importable module. Going through
    # `builtins` also stays safe if `abs` has been shadowed in the notebook.
    import builtins

    dataset = dataset.drop(*columns_to_drop)

    # ---- Missing values -------------------------------------------------
    total_rows = dataset.count()
    null_row = dataset.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns]
    ).collect()[0]
    null_counts = dict(zip(dataset.columns, null_row))

    sparse_cols = [c for c, n in null_counts.items() if n > null_drop_ratio * total_rows]
    dataset = dataset.drop(*sparse_cols)

    # Columns that were just dropped must not be referenced again below; on a
    # small input the absolute thresholds can overlap the ratio-based cutoff.
    sparse_set = set(sparse_cols)
    surviving_counts = {c: n for c, n in null_counts.items() if c not in sparse_set}

    dropna_cols = [c for c, n in surviving_counts.items() if 0 < n < dropna_max_nulls]
    if dropna_cols:
        dataset = dataset.dropna(subset=dropna_cols)

    impute_cols = [
        c for c, n in surviving_counts.items()
        if dropna_max_nulls < n < impute_max_nulls
    ]
    # Guarded so that an input with no column in the imputation band does not
    # fail on impute_cols[0]; in that case no flag column is added.
    if impute_cols:
        dataset = dataset.withColumn(
            'is_gk', when(dataset[impute_cols[0]].isNull(), 1).otherwise(0)
        )
        dataset = dataset.na.fill(0, subset=impute_cols)

    # ---- Categorical encoding -------------------------------------------
    string_cols = [c for c, t in dataset.dtypes if t == 'string']
    indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in string_cols]
    encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec") for c in string_cols]
    dataset = Pipeline(stages=indexers + encoders).fit(dataset).transform(dataset)
    dataset = dataset.drop(*string_cols)

    # Everything that must be kept out of the numeric feature set.
    non_feature_cols = (
        string_cols
        + [c + "_vec" for c in string_cols]
        + [c + "_index" for c in string_cols]
        + [target_col]
    )

    # ---- Correlation with the target ------------------------------------
    numerical_cols = dataset.drop(*non_feature_cols).columns
    feature_correlation_with_target = {
        c: dataset.stat.corr(c, target_col) for c in numerical_cols
    }
    selected_features = [
        c for c, r in feature_correlation_with_target.items()
        if builtins.abs(r) > target_corr_threshold
    ]

    # ---- Pairwise correlation among the selected features ---------------
    # Correlation is symmetric, so only unique pairs are needed, and all of
    # them are computed in one aggregation instead of one Spark job per
    # ordered pair.
    pairs = [
        (first, second)
        for i, first in enumerate(selected_features)
        for second in selected_features[i + 1:]
    ]
    pair_corr = {}
    if pairs:
        pair_row = dataset.select(
            [corr(a, b).alias(f"pair_{k}") for k, (a, b) in enumerate(pairs)]
        ).collect()[0]
        pair_corr = dict(zip(pairs, pair_row))

    # A pair only starts a new group if neither member already belongs to one,
    # so every column appears in at most one group.
    correlated_groups = []
    for (first, second), r in pair_corr.items():
        is_redundant = r is not None and builtins.abs(r) > pairwise_corr_threshold
        if is_redundant and not any(
            first in group or second in group for group in correlated_groups
        ):
            correlated_groups.append({first, second})

    print(correlated_groups)

    # From each group keep the member most correlated with the target.
    keep_col = [
        max(group, key=lambda c: builtins.abs(feature_correlation_with_target[c]))
        for group in correlated_groups
    ]
    print(f"Selected columns: {keep_col}")

    dropped_cols = [
        c for group in correlated_groups for c in group if c not in keep_col
    ]
    print(f"Dropped columns: {dropped_cols}")

    dataset = dataset.drop(*dropped_cols)

    # ---- Feature vector ---------------------------------------------------
    feature_cols = dataset.drop(*non_feature_cols).columns
    # Named `assembler` so the VectorAssembler class itself is not shadowed.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    return assembler.transform(dataset)

In [4]:
# Read the merged raw CSV: first line is the header, column types are inferred.
raw_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('data/merged_raw.csv')
)

In [78]:
# Run the preprocessing pipeline on the raw frame, then preview the first rows.
df = fifa_pipeline(dataset=raw_df)

df.show(n=10)

[{'attacking_short_passing', 'skill_long_passing'}, {'skill_curve', 'skill_ball_control'}, {'power_long_shots', 'power_shot_power'}]
Selected columns: ['attacking_short_passing', 'skill_ball_control', 'power_shot_power']
Dropped columns: ['skill_long_passing', 'skill_curve', 'power_long_shots']
+-------+---+---------+---------+------------+---------+-----------+------------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+--------------------+-----------------------+---------------------+----------------+-------------------+---------------------------+-------------------------+------------------------+------------------+--------------------+-------------------+-