In [2]:
from pyspark.sql import SparkSession

In [53]:
sc

In [3]:
spark = SparkSession.builder.appName('Analyzing Soccer Player').getOrCreate()

In [4]:
playersPath =r'C:\\Hadoop\\Data\\player.csv'

In [5]:
players = spark.read.format('csv')\
                    .option('header', 'true')\
                    .option('inferSchema', 'true')\
                    .load(playersPath)

In [6]:
players.printSchema()

root
 |-- id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)



In [7]:
players.show(5)

+---+-------------+------------------+------------------+-------------------+------+------+
| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+------------------+------------------+-------------------+------+------+
|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|   Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|       Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|     Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|      Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
+---+-------------+------------------+------------------+-------------------+------+------+
only showing top 5 rows



In [8]:
playersAttributePath =r'C:\\Hadoop\\Data\\player_attributes.csv'
playerAttributes = spark.read.format('csv')\
                    .option('header', 'true')\
                    .option('inferSchema', 'true')\
                    .load(playersAttributePath)

In [9]:
playerAttributes.show(5)

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_posit

In [10]:
playerAttributes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- overall_rating: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: integer (nullable = true)
 |-- finishing: integer (nullable = true)
 |-- heading_accuracy: integer (nullable = true)
 |-- short_passing: integer (nullable = true)
 |-- volleys: integer (nullable = true)
 |-- dribbling: integer (nullable = true)
 |-- curve: integer (nullable = true)
 |-- free_kick_accuracy: integer (nullable = true)
 |-- long_passing: integer (nullable = true)
 |-- ball_control: integer (nullable = true)
 |-- acceleration: integer (nullable = true)
 |-- sprint_speed: integer (nullable = true)
 |-- agility: integer (nullable = true)
 |-- reactions: in

In [11]:
players.count(), playerAttributes.count()

(11060, 183978)

In [12]:
playerAttributes = playerAttributes.dropna()
players = players.dropna()

In [13]:
players.count(), playerAttributes.count()

(11060, 180354)

In [14]:
from pyspark.sql.functions import udf

In [15]:
year_extract_udf = udf(lambda date: date.year)
playerAttributes = playerAttributes.withColumn('year', year_extract_udf(playerAttributes.date))
playerAttributes.columns

['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [16]:
player_attributes_for_year_2016 = playerAttributes.filter(playerAttributes.year == 2016)
player_attributes_for_year_2016.count()

14098

In [17]:
player_attributes_for_year_2016.select(player_attributes_for_year_2016.player_api_id).distinct().count()

5586

In [18]:
pa_striker_2016 = player_attributes_for_year_2016.groupBy('player_api_id')\
                                                 .agg({
                                                        'finishing':'avg',
                                                        'shot_power':'avg',
                                                        'acceleration':'avg'
                                                      })\
                                                 .withColumnRenamed('avg(finishing)','finishing')\
                                                 .withColumnRenamed('avg(acceleration)','acceleration')\
                                                 .withColumnRenamed('avg(shot_power)','shot_power')

In [19]:
pa_striker_2016.show()

+-------------+------------------+-----------------+-----------------+
|player_api_id|         finishing|     acceleration|       shot_power|
+-------------+------------------+-----------------+-----------------+
|       114503|              78.0|             77.0|             75.0|
|       171094|              39.0|             65.0|             36.0|
|        27484|              61.0|71.66666666666667|             71.0|
|       166648| 86.83333333333333|             84.0|             81.0|
|       265363|              53.0|88.33333333333333|72.66666666666667|
|        41890|              66.4|             79.0|             77.8|
|       195413|61.666666666666664|             77.0|             76.0|
|        89476|              70.0|             72.0|             72.0|
|        27974|              61.0|             40.5|             77.0|
|        26708|              38.0|             55.0|             47.0|
|       689987|              41.0|             52.0|             52.0|
|     

In [20]:
weight_finishing = 1
weight_acceleration = 2 
weight_shot_power = 1

total_weight = weight_finishing + weight_acceleration + weight_shot_power

In [21]:
strikers = pa_striker_2016.withColumn('grade', (pa_striker_2016.finishing * weight_finishing + \
                                               pa_striker_2016.finishing * weight_finishing + \
                                               pa_striker_2016.finishing * weight_finishing) /total_weight) 

strikers = strikers.filter(strikers.grade > 65) \
                   .sort(strikers.grade.desc())

strikers.show()

+-------------+-----------------+-----------------+----------+------+
|player_api_id|        finishing|     acceleration|shot_power| grade|
+-------------+-----------------+-----------------+----------+------+
|        37412|             90.0|             92.0|      87.0|  67.5|
|        25759|89.66666666666667|             79.0|      85.0| 67.25|
|        93447|             89.0|             79.0|      84.5| 66.75|
|       116772|             89.0|             76.0|      74.0| 66.75|
|        70409|             89.0|             87.0|      75.0| 66.75|
|        19243|             89.0|             75.0|      83.0| 66.75|
|        19533|             88.5|             91.0|      77.5|66.375|
|       150565|             88.0|             95.0|      78.0|  66.0|
|        38817|             88.0|             90.0|      88.5|  66.0|
|       282770|87.33333333333333|             76.5|      78.0|  65.5|
|        30709|             87.0|             34.0|      77.0| 65.25|
|       184138|     

In [22]:
strikers.count(), players.count()

(15, 11060)

In [23]:
# join approach 1 
striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id)
striker_details.columns

['id',
 'player_api_id',
 'player_name',
 'player_fifa_api_id',
 'birthday',
 'height',
 'weight',
 'player_api_id',
 'finishing',
 'acceleration',
 'shot_power',
 'grade']

In [24]:
# join approach 2, help for multi column joins
striker_details = players.join(strikers, ['player_api_id'])
striker_details.columns

['player_api_id',
 'id',
 'player_name',
 'player_fifa_api_id',
 'birthday',
 'height',
 'weight',
 'finishing',
 'acceleration',
 'shot_power',
 'grade']

In [25]:
striker_details.show(2)

+-------------+----+---------------+------------------+-------------------+------+------+-----------------+------------+----------+-----+
|player_api_id|  id|    player_name|player_fifa_api_id|           birthday|height|weight|        finishing|acceleration|shot_power|grade|
+-------------+----+---------------+------------------+-------------------+------+------+-----------------+------------+----------+-----+
|        37412|9674|  Sergio Aguero|            153079|1988-06-02 00:00:00|172.72|   163|             90.0|        92.0|      87.0| 67.5|
|        25759|3936|Gonzalo Higuain|            167664|1987-12-10 00:00:00|182.88|   181|89.66666666666667|        79.0|      85.0|67.25|
+-------------+----+---------------+------------------+-------------------+------+------+-----------------+------------+----------+-----+
only showing top 2 rows



In [27]:
#broad casting
from pyspark.sql.functions import broadcast

In [28]:
# strikers can be broad casted, as they are kind of lookups which avoids duplicacy of data at each task level 
# instead the are managed at the node level
# broadcast is used to speed up the execution

striker_details = players.select('player_api_id','player_name','birthday')\
                         .join(broadcast(strikers), ['player_api_id'], 'inner')

In [29]:
striker_details.show()

+-------------+--------------------+-------------------+-----------------+-----------------+----------+------+
|player_api_id|         player_name|           birthday|        finishing|     acceleration|shot_power| grade|
+-------------+--------------------+-------------------+-----------------+-----------------+----------+------+
|       184138|   Antoine Griezmann|1991-03-21 00:00:00|             87.0|             86.0|      79.0| 65.25|
|       166648|        Carlos Bacca|1986-09-08 00:00:00|86.83333333333333|             84.0|      81.0|65.125|
|        38817|        Carlos Tevez|1984-02-05 00:00:00|             88.0|             90.0|      88.5|  66.0|
|        19243|         Diego Costa|1988-10-07 00:00:00|             89.0|             75.0|      83.0| 66.75|
|        25759|     Gonzalo Higuain|1987-12-10 00:00:00|89.66666666666667|             79.0|      85.0| 67.25|
|        70409|    Javier Hernandez|1988-06-01 00:00:00|             89.0|             87.0|      75.0| 66.75|
|

In [31]:
striker_details = striker_details.sort(striker_details.grade.desc())

In [32]:
striker_details.show()

+-------------+--------------------+-------------------+-----------------+-----------------+----------+------+
|player_api_id|         player_name|           birthday|        finishing|     acceleration|shot_power| grade|
+-------------+--------------------+-------------------+-----------------+-----------------+----------+------+
|        37412|       Sergio Aguero|1988-06-02 00:00:00|             90.0|             92.0|      87.0|  67.5|
|        25759|     Gonzalo Higuain|1987-12-10 00:00:00|89.66666666666667|             79.0|      85.0| 67.25|
|        70409|    Javier Hernandez|1988-06-01 00:00:00|             89.0|             87.0|      75.0| 66.75|
|        93447|  Robert Lewandowski|1988-08-21 00:00:00|             89.0|             79.0|      84.5| 66.75|
|        19243|         Diego Costa|1988-10-07 00:00:00|             89.0|             75.0|      83.0| 66.75|
|       116772|      Thomas Mueller|1989-09-13 00:00:00|             89.0|             76.0|      74.0| 66.75|
|

In [33]:
players.count(), playerAttributes.count()

(11060, 180354)

In [144]:
#broadcasting players as it is smaller dataset when compared to player attributes
players_heading_accuracy = playerAttributes.select('player_api_id', 'heading_accuracy')\
                                           .join(broadcast(players), playerAttributes.player_api_id == players.player_api_id)

# there is an issue with birthday column, when included so removed it
# need to understand why that happened

players_heading_accuracy = players_heading_accuracy.drop('id', 'player_fifa_api_id', 'birthday')
players_heading_accuracy.printSchema()
players_heading_accuracy.show(5)
#players_heading_accuracy = players_heading_accuracy.filter(playerAttributes.player_api_id == 505942)

root
 |-- player_api_id: integer (nullable = true)
 |-- heading_accuracy: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)

+-------------+----------------+-------------+------------------+------+------+
|player_api_id|heading_accuracy|player_api_id|       player_name|height|weight|
+-------------+----------------+-------------+------------------+------+------+
|       505942|              71|       505942|Aaron Appindangoye|182.88|   187|
|       505942|              71|       505942|Aaron Appindangoye|182.88|   187|
|       505942|              71|       505942|Aaron Appindangoye|182.88|   187|
|       505942|              70|       505942|Aaron Appindangoye|182.88|   187|
|       505942|              70|       505942|Aaron Appindangoye|182.88|   187|
+-------------+----------------+-------------+------------------+------+------+
only showing 

In [134]:
#using accumulators 

short_count = spark.sparkContext.accumulator(0)
medium_low_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
tail_count = spark.sparkContext.accumulator(0)

all_counters = [ short_count.value, medium_high_count.value, medium_low_count.value, tail_count.value]
all_counters

[0, 0, 0, 0]

In [135]:
def count_of_players_by_height(row):   
    height = row.height 
    if (height <= 175):
         short_count.add(1)
    elif (height <= 183 and height > 175):
        medium_low_count.add(1)
    elif (height <= 195 and height > 183):   
        medium_high_count.add(1)
    elif (height > 195):   
        tail_count.add(1)

In [136]:
players_heading_accuracy.foreach(lambda row: count_of_players_by_height(row))

all_counters = [ short_count.value, medium_high_count.value, medium_low_count.value, tail_count.value]
all_counters

[18869, 61184, 96967, 3334]

In [155]:
short_ha_count = spark.sparkContext.accumulator(0)
medium_low_ha_count = spark.sparkContext.accumulator(0)
medium_high_ha_count = spark.sparkContext.accumulator(0)
tail_ha_count = spark.sparkContext.accumulator(0)

In [156]:
def count_of_players_by_height_and_heading_accuracy(row, threshold_score):   
    height = row.height
    ha = row.heading_accuracy
    
    if (ha <= threshold_score):
        return 
    
    if (height <= 175):
         short_ha_count.add(1)
    elif (height <= 183 and height > 175):
        medium_low_ha_count.add(1)
    elif (height <= 195 and height > 183):   
        medium_high_ha_count.add(1)
    elif (height > 195):   
        tail_ha_count.add(1)

In [157]:
players_heading_accuracy.foreach(lambda row: count_of_players_by_height_and_heading_accuracy(row, 60))
all_counters_above_threshold = [
                short_ha_count.value,
                medium_low_ha_count.value,
                medium_high_ha_count.value,
                tail_ha_count.value
               ]
all_counters_above_threshold

[3614, 41234, 40091, 1557]

In [158]:
percentage_values = [short_ha_count.value / short_count.value *100,
                     medium_low_ha_count.value / medium_low_count.value *100,
                     medium_high_ha_count.value / medium_high_count.value *100,
                     tail_ha_count.value / tail_count.value *100
                    ]

percentage_values

[13.848335057669464, 22.9799091593056, 28.35971874425251, 24.147022332506204]

In [160]:
# writing CSV file
# coalesce(1) - will create one file with all required contents
player_attributes_for_year_2016.select('player_api_id','overall_rating')\
                               .coalesce(1)\
                               .write\
                               .option('header','true')\
                               .csv('player_overall_rating.csv')

In [162]:
# write content as JSON file, with files equal to number of partitions
player_attributes_for_year_2016.select('player_api_id','overall_rating')\
                               .write\
                               .json('player_overall_rating.json')
