In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Triggers in Spark Streaming") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1")
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]") 
    .getOrCreate()
)

spark

In [2]:
# Create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "cf-kafka-1:29092")
    .option("subscribe", "device-data")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# View schema for raw kafka_df
kafka_df.printSchema()
#kafka_df.show()
#kafka_df.rdd.getNumPartitions()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Parse value from binay to string into kafka_json_df
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [5]:
# Schema of the Payload

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = (
    StructType(
    [StructField('customerId', StringType(), True), 
    StructField('data', StructType(
        [StructField('devices', 
                     ArrayType(StructType([ 
                        StructField('deviceId', StringType(), True), 
                        StructField('measure', StringType(), True), 
                        StructField('status', StringType(), True), 
                        StructField('temperature', LongType(), True)
                    ]), True), True)
        ]), True), 
    StructField('eventId', StringType(), True), 
    StructField('eventOffset', LongType(), True), 
    StructField('eventPublisher', StringType(), True), 
    StructField('eventTime', StringType(), True)
    ])
)

In [6]:
# Apply the schema to payload to read the data
from pyspark.sql.functions import from_json,col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [7]:
# To the schema of the data, place a sample json file and change readStream to read 
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [8]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode

exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))


In [9]:
# Check the schema of the exploded_df, place a sample json file and change readStream to read 
exploded_df.printSchema()
#exploded_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [10]:
# Flatten the exploded df
from pyspark.sql.functions import col

flattened_df = (
    exploded_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")
)

In [11]:
# Check the schema of the flattened_df, place a sample json file and change readStream to read 
flattened_df.printSchema()
#flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [None]:
# Start the python script in `utils >  post_to_kafka.py` before running this cell
# The script will keep publishing json to the Kafka topic

# Running in once/availableNow and processingTime mode
# Write the output to console sink to check the output
# When trigger is set to once/availableNow, it runs one batch query and stops 
# When trigger is set to processingTime, it runs a batch query every 10 seconds

(flattened_df
 .writeStream
 .format("console")
 .outputMode("append")
 .trigger(once=True)
 .option("checkpointLocation", "checkpoint_dir_kafka_1")
 .start()
 .awaitTermination())

In [None]:
# Start the python script in `utils >  post_to_kafka.py` before running this cell
# The script will keep publishing json to the Kafka topic

# Running in once/availableNow and processingTime mode
# Write the output to console sink to check the output
# When trigger is set to once/availableNow, it runs one batch query and stops 
# When trigger is set to processingTime, it runs a batch query every 10 seconds

(flattened_df
 .writeStream
 .format("console")
 .outputMode("append")
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation", "checkpoint_dir_kafka_1")
 .start()
 .awaitTermination())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ~~~~~~~~~~~~~~~~~~~~^^
  File "/opt/conda/lib/python3.13/socket.py", line 719, in readinto
    return self._sock.recv_into(b)
           ~~~~~~~~~~~~~~~~~~~~^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [26]:
# Running in Continuous mode [Still Experimental in Spark 4.0.1]
# Not all operations are supported in Continuous Mode - https://spark.apache.org/docs/latest/streaming/performance-tips.html#supported-queries

# Databricks Real-Time Mode is a new low-latency execution engine for Spark Structured Streaming that improves upon the older Spark Continuous Processing mode, addressing its limitations and offering broader support and better performance for production workloads.
# https://docs.databricks.com/aws/en/structured-streaming/real-time

# Write the output to memory sink to check the output
# When trigger is set to continuous, it continously runs a query, and creates checkpoints at the interval specified

# Lists all active queries and stops them
# for q in spark.streams.active:
#     print(q.name)  # See active query names
#     q.stop()       # Stop each query as needed

(kafka_df
 .writeStream
 .format("memory")
 .queryName("kafka_table")
 .outputMode("append")
 .trigger(continuous='10 seconds')
 .option("checkpointLocation", "checkpoint_dir_kafka_2")
 .start()
 .awaitTermination()
 )

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ~~~~~~~~~~~~~~~~~~~~^^
  File "/opt/conda/lib/python3.13/socket.py", line 719, in readinto
    return self._sock.recv_into(b)
           ~~~~~~~~~~~~~~~~~~~~^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [27]:
# View data from Memory Sink
spark.sql("select * from kafka_table").show()

+--------------------+--------------------+-----------+---------+------+--------------------+-------------+
|                 key|               value|      topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+-----------+---------+------+--------------------+-------------+
|                NULL|[7B 22 65 76 65 6...|device-data|        0|     0|2025-09-12 15:12:...|            0|
|                NULL|[7B 22 65 76 65 6...|device-data|        0|     1|2025-09-12 15:16:...|            0|
|                NULL|       [74 65 73 74]|device-data|        0|     2|2025-09-12 15:37:...|            0|
|                NULL|[7B 22 65 76 65 6...|device-data|        0|     3|2025-09-12 15:41:...|            0|
|                NULL|[7B 22 65 76 65 6...|device-data|        0|     4|2025-09-12 15:56:...|            0|
|                NULL|[7B 22 65 76 65 6...|device-data|        0|     5|2025-09-12 16:17:...|            0|
|                NULL|[7B 22