In [1]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

spark

:: loading settings :: url = jar:file:/usr/local/spark-3.2.4-bin-hadoop2.7/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3ccb74c5-5a04-47c9-b00b-878afbdffc96;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.4 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central

In [2]:
KAFKA_BOOTSTRAP_SERVERS= "kafka1:19091,kafka2:19092,kafka3:19093"
schema = StructType([
    StructField("eventId", StringType()),
    StructField("eventOffset", StringType()),
    StructField("eventPublisher", StringType()),
    StructField("customerId", StringType()),
    StructField("data", StructType([
        StructField("devices", ArrayType(StructType([
            StructField("deviceId", StringType()),
            StructField("temperature", IntegerType()),
            StructField("measure", StringType()),
            StructField("status", StringType()),
        ])))
    ])),
    StructField("eventTime", StringType())
])

In [3]:
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select(col("parsed_value.*"))

df.show(10, True)



+--------------------+-----------+--------------+----------+--------------------+--------------------+
|             eventId|eventOffset|eventPublisher|customerId|                data|           eventTime|
+--------------------+-----------+--------------+----------+--------------------+--------------------+
|b947256f-3257-4c1...|      10000|        device|   CI00117|{[{D002, 19, C, S...|2024-02-01 10:46:...|
|03462a43-d288-425...|      10001|        device|   CI00119|{[{D002, 14, C, n...|2024-02-01 10:46:...|
|c4a27012-73b3-47d...|      10002|        device|   CI00112|{[{D003, 0, C, SU...|2024-02-01 10:46:...|
|b400766d-9f53-42c...|      10003|        device|   CI00120|                {[]}|2024-02-01 10:46:...|
|9d25c42f-f12b-4cf...|      10004|        device|   CI00106|{[{D002, 2, C, ER...|2024-02-01 10:46:...|
|e767577b-3de5-45a...|      10005|        device|   CI00116|{[{D002, 16, C, S...|2024-02-01 10:46:...|
|d8bdddde-f249-463...|      10006|        device|   CI00102|             

                                                                                

In [None]:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(10, False)

In [4]:
checkpointDir = '/home/jovyan/work/utils/checkpoint'

In [5]:
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", "devices") \
    .option("checkpointLocation", checkpointDir) \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select(col("parsed_value.*"))

In [None]:
query = lines \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode('Append') \
    .format('console') \
    .start()

query.awaitTermination()

24/02/01 10:51:40 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-78f26219-9cc5-469e-bd25-c41f02f366b3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/02/01 10:51:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----------+--------------+----------+----+---------+
|eventId|eventOffset|eventPublisher|customerId|data|eventTime|
+-------+-----------+--------------+----------+----+---------+
+-------+-----------+--------------+----------+----+---------+



24/02/01 10:51:46 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 5048 milliseconds
