In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

scala_version = '2.12'
spark_version = '3.4.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:2.13.0'
]

spark = SparkSession.builder.master("local").appName("kafka-example").config("spark.jars.packages",",".join(packages)).getOrCreate()
spark

In [2]:
topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

kafka_Df = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_server) \
        .option("subscribe", topic_name) \
        .option("startingOffsets", "earliest") \
        .load()

In [5]:
columns_to_select = [col for col in kafka_Df.columns if col != "timestamp"]
selected_df = kafka_Df.select(*columns_to_select)

# Convert the selected Spark DataFrame to a Pandas DataFrame
pandas_df = selected_df.toPandas()

# Show the Pandas DataFrame
pandas_df

Unnamed: 0,key,value,topic,partition,offset,timestampType
0,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,0,0
1,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,1,0
2,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2,0
3,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,3,0
4,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,4,0
...,...,...,...,...,...,...
2111,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2111,0
2112,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2112,0
2113,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2113,0
2114,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,2114,0


In [19]:
from pyspark.sql.functions import col

batchDF = kafka_Df.select(col('topic'),col('offset'),col('value').cast('string').substr(12,1).alias('rand_number'))
from time import sleep
from IPython.display import display, clear_output

for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        display(batchDF.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 160


Unnamed: 0,topic,offset,rand_number
0,RandomNumber,0,0
1,RandomNumber,1,1
2,RandomNumber,2,2
3,RandomNumber,3,3
4,RandomNumber,4,4
...,...,...,...
1688,RandomNumber,1688,2
1689,RandomNumber,1689,2
1690,RandomNumber,1690,2
1691,RandomNumber,1691,2


break
Live view ended...


In [20]:
batchCountDF = batchDF.groupBy('rand_number').count()
for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        display(batchDF.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 25


Unnamed: 0,topic,offset,rand_number
0,RandomNumber,0,0
1,RandomNumber,1,1
2,RandomNumber,2,2
3,RandomNumber,3,3
4,RandomNumber,4,4
...,...,...,...
1697,RandomNumber,1697,2
1698,RandomNumber,1698,2
1699,RandomNumber,1699,2
1700,RandomNumber,1700,2


break
Live view ended...


In [6]:
from pyspark.sql.functions import col

topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

streamRawDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafka_server).option("subscribe", topic_name).load()
streamDF = streamRawDf.select(col('topic'),col('offset'),col('value').cast('string').substr(12,1).alias('rand_number'))
checkEvenDF = streamDF.withColumn('Is_Even',col('rand_number').cast('int') % 2 == 0 )

In [7]:
from random import randint
randNum=str(randint(0,10000))
q1name = "queryNumber"+randNum
q2name = "queryCheckEven"+randNum
stream_writer1 = (streamDF.writeStream.queryName(q1name).trigger(processingTime="5 seconds").outputMode("append").format("memory"))
stream_writer2 = (checkEvenDF.writeStream.queryName(q2name).trigger(processingTime="5 seconds").outputMode("append").format("memory"))
query1 = stream_writer1.start()
query2 = stream_writer2.start()

In [8]:
from time import sleep
from IPython.display import display, clear_output

for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x*5}")
        result1 = spark.sql(f"SELECT * from {query1.name}")
        result2 = spark.sql(f"SELECT * from {query2.name}")
        display(result1.toPandas())
        display(result2.toPandas())
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 5 seconds
Seconds passed: 670


Unnamed: 0,topic,offset,rand_number
0,RandomNumber,2124,2
1,RandomNumber,2125,2
2,RandomNumber,2126,2
3,RandomNumber,2127,2
4,RandomNumber,2128,2
...,...,...,...
143,RandomNumber,2267,1
144,RandomNumber,2268,1
145,RandomNumber,2269,1
146,RandomNumber,2270,1


Unnamed: 0,topic,offset,rand_number,Is_Even
0,RandomNumber,2124,2,True
1,RandomNumber,2125,2,True
2,RandomNumber,2126,2,True
3,RandomNumber,2127,2,True
4,RandomNumber,2128,2,True
...,...,...,...,...
142,RandomNumber,2266,1,False
143,RandomNumber,2267,1,False
144,RandomNumber,2268,1,False
145,RandomNumber,2269,1,False


break
Live view ended...
