<h1><center>DS420: PE6 Sparking Streaming with Kafka Source

## Goal:

In this programming exercise, we are going to repeat the word count example from the last lecture, but using Kafka source. 

In addition, we will be working with windowed streaming, and use Pandas to visualize the data. 

In [50]:
import findspark
findspark.init('/opt/spark')

## Create a Spark session, and named it as PE6_xxx, where xxx is your last name

In [51]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *


In [52]:
# Just change the appName but don't modify the configurations!

spark = SparkSession.builder.appName('sparkStreaming_PE6_moynihan')          \
                    .config('spark.jars.packages','org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1')\
                    .config('spark.jars.packages','org.apache.kafka:kafka-clients:2.4.1')\
                    .getOrCreate()

## Read data from kafka into a streaming DataFrame. Ensure to supply the bootstrap server and the channel, and include the timestamp as you read it in. 

Help link 1: [Kafka integration](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
Help link 2: [Windowed streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time)


In [76]:
# Read data from kafka into a streaming DataFrame
# Ensure to supply the bootstrap server and the channel;
# and include the timestamp as you read it. 

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "ds420-moynihan") \
  .option('includeTimestamp','True')\
  .load()




In [77]:
# Check if dataframe is streaming type
df.isStreaming


True

In [78]:
# Check out the schema
df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## Select only `timestamp`, `key` and `value` columns. Cast `key` and `value` columns into String type.

selectExpr() it takes set of SQL expressions in a string to execute. This gives an ability to run SQL like expressions without creating a temporary table and views.

Checkout the `selectExpr()` function in this [code example](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing)

In [79]:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", 'timestamp')



In [80]:
# Checkout schema again
df.printSchema()



root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



## Use a query to count the occurences of each word

In [81]:
# Split the lines into words
from pyspark.sql.functions import explode, split
words = df.select(explode(split(df.value, " ")).alias('words'))


In [82]:
wordCounts = words.groupBy('words').count()
#start the query

query = (wordCounts.writeStream
         .outputMode('complete')
         .format('memory')
         .queryName('wordCountsQuery')
         .start())



22/04/25 12:15:27 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-83eb14b7-8682-4872-9f41-8012654debf9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


## Now invoke a kafka producer and start to type words on it. In the meanwhile, run the following query within a for loop to see the results.

In [83]:
import time


for x in range(10):
    df = spark.sql('SELECT * FROM wordCountsQuery')
    df.show(10)
    time.sleep(5)




+-----+-----+
|words|count|
+-----+-----+
+-----+-----+



                                                                                

+-----+-----+
|words|count|
+-----+-----+
+-----+-----+



                                                                                

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+

+-----+-----+
|words|count|
+-----+-----+
|hello|    1|
+-----+-----+



## Write a windowed query to count the words. 

In [60]:
from pyspark.sql import Window

In [61]:
wordsWindow = df.select(explode(split(df["value"], " ")).alias('word'), df['timestamp'])




In [62]:
wordsWindow.printSchema()

root
 |-- word: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [63]:
# window(df.timestamp, window_size, slide_size)
# window_size must > slide_size

from pyspark.sql.functions import window
windowedCounts = wordsWindow.groupBy(
    window(wordsWindow['timestamp'],'1 minutes','30 seconds'),
    wordsWindow['word']
).count().orderBy('window')




In [65]:
query2 = (windowedCounts.writeStream.outputMode('complete').
          format('memory').
          option('truncate','fakse').
          queryName('WindowedWordCountQuery').
          start())




22/04/25 11:58:59 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-17d2390e-148d-4cce-a261-9bfe2314110e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/04/25 11:58:59 WARN ClientUtils: Removing server host1:9092 from bootstrap.servers as DNS resolution failed for host1
22/04/25 11:58:59 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.

22/04/25 11:59:01 WARN ClientUtils: Removing server host1:9092 from bootstrap.servers as DNS resolution failed for host1
22/04/25 11:59:01 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: 
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
	at scala.runtime.java8.JFunction0$mcV$sp.appl

## Again, run the following query within a for loop to see the results.

In [66]:
import time

for i in range(20):
    df = spark.sql('SELECT * FROM WindowedWordCountQuery')
    df.show(20, False)
    time.sleep(5)



+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

+------+----+-----+
|window|

## Visualize the word counts in action by using a barplot from seaborn.

In [67]:
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
# Only works for Jupyter Notebooks!
%matplotlib inline 

In [73]:
for i in range(10):
    
    time.sleep( 5 )
    df = spark.sql('SELECT * FROM wordCountsQuery')
    df_pd = df.toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize = (10,8))
    sns.barplot(data = df_pd, x='count',y='word')
    plt.show()

ValueError: Could not interpret input 'word'

<Figure size 720x576 with 0 Axes>

## Stop all queries

In [84]:
# Stop query
query.stop()

In [85]:
# Stop query2
query2.stop()