In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the SparkSession shared by every cell below.
spark = (
    SparkSession.builder
    # Run Spark locally inside this kernel.
    .master("local[*]")
    .appName("Streaming from Kafka")
    # Kafka source connector for Structured Streaming, fetched as a package at startup.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2")
    # Keep the shuffle partition count small for this local session.
    .config("spark.sql.shuffle.partitions", 4)
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

# Bare expression so the notebook renders the session summary.
spark

In [2]:
# Define the streaming source: subscribe to the "test_topic" Kafka topic on the
# broker at kafka:29092, starting from the earliest available offsets.
# This only declares the stream; nothing is read until a writeStream query starts.
kafka_df = spark.readStream.format("kafka") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", "test_topic") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .load()

In [3]:
# Print the schema of the raw Kafka source DataFrame (key/value arrive as binary).
kafka_df.printSchema()
# Note: kafka_df.show() cannot be used here. kafka_df comes from spark.readStream,
# and a streaming DataFrame can only be inspected through a started writeStream query.


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Stream the Kafka payloads (value cast from binary to string) into an in-memory
# table named "kafka_messages" so they can be queried with Spark SQL.

# Query names must be unique among active queries, so re-running this cell while
# the previous run is still streaming would raise. Stop the old query first to
# keep the cell idempotent.
for running_query in spark.streams.active:
    if running_query.name == "kafka_messages":
        running_query.stop()

# Keep the StreamingQuery handle so the query can later be inspected
# (.status, .lastProgress, .exception()) or stopped (.stop()) by name.
kafka_messages_query = (
    kafka_df
    .selectExpr("CAST(value AS STRING) as message")
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("kafka_messages")
    .start()
)

# Bare expression so the cell still displays the StreamingQuery object.
kafka_messages_query

<pyspark.sql.streaming.StreamingQuery at 0xffff88144ca0>

In [8]:
# List the streaming queries currently active on this SparkSession.
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0xffff8215a320>]

In [9]:
# Show the rows collected so far in the "kafka_messages" in-memory table,
# i.e. the table populated by the memory-sink streaming query of the same name.
spark.table("kafka_messages").show()

+-------+
|message|
+-------+
|    HHJ|
|      \|
|      H|
|  GJGUG|
|   jhvj|
|   jgbk|
|   vhgv|
|nzkvna;|
|   efvf|
|   edae|
|      q|
|  rqvqe|
|     w2|
+-------+

