# ETL - Creating Fact and Dim Tables #

This notebook loads the Chess.com and Lichess staging tables from `.parquet` files and performs these final, minor transformations on them:

1. Removing the extra `rated` column from the Chess.com staging table before we union the tables.
2. Creating a `game_id` column by hashing the `game_end_time`, `white_username` and `black_username` columns.
3. More generally, creating each of the following `*_id` columns with Spark SQL's [SHA1 hash function](https://spark.apache.org/docs/2.3.0/api/sql/index.html#sha1):

* `game_id` hashed from `game_end_time` + `white_username` + `black_username`
* `white_id` hashed from `white_username`
* `black_id` hashed from `black_username`
* `opening_id` hashed from `opening`
* `time_class_id` hashed from `time_class`
* `platform_id` hashed from `platform`

4. We use PySpark's [dropDuplicates](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html) function to remove rows that share the same (SHA1-hashed) `game_id` value.

   Currently this removes `50250` rows from the combined Chess.com + Lichess `games` DataFrame.

5. We use PySpark's `.cast()` function to set the correct data types for the date and rating columns.

6. We run the following data quality checks:

   a) Check for nulls in the `game_id` column.

   b) Check that all values in the `games.game_id` column are distinct (no dupes).

Finally, we write the cleaned fact and dim tables to a new set of `.parquet` files so these tables can be loaded by our final `analytics.ipynb` notebook.
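Spark SQL's `sha1` returns the SHA-1 digest as a 40-character hex string, and it returns null for null input. The sketch below models that behaviour in plain Python with `hashlib`. It assumes string columns are hashed as their UTF-8 bytes, which is how Spark casts a string to binary. `sha1_id` is a hypothetical helper for illustration and is not defined in the notebook.

The sample output further down shows the property that matters here. `Byniolus` hashes to `0a18e34ad8d152e2d54ac1e3ca4843768768b2c8` both as white and as black.

```python
import hashlib
from typing import Optional

def sha1_id(value: Optional[str]) -> Optional[str]:
    """Hex SHA-1 of a UTF-8 string, or None for None (modelling F.sha1 on a string column)."""
    if value is None:
        return None
    return hashlib.sha1(value.encode("utf-8")).hexdigest()

# A username hashes to the same id whether it appears in white_username or
# black_username, so white_id and black_id values can share one players table.
assert sha1_id("Byniolus") == sha1_id("Byniolus")
print(len(sha1_id("bullet")))  # 40
print(sha1_id(None))           # None
```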

In [8]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import yaml
import os

In [11]:
with open('config/dl-chessdotcom.yaml') as file:
    config = yaml.safe_load(file)

# Note: despite its name, the `aws_secret_key_id` config key holds the AWS secret access key.
os.environ['AWS_ACCESS_KEY_ID'] = config['aws_access_key_id']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['aws_secret_key_id']
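In S3 mode the notebook reads `aws_access_key_id`, `aws_secret_key_id` and `output_data_path_s3` from `config/dl-chessdotcom.yaml`. The read paths are built as `"s3a://" + output_data + "staging/..."`. For that reason, the S3 value presumably has to be a bucket/prefix with no `s3a://` scheme and with a trailing slash.

Below is a minimal sketch of a fail-fast check on the loaded `config` dict. `check_config` and `REQUIRED_KEYS` are hypothetical names and are not part of the original notebook.

```python
# Keys this notebook reads from config/dl-chessdotcom.yaml when writing to S3.
REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_key_id", "output_data_path_s3")

def check_config(config: dict) -> None:
    """Fail fast if the loaded YAML config is missing keys or has an odd S3 path."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise KeyError(f"Missing config keys: {missing}")

    s3_path = config["output_data_path_s3"]
    # Paths are built as "s3a://" + s3_path + "staging/...", so the value should
    # not repeat the scheme and should end with a slash.
    if s3_path.startswith("s3a://") or not s3_path.endswith("/"):
        raise ValueError(f"Unexpected output_data_path_s3 value: {s3_path!r}")

# Usage in the notebook, right after loading the YAML file:
# check_config(config)
```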

In [12]:
spark = SparkSession \
    .builder \
    .appName("Lichess and Chess.com Staging -> Fact tables") \
    .getOrCreate()

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key",config['aws_access_key_id'])
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key",config['aws_secret_key_id'])

In [13]:
# Choose the output path: either local or S3.
# Note: the read/write paths below are prefixed with "s3a://", so they assume the S3 path.
# output_data = config['output_data_path_local']
output_data = config['output_data_path_s3']

## Load Chess.com Staging Table

In [14]:
df_chessdotcom = spark.read.parquet("s3a://" + output_data + "staging/chessdotcom/games/")  

### Remove extra `rated` column

The Chess.com table has an extra `rated` column, which we need to remove so that it unions correctly with the Lichess table.

In [15]:
df_chessdotcom = df_chessdotcom.drop("rated") #Remove extra column in Chess.com staging table.

#df_chessdotcom.printSchema()

In [16]:
#df_chessdotcom.limit(10).toPandas()

## Load Lichess Staging Table

In [17]:
df_lichess = spark.read.parquet("s3a://" + output_data + "staging/lichess/games/")

In [18]:
#df_lichess.printSchema()
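`DataFrame.union` resolves columns by position, not by name. The two staging tables therefore need the same columns in the same order, which is why `rated` is dropped first.

Below is a minimal sketch of a pre-union check. It works on plain lists of column names, such as those returned by a DataFrame's `.columns` attribute. `check_union_compatible` is a hypothetical helper. If only the column order differs, PySpark's `unionByName` is an alternative, since it matches columns by name.

```python
from typing import List

def check_union_compatible(left_cols: List[str], right_cols: List[str]) -> None:
    """Raise if two column lists would not line up in a positional union."""
    only_left = [c for c in left_cols if c not in right_cols]
    only_right = [c for c in right_cols if c not in left_cols]
    if only_left or only_right:
        raise ValueError(f"Column mismatch: only in left={only_left}, only in right={only_right}")
    if left_cols != right_cols:
        # Same names, different order: union() would silently pair the wrong columns.
        raise ValueError(f"Same columns, different order: {left_cols} vs {right_cols}")

# In the notebook this would be called as:
# check_union_compatible(df_chessdotcom.columns, df_lichess.columns)
check_union_compatible(["game_end_time", "platform"], ["game_end_time", "platform"])
```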

## Combine Chess.com + Lichess `games` Table Dataframes

In [19]:
df_merged = df_chessdotcom.union(df_lichess)

In [20]:
#df_merged.count()

### Create `game_id` column

In [21]:
df_merged_with_game_id = df_merged.withColumn("game_id", F.sha1(F.concat(F.col("game_end_time"),F.col("white_username"),F.col("black_username"))))
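Spark's `concat` has two properties that matter here:

* It returns null as soon as any input is null. A game with a missing `white_username` or `black_username` therefore gets a null `game_id`. The null check further down appears to find exactly such a row, where `black_id` is also null.
* It joins values with no separator.

The following plain-Python model contrasts `concat` with `concat_ws`, which skips nulls and inserts a separator. The function names are hypothetical stand-ins for the Spark functions, and `|` is an arbitrary separator.

```python
from typing import Optional

def spark_concat(*parts: Optional[str]) -> Optional[str]:
    """Models F.concat: any None input makes the whole result None."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)

def spark_concat_ws(sep: str, *parts: Optional[str]) -> str:
    """Models F.concat_ws: None inputs are skipped, the rest joined with sep."""
    return sep.join(p for p in parts if p is not None)

# A missing username nulls out the whole key under concat...
print(spark_concat("2020-07-20 02:23:50", "someuser", None))          # None
# ...while concat_ws still produces a value.
print(spark_concat_ws("|", "2020-07-20 02:23:50", "someuser", None))  # 2020-07-20 02:23:50|someuser

# Without a separator, different username pairs can concatenate to the same string.
print(spark_concat("t", "ab", "c") == spark_concat("t", "a", "bc"))                # True
print(spark_concat_ws("|", "t", "ab", "c") == spark_concat_ws("|", "t", "a", "bc"))  # False
```

In the notebook the equivalent change would be something like `F.sha1(F.concat_ws("|", "game_end_time", "white_username", "black_username"))`. Note that this would change every existing `game_id`. Whether games with a missing username should be kept at all is a separate decision.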

### Create `white_id` and `black_id` columns

In [22]:
df_merged_with_white_id = df_merged_with_game_id.withColumn("white_id", F.sha1(F.col("white_username")))

In [23]:
df_merged_with_black_id = df_merged_with_white_id.withColumn("black_id", F.sha1(F.col("black_username")))

### Create `opening_id` column

In [24]:
df_merged_with_opening_id = df_merged_with_black_id.withColumn("opening_id", F.sha1(F.col("opening")))

### Create `time_class_id` column

In [25]:
df_merged_with_tc_id = df_merged_with_opening_id.withColumn("time_class_id", F.sha1(F.col("time_class")))

### Create `platform_id` column

In [26]:
df_merged_with_platform_id = df_merged_with_tc_id.withColumn("platform_id", F.sha1(F.col("platform")))

### Create `year` column

In [27]:
df_merged_with_year = df_merged_with_platform_id.withColumn("year", F.year(F.col('game_end_date')))

In [28]:
df_merged_with_year.limit(10).toPandas()

|   | game_end_time | game_end_date | time_class | white_username | white_rating | black_username | black_rating | winner | termination | opening | moves | platform | game_id | white_id | black_id | opening_id | time_class_id | platform_id | year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2019-07-05 06:04:38 | 2019-07-05 | bullet | AnthonyWirig | 2693.0 | Byniolus | 2605.0 |  | Game drawn by repetition | Slav Defense Modern Line | 1. d4 {[%clk 0:00:59.9]} 1... d5 {[%clk 0:00:5... | chessdotcom | 7945d55f55a90303f8e3a0e27b79bb2ce4b6b71b | 30f50d341f994926ef531a5a9ab913f6d1a9a659 | 0a18e34ad8d152e2d54ac1e3ca4843768768b2c8 | 1bf7dcd630549c20df5ea7e0c9a00111dc628ff0 | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 1 | 2019-07-05 06:07:14 | 2019-07-05 | bullet | Byniolus | 2615.0 | AnthonyWirig | 2683.0 | white | Byniolus won on time | Reti Opening Sicilian Invitation 2.d4 | 1. Nf3 {[%clk 0:00:59.9]} 1... c5 {[%clk 0:00:... | chessdotcom | 204e7ca0dc82aab9d8c76b951f6aa015f8224527 | 0a18e34ad8d152e2d54ac1e3ca4843768768b2c8 | 30f50d341f994926ef531a5a9ab913f6d1a9a659 | dd60fc18f81cb65b19c8da336c99142664f007d1 | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 2 | 2019-07-05 06:09:44 | 2019-07-05 | bullet | AnthonyWirig | 2689.0 | Byniolus | 2609.0 | white | AnthonyWirig won on time | Vant Kruijs Opening 1...d5 | 1. e3 {[%clk 0:00:59.9]} 1... d5 {[%clk 0:00:5... | chessdotcom | 5a68679490dca4d5e1208c681d3d4302550e61be | 30f50d341f994926ef531a5a9ab913f6d1a9a659 | 0a18e34ad8d152e2d54ac1e3ca4843768768b2c8 | 8159020ada7b3f84ca6b8e0000ba06e28a6e6321 | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 3 | 2019-07-05 06:11:00 | 2019-07-05 | blitz | AverageRecord | 2983.0 | Firouzja2003 | 3032.0 | black | Firouzja2003 won on time | Queens Pawn Opening Symmetrical Variation | 1. d4 {[%clk 0:02:59.9]} 1... d5 {[%clk 0:02:5... | chessdotcom | 82c2605f4ad83a33f32d7aa9b94ba23eb86931a6 | 6db210f0dad8997ca206c03cebe678f365cc65d3 | a543b4d983c19658ef674e30404c2fff3b3e8bea | 01e332f824ba36196b0ec09d3e1359d4def5463a | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 4 | 2019-07-05 06:11:51 | 2019-07-05 | bullet | Byniolus | 2619.0 | AnthonyWirig | 2679.0 | white | Byniolus won by checkmate | Englund Gambit Declined Reversed Alekhine Vari... | 1. d4 {[%clk 0:00:59.9]} 1... e5 {[%clk 0:00:5... | chessdotcom | 7d49674d83bdd829726775a0b3f2fcd19f19c185 | 0a18e34ad8d152e2d54ac1e3ca4843768768b2c8 | 30f50d341f994926ef531a5a9ab913f6d1a9a659 | f4a0b065b5025458df464b512ebf85e0de3fa079 | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 5 | 2019-07-05 06:14:28 | 2019-07-05 | bullet | AnthonyWirig | 2670.0 | Byniolus | 2628.0 | black | Byniolus won on time | Pirc Defense 2.Nc3 Nf6 | 1. e4 {[%clk 0:00:59.9]} 1... d6 {[%clk 0:00:5... | chessdotcom | 13f372c4e892a0e67152c65baf3afc0b61528c25 | 30f50d341f994926ef531a5a9ab913f6d1a9a659 | 0a18e34ad8d152e2d54ac1e3ca4843768768b2c8 | ee21bc01c89a5fc482bccd76ab2b5ad3d20ac48d | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 6 | 2019-07-05 06:18:04 | 2019-07-05 | blitz | TST2014 | 2588.0 | AverageRecord | 2985.0 | black | AverageRecord won on time | Queens Gambit Declined Three Knights Variation | 1. d4 {[%clk 0:02:59.9]} 1... d5 {[%clk 0:02:5... | chessdotcom | b30b13eea6fec55302c036787d2b9eb62e5c4155 | 6f4db437261bf2619cc3814f0e9f128c9a5aec34 | 6db210f0dad8997ca206c03cebe678f365cc65d3 | 1334c1ea28f9b12ff8629e815c451d2881199f0c | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 7 | 2019-07-05 06:20:52 | 2019-07-05 | blitz | transponster | 2815.0 | Alexander_Moskalenko | 2856.0 | black | Alexander_Moskalenko won by resignation | Kings Indian Defense 3.Nc3 d6 | 1. d4 {[%clk 0:02:59.9]} 1... Nf6 {[%clk 0:02:... | chessdotcom | 389f83e77af76acb9a57f5f7dbe457a06ec2a141 | 01a6231e8c2433640181e3ba529284b966c67503 | b1a6117173dc9001b21957f6dbea785430b8dd39 | 8ee4fc60be53b342da2c1816264591491166dcb8 | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 8 | 2019-07-05 06:25:57 | 2019-07-05 | blitz | Alexander_Moskalenko | 2855.0 | transponster | 2816.0 |  | Game drawn by agreement | English Opening Agincourt Defense 2.Nf3 d5 | 1. c4 {[%clk 0:02:59.9]} 1... e6 {[%clk 0:02:5... | chessdotcom | 094b73b26d0b7eddcea6298555f20307a765a6ee | b1a6117173dc9001b21957f6dbea785430b8dd39 | 01a6231e8c2433640181e3ba529284b966c67503 | f58a535100cd88471fc8f39400d8ffeb005cf48c | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |
| 9 | 2019-07-05 06:32:08 | 2019-07-05 | blitz | transponster | 2817.0 | Alexander_Moskalenko | 2854.0 |  | Game drawn by insufficient material | Kings Indian Defense Normal Variation 4.e4 O O | 1. d4 {[%clk 0:02:59.9]} 1... Nf6 {[%clk 0:02:... | chessdotcom | 01471743c661bc19ceb8bb3f4576e1bf59b0ee3e | 01a6231e8c2433640181e3ba529284b966c67503 | b1a6117173dc9001b21957f6dbea785430b8dd39 | 986138c42f455e507f39747c813de4c3d9ba5db4 | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | 224fccb5995aa2f6fd208d59570e06a95a438283 | 2019 |


In [24]:
df_merged_with_year.count()

1047618

## Check the Merged `games` Fact Table for Duplicate `game_id` Values and Drop Them

In [29]:
df_dupes_dropped = df_merged_with_year.dropDuplicates(['game_id'])

In [79]:
df_dupes_dropped_count = df_dupes_dropped.count()
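The `50250` rows mentioned in the introduction are the difference between the row counts before and after this step, that is, `df_merged_with_year.count() - df_dupes_dropped_count`.

Below is a plain-Python model of what `dropDuplicates(['game_id'])` does. `drop_duplicates_by_key` is a hypothetical helper. One difference from Spark: this sketch keeps the first row it sees, while Spark makes no promise about which duplicate survives.

```python
from typing import List, Tuple

def drop_duplicates_by_key(rows: List[dict], key: str) -> Tuple[List[dict], int]:
    """Keep one row per value of `key` and return (kept_rows, rows_removed).

    As with dropDuplicates(['game_id']), rows whose key is None are grouped
    together and collapse into a single row as well.
    """
    seen = set()
    kept = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            kept.append(row)
    return kept, len(rows) - len(kept)

rows = [{"game_id": "a"}, {"game_id": "b"}, {"game_id": "a"}, {"game_id": None}, {"game_id": None}]
kept, removed = drop_duplicates_by_key(rows, "game_id")
print(removed)  # 2
```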

## Set Correct Data Types for Date and Rating Columns

In [30]:
df_dupes_dropped = df_dupes_dropped.withColumn("year",df_dupes_dropped.year.cast('int'))
df_dupes_dropped = df_dupes_dropped.withColumn("game_end_time",df_dupes_dropped.game_end_time.cast('timestamp'))
df_dupes_dropped = df_dupes_dropped.withColumn("game_end_date",df_dupes_dropped.game_end_date.cast('timestamp'))
df_dupes_dropped = df_dupes_dropped.withColumn("white_rating",df_dupes_dropped.white_rating.cast('int'))
df_dupes_dropped = df_dupes_dropped.withColumn("black_rating",df_dupes_dropped.black_rating.cast('int'))


### Create and Write Cleaned `openings` Dim Table

In [31]:
openings_dim_table = df_dupes_dropped.select(F.col("opening_id").alias("id"),"opening").distinct()
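Each dim table is the distinct set of `(id, value)` pairs taken from the fact data. Because each id is a hash of its value, every id should appear exactly once. A null value, such as a game with no recorded `opening`, would show up as a null id, because Spark's `sha1` returns null for null input.

Below is a minimal plain-Python sketch of that build-and-check step. `build_dim` is a hypothetical helper, and the ids in the sample are made up.

```python
from typing import Iterable, List, Optional, Tuple

Pair = Tuple[Optional[str], Optional[str]]

def build_dim(pairs: Iterable[Pair]) -> List[Pair]:
    """Distinct (id, value) pairs, like select(id, value).distinct(), plus key checks."""
    dim = list(dict.fromkeys(pairs))  # drops repeated pairs, keeps first-seen order
    ids = [dim_id for dim_id, _ in dim]
    if None in ids:
        raise ValueError("null id found: a null source value hashes to a null id")
    if len(ids) != len(set(ids)):
        raise ValueError("an id maps to more than one value")
    return dim

# Made-up ids standing in for sha1(opening) values.
rows = [("h1", "Pirc Defense 2.Nc3 Nf6"), ("h2", "Slav Defense Modern Line"), ("h1", "Pirc Defense 2.Nc3 Nf6")]
print(build_dim(rows))
# [('h1', 'Pirc Defense 2.Nc3 Nf6'), ('h2', 'Slav Defense Modern Line')]
```

The same checks apply to the `time_class` and `platform` dim tables.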


In [None]:
#openings_dim_table.limit(10).toPandas()

In [None]:
openings_dim_table.write.mode('append').parquet("s3a://" + output_data + "/dim/openings")

### Create and Write Cleaned `players` Dim Table

In [None]:
white_players_table = df_dupes_dropped.select("white_id","white_username").distinct()

In [None]:
black_players_table = df_dupes_dropped.select("black_id","black_username").distinct()

In [None]:
white_black_players_combined = white_players_table.union(black_players_table)

In [None]:
players_dim_table = white_black_players_combined.select(F.col("white_id").alias("id"),F.col("white_username").alias("username")).dropDuplicates(['id'])
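`union` keeps the column names of the first DataFrame. That is why the combined table is selected via `white_id` and `white_username`, even though it now also holds the black players. `union` also behaves like SQL `UNION ALL` and does not deduplicate, which is why `dropDuplicates(['id'])` follows.

Below is a plain-Python model of those steps, using a hypothetical helper and made-up ids.

```python
from typing import Dict, List, Tuple

def build_players(white: List[Tuple[str, str]], black: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Union white and black (id, username) pairs and keep one row per id."""
    combined = white + black  # like DataFrame.union: rows stacked, no dedupe
    players: Dict[str, str] = {}
    for player_id, username in combined:
        players.setdefault(player_id, username)  # like dropDuplicates(['id'])
    return sorted(players.items())

white = [("id_a", "AnthonyWirig"), ("id_b", "Byniolus")]
black = [("id_b", "Byniolus"), ("id_c", "Firouzja2003")]
print(build_players(white, black))
# [('id_a', 'AnthonyWirig'), ('id_b', 'Byniolus'), ('id_c', 'Firouzja2003')]
```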

In [None]:
#players_dim_table.limit(10).toPandas()

In [None]:
players_dim_table.write.mode('append').parquet("s3a://" + output_data + "/dim/players")

### Create and Write Cleaned `time_class` Dim Table

In [None]:
time_class_table = df_dupes_dropped.select(F.col("time_class_id").alias("id"), "time_class").distinct()


In [122]:
#time_class_table.limit(10).toPandas()

|   | time_class_id | time_class |
|---|---|---|
| 0 | 7bff1b790fcdfa5016c20a07a145631da3fe3cfa | ultraBullet |
| 1 | f915e10481634c0a44492699b2b8a3657c334106 | correspondence |
| 2 | dc94ac81aa982e5382b814d4d883bfdb2ed62ddf | rapid |
| 3 | 2fe14b9b993ea6eb953f8129c7a1edede9792b77 | daily |
| 4 | 4a19573b7e72b7249d2271839c762ffe1f0452f3 | classical |
| 5 | 472da2b94e9fa87badd16a55e1eaec4f53ffc52a | bullet |
| 6 | 6d199aca996a9a8bff542e2bc10e3f0edc62cd07 | blitz |


In [None]:
time_class_table.write.mode('append').parquet("s3a://" + output_data + "/dim/time_class")

### Create and Write Cleaned `platform` Dim Table

In [None]:
platform_table = df_dupes_dropped.select(F.col("platform_id").alias("id"), "platform").distinct()


In [None]:
#platform_table.limit(10).toPandas()

In [None]:
platform_table.write.mode('append').parquet("s3a://" + output_data + "/dim/platform")

## Run Data Quality Checks on the `games` Fact Table

### Check for nulls in the `game_id` column

In [46]:
df_final_games_table = df_dupes_dropped.select("game_id","year","game_end_time","game_end_date", "time_class_id", "white_id", "white_rating","black_id","black_rating","winner","termination","opening_id","moves","platform_id")

In [47]:
final_games_table_nulls = df_final_games_table.where(F.col("game_id").isNull())

In [53]:
final_games_table_nulls_pd = final_games_table_nulls.toPandas()

In [66]:
if final_games_table_nulls_pd.empty:
    print("Test Passed: No Null Values Found")
else:
    print(final_games_table_nulls_pd)
    raise AssertionError("Null values were found in the `game_id` column.")

|   | game_id | year | game_end_time | game_end_date | time_class_id | white_id | white_rating | black_id | black_rating | winner | termination | opening_id | moves | platform_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | None | 2020 | 2020-07-20 02:23:50 | 2020-07-20 | 7bff1b790fcdfa5016c20a07a145631da3fe3cfa | 29cd8be4cbb4f1cc0a5a999f56052f8189458e55 | 2367 | None | NaN | black | outoftime | e708ca8d764a90c6eb30072b0587b103b188fde3 | Nf3 d6 e3 Nh6 Nc3 Bd7 d3 Nc6 Be2 e6 O-O d5 d4 ... | e14d455ef2680fa21e1299985922a77b8cd28eaf |

AssertionError: Null values were found in the `game_id` column.

### Check that all values in the `game_id` column are distinct (no dupes)

In [64]:
final_games_table_dupes = df_final_games_table.select(F.countDistinct("game_id"))

In [65]:
final_games_table_dupes_pd = final_games_table_dupes.toPandas()

In [75]:
game_id_distinct = final_games_table_dupes_pd.set_axis(["game_id_distinct"],axis=1)

In [101]:
# We set `df_dupes_dropped_count` above, after merging the Chess.com and Lichess datasets
# and dropping duplicates via:
#
# > df_dupes_dropped = df_merged_with_year.dropDuplicates(['game_id'])
#
# This data quality check compares the number of distinct values in the `game_id` column
# with that count. If they differ, there may be other issues with the data in this column.

if df_dupes_dropped_count == game_id_distinct.iloc[0, 0]:
    print("Test Passed: No Duplicate Values Found")
else:
    raise AssertionError("Current distinct value count in the `game_id` column does not match the `df_dupes_dropped_count` that we recorded earlier.")

AssertionError: Current distinct values in the `game_id` column do not match the df_dupes_dropped_count` that we recorded earlier.
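This mismatch is consistent with the null `game_id` found by the previous check. Two different counting rules are involved:

* `dropDuplicates` keeps one row for the null key, so that row is included in `df_dupes_dropped_count`.
* `countDistinct`, like SQL `COUNT(DISTINCT ...)`, ignores nulls.

Below is a plain-Python model of the two counts, using hypothetical helpers and made-up keys.

```python
from typing import List, Optional

def dedup_count(keys: List[Optional[str]]) -> int:
    """Rows left after dropDuplicates on this key: one per distinct value, None included."""
    return len(set(keys))

def count_distinct(keys: List[Optional[str]]) -> int:
    """countDistinct semantics: nulls are ignored."""
    return len({k for k in keys if k is not None})

keys = ["g1", "g2", "g2", None]
print(dedup_count(keys), count_distinct(keys))  # 3 2
```

While any null `game_id` remains, the two counts should differ by exactly one, however many null rows there were before deduplication.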

## Write Cleaned and Data Quality-checked `games` Fact Table

**Warning: depending on your system / Spark cluster resources, this write can take a while!**

In [None]:
df_final_games_table.write.mode('append').partitionBy("year","time_class_id").parquet("s3a://" + output_data + "/fact/game")
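`partitionBy("year", "time_class_id")` writes Hive-style `column=value` subdirectories under the output path. The partition columns are encoded in those directory names rather than stored inside the Parquet files. Readers that filter on `year` or `time_class_id` can then skip whole directories.

Because the write mode is `'append'`, re-running the notebook adds new files next to the existing ones instead of replacing them.

Below is a small plain-Python sketch of the directory one row lands in. The helper is hypothetical and the base path is a placeholder.

```python
def partition_path(base: str, year: int, time_class_id: str) -> str:
    """Directory a row lands in when written with partitionBy("year", "time_class_id")."""
    return f"{base.rstrip('/')}/year={year}/time_class_id={time_class_id}"

print(partition_path("s3a://my-bucket/fact/game", 2019, "472da2b94e9fa87badd16a55e1eaec4f53ffc52a"))
# s3a://my-bucket/fact/game/year=2019/time_class_id=472da2b94e9fa87badd16a55e1eaec4f53ffc52a
```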