In [0]:
import ast

from pyspark.sql.functions import *
from pyspark.sql.types import *

### **Parameters**

In [0]:
# Parameter preset for the DimFlights dimension (silver.silver_flights -> gold.DimFlights).
# Kept commented out: only one parameter cell should be active at a time. To load flights,
# uncomment this cell and comment out the currently active parameter cell.
# # Catalog name
# catalog = "workspace"

# # Key Cols list
# key_cols = "['flight_id']"
# key_cols_list = eval(key_cols)

# # CDC Cols
# cdc_col = "modified_date"

# # BACKDATED REFRESH
# backdated_refresh = ""

# # SOURCE OBJECT
# source_object = "silver_flights"

# # SOURCE SCHEMA
# source_schema = "silver"

# # TARGET OBJECT
# target_object = "DimFlights"

# # TARGET SCHEMA
# target_schema = "gold"

# # Surrogate Key
# surrogate_key = "DimFlightsKey"

In [0]:
# Parameter preset for the DimAirports dimension (silver.silver_airports -> gold.DimAirports).
# Kept commented out: only one parameter cell should be active at a time. To load airports,
# uncomment this cell and comment out the currently active parameter cell.
# # Catalog Name
# catalog = "workspace"

# # Key Cols List
# key_cols = "['airport_id']"
# key_cols_list = eval(key_cols)

# # CDC Column
# cdc_col = "modified_date"

# # Backdated Refresh
# backdated_refresh = ""

# # Source Object
# source_object = "silver_airports"

# # Source Schema
# source_schema = "silver"

# # Target Schema 
# target_schema = "gold"

# # Target Object 
# target_object = "DimAirports"

# # Surrogate Key
# surrogate_key = "DimAirportsKey"

In [0]:
# Parameters for the DimPassengers load (silver.silver_passengers -> gold.DimPassengers).

# Catalog Name
catalog = "workspace"

# Key Cols List: business key column(s) of the dimension, written as a Python list literal
key_cols = "['passenger_id']"
# literal_eval accepts only Python literals, so a malformed or malicious value (e.g. once this
# becomes a job/widget parameter) cannot execute code the way eval() could.
key_cols_list = ast.literal_eval(key_cols)
# An empty or non-string key list would later produce an empty/invalid SQL join condition
if not (
    isinstance(key_cols_list, list)
    and key_cols_list
    and all(isinstance(key_name, str) for key_name in key_cols_list)
):
    raise ValueError(f"key_cols must be a non-empty list of column names, got {key_cols!r}")

# CDC Column: change timestamp used as the incremental watermark
cdc_col = "modified_date"

# Backdated Refresh: empty string = normal incremental run, otherwise the date to reload from
backdated_refresh = ""

# Source Object
source_object = "silver_passengers"

# Source Schema
source_schema = "silver"

# Target Schema 
target_schema = "gold"

# Target Object 
target_object = "DimPassengers"

# Surrogate Key
surrogate_key = "DimPassengersKey"


### **Incremental Data Ingestion**

#### **Last Load Date**

In [0]:
# Resolve the watermark for the incremental read: only source rows with cdc_col > last_load
# are processed downstream.
# Fallback used on the first load, and when the target exists but holds no cdc_col values yet.
initial_load_date = "1990-01-01 00:00:00"
target_table = f"{catalog}.{target_schema}.{target_object}"

if backdated_refresh:
    # Back-dated refresh: reprocess everything changed after the supplied date
    last_load = backdated_refresh
elif spark.catalog.tableExists(target_table):
    # Query the same fully qualified name that was checked above, so the result does not
    # depend on the session's current catalog.
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table}").collect()[0][0]
    # max() over an empty table is NULL -> None; interpolating that into SQL would compare
    # against the literal 'None', so fall back to the initial load date instead.
    if last_load is None:
        last_load = initial_load_date
else:
    # Target does not exist yet: first (full) load
    last_load = initial_load_date

# Show the resolved watermark
last_load

datetime.datetime(2025, 8, 6, 21, 41, 43, 909000)

In [0]:
# Incremental slice of the source: rows changed after the watermark (last_load), reduced to the
# latest version of each business key. Without this de-duplication, a key that changed twice in
# one batch would be inserted twice with two surrogate keys (new keys) or make the Delta MERGE
# fail because several source rows match one target row (existing keys).
# NOTE(review): identifiers and last_load are interpolated into SQL text; they come from notebook
# parameters, so keep them trusted (or validate them) if they ever become job/widget inputs.
source_table = f"{catalog}.{source_schema}.{source_object}"  # honour the catalog parameter
key_partition = ", ".join(key_cols_list)

df_src = spark.sql(f"""
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY {key_partition} ORDER BY {cdc_col} DESC) AS _src_rn
        FROM {source_table}
        WHERE {cdc_col} > '{last_load}'
    ) AS latest
    WHERE _src_rn = 1
""").drop("_src_rn")

In [0]:
# Preview the incremental source slice
display(df_src)

passenger_id,name,gender,nationality,modified_date
P0223,Nicholas Gomez,Female,Cook Islands,2025-08-12T21:01:32.095Z
P0224,Jason Jensen,Female,Rwanda,2025-08-12T21:01:32.095Z
P0222,Maria Taylor,Male,Lao People's Democratic Republic,2025-08-12T21:01:32.095Z
P0225,William Lopez,Male,Heard Island and McDonald Islands,2025-08-12T21:01:32.095Z
P0221,Amy Welch,Male,Croatia,2025-08-12T21:01:32.095Z
P0032,Daniel Douglas,Male,Singapore,2025-08-12T21:01:32.095Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-08-12T21:01:32.095Z
P0108,Brian Anderson,Female,Eritrea,2025-08-12T21:01:32.095Z
P0154,Angel Thompson,Female,Gambia,2025-08-12T21:01:32.095Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-08-12T21:01:32.095Z


#### **OLD VS NEW RECORDS**

In [0]:
# Lookup side of the old-vs-new split: business key(s), surrogate key and audit dates.
# On the first run the target does not exist, so a zero-row frame with the same columns stands in
# and the LEFT JOIN below classifies every source row as new.
if not spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):
    # Placeholder columns: each business key as '' (string), plus typed surrogate key / dates
    key_cols_string_init = ", ".join(f"'' AS {key_name}" for key_name in key_cols_list)
    df_trg = spark.sql(
        f"""SELECT {key_cols_string_init}, CAST('0' AS INT) AS {surrogate_key}, CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0"""
    )
else:
    key_cols_string_incremental = ", ".join(key_cols_list)
    df_trg = spark.sql(
        f"""SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date FROM {catalog}.{target_schema}.{target_object}"""
    )

In [0]:
# Preview the target-side lookup frame
display(df_trg)

passenger_id,DimPassengersKey,create_date,update_date
P0001,1,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0002,2,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0003,3,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0004,4,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0005,5,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0006,6,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0007,7,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0008,8,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0009,9,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0010,10,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z


**JOIN CONDITION**

In [0]:
# Equality on every business key, e.g. "src.passenger_id = trg.passenger_id"
join_condition = " AND ".join(map(lambda key_name: f"src.{key_name} = trg.{key_name}", key_cols_list))

In [0]:
# Expose both sides to SQL under the aliases that join_condition refers to
for view_name, frame in (("src", df_src), ("trg", df_trg)):
    frame.createOrReplaceTempView(view_name)

# LEFT JOIN keeps every incoming source row; unmatched rows come back with a NULL surrogate key
# (and NULL audit dates), which is how new records are told apart in the next step.
join_sql = f"""
          SELECT src.*,
          trg.{surrogate_key},
          trg.create_date,
          trg.update_date
        FROM src
        LEFT JOIN trg
        ON {join_condition}
        """
df_join = spark.sql(join_sql)

In [0]:
# Preview source rows joined to their existing dimension members (if any)
display(df_join)

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-08-12T21:01:32.095Z,155.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-08-12T21:01:32.095Z,106.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0032,Daniel Douglas,Male,Singapore,2025-08-12T21:01:32.095Z,32.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0154,Angel Thompson,Female,Gambia,2025-08-12T21:01:32.095Z,154.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0196,Monica Dillon,Female,Belarus,2025-08-12T21:01:32.095Z,196.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0049,Justin Thomas,Female,Tokelau,2025-08-12T21:01:32.095Z,49.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0108,Brian Anderson,Female,Eritrea,2025-08-12T21:01:32.095Z,108.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0113,Brian Richardson,Male,Dominican Republic,2025-08-12T21:01:32.095Z,113.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0158,Jamie Mcneil,Male,Burundi,2025-08-12T21:01:32.095Z,158.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-08-12T21:01:32.095Z,149.0,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z


In [0]:
# A matched row carries the existing surrogate key; an unmatched (new) row has NULL there
surrogate_key_col = col(surrogate_key)

df_old, df_new = (
    df_join.filter(surrogate_key_col.isNotNull()),  # OLD RECORDS
    df_join.filter(surrogate_key_col.isNull()),     # NEW RECORDS
)

In [0]:
# Preview rows that update existing dimension members
display(df_old)

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0032,Daniel Douglas,Male,Singapore,2025-08-12T21:01:32.095Z,32,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0049,Justin Thomas,Female,Tokelau,2025-08-12T21:01:32.095Z,49,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-08-12T21:01:32.095Z,106,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0108,Brian Anderson,Female,Eritrea,2025-08-12T21:01:32.095Z,108,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0113,Brian Richardson,Male,Dominican Republic,2025-08-12T21:01:32.095Z,113,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-08-12T21:01:32.095Z,149,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0154,Angel Thompson,Female,Gambia,2025-08-12T21:01:32.095Z,154,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-08-12T21:01:32.095Z,155,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0158,Jamie Mcneil,Male,Burundi,2025-08-12T21:01:32.095Z,158,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z
P0196,Monica Dillon,Female,Belarus,2025-08-12T21:01:32.095Z,196,2025-08-12T20:39:25.949Z,2025-08-12T20:39:25.949Z


## **ENRICHING DFs**

### **Preparing df_old**

In [0]:
# Existing members keep their surrogate key and create_date; only update_date is refreshed
refresh_ts = current_timestamp()
df_old_enr = df_old.withColumn("update_date", refresh_ts)

### **Preparing df_new**

In [0]:
# Preview rows that will become new dimension members
display(df_new)

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0225,William Lopez,Male,Heard Island and McDonald Islands,2025-08-12T21:01:32.095Z,,,
P0224,Jason Jensen,Female,Rwanda,2025-08-12T21:01:32.095Z,,,
P0223,Nicholas Gomez,Female,Cook Islands,2025-08-12T21:01:32.095Z,,,
P0222,Maria Taylor,Male,Lao People's Democratic Republic,2025-08-12T21:01:32.095Z,,,
P0221,Amy Welch,Male,Croatia,2025-08-12T21:01:32.095Z,,,


In [0]:
# Assign surrogate keys to brand-new members, continuing after the current maximum in the target,
# and stamp both audit dates.
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    # max() is NULL when the table exists but is empty; without the `or 0` every new key
    # would become NULL (NULL + n = NULL).
    max_surrogate_key = spark.sql(f"""
                                  SELECT max({surrogate_key}) FROM {target_table}
                                  """).collect()[0][0] or 0
else:
    max_surrogate_key = 0

# row_number() yields gap-free 1..N. monotonically_increasing_id() is only unique, not
# consecutive: it embeds the partition id in the upper bits, so multi-partition batches got
# huge key gaps. Ordering by the business key(s) also makes the assignment deterministic if
# the frame is re-evaluated (e.g. once for display and again for the MERGE).
# The window has no PARTITION BY, so Spark moves these rows to a single partition, which is
# acceptable for the (small) set of new dimension members.
new_key_order = ", ".join(key_cols_list)
new_row_number = expr(f"row_number() OVER (ORDER BY {new_key_order})")

# Keep the surrogate key BIGINT, matching the type the previous implementation wrote
df_new_enr = df_new.withColumn(surrogate_key, lit(max_surrogate_key).cast("bigint") + new_row_number.cast("bigint"))\
                   .withColumn("create_date", current_timestamp())\
                   .withColumn("update_date", current_timestamp())

In [0]:
# Preview existing members with refreshed update_date
display(df_old_enr)

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0032,Daniel Douglas,Male,Singapore,2025-08-12T21:01:32.095Z,32,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0049,Justin Thomas,Female,Tokelau,2025-08-12T21:01:32.095Z,49,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-08-12T21:01:32.095Z,106,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0108,Brian Anderson,Female,Eritrea,2025-08-12T21:01:32.095Z,108,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0113,Brian Richardson,Male,Dominican Republic,2025-08-12T21:01:32.095Z,113,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-08-12T21:01:32.095Z,149,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0154,Angel Thompson,Female,Gambia,2025-08-12T21:01:32.095Z,154,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-08-12T21:01:32.095Z,155,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0158,Jamie Mcneil,Male,Burundi,2025-08-12T21:01:32.095Z,158,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z
P0196,Monica Dillon,Female,Belarus,2025-08-12T21:01:32.095Z,196,2025-08-12T20:39:25.949Z,2025-08-12T21:04:16.056Z


### **Unioning old and new records**

In [0]:
# Stack updated and new members, matching columns by name (not position); allowMissingColumns
# is spelled out at its default so a schema mismatch between the two frames still raises.
df_union = df_old_enr.unionByName(df_new_enr, allowMissingColumns=False)

In [0]:
# Preview the rows that will be written to the dimension
display(df_union)

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0032,Daniel Douglas,Male,Singapore,2025-08-12T21:01:32.095Z,32,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0049,Justin Thomas,Female,Tokelau,2025-08-12T21:01:32.095Z,49,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0106,Joshua Shepherd Jr.,Female,Netherlands,2025-08-12T21:01:32.095Z,106,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0108,Brian Anderson,Female,Eritrea,2025-08-12T21:01:32.095Z,108,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0113,Brian Richardson,Male,Dominican Republic,2025-08-12T21:01:32.095Z,113,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0149,Katherine Bowen DVM,Male,Mexico,2025-08-12T21:01:32.095Z,149,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0154,Angel Thompson,Female,Gambia,2025-08-12T21:01:32.095Z,154,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0155,Robert Singleton,Female,Northern Mariana Islands,2025-08-12T21:01:32.095Z,155,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0158,Jamie Mcneil,Male,Burundi,2025-08-12T21:01:32.095Z,158,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z
P0196,Monica Dillon,Female,Belarus,2025-08-12T21:01:32.095Z,196,2025-08-12T20:39:25.949Z,2025-08-12T21:04:17.541Z


### **UPSERT**

In [0]:
from delta.tables import DeltaTable

In [0]:
# Write the prepared rows to the gold dimension.
# First run: create the Delta table. Later runs: MERGE on the surrogate key, overwriting matched
# members in place (SCD Type 1 style) only when the incoming cdc_col is not older, and inserting
# unmatched rows.
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    (
        df_union.write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
else:
    merge_condition = f"trg.{surrogate_key} = src.{surrogate_key}"
    not_older = f"src.{cdc_col} >= trg.{cdc_col}"

    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), merge_condition)
        .whenMatchedUpdateAll(condition=not_older)
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Spot check of the flights dimension (presumably loaded by this notebook with the DimFlights
-- parameter preset). Fully qualified so the result does not depend on the session's current
-- catalog, consistent with the other checks in this notebook.
SELECT * FROM workspace.gold.dimflights

flight_id,airline,origin,destination,flight_date,modified_date,DimFlightsKey,create_date,update_date
F0001,Delta,Kellyfort,South Kathleen,2025-05-04,2025-08-06T21:41:44.844Z,1,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0002,Qatar Airways,Lake Stephen,New Vincent,2025-04-29,2025-08-06T21:41:44.844Z,2,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0003,Lufthansa,East Patrickborough,North Mary,2025-05-11,2025-08-06T21:41:44.844Z,3,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0004,Delta,Maddenshire,Johnchester,2025-05-16,2025-08-06T21:41:44.844Z,4,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0005,Qatar Airways,Bennettside,New Mistyhaven,2025-06-13,2025-08-06T21:41:44.844Z,5,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0006,Air Canada,New Richardside,South Jamesborough,2025-05-16,2025-08-06T21:41:44.844Z,6,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0007,Delta,Berryport,Miguelburgh,2025-05-24,2025-08-06T21:41:44.844Z,7,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0008,Lufthansa,Briannachester,Cervantesland,2025-05-26,2025-08-06T21:41:44.844Z,8,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0009,Delta,Alexandraborough,North Alexishaven,2025-06-10,2025-08-06T21:41:44.844Z,9,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z
F0010,Emirates,Kruegerchester,Martintown,2025-05-20,2025-08-06T21:41:44.844Z,10,2025-08-12T20:06:43.130Z,2025-08-12T20:27:32.353Z


In [0]:
%sql
-- Spot check of a single passenger row in the target dimension after the upsert
SELECT *
FROM workspace.gold.dimpassengers
WHERE passenger_id = 'P0049'

passenger_id,name,gender,nationality,modified_date,DimPassengersKey,create_date,update_date
P0049,Justin Thomas,Female,Tokelau,2025-08-12T21:01:32.095Z,49,2025-08-12T20:39:25.949Z,2025-08-12T21:04:19.806Z
