In [None]:
import json
from kafka import KafkaConsumer
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql.functions import from_json, col, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import to_timestamp, from_unixtime, unix_timestamp
from pymongo import MongoClient

In [None]:
# Build (or reuse) the Spark session for this notebook.
# getOrCreate() makes the cell safe to re-run: constructing SparkContext
# directly raises once a context already exists in the kernel.
conf = SparkConf().setAppName("TemperatureProcessing")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext


In [None]:
# Kafka connection settings: the topic to read and the keyword arguments
# that are unpacked into the KafkaConsumer constructor.
topic = "IOTTemperatureStream01"
kafkaParams = dict(
    bootstrap_servers="ec2-65-0-72-75.ap-south-1.compute.amazonaws.com:9092",
)


In [None]:
# Create the Kafka consumer for `topic`, forwarding kafkaParams as keyword
# arguments. No value_deserializer is configured, so the polling loop
# decodes record.value itself.
consumer = KafkaConsumer(topic, **kafkaParams)


In [None]:
# Empty accumulator DataFrame; process() unions each incoming batch into it.
# All six columns are nullable.
df = spark.createDataFrame(
    [],
    schema=(
        StructType()
        .add("plant_name", StringType(), True)
        .add("lane_number", StringType(), True)
        .add("timestamp", TimestampType(), True)
        .add("temperature", IntegerType(), True)
        .add("component_type", StringType(), True)
        .add("component_manufacturer", StringType(), True)
    ),
)


In [None]:
def filterData(dataframe):
    """Show two rolling-window summaries of the temperature readings.

    1. Number of readings per ``component_type`` in the last 10 minutes
       whose ``temperature`` is above 50.
    2. Maximum ``temperature`` per (``lane_number``, ``component_type``)
       over the last 30 minutes, in a column named ``"Max Temp"``.

    Both results are printed with ``show()``; nothing is returned.

    Args:
        dataframe: DataFrame with ``timestamp``, ``temperature``,
            ``component_type`` and ``lane_number`` columns.
    """
    # Take the reference time once so both windows share the same "now".
    now = datetime.now()
    ten_minutes_ago = now - timedelta(minutes=10)
    thirty_minutes_ago = now - timedelta(minutes=30)
    # NOTE(review): datetime.now() is naive local time; confirm it matches the
    # timezone Spark uses for the timestamp column.

    recent_hot = (
        dataframe
        .filter(dataframe["timestamp"] > ten_minutes_ago)
        .filter(dataframe["temperature"] > 50)
    )
    component_counts = recent_hot.groupBy("component_type").count()

    # Keep rows *newer* than the cutoff. The previous `<` comparison selected
    # everything older than 30 minutes instead of the last 30 minutes.
    last_half_hour = dataframe.filter(dataframe["timestamp"] > thirty_minutes_ago)
    max_temp_per_lane = (
        last_half_hour
        .groupBy("lane_number", "component_type")
        .agg({"temperature": "max"})
        .withColumnRenamed("max(temperature)", "Max Temp")
    )

    component_counts.show()
    max_temp_per_lane.show()

In [None]:
def process(rdd):
    """Parse a batch of JSON readings, append them to ``df`` and show summaries.

    Rows without ``component_info`` or ``timestamp`` are dropped. The nested
    ``component_info`` struct is flattened into ``component_manufacturer`` and
    ``component_type``, and the batch is aligned to the accumulator schema
    before being unioned into the module-level ``df``. The accumulated frame
    is then displayed and passed to ``filterData``.

    Args:
        rdd: RDD of JSON strings, one document per element. Each document is
            expected to carry every column of ``df``'s schema (with the two
            component fields nested under ``component_info``); a missing
            field raises when the batch is aligned to that schema.
    """
    # `df` is rebound below. Without this declaration Python treats it as a
    # local and every call fails with UnboundLocalError.
    global df

    data = spark.read.json(rdd)
    data = data.filter(data["component_info"].isNotNull())
    data = data.filter(data["timestamp"].isNotNull())

    # take(1) answers "any rows?" without counting the whole batch.
    if data.take(1):
        # Numeric values are cast via double to timestamp; anything else is
        # cast to timestamp directly, so both branches of the CASE have the
        # same type.
        epoch_seconds = col("timestamp").cast("double")
        data = data.withColumn(
            "timestamp",
            when(epoch_seconds.isNotNull(), epoch_seconds.cast("timestamp"))
            .otherwise(col("timestamp").cast("timestamp")),
        )
        data = data.withColumn("component_manufacturer",
                               data["component_info"]["component_manufacturer"])
        data = data.withColumn(
            "component_type", data["component_info"]["component_type"])

        # union() matches columns by position, so reorder and cast the batch
        # to the accumulator's schema first. The select also drops
        # component_info.
        data = data.select(
            [col(field.name).cast(field.dataType) for field in df.schema.fields]
        )
        df = df.union(data)
    else:
        print("No data")

    df.show()
    filterData(df)


In [None]:
def _has_component_type(payload):
    """Return True if payload is a dict whose component_info dict has a non-None component_type."""
    if not isinstance(payload, dict):
        return False
    info = payload.get("component_info")
    return isinstance(info, dict) and info.get("component_type") is not None


# Poll Kafka until interrupted, handing each usable record to process().
# The consumer is always closed on the way out.
try:
    while True:
        messages = consumer.poll(timeout_ms=1000)
        for tp, message in messages.items():
            for record in message:
                if record.value is None:
                    # Nothing to decode (e.g. a record with a null value).
                    continue
                try:
                    # Decode once and reuse the text for both the validity
                    # check and the RDD handed to Spark.
                    raw = record.value.decode('utf-8')
                    data = json.loads(raw)
                except ValueError as exc:
                    # Covers UnicodeDecodeError and json.JSONDecodeError; one
                    # bad message must not stop the stream.
                    print(f"Skipping malformed record: {exc}")
                    continue
                print(data)
                if _has_component_type(data):
                    rdd = sc.parallelize([raw])
                    process(rdd)
except KeyboardInterrupt:
    print("Stopping consumer")
finally:
    consumer.close()
