# Slowly Changing Dimensions in Delta Lake
--notebook tested on my local Spark 3.0 installation, as well as in Databricks Community Edition

SCDs (Slowly Changing Dimensions) are fairly easy to implement with Spark 3.0 and Delta Lake.
In the traditional data warehousing world, where everything is stored in SQL tables, a Type 2 SCD used to look like the example below (from Kimball's [Data Warehouse Toolkit](https://www.bookdepository.com/The-Data-Warehouse-Toolkit-Ralph-Kimball/9781118530801?redirected=true&utm_medium=Google&utm_campaign=Base1&utm_source=GR&utm_content=The-Data-Warehouse-Toolkit&selectCurrency=EUR&w=AFFMAU9SYY661SA8V9F5)):

![SCD2 Example](img/scd2_ex.png)

Here we have a **Products** dimension table which stores data from one or more source systems. As this is a Type 2 SCD, every time a record is updated in the source, a new record holding the updated version of the source record is added to the dimension table. The table therefore holds several records for the same source entity, each showing the state of the source record at a different point in time, and this is how historical information is retained. The primary key of the record in the source system (e.g. our ERP) is the **SKU** column (hence the NK, natural key, designation). The extra columns we have in our SCD 2 Product Dim are the following:
- **Product Key**: this is the PK (primary key) in the DIM Product table itself. As we keep multiple rows for each original record in the source system, one per version, several records can share the same NK but each has a different PK. This PK has no intrinsic meaning, hence it's also called a surrogate key, and in most cases it is an auto-increment field.
- **Row Effective Date**: this shows when this particular record (so this version of the source record) was loaded from the source system. The first time we load the dimension table, it is customary to use a date in the distant past, like 1/1/1900.
- **Row Expiration Date**: this shows when the record/version stops being active. So every time an SCD2 attribute is changed in the source system, the expiration date of the currently active record gets updated to the date (or datetime) of the current loading batch, and another record is added with an Effective Date equal to the same batch load date and an Expiration Date of something like 12/31/9999, to indicate it's the currently active record.
- **Current Row Indicator**: this is a flag that shows if the record is active or has expired. As an expiration date of 12/31/9999 also shows that the record is active, this field is sometimes omitted. It could also be used to indicate that a record has been deleted from the source, or we could use another field for that.
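
The mechanics described in the list above can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: the function name `apply_scd2_change`, the column names, and the sample SKU and department values are made up for the example and are not taken from Kimball's figure.

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)  # marks the currently active version


def apply_scd2_change(dim_rows, sku, new_attrs, load_date):
    """Expire the active row for `sku` and append a new version of it.

    Hypothetical helper: a real dimension load would also handle brand-new
    SKUs and unchanged records.
    """
    # Surrogate key: simply one more than the largest key already present.
    next_key = max((row["product_key"] for row in dim_rows), default=0) + 1

    # Close the currently active version of this natural key.
    for row in dim_rows:
        if row["sku"] == sku and row["row_expiration_date"] == HIGH_DATE:
            row["row_expiration_date"] = load_date
            row["current_row_indicator"] = "Expired"

    # Add the new version, effective from the batch load date.
    dim_rows.append({
        "product_key": next_key,
        "sku": sku,
        **new_attrs,
        "row_effective_date": load_date,
        "row_expiration_date": HIGH_DATE,
        "current_row_indicator": "Current",
    })
    return dim_rows


# Made-up sample data: one product, loaded in the initial batch.
dim_product = [{
    "product_key": 1,
    "sku": "SKU-001",
    "department": "Toys",
    "row_effective_date": date(1900, 1, 1),
    "row_expiration_date": HIGH_DATE,
    "current_row_indicator": "Current",
}]

# The product moves to another department in the batch of 31 Aug 2021.
apply_scd2_change(dim_product, "SKU-001", {"department": "Games"}, date(2021, 8, 31))

for row in dim_product:
    print(row)
```

After the call, the dimension holds two rows with the same natural key: the original one, now expired on the load date, and the new active one with its own surrogate key.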

We can do the same thing in Delta Lake, although, as we'll see later, not all of it is necessary. To get started, let's load a sample file, which will play the role of the source system. Please note that if you have installed Spark 3.0 and a pyspark-enabled Jupyter on your laptop and want to try this there, you should launch it with something like
> pyspark --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

as per the [documentation](https://docs.delta.io/latest/delta-batch.html#-sql-support) and the guys who wrote [Learning Spark 2nd ed.](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/), in order to have the Delta Lake and SQL capabilities available in your Jupyter session. For Databricks Community ed. just import the notebook and upload the csv files and you're set.

In [57]:
#define source file
sourceFile = "files/customers1.csv"

#for Databricks Community Ed. use the below instead, after you've uploaded the file in an appropriate folder there
#sourceFile =  "dbfs:/FileStore/shared_uploads/youremail@yourprovider.com/customers1.csv" 

# Configure Delta Lake table path
deltaPath = "/tmp/dim_customer"

#read file into spark data frame
df1a = (spark.read.format("csv")
.option("header", "true")
.load(sourceFile))

#show contents
df1a.show()

+---+------------------+--------+
| ID|              Name|    City|
+---+------------------+--------+
|  1| John Papadopoulos|  Athens|
|  2|   Matt Protopapas|  Athens|
|  3|  Michael Georgiou|Salonica|
+---+------------------+--------+



So we have a simple file showing our customers and the city they currently live in. The primary key which uniquely identifies a customer in our source system (the file) is **ID**. We now want to store this data in our Delta Lake table **dimCustomer**, which will have an SCD Type 2 format. So we would like to have an auto-increment primary key along with the effective and expiration dates (we skip the 'current flag' for now). Given that auto-increments are not supported, as the data might reside in different partitions (and in fact on different machines), we'll use monotonically increasing IDs instead.
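
Why such IDs are unique and increasing but not consecutive can be shown with a small plain-Python sketch. It assumes the layout that the Spark documentation describes for the current implementation of `monotonically_increasing_id` (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The helper `simulated_monotonic_id` is hypothetical and only mimics that layout; it is not part of Spark.

```python
RECORD_BITS = 33  # lower 33 bits hold the row number within a partition


def simulated_monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic the documented bit layout: partition ID above, row number below."""
    return (partition_id << RECORD_BITS) + row_in_partition


# Two partitions with three rows each.
ids = [simulated_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With a tiny CSV that fits in a single partition the keys happen to come out as 0, 1, 2, but as soon as the data spans several partitions there will be large gaps between them.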

In [58]:
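#import only the functions used below (a wildcard import would shadow Python built-ins such as max and sum)
from pyspark.sql.functions import lit, monotonically_increasing_id, to_date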


df1b = (df1a
      .withColumn("CustomerKey", monotonically_increasing_id())
      .withColumn("ValidFrom",to_date(lit("01/01/1900"), "MM/dd/yyyy"))
      .withColumn("ValidTo",to_date(lit("12/31/9999"),"MM/dd/yyyy")))

Traditionally, the record with PK=0 in the dimension is a 'blank' record which is used to show the absence of a dimension entity. For example, if the customer was not recorded for a sale, we would need to map that sale fact onto the 'blank' record in the dimension. It's not that difficult to add such a record to our data frame, but we'll skip it for now. So, we're ready to write the first batch of the customer dimension into our table.

In [59]:
df1b.write.format("delta").saveAsTable('dimCustomer')
#spark.sql("CREATE TABLE dimCustomer USING DELTA LOCATION 'home/matt/Code/modern-examples/pyspark'")
spark.sql("Select * from dimCustomer").show()

+---+------------------+--------+-----------+----------+----------+
| ID|              Name|    City|CustomerKey| ValidFrom|   ValidTo|
+---+------------------+--------+-----------+----------+----------+
|  1| John Papadopoulos|  Athens|          0|1900-01-01|9999-12-31|
|  2|   Matt Protopapas|  Athens|          1|1900-01-01|9999-12-31|
|  3|  Michael Georgiou|Salonica|          2|1900-01-01|9999-12-31|
+---+------------------+--------+-----------+----------+----------+



Now we can load the second batch. We'll assume the source table evolved to the state described in the file below, with one record added and one updated.

In [60]:
#define source file
sourceFile = "files/customers2.csv"

#for Databricks Community Ed. use the below instead, after you've uploaded the file in an appropriate folder there
#sourceFile =  "dbfs:/FileStore/shared_uploads/youremail@yourprovider.com/customers2.csv" 

#read file into spark data frame
df2a = (spark.read.format("csv")
.option("header", "true")
.load(sourceFile))

#show contents
df2a.show()

+---+------------------+--------+---------------------------+
| ID|              Name|    City|LastUpdatedCreatedTimestamp|
+---+------------------+--------+---------------------------+
|  1| John Papadopoulos|Salonica|        2021/08/29 00:57.12|
|  2|   Matt Protopapas|  Athens|        2015/06/15 00:00.00|
|  3|  Michael Georgiou|Salonica|        2019/04/12 12:55.34|
|  4|       John Bishop|  Patras|        2021/08/30 10:00.00|
+---+------------------+--------+---------------------------+



The extra column shows when the record was last updated (or inserted) and will be used later. John Papadopoulos moved to Salonica, and John Bishop from Patras was added. We now want to add the ValidFrom and ValidTo columns as before, assuming a daily batch (which supposedly runs on 31 Aug 2021), and also to add surrogate keys for the new records. Note that 2 new records should be added: one for the new John Bishop record and another for the updated John Papadopoulos record. As the data is distributed across the cluster, we need to get the max surrogate key already stored in the table and ensure all new surrogate keys are greater than it. Let's see how it can be done.

In [73]:
#get max SK as a bigint into maxSK variable
maxSK=spark.sql("SELECT max(CustomerKey) as maxSK FROM dimCustomer").collect()[0]["maxSK"]

#Date the daily ETL is run
ETLRefDate=lit("08/31/2021")

#create data frame of new source table state
df2b = (df2a
      .withColumn("CustomerKey", monotonically_increasing_id()+maxSK+1)
      .withColumn("ValidFrom",to_date(ETLRefDate, "MM/dd/yyyy"))
      .withColumn("ValidTo",to_date(lit("12/31/9999"),"MM/dd/yyyy")))

####Probably the below is not needed
#create a delta store and a temp view on top of it
deltaPath2 = "/tmp/updates"
df2b.write.format("delta").save(deltaPath2)
spark.read.format("delta").load(deltaPath2).createOrReplaceTempView("updates")

#have a look at the data in the updates view
spark.sql("SELECT * FROM updates").show()

+---+------------------+--------+---------------------------+-----------+----------+----------+
| ID|              Name|    City|LastUpdatedCreatedTimestamp|CustomerKey| ValidFrom|   ValidTo|
+---+------------------+--------+---------------------------+-----------+----------+----------+
|  1| John Papadopoulos|Salonica|        2021/08/29 00:57.12|          3|2021-08-31|9999-12-31|
|  2|   Matt Protopapas|  Athens|        2015/06/15 00:00.00|          4|2021-08-31|9999-12-31|
|  3|  Michael Georgiou|Salonica|        2019/04/12 12:55.34|          5|2021-08-31|9999-12-31|
|  4|       John Bishop|  Patras|        2021/08/30 10:00.00|          6|2021-08-31|9999-12-31|
+---+------------------+--------+---------------------------+-----------+----------+----------+



In [83]:
#and now for the SCD2 merge

spark.sql( """
  MERGE INTO dimCustomer USING (
  
    select updates.ID as mergeKey, updates.* 
    FROM updates

    UNION ALL

    SELECT NULL as mergeKey, updates.*
    FROM updates JOIN dimCustomer
    on dimCustomer.ID=updates.ID
    WHERE dimCustomer.ValidTo = '9999-12-31' AND (dimCustomer.Name<>updates.Name OR dimCustomer.City <> updates.City)
  ) stg_upd
  ON dimCustomer.ID = stg_upd.mergeKey
  
  
  WHEN MATCHED AND dimCustomer.ValidTo = '9999-12-31'  AND (dimCustomer.Name<>stg_upd.Name OR dimCustomer.City <> stg_upd.City)
   THEN UPDATE SET dimCustomer.ValidTo = stg_upd.ValidFrom    
  WHEN NOT MATCHED
    THEN INSERT(ID,Name,City,CustomerKey,ValidFrom,ValidTo) VALUES (ID,Name,City,CustomerKey,ValidFrom,ValidTo)
""")

spark.sql("select * from dimCustomer").show()

+---+------------------+--------+-----------+----------+----------+
| ID|              Name|    City|CustomerKey| ValidFrom|   ValidTo|
+---+------------------+--------+-----------+----------+----------+
|  1| John Papadopoulos|  Athens|          0|1900-01-01|2021-08-31|
|  2|   Matt Protopapas|  Athens|          1|1900-01-01|9999-12-31|
|  3|  Michael Georgiou|Salonica|          2|1900-01-01|9999-12-31|
|  1| John Papadopoulos|Salonica|          3|2021-08-31|9999-12-31|
|  4|       John Bishop|  Patras|          4|2021-08-31|9999-12-31|
+---+------------------+--------+-----------+----------+----------+



The strange MERGE is due to Spark SQL not supporting INSERT actions in a 'WHEN MATCHED' clause. So the WHEN MATCHED clause updates the ValidTo of the current records that have now changed, and the WHEN NOT MATCHED clause inserts both the new records and the new versions of the changed records that are already in dimCustomer. The latter reach the WHEN NOT MATCHED clause because their mergeKey is NULL and so never matches an ID ([check this for more info](https://docs.databricks.com/_static/notebooks/merge-in-scd-type-2.html)). To be perfectly honest, I never got this merge to run on my laptop, as the stars, jars and config options would not align no matter how hard I tried. But then again, that's why we have the cloud...
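
To make the mergeKey trick easier to follow, here is a plain-Python simulation of what the MERGE does with the data shown above. It is a sketch of the semantics only, not of how Delta Lake executes a merge. The surrogate key column is left out for brevity, and the helper `is_changed` is a hypothetical name introduced for the example.

```python
HIGH_DATE = "9999-12-31"

# Target table after the first load.
dim = [
    {"ID": 1, "Name": "John Papadopoulos", "City": "Athens", "ValidFrom": "1900-01-01", "ValidTo": HIGH_DATE},
    {"ID": 2, "Name": "Matt Protopapas", "City": "Athens", "ValidFrom": "1900-01-01", "ValidTo": HIGH_DATE},
    {"ID": 3, "Name": "Michael Georgiou", "City": "Salonica", "ValidFrom": "1900-01-01", "ValidTo": HIGH_DATE},
]

# Second batch, stamped with the ETL reference date.
updates = [
    {"ID": 1, "Name": "John Papadopoulos", "City": "Salonica", "ValidFrom": "2021-08-31", "ValidTo": HIGH_DATE},
    {"ID": 2, "Name": "Matt Protopapas", "City": "Athens", "ValidFrom": "2021-08-31", "ValidTo": HIGH_DATE},
    {"ID": 3, "Name": "Michael Georgiou", "City": "Salonica", "ValidFrom": "2021-08-31", "ValidTo": HIGH_DATE},
    {"ID": 4, "Name": "John Bishop", "City": "Patras", "ValidFrom": "2021-08-31", "ValidTo": HIGH_DATE},
]


def is_changed(dim_row, upd_row):
    """True when a tracked attribute differs between the two versions."""
    return dim_row["Name"] != upd_row["Name"] or dim_row["City"] != upd_row["City"]


# Source of the MERGE: every update keyed by its ID (first SELECT) ...
staged = [{"mergeKey": u["ID"], **u} for u in updates]
# ... UNION ALL a NULL-keyed copy of each update that changes a current row.
staged += [
    {"mergeKey": None, **u}
    for u in updates
    for d in dim
    if d["ID"] == u["ID"] and d["ValidTo"] == HIGH_DATE and is_changed(d, u)
]

to_insert = []
for s in staged:
    matches = [d for d in dim if d["ID"] == s["mergeKey"]]  # None never matches
    if not matches:
        # WHEN NOT MATCHED: new customers and NULL-keyed new versions.
        to_insert.append({k: v for k, v in s.items() if k != "mergeKey"})
        continue
    for d in matches:
        # WHEN MATCHED AND current AND changed: expire the old version.
        if d["ValidTo"] == HIGH_DATE and is_changed(d, s):
            d["ValidTo"] = s["ValidFrom"]

dim.extend(to_insert)
for row in dim:
    print(row)
```

Customers 2 and 3 match on ID but fail the extra condition, so nothing happens to them. Customer 1 is expired through its ID-keyed copy and re-inserted through its NULL-keyed copy, and customer 4 is inserted because its ID is not in the target yet.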

Also note that getting the max SK this way is quite sub-optimal, especially since in Spark (and big data workloads in general) mini-batches are preferred over one big daily batch. If we needed to get the max SK across the cluster each time we ran a mini-batch (which could be a matter of seconds), that lookup would take most of the time and resources of the mini-batch, which wouldn't be very efficient.

In reality we don't need the surrogate keys at all, as we could join the facts to the dimension using the source system PK (in conjunction with a unique identifier for the source system if many were in scope) and the valid from/to columns. To do that within a mini-batch context, we'd rather have very accurate valid from/to values, perhaps to the millisecond rather than the date. After all, this was common practice even in the old days of daily batches. Hence the use of the LastUpdatedCreatedTimestamp column. Note that this was not needed in the initial load (csv 1), as there the columns were set to preset values.
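
A join on the source PK plus the validity window can be sketched in plain Python as follows. The two dimension rows use the timestamp of John Papadopoulos's change from the second CSV. The sale timestamps, the helper name `customer_version_at`, and the choice of a half-open interval (ValidFrom inclusive, ValidTo exclusive) are assumptions made for the illustration.

```python
from datetime import datetime

# Two versions of customer 1, split at the source-system change timestamp.
dim_customer = [
    {"ID": 1, "City": "Athens",
     "ValidFrom": datetime(1900, 1, 1), "ValidTo": datetime(2021, 8, 29, 0, 57, 12)},
    {"ID": 1, "City": "Salonica",
     "ValidFrom": datetime(2021, 8, 29, 0, 57, 12), "ValidTo": datetime(9999, 12, 31)},
]


def customer_version_at(dim_rows, customer_id, event_ts):
    """Return the dimension row that was valid when the event happened.

    Assumes half-open windows: ValidFrom <= event_ts < ValidTo.
    """
    for row in dim_rows:
        if row["ID"] == customer_id and row["ValidFrom"] <= event_ts < row["ValidTo"]:
            return row
    return None  # no version covers this event (e.g. unknown customer)


# Hypothetical sale events for customer 1, before and after the move.
sale_before = datetime(2021, 8, 28, 12, 0, 0)
sale_after = datetime(2021, 8, 30, 9, 30, 0)

print(customer_version_at(dim_customer, 1, sale_before)["City"])
print(customer_version_at(dim_customer, 1, sale_after)["City"])
```

The half-open window matters because the expired version's ValidTo equals the new version's ValidFrom, so an event stamped exactly at the change would otherwise match both rows.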

One might be tempted not to use those columns either, as Delta Lake provides full history, along with a set of timestamps showing when each record was inserted or updated in the Delta table. However, those timestamps reflect the time the record was upserted into the Spark data store and not the time it changed in the source system, so we might get discrepancies: fact tables would be joined to dimension versions that differ from the ones in effect when the actual event happened in the source.

So we'll proceed with creating another DIM_Customer table with the better structure and add the 'is_current' flag along the way.

In [36]:
#import only the functions used in this cell
from pyspark.sql.functions import col, lit, to_timestamp

#define source csv 1
sourceFile = "files/customers1.csv"

#for Databricks Community Ed. use the below instead, after you've uploaded the file in an appropriate folder there
#sourceFile =  "dbfs:/FileStore/shared_uploads/youremail@yourprovider.com/customers1.csv" 

#read csv 1 file into delta table
(spark.read.format("csv")
    .option("header", "true")
    .load(sourceFile)
    .withColumn("ValidFrom",to_timestamp(lit("01/01/1900"), "MM/dd/yyyy"))
    .withColumn("ValidTo",to_timestamp(lit("12/31/9999"),"MM/dd/yyyy"))
    .withColumn("is_current",lit(True))
    .write.format("delta").saveAsTable('DIM_Customer'))


#define source csv 2
sourceFile2 = "files/customers2.csv"

#for Databricks Community Ed. use the below instead, after you've uploaded the file in an appropriate folder there
#sourceFile2 =  "dbfs:/FileStore/shared_uploads/youremail@yourprovider.com/customers2.csv" 

#create temp view for csv 2
(spark.read.format("csv")
    .option("header", "true")
    .load(sourceFile2)
    .withColumn("LastUpdatedCreatedTimestamp",to_timestamp(col("LastUpdatedCreatedTimestamp"),"yyyy/MM/dd HH:mm.ss"))
    .withColumn("ValidTo",to_timestamp(lit("12/31/9999"),"MM/dd/yyyy"))
    .withColumn("is_current",lit(True))
    .createOrReplaceTempView("updates_view"))

#merge
spark.sql( """
  MERGE INTO DIM_CUSTOMER USING (
  
    select updates_view.ID as mergeKey, updates_view.* 
    FROM updates_view

    UNION ALL

    SELECT NULL as mergeKey, updates_view.*
    FROM updates_view JOIN DIM_Customer
    on DIM_Customer.ID=updates_view.ID
    WHERE DIM_Customer.is_current = true AND (DIM_Customer.Name<>updates_view.Name OR DIM_Customer.City <> updates_view.City)
  ) stg_upd
  ON DIM_Customer.ID = stg_upd.mergeKey
  
  
  WHEN MATCHED AND DIM_Customer.is_current = true  AND (DIM_Customer.Name<>stg_upd.Name OR DIM_Customer.City <> stg_upd.City)
   THEN UPDATE SET DIM_Customer.is_current = false, DIM_Customer.ValidTo = stg_upd.LastUpdatedCreatedTimestamp
  WHEN NOT MATCHED
    THEN INSERT(ID,Name,City,ValidFrom,ValidTo,is_current) VALUES (ID,Name,City,LastUpdatedCreatedTimestamp,ValidTo,is_current)
""")
