In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode

# Streaming word count: read text messages from a Kafka topic, keep a running
# count per word, and publish the full count table as Parquet on every trigger.

# Create (or reuse) the Spark session for this notebook.
spark = SparkSession.builder \
    .appName("ExamplePySparkNotebook") \
    .getOrCreate()

# Subscribe to the Kafka topic as an unbounded streaming DataFrame.
input_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "input_topic") \
    .load()

# The Kafka source exposes `value` as binary, so cast it to a string
# explicitly instead of relying on implicit coercion inside split().
# Each message is split on single spaces and exploded to one row per word.
words = input_stream.select(
    explode(split(input_stream.value.cast("string"), " ")).alias("word")
)

# Running count per distinct word across the whole stream.
word_count = words.groupBy("word").count()


def _overwrite_word_counts(batch_df, batch_id):
    """Replace the Parquet output with the latest full word-count table.

    Called by ``foreachBatch`` once per trigger. Because the query runs in
    "complete" output mode, ``batch_df`` holds the entire result table, so
    overwriting the directory leaves exactly one up-to-date snapshot.

    Args:
        batch_df: Non-streaming DataFrame with the current counts.
        batch_id: Micro-batch identifier supplied by Spark (unused).
    """
    # NOTE(review): readers may observe the directory mid-overwrite; confirm
    # that downstream consumers tolerate this.
    batch_df.write.mode("overwrite").parquet("/hdfs_output/word_count")


# The built-in parquet file sink only accepts "append" output mode, which an
# un-watermarked aggregation cannot produce; writing via foreachBatch keeps
# "complete" semantics while still landing the result as Parquet.
query = word_count.writeStream \
    .outputMode("complete") \
    .foreachBatch(_overwrite_word_counts) \
    .option("checkpointLocation", "/hdfs_output/checkpoints") \
    .start()

# Block the notebook cell until the streaming query stops or fails.
query.awaitTermination()