In [1]:
import os

# Versions that make up the Maven coordinate of the Spark <-> Kafka connector.
SCALA_VERSION = '2.11'
SPARK_VERSION = '2.4.7'

# Submit arguments: pull the Kafka connector through --packages,
# then hand control to the pyspark shell.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}',
    'pyspark-shell',
])

import findspark
# Run this cell (Shift + Enter) before the cells below that import pyspark.
findspark.init() # Shift + Enter

In [None]:
# Open a command prompt and create the input ("words") and output ("word-counts") topics:
#     kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
#     kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic word-counts

# Open a second command prompt and start a console producer to type lines into the "words" topic:

#     kafka-console-producer --bootstrap-server localhost:9092 --topic words

# Open a third command prompt and start a console consumer to watch the results on the "word-counts" topic:

#     kafka-console-consumer --bootstrap-server localhost:9092 --topic  word-counts  --from-beginning

In [2]:
import pyspark

from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook: local master,
# 4 shuffle partitions, application name "SparkWordCountStreamKafka".
spark = (
    SparkSession.builder
    .master("local[*]")
    .config('spark.sql.shuffle.partitions', 4)
    .appName("SparkWordCountStreamKafka")
    .getOrCreate()
)

In [3]:
# Read from Kafka: here Spark is the consumer of the Kafka topic "words".
# Structured Streaming exposes the stream as a DataFrame / SQL table.
# "subscribe" is the Kafka topic to read from.
# The "kafka" source is not bundled with Spark; it is downloaded through the
# --packages argument set in cell 1.
#
# NOTE: no consumer group id is configured here. Only options prefixed with
# "kafka." are forwarded to the Kafka consumer, so an un-prefixed "group.id"
# option would be silently ignored. With the Spark 2.4 connector used by this
# notebook, the Kafka source generates a unique group id for each query.

kafkaDf = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "words")
    .load()
)

In [4]:
# key and value are binary columns; they need to be CAST to STRING before use
kafkaDf.printSchema()
# timestampType is an integer column (see the schema printed below); the Kafka timestamp types are:
#   CreateTime    : the timestamp is the message creation time, as set by the Kafka client/producer
#   LogAppendTime : the timestamp is the time the message was appended to the Kafka log
#   UnknownType

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Kafka delivers key and value as binary.
# Keep only the value column, cast to STRING; every other Kafka column is dropped.
linesDf = kafkaDf.selectExpr(
    "CAST(value AS STRING)",
)

# The schema now holds a single string column named "value".
linesDf.printSchema()

root
 |-- value: string (nullable = true)



In [6]:
import pyspark.sql.functions as F
# Split each line into a list of words, then flatten the list so that every
# word becomes its own row (similar to flatMap).
# The split pattern is a run of whitespace rather than a single " ": splitting
# on one space yields empty-string tokens for repeated, leading or trailing
# spaces, and those would be counted as a word. Any remaining empty token
# (e.g. from leading whitespace) is filtered out.

wordsDf = (
    linesDf
    .select(F.explode(F.split(linesDf.value, r"\s+")).alias("word"))
    .where(F.col("word") != "")
)

# Running count per distinct word.
wordCountsDf = wordsDf.groupBy("word").count()
wordCountsDf.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



In [7]:
# df.show() does not work on a streaming DataFrame; to print results we
# write the stream to the "console" sink instead.
# Everything above is lazily evaluated: nothing runs until start() is called.
# "complete" output mode prints the whole result so far whenever a word is
# added or updated.
# The console sink prints to the Jupyter console window.
# checkpointLocation is internal to Spark: after a failure, Spark can resume
# from the state saved there.
echoOnconsole = (
    wordCountsDf.writeStream
    .format("console")
    .outputMode("complete")
    .option("checkpointLocation", "file:///tmp/spark3")
    .start()  # start the query; Spark will subscribe for data
)

In [8]:
# now publish the word count result (word, count columns) to kafka topic "word-counts", publish as json format
# {"word": "kafka", "count": 8}

# F is alias for all functions, we can access col by F.col 
import pyspark.sql.functions as F

# Serialise every column into one JSON string:
#   *       -> all columns, i.e. word and count
#   struct  -> wraps the word and count columns in a single struct
#   to_json -> converts that struct to a JSON string
# The result is named "value" because that is the value part of a Kafka message.
wordCountsToKafkaDf = wordCountsDf.selectExpr(
    "to_json(struct(*)) as value"
)

wordCountsToKafkaDf.printSchema()

root
 |-- value: string (nullable = true)



In [9]:
# checkpointLocation stores local state so the query can recover after a
# system restart or a failure in between.
# Make sure a Kafka console consumer is running for the topic "word-counts";
# the commands are at the top of the file.
# The StreamingQuery handle is kept in a variable (as is done for the console
# query, echoOnconsole) so the query can be inspected or stopped later.
wordCountsToKafkaQuery = (
    wordCountsToKafkaDf.writeStream.format("kafka")
    .outputMode("complete")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "word-counts")
    .option("checkpointLocation", "file:///tmp/spark")
    .start()
)
wordCountsToKafkaQuery  # last expression: the handle is still shown as the cell output

<pyspark.sql.streaming.StreamingQuery at 0x272e0477548>