# CREATE FLAG PARAMETER

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Text widget for the load mode: '0' (the default) selects the initial load;
# any other value is treated as an incremental run further down.
dbutils.widgets.text(name='incremental_flag', defaultValue='0')

In [0]:
# Widget values are always strings. Strip stray whitespace so that, e.g., ' 0' is still
# recognised as the initial-load value '0' by the comparisons later in the notebook.
incremental_flag = dbutils.widgets.get('incremental_flag').strip()
print(f"incremental_flag = {incremental_flag!r}")

# CREATING DIMENSION DEALER

In [0]:
# Preview every column of the silver car-sales data before narrowing it down.
silver_preview_query = """
    SELECT *
    FROM PARQUET.`abfss://silver@carsneelabhdatalake.dfs.core.windows.net/carsales`
"""
df_src = spark.sql(silver_preview_query)

df_src.display()

# FETCH RELEVANT COLUMNS

In [0]:
# Distinct (DEALER_ID, DEALERNAME) pairs from the silver layer.
# NOTE: DISTINCT applies to the whole row, not to DEALER_ID alone. A dealer recorded under
# two different names yields two rows here, and, if new, two separate dimension rows later.
df_src = spark.sql("""
    SELECT DISTINCT DEALER_ID, DEALERNAME
    FROM PARQUET.`abfss://silver@carsneelabhdatalake.dfs.core.windows.net/carsales`
""")


In [0]:
display(df_src)  # distinct dealer rows from the silver layer

# DIM DEALER SINK INITIAL AND INCREMENTAL

In [0]:
# Current contents of the dealer dimension, with surrogate keys. Before the first load the
# table does not exist, so use an empty frame with the same columns to keep the join below valid.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    sink_query = """
        SELECT  dim_dealer_key, DEALER_ID, DEALERNAME
        FROM cars_catalog.gold.dim_dealer
    """
else:
    sink_query = """
        SELECT 1 AS dim_dealer_key, DEALER_ID, DEALERNAME
        FROM PARQUET.`abfss://silver@carsneelabhdatalake.dfs.core.windows.net/carsales`
        WHERE 1 = 0
    """
df_sink = spark.sql(sink_query)


In [0]:
display(df_sink)  # existing dimension rows (empty on the first run)

# FILTERING NEW RECORDS AND OLD RECORDS

In [0]:
# Left-join source dealers onto the dimension. dim_dealer_key is filled for dealers already
# present and NULL for dealers appearing for the first time.
same_dealer = df_src['DEALER_ID'] == df_sink['DEALER_ID']
df_filter = (
    df_src
    .join(df_sink, same_dealer, 'left')
    .select(df_src['DEALER_ID'], df_src['DEALERNAME'], df_sink['dim_dealer_key'])
)


In [0]:
display(df_filter)  # source dealers with their existing key, if any

# Fetching existing records

In [0]:
# Dealers that already own a surrogate key in the dimension.
df_filter_old = df_filter.where(df_filter['dim_dealer_key'].isNotNull())

In [0]:
display(df_filter_old)  # dealers already in the dimension

# Fetching all new records

In [0]:
# Dealers without a surrogate key yet. Keep only the business columns; the key is assigned later.
df_filter_new = (
    df_filter
    .where(df_filter['dim_dealer_key'].isNull())
    .select('DEALER_ID', 'DEALERNAME')
)

In [0]:
display(df_filter_new)  # dealers not yet in the dimension

### Creating Surrogate Key

# Fetching the current max surrogate key for incremental runs

In [0]:
# `max_value` is the FIRST surrogate key to hand out to new dealers (the next cell adds
# 0, 1, 2, ... to it), so on incremental runs it must be one past the current maximum.
# Starting at the maximum itself reuses an existing key, and the SCD1 MERGE then overwrites
# that dealer's row with a different dealer.
#
# Table existence decides initial vs incremental here, matching the sink-read and MERGE cells.
# A flag of '0' against an existing table would otherwise restart keys at 1 (colliding with
# existing rows), and a non-'0' flag before the first load would query a missing table.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    if incremental_flag == '0':
        print("incremental_flag is '0' but cars_catalog.gold.dim_dealer exists; continuing after its max key.")
    max_value_df = spark.sql("select max(dim_dealer_key) from cars_catalog.gold.dim_dealer")
    current_max = max_value_df.collect()[0][0]
    # max() is NULL on an empty table; treat that as 0 so numbering still starts at 1.
    max_value = (current_max or 0) + 1
else:
    max_value = 1

### Create the surrogate key column, offset from the max surrogate key

In [0]:
# Number new dealers consecutively from `max_value` (the first key to assign), ordered by
# DEALER_ID so the assignment is deterministic across re-computations.
# monotonically_increasing_id() only guarantees uniqueness: it packs the partition id into
# the upper bits, giving huge gaps between partitions and partition-dependent keys.
# The un-partitioned window gathers rows into a single partition. That is assumed acceptable
# because only not-yet-seen dealers are keyed here (TODO confirm volume stays small).
new_key_window = Window.orderBy('DEALER_ID')
df_filter_new = df_filter_new.withColumn(
    'dim_dealer_key',
    # Cast to bigint to keep the LongType the original monotonically_increasing_id() produced.
    (lit(max_value) + row_number().over(new_key_window) - 1).cast('bigint'),
)


In [0]:
display(df_filter_new)  # new dealers with their freshly assigned surrogate keys

# Create final DF: df_filter_new + df_filter_old

In [0]:
# Combine newly keyed dealers with existing ones. unionByName matches columns by name, so a
# change in column order in either frame cannot silently shift values into the wrong column
# (plain union() is positional).
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
display(df_final)  # rows to be written to the gold dimension

# SCD TYPE 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# SCD Type 1 write of the dealer dimension.
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    # Incremental run: MERGE on the surrogate key. Matched rows have all columns overwritten;
    # unmatched rows (new dealers) are inserted.
    # Resolve the Delta table by the same catalog name the existence check uses, rather than a
    # separately hard-coded storage path that could drift from the table's actual location.
    delta_table = DeltaTable.forName(spark, 'cars_catalog.gold.dim_dealer')

    (
        delta_table.alias("trg")
        .merge(df_final.alias("src"), "trg.dim_dealer_key=src.dim_dealer_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the catalog table as a Delta table stored at the gold path.
    (
        df_final.write.format("delta")
        .mode('overwrite')
        .option("path", "abfss://gold@carsneelabhdatalake.dfs.core.windows.net/dim_dealer")
        .saveAsTable("cars_catalog.gold.dim_dealer")
    )


In [0]:
%sql
-- Inspect the resulting gold dealer dimension.
SELECT *
FROM cars_catalog.gold.dim_dealer