# Setup

In [15]:
import pyspark
# Import the Delta helpers by name instead of `from delta import *`, so readers
# can see where `configure_spark_with_delta_pip` comes from. `DeltaTable` is
# imported too so any code that relied on the wildcard for it keeps working.
from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import functions as F

# Register Delta Lake's SQL extension and catalog on the session builder; these
# are what make `USING delta`, `table_changes(...)` and `MERGE INTO` available
# in the `spark.sql` calls throughout this notebook.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta-spark pip helper finish configuring the builder, then create
# (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [108]:
# Start from a clean slate so the notebook can be re-run end to end.
# NOTE(review): "student" is not created anywhere in this notebook; it is
# presumably left over from an earlier version and dropped defensively.
for stale_table in ("student", "existing_students", "students"):
    dropped = spark.sql(f"drop table if exists {stale_table}")
dropped  # echo the result of the last drop, as the original cell did

DataFrame[]

# How to create a Delta Table with Change Data Feed enabled

In [109]:
# Turn on the change data feed when the table is created, via a table property,
# so later writes to `students` record row-level changes.
students_ddl = """

CREATE TABLE students (id LONG, name STRING, age LONG)
USING delta 
TBLPROPERTIES (delta.enableChangeDataFeed = true)

"""
spark.sql(students_ddl)

DataFrame[]

# How to enable Change Data Feed on an existing Delta table

In [110]:
# A plain Delta table created without the change data feed property; the
# following ALTER TABLE cell enables the feed after the fact.
existing_students_ddl = """

CREATE TABLE existing_students (id LONG, name STRING, age LONG)
USING delta

"""
spark.sql(existing_students_ddl)

DataFrame[]

In [111]:
# Enable the change data feed on an already-existing table by setting the same
# table property that CREATE TABLE used for `students`.
enable_cdf_sql = """

ALTER TABLE existing_students SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

"""
spark.sql(enable_cdf_sql)

DataFrame[]

# How to read a Delta Lake Change Data Feed

In [112]:
# First change (commit version 1): append three rows.
insert_rows_sql = """
    INSERT INTO students VALUES (0,"Matt", 20), (1, "Jim", 25), (2, "Nick", 30)
"""
spark.sql(insert_rows_sql)

DataFrame[]

In [113]:
# Second change (commit version 2): delete the row with id 2.
delete_row_sql = """
    DELETE FROM students WHERE id = 2
"""
spark.sql(delete_row_sql)

DataFrame[num_affected_rows: bigint]

In [114]:
# Third change (commit version 3): update some data -- set Jim's (id 1) age to 30.
# In the change feed an UPDATE shows up as two rows: the pre-image and the
# post-image of the modified row.

spark.sql("""
    UPDATE students SET age = 30 WHERE id = 1
""")

DataFrame[num_affected_rows: bigint]

## Read Change Data Feed in batches

In [115]:
# Changes between versions 0 and 1. Version 0 is the CREATE TABLE commit, so
# only the inserts committed in version 1 appear.
changes_v0_to_v1 = spark.sql("""
    SELECT * FROM table_changes('students', 0, 1)
""")
changes_v0_to_v1.show()

+---+----+---+------------+---------------+--------------------+
| id|name|age|_change_type|_commit_version|   _commit_timestamp|
+---+----+---+------------+---------------+--------------------+
|  0|Matt| 20|      insert|              1|2023-04-19 11:28:...|
|  2|Nick| 30|      insert|              1|2023-04-19 11:28:...|
|  1| Jim| 25|      insert|              1|2023-04-19 11:28:...|
+---+----+---+------------+---------------+--------------------+



In [116]:
# A single version: start and end are both 2, so only the DELETE commit is returned.
changes_v2_only = spark.sql("""
    SELECT * FROM table_changes('students', 2, 2)
""")
changes_v2_only.show()

+---+----+---+------------+---------------+--------------------+
| id|name|age|_change_type|_commit_version|   _commit_timestamp|
+---+----+---+------------+---------------+--------------------+
|  2|Nick| 30|      delete|              2|2023-04-19 11:28:...|
+---+----+---+------------+---------------+--------------------+



In [117]:
# Version 3 only: the UPDATE, reported as an update_preimage / update_postimage pair.
changes_v3_only = spark.sql("""
    SELECT * FROM table_changes('students', 3, 3)
""")
changes_v3_only.show()

+---+----+---+----------------+---------------+--------------------+
| id|name|age|    _change_type|_commit_version|   _commit_timestamp|
+---+----+---+----------------+---------------+--------------------+
|  1| Jim| 25| update_preimage|              3|2023-04-19 11:28:...|
|  1| Jim| 30|update_postimage|              3|2023-04-19 11:28:...|
+---+----+---+----------------+---------------+--------------------+



In [118]:
# With no end version, the feed runs from version 0 through the latest commit.
all_changes_since_v0 = spark.sql("""
    SELECT * FROM table_changes('students', 0)
""")
all_changes_since_v0.show()

+---+----+---+----------------+---------------+--------------------+
| id|name|age|    _change_type|_commit_version|   _commit_timestamp|
+---+----+---+----------------+---------------+--------------------+
|  1| Jim| 25| update_preimage|              3|2023-04-19 11:28:...|
|  1| Jim| 30|update_postimage|              3|2023-04-19 11:28:...|
|  2|Nick| 30|          delete|              2|2023-04-19 11:28:...|
|  0|Matt| 20|          insert|              1|2023-04-19 11:28:...|
|  2|Nick| 30|          insert|              1|2023-04-19 11:28:...|
|  1| Jim| 25|          insert|              1|2023-04-19 11:28:...|
+---+----+---+----------------+---------------+--------------------+



In [83]:
# `table_changes_by_path` reads the same change data feed, but identifies the table by its storage path instead of its name

In [102]:
# Look up where the `students` Delta table is stored, so the path-based APIs
# below can be demonstrated. `DESCRIBE DETAIL` returns a single structured row
# with a `location` column. That is sturdier than scanning the display-oriented
# `DESCRIBE EXTENDED` output for a row whose col_name is 'Location'.
path = (
    spark.sql("DESCRIBE DETAIL students")
    .select("location")
    .first()["location"]
)

In [103]:
# Same feed as table_changes('students', 0), addressed by storage location.
# NOTE(review): the saved output below has different timestamps, commit versions,
# and an extra 'Denny' row compared with the table_changes output above. It looks
# like it comes from an earlier run; re-run the notebook to refresh it.
changes_via_path = spark.sql(f"""
    SELECT * FROM table_changes_by_path('{path}', 0)
""")
changes_via_path.show()

+---+-----+---+----------------+---------------+--------------------+
| id| name|age|    _change_type|_commit_version|   _commit_timestamp|
+---+-----+---+----------------+---------------+--------------------+
|  1|  Jim| 25| update_preimage|              2|2023-04-19 09:01:...|
|  1|  Jim| 30|update_postimage|              2|2023-04-19 09:01:...|
|  2| Nick| 30|          delete|              2|2023-04-19 09:01:...|
|  3|Denny| 35|          insert|              2|2023-04-19 09:01:...|
|  0| Matt| 20|          insert|              1|2023-04-19 09:01:...|
|  2| Nick| 30|          insert|              1|2023-04-19 09:01:...|
|  1|  Jim| 25|          insert|              1|2023-04-19 09:01:...|
+---+-----+---+----------------+---------------+--------------------+



## Read Change Data Feed as a stream

In [107]:
# Three ways to open the change data feed as a streaming DataFrame. This cell
# only defines the streams; no streaming query is started here.

# 1) From an explicit starting version, addressed by table name.
cdf_stream_from_version = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("students")
)

# 2) From a starting timestamp, addressed by storage path.
#    NOTE(review): this timestamp is hard-coded from an earlier run; adjust it
#    to a commit time from this table's history.
cdf_stream_from_timestamp = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "2023-04-19 05:35:43")
    .load(path)
)

# 3) From the latest snapshot, because neither a version nor a timestamp is given.
cdf_stream_latest = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("students")
)
cdf_stream_latest

DataFrame[id: bigint, name: string, age: bigint, _change_type: string, _commit_version: bigint, _commit_timestamp: timestamp]

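# How to apply change data downstream

The cells below apply a hand-written change set (the `changes` temp view, with an explicit `change_type` column) to a target table using `MERGE INTO`. In a real pipeline, the change set would be derived from the change data feed shown above and merged into a separate downstream table. Here it is merged back into `students` to keep the example short.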

In [119]:
# Re-read the full change feed by path to see the table's history before applying changes.
feed_before_merge = spark.sql(f"""
    SELECT * FROM table_changes_by_path('{path}', 0)
""")
feed_before_merge.show()

+---+----+---+----------------+---------------+--------------------+
| id|name|age|    _change_type|_commit_version|   _commit_timestamp|
+---+----+---+----------------+---------------+--------------------+
|  1| Jim| 25| update_preimage|              3|2023-04-19 11:28:...|
|  1| Jim| 30|update_postimage|              3|2023-04-19 11:28:...|
|  2|Nick| 30|          delete|              2|2023-04-19 11:28:...|
|  0|Matt| 20|          insert|              1|2023-04-19 11:28:...|
|  2|Nick| 30|          insert|              1|2023-04-19 11:28:...|
|  1| Jim| 25|          insert|              1|2023-04-19 11:28:...|
+---+----+---+----------------+---------------+--------------------+



In [74]:
# Hand-written change set: one row per id, with an explicit change_type that
# the MERGE in the next cell dispatches on. Numeric columns are cast to LONG to
# match the `students` schema.
changes_view_sql = """
    CREATE OR REPLACE TEMP VIEW changes AS
    SELECT cast(col1 as LONG) as id, 
           col2 as name, 
           cast(col3 as LONG) as age,
           col4 as change_type 
    FROM VALUES (3,"Denny", 35, "INSERT"), (2, "Nick", 30, "DELETE"), (1, "Jim", 30, "UPDATE")
"""
spark.sql(changes_view_sql)

DataFrame[]

In [75]:
# Apply the change set in one MERGE. UPDATE and DELETE rows act on ids that
# already exist, and INSERT rows add ids that do not. Source rows matching none
# of the clauses are ignored. The result is a one-row DataFrame of
# affected/updated/deleted/inserted counts.
apply_changes_sql = """

MERGE INTO students t
USING changes s
ON t.id = s.id
WHEN MATCHED AND s.change_type = "UPDATE" 
    THEN UPDATE SET name = s.name, age = s.age
WHEN MATCHED AND s.change_type = "DELETE"
    THEN DELETE
WHEN NOT MATCHED AND s.change_type = "INSERT"
    THEN INSERT (id, name, age) VALUES (s.id, s.name, s.age)

"""
merge_metrics = spark.sql(apply_changes_sql)
merge_metrics.show()

23/04/19 09:01:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/04/19 09:01:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
23/04/19 09:01:08 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               1|               1|                1|
+-----------------+----------------+----------------+-----------------+

