#### FLAG PARAMETER


**Create a flag indicating whether this is the first (initial) run or an incremental run**

_Set the incremental parameter to 0 (initial load)_

In [0]:
dbutils.widgets.text('Incremental_flag','0')

In [0]:
incremental_flag=dbutils.widgets.get('Incremental_flag')
print(incremental_flag)
print(type(incremental_flag))
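Widget values always come back as strings (the `type` printed above is `str`), which is why the later check compares against `"0"` and not the integer `0`. This minimal plain-Python sketch shows that normalization. `is_incremental` is a hypothetical helper and is not part of `dbutils`.

```python
def is_incremental(flag: str) -> bool:
    """Hypothetical helper: "0" means initial (history) load, anything else is incremental."""
    # dbutils.widgets.get() returns a str, so compare against the string "0"
    return flag != "0"

print(is_incremental("0"))  # False -> initial load
print(is_incremental("1"))  # True  -> incremental load
```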

### Creating Dimension Model

**Read files from the Silver layer in ADLS**

In [0]:
%sql

select * from parquet.`abfss://silver@claimcraftadls.dfs.core.windows.net/Claimcraft_Cleaned_data`

#### Create Patient-Zip Dimension

In [0]:
df_src = spark.sql(
    """
select distinct
  Patient_Zip
from
  parquet.`abfss://silver@claimcraftadls.dfs.core.windows.net/Claimcraft_Cleaned_data`
  where Patient_Zip not in ( 'null','Null','None','none','NULL')
"""
)

In [0]:
display(df_src)
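The `NOT IN` list above has to spell out every casing of the placeholder strings. A case-insensitive comparison avoids that. In Spark SQL this could be written as `where lower(Patient_Zip) not in ('null', 'none')`. A real SQL `NULL` is dropped either way, because `NULL NOT IN (...)` evaluates to `NULL` and the `WHERE` clause discards it. The plain-Python sketch below mirrors `select distinct ... where ...` on made-up sample values. `distinct_zips` and `NULL_LIKE` are hypothetical names.

```python
NULL_LIKE = {"null", "none"}  # placeholder strings, compared case-insensitively

def distinct_zips(rows):
    """Return sorted distinct zip codes, dropping real None values and null-like strings."""
    return sorted({
        z for z in rows
        if z is not None and z.lower() not in NULL_LIKE
    })

sample = ["10001", "NULL", "10001", None, "none", "94105", "Null"]
print(distinct_zips(sample))  # ['10001', '94105']
```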

### Dim_Patient_Zip Sink - Initial & Incremental

#### Create Schema
`WHERE 1=0` is always false, so the query returns only the schema (column names and types) and no data.

**Now apply the condition: on the first load, bring only the schema; if the table already exists, bring its data.**
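The effect of `WHERE 1=0` is easy to check outside Spark. This sketch uses Python's built-in `sqlite3` as a stand-in for Spark SQL. The table name `silver` and its rows are assumptions for illustration. The result carries the column names but no rows, which is what the first-run `df_sink` needs.

```python
import sqlite3

# In-memory stand-in for the Silver data (hypothetical table name and rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE silver (Patient_Zip TEXT)")
conn.executemany("INSERT INTO silver VALUES (?)", [("10001",), ("94105",)])

# Same shape as the first-run sink query: dummy key plus the real column, no rows
cur = conn.execute("SELECT 1 AS Dim_Patient_Key, Patient_Zip FROM silver WHERE 1=0")
columns = [d[0] for d in cur.description]
rows = cur.fetchall()
print(columns)  # ['Dim_Patient_Key', 'Patient_Zip']
print(rows)     # []
```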

In [0]:
# If the table already exists, read the current dimension rows directly
if spark.catalog.tableExists("pharma_claim_craft.claim_craft_gold.dim_patient_zip"):
    # Real data comes from the table, so no `where 1=0` or dummy Dim_Patient_Key is needed
    df_sink = spark.sql(
        """
        select Dim_Patient_Key, Patient_Zip
        from `pharma_claim_craft`.claim_craft_gold.dim_patient_zip
        """
    )
else:
    # First run: no table yet, so select a dummy key (1 as Dim_Patient_Key) with
    # `where 1=0` to get an empty DataFrame that has the sink schema
    df_sink = spark.sql(
        """
        select 1 as Dim_Patient_Key, Patient_Zip
        from parquet.`abfss://silver@claimcraftadls.dfs.core.windows.net/Claimcraft_Cleaned_data`
        where 1=0
        """
    )

In [0]:
df_sink.display()

### Filtering Old and New records

**Apply a left join between the source and sink data**

In [0]:
df_filter=df_src.join(df_sink,df_src['Patient_Zip']==df_sink['Patient_Zip'],'left').select(df_src['Patient_Zip'],df_sink['Dim_Patient_Key'])
df_filter.display()
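A left join keeps every source zip and attaches the sink's `Dim_Patient_Key` when one exists, or null when it does not. The plain-Python sketch below models the sink as a dict from `Patient_Zip` to `Dim_Patient_Key`. The dict stand-in and the `left_join_keys` name are assumptions for illustration.

```python
def left_join_keys(src_zips, sink):
    """Left join: keep every source zip; the key is None when the sink lacks that zip.

    `sink` maps Patient_Zip -> Dim_Patient_Key (hypothetical in-memory stand-in).
    """
    return [(z, sink.get(z)) for z in src_zips]

src = ["10001", "60601", "94105"]
sink = {"10001": 1, "94105": 2}
print(left_join_keys(src, sink))
# [('10001', 1), ('60601', None), ('94105', 2)]
```

On the first run the sink is empty, so every key comes back as `None` and every row is treated as new.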

**(Upsert) If `Dim_Patient_Key` is null, the row is a new record and needs a surrogate key. If it is not null, the row is an existing record and will be updated.**
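The split rule can be sketched in plain Python on the output of a left join. It mirrors the two filters that follow: old rows keep their key (`isNotNull`), and new rows keep only the zip (`isNull`, then select `Patient_Zip`). `split_old_new` is a hypothetical helper.

```python
def split_old_new(joined):
    """Split (zip, key) pairs into existing rows (key present) and new rows (key None)."""
    old = [(z, k) for z, k in joined if k is not None]
    new = [z for z, k in joined if k is None]  # new rows carry only the zip
    return old, new

joined = [("10001", 1), ("60601", None), ("94105", 2)]
old, new = split_old_new(joined)
print(old)  # [('10001', 1), ('94105', 2)]
print(new)  # ['60601']
```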



**DF_FILTER_OLD**

In [0]:
df_filter_old=df_filter.filter(df_filter['Dim_Patient_Key'].isNotNull())
df_filter_old.display()

**DF_FILTER_NEW**

The surrogate key column is not needed here, because it is null for every new record.


In [0]:
df_filter_new = df_filter.filter(df_filter['Dim_Patient_Key'].isNull()).select('Patient_Zip')
df_filter_new.display()


### Create Surrogate Key

**Fetch the maximum surrogate key value for the incremental load**

- First/history load: the maximum is set to 0.
- Incremental load: the maximum is read from the table's `Dim_Patient_Key` column, and new keys continue from that value.
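The same decision in plain Python, as a sketch. `starting_max` is a hypothetical helper, and `existing_keys` stands in for the table's `Dim_Patient_Key` column. It also covers a case worth guarding against: an existing but empty table, where SQL `MAX()` returns `NULL` (`None` after `collect()`).

```python
def starting_max(incremental_flag, existing_keys):
    """Return the value new surrogate keys should continue from.

    Initial load ("0") starts from 0. An incremental load uses the highest
    existing key, falling back to 0 when there are no keys yet.
    """
    if incremental_flag == "0":
        return 0
    return max(existing_keys, default=0)

print(starting_max("0", [1, 2, 3]))  # 0
print(starting_max("1", [1, 2, 3]))  # 3
print(starting_max("1", []))         # 0
```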

In [0]:
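if incremental_flag == "0":
    # Initial (history) load: surrogate keys start after 0
    max_value = 0
else:
    # Incremental load: continue from the highest existing key
    max_value_df = spark.sql(
        """
        select max(Dim_Patient_Key) as max_value
        from pharma_claim_craft.claim_craft_gold.dim_patient_zip
        """
    )
    max_value = max_value_df.collect()[0][0]
    # max() over an empty table returns NULL (None in Python); fall back to 0
    if max_value is None:
        max_value = 0

print(max_value)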

**Add surrogate key values to the `Dim_Patient_Key` column for new records**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
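# row_number() gives consecutive keys max_value+1, max_value+2, ... (monotonically_increasing_id() is unique but not consecutive)
df_filter_new = df_filter_new.withColumn('Dim_Patient_Key', max_value + row_number().over(Window.orderBy('Patient_Zip')))
display(df_filter_new)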
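Spark documents `monotonically_increasing_id()` as increasing and unique but not consecutive. Its current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Python sketch below mimics that layout, with a made-up partition assignment. It compares the result with a consecutive `max_value + 1, max_value + 2, ...` assignment in zip order, which is what a `row_number()` window ordered by `Patient_Zip` produces. `mono_id` and `consecutive_keys` are hypothetical helpers.

```python
def mono_id(partition_id, row_in_partition):
    """Mimic Spark's documented layout: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition

def consecutive_keys(zips, max_value):
    """Assign max_value+1, max_value+2, ... in Patient_Zip order (like row_number())."""
    return {z: max_value + i for i, z in enumerate(sorted(zips), start=1)}

# Two rows in partition 0 and one row in partition 1, with max_value = 2
print([2 + mono_id(p, r) + 1 for p, r in [(0, 0), (0, 1), (1, 0)]])
# [3, 4, 8589934595]
print(consecutive_keys(["60601", "33101", "73301"], 2))
# {'33101': 3, '60601': 4, '73301': 5}
```

Both schemes give unique keys, but only the window-based one keeps the key sequence dense across incremental runs.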


#### Create Final DF

In [0]:
df_product_final = df_filter_old.unionByName(df_filter_new)
df_product_final.display()


### SCD - Slowly Changing Dimension Type 1 (Upsert)

**Upsert = Update+Insert**
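A Type 1 upsert overwrites matched rows and inserts unmatched ones, keeping no history. This plain-Python sketch uses a dict keyed by `Dim_Patient_Key` as a stand-in for the Delta table. It illustrates the semantics only and is not the Delta Lake API. `scd1_upsert` is a hypothetical helper.

```python
def scd1_upsert(target, source):
    """SCD Type 1 upsert on an in-memory table keyed by Dim_Patient_Key.

    Matched keys are overwritten with the source row (update all columns);
    unmatched keys are inserted. No history is kept, which makes it Type 1.
    """
    merged = dict(target)  # leave the original target untouched
    for key, row in source.items():
        merged[key] = row  # update when matched, insert when not matched
    return merged

target = {1: {"Patient_Zip": "10001"}, 2: {"Patient_Zip": "94105"}}
source = {2: {"Patient_Zip": "94105"}, 3: {"Patient_Zip": "60601"}}
print(scd1_upsert(target, source))
# {1: {'Patient_Zip': '10001'}, 2: {'Patient_Zip': '94105'}, 3: {'Patient_Zip': '60601'}}
```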

In [0]:
from delta.tables import DeltaTable

In [0]:
# Incremental load: the table exists, so merge (upsert) into it
if spark.catalog.tableExists("pharma_claim_craft.claim_craft_gold.dim_patient_zip"):
    # Create the DeltaTable object for the Gold table
    deltatbl = DeltaTable.forPath(
        spark, "abfss://gold@claimcraftadls.dfs.core.windows.net/dim_patient_zip"
    )
    # Upsert via MERGE: update all columns when the key matches, insert when it does not
    deltatbl.alias("trg").merge(
        df_product_final.alias("src"),
        "trg.Dim_Patient_Key = src.Dim_Patient_Key",
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
# Initial load: create the Delta table at the Gold path and register it
else:
    df_product_final.write.mode("overwrite").format("delta").option(
        "path", "abfss://gold@claimcraftadls.dfs.core.windows.net/dim_patient_zip"
    ).saveAsTable("pharma_claim_craft.claim_craft_gold.dim_patient_zip")

In [0]:
%sql

select * from `pharma_claim_craft`.claim_craft_gold.dim_patient_zip