## Helper functions for Databricks: reading the Delta change data feed (CDF), enabling Predictive I/O (deletion vectors) for accelerated Delta merge operations, running a custom Delta merge, and tracking commit versions.

In [0]:
def enable_cdf_existing_table(table_name:str, table_path:str):
    """
    Description:
        Enable delta change data feed on existing data files and tables.

    Args:
        table_name: Name of the metastore table registered over `table_path`.
        table_path: Storage location of the existing Delta table.

    Side effects:
        Sets the session default so that newly created Delta tables get
        change data feed enabled. When the table at `table_path` does not
        already have `delta.enableChangeDataFeed` set to true, the metastore
        entry `table_name` is dropped, re-created over `table_path`, and the
        property is set on it.

    AnalysisException raised by Spark is printed, not re-raised.
    Relies on a notebook-global `spark` session.
    """
    from delta import DeltaTable
    from pyspark.sql.utils import AnalysisException
    from pyspark.sql.functions import col

    cdf_property = "delta.enableChangeDataFeed"

    def _cdf_enabled(delta_table) -> bool:
        # Compare the property *value*: the key can be present but set to
        # "false", which a substring test on the stringified map would
        # wrongly report as enabled.
        properties = delta_table.detail().select(col("properties")).first()["properties"] or {}
        return str(properties.get(cdf_property, "")).lower() == "true"

    try:
        # enable cdf on new delta tables (session-wide default)
        spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

        # check if cdf is already enabled on the existing delta table
        dt = DeltaTable.forPath(sparkSession=spark, path=table_path)
        if _cdf_enabled(dt):
            print("CDF is enabled")
            return

        # Re-register the table over the existing location, then set the property.
        # NOTE(review): this assumes `table_name` is an external table, so the
        # drop removes only metadata; dropping a managed table would also
        # delete its data -- confirm with callers.
        spark.sql(f"drop table if exists {table_name};")
        spark.sql(f"create external table if not exists {table_name} location '{table_path}'")
        spark.sql(f"alter table {table_name} set tblproperties (delta.enableChangeDataFeed = true)")
        print(f"A new table {table_name} has been created using existing location at `{table_path}`")

        # verify that the property is now in effect
        if _cdf_enabled(dt):
            print(f" CDF has been successfully enable on table {table_name}")
    except AnalysisException as ae:
        print(ae)



In [0]:
def enable_pio_existing_table(table_name:str, table_path:str):
    """
    Description:
        Enable Predictive I/O (delta.enableDeletionVectors) on existing data files and tables.

    Args:
        table_name: Name of the metastore table registered over `table_path`.
        table_path: Storage location of the existing Delta table.

    Side effects:
        When the table at `table_path` does not already have
        `delta.enableDeletionVectors` set to true, the metastore entry
        `table_name` is dropped, re-created over `table_path`, and the
        property is set on it.

    AnalysisException raised by Spark is printed, not re-raised.
    Relies on a notebook-global `spark` session.
    """
    from delta import DeltaTable
    from pyspark.sql.utils import AnalysisException
    from pyspark.sql.functions import col

    dv_property = "delta.enableDeletionVectors"

    def _deletion_vectors_enabled(delta_table) -> bool:
        # Compare the property *value*: the key can be present but set to
        # "false", which a substring test on the stringified map would
        # wrongly report as enabled.
        properties = delta_table.detail().select(col("properties")).first()["properties"] or {}
        return str(properties.get(dv_property, "")).lower() == "true"

    try:
        # check if predictive io is already enabled on the existing delta table
        dt = DeltaTable.forPath(sparkSession=spark, path=table_path)
        if _deletion_vectors_enabled(dt):
            print("Predictive I/O is enabled")
            return

        # Re-register the table over the existing location, then set the property.
        # NOTE(review): this assumes `table_name` is an external table, so the
        # drop removes only metadata; dropping a managed table would also
        # delete its data -- confirm with callers.
        spark.sql(f"drop table if exists {table_name};")
        spark.sql(f"create external table if not exists {table_name} location '{table_path}'")
        spark.sql(f"alter table {table_name} set tblproperties (delta.enableDeletionVectors = true)")
        print(f"A new table {table_name} has been created using existing location at `{table_path}`")

        # verify that the property is now in effect
        if _deletion_vectors_enabled(dt):
            print(f" Predictive I/O has been successfully enable on table {table_name}")
    except AnalysisException as ae:
        print(ae)


In [0]:
from pyspark.sql import DataFrame, SparkSession
from delta import DeltaTable
from typing import List, Dict, Any, Optional
from pyspark.sql.utils import AnalysisException

def merge_delta_table(df:DataFrame, target_tbl:str, primary_key_columns:Dict, target_tbl_path:str, func):
    """
    Description:
        Custom function to implement delta merge on specified workload.

    Args:
        df: Source DataFrame holding the rows to merge. Nothing happens when it is empty.
        target_tbl: Name of the target Delta table.
        primary_key_columns: Iterated to obtain the column names used in the
            merge condition (for a dict, its keys).
        target_tbl_path: Not used by this function; kept for interface compatibility.
        func: Not used by this function; kept for interface compatibility.

    Behaviour:
        If `target_tbl` cannot be resolved as a Delta table, it is created by
        writing `df` with overwrite mode. Otherwise `df` is merged into it:
        matched rows whose `_change_type_` equals 'update_postimage' are
        updated, and unmatched source rows are inserted.

        Errors raised by the merge itself propagate to the caller. They no
        longer trigger an overwrite of the target table, since replacing an
        existing table with a single batch would discard its data.

    Relies on a notebook-global `spark` session.
    """
    if df.isEmpty():
        return

    try:
        tbl = DeltaTable.forName(sparkSession=spark, tableOrViewName=target_tbl)
    except AnalysisException:
        # Target does not exist yet (or is not a Delta table): seed it from df.
        df.write.mode("overwrite").format("delta").saveAsTable(target_tbl)
        return

    tbl_alias = "target"
    df_alias = "df_src"
    merge_condition = " and ".join(
        f"{tbl_alias}.{key} = {df_alias}.{key}" for key in primary_key_columns
    )

    # The condition must be an f-string (otherwise the literal text
    # "{df_alias}" reaches the SQL parser) and the change type must be quoted
    # (otherwise it is parsed as a column reference).
    # NOTE(review): Delta's change data feed names this column `_change_type`;
    # the trailing underscore is kept from the original -- confirm the column
    # name carried by the source DataFrame.
    update_condition = f"{df_alias}._change_type_ = 'update_postimage'"

    (
        tbl.alias(tbl_alias)
        .merge(df.alias(df_alias), merge_condition)
        .whenMatchedUpdateAll(condition=update_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )


In [0]:
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, StringType
from pyspark.sql import DataFrame
import datetime

def write_commit_version(write_path:str, commit_version_tbl_name:str, max_commit_version:int, max_commit_timestamp:datetime.datetime, table_name:str):
    """
    Description:
        Write the current max commit version of the dataframe change data feed to a tracking log/table.

    Args:
        write_path: Location of the Delta tracking log that the row is appended to.
        commit_version_tbl_name: Not used by this function; kept for interface compatibility.
        max_commit_version: Value stored in the `Max_Commit_Version` (long) column.
        max_commit_timestamp: Value stored in the `Max_Commit_Timestamp` (timestamp) column.
        table_name: Value stored in the `Table_Name` column.

    One row is appended per call; `Process_Timestamp` is set to the current
    local time from `datetime.datetime.now()`.
    AnalysisException raised by Spark is printed, not re-raised.
    Relies on a notebook-global `spark` session.
    """
    # Imported locally so the except clause does not depend on another
    # notebook cell having been executed first.
    from pyspark.sql.utils import AnalysisException

    schema = StructType([
        StructField("Max_Commit_Version", LongType(), True),
        StructField("Max_Commit_Timestamp", TimestampType(), True),
        StructField("Process_Timestamp", TimestampType(), True),
        StructField("Table_Name", StringType(), True)
    ])

    process_timestamp = datetime.datetime.now()
    commit_row = [max_commit_version, max_commit_timestamp, process_timestamp, table_name]

    try:
        df_commit = spark.createDataFrame(data=[commit_row], schema=schema)
        df_commit.write.mode("append").format("delta").option("mergeSchema", True).save(write_path)
    except AnalysisException as ae:
        print(ae)



In [0]:
from pyspark.sql.functions import col, max
from pyspark.sql.utils import AnalysisException
import datetime

def get_starting_commit_version(tbl_name:str, commit_version_path:str):
    """
    Description:
        Get current maximum commit version number from the commit version tracking log.

    Args:
        tbl_name: Not used by this function; kept for interface compatibility.
        commit_version_path: Location of the Delta tracking log to read.

    Returns:
        The maximum value of the commit-version column, or 0 when the log
        cannot be read (AnalysisException, which is printed) or holds no
        versions.

    Relies on a notebook-global `spark` session.
    """
    # Local imports avoid depending on the cell-level `max`, which shadows the builtin.
    from pyspark.sql import functions as F
    from pyspark.sql.utils import AnalysisException

    try:
        df = spark.read.format("delta").load(commit_version_path)

        # write_commit_version stores the version as `Max_Commit_Version`;
        # fall back to the legacy `Curr_Max_Commit_Ver` name for logs that
        # were written with it.
        if "Max_Commit_Version" in df.columns:
            version_column = "Max_Commit_Version"
        else:
            version_column = "Curr_Max_Commit_Ver"

        # NOTE(review): the maximum is taken over every row of the log and is
        # not filtered by `tbl_name` -- confirm each log tracks a single table.
        starting_version = df.select(F.max(F.col(version_column))).first()[0]
    except AnalysisException as ae:
        print(ae)
        starting_version = 0

    # max() over an empty log yields None; start from version 0 in that case.
    if starting_version is None:
        starting_version = 0

    return starting_version
