In [1]:
! pip install spark


Collecting spark
  Using cached spark-0.2.1-py3-none-any.whl
Installing collected packages: spark
Successfully installed spark-0.2.1


In [2]:
!pip install kafka-python

Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl.metadata (7.8 kB)
Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [3]:
!pip install findspark



In [4]:
!pip install Pandas

Collecting Pandas
  Using cached pandas-2.2.2-cp311-cp311-win_amd64.whl.metadata (19 kB)
Collecting numpy>=1.23.2 (from Pandas)
  Using cached numpy-1.26.4-cp311-cp311-win_amd64.whl.metadata (61 kB)
Collecting pytz>=2020.1 (from Pandas)
  Using cached pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from Pandas)
  Using cached tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached pandas-2.2.2-cp311-cp311-win_amd64.whl (11.6 MB)
Using cached numpy-1.26.4-cp311-cp311-win_amd64.whl (15.8 MB)
Using cached pytz-2024.1-py2.py3-none-any.whl (505 kB)
Using cached tzdata-2024.1-py2.py3-none-any.whl (345 kB)
Installing collected packages: pytz, tzdata, numpy, Pandas
Successfully installed Pandas-2.2.2 numpy-1.26.4 pytz-2024.1 tzdata-2024.1


In [15]:
import findspark

# Must run before the pyspark import below so the Spark installation is on sys.path.
findspark.init()

from pyspark.sql import SparkSession

scala_version = '2.12'  # your scala version
spark_version = '3.5.0'  # your spark version

# Maven coordinates handed to `spark.jars.packages`: the Spark/Kafka SQL
# connector built for the Scala/Spark versions above, plus the Kafka client.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.0',  # your kafka version
]

# Local-mode session named "kafka-example"; getOrCreate() reuses an existing
# session if one is already active in this kernel.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)
spark

**Creating a Kafka Source for Batch Queries**

Create dataframe from Kafka data

In [16]:
topic_name = 'RandomNumber'
kafka_server = 'localhost:9092'

# Batch (non-streaming) read of the topic, starting from the earliest offset.
kafkaDf = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

Show data (converting dataframe to pandas for cleaner view of data)

In [17]:
# Convert the whole batch DataFrame to pandas so the notebook renders it as a table.
# NOTE(review): this pulls every record read from the topic into the driver — fine for a small demo topic.
kafkaDf.toPandas()

Unnamed: 0,key,value,topic,partition,offset,timestamp,timestampType
0,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,92,2024-05-09 13:00:51.329,0
1,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,93,2024-05-09 13:00:56.332,0
2,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,94,2024-05-09 13:01:01.364,0
3,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,95,2024-05-09 13:01:06.374,0
4,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,96,2024-05-09 13:01:11.466,0
...,...,...,...,...,...,...,...
191,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,283,2024-05-09 13:16:46.819,0
192,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,284,2024-05-09 13:16:51.820,0
193,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,285,2024-05-09 13:16:56.821,0
194,,"[123, 34, 110, 117, 109, 98, 101, 114, 34, 58,...",RandomNumber,0,286,2024-05-09 13:17:01.822,0


Show a live view of the data by re-running the batch query in a for loop

In [None]:
from time import sleep

from IPython.display import clear_output, display
from pyspark.sql.functions import col

# Keep the topic and offset, plus the single character at (1-based) position 12
# of the message value decoded as a string.
# NOTE(review): presumably the payload looks like {"number": N} and this picks
# the first digit of N — confirm against the producer.
batchDF = kafkaDf.select(
    col('topic'),
    col('offset'),
    col('value').cast('string').substr(12, 1).alias('rand_number'),
)

# Re-run the batch query and redraw the table until the kernel is interrupted
# (or 2000 refreshes have happened).
for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x * 5}")
        display(batchDF.toPandas())
        sleep(5)
        # wait=True defers clearing until the next output arrives, avoiding flicker.
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        # Leave the loop only on interrupt; a `break` at loop-body level would
        # end the live view after the very first refresh.
        break
print("Live view ended...")

Perform some data aggregation and show live results

In [None]:
# Number of messages seen per extracted `rand_number` value.
batchCountDF = batchDF.groupBy('rand_number').count()

# Re-run the aggregation and redraw the table until the kernel is interrupted
# (or 2000 refreshes have happened).
for x in range(0, 2000):
    try:
        print("Showing live view refreshed every 5 seconds")
        print(f"Seconds passed: {x * 5}")
        display(batchCountDF.toPandas())
        # Sleep for the same 5 seconds that the message and the counter above assume.
        sleep(5)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        # Leave the loop only on interrupt; a `break` at loop-body level would
        # end the live view after the very first refresh.
        break
print("Live view ended...")

**Creating a Kafka Source for Streaming Queries**

Create Streaming dataframe from Kafka

In [19]:
# Streaming read of the same topic from the same broker as the batch example.
streamRawDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .load()
)

# Keep the topic and offset, plus the single character at (1-based) position 12
# of the message value decoded as a string.
streamDF = streamRawDf.select(
    col('topic'),
    col('offset'),
    col('value').cast('string').substr(12, 1).alias('rand_number'),
)

# Add a boolean column telling whether the extracted value, cast to int, is even.
checkEvenDF = streamDF.withColumn(
    'Is_Even',
    (col('rand_number').cast('int') % 2) == 0,
)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka

Write stream

In [20]:
from random import randint

# Random suffix for the query names.
# NOTE(review): presumably this avoids clashing with a query name left over
# from an earlier run of this cell — confirm.
randNum = str(randint(0, 10000))
q1name = f"queryNumber{randNum}"
q2name = f"queryCheckEven{randNum}"

# Both writers append to an in-memory table named after the query,
# triggering every 5 seconds.
stream_writer1 = (
    streamDF.writeStream
    .queryName(q1name)
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
)
stream_writer2 = (
    checkEvenDF.writeStream
    .queryName(q2name)
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
)

# Start both streaming queries; the returned handles expose `.name` for SQL lookups.
query1 = stream_writer1.start()
query2 = stream_writer2.start()

View streaming result

In [22]:
# Query the two in-memory result tables every 5 seconds and redraw them until
# the kernel is interrupted (or 2000 refreshes have happened).
for x in range(2000):
    try:
        print("showing live view refreshed every 5 seconds")
        print(f"Seconds passed : {x * 5}")
        result1 = spark.sql(f"SELECT * from {query1.name}")
        result2 = spark.sql(f"SELECT * from {query2.name}")
        # Render the tables one after another, first query first.
        for snapshot in (result1, result2):
            display(snapshot.toPandas())
        sleep(5)
        # wait=True defers clearing until the next output arrives.
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("Caught Keyboard Interrupt")
        break
print("Closing")

showing live view refreshed every 5 seconds
Seconds passed : 95


Unnamed: 0,topic,offset,rand_number


Unnamed: 0,topic,offset,rand_number,Is_Even


Caught Keyboard Interrupt
Closing
