In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Analyzing soccer players").getOrCreate()

In [3]:
players=spark.read.format("csv").option("header", "true").load("..\datasets\player.csv")

In [4]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [5]:
players.show(5)

+---+-------------+------------------+------------------+-------------------+------+------+
| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+------------------+------------------+-------------------+------+------+
|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|   Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|       Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|     Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|      Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
+---+-------------+------------------+------------------+-------------------+------+------+
only showing top 5 rows



In [6]:
player_attributes=spark.read.format("csv").option("header", "true").load("..\datasets\Player_Attributes.csv")

In [8]:
player_attributes.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [9]:
players.count(), player_attributes.count()

(11060, 183978)

In [10]:
player_attributes.select('player_api_id').distinct().count()

11060

In [11]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [12]:
players=players.drop('id','player_fifa_api_id')

In [13]:
players.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [14]:
player_attributes=player_attributes.drop(
'id', 'player_fifa_api_id', 'preferred_foot', 'attacking_work_rate',
'defensive_work_rate','crossing','jumping','sprint_speed','balance','aggression','short_passing','potential')

In [15]:
player_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [16]:
player_attributes=player_attributes.dropna()

In [17]:
players=players.dropna()

In [18]:
players.count(), player_attributes.count()

(11060, 181265)

In [19]:
from pyspark.sql.functions import udf

In [20]:
year_extract_udf=udf(lambda date: date.split('-')[0])
player_attributes = player_attributes.withColumn("year", year_extract_udf(player_attributes.date))

In [21]:
player_attributes=player_attributes.drop('date')

In [23]:
player_attributes.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [24]:
pa_2016=player_attributes.filter(player_attributes.year==2016)

In [25]:
pa_2016.count()

14098

In [26]:
pa_2016.select(pa_2016.player_api_id).distinct().count()

5586

In [27]:
pa_striker_2016=pa_2016.groupBy('player_api_id').agg({'finishing':"avg", "shot_power":"avg","acceleration":"avg"})

In [28]:
pa_striker_2016.show()

+-------------+-----------------+------------------+------------------+
|player_api_id|   avg(finishing)| avg(acceleration)|   avg(shot_power)|
+-------------+-----------------+------------------+------------------+
|       309726|75.44444444444444| 74.11111111111111|              76.0|
|        26112|             53.0|              51.0|              76.0|
|        38433|            68.25|              74.0|              74.0|
|       295060|             25.0|              62.0|              40.0|
|       161396|             29.0|              72.0|              69.0|
|        37774|             61.0|              64.0|              68.0|
|        41157|             81.0|              87.0|              80.0|
|        40740|             58.0|              73.5|              75.0|
|        31432|             14.0|              59.0|              65.0|
|       109653|             62.0|              65.0|              83.5|
|       282680|             12.0|              33.0|            

In [29]:
pa_striker_2016=pa_striker_2016.withColumnRenamed("avg(finishing)","finishing")\
.withColumnRenamed("avg(shot_power)","shot_power")\
.withColumnRenamed("avg(acceleration)","acceleration")

In [30]:
wt_finishing=1
wt_shot_power=2
wt_acceleration=1
total_wt=wt_finishing+wt_shot_power+wt_acceleration

In [31]:
strikers=pa_striker_2016.withColumn("striker_grade",(pa_striker_2016.finishing*wt_finishing+ \
                                                    pa_striker_2016.shot_power*wt_shot_power+\
                                                    pa_striker_2016.acceleration*wt_acceleration)/total_wt)

In [32]:
strikers=strikers.drop('finishing','shot_power','acceleration')

In [33]:
strikers = strikers.filter(strikers.striker_grade>70).sort(strikers.striker_grade.desc())

In [34]:
strikers.show()

+-------------+-----------------+
|player_api_id|    striker_grade|
+-------------+-----------------+
|        20276|            89.25|
|        37412|             89.0|
|        38817|            88.75|
|        32118|            88.25|
|        31921|             87.0|
|        30834|            86.75|
|       303824|85.10714285714286|
|       129944|             85.0|
|       150565|            84.75|
|       158263|            84.75|
|        25759|84.66666666666667|
|       156726|             84.5|
|       169193|          84.4375|
|       286119|84.42857142857143|
|        30348|           84.375|
|        93447|            84.25|
|        50047|            84.25|
|        46509|            84.25|
|       178812|             84.0|
|       181276|             84.0|
+-------------+-----------------+
only showing top 20 rows



In [35]:
striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id)

In [36]:
striker_details.show()

+-------------+--------------------+-------------------+------+------+-------------+-----------------+
|player_api_id|         player_name|           birthday|height|weight|player_api_id|    striker_grade|
+-------------+--------------------+-------------------+------+------+-------------+-----------------+
|        20276|                Hulk|1986-07-25 00:00:00|180.34|   187|        20276|            89.25|
|        37412|       Sergio Aguero|1988-06-02 00:00:00|172.72|   163|        37412|             89.0|
|        38817|        Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        38817|            88.75|
|        32118|      Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        32118|            88.25|
|        31921|         Gareth Bale|1989-07-16 00:00:00|182.88|   163|        31921|             87.0|
|        30834|        Arjen Robben|1984-01-23 00:00:00|180.34|   176|        30834|            86.75|
|       303824|       Memphis Depay|1994-02-13 00:00:00|175.26|   172|   

In [37]:
from pyspark.sql.functions import broadcast

In [38]:
striker_details = players.select("player_api_id","player_name").join(broadcast(strikers),['player_api_id'],'inner')

In [39]:
striker_details=striker_details.sort(striker_details.striker_grade.desc())

In [40]:
striker_details.show(5)

+-------------+--------------+-------------+
|player_api_id|   player_name|striker_grade|
+-------------+--------------+-------------+
|        20276|          Hulk|        89.25|
|        37412| Sergio Aguero|         89.0|
|        38817|  Carlos Tevez|        88.75|
|        32118|Lukas Podolski|        88.25|
|        31921|   Gareth Bale|         87.0|
+-------------+--------------+-------------+
only showing top 5 rows



In [41]:
heading_acc = player_attributes.select('player_api_id','heading_accuracy')\
.join(broadcast(players),player_attributes.player_api_id == players.player_api_id)

In [42]:
heading_acc.columns

['player_api_id',
 'heading_accuracy',
 'player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight']

In [43]:
st_cnt = spark.sparkContext.accumulator(0)
ml_cnt = spark.sparkContext.accumulator(0)
mh_cnt = spark.sparkContext.accumulator(0)
tl_cnt = spark.sparkContext.accumulator(0)

In [44]:
def count_players_by_height(row):
    ht = float(row.height)
    if (ht <=175):
        st_cnt.add(1)
    elif (ht <= 183 and ht > 175):
        ml_cnt.add(1)
    elif (ht <= 195 and ht > 183):
        mh_cnt.add(1)
    elif (ht > 195):
        tl_cnt.add(1)

In [45]:
heading_acc.foreach(lambda x: count_players_by_height(x))

In [47]:
all_players = [st_cnt, ml_cnt, mh_cnt, tl_cnt]
all_players

[Accumulator<id=0, value=18977>,
 Accumulator<id=1, value=97399>,
 Accumulator<id=2, value=61518>,
 Accumulator<id=3, value=3371>]

In [48]:
pa_2016.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [53]:
# pa_2016.select('player_api_id','overall_rating').coalesce(1).write.option("header", "true").csv("players_overall.csv")

In [54]:
# pa_2016.select('player_api_id','overall_rating').write.json("players_overall.json")

In [55]:
from pyspark.accumulators import AccumulatorParam

In [56]:
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, value):
        return [0.0]*len(value)
    
    def addInPlace(self,v1,v2):
        for i in range(len(v1)):
            v1[i] += v2[i]
            
        return v1
    

In [57]:
vector_accum = sc.accumulator([10.0,20.0,30.0], VectorAccumulatorParam())


In [58]:
vector_accum.value


[10.0, 20.0, 30.0]

In [59]:
vector_accum += [1,2,3]
vector_accum.value

[11.0, 22.0, 33.0]