In [36]:
# Create (or reuse) the Spark session used by every later cell.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Analyzing Soccer Players')
    .getOrCreate()
)

In [37]:
# Read the soccer players CSV; the first line of the file holds the column names.
players_data = (
    spark.read
    .option('header', 'true')
    .csv('../datasets/player.csv')
)

In [38]:
# Print the column names and types of players_data.
# Every column is reported as string because the CSV is loaded without schema inference.
players_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [39]:
# Preview the first five player rows.
players_data.show(n=5)

+---+-------------+------------------+------------------+-------------------+------+------+
| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+------------------+------------------+-------------------+------+------+
|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|   Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|       Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|     Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|      Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
+---+-------------+------------------+------------------+-------------------+------+------+
only showing top 5 rows



In [40]:
# Read the player attributes CSV; the first line of the file holds the column names.
players_attributes = (
    spark.read
    .option('header', 'true')
    .csv('../datasets/player_attributes.csv')
)

In [41]:
# Print the schema of players_attributes to see which per-player attributes
# (ratings, skills, work rates, ...) are available.
players_attributes.printSchema()
# The listing below shows one line per attribute column.

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [42]:
# Row counts of both tables, as (players, attribute records).
tuple(frame.count() for frame in (players_data, players_attributes))

(11060, 183978)

In [43]:
# Count the distinct player ids that appear in the attributes table.
distinct_player_ids = players_attributes.select('player_api_id').distinct()
distinct_player_ids.count()
# The result equals the number of rows in players_data.

11060

In [44]:
# Drop the player columns that later cells never read, to keep the frame small.
unused_player_columns = ('id', 'player_fifa_api_id')
players_data = players_data.drop(*unused_player_columns)
players_data.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [45]:
# Drop the attribute columns that later cells never read, to keep the frame small.
# NOTE: 'aggregation' is deliberately not listed. No column has that name and
# DataFrame.drop() silently ignores unknown names, so listing it did nothing.
# 'aggression' stays in the frame; add it to this list if it should be dropped too.
players_attributes = players_attributes.drop(
                     'id',
                     'player_fifa_api_id',
                     'preferred_foot',
                     'attacking_work_rate',
                     'defensive_work_rate',
                     'crossing',
                     'jumping',
                     'sprint_speed',
                     'balance',
                     'short_passing',
                     'potential'
                    )
players_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [46]:
# Remove every row that contains a null value, in both tables.
players_data = players_data.na.drop()
players_attributes = players_attributes.na.drop()

In [47]:
# Row counts after cleaning, as (players, attribute records).
tuple(frame.count() for frame in (players_data, players_attributes))
# The attribute record count drops from 183978 to 181265; the player count is unchanged.

(11060, 181265)

In [48]:
# Derive a `years` column from `date` and drop `date` afterwards.
# A built-in column expression replaces the former Python UDF: it yields the same
# string (the text before the first '-'), tolerates null dates, and avoids sending
# every row through a Python worker.
from pyspark.sql.functions import split, udf

players_attributes = players_attributes.withColumn(
                     'years',
                     split(players_attributes.date, '-').getItem(0)
)
# `date` is no longer needed once the year has been extracted
players_attributes = players_attributes.drop('date')
players_attributes.columns
# `years` now appears as the last column

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'years']

In [76]:
# Keep only the attribute records whose extracted year is 2016.
is_2016_record = players_attributes['years'] == 2016
pa_data_2016 = players_attributes.where(is_2016_record)
pa_data_2016.count()

14098

In [77]:
# Number of distinct players that have at least one 2016 record.
(
    pa_data_2016
    .select('player_api_id')
    .distinct()
    .count()
)

5586

In [78]:
# Average the striker-related attributes per player over their 2016 records.
striker_aggregations = {
    'finishing': 'avg',
    'shot_power': 'avg',
    'acceleration': 'avg',
}
pa_striker_2016 = pa_data_2016.groupBy('player_api_id').agg(striker_aggregations)
pa_striker_2016.count()

5586

In [79]:
# Preview the per-player averages.
pa_striker_2016.show(n=5)

+-------------+-----------------+-----------------+---------------+
|player_api_id|   avg(finishing)|avg(acceleration)|avg(shot_power)|
+-------------+-----------------+-----------------+---------------+
|       309726|75.44444444444444|74.11111111111111|           76.0|
|        26112|             53.0|             51.0|           76.0|
|        38433|            68.25|             74.0|           74.0|
|       295060|             25.0|             62.0|           40.0|
|       161396|             29.0|             72.0|           69.0|
+-------------+-----------------+-----------------+---------------+
only showing top 5 rows



In [80]:
# Replace the generated 'avg(...)' column names with the plain attribute names.
renamed_columns = (
    ('avg(finishing)', 'finishing'),
    ('avg(acceleration)', 'acceleration'),
    ('avg(shot_power)', 'shot_power'),
)
for aggregated_name, attribute_name in renamed_columns:
    pa_striker_2016 = pa_striker_2016.withColumnRenamed(aggregated_name, attribute_name)
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+----------+
|player_api_id|        finishing|     acceleration|shot_power|
+-------------+-----------------+-----------------+----------+
|       309726|75.44444444444444|74.11111111111111|      76.0|
|        26112|             53.0|             51.0|      76.0|
|        38433|            68.25|             74.0|      74.0|
|       295060|             25.0|             62.0|      40.0|
|       161396|             29.0|             72.0|      69.0|
+-------------+-----------------+-----------------+----------+
only showing top 5 rows



In [81]:
# Relative importance of each attribute in the striker grade; shot power counts double.
weight_finishing, weight_acceleration, weight_shot_power = 1, 1, 2
total_weight = sum((weight_finishing, weight_acceleration, weight_shot_power))

In [82]:
# Striker grade = weighted mean of the averaged finishing, acceleration and shot power.
weighted_attribute_sum = (
    pa_striker_2016.finishing * weight_finishing
    + pa_striker_2016.acceleration * weight_acceleration
    + pa_striker_2016.shot_power * weight_shot_power
)
strickers = pa_striker_2016.withColumn('striker_grade',
                                       weighted_attribute_sum / total_weight)

In [83]:
# The individual attribute columns are no longer needed once the grade exists.
graded_inputs = ('finishing', 'acceleration', 'shot_power')
strickers = strickers.drop(*graded_inputs)
strickers.columns

['player_api_id', 'striker_grade']

In [84]:
# Preview player_api_id together with the computed striker_grade.
strickers.show(n=5)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|       309726|75.38888888888889|
|        26112|             64.0|
|        38433|          72.5625|
|       295060|            41.75|
|       161396|            59.75|
+-------------+-----------------+
only showing top 5 rows



In [85]:
# Keep only players graded above 70, best grade first.
strickers = (
    strickers
    .where(strickers.striker_grade > 70)
    .orderBy(strickers.striker_grade.desc())
)
strickers.show(n=10)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|        20276|            89.25|
|        37412|             89.0|
|        38817|            88.75|
|        32118|            88.25|
|        31921|             87.0|
|        30834|            86.75|
|       303824|85.10714285714286|
|       129944|             85.0|
|       158263|            84.75|
|       150565|            84.75|
+-------------+-----------------+
only showing top 10 rows



In [86]:
# Row counts, as (qualifying strikers, all players).
tuple(frame.count() for frame in (strickers, players_data))

(1609, 11060)

In [93]:
# Attach player details to the striker grades by joining on an equality expression.
same_player = players_data.player_api_id == strickers.player_api_id
strikers_details = players_data.join(strickers, on=same_player)
strikers_details.columns
# With an expression join, player_api_id appears twice: once from each table.

['player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight',
 'player_api_id',
 'striker_grade']

In [94]:
# Row count of the joined striker details.
strikers_details.count()

1609

In [95]:
# Alternative join: passing the key as a column-name list keeps a single key column.
strikers_details = players_data.join(strickers, on=['player_api_id'])
strikers_details.columns
# player_api_id now appears only once.

['player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight',
 'striker_grade']

In [97]:
# Preview the joined striker details.
strikers_details.show(n=10)

+-------------+--------------------+-------------------+------+------+-----------------+
|player_api_id|         player_name|           birthday|height|weight|    striker_grade|
+-------------+--------------------+-------------------+------+------+-----------------+
|        20276|                Hulk|1986-07-25 00:00:00|180.34|   187|            89.25|
|        37412|       Sergio Aguero|1988-06-02 00:00:00|172.72|   163|             89.0|
|        38817|        Carlos Tevez|1984-02-05 00:00:00|172.72|   157|            88.75|
|        32118|      Lukas Podolski|1985-06-04 00:00:00|182.88|   183|            88.25|
|        31921|         Gareth Bale|1989-07-16 00:00:00|182.88|   163|             87.0|
|        30834|        Arjen Robben|1984-01-23 00:00:00|180.34|   176|            86.75|
|       303824|       Memphis Depay|1994-02-13 00:00:00|175.26|   172|85.10714285714286|
|       129944|          Marco Reus|1989-05-31 00:00:00|180.34|   165|             85.0|
|       158263|      

In [98]:
# Join using a broadcast hint: broadcast() marks a DataFrame as small enough to be
# used in a broadcast join. The smaller table is the one to mark:
# strickers (1609 rows) rather than players_data (11060 rows).
from pyspark.sql.functions import broadcast

strikers_details = (
    players_data
    .select('player_api_id', 'player_name')
    .join(broadcast(strickers), on=['player_api_id'], how='inner')
)

In [102]:
# Best strikers first: order the joined details by grade, highest on top.
strikers_details = strikers_details.orderBy(strikers_details.striker_grade.desc())
strikers_details.show(n=5)

+-------------+--------------+-------------+
|player_api_id|   player_name|striker_grade|
+-------------+--------------+-------------+
|        20276|          Hulk|        89.25|
|        37412| Sergio Aguero|         89.0|
|        38817|  Carlos Tevez|        88.75|
|        32118|Lukas Podolski|        88.25|
|        31921|   Gareth Bale|         87.0|
+-------------+--------------+-------------+
only showing top 5 rows



In [106]:
# Next analysis: how heading accuracy relates to player height.
# Starting point: row counts of both tables, as (players, attribute records).
tuple(frame.count() for frame in (players_data, players_attributes))

(11060, 181265)

In [109]:
# Pair every heading_accuracy record with its player's details.
# players_data is the smaller table here, so it is the one given the broadcast hint.
players_heading_acc = (
    players_attributes
    .select('player_api_id', 'heading_accuracy')
    .join(broadcast(players_data), on=['player_api_id'], how='inner')
)
players_heading_acc.columns

['player_api_id',
 'heading_accuracy',
 'player_name',
 'birthday',
 'height',
 'weight']

In [115]:
# Preview the joined rows; a player appears once per attribute record.
players_heading_acc.show(n=5)

+-------------+----------------+------------------+-------------------+------+------+
|player_api_id|heading_accuracy|       player_name|           birthday|height|weight|
+-------------+----------------+------------------+-------------------+------+------+
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
+-------------+----------------+------------------+-------------------+------+------+
only showing top 5 rows



In [116]:
# Accumulators let worker tasks add to counters whose totals are read on the driver.
# One zero-initialised counter per height category (four categories in total).
(
    short_height_count,
    medium_low_height_count,
    medium_high_height_count,
    tall_height_count,
) = (spark.sparkContext.accumulator(0) for _ in range(4))

In [117]:
# Helper applied to each row: bumps the accumulator of the row's height category.
def count_players_by_height(row):
    """Add 1 to the height-category accumulator that ``row.height`` falls into.

    ``row.height`` is converted with ``float``. Categories are: at most 175,
    above 175 up to 183, above 183 up to 195, and above 195. A value that
    satisfies none of the comparisons (NaN) is not counted. Returns None.
    """
    player_height = float(row.height)

    # Checked from the tallest category downwards; each test implies the upper bound.
    if player_height > 195:
        tall_height_count.add(1)
    elif player_height > 183:
        medium_high_height_count.add(1)
    elif player_height > 175:
        medium_low_height_count.add(1)
    elif player_height <= 175:
        short_height_count.add(1)

In [118]:
# Run the counting helper once for every row of players_heading_acc.
# Re-running this cell without re-creating the accumulators adds to the existing totals.
players_heading_acc.foreach(count_players_by_height)

In [121]:
# Collect the accumulator totals on the driver, ordered from shortest to tallest category.
# Each total counts rows of players_heading_acc (one per attribute record).
all_players = [
    counter.value
    for counter in (
        short_height_count,
        medium_low_height_count,
        medium_high_height_count,
        tall_height_count,
    )
]
all_players

[18977, 97399, 61518, 3371]

In [128]:
# Save player id and overall rating of the 2016 records as CSV with a header row.
# The source frame is `pa_data_2016` (the 2016 subset built earlier); the name
# `player_data_2016` is not defined anywhere in this notebook and raises a NameError
# on a fresh kernel.
# coalesce(1) reduces the frame to one partition before writing.
# mode('overwrite') keeps the cell re-runnable: the default save mode raises an error
# when the output path already exists.
(
    pa_data_2016
    .select('player_api_id', 'overall_rating')
    .coalesce(1)
    .write
    .mode('overwrite')
    .option('header', 'true')
    .csv('player_id_name.csv')
)

In [129]:
# Save player id and overall rating of the 2016 records as JSON.
# The source frame is `pa_data_2016` (the 2016 subset built earlier); the name
# `player_data_2016` is not defined anywhere in this notebook and raises a NameError
# on a fresh kernel.
# mode('overwrite') keeps the cell re-runnable: the default save mode raises an error
# when the output path already exists.
(
    pa_data_2016
    .select('player_api_id', 'overall_rating')
    .write
    .mode('overwrite')
    .json('player_id_name.json')
)

In [130]:
# creating custom vector accumulator
from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    """Accumulator parameter that sums list-valued accumulators element by element."""

    def zero(self, value):
        """Return a list of 0.0 with one entry per element of ``value``."""
        return [0.0 for _ in range(len(value))]

    def addInPlace(self, v1, v2):
        """Add ``v2`` onto ``v1`` position by position and return the updated ``v1``.

        ``v1`` is modified in place; ``v2`` needs at least as many elements as ``v1``.
        """
        for position, _ in enumerate(v1):
            v1[position] += v2[position]

        return v1

In [135]:
# Create a vector accumulator seeded with [10.0, 20.0, 30.0] and display its value.
# The SparkContext is taken from the `spark` session, as in the height-counter cell;
# a bare `sc` variable is never defined in this notebook.
vector_accum = spark.sparkContext.accumulator([10.0, 20.0, 30.0], VectorAccumulatorParam())
vector_accum.value

[10.0, 20.0, 30.0]

In [136]:
# Add 5 to every component of the vector accumulator and display the new value.
vector_accum.add([5, 5, 5])
vector_accum.value

[15.0, 25.0, 35.0]