In [None]:
#This notebook is intended for demo purposes.
#Scenario:
#An IOT device is sending data to a Kafka endpoint. This notebook consumes the data and performs the following functions:
#1. Store the raw data on cloud storage in folders partitioned by deviceName, tagName, year, month, day and hour.
#2. Ensure the same value in the "value" column is not stored twice in a row before it is sent to downstream databases. (This looks at the specific value, not at whether the whole row is a duplicate.)
#   The objective is to only capture changes in the "value" column. Value can only be a 0 or 1.

#To accomplish point 2, the following logic is used:
#   Take data from the Kafka endpoint and store it in a Delta table for further processing. An entry is only stored
#   if its timestamp is within the last seven days. If it meets this requirement, it is placed in the table after
#   checking the values of the entries that occurred before and after the timestamp of the data consumed from Kafka.
#   If one of those neighbouring entries already holds the same value, the new entry is ignored.
#   
#   Once data is stored, it is sent to the downstream SQL database for reporting.
    
 

In [None]:
#Create a function to handle writes. Data is persisted before the writes to prevent recomputation of data.
def writeData(df):
    """Append a DataFrame to the silver Delta table and to SQL Server.

    The frame is persisted for the duration of the two writes so its upstream
    plan is evaluated once. It is always unpersisted afterwards, even when one
    of the writes raises.

    Connection settings come from the environment variables SQL_SERVER_URL,
    SQL_DATABASE, SQL_TABLE, SQL_USER and SQL_PASSWORD. The defaults below are
    demo placeholders only.

    Args:
        df: DataFrame to write. It must contain a ``day`` column, which is the
            partition column of the Delta table at /data/silver.
    """
    import os

    # Keep secrets out of the notebook: supply real values through the environment.
    server_name = os.environ.get("SQL_SERVER_URL", "jdbc:sqlserver://{SERVER_ADDR}")
    database_name = os.environ.get("SQL_DATABASE", "database_name")
    url = f"{server_name};databaseName={database_name};"

    table_name = os.environ.get("SQL_TABLE", "table_name")
    username = os.environ.get("SQL_USER", "username")
    password = os.environ.get("SQL_PASSWORD", "password123!#")  # placeholder - set SQL_PASSWORD

    df.persist()
    try:
        df.write.format("delta").mode("append").partitionBy('day').save("/data/silver")
        (
            df.write
            .format("com.microsoft.sqlserver.jdbc.spark")
            .mode("append")
            .option("url", url)
            .option("dbtable", table_name)
            .option("user", username)
            .option("password", password)
            .save()
        )
    finally:
        # Release the cache even if a write fails, so failed batches do not pile up in memory.
        df.unpersist()

In [None]:
import datetime
from datetime import timezone

from delta.tables import *
from pyspark.sql.functions import col, from_json, from_unixtime
from pyspark.sql.types import IntegerType, StringType, StructType

#Functions used by foreachBatch to deduplicate and forward each micro-batch.

def _add_time_columns(df):
    """Return ``df`` with integer year, month, day and hour columns derived from ``time``.

    ``time`` holds unix seconds. ``from_unixtime`` renders it in the Spark
    session time zone. The integer casts match the columns inserted into SQL.
    """
    return (
        df.withColumn("year", from_unixtime(col("time"), 'yyyy').cast(IntegerType()))
        .withColumn("month", from_unixtime(col("time"), 'MM').cast(IntegerType()))
        .withColumn("day", from_unixtime(col("time"), 'dd').cast(IntegerType()))
        .withColumn("hour", from_unixtime(col("time"), 'HH').cast(IntegerType()))
    )


def _to_unix_seconds(raw):
    """Parse the ``time`` field (a string in the Kafka schema) to float seconds, or None if unusable."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def _create_events_table(rowDF):
    """Register the external Delta table ``events`` at /data/silver, partitioned by day.

    Only the schema is written (``limit(0)``). The rows themselves are appended
    by ``writeData``, so they also reach SQL Server with the same credentials.
    """
    rowDF.limit(0).write.format("delta").partitionBy("day").option("path", "/data/silver").saveAsTable("events")
    #If using Spark 2.4-see below
    #rowDF.limit(0).write.format("delta").mode("append").partitionBy('day') \
    #    .option("__partition_columns", """["day"]""").option("path","/data/silver") \
    #    .saveAsTable("events")


def _is_value_change(row, eventTime):
    """Return True when ``row["value"]`` differs from its nearest stored neighbours.

    The neighbours are the latest stored entry before ``eventTime`` and the
    earliest stored entry after it. Only entries with the same deviceName,
    tagName, year, month, day and hour are considered. A missing neighbour is
    not compared; with no neighbours at all, the row counts as a change.
    """
    # DataFrame filters instead of string-formatted SQL: device/tag come from the
    # Kafka payload and must not be spliced into a query string.
    sameSlot = spark.table("events").where(
        col("deviceName").eqNullSafe(row["deviceName"])
        & col("tagName").eqNullSafe(row["tagName"])
        & (col("year") == row["year"])
        & (col("month") == row["month"])
        & (col("day") == row["day"])
        & (col("hour") == row["hour"])
    )
    storedTime = col("time").cast("double")
    before = sameSlot.where(storedTime < eventTime).orderBy(storedTime.desc()).limit(1).collect()
    # Ascending order picks the entry immediately after eventTime.
    after = sameSlot.where(storedTime > eventTime).orderBy(storedTime.asc()).limit(1).collect()
    return all(found[0]["value"] != row["value"] for found in (before, after) if found)


def processRow(df, batchId):
    """Deduplicate one micro-batch of IoT events and forward the value changes.

    Intended for ``DataStreamWriter.foreachBatch``. Each row of the batch is
    handled on its own, in event-time order, so rows written earlier in the
    batch are seen by the checks of later rows:

    * rows whose ``time`` is missing or unparseable are skipped;
    * rows not newer than seven days ago (UTC) are skipped;
    * if /data/silver is not a Delta table yet, the ``events`` table is created
      and the row is written;
    * otherwise the row is written only if ``_is_value_change`` reports that its
      value differs from the neighbouring stored entries.

    Rows are written with ``writeData`` (Delta append + SQL Server append).
    Rows are collected to the driver and written one at a time. That is fine
    for a demo, but it is slow for large batches.

    Args:
        df: micro-batch DataFrame with columns deviceName, tagName, time
            (unix seconds as a string) and value.
        batchId: micro-batch id supplied by Spark (unused).
    """
    newDF = _add_time_columns(df)

    # Compute the cut-off directly in UTC; relabelling local wall-clock time as UTC
    # would shift it by the local UTC offset.
    unixAWeekAgo = (datetime.datetime.now(timezone.utc) - datetime.timedelta(days=7)).timestamp()
    tableExists = DeltaTable.isDeltaTable(spark, "/data/silver")

    for row in newDF.orderBy(col("time").cast("double")).collect():
        eventTime = _to_unix_seconds(row["time"])
        if eventTime is None or eventTime <= unixAWeekAgo:
            continue
        # Write only this row: newDF still has the year/month/day/hour columns that the
        # Delta partitioning ('day') and the SQL table need.
        rowDF = spark.createDataFrame([row], schema=newDF.schema)
        if not tableExists:
            _create_events_table(rowDF)
            tableExists = True
            writeData(rowDF)
        elif _is_value_change(row, eventTime):
            writeData(rowDF)

In [None]:
# Schema of the JSON payload on the Kafka topic: four nullable fields, all parsed as strings.
schema = (
    StructType()
    .add("deviceName", StringType())
    .add("tagName", StringType())
    .add("time", StringType())
    .add("value", StringType())
)

In [None]:
#Read data from the Kafka topic and parse the JSON payload into top-level columns.
# from_json yields a single struct column. Aliasing it and expanding "event.*"
# exposes deviceName, tagName, time and value as columns, which the later cells
# reference by name.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", "iot-events")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("event"))
    .select("event.*")
)

In [None]:
from pyspark.sql.functions import from_unixtime , col

#Add columns for folder partitioning; `time` holds unix seconds.
storageDF = (
    df.withColumn("year", from_unixtime(col("time"), 'yyyy'))
    .withColumn("month", from_unixtime(col("time"), 'MM'))
    .withColumn("day", from_unixtime(col("time"), 'dd'))
    .withColumn("hour", from_unixtime(col("time"), 'HH'))
)

#Write the raw data to /Data, with one folder level per partition column.
# `df` is a streaming DataFrame, so batch-only calls such as toPandas() or
# storageDF.write fail. A path built from a single row would also route every
# event to that row's folder. A streaming file sink with partitionBy places each
# row in its own folder: /Data/deviceName=<d>/tagName=<t>/year=<y>/month=<m>/day=<dd>/hour=<h>/
# Spark escapes partition values, so device/tag names from the payload cannot add path segments.
rawStorageQuery = (
    storageDF.writeStream
    .format("parquet")
    .outputMode("append")
    .partitionBy("deviceName", "tagName", "year", "month", "day", "hour")
    .option("path", "/Data")
    # NOTE(review): demo-chosen checkpoint path (required by the file sink) - adjust to your storage.
    .option("checkpointLocation", "/checkpoints/iot-events-raw")
    .start()
)

In [None]:
#Use foreachBatch to run processRow on every micro-batch of the Kafka stream.
# Parentheses avoid backslash line continuations, where a trailing space after "\"
# is a SyntaxError. The returned handle allows eventsQuery.stop() / awaitTermination().
eventsQuery = (
    df.writeStream
    .foreachBatch(processRow)
    # NOTE(review): demo-chosen checkpoint path; it records processed Kafka offsets so a
    # restarted query resumes instead of reprocessing - adjust to your storage.
    .option("checkpointLocation", "/checkpoints/iot-events-silver")
    .start()
)