In [1]:
# get data from mqtt and send it to kafka
# pip install paho-mqtt

import paho.mqtt.client as mqtt
import json
from datetime import datetime
from kafka import KafkaProducer

mqtt_client = mqtt.Client()
producer = KafkaProducer(bootstrap_servers=["localhost:9092"], value_serializer=lambda x:json.dumps(x).encode("utf-8"))

def on_connect(client, userdata, flags, rc):
    if(str(rc) == '0'):
        print("mqtt connection successful...")
        mqtt_client.subscribe("sipl/#")
    else:
        print("mqtt connection failed...")

def on_message(client, userdata, msg):
    try:
        payload = msg.payload.decode("utf-8")
        data = json.loads(payload)
        data['topic'] = msg.topic
        data['time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(".", end="")
        producer.send(topic="iot", value=data)
    except Exception as ex:
        print(ex)

mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(host="test.mosquitto.org")
mqtt_client.loop_start()

mqtt connection successful...
..........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [2]:
# create spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder\
            .appName("miniproject")\
            .config("spark.sql.shuffle.partitions", "4")\
            .getOrCreate()

24/12/26 12:19:01 WARN Utils: Your hostname, nilesh-pc resolves to a loopback address: 127.0.1.1; using 192.168.1.101 instead (on interface wlp0s20f3)
24/12/26 12:19:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/26 12:19:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# read data from kafka into spark appln
kafka_pub_topic = "iot"
data = spark.readStream\
            .format("kafka")\
            .option("kafka.bootstrap.servers", "localhost:9092")\
            .option("subscribe", kafka_pub_topic)\
            .option("failOnDataLoss", "false")\
            .option("startingOffsets", "latest")\
            .load()

data.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# process the data
# schema = "topic STRING, time TIMESTAMP"
from pyspark.sql.types import *
schema = StructType([
                StructField('topic', StringType(), True),
                StructField('time', TimestampType(), True),
                StructField('accelerometerSensor', StructType([StructField('x', StringType(), True), StructField('y', StringType(), True), StructField('z', StringType(), True)]), True),
                StructField('compassSensor', StructType([StructField('x', StringType(), True), StructField('y', StringType(), True), StructField('z', StringType(), True)]), True), 
                StructField('gpsSensor', StructType([StructField('alt', StringType(), True), StructField('lat', StringType(), True), StructField('lon', StringType(), True)]), True), 
                StructField('gyroscopeSensor', StructType([StructField('x', StringType(), True), StructField('y', StringType(), True), StructField('z', StringType(), True)]), True), 
                StructField('lightSensor', StructType([StructField('illuminance', StringType(), True)]), True)
        ])

result = data\
                .select(col("value").cast("STRING"))\
                .select(from_json("value", schema=schema).alias("v"))\
                .withColumn("etime", col("v.time"))\
                .withColumn("topic", col("v.topic"))\
                .withColumn("light", col("v.lightSensor.illuminance").cast("DOUBLE"))\
                .drop("v")\
                .withWatermark("etime", "30 seconds")\
                .groupBy(col("topic"), window("etime", windowDuration="20 seconds", slideDuration="5 seconds"))\
                .avg("light")\
                .withColumnRenamed("avg(light)", "avgreading")\
                .select("topic", "window.start", "window.end", "avgreading")

result.printSchema()
                

root
 |-- topic: string (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- avgreading: double (nullable = true)



In [5]:
query1 = result.orderBy(desc("start")).limit(20)\
            .writeStream\
            .trigger(processingTime="10 seconds")\
            .format("memory")\
            .queryName("avgsensor")\
            .outputMode("complete")\
            .start()

24/12/26 12:19:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0b2e065f-9eff-4130-b00f-4be3c1bf2d20. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/26 12:19:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/12/26 12:19:07 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 3:>                                                          (0 + 2) / 2]

In [6]:
query2 = result.select(struct("topic", "start", "end", "avgreading").alias("v"))\
            .select(to_json("v").alias("value"))\
            .writeStream\
            .trigger(processingTime="10 seconds")\
            .format("kafka")\
            .option("kafka.bootstrap.servers", "localhost:9092")\
            .option("topic", "avgiot")\
            .option("checkpointLocation", "file:///tmp/iot-ckpt")\
            .outputMode("append")\
            .start()

24/12/26 12:19:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/12/26 12:19:13 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/12/26 12:19:14 WARN HDFSBackedStateStoreProvider: The state for version 168 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/12/26 12:19:14 WARN HDFSBackedStateStoreProvider: The state for version 168 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/12/26 12:19:14 WARN HDFSBackedStateStoreProvider: The state for version 168 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/12/26 12:19:14 WARN HDFSBackedStateStoreProvider: The state for version 168 doesn't exist in loadedMaps. Reading snapshot file and del

In [7]:
%load_ext sparksql_magic

In [8]:
%%sparksql

SELECT * FROM avgsensor;

0,1,2,3
topic,start,end,avgreading
sipl/rohan,2024-12-26 12:19:10,2024-12-26 12:19:30,68.0
sipl/nilesh,2024-12-26 12:19:10,2024-12-26 12:19:30,95.5
sipl/rohan,2024-12-26 12:19:05,2024-12-26 12:19:25,68.25
sipl/nilesh,2024-12-26 12:19:05,2024-12-26 12:19:25,95.5
sipl/rohan,2024-12-26 12:19:00,2024-12-26 12:19:20,68.25
sipl/nilesh,2024-12-26 12:19:00,2024-12-26 12:19:20,95.5
sipl/rohan,2024-12-26 12:18:55,2024-12-26 12:19:15,68.25
sipl/nilesh,2024-12-26 12:18:55,2024-12-26 12:19:15,95.5
sipl/nilesh,2024-12-26 12:18:50,2024-12-26 12:19:10,95.5


                                                                                

In [9]:
# plot spark dataframe
import pyspark.pandas as ps

df = spark.sql("SELECT * FROM avgsensor")\
        .groupBy("topic", "start").pivot("topic").avg("avgreading")\
        .orderBy("start")\
        .drop("topic", "end")

# df.printSchema()

pdf = ps.DataFrame(df).set_index("start")
print(type(pdf))
pdf.plot.line()



<class 'pyspark.pandas.frame.DataFrame'>


                                                                                

In [None]:
# wait for query execution (not needed in notebook)
spark.streams.awaitAnyTermination()

In [None]:
# to stop mqtt client
mqtt_client.loop_stop()

In [21]:
# stop kakfa producer
producer.close()

In [22]:
# to destroy spark session
spark.stop()