In [42]:
from pyspark.sql.functions import regexp_extract, col
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import to_date, col, count, when, isnan, regexp_replace
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import *

# Application name and master URL used for the local Spark instance.
appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(appName)
    .setMaster(master)
)

# Create Spark Context with the new configurations rather than rely on the default one.
# The class is reached through the already-imported `pyspark` package: the bare
# name `SparkContext` is not imported by this cell's imports, so using it here
# fails with NameError on a freshly started kernel.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# SQL Context wrapping the Spark Context; kept because later cells read through it.
sqlContext = SQLContext(sc)

# Get (or create) the session on top of the context above, with whole-stage
# code generation switched off.
spark = (
    SparkSession.builder
    .config('spark.sql.codegen.wholeStage', 'false')
    .getOrCreate()
)

def load_players_csv(year, data_dir="/Users/wyc"):
    """Read one season's players CSV into a Spark DataFrame.

    Parameters
    ----------
    year : int or str
        Two-digit season suffix of the file name, e.g. 15 for ``players_15.csv``.
    data_dir : str
        Directory holding the ``players_<year>.csv`` files.
        NOTE(review): the default is a machine-specific absolute path; pass a
        different directory when running elsewhere.

    Returns
    -------
    pyspark.sql.DataFrame
        The file contents, with the first line used as header and column
        types inferred by Spark.
    """
    return (
        spark.read
        .format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("{}/players_{}.csv".format(data_dir, year))
    )


# Ingest data from the players_<year>.csv files into Spark DataFrames, one per season.
players_15_df = load_players_csv(15)
players_16_df = load_players_csv(16)
players_17_df = load_players_csv(17)
players_18_df = load_players_csv(18)
players_19_df = load_players_csv(19)
players_20_df = load_players_csv(20)

In [43]:
# Stack the six seasons into one frame. Columns are matched by NAME rather than
# by position, so a file whose columns are ordered differently cannot be
# silently misaligned (a missing/extra column raises instead).
merged_df = (
    players_15_df
    .unionByName(players_16_df)
    .unionByName(players_17_df)
    .unionByName(players_18_df)
    .unionByName(players_19_df)
    .unionByName(players_20_df)
)

In [44]:
merged_df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- nationality: string (nullable = true)
 |-- club: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: string (nullable = true)
 |-- release_clause_eur: string (nullable = true)
 |-- player_tags: s

In [45]:
# Working handle for the cleaning steps: later cells rebind `full_df`, while the
# name `merged_df` keeps pointing at the plain union of the six files.
full_df = merged_df

In [46]:
# Number of rows in the merged frame and half of it (displayed at the end of the cell).
total_rows = full_df.count()
th = total_rows / 2

# Every column name that is checked for missing values below.
variables = [
    'sofifa_id', 'player_url', 'short_name', 'long_name', 'age', 'dob',
    'height_cm', 'weight_kg', 'nationality', 'club', 'overall', 'potential',
    'value_eur', 'wage_eur', 'player_positions', 'preferred_foot',
    'international_reputation', 'weak_foot', 'skill_moves', 'work_rate',
    'body_type', 'real_face', 'release_clause_eur', 'player_tags',
    'team_position', 'team_jersey_number', 'loaned_from', 'joined',
    'contract_valid_until', 'nation_position', 'nation_jersey_number',
    'pace', 'shooting', 'passing', 'dribbling', 'defending', 'physic',
    'gk_diving', 'gk_handling', 'gk_kicking', 'gk_reflexes', 'gk_speed',
    'gk_positioning', 'player_traits',
    'attacking_crossing', 'attacking_finishing', 'attacking_heading_accuracy',
    'attacking_short_passing', 'attacking_volleys',
    'skill_dribbling', 'skill_curve', 'skill_fk_accuracy', 'skill_long_passing',
    'skill_ball_control',
    'movement_acceleration', 'movement_sprint_speed', 'movement_agility',
    'movement_reactions', 'movement_balance',
    'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength',
    'power_long_shots',
    'mentality_aggression', 'mentality_interceptions', 'mentality_positioning',
    'mentality_vision', 'mentality_penalties', 'mentality_composure',
    'defending_marking', 'defending_standing_tackle', 'defending_sliding_tackle',
    'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking',
    'goalkeeping_positioning', 'goalkeeping_reflexes',
    'ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram',
    'lm', 'lcm', 'cm', 'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb',
    'lb', 'lcb', 'cb', 'rcb', 'rb',
]

# One aggregate per column: how many rows hold NaN or null in that column.
missing_count_exprs = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in variables
]
full_df.select(missing_count_exprs).show()
th




+---------+----------+----------+---------+---+---+---------+---------+-----------+----+-------+---------+---------+--------+----------------+--------------+------------------------+---------+-----------+---------+---------+---------+------------------+-----------+-------------+------------------+-----------+------+--------------------+---------------+--------------------+-----+--------+-------+---------+---------+------+---------+-----------+----------+-----------+--------+--------------+-------------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+--

                                                                                

50497.5

In [47]:
# Columns removed from the working frame, spelled out by name. These are exactly
# the entries the former positional lookup (indices 22, 23, 26, 29, 30, 37-43 of
# `variables`) resolved to; naming them keeps the selection correct even if that
# list is ever reordered or edited.
# NOTE(review): presumably the columns whose missing count exceeded `th` - confirm
# against the null-count table above.
dropped = [
    'release_clause_eur',
    'player_tags',
    'loaned_from',
    'nation_position',
    'nation_jersey_number',
    'gk_diving',
    'gk_handling',
    'gk_kicking',
    'gk_reflexes',
    'gk_speed',
    'gk_positioning',
    'player_traits',
]

# A single drop call removes all of them in one projection.
full_df = full_df.drop(*dropped)
full_df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- nationality: string (nullable = true)
 |-- club: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: string (nullable = true)
 |-- team_position: string (nullable = true)
 |-- team_jersey_number:

In [56]:
# Display the names of the columns that were dropped.
# NOTE(review): this cell's execution count (56) is out of sequence with the
# cells around it; re-run the notebook top to bottom before sharing.
dropped

['release_clause_eur',
 'player_tags',
 'loaned_from',
 'nation_position',
 'nation_jersey_number',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_reflexes',
 'gk_speed',
 'gk_positioning',
 'player_traits']

In [48]:
# Columns whose values are parsed into plain integers below.
skills = [
    'attacking_crossing', 'attacking_finishing', 'attacking_heading_accuracy',
    'attacking_short_passing', 'attacking_volleys',
    'skill_dribbling', 'skill_curve', 'skill_fk_accuracy', 'skill_long_passing',
    'skill_ball_control',
    'movement_acceleration', 'movement_sprint_speed', 'movement_agility',
    'movement_reactions', 'movement_balance',
    'power_shot_power', 'power_jumping', 'power_stamina', 'power_strength',
    'power_long_shots',
    'mentality_aggression', 'mentality_interceptions', 'mentality_positioning',
    'mentality_vision', 'mentality_penalties', 'mentality_composure',
    'defending_marking', 'defending_standing_tackle', 'defending_sliding_tackle',
    'goalkeeping_diving', 'goalkeeping_handling', 'goalkeeping_kicking',
    'goalkeeping_positioning', 'goalkeeping_reflexes',
    'ls', 'st', 'rs', 'lw', 'lf', 'cf', 'rf', 'rw', 'lam', 'cam', 'ram',
    'lm', 'lcm', 'cm', 'rcm', 'rm', 'lwb', 'ldm', 'cdm', 'rdm', 'rwb',
    'lb', 'lcb', 'cb', 'rcb', 'rb',
]


def parse_rating(column_name):
    """Build an integer Column with the net value encoded in `column_name`.

    The patterns accept a leading number optionally followed by "+N" or "-N"
    (e.g. "85", "85+2", "85-1"); the result is base + N or base - N. A value
    with no leading number yields null.

    Parameters
    ----------
    column_name : str
        Name of the column to parse.

    Returns
    -------
    pyspark.sql.Column
        Unnamed integer expression; alias it at the call site.
    """
    raw = col(column_name)
    # The integer cast is important: regexp_extract yields "" when the pattern
    # does not match, and the cast transforms "" into null.
    # Raw strings keep the regex escapes (\+ and \-) out of Python's own
    # string-escape handling.
    base = regexp_extract(raw, r'^([0-9]+)', 1).cast("integer")
    bonus = regexp_extract(raw, r'^([0-9]+)\+([0-9]+)', 2).cast("integer")
    penalty = regexp_extract(raw, r'^([0-9]+)\-([0-9]+)', 2).cast("integer")
    # A missing modifier counts as 0 so it does not null out the base value.
    return (
        base
        + when(bonus.isNull(), 0).otherwise(bonus)
        - when(penalty.isNull(), 0).otherwise(penalty)
    )


# Rebuild the frame in one projection instead of adding and dropping helper
# columns per skill. Untouched columns come first in their existing order,
# followed by the parsed skill columns in `skills` order.
skill_names = set(skills)
untouched_columns = [col(c) for c in full_df.columns if c not in skill_names]
parsed_columns = [parse_rating(c).alias(c) for c in skills]
full_df = full_df.select(untouched_columns + parsed_columns)
full_df.printSchema()


root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- nationality: string (nullable = true)
 |-- club: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: string (nullable = true)
 |-- team_position: string (nullable = true)
 |-- team_jersey_number:

In [49]:
# Print the first five rows of the working frame as a sanity check.
full_df.show(5)

[Stage 102:>                                                        (0 + 1) / 1]

+---------+--------------------+-----------------+--------------------+---+----------+---------+---------+-----------+-------------------+-------+---------+---------+--------+----------------+--------------+------------------------+---------+-----------+-------------+---------+---------+-------------+------------------+----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+-----------------+-------------------------+------------------------+------------------+--------------------

                                                                                

In [50]:
# Half of the current row count.
# NOTE(review): presumably the 50%-missing cutoff; it is not used in this cell.
thr = full_df.count()/2

# For each skill column, count the rows that are null or NaN and print the
# counts as a single wide row.
missing_per_skill = [
    count(when(col(skill).isNull() | isnan(skill), skill)).alias(skill)
    for skill in skills
]
full_df.select(missing_per_skill).show()
    
    



+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+-----------------+-------------------------+------------------------+------------------+--------------------+-------------------+-----------------------+--------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|attacking_crossing|attacking_finishing|attacking_heading_accuracy|attacking_short_passing|attacking_volleys|skill_dribbling|skill_curve|skill_fk_accuracy|skill_long_passing|skil

                                                                                

We do not need to remove any columns, since no column contains more than 50% null values.

In [51]:
# Each skill column gets a companion "<name>_imputed" output column.
imputed_names = ["{}_imputed".format(skill) for skill in skills]

# Mean imputation; 0 is configured as the missing-value placeholder.
imputer = Imputer(inputCols=skills, outputCols=imputed_names)
imputer.setStrategy("mean")
imputer.setMissingValue(0)

# Fit on the working frame and apply the fitted model to that same frame.
imputer_model = imputer.fit(full_df)
full_imputed_df = imputer_model.transform(full_df)

# Swap every original skill column for its imputed counterpart, keeping the
# original column name.
for skill, imputed in zip(skills, imputed_names):
    full_imputed_df = (
        full_imputed_df
        .drop(skill)
        .withColumnRenamed(imputed, skill)
    )

                                                                                

In [52]:
full_imputed_df.show(5)

[Stage 114:>                                                        (0 + 1) / 1]

+---------+--------------------+-----------------+--------------------+---+----------+---------+---------+-----------+-------------------+-------+---------+---------+--------+----------------+--------------+------------------------+---------+-----------+-------------+---------+---------+-------------+------------------+----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+-----------------+-------------------------+------------------------+------------------+--------------------

                                                                                

Imputed

In [53]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
# Connection settings for the local PostgreSQL instance.
# User and password can be supplied through the POSTGRES_USER / POSTGRES_PASSWORD
# environment variables so they do not have to live in the notebook.
# NOTE(review): the literal fallbacks keep the notebook running unchanged when the
# variables are unset; remove them before sharing or committing this notebook.
db_properties = {
    'username': os.environ.get("POSTGRES_USER", "postgres"),
    'password': os.environ.get("POSTGRES_PASSWORD", "990331"),
    # Make sure to use the correct port number; this URL points at port 5432 on localhost.
    'url': "jdbc:postgresql://localhost:5432/postgres",
    'driver': "org.postgresql.Driver",
}

In [54]:
# Persist the imputed frame to PostgreSQL, replacing any existing
# fifa.player_info table. Connection details come from `db_properties` so the
# URL and credentials are defined in one place only.
(
    full_imputed_df.write
    .format("jdbc")
    .mode("overwrite")
    .option("url", db_properties['url'])
    .option("dbtable", "fifa.player_info")
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("driver", db_properties['driver'])
    .save()
)

                                                                                

In [55]:
# Read the table back from PostgreSQL to verify the round trip. Uses the Spark
# session directly and the shared `db_properties` instead of repeating the URL
# and credentials.
full_imputed_df_read = (
    spark.read
    .format("jdbc")
    .option("url", db_properties['url'])
    .option("dbtable", "fifa.player_info")
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("driver", db_properties['driver'])
    .load()
)

full_imputed_df_read.printSchema()
full_imputed_df_read.show(5)

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: string (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- nationality: string (nullable = true)
 |-- club: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: integer (nullable = true)
 |-- wage_eur: integer (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- international_reputation: integer (nullable = true)
 |-- weak_foot: integer (nullable = true)
 |-- skill_moves: integer (nullable = true)
 |-- work_rate: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- real_face: string (nullable = true)
 |-- team_position: string (nullable = true)
 |-- team_jersey_number:

[Stage 116:>                                                        (0 + 1) / 1]

+---------+--------------------+------------+--------------------+---+----------+---------+---------+------------+--------------------+-------+---------+---------+--------+----------------+--------------+------------------------+---------+-----------+-------------+---------+---------+-------------+------------------+----------+--------------------+----+--------+-------+---------+---------+------+------------------+-------------------+--------------------------+-----------------------+-----------------+---------------+-----------+-----------------+------------------+------------------+---------------------+---------------------+----------------+------------------+----------------+----------------+-------------+-------------+--------------+----------------+--------------------+-----------------------+---------------------+----------------+-------------------+-------------------+-----------------+-------------------------+------------------------+------------------+--------------------+--

                                                                                