In [3]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (
    SparkSession.builder
    .appName("kafka-streaming")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5"
    )
    .getOrCreate()
)


sc = spark.sparkContext

In [4]:
# Read the message from the kafka stream
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9092") \
  .option("subscribe", "ingestion-topic") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

In [5]:
# Create a small temporary view for SparkSQL
df.createOrReplaceTempView("message")

In [6]:
# Write out the message to the console of the environment
res = spark.sql("SELECT * from message")
res.writeStream.format("console") \
            .outputMode("append") \
            .start() 

<pyspark.sql.streaming.StreamingQuery at 0x7ffff4968390>

In [None]:
# Write the message back into Kafka in another topic that you are going to listen to with a local consumer
# ds = df \
#   .writeStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "kafka:9092") \
#   .option("topic", "spark-output") \
#   .option("checkpointLocation", "/tmp") \
#   .start() \
#   .awaitTermination()

In [8]:
from pyspark.sql.functions import to_json, struct

df_out = df.select(
    to_json(struct("*")).cast("string").alias("value")
)


In [9]:
ds = (
    df_out
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "spark-output")
    .option("checkpointLocation", "/tmp/spark-output-checkpoint")
    .start()
)
