In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("soccer_player").getOrCreate()


In [6]:
df = spark.read.format("csv").option("inferSchema", True).option("header",True).load("player.csv")

In [8]:
df.show()
df.printSchema()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [13]:
player_attr_df = spark.read.format("csv").option("header",True).option("inferSchema",True).load("player_attributes.csv")

In [15]:
player_attr_df.show(5)
player_attr_df.printSchema()

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_posit

In [17]:
player_attr_df.count()

183978

In [18]:
df.count()

11060

In [47]:
df = df.drop('id','player_fifa_api_id')

In [48]:
player_attr_df = player_attr_df.drop('id','player_fifa_api_id','preferred_foot','attacking_work_rate','defensive_work_rate','crossing','jumping','sprint_speed','balance','aggression','short_passing','potential')

In [49]:
df.dropna()

DataFrame[player_api_id: int, player_name: string, birthday: timestamp, height: double, weight: int]

In [50]:
player_attr_df.dropna()

DataFrame[player_api_id: int, overall_rating: int, finishing: int, heading_accuracy: int, volleys: int, dribbling: int, curve: int, free_kick_accuracy: int, long_passing: int, ball_control: int, acceleration: int, agility: int, reactions: int, shot_power: int, stamina: int, strength: int, long_shots: int, interceptions: int, positioning: int, vision: int, penalties: int, marking: int, standing_tackle: int, sliding_tackle: int, gk_diving: int, gk_handling: int, gk_kicking: int, gk_positioning: int, gk_reflexes: int, year: string]

In [51]:
from pyspark.sql.functions import udf
year_extract_udf = udf(lambda date : date.year)


In [52]:
player_attr_df = player_attr_df.withColumn('year',year_extract_udf(player_attr_df.date))

AttributeError: 'DataFrame' object has no attribute 'date'

In [53]:
player_attr_df.show(2)

+-------------+--------------+---------+----------------+-------+---------+-----+------------------+------------+------------+------------+-------+---------+----------+-------+--------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+----+
|player_api_id|overall_rating|finishing|heading_accuracy|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|agility|reactions|shot_power|stamina|strength|long_shots|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_positioning|gk_reflexes|year|
+-------------+--------------+---------+----------------+-------+---------+-----+------------------+------------+------------+------------+-------+---------+----------+-------+--------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+-

In [54]:
player_attr_df = player_attr_df.drop('date')

In [55]:
player_attr_df.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [56]:
## find best strikers based on some of the attributes
## then join with player df to get names of the players
## how to find best striker :- from recent data 2016 year {finishing : avg, shot_power : avg, acceleration : avg}
## weitage for considering each attribute for best striker is weight_finish=1,weight_shot_power=2,weight_acceleration=1
## total weight = sum of all weights

In [57]:
filtered_plyr_attr = player_attr_df.filter(player_attr_df['year'] == 2016)

In [58]:
filtered_plyr_attr.count()

14103

In [59]:
player_attr_df.count()

183978

In [60]:
#get distinct players count
filtered_plyr_attr.select('player_api_id').distinct().count()

5586

In [61]:
#group each player attr and find avg of each selecting attribute
filtered_plyr_attr = filtered_plyr_attr.groupBy('player_api_id').agg({'finishing':'avg','shot_power':'avg','acceleration':'avg'})

In [70]:
filtered_plyr_attr.count()
filtered_plyr_attr = filtered_plyr_attr.withColumnRenamed('avg(finishing)','finishing').withColumnRenamed('avg(shot_power)','shot_power').withColumnRenamed('avg(acceleration)','acceleration')

In [72]:
## finding score for each player
weight_finish=1
weight_shot_power=2
weight_acceleration=1
total_weight = weight_finish+weight_shot_power+weight_acceleration
filtered_plyr_attr.printSchema()

root
 |-- player_api_id: integer (nullable = true)
 |-- finishing: double (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- shot_power: double (nullable = true)



In [75]:
filtered_plyr_attr = filtered_plyr_attr.withColumn('score',(filtered_plyr_attr['finishing']*weight_finish + filtered_plyr_attr['acceleration']*weight_acceleration+filtered_plyr_attr['shot_power']*weight_shot_power)/total_weight)

In [76]:
filtered_plyr_attr.show(2)

+-------------+---------+------------+----------+-----+
|player_api_id|finishing|acceleration|shot_power|score|
+-------------+---------+------------+----------+-----+
|       114503|     78.0|        77.0|      75.0|76.25|
|       171094|     39.0|        65.0|      36.0| 44.0|
+-------------+---------+------------+----------+-----+
only showing top 2 rows



In [87]:
striker_details = filtered_plyr_attr.drop('finishing','acceleration','shot_power')

In [88]:
striker_details.show(2)

+-------------+-----+-------------+-------------------+------+------+
|player_api_id|score|  player_name|           birthday|height|weight|
+-------------+-----+-------------+-------------------+------+------+
|        20276|89.25|         Hulk|1986-07-25 00:00:00|180.34|   187|
|        37412| 89.0|Sergio Aguero|1988-06-02 00:00:00|172.72|   163|
+-------------+-----+-------------+-------------------+------+------+
only showing top 2 rows



In [89]:
striker_details = striker_details.filter(filtered_plyr_attr['score'] > 70)

In [90]:
best_strikers = striker_details.join(df,['player_api_id']).sort(striker_details['score'].desc())

In [91]:
best_strikers.show(2)


+-------------+-----+-------------+-------------------+------+------+-------------+-------------------+------+------+
|player_api_id|score|  player_name|           birthday|height|weight|  player_name|           birthday|height|weight|
+-------------+-----+-------------+-------------------+------+------+-------------+-------------------+------+------+
|        20276|89.25|         Hulk|1986-07-25 00:00:00|180.34|   187|         Hulk|1986-07-25 00:00:00|180.34|   187|
|        37412| 89.0|Sergio Aguero|1988-06-02 00:00:00|172.72|   163|Sergio Aguero|1988-06-02 00:00:00|172.72|   163|
+-------------+-----+-------------+-------------------+------+------+-------------+-------------------+------+------+
only showing top 2 rows



In [97]:
# performing join by broadcasting dataframes.
from pyspark.sql.functions import broadcast
best_strikers_1 = df.select(['player_api_id','player_name']).join(broadcast(striker_details),['player_api_id'],'inner')

In [100]:
best_strikers_1.sort(striker_details.score.desc()).show(2)

+-------------+-------------+-----+-------------+-------------------+------+------+
|player_api_id|  player_name|score|  player_name|           birthday|height|weight|
+-------------+-------------+-----+-------------+-------------------+------+------+
|        20276|         Hulk|89.25|         Hulk|1986-07-25 00:00:00|180.34|   187|
|        37412|Sergio Aguero| 89.0|Sergio Aguero|1988-06-02 00:00:00|172.72|   163|
+-------------+-------------+-----+-------------+-------------------+------+------+
only showing top 2 rows



In [102]:
# check if height of player results in better heading accuracy
player_attr_df.count(), df.count()

(183978, 11060)

In [103]:
# join to get height and heading_accuracy
player_heading_acc = player_attr_df.select('player_api_id','heading_accuracy').join(broadcast(df),['player_api_id'],'inner')

In [120]:
player_heading_acc.printSchema()
player_heading_acc.count()

player_heading_acc = player_heading_acc.dropna()

root
 |-- player_api_id: integer (nullable = true)
 |-- heading_accuracy: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)



In [121]:
# categorize each player into different height bucket
short_count = spark.sparkContext.accumulator(0)
medium_short_count = spark.sparkContext.accumulator(0)
medium_high_count = spark.sparkContext.accumulator(0)
tall_count = spark.sparkContext.accumulator(0)

# accumulator for heading accuracy count ha
short_ha_count = spark.sparkContext.accumulator(0)
medium_short_ha_count = spark.sparkContext.accumulator(0)
medium_high_ha_count = spark.sparkContext.accumulator(0)
tall_ha_count = spark.sparkContext.accumulator(0)

In [122]:
def categorize (row, threashold_score):
    height = float(row.height)
    ha = float(row.heading_accuracy)
    
    if(height <= 175 ):
        short_count.add(1)
        if(ha > threashold_score):
            short_ha_count.add(1)
            
    if(height <= 183 and height > 175 ):
        medium_short_count.add(1)
        if(ha > threashold_score):
            medium_short_ha_count.add(1)
            
    if(height <= 195 and height > 183):
        medium_high_count.add(1)
        if(ha > threashold_score):
            medium_high_ha_count.add(1)
            
    if(height > 195 ):
        tall_count.add(1)
        if(ha > threashold_score):
            tall_ha_count.add(1)


In [123]:
player_heading_acc.foreach(lambda x : categorize(x,60))

In [125]:
total_acc = [short_count,medium_short_count,medium_high_count,tall_count]

In [126]:
total_acc

[Accumulator<id=12, value=19128>,
 Accumulator<id=13, value=98541>,
 Accumulator<id=14, value=62081>,
 Accumulator<id=15, value=3392>]

In [127]:
total_acc_ha = [short_ha_count,medium_short_ha_count,medium_high_ha_count,tall_ha_count]

In [128]:
total_acc_ha

[Accumulator<id=16, value=3694>,
 Accumulator<id=17, value=42023>,
 Accumulator<id=18, value=40634>,
 Accumulator<id=19, value=1578>]

In [136]:
# find the percent of accuracy
player_heading_acc.printSchema()
percent_with_value = [(short_ha_count.value/short_count.value)*100 , (medium_short_ha_count.value/medium_short_count.value)*100, (medium_high_ha_count.value/medium_high_count.value)*100,(tall_ha_count.value/tall_count.value)*100 ]

root
 |-- player_api_id: integer (nullable = true)
 |-- heading_accuracy: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)



In [137]:
percent_with_value

[19.312003345880385, 42.64519337128708, 65.45319824100771, 46.52122641509434]

In [138]:
# saving data as csv or json
plyr_2016 = player_attr_df.filter(player_attr_df['year'] == 2016)


In [145]:
plyr_2016.select('player_api_id','overall_rating').coalesce(1).write.option('header',True).csv('player_2016.csv')

In [146]:
plyr_2016.select('player_api_id','overall_rating').write.option('header',True).json('player_2016.json')

In [150]:
## sql query on data spark
from pyspark.sql.types import Row
from datetime import datetime

record = sc.parallelize([Row(id=1,
                            name ="jill",
                            active= True,
                            clubs=['hockey','chess'],
                            subjects={'math':80,'english':56},
                            enrolled=datetime(2014,8,1,14,1,5)),
                        Row(id=2,
                            name="George",
                            active= False,
                            clubs=['soccer','chess'],
                            subjects={'math':60,'english':96},
                            enrolled=datetime(2015,3,21,8,2,5))])

In [151]:
record_df = record.toDF()

In [152]:
record_df.printSchema()

root
 |-- active: boolean (nullable = true)
 |-- clubs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- enrolled: timestamp (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- subjects: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)



In [153]:
record_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[hockey, chess]|2014-08-01 14:01:05|  1|  jill|Map(english -> 56...|
| false|[soccer, chess]|2015-03-21 08:02:05|  2|George|Map(english -> 96...|
+------+---------------+-------------------+---+------+--------------------+



In [154]:
record_df.createOrReplaceTempView('records')

In [155]:
all_records_df = sqlContext.sql('select * from records')

In [156]:
all_records_df.show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[hockey, chess]|2014-08-01 14:01:05|  1|  jill|Map(english -> 56...|
| false|[soccer, chess]|2015-03-21 08:02:05|  2|George|Map(english -> 96...|
+------+---------------+-------------------+---+------+--------------------+



In [158]:
sqlContext.sql("select id, clubs[1],subjects['english'] from records").show()

+---+--------+-----------------+
| id|clubs[1]|subjects[english]|
+---+--------+-----------------+
|  1|   chess|               56|
|  2|   chess|               96|
+---+--------+-----------------+



In [159]:
sqlContext.sql("select * from records where active = True").show()

+------+---------------+-------------------+---+----+--------------------+
|active|          clubs|           enrolled| id|name|            subjects|
+------+---------------+-------------------+---+----+--------------------+
|  true|[hockey, chess]|2014-08-01 14:01:05|  1|jill|Map(english -> 56...|
+------+---------------+-------------------+---+----+--------------------+



In [160]:
sqlContext.sql("select id, NOT active from records").show()

+---+------------+
| id|(NOT active)|
+---+------------+
|  1|       false|
|  2|        true|
+---+------------+



In [161]:
# to access a table on all spark sessions in this cluster, register table as global table
record_df.createGlobalTempView('global_records')

In [164]:
sqlContext.sql("select * from global_temp.global_records").show()

+------+---------------+-------------------+---+------+--------------------+
|active|          clubs|           enrolled| id|  name|            subjects|
+------+---------------+-------------------+---+------+--------------------+
|  true|[hockey, chess]|2014-08-01 14:01:05|  1|  jill|Map(english -> 56...|
| false|[soccer, chess]|2015-03-21 08:02:05|  2|George|Map(english -> 96...|
+------+---------------+-------------------+---+------+--------------------+



In [169]:
# processing airline data with sql
airline = spark.read.format('csv').option('header',True).option('inferSchema',True).load('airlines.csv')
flights = spark.read.format('csv').option('header',True).option('inferSchema',True).load('flights.csv')

In [170]:
airline.createOrReplaceTempView('airlines')
flights.createOrReplaceTempView('flights')

In [173]:
sqlContext.sql("select * from airlines").show()

+-----+--------------------+
| Code|         Description|
+-----+--------------------+
|19031|Mackey Internatio...|
|19032|Munz Northern Air...|
|19033|Cochise Airlines ...|
|19034|Golden Gate Airli...|
|19035|  Aeromech Inc.: RZZ|
|19036|Golden West Airli...|
|19037|Puerto Rico Intl ...|
|19038|Air America Inc.:...|
|19039|Swift Aire Lines ...|
|19040|American Central ...|
|19041|Valdez Airlines: VEZ|
|19042|Southeast Alaska ...|
|19043|Altair Airlines I...|
|19044|Chitina Air Servi...|
|19045|Marco Island Airw...|
|19046|Caribbean Air Ser...|
|19047|Sundance Airlines...|
|19048|Seair Alaska Airl...|
|19049|Southeast Airline...|
|19050|Alaska Aeronautic...|
+-----+--------------------+
only showing top 20 rows



In [174]:
flights.count(),airline.count()

(476881, 1579)

In [175]:
sqlContext.sql("select count(*) from airlines").show()

+--------+
|count(1)|
+--------+
|    1579|
+--------+



In [181]:
spark.sql("select distance from flights").agg({'distance':'sum'}).withColumnRenamed('sum(distance)', 'total').show()

+------------+
|       total|
+------------+
|3.79052917E8|
+------------+



In [183]:
# get all delayed flight in 2012
spark.sql("select date, airlines, flight_number from flights where departure_delay > 0 and year(date) = 2012").show()

+----+--------+-------------+
|date|airlines|flight_number|
+----+--------+-------------+
+----+--------+-------------+



In [185]:
# get all delayed flight in 2014
spark.sql("select date, airlines, flight_number, date from flights where departure_delay > 0 and year(date) = 2014").show()

+-------------------+--------+-------------+-------------------+
|               date|airlines|flight_number|               date|
+-------------------+--------+-------------+-------------------+
|2014-04-01 00:00:00|   19805|            2|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|            4|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|            6|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|            7|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|            8|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           10|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           14|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           16|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           18|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           20|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           24|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   19805|           27|2014-04-01 00:00:00|
|2014-04-01 00:00:00|   1

In [186]:
#inferred and explicit schema
lines = sc.textFile('students.txt')
lines = lines.map(lambda x : x.split(','))

In [187]:
lines

PythonRDD[516] at RDD at PythonRDD.scala:48

In [188]:
lines.collect()

[['Emily', '44', '55', '78'],
 ['Andy', '47', '34', '89'],
 ['Rick', '55', '78', '55'],
 ['Aaron', '66', '34', '98']]

In [199]:
rdd = lines.map(lambda x : Row(name=x[0],math=int(x[1]),science=int(x[2]),english=int(x[3])))

In [200]:
rdd

PythonRDD[535] at RDD at PythonRDD.scala:48

In [201]:
rdd.collect()

[Row(english=78, math=44, name='Emily', science=55),
 Row(english=89, math=47, name='Andy', science=34),
 Row(english=55, math=55, name='Rick', science=78),
 Row(english=98, math=66, name='Aaron', science=34)]

In [202]:
# converting rdd to DF
df = rdd.toDF()

In [203]:
df.printSchema()

root
 |-- english: long (nullable = true)
 |-- math: long (nullable = true)
 |-- name: string (nullable = true)
 |-- science: long (nullable = true)



In [204]:
dfUsingSpark = spark.createDataFrame(rdd)

In [205]:
dfUsingSpark.printSchema()

root
 |-- english: long (nullable = true)
 |-- math: long (nullable = true)
 |-- name: string (nullable = true)
 |-- science: long (nullable = true)



In [206]:
dfUsingSpark.columns


['english', 'math', 'name', 'science']

In [207]:
dfUsingSpark.schema

StructType(List(StructField(english,LongType,true),StructField(math,LongType,true),StructField(name,StringType,true),StructField(science,LongType,true)))

In [208]:
df.schema

StructType(List(StructField(english,LongType,true),StructField(math,LongType,true),StructField(name,StringType,true),StructField(science,LongType,true)))

In [212]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

In [213]:
fields = [StructField("name",StringType(),True),
          StructField('math',LongType(),True), 
          StructField('english',LongType(),True),
          StructField('science',LongType(),True)]

In [215]:
schema = StructType(fields)

In [216]:
rdd.collect()

[Row(english=78, math=44, name='Emily', science=55),
 Row(english=89, math=47, name='Andy', science=34),
 Row(english=55, math=55, name='Rick', science=78),
 Row(english=98, math=66, name='Aaron', science=34)]

In [219]:
withSchema = rdd.toDF(schema)

In [221]:
withSchema.printSchema()

root
 |-- name: string (nullable = true)
 |-- math: long (nullable = true)
 |-- english: long (nullable = true)
 |-- science: long (nullable = true)



In [223]:
spark.createDataFrame(rdd,schema).show()

+-----+----+-------+-------+
| name|math|english|science|
+-----+----+-------+-------+
|Emily|  44|     78|     55|
| Andy|  47|     89|     34|
| Rick|  55|     55|     78|
|Aaron|  66|     98|     34|
+-----+----+-------+-------+

