
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>


# Processing Records from CDF and Propagating Changes

In this notebook, we'll demonstrate how you can easily propagate changes (inserts, updates and deletes) through a Lakehouse with Delta Lake [Change Data Feed (CDF)](https://docs.databricks.com/en/delta/delta-change-data-feed.html) via Stream or queried by its specific version.

For this demo, we'll work with a slightly different dataset representing patient information for medical records. Descriptions of the data at various stages follow.

#### Raw Files
We will be loading JSON files into a Delta table.

#### Bronze Table (bronze_users)
Here we store all records as consumed. A row represents:
1. A new patient providing data for the first time
1. An existing patient confirming that their information is still correct
1. An existing patient updating some of their information

The type of action a row represents is not captured.

#### Silver Table
This is the validated view of our data. Each patient will appear only once in this table. An upsert statement will be used to identify rows that have changed.


#### Gold Table
For this example, we'll create a simple gold table leveraging the silver table.


By the end of this lesson, students will be able to:
- Enable Change Data Feed on a cluster or for a particular table
- Describe how changes (insert, update and delete) are recorded
- Read CDF output with Spark SQL or PySpark
- Leverage the `change_table` function for tracking changes and `_change_type` column to get specific actions in your data. 
- Retrieve the latest history version from a table


## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

    - In the drop-down, select **More**.

    - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

## A. Classroom Setup

Run the following cell to configure your working environment for this course. It will also set your default catalog to your unique catalog name and the schema to your specific schema name shown below using the `USE` statements.
<br></br>


```
USE CATALOG your-catalog;
USE SCHEMA your-catalog.pii_data;
```

**NOTE:** The `DA` object is only used in Databricks Academy courses and is not available outside of these courses. It will dynamically reference the information needed to run the course.

In [0]:
%run ./Includes/Classroom-Setup-1.3

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m



------- Resetting Files for the Start of the Demo -------


Searching for files in /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc to delete...
Deleting file: /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch01.json
Deleting file: /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch02.json
Deleting file: /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch03.json

Searching for files in /Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc to delete...
Deleting file: /Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc/batch01.json

-----Creating JSON files for CDC-----
Creating file /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch01.json.
Creating file /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_

0,1
Your Course Catalog Name:,
Your Schema Name:,
Source Data Volume (DA.paths.cdc_stream):,
Your Checkpoint Location (DA.paths.checkpoints):,


### A1. Catalog and Schema

Run the code below to view your current default catalog and schema. Ensure that their names match the ones from the cell above.

In [0]:
%sql
SELECT current_catalog(), current_schema()

current_catalog(),current_schema()
labuser10747597_1750981120,pii_data


### A2. Source Data Volume

Run the code below to view the volume path where the data will be stored for streaming. You can also check the contents in the Catalog Explorer, located in the left navigation pane. Currently no files are loaded to the volume.

In [0]:
print(f"Source Data Volume:{DA.paths.cdc_stream}")

Source Data Volume:/Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc


### A3. Structure Streaming Checkpoint folder

The [checkpoint](https://docs.databricks.com/en/structured-streaming/checkpoints.html) location tracks information that identifies the query, including state data and processed records. When you delete files in a checkpoint directory or switch to a new checkpoint location, the next run of the query starts fresh.

Run the code below to view the checkpoint path to use in our structured streaming process. You can also check this in the Catalog Explorer.Currently no files are loaded to the **_checkpoint** folder in the volume.

In [0]:
print(f"Checkpoint Volume:{DA.paths.checkpoints}")

Checkpoint Volume:/Volumes/labuser10747597_1750981120/pii_data/cdf_demo/_checkpoints


## B. Create a Bronze Users Table and Ingest Data with Auto Loader

In this section, we'll use Auto Loader to ingest data as it arrives.

The steps below include:
* Declaring the target table
* Creating and starting the stream
* Loading data into the source directory



### B1. Create the bronze_users table

In [0]:
%sql
-- Drop Table if exists
DROP TABLE IF EXISTS bronze_users;  

-- Create bronze_users table
CREATE TABLE IF NOT EXISTS bronze_users ( 
  mrn BIGINT, 
  dob DATE, 
  sex STRING, 
  gender STRING, 
  first_name STRING, 
  last_name STRING, 
  street_address STRING, 
  zip BIGINT, city STRING, 
  state STRING, 
  updated timestamp
);

### B2. Create, Start, and Load Stream Data into the Bronze Table

The cell below completes the following:
- defines a schema for the stream
- creates a stream into the **bronze_users** table that runs every **3 seconds**
- loads the first batch of JSON files to your *../pii_data/cdf_demo/stream_source/cdc* volume.

**Note**: The stream will continue running every 3 seconds until it is terminated later in the notebook. Wait for the stream to finish reading the file before continuing to the next cell.


In [0]:
# Define the schema for the incoming data
schema = """
  mrn BIGINT, 
  dob DATE, 
  sex STRING, 
  gender STRING, 
  first_name STRING, 
  last_name STRING, 
  street_address STRING, 
  zip BIGINT, 
  city STRING, 
  state STRING, 
  updated TIMESTAMP
"""

# Read the streaming data from the specified path with the defined schema into the Bronze Table
from pyspark.sql.functions import col, expr,date_add

bronze_users_stream = (
      spark
      .readStream
        .format("cloudFiles")                 # Specify the format as cloudFiles for auto loader
        .option("cloudFiles.format", "json")  # Specify the file format as JSON
        .schema(schema)                       # Apply the defined schema to the incoming data
        .load(DA.paths.cdc_stream)            # Load the data from the specified path
      .writeStream
        .format("delta")                      # Write the stream data in Delta format
        .outputMode("append")                 # Append new records to the table
        .trigger(processingTime='3 seconds')
        .option("checkpointLocation", f"{DA.paths.checkpoints}/bronze")  # Specify the checkpoint location
        .table("bronze_users"))               # Write the stream data to the Bronze table


# Load a file into the volume: DA.paths.cdc_stream
DA.load(copy_from=DA.paths.stream_source.cdf_demo, 
        copy_to=DA.paths.cdc_stream, 
        n=1)


----Loading files to user's volume-----
File number 1 - Copying file /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch01.json --> /Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc/batch01.json.
Stop loading files to user's volume.


### B3. Validate a File has been placed in your volume

Wait for the stream to initialize and process the file that has been placed to be ingested into the **bronze_users** table. Run the cell below to get the exact names of your volumes. 

In the catalog explorer check your source files and checkpoint volumes and confirm files have been added. You should see the following:
- In the  **cdc** folder a file named *batch01.json*
- In the **_checkpoints** a folder named **bronze** corresponding to the **bronze_users** table.

**Example**

![Source and Checkpoint Folders](./Includes/images/stream_folders.png)


In [0]:
print(f"Source Data Volume:{DA.paths.cdc_stream}")
print(f"Checkpoint Volume:{DA.paths.checkpoints}")

Source Data Volume:/Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc
Checkpoint Volume:/Volumes/labuser10747597_1750981120/pii_data/cdf_demo/_checkpoints


### B4. Validate Ingested Data in the **bronze_users** Table

Query the **bronze_users** table and confirm that *829* users have been loaded into the table from the first batch of JSON files (one file).


In [0]:
%sql
SELECT * 
FROM bronze_users;

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated



## C. Create a **silver_users** Target Table and Load It with Our Production Data

Our **silver_users** table will be loaded with production data from our users to serve as a baseline. Here, we use `DEEP CLONE` to move read-only data from PROD to our environment, where we have full write/delete access.

In this case, our production data is stored as a Delta table in the following location: `Volumes/dbacademy_gym_data/v01/pii/silver`

In [0]:
%sql
DROP TABLE IF EXISTS silver_users;

CREATE OR REPLACE TABLE silver_users
DEEP CLONE delta.`/Volumes/dbacademy_gym_data/v01/pii/silver/`;

source_table_size,source_num_of_files,num_of_synced_transactions,num_removed_files,num_copied_files,removed_files_size,copied_files_size
117357,1,,0,1,0,117357


### C1. Enable Change Data Feed (CDF)

You can enable CDF on both new and existing tables.

To globally enable CDF on every new table, use the following syntax:

```spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", True)```

Tables that were not created with CDF enabled will not have it turned on by default, but they can be altered to capture changes with the `ALTER TABLE` statement.

For more information, see the [Enable Change Data Feed](https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable-change-data-feed) documentation.



In [0]:
%sql
ALTER TABLE silver_users 
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

### C2. Check if CDF is Enabled

Use the `DESCRIBE TABLE EXTENDED` command on the silver table to check if CDF is enabled. In the output, look at the last row under **Table Properties** and confirm that CDF is set with the `[delta.enableChangeDataFeed=true]` property.


In [0]:
%sql
DESCRIBE TABLE EXTENDED silver_users;

col_name,data_type,comment
mrn,bigint,
dob,date,
sex,string,
gender,string,
first_name,string,
last_name,string,
street_address,string,
zip,bigint,
city,string,
state,string,


Query the **silver_users** table and confirm that it contains *3,132* rows.


In [0]:
%sql
SELECT * 
FROM silver_users;

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated
77688376,1970-11-13,F,F,Julie,Nelson,3860 Dana Ridges,90095,Los Angeles,CA,2010-01-01T00:00:00Z
18855422,1941-07-05,M,M,Chad,Mullen,9331 Gabrielle Mountains,90302,Inglewood,CA,2010-01-01T00:00:00Z
28927996,1990-10-07,M,M,Andrew,Mccarthy,40403 Henderson Parkway Apt. 970,90240,Downey,CA,2010-01-01T00:00:00Z
31164725,1981-08-31,F,F,Jacqueline,Mcdaniel,2327 Christina Coves Apt. 173,93523,Edwards,CA,2010-01-01T00:00:00Z
95594654,1994-12-04,F,F,Lisa,Taylor,54561 David Parkways,91107,Pasadena,CA,2010-01-01T00:00:00Z
99408191,1947-02-23,M,M,Kyle,Evans,74870 Brown Ridges Suite 199,91605,North Hollywood,CA,2010-01-01T00:00:00Z
79237635,1923-02-08,F,F,Kelly,Thompson,98494 Martinez Park Suite 587,91606,North Hollywood,CA,2010-01-01T00:00:00Z
22605553,1934-01-11,M,M,Richard,Harris,5902 Ruth Meadow,90028,Los Angeles,CA,2010-01-01T00:00:00Z
31106562,1973-10-29,M,M,Frank,Edwards,2971 Dawn Highway,90201,Bell Gardens,CA,2010-01-01T00:00:00Z
79135872,1950-01-15,M,M,Robert,Hayes,71723 Michelle Park,91608,Universal City,CA,2010-01-01T00:00:00Z


## D. Upsert Data from Bronze to Silver Users

In this section, we'll stream data from the **bronze_users** table to the **silver_users** table.

We will create the `upsert_to_delta` function to handle the streaming `MERGE INTO` operation for the **silver_users** table.

### D1. `upsert_to_delta` Function

Here, we create the upsert logic for the **silver_users** table using a streaming read from the **bronze_users** table.

The function below performs the following steps:
- The `microBatchDF.createOrReplaceTempView("updates")` line takes the incoming micro-batch DataFrame (`microBatchDF`) and registers it as a temporary SQL view named **updates**.
- The `MERGE INTO` statement combines `UPDATE` and `INSERT` actions into one operation using our unique identifier **mrn** as the MERGE condition. This statement uses the incoming micro-batch data view **updates** as the source to update the target **silver_users**.

In [0]:
def upsert_to_delta(microBatchDF, batchId):
    # Create or replace a temporary view for the micro-batch DataFrame
    microBatchDF.createOrReplaceTempView("updates")
    
    # Perform a MERGE operation to upsert data into the silver table
    # The MERGE statement matches records in the 'silver' table with records in the 'updates' view based on the 'mrn' field
    # If a match is found and any of the specified fields are different, the existing record in 'silver' is updated with the new values from 'updates'
    # If no match is found, a new record is inserted into 'silver' with the values from 'updates'
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO silver_users s
        USING updates u
        ON s.mrn = u.mrn
        WHEN MATCHED AND s.dob <> u.dob OR
                         s.sex <> u.sex OR
                         s.gender <> u.gender OR
                         s.first_name <> u.first_name OR
                         s.last_name <> u.last_name OR
                         s.street_address <> u.street_address OR
                         s.zip <> u.zip OR
                         s.city <> u.city OR
                         s.state <> u.state OR
                         s.updated <> u.updated
            THEN UPDATE SET *
        WHEN NOT MATCHED
            THEN INSERT *
    """)

### D2. Stream user's data from Bronze into Silver tables

Now lets create and start the stream from **bronze_users** into **silver_users**. The `foreachBatch` method uses the `upsert_to_delta` function from above to upsert from a streaming query.

Check the [Upsert from streaming queries using foreachBatch](https://docs.databricks.com/en/structured-streaming/delta-lake.html#merge-in-streaming) documentation for more information.


In [0]:
silver_users_stream = (
              spark
              .readStream
                .table("bronze_users")
              .writeStream
                .foreachBatch(upsert_to_delta)  # Upsert micro-batch data into the silver table
                .trigger(processingTime='3 seconds')  # Trigger the stream processing every 3 seconds
                .start()
        )

### D3. Check the History of the Silver Users Table

Wait for the stream to initialize and process.

Once complete, let's check the changes applied to our **silver_users** table.

Running the cell below will show the history for this table, and there should be three log entries:
- Version 0: The initial clone
- Version 1: Setting table properties to enable CDF on the table
- Version 2: The MERGE stream from the **bronze_users** table.

**NOTE:** If the results show only two versions, please wait for the stream to complete and then rerun the cell below.


In [0]:
%sql
DESCRIBE HISTORY silver_users

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-27T00:13:42Z,3304343146826341,labuser10747597_1750981120@vocareum.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(3913647707152806),0626-233939-2sqbphck,0,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-27T00:13:37Z,3304343146826341,labuser10747597_1750981120@vocareum.com,CLONE,"Map(source -> delta.`dbfs:/Volumes/dbacademy_gym_data/v01/pii/silver`, sourceVersion -> 0, isShallow -> false)",,List(3913647707152806),0626-233939-2sqbphck,-1,Serializable,False,"Map(removedFilesSize -> 0, numRemovedFiles -> 0, sourceTableSize -> 117357, numCopiedFiles -> 1, copiedFilesSize -> 117357, sourceNumOfFiles -> 1)",,Databricks-Runtime/15.4.x-scala2.12


## E. Access and Read the Change Data Feed (CDF)

The [Change Data Feed (CDF)](https://docs.databricks.com/en/delta/delta-change-data-feed.html) feature allows tracking of row-level changes between versions of a Delta table. When enabled, it records change events for all data written to the table, including metadata that indicates whether a row was inserted, deleted, or updated.

To capture the recorded CDC data in a stream, we'll add two options:
- **`readChangeData` = True**
- **`startingVersion` = 2** (This refers to the version history; alternatively, you can use **`startingTimestamp`**)

## CDF Schema in Results
When reading from the change data feed, the schema includes the following metadata columns:

| Column name        | Type      | Description                                    |
|--------------------|-----------|------------------------------------------------|
| _change_type       | String    | The type of change event: `insert`, `delete`, `update_preimage`, `update_postimage`. |
| _commit_version    | Long      | The Delta log or table version containing the change |
| _commit_timestamp  | Timestamp | The timestamp when the commit was created      |




In this section, we’ll display all changes to patients in the **silver_users** table starting at version *2*.

Run the cell below and view the results. Notice that the results show 1,530 records have been modified.

Then, scroll to the right of the table. You will notice the new columns: **_change_type**, **_commit_version**, and **_commit_timestamp**.

**NOTE:** Users with changes will have two records: one corresponding to *update_preimage* and one corresponding to *update_postimage*.

In [0]:
cdf_df = (spark.read
               .format("delta")
               .option("readChangeData", True)   # Read the change data
               .option("startingVersion", 1)     # Since we want to start reading changes from version 2 (when we merged the data)
               .table("silver_users"))

## Display the changed data
display(cdf_df)

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp


### E1. Land New Data to Stream

As a recap, we currently have two active streams:

1. The first stream ingests data from our source folder (raw JSON files) into the **bronze_users** table.
2. The second stream syncs data via `MERGE INTO` from the **bronze_users** table to the **silver_users** table.

Run the cell below to land another JSON file in our source directory and stream additional data into the pipeline.

In [0]:
# Load files into the volume: DA.paths.cdc_stream
DA.load(copy_from=DA.paths.stream_source.cdf_demo, 
        copy_to=DA.paths.cdc_stream, 
        n=2)


----Loading files to user's volume-----
File number 1 - batch01.json is already in the source volume. Skipping file.
File number 2 - Copying file /Volumes/dbacademy/ops/labuser10747597_1750981120@vocareum_com/cdf_demo/stream_source/cdc/batch02.json --> /Volumes/labuser10747597_1750981120/pii_data/cdf_demo/stream_source/cdc/batch02.json.
Stop loading files to user's volume.


### E2. Check again silver_users Table History

#### NOTE: Wait a few seconds for the stream to process the newly added JSON file and upsert data into the **silver_users** table.

Our objective is to validate the captured CDC changes for **`_commit_version` number 3** (you can change the sort order of the **`_commit_version`** column in the display above to see this).

Run the cell below to view the history of the **silver_users** table after the new file was added to the volume. 

After running the cell, view the results and confirm that version *3* is now available in the history with a *MERGE* operation. This should correspond to the last cell where new files were loaded.

**NOTE:** If three rows are returned in the results, please wait a few more seconds for the stream to complete, then rerun the cell. The history should display 4 rows.


In [0]:
%sql
DESCRIBE HISTORY silver_users;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-27T00:13:42Z,3304343146826341,labuser10747597_1750981120@vocareum.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(3913647707152806),0626-233939-2sqbphck,0,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-27T00:13:37Z,3304343146826341,labuser10747597_1750981120@vocareum.com,CLONE,"Map(source -> delta.`dbfs:/Volumes/dbacademy_gym_data/v01/pii/silver`, sourceVersion -> 0, isShallow -> false)",,List(3913647707152806),0626-233939-2sqbphck,-1,Serializable,False,"Map(removedFilesSize -> 0, numRemovedFiles -> 0, sourceTableSize -> 117357, numCopiedFiles -> 1, copiedFilesSize -> 117357, sourceNumOfFiles -> 1)",,Databricks-Runtime/15.4.x-scala2.12


## F. Table Changes Function

To pick up the recorded CDC data for a specific range of table's history versions there is the [table_changes](https://docs.databricks.com/en/sql/language-manual/functions/table_changes.html) function is part of the Delta Lake Change Data Feed (CDF) feature, which tracks row-level changes between versions of a Delta table. It returns a log of changes to a Delta Lake table with Change Data Feed enabled, including inserts, updates, and deletes.

Use the following syntax: `table_changes(table_str, start [, end])`
- _table_str_:  The table name
- _start_: starting history version or timestamp
- _end_: optional ending version or timestamp

### F1. Get the Latest Version of the Table's History

The cell below will capture the most recent version of the **silver_users** table and store it in an SQL variable.

Confirm that the variable **latest_version** equals *3*.


In [0]:
%sql
-- Drop the existing variable if it exists
DROP TEMPORARY VARIABLE IF EXISTS latest_version;

-- Declare the variable
DECLARE VARIABLE latest_version INT;

-- Set the variable to the latest version of the table
SET VARIABLE latest_version = (
  SELECT max(version) AS latest_version
  FROM (DESCRIBE HISTORY silver_users)
);

-- Select the variable
SELECT latest_version;

latest_version
1


### F2. Get Operation Metrics for Inserted, Updated, and Deleted Rows

The cell below will query the table’s latest history using the **latest_version** variable and display the following operation metrics for the most recent update/insert/delete:
- **numTargetRowsInserted**: The number of rows inserted in the latest version (147 inserted).
- **numTargetRowsUpdated**: The number of rows updated in the latest version (809 updated).
- **numTargetRowsDeleted**: The number of rows deleted in the latest version (0 deleted).


In [0]:
%sql
SELECT 
  operationMetrics['numTargetRowsInserted'],
  operationMetrics['numTargetRowsUpdated'],
  operationMetrics['numTargetRowsDeleted']
FROM (DESCRIBE HISTORY silver_users)
WHERE version = latest_version

operationMetrics[numTargetRowsInserted],operationMetrics[numTargetRowsUpdated],operationMetrics[numTargetRowsDeleted]
,,


### F3. View Modified Rows (Inserted, Updated or Deleted)

The **_change_type** column allows us to easily see how rows were modified.

- *insert*: Indicates a new row was added to the table.
- *delete*: Indicates a row was removed from the table.
- *update_preimage*: Represents the value of a row before it was updated.
- *update_postimage*: Represents the value of a row after it was updated.

#### F3.1 View All Rows That Were Inserted (insert)

In the cell below, use the `table_changes` function to query most recent update to the **silver_users** table and filter for all rows that were inserted. Use the **_change_type** column to filter for rows with the value *insert*. Run the cell.

Notice that the resulting row count matches the one provided in the previous cell: ('numTargetRowsInserted') = *147*. 

Scroll to the right of the table and view the columns **_change_type**, **_commit_version** and **_commit_timestamp**.


With CDF, you can easily inspect every row that was inserted.

In [0]:
%sql
SELECT * 
FROM table_changes("silver_users", latest_version)    -- Query the latest update
WHERE _change_type = "insert"                         -- View all rows that were inserted
ORDER BY _commit_version

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
99672922,1931-10-18,F,F,Kathy,Rodriguez,675 Anderson Stream Suite 436,91023,Mt Wilson,CA,2019-12-21T00:42:59.294Z,insert,2,2025-06-27T00:14:01Z
96445179,1986-12-23,M,M,George,Gay,2097 Cook Mills Suite 006,91125,Pasadena,CA,2019-12-20T20:33:58.672Z,insert,2,2025-06-27T00:14:01Z
46655319,1933-08-10,F,F,Gloria,Sparks,5353 Williams Row,90296,Playa Del Rey,CA,2019-12-20T17:37:32.98Z,insert,2,2025-06-27T00:14:01Z
34127363,1947-04-20,F,F,Brianna,Johnson,1251 Miller Lodge Apt. 327,91606,North Hollywood,CA,2019-12-20T11:45:32.349Z,insert,2,2025-06-27T00:14:01Z
54353309,1980-03-26,F,F,Nicole,Ayers,8170 Charles Isle,91403,Sherman Oaks,CA,2019-12-20T14:46:14.785Z,insert,2,2025-06-27T00:14:01Z
90987112,1962-01-01,F,F,Sandra,Kelly,028 Garcia Crossing Apt. 413,91607,Valley Village,CA,2019-12-20T04:09:36.585Z,insert,2,2025-06-27T00:14:01Z
31424934,1965-09-24,M,M,Thomas,Miller,863 Alexander Circle,91010,Duarte,CA,2019-12-19T19:30:24.735Z,insert,2,2025-06-27T00:14:01Z
38117071,1946-10-18,M,M,Steven,Glenn,68987 Johnson Inlet Apt. 587,90043,Los Angeles,CA,2019-12-22T00:48:58.333Z,insert,2,2025-06-27T00:14:01Z
30855228,1927-01-23,M,M,Brian,Dougherty,60977 Ashley Ferry Apt. 865,90804,Long Beach,CA,2019-12-21T21:36:02.674Z,insert,2,2025-06-27T00:14:01Z
81696473,1987-11-09,F,F,Dawn,Wright,13867 Thomas Courts Suite 304,90095,Los Angeles,CA,2019-12-21T22:05:00.482Z,insert,2,2025-06-27T00:14:01Z


#### F3.2 View Updated Row (update_postimage)

In the cell below, use the `table_changes` function to query the most recent update to the **silver_users** table and filter for all updated rows. Use the **_change_type** column to filter for rows with the value *update_postimage*. Run the cell.

Notice that the resulting row count matches the one provided in the previous cell: ('numTargetRowsUpdated'): *809*. 

Scroll to the right of the table and view the columns **_change_type**, **_commit_version** and **_commit_timestamp**.

With CDF, you can easily inspect every updated row.

In [0]:
%sql
SELECT * 
FROM table_changes("silver_users", latest_version)
WHERE _change_type = "update_postimage"
ORDER BY _commit_version

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
63729051,1987-01-05,M,M,Ryan,Nelson,2372 James Meadow,91601,North Hollywood,CA,2019-12-21T00:20:33.84Z,update_postimage,2,2025-06-27T00:14:01Z
19613129,1997-10-27,M,M,Adam,Mills,8737 Smith Lodge Suite 329,90263,Malibu,CA,2019-12-21T00:20:14.962Z,update_postimage,2,2025-06-27T00:14:01Z
75805171,1969-12-10,F,F,Yolanda,Perez,2888 Karen Wells Suite 179,91030,South Pasadena,CA,2019-12-21T00:25:38.036Z,update_postimage,2,2025-06-27T00:14:01Z
65740343,1995-08-02,M,M,Austin,Hurley,53642 Jason Court,90701,Artesia,CA,2019-12-20T22:37:15.533Z,update_postimage,2,2025-06-27T00:14:01Z
53225168,1936-12-09,F,F,Mary,Perry,735 Allen Spring,90048,Los Angeles,CA,2019-12-20T22:47:47.062Z,update_postimage,2,2025-06-27T00:14:01Z
54184596,1968-09-16,F,F,Emily,Krause,1712 Kristen Pine,90744,Wilmington,CA,2019-12-20T22:49:52.646Z,update_postimage,2,2025-06-27T00:14:01Z
38821881,1941-09-28,M,M,Frank,Tucker,31338 Moore Tunnel,93553,Pearblossom,CA,2019-12-20T23:38:34.693Z,update_postimage,2,2025-06-27T00:14:01Z
85014229,1950-07-20,F,F,Angela,Allen,201 Douglas Light,91224,La Crescenta,CA,2019-12-20T23:48:37.425Z,update_postimage,2,2025-06-27T00:14:01Z
86741442,2000-03-20,M,M,Robert,Gomez,30951 John Mills Suite 287,91304,Canoga Park,CA,2019-12-20T21:21:17.993Z,update_postimage,2,2025-06-27T00:14:01Z
37087907,1935-03-27,F,F,Teresa,Davenport,473 Brian Spurs,90623,La Palma,CA,2019-12-20T21:13:20.615Z,update_postimage,2,2025-06-27T00:14:01Z


#### F3.3 View Row Prior to Update (update_preimage)

In the cell below, use the `table_changes` function to query the most recent update to the **silver_users** table and filter for all rows prior to the update. Use the **_change_type** column to filter for rows with the value *update_preimage*. Run the cell.

Notice that the resulting row count matches the one provided in the previous cell: ('numTargetRowsUpdated'): *809*. 

Scroll to the right of the table and view the columns **_change_type**, **_commit_version** and **_commit_timestamp**.

With CDF, you can easily inspect every row prior to the update.

In [0]:
%sql
SELECT * 
FROM table_changes("silver_users", latest_version)
WHERE _change_type = "update_preimage"
ORDER BY _commit_version

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
63729051,1987-01-05,M,M,Ryan,Nelson,2372 James Meadow,91601,North Hollywood,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
19613129,1997-10-27,M,M,Adam,Mills,8737 Smith Lodge Suite 329,90263,Malibu,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
75805171,1969-12-10,F,F,Yolanda,Perez,2888 Karen Wells Suite 179,91030,South Pasadena,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
65740343,1995-08-02,M,M,Austin,Hurley,105 Estrada Fields Suite 087,90603,Whittier,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
53225168,1936-12-09,F,F,Mary,Perry,735 Allen Spring,90048,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
54184596,1968-09-16,F,F,Emily,Krause,1712 Kristen Pine,90744,Wilmington,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
38821881,1941-09-28,M,M,Frank,Tucker,31338 Moore Tunnel,93553,Pearblossom,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
85014229,1950-07-20,F,F,Angela,Allen,201 Douglas Light,91224,La Crescenta,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
86741442,2000-03-20,M,M,Robert,Gomez,30951 John Mills Suite 287,91304,Canoga Park,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z
37087907,1935-03-27,F,F,Teresa,Davenport,473 Brian Spurs,90623,La Palma,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-06-27T00:14:01Z


#### F3.4 View All Deleted Rows (delete)

In the cell below, use the `table_changes` function to query the most recent update to the **silver_users** table and filter for all rows that were deleted. Use the **_change_type** column to filter for rows with the value *delete*. Run the cell.

Notice that the resulting row count matches the one provided in the previous cell: ('numTargetRowsDeleted'): *0*. 

With CDF, you can easily inspect every deleted row.

**NOTE:** Don't worry! We'll review how to apply deletes and propagate them in the next section.

In [0]:
%sql
SELECT * 
FROM table_changes("silver_users", latest_version)
WHERE _change_type in ("delete")
ORDER BY _commit_version

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp



## G. Propagating Deletes

While some use cases may require processing deletes alongside updates and inserts, the most important delete requests are those that allow companies to maintain compliance with privacy regulations such as GDPR and CCPA. Most companies have stated SLAs around how long these requests will take to process, but for various reasons, these are often handled in pipelines separate from their core ETL.

This section is focused to Propagate Deletes applied in the **silver_users** to propagate into the **gold_users** table.

**NOTE:** Please follow all regulatory standards for your data.


### G1. Gold Users Table Setup

Let's first create the **gold_users** table, our final production table.

Run the query below to create the **gold_users** table and populate it with the latest snapshot of the **silver_users** table. After running the query, view the results and confirm that the table contains *3,407* records.

In [0]:
%sql
DROP TABLE IF EXISTS gold_users;

-- Create the gold_users table from the silver_users table
CREATE OR REPLACE TABLE gold_users as
SELECT 
     mrn,
     street_address,
     zip,
     city,
     state,
     updated
FROM silver_users;

-- View the gold table
SELECT * 
FROM gold_users;

mrn,street_address,zip,city,state,updated
77688376,3860 Dana Ridges,90095,Los Angeles,CA,2010-01-01T00:00:00Z
95594654,54561 David Parkways,91107,Pasadena,CA,2010-01-01T00:00:00Z
99408191,74870 Brown Ridges Suite 199,91605,North Hollywood,CA,2010-01-01T00:00:00Z
79237635,98494 Martinez Park Suite 587,91606,North Hollywood,CA,2010-01-01T00:00:00Z
31106562,2971 Dawn Highway,90201,Bell Gardens,CA,2010-01-01T00:00:00Z
25374221,025 Hernandez Estate Apt. 429,91202,Glendale,CA,2010-01-01T00:00:00Z
12035337,249 Bruce Freeway,91380,Santa Clarita,CA,2010-01-01T00:00:00Z
31699228,7908 Williams Stravenue,90808,Long Beach,CA,2010-01-01T00:00:00Z
28114036,42873 Laurie Drive,90291,Venice,CA,2010-01-01T00:00:00Z
86217269,442 Trevino Radial,90095,Los Angeles,CA,2010-01-01T00:00:00Z



### G2. Processing Right to Be Forgotten Requests

While it is possible to process deletes at the same time as appends and updates, the fines around right to be forgotten requests may warrant a separate process.



Let's start by leveraging the **user_delete_requests** table. This table holds the **mrn**, and the **request_date** (as of today) for users to be deleted, as provided by the compliance team.

Let's query the table and view the deletion requests. Confirm that the table currently contains 20 requests for deletion.

In [0]:
%sql
SELECT *
FROM user_delete_requests;

mrn,request_date
46609218,2025-06-27T00:13:17.711Z
10481208,2025-06-27T00:13:17.711Z
51962029,2025-06-27T00:13:17.711Z
36170414,2025-06-27T00:13:17.711Z
57985430,2025-06-27T00:13:17.711Z
92891095,2025-06-27T00:13:17.711Z
77708899,2025-06-27T00:13:17.711Z
84079469,2025-06-27T00:13:17.711Z
80880484,2025-06-27T00:13:17.711Z
12267606,2025-06-27T00:13:17.711Z



We’ll create a new read stream from the **user_delete_requests** table to complete the following:
- Add a column named **deadline** to indicate a 30-day period for action, with a default status of *requested*.
- Add a column named **status** and update it from *requested* to *deleted* once the changes are propagated.


Run the query and view the results.

In [0]:
requests_df = (spark.readStream
                    .table("user_delete_requests")
                    .select(
                            "mrn",
                            "request_date",
                            F.date_add("request_date", 30).alias("deadline"),
                            F.lit("requested").alias("status")))


display(requests_df)

mrn,request_date,deadline,status
46609218,2025-06-27T00:13:17.711Z,2025-07-27,requested
10481208,2025-06-27T00:13:17.711Z,2025-07-27,requested
51962029,2025-06-27T00:13:17.711Z,2025-07-27,requested
36170414,2025-06-27T00:13:17.711Z,2025-07-27,requested
57985430,2025-06-27T00:13:17.711Z,2025-07-27,requested
92891095,2025-06-27T00:13:17.711Z,2025-07-27,requested
77708899,2025-06-27T00:13:17.711Z,2025-07-27,requested
84079469,2025-06-27T00:13:17.711Z,2025-07-27,requested
80880484,2025-06-27T00:13:17.711Z,2025-07-27,requested
12267606,2025-06-27T00:13:17.711Z,2025-07-27,requested


### G3. Adding Commit Messages in History

Delta Lake supports arbitrary commit messages, which are recorded in the Delta transaction log and viewable in the table history. This feature can assist with auditing.

Setting a global commit message with SQL will ensure it is used for all subsequent operations in the notebook.

For more information, refer to the [Enrich Delta Lake tables with custom metadata](https://docs.databricks.com/en/delta/custom-metadata.html#enrich-delta-lake-tables-with-custom-metadata) documentation.


In [0]:
%sql
SET spark.databricks.delta.commitInfo.userMetadata=Deletes committed

key,value
spark.databricks.delta.commitInfo.userMetadata,Deletes committed





With DataFrames, commit messages can also be specified as part of the write options using `option("userMetadata","comment")`.

In this section, we will create a new streaming table named **delete_requests** to indicate that we are manually processing these requests in the notebook, rather than using an automated job.


In [0]:
query = (requests_df
         .writeStream
            .outputMode("append")        # Append mode to add new rows to the output table
            .option("checkpointLocation", f"{DA.paths.checkpoints}/delete_requests")  # Specify checkpoint location
            .option("userMetadata", "Requests processed interactively")  # Add user metadata
            .trigger(availableNow=True)  # Trigger the query to process all available data now
            .table("delete_requests"))   # Write the output to the delete_requests table


query.awaitTermination()  # Wait for the streaming query to finish

View the history of the **delete_requests** table. Notice that the **operation** column messages clearly indicate *CREATE TABLE* and *STREAMING UPDATE* in the table history.

Scroll to the right of the table. Notice the following:
- that the **userMetadata** column contains the note metadata note *Requests processed interactively* that we set above for the stream.
- the initial *CREATE TABLE* contains the user metadata *Deletes committed* for the creation of the table. 


In [0]:
%sql
DESCRIBE HISTORY delete_requests

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-27T00:14:23Z,3304343146826341,labuser10747597_1750981120@vocareum.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> ead0979a-1203-4eb1-ba45-ef9ddf1b94fe, epochId -> 0, statsOnLoad -> false)",,List(3913647707152806),0626-233939-2sqbphck,0.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 20, numOutputBytes -> 1634, numAddedFiles -> 1)",Requests processed interactively,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-27T00:14:19Z,3304343146826341,labuser10747597_1750981120@vocareum.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3913647707152806),0626-233939-2sqbphck,,WriteSerializable,True,Map(),Deletes committed,Databricks-Runtime/15.4.x-scala2.12





### G4. Processing Delete Requests

The **delete_requests** table will be used to track users' requests to be forgotten. 

It is possible to process delete requests alongside inserts and updates to existing data as part of a normal **`MERGE`** statement.

Because PII exists in several places through the current lakehouse, tracking requests and processing them asynchronously may provide better performance for production jobs with low latency SLAs. The approach modeled here also indicates the time at which the delete was requested and the deadline, and provides a field to indicate the current processing status of the request.

Query the new **delete_requests** table and view the results. Notice that the deletion requests are active, with a **status** is *requested*.


In [0]:
%sql
SELECT * 
FROM delete_requests;

mrn,request_date,deadline,status
46609218,2025-06-27T00:13:17.711Z,2025-07-27,requested
10481208,2025-06-27T00:13:17.711Z,2025-07-27,requested
51962029,2025-06-27T00:13:17.711Z,2025-07-27,requested
36170414,2025-06-27T00:13:17.711Z,2025-07-27,requested
57985430,2025-06-27T00:13:17.711Z,2025-07-27,requested
92891095,2025-06-27T00:13:17.711Z,2025-07-27,requested
77708899,2025-06-27T00:13:17.711Z,2025-07-27,requested
84079469,2025-06-27T00:13:17.711Z,2025-07-27,requested
80880484,2025-06-27T00:13:17.711Z,2025-07-27,requested
12267606,2025-06-27T00:13:17.711Z,2025-07-27,requested


### G5. Check the Records to Delete

When working with static data, committing deletes is straightforward. Run the cell below to preview the records to delete from the **silver_users** table. Notice that the **silver_users** table contains all users who have requested deletion.

In [0]:
%sql
SELECT *
FROM silver_users
WHERE mrn IN (SELECT mrn FROM delete_requests)

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated
10481208,1964-05-27,M,M,Randall,Thomas,343 House Way,90630,Cypress,CA,2010-01-01T00:00:00Z
84079469,1965-12-18,M,M,Scott,Hawkins,33350 Jimenez Turnpike Apt. 706,91101,Pasadena,CA,2010-01-01T00:00:00Z
77708899,1975-01-08,M,M,Mark,Wilson,323 Morales Port,90606,Whittier,CA,2010-01-01T00:00:00Z
16352917,1932-08-07,F,F,Angela,Gallagher,65790 Samantha Squares,91204,Glendale,CA,2010-01-01T00:00:00Z
36170414,1985-10-10,M,M,Chad,Rodriguez,93017 Sherry Vista,91006,Arcadia,CA,2010-01-01T00:00:00Z
48952287,1939-09-27,M,M,George,Jones,271 Charles Dale Apt. 102,90245,El Segundo,CA,2010-01-01T00:00:00Z
60252784,1995-03-04,M,M,Roy,Zuniga,39980 Kemp Cliffs,93510,Acton,CA,2010-01-01T00:00:00Z
39141616,1931-08-20,M,M,Justin,Campbell,38106 Morales Manor,91364,Woodland Hills,CA,2010-01-01T00:00:00Z
97829250,1985-01-30,F,F,Rose,Brown,6612 Jessica Ridges Apt. 406,90631,La Habra,CA,2010-01-01T00:00:00Z
39313172,1979-08-06,M,M,John,Robertson,10369 Austin Dale,91396,Winnetka,CA,2010-01-01T00:00:00Z



### G6. Committing Deletes into Silver Users

The following cell deletes records from the **silver_users** table by rewriting all data files containing records affected by the `DELETE` statement. 

**NOTE:** Recall that with Delta Lake, deleting data will create new data files rather than deleting existing data files.

In [0]:
%sql
DELETE FROM silver_users
WHERE mrn IN (SELECT mrn FROM delete_requests)

num_affected_rows
20


Describe the history of the **silver_users** table and confirm *4* versions of the table exists. The latest version contains the *DELETE* value in the **operation** column.

In [0]:
%sql
DESCRIBE HISTORY silver_users;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2025-06-27T00:14:30Z,3304343146826341,labuser10747597_1750981120@vocareum.com,DELETE,"Map(predicate -> [""mrn#25333L IN (list#25321 [])""])",,List(3913647707152806),0626-233939-2sqbphck,3,WriteSerializable,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 167980, numCopiedRows -> 3387, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 3, executionTimeMs -> 2002, numDeletionVectorsUpdated -> 0, numDeletedRows -> 20, scanTimeMs -> 1039, numAddedFiles -> 3, numAddedBytes -> 167525, rewriteTimeMs -> 927)",Deletes committed,Databricks-Runtime/15.4.x-scala2.12
3,2025-06-27T00:14:10Z,3304343146826341,labuser10747597_1750981120@vocareum.com,MERGE,"Map(predicate -> [""(mrn#20212L = mrn#16814L)""], matchedPredicates -> [{""predicate"":""((((NOT (dob#20213 = dob#16815) OR NOT (sex#20214 = sex#16816)) OR NOT (gender#20215 = gender#16817)) OR (NOT (first_name#20216 = first_name#16818) OR NOT (last_name#20217 = last_name#16819))) OR (((NOT (street_address#20218 = street_address#16820) OR NOT (zip#20219L = zip#16821L)) OR NOT (city#20220 = city#16822)) OR (NOT (state#20221 = state#16823) OR NOT (updated#20222 = updated#16824))))"",""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3913647707152806),0626-233939-2sqbphck,2,WriteSerializable,False,"Map(numTargetRowsCopied -> 1622, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 120371, numTargetBytesRemoved -> 93568, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 809, executionTimeMs -> 5054, materializeSourceTimeMs -> 292, numTargetRowsInserted -> 147, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2269, numTargetRowsUpdated -> 809, numOutputRows -> 2578, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 1, numSourceRows -> 956, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2382)",,Databricks-Runtime/15.4.x-scala2.12
2,2025-06-27T00:14:01Z,3304343146826341,labuser10747597_1750981120@vocareum.com,MERGE,"Map(predicate -> [""(mrn#17335L = mrn#16814L)""], matchedPredicates -> [{""predicate"":""((((NOT (dob#17336 = dob#16815) OR NOT (sex#17337 = sex#16816)) OR NOT (gender#17338 = gender#16817)) OR (NOT (first_name#17339 = first_name#16818) OR NOT (last_name#17340 = last_name#16819))) OR (((NOT (street_address#17341 = street_address#16820) OR NOT (zip#17342L = zip#16821L)) OR NOT (city#17343 = city#16822)) OR (NOT (state#17344 = state#16823) OR NOT (updated#17345 = updated#16824))))"",""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(3913647707152806),0626-233939-2sqbphck,1,WriteSerializable,False,"Map(numTargetRowsCopied -> 2431, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 141177, numTargetBytesRemoved -> 117357, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 701, executionTimeMs -> 8814, materializeSourceTimeMs -> 295, numTargetRowsInserted -> 128, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2415, numTargetRowsUpdated -> 701, numOutputRows -> 3260, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 1, numSourceRows -> 829, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 5852)",,Databricks-Runtime/15.4.x-scala2.12
1,2025-06-27T00:13:42Z,3304343146826341,labuser10747597_1750981120@vocareum.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(3913647707152806),0626-233939-2sqbphck,0,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-27T00:13:37Z,3304343146826341,labuser10747597_1750981120@vocareum.com,CLONE,"Map(source -> delta.`dbfs:/Volumes/dbacademy_gym_data/v01/pii/silver`, sourceVersion -> 0, isShallow -> false)",,List(3913647707152806),0626-233939-2sqbphck,-1,Serializable,False,"Map(removedFilesSize -> 0, numRemovedFiles -> 0, sourceTableSize -> 117357, numCopiedFiles -> 1, copiedFilesSize -> 117357, sourceNumOfFiles -> 1)",,Databricks-Runtime/15.4.x-scala2.12


### G7. Collect Deleted Silver Users to Propagate with CDF

The code below configures an incremental read of all changes committed to the **silver_users** table starting at version *4*, the delete operation.

Run the cell and view the results. Scroll to the right of the table and notice that in this version *20* rows were deleted from the **silver_users** table. You can view the exact rows that were deleted with CDF.


In [0]:
deleteDF = (spark.readStream
                 .format("delta")
                 .option("readChangeFeed", "true")
                 .option("startingVersion", 4)     # Start_version 4 where the delete operation occurred
                 .table("silver_users"))


display(deleteDF)

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
10481208,1964-05-27,M,M,Randall,Thomas,343 House Way,90630,Cypress,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
84079469,1965-12-18,M,M,Scott,Hawkins,33350 Jimenez Turnpike Apt. 706,91101,Pasadena,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
77708899,1975-01-08,M,M,Mark,Wilson,323 Morales Port,90606,Whittier,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
16352917,1932-08-07,F,F,Angela,Gallagher,65790 Samantha Squares,91204,Glendale,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
36170414,1985-10-10,M,M,Chad,Rodriguez,93017 Sherry Vista,91006,Arcadia,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
48952287,1939-09-27,M,M,George,Jones,271 Charles Dale Apt. 102,90245,El Segundo,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
60252784,1995-03-04,M,M,Roy,Zuniga,39980 Kemp Cliffs,93510,Acton,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
39141616,1931-08-20,M,M,Justin,Campbell,38106 Morales Manor,91364,Woodland Hills,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
97829250,1985-01-30,F,F,Rose,Brown,6612 Jessica Ridges Apt. 406,90631,La Habra,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z
39313172,1979-08-06,M,M,John,Robertson,10369 Austin Dale,91396,Winnetka,CA,2010-01-01T00:00:00Z,delete,4,2025-06-27T00:14:30Z


### G8. Function to Propagate Deletes

The relationships between our natural keys (**mrn**) are stored in the **silver_users** table. These keys allow us to link a user's data across various pipelines and sources. The Change Data Feed (CDF) from this table will retain all these fields, enabling successful identification of records to be deleted or modified in downstream tables. This approach can be expanded to use hashed values or other relevant keys.

The function below demonstrates how to commit deletes to two tables using different keys and syntax. Note that, in this case, the `MERGE INTO` syntax is not necessarily the only method to process deletes to the **gold_users** table. However, this code block demonstrates the basic syntax that could be extended if inserts and updates were processed alongside deletes in the same operation.

Assuming successful completion of these two table modifications, an update will be processed back to the **delete_requests** table as well. 

The code below completes the following for each batch:
- Deletes rows in the **gold_users** table that have been requested.
- Updates the status of the requested deletes in the **delete_requests** table.


In [0]:
def process_deletes(microBatchDF, batchId):
    
    (microBatchDF
        .createOrReplaceTempView("deletes"))
    
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO gold_users u
        USING deletes d
        ON u.mrn = d.mrn
        WHEN MATCHED
            THEN DELETE
    """)

    
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO delete_requests dr
        USING deletes d
        ON d.mrn = dr.mrn
        WHEN MATCHED
          THEN UPDATE SET status = "deleted"
    """)


### G9. Propagate Changes into Gold Users and Delete Requests

Recall that this workload is driven by incremental changes to the **silver_users** table (tracked through the Change Data Feed).

Executing the following cell will propagate deletes from a single table to multiple tables throughout the lakehouse.


In [0]:
query = (deleteDF.writeStream
                 .foreachBatch(process_deletes)
                 .outputMode("update")
                 .option("checkpointLocation", f"{DA.paths.checkpoints}/deletes")
                 .trigger(availableNow=True)
                 .start())

query.awaitTermination()


### G10. Review Delete Commits
Notice the **status** for the records in the **delete_requests** table are now updated to *deleted*.

In [0]:
%sql
SELECT * 
FROM delete_requests;

mrn,request_date,deadline,status
46609218,2025-06-27T00:13:17.711Z,2025-07-27,deleted
10481208,2025-06-27T00:13:17.711Z,2025-07-27,deleted
51962029,2025-06-27T00:13:17.711Z,2025-07-27,deleted
36170414,2025-06-27T00:13:17.711Z,2025-07-27,deleted
57985430,2025-06-27T00:13:17.711Z,2025-07-27,deleted
92891095,2025-06-27T00:13:17.711Z,2025-07-27,deleted
77708899,2025-06-27T00:13:17.711Z,2025-07-27,deleted
84079469,2025-06-27T00:13:17.711Z,2025-07-27,deleted
80880484,2025-06-27T00:13:17.711Z,2025-07-27,deleted
12267606,2025-06-27T00:13:17.711Z,2025-07-27,deleted



Describe the history of the **gold_users** table.

Notice that in the latest version that our commit message will be in the far right column of our history, under the column **userMetadata**.

For the **gold_users** table, the **operation** column in the history will indicate a merge because of the chosen syntax, even though only deletes were committed. The number of deleted rows can be reviewed in the **operationMetrics** in the key *numTargetRowsDeleted*.

In [0]:
%sql
DESCRIBE HISTORY gold_users;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-06-27T00:14:37Z,3304343146826341,labuser10747597_1750981120@vocareum.com,MERGE,"Map(predicate -> [""(mrn#27042L = mrn#26514L)""], matchedPredicates -> [{""actionType"":""delete""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [])",,List(3913647707152806),0626-233939-2sqbphck,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 20, numTargetFilesAdded -> 0, numTargetBytesAdded -> 0, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 3, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 2154, materializeSourceTimeMs -> 261, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 20, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1045, numTargetRowsUpdated -> 0, numOutputRows -> 0, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 20, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 710)",Deletes committed,Databricks-Runtime/15.4.x-scala2.12
0,2025-06-27T00:14:13Z,3304343146826341,labuser10747597_1750981120@vocareum.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> false)",,List(3913647707152806),0626-233939-2sqbphck,,WriteSerializable,False,"Map(numFiles -> 3, numOutputRows -> 3407, numOutputBytes -> 121220)",,Databricks-Runtime/15.4.x-scala2.12


Count the number of rows in the current **gold_users** table. Confirm that the table now has *3,387* rows.

In [0]:
%sql
SELECT count(*) AS TotalRows
FROM gold_users;

TotalRows
3387


### G11. Are Deletes Fully Committed?

Not exactly.

Due to how Delta Lake's history and CDF features are implemented, deleted values are still present in older versions of the data.

Run the query below to count the number of records from version 0 of the **gold_users** table. The results show that the original table has 3,407 rows, which includes the deleted rows.

With Delta tables, you can still view the original data in an earlier version of the table.


In [0]:
%sql
SELECT count(*) AS TotalRows
FROM gold_users VERSION AS OF 0;

TotalRows
3407


For more information check out [GDPR and CCPA compliance with Delta Lake](https://docs.databricks.com/en/security/privacy/gdpr-delta.html#how-delta-lake-simplifies-point-deletes) and the [VACUUM](https://docs.databricks.com/en/sql/language-manual/delta-vacuum.html) statement.


## H. Stop Active Streams
Make sure to run the following cell to stop all active streams. Be careful when using streaming. If you do not stop an active stream, the cluster will continuously run. We are done with streaming data in this demonstration.

In [0]:
for stream in spark.streams.active:
    stream.stop()
    stream.awaitTermination()


&copy; 2025 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="blank">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy" target="blank">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use" target="blank">Terms of Use</a> | 
<a href="https://help.databricks.com/" target="blank">Support</a>
