In [0]:
updates_df = spark.table("finance_fraudworkspace.silver_managed.customers_silver")

In [0]:
# CDC LOGIC
from delta.tables import DeltaTable

silver_customer_clean = "finance_fraudworkspace.silver_managed.customers_silver"
updates_df = spark.table(silver_customer_clean)

silver_table = DeltaTable.forName(spark, silver_customer_clean)
(
    silver_table.alias("target")
    .merge(
        updates_df.alias("source"),
        "target.Customer_id = source.Customer_id"
    )
    .whenMatchedUpdate(set={
        "Full_name": "source.Full_name",
        "DOB": "source.DOB",
        "Email": "source.Email",
        "Risk_level": "source.Risk_level",
        "created_timestamp": "source.created_timestamp"
    })
    .whenNotMatchedInsert(values={
        "Customer_id": "source.Customer_id",
        "Full_name": "source.Full_name",
        "DOB": "source.DOB",
        "Email": "source.Email",
        "Risk_level": "source.Risk_level",
        "created_timestamp": "source.created_timestamp"
    })
    .execute()
)


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

### # TESTING CDC CUSTOMERS LOGIC

In [0]:
display(spark.table("finance_fraudworkspace.silver_managed.customers_silver"))

Customer_id,Full_name,DOB,Email,Risk_level,created_timestamp
C001,Alice Brown,1988-05-12,alice_brown@outlook.com,LOW,2023-01-10T09:15:00Z
C002,Bob Smith,1975-09-22,C002@noemail.com,MEDIUM,2023-02-14T11:20:00Z
C003,Charlie Lee,2001-01-01,charlielee@gmail.com,HIGH,2023-03-01T14:05:00Z
C004,David Wilson,1985-03-18,davidwilson1985@gmail.com,UNKNOWN,2023-04-12T16:45:00Z
C005,Unknown,2000-07-25,evadunne@gmail.com,HIGH,2023-05-20T10:30:00Z
C006,Fatima Khan,1998-12-01,fatimakhxn@gmail.com,LOW,2023-06-18T08:10:00Z


In [0]:
# MODIFY ONE ROW FOR TESTING CDC LOGIC

from pyspark.sql.functions import when, lit, col
updates_df = spark.table("finance_fraudworkspace.silver_managed.customers_silver") \
    .withColumn(
        "Risk_level",
        when(col("Customer_id") == "C002", lit("HIGH"))
        .otherwise(col("Risk_level"))
    )

display(updates_df.filter("Customer_id = 'C002'"))


Customer_id,Full_name,DOB,Email,Risk_level,created_timestamp
C002,Bob Smith,1975-09-22,C002@noemail.com,HIGH,2023-02-14T11:20:00Z


In [0]:
%sql
DESCRIBE HISTORY finance_fraudworkspace.silver_managed.customers_silver;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2026-01-29T19:55:16Z,149160287736310,x23122498-nci@outlook.com,MERGE,"Map(predicate -> [""(Customer_id#193578 = Customer_id#193538)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1632028177295825),0124-183127-on88zhrb,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2526, numTargetBytesRemoved -> 2526, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 6, executionTimeMs -> 1886, materializeSourceTimeMs -> 1, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 887, numTargetRowsUpdated -> 6, numOutputRows -> 6, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 6, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 965)",,Databricks-Runtime/17.3.x-scala2.13
2,2026-01-29T19:54:06Z,149160287736310,x23122498-nci@outlook.com,MERGE,"Map(predicate -> [""(Customer_id#189128 = Customer_id#189088)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1632028177295825),0124-183127-on88zhrb,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2526, numTargetBytesRemoved -> 2526, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 6, executionTimeMs -> 1954, materializeSourceTimeMs -> 1, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 931, numTargetRowsUpdated -> 6, numOutputRows -> 6, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 6, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 991)",,Databricks-Runtime/17.3.x-scala2.13
1,2026-01-29T19:50:41Z,149160287736310,x23122498-nci@outlook.com,MERGE,"Map(predicate -> [""(Customer_id#176671 = Customer_id#176631)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1632028177295825),0124-183127-on88zhrb,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 2526, numTargetBytesRemoved -> 2526, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 6, executionTimeMs -> 7082, materializeSourceTimeMs -> 5, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 4541, numTargetRowsUpdated -> 6, numOutputRows -> 6, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 6, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2436)",,Databricks-Runtime/17.3.x-scala2.13
0,2026-01-29T19:39:44Z,149160287736310,x23122498-nci@outlook.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(2332074110742305),0124-183127-on88zhrb,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 6, numOutputBytes -> 2526)",,Databricks-Runtime/17.3.x-scala2.13
