# Streaming Application using Spark Structured Streaming

In [1]:
# Initialize OS environment for Kafka and PySpark integration.
# PYSPARK_SUBMIT_ARGS asks PySpark to fetch the Kafka source connector
# (spark-sql-kafka-0-10, Scala 2.12 build, version 3.2.1) via --packages.
# NOTE(review): presumably this must run before the SparkSession is created
# for the package to be picked up — confirm.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell'

In [2]:
# Libraries
import datetime as dt
import json
import math
import sys
import time
from pprint import pprint

import geohash
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException

In [3]:
# Connect to MongoDB with the client's default settings and select the
# database and collection that processed climate documents are written to
client = MongoClient()
db = client["fit3182_assignment_db"]
collection = db["data"]

In [4]:
# Initialize topic name for Kafka broker.
# The stream reader defined later in this notebook subscribes to this topic.
topic_name = "FIT3182_ASSIGNMENT_2"

In [5]:
# Build (or reuse, if one already exists) a Spark session running in local mode
spark = SparkSession.builder.appName(
    '[Assignment] Spark Streaming from Kafka into MongoDB'
).master('local[*]').getOrCreate()

22/05/25 18:03:00 WARN Utils: Your hostname, han-ThinkPad-E585 resolves to a loopback address: 127.0.1.1; using 192.168.1.2 instead (on interface wlp4s0)
22/05/25 18:03:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/han/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/han/.ivy2/cache
The jars for the packages stored in: /home/han/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1d89355c-b8d4-44e3-a5d8-168c2dbbf914;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	foun

In [6]:
# Streaming DataFrame backed by the Kafka topic on the local broker
topic_stream_df = (
    spark.readStream
    .format('kafka')  # specify source
    .options(**{
        'kafka.bootstrap.servers': 'localhost:9092',
        'subscribe': topic_name,
    })
    .load()
)

In [7]:
# Specify read format: keep only the message key and value, both cast to strings
output_stream_df = topic_stream_df.selectExpr([
    "CAST(key as STRING)",
    "CAST(value as STRING)",
])

In [8]:
# Geohash precisions: a hotspot belongs to a climate reading when the first
# STATION_GEOHASH_PRECISION characters of its geohash equal the station geohash;
# hotspots sharing the same HOTSPOT_GEOHASH_PRECISION-character geohash are merged.
STATION_GEOHASH_PRECISION = 3
HOTSPOT_GEOHASH_PRECISION = 5


def _group_nearby_hotspots(readings, station_geohash):
    """Group hotspot readings that fall in the station's geohash cell.

    Args:
        readings: iterable of hotspot dicts with 'latitude' and 'longitude'.
        station_geohash: geohash of the climate reading (station precision).

    Returns:
        dict mapping each hotspot-precision geohash to the list of readings
        in that cell, in the order they were encountered. Readings whose
        geohash prefix differs from ``station_geohash`` are left out.
    """
    groups = {}
    for reading in readings:
        cell = geohash.encode(
            reading['latitude'], reading['longitude'],
            precision = HOTSPOT_GEOHASH_PRECISION,
        )
        # Hotspot is close to climate
        if cell[:STATION_GEOHASH_PRECISION] == station_geohash:
            groups.setdefault(cell, []).append(reading)
    return groups


def _merge_hotspot_group(group, cl_date):
    """Collapse one group of hotspot readings into a single hotspot document.

    The first reading of the group is reused (and mutated) as the result, so
    its latitude/longitude and any other fields are kept. Temperature and
    confidence are replaced by the group averages rounded to 2 decimals, a
    'datetime' field is built from ``cl_date`` plus the time of day of the
    first reading's 'date_of_sending', and 'producer' / 'date_of_sending'
    are removed.

    Args:
        group: non-empty list of hotspot dicts.
        cl_date: climate date already formatted as "%Y-%m-%d".

    Returns:
        The merged hotspot dict.
    """
    count = len(group)
    avg_temp = round(sum(r['surface_temperature_celcius'] for r in group) / count, 2)
    avg_conf = round(sum(r['confidence'] for r in group) / count, 2)

    hotspot = group[0]  # Use latitude and longitude from first data
    hotspot['surface_temperature_celcius'] = avg_temp
    hotspot['confidence'] = avg_conf

    # Ensure datetime follows the format stated in Part A
    sent_at = dt.datetime.strptime(hotspot['date_of_sending'], "%d/%m/%y %H:%M:%S")
    hotspot['datetime'] = cl_date + 'T' + sent_at.strftime("%H:%M:%S")

    # Remove unnecessary columns
    hotspot.pop('producer')
    hotspot.pop('date_of_sending')
    return hotspot


def process_data(df, batch_id):
    """Process one micro-batch: join hotspots to the climate reading and store it.

    Messages are routed by their Kafka key: '1' is climate data, '2' is AQUA
    hotspot data and '3' is TERRA hotspot data. Messages with any other key
    are skipped with a printed notice. If the batch contains no climate
    message nothing is written (hotspots in that batch are discarded). If it
    contains several, only the first climate message is used.

    The climate document gets a 'station' geohash, its transport-only fields
    are dropped, nearby hotspots (merged per geohash cell) are embedded under
    'hotspot', and 'cause_of_fire' is set to 'natural' when GHI > 180 and air
    temperature > 20, otherwise 'other'. The last two fields are only added
    when at least one nearby hotspot exists.

    Args:
        df: batch DataFrame with string columns 'key' and 'value'.
        batch_id: micro-batch id supplied by foreachBatch (unused).

    Side effects:
        Inserts one document into the module-level ``collection`` and prints it.
    """
    rows = df.collect()
    sources = [[], [], []]  # Containers to separate data sources: climate, AQUA, TERRA

    # Allocate data to respective container
    for row in rows:
        try:
            source_idx = int(row["key"]) - 1
        except (TypeError, ValueError):
            source_idx = -1  # missing or non-numeric key
        # Reject out-of-range keys explicitly: a key of 0 or below would
        # otherwise wrap around via negative indexing into the wrong source,
        # and a key above 3 would raise and stop the whole stream.
        if not 0 <= source_idx < len(sources):
            print("Skipping message with unexpected key: " + repr(row["key"]))
            continue
        # SECURITY(review): eval() executes whatever text arrives on the topic.
        # If producers only send literal dicts, ast.literal_eval (or json.loads
        # for JSON payloads) would be a safe replacement — confirm the wire
        # format before switching.
        value = eval(row["value"])
        sources[source_idx].append(value)

    climate_records, aqua, terra = sources
    if not climate_records: # Process data only when climate data is received
        return

    cl_data = climate_records[0] # 1 batch should only have 1 climate data
    temp_date = dt.datetime.strptime(cl_data['date_of_creation'], "%d/%m/%y")
    cl_date = dt.datetime.strftime(temp_date, "%Y-%m-%d") # Transform date to match Part A

    # Initialize climate geohash and set station value
    cl_geo = geohash.encode(
        cl_data['latitude'], cl_data['longitude'],
        precision = STATION_GEOHASH_PRECISION,
    )
    cl_data['station'] = cl_geo

    # Compare climate with AQUA then TERRA hotspots (AQUA first, so an AQUA
    # reading is the representative of a mixed group)
    hotspot_dict = _group_nearby_hotspots(aqua + terra, cl_geo)

    # Drop unnecessary keys and modify key name to match Part A data model
    cl_data.pop('date_of_sending')
    cl_data.pop('producer')
    cl_data.pop('latitude')
    cl_data.pop('longitude')
    # NOTE(review): 'date' keeps the raw "%d/%m/%y" string while hotspot
    # 'datetime' values use the reformatted cl_date — confirm this is intended.
    cl_data['date'] = cl_data['date_of_creation']
    cl_data.pop('date_of_creation')

    if hotspot_dict: # Proceed if there is fire
        # Add list of hotspot to climate
        cl_data['hotspot'] = [
            _merge_hotspot_group(group, cl_date)
            for group in hotspot_dict.values()
        ]

        # Determine the cause of fire event
        if cl_data['GHI_w/m2'] > 180 and cl_data['air_temperature_celcius'] > 20:
            cl_data['cause_of_fire'] = 'natural'
        else:
            cl_data['cause_of_fire'] = 'other'

    # Insert into MongoDB collection
    collection.insert_one(cl_data)
    print("Data inserted. " + str(cl_data))

In [9]:
# Create data stream writer: each micro-batch of new rows is handed to
# process_data, with a batch triggered every 10 seconds
db_writer = (
    output_stream_df.writeStream
    .trigger(processingTime="10 seconds")
    .foreachBatch(process_data)
    .outputMode('append')
)

In [None]:
# Run streaming until the query terminates or the user interrupts it.
# `query` starts as None so the cleanup below can tell whether start()
# succeeded; otherwise a failure in start() would surface as a NameError
# in `finally` and hide the real error.
query = None
try:
    query = db_writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    # Raised by awaitTermination() when the streaming query fails
    print(exc)
finally:
    if query is not None:
        query.stop()