In [1]:
import findspark
findspark.init()

In [89]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import count, sum, expr, col, row_number
import pyspark

In [3]:
sc = pyspark.SparkContext()
spark = SparkSession.builder.getOrCreate()

22/11/10 18:12:13 WARN Utils: Your hostname, NB00WWG059 resolves to a loopback address: 127.0.1.1; using 172.30.180.26 instead (on interface eth0)
22/11/10 18:12:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/10 18:12:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [91]:
# Load the data

player_name = spark.read.csv('Master.csv', inferSchema=True, header=True)
player_award = spark.read.csv('AwardsPlayers.csv', inferSchema = True, header=True)
player_goal = spark.read.csv('Scoring.csv', inferSchema = True, header=True)
player_team = spark.read.csv('Teams.csv', inferSchema = True, header=True)

sources = [fullnames_df, goals_df, awards_df, teams_df]

for i in sources:
    i.printSchema()  

root
 |-- playerID: string (nullable = true)
 |-- fullName: string (nullable = true)

root
 |-- playerID: string (nullable = true)
 |-- goals: integer (nullable = true)
 |-- teamID: string (nullable = true)

root
 |-- playerID: string (nullable = true)
 |-- award: string (nullable = true)

root
 |-- teamID: string (nullable = true)
 |-- teamName: string (nullable = true)



In [80]:
# Select particular columns

fullnames_df = player_name\
    .select('playerID', expr("concat(firstName, ' ', lastName) as fullName"))

goals_df = player_goal\
    .select('playerID', 'stint', 'tmID')\
    .withColumnRenamed('stint', 'goals')\
    .withColumnRenamed('tmID', 'teamID')

awards_df = player_award.select('playerID', 'award')

teams_df = player_team\
    .select('tmID', 'name')\
    .withColumnRenamed('tmID', 'teamID')\
    .withColumnRenamed('name', 'teamName')\
    .distinct()

In [95]:
awards = awards_df\
    .groupBy('playerID')\
    .agg(count('award').alias('number_of_awards'))

awards.toPandas()

Unnamed: 0,playerID,number_of_awards
0,macouja01,1
1,oatesad01,1
2,henriad01,1
3,kennete01,4
4,mcsorma01,1
...,...,...
641,vailer01,1
642,francro01,6
643,sjobela01,2
644,robinea01,1


In [92]:
goals = goals_df\
    .groupBy('playerID', 'teamID')\
    .agg(sum('goals').alias('number_of_goals'))

goals.toPandas()

Unnamed: 0,playerID,teamID,number_of_goals
0,baldehe01,MNS,1
1,boudran01,MNS,2
2,boughbo01,CAR,1
3,bragnri01,WAS,4
4,brookwa01,BOS,1
...,...,...,...
18276,vailer01,DET,2
18277,vernomi01,FLO,2
18278,wilkiba01,PIT,3
18279,wilsodu01,NYR,3


In [101]:
result_df = goals\
    .join(awards, on='playerID', how='inner')\
    .join(fullnames_df, on='playerID', how='inner')\
    .join(teams_df, on='teamID', how='inner')\
    .select('playerID', 'fullName', 'number_of_goals', 'number_of_awards', 'teamName')\
    .orderBy('number_of_goals', ascending = False)\
    .limit(10)

result_df.toPandas()

Unnamed: 0,playerID,fullName,number_of_goals,number_of_awards,teamName
0,howego01,Gordie Howe,25,36,Detroit Red Wings
1,delveal01,Alex Delvecchio,24,5,Detroit Red Wings
2,yzermst01,Steve Yzerman,22,6,Detroit Red Wings
3,mikitst01,Stan Mikita,22,16,Chicago Blackhawks
4,mikitst01,Stan Mikita,22,16,Chicago Black Hawks
5,bucykjo01,John Bucyk,21,4,Boston Bruins
6,bourqra01,Raymond Bourque,21,26,Boston Bruins
7,hortoti01,Tim Horton,20,6,Toronto Maple Leafs
8,daneyke01,Ken Daneyko,20,1,New Jersey Devils
9,richahe01,Henri Richard,20,5,Montreal Canadiens


In [103]:
result_df.write.csv('result.csv', header=True, sep=',')

In [109]:
result_df.write.parquet('result.parquet')