In [0]:
from delta.tables import DeltaTable
from pyspark.sql import DataFrame

## The Original Incremental Load Function from DP-700

In [0]:
def incremental_load(spark, df_source: DataFrame, target_table: str, candidate_key: list):
    """Load ``df_source`` into the Delta table ``target_table`` as an upsert.

    When the table does not exist yet, it is created from ``df_source``
    (initial load). Otherwise a Delta MERGE is executed:

    * rows are matched on the ``candidate_key`` columns;
    * a matched row is updated only if at least one non-key column differs
      (NULL-safe comparison, so NULL <-> value changes are detected);
    * unmatched source rows are inserted.

    Args:
        spark: Active SparkSession used to look up and write the table.
        df_source: DataFrame holding the incoming rows.
        target_table: Name of the target Delta table.
        candidate_key: Column names that identify a row. Keys are compared
            with ``=``, so rows whose key columns contain NULL never match.

    Returns:
        None.

    Raises:
        ValueError: On the merge path, if ``candidate_key`` is empty or names
            columns that are missing from ``df_source``.
        Exception: Any error raised while writing or merging is printed and
            then re-raised, so a failed load is never reported as a success.
    """
    # Use an explicit existence check instead of treating *any* failure of
    # DeltaTable.forName as "table missing": a transient or permission error
    # must not lead to the existing table being overwritten.
    if not spark.catalog.tableExists(target_table):
        try:
            df_source.write.format('delta').mode('overwrite').saveAsTable(target_table)
            print(f"Table '{target_table}' created successfully.")
        except Exception as e:
            print(f"Initial load for table {target_table} failed with error: {str(e)}")
            raise
        return

    try:
        if not candidate_key:
            raise ValueError("candidate_key must contain at least one column name.")
        missing_keys = [col for col in candidate_key if col not in df_source.columns]
        if missing_keys:
            raise ValueError(f"candidate_key columns not found in source: {missing_keys}")

        delta_table = DeltaTable.forName(spark, target_table)

        change_detection_columns = [col for col in df_source.columns if col not in candidate_key]

        match_condition = ' AND '.join([f'target.{col} = source.{col}' for col in candidate_key])
        update_expr = {col: f'source.{col}' for col in df_source.columns}

        merge_operation = delta_table.alias('target').merge(
            source=df_source.alias('source'),
            condition=match_condition
        )

        # Without non-key columns there is nothing to update, and an empty
        # condition string would not be a valid expression, so the update
        # clause is only added when there is something to compare.
        if change_detection_columns:
            # `<=>` is NULL-safe equality: a plain `!=` evaluates to NULL when
            # either side is NULL, which would silently skip such updates.
            update_condition = ' OR '.join(
                [f'NOT (target.{col} <=> source.{col})' for col in change_detection_columns]
            )
            merge_operation = merge_operation.whenMatchedUpdate(
                condition=update_condition,
                set=update_expr
            )

        merge_operation.whenNotMatchedInsertAll().execute()
        print(f"Incremental load completed successfully for table '{target_table}'.")
    except Exception as e:
        print(f"Incremental load failed for table {target_table} with error: {str(e)}")
        # Re-raise (as the initial-load path does) so callers can detect the failure.
        raise

### Creating Initial Data

In [0]:
# Column names used when building the sample DataFrames.
columns = ["id", "name", "date", "amount"]

# Three seed rows for the first load of the target table.
data_initial = [
    (1, "Mario Rossi", "2025-01-01", 100),
    (2, "Luigi Bianchi", "2025-01-02", 150),
    (3, "Carla Verdi", "2025-01-03", 200),
]
df_initial = spark.createDataFrame(data=data_initial, schema=columns)

### Performing the Initial Load

In [0]:
# First run against the target table, using "id" as the row key.
incremental_load(
    spark,
    candidate_key=["id"],
    target_table="default.my_incremental_table",
    df_source=df_initial,
)

Table 'default.my_incremental_table' created successfully.


In [0]:
# Verify the results: display the table contents after the initial load.
# The query has no ORDER BY, so the row order shown is not guaranteed.

spark.sql("SELECT * FROM default.my_incremental_table").show()

+---+-------------+----------+------+
| id|         name|      date|amount|
+---+-------------+----------+------+
|  2|Luigi Bianchi|2025-01-02|   150|
|  1|  Mario Rossi|2025-01-01|   100|
|  3|  Carla Verdi|2025-01-03|   200|
+---+-------------+----------+------+



### Loading New and Updated Records

In [0]:
# Second batch: one changed row, one identical row and one brand-new row.
data_update = [
    (1, "Mario Rossi", "2025-01-01", 120),    # id 1: amount changed from 100 to 120
    (2, "Luigi Bianchi", "2025-01-02", 150),  # id 2: same values as in the initial data
    (4, "Anna Blu", "2025-01-04", 300),       # id 4: not present in the initial data
]
df_update = spark.createDataFrame(data=data_update, schema=columns)

# Load the second batch into the same table, matching rows on "id".
incremental_load(
    spark,
    candidate_key=["id"],
    target_table="default.my_incremental_table",
    df_source=df_update,
)

Incremental load completed successfully for table 'default.my_incremental_table'.


In [0]:
# Verify the updated table: display its contents after the incremental load,
# sorted by id so the displayed row order is stable.

spark.sql("SELECT * FROM default.my_incremental_table ORDER BY id").show()

+---+-------------+----------+------+
| id|         name|      date|amount|
+---+-------------+----------+------+
|  1|  Mario Rossi|2025-01-01|   120|
|  2|Luigi Bianchi|2025-01-02|   150|
|  3|  Carla Verdi|2025-01-03|   200|
|  4|     Anna Blu|2025-01-04|   300|
+---+-------------+----------+------+

