## Slowly Changing Dimension Type 2 

Use the merge functionality to upsert records into a dimension table (in Delta Lake format).

In [None]:
# abfss URI of the folder holding the initial customer CSV input.
inputPath = "abfss://container1@553701adls.dfs.core.windows.net/inputfile"

Read the CSV data into a DataFrame. We supply the schema explicitly and set the header option to true, because the file has a header row.

In [None]:
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

# Explicit schema for the customer CSV; both columns are declared nullable.
inputSchema = StructType(
    [
        StructField("CustomerID", IntegerType(), True),
        StructField("CustomerAddress", StringType(), True),
    ]
)

# Load the headered CSV at inputPath using the schema above.
rawDataDF = spark.read.csv(inputPath, schema=inputSchema, header="true")

In [None]:
# Render the freshly loaded customer rows in the notebook output.
display(rawDataDF)

Write the original file to the `deltafile` folder.

In [None]:
# abfss URI of the folder where the Delta-format dimension data is stored.
DataPath = "abfss://container1@553701adls.dfs.core.windows.net/deltafile"

In [None]:
# Persist the raw customer rows as a Delta table at DataPath,
# replacing whatever is already stored there.
(
    rawDataDF.write
    .format("delta")
    .mode("overwrite")
    .save(DataPath)
)

Create the SCD Type 2 dimension by adding the columns Valid_From and Valid_To and a surrogate key (CustomerSK).

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Add the SCD Type 2 bookkeeping columns to the raw customer rows.
updatedDF = (
    rawDataDF
    # Far-future "open ended" end date marking a row as the current version.
    # to_timestamp already yields a timestamp, so no extra cast is needed; the
    # pattern uses three fraction digits to match the literal exactly.
    .withColumn('Valid_To', to_timestamp(lit('12-24-2099 12:01:19.000'), 'MM-dd-yyyy HH:mm:ss.SSS'))
    # The version becomes valid at load time (current_timestamp is a timestamp).
    .withColumn('Valid_From', current_timestamp())
    # Surrogate key 1..n in CustomerID order. The window has no partitionBy,
    # so Spark numbers all rows in a single partition -- acceptable for a
    # small dimension, but a bottleneck for large inputs.
    .withColumn('CustomerSK', row_number().over(Window.orderBy('CustomerID')))
)


  

In [None]:
# Replace the Delta data at DataPath with the SCD-enabled rows; mergeSchema
# lets the write add the new Valid_From / Valid_To / CustomerSK columns.
(
    updatedDF.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(DataPath)
)

In [None]:
# (Re)register the table customer_data on top of the Delta files at DataPath.
dropTableSql = """
  DROP TABLE IF EXISTS customer_data
"""
createTableSql = """
  CREATE TABLE customer_data
  USING DELTA
  LOCATION '{}'
""".format(DataPath)

spark.sql(dropTableSql)
spark.sql(createTableSql)

In [None]:
%%sql
-- Show the column names and types of the customer_data table.
DESCRIBE customer_data

In [None]:
%%sql
-- Inspect the dimension table right after the initial load.
SELECT *
FROM customer_data

In [None]:
# abfss URI of the folder holding the incoming (changed / new) customer CSV.
newFile = "abfss://container1@553701adls.dfs.core.windows.net/newfile"

# Load it with the same schema and header handling as the initial file.
newDataDF = spark.read.csv(newFile, schema=inputSchema, header="true")

In [None]:
# Render the incoming customer rows in the notebook output.
display(newDataDF)

In [None]:
# NOTE(review): this only builds a DataFrameWriter configured for a Delta
# overwrite -- no .save()/.saveAsTable() is called, so nothing is written.
# Confirm whether a write was intended here and, if so, to which path.
newDataDF.write.mode("overwrite").format("delta")

In [None]:
# Expose the incoming rows to SQL cells under the name new_customer_data.
newDataDF.createOrReplaceTempView("new_customer_data")

In [None]:
%%sql
-- Inspect the incoming rows through the temporary view.
SELECT *
FROM new_customer_data

Find the customers that already exist in the original table.

In [None]:
%%sql
-- Preview of the rows to stage for the SCD Type 2 merge.
-- Branch 1: every incoming row as a new version (no surrogate key yet,
--           valid from now, no end date).
SELECT
  null AS CustomerSK, CustomerID, CustomerAddress,
  current_timestamp() AS Valid_From, null AS Valid_To
FROM new_customer_data
UNION ALL
-- Branch 2: the currently open version of each customer that appears in the
--           incoming data. Versions whose Valid_To is already in the past are
--           closed history and are deliberately left out.
SELECT
  CustomerSK, CustomerID, CustomerAddress, Valid_From, Valid_To
FROM customer_data
WHERE CustomerID IN (SELECT CustomerID FROM new_customer_data)
  AND (Valid_To IS NULL OR Valid_To > current_timestamp())



In [None]:
%%sql
-- SCD Type 2 upsert into customer_data:
--   * incoming rows are inserted as new versions;
--   * the open version of each affected customer is closed by setting
--     Valid_To to the merge time.
MERGE INTO customer_data
USING (
  -- New versions. Their surrogate key is NULL, which can never satisfy the
  -- ON clause, so these rows always take the NOT MATCHED (insert) path.
  SELECT
    null AS CustomerSK, CustomerID, CustomerAddress,
    current_timestamp() AS Valid_From, null AS Valid_To
  FROM new_customer_data
  UNION ALL
  -- Existing versions to close. Only open versions are staged: Valid_To is
  -- either NULL (rows inserted by this merge) or a future date (the sentinel
  -- written by the initial load). Without this filter, already-expired
  -- versions would match too and have their Valid_To overwritten on every
  -- subsequent merge, destroying the recorded history.
  SELECT
    CustomerSK, CustomerID, CustomerAddress, Valid_From, Valid_To
  FROM customer_data
  WHERE CustomerID IN (SELECT CustomerID FROM new_customer_data)
    AND (Valid_To IS NULL OR Valid_To > current_timestamp())
) ChangeRows

-- merge based on the surrogate key
ON customer_data.CustomerSK = ChangeRows.CustomerSK

-- if there is a match, close the existing version
WHEN MATCHED THEN
  UPDATE SET customer_data.Valid_To = current_timestamp()

-- if there is no match, insert the new version
WHEN NOT MATCHED THEN INSERT *

In [None]:
%%sql
-- Inspect the dimension table after the merge.
SELECT *
FROM customer_data

In [None]:
%%sql
-- Preview the re-numbered surrogate keys: rows are numbered in CustomerSK
-- order, with NULL keys placed last.
SELECT
  ROW_NUMBER() OVER (ORDER BY CustomerSK NULLS LAST) AS CustomerSK
FROM customer_data

In [None]:
# Load the merged dimension table as a DataFrame. Uses the `spark` session
# that the rest of the notebook relies on rather than the legacy sqlContext.
df = spark.table("customer_data")

In [None]:
# Render the merged dimension rows in the notebook output.
display(df)

In [None]:
%%sql
-- Inspect the dimension table before surrogate keys are re-assigned.
SELECT *
FROM customer_data

In [None]:
# Re-number the surrogate key over the whole table. Rows that already have a
# key sort first; rows with a NULL key sort last and receive the next numbers.
# CustomerID and Valid_From act as tie-breakers so that the NULL-keyed rows
# are numbered deterministically: without them their relative order is
# unspecified, and separate evaluations of this DataFrame (e.g. displaying it
# and then writing it) could assign different keys to the same row.
updated_sk = df.selectExpr(
    "ROW_NUMBER() OVER (ORDER BY CustomerSK NULLS LAST, CustomerID, Valid_From) AS CustomerSK",
    "CustomerID",
    "CustomerAddress",
    "Valid_From",
    "Valid_To",
)

In [None]:
# Render the rows with their re-numbered surrogate keys.
display(updated_sk)

In [None]:
# Overwrite the Delta data at DataPath with the re-keyed dimension rows.
(
    updated_sk.write
    .format("delta")
    .mode("overwrite")
    .save(DataPath)
)

In [None]:
%%sql
-- Inspect the final dimension table after surrogate keys were re-assigned.
SELECT *
FROM customer_data