In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Create Flag Parameter

In [0]:
# Text widget controlling the load mode; defaults to '0'.
dbutils.widgets.text(name='incremental_flag', defaultValue='0')

In [0]:
# Widget values come back as strings, so later cells compare against '0', not 0.
incremental_flag = dbutils.widgets.get(
    'incremental_flag'
)
print(incremental_flag)

0


In [0]:
# Confirm the widget value's type (prints the class object).
print(incremental_flag.__class__)

<class 'str'>


# Creating Dimension Model

### Fetch Relevant Columns

In [0]:
# Inspect the full silver carsales schema to choose the columns this dimension needs.
# Kept under its own name so it cannot be confused with the distinct-date_id `df_src`
# built in the next cell: the join below relies on that frame being distinct, and a
# skipped cell should fail loudly instead of silently joining non-distinct rows.
df_silver = spark.sql('''
    SELECT * 
    FROM parquet.`abfss://silver@tahirbucket.dfs.core.windows.net/carsales`
''')
df_silver.printSchema()

root
 |-- branch_id: string (nullable = true)
 |-- dealer_id: string (nullable = true)
 |-- model_id: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- units_sold: long (nullable = true)
 |-- date_id: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- branch_name: string (nullable = true)
 |-- dealer_name: string (nullable = true)
 |-- model_category: string (nullable = true)
 |-- rev_per_unit: double (nullable = true)



In [0]:
# Distinct natural keys (date_id) present in the silver layer; these drive dim_date.
# DISTINCT is a set quantifier over the whole select list, not a function, so it is
# written without parentheses to avoid suggesting a per-column dedupe.
df_src = spark.sql('''
    SELECT DISTINCT date_id
    FROM parquet.`abfss://silver@tahirbucket.dfs.core.windows.net/carsales`
''')

In [0]:
display(df_src)  # distinct source date_ids

### `dim_date` Sink - Initial & Incremental
**(If the table does not exist yet, return an empty DataFrame with just its schema.)**

In [0]:
# Existing dim_date rows (surrogate key + date_id). On the first run the table is
# missing, so build an empty frame with the same two columns (`WHERE 1 = 0` yields
# no rows) so the left join below works unchanged.
if not spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    df_sink = spark.sql('''
        SELECT 1 AS dim_date_key, date_id
        FROM PARQUET.`abfss://silver@tahirbucket.dfs.core.windows.net/carsales`
        WHERE 1 = 0
    ''')
else:
    df_sink = spark.sql('''
        SELECT dim_date_key, date_id
        FROM cars_catalog.gold.dim_date
    ''')

In [0]:
display(df_sink)  # existing dimension rows (empty on first run)

### Separate new records from existing records

In [0]:
# df_src (left) LEFT JOIN df_sink (right) ON date_id: each distinct source date keeps
# its existing dim_date_key when already in the dimension, otherwise gets NULL.
# eqNullSafe (SQL `<=>`) lets a NULL date_id match the NULL date_id row already in the
# dimension. With plain `==`, NULL never matches, so a NULL date would be classed as
# new, given a fresh surrogate key, and inserted again by the key-based MERGE every run.
df_filter = (
    df_src.join(df_sink, df_src['date_id'].eqNullSafe(df_sink['date_id']), 'left')
    .select(df_src['date_id'], df_sink['dim_date_key'])
)

In [0]:
display(df_filter)  # date_id with its existing key, or NULL if new

**df_filter_old**

In [0]:
# Dates already present in dim_date: they carry a non-NULL surrogate key.
df_filter_old = df_filter.filter('dim_date_key IS NOT NULL')

In [0]:
display(df_filter_old)  # already-keyed dates

**df_filter_new**

In [0]:
# Dates not yet in dim_date (no key found by the join); keep only date_id,
# the surrogate key is assigned further down.
df_filter_new = df_filter.where('dim_date_key IS NULL').select('date_id')

In [0]:
display(df_filter_new)  # dates that still need a surrogate key

### Create Surrogate Key

**Fetch the max surrogate key from the existing table**

In [0]:
# Starting value for new surrogate keys: one past the largest dim_date_key already
# in gold.dim_date, or 1 when there is nothing to continue from.
#
# Decided by table existence (as the sink-read and SCD merge cells already do), not
# by the widget alone. With incremental_flag == '0' against an existing table, keys
# would restart at 1 and collide with existing ones, and the MERGE on dim_date_key
# would then overwrite unrelated rows. With any other flag and no table, the MAX
# query would fail.
dim_date_exists = spark.catalog.tableExists('cars_catalog.gold.dim_date')
if incremental_flag == '0' and dim_date_exists:
    print("incremental_flag is '0' but cars_catalog.gold.dim_date already exists; "
          "continuing surrogate keys after the existing maximum.")

if dim_date_exists:
    current_max = spark.sql("SELECT MAX(dim_date_key) FROM cars_catalog.gold.dim_date").first()[0]
    # MAX() over an empty table is NULL (None in Python); start from 1 in that case.
    max_value = (current_max if current_max is not None else 0) + 1
else:
    max_value = 1

**Create the surrogate key column, offset by the starting key computed above**

In [0]:
# Number the new dates consecutively (ordered by date_id), starting at max_value.
# monotonically_increasing_id() is unique but not consecutive: it puts the partition
# index in the upper bits, so multi-partition input yields huge gaps between keys.
# row_number() over a global ordering gives 1..n, which is deterministic as well.
# The un-partitioned window gathers rows into one partition; that is acceptable here
# because only the new distinct date_ids pass through it.
# Cast to long to keep the bigint type that monotonically_increasing_id() produced.
df_filter_new = df_filter_new.withColumn(
    'dim_date_key',
    (row_number().over(Window.orderBy('date_id')) + (max_value - 1)).cast('long'),
)

In [0]:
display(df_filter_new)  # new dates with their freshly assigned keys

### Create Final df: df_filter_old + df_filter_new

In [0]:
# Combine newly keyed dates with already-keyed ones.
# unionByName matches columns by name. Plain union() matches by position, and would
# silently swap date_id/dim_date_key if either side's column order ever changed.
df_final = df_filter_new.unionByName(df_filter_old)

In [0]:
display(df_final)  # full set of (date_id, dim_date_key) rows to load

# SCD Type - 1 (UPSERT)

In [0]:
# SCD Type 1 load of gold.dim_date, keyed on the surrogate key dim_date_key.
if spark.catalog.tableExists('cars_catalog.gold.dim_date'):
    # Incremental run: upsert into the existing table.
    # Defensive de-dup on the merge key: MERGE fails when several source rows
    # match the same target row.
    df_final_deduped = df_final.dropDuplicates(['dim_date_key'])

    # Resolve the Delta table by the same name the existence check used, so the
    # check and the merge cannot drift apart if the table's storage path changes.
    delta_tbl = DeltaTable.forName(spark, 'cars_catalog.gold.dim_date')
    (
        delta_tbl.alias('target')
        .merge(df_final_deduped.alias('source'), 'target.dim_date_key = source.dim_date_key')
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # Initial run: create the table as an external Delta table at the gold path.
    df_final.write.format('delta') \
        .mode('overwrite') \
        .option('path', 'abfss://gold@tahirbucket.dfs.core.windows.net/dim_date') \
        .saveAsTable('cars_catalog.gold.dim_date')

In [0]:
%sql
-- Inspect the date dimension after this run's load/merge.
SELECT *
FROM cars_catalog.gold.dim_date;