## Spark Streaming read from Kafka

In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

spark

:: loading settings :: url = jar:file:/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8726940d-0090-4430-bd78-378582f7a886;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report 

In [2]:
# Create the streaming_df to read from kafka
streaming_df = spark.readStream\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load()

In [4]:
# To the schema of the data,  post a kafka message and change readStream to read 
# streaming_df.printSchema()
# streaming_df.show(truncate=False)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [3]:
# JSON Schema
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
json_schema = StructType([StructField('customerId', StringType(), True), \
StructField('data', StructType([StructField('devices', ArrayType(StructType([ \
StructField('deviceId', StringType(), True), \
StructField('measure', StringType(), True), \
StructField('status', StringType(), True), \
StructField('temperature', LongType(), True)]), True), True)]), True), \
StructField('eventId', StringType(), True), \
StructField('eventOffset', LongType(), True), \
StructField('eventPublisher', StringType(), True), \
StructField('eventTime', StringType(), True)])

In [4]:
# Parse value from binay to string
json_df = streaming_df.selectExpr("cast(value as string) as value")

# Apply Schema to JSON value column and expand the value
from pyspark.sql.functions import from_json

json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*") 

In [5]:
# Validate Schema
# json_expanded_df.show(10, False)
# json_expanded_df.printSchema()

[Stage 0:>                                                          (0 + 1) / 1]

+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|customerId|data                                                                     |eventId                             |eventOffset|eventPublisher|eventTime                 |
+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|CI00101   |{[{D004, C, SUCCESS, 20}, {D004, C, SUCCESS, 1}, {D002, C, SUCCESS, 21}]}|1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|
+----------+-------------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (

                                                                                

In [5]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode, col

exploded_df = json_expanded_df \
    .select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data") \
    .withColumn("devices", explode("data.devices")) \
    .drop("data")

In [7]:
# Check the schema of the exploded_df,  post a kafka message and change readStream to read 
# exploded_df.printSchema()
# exploded_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)

+----------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|customerId|eventId                             |eventOffset|eventPublisher|eventTime                 |devices               |
+----------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|{D004, C, SUCCESS, 20}|
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      

                                                                                

In [6]:
# Flatten the exploded df
flattened_df = exploded_df \
    .selectExpr("customerId", "eventId", "eventOffset", "eventPublisher", "cast(eventTime as timestamp) as eventTime", 
                "devices.deviceId as deviceId", "devices.measure as measure", 
                "devices.status as status", "devices.temperature as temperature") 

In [26]:
# Check the schema of the flattened_df,  post a kafka message and change readStream to read 
# flattened_df.printSchema()
# flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: timestamp (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)

+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|customerId|eventId                             |eventOffset|eventPublisher|eventTime                 |deviceId|measure|status |temperature|
+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      |device        |2023-01-05 11:13:53.650313|D004    |C      |SUCCESS|20         |
|CI00101   |1450324a-c546-4175-a6d8-ee58822e1d41|10038      

                                                                                

In [7]:
# Aggregate the dataframes to find the average temparature
# per Customer per device throughout the day for SUCCESS events
from pyspark.sql.functions import to_date, avg

agg_df = flattened_df.where("STATUS = 'SUCCESS'") \
    .withColumn("eventDate", to_date("eventTime", "yyyy-MM-dd")) \
    .groupBy("customerId","deviceId","eventDate") \
    .agg(avg("temperature").alias("avg_temp"))

In [32]:
# Check the schema of the agg_df, post a kafka message and change readStream to read 
# agg_df.printSchema()
# agg_df.show()

root
 |-- customerId: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- eventDate: date (nullable = true)
 |-- avg_temp: double (nullable = true)

+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+



                                                                                

In [8]:
# Write the output to console sink to check the output
writing_df = agg_df.writeStream \
    .format("console") \
    .option("checkpointLocation","checkpoint_dir") \
    .outputMode("complete") \
    .start()
    
# Start the streaming application to run until the following happens
# 1. Exception in the running program
# 2. Manual Interruption
writing_df.awaitTermination()

23/01/06 07:24:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 6
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00101|    D004|2023-01-05|    10.5|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 7
-------------------------------------------
+----------+--------+----------+---

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00118|    D001|2023-01-06|    23.0|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 10
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00118|    D001|2023-01-06|    23.0|
+----------+--------+----------+--------+

-------------------------------------------
Bat

                                                                                

-------------------------------------------
Batch: 19
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00110|    D001|2023-01-06|    14.0|
|   CI00116|    D005|2023-01-06|     2.0|
|   CI00118|    D001|2023-01-06|    23.0|
|   CI00106|    null|2023-01-06|    18.0|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 20
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   C

                                                                                

-------------------------------------------
Batch: 21
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00110|    D001|2023-01-06|    14.0|
|   CI00116|    D005|2023-01-06|     2.0|
|   CI00118|    D001|2023-01-06|    23.0|
|   CI00106|    null|2023-01-06|    18.0|
+----------+--------+----------+--------+



                                                                                

-------------------------------------------
Batch: 22
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00110|    D001|2023-01-06|    14.0|
|   CI00116|    D005|2023-01-06|     2.0|
|   CI00118|    D001|2023-01-06|    23.0|
|   CI00106|    null|2023-01-06|    18.0|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 23
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   C

                                                                                

-------------------------------------------
Batch: 30
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   CI00104|    D005|2023-01-06|     4.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00110|    D001|2023-01-06|    14.0|
|   CI00116|    D005|2023-01-06|     2.0|
|   CI00101|    D003|2023-01-06|    28.0|
|   CI00118|    D001|2023-01-06|    23.0|
|   CI00104|    D001|2023-01-06|    23.0|
|   CI00106|    null|2023-01-06|    18.0|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 31
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   C

                                                                                

-------------------------------------------
Batch: 33
-------------------------------------------
+----------+--------+----------+--------+
|customerId|deviceId| eventDate|avg_temp|
+----------+--------+----------+--------+
|   CI00105|    D002|2023-01-05|    20.0|
|   CI00101|    D002|2023-01-05|    21.0|
|   CI00112|    D003|2023-01-06|    15.0|
|   CI00104|    D005|2023-01-06|     4.0|
|   CI00101|    D004|2023-01-05|    10.5|
|   CI00114|    null|2023-01-06|     3.0|
|   CI00110|    D001|2023-01-06|    14.0|
|   CI00119|    D003|2023-01-06|    17.0|
|   CI00116|    D005|2023-01-06|     2.0|
|   CI00101|    D003|2023-01-06|    28.0|
|   CI00118|    D001|2023-01-06|    23.0|
|   CI00104|    D001|2023-01-06|    23.0|
|   CI00117|    D003|2023-01-06|    13.0|
|   CI00106|    null|2023-01-06|    18.0|
+----------+--------+----------+--------+

-------------------------------------------
Batch: 34
-------------------------------------------
+----------+--------+----------+--------+
|cust

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 