# Creating Flag Parameter

In [0]:
dbutils.widgets.text('incremental_flag','0')

In [0]:
incremental_flag = dbutils.widgets.get('incremental_flag')
print(incremental_flag)

0


# Creating Dimensions Model

### Fetch related column

In [0]:
df_src = spark.sql('''
select distinct Date_Id
from parquet.`abfss://silver@learningdatabrickcar.dfs.core.windows.net/carssales`
''')

In [0]:
df_src.display()

Date_Id
DT00029
DT00140
DT00192
DT00444
DT00475
DT00947
DT00976
DT01028
DT01099
DT00657


### dim_mode sink initil and incremental

In [0]:
if spark.catalog.tableExists('cars_catlog.gold.dim_date'):
    df_sink = spark.sql('''
    select  dim_date_key,date_ID from 
    cars_catlog.gold.dim_date
    ''')

else:
    df_sink = spark.sql('''
    select 1 as  dim_date_key,date_ID from 
    parquet.`abfss://silver@learningdatabrickcar.dfs.core.windows.net/carssales`
    where 1=0 ''')

In [0]:
df_sink.display()

dim_date_key,date_ID
1,DT00029
2,DT00140
3,DT00192
4,DT00444
5,DT00475
6,DT00947
7,DT00976
8,DT01028
9,DT01099
10,DT00657


### Filtering new recods and old records 

In [0]:
df_filter = df_src.join(df_sink,df_src.Date_Id==df_sink.date_ID,'left').select(df_src['date_id'],df_sink['dim_date_key'])

In [0]:
df_filter.display()

date_id,dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10


**df_filter_old**

In [0]:
from pyspark.sql.functions import *

In [0]:
df_filter_old = df_filter.filter(col('dim_date_key').isNotNull())

**df_filter_new**

In [0]:
df_filter_new = df_filter.filter(col('dim_date_key').isNull()).select(df_src['date_Id'])

In [0]:
df_filter_new.display()

date_Id


### Create surrogate Key

**Fetch max Surrogate key from existing table**

In [0]:
if (incremental_flag == '0'):
    max_value = 1
else:
    max_value_df = spark.sql(''' select max(dim_date_key)  from cars_catlog.gold.dim_date''')
    max_value = max_value_df.collect()[0][0] + 1

**Create surroget key column and ADD the max surrogate key**

In [0]:
df_filter_new = df_filter_new.withColumn('dim_date_key',max_value + monotonically_increasing_id())

In [0]:
df_filter_new.display()

date_Id,dim_date_key


### Create final df = df_filter_new + df_filter_old

In [0]:
df_final = df_filter_new.union(df_filter_old)

# SCD TYPE - 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Incremental Run
if spark.catalog.tableExists('cars_catlog.gold.dim_date'):
    delta_tbl = DeltaTable.forPath(spark, 'abfss://gold@learningdatabrickcar.dfs.core.windows.net/dim_date')

    delta_tbl.alias('trg').merge(df_final.alias('src'),'trg.dim_date_key = src.dim_date_key')\
                .whenMatchedUpdateAll()\
                .whenNotMatchedInsertAll()\
                .execute()


# Initial Run 
else:
    df_final.write.format("delta")\
        .mode('overwrite')\
        .option('path','abfss://gold@learningdatabrickcar.dfs.core.windows.net/dim_date')\
        .saveAsTable('cars_catlog.gold.dim_date')


In [0]:
%sql
select * from cars_catlog.gold.dim_date;

date_Id,dim_date_key
DT00029,1
DT00140,2
DT00192,3
DT00444,4
DT00475,5
DT00947,6
DT00976,7
DT01028,8
DT01099,9
DT00657,10
