In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Streaming from Kafka") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0')
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]") 
    .getOrCreate()
)

spark

In [2]:
kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "device_data")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# View schema for raw kafka_df
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# kafka_df.show()

In [5]:
# Parse value from binay to string into kafka_json_df
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [6]:
# kafka_json_df.show()

In [7]:
# Schema of the Pyaload

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = (
    StructType(
    [
        StructField('eventId', StringType(), True), 
        StructField('eventOffset', LongType(), True), 
        StructField('eventPublisher', StringType(), True),         
        StructField('customerId', StringType(), True), 
        StructField('data', StructType(
                                        [StructField('devices', 
                                             ArrayType(StructType([ 
                                                StructField('deviceId', StringType(), True), 
                                                StructField('measure', StringType(), True), 
                                                StructField('status', StringType(), True), 
                                                StructField('temperature', LongType(), True)
                                            ]), True), True)
        ]), True), 
    StructField('eventTime', StringType(), True)
    ])
)

{"eventId": "aa90011f-3967-496c-b94b-a0c8de19a3d3",
 "eventOffset": 10003, 
 "eventPublisher": "device",
 "customerId": "CI00108", 
 "data": {"devices": [{"deviceId": "D004", "temperature": 16, "measure": "C", "status": "SUCCESS"}]},
 "eventTime": "2023-01-05 11:13:53.643364"}

{'eventId': 'aa90011f-3967-496c-b94b-a0c8de19a3d3',
 'eventOffset': 10003,
 'eventPublisher': 'device',
 'customerId': 'CI00108',
 'data': {'devices': [{'deviceId': 'D004',
    'temperature': 16,
    'measure': 'C',
    'status': 'SUCCESS'}]},
 'eventTime': '2023-01-05 11:13:53.643364'}

In [8]:
from pyspark.sql.functions import from_json,col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [9]:
# streaming_df.show()

In [10]:
streaming_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventTime: string (nullable = true)



In [11]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode

exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))

In [12]:
# Check the schema of the exploded_df, place a sample json file and change readStream to read 
exploded_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [13]:
# Flatten the exploded df
from pyspark.sql.functions import col

flattened_df = (
    exploded_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")
)

In [14]:
# Check the schema of the flattened_df, place a sample json file and change readStream to read 
flattened_df.printSchema()

root
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- customerId: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [15]:
# flattened_df.show(truncate=False)

In [30]:
from pyspark.sql.functions import to_json, struct
kafka_ready_df  = (
    flattened_df
    .withColumn("value", to_json(struct(*flattened_df.columns)))  # Convert entire row to JSON
    .select("value")) 

In [None]:
query_kafka = (kafka_ready_df
               .writeStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "ed-kafka:29092")
               .option("topic", "device_data")  
               .option("checkpointLocation", "checkpoint_dir_kafka")
               .start())

query_kafka.awaitTermination()

In [24]:
# console_query = (flattened_df
#  .writeStream
#  .format("console")
#  .outputMode("append")
#  .option("checkpointLocation", "checkpoint_dir_kafka")
#  .start()
#  .awaitTermination())

# console_query = (flattened_df
#  .writeStream
#  .format("csv")
#  .option("path", "output_dir")
#  .outputMode("append")
#  .option("checkpointLocation", "checkpoint_dir_kafka")
#  .start()
#  .awaitTermination())

(flattened_df
               .writeStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "ed-kafka:29092")
               .option("topic", "device_data")
               .option("checkpointLocation", "checkpoint_dir_kafka")
               .start()
              .awaitTermination())

StreamingQueryException: Query [id = 26d762a0-c2cd-4dd5-8e51-c84b73bdda77, runId = 474150a0-34e4-4ceb-87c7-04959405399a] terminated with exception: Required attribute 'value' not found