
# Delta Lake Change Data Feed
<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo-whitebackground.png" style="width:200px; float: right"/>

Delta Lake is an open format that can be read by multiple engines or with standalone libraries (Java, Python, Rust, and more).

This makes it easy to subscribe to the stream of modifications on any of your tables and propagate the changes downstream in a medallion architecture.

See the [documentation](https://docs.databricks.com/delta/delta-change-data-feed.html) for more details.

<!-- [metadata={"description":"Quick introduction to Delta Lake. <br/><i>Use this content for quick Delta demo.</i>",
 "authors":["quentin.ambard@databricks.com"],
 "db_resources":{}}] -->

In [0]:
%run ./_resources/00-setup $reset_all_data=true

## CDF for Data Mesh & Delta Sharing

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/retail/resources/images/delta-cdf-datamesh.png" style="float:right; margin-right: 50px" width="300px" />

When sharing data within a Data Mesh and/or with external organizations through Delta Sharing, you need to share not only the existing data but also every modification, so that your consumers can capture and apply the same changes.

CDF makes **Data Mesh** implementation easier. Once an organization enables it on a table, the data can be shared with others, and consumers can subscribe to the modification stream to propagate changes, such as GDPR DELETEs, downstream.

To do so, CDF must be enabled at the table level. From then on, Delta records every modification made to the table (changes made before CDF was enabled are not captured), and you can query those changes with the `table_changes` function.

For more details, visit the [CDF documentation](https://docs.databricks.com/delta/delta-change-data-feed.html).

### Try out CDF:
- Enable CDF
- Make sample changes
- Query the Delta history to confirm the changes went through
- Query the changes from an older version
- Check the CDF tracking columns

In [0]:
ALTER TABLE user_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

#### Delta CDF table_changes output
In addition to the row details, `table_changes` returns the commit version and timestamp (`_commit_version`, `_commit_timestamp`) and one of 4 change types in the `_change_type` column:

| CDC Type             | Description                                                               |
|----------------------|---------------------------------------------------------------------------|
| **update_preimage**  | Content of the row before an update                                       |
| **update_postimage** | Content of the row after the update (what you want to capture downstream) |
| **delete**           | Content of a row that has been deleted                                    |
| **insert**           | Content of a new row that has been inserted                               |
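To make the four change types concrete, here is a minimal pure-Python sketch of a table that records a change feed. `ToyCdfTable` is hypothetical and only mimics the *shape* of CDF output (no `_commit_timestamp`, no files, no transactions). Each write is one commit, an update emits a pre-image and a post-image, and a delete emits the deleted row.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyCdfTable:
    """Hypothetical in-memory table that records a change feed like Delta CDF.

    Rows are dicts keyed by their "id" column. Each write is one commit that
    bumps `version` and appends one change record per affected row (two for
    an update: the pre-image and the post-image).
    """

    rows: dict[int, dict[str, Any]] = field(default_factory=dict)
    version: int = 0
    change_log: list[dict[str, Any]] = field(default_factory=list)

    def _emit(self, row: dict[str, Any], change_type: str) -> None:
        # Copy the row and add CDF-style metadata columns.
        self.change_log.append(
            {**row, "_change_type": change_type, "_commit_version": self.version}
        )

    def insert(self, *new_rows: dict[str, Any]) -> None:
        self.version += 1
        for row in new_rows:
            self.rows[row["id"]] = dict(row)
            self._emit(row, "insert")

    def update(self, predicate: Callable[[dict[str, Any]], bool], changes: dict[str, Any]) -> None:
        self.version += 1
        for row_id, row in list(self.rows.items()):
            if predicate(row):
                new_row = {**row, **changes}
                self._emit(row, "update_preimage")
                self._emit(new_row, "update_postimage")
                self.rows[row_id] = new_row

    def delete(self, predicate: Callable[[dict[str, Any]], bool]) -> None:
        self.version += 1
        for row_id, row in list(self.rows.items()):
            if predicate(row):
                self._emit(row, "delete")
                del self.rows[row_id]


# Mirror the demo's UPDATE (ID < 10) and DELETE (ID > 1000) on toy data.
table = ToyCdfTable()
table.insert({"id": 1, "firstname": "Ann"}, {"id": 2000, "firstname": "Bob"})
table.update(lambda r: r["id"] < 10, {"firstname": "John"})
table.delete(lambda r: r["id"] > 1000)
for change in table.change_log:
    print(change)
```

The updated row shows up twice (old and new value), which is why downstream consumers usually keep only `update_postimage`.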

Let's apply an UPDATE and a DELETE, then query the changes starting from the version of the UPDATE. Version numbers depend on your workspace, so run `DESCRIBE HISTORY user_delta` to find them (the cells below use version 118).

Rows with `ID < 10` appear twice, as `update_preimage` (old value) and `update_postimage` (new value), and rows with `ID > 1000` appear as `delete`.
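`table_changes` filters the change feed by commit version. The hypothetical helper below sketches that filtering on a toy in-memory change log. It assumes that the start version and the optional end version are both inclusive, as described in the Delta CDF documentation. The version numbers are illustrative.

```python
def table_changes_sketch(change_log, start_version, end_version=None):
    """Hypothetical helper mimicking table_changes(table, start [, end]).

    Keeps change rows whose _commit_version is >= start_version and, when
    end_version is given, <= end_version (both bounds inclusive).
    """
    return [
        row
        for row in change_log
        if row["_commit_version"] >= start_version
        and (end_version is None or row["_commit_version"] <= end_version)
    ]


# Toy change log: version numbers are illustrative, not taken from a real table.
change_log = [
    {"id": 5, "firstname": "Ann", "_change_type": "insert", "_commit_version": 117},
    {"id": 5, "firstname": "Ann", "_change_type": "update_preimage", "_commit_version": 118},
    {"id": 5, "firstname": "John", "_change_type": "update_postimage", "_commit_version": 118},
    {"id": 2000, "firstname": "Bob", "_change_type": "delete", "_commit_version": 119},
]

changes = table_changes_sketch(change_log, 118)
print(sorted({row["_change_type"] for row in changes}))
# ['delete', 'update_postimage', 'update_preimage']
```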

In [0]:
-- Make sure you run the first notebook to load all the data.
UPDATE user_delta SET firstname = 'John' WHERE ID < 10;
DELETE FROM user_delta WHERE ID > 1000;

In [0]:
DESCRIBE HISTORY user_delta;

In [0]:
-- Replace 118 with the UPDATE version returned by DESCRIBE HISTORY in your workspace
select * from table_changes('user_delta', 118);

In [0]:
select distinct _change_type from table_changes('user_delta', 118)

## Using CDF to capture incremental change (stream)

To capture the latest changes from your table incrementally, you can use the Spark Structured Streaming API.

This makes it easy to subscribe to the stream of modifications on a table and propagate changes, such as GDPR DELETEs, downstream.
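Conceptually, a streaming read of the change feed starts at `startingVersion` and then delivers only the commits it has not processed yet, tracking its progress in the checkpoint. The sketch below models that bookkeeping in plain Python with a hypothetical `IncrementalChangeReader`. It is not how Structured Streaming is implemented. It only illustrates the "resume where you left off" behavior.

```python
class IncrementalChangeReader:
    """Hypothetical consumer that remembers the next commit version to read.

    `next_version` plays the role of startingVersion on the first poll and of
    the streaming checkpoint afterwards (kept in memory here, whereas a real
    stream persists its progress in the checkpoint location).
    """

    def __init__(self, starting_version: int) -> None:
        self.next_version = starting_version

    def poll(self, change_log: list) -> list:
        # Only return commits that were not processed by a previous poll.
        batch = [r for r in change_log if r["_commit_version"] >= self.next_version]
        if batch:
            self.next_version = max(r["_commit_version"] for r in batch) + 1
        return batch


change_log = [
    {"id": 5, "_change_type": "update_preimage", "_commit_version": 118},
    {"id": 5, "_change_type": "update_postimage", "_commit_version": 118},
]
reader = IncrementalChangeReader(starting_version=118)
print(len(reader.poll(change_log)))  # 2

# A new commit lands (for example a GDPR delete); only it is returned next time.
change_log.append({"id": 2000, "_change_type": "delete", "_commit_version": 119})
print([r["_change_type"] for r in reader.poll(change_log)])  # ['delete']
print(reader.poll(change_log))  # []
```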

In [0]:
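%python
# Read the change feed of user_delta as a stream, starting at version 118
# (adjust it to a version listed by DESCRIBE HISTORY user_delta).
stream = (spark.readStream.format("delta")
              .option("readChangeFeed", "true")
              .option("startingVersion", 118)
              .table("user_delta"))

# get_chkp_folder and folder are expected to be defined by ./_resources/00-setup.
display(stream.select("_change_type", "_commit_version", "_commit_timestamp", "id", "firstname", "email"), checkpointLocation = get_chkp_folder(folder))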

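## Using CDF for CDC on a source table, and MERGE incrementally into a target

The next cells copy `user_delta` into a silver table, update the source, then read the change feed and MERGE only the changed rows into the silver table.
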
In [0]:
CREATE OR REPLACE TABLE user_delta_silver AS SELECT * FROM user_delta

In [0]:
select * from user_delta_silver

In [0]:
-- Make sure you run the first notebook to load all the data.
UPDATE user_delta SET firstname = 'John_cdc' WHERE ID < 10;

In [0]:
select * from user_delta WHERE ID < 10;

In [0]:
describe history user_delta

In [0]:
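%python
# Batch-read the change feed from version 121 (adjust it to the UPDATE version shown by DESCRIBE HISTORY).
cdf_changes = (spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 121)
  .table("user_delta"))

# Expose the changes to SQL so the MERGE below can use them as its source.
cdf_changes.createOrReplaceTempView("source_cdf_changes")
display(cdf_changes)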

In [0]:
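-- Propagate only the new row images: update_preimage rows are skipped and
-- deletes are not applied by this simple MERGE.
-- Assumes at most one change per id in source_cdf_changes: MERGE fails if
-- several source rows match the same target row.
MERGE INTO user_delta_silver AS target
USING (
  SELECT * FROM source_cdf_changes
  WHERE _change_type IN ('update_postimage', 'insert')
) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *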
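
The MERGE above ignores `delete` rows and expects at most one change per `id`. Delta's MERGE fails when several source rows match the same target row, which happens as soon as the same key changes in more than one commit. The pure-Python sketch below shows one way to handle both cases: keep only the latest change per key, then delete or upsert. Both the `apply_cdf_batch` helper and the dict-based target are hypothetical. In SQL, this is commonly expressed with `ROW_NUMBER() OVER (PARTITION BY id ORDER BY _commit_version DESC)` to deduplicate, plus conditional `WHEN MATCHED ... THEN DELETE` and `WHEN NOT MATCHED ...` clauses.

```python
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def apply_cdf_batch(target, changes, key="id"):
    """Apply a batch of CDF rows to `target` (a dict keyed by `key`) in place.

    1. Skip update_preimage rows: only the new value matters downstream.
    2. Keep the latest remaining change per key (highest _commit_version;
       for ties, the row that comes last in `changes`).
    3. Delete the key for `delete`, otherwise upsert the row without CDF columns.
    """
    latest = {}
    for row in changes:
        if row["_change_type"] == "update_preimage":
            continue
        current = latest.get(row[key])
        if current is None or row["_commit_version"] >= current["_commit_version"]:
            latest[row[key]] = row

    for row_id, row in latest.items():
        if row["_change_type"] == "delete":
            target.pop(row_id, None)
        else:
            target[row_id] = {k: v for k, v in row.items() if k not in CDF_COLUMNS}
    return target


# Toy silver table and change batch (values are illustrative).
silver = {1: {"id": 1, "firstname": "Ann"}, 2000: {"id": 2000, "firstname": "Bob"}}
changes = [
    {"id": 1, "firstname": "Ann", "_change_type": "update_preimage", "_commit_version": 121},
    {"id": 1, "firstname": "John_cdc", "_change_type": "update_postimage", "_commit_version": 121},
    {"id": 1, "firstname": "Jo", "_change_type": "update_postimage", "_commit_version": 122},
    {"id": 2000, "firstname": "Bob", "_change_type": "delete", "_commit_version": 122},
    {"id": 5, "firstname": "Eve", "_change_type": "insert", "_commit_version": 123},
]
print(apply_cdf_batch(silver, changes))
```

Deduplicating before applying keeps the operation idempotent for a given batch and avoids the multiple-match error, at the cost of discarding intermediate values (use SCD Type 2 if you need them).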

In [0]:
select * from user_delta_silver WHERE ID < 10;

In [0]:
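%python
import time

# Let the demo streams run a little longer, then stop them (DBDemos is defined by the setup notebook).
time.sleep(30)
DBDemos.stop_all_streams()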

## Easier CDF with Spark Declarative Pipelines APPLY CHANGES

Delta Lake CDF is a low-level API. To implement simple CDC pipelines in pure SQL (including SCD Type 2 tables), you can use the `APPLY CHANGES` API of the Spark Declarative Pipelines engine. See the [documentation](https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-cdc.html) for more details.
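For intuition about what an SCD Type 2 target keeps, here is a conceptual pure-Python sketch. It turns change-feed rows into a history that preserves every version of a row, together with the commit range during which that version was current. The layout (`start_version`, `end_version`) and the `apply_scd2` helper are hypothetical. This is not how the pipelines engine implements `APPLY CHANGES`.

```python
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def apply_scd2(history, changes, key="id"):
    """Apply CDF rows to an SCD Type 2 history list, in place (hypothetical layout).

    Each history record holds the business columns plus `start_version` and
    `end_version`; `end_version is None` marks the current record for a key.
    """
    # sorted() is stable, so rows of the same commit keep their original order.
    for row in sorted(changes, key=lambda r: r["_commit_version"]):
        change_type = row["_change_type"]
        if change_type == "update_preimage":
            continue
        # Close the currently open record for this key, if there is one.
        for record in history:
            if record[key] == row[key] and record["end_version"] is None:
                record["end_version"] = row["_commit_version"]
        # Inserts and updates open a new current record; deletes only close one.
        if change_type in ("insert", "update_postimage"):
            values = {k: v for k, v in row.items() if k not in CDF_COLUMNS}
            history.append(
                {**values, "start_version": row["_commit_version"], "end_version": None}
            )
    return history


history = apply_scd2([], [
    {"id": 1, "firstname": "Ann", "_change_type": "insert", "_commit_version": 1},
    {"id": 1, "firstname": "Ann", "_change_type": "update_preimage", "_commit_version": 2},
    {"id": 1, "firstname": "John", "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 1, "firstname": "John", "_change_type": "delete", "_commit_version": 3},
])
for record in history:
    print(record)
```

Unlike the MERGE-based upsert, nothing is overwritten: the old value "Ann" stays in the history with its closed version range.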

That's it, we covered the main capabilities provided by Delta Lake.

If you want to know more about the technical implementation, you can have a look at the [internal structure of Delta Lake]($./05-Advanced-Delta-Lake-Internal) (optional) or [go back to the Introduction]($./00-Delta-Lake-Introduction).