In [76]:
from pyspark.sql import SparkSession

# Scala / Spark versions used to pick the matching Kafka connector artifact.
scala_version, spark_version = '2.12', '3.5.0'

# Maven-style coordinates handed to Spark through `spark.jars.packages`.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.0',
]

# Get (or reuse) a session against the standalone master, with the Kafka
# packages supplied as a single comma-separated string.
spark = (
    SparkSession.builder
    .config('spark.jars.packages', ",".join(packages))
    .master("spark://spark-master:7077")
    .getOrCreate()
)

In [77]:
# Streaming DataFrame over the `web-events` Kafka topic, read from the
# earliest available offsets. All source options are passed in one call.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "web-events",
        "startingOffsets": "earliest",
    })
    .load()
)

In [78]:
# Print the schema of the Kafka source frame; note that `key` and `value`
# arrive as binary columns and need a cast before they are readable.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


In [79]:
# Project the binary Kafka `key` / `value` columns as strings.
# The result is only displayed here, not stored.
df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)

DataFrame[key: string, value: string]

In [80]:
# Start a streaming query that casts the Kafka key/value to strings and,
# for every micro-batch, prints the batch id.
# The query handle is kept so the stream can be stopped explicitly; if it is
# discarded, the query keeps running in the background after the timed wait
# below returns.
query = (
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .outputMode("append")
        .foreachBatch(lambda batch_df, batch_id: print(f"batch_id: {batch_id}"))
        .start()
)

try:
    # Wait up to 5 seconds; the result is False when the query is still
    # running at the end of the timeout.
    terminated = query.awaitTermination(5)
finally:
    # Always shut the stream down, including when the wait raises, so no
    # micro-batches keep firing after this cell finishes.
    query.stop()

# Show the wait result as the cell output, as the original expression did.
terminated

batch_id: 0
batch_id: 1
batch_id: 2
batch_id: 3
batch_id: 4
batch_id: 5
batch_id: 6
batch_id: 7
batch_id: 8
batch_id: 9
batch_id: 10
batch_id: 11
batch_id: 12
batch_id: 13
batch_id: 14
batch_id: 15
batch_id: 16
batch_id: 17
batch_id: 18
batch_id: 19
batch_id: 20
batch_id: 21
batch_id: 22


False

In [81]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()