In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Analyzing Soccer Players").getOrCreate()

In [4]:
spark

In [5]:
players = spark.read.format('csv').option('header',"true").load("/home/sudeep/sources/github/apache spark/spark getting started/02/demos/datasets/player.csv")

In [6]:
players

DataFrame[id: string, player_api_id: string, player_name: string, player_fifa_api_id: string, birthday: string, height: string, weight: string]

In [7]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [8]:
players.show()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [9]:
players.collect()[0]

Row(id='1', player_api_id='505942', player_name='Aaron Appindangoye', player_fifa_api_id='218353', birthday='1992-02-29 00:00:00', height='182.88', weight='187')

In [10]:
players['id']

Column<b'id'>

In [11]:
players.select(players['id']).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+---+
only showing top 20 rows



In [12]:
players_attributes = spark.read.format('csv').option('header',"true").load("/home/sudeep/sources/github/apache spark/spark getting started/02/demos/datasets/player_attributes.csv")

In [13]:
players_attributes.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [14]:
    players.count(), players_attributes.count()

(11060, 183978)

In [15]:
players_attributes.select('player_api_id').distinct().count()

11060

In [16]:
players.drop('id', 'player_fifa_api_id')
players.columns

['id',
 'player_api_id',
 'player_name',
 'player_fifa_api_id',
 'birthday',
 'height',
 'weight']

In [17]:
players.dropna()

DataFrame[id: string, player_api_id: string, player_name: string, player_fifa_api_id: string, birthday: string, height: string, weight: string]

In [18]:
players_attributes.dropna()

DataFrame[id: string, player_fifa_api_id: string, player_api_id: string, date: string, overall_rating: string, potential: string, preferred_foot: string, attacking_work_rate: string, defensive_work_rate: string, crossing: string, finishing: string, heading_accuracy: string, short_passing: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, sprint_speed: string, agility: string, reactions: string, balance: string, shot_power: string, jumping: string, stamina: string, strength: string, long_shots: string, aggression: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string]

In [19]:
    players.count(), players_attributes.count()

(11060, 183978)

In [20]:
from pyspark.sql.functions import udf

In [21]:
year_extract_udf = udf(lambda date: date.split('-')[0])
players_attributes = players_attributes.withColumn("year", year_extract_udf(players_attributes.date))

In [22]:
players_attributes.show()

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+----+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_

In [23]:
players_attributes.date

Column<b'date'>

In [24]:
players_attributes.drop(players_attributes.date)

DataFrame[id: string, player_fifa_api_id: string, player_api_id: string, overall_rating: string, potential: string, preferred_foot: string, attacking_work_rate: string, defensive_work_rate: string, crossing: string, finishing: string, heading_accuracy: string, short_passing: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, sprint_speed: string, agility: string, reactions: string, balance: string, shot_power: string, jumping: string, stamina: string, strength: string, long_shots: string, aggression: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string, year: string]

In [26]:
players_attributes.columns

['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [28]:
# find best striker
pa_2016 = players_attributes.filter(players_attributes.year == 2016)

In [30]:
pa_2016.count()

14103

In [31]:
pa_2016.select(pa_2016.player_api_id).distinct().count()

5586

In [38]:
pa_striker_2016 =  pa_2016.groupBy('player_api_id').agg({'finishing':'avg',
                                                        'shot_power': 'avg',
                                                       'acceleration':'avg'})

In [39]:
pa_striker_2016.show()

+-------------+-----------------+------------------+------------------+
|player_api_id|   avg(finishing)| avg(acceleration)|   avg(shot_power)|
+-------------+-----------------+------------------+------------------+
|       309726|75.44444444444444| 74.11111111111111|              76.0|
|        26112|             53.0|              51.0|              76.0|
|        38433|            68.25|              74.0|              74.0|
|       295060|             25.0|              62.0|              40.0|
|       161396|             29.0|              72.0|              69.0|
|        37774|             61.0|              64.0|              68.0|
|        41157|             81.0|              87.0|              80.0|
|        40740|             58.0|              73.5|              75.0|
|        31432|             14.0|              59.0|              65.0|
|       109653|             62.0|              65.0|              83.5|
|       282680|             12.0|              33.0|            

In [44]:
pa_striker_2016  = pa_striker_2016.withColumnRenamed("avg(finishing)","finishing")\
                                  .withColumnRenamed("avg(acceleration)","acceleration")\
                                  .withColumnRenamed("avg(shot_power)","shot_power")

In [45]:
pa_striker_2016.show()

+-------------+-----------------+------------------+------------------+
|player_api_id|        finishing|      acceleration|        shot_power|
+-------------+-----------------+------------------+------------------+
|       309726|75.44444444444444| 74.11111111111111|              76.0|
|        26112|             53.0|              51.0|              76.0|
|        38433|            68.25|              74.0|              74.0|
|       295060|             25.0|              62.0|              40.0|
|       161396|             29.0|              72.0|              69.0|
|        37774|             61.0|              64.0|              68.0|
|        41157|             81.0|              87.0|              80.0|
|        40740|             58.0|              73.5|              75.0|
|        31432|             14.0|              59.0|              65.0|
|       109653|             62.0|              65.0|              83.5|
|       282680|             12.0|              33.0|            

In [47]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_finishing + weight_shot_power + weight_acceleration

In [48]:
strikers = pa_striker_2016.withColumn('striker_grade',
                                     (pa_striker_2016.finishing * weight_finishing +
                                     pa_striker_2016.shot_power * weight_shot_power +
                                     pa_striker_2016.acceleration * weight_acceleration * weight_acceleration) / total_weight)

In [49]:
strikers.show()

+-------------+-----------------+------------------+------------------+-----------------+
|player_api_id|        finishing|      acceleration|        shot_power|    striker_grade|
+-------------+-----------------+------------------+------------------+-----------------+
|       309726|75.44444444444444| 74.11111111111111|              76.0|75.38888888888889|
|        26112|             53.0|              51.0|              76.0|             64.0|
|        38433|            68.25|              74.0|              74.0|          72.5625|
|       295060|             25.0|              62.0|              40.0|            41.75|
|       161396|             29.0|              72.0|              69.0|            59.75|
|        37774|             61.0|              64.0|              68.0|            65.25|
|        41157|             81.0|              87.0|              80.0|             82.0|
|        40740|             58.0|              73.5|              75.0|           70.375|
|        3

In [53]:
strikers

DataFrame[player_api_id: string, finishing: double, acceleration: double, shot_power: double, striker_grade: double]

In [57]:
strikers = strikers.filter(strikers.striker_grade >  70).sort(strikers.striker_grade.desc())

In [58]:
strikers.show()

+-------------+-----------------+------------+-----------------+-----------------+
|player_api_id|        finishing|acceleration|       shot_power|    striker_grade|
+-------------+-----------------+------------+-----------------+-----------------+
|        20276|             85.0|        84.0|             94.0|            89.25|
|        37412|             90.0|        92.0|             87.0|             89.0|
|        38817|             88.0|        90.0|             88.5|            88.75|
|        32118|             85.0|        82.0|             93.0|            88.25|
|        31921|             81.0|        93.0|             87.0|             87.0|
|        30834|             85.0|        90.0|             86.0|            86.75|
|       303824|73.42857142857143|        91.0|             88.0|85.10714285714286|
|       129944|             83.0|        89.0|             84.0|             85.0|
|       158263|             77.0|        90.0|             86.0|            84.75|
|   

In [60]:
strikers.count(), players.count()

(1609, 11060)

In [66]:
striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id) #inner join

In [63]:
striker_details.show()

+----+-------------+--------------------+------------------+-------------------+------+------+-------------+-----------------+------------+-----------------+-----------------+
|  id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|player_api_id|        finishing|acceleration|       shot_power|    striker_grade|
+----+-------------+--------------------+------------------+-------------------+------+------+-------------+-----------------+------------+-----------------+-----------------+
|4283|        20276|                Hulk|            189362|1986-07-25 00:00:00|180.34|   187|        20276|             85.0|        84.0|             94.0|            89.25|
|9674|        37412|       Sergio Aguero|            153079|1988-06-02 00:00:00|172.72|   163|        37412|             90.0|        92.0|             87.0|             89.0|
|1581|        38817|        Carlos Tevez|            143001|1984-02-05 00:00:00|172.72|   157|        38817|            

In [67]:
striker_details.columns

['id',
 'player_api_id',
 'player_name',
 'player_fifa_api_id',
 'birthday',
 'height',
 'weight',
 'player_api_id',
 'finishing',
 'acceleration',
 'shot_power',
 'striker_grade']

In [68]:
striker_details.count()

1609

In [69]:
# different syntax 
striker_details = players.join(strikers, ['player_api_id'])

In [78]:
striker_details = striker_details.drop('finishing')
striker_details =striker_details.drop('acceleration')
striker_details =striker_details.drop('shot_power')



striker_details.show(5)


+-------------+----+--------------+------------------+-------------------+------+------+-------------+
|player_api_id|  id|   player_name|player_fifa_api_id|           birthday|height|weight|striker_grade|
+-------------+----+--------------+------------------+-------------------+------+------+-------------+
|        20276|4283|          Hulk|            189362|1986-07-25 00:00:00|180.34|   187|        89.25|
|        37412|9674| Sergio Aguero|            153079|1988-06-02 00:00:00|172.72|   163|         89.0|
|        38817|1581|  Carlos Tevez|            143001|1984-02-05 00:00:00|172.72|   157|        88.75|
|        32118|6400|Lukas Podolski|            150516|1985-06-04 00:00:00|182.88|   183|        88.25|
|        31921|3660|   Gareth Bale|            173731|1989-07-16 00:00:00|182.88|   163|         87.0|
+-------------+----+--------------+------------------+-------------------+------+------+-------------+
only showing top 5 rows



In [79]:
# heavy dataset joing use brodcast variables

In [80]:
from pyspark.sql.functions import broadcast

In [81]:
striker_details = players.select('player_api_id', 'player_name').join(broadcast(strikers), ['player_api_id'], 'inner')

In [82]:
striker_details.show(5) # broadcast smaller dataframes 

+-------------+-----------------+---------+------------+----------+-------------+
|player_api_id|      player_name|finishing|acceleration|shot_power|striker_grade|
+-------------+-----------------+---------+------------+----------+-------------+
|        27316|       Aaron Hunt|     72.0|        75.0|      76.0|        74.75|
|        40719|     Aaron Niguez|     67.0|        82.0|      74.0|        74.25|
|        75489|     Aaron Ramsey|     75.0|        70.5|      81.0|       76.875|
|       120919|Aatif Chahechouhe|     76.0|        80.0|      78.0|         78.0|
|        67334|Abdoul Karim Yoda|     70.0|        84.0|      71.0|         74.0|
+-------------+-----------------+---------+------------+----------+-------------+
only showing top 5 rows



In [83]:
striker_details = striker_details.sort(striker_details.striker_grade.desc())

In [84]:
striker_details.show(5)

+-------------+--------------+---------+------------+----------+-------------+
|player_api_id|   player_name|finishing|acceleration|shot_power|striker_grade|
+-------------+--------------+---------+------------+----------+-------------+
|        20276|          Hulk|     85.0|        84.0|      94.0|        89.25|
|        37412| Sergio Aguero|     90.0|        92.0|      87.0|         89.0|
|        38817|  Carlos Tevez|     88.0|        90.0|      88.5|        88.75|
|        32118|Lukas Podolski|     85.0|        82.0|      93.0|        88.25|
|        31921|   Gareth Bale|     81.0|        93.0|      87.0|         87.0|
+-------------+--------------+---------+------------+----------+-------------+
only showing top 5 rows



# Accumulators

In [86]:
players.count(), players_attributes.count()

(11060, 183978)

In [89]:
players_heading_acc = players_attributes.select('player_api_id', 'heading_accuracy')\
.join(broadcast(players), players_attributes.player_api_id == players.player_api_id)

In [90]:
players_heading_acc.show()

+-------------+----------------+---+-------------+------------------+------------------+-------------------+------+------+
|player_api_id|heading_accuracy| id|player_api_id|       player_name|player_fifa_api_id|           birthday|height|weight|
+-------------+----------------+---+-------------+------------------+------------------+-------------------+------+------+
|       505942|              71|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|  1|       505942|Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|       155782| 

In [92]:
players_heading_acc.columns

['player_api_id',
 'heading_accuracy',
 'id',
 'player_api_id',
 'player_name',
 'player_fifa_api_id',
 'birthday',
 'height',
 'weight']

In [95]:
short_count = spark.sparkContext.accumulator(0)
medium_low_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
tall_count = spark.sparkContext.accumulator(0)

In [96]:
def count_players_by_height(row):
    height = float(row.height)
    
    if (height <175):
        short_count.add(1)
    elif (height <= 183 and height > 175):
        medium_low_count.add(1)
    elif (height <= 195 and height > 183):
        medium_high_count.add(1)
    elif (height > 195):
        tall_count.add(1)

In [97]:
players_heading_acc.foreach(lambda x : count_players_by_height(x))

In [99]:
all_players = [short_count.value, 
            medium_low_count.value,
            medium_high_count.value,
            tall_count.value]

In [119]:
all_players

AnalysisException: "Reference 'player_api_id' is ambiguous, could be: player_api_id, player_api_id.;"

In [105]:
# heading accuracy
short_ha_count = spark.sparkContext.accumulator(0)
medium_low_ha_count = spark.sparkContext.accumulator(0)
medium_high_ha_count = spark.sparkContext.accumulator(0)
tall_ha_count = spark.sparkContext.accumulator(0)

In [122]:
def count_players_by_height_heading_accuracy(row,threshold_score):
    print(row.show())
    return
    height = float(row.height)
    print(row.heading_accuracy)
    ha = float(row.heading_accuracy)
    
    if ha < threshold_score:
        return

    
    if (height <175):
        short_ha_count.add(1)
    elif (height <= 183 and height > 175):
        medium_low_ha_count.add(1)
    elif (height <= 195 and height > 183):
        medium_high_ha_count.add(1)
    elif (height > 195):
        tall_ha_count.add(1)

In [125]:
players_heading_acc.foreach(lambda x : count_players_by_height_heading_accuracy(x, 60))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 133.0 failed 1 times, most recent failure: Lost task 2.0 in stage 133.0 (TID 5986, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'show' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 787, in processPartition
    f(x)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-125-b07319efe403>", line 1, in <lambda>
  File "<ipython-input-122-cabe873c41b1>", line 2, in count_players_by_height_heading_accuracy
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: show

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'show' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/rdd.py", line 787, in processPartition
    f(x)
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-125-b07319efe403>", line 1, in <lambda>
  File "<ipython-input-122-cabe873c41b1>", line 2, in count_players_by_height_heading_accuracy
  File "/home/sudeep/program files/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: show

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [118]:
players_heading_acc.select('heading_accuracy').show()
players_heading_acc.printSchema()

+----------------+
|heading_accuracy|
+----------------+
|              71|
|              71|
|              71|
|              70|
|              70|
|              58|
|              58|
|              57|
|              57|
|              57|
|              57|
|              57|
|              57|
|              56|
|              56|
|              56|
|              51|
|              51|
|              51|
|              51|
+----------------+
only showing top 20 rows

root
 |-- player_api_id: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [124]:
# saving to json
pa_2016.columns


['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [126]:
pa_2016.select("player_api_id", "overall_rating").coalesce(1).write.option("header", "true").csv("players_overall.csv")

In [127]:
pa_2016.select("player_api_id", "overall_rating").write.json("players_overall.json")