In [2]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("BroadcastAccumulators").getOrCreate()

## Topics Covered

* **pyspark.sql.functions**
    * udf
    * broadcast
    
* **DataFrames**
    * join
    * sort
    * foreach
    * coalesce
    * write <option /csv|json>
* **sc: Spark Context**
    * accumulator
    

In [11]:
players = spark.read\
               .format("csv")\
               .option("header", "true")\
               .load("./datasets/player.csv")

In [12]:
player_attributes = spark.read\
                         .format("csv")\
                         .option("header", "true")\
                         .load("./datasets/player_attributes.csv")

In [13]:
players = players.drop('id', 'player_fifa_api_id')
player_attributes = player_attributes.drop('id', 'player_fifa_api_id', 'preferred_foot','attacking_work_rate',
                                           'defensive_work_rate','crossing','jumping','sprint_speed','balance',
                                           'aggression','short_passing','potential')

In [14]:
from pyspark.sql.functions import udf #user defined function
year_extract_udf = udf(lambda date: date.split('-')[0])

In [15]:
player_attributes = player_attributes.withColumn(
    "year",
    year_extract_udf(player_attributes.date)
)
player_attributes.drop("year")

DataFrame[player_api_id: string, date: string, overall_rating: string, finishing: string, heading_accuracy: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, agility: string, reactions: string, shot_power: string, stamina: string, strength: string, long_shots: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string]

In [16]:
pa_2016 = player_attributes.filter(player_attributes.year == 2016)

In [20]:
pa_striker_2016 = pa_2016.groupBy('player_api_id')\
                       .agg({
                           'finishing':"avg",
                           "shot_power":"avg",
                           "acceleration":"avg"})\
                        .withColumnRenamed("avg(finishing)","finishing")\
                        .withColumnRenamed("avg(shot_power)","shot_power")\
                        .withColumnRenamed("avg(acceleration)","acceleration")
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+----------+
|player_api_id|        finishing|     acceleration|shot_power|
+-------------+-----------------+-----------------+----------+
|       309726|75.44444444444444|74.11111111111111|      76.0|
|        26112|             53.0|             51.0|      76.0|
|        38433|            68.25|             74.0|      74.0|
|       295060|             25.0|             62.0|      40.0|
|       161396|             29.0|             72.0|      69.0|
+-------------+-----------------+-----------------+----------+
only showing top 5 rows



In [21]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_finishing + weight_shot_power + weight_acceleration
strikers = pa_striker_2016.withColumn("striker_grade",
                                      (pa_striker_2016.finishing * weight_finishing + \
                                       pa_striker_2016.shot_power * weight_shot_power+ \
                                       pa_striker_2016.acceleration * weight_acceleration) / total_weight)
strikers = strikers.drop('finishing',
                         'acceleration',
                         'shot_power'
)
strikers = strikers.filter(strikers.striker_grade > 70)\
                   .sort(strikers.striker_grade.desc())

## Joins

In [22]:
players.show(5)

+-------------+------------------+-------------------+------+------+
|player_api_id|       player_name|           birthday|height|weight|
+-------------+------------------+-------------------+------+------+
|       505942|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       155782|   Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       162549|       Aaron Doran|1991-05-13 00:00:00|170.18|   163|
|        30572|     Aaron Galindo|1982-05-08 00:00:00|182.88|   198|
|        23780|      Aaron Hughes|1979-11-08 00:00:00|182.88|   154|
+-------------+------------------+-------------------+------+------+
only showing top 5 rows



In [23]:
strikers.show(5)

+-------------+-------------+
|player_api_id|striker_grade|
+-------------+-------------+
|        20276|        89.25|
|        37412|         89.0|
|        38817|        88.75|
|        32118|        88.25|
|        31921|         87.0|
+-------------+-------------+
only showing top 5 rows



In [24]:
striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id)
striker_details.show(5)

+-------------+--------------+-------------------+------+------+-------------+-------------+
|player_api_id|   player_name|           birthday|height|weight|player_api_id|striker_grade|
+-------------+--------------+-------------------+------+------+-------------+-------------+
|        20276|          Hulk|1986-07-25 00:00:00|180.34|   187|        20276|        89.25|
|        37412| Sergio Aguero|1988-06-02 00:00:00|172.72|   163|        37412|         89.0|
|        38817|  Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        38817|        88.75|
|        32118|Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        32118|        88.25|
|        31921|   Gareth Bale|1989-07-16 00:00:00|182.88|   163|        31921|         87.0|
+-------------+--------------+-------------------+------+------+-------------+-------------+
only showing top 5 rows



In [25]:
striker_details = players.join(strikers, ['player_api_id'])
striker_details.show(5)

+-------------+--------------+-------------------+------+------+-------------+
|player_api_id|   player_name|           birthday|height|weight|striker_grade|
+-------------+--------------+-------------------+------+------+-------------+
|        20276|          Hulk|1986-07-25 00:00:00|180.34|   187|        89.25|
|        37412| Sergio Aguero|1988-06-02 00:00:00|172.72|   163|         89.0|
|        38817|  Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        88.75|
|        32118|Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        88.25|
|        31921|   Gareth Bale|1989-07-16 00:00:00|182.88|   163|         87.0|
+-------------+--------------+-------------------+------+------+-------------+
only showing top 5 rows



## Broadcast

In [26]:
from pyspark.sql.functions import broadcast

In [27]:
#Broadcast the smaller dataframe so it is available on all cluster machines
striker_details = players.select(
                                "player_api_id",
                                "player_name"
                                 )\
                  .join(
                        broadcast(strikers), 
                        ['player_api_id'],   
                        'inner'
                  )

In [29]:
striker_details = striker_details.sort(striker_details.striker_grade.desc())
striker_details.show(5)

+-------------+--------------+-------------+
|player_api_id|   player_name|striker_grade|
+-------------+--------------+-------------+
|        20276|          Hulk|        89.25|
|        37412| Sergio Aguero|         89.0|
|        38817|  Carlos Tevez|        88.75|
|        32118|Lukas Podolski|        88.25|
|        31921|   Gareth Bale|         87.0|
+-------------+--------------+-------------+
only showing top 5 rows



## Accumulator

In [30]:
players_heading_acc = player_attributes.select('player_api_id',
                                               'heading_accuracy')\
                                       .join(broadcast(players),
                                             player_attributes.player_api_id == players.player_api_id)

In [31]:
short_count = spark.sparkContext.accumulator(0)
medium_low_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
tall_count = spark.sparkContext.accumulator(0)

In [32]:
def count_players_by_height(row):
    height = float(row.height)
    
    if (height <= 175 ):
        short_count.add(1)
    elif (height <= 183 and height > 175 ):
        medium_low_count.add(1)
    elif (height <= 195 and height > 183 ):
        medium_high_count.add(1)
    elif (height > 195) :
        tall_count.add(1)

In [33]:
players_heading_acc.foreach(lambda x: count_players_by_height(x))
all_players = [short_count.value,
               medium_low_count.value,
               medium_high_count.value,
               tall_count.value]

all_players

[19204, 98958, 62411, 3405]

## Save

In [34]:
pa_2016.select("player_api_id", "overall_rating")\
    .coalesce(1)\
    .write\
    .option("header", "true")\
    .csv("players_overall.csv")

In [35]:
pa_2016.select("player_api_id", "overall_rating")\
    .write\
    .json("players_overall.json")