In [None]:
from pyspark.sql.functions import col

In [None]:
# Read the job parameters from the notebook widgets in a single pass.
# Widget names map positionally onto the variables on the left-hand side
# (the "QUERY" widget feeds string_schema, the "PK" widget feeds primary_key).
(TABLE_NAME, string_schema, ENABLED, SCHEMA, primary_key) = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("TABLE_NAME", "QUERY", "ENABLED", "SCHEMA", "PK")
)

In [None]:
# Variable initialization: same principle as the Azure DLS2 variant, but with no need for a raw zone or other paths, and no clean-up requirements.
# See the create notes; the only difference here is that we do not read the 'r' rows from Debezium.


# TABLE_NAME= "TABLE1_"
# string_schema = """
# SELECT 
#         after.SomeID AS SomeID
#        ,after.Comments         AS Comments
#        ,after.LoginID          AS LoginID
#        ,after.OpenedDT         AS OpenedDT
#        ,after.SavedDT          AS SavedDT
#        ,after.OriginalLoginID  AS OriginalLoginID
#        ,after.EditedMSID       AS EditedMSID
#        ,after.ApprovalLoginID  AS ApprovalLoginID
#        ,ts_ms as kafka_ts
#        ,op as op
#         ,row_number() over (partition by after.SomeID order by ts_ms desc) rn
#     FROM tmp_{}
#     where op != 'r'
#     order by ts_ms asc
# """
# ENABLED = 'TRUE'
# SCHEMA = 'some_schema'

# primary_key = 'SomeID'

# Column alias used for the event timestamp (the sample query selects `ts_ms as kafka_ts`)
# and the mount path of the target Delta table for this schema/table pair.
timestamp_key = 'kafka_ts'
delta_table_path = '/mnt/databricks/DELTA/{}/{}'.format(SCHEMA, TABLE_NAME)

In [None]:
# Read the checkpoint (last synced Kafka record timestamp, in epoch milliseconds)
# for this schema/table from the control table. The DataFrame API is used rather
# than string-built SQL so widget values are never interpolated into a query.
checkpoint_row = (
    spark.table("default.control_table")
    .where((col("table_name") == TABLE_NAME) & (col("schema_name") == SCHEMA))
    .select("last_sync_ts")
    .first()
)

# Fail with an actionable message instead of a bare IndexError/TypeError when the
# control table has no (or a null) checkpoint for this table.
if checkpoint_row is None or checkpoint_row["last_sync_ts"] is None:
    raise ValueError(
        f"No last_sync_ts found in default.control_table for {SCHEMA}.{TABLE_NAME}; "
        "seed the control table before running this notebook."
    )

latest_ts = checkpoint_row["last_sync_ts"]

# Going through float() accepts 1700000000123, 1700000000123.0 and the string
# "1700000000123.0" alike; +1 ms so the last record already synced is not re-read.
starting_ts = int(float(latest_ts)) + 1

# Batch-read only the Kafka records newer than the checkpoint.
raw = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafkabrokeraddress:9092")
    .option("subscribe", "topic")
    .option("startingTimestamp", starting_ts)
    # Without this, a partition with no record at/after starting_ts makes the
    # query fail (default strategy is "error"); "latest" yields an empty read instead.
    .option("startingOffsetsByTimestampStrategy", "latest")
    # Far-future end bound: no offset matches it, so the read runs to the latest offset.
    .option("endingTimestamp", "9000000000000")
    .option("kafka.group.id", "dbs_consumer1")
    .load()
)

In [None]:
# Parse the Debezium change event carried as JSON in the Kafka `value` column.
# The envelope schema is declared once and the JSON is parsed once; the resulting
# columns (after, ts_ms, op, sync_ts) are the same as before.
debezium_schema = (
    "STRUCT<payload: STRUCT<"
    "after: MAP<STRING, STRING>, "
    "op: STRING, "
    "ts_ms: DOUBLE"
    ">>"
)

parsed = raw.selectExpr(
    f"from_json(cast(value as string), '{debezium_schema}').payload as payload",
    # Kafka record timestamp converted from seconds to epoch milliseconds.
    "cast(timestamp as double) * 1000 as sync_ts",
)

# NOTE(review): only `after` is extracted from the payload. Debezium delete events
# normally carry their key columns in `before` — confirm the QUERY widget can still
# resolve the primary key for op = 'd' rows.
df = parsed.select(
    col("payload.after").alias("after"),
    col("payload.ts_ms").alias("ts_ms"),
    col("payload.op").alias("op"),
    col("sync_ts"),
)


In [None]:
# Highest Kafka record timestamp (ms) in this batch; None when the batch is empty.
max_value = df.agg({"sync_ts": "max"}).first()[0]

In [None]:
# Merge the latest change per primary key into the target Delta table.

import datetime

# NOTE: the star imports below shadow Python builtins such as max/min/sum/round
# with their Spark column functions in every cell executed after this one.
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.utils import AnalysisException

delta_table = DeltaTable.forPath(spark, delta_table_path)

df_changes = df

# Expose the parsed events under the view name the widget-supplied query is
# formatted with (tmp_<TABLE_NAME>).
df_changes.createOrReplaceTempView('tmp_' + TABLE_NAME)

extracted_df = spark.sql(string_schema.format(TABLE_NAME))

# Keep only the rows the query ranked first. The query is expected to number rows
# per key newest-first (as the sample query does), making rn == 1 the latest change.
cleaned_df = extracted_df.filter(col("rn") == 1)

# Every column of the target table is copied from the same-named source column.
col_list = delta_table.toDF().columns
print(col_list)
merge_action = {f"target.{colName}": f"source.{colName}" for colName in col_list}

merge_condition = (col("target.{}".format(primary_key)) == col("source.{}".format(primary_key)))

# cleaned_df holds the final state per key, so the merge is "delete or upsert":
#   * op == 'd'            -> remove the row if it exists, never insert it;
#   * any other op (c / u) -> update when the key exists, insert when it does not.
# Previously the insert clause had no condition (so unmatched 'd' rows were inserted)
# and matched rows whose latest op was 'c' were ignored.
delete_condition = (col("source.op") == "d")
update_condition = (col("source.op") != "d")
insert_condition = (col("source.op") != "d")

# Merge cleaned_df into delta_table.
delta_table.alias("target").merge(
    cleaned_df.alias("source"),
    merge_condition
).whenMatchedDelete(
    condition=delete_condition
).whenMatchedUpdate(
    condition=update_condition,
    set=merge_action
).whenNotMatchedInsert(
    condition=insert_condition,
    values=merge_action
).execute()


['MeasurementSetID', 'Comments', 'LoginID', 'OpenedDT', 'SavedDT', 'OriginalLoginID', 'EditedMSID', 'ApprovalLoginID', 'kafka_ts']


In [None]:
# Advance the checkpoint: last_sync_ts is the highest Kafka record timestamp (ms)
# seen in this run, so the next run only requests records newer than it.
if max_value is None:
    # Empty batch: there is nothing to advance to, and interpolating None would
    # produce the invalid statement "set last_sync_ts = None".
    print(f"No new Kafka records for {SCHEMA}.{TABLE_NAME}; control table left unchanged.")
else:
    # Store whole milliseconds. int(x + 0.5) rounds the positive epoch value to the
    # nearest integer; the builtin round() is avoided because the star import of
    # pyspark.sql.functions in the merge cell shadows it.
    last_sync_ms = int(max_value + 0.5)
    spark.sql(f"""
          update default.control_table set last_sync_ts = {last_sync_ms}
          where schema_name == '{SCHEMA}' and table_name == '{TABLE_NAME}'
          """)

Out[7]: DataFrame[num_affected_rows: bigint]