# Import library

In [1]:
import pyspark

from pyspark.sql import SparkSession
from delta import *
from delta.tables import DeltaTable
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
builder = (
    pyspark
    .sql.SparkSession.builder
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Logging
    # .config("spark.log.level", "DEBUG")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
sc = spark.sparkContext # spark context

:: loading settings :: url = jar:file:/home/ad/Desktop/code/big-data-tools/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ad/.ivy2/cache
The jars for the packages stored in: /home/ad/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4f658f08-7cee-4a1c-b47f-db2bddf078cd;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 48ms :: artifacts dl 2ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.0 from central in [default]
	io.delta#delta-storage;3.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0  

# Create test data

In [3]:
test_df = spark.createDataFrame([
    (124, "Raul",     "Oaxaca",      "INSERT", 1),
    (123, "Isabel",   "Monterrey",   "INSERT", 1),
    (125, "Mercedes", "Tijuana",     "INSERT", 2),
    (126, "Lily",     "Cancun",      "INSERT", 2),
    (123, None,       None,          "DELETE", 6),
    (125, "Mercedes", "Guadalajara", "UPDATE", 6),
    (125, "Mercedes", "Mexicali",    "UPDATE", 5),
    (123, "Isabel",   "Chihuahua",   "UPDATE", 5),
    # (125, None,       None,          "DELETE", 7), # test
], schema="userId int, name string, city string, operation string, sequenceNum int")

# add columns
test_df = (
    test_df
    .withColumn("ts_ms", add_months(to_date(lit("2025-01-01")), col("sequenceNum")))
    .withColumn("record_track_hash", hash("userId", "name", "city"))
)

test_df.show()

+------+--------+-----------+---------+-----------+----------+-----------------+
|userId|    name|       city|operation|sequenceNum|     ts_ms|record_track_hash|
+------+--------+-----------+---------+-----------+----------+-----------------+
|   124|    Raul|     Oaxaca|   INSERT|          1|2025-02-01|       -357915523|
|   123|  Isabel|  Monterrey|   INSERT|          1|2025-02-01|       -883305503|
|   125|Mercedes|    Tijuana|   INSERT|          2|2025-03-01|       1432383275|
|   126|    Lily|     Cancun|   INSERT|          2|2025-03-01|       1891951372|
|   123|    NULL|       NULL|   DELETE|          6|2025-07-01|         85273170|
|   125|Mercedes|Guadalajara|   UPDATE|          6|2025-07-01|      -1185033760|
|   125|Mercedes|   Mexicali|   UPDATE|          5|2025-06-01|       1041365571|
|   123|  Isabel|  Chihuahua|   UPDATE|          5|2025-06-01|       -269450623|
+------+--------+-----------+---------+-----------+----------+-----------------+



# Create Delta Table

In [4]:
table_path = '/home/ad/Desktop/code/big-data-tools/notebooks/spark-warehouse/cdc_data'
cdc_data = (
    DeltaTable
    .createOrReplace(spark)
    # .createIfNotExists(spark)
    # .tableName("default.cdc_data")
    .addColumn("userId", dataType=IntegerType())
    .addColumn("name", dataType=CharType(30))
    .addColumn("city", dataType=CharType(30))
    .addColumn("sequenceNum", dataType=IntegerType())
    .addColumn("record_active_flag", dataType=BooleanType())
    .addColumn("record_start_date", dataType=DateType())
    .addColumn("record_end_date", dataType=DateType())
    .addColumn("record_track_hash", dataType=IntegerType())
    .location(table_path)
    # enable CDF
    .property('delta.enableChangeDataFeed', 'true')
    .execute()
)

# clean data
spark.sql(f"DELETE FROM delta.`{table_path}` WHERE userId > 0;")

cdc_data.toDF().show()

25/05/17 16:47:38 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------+----+----+-----------+------------------+-----------------+---------------+-----------------+
|userId|name|city|sequenceNum|record_active_flag|record_start_date|record_end_date|record_track_hash|
+------+----+----+-----------+------------------+-----------------+---------------+-----------------+
+------+----+----+-----------+------------------+-----------------+---------------+-----------------+



# Apply changes from CDC to Delta Lake

In [None]:
from pyspark.sql import Window

def apply_changes(
    dt,
    event_df,
    retryable_event_df=None, # out-of-order events from previous batches
    record_keys=['userId'],
    sequence_by=['sequenceNum']
):
    # TODO: union incoming events and out-of-order events

    # get inserted rows (operation = INSERT)
    # run insert command
    (
        dt.alias("target")
        .merge(
            event_df.alias("source"), "target.userId = source.userId"
        )
        .whenNotMatchedInsert(
            condition="operation = 'INSERT'",
            values={
                "target.userId": "source.userId",
                "target.name": "source.name",
                "target.city": "source.city",
                "target.sequenceNum": "source.sequenceNum",
                "target.record_active_flag": lit(True),
                "target.record_start_date": "source.ts_ms",
                "target.record_end_date": lit("9999-01-01"),
                "target.record_track_hash": "source.record_track_hash"
            }
        )
        .execute()
    )

    # get updated rows (operation = UPDATE)
    update_df = (
        event_df
        .filter("operation = 'UPDATE'")
        .sort(asc("sequenceNum"))
    )

    # join with table to get matched ids
    matched_df = (
        update_df.alias("m")
        .join(dt.toDF(), how='left_anti', on=['userId', 'record_track_hash'])
        # .select("userId", "ts_ms", "m.record_track_hash")
        .dropDuplicates(["userId"])
    )
    if not matched_df.isEmpty():
        # update tracking columns for old records (active_flag, end_date)
        (
            dt.alias("target")
            .merge(
                matched_df.alias("source"), "target.userId = source.userId"
            )
            .whenMatchedUpdate(
                condition="target.record_active_flag = true AND target.record_track_hash != source.record_track_hash",
                set={
                    "target.record_active_flag": lit(False),
                    # "target.record_end_date": "source.ts_ms"
                }
            )
            .execute()
        )

    if not update_df.isEmpty():
        # apply window function to set tracking columns for new records (active_flag, end_date)
        window = (
            Window
            .partitionBy("userId")
            .orderBy(desc("sequenceNum"))
        )


        update_df = (
            update_df.alias("s")
            .join(
                dt.toDF().alias("t"), 
                on=(col("s.userId") == col("t.userId")) & (col("s.record_track_hash") == col("t.record_track_hash")),
                how='left_anti'
            )
            .withColumn("row_num", row_number().over(window))
            .withColumn("record_active_flag", col("row_num") == 1)
            .withColumn("record_end_date", when(col("record_active_flag") == lit(True), lit("9999-01-01")).when(col("record_active_flag") == lit(False), col("ts_ms")))
            .withColumn("mergeKey", lit(None))
        )

        # insert updated rows as new records
        (
            dt.alias("target")
            .merge(
                update_df.alias("source"), 
                "target.userId = source.mergeKey"
            )
            .whenNotMatchedInsert(
                values={
                    "target.userId": "source.userId",
                    "target.name": "source.name",
                    "target.city": "source.city",
                    "target.sequenceNum": "source.sequenceNum",
                    "target.record_active_flag": "source.record_active_flag",
                    "target.record_start_date": "source.ts_ms",
                    "target.record_end_date": "source.record_end_date",
                    "target.record_track_hash": "source.record_track_hash"
                }
            )
            .execute()
        )

    # get delelted rows (operation = DELETE)
    delete_df = event_df.filter("operation = 'DELETE'").dropDuplicates(['userId'])

    if not delete_df.isEmpty():
        # run delete command
        (
            dt.alias("target")
            .merge(
                delete_df.alias("source"), "target.userId = source.userId"
            )
            # soft delete (active_flag = false)
            .whenMatchedUpdate(
                set={
                    'target.record_active_flag': lit(False),
                    'target.record_end_date': 'source.ts_ms'
                }
            )
            # .whenMatchedDelete()
            .execute()
        )

    # check out-of-order events
    # 1. DELETE/UPDATE before INSERT -> left_anti join
    # 2. DELETE before UPDATE (INSERT already exists)
    ooo_event_df = event_df.filter("operation = 'UPDATE' OR operation = 'DELETE'")
    ooo_event_df = (
        ooo_event_df
        .join(
            dt.toDF(), on=record_keys, how='left_anti'
        )
    )

    # write out of order events to streaming error event table for retry later



In [12]:
apply_changes(cdc_data, test_df)

In [13]:
cdc_data.toDF().sort("sequenceNum").show(truncate=False)

+------+------------------------------+------------------------------+-----------+------------------+-----------------+---------------+-----------------+
|userId|name                          |city                          |sequenceNum|record_active_flag|record_start_date|record_end_date|record_track_hash|
+------+------------------------------+------------------------------+-----------+------------------+-----------------+---------------+-----------------+
|124   |Raul                          |Oaxaca                        |1          |true              |2025-02-01       |9999-01-01     |-357915523       |
|123   |Isabel                        |Monterrey                     |1          |false             |2025-02-01       |2025-07-01     |-883305503       |
|126   |Lily                          |Cancun                        |2          |true              |2025-03-01       |9999-01-01     |1891951372       |
|125   |Mercedes                      |Tijuana                       |2     

In [14]:
cdc_data.history().show(truncate=False)

+-------+-----------------------+------+--------+-----------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Change data feed

In [None]:
# read changes from batch queries
change_df = (
  spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0) # or startingTimestamp
  .load(table_path)
)

change_df.sort("_commit_version", "_commit_timestamp").show(truncate=False, n=50)

+------+------------------------------+------------------------------+-----------+------------------+-----------------+---------------+-----------------+----------------+---------------+-----------------------+
|userId|name                          |city                          |sequenceNum|record_active_flag|record_start_date|record_end_date|record_track_hash|_change_type    |_commit_version|_commit_timestamp      |
+------+------------------------------+------------------------------+-----------+------------------+-----------------+---------------+-----------------+----------------+---------------+-----------------------+
|126   |Lily                          |Cancun                        |2          |true              |2025-03-01       |9999-01-01     |1891951372       |insert          |1              |2025-05-17 16:47:40.678|
|123   |Isabel                        |Monterrey                     |1          |true              |2025-02-01       |9999-01-01     |-883305503       |ins