In [None]:
# Import Delta Lake APIs
from delta.tables import DeltaTable

# Table names (adjust schema/catalog as needed)
stage_table_name = "first_ext.default.orders_stage"
target_table_name = "first_ext.default.orders_target"

# Read the data from the stage table
stage_df = spark.read.table(stage_table_name)

# --- Check if target table exists ---
if not spark.catalog.tableExists(target_table_name):
    # First run → create the target table directly from staging
    (
        stage_df.write
        .format("delta")
        .saveAsTable(target_table_name)
    )
    print(f"✅ Target table {target_table_name} created with initial data.")

else:
    # Target exists → perform SCD Type 1 merge (upsert)
    target_table = DeltaTable.forName(spark, target_table_name)

    # Define merge condition (using tracking_num as primary key)
    merge_condition = "stage.tracking_num = target.tracking_num"

    (
        target_table.alias("target")
        .merge(stage_df.alias("stage"), merge_condition)
        .whenMatchedDelete()  # delete old records
        .execute()
    )

    (
        stage_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table_name)
    )
    print(f"✅ SCD1 merge completed. Target table {target_table_name} updated.")
