In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
                    .appName("Analyzing Soccer Players")\
                    .getOrCreate()

In [3]:
players = spark.read \
                   .format("csv") \
                   .option("header","true")\
                   .load('../datasets/player.csv')

In [12]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [13]:
players.show()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [14]:
players.count()

11060

In [4]:
player_attributes = spark.read \
                         .format('csv')\
                         .option('header','true')\
                         .load('../datasets/player_attributes.csv')

In [16]:
player_attributes.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [18]:
player_attributes.select('player_fifa_api_id').show()

+------------------+
|player_fifa_api_id|
+------------------+
|            218353|
|            218353|
|            218353|
|            218353|
|            218353|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
|            189615|
+------------------+
only showing top 20 rows



In [23]:
player_attributes.count(), players.count()

(183978, 11060)

In [24]:
player_attributes.select('player_api_id')\
                 .distinct().count()

11060

In [5]:
players = players.drop('id','player_fifa_api_id')
players.show()
players.columns

+-------------+--------------------+-------------------+------+------+
|player_api_id|         player_name|           birthday|height|weight|
+-------------+--------------------+-------------------+------+------+
|       505942|  Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       155782|     Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       162549|         Aaron Doran|1991-05-13 00:00:00|170.18|   163|
|        30572|       Aaron Galindo|1982-05-08 00:00:00|182.88|   198|
|        23780|        Aaron Hughes|1979-11-08 00:00:00|182.88|   154|
|        27316|          Aaron Hunt|1986-09-04 00:00:00|182.88|   161|
|       564793|          Aaron Kuhl|1996-01-30 00:00:00|172.72|   146|
|        30895|        Aaron Lennon|1987-04-16 00:00:00| 165.1|   139|
|       528212|        Aaron Lennox|1993-02-19 00:00:00| 190.5|   181|
|       101042|       Aaron Meijers|1987-10-28 00:00:00|175.26|   170|
|        23889|       Aaron Mokoena|1980-11-25 00:00:00|182.88|   181|
|     

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [28]:
player_attributes.columns

['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [6]:
player_attributes = player_attributes.drop(
 'id',
 'player_fifa_api_id',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'sprint_speed',
 'jumping',
 'balance',
 'aggression',
 'short_passing',
 'potential')
player_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [31]:
player_attributes.count()

183978

In [7]:
player_attributes.dropna() # used to drop the records having missing elements
#player.drop() -> used to drop specified column within drop parameter

DataFrame[player_api_id: string, date: string, overall_rating: string, finishing: string, heading_accuracy: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, agility: string, reactions: string, shot_power: string, stamina: string, strength: string, long_shots: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string]

In [8]:
from pyspark.sql.functions import udf

In [9]:
year_extract_udf = udf(lambda date : date.split('-')[0])

player_attributes = player_attributes.withColumn(
    'year',
     year_extract_udf(player_attributes.date)
)
player_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [10]:
player_attributes=player_attributes.drop('date')

In [11]:
player_attributes.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [12]:
players_attributes_2016 = player_attributes.filter(player_attributes.year=='2016')

In [13]:
players_attributes_2016.count()

14103

In [21]:
players_attributes_2016.select('year').distinct().count()

1

In [14]:
players_attributes_2016.dropna()

DataFrame[player_api_id: string, overall_rating: string, finishing: string, heading_accuracy: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, agility: string, reactions: string, shot_power: string, stamina: string, strength: string, long_shots: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string, year: string]

In [23]:
players_attributes_2016.count()

14103

In [15]:
players_attributes_2016.select('player_api_id').distinct().count()

5586

In [16]:
pa_striker_2016 = players_attributes_2016\
                       .groupBy('player_api_id')\
                       .agg({'shot_power':'avg',
                             'finishing':'avg',
                             'acceleration':'avg'})

In [30]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+---------------+
|player_api_id|   avg(finishing)|avg(acceleration)|avg(shot_power)|
+-------------+-----------------+-----------------+---------------+
|       309726|75.44444444444444|74.11111111111111|           76.0|
|        26112|             53.0|             51.0|           76.0|
|        38433|            68.25|             74.0|           74.0|
|       295060|             25.0|             62.0|           40.0|
|       161396|             29.0|             72.0|           69.0|
+-------------+-----------------+-----------------+---------------+
only showing top 5 rows



In [17]:
pa_striker_2016 = pa_striker_2016.withColumnRenamed('avg(finishing)','finishing')\
                                 .withColumnRenamed('avg(acceleration)','acceleration')\
                                 .withColumnRenamed('avg(shot_power)','shot_power')

In [18]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+----------+
|player_api_id|        finishing|     acceleration|shot_power|
+-------------+-----------------+-----------------+----------+
|       309726|75.44444444444444|74.11111111111111|      76.0|
|        26112|             53.0|             51.0|      76.0|
|        38433|            68.25|             74.0|      74.0|
|       295060|             25.0|             62.0|      40.0|
|       161396|             29.0|             72.0|      69.0|
+-------------+-----------------+-----------------+----------+
only showing top 5 rows



In [19]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weights = weight_finishing+weight_shot_power+weight_acceleration

In [23]:
pa_striker_2016= pa_striker_2016.withColumn(
    'striker_grade',
    (pa_striker_2016.finishing*weight_finishing+pa_striker_2016.acceleration*weight_acceleration+pa_striker_2016.shot_power*weight_shot_power)/total_weights)

In [24]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+----------+-----------------+
|player_api_id|        finishing|     acceleration|shot_power|    striker_grade|
+-------------+-----------------+-----------------+----------+-----------------+
|       309726|75.44444444444444|74.11111111111111|      76.0|75.38888888888889|
|        26112|             53.0|             51.0|      76.0|             64.0|
|        38433|            68.25|             74.0|      74.0|          72.5625|
|       295060|             25.0|             62.0|      40.0|            41.75|
|       161396|             29.0|             72.0|      69.0|            59.75|
+-------------+-----------------+-----------------+----------+-----------------+
only showing top 5 rows



In [27]:
pa_striker_2016=pa_striker_2016.drop('finishing','acceleration','shot_power')

In [28]:
pa_striker_2016.show(5)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|       309726|75.38888888888889|
|        26112|             64.0|
|        38433|          72.5625|
|       295060|            41.75|
|       161396|            59.75|
+-------------+-----------------+
only showing top 5 rows



In [30]:
pa_striker_2016 =  pa_striker_2016.filter(pa_striker_2016.striker_grade >70)\
                                  .sort(pa_striker_2016.striker_grade.desc())
pa_striker_2016.show(10)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|        20276|            89.25|
|        37412|             89.0|
|        38817|            88.75|
|        32118|            88.25|
|        31921|             87.0|
|        30834|            86.75|
|       303824|85.10714285714286|
|       129944|             85.0|
|       158263|            84.75|
|       150565|            84.75|
+-------------+-----------------+
only showing top 10 rows



In [31]:
pa_striker_2016.count(), players.count()

(1609, 11060)

In [35]:
#player_details = players.join(pa_striker_2016, pa_striker_2016.player_api_id == players.player_api_id)
#Join can happen by two ways
player_details = players.join(pa_striker_2016,['player_api_id'])

In [36]:
player_details.count()

1609

In [37]:
player_details.columns

['player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight',
 'striker_grade']

In [38]:
player_details.show(5)

+-------------+--------------+-------------------+------+------+-------------+
|player_api_id|   player_name|           birthday|height|weight|striker_grade|
+-------------+--------------+-------------------+------+------+-------------+
|        20276|          Hulk|1986-07-25 00:00:00|180.34|   187|        89.25|
|        37412| Sergio Aguero|1988-06-02 00:00:00|172.72|   163|         89.0|
|        38817|  Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        88.75|
|        32118|Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        88.25|
|        31921|   Gareth Bale|1989-07-16 00:00:00|182.88|   163|         87.0|
+-------------+--------------+-------------------+------+------+-------------+
only showing top 5 rows



In [40]:
from pyspark.sql.functions import broadcast

In [43]:
# We should use broadcast variable for join operation on bigger data frame in real time scenario because simple
# join take long time and we broadcast smaller dataFrame compare to bigger dataFrame 
player_details = players.select('player_api_id','player_name')\
                        .join(
                            broadcast(pa_striker_2016),
                            ['player_api_id'],
                            'inner'
                        )

In [44]:
player_details.show(5)

+-------------+-----------------+-------------+
|player_api_id|      player_name|striker_grade|
+-------------+-----------------+-------------+
|        27316|       Aaron Hunt|        74.75|
|        40719|     Aaron Niguez|        74.25|
|        75489|     Aaron Ramsey|       76.875|
|       120919|Aatif Chahechouhe|         78.0|
|        67334|Abdoul Karim Yoda|         74.0|
+-------------+-----------------+-------------+
only showing top 5 rows



In [45]:
player_details = player_details.sort(player_details.striker_grade.desc())

In [46]:
player_details.show(5)

+-------------+--------------+-------------+
|player_api_id|   player_name|striker_grade|
+-------------+--------------+-------------+
|        20276|          Hulk|        89.25|
|        37412| Sergio Aguero|         89.0|
|        38817|  Carlos Tevez|        88.75|
|        32118|Lukas Podolski|        88.25|
|        31921|   Gareth Bale|         87.0|
+-------------+--------------+-------------+
only showing top 5 rows



In [47]:
pa_striker_2016.count()

1609

In [48]:
pa_striker_2016.columns

['player_api_id', 'striker_grade']

In [52]:
pa_striker_2016.select('player_api_id','striker_grade')\
               .coalesce(1)\
               .write\
               .option('header','true')\
               .csv('player_overall.csv')
#Coalesce(1) function helps to repartitioned all the partition acroos every cluster into 1(specified as parameter)

In [51]:
pa_striker_2016.select('player_api_id','striker_grade')\
               .write\
               .option('header','true')\
               .json('player_overall.json')

In [53]:
pa_striker_2016.select('player_api_id','striker_grade')\
               .coalesce(2)\
               .write\
               .option('header','true')\
               .csv('player_overall2.csv')