## Dim Customers
**The customer table is a dimension table in this project: orders is the fact table, while customers and products are dimension tables implemented as SCD Type 1 and SCD Type 2 respectively.**

### Data Reading from source customer

In [0]:
# Imports
from pyspark.sql.functions import *

In [0]:
# Run parameter supplied by the parent notebook through a text widget.
# The widget defaults to "1"; the value is parsed to an int for the flag checks below.
_FLAG_WIDGET = "init_load_flag"
dbutils.widgets.text(_FLAG_WIDGET, "1")
init_load_flag = int(dbutils.widgets.get(_FLAG_WIDGET))

In [0]:
# Load the silver customers table and keep a single row per customer_id (the primary key).
df = spark.table("db_dlt_proj.silver.customers").dropDuplicates(subset=["customer_id"])

### Surrogate_key- All the values
A surrogate key is an artificial, system-generated unique identifier used in a database table instead of (or in addition to) a natural/business key.

In [0]:
# Provisional surrogate key for every source row, starting at 1.
# The column is spelled "DimCustomerKey" exactly as in the rest of the notebook, so the
# pipeline does not depend on Spark's case-insensitive column resolution.
# monotonically_increasing_id() yields unique, increasing ids, but they are not
# guaranteed to be consecutive.
df = df.withColumn("DimCustomerKey", monotonically_increasing_id() + lit(1))

### Dividing New and Old Records from the table

In [0]:
# Pick the query that provides the existing dimension keys, then run it once.
if init_load_flag != 0:
    # Any non-zero flag: only the column layout is needed, so `where 1=0` returns zero rows.
    existing_keys_query = '''select 0 DimCustomerKey, 0 customer_id, 0 create_date, 0 update_date
                                from db_dlt_proj.silver.customers where 1=0'''
else:
    # Flag 0: read the keys and audit dates already stored in the gold dimension.
    existing_keys_query = "select DimCustomerKey, customer_id, create_date, update_date from db_dlt_proj.gold.dim_customers"

df_old = spark.sql(existing_keys_query)

### Renaming Column of df_old

In [0]:
# Prefix every lookup column with "old_" so it cannot clash with a source column after the join.
for lookup_col in ("DimCustomerKey", "customer_id", "create_date", "update_date"):
    df_old = df_old.withColumnRenamed(lookup_col, "old_" + lookup_col)

### Applying join with the old records

In [0]:
# Left join keeps every source customer; the old_* columns are NULL where no existing row matches.
match_on_customer = df["customer_id"] == df_old["old_customer_id"]
df_join = df.join(df_old, on=match_on_customer, how="left")

### Separating New and Old Records

In [0]:
# A NULL old_DimCustomerKey means the left join found no existing row for this customer.
has_no_existing_key = col("old_DimCustomerKey").isNull()

df_new = df_join.where(has_no_existing_key)
df_old = df_join.where(~has_no_existing_key)

### Preparing df_old

In [0]:
# Existing customers must keep the surrogate key already stored in the dimension.
# Overwrite the provisional key generated for this batch with the persisted one; withColumn
# replaces the same-named column in place. The cast to long keeps the type aligned with the
# keys generated for new records (the schema-only lookup exposes this column as an int).
df_old = df_old.withColumn("DimCustomerKey", col("old_DimCustomerKey").cast("long"))

# Dropping the lookup columns that are no longer required
df_old = df_old.drop('old_DimCustomerKey', 'old_customer_id', 'old_update_date')

# Renaming "old_create_date" to "create_date" so the original creation time is preserved
df_old = df_old.withColumnRenamed("old_create_date", "create_date")\
               .withColumn("create_date", to_timestamp(col("create_date")))

# Recreating "update_date" to record that these rows were processed in this run
df_old = df_old.withColumn("update_date", current_timestamp())

### Preparing df_new

**Surrogate Key - From 1**

In [0]:
# Number the new records starting from 1 (ids are unique but not guaranteed consecutive).
df_new = df_new.withColumn("DimCustomerKey", monotonically_increasing_id() + lit(1))

# Offset the new keys by the highest key already stored in the dimension.
# The maximum is read from the gold table itself, not from the rows matched in this batch,
# so the offset is right even when no existing customer appears in the batch.
# coalesce(..., 0) covers an empty table, where max() returns NULL.
if init_load_flag == 0:
    max_surrogate_key = spark.sql(
        "select coalesce(max(DimCustomerKey), 0) from db_dlt_proj.gold.dim_customers"
    ).collect()[0][0]
else:
    # Initial load: there are no previous keys, so numbering starts right after 0.
    max_surrogate_key = 0

df_new = df_new.withColumn("DimCustomerKey", lit(max_surrogate_key) + col("DimCustomerKey"))

In [0]:
# Lookup columns brought in by the join; none of them belongs in the final dimension rows.
lookup_only_cols = ['old_DimCustomerKey', 'old_customer_id', 'old_update_date', 'old_create_date']

# New records are created and updated in this run, so both audit columns get the current timestamp.
df_new = (
    df_new
    .drop(*lookup_only_cols)
    .withColumns({
        "create_date": current_timestamp(),
        "update_date": current_timestamp(),
    })
)

### Union of df_old and df_new

In [0]:
# Stack existing and new records; columns are matched by name, and both sides must
# expose the same set of columns (allowMissingColumns is left at its default, False).
df = df_old.unionByName(df_new, allowMissingColumns=False)

## Upsert command(SCD type-1)

In [0]:
from delta.tables import DeltaTable

# SCD Type 1 upsert: overwrite the attributes of known customers, insert unseen ones.
# Whether to merge or create is decided by checking if the target table exists in the catalog.
if spark.catalog.tableExists("db_dlt_proj.gold.dim_customers"):
    target_table = DeltaTable.forName(
        spark,
        "db_dlt_proj.gold.dim_customers"
    )
    (
        target_table.alias("target")
        .merge(
            df.alias("source"),
            # Match on the business key: it identifies the same customer across runs,
            # whereas a generated surrogate key is only meaningful once it is persisted.
            "target.customer_id = source.customer_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the managed table, and also write a Delta copy to the external volume path.
    df.write.mode("overwrite").saveAsTable("db_dlt_proj.gold.dim_customers")
    df.write.mode("overwrite").format("delta").save("/Volumes/db_dlt_proj/ext_location/gold_external_location/dim_customers")