In [0]:
from delta.tables import *

# Shared prefix of both table names (presumably catalog.schema — confirm),
# kept in one place so the stage and target tables cannot drift apart.
_table_namespace = "incremental_load.default"

# Source of incoming rows, and the table they are loaded into.
stage_delta_table_name = f"{_table_namespace}.orders_stage"
target_delta_table_name = f"{_table_namespace}.orders_target"

In [0]:
# DataFrame over the stage table; it is the source for the target-table load.
stage_df = spark.table(stage_delta_table_name)

In [0]:
# Load the stage rows into the target table.
#   * First run (target missing): create the target from the stage DataFrame.
#   * Later runs: delete every target row whose trackingnum appears in the
#     stage data, then append all stage rows. Each such record is therefore
#     replaced by its stage version, and new tracking numbers are inserted.
#
# NOTE(review): on the existing-table path, the MERGE (delete) and the append
# are two separate Delta commits, so the load is not atomic. If the append
# fails, the deleted rows stay missing until the job is rerun. Concurrent
# readers can also observe the table between the two commits. If stage
# trackingnum values are guaranteed unique, a single MERGE with
# whenMatchedUpdateAll().whenNotMatchedInsertAll() would make this atomic.
# Public Catalog API (PySpark >= 3.3). It issues the same JVM call as the
# private spark._jsparkSession.catalog().tableExists used previously.
if not spark.catalog.tableExists(target_delta_table_name):
    # Target does not exist yet: create it with the stage data.
    stage_df.write.format("delta").mode("overwrite").saveAsTable(target_delta_table_name)
else:
    # Handle to the existing Delta table. Despite the `_df` suffix this is a
    # DeltaTable, not a DataFrame. The name is kept in case later cells use it.
    target_df = DeltaTable.forName(spark, target_delta_table_name)

    # Rows are matched on tracking number.
    # NOTE(review): SQL `=` never matches NULL, so stage rows with a NULL
    # trackingnum are never deleted and get appended again on every run.
    # Use `<=>` (null-safe equality) if that is not intended.
    merge_condition = "stage.trackingnum = target.trackingnum"

    # Step 1: remove every target row that has a matching stage row. The
    # clause is an unconditional delete, which Delta permits even when several
    # stage rows share the same trackingnum.
    (
        target_df.alias("target")
        .merge(stage_df.alias("stage"), merge_condition)
        .whenMatchedDelete()
        .execute()
    )

    # Step 2: append all stage rows. This re-adds the records deleted above
    # with their latest values, and also inserts any new tracking numbers.
    stage_df.write.format("delta").mode("append").saveAsTable(target_delta_table_name)