In [10]:
from pyspark.sql import SparkSession
from processor import get_spark_session

spark:SparkSession = get_spark_session()

In [11]:
kafka_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "iot-sensor-data")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss",False)
    .load()
)

In [12]:
from pyspark.sql.types import StringType

iot_data_df = kafka_df.withColumn("value", kafka_df.value.cast(StringType())).select(
    "value"
)

In [13]:
iot_data_df.show()

+--------------------+
|               value|
+--------------------+
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
|{"device_id": "se...|
+--------------------+
only showing top 20 rows



In [14]:
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    TimestampType,
    DecimalType,
)

json_schema = StructType(
    [
        StructField("device_id", StringType()),
        StructField("temperature", DecimalType()),
        StructField("humidity", IntegerType()),
        StructField("pressure", DecimalType()),
        StructField("wind_speed", DecimalType()),
        StructField("rainfall", DecimalType()),
        StructField(
            "location",
            StructType(
                [
                    StructField("latitude", DecimalType()),
                    StructField("longitude", DecimalType()),
                ]
            ),
        ),
        StructField("device_status", StringType()),
        StructField("timestamp",TimestampType()),
    ]
)

In [15]:
from pyspark.sql.functions import from_json,col

iot_data_json_df = iot_data_df.withColumn('value',from_json('value',json_schema))

In [16]:
iot_data_json_df.show()

+--------------------+
|               value|
+--------------------+
|{sensor_54, 22, 1...|
|{sensor_49, 14, 5...|
|{sensor_48, 11, 3...|
|{sensor_8, 39, 28...|
|{sensor_33, 9, 58...|
|{sensor_97, 46, 3...|
|{sensor_91, 40, 6...|
|{sensor_79, 5, 27...|
|{sensor_61, 25, 8...|
|{sensor_5, 49, 89...|
|{sensor_7, 34, 19...|
|{sensor_2, 48, 20...|
|{sensor_61, 46, 2...|
|{sensor_55, 2, 83...|
|{sensor_63, 27, 7...|
|{sensor_83, 21, 4...|
|{sensor_81, 42, 1...|
|{sensor_22, 17, 5...|
|{sensor_32, 35, 1...|
|{sensor_42, 30, 8...|
+--------------------+
only showing top 20 rows



In [18]:
iot_data_parsed = iot_data_json_df.select(col("value.*"))

In [20]:
iot_data_parsed.show()

+---------+-----------+--------+--------+----------+--------+----------+-------------+-------------------+
|device_id|temperature|humidity|pressure|wind_speed|rainfall|  location|device_status|          timestamp|
+---------+-----------+--------+--------+----------+--------+----------+-------------+-------------------+
|sensor_54|         22|      16|     991|        28|     127|  {3, 159}|     inactive|2024-09-17 07:29:39|
|sensor_49|         14|      58|    1026|        14|     199| {-44, 54}|  maintenance|2024-07-14 11:10:45|
|sensor_48|         11|      36|     987|         4|      50| {70, 179}|  maintenance|2024-07-04 13:56:21|
| sensor_8|         39|      28|     972|        13|     110|{-57, -26}|     inactive|2024-12-17 13:00:50|
|sensor_33|          9|      58|     957|        28|      98| {83, 112}|     inactive|2024-02-25 08:40:34|
|sensor_97|         46|      35|    1039|        26|     159|   {17, 0}|  maintenance|2024-01-04 17:23:19|
|sensor_91|         40|      65|    1

In [22]:
iot_data_parsed.withColumn("longitude",col("location.longitude")).withColumn("latitude",col('location.latitude')).drop("location").show()

+---------+-----------+--------+--------+----------+--------+-------------+-------------------+---------+--------+
|device_id|temperature|humidity|pressure|wind_speed|rainfall|device_status|          timestamp|longitude|latitude|
+---------+-----------+--------+--------+----------+--------+-------------+-------------------+---------+--------+
|sensor_54|         22|      16|     991|        28|     127|     inactive|2024-09-17 07:29:39|      159|       3|
|sensor_49|         14|      58|    1026|        14|     199|  maintenance|2024-07-14 11:10:45|       54|     -44|
|sensor_48|         11|      36|     987|         4|      50|  maintenance|2024-07-04 13:56:21|      179|      70|
| sensor_8|         39|      28|     972|        13|     110|     inactive|2024-12-17 13:00:50|      -26|     -57|
|sensor_33|          9|      58|     957|        28|      98|     inactive|2024-02-25 08:40:34|      112|      83|
|sensor_97|         46|      35|    1039|        26|     159|  maintenance|2024-