Import needed libraries

In [2]:
%%pyspark

 from pyspark.sql.types import *
 from pyspark.sql.functions import *
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import to_date

Read all datasets into DataFrames

In [3]:
# Raw game-level data, stored as Parquet in the ADLS Gen2 container.
games_path = 'abfss://nhlroot@simondemosimondemo.dfs.core.windows.net/nhlrawgamelevel/rawgamelevel.parquet'
df_games = spark.read.format("parquet").load(games_path)

# Preview only the first 20 rows.
games_preview = df_games.limit(20)
display(games_preview)

In [4]:
# Teams reference data; the first CSV row is treated as the header.
# No schema is supplied or inferred here, so types are fixed up in a later cell.
teams_path = 'abfss://nhlroot@simondemosimondemo.dfs.core.windows.net/teams.csv'
df_teams = spark.read.option("header", True).csv(teams_path)
display(df_teams)

In [5]:
# Seasons reference data; the first CSV row is treated as the header.
# No schema is supplied or inferred here, so types are fixed up in a later cell.
seasons_path = 'abfss://nhlroot@simondemosimondemo.dfs.core.windows.net/seasons.csv'
df_seasons = spark.read.format("csv").option("header", True).load(seasons_path)
display(df_seasons)

Select the columns needed for the games fact table

In [6]:
# Columns kept for the games fact table, in output order.
# The "penality*" spelling is reproduced exactly as the source columns are named.
fact_columns = [
    "gameId",
    "gameDate",
    "playerTeam",
    "opposingTeam",
    "season",
    "situation",
    "goalsFor",
    "goalsAgainst",
    "shotsOnGoalFor",
    "shotsOnGoalAgainst",
    "penaltiesFor",
    "penaltiesAgainst",
    "penalityMinutesFor",
    "penalityMinutesAgainst",
    "playoffGame",
]

df_cleangames = df_games.select(*fact_columns)
display(df_cleangames)

Clean up team IDs

In [7]:

# Dotted team abbreviations that appear in the game data, mapped to their
# three-letter replacements.
# NOTE(review): presumably the three-letter form matches TeamID in teams.csv — confirm.
TEAM_ID_FIXES = {
    "L.A": "LAK",
    "N.J": "NJD",
    "S.J": "SJS",
    "T.B": "TBL",
}


def normalize_team_id(column_name):
    """Build a Column that rewrites dotted team abbreviations in `column_name`.

    Args:
        column_name: name of the team column to normalize.

    Returns:
        A Column expression: values listed in TEAM_ID_FIXES are replaced by
        their mapped ID; any other value falls through unchanged.
    """
    source = col(column_name)
    rewritten = None
    for dotted_id, team_id in TEAM_ID_FIXES.items():
        is_match = source == dotted_id
        # The first branch starts the CASE expression; later ones extend it.
        rewritten = when(is_match, team_id) if rewritten is None else rewritten.when(is_match, team_id)
    return rewritten.otherwise(source)


# Apply the same mapping to both sides of the matchup so the two columns
# cannot drift apart when a new abbreviation is added.
for team_column in ("playerTeam", "opposingTeam"):
    df_cleangames = df_cleangames.withColumn(team_column, normalize_team_id(team_column))


Cast columns to proper types

In [8]:
# gameDate is parsed from its compact yyyyMMdd form. to_date already yields a
# date column, so no further cast to DateType is needed afterwards.
df_cleangames = df_cleangames.withColumn("gameDate", to_date(col("gameDate"), "yyyyMMdd"))

# Target Spark type for every other fact-table column.
GAME_COLUMN_TYPES = {
    "playoffGame": BooleanType(),
    "gameId": IntegerType(),
    "playerTeam": StringType(),
    "opposingTeam": StringType(),
    "season": IntegerType(),
    "situation": StringType(),
    "goalsFor": IntegerType(),
    "goalsAgainst": IntegerType(),
    "shotsOnGoalFor": IntegerType(),
    "shotsOnGoalAgainst": IntegerType(),
    "penaltiesFor": IntegerType(),
    "penaltiesAgainst": IntegerType(),
    "penalityMinutesFor": IntegerType(),
    "penalityMinutesAgainst": IntegerType(),
}

# Each column is replaced in place by its cast version, so the column order
# of df_cleangames is unchanged.
for game_column, game_type in GAME_COLUMN_TYPES.items():
    df_cleangames = df_cleangames.withColumn(game_column, col(game_column).cast(game_type))

display(df_cleangames)

In [9]:
# Teams dimension: text attributes are cast to string, numeric ones to integer.
teams_string_columns = [
    "TeamID",
    "Conference",
    "Division",
    "TeamName",
    "City",
    "State",
    "Arena",
    "General manager",
    "Head coach",
    "Captain",
]
teams_integer_columns = ["Capacity", "Founded", "Joined"]

for teams_column in teams_string_columns:
    df_teams = df_teams.withColumn(teams_column, col(teams_column).cast(StringType()))

for teams_column in teams_integer_columns:
    df_teams = df_teams.withColumn(teams_column, col(teams_column).cast(IntegerType()))

In [10]:
# Seasons dimension: (column, target type) pairs, applied in this order.
# NOTE(review): the plain cast to DateType presumably expects ISO-style
# (yyyy-MM-dd) text in Start/Finish — confirm against seasons.csv.
seasons_casts = [
    ("Season", IntegerType()),
    ("NoTeams", IntegerType()),
    ("RegGames", IntegerType()),
    ("Start", DateType()),
    ("Finish", DateType()),
    ("Champion", StringType()),
]

for seasons_column, seasons_type in seasons_casts:
    df_seasons = df_seasons.withColumn(seasons_column, col(seasons_column).cast(seasons_type))

Check data metrics in fact table

In [24]:
# --- Fully duplicated rows ---------------------------------------------------
total_rows = df_cleangames.count()
unique_rows = df_cleangames.dropDuplicates().count()
duplicate_rows = total_rows - unique_rows
print(f"Number of duplicate rows: {duplicate_rows}")

# --- Cardinality of each column (one Spark job per column) --------------------
print()
print("Unique values in different columns:")
for column in df_cleangames.columns:
    distinct_count = df_cleangames.select(column).distinct().count()
    print(f"{column}: {distinct_count} distinct values")

# --- Null counts per column ---------------------------------------------------
print()
print("Number of null values in each column:")
# `sum` is applied to a Column here, so it must be the Spark aggregate rather
# than the Python builtin (presumably provided by the star import from
# pyspark.sql.functions — verify if the imports change).
null_counts = [
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df_cleangames.columns
]
df_cleangames.select(*null_counts).show()

# --- Resulting schema ---------------------------------------------------------
df_cleangames.printSchema()

Write the DataFrames to the data warehouse

In [12]:
# NOTE(review): the warehouse load is disabled — every statement in this cell
# is commented out, so running the notebook does not persist df_teams,
# df_seasons or df_cleangames anywhere.
# If re-enabled, each DataFrame would be written in "overwrite" mode to its
# nhlsqlpool.dbo.* table through the synapsesql writer.
# import com.microsoft.spark.sqlanalytics
# from com.microsoft.spark.sqlanalytics.Constants import Constants
# from pyspark.sql.functions import col

# df_teams.write.mode("overwrite").synapsesql("nhlsqlpool.dbo.dimteamsg")
# df_seasons.write.mode("overwrite").synapsesql("nhlsqlpool.dbo.dimseasonsg")
# df_cleangames.write.mode("overwrite").synapsesql("nhlsqlpool.dbo.factgamesg")