In [1]:
player_ds_path  = r'D:/spark_practice/02/demos/datasets/player.csv'
player_attr_ds_path  = r'D:/spark_practice/02/demos/datasets/player_attributes.csv'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
                        .appName("Analyzing soccer players")\
                        .getOrCreate()


In [3]:
players = spark.read\
                .format('csv')\
                .option('header','true')\
                .load(player_ds_path)

In [4]:
players.show(5)

+---+-------------+------------------+------------------+-------------------+------+------+
| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+------------------+------------------+-------------------+------+------+
|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|   Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|       Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|     Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|      Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
+---+-------------+------------------+------------------+-------------------+------+------+
only showing top 5 rows



In [5]:
player_attribute = spark.read\
                .format('csv')\
                .option('header','true')\
                .load(player_attr_ds_path)

In [6]:
player_attribute.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [7]:
players.count(), player_attribute.count()

(11060, 183978)

In [8]:
player_attribute.select('player_api_id')\
                .distinct()\
                .count()

11060

In [9]:
players = players.drop('id','player_fifa_api_id')
players.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [10]:
player_attribute = player_attribute.drop(
    "id",
    'player_fifa_api_id',
    'preferred_foot',
    'attacking_work_rate',
    'defensive_work_rate',
    'crossing',
    'jumping',
    'sprint_speed',
    'balance',
    'aggression',
    'short_passing',
    'potential'
)

player_attribute.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [11]:
player_attribute = player_attribute.dropna()
players = players.dropna()

In [12]:
from pyspark.sql.functions import udf

In [13]:
year_extract_udf = udf(lambda date : date.split('-')[0])

player_attribute = player_attribute.withColumn(
    "year",
    year_extract_udf(player_attribute.date)
)

In [14]:
player_attribute = player_attribute.drop('date')

In [15]:
player_attribute.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [16]:
pa_2016 = player_attribute.filter(player_attribute['year']==2016)

In [17]:
pa_2016.count()

14098

In [18]:
pa_2016.select(pa_2016['player_api_id'])\
        .distinct()\
        .count()

5586

In [19]:
pa_striker_2016 = pa_2016.groupBy('player_api_id')\
            .agg({
    "finishing":"avg",
    "shot_power":"avg",
    "acceleration":"avg"
})

In [20]:
pa_striker_2016.count()

5586

In [21]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+---------------+
|player_api_id|   avg(finishing)|avg(acceleration)|avg(shot_power)|
+-------------+-----------------+-----------------+---------------+
|       309726|75.44444444444444|74.11111111111111|           76.0|
|        26112|             53.0|             51.0|           76.0|
|        38433|            68.25|             74.0|           74.0|
|       295060|             25.0|             62.0|           40.0|
|       161396|             29.0|             72.0|           69.0|
+-------------+-----------------+-----------------+---------------+
only showing top 5 rows



In [22]:
pa_striker_2016 = pa_striker_2016.withColumnRenamed("avg(finishing)", "finishing")\
                                    .withColumnRenamed("avg(acceleration)", "acceleration")\
                                    .withColumnRenamed("avg(shot_power)", "shot_power")

In [23]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_acceleration+weight_finishing+weight_shot_power

In [41]:
strikers = pa_striker_2016.withColumn("striker_grade",
                                   (pa_striker_2016["finishing"]*weight_finishing+\
                                   pa_striker_2016["acceleration"]*weight_acceleration+\
                                    
                                   pa_striker_2016["shot_power"]*weight_shot_power
                                   )/total_weight 
                                   )

In [42]:
strikers= strikers.drop('finishing','acceleration','shot_power')

In [43]:
strikers = strikers.filter(strikers['striker_grade']>70)

In [31]:
strikers.show(10)

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|       309726|75.38888888888889|
|        38433|          72.5625|
|        41157|             82.0|
|        40740|           70.375|
|       109653|             73.5|
|       190851|            74.25|
|       196957|73.33333333333334|
|       362212|70.52777777777777|
|        26005|             74.0|
|         3517|             78.0|
+-------------+-----------------+
only showing top 10 rows



In [32]:
players.show(10)

+-------------+------------------+-------------------+------+------+
|player_api_id|       player_name|           birthday|height|weight|
+-------------+------------------+-------------------+------+------+
|       505942|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       155782|   Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       162549|       Aaron Doran|1991-05-13 00:00:00|170.18|   163|
|        30572|     Aaron Galindo|1982-05-08 00:00:00|182.88|   198|
|        23780|      Aaron Hughes|1979-11-08 00:00:00|182.88|   154|
|        27316|        Aaron Hunt|1986-09-04 00:00:00|182.88|   161|
|       564793|        Aaron Kuhl|1996-01-30 00:00:00|172.72|   146|
|        30895|      Aaron Lennon|1987-04-16 00:00:00| 165.1|   139|
|       528212|      Aaron Lennox|1993-02-19 00:00:00| 190.5|   181|
|       101042|     Aaron Meijers|1987-10-28 00:00:00|175.26|   170|
+-------------+------------------+-------------------+------+------+
only showing top 10 rows



In [44]:
striker_details = players.join(strikers, on = ['player_api_id'])

In [35]:
striker_details.show(10)

+-------------+--------------------+-------------------+------+------+-----------------+
|player_api_id|         player_name|           birthday|height|weight|    striker_grade|
+-------------+--------------------+-------------------+------+------+-----------------+
|       309726|      Andrea Belotti|1993-12-20 00:00:00|180.34|   159|75.38888888888889|
|        38433|        Borja Valero|1985-01-12 00:00:00|175.26|   161|          72.5625|
|        41157|  Giovani dos Santos|1989-05-11 00:00:00|175.26|   163|             82.0|
|        40740|        Jeremy Morel|1984-04-02 00:00:00|172.72|   157|           70.375|
|       109653|       John Goossens|1988-07-25 00:00:00|175.26|   150|             73.5|
|       190851|        Kenny McLean|1992-01-08 00:00:00|180.34|   154|            74.25|
|       196957|Mihai Alexandru R...|1992-05-31 00:00:00| 190.5|   176|73.33333333333334|
|       362212|     Piotr Zielinski|1994-05-20 00:00:00|180.34|   165|70.52777777777777|
|        26005|    Th

# using broadcast to speed up join
Broadcasting smaller dataframe will cache it in the worker, hence making the join faster. Broadcast variable are read only

In [37]:
from pyspark.sql.functions import broadcast

In [45]:
sriker_details = players.select(
    "player_api_id",
    "player_name"
).join(
    broadcast(strikers),
    on = ['player_api_id'],
    how = 'inner'
)

In [47]:
striker_details = striker_details.sort(striker_details['striker_grade'].desc())

In [49]:
striker_details.show(10)

+-------------+--------------------+-------------------+------+------+-----------------+
|player_api_id|         player_name|           birthday|height|weight|    striker_grade|
+-------------+--------------------+-------------------+------+------+-----------------+
|        20276|                Hulk|1986-07-25 00:00:00|180.34|   187|            89.25|
|        37412|       Sergio Aguero|1988-06-02 00:00:00|172.72|   163|             89.0|
|        38817|        Carlos Tevez|1984-02-05 00:00:00|172.72|   157|            88.75|
|        32118|      Lukas Podolski|1985-06-04 00:00:00|182.88|   183|            88.25|
|        31921|         Gareth Bale|1989-07-16 00:00:00|182.88|   163|             87.0|
|        30834|        Arjen Robben|1984-01-23 00:00:00|180.34|   176|            86.75|
|       303824|       Memphis Depay|1994-02-13 00:00:00|175.26|   172|85.10714285714286|
|       129944|          Marco Reus|1989-05-31 00:00:00|180.34|   165|             85.0|
|       158263|      

# finding correlation of height with header accuracy (use of accumulators)

In [50]:
player_heading_acc = player_attribute.select('player_api_id','heading_accuracy').join(
    broadcast(players),
    ['player_api_id'],
    'inner'
)

In [51]:
player_heading_acc.columns

['player_api_id',
 'heading_accuracy',
 'player_name',
 'birthday',
 'height',
 'weight']

In [66]:
short_count = spark.sparkContext.accumulator(0)
medium_low_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
high_count = spark.sparkContext.accumulator(0)

In [67]:
def count_players_by_height(row):
    height = float(row['height'])
    
    if height<=175:
        short_count.add(1)
    elif height<=183 and height>175:
        medium_low_count.add(1)
    elif height<=195 and height>183:
        medium_high_count.add(1)
    else:
        high_count.add(1)

In [68]:
player_heading_acc.foreach(lambda x: count_players_by_height(x))

In [56]:
short_count.value, medium_low_count.value, medium_low_count.value, high_count.value

(18977, 97399, 97399, 3371)

In [69]:
ha_short_count = spark.sparkContext.accumulator(0)
ha_medium_low_count = spark.sparkContext.accumulator(0)
ha_medium_high_count = spark.sparkContext.accumulator(0)
ha_high_count = spark.sparkContext.accumulator(0)

In [70]:
def count_players_by_height_accuracy(row, threshold):
    height = float(row['height'])
    ha = float(row['heading_accuracy'])
    if ha<=threshold:
        return
    if height<=175:
        ha_short_count.add(1)
    elif height<=183 and height>175:
        ha_medium_low_count.add(1)
    elif height<=195 and height>183:
        ha_medium_high_count.add(1)
    else:
        ha_high_count.add(1)

In [71]:
player_heading_acc.foreach(lambda x: count_players_by_height_accuracy(x,75))

In [78]:
ha_short_count.value/short_count.value, ha_medium_low_count.value/medium_low_count.value, ha_medium_high_count.value/medium_high_count.value,ha_high_count.value/high_count.value

(0.005849185856563208,
 0.0522592634421298,
 0.1929679118306837,
 0.24503114802729162)