In [5]:
from pyspark.sql import Row
from datetime import datetime

# Initial Data (e.g., simplified patient data)
data = [
    Row(id=1, name='Alice Smith', value=100.0, updated_timestamp=datetime(2024, 10, 26, 10, 0, 0)),
    Row(id=2, name='Bob Johnson', value=200.0, updated_timestamp=datetime(2024, 10, 26, 10, 5, 0)),
    Row(id=3, name='Charlie Brown', value=300.0, updated_timestamp=datetime(2024, 10, 26, 10, 10, 0))
]
initial_df = spark.createDataFrame(data)

# Define the managed table name in the Lakehouse
source_table_name = "HealthcareData.source_patient_data_managed"

# Write initial data to a managed Delta table
# Using mode("overwrite") for a clean start each demo run
initial_df.write.format("delta").mode("overwrite").saveAsTable(source_table_name)

print(f"Initial managed source table '{source_table_name}' created.")

# Show the table exists and data is there via SQL
spark.sql(f"SELECT * FROM {source_table_name}").show()

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 7, Finished, Available, Finished)

Initial managed source table 'HealthcareData.source_patient_data_managed' created.
+---+-------------+-----+-------------------+
| id|         name|value|  updated_timestamp|
+---+-------------+-----+-------------------+
|  1|  Alice Smith|100.0|2024-10-26 10:00:00|
|  2|  Bob Johnson|200.0|2024-10-26 10:05:00|
|  3|Charlie Brown|300.0|2024-10-26 10:10:00|
+---+-------------+-----+-------------------+



In [6]:
# Enable CDF for the managed table
spark.sql(f"ALTER TABLE {source_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

print(f"CDF enabled for managed table '{source_table_name}'.")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 8, Finished, Available, Finished)

CDF enabled for managed table 'HealthcareData.source_patient_data_managed'.


In [7]:
spark.sql(f"SHOW TBLPROPERTIES {source_table_name}").show()

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 9, Finished, Available, Finished)

+--------------------+-----+
|                 key|value|
+--------------------+-----+
|delta.enableChang...| true|
|delta.minReaderVe...|    1|
|delta.minWriterVe...|    4|
|delta.parquet.vor...| true|
+--------------------+-----+



In [8]:
# Insert new data (e.g., new patient) using SQL INSERT
spark.sql(f"""
  INSERT INTO {source_table_name} VALUES
  (4, 'David Green', 400.0, '{datetime(2024, 10, 27, 11, 0, 0)}')
""")

print("Inserted new record into managed table.")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 10, Finished, Available, Finished)

Inserted new record into managed table.


In [9]:
# Update existing data (e.g., id=1, patient Alice's value changed) using SQL UPDATE
spark.sql(f"""
  UPDATE {source_table_name}
  SET value = 150.0, updated_timestamp = '{datetime(2024, 10, 27, 11, 10, 0)}'
  WHERE id = 1
""")

print("Updated a record in managed table.")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 11, Finished, Available, Finished)

Updated a record in managed table.


In [10]:
# Delete data (e.g., id=3, patient Charlie discharged/record removed) using SQL DELETE
spark.sql(f"""
  DELETE FROM {source_table_name}
  WHERE id = 3
""")

print("Deleted a record from managed table.")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 12, Finished, Available, Finished)

Deleted a record from managed table.


In [11]:
spark.sql(f"DESCRIBE HISTORY {source_table_name}").show()

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 13, Finished, Available, Finished)

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      4|2025-04-25 20:51:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          3|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
|      3|2025-04-25 20:51:...|  NULL|    NULL|              UPDATE|{predicate -> ["(...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numRemoved

In [26]:
# Define the version to start reading from (ADJUST based on your DESCRIBE HISTORY output)
starting_version = 1 # Example: If version 0 was initial write, changes start from 1.

# Define the ABFSS path for the managed table's data location in OneLake
workspace_name = "Healthcare" # Your workspace name
lakehouse_name = "HealthcareData" # Your Lakehouse name
managed_table_simple_name = source_table_name.split('.')[-1] # Get just the table name part
abfss_path_to_table = f"abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.Lakehouse/Tables/{managed_table_simple_name}"

# Read the change data feed using the ABFSS path and CDF options
change_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", starting_version) \
    .load(abfss_path_to_table)

print(f"Read change data from version {starting_version} from ABFSS path {abfss_path_to_table}.")
change_df.show(truncate=False)

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 29, Finished, Available, Finished)

Read change data from version 1 from ABFSS path abfss://Healthcare@onelake.dfs.fabric.microsoft.com/HealthcareData.Lakehouse/Tables/source_patient_data_managed.
+---+-------------+-----+-------------------+----------------+---------------+-----------------------+
|id |name         |value|updated_timestamp  |_change_type    |_commit_version|_commit_timestamp      |
+---+-------------+-----+-------------------+----------------+---------------+-----------------------+
|1  |Alice Smith  |100.0|2024-10-26 10:00:00|update_preimage |3              |2025-04-25 20:51:02.595|
|1  |Alice Smith  |150.0|2024-10-27 11:10:00|update_postimage|3              |2025-04-25 20:51:02.595|
|3  |Charlie Brown|300.0|2024-10-26 10:10:00|delete          |4              |2025-04-25 20:51:10.94 |
|4  |David Green  |400.0|2024-10-27 11:00:00|insert          |2              |2025-04-25 20:50:47.359|
+---+-------------+-----+-------------------+----------------+---------------+-----------------------+


In [27]:
# Define target managed table name
target_table_name = "HealthcareData.target_patient_data_managed"

# --- FIX: Correctly Initialize Target Table with Initial Data (Simulate Initial Load) ---
# Instead of dropping and creating an empty table, we will populate it
# with the data from the source table at version 0 (the initial write).
# This simulates the "first run" or initial load of the target table.

initial_source_data_at_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load(abfss_path_to_table) # Use the ABFSS path

# Overwrite the target table with this initial data
initial_source_data_at_v0.write.format("delta").mode("overwrite").saveAsTable(target_table_name)

print(f"Target managed table '{target_table_name}' initialized with data from source version 0 (simulating initial load).")

# --- Show the state after initial load (optional) ---
print(f"State of target table after initial load (version 0 from source):")
spark.read.table(target_table_name).show()


# --- Continue with processing changes from CDF (Incremental Load Logic) ---

# Process changes: Filter out update_preimage and select relevant columns explicitly
changes_for_merge = change_df.filter("_change_type != 'update_preimage'") \
                             .select("id", "name", "value", "updated_timestamp", "_change_type") # Select necessary columns

# Register the filtered changes as a temporary view
temp_view_name = "temp_cdc_changes_for_merge"
changes_for_merge.createOrReplaceTempView(temp_view_name)

print(f"Filtered changes registered as temporary view '{temp_view_name}'.")

# Perform the MERGE operation on the target managed table (which is now NOT empty)
# using the TEMPORARY VIEW. This MERGE logic is correct for incremental updates.
spark.sql(f"""
  MERGE INTO {target_table_name} AS target
  USING {temp_view_name} AS source  -- Use the temporary view as the source
  ON target.id = source.id
  WHEN MATCHED AND source._change_type = 'update_postimage' THEN
    UPDATE SET
      target.name = source.name,
      target.value = source.value,
      target.updated_timestamp = source.updated_timestamp
  WHEN MATCHED AND source._change_type = 'delete' THEN
    DELETE
  WHEN NOT MATCHED AND source._change_type = 'insert' THEN
    INSERT (id, name, value, updated_timestamp)
    VALUES (source.id, source.name, source.value, source.updated_timestamp)
""")

print(f"Incremental changes applied to the target managed table '{target_table_name}' using temporary view '{temp_view_name}'.")

# --- Optional: Clean up the temporary view when done ---
# spark.catalog.dropTempView(temp_view_name)
# print(f"Temporary view '{temp_view_name}' dropped.")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 30, Finished, Available, Finished)

Target managed table 'HealthcareData.target_patient_data_managed' initialized with data from source version 0 (simulating initial load).
State of target table after initial load (version 0 from source):
+---+-------------+-----+-------------------+
| id|         name|value|  updated_timestamp|
+---+-------------+-----+-------------------+
|  1|  Alice Smith|100.0|2024-10-26 10:00:00|
|  2|  Bob Johnson|200.0|2024-10-26 10:05:00|
|  3|Charlie Brown|300.0|2024-10-26 10:10:00|
+---+-------------+-----+-------------------+

Filtered changes registered as temporary view 'temp_cdc_changes_for_merge'.
Incremental changes applied to the target managed table 'HealthcareData.target_patient_data_managed' using temporary view 'temp_cdc_changes_for_merge'.


In [28]:
from delta.tables import DeltaTable
from datetime import datetime

# Get the latest commit version processed in this run from the original change_df
# We use the original change_df as it contains all change types and versions read
if change_df.count() > 0:
    latest_processed_version = change_df.agg({"_commit_version": "max"}).collect()[0][0]
    print(f"Latest commit version processed in this run: {latest_processed_version}")

    # --- Store this version persistently in a control table (using DataFrame ops) ---
    # Define the name for your control table
    control_table_name = "HealthcareData.cdf_control_versions"

    # Data for the new/updated watermark
    new_watermark_data = [(source_table_name, latest_processed_version, datetime.now())]
    new_watermark_df = spark.createDataFrame(new_watermark_data, ["table_name", "last_processed_version", "processed_timestamp"])

    # Create the control table if it doesn't exist
    # This table will store the last processed version for each source table
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {control_table_name}
        (table_name STRING, last_processed_version LONG, processed_timestamp TIMESTAMP)
        USING DELTA
    """)
    print(f"Control table '{control_table_name}' ensured to exist.")

    # --- Read, Update/Add Row, and Overwrite the Control Table ---
    try:
        # Read the current state of the control table
        control_delta_table = DeltaTable.forName(spark, control_table_name)
        current_control_df = control_delta_table.toDF()

        # Filter out the old row for the source table if it exists
        filtered_control_df = current_control_df.filter(f"table_name != '{source_table_name}'")

        # Union the filtered data with the new watermark row
        updated_control_df = filtered_control_df.union(new_watermark_df)

        # Overwrite the control table with the updated DataFrame
        updated_control_df.write.format("delta").mode("overwrite").saveAsTable(control_table_name)

        print(f"Watermark updated for '{source_table_name}' in control table '{control_table_name}' using overwrite.")

    except Exception as e:
         # This catch is mostly for initial table creation or unexpected issues
         # If the table didn't exist initially, the CREATE TABLE IF NOT EXISTS handles it.
         # For a robust production pipeline, more specific error handling might be needed.
         print(f"Could not update control table using overwrite method: {e}")
         # As a fallback for the very first run if the table was just created:
         try:
             new_watermark_df.write.format("delta").mode("append").saveAsTable(control_table_name)
             print(f"Watermark appended for '{source_table_name}' (fallback append).")
         except Exception as append_e:
             print(f"Fallback append also failed: {append_e}")


    # --- Read and show the state of the control table ---
    print(f"Current state of the control table '{control_table_name}':")
    spark.read.table(control_table_name).show(truncate=False) # Show all columns

else:
    print("No changes were read from the change feed in this run. Watermark remains unchanged.")

# --- Optional: Clean up the temporary view ---
try:
     spark.catalog.dropTempView("temp_cdc_changes_for_merge")
     print("Temporary view 'temp_cdc_changes_for_merge' dropped.")
except Exception as e:
     # Ignore if view doesn't exist, just ensure cleanup attempt
     pass # print(f"Temporary view 'temp_cdc_changes_for_merge' might not exist or failed to drop: {e}")

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 31, Finished, Available, Finished)

Latest commit version processed in this run: 4
Control table 'HealthcareData.cdf_control_versions' ensured to exist.
Watermark updated for 'HealthcareData.source_patient_data_managed' in control table 'HealthcareData.cdf_control_versions' using overwrite.
Current state of the control table 'HealthcareData.cdf_control_versions':
+------------------------------------------+----------------------+--------------------------+
|table_name                                |last_processed_version|processed_timestamp       |
+------------------------------------------+----------------------+--------------------------+
|HealthcareData.source_patient_data_managed|4                     |2025-04-25 21:27:23.471805|
+------------------------------------------+----------------------+--------------------------+

Temporary view 'temp_cdc_changes_for_merge' dropped.


In [29]:
# Define target managed table name
target_table_name = "HealthcareData.target_patient_data_managed"

print(f"\n--- Final state of target managed table '{target_table_name}': ---")
# Read the target table and display the contents
# Use .show() for a simple text output, or .display() in a notebook for a richer view
spark.read.table(target_table_name).show()
# Alternatively, using display():
# spark.read.table(target_table_name).display()

StatementMeta(, 3fee9106-5a6d-4f09-8952-bdbb91ed22a5, 32, Finished, Available, Finished)

--- Final state of target managed table 'HealthcareData.target_patient_data_managed': ---
+---+-----------+-----+-------------------+
| id|       name|value|  updated_timestamp|
+---+-----------+-----+-------------------+
|  1|Alice Smith|150.0|2024-10-27 11:10:00|
|  4|David Green|400.0|2024-10-27 11:00:00|
|  2|Bob Johnson|200.0|2024-10-26 10:05:00|
+---+-----------+-----+-------------------+
