<a href="https://colab.research.google.com/github/somabasavaiah/Data-Engineering-Projects/blob/main/INDUSTRIAL_IoT_DATA_PROCESSING_PIPELINE_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install pyspark > /dev/null

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType, IntegerType, FloatType
import pandas as pd
import time

In [4]:
spark = SparkSession.builder \
    .appName("Industrial_IoT_Pipeline") \
    .getOrCreate()

print("âœ” Spark Running")

âœ” Spark Running


In [5]:
iot_stream_data = [
    {"device_id": "M1", "temperature": 72.5, "vibration": 0.32, "timestamp": "2025-01-10 10:00:00"},
    {"device_id": "M2", "temperature": 80.1, "vibration": 0.55, "timestamp": "2025-01-10 10:00:05"},
    {"device_id": "M1", "temperature": 90.2, "vibration": 0.90, "timestamp": "2025-01-10 10:00:10"},
    {"device_id": "M3", "temperature": 65.0, "vibration": 0.20, "timestamp": "2025-01-10 10:00:15"},
]


In [6]:
pdf = pd.DataFrame(iot_stream_data)
stream_df = spark.createDataFrame(pdf)

print("RAW STREAMING DATA:")
stream_df.show()

RAW STREAMING DATA:
+---------+-----------+---------+-------------------+
|device_id|temperature|vibration|          timestamp|
+---------+-----------+---------+-------------------+
|       M1|       72.5|     0.32|2025-01-10 10:00:00|
|       M2|       80.1|     0.55|2025-01-10 10:00:05|
|       M1|       90.2|      0.9|2025-01-10 10:00:10|
|       M3|       65.0|      0.2|2025-01-10 10:00:15|
+---------+-----------+---------+-------------------+



In [7]:
print("âœ” Cleaning Data...")

clean_df = stream_df \
    .withColumn("temperature", col("temperature").cast(FloatType())) \
    .withColumn("vibration", col("vibration").cast(FloatType())) \
    .withColumn("timestamp", col("timestamp").cast(StringType()))

clean_df.show()

âœ” Cleaning Data...
+---------+-----------+---------+-------------------+
|device_id|temperature|vibration|          timestamp|
+---------+-----------+---------+-------------------+
|       M1|       72.5|     0.32|2025-01-10 10:00:00|
|       M2|       80.1|     0.55|2025-01-10 10:00:05|
|       M1|       90.2|      0.9|2025-01-10 10:00:10|
|       M3|       65.0|      0.2|2025-01-10 10:00:15|
+---------+-----------+---------+-------------------+



In [8]:
clean_df.createOrReplaceTempView("iot_data")

analytics_df = spark.sql("""
    SELECT
        device_id,
        AVG(temperature) AS avg_temp,
        AVG(vibration) AS avg_vibration
    FROM iot_data
    GROUP BY device_id
""")

print("ANALYTICS RESULT:")
analytics_df.show()

ANALYTICS RESULT:
+---------+----------------+-------------------+
|device_id|        avg_temp|      avg_vibration|
+---------+----------------+-------------------+
|       M1|81.3499984741211| 0.6099999845027924|
|       M2|80.0999984741211|  0.550000011920929|
|       M3|            65.0|0.20000000298023224|
+---------+----------------+-------------------+



In [9]:
output_path = "/content/iot_curated_data.parquet"
analytics_df.write.mode("overwrite").parquet(output_path)

print("âœ” Saved curated IoT data to:", output_path)

âœ” Saved curated IoT data to: /content/iot_curated_data.parquet


In [10]:
batch_data = [
    ("M1", 70.2, 0.3, "2024-12-18 10:00:00"),
    ("M2", 88.0, 0.6, "2024-12-18 10:05:00"),
    ("M3", 75.0, 0.4, "2024-12-18 10:10:00"),
]

batch_schema = StructType() \
    .add("device_id", StringType()) \
    .add("temperature", FloatType()) \
    .add("vibration", FloatType()) \
    .add("timestamp", StringType())

batch_df = spark.createDataFrame(batch_data, batch_schema)

print("BATCH DATA:")
batch_df.show()

BATCH DATA:
+---------+-----------+---------+-------------------+
|device_id|temperature|vibration|          timestamp|
+---------+-----------+---------+-------------------+
|       M1|       70.2|      0.3|2024-12-18 10:00:00|
|       M2|       88.0|      0.6|2024-12-18 10:05:00|
|       M3|       75.0|      0.4|2024-12-18 10:10:00|
+---------+-----------+---------+-------------------+



In [11]:
combined_df = clean_df.union(batch_df)

print("COMBINED PIPELINE DATA:")
combined_df.show()



COMBINED PIPELINE DATA:
+---------+-----------+---------+-------------------+
|device_id|temperature|vibration|          timestamp|
+---------+-----------+---------+-------------------+
|       M1|       72.5|     0.32|2025-01-10 10:00:00|
|       M2|       80.1|     0.55|2025-01-10 10:00:05|
|       M1|       90.2|      0.9|2025-01-10 10:00:10|
|       M3|       65.0|      0.2|2025-01-10 10:00:15|
|       M1|       70.2|      0.3|2024-12-18 10:00:00|
|       M2|       88.0|      0.6|2024-12-18 10:05:00|
|       M3|       75.0|      0.4|2024-12-18 10:10:00|
+---------+-----------+---------+-------------------+



In [12]:
combined_df.createOrReplaceTempView("iot_all")

final_df = spark.sql("""
    SELECT
        device_id,
        MAX(temperature) AS max_temp,
        MAX(vibration) AS max_vibration
    FROM iot_all
    GROUP BY device_id
""")

print("FINAL ANALYTICS:")
final_df.show()

print("\nðŸŽ‰ Pipeline Executed Successfully!")

FINAL ANALYTICS:
+---------+--------+-------------+
|device_id|max_temp|max_vibration|
+---------+--------+-------------+
|       M1|    90.2|          0.9|
|       M2|    88.0|          0.6|
|       M3|    75.0|          0.4|
+---------+--------+-------------+


ðŸŽ‰ Pipeline Executed Successfully!
