In [165]:
#Importing PySpark Packages
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, StructType

#Get (or create) the Spark session named "Assignment" and keep a handle on its SparkContext
spark = (
    SparkSession.builder
    .appName("Assignment")
    .getOrCreate()
)
sc = spark.sparkContext

In [166]:
#Schema of the stream tables
#Besides the initial columns timestamp, t1, t2 and t3 it declares 4 extra columns:
#t3_modified to hold the string form of t3 converted to a boolean value
#t1_counter, t2_counter, t3_counter to keep track of consecutive occurrences
userSchema = (
    StructType()
    .add("timestamp", "string")
    .add("t1", "double")
    .add("t2", "integer")
    .add("t3", "string")
    .add("t3_modified", "boolean")
    .add("t1_counter", "integer")
    .add("t2_counter", "integer")
    .add("t3_counter", "integer")
)

In [167]:
#Streaming reader over the comma-separated CSV files in the local input directory
#For JSON input the json conversion statement would be needed instead
csvDF = (
    spark.readStream
    .option("sep", ",")
    .schema(userSchema)
    .csv("C:/Users/polaramesh/Desktop/Assignment")
)

In [168]:
#Global counters tracking the consecutive occurrences per tag; all start at zero
t1_counter_value = t2_counter_value = t3_counter_value = 0

In [169]:
#Function to convert the t3 tag into a 1/0 flag
def update_t3_modified(t3):
    """Return 1 when ``t3`` is exactly the string "ON", otherwise 0."""
    return 1 if t3 == "ON" else 0

In [170]:
#Function to track 4 consecutive occurrences for tag t1 @ t1>55.43
def t1_counter(t1):
    """Advance the running count of consecutive t1 readings above 55.43.

    The count is kept in the module-level ``t1_counter_value``. A reading
    above 55.43 increments it; once the count has reached 4, the next
    qualifying reading starts a new run at 1. Any other reading, including
    a missing one, resets the count to 0.

    NOTE(review): the count lives in a process-level global, so it is only
    meaningful if every row passes through this function in stream order
    inside a single Python process - confirm how Spark schedules the UDF.

    Args:
        t1: The t1 reading, or None when the value is missing.

    Returns:
        int: The updated count (0 when the reading does not break the rule).
    """
    global t1_counter_value
    # A missing reading cannot be ordered against a float (TypeError on
    # Python 3), so it is treated as a reading that ends the current run.
    if t1 is None or not t1 > 55.43:
        t1_counter_value = 0
    elif t1_counter_value == 4:
        # The previous run already reached 4: this reading opens a new run.
        t1_counter_value = 1
    else:
        t1_counter_value += 1
    return t1_counter_value
        

In [171]:
#Function to track 6 consecutive occurrences for tag t2 @ t2<20
def t2_counter(t2):
    """Advance the running count of consecutive t2 readings below 20.

    The count is kept in the module-level ``t2_counter_value``. A reading
    below 20 increments it; once the count has reached 6, the next
    qualifying reading starts a new run at 1. Any other reading, including
    a missing one, resets the count to 0.

    NOTE(review): the count lives in a process-level global, so it is only
    meaningful if every row passes through this function in stream order
    inside a single Python process - confirm how Spark schedules the UDF.

    Args:
        t2: The t2 reading, or None when the value is missing.

    Returns:
        int: The updated count (0 when the reading does not break the rule).
    """
    global t2_counter_value
    # A missing reading cannot be ordered against an int (TypeError on
    # Python 3), so it is treated as a reading that ends the current run.
    if t2 is None or not t2 < 20:
        t2_counter_value = 0
    elif t2_counter_value == 6:
        # The previous run already reached 6: this reading opens a new run.
        t2_counter_value = 1
    else:
        t2_counter_value += 1
    return t2_counter_value
        

In [172]:
#Function to track 3 consecutive occurrences for tag t3 @ t3==ON
def t3_counter(t3):
    """Advance the running count of consecutive "ON" flags for tag t3.

    The count is kept in the module-level ``t3_counter_value``. A set flag
    increments it; once the count has reached 3, the next set flag starts a
    new run at 1. Any other value, including a missing one, resets it to 0.

    NOTE(review): the count lives in a process-level global, so it is only
    meaningful if every row passes through this function in stream order
    inside a single Python process - confirm how Spark schedules the UDF.

    Args:
        t3: The 1/0 flag derived from the t3 tag. Both the integer ``1`` and
            the string ``"1"`` count as set, because a UDF created without an
            explicit return type produces a string column.

    Returns:
        int: The updated count (0 when the flag is not set).
    """
    global t3_counter_value
    # Comparing only against the integer 1 would silently never match a
    # flag that was stringified on its way through a string-typed column.
    if t3 not in (1, "1"):
        t3_counter_value = 0
    elif t3_counter_value == 3:
        # The previous run already reached 3: this flag opens a new run.
        t3_counter_value = 1
    else:
        t3_counter_value += 1
    return t3_counter_value
        

In [173]:
#Creating the UDFs with an explicit integer return type
#Without a return type udf() produces string columns, so the 1/0 flag and the
#counters would reach later steps as text ("1") instead of numbers (1)
fun1 = udf(update_t3_modified, IntegerType())
fun2 = udf(t1_counter, IntegerType())
fun3 = udf(t2_counter, IntegerType())
fun4 = udf(t3_counter, IntegerType())

In [174]:
#Replacing the initial null entries of the tracking columns with the computed values
#Each step reads from the frame produced by the previous one, so t3_counter
#is computed from the freshly derived t3_modified column
for target, fn, source in (
    ("t3_modified", fun1, "t3"),
    ("t1_counter", fun2, "t1"),
    ("t2_counter", fun3, "t2"),
    ("t3_counter", fun4, "t3_modified"),
):
    csvDF = csvDF.withColumn(target, fn(csvDF[source]))

In [175]:
#Timestamps of every 4th consecutive rule break for tag t1, labelled rule_id 1
rule_id_1 = (
    csvDF
    .select('timestamp')
    .where('t1_counter == 4')
    .withColumn("rule_id", lit(1))
)

In [176]:
#Timestamps of every 6th consecutive rule break for tag t2, labelled rule_id 2
rule_id_2 = (
    csvDF
    .select('timestamp')
    .where('t2_counter == 6')
    .withColumn("rule_id", lit(2))
)

In [177]:
#Timestamps of every 3rd consecutive rule break for tag t3, labelled rule_id 3
rule_id_3 = (
    csvDF
    .select('timestamp')
    .where('t3_counter == 3')
    .withColumn("rule_id", lit(3))
)

In [178]:
#Union of the three per-rule dataframes: one row per rule break with its timestamp and rule_id
RULE_BREAK = (
    rule_id_1
    .union(rule_id_2)
    .union(rule_id_3)
)

In [179]:
#Starting the streaming query that displays the RULE_BREAK dataframe on the console
#The trigger interval is set to 1 minute; it can be increased or decreased inside the trigger function
#To store the result in a table at an HDFS/S3 location, specify it with option('path', '/path/to/destination/directory')
#The storage format is chosen with format(...), e.g. format('parquet') for the parquet file format
query = (
    RULE_BREAK.writeStream
    .format('console')
    .option('truncate', 'False')
    .trigger(processingTime='60 seconds')
    .start()
)

In [180]:
#Stop the running streaming query
query.stop()