## **Task 1.4: Streaming Application**

##### **Setup Spark Structured Streaming**

In [1]:
from pyspark.sql.types import  StructType, StructField, DoubleType, StringType
from pyspark.sql.functions import from_json, col
import os

# Tell PySpark (via the PYSPARK_SUBMIT_ARGS environment variable) to load the Kafka
# connector packages built for Scala 2.12 / Spark 3.3.0 when it launches.
# NOTE(review): presumably this has to be set before the SparkSession is created
# further down for the packages to be picked up - confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when

# Kafka topic that is consumed by this streaming application
subscribe_topic = "climate_hotspot"

# Host address used for both the Kafka broker and the MongoDB server
ip_address = "10.192.68.151"

# Get (or create) the local Spark session, using all available cores,
# that hosts the structured-streaming query
spark_session = SparkSession.builder \
    .master("local[*]") \
    .appName("Analyse hotspot reports and match climate data") \
    .getOrCreate()


# Schema used to deserialise the JSON payload of every Kafka message.
# It is the union of the climate fields and the hotspot fields, so a message
# from either kind of producer leaves the other kind's columns null.
# NOTE(review): the trailing space in "precipitation " is kept on purpose here;
# presumably it mirrors the key emitted by the producers - confirm.
deserialize_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in (
        ("producer_identifier", StringType),
        ("GHI_w/m2", DoubleType),
        ("precipitation ", StringType),
        ("max_wind_speed", DoubleType),
        ("windspeed_knots", DoubleType),
        ("relative_humidity", DoubleType),
        ("air_temperature_celcius", DoubleType),
        ("longitude", DoubleType),
        ("latitude", DoubleType),
        ("date", StringType),
        ("confidence", DoubleType),
        ("surface_temperature_celcius", DoubleType),
        ("created_time", StringType),
    )
])

# Streaming DataFrame fed by the subscribed Kafka topic on the broker at port 9092
df_stream_kafka = spark_session.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', f'{ip_address}:9092') \
    .option('subscribe', subscribe_topic) \
    .load()

# Parse the Kafka message value (cast to string) as JSON with the schema above,
# then flatten the parsed struct so each climate/hotspot field is its own column
decoded_stream_kafka = df_stream_kafka \
    .select(from_json(col("value").cast("string"), deserialize_schema).alias("stream_data")) \
    .select("stream_data.*")

# Show the resulting column layout
decoded_stream_kafka.printSchema()

root
 |-- producer_identifier: string (nullable = true)
 |-- GHI_w/m2: double (nullable = true)
 |-- precipitation : string (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- windspeed_knots: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- air_temperature_celcius: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- date: string (nullable = true)
 |-- confidence: double (nullable = true)
 |-- surface_temperature_celcius: double (nullable = true)
 |-- created_time: string (nullable = true)



**Install geohashing library**

In [None]:
!pip install python-geohash  

##### **Process streaming data and send to MongoDB**

In [2]:
from pymongo import MongoClient
import geohash
from datetime import datetime, timedelta
from pprint import pprint

def merge_hotspots(hostspot_list):
    """Merge hotspot reports that describe the same fire.

    Each report, in list order, that has not already been absorbed becomes an
    "anchor". Every later, not-yet-absorbed report is merged into the anchor
    when it lies in the same precision-5 geohash cell AND its ``created_time``
    is at most 10 minutes away from the anchor's. Matching is always against
    the anchor only (it is not transitive).

    A merged report keeps all attributes of the anchor except
    ``surface_temperature_celcius`` and ``confidence``, which become the
    average over the anchor and everything merged into it.

    Args:
        hostspot_list: list of dict-like hotspot reports. Each must provide
            ``latitude``, ``longitude``, ``created_time`` (``"%H:%M:%S"``),
            ``surface_temperature_celcius`` and ``confidence``.

    Returns:
        A new list with one dict per distinct hotspot, in anchor order. The
        returned dicts are shallow copies; the input reports are not modified.

    Raises:
        ValueError: if a ``created_time`` does not match ``"%H:%M:%S"``.
    """
    geohashing_precision = 5
    max_time_gap = timedelta(minutes=10)
    full_day = timedelta(days=1)

    # Encode / parse every report exactly once instead of inside the pairwise loop
    cell_hashes = [
        geohash.encode(report["latitude"], report["longitude"], geohashing_precision)
        for report in hostspot_list
    ]
    created_times = [
        datetime.strptime(report["created_time"], "%H:%M:%S")
        for report in hostspot_list
    ]

    final_processed_hotspots = []
    processed_hotspot_ids = set()  # list indexes already emitted or merged away

    for i, anchor in enumerate(hostspot_list):
        # skip reports that were already merged into an earlier anchor
        if i in processed_hotspot_ids:
            continue

        matched_hotspot_ids = []  # indexes of later reports merged into this anchor
        for j in range(i + 1, len(hostspot_list)):
            if j in processed_hotspot_ids or cell_hashes[j] != cell_hashes[i]:
                continue

            # created_time carries only a time of day, so measure the gap
            # "around the clock": 23:58:00 and 00:03:00 are 5 minutes apart
            time_diff = abs(created_times[i] - created_times[j])
            time_diff = min(time_diff, full_day - time_diff)

            if time_diff <= max_time_gap:
                matched_hotspot_ids.append(j)
                processed_hotspot_ids.add(j)

        # work on a copy so the caller's report is left untouched
        merged_hotspot = dict(anchor)
        if matched_hotspot_ids:
            group = matched_hotspot_ids + [i]
            merged_hotspot["surface_temperature_celcius"] = (
                sum(hostspot_list[k]["surface_temperature_celcius"] for k in group) / len(group)
            )
            merged_hotspot["confidence"] = (
                sum(hostspot_list[k]["confidence"] for k in group) / len(group)
            )

        final_processed_hotspots.append(merged_hotspot)
        processed_hotspot_ids.add(i)

    return final_processed_hotspots


def process_climate_hotspot(data):
    """Build the climate document for one batch, embedding nearby hotspots.

    The batch is split by ``producer_identifier``: ``"producer_1"`` documents
    are climate reports (only the last one in the batch is kept), while
    ``"producer_2"`` / ``"producer_3"`` documents are hotspot reports.

    Meaning of the ``hotspots`` field on the returned document:
        - field absent: the batch contained no hotspot reports;
        - empty list: hotspot reports existed but none was in proximity;
        - non-empty list: the embedded hotspots are in proximity, and
          ``cause_of_fire`` is set as well.

    Proximity means the hotspot and the climate report share the same
    precision-3 geohash cell. ``cause_of_fire`` is ``"natural"`` when
    ``air_temperature_celcius > 20`` and ``GHI_w/m2 > 180``, otherwise
    ``"other"`` (also used when either reading is missing).

    Args:
        data: iterable of dict documents for one batch.

    Returns:
        The (updated in place) climate document, or ``None`` when the batch
        has no climate report.
    """
    climate_data = None
    hotspots = []

    #! Get climate report and hotspot reports from batch
    for document in data:
        producer = document["producer_identifier"]
        if producer == "producer_1":
            # keep only the last climate report of the batch
            climate_data = document
        elif producer in ("producer_2", "producer_3"):
            hotspots.append(document)

    # nothing to embed: no climate report, or no fire reports in this batch
    if climate_data is None or not hotspots:
        return climate_data

    climate_data["hotspots"] = []

    #! Merge reports for same hotspots
    merged_hotspots = merge_hotspots(hotspots)

    #! Climate-to-Hotspot matching
    geohashing_precision = 3

    # the climate location is the same for every hotspot, so encode it once
    climate_geohash = geohash.encode(
        climate_data["latitude"], climate_data["longitude"], geohashing_precision
    )

    for hotspot_data in merged_hotspots:
        hotspot_geohash = geohash.encode(
            hotspot_data["latitude"], hotspot_data["longitude"], geohashing_precision
        )

        # embed hotspot document if in proximity to climate report,
        # dropping the null climate-only fields introduced by the shared schema
        if hotspot_geohash == climate_geohash:
            climate_data["hotspots"].append(
                {field: value for field, value in hotspot_data.items() if value is not None}
            )

    #! Define cause of fire
    if climate_data["hotspots"]:
        air_temperature = climate_data.get("air_temperature_celcius")
        ghi = climate_data.get("GHI_w/m2")

        # a missing reading cannot support "natural"; comparing None with a
        # number would raise TypeError and abort the whole batch
        is_natural = (
            air_temperature is not None
            and ghi is not None
            and air_temperature > 20
            and ghi > 180
        )
        climate_data["cause_of_fire"] = "natural" if is_natural else "other"

    return climate_data


def store_to_mongodb(data_document):
    """Insert one processed climate document into MongoDB.

    The document is written to the ``climates_streaming`` collection of the
    ``fit3182_assignment_db`` database on the server at ``ip_address:27017``.
    A client is opened for the call and always closed afterwards, so repeated
    batches do not accumulate open connections.

    Args:
        data_document: dict to store. NOTE(review): PyMongo's ``insert_one``
            is documented to add an ``_id`` key to the passed dict when it has
            none - confirm if callers rely on this.

    Raises:
        Whatever PyMongo raises when connecting or inserting fails.
    """
    mongo_client = MongoClient(ip_address, 27017)  #setup mongo client
    try:
        db = mongo_client["fit3182_assignment_db"]  #create or get database
        streaming_climate_col = db.climates_streaming  #get/create collection for streaming climate data
        streaming_climate_col.insert_one(data_document)  #store processed document
    finally:
        # release the connection even when the insert fails
        mongo_client.close()


def process_stream(batch_df, batch_id):
    """Handle one micro-batch handed over by ``foreachBatch``.

    The batch is displayed, collected, and turned into a list of dicts, which
    is passed to ``process_climate_hotspot``. When that yields a climate
    report, its null-valued fields are dropped and the result is stored via
    ``store_to_mongodb`` and pretty-printed; otherwise a notice is printed.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: identifier of the micro-batch (not used).
    """
    batch_df.show()  #display the raw batch data

    #collect the batch and turn every row into a dictionary (document)
    row_documents = [row.asDict() for row in batch_df.collect()]

    climate_report = process_climate_hotspot(row_documents)

    if climate_report is None:
        print("Batch Result: No climate report in the current batch")
        return

    #drop the null fields that come from the hotspot columns of the shared schema
    non_null_report = {
        field: value
        for field, value in climate_report.items()
        if value is not None
    }

    store_to_mongodb(non_null_report)
    print("Batch Result: Climate report produced below")
    pprint(non_null_report)
        

# Configure (without starting) the stream writer: in append mode, every
# 10-second micro-batch is handed to process_stream, which writes to MongoDB
data_writer = decoded_stream_kafka.writeStream \
    .outputMode("append") \
    .trigger(processingTime="10 seconds") \
    .foreachBatch(process_stream)

##### **Begin data writer**

In [3]:
from pyspark.sql.streaming import StreamingQueryException

q_writer = data_writer

# Bind `query` before the try block: if start() itself fails, the finally
# clause must not raise a NameError that hides the original error.
query = None

try:
    query = q_writer.start()
    query.awaitTermination()  # block until the query stops or is interrupted
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exception:
    print(exception)
finally:
    # only stop a query that was actually started
    if query is not None:
        query.stop()

+-------------------+--------+--------------+--------------+---------------+-----------------+-----------------------+---------+--------+----+----------+---------------------------+------------+
|producer_identifier|GHI_w/m2|precipitation |max_wind_speed|windspeed_knots|relative_humidity|air_temperature_celcius|longitude|latitude|date|confidence|surface_temperature_celcius|created_time|
+-------------------+--------+--------------+--------------+---------------+-----------------+-----------------------+---------+--------+----+----------+---------------------------+------------+
+-------------------+--------+--------------+--------------+---------------+-----------------+-----------------------+---------+--------+----+----------+---------------------------+------------+

Batch Result: No climate report in the current batch
+-------------------+--------+--------------+--------------+---------------+-----------------+-----------------------+---------+--------+----------+----------+-------

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


##### **Check data stored in the MongoDB collection**

In [84]:
from pprint import pprint
from pymongo import MongoClient

# Connect to the same MongoDB server/database/collection the stream writes to
mongo_client = MongoClient(ip_address, 27017)
db = mongo_client.fit3182_assignment_db
streaming_climate_col = db["climates_streaming"]

# Pretty-print every stored climate document
data = streaming_climate_col.find()
for doc in data:
    pprint(doc)

{'GHI_w/m2': 66.0,
 '_id': ObjectId('664f23ff7de308e5b7252811'),
 'air_temperature_celcius': 7.0,
 'date': '2024-01-06',
 'hotspots': [],
 'latitude': -36.3328,
 'longitude': 146.0355,
 'max_wind_speed': 16.9,
 'precipitation ': ' 0.08G',
 'producer_identifier': 'producer_1',
 'relative_humidity': 37.2,
 'windspeed_knots': 6.2}
{'GHI_w/m2': 117.0,
 '_id': ObjectId('664f24087de308e5b7252813'),
 'air_temperature_celcius': 13.0,
 'date': '2024-01-07',
 'latitude': -36.7084,
 'longitude': 142.7354,
 'max_wind_speed': 19.0,
 'precipitation ': ' 0.02G',
 'producer_identifier': 'producer_1',
 'relative_humidity': 44.1,
 'windspeed_knots': 12.9}
{'GHI_w/m2': 121.0,
 '_id': ObjectId('664f24137de308e5b7252815'),
 'air_temperature_celcius': 14.0,
 'cause_of_fire': 'other',
 'date': '2024-01-08',
 'hotspots': [{'confidence': 77.0,
               'created_time': '22:22:31',
               'latitude': -37.6974,
               'longitude': 142.9716,
               'producer_identifier': 'producer_2',

In [85]:
#! uncomment the following line to clear current contents from collection
# streaming_climate_col.delete_many({})