# Change Data Capture (CDC)

In [0]:
# --- KONFIGURATION & SETUP ---

# 1. Zentrale Pfad-Definitionen (Damit alles zusammenpasst)
base_path       = "/Volumes/workspace/default/volume/cdc_demo"
cdc_source_path = f"{base_path}/source_json"       # Hier legen wir die JSONs ab
checkpoint_path = f"{base_path}/_checkpoints"      # Hier speichert Spark den State
schema_path     = f"{base_path}/_schema_inference" # Hier speichert Auto Loader das Schema

# Tabellennamen
target_table    = "workspace.default.customers_silver"
temp_view_name  = "cdc_bronze_view"

# 2. AUFRÄUMEN (Reset)
print("🧹 Starte Bereinigung...")

# Physische Dateien löschen (WICHTIG: Damit Checkpoints und Schema vergessen werden!)
# Wir löschen den ganzen Demo-Ordner rekursiv
dbutils.fs.rm(base_path, True) 

# Tabellen und Views aus dem Katalog werfen
spark.sql(f"DROP TABLE IF EXISTS {target_table}")
spark.sql(f"DROP VIEW IF EXISTS {temp_view_name}")

# Ordner-Struktur frisch erstellen (Optional, aber sauber)
dbutils.fs.mkdirs(cdc_source_path)

print("✅ System vollständig zurückgesetzt. Bereit für Demo-Start!")

🧹 Starte Bereinigung...
✅ System vollständig zurückgesetzt. Bereit für Demo-Start!


In [0]:
# --- SCHRITT 1: Die "Gestern"-Situation herstellen ---
# Wir simulieren eine bestehende Kundentabelle (Silver Layer)

# Tabelle erstellen
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {target_table} (
      id INT,
      name STRING,
      email STRING,
      city STRING,
      updated_at TIMESTAMP
    )
""")

# Daten einfügen (Initial Load)
# Hans und Lisa sind unsere Bestandskunden
spark.sql(f"""
    INSERT INTO {target_table} VALUES 
    (1, 'Hans Müller', 'hans@demo.de', 'Berlin', current_timestamp()),
    (2, 'Lisa Schmidt', 'lisa@demo.de', 'München', current_timestamp())
""")

print(f"✅ Zieltabelle '{target_table}' erstellt und mit Basis-Daten gefüllt.")
display(spark.table(target_table))

✅ Zieltabelle 'workspace.default.customers_silver' erstellt und mit Basis-Daten gefüllt.


id,name,email,city,updated_at
1,Hans Müller,hans@demo.de,Berlin,2025-11-26T09:26:13.668Z
2,Lisa Schmidt,lisa@demo.de,München,2025-11-26T09:26:13.668Z


In [0]:
# --- SCHRITT 2: Neue Daten simulieren (CDC Source) ---
# Wir erstellen eine JSON-Datei im Volume, die Updates, Inserts und Deletes enthält.

cdc_data_json = """
[
  {"id": 1, "name": "Hans Müller", "email": "hans.neu@demo.de", "city": "Berlin", "operation": "UPDATE"}, 
  {"id": 2, "name": null, "email": null, "city": null, "operation": "DELETE"},
  {"id": 3, "name": "Kai Kurz", "email": "kai@demo.de", "city": "Hamburg", "operation": "INSERT"}
]
"""

# Datei physisch ins Volume schreiben
# Der Auto Loader wird diese Datei gleich entdecken
batch_file_path = f"{cdc_source_path}/batch_01.json"
dbutils.fs.put(batch_file_path, cdc_data_json, overwrite=True)

print(f"✅ Neue CDC-Datei wurde hier abgelegt: {batch_file_path}")
print("   Inhalt: Hans (Update), Lisa (Delete), Kai (Insert)")

Wrote 290 bytes.
✅ Neue CDC-Datei wurde hier abgelegt: /Volumes/workspace/default/volume/cdc_demo/source_json/batch_01.json
   Inhalt: Hans (Update), Lisa (Delete), Kai (Insert)


In [0]:
# --- SCHRITT 3: Ingestion (Stream Starten) ---
# Wir laden die Rohdaten via Auto Loader in eine temporäre View

print("🚀 Starte Auto Loader Stream...")

# 1. Stream Definition
df_cdc_stream = (spark.readStream
                 .format("cloudFiles")
                 .option("cloudFiles.format", "json")
                 .option("cloudFiles.inferColumnTypes", "true")
                 # WICHTIG: Hier speichern wir das erkannte Schema
                 .option("cloudFiles.schemaLocation", schema_path) 
                 .load(cdc_source_path))

# 2. Schreiben in Memory-View (als Brücke zu SQL)
# In der Free Edition nutzen wir trigger(availableNow=True), damit es wie ein Batch läuft
ingestion_query = (df_cdc_stream.writeStream
                   .format("memory")
                   .queryName(temp_view_name)  # Name der View für SQL
                   .outputMode("append")
                   .trigger(availableNow=True) # Einmal alles verarbeiten, dann Stopp
                   .option("checkpointLocation", checkpoint_path)
                   .start())

# Warten, bis der Stream fertig ist (sonst ist die View noch leer!)
ingestion_query.awaitTermination()

print(f"✅ Ingestion abgeschlossen. Daten stehen in View '{temp_view_name}' bereit.")

🚀 Starte Auto Loader Stream...
✅ Ingestion abgeschlossen. Daten stehen in View 'cdc_bronze_view' bereit.


In [0]:
# --- SCHRITT 4: Apply Changes (MERGE) ---
# Wir verschmelzen die Updates (Bronze View) in das Ziel (Silver Table)

print("🔄 Führe MERGE Operation aus...")

spark.sql(f"""
MERGE INTO {target_table} AS target
USING {temp_view_name} AS source
ON target.id = source.id

-- Fall 1: DELETE (Lisa)
WHEN MATCHED AND source.operation = 'DELETE' THEN
  DELETE

-- Fall 2: UPDATE (Hans)
WHEN MATCHED AND source.operation = 'UPDATE' THEN
  UPDATE SET 
    target.email = source.email,
    target.city = source.city,
    target.updated_at = current_timestamp()

-- Fall 3: INSERT (Kai)
WHEN NOT MATCHED AND source.operation = 'INSERT' THEN
  INSERT (id, name, email, city, updated_at) 
  VALUES (source.id, source.name, source.email, source.city, current_timestamp())
""")

print("✅ MERGE erfolgreich durchgeführt.")

🔄 Führe MERGE Operation aus...
✅ MERGE erfolgreich durchgeführt.


In [0]:
# --- SCHRITT 5: Ergebnisprüfung ---
print("📊 Aktueller Stand der Tabelle:")

# Wir nutzen SQL Magic für schöne Darstellung, aber rufen die Variable ab
display(spark.sql(f"SELECT * FROM {target_table} ORDER BY id"))

# Erwartung:
# ID 1 (Hans): Neue Email
# ID 2 (Lisa): Weg
# ID 3 (Kai): Neu da

📊 Aktueller Stand der Tabelle:


id,name,email,city,updated_at
1,Hans Müller,hans.neu@demo.de,Berlin,2025-11-26T09:27:20.448Z
3,Kai Kurz,kai@demo.de,Hamburg,2025-11-26T09:27:20.448Z


TimeTravel (optional)

In [0]:
%sql
-- Zeig mir das Logbuch der Tabelle
DESCRIBE HISTORY workspace.default.customers_silver

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-11-26T09:27:24.000Z,78230215195550,philippe.christen@fhnw.ch,MERGE,"Map(predicate -> [""(cast(id#33216 as bigint) = id#33196L)""], clusterBy -> [], matchedPredicates -> [{""predicate"":""(operation#33198 = DELETE)"",""actionType"":""delete""},{""predicate"":""(operation#33198 = UPDATE)"",""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""predicate"":""(operation#33198 = INSERT)"",""actionType"":""insert""}])",,List(2852600395492658),1126-074506-re5c2t3l-v2n,1.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 1, numTargetFilesAdded -> 2, numTargetBytesAdded -> 4460, numTargetBytesRemoved -> 1585, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 2372, materializeSourceTimeMs -> 182, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 1, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 994, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1111)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
1,2025-11-26T09:26:15.000Z,78230215195550,philippe.christen@fhnw.ch,WRITE,"Map(mode -> Append, statsOnLoad -> true, partitionBy -> [])",,List(2852600395492658),1126-074506-re5c2t3l-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 1585)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
0,2025-11-26T09:26:14.000Z,78230215195550,philippe.christen@fhnw.ch,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true"",""delta.enableRowTracking"":""true"",""delta.rowTracking.materializedRowCommitVersionColumnName"":""_row-commit-version-col-17ebc7d2-e9bc-44fb-8463-0de569aac57f"",""delta.rowTracking.materializedRowIdColumnName"":""_row-id-col-c4648f46-a046-4295-9193-afa2afc6f8b4""}, statsOnLoad -> false)",,List(2852600395492658),1126-074506-re5c2t3l-v2n,,WriteSerializable,True,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


In [0]:
%sql
-- Wir reisen zurück in die Vergangenheit (zu Version 1)
-- Damals war Lisa noch da und Hans hatte die alte Email.
SELECT * FROM workspace.default.customers_silver VERSION AS OF 1
ORDER BY id

id,name,email,city,updated_at
1,Hans Müller,hans@demo.de,Berlin,2025-11-26T09:26:13.668Z
2,Lisa Schmidt,lisa@demo.de,München,2025-11-26T09:26:13.668Z


### Cleanup

In [0]:
# --- FINAL CLEANUP ---
# Führe diese Zelle aus, wenn du die Demo beendet hast und alles zurücksetzen willst.

print("🧹 Starte finales Aufräumen...")

# 1. Tabellen und Views aus dem Metastore entfernen
spark.sql(f"DROP TABLE IF EXISTS {target_table}")
spark.sql(f"DROP VIEW IF EXISTS {temp_view_name}")
print(f"✅ Tabelle '{target_table}' und View '{temp_view_name}' gelöscht.")

# 2. Physische Dateien im Volume löschen
# Wir löschen den kompletten Hauptordner der Demo
dbutils.fs.rm(base_path, True)
print(f"✅ Alle Dateien und Checkpoints in '{base_path}' gelöscht.")

print("🏁 System ist wieder sauber.")

🧹 Starte finales Aufräumen...
✅ Tabelle 'workspace.default.customers_silver' und View 'cdc_bronze_view' gelöscht.
✅ Alle Dateien und Checkpoints in '/Volumes/workspace/default/volume/cdc_demo' gelöscht.
🏁 System ist wieder sauber.


# Beispiel CDC in Delta Live Tables (NICHT VERFÜGBAR IN DER FREE EDITION!) 

In [0]:
%sql
SET school.dataset_path=dbfs:/mnt/DE-Associate-Book/datasets/school;

In [0]:
%sql
CREATE OR REFRESH STREAMING TABLE courses_bronze
COMMENT "The raw courses data, ingested from CDC feed"
AS SELECT * FROM cloud_files("${school.dataset_path}/courses-cdc", "json")

In [0]:
%sql
CREATE OR REFRESH STREAMING TABLE courses_silver;

In [0]:
%sql
APPLY CHANGES INTO LIVE.courses_silver
 FROM STREAM(LIVE.courses_bronze)
 KEYS (course_id)
 APPLY AS DELETE WHEN row_status = "DELETE"
 SEQUENCE BY row_time
 COLUMNS * EXCEPT (row_status, row_time)

In [0]:
%sql
CREATE OR REPLACE MATERIALIZED VIEW instructor_counts_stats
 COMMENT "Number of courses per instructor"
AS SELECT instructor, count(*) as courses_count,
         current_timestamp() updated_time
 FROM LIVE.courses_silver
 GROUP BY instructor

In [0]:
%sql
CREATE TEMPORARY LIVE VIEW courses_sales
 AS SELECT b.title, o.quantity
   FROM (
     SELECT *, explode(courses) AS course
     FROM LIVE.enrollments_cleaned) o
   INNER JOIN LIVE.courses_silver b
   ON o.course.course_id = b.course_id;