In [0]:
import ast

from pyspark.sql.functions import * 
from pyspark.sql.types import *

### **Parameters**

In [0]:
# Disabled parameter set for the flights dimension (silver_flights -> DimFlights).
# Every line below is commented out, so this cell has no effect at run time.

# # Catalog Name
# catalog = "workspace"

# # Key Cols List
# key_cols = "['flight_id']"
# key_cols_list = eval(key_cols)

# # CDC Column
# cdc_col = "modifiedDate"

# # Backdated Refresh
# backdated_refresh = ""

# # Source Object
# source_object = "silver_flights"

# # Source Schema
# source_schema = "silver"

# # Target Schema
# target_schema = "gold"

# # Target Object
# target_object = "DimFlights"

# # Surrogate Key
# surrogate_key = "DimFlightsKey"


In [0]:
# Disabled parameter set for the airports dimension (silver_airports -> DimAirports).
# Every line below is commented out, so this cell has no effect at run time.

# # Catalog Name
# catalog = "workspace"

# # Key Cols List
# key_cols = "['airport_id']"
# key_cols_list = eval(key_cols)

# # CDC Column
# cdc_col = "modifiedDate"

# # Backdated Refresh
# backdated_refresh = ""

# # Source Object
# source_object = "silver_airports"

# # Source Schema
# source_schema = "silver"

# # Target Schema
# target_schema = "gold"

# # Target Object
# target_object = "DimAirports"

# # Surrogate Key
# surrogate_key = "DimAirportsKey"


In [0]:
# Active parameter set: passengers dimension (silver_passengers -> DimPassengers).
# Every later cell reads these names, so this cell must run first.

# Catalog Name
catalog = "workspace"

# Key Cols List
# The business-key columns are held as the string form of a Python list.
key_cols = "['passenger_id']"
# ast.literal_eval only accepts Python literals, so a malformed or hostile
# parameter value raises an error instead of executing code as eval() would.
key_cols_list = ast.literal_eval(key_cols)

# CDC Column
# Column compared against the last load value to select changed rows.
cdc_col = "modifiedDate"

# Backdated Refresh
# Empty string means "no backdated refresh"; any other value is used as the
# last-load value directly.
backdated_refresh = ""

# Source Object
source_object = "silver_passengers"

# Source Schema
source_schema = "silver"

# Target Schema
target_schema = "gold"

# Target Object
target_object = "DimPassengers"

# Surrogate Key
# Name of the generated key column in the target table.
surrogate_key = "DimPassengersKey"


## **INCREMENTAL DATA INGESTION**

#### **Last Load Date**

In [0]:
# Work out the last load value: rows whose CDC column is later than this value
# are picked up by the incremental read.

# No Back Dated Refresh
if len(backdated_refresh) == 0:

  # If Table Exists In The Destination
  if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

    # Query the same fully qualified table the existence check used; the
    # catalog comes from the `catalog` parameter rather than a literal.
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {catalog}.{target_schema}.{target_object}").collect()[0][0]

    # max() is NULL when the table exists but holds no rows; fall back to the
    # initial-load value so the source filter is not compared against 'None'.
    if last_load is None:
      last_load = "1900-01-01 00:00:00"

  else:
    # First run: no target table yet, so load everything.
    last_load = "1900-01-01 00:00:00"

# Yes Back Dated Refresh
else:
  last_load = backdated_refresh

# Test The Last Load
last_load

datetime.datetime(2025, 6, 22, 23, 33, 29, 533000)

In [0]:
# Incremental read: keep only source rows whose CDC column is later than last_load.
# last_load is interpolated into the SQL text as a quoted literal.
# NOTE(review): the source table is referenced as schema.object without {catalog},
# so it resolves against the session's current catalog — confirm this is intended.
df_src = spark.sql(f"SELECT * FROM {source_schema}.{source_object} WHERE {cdc_col} > '{last_load}'")

In [0]:
# Preview the source rows selected by the incremental filter.
df_src.display()

passenger_id,name,gender,nationality,modifiedDate
P0223,Nicholas Gomez,Female,Cook Islands,2025-06-23T23:38:01.852Z
P0224,Jason Jensen,Female,Rwanda,2025-06-23T23:38:01.852Z
P0222,Maria Taylor,Male,Lao People's Democratic Republic,2025-06-23T23:38:01.852Z
P0225,William Lopez,Male,Heard Island and McDonald Islands,2025-06-23T23:38:01.852Z
P0221,Amy Welch,Male,Croatia,2025-06-23T23:38:01.852Z
P0032,Daniel Douglas,Male,Singapore,2025-06-23T23:38:01.852Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-06-23T23:38:01.852Z
P0108,Brian Anderson,Female,Eritrea,2025-06-23T23:38:01.852Z
P0154,Angel Thompson,Female,Gambia,2025-06-23T23:38:01.852Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-06-23T23:38:01.852Z


## OLD vs NEW RECORDS

In [0]:
# Build df_trg: the key columns, surrogate key and audit dates of the target.
# When the target table does not exist yet, an empty frame with the same
# column names is produced instead (WHERE 1=0 returns no rows).
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

  # Key Columns String For Initial
  key_cols_string_init = ", ".join(f"'' AS {i}" for i in key_cols_list)

  df_trg = spark.sql(f"""SELECT {key_cols_string_init}, CAST('0' AS INT) AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS timestamp) AS          create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0""")

else:

  # Key Columns String For Incremental
  key_cols_string_incremental = ", ".join(key_cols_list)

  df_trg = spark.sql(f"""SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date 
                      FROM {catalog}.{target_schema}.{target_object}""")

In [0]:
# Preview the target-side keys and audit columns used for the join.
df_trg.display()

passenger_id,DimPassengersKey,create_date,update_date
P0002,1,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0004,2,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0007,3,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0014,4,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0023,5,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0033,6,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0049,7,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0080,8,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0120,9,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0134,10,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z


**JOIN CONDITION**

In [0]:
# One "src.<key> = trg.<key>" equality per key column, combined with AND.
join_condition = ' AND '.join(f"src.{i} = trg.{i}" for i in key_cols_list)

In [0]:
# Register both frames as temp views so the join can be written in SQL.
df_trg.createOrReplaceTempView("trg")
df_src.createOrReplaceTempView("src")

# LEFT JOIN keeps every source row; the three trg columns are NULL for source
# rows that have no match in the target.
df_join = spark.sql(
    f"""
            SELECT src.*, 
                   trg.{surrogate_key},
                   trg.create_date,
                   trg.update_date
            FROM src
            LEFT JOIN trg
            ON {join_condition}
            """
)

In [0]:
# Preview the source rows joined with their target surrogate key and audit dates.
df_join.display()

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-06-23T23:38:01.852Z,82.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-06-23T23:38:01.852Z,197.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0032,Daniel Douglas,Male,Singapore,2025-06-23T23:38:01.852Z,46.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0154,Angel Thompson,Female,Gambia,2025-06-23T23:38:01.852Z,110.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0196,Monica Dillon,Female,Belarus,2025-06-23T23:38:01.852Z,86.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0049,Justin Thomas,Female,Tokelau,2025-06-23T23:38:01.852Z,7.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0108,Brian Anderson,Female,Eritrea,2025-06-23T23:38:01.852Z,36.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0113,Brian Richardson,Male,Dominican Republic,2025-06-23T23:38:01.852Z,50.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0158,Jamie Mcneil,Male,Burundi,2025-06-23T23:38:01.852Z,76.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-06-23T23:38:01.852Z,53.0,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z


In [0]:
# OLD RECORDS: the join found a surrogate key, so the row already exists in the target.
df_old = df_join.where(col(surrogate_key).isNotNull())

# NEW RECORDS: no surrogate key came back from the join, so the row is not in the target yet.
df_new = df_join.where(col(surrogate_key).isNull())

In [0]:
# Preview the rows that already have a surrogate key in the target.
df_old.display()

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0049,Justin Thomas,Female,Tokelau,2025-06-23T23:38:01.852Z,7,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0108,Brian Anderson,Female,Eritrea,2025-06-23T23:38:01.852Z,36,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0032,Daniel Douglas,Male,Singapore,2025-06-23T23:38:01.852Z,46,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0113,Brian Richardson,Male,Dominican Republic,2025-06-23T23:38:01.852Z,50,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-06-23T23:38:01.852Z,53,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0158,Jamie Mcneil,Male,Burundi,2025-06-23T23:38:01.852Z,76,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-06-23T23:38:01.852Z,82,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0196,Monica Dillon,Female,Belarus,2025-06-23T23:38:01.852Z,86,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0154,Angel Thompson,Female,Gambia,2025-06-23T23:38:01.852Z,110,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-06-23T23:38:01.852Z,197,2025-06-23T23:30:44.248Z,2025-06-23T23:30:44.248Z


## **ENRICHING DFS**

#### **Preparing DF_OLD**

In [0]:
# Existing rows keep their surrogate key and create_date; only update_date is
# overwritten with the current timestamp.
df_old_enr = df_old.withColumn('update_date', current_timestamp())

#### **Preparing DF_NEW**

In [0]:
# Preview the rows that have no surrogate key yet (key and audit columns are NULL here).
df_new.display()

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0225,William Lopez,Male,Heard Island and McDonald Islands,2025-06-23T23:38:01.852Z,,,
P0224,Jason Jensen,Female,Rwanda,2025-06-23T23:38:01.852Z,,,
P0223,Nicholas Gomez,Female,Cook Islands,2025-06-23T23:38:01.852Z,,,
P0222,Maria Taylor,Male,Lao People's Democratic Republic,2025-06-23T23:38:01.852Z,,,
P0221,Amy Welch,Male,Croatia,2025-06-23T23:38:01.852Z,,,


In [0]:
# Assign surrogate keys and audit timestamps to the new rows.
# Keys continue from the highest key already in the target table (0 when
# there is none).
if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    max_surrogate_key = spark.sql(f"""
                            SELECT max({surrogate_key}) FROM {catalog}.{target_schema}.{target_object}
                        """).collect()[0][0]
    # max() is NULL when the table exists but is empty; without this fallback
    # lit(None) + ... would make every new surrogate key NULL.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# monotonically_increasing_id() yields unique, increasing 64-bit ids that are
# not guaranteed to be consecutive, so keys are unique but may have gaps.
df_new_enr = df_new.withColumn(surrogate_key, lit(max_surrogate_key) + lit(1) + monotonically_increasing_id())\
                .withColumn('create_date', current_timestamp())\
                .withColumn('update_date', current_timestamp())


In [0]:
# Preview the existing rows after update_date was refreshed.
df_old_enr.display()

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0049,Justin Thomas,Female,Tokelau,2025-06-23T23:38:01.852Z,7,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0108,Brian Anderson,Female,Eritrea,2025-06-23T23:38:01.852Z,36,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0032,Daniel Douglas,Male,Singapore,2025-06-23T23:38:01.852Z,46,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0113,Brian Richardson,Male,Dominican Republic,2025-06-23T23:38:01.852Z,50,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-06-23T23:38:01.852Z,53,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0158,Jamie Mcneil,Male,Burundi,2025-06-23T23:38:01.852Z,76,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-06-23T23:38:01.852Z,82,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0196,Monica Dillon,Female,Belarus,2025-06-23T23:38:01.852Z,86,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0154,Angel Thompson,Female,Gambia,2025-06-23T23:38:01.852Z,110,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-06-23T23:38:01.852Z,197,2025-06-23T23:30:44.248Z,2025-06-23T23:42:12.129Z


## **Unioning OLD AND NEW RECORDS**

In [0]:
# Combine the updated existing rows with the newly keyed rows; unionByName
# matches columns by name rather than by position.
df_union = df_old_enr.unionByName(df_new_enr)

In [0]:
# Preview the combined set of rows that will be written to the target.
df_union.display()

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0049,Justin Thomas,Female,Tokelau,2025-06-23T23:38:01.852Z,7,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0108,Brian Anderson,Female,Eritrea,2025-06-23T23:38:01.852Z,36,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0032,Daniel Douglas,Male,Singapore,2025-06-23T23:38:01.852Z,46,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0113,Brian Richardson,Male,Dominican Republic,2025-06-23T23:38:01.852Z,50,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-06-23T23:38:01.852Z,53,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0158,Jamie Mcneil,Male,Burundi,2025-06-23T23:38:01.852Z,76,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-06-23T23:38:01.852Z,82,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0196,Monica Dillon,Female,Belarus,2025-06-23T23:38:01.852Z,86,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0154,Angel Thompson,Female,Gambia,2025-06-23T23:38:01.852Z,110,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-06-23T23:38:01.852Z,197,2025-06-23T23:30:44.248Z,2025-06-23T23:42:13.357Z


## **UPSERT**

In [0]:
from delta.tables import DeltaTable 

In [0]:
# Write df_union to the target table.
# - Table missing: create it by appending df_union as a Delta table.
# - Table present: MERGE on the surrogate key. Matched rows are updated only
#   when the source CDC value is at least the target's; unmatched rows are inserted.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

    (
        df_union.write
        .format("delta")
        .mode("append")
        .saveAsTable(f"{catalog}.{target_schema}.{target_object}")
    )

else:

    dlt_obj = DeltaTable.forName(spark, f"{catalog}.{target_schema}.{target_object}")
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), f"trg.{surrogate_key} = src.{surrogate_key}")
        .whenMatchedUpdateAll(condition = f"src.{cdc_col} >= trg.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
%sql
-- Spot check: read back a single passenger row from the gold dimension table.
select * from workspace.gold.dimpassengers where passenger_id = 'P0049'

passenger_id,name,gender,nationality,modifiedDate,DimPassengersKey,create_date,update_date
P0049,Justin Thomas,Female,Tokelau,2025-06-23T23:38:01.852Z,7,2025-06-23T23:30:44.248Z,2025-06-23T23:42:14.906Z
