In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SQLContext

In [2]:
# Input CSV locations, relative to the notebook's working directory.
player_file = "data/Player.csv"
attributes_file = "data/Player_Attributes.csv"
# Spark application name. NOTE(review): not referenced by any cell shown in
# this notebook -- confirm whether it is still needed.
app_name = "i_dunno"

def toIntSafe(inval):
    """Convert ``inval`` to an ``int``, or return ``None`` if it cannot be converted.

    Args:
        inval: Value to parse; in this notebook, one raw field of a CSV line.

    Returns:
        The parsed integer, or ``None`` when ``inval`` is not a valid integer
        literal (for example an empty field) or is of a type ``int()``
        rejects, such as ``None``.
    """
    try:
        return int(inval)
    except (ValueError, TypeError):
        # int() raises ValueError for malformed text and TypeError for None or
        # other unsupported types; both mean "no usable value" for a safe
        # converter, so neither may abort the row conversion.
        return None

def toTimeSafe(inval):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a ``datetime``, or return ``None``.

    Args:
        inval: Text to parse; callers in this notebook strip surrounding
            double quotes before passing it in.

    Returns:
        The parsed ``datetime``, or ``None`` when ``inval`` does not match the
        expected format or is not a string at all (for example ``None``).
    """
    try:
        return datetime.strptime(inval, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # strptime() raises ValueError on a format mismatch and TypeError on a
        # non-string argument; treat both as a missing timestamp.
        return None

def toFloatSafe(inval):
    """Convert ``inval`` to a ``float``, or return ``None`` if it cannot be converted.

    Args:
        inval: Value to parse; in this notebook, one raw field of a CSV line.

    Returns:
        The parsed float, or ``None`` when ``inval`` is not a valid float
        literal (for example an empty field) or is of a type ``float()``
        rejects, such as ``None``.
    """
    try:
        return float(inval)
    except (ValueError, TypeError):
        # Explicit None (rather than a bare ``return``) to match the sibling
        # converters; TypeError covers None and other unsupported types.
        return None

def stringToPlayer(s):
    """Build a positional ``Row`` for one player from an already-split CSV line.

    Args:
        s: Sequence of at least seven string fields. Indexes 0, 1, 3 and 6 are
            parsed as integers, index 2 is kept as text with double quotes
            stripped, index 4 is parsed as a timestamp after stripping double
            quotes, and index 5 is parsed as a float.

    Returns:
        A ``Row`` holding the seven converted values in field order; any value
        that fails conversion is ``None``.
    """
    record_id = toIntSafe(s[0])
    api_id = toIntSafe(s[1])
    name = s[2].strip('"')
    fifa_api_id = toIntSafe(s[3])
    birthday = toTimeSafe(s[4].strip('"'))
    height = toFloatSafe(s[5])
    weight = toIntSafe(s[6])
    return Row(record_id, api_id, name, fifa_api_id, birthday, height, weight)

def stringToAttributes(s):
    """Build a positional ``Row`` for one attribute record from a split CSV line.

    Args:
        s: Sequence of string fields. Indexes 0-2, 4 and 5 are parsed as
            integers, index 3 is parsed as a timestamp after stripping double
            quotes, indexes 6-8 are passed through unchanged, and every field
            from index 9 onwards is parsed as an integer.

    Returns:
        A ``Row`` with the nine leading values followed by one integer (or
        ``None`` on conversion failure) per remaining field.
    """
    leading = [
        toIntSafe(s[0]),
        toIntSafe(s[1]),
        toIntSafe(s[2]),
        toTimeSafe(s[3].strip('"')),
        toIntSafe(s[4]),
        toIntSafe(s[5]),
        s[6],
        s[7],
        s[8],
    ]
    # Everything after the ninth field is converted the same way.
    trailing = [toIntSafe(raw) for raw in s[9:]]
    return Row(*(leading + trailing))

# Schema applied to the Rows built by stringToPlayer; field order must match
# the order of values in those Rows. Only "id" is declared non-nullable.
playerSchema = StructType(list(
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("id", IntegerType(), False),
        ("player_api_id", IntegerType(), True),
        ("player_name", StringType(), True),
        ("player_fifa_api_id", IntegerType(), True),
        ("birthday", TimestampType(), True),
        ("height", FloatType(), True),
        ("weight", IntegerType(), True),
    )
))

# Names of the trailing integer columns of an attribute record, in file order.
# They are used to extend attributesSchema after its nine leading fields.
finalColumns = [
    "crossing",
    "finishing",
    "heading_accuracy",
    "short_passing",
    "volleys",
    "dribbling",
    "curve",
    "free_kick_accuracy",
    "long_passing",
    "ball_control",
    "acceleration",
    "sprint_speed",
    "agility",
    "reactions",
    "balance",
    "shot_power",
    "jumping",
    "stamina",
    "strength",
    "long_shots",
    "aggression",
    "interceptions",
    "positioning",
    "vision",
    "penalties",
    "marking",
    "standing_tackle",
    "sliding_tackle",
    "gk_diving",
    "gk_handling",
    "gk_kicking",
    "gk_positioning",
    "gk_reflexes",
]

# Schema applied to the Rows built by stringToAttributes: nine explicitly
# typed leading fields, then one nullable integer field per finalColumns name.
attributesSchema = StructType(
    list(
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in (
            ("id", IntegerType(), False),
            ("player_fifa_api_id", IntegerType(), True),
            ("player_api_id", IntegerType(), True),
            ("date", TimestampType(), True),
            ("overall_rating", IntegerType(), True),
            ("potential", IntegerType(), True),
            ("preferred_foot", StringType(), True),
            ("attacking_work_rate", StringType(), True),
            ("defensive_work_rate", StringType(), True),
        )
    )
    + list(StructField(skill, IntegerType(), True) for skill in finalColumns)
)



In [3]:
def _read_without_header(path):
    """Read ``path`` as text lines and split off its first line.

    Returns a ``(header, body)`` pair: the first line of the file, and a
    persisted RDD of every line that is not equal to that first line.
    """
    lines = sc.textFile(path)
    header = lines.first()
    body = lines.filter(lambda row: row != header).persist()
    return header, body


# NOTE(review): `sc` is not created in any cell shown here; it presumably
# comes from the kernel / pyspark launcher -- confirm before a fresh run.
player_header, player_rdd = _read_without_header(player_file)
attributes_header, attributes_rdd = _read_without_header(attributes_file)
# count() is an action, so both RDDs are evaluated here; only the last
# expression of the cell (the attributes count) is displayed.
player_rdd.count()
attributes_rdd.count()

183978

In [4]:
# Split every raw line on commas, convert the pieces to a typed Row, then
# attach the matching schema.
# NOTE(review): a plain split(',') does not honour quoted fields that contain
# commas -- confirm the input files have none.
player_rows = player_rdd.map(lambda row: row.split(',')).map(stringToPlayer)
player_df = sqlContext.createDataFrame(player_rows, playerSchema)

attribute_rows = attributes_rdd.map(lambda row: row.split(',')).map(stringToAttributes)
attributes_df = sqlContext.createDataFrame(attribute_rows, attributesSchema)

In [5]:
# Left-join each player to its attribute records on player_api_id.
# id, player_api_id and player_fifa_api_id exist in both frames, so they are
# taken explicitly from player_df; the remaining names are unique to one side.
# The skill columns reuse finalColumns instead of repeating the 33 names by
# hand, so this selection cannot drift from attributesSchema.
player_merge = (
    player_df
    .join(attributes_df, player_df.player_api_id == attributes_df.player_api_id, 'left')
    .select(
        player_df.id, player_df.player_api_id, player_df.player_fifa_api_id,
        "player_name", "birthday", "height", "weight",
        "date", "overall_rating", "potential",
        "preferred_foot", "attacking_work_rate", "defensive_work_rate",
        *finalColumns
    )
)

In [6]:
# Mark the joined frame for caching so the feature cells derived from it below
# can reuse it; first() is an action, so it forces evaluation and displays one
# joined row as a sanity check.
player_merge.cache().first()

Row(id=3402, player_api_id=11317, player_fifa_api_id=153267, player_name=u'Florent Ghisolfi', birthday=datetime.datetime(1985, 2, 28, 0, 0), height=175.25999450683594, weight=154, date=datetime.datetime(2014, 3, 21, 0, 0), overall_rating=64, potential=65, preferred_foot=u'right', attacking_work_rate=u'medium', defensive_work_rate=u'medium', crossing=44, finishing=40, heading_accuracy=59, short_passing=67, volleys=40, dribbling=61, curve=41, free_kick_accuracy=49, long_passing=65, ball_control=63, acceleration=58, sprint_speed=61, agility=60, reactions=69, balance=74, shot_power=53, jumping=62, stamina=71, strength=64, long_shots=57, aggression=76, interceptions=61, positioning=55, vision=63, penalties=44, marking=55, standing_tackle=71, sliding_tackle=61, gk_diving=7, gk_handling=10, gk_kicking=15, gk_positioning=5, gk_reflexes=9)

# Feature Extraction

In [7]:
# Age in completed years at the date of each attribute record.
# Dividing datediff() by 365.0 ignores leap days, so after truncation the age
# came out one year too high for records dated a few days before a birthday.
# months_between() counts calendar months instead, so months / 12 only reaches
# the next whole year on the birthday itself. The cast truncates to an integer
# and keeps the column type the same as before (IntegerType); rows with a null
# date or birthday still yield a null age.
df2 = player_merge.withColumn(
    'age',
    (months_between('date', 'birthday') / 12).cast(IntegerType())
)

In [8]:
# Preview the derived age column (show() prints the first 20 rows by default).
df2.select(df2["age"]).show()

+---+
|age|
+---+
| 29|
| 28|
| 28|
| 28|
| 27|
| 27|
| 27|
| 26|
| 25|
| 23|
| 22|
| 21|
| 32|
| 31|
| 31|
| 29|
| 28|
| 28|
| 27|
| 26|
+---+
only showing top 20 rows



In [9]:
# Persist the feature table. The default save mode raises if "Players" already
# exists, which makes a second Restart-and-Run-All fail at this cell; overwrite
# mode replaces the table with the freshly computed one instead.
# NOTE(review): this replaces any existing "Players" table -- confirm that no
# other job writes to a table of that name.
df2.write.mode("overwrite").saveAsTable("Players")