In [1]:
import os

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.3'
# Download Kafka Jar file, this for readStream.format("kafka"), "kafka" is a driver
# kafka driver code is part of Maven Jar file
# https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.3
# pyspark-shell shall download the jar file behind..
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

In [2]:
# connect kafka with spark with simple word count example
# run on a terminal after starting kafka
# kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
# kafka-console-producer --bootstrap-server localhost:9092 --topic words


# here we implement windowed word count, the word count would reset 10 minute, we fix  10 minute window, sliding 5 minutes

# kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic word-counts-10min 
# kafka-console-consumer --bootstrap-server localhost:9092 --topic  word-counts-10min  --from-beginning  

In [2]:
import findspark
findspark.init()

In [4]:
import pyspark


from pyspark.sql import SparkSession
# spark groupBy has default setting for spark.sql.shuffle.partitions as 200
# we set to  4, should NOT be done in production 
spark = SparkSession.builder.master("local[1]")\
                            .config('spark.sql.shuffle.partitions', 4)\
                            .appName("SparkStreamingKafkaSlidingWindow").getOrCreate()

In [5]:
# read from kafka, here spark is consumer for kafka topic called test
# spark streaming works as dataframe/sql
# group.id is consumer group id
# subcribe is kafka topic
# "kafka" driver is not available by default with spark, we need to download it, we did on cell 1
kafkaDf = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", "words")\
  .option("group.id", "wordcount-group-10min-sliding-window")\
  .load()

In [6]:
kafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
linesDf = kafkaDf.selectExpr("CAST(value AS STRING)", "timestamp")
linesDf.printSchema() # we get only value as string

root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [8]:
import pyspark.sql.functions as F
# split line into word list
# flatten word list into individual element as output, similar to flatMap

wordsDf = linesDf.select(F.explode(F.split(linesDf.value," ")).alias("word"), "timestamp" )
wordsDf.printSchema()
# apply 10 Minute Window
# groupBy can be useful to mention more columns, we are 10 minute window as another group by
# within minute, the count start from 0, goes on based on number words
# Slide Window
wordCountsDf = wordsDf.groupBy("word", F.window(wordsDf.timestamp, "10 minutes", "5 minutes")).count()
wordCountsDf.printSchema()

root
 |-- word: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- word: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [9]:
echoOnconsole = wordCountsDf\
                .writeStream\
                .outputMode("complete")\
                .format("console")\
                .start() # start the query. spark will subscribe for data

22/03/14 21:51:39 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b940d8e5-d0e8-4638-840a-d73034abb0b4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [10]:
import pyspark.sql.functions as F

# convert all the columns into json
# * represent all columns ie word, count, 
# struct create a structure around word, count columns
# to json convert structure to column
# value is Kafka value part of message
wordCountsToKafkaDf = wordCountsDf\
                    .selectExpr("to_json(struct(*)) as value")

wordCountsToKafkaDf.printSchema()

root
 |-- value: string (nullable = true)



                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+------+-----+
|word|window|count|
+----+------+-----+
+----+------+-----+



In [11]:
wordCountsToKafkaDf.writeStream.format("kafka")\
                    .outputMode("complete")\
                     .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("topic", "word-counts-10min")\
                    .option("checkpointLocation", "file:///tmp/spark20")\
                    .start()
                    

<pyspark.sql.streaming.StreamingQuery at 0x7f9f78387450>

22/03/14 21:52:07 ERROR MicroBatchExecution: Query [id = 570b4fc9-742d-4149-bb56-ff2fc1c86500, runId = 46fe913e-5e29-4bee-8ae4-a8ec597f1106] terminated with error
java.lang.IllegalStateException: Set(test-0) are gone. Some data may have been missed.. 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".
    
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.reportDataLoss(KafkaMicroBatchStream.scala:209)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$planInputPartitions$1(KafkaMicroBatchStream.scala:104)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$planInputPartitions$1$adapted(KafkaMicroBatchStream.scala:104)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getOffsetRanges