In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

### **Create Parameters**

In [0]:
# Disabled cell: the widget-creation code below is wrapped in a bare string
# literal, so none of it executes. The parameters are instead hard-coded in the
# cells under "Fetching Parameters and Creating Variables".
# NOTE(review): if this is re-enabled, `eval(val)` executes arbitrary widget
# input; prefer ast.literal_eval to parse the key-column list.
'''
#To Create dbutils

#key Columns
dbutils.widgets.text("keycols","")

val = dbutils.widgets.get("keycols")
eval(val)

#CDC Column
dbutils.widgets.text("cdccol","")

#backdated refresh
dbutils.widgets.text("backdated_refresh","")

#Source Object
dbutils.widgets.text("source_object","")

#Source schema
dbutils.widgets.text("source_schema","")
'''

### Fetching Parameters and Creating Variables

In [0]:
# Disabled parameter set for the flights dimension
# (silver.silver_flights -> gold.dimflights, surrogate key dimflightsKey).
# Kept as a string literal so it does not run; enable it (and disable the
# active parameter cell) to load flights instead.
'''
# # Catalog Name
catalog = "workspace"

# # Key Cols List
key_cols = "['flight_id']"
key_cols_list = eval(key_cols)

# # CDC Column
cdc_col = "modifiedDate"

# # Backdated Refresh
backdated_refresh = ""

# # Source Object
source_object = "silver_flights"

# # Source Schema
source_schema = "silver"

# # Target Schema 
target_schema = "gold"

# # Target Object 
target_object = "dimflights"

# # Surrogate Key
surrogate_key = "dimflightsKey"
'''


In [0]:

import ast

# Active parameter set: airports dimension
# (silver.silver_airports -> gold.dimairports). The flights and customers
# variants in this notebook are kept as disabled string literals, so exactly
# one parameter set is live per run.

# Catalog Name
catalog = "workspace"

# Key Cols List: a Python list literal of business-key column names.
# ast.literal_eval only parses literals (it never executes code, unlike eval),
# which matters once this value is read from a widget.
key_cols = "['airport_id']"
key_cols_list = ast.literal_eval(key_cols)
if not (
    isinstance(key_cols_list, list)
    and key_cols_list
    and all(isinstance(key_col, str) for key_col in key_cols_list)
):
    raise ValueError(f"key_cols must be a non-empty list of column names, got {key_cols!r}")

# CDC Column: change-tracking column used as the incremental watermark
cdc_col = "modifiedDate"

# Backdated Refresh: empty string = normal incremental run; otherwise the
# watermark value to reload from
backdated_refresh = ""

# Source Object
source_object = "silver_airports"

# Source Schema
source_schema = "silver"

# Target Schema
target_schema = "gold"

# Target Object
target_object = "dimairports"

# Surrogate Key
surrogate_key = "dimairportsKey"


In [0]:
# Disabled parameter set for the customers dimension
# (silver.silver_customers -> gold.dimcustomers, surrogate key dimcustomersKey).
# Kept as a string literal so it does not run; enable it (and disable the
# active parameter cell) to load customers instead.
'''
# Catalog Name
catalog = "workspace"

# Key Cols List
key_cols = "['passenger_id']"
key_cols_list = eval(key_cols)

# CDC Column
cdc_col = "modifiedDate"

# Backdated Refresh
backdated_refresh = ""

# Source Object
source_object = "silver_customers"

# Source Schema
source_schema = "silver"

# Target Schema 
target_schema = "gold"

# Target Object 
target_object = "dimcustomers"

# Surrogate Key
surrogate_key = "dimcustomersKey"
'''

## **INCREMENTAL DATA INGESTION**

#### **Last Load Date**

In [0]:
# Resolve the watermark `last_load`: only source rows with cdc_col > last_load
# are picked up by the next cell.
#   - backdated_refresh set -> use it verbatim (manual reload from that point)
#   - otherwise             -> max(cdc_col) already in the target table, or the
#                              1900-01-01 sentinel when the target does not
#                              exist yet or holds no rows
DEFAULT_LAST_LOAD = "1900-01-01 00:00:00"
target_table = f"{catalog}.{target_schema}.{target_object}"

if not backdated_refresh:
    # No backdated refresh requested
    if spark.catalog.tableExists(target_table):
        # Use the configured catalog (previously hard-coded to `workspace`).
        last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table}").collect()[0][0]
        # max() over an empty table is NULL -> None; without this fallback the
        # source filter would become `cdc_col > 'None'`.
        if last_load is None:
            last_load = DEFAULT_LAST_LOAD
    else:
        last_load = DEFAULT_LAST_LOAD
else:
    # Backdated refresh requested
    last_load = backdated_refresh

# Display the resolved watermark
last_load

In [0]:
# Incremental source extract: only rows changed after the watermark.
# The table is fully qualified with the configured catalog, matching how the
# target table is addressed, instead of relying on the session's default catalog.
df_src = spark.sql(f"SELECT * FROM {catalog}.{source_schema}.{source_object} WHERE {cdc_col} > '{last_load}'")

In [0]:
# Preview the source rows selected by the watermark filter.
df_src.display()

In [0]:
# Disabled scratch check: shows how a multi-column key list is joined into the
# comma-separated column list used in the target SELECT below.
'''
#Code Checks:

key_cols = "['flight_id','id2']"
key_cols_list = eval(key_cols)

#" ".join(key_cols_list) #output: 'flight_id id2'

", ".join(key_cols_list) #If we have more than 1 key column : Output: 'flight_id, id2'
'''

## OLD vs NEW RECORDS

In [0]:
# Pull the business keys plus the SCD bookkeeping columns (surrogate key,
# create_date, update_date) from the target table. On the first run the target
# does not exist yet, so an empty frame with the same column names is built
# instead. Left-joining against it classifies every source row as new.
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    # Empty placeholder: one '' column per key, typed surrogate key and dates,
    # and `WHERE 1=0` so it carries the schema but no rows.
    key_cols_string_init = ", ".join(f"'' AS {key_name}" for key_name in key_cols_list)
    df_trg = spark.sql(f"""SELECT {key_cols_string_init}, CAST('0' AS INT) AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0""")
else:
    # Existing target: read the key columns and the SCD bookkeeping columns.
    key_cols_string_incremental = ", ".join(key_cols_list)
    df_trg = spark.sql(f"""SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date 
                      FROM {target_table}""")

In [0]:
df_trg.display()

# On the first run the target table does not exist, so df_trg is the empty
# placeholder frame and no rows are shown.

In [0]:
# Disabled scratch check: shows how per-key equality predicates are combined
# with AND into the join condition built in the next cell.
'''
#Code Checks:
key_cols = "['flight_id','id2']"
key_cols_list = eval(key_cols)

#[f"src.{i} = trg.{i}" for i in key_cols_list] #Output: ['src.flight_id = trg.flight_id', 'src.id2 = trg.id2']

' AND '.join(['src.flight_id = trg.flight_id', 'src.id2 = trg.id2']) #Output: 'src.flight_id = trg.flight_id AND src.id2 = trg.id2'
'''

#### **JOIN CONDITION**

In [0]:
# One equality predicate per business key, AND-ed together,
# e.g. "src.flight_id = trg.flight_id AND src.id2 = trg.id2".
join_condition = " AND ".join(f"src.{key_name} = trg.{key_name}" for key_name in key_cols_list)

In [0]:
# Register both frames as temp views so they can be joined in SQL.
for view_name, view_df in (("src", df_src), ("trg", df_trg)):
    view_df.createOrReplaceTempView(view_name)

# LEFT JOIN source -> target on the business keys. Target-side columns come
# back NULL for source rows that have no match in the target.
join_sql = f"""
            SELECT src.*, 
                   trg.{surrogate_key},
                   trg.create_date,
                   trg.update_date
            FROM src
            LEFT JOIN trg
            ON {join_condition}
            """
df_join = spark.sql(join_sql)

In [0]:
df_join.display()

# On the initial load every target-side column (surrogate key, create_date,
# update_date) is NULL, because the target frame has no rows.
# NULL target columns      -> new record, to be inserted into the target table.
# Populated target columns -> existing target record, to be updated.

In [0]:
# Split the joined rows on whether the target supplied a surrogate key.
surrogate_key_col = col(surrogate_key)

# OLD RECORDS: matched an existing target row -> will be updated
df_old = df_join.filter(surrogate_key_col.isNotNull())

# NEW RECORDS: no target match -> will be inserted
df_new = df_join.filter(surrogate_key_col.isNull())

In [0]:
df_old.display()
# During the initial load the target table has no records, so no old rows are returned.

# df_new.display()
# During the initial load every source row is a new record to be loaded into the target table.

## **ENRICHING DFS**

#### **Preparing DF_OLD**

In [0]:
# Existing records keep their surrogate key and create_date from the target;
# only update_date is refreshed to the time of this run.
refresh_ts = current_timestamp()
df_old_enr = df_old.withColumn("update_date", refresh_ts)

In [0]:
# Preview the old records with their refreshed update_date.
df_old_enr.display()

#### **Preparing DF_NEW**

In [0]:
# Preview the new records before surrogate keys and dates are assigned.
df_new.display()

In [0]:
# Assign surrogate keys to new records and stamp create/update dates.
# Keys continue from the current maximum in the target table, or from 0 when
# the target does not exist yet or is empty.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(f"SELECT max({surrogate_key}) FROM {target_table}").collect()[0][0]
else:
    max_surrogate_key = None

# max() over an empty table is NULL -> None. Without this fallback,
# lit(None) + ... would give every new record a NULL surrogate key.
if max_surrogate_key is None:
    max_surrogate_key = 0

# row_number() over the business keys gives consecutive, reproducible keys
# (max+1 .. max+n). monotonically_increasing_id() is unique but jumps by
# partition and is non-deterministic. The result is cast to bigint to keep the
# same column type that monotonically_increasing_id() produced.
# NOTE(review): an un-partitioned window moves the new rows onto a single
# partition; fine for dimension-sized batches, but revisit for very large loads.
new_key_window = Window.orderBy(*key_cols_list)

df_new_enr = (
    df_new
    .withColumn(surrogate_key, (lit(max_surrogate_key) + row_number().over(new_key_window)).cast("bigint"))
    .withColumn('create_date', current_timestamp())
    .withColumn('update_date', current_timestamp())
)


In [0]:
# Show the surrogate-key maximum that new keys were offset from.
max_surrogate_key

In [0]:
# Code Checks (disabled scratch cell): max('flight_id') with a quoted literal
# returns the string 'flight_id' itself, not the column maximum. The cell also
# shows how .collect()[0][0] unwraps the single result value.
'''
spark.sql(f"""
SELECT max('flight_id') FROM workspace.silver.silver_flights
""").collect()[0][0] #output: 'flight_id'

#.collect() Output:[Row(max('flight_id')='flight_id')]
#.collect()[0] Output:Row(max('flight_id')='flight_id')
'''

In [0]:
# Preview the enriched old records (the new-record preview is commented out).
df_old_enr.display()
#df_new_enr.display()

In [0]:
# Code check: compare the schemas of the old and new enriched frames before
# they are combined with unionByName.
for enriched_df in (df_old_enr, df_new_enr):
    enriched_df.printSchema()

## **Unioning OLD AND NEW RECORDS**

In [0]:
# Combine the updated existing records and the new records. unionByName
# matches columns by name rather than by position.
df_union = df_old_enr.unionByName(df_new_enr)

In [0]:
# Preview the combined rows that will be upserted into the target.
df_union.display()

## **UPSERT**

In [0]:
from delta.tables import DeltaTable 

In [0]:
# Upsert into the gold dimension table.
#   - Target exists: MERGE on the surrogate key. Matched rows are overwritten
#     only when the incoming cdc_col is not older; unmatched rows are inserted.
#   - First run: create the Delta table by writing the unioned frame.
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    df_union.write.format("delta").mode("append").saveAsTable(target_table)
else:
    merge_condition = f"trg.{surrogate_key} = src.{surrogate_key}"
    update_condition = f"src.{cdc_col} >= trg.{cdc_col}"

    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), merge_condition)
        .whenMatchedUpdateAll(condition=update_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
%sql
-- Row counts noted by the author: 110 after the first run, 113 after the SCD (incremental) load.
select * from workspace.gold.dimflights;

In [0]:
%sql
-- Row counts noted by the author: 55 after the first run, 58 after the SCD (incremental) load.
select * from workspace.gold.dimairports;

In [0]:
%sql
-- Row counts noted by the author: 220 after the first run, 225 after the SCD (incremental) load.
select * from workspace.gold.dimcustomers;

In [0]:
%sql
-- Spot-check a single customer row.
-- NOTE(review): presumably a record changed by the incremental load, used to inspect create_date/update_date; confirm.
select * from workspace.gold.dimcustomers where passenger_id='P0049';