In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable
import time
import os

# -------------------------------
# 1️⃣ Initialize Spark with Delta
# -------------------------------
spark = SparkSession.builder \
    .appName("DeltaStreamingCDC") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

delta_path = "/tmp/delta/employees"
checkpoint_path = "/tmp/delta/checkpoint"
merge_checkpoint = "/tmp/delta/merge_checkpoint"

# Clean up from previous runs
os.system(f"rm -rf {delta_path} {checkpoint_path} {merge_checkpoint}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 04:28:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/19 04:28:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


0

In [2]:
# -------------------------------
# 2️⃣ Create Initial Delta Table
# -------------------------------
initial_data = [(1, "Alice", 1000), (2, "Bob", 1500)]
columns = ["id", "name", "salary"]

df_initial = spark.createDataFrame(initial_data, columns)
df_initial.write.format("delta").mode("overwrite").save(delta_path)

delta_table = DeltaTable.forPath(spark, delta_path)
print("Initial Delta Table:")
delta_table.toDF().show()

                                                                                

Initial Delta Table:


                                                                                

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1|Alice|  1000|
|  2|  Bob|  1500|
+---+-----+------+



In [3]:
# -------------------------------
# 3️⃣ Simulate Streaming Updates
# -------------------------------
# We'll create a small "updates" Delta table as the streaming source
updates_path = "/tmp/delta/updates"
os.system(f"rm -rf {updates_path}")

# Start a streaming read from updates_path
updates_stream = spark.readStream.format("delta").load(updates_path)

AnalysisException: [DELTA_SCHEMA_NOT_SET] Table schema is not set.  Write data into it or use CREATE TABLE to set the schema.

In [None]:
# Merge function for foreachBatch
def merge_to_delta(batch_df, batch_id):
    if not batch_df.rdd.isEmpty():
        delta_table.alias("t").merge(
            batch_df.alias("u"),
            "t.id = u.id"
        ).whenMatchedUpdate(set={"salary": "u.salary"}) \
         .whenNotMatchedInsert(values={"id": "u.id", "name": "u.name", "salary": "u.salary"}) \
         .execute()
        print(f"Batch {batch_id} merged!")

# Start streaming merge
merge_query = updates_stream.writeStream \
    .foreachBatch(merge_to_delta) \
    .option("checkpointLocation", merge_checkpoint) \
    .start()

# -------------------------------
# 4️⃣ Simulate CDC / Batch Inserts
# -------------------------------
# New batches coming in every 5 seconds
cdc_batches = [
    [(2, "Bob", 1800), (3, "Charlie", 1200)],  # Update + insert
    [(1, "Alice", 1200), (4, "David", 2000)],  # Update + insert
]

for i, batch in enumerate(cdc_batches):
    df_batch = spark.createDataFrame(batch, columns)
    df_batch.write.format("delta").mode("append").save(updates_path)
    print(f"\nInserted batch {i+1} into updates_path")
    time.sleep(5)

# -------------------------------
# 5️⃣ Query Current and Historical Versions
# -------------------------------
# Wait for streaming to catch up
time.sleep(5)
merge_query.stop()

print("\nDelta Table after streaming merges:")
delta_table.toDF().show()

# View version 0 (initial table)
print("\nVersion 0 (Initial Table):")
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
df_v0.show()

# View version 1 (after first batch)
print("\nVersion 1 (After First Batch):")
df_v1 = spark.read.format("delta").option("versionAsOf", 1).load(delta_path)
df_v1.show()

# View latest version
print("\nLatest Delta Table (Final State):")
delta_table.toDF().show()