# Lakehouse - incremental data load POC using Delta Lake Change Data Feed (CDF)
#### We use CDF to implement an incremental-load notebook for Dynamics customer data. Spark and Delta Lake handle incremental data with minimal setup: new changes are read incrementally and merged into a target table.

In [14]:
# Loading the necessary libraries
from pyspark.sql import SparkSession
from datetime import datetime
from dateutil import parser,relativedelta
import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, to_date, col, quarter, explode, sequence, expr,current_timestamp,lit
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType, FloatType, ArrayType, LongType
from delta.tables import DeltaTable
from notebookutils import mssparkutils
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED")



## Define variables
##### The `delta_load` flag defines whether the run is a full load or an incremental load from the source.
##### To run an initial full load or a truncate-and-reload of a table, set `delta_load` to 0 and set `target_table` to the name of the target table (for testing, define a new target table).
##### To run an incremental load, set `delta_load` to 1.
##### Define the data lake, lakehouse, container and target table.
##### <u>Make sure to create the lakehouse before this notebook is run.</u>

In [33]:
# 0 for full load and 1 for incremental load
delta_load=1
#delta_load=0


# set path to source table - in our case storage account linked with synapse link
path_to_source_table = "abfss://yourworkspace@onelake.dfs.fabric.microsoft.com/dataverse_xxxxx.Lakehouse/Tables/custtable"

source_table = "`dataverse_xxxxx`.`custtable`"


#define target variables
lakehouse = "yourlakehouse" # target lakehouse - create it first if does not exist


target_table = "customer_silver" # target table
lakehouse_targettable = lakehouse + "." + target_table
path_to_target_table = "Files/clickstreamdata/"+ target_table # to help create a DeltaTable object for merge


# set temp table and its path. This table holds incremental data before merging into target table
temp_table = "customer_silver_temp"
lakehouse_temptable = lakehouse + "." + temp_table



## Enable CDF on table
#### CDF is enabled by setting the table property `delta.enableChangeDataFeed` to `true`. `DESCRIBE DETAIL` returns detailed information about a Delta table (for example, number of files and data size), so we use it to confirm that the property is indeed enabled.
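
As a rough sketch of that confirmation step, the helper below checks a table properties map for the CDF flag. The function name `cdf_enabled` is hypothetical, and the sketch assumes the map is taken from the `properties` column of the `DESCRIBE DETAIL` output, where values arrive as strings.

```python
def cdf_enabled(properties):
    """Return True if a Delta table's properties map has CDF switched on."""
    # Table properties are a string-to-string map, so the flag is the
    # text "true" rather than a Python boolean.
    value = (properties or {}).get("delta.enableChangeDataFeed", "false")
    return str(value).strip().lower() == "true"


# In the notebook, the map would come from something like:
#   detail = spark.sql(f"DESCRIBE DETAIL {source_table}").collect()[0]
#   cdf_enabled(detail["properties"])
print(cdf_enabled({"delta.enableChangeDataFeed": "true"}))
print(cdf_enabled({}))
```

A check like this could guard the incremental branch, so the notebook stops early with a clear message instead of failing inside the change feed read.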

In [16]:
# run only once to enable CDF on the table
'''
if delta_load == 0:
    spark.sql(f"ALTER TABLE {source_table} SET TBLPROPERTIES ('delta.enableChangeDataFeed'='true')")
    df = spark.sql(f"DESCRIBE DETAIL {source_table}")
    display (df.limit(10))
'''    



## Read data 
#### Next we retrieve the changed data from the change data feed based on a starting version and an ending version. We use `DESCRIBE HISTORY` to get the latest commit version of the source table. `DESCRIBE HISTORY` returns the operation, user, timestamp, and so on for each write to a Delta table, in reverse chronological order. By default, table history is retained for 30 days.

#### The three columns `_change_type`, `_commit_version` and `_commit_timestamp` are present because CDF is enabled on the source table. `_change_type` can be `insert`, `delete`, `update_preimage` or `update_postimage`. We remove `update_preimage` records because they hold the values before an update and are not needed for the merge. `_commit_version` increases with every commit and is crucial for reading only the changed data. More info: https://docs.delta.io/latest/delta-change-data-feed.html#what-is-the-schema-for-the-change-data-feed
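
The version arithmetic behind the incremental read can be sketched in plain Python. `next_cdf_window` is a hypothetical helper, not part of Spark or Delta Lake. It assumes the watermark is the highest `_commit_version` already stored in the target, and it treats a missing watermark as a signal to run the full load rather than guessing a starting version.

```python
def next_cdf_window(last_commit_version, max_version):
    """Return (start_version, end_version) to read, or None if nothing is new.

    Both ends are inclusive, matching the startingVersion and
    endingVersion options of the change feed reader.
    """
    if last_commit_version is None:
        # An empty target has no watermark; a full load is needed first.
        raise ValueError("no watermark in target table; run a full load first")
    if max_version <= last_commit_version:
        return None
    return (last_commit_version + 1, max_version)


print(next_cdf_window(3, 5))
print(next_cdf_window(5, 5))
```

Returning `None` when the source has no new commits gives the caller a single place to decide whether to exit the notebook.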

In [17]:
# incremental load

if delta_load == 1:

  #Retrieve the latest version of the change data feed table, we read only latest record
  source_df = spark.sql (f"DESCRIBE HISTORY {source_table} LIMIT 1")
  max_version = source_df.agg({"version": "max"}).collect()[0][0]

  #Retrieve the last commit version from target table. We can also use the temp table to get this value as temp table is much smaller
  target_df = spark.sql(f"SELECT _commit_version FROM {lakehouse_targettable}")
  last_commit_version = target_df.agg({"_commit_version": "max"}).collect()[0][0]

  if max_version == last_commit_version:
    mssparkutils.notebook.exit("Aborting as condition not met. Further tasks will be skipped")

  start_version = last_commit_version + 1 # read from the version after the last committed one

  # starting and ending versions bound the incremental read; both are inclusive
  cdf_table_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", start_version) \
    .option("endingVersion", max_version) \
    .table(source_table)

# full load
else:

  # read the full source table instead of the change data feed
  cdf_table_df = spark.read.format('delta') \
  .table(source_table)


  # capture the latest source version so the next incremental run can use it as its watermark
  df = spark.sql (f"DESCRIBE HISTORY {source_table}")
  max_version = df.agg({"version": "max"}).collect()[0][0]
     
  # add the CDF metadata columns so the full load matches the incremental schema
  cdf_table_df = cdf_table_df.withColumn("_commit_version", lit(max_version))
  cdf_table_df = cdf_table_df.withColumn("_change_type", lit('insert'))
  cdf_table_df = cdf_table_df.withColumn("_commit_timestamp", current_timestamp())
  cdf_table_df = cdf_table_df.withColumn("_loadedtimestamp", current_timestamp())

  


## De-duplication and extract columns
##### Remove duplicates by keeping only the latest change per `Id`. We also drop `update_preimage` records, which hold the values before an update and are not needed for the merge.
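
To make the rule concrete, here is a minimal plain-Python sketch of the same logic applied to a list of row dictionaries. The function name `latest_change_per_id` and the sample rows are made up for illustration; the notebook itself does this with a `row_number()` window in Spark SQL.

```python
def latest_change_per_id(rows):
    """Keep one row per Id: the one with the highest _commit_version."""
    latest = {}
    for row in rows:
        # Pre-update images describe the old values and are never merged.
        if row["_change_type"] == "update_preimage":
            continue
        current = latest.get(row["Id"])
        if current is None or row["_commit_version"] > current["_commit_version"]:
            latest[row["Id"]] = row
    return list(latest.values())


# Illustrative change feed rows (not real data)
changes = [
    {"Id": 1, "_change_type": "insert", "_commit_version": 1},
    {"Id": 1, "_change_type": "update_preimage", "_commit_version": 2},
    {"Id": 1, "_change_type": "update_postimage", "_commit_version": 2},
    {"Id": 2, "_change_type": "insert", "_commit_version": 2},
    {"Id": 2, "_change_type": "delete", "_commit_version": 3},
]
for row in latest_change_per_id(changes):
    print(row)
```

Each `Id` ends up with exactly one row, which is what the merge step needs to avoid matching one target row against several source rows.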

In [18]:
# incremental load: keep only the latest change per Id
if delta_load == 1:

    cdf_table_df.createOrReplaceTempView("cdf_table")
    # drop pre-update images, then keep the row with the highest commit version for each Id
    cdf_table_df = spark.sql("""
                            select * 
                            from (
                                    select *, row_number () over (partition by Id order by _commit_version desc) as Row_id 
                                    from cdf_table
                                    WHERE _change_type !='update_preimage'
                                ) sub2
                                where sub2.Row_id =1
                                """)

# both load types: extract and rename columns so the full load and the incremental load share one schema
cdf_table_df = cdf_table_df.select(col('Id'), col('accountnum'), col('custgroup'), col('currency'), col('IsDelete'), col('_commit_version'), col('_change_type'), col('_commit_timestamp'))

cdf_table_df = cdf_table_df.withColumn("_loadedtimestamp", current_timestamp())

cdf_table_df = cdf_table_df.withColumnRenamed('accountnum', 'CustomerId')
cdf_table_df = cdf_table_df.withColumnRenamed('custgroup', 'CustomerGroup')
cdf_table_df = cdf_table_df.withColumnRenamed('currency', 'Currency')


## Write the temp table
##### We write in overwrite mode so the previous batch is discarded and the temp table holds only the new batch of incremental data.

In [19]:
# Creating the lake database if it does not exist
'''
spark.sql (f"""
CREATE DATABASE IF NOT EXISTS {lakehouse}
""")
'''


In [20]:
# at this stage, you can write the df to a temp table for analysis, enrichment
cdf_table_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(lakehouse_temptable)

display(cdf_table_df.limit(10))


In [21]:
#df = spark.sql(f"SELECT * FROM {lakehouse_temptable}")
#display(df)


In [22]:
#df = spark.sql(f"SELECT count(*) FROM {lakehouse_temptable}")
#display(df)


## Quality Checks
##### Here we validate that certain columns in the dataframe are unique and not null. The `validate` function checks all the expectations we have set up and returns the results.
##### For example, we want to make sure `CustomerId` is unique. This can be challenging because updates may produce multiple copies of the same Id, which can break our merge.
##### We use an open source library called Great Expectations. It has many more capabilities, described here: https://learn.microsoft.com/en-us/fabric/data-science/tutorial-great-expectations
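
One way to act on the validation results is to collect the failed checks before deciding whether to continue to the merge. The sketch below is an assumption-heavy illustration: `failed_expectations` is a hypothetical helper, and it expects the results as a plain dictionary with the `success`, `results`, `expectation_config` and `kwargs` keys that appear in the printed validation output. How you obtain that dictionary depends on the Great Expectations version in use.

```python
def failed_expectations(validation):
    """List (expectation_type, column) pairs for every failed check."""
    failures = []
    for item in validation.get("results", []):
        if not item.get("success", False):
            config = item.get("expectation_config", {})
            column = config.get("kwargs", {}).get("column")
            failures.append((config.get("expectation_type"), column))
    return failures


# Illustrative result shaped like the validation output (not real data)
sample = {
    "success": False,
    "results": [
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "Currency"},
            },
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_be_unique",
                "kwargs": {"column": "CustomerId"},
            },
        },
    ],
}
print(failed_expectations(sample))
```

If the list is not empty, the notebook could stop before the merge, for example with `mssparkutils.notebook.exit`, instead of merging data that failed its checks.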

In [23]:
# install the libraries once, then comment this line out before the next run
%pip install semantic-link great-expectations great_expectations_experimental great_expectations_zipcode_expectations



In [24]:
import great_expectations as ge
import great_expectations.dataset.sparkdf_dataset as sd

# Convert Spark DataFrame to Great Expectations Dataset
ge_df = ge.dataset.SparkDFDataset(cdf_table_df)

# Define Expectations

# Not null 
ge_df.expect_column_values_to_not_be_null('CustomerGroup')
ge_df.expect_column_values_to_not_be_null('Currency')

# Unique
ge_df.expect_column_values_to_be_unique('CustomerId')


# Validate Expectations
results = ge_df.validate()

print(results)


{
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "column": "CustomerGroup",
          "result_format": "BASIC"
        },
        "meta": {}
      },
      "result": {
        "element_count": 3,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "column": "Currency",
          "result_format": "BASIC"
        },
        "meta": {}
      },
      "result": {
        "element_count": 3,
        "unexpected_count": 0,
        "unex

## Enrich your silver
##### At this stage, you have a deduplicated batch of incremental data. You can enrich it further by joining it with other tables, or by keeping only the columns you are interested in and discarding the rest. Next, we merge it into the target silver table. The same process of picking up incremental changes through CDF can be applied to your gold tables.

## Merge
##### We use the Delta Lake merge function to update, delete or insert records. Note that the merge matches on `Id` and the delete clause uses `IsDelete`: deleted records arrive with `IsDelete` set to true, and we use that condition to delete them from the target. More info: https://docs.delta.io/latest/delta-update.html
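
The clause order of the merge can be illustrated with a small plain-Python simulation. This is only a sketch of the semantics, not how Delta Lake executes a merge. `apply_merge` is a hypothetical name, `IsDelete` is assumed to be a boolean here, and the source is assumed to hold at most one row per `Id`.

```python
def apply_merge(target, source):
    """Simulate the merge clauses on a target dict keyed by Id."""
    merged = dict(target)
    for row in source:
        key = row["Id"]
        if key in merged:
            if row["IsDelete"]:
                # matched and flagged as deleted: remove from target
                del merged[key]
            else:
                # matched otherwise: overwrite all columns
                merged[key] = row
        else:
            # not matched: insert, whatever the value of IsDelete
            merged[key] = row
    return merged


# Illustrative rows (not real data)
target = {
    1: {"Id": 1, "CustomerGroup": "A", "IsDelete": False},
    2: {"Id": 2, "CustomerGroup": "A", "IsDelete": False},
}
source = [
    {"Id": 1, "CustomerGroup": "B", "IsDelete": False},
    {"Id": 2, "CustomerGroup": "A", "IsDelete": True},
    {"Id": 3, "CustomerGroup": "C", "IsDelete": False},
]
print(apply_merge(target, source))
```

One consequence worth noting: because the insert clause carries no condition, a source row flagged as deleted that has no match in the target would still be inserted. Adding a condition to the insert clause is one option if that is not wanted.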

In [34]:
if delta_load == 0:

    #first time write, no merge needed, you can either save the df as target or enriched temp table as target
    cdf_table_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(lakehouse + "." + target_table)

else:

    targetTable = DeltaTable.forPath(spark, path_to_target_table)
    (
    # Execute merge
    targetTable.alias("target").merge(
        cdf_table_df.alias("source"),
        "source.Id = target.Id"

    )
    .whenMatchedDelete(condition = "source.IsDelete = 'true'") 
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
    )


## Time travel
#### Delta Lake records a version and a timestamp for every commit, so you can time travel to see data values as they were in the past. For more info, read: https://delta.io/blog/2023-02-01-delta-lake-time-travel/
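
Once the same record has been read at two versions, the difference can be summarised with a small helper. This is a plain-Python sketch with a hypothetical function name, `changed_fields`. It assumes each row has been turned into a dictionary, for example with `Row.asDict()` on a collected Spark row, and the sample values are made up.

```python
def changed_fields(old_row, new_row):
    """Return {column: (old_value, new_value)} for columns that differ."""
    columns = sorted(set(old_row) | set(new_row))
    return {
        name: (old_row.get(name), new_row.get(name))
        for name in columns
        if old_row.get(name) != new_row.get(name)
    }


# Illustrative values for one customer at two versions (not real data)
old = {"accountnum": "004003", "custGroup": "10"}
new = {"accountnum": "004003", "custGroup": "20"}
print(changed_fields(old, new))
```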

In [35]:
# Retrieve the commit version that was just loaded, read from the temp table

target_df = spark.sql(f"SELECT _commit_version FROM {lakehouse_temptable}")
new_last_commit_version = target_df.agg({"_commit_version": "max"}).collect()[0][0]

new_last_commit_version



4

In [36]:
custdf = spark.sql(f"SELECT custGroup FROM {source_table} VERSION AS OF {new_last_commit_version} where accountnum == '004003' ")

display (custdf.limit(5))


In [37]:
custdf = spark.sql(f"SELECT custGroup FROM {source_table} VERSION AS OF {last_commit_version} where accountnum == '004003' ")

display (custdf.limit(5))


In [38]:
'''
# filters specific version
cdf_table_df_new = spark.read.format("delta") \
    .option("versionAsOf", new_last_commit_version) \
    .table(source_table)

display (new_last_commit_version)
display (cdf_table_df_new.limit(10))
'''



In [39]:
'''
cdf_table_df_old = spark.read.format("delta") \
    .option("versionAsOf", last_commit_version) \
    .table(source_table)


display (last_commit_version)
display (cdf_table_df_old.limit(10))
'''


## Cleanup with Vacuum command
##### Remove old data in the temp table with the VACUUM command, which physically removes files from storage that are no longer referenced by the table and are older than the retention period. This saves storage costs. You can also consider running VACUUM on the source Delta Lake folders (Synapse Link folders) once the data has been read. You lose the ability to time travel to versions whose files were removed, so plan your retention based on business needs.
More info: https://delta.io/blog/remove-files-delta-lake-vacuum-command/
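
Because a short retention removes time travel, it can help to build the VACUUM statement through a small guard rather than typing the retention by hand. The sketch below is a hypothetical helper, `build_vacuum_sql`, which only builds the SQL string. It assumes the default Delta Lake retention threshold of 7 days (168 hours) and refuses anything shorter unless the caller opts in explicitly.

```python
DEFAULT_RETENTION_HOURS = 168  # 7 days, the default Delta Lake retention threshold


def build_vacuum_sql(table_name, retain_hours=DEFAULT_RETENTION_HOURS,
                     allow_short_retention=False):
    """Build a VACUUM statement, guarding against accidental short retention."""
    if retain_hours < 0:
        raise ValueError("retain_hours must not be negative")
    if retain_hours < DEFAULT_RETENTION_HOURS and not allow_short_retention:
        raise ValueError(
            "retention below 168 hours removes recent history; "
            "pass allow_short_retention=True to confirm"
        )
    return f"VACUUM {table_name} RETAIN {retain_hours} HOURS"


print(build_vacuum_sql("yourlakehouse.customer_silver_temp"))
print(build_vacuum_sql("yourlakehouse.customer_silver_temp", 0,
                       allow_short_retention=True))
```

The resulting string would be passed to `spark.sql`. A retention below the default also requires the retention duration check to be disabled in the Spark configuration, as in the commented-out cell.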

In [40]:
#spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
#spark.sql(f"VACUUM {lakehouse_temptable} RETAIN 0 HOURS").show(truncate=False)



## Conclusion
##### The notebook demonstrates a POC of an incremental pipeline for loading data at scale using the Delta Lake change data feed feature. CDF offers a powerful mechanism to load changed data.