In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [52]:
spark = SparkSession.builder.appName("PySparkPractice").getOrCreate()

In [53]:
df = spark.read.option("header", True).option("inferSchema", True).csv("iot_sensor_data.csv")

In [54]:
df = df.withColumn("timestamp", to_timestamp(col("timestamp"))).withColumn("date", date_format("timestamp", "yyyy-MM-dd"))

In [55]:
df.orderBy("sensor_id").show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 02:55:00|       22.7|    73.9|         56.3|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 01:25:00|       26.4|    62.8|         83.7|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 03:40:00|       20.2|    60.4|         63.9|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 03:45:00|       28.4|    53.0|         73.8|    OK|2025-08-25|
|  S_W10_1|         W10|2025-08-25 04:30:00|       25.3|    55.0|         84.9|    OK|2025-08-25|
|  S_W10_1|         

In [56]:
w = Window.partitionBy("sensor_id", "warehouse_id").orderBy("timestamp")
df = df.withColumn("gap", (col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long"))/60)

In [57]:
df.show()

+---------+------------+-------------------+-----------+--------+-------------+------+----------+----+
|sensor_id|warehouse_id|          timestamp|temperature|humidity|battery_level|status|      date| gap|
+---------+------------+-------------------+-----------+--------+-------------+------+----------+----+
|  S_W10_1|         W10|2025-08-25 00:20:00|       26.0|    59.8|         82.5|    OK|2025-08-25|NULL|
|  S_W10_1|         W10|2025-08-25 00:35:00|       25.4|    60.1|         60.5|    OK|2025-08-25|15.0|
|  S_W10_1|         W10|2025-08-25 00:40:00|       24.5|    62.5|         92.1|    OK|2025-08-25| 5.0|
|  S_W10_1|         W10|2025-08-25 00:50:00|       29.0|    60.9|         76.0|    OK|2025-08-25|10.0|
|  S_W10_1|         W10|2025-08-25 01:10:00|       28.2|    56.8|         84.0|    OK|2025-08-25|20.0|
|  S_W10_1|         W10|2025-08-25 01:25:00|       26.4|    62.8|         83.7|    OK|2025-08-25|15.0|
|  S_W10_1|         W10|2025-08-25 01:45:00|       24.7|    54.3|        

In [58]:
daily_metrics = df.groupBy("date", "warehouse_id").agg(
    avg(col("temperature")).alias("avg_temp"),
    min(col("temperature")).alias("min_temp"),
    max(col("temperature")).alias("max_temp"),
    avg(col("humidity")).alias("avg_humidity"),
    min(col("humidity")).alias("min_humidity"),
    max(col("humidity")).alias("max_humidity"),
    (sum(when(col("status") == "MALFUNCTION", 1).otherwise(0)) / count("*") * 100).alias("malfunction_pct"),
    avg(col("battery_level")).alias("avg_battery")
)

daily_metrics.show(truncate=False)

+----------+------------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+
|date      |warehouse_id|avg_temp          |min_temp|max_temp|avg_humidity      |min_humidity|max_humidity|malfunction_pct   |avg_battery      |
+----------+------------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+
|2025-08-25|W50         |24.131521739130438|-100.0  |100.0   |58.67826086956522 |45.7        |71.8        |2.1739130434782608|76.82391304347826|
|2025-08-25|W38         |25.618947368421065|18.3    |100.0   |59.26947368421053 |0.0         |95.0        |1.0526315789473684|73.38526315789474|
|2025-08-25|W31         |23.194845360824754|-100.0  |100.0   |59.01134020618555 |0.0         |70.3        |3.0927835051546393|73.25051546391751|
|2025-08-25|W26         |23.678125000000005|-100.0  |33.5    |61.169791666666676|48.1        |95.0        |1.0416666666666665|76.5

In [None]:
gaps_df = df.filter((col("gap").isNotNull()) & (col("gap") > 15.1)).groupBy("warehouse_id", "date").count().alias("missing_readings_count")
gaps_df.show(truncate=False)

In [None]:
final_daily_metrics = daily_metrics.join(gaps_df,on=["warehouse_id", "date"],how="left").na.fill(0, subset=["count"])
final_daily_metrics = final_daily_metrics.withColumnRenamed("count", "missing_readings")
final_daily_metrics.show(truncate=False)

+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+----------------+
|warehouse_id|date      |avg_temp          |min_temp|max_temp|avg_humidity      |min_humidity|max_humidity|malfunction_pct   |avg_battery      |missing_readings|
+------------+----------+------------------+--------+--------+------------------+------------+------------+------------------+-----------------+----------------+
|W50         |2025-08-25|24.131521739130438|-100.0  |100.0   |58.67826086956522 |45.7        |71.8        |2.1739130434782608|76.82391304347826|23              |
|W38         |2025-08-25|25.618947368421065|18.3    |100.0   |59.26947368421053 |0.0         |95.0        |1.0526315789473684|73.38526315789474|23              |
|W31         |2025-08-25|23.194845360824754|-100.0  |100.0   |59.01134020618555 |0.0         |70.3        |3.0927835051546393|73.25051546391751|23              |
|W26         |2025-08-25|23.

In [None]:
outliers_df = df.filter(
    (col("temperature") < -20) | (col("temperature") > 60) | (col("humidity") > 90)
).withColumn("outlier_type",when(col("temperature") < -20, "Temp Low").when(col("temperature") > 60, "Temp High").when(col("humidity") > 90, "Humidity High").otherwise("Other")
).select("sensor_id","warehouse_id","date","outlier_type",when(col("outlier_type").contains("Temp"), col("temperature")).when(col("outlier_type").contains("Humidity"), col("humidity")).alias("value")
).orderBy("date", "warehouse_id", "sensor_id")

In [None]:
outliers_df.show()

+---------+------------+----------+-------------+------+
|sensor_id|warehouse_id|      date| outlier_type| value|
+---------+------------+----------+-------------+------+
|   S_W1_3|          W1|2025-08-25|    Temp High| 100.0|
|   S_W1_3|          W1|2025-08-25|     Temp Low|-100.0|
|   S_W1_4|          W1|2025-08-25|Humidity High|  95.0|
|   S_W1_5|          W1|2025-08-25|     Temp Low|-100.0|
|  S_W10_2|         W10|2025-08-25|     Temp Low|-100.0|
|  S_W10_5|         W10|2025-08-25|Humidity High|  95.0|
|  S_W11_1|         W11|2025-08-25|Humidity High|  95.0|
|  S_W11_2|         W11|2025-08-25|     Temp Low|-100.0|
|  S_W11_3|         W11|2025-08-25|    Temp High| 100.0|
|  S_W11_5|         W11|2025-08-25|     Temp Low|-100.0|
|  S_W12_1|         W12|2025-08-25|Humidity High|  95.0|
|  S_W12_1|         W12|2025-08-25|    Temp High| 100.0|
|  S_W12_1|         W12|2025-08-25|     Temp Low|-100.0|
|  S_W12_2|         W12|2025-08-25|Humidity High|  95.0|
|  S_W12_2|         W12|2025-08