kafka integration guide https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [1]:
from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.5.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.0'
]

spark = (
    SparkSession
    .builder
    .master("local[*]")
    .config('spark.jars.packages', ",".join(packages))
    .getOrCreate()
)

read streaming data from kafka as a batch

In [2]:
kafkaBatch = (
    spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker:29092") # kafka broker
        .option("subscribe", "web-events") # topic
        .option("startingOffsets", "earliest") # starting at offset 0 for all partitions
        .option("endingOffsets", '{"web-events":{"0":10}}') # 10 events from partition 0
        .load()
)
kafkaBatch.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


show the contents of the batched events from kafka, shows binary values for key and value

In [3]:
kafkaBatch.show(5)

+--------------------+--------------------+----------+---------+------+--------------------+-------------+
|                 key|               value|     topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+----------+---------+------+--------------------+-------------+
|[62 39 39 37 31 3...|[7B 22 65 76 65 6...|web-events|        0|     0|2024-03-09 16:02:...|            0|
|[64 62 64 61 39 3...|[7B 22 65 76 65 6...|web-events|        0|     1|2024-03-09 16:02:...|            0|
|[38 62 34 66 62 3...|[7B 22 65 76 65 6...|web-events|        0|     2|2024-03-09 16:02:...|            0|
|[38 32 39 38 66 6...|[7B 22 65 76 65 6...|web-events|        0|     3|2024-03-09 16:02:...|            0|
|[36 66 32 66 61 6...|[7B 22 65 76 65 6...|web-events|        0|     4|2024-03-09 16:02:...|            0|
+--------------------+--------------------+----------+---------+------+--------------------+-------------+


cast key and value to strings and show again

In [5]:
(
    kafkaBatch
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .show(5)
)

+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|b9971839-698d-499...|{"event_id": "b99...|
|dbda989f-71a8-464...|{"event_id": "dbd...|
|8b4fb671-1d16-435...|{"event_id": "8b4...|
|8298ff27-a3c7-493...|{"event_id": "829...|
|6f2fab81-e0ca-4c1...|{"event_id": "6f2...|
+--------------------+--------------------+


In [6]:
(
    kafkaBatch
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .show(1, truncate=False)
)

+------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------