## Kafka PySpark Integration

In [1]:
# Make the Kafka source for Structured Streaming available to the Spark JVM.
# PYSPARK_SUBMIT_ARGS is read when the JVM is launched, so this cell has to run
# before the SparkSession is created.
# Docs - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
import os

# Maven coordinate: <group>:<artifact>_<scala version>:<spark version>.
# NOTE(review): the "_2.13" suffix has to match the Scala build of the installed
# Spark distribution - confirm against the runtime image.
KAFKA_CONNECTOR = "org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.0"

os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {KAFKA_CONNECTOR} pyspark-shell"

In [2]:
# Import required modules
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
from pyspark.sql.functions import col,from_json

In [3]:
# Display the installed PySpark version as the cell output.
# It should agree with the connector version pinned in PYSPARK_SUBMIT_ARGS above
# (3.3.0), otherwise the Kafka package may not be compatible with this runtime.
import pyspark
pyspark.__version__

'3.3.0'

In [4]:
# Build the SparkSession used by every following cell; getOrCreate() hands back
# the already-running session when this cell is executed again.
session_builder = SparkSession.builder.appName("KafkaPysparkExample")
spark = session_builder.getOrCreate()

In [5]:
# Define the streaming source: subscribe to the "iot_sim_test" topic on the
# broker at kafka:9092. load() only declares the stream; no records are read
# until a streaming query is started.
kafka_reader = spark.readStream.format("kafka")
df = (
    kafka_reader
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "iot_sim_test")
    .load()
)

In [6]:
# The Kafka stream `df` is already defined by the previous cell; building the
# identical source a second time here was redundant.
# Instead, show the columns delivered by the Kafka source (key, value, topic,
# partition, offset, timestamp, ... per the integration guide) so it is clear
# where the `value` column used in the next cell comes from.
df.printSchema()

In [7]:
# Keep only the Kafka message body and turn it into a string so that it can be
# parsed as JSON; the resulting column is still called "value".
raw_df = df.select(col("value").cast("string"))

In [8]:
# Schema of the incoming JSON messages. Every field is nullable.
# The temperature reading sits two levels deep: payload -> data -> temperature.
temperature_reading = StructType().add("temperature", DoubleType(), True)
payload_schema = StructType().add("data", temperature_reading, True)

# Top-level message envelope.
schema = (
    StructType()
    .add("guid", StringType(), True)
    .add("destination", StringType(), True)
    .add("state", StringType(), True)
    .add("eventTime", StringType(), True)
    .add("payload", payload_schema, True)
)

In [9]:
# Parse the JSON text carried in the Kafka `value` column into typed columns.
# The result is derived from `df` instead of re-assigning `raw_df` from itself:
# the self-referential form removed the `value` column it reads from, so running
# the cell a second time failed. Built this way the cell is idempotent, and
# `raw_df` still ends up holding the flattened message fields for later cells.
json_df = df.select(
    from_json(col("value").cast("string"), schema).alias("jsonData")
)
raw_df = json_df.select("jsonData.*")
raw_df.printSchema()

root
 |-- guid: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- state: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- temperature: double (nullable = true)



In [10]:
# Project the two columns the aggregation needs: the nested temperature reading
# (pulled up to a top-level "temperature" column) and the state it belongs to.
temperature_col = col("payload.data.temperature").alias("temperature")
state_col = col("state")
raw_df_temp_state = raw_df.select(temperature_col, state_col)

In [11]:
# Running average of the temperature, one row per state.
readings_by_state = raw_df_temp_state.groupBy("state")
avg_temp_state = readings_by_state.avg("temperature")

In [12]:
# Start the streaming query that prints the per-state averages to the console.
# "complete" output mode writes the whole aggregated result table on each trigger.
# start() returns immediately, so the StreamingQuery handle is kept in a variable:
# it is needed to inspect the query (avg_temp_query.status) or to shut it down
# (avg_temp_query.stop()). Call avg_temp_query.awaitTermination() to block until
# the query ends, e.g. when this code runs as a script instead of a notebook.
avg_temp_query = (
    avg_temp_state.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)
avg_temp_query

<pyspark.sql.streaming.StreamingQuery at 0x7f28d071fc40>