## Config

In [0]:
# Make sure the demo catalog exists, then one schema per layer (bronze, silver, gold).
# Every statement uses `if not exists`, so none of them fails on an already-existing object.
spark.sql("create catalog if not exists scd_2_demo")
for v_layer in ("bronze", "silver"):
    spark.sql(f"create schema if not exists scd_2_demo.{v_layer}")
# Left as the cell's final expression so the notebook still echoes its result.
spark.sql("create schema if not exists scd_2_demo.gold")

DataFrame[]

In [0]:
# importing libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *

In [0]:
# Notebook inputs.
# v_customers_path: glob over the customer CSV files in the storage container.
v_customers_path = (
    'abfss://dbw-demo@stdaukdbwdemodevsuk001.dfs.core.windows.net'
    '/scd_2_demo/*.csv'
)
# v_timestamp: value of the `p_timestamp` notebook widget; it is used further down
# to stamp and select the rows that belong to this run.
v_timestamp = dbutils.widgets.get('p_timestamp')

## Bronze

In [0]:
# Explicit schema applied when reading the customers CSV files:
# an integer key and the company name, both declared non-nullable.
v_schema = (
    StructType()
    .add("customer_id", IntegerType(), nullable=False)
    .add("company_name", StringType(), nullable=False)
)

In [0]:
# Read the customer CSVs (header row, explicit schema) and tag every row with
# this run's timestamp in a `processed_time` column.
df_customers_bronze = (
    spark.read
    .schema(v_schema)
    .option('header', True)
    .csv(v_customers_path)
    .withColumn('processed_time', to_timestamp(lit(v_timestamp)))
)

In [0]:
# Append this run's rows to the bronze Delta table.
df_customers_bronze.write.saveAsTable(
    'scd_2_demo.bronze.customers',
    format='delta',
    mode='append',
)

## Silver

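- [Upsert into a Delta Lake table using merge (Azure Databricks)](https://learn.microsoft.com/en-us/azure/databricks/delta/merge)
- [Delta Lake Python API reference](https://docs.delta.io/latest/api/python/index.html)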

In [0]:
# Silver load: maintain a type-2 history of customers in
# scd_2_demo.silver.customers_scd_2 (columns: customer_id, company_name,
# effective_time, end_time; the active version of a customer has end_time NULL).
#
# Steps:
#   1. Take only the bronze rows stamped with this run's timestamp.
#   2. If the silver table already exists, close (set end_time on) the active
#      version of every customer whose company_name differs from the incoming row.
#   3. Insert a new active version for every incoming customer that has no active
#      version left (brand-new customers and the ones just closed in step 2).
#   4. If the silver table does not exist yet, create it from the incoming rows.
#
# The existence check is explicit rather than a bare `except:` so that a genuine
# failure (bad merge, permissions, schema mismatch, ...) surfaces instead of being
# mistaken for "first run".
v_silver_table = 'scd_2_demo.silver.customers_scd_2'
v_load_time = to_timestamp(lit(v_timestamp))

# Rows landed in bronze by this run. Exact duplicate rows are collapsed so that
# repeating a load with the same timestamp does not present the merge with
# several identical source rows for one customer.
df_customers_source = (
    spark.read.table('scd_2_demo.bronze.customers')
    .filter(col('processed_time') == v_load_time)
    .select('customer_id', 'company_name')
    .distinct()
)

if spark.catalog.tableExists(v_silver_table):
    target_delta = DeltaTable.forName(spark, v_silver_table)

    # Close the active version only when the tracked attribute actually changed;
    # `<=>` is the null-safe equality operator, so a NULL on either side still compares.
    (
        target_delta.alias('target')
        .merge(df_customers_source.alias('source'), "source.customer_id = target.customer_id")
        .whenMatchedUpdate(
            condition="target.end_time is null and not (target.company_name <=> source.company_name)",
            # a real timestamp, matching how effective_time is written
            set={"target.end_time": v_load_time},
        )
        .execute()
    )

    # Read the table again *after* the merge: customers that still have an active
    # row are unchanged and must not get another version.
    df_customers_active = (
        spark.read.table(v_silver_table)
        .filter('end_time is null')
        .select('customer_id')
    )
    df_customers_new = df_customers_source.join(df_customers_active, on='customer_id', how='left_anti')
    v_write_mode = 'append'
else:
    # First run: every incoming customer becomes an active version. The mode is
    # the writer's default, so an unexpected pre-existing table is an error
    # rather than being silently overwritten.
    df_customers_new = df_customers_source
    v_write_mode = 'errorifexists'

df_customers_silver = (
    df_customers_new
    .withColumn('effective_time', v_load_time)
    .withColumn('end_time', lit(None).cast('timestamp'))
    .select('customer_id', 'company_name', 'effective_time', 'end_time')
)

df_customers_silver.write.mode(v_write_mode).saveAsTable(v_silver_table)

## Gold

In [0]:
# Gold view of customers: keep only the rows of the silver history table whose
# end_time is NULL (the active versions) and drop the validity columns.
df_customers_gold = (
    spark.read.table('scd_2_demo.silver.customers_scd_2')
    .where('end_time is null')
    .select('customer_id', 'company_name')
)

In [0]:
# Replace the gold table with the current set of active customers.
df_customers_gold.write.saveAsTable(
    'scd_2_demo.gold.customers_active',
    mode='overwrite',
)