In [0]:
# Creating and Initializing Widgets
dbutils.widgets.text("eventhub_name","","");
dbutils.widgets.text("eventhub_namespace","","");
dbutils.widgets.text("access_key_name", "","");
dbutils.widgets.text("access_key","","");
dbutils.widgets.text("Target_Table_Name","","");
dbutils.widgets.text("primary_key","","");


eventhub_name = dbutils.widgets.get("eventhub_name");
eventhub_namespace = dbutils.widgets.get("eventhub_namespace");
access_key_name = dbutils.widgets.get("access_key_name");
access_key = dbutils.widgets.get("access_key");
Target_Table_Name = dbutils.widgets.get("Target_Table_Name");
primary_key = dbutils.widgets.get("primary_key");

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json
import json
from urllib.parse import unquote
from pyspark.sql.functions import udf
from delta.tables import *
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from datetime import datetime as dt

In [0]:
#Connection to eventhub for streaming data



connection_template = f"Endpoint=sb://{eventhub_namespace}.servicebus.windows.net/;SharedAccessKeyName={access_key_name};SharedAccessKey={access_key};EntityPath={eventhub_name}"
# Start from beginning of stream
startOffset = "-1"

# End at the current time. This datetime formatting creates the correct string format from a python datetime object
endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

ehConf = {}
# Create the positions
startingEventPosition = {
  "offset": startOffset,  
  "seqNo": -1,            #not in use
  "enqueuedTime": None,   #not in use
  "isInclusive": True
}

endingEventPosition = {
  "offset": None,           #not in use
  "seqNo": -1,              #not in use
  "enqueuedTime": endTime,
  "isInclusive": True
}


# Put the positions into the Event Hub config dictionary
#ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
#ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_template)

In [0]:
#Reading Streaming data from eventhub 
read_df = spark.readStream.format("eventhubs").options(**ehConf).load()

#Converting Streaming data from Binary format to String
bodyString=read_df.withColumn("Body", col("body").cast("String")).select("Body")
bodyString.createOrReplaceTempView("jsonStr")
#startOffset = read_df.select("offset").collect().max

In [0]:
from pyspark.sql.functions import when,col

#Reading Schema from Target Delta Table
#target_table = spark.sql("select concat(Body:source:schema,'.',Body:source:table) from jsonStr").collect
Target_table_schema=spark.sql(f"select * from {Target_Table_Name}").schema

#Extracting payload Streaming json data in table foramt 
json_payload=spark.sql("select Body:before ,Body:after,Body:op,Body:source:schema,Body:source:table from jsonStr")

#Identifying the data is insert, update or delete
changetype = json_payload.withColumn("data", when(json_payload.op == "d",json_payload.before)
                                 .otherwise(json_payload.after))


#Converting Record in Table format from json
inc=changetype.select("*",from_json("data",Target_table_schema).alias("jstr"))
inc.printSchema


In [0]:
#Dataframe containg the change data on insert, update, delete operation
c=inc.select("*",col("jstr.*")).drop("after","before","data","jstr")
finaldf=c.withColumn("modified_date_timestamp",to_timestamp("ModifiedDate")).withWatermark(("modified_date_timestamp"),"20 minutes").na.drop("all")

In [0]:
display(finaldf)

op,schema,table,AddressID,AddressLine1,AddressLine2,City,StateProvinceID,PostalCode,SpatialLocation,rowguid,ModifiedDate,modified_date_timestamp
c,Person,Address,32522.0,1970 Naa!,,Bothqqll,79.0,98111.0,5hAAAAEMrov8KLzkR0BnqJGJiYpewA==,9AADCB0D-36CF-483F-84D8-585C2D4EC636,1666084433993,
d,Person,Address,32522.0,1970 Naa!,,Bothqqll,79.0,98111.0,5hAAAAEMrov8KLzkR0BnqJGJiYpewA==,9AADCB0D-36CF-483F-84D8-585C2D4EC636,1666084433993,
c,Person,Address,32522.0,1970 Naa!,,Bothqqll,79.0,98111.0,5hAAAAEMrov8KLzkR0BnqJGJiYpewA==,9AADCB0D-36CF-483F-84D8-585C2D4EC636,1666084433993,
u,Person,Person,,,,,,,,D8763459-8AA8-47CC-AFF7-C9079AF79033,1201132800000,


In [0]:
from delta.tables import *
from pyspark.sql.functions import *
from delta.tables import *

#Reading taget Delta table in dataframe
deltaTable = DeltaTable.forName(spark, f"{Target_Table_Name}")

# Function to insert ,update ,delete microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
    deltaTable.alias("t").merge(\
    microBatchOutputDF.alias("s"),\
    f"s.{primary_key} = t.{primary_key}") \
    .whenMatchedDelete(condition = "s.op = 'd'")\
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll(condition = "s.op = 'c'") \
    .execute()

# Write the output of a streaming  into Delta table
finaldf.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .option("checkpointLocation", f"/checkpoint/{Target_Table_Name}/1")\
  .start()
