In [1]:
# Display the Spark session object. `spark` is not created anywhere in this
# notebook, so it is presumably injected by the kernel — TODO confirm.
spark

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from datetime import datetime, timedelta
import argparse

# Edges

In [6]:
# Raw input for the edge list: comma-delimited CSV data at the path "skola",
# with the first row used as column names. No schema is supplied here.
game_plays = (
    sqlContext.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv("skola")
)

In [12]:
# Play ids of every row tagged 'Scorer' whose game_id is above 2017090000
# (presumably a season cut-off — TODO confirm).
# The predicates are applied before the projection because they read
# playerType and game_id, neither of which survives the narrowing to play_id.
goals = (
    game_plays
    .where("playerType = 'Scorer'")
    .where("game_id > 2017090000")
    .select('play_id')
)

In [31]:
# Every game_plays row that shares a play_id with a row in `goals`.
# The join is keyed by column name instead of
# game_plays["play_id"] == goals["play_id"]: `goals` is derived from
# `game_plays`, so both of those expressions refer to the same underlying
# column, and the expression form also leaves two columns named play_id in
# the result.
# play_id is then copied under two names so that the scorer and goalie subsets
# built from this frame can be joined to each other on differently named keys.
goals_goalie = (
    game_plays
    .join(goals, on='play_id', how='right')
    .withColumn('play_id_player', F.col('play_id'))
    .withColumn('play_id_goalie', F.col('play_id'))
)

In [62]:
# Scorer side of each goal play: rows with playerType 'Scorer', with every
# column renamed to a player-specific name.
players = (
    goals_goalie
    .where("playerType = 'Scorer'")
    .select(
        'play_id_player',
        F.col('game_id').alias('game_id_player'),
        F.col('player_id').alias('player_id_player'),
        F.col('playerType').alias('playerTypePlayer'),
    )
)

In [63]:
# Goalie side of each goal play: rows with playerType 'Goalie', with every
# column renamed to a goalie-specific name.
goalies = (
    goals_goalie
    .where("playerType = 'Goalie'")
    .select(
        'play_id_goalie',
        F.col('game_id').alias('game_id_goalie'),
        F.col('player_id').alias('player_id_goalie'),
        F.col('playerType').alias('playerTypeGoalie'),
    )
)

In [67]:
# Pair each scorer row with the goalie row(s) recorded for the same play.
df = players.join(
    goalies,
    on=players["play_id_player"] == goalies["play_id_goalie"],
)

In [94]:
# Count the paired rows for each (scorer, goalie) combination.
edges = (
    df
    .groupBy('player_id_player', 'player_id_goalie')
    .agg(F.count('*').alias('Goals'))
)

In [148]:
# Edge table: scorer id as Source, goalie id as Target, the goal count, and a
# constant 'GOAL' label in the Relationship column.
final_edges = (
    edges
    .select(
        F.col('player_id_player').alias('Source'),
        F.col('player_id_goalie').alias('Target'),
        'Goals',
    )
    .withColumn('Relationship', F.lit('GOAL'))
)

In [151]:
# Write the edge table from a single partition to 'nhl_edges.csv'.
# header=True keeps the Source/Target/Goals/Relationship column names in the
# output (they are dropped otherwise); mode='overwrite' lets this cell be
# re-run without failing because the output path already exists.
final_edges.repartition(1).write.csv('nhl_edges.csv', sep=",", header=True, mode='overwrite')

# Nodes

In [100]:
# Play-level input for the node table: comma-delimited CSV with a header row.
plays = (
    sqlContext.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv("game_plays.csv")
)

In [110]:
# Player attributes, joined onto the node table further down via player_id.
player_info = (
    sqlContext.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv("player_info.csv")
)

In [130]:
# Team attributes, joined onto the node table further down via team_id.
team_info = (
    sqlContext.read
    .option("header", "true")
    .option("delimiter", ",")
    .csv("team_info.csv")
)

In [102]:
# Restrict plays to game_id above 2017090000, the same threshold applied when
# selecting goals for the edge list.
plays_filtered = (
    plays
    .where("game_id > 2017090000")
)

In [117]:
# Scorer and goalie rows of the goal plays, reduced to play id, player id and
# role.
players_list = (
    goals_goalie
    .where("playerType IN ('Scorer','Goalie')")
    .select('play_id_player', 'player_id', 'playerType')
)

In [118]:
# Attach the play-level columns to every scorer/goalie row; the right join
# keeps all rows of players_list even when no filtered play matches.
players_teams = plays_filtered.join(
    players_list,
    on=plays_filtered["play_id"] == players_list["play_id_player"],
    how='right',
)

In [121]:
# Derive each row's team from its role: scorers take team_id_for, goalies take
# team_id_against. The CASE has no ELSE branch, so any other role yields NULL.
player_ids = players_teams.withColumn(
    'team',
    F.expr("CASE WHEN playerType = 'Scorer' THEN team_id_for WHEN playerType = 'Goalie' THEN team_id_against END "),
)

In [155]:
# Keep only team, player id (renamed to `player`) and role.
player_team = player_ids.select(
    'team',
    F.col('player_id').alias('player'),
    'playerType',
)

In [156]:
# Enrich each player row with its player_info record; the left join keeps
# players that have no match in player_info.
player_complete = player_team.join(
    player_info,
    on=player_team["player"] == player_info["player_id"],
    how='left',
)

In [157]:
# Node table: one distinct row per combination of player, role and team
# attributes.
# player_id is taken from `player` (the left-hand join key) rather than from
# player_info's player_id column: the latter is NULL for any player that has no
# player_info record, which would leave that node without an id.
nodes = (
    player_complete
    .join(team_info, team_info["team_id"] == player_complete["team"], how='left')
    # concat() yields NULL when any of its inputs is NULL, so name and
    # fullTeamName stay NULL for players or teams without a matching record.
    .withColumn('name', F.expr("concat(firstName,' ',lastName)"))
    .withColumn('fullTeamName', F.expr("concat(shortName,' ',teamName)"))
    .select(
        F.col('player').alias('player_id'),
        'name',
        'nationality',
        'primaryPosition',
        'playerType',
        'birthDate',
        'fullTeamName',
        'abbreviation',
    )
    .distinct()
)

In [158]:
# Print the column names and types of the node table as a sanity check.
nodes.printSchema()

root
 |-- player_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- primaryPosition: string (nullable = true)
 |-- playerType: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- fullTeamName: string (nullable = true)
 |-- abbreviation: string (nullable = true)



In [159]:
# Write the node table from a single partition to 'nodes'.
# header=True keeps the column names in the output (they are dropped
# otherwise); mode='overwrite' lets this cell be re-run without failing
# because the output path already exists.
nodes.repartition(1).write.csv('nodes', sep=",", header=True, mode='overwrite')