In [0]:
import ast

from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.sql.window import Window

### **Parameters**

In [0]:
# Alternative parameter set for building the DimFlights dimension (kept commented out).
# To build DimFlights instead of DimPassengers, uncomment this cell and comment out the
# next cell; otherwise the next cell's assignments overwrite these values.
# # # Catalog Name
# catalog = "workspace"

# # # Key Cols List
# key_cols = "['flight_id']"
# key_cols_list = eval(key_cols)

# # # CDC Column
# cdc_col = "modifiedDate"

# # # Backdated Refresh
# backdated_refresh = ""

# # # Source Object
# source_object = "silver_flights"

# # # Source Schema
# source_schema = "silver"

# # # Target Schema 
# target_schema = "gold"

# # # Target Object 
# target_object = "DimFlights"

# # # Surrogate Key
# surrogate_key = "DimFlightsKey"


In [0]:
# Parameters for building the DimPassengers gold dimension from the silver layer.

# Catalog Name
catalog = "workspace"

# Key Cols List: business key column(s), written as a Python list literal.
key_cols = "['passenger_id']"
# ast.literal_eval only accepts literals; eval() would execute arbitrary code if this
# string were ever supplied from a job parameter or widget.
key_cols_list = ast.literal_eval(key_cols)

# CDC Column: change timestamp used as the incremental watermark.
cdc_col = "modifiedDate"

# Backdated Refresh: "" = continue from max(cdc_col) in the target;
# otherwise a timestamp string to reprocess from.
backdated_refresh = ""

# Source Object
source_object = "silver_passengers"

# Source Schema
source_schema = "silver"

# Target Schema
target_schema = "gold"

# Target Object
target_object = "DimPassengers"

# Surrogate Key
surrogate_key = "DimPassengersKey"


## **INCREMENTAL DATA INGESTION**

#### **Last Load Date**

In [0]:
# Resolve the CDC watermark: only source rows with cdc_col >= last_load are processed below.
target_table = f"{catalog}.{target_schema}.{target_object}"

if len(backdated_refresh) == 0:
  # No backdated refresh: continue from the newest cdc_col value already in the target.
  if spark.catalog.tableExists(target_table):
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table}").collect()[0][0]
  else:
    last_load = None

  # First load, or the target exists but is empty (max() returns NULL): take everything.
  if last_load is None:
    last_load = "1900-01-01 00:00:00"

else:
  # Backdated refresh: reprocess from the user-supplied timestamp.
  last_load = backdated_refresh

# Show the watermark that will be used
last_load

In [0]:
# Read only the source rows changed at or after the watermark, fully qualified with the
# same catalog as the target rather than relying on the session's current catalog.
# `>=` re-reads rows stamped exactly at last_load; rows whose key already exists in the
# target are matched to their surrogate key below and updated rather than re-inserted.
df_src = spark.sql(
    f"SELECT * FROM {catalog}.{source_schema}.{source_object} WHERE {cdc_col} >= '{last_load}'"
)
df_src.display()

## OLD vs NEW RECORDS

In [0]:
# Build the target-side lookup: business key(s) -> surrogate key + audit dates.
if spark.catalog.tableExists(f"{catalog}.{target_schema}.{target_object}"):

  # Key columns as a comma-separated select list
  key_cols_string_incremental = ", ".join(key_cols_list)

  # Target business keys with their surrogate key and audit dates
  df_trg = spark.sql(
    f"SELECT {key_cols_string_incremental}, {surrogate_key}, create_date, update_date "
    f"FROM {catalog}.{target_schema}.{target_object}"
  )

else:

  # First load: an empty frame (WHERE 1=0) with the same column names, so the
  # LEFT JOIN below still resolves and every source row is treated as new.
  key_cols_string_init = ", ".join(f"'' AS {i}" for i in key_cols_list)

  df_trg = spark.sql(f"""SELECT {key_cols_string_init},CAST('0' AS INT ) AS {surrogate_key},CAST('1900-01-01 00:00:00' AS timestamp) AS create_date, CAST('1900-01-01 00:00:00' AS timestamp) AS update_date WHERE 1=0""")


In [0]:
# Preview the target-side lookup (empty on the first load).
df_trg.display()

#### **JOIN CONDITION**

In [0]:
# Equality predicate over every business key, e.g. "src.passenger_id = trg.passenger_id".
join_condition = " AND ".join(
    f"src.{key} = trg.{key}" for key in key_cols_list
)

In [0]:
# Register DataFrames as temporary SQL views
df_src.createOrReplaceTempView("src")
df_trg.createOrReplaceTempView("trg")


# Perform the join using Spark SQL
df_join = spark.sql(f"""
    SELECT 
        src.*, 
        trg.{surrogate_key},
        trg.create_date,
        trg.update_date
    FROM src
    LEFT JOIN trg
    ON {join_condition}
""")



In [0]:
# Preview source rows joined to their target surrogate key (NULL key = not yet in target).
df_join.display()

In [0]:
# OLD RECORDS
df_old = df_join.filter(col(f'{surrogate_key}').isNotNull())

# NEW RECOERDS
df_new = df_join.filter(col(f'{surrogate_key}').isNull())

In [0]:
# Preview rows whose business key already exists in the target.
df_old.display()

## **ENRICHING DFS**

#### **Preparing DF_OLD**

In [0]:
# Existing rows keep their surrogate key and create_date from the target;
# only update_date is stamped with the current time.
refresh_ts = current_timestamp()
df_old_enr = df_old.withColumn("update_date", refresh_ts)

#### **Preparing DF_NEW**

In [0]:
# Preview rows whose business key is not yet in the target (surrogate key still NULL).
df_new.display()

In [0]:
# Assign surrogate keys to brand-new business keys, continuing after the current maximum,
# and stamp both audit dates.
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(
        f"SELECT max({surrogate_key}) FROM {target_table}"
    ).collect()[0][0]
else:
    max_surrogate_key = None

# max() returns NULL for an existing-but-empty table; without this fallback
# lit(None) + ... would make every new surrogate key NULL.
if max_surrogate_key is None:
    max_surrogate_key = 0

# row_number() yields consecutive 1..n. monotonically_increasing_id() is only unique and
# increasing (it puts the partition id in the upper bits), so it leaves huge gaps between
# partitions. Ordering by the business keys makes the assignment deterministic.
# NOTE(review): a window without partitionBy moves all new rows to one partition; this is
# usually fine for the incremental slice of a dimension, but confirm the batch sizes.
new_key_window = Window.orderBy(*key_cols_list)

df_new_enr = df_new.withColumn(surrogate_key, (lit(max_surrogate_key) + row_number().over(new_key_window)).cast("long"))\
                .withColumn('create_date', current_timestamp())\
                .withColumn('update_date', current_timestamp())


In [0]:
# Preview existing rows after update_date was refreshed.
# NOTE(review): this cell sits in the "Preparing DF_NEW" section; df_new_enr may have been intended.
df_old_enr.display()

## **UNION OF OLD AND NEW RECORDS**

In [0]:
# Stack refreshed existing rows and newly keyed rows; columns are aligned by name, not position.
df_union = df_old_enr.unionByName(other=df_new_enr)

In [0]:
# Inspect the schema of the existing-record slice (same columns as df_join).
df_old.printSchema()


In [0]:
# Preview the combined batch that will be written to the target.
df_union.display()

## **UPSERT**

In [0]:
from delta.tables import DeltaTable 

In [0]:
# Write the batch to the Delta dimension: MERGE on the surrogate key when the table exists,
# otherwise create the table from this first batch.
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    # First run: the target does not exist yet, so this write creates it.
    df_union.write.format("delta")\
            .mode("append")\
            .saveAsTable(target_table)
else:
    merge_condition = f"trg.{surrogate_key} = src.{surrogate_key}"
    # Only overwrite a matched row when the incoming version is at least as recent.
    update_condition = f"src.{cdc_col} >= trg.{cdc_col}"

    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), merge_condition)
        .whenMatchedUpdateAll(condition=update_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
%sql
-- Ad-hoc spot check of a single dimension row (NOTE(review): key 7 looks like an arbitrary sample value).
-- Hard-codes workspace.gold.DimPassengers rather than using the notebook parameters.
select * from workspace.gold.DimPassengers where DimPassengersKey = 7
