## Trigger modes
### trigger = Once or trigger = availableNow (both are same, but once is depricated)
    1. Without checkpoint location:
        - If we don't give checkpoint location, spark internally manages it and each new run as diff checkpoint location
        - Because of this in every run, it will load all data from topic
        - It is like full load every time

    2. with checkpoint location:
        - It checks the latestoffset written from checkpoint location and read only from that offset
        - generally appends data

    3. startingOffset
        - earliest -> reads from start
        - latest -> mostly gives null as it reads the data written only at processing time, as it is like bacth mostly it will be null

### trigger = processingTime
    1. Trigger the processing based on the time specified for processingTime. For e.g., if processingTime = 10 seconds, it triggers at every 10 secs and process all the unprocessed offsets.
    2. Without checkpoint location:
        - If we don't give checkpoint location, spark internally manages it and each new run as diff checkpoint location
        - Because of this in every run, it will load all data from topic first
    3. with checkpoint location:
        - It checks the latestoffset written from the checkpoint location and reads only from that offset
    4. startingOffset
        - earliest -> reads from start
        - latest -> When streaming starts, it creates batch 0 with the latest offset, from next run it checkes that offset and read from there


### trigger = continuous
    - Not used much in production
    - The data is processed continuously, but the checkpoint location is updated based on time interval set in Continuous
    - if continuous = 10 seconds, processing happens immediately, but checkpoint updates every 10 secs
    - If jobs failed dude to some issue and the checkpoint wasn't updated, restarting the job will create issue because of duplicate data.
        
    

In [1]:
# imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# create spark session
spark =( SparkSession 
    .builder 
    .appName("Triggers in Spark Streaming") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    # .config("spark.sql.shuffle.partitions", 8)
    # .master("local[*]") 
    .getOrCreate()
)
spark

In [3]:
# connect to kafka
df = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers','kafka:29092')
    .option('subscribe','test')
    .option('startingOffsets','earliest')
    .load()
)

In [4]:

# cast the value for actual value
df_parsed = (
    df.selectExpr(
        "cast(key as string) as key",
        "cast(value as string) as value",
        "timestamp"
                 )
)


In [5]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = (
    StructType(
    [StructField('customerId', StringType(), True), 
    StructField('data', StructType(
        [StructField('devices', 
                     ArrayType(StructType([ 
                        StructField('deviceId', StringType(), True), 
                        StructField('measure', StringType(), True), 
                        StructField('status', StringType(), True), 
                        StructField('temperature', LongType(), True)
                    ]), True), True)
        ]), True), 
    StructField('eventId', StringType(), True), 
    StructField('eventOffset', LongType(), True), 
    StructField('eventPublisher', StringType(), True), 
    StructField('eventTime', StringType(), True)
    ])
)

df_parsed1 = df_parsed.withColumn('values_json',from_json('value',json_schema)).selectExpr("values_json.*")


In [6]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode

exploded_df = df_parsed1.withColumn("data_devices", explode("data.devices"))

In [7]:

flattened_df = (
    exploded_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")
)



# Write with trigger modes

## Once

In [84]:
(
    flattened_df
    .writeStream
    .format('console')
    .outputMode('append')
    .trigger(once=True)
    .option('checkpointLocation','trigger_once/c1')
    .start()
    .awaitTermination()
        
)

## Available now

In [85]:
(
    flattened_df
    .writeStream
    .format('console')
    .outputMode('append')
    .trigger(availableNow=True)
    .queryName('batch')
    .option('checkpointLocation','trigger_once/c1')
    .start()
    .awaitTermination()
        
)

IllegalArgumentException: Cannot start query with name batch as a query with that name is already active in this SparkSession

## processingTime

In [8]:
# without checkpoint location

(
    flattened_df
    .writeStream
    .format('console')
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .queryName('batch')
    .start()
    .awaitTermination()
        
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [8]:
# without checkpoint location

(
    flattened_df
    .writeStream
    .format('console')
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .queryName('batch')
    .option('checkpointLocation','trigger_once/processing_time_10_sec_1')
    .start()
    .awaitTermination()
        
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 