In [50]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.functions import broadcast
from pyspark.accumulators import AccumulatorParam

In [2]:
spark = SparkSession.builder\
                    .appName("Analyzing soccer players")\
                    .getOrCreate()

In [3]:
players = spark.read\
                .format("csv")\
                .option("header", "true")\
                .load("../datasets/player.csv")

In [4]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [5]:
players.show(5)

+---+-------------+------------------+------------------+-------------------+------+------+
| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+------------------+------------------+-------------------+------+------+
|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|   Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|       Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|     Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|      Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
+---+-------------+------------------+------------------+-------------------+------+------+
only showing top 5 rows



In [6]:
player_attributes = spark.read\
                        .format("csv")\
                        .option("header", "true")\
                        .load("../datasets/player_attributes.csv")

In [7]:
player_attributes.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [8]:
players.count() , player_attributes.count()

(11060, 183978)

In [9]:
player_attributes.select('player_api_id')\
                .distinct()\
                .count()

11060

In [10]:
players = players.drop('id', 'player_fifa_api_id')
players.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [11]:
player_attributes = player_attributes.drop(
    'id',
    'player_fifa_api_id',
    'preferred_foot',
    'attacking_work_rate',
    'defensive_work_rate',
    'crossing',
    'jumping',
    'sprint_speed',
    'balance',
    'aggression',
    'short_passing',
    'potential')
player_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [12]:
player_attributes = player_attributes.dropna()
players = players.dropna()

In [13]:
players.count() , player_attributes.count()

(11060, 181265)

In [14]:
# udf function to extract the year from a date
# also an example of a python lambda
year_extract_udf = udf(lambda date: date.split('-')[0])

In [15]:
player_attributes = player_attributes.withColumn(
    "year",
    year_extract_udf(player_attributes.date))

In [16]:
player_attributes = player_attributes.drop('date')

In [17]:
player_attributes.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [18]:
pa_2016 = player_attributes.filter(player_attributes.year == 2016)

In [19]:
pa_2016.count()

14098

In [20]:
pa_2016.select("player_api_id")\
        .distinct()\
        .count()

5586

In [21]:
pa_striker_2016 = pa_2016.groupBy('player_api_id')\
                        .agg({
                            'finishing':"avg",
                            "shot_power":"avg",
                            "acceleration":"avg"
                        })

In [22]:
pa_striker_2016.count()

5586

In [23]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+---------------+
|player_api_id|   avg(finishing)|avg(acceleration)|avg(shot_power)|
+-------------+-----------------+-----------------+---------------+
|       309726|75.44444444444444|74.11111111111111|           76.0|
|        26112|             53.0|             51.0|           76.0|
|        38433|            68.25|             74.0|           74.0|
|       295060|             25.0|             62.0|           40.0|
|       161396|             29.0|             72.0|           69.0|
+-------------+-----------------+-----------------+---------------+
only showing top 5 rows



In [24]:
pa_striker_2016 = pa_striker_2016.withColumnRenamed("avg(finishing)", "finishing")\
                                .withColumnRenamed("avg(shot_power)", "shot_power")\
                                .withColumnRenamed("avg(acceleration)", "acceleration")

In [25]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_finishing + weight_shot_power + weight_acceleration

In [26]:
strikers = pa_striker_2016.withColumn("striker_grade",
                                     (pa_striker_2016.finishing * weight_finishing + \
                                     pa_striker_2016.shot_power * weight_shot_power + \
                                     pa_striker_2016.acceleration * weight_acceleration) / total_weight)

In [27]:
strikers = strikers.drop('finishing',
             'acceleration',
             'shot_power')

In [28]:
strikers = strikers.filter(strikers.striker_grade > 70)\
                                .sort("striker_grade", ascending=0)

strikers.show(10)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|        20276|            89.25|
|        37412|             89.0|
|        38817|            88.75|
|        32118|            88.25|
|        31921|             87.0|
|        30834|            86.75|
|       303824|85.10714285714286|
|       129944|             85.0|
|       150565|            84.75|
|       158263|            84.75|
+-------------+-----------------+
only showing top 10 rows



In [29]:
strikers.count() , players.count()

(1609, 11060)

In [30]:
strikers_details = players.join(strikers, players.player_api_id == strikers.player_api_id)

In [31]:
strikers_details.columns

['player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight',
 'player_api_id',
 'striker_grade']

In [32]:
strikers_details.count()

1609

In [33]:
# alternative join syntax
striker_details = players.join(strikers, ['player_api_id'])

In [34]:
striker_details.show(5)

+-------------+--------------+-------------------+------+------+-------------+
|player_api_id|   player_name|           birthday|height|weight|striker_grade|
+-------------+--------------+-------------------+------+------+-------------+
|        20276|          Hulk|1986-07-25 00:00:00|180.34|   187|        89.25|
|        37412| Sergio Aguero|1988-06-02 00:00:00|172.72|   163|         89.0|
|        38817|  Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        88.75|
|        32118|Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        88.25|
|        31921|   Gareth Bale|1989-07-16 00:00:00|182.88|   163|         87.0|
+-------------+--------------+-------------------+------+------+-------------+
only showing top 5 rows



In [35]:
# make sure you broadcast the smaller data frame so you don't copy too much data to all nodes

striker_details = players.select(
                                "player_api_id",
                                "player_name"
                                )\
                        .join(
                            broadcast(strikers),
                            ['player_api_id'],
                            'inner'
                        )

In [36]:
players.count() , player_attributes.count()

(11060, 181265)

In [37]:
players_heading_acc = player_attributes.select('player_api_id',
                                              'heading_accuracy')\
                                        .join(broadcast(players),
                                             player_attributes.player_api_id == players.player_api_id)

In [38]:
players_heading_acc.columns

['player_api_id',
 'heading_accuracy',
 'player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight']

<h1>Working with accumulators to build aggregates</h1>

In [39]:
# using accumulators to track counts of attributes
short_count = spark.sparkContext.accumulator(0)
medium_low_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
tall_count = spark.sparkContext.accumulator(0)

In [40]:
def count_players_by_height(row):
    height = float(row.height)
    
    if(height <= 175):
        short_count.add(1)
    elif(height <= 183 and height > 175):
        medium_low_count.add(1)
    elif(height <= 195 and height > 183):
        medium_high_count.add(1)
    elif(height > 195):
        tall_count.add(1)

In [41]:
players_heading_acc.foreach(lambda x: count_players_by_height(x))

In [42]:
all_players = [short_count.value,
              medium_low_count.value,
              medium_high_count.value,
              tall_count.value]
all_players

[18977, 97399, 61518, 3371]

In [43]:
short_ha_count = spark.sparkContext.accumulator(0)
medium_low_ha_count = spark.sparkContext.accumulator(0)
medium_high_ha_count = spark.sparkContext.accumulator(0)
tall_ha_count = spark.sparkContext.accumulator(0)

In [44]:
def count_players_by_height_and_heading_accuracy(row, threshold_score):
    
    height = float(row.height)
    ha = float(row.heading_accuracy)
    
    if(ha <= threshold_score):
        return
    
    if(height <= 175):
        short_ha_count.add(1)
    elif(height <= 183 and height > 175):
        medium_low_ha_count.add(1)
    elif(height <= 195 and height > 183):
        medium_high_ha_count.add(1)
    elif(height > 195):
        tall_ha_count.add(1)

In [45]:
players_heading_acc.foreach(lambda x: count_players_by_height_and_heading_accuracy(x, 60))

In [46]:
all_players_above_threshold = [short_ha_count.value,
                              medium_low_ha_count.value,
                              medium_high_ha_count.value,
                              tall_ha_count.value]
all_players_above_threshold

[3653, 41448, 40270, 1573]

In [47]:
percent_above_threshold = [short_ha_count.value / short_count.value * 100,
                          medium_low_ha_count.value / medium_low_count.value * 100,
                          medium_high_ha_count.value / medium_high_count.value * 100,
                          tall_ha_count.value / tall_count.value * 100]
percent_above_threshold

[19.249617958581442, 42.55485169252251, 65.46051562144413, 46.66271136161376]

<h1>Saving analysis data to CSV and JSON</h1>

In [48]:
# coalesce - repartitions the data into a specific number of partitiions
# in this case we are repartitioning into 1 partition so we only create 1 file
# NOTE: spark will write out a file for each parition

pa_2016.select("player_api_id", "overall_rating")\
        .coalesce(1)\
        .write\
        .option("header","true")\
        .csv("players_overall.csv")

AnalysisException: 'path file:/Users/mbeesley/code/sparkdemo/code/players_overall.csv already exists.;'

In [None]:
# example of writing data that hasn't been coalesced into a single partition
pa_2016.select("player_api_id", "overall_rating")\
        .write\
        .json("players_overall.json")

<h1>Miscellaneous Operations in Spark</h1>
<ul>
    <li>Creating Custom Accumulators</li>
    <li>Different Join Operations</li>
</ul>

In [51]:
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0] * len(value)
    def addInPlace(self, v1, v2):
        for i in range(len(v1)):
            v1[i] += v2[i]
        return v1

In [53]:
vector_accum = sc.accumulator([10.0, 20.0, 30.0], VectorAccumulatorParam())

vector_accum.value

[10.0, 20.0, 30.0]

In [54]:
vector_accum += [1, 2, 3]
vector_accum.value

[11.0, 22.0, 33.0]

In [55]:
valuesA = [('John', 100000), ('James', 150000), ('Emily', 65000), ('Nina', 200000)]
tableA = spark.createDataFrame(valuesA, ['name', 'salary'])

In [56]:
tableA.show()

+-----+------+
| name|salary|
+-----+------+
| John|100000|
|James|150000|
|Emily| 65000|
| Nina|200000|
+-----+------+



In [57]:
valuesB = [('James', 2), ('Emily', 3), ('Darth Vader', 5), ('Princess Leia', 6)]
tableB = spark.createDataFrame(valuesB, ['name', 'employee_id'])

In [58]:
tableB.show()

+-------------+-----------+
|         name|employee_id|
+-------------+-----------+
|        James|          2|
|        Emily|          3|
|  Darth Vader|          5|
|Princess Leia|          6|
+-------------+-----------+



In [59]:
inner_join = tableA.join(tableB, tableA.name == tableB.name)
inner_join.show()

+-----+------+-----+-----------+
| name|salary| name|employee_id|
+-----+------+-----+-----------+
|James|150000|James|          2|
|Emily| 65000|Emily|          3|
+-----+------+-----+-----------+



In [60]:
left_join = tableA.join(tableB, tableA.name == tableB.name, how='left')
left_join.show()

+-----+------+-----+-----------+
| name|salary| name|employee_id|
+-----+------+-----+-----------+
|James|150000|James|          2|
| John|100000| null|       null|
|Emily| 65000|Emily|          3|
| Nina|200000| null|       null|
+-----+------+-----+-----------+



In [61]:
right_join = tableA.join(tableB, tableA.name == tableB.name, how='right')
right_join.show()

+-----+------+-------------+-----------+
| name|salary|         name|employee_id|
+-----+------+-------------+-----------+
|James|150000|        James|          2|
| null|  null|Princess Leia|          6|
|Emily| 65000|        Emily|          3|
| null|  null|  Darth Vader|          5|
+-----+------+-------------+-----------+



In [62]:
full_outer_join = tableA.join(tableB, tableA.name == tableB.name, how='full')
full_outer_join.show()

+-----+------+-------------+-----------+
| name|salary|         name|employee_id|
+-----+------+-------------+-----------+
|James|150000|        James|          2|
| John|100000|         null|       null|
| null|  null|Princess Leia|          6|
|Emily| 65000|        Emily|          3|
| Nina|200000|         null|       null|
| null|  null|  Darth Vader|          5|
+-----+------+-------------+-----------+

