### Enable Logging

In [0]:
datetime.datetime.fromtimestamp

In [0]:
import logging
import time
import datetime

def instantiate_logger(project_name):
    log_file_prefix = project_name
    file_date = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d')
    p_dir = '/tmp/'
    p_file_name = f'{log_file_prefix}_{file_date}.log'
    log_file = f'{p_dir}{p_file_name}'
    logger = logging.getLogger('custom log')
    logger.setLevel(logging.DEBUG)
    #Instantiate Stream and File handlers
    file_handler = logging.FileHandler(log_file,mode='a')
    stream_handler = logging.StreamHandler()

    #Instantiate Stream and File handlers
    stream_handler.setLevel(logging.DEBUG)

    #Creating Formatter
    formatter = logging.Formatter('%(levelname)s:%(asctime)s:%(message)s')
    stream_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    if logger.hasHandlers():
        logger.handlers.clear()

    #Add Handlers to logger    
    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)
    logger.debug('Starting Logger')
    return logger,log_file
log_writer,log_filename = instantiate_logger('cdf_demo')

DEBUG:2024-08-30 04:38:19,212:Starting Logger


In [0]:
%sh
cat /tmp/cdf_demo_2024-08-30.log

DEBUG:2024-08-30 03:23:29,621:Starting Logger
DEBUG:2024-08-30 03:23:39,984:Starting Logger


## Using autoloader for raw to bronze
- uses notification to scale new file ingestion instead of directory listing
- load file once
- Streaming
- uses checkpoints to resume from last run


In [0]:
from pyspark.sql.functions import (lit,
                                   col,
                                   row_number,
                                   concat,
                                   desc,
                                   to_date
)
from delta.tables import *
from pyspark.sql.window import Window
from pyspark.sql.types import StructType,StructField,StringType,LongType,TimestampType,DateType,BooleanType

In [0]:
dbutils.fs.mkdirs('/FileStore/tables/Manual_CDF_Demo/')
dbutils.fs.mkdirs('/FileStore/tables/Manual_CDF_Demo/data')

True

In [0]:
# dbutils.fs.rm(checkpoint_path,True)

True

In [0]:
%sql
-- DROP TABLE default.customers_bronze

In [0]:
checkpoint_path = 'dbfs:/FileStore/tables/Manual_CDF_Demo/_checkpoint/customers_bronze'
data_path = 'dbfs:/FileStore/tables/Manual_CDF_Demo/data/'

(
   spark.readStream
   .format('cloudFiles') 
   .option('cloudFiles.format','csv')
   .option('cloudFiles.schemaLocation',checkpoint_path)   
   .load(data_path)
   .withColumn('softDelete',lit(False))
   .writeStream
   .option('checkpointLocation',checkpoint_path)
   .trigger(availableNow=True)
   .toTable('customers_bronze')
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7fbe73852710>

In [0]:
%sql
select * from customers_bronze

EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False
2,Napoleon,Bonaparte,234 Second St.,12/31/2024,12/31/2024,,False
3,Charles,Darwin,345 Third St.,12/31/2024,12/31/2024,,False
4,Albert,Einstein,456 Fourth St.,12/31/2024,12/31/2024,,False
5,Thomas,Jefferson,678 Fifth St.,12/31/2024,12/31/2024,,False
1,Leonardo,da Vinci,123 First St.,12/31/2024,12/31/2024,,False
2,Napoleon,Bonaparte,234 Second St.,12/31/2024,12/31/2024,,False
3,Charles,Darwin,345 Third St.,12/31/2024,12/31/2024,,False
4,Albert,Einstein,456 Fourth St.,12/31/2024,10/31/2025,,False
5,Thomas,Jefferson,678 Fifth St.,12/31/2024,12/31/2024,,False


In [0]:
def TurnCDFOn(schema, table):   
    spark.sql(f"ALTER TABLE {schema}.{table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
    print(f'TurnCDFOn: Turned CDF on for {schema}.{table}')

In [0]:
TurnCDFOn('default','customers_bronze')

TurnCDFOn: Turned CDF on for default.customers_bronze


In [0]:
customers_deltaTable = DeltaTable.forName(spark,'customers_bronze')
fullHistoryDF = customers_deltaTable.history()
lastOperationDF = customers_deltaTable.history(1)
fullHistoryDF.display()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2024-08-30T04:41:42Z,7285002445872367,vasaicrow@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 763ead28-04c3-4a02-a8df-2b38b6087bbe, epochId -> 2, statsOnLoad -> false)",,List(33125772867387),0830-030902-ddjb86xj,4.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 0, numOutputBytes -> 0, numAddedFiles -> 0)",,Databricks-Runtime/15.4.x-scala2.12
4,2024-08-30T04:41:38Z,7285002445872367,vasaicrow@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 763ead28-04c3-4a02-a8df-2b38b6087bbe, epochId -> 1, statsOnLoad -> false)",,List(33125772867387),0830-030902-ddjb86xj,3.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 7, numOutputBytes -> 2681, numAddedFiles -> 1)",,Databricks-Runtime/15.4.x-scala2.12
3,2024-08-30T03:56:37Z,7285002445872367,vasaicrow@gmail.com,UPDATE,"Map(predicate -> [""(cast(EmPId#8592 as int) = 1)""])",,List(33125772867387),0830-030902-ddjb86xj,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 2589, numCopiedRows -> 4, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 3030, numDeletionVectorsUpdated -> 0, scanTimeMs -> 1453, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 2804, rewriteTimeMs -> 1577)",,Databricks-Runtime/15.4.x-scala2.12
2,2024-08-30T03:56:12Z,7285002445872367,vasaicrow@gmail.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(33125772867387),0830-030902-ddjb86xj,1.0,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-scala2.12
1,2024-08-30T03:55:53Z,7285002445872367,vasaicrow@gmail.com,STREAMING UPDATE,"Map(outputMode -> Append, queryId -> 763ead28-04c3-4a02-a8df-2b38b6087bbe, epochId -> 0, statsOnLoad -> false)",,List(33125772867387),0830-030902-ddjb86xj,0.0,WriteSerializable,True,"Map(numRemovedFiles -> 0, numOutputRows -> 5, numOutputBytes -> 2589, numAddedFiles -> 1)",,Databricks-Runtime/15.4.x-scala2.12
0,2024-08-30T03:55:47Z,7285002445872367,vasaicrow@gmail.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {}, statsOnLoad -> false)",,List(33125772867387),0830-030902-ddjb86xj,,WriteSerializable,True,Map(),,Databricks-Runtime/15.4.x-scala2.12


In [0]:
customers_df = customers_deltaTable.toDF()
(customers_df
    .orderBy('EmpId')
    .display())


EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data
1,Leonardo,daVinci,123 First St.,12/31/2024,12/31/2024,
2,Napoleon,Bonaparte,234 Second St.,12/31/2024,12/31/2024,
3,Charles,Darwin,345 Third St.,12/31/2024,12/31/2024,
4,Albert,Einstein,456 Fourth St.,12/31/2024,12/31/2024,
5,Thomas,Jefferson,678 Fifth St.,12/31/2024,12/31/2024,


### Reading change data feed by version


In [0]:
#doesnt work if version is before timestamp
cdf_version = (
    spark.read.format('delta')
    .option('readChangeFeed','true')
    .option('startingVersion',2)
    .table('customers_bronze')
)

#Reading change data feed by timestamp
#doesnt work if version is before timestamp
cdf_timestamp = (
    spark.read.format('delta')
    .option('readChangeFeed','true')
    .option('startingTimestamp','2024-08-30T03:56:12.000+00:00')
    .table('customers_bronze')
)

cdf_version.display()
cdf_timestamp.display()

EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete,_change_type,_commit_version,_commit_timestamp
1,Leonardo,daVinci,123 First St.,12/31/2024,12/31/2024,,False,update_preimage,3,2024-08-30T03:56:37Z
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False,update_postimage,3,2024-08-30T03:56:37Z


EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete,_change_type,_commit_version,_commit_timestamp
1,Leonardo,daVinci,123 First St.,12/31/2024,12/31/2024,,False,update_preimage,3,2024-08-30T03:56:37Z
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False,update_postimage,3,2024-08-30T03:56:37Z


In [0]:
# Seeing CDF with an update
customers_bronze_delta = DeltaTable.forName(spark,'customers_bronze')
(customers_bronze_delta
 .update(
     condition = col('EmPId') == 1,
     set = {'LastName': lit('Spider')}
     )
 )

In [0]:
customers_df = customers_deltaTable.toDF()
(customers_df
    .orderBy('EmpId')
    .display())


EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False
2,Napoleon,Bonaparte,234 Second St.,12/31/2024,12/31/2024,,False
3,Charles,Darwin,345 Third St.,12/31/2024,12/31/2024,,False
4,Albert,Einstein,456 Fourth St.,12/31/2024,12/31/2024,,False
5,Thomas,Jefferson,678 Fifth St.,12/31/2024,12/31/2024,,False


In [0]:
cdf_timestamp = (
    spark.read.format('delta')
    .option('readChangeFeed','true')
    .option('startingTimestamp','2024-08-30T03:56:12.000+00:00')
    .table('customers_bronze')
)
cdf_timestamp.display()

EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete,_change_type,_commit_version,_commit_timestamp
1,Leonardo,daVinci,123 First St.,12/31/2024,12/31/2024,,False,update_preimage,3,2024-08-30T03:56:37Z
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False,update_postimage,3,2024-08-30T03:56:37Z


### Get Latest Updates from bronze

In [0]:
def cdf_id_last_version(df):
    filtered_df = (df
                  .filter(~col('_change_type').isin('delete','update_preimage'))
                  )
    windowPartition = Window.partitionBy("EmPId").orderBy(desc('_commit_timestamp'))
    ranked_df = (filtered_df
                 .withColumn('rank',row_number().over(windowPartition))
                 )
    result_df = ranked_df.filter(col('rank') == 1)
    return result_df

last_version = cdf_id_last_version(cdf_timestamp)
last_version.display()

EmPId,FirstName,LastName,Address1,CreatedOn,ModifiedOn,_rescued_data,softDelete,_change_type,_commit_version,_commit_timestamp,rank
1,Leonardo,Spider,123 First St.,12/31/2024,12/31/2024,,False,update_postimage,3,2024-08-30T03:56:37Z,1


## Using cdf for bronze to silver
- uses checkpoints to resume from last run

In [0]:
def silver_etl(df):
    return (
        df.withColumn(
            "FullName",
            concat(
                col("FirstName"),
                lit(" "),
                col("LastName"),
            ),
        )
        .withColumn(
            "CreatedOn", to_date(col("CreatedOn"), "MM/dd/yyyy").alias("CreatedOn")
        )
        .withColumn(
            "ModifiedOn", to_date(col("ModifiedOn"), "MM/dd/yyyy").alias("ModifiedOn")
        )
    )

In [0]:
from pyspark.context import SparkContext

schema = StructType([
    StructField('EmPId',StringType(),True),
    StructField('FirstName',StringType(),True),
    StructField('LastName',StringType(),True),
    StructField('FullName',StringType(),True),
    StructField('Address1',StringType(),True),
    StructField('CreatedOn',DateType(),True),
    StructField('ModifiedOn',DateType(),True),
    # StructField('softDelete',BooleanType(),True),
    StructField('_rescued_data',StringType(),True),
])

empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
empty_df.write.format('delta').mode('overwrite').saveAsTable('customers_silver')

In [0]:
SINK_TABLE = "customers_silver"

def etl_and_merge_to_silver(df, batch_id):
    if not df.isEmpty():
        sink_delta_table = DeltaTable.forName(spark, SINK_TABLE)
        df_transactions_sink = sink_delta_table.toDF()
        log_writer.info(f"Sink delta table has {df_transactions_sink.count()} records")
        last_version_records = cdf_id_last_version(df)
        source_transformed = silver_etl(last_version_records)
        log_writer.info(
            f"batch_id: {batch_id} with source table of {source_transformed.count()} records"
        )
        (
            sink_delta_table.alias("target")
            .merge(source_transformed.alias("source"), "source.EmPId = target.EmPId")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        sink_delta_table = DeltaTable.forName(spark,SINK_TABLE)
        df_transactions_sink = sink_delta_table.toDF()
        log_writer.info(f'Sink delta table has now {df_transactions_sink.count()} records')
    else:
        log_writer.info(f'batch_id: {batch_id} no Records to load')


### Read Stream from checkpoint rather than version or timestamp

In [0]:
SOURCE_TABLE = 'customers_bronze'
checkpoint_path = 'dbfs:/FileStore/tables/Manual_CDF_Demo/_checkpoint/customers_silver'

(spark.readStream
 .format('delta')
 .option('readChangeFeed','true').
 table(SOURCE_TABLE)
 .writeStream
 .foreachBatch(etl_and_merge_to_silver)
 .option('checkpointLocation',checkpoint_path)
 .trigger(availableNow=True)
 .start()
 )

<pyspark.sql.streaming.query.StreamingQuery at 0x7fbe745dc790>

In [0]:
%sql
select * from default.customers_silver

EmPId,FirstName,LastName,FullName,Address1,CreatedOn,ModifiedOn,_rescued_data
1,Leonardo,da Vinci,Leonardo da Vinci,123 First St.,2024-12-31,2024-12-31,
2,Napoleon,Bonaparte,Napoleon Bonaparte,234 Second St.,2024-12-31,2024-12-31,
3,Charles,Darwin,Charles Darwin,345 Third St.,2024-12-31,2024-12-31,
4,Albert,Einstein,Albert Einstein,456 Fourth St.,2024-12-31,2025-10-31,
5,Thomas,Jefferson,Thomas Jefferson,678 Fifth St.,2024-12-31,2024-12-31,
6,baba,yaga,baba yaga,678 Fifth St.,2024-12-31,2024-12-31,
7,tommy,Jefferson,tommy Jefferson,678 Fifth St.,2024-12-31,2024-12-31,


### Copy log file to Cloud Storage

In [0]:
with open(log_filename,'r') as f:
    print(f.read())

DEBUG:2024-08-30 03:23:29,621:Starting Logger
DEBUG:2024-08-30 03:23:39,984:Starting Logger
INFO:2024-08-30 04:32:58,986:Sink delta table has 0 records
INFO:2024-08-30 04:33:30,251:Sink delta table has 0 records
INFO:2024-08-30 04:33:32,029:batch_id: 0 with source table of 5 records
INFO:2024-08-30 04:33:58,028:Sink delta table has 0 records
INFO:2024-08-30 04:33:59,138:batch_id: 0 with source table of 5 records
INFO:2024-08-30 04:34:41,628:Sink delta table has 0 records
INFO:2024-08-30 04:34:42,579:batch_id: 0 with source table of 5 records
INFO:2024-08-30 04:34:51,863:Sink delta table has now 5 records
DEBUG:2024-08-30 04:37:48,942:Starting Logger
DEBUG:2024-08-30 04:38:09,235:Starting Logger
DEBUG:2024-08-30 04:38:19,212:Starting Logger
INFO:2024-08-30 04:42:47,366:Sink delta table has 5 records
INFO:2024-08-30 04:42:48,543:batch_id: 1 with source table of 7 records
INFO:2024-08-30 04:42:59,797:Sink delta table has now 7 records



In [0]:
# dbutils.fs.cp(f'file:{log_filename}', '/mnt/path')

### Clean Up

In [0]:
dbutils.fs.rm('dbfs:/FileStore/tables/Manual_CDF_Demo/_checkpoint/',True)

In [0]:
%sql
DROP TABLE IF EXISTS default.customers_bronze
DROP TABLE IF EXISTS default.customers_silver