In [0]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from delta.tables import DeltaTable

In [0]:
# Load mode, supplied through the 'init_load_flag' notebook widget:
#   0 -> incremental run: existing rows and the max surrogate key are read from gold
#   1 -> initial load: nothing is read from gold and no key offset is applied
# Later cells branch on this value with `== 0` in one place and `== 1` in another,
# so any other value would send them down inconsistent branches; reject it early.
init_load_flag = int(dbutils.widgets.get('init_load_flag'))
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag!r}")

### Reading Data From Source

In [0]:
# Quick visual check of the silver source table before transforming it.
display(spark.read.format("delta").table("databricks_cata.silver.customers"))

In [0]:
# Full snapshot of silver customers; every later step derives from this frame.
customers_query = """
        SELECT *
        FROM databricks_cata.silver.customers
        """
df = spark.sql(customers_query)


In [0]:
# Render the raw silver snapshot.
display(df)

In [0]:
# Keep one row per customer_id so each customer maps to exactly one dimension row.
# NOTE(review): dropDuplicates keeps an arbitrary row per key. If silver can hold
# several versions of the same customer, pick the wanted one explicitly
# (e.g. latest by a timestamp column) before deduplicating.
df = df.dropDuplicates(subset=['customer_id'])

# A bare `df.limit(20)` as the last expression only shows the DataFrame repr
# (column names/types), not rows; render the preview explicitly.
df.limit(20).display()

### Fetching Existing (Old) Records From Gold

In [0]:
# Existing dimension rows that incoming customers are matched against. Columns are
# prefixed with old_ so they don't collide with the silver columns after the join.
if init_load_flag == 0:
    # Incremental run: read the surrogate keys and audit dates already in gold.
    df_old = spark.sql(
        """
        SELECT
            DimCustomerKey AS old_DimCustomerKey,
            customer_id    AS old_customer_id,
            create_date    AS old_create_date,
            update_date    AS old_update_date
        FROM databricks_cata.gold.DimCustomers
        """
    )
else:
    # Initial load: gold is not read, so build an EMPTY frame with the same columns.
    # Types are explicit instead of literal 0s (which are INT):
    # - old_customer_id reuses silver's customer_id, so the join key types match.
    # - The dates are real timestamps.
    # - BIGINT matches the long surrogate keys generated later in this notebook.
    df_old = spark.sql(
        """
        SELECT
            CAST(NULL AS BIGINT)    AS old_DimCustomerKey,
            customer_id             AS old_customer_id,
            CAST(NULL AS TIMESTAMP) AS old_create_date,
            CAST(NULL AS TIMESTAMP) AS old_update_date
        FROM databricks_cata.silver.customers
        WHERE 1 = 0
        """
    )

In [0]:
# Show the existing-record lookup (empty on an initial load).
df_old.display()

### Joining Incoming Records With Existing Records

In [0]:
# The left join keeps every incoming customer. The old_* columns are filled only
# for customers that already exist in the lookup frame.
join_condition = df.customer_id == df_old.old_customer_id
df_joined = df.join(df_old, join_condition, 'left')

In [0]:
# Inspect the joined frame before splitting it.
display(df_joined)

### Separating New vs Old Records

In [0]:
# Incoming customers with no existing surrogate key, i.e. no match in the lookup.
df_new = df_joined.where(F.col('old_DimCustomerKey').isNull())
display(df_new)

In [0]:
# Incoming customers that already carry a surrogate key from the lookup.
df_old = df_joined.where(F.col('old_DimCustomerKey').isNotNull())
display(df_old)

### Preparing df_old

In [0]:
# Shape the matched customers for the upsert:
# - Drop the lookup columns that are no longer needed.
# - Keep the existing surrogate key and original create_date under their final names.
# - Stamp update_date with the current timestamp.
df_old = (
    df_old
    .drop('old_customer_id', 'old_update_date')
    .withColumnRenamed('old_DimCustomerKey', 'DimCustomerKey')
    .withColumnRenamed('old_create_date', 'create_date')
    .withColumn('create_date', F.to_timestamp('create_date'))
    .withColumn('update_date', F.current_timestamp())
)
display(df_old)

### Preparing df_new

In [0]:
# Inspect the unmatched (new) customers before preparing them.
display(df_new)

In [0]:
# Shape the new customers:
# - Drop the old_* lookup columns from the join.
# - Set both audit columns to the current timestamp.
lookup_columns = ['old_DimCustomerKey', 'old_customer_id', 'old_create_date', 'old_update_date']
df_new = (
    df_new
    .drop(*lookup_columns)
    .withColumn('create_date', F.current_timestamp())
    .withColumn('update_date', F.current_timestamp())
)
display(df_new)

### Create Surrogate Keys From 1

In [0]:
# Assign consecutive surrogate keys 1..N to the new customers.
# monotonically_increasing_id() only guarantees unique, increasing ids. They are
# not consecutive (the partition id is encoded in the upper bits), so they do not
# start from 1 per batch, and they may change if the plan is recomputed.
# row_number() over customer_id (unique after the dedup cell) gives dense,
# deterministic keys. The cast to long keeps the column type the previous
# expression produced.
# NOTE(review): an un-partitioned window moves the new rows to one partition.
# This is presumably fine for a batch of new customers; revisit if batches grow large.
df_new = (
    df_new
    .withColumn(
        'DimCustomerKey',
        F.expr('row_number() OVER (ORDER BY customer_id)').cast('long'),
    )
)
df_new.display()

### Offsetting New Keys by the Current Max Surrogate Key

In [0]:
# Highest surrogate key already used in gold; new keys are shifted past it.
# Branches on `init_load_flag == 0` like the df_old lookup cell, so both cells
# agree on which runs read from gold.
if init_load_flag == 0:
    # COALESCE: MAX over an empty table is NULL, and adding NULL to the new keys
    # would null out every new DimCustomerKey.
    max_surrogate_key = (
        spark
        .sql(
            """
            SELECT
                COALESCE(MAX(DimCustomerKey), 0) AS max_surrogate_key
            FROM databricks_cata.gold.DimCustomers
            """
        )
        .first()['max_surrogate_key']
    )
else:
    # Initial load: gold is not consulted, so no offset is applied.
    max_surrogate_key = 0

In [0]:
# Current key offset (0 on an initial load).
print(f"{max_surrogate_key}")

In [0]:
# Shift the new keys past the highest key already in gold so they cannot collide.
df_new = df_new.withColumn(
    'DimCustomerKey',
    F.col('DimCustomerKey') + max_surrogate_key,
)
display(df_new)

### Union of df_old and df_new

In [0]:
# Stack new and existing customers. unionByName aligns columns by name, because
# the two frames add DimCustomerKey / create_date / update_date in different orders.
df_final = df_new.unionByName(df_old)

In [0]:
# Final upsert payload.
display(df_final)

### SCD Type 1

In [0]:
# SCD Type 1 write of the customer dimension.
# - Incremental run against an existing table: MERGE on the surrogate key, so
#   existing customers are updated in place and new ones are inserted.
# - Otherwise (first load, or init_load_flag == 1 while the table already exists):
#   (re)create the table. On an initial-load run the keys in df_final restart at
#   max_surrogate_key = 0 with no lookup of existing gold rows. Merging them on
#   DimCustomerKey would overwrite unrelated existing customers.
GOLD_TABLE = 'databricks_cata.gold.DimCustomers'
GOLD_PATH = 'abfss://gold@databricksstorageete.dfs.core.windows.net/DimCustomers'

if init_load_flag == 0 and spark.catalog.tableExists(GOLD_TABLE):
    # forName resolves the table's registered location, so the MERGE targets the
    # same table that tableExists() just checked, not a separately hard-coded path.
    (
        DeltaTable.forName(spark, GOLD_TABLE)
        .alias('target')
        .merge(
            source=df_final.alias('source'),
            condition='target.DimCustomerKey = source.DimCustomerKey',
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    (
        df_final
        .write
        .format('delta')
        .mode('overwrite')
        .option('path', GOLD_PATH)
        .saveAsTable(GOLD_TABLE)
    )

In [0]:
%sql
-- Inspect the gold customer dimension after the write.
select *
from databricks_cata.gold.DimCustomers;