In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Flag Parameter

In [0]:
dbutils.widgets.text('Incremental_flag','0')

In [0]:
Incremental_flag = dbutils.widgets.get('Incremental_flag')
print(type(Incremental_flag))

<class 'str'>


# Creating Dimensional Model

### Fetch Relative Columns

In [0]:
df_src = spark.sql('''
    SELECT DISTINCT(Dealer_ID) as Dealer_ID, DealerName 
    FROM parquet. `abfss://silver@carsalesadls.dfs.core.windows.net/carsales`
    ''')

In [0]:
df_src.display()

Dealer_ID,DealerName
DLR0058,Fiat do Brasil Motors
DLR0107,Land Rover Motors
DLR0129,Mia Motors
DLR0111,Lotus Motors
DLR0085,Humber Motors
DLR0001,AC Cars Motors
DLR0218,Lagonda Motors
DLR0082,Honda Motors
DLR0063,Ford do Brasil Motors
DLR0193,Tazzari Motors


###dim_model Sink - Initial and Incremental (Just bring the schema if table not exists)

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):

    df_sink = spark.sql('''
    SELECT dim_Dealer_key, Dealer_ID, DealerName
    FROM cars_catalog.gold.dim_dealer
    ''')

else:
    df_sink = spark.sql('''
    SELECT 1 as dim_Dealer_key, Dealer_ID, DealerName
    FROM parquet.`abfss://silver@carsalesadls.dfs.core.windows.net/carsales`
    WHERE 1=0
    ''')

### Filtering new records and old Records

In [0]:
df_filter = df_src.join(df_sink,df_src['Dealer_ID'] == df_sink['Dealer_ID'],'left').select(df_src['Dealer_ID'],df_src['DealerName'],df_sink['dim_dealer_key'])

In [0]:
df_filter.display()

Dealer_ID,DealerName,dim_dealer_key
DLR0058,Fiat do Brasil Motors,1.0
DLR0107,Land Rover Motors,2.0
DLR0129,Mia Motors,3.0
DLR0111,Lotus Motors,4.0
DLR0085,Humber Motors,5.0
DLR0001,AC Cars Motors,6.0
DLR0218,Lagonda Motors,7.0
DLR0082,Honda Motors,8.0
DLR0063,Ford do Brasil Motors,9.0
DLR0193,Tazzari Motors,10.0


### df_filter_old

In [0]:
df_filter_old = df_filter.filter(col('dim_dealer_key').isNotNull())

In [0]:
df_filter_old.display()

Dealer_ID,DealerName,dim_dealer_key
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8
DLR0063,Ford do Brasil Motors,9
DLR0193,Tazzari Motors,10


### df_filter_new

In [0]:
df_filter_new = df_filter.filter(col('dim_dealer_key').isNull()).select(df_filter['Dealer_ID'],df_filter['DealerName'])

In [0]:
df_filter_new.display()

Dealer_ID,DealerName
XYZ0063,Datafam Dealers
XYZ0059,AI Powered Autos


# Create Surrogate Key

**Fetch the max Surrogate Key from existing table**

In [0]:
if (Incremental_flag == '0'):
    max_value = 1
else:
    max_value_df = spark.sql("SELECT max(dim_dealer_key) FROM cars_catalog.gold.dim_dealer")
    max_value = max_value_df.collect()[0][0]+1

**Create surrogate key column ADD the max surrogate key**

In [0]:
df_filter_new = df_filter_new.withColumn('dim_dealer_key', max_value + monotonically_increasing_id())

In [0]:
df_filter_new.display()

Dealer_ID,DealerName,dim_dealer_key
XYZ0063,Datafam Dealers,1
XYZ0059,AI Powered Autos,2


### Create Final DF - df_filter_old + df_filter_new

In [0]:
df_final= df_filter_new.union(df_filter_old)
df_final.display()

Dealer_ID,DealerName,dim_dealer_key
XYZ0063,Datafam Dealers,1
XYZ0059,AI Powered Autos,2
DLR0058,Fiat do Brasil Motors,1
DLR0107,Land Rover Motors,2
DLR0129,Mia Motors,3
DLR0111,Lotus Motors,4
DLR0085,Humber Motors,5
DLR0001,AC Cars Motors,6
DLR0218,Lagonda Motors,7
DLR0082,Honda Motors,8


# SCD TYPE 1 (UPSERT)

In [0]:
from delta.tables import DeltaTable

In [0]:
if spark.catalog.tableExists('cars_catalog.gold.dim_dealer'):
    delta_tbl = DeltaTable.forPath(spark, "abfss://gold@carsalesadls.dfs.core.windows.net/dim_dealer")

    # Remove duplicates from the source DataFrame
    df_final = df_final.dropDuplicates(["dim_dealer_key"])

    delta_tbl.alias("trg").merge(
        df_final.alias("src"),
        "trg.dim_dealer_key = src.dim_dealer_key"
    ).whenMatchedUpdateAll()\
     .whenNotMatchedInsertAll()\
     .execute()

# Initial RUN
else:
    df_final.write.format("delta")\
        .mode("overwrite")\
        .option("path", "abfss://gold@carsalesadls.dfs.core.windows.net/dim_dealer")\
        .saveAsTable("cars_catalog.gold.dim_dealer")

In [0]:
%sql
SELECT * FROM cars_catalog.gold.dim_dealer

Dealer_ID,DealerName,dim_dealer_key
BR0854,"Austin, Rover Motors",268
BR0984,Dacia Motors,269
BR0990,Standard-Triumph Motors,270
BR1554,Iveco Motors,271
BR1682,Panoz Motors,272
BR1764,Marlin Motors,273
BR2115,Mitsubishi Motors,274
BR2152,Toyota Motors,275
BR2404,Singer Motors,276
BR0502,Caterham Motors,277
