## FIT3182: Assignment 2 Part B (Streaming Application) 

### Name: Ashley Ooi Yan-Lin (ID: 31171095)

### Task 1: Processing Data Stream

### (d) Write a streaming application using the Apache Spark Structured Streaming API which processes data in batches of 10 seconds.

First, we need to import the required modules and install pygeohash.

In [1]:
# import statements 
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
import pygeohash as pgh
import json
from pprint import pprint
from datetime import datetime
!pip install pygeohash # install pygeohash

We also need to connect to MongoDB through MongoClient and access the collection in our database. In addition, we set the Kafka topic name, which is needed to read the data from Kafka.

In [2]:
# Kafka topic this application subscribes to (a global so that later cells can reuse it)
topic_name = 'PartB'

# Open a MongoDB connection with the default client settings and pick the target collection
client = MongoClient()
db = client['fit3182_assignment_db']
collection = db['partB']

# Start from an empty collection: remove every document added by earlier runs
collection.delete_many({})

In [None]:
# Build (or reuse) a Spark session that runs locally on all available cores
session_builder = SparkSession.builder.master('local[*]').appName('Spark Application')
spark = session_builder.getOrCreate()

# Subscribe to the Kafka topic through the same broker address the producers use
kafka_reader = spark.readStream.format('kafka')
kafka_reader = kafka_reader.option('kafka.bootstrap.servers', 'localhost:9092')
kafka_reader = kafka_reader.option('subscribe', topic_name)
topic_stream_df = kafka_reader.load()

# Keep only the message payload, exposed under the column name 'data'
output_stream_df = topic_stream_df.select(topic_stream_df['value'].alias('data'))

In [5]:
# We can print the schema for this dataframe to see what columns we have to work with.
topic_stream_df.printSchema()
# Note that only the key and value columns are of interest here.
# The data itself is read from the value column.
# Both key and value arrive as binary, which is not human-readable, so the value has to be
# decoded (e.g. to a string / JSON) before it can be used.

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
# Show the schema of the projected stream (the select above keeps only the Kafka value, aliased as 'data')
output_stream_df.printSchema()

Next, we define how each received batch of data is processed so that the resulting document is inserted in the appropriate format.

In [None]:
def _merge_hotspots(fires, climate_date, fire_cause):
    """Collapse the hotspot records of one precision-5 geohash cell into one document.

    The last record in ``fires`` is reused (and mutated) as the output document.
    Its surface temperature and confidence are replaced by the truncated average
    over all records in the cell; with a single record the values are unchanged
    apart from the int() truncation.

    Args:
        fires: non-empty list of hotspot dicts that share a precision-5 geohash.
        climate_date: datetime of the climate record the hotspots are attached to.
        fire_cause: value stored under the 'cause' key of the document.

    Returns:
        The merged hotspot dict, ready to be embedded in the climate document.
    """
    count = len(fires)
    avg_surface_temp = int(sum(fire['surface_temperature_celcius'] for fire in fires) / count)
    avg_confidence = int(sum(fire['confidence'] for fire in fires) / count)

    hotspot_doc = fires[-1]
    hotspot_doc['surface_temperature_celcius'] = avg_surface_temp
    hotspot_doc['confidence'] = avg_confidence

    # To handle misaligned dates, the hotspot takes the climate record's date and
    # keeps only its own hour and minute.
    # NOTE(review): seconds are not carried over, so they stay at the climate date's value (0).
    seen_at = datetime.strptime(hotspot_doc['created_datetime'], '%d/%m/%Y %H:%M:%S')
    hotspot_doc['datetime'] = climate_date.replace(hour=seen_at.hour, minute=seen_at.minute)
    del hotspot_doc['created_datetime']  # Renamed to 'datetime' to match part A

    hotspot_doc['cause'] = fire_cause
    del hotspot_doc['producer']  # Producer is only needed for routing, not for storage
    return hotspot_doc


def process_batch(batch_df, batch_id):
    """Turn one micro-batch of Kafka messages into at most one MongoDB document.

    Each row's 'data' column holds a JSON payload. The last payload whose
    'producer' is 'climate_streaming' becomes the climate document. Hotspot
    payloads in the same precision-3 geohash cell are grouped by precision-5
    geohash, merged per group and embedded under 'hotspots'. If the batch
    contains no climate record, nothing is inserted.

    Args:
        batch_df: the micro-batch DataFrame with a single 'data' column.
        batch_id: micro-batch identifier supplied by foreachBatch (unused).

    Returns:
        None. The result is written to the global ``collection``.
    """
    # Decode the payloads; the Kafka value column is nullable, so rows without
    # a payload are skipped instead of crashing the whole streaming query.
    streams = []
    for row in batch_df.collect():
        payload = row.asDict()['data']
        if payload is None:
            continue
        streams.append(json.loads(payload))

    # The last climate record of the batch is the one that gets stored
    climate_data = None
    for stream in streams:
        if stream['producer'] == 'climate_streaming':
            climate_data = stream

    # Without a climate record there is nothing to insert
    if climate_data is None:
        return

    # Convert the date to a datetime object and rename the field to match part A
    climate_data['date'] = datetime.strptime(climate_data['created_date'], '%d/%m/%Y')
    del climate_data['created_date']

    # Every record (the climate record included) that falls into the climate
    # record's precision-3 geohash cell
    climate_cell = pgh.encode(climate_data['latitude'], climate_data['longitude'], precision=3)
    nearby = [
        stream for stream in streams
        if pgh.encode(stream['latitude'], stream['longitude'], precision=3) == climate_cell
    ]

    # More than one record in the cell means there is something besides the
    # stored climate record itself
    if len(nearby) > 1:
        # Group the hotspot records by their precision-5 geohash
        hotspots_by_cell = {}
        for stream in nearby:
            if stream['producer'] != 'climate_streaming':
                cell = pgh.encode(stream['latitude'], stream['longitude'], precision=5)
                hotspots_by_cell.setdefault(cell, []).append(stream)

        # The fire cause is derived from the climate readings
        if climate_data['air_temperature_celcius'] > 20 and climate_data['GHI_w/m2'] > 180:
            fire_cause = 'natural'
        else:
            fire_cause = 'other'

        climate_data['hotspots'] = [
            _merge_hotspots(fires, climate_data['date'], fire_cause)
            for fires in hotspots_by_cell.values()
        ]

    # Drop the routing/location fields and store the finished document
    del climate_data['producer']
    del climate_data['latitude']
    del climate_data['longitude']
    climate_data['station'] = 948701  # Give station a constant value
    collection.insert_one(climate_data)

In [None]:
# MongoDB sink: fire a micro-batch every 10 seconds and hand the whole batch to
# process_batch via foreachBatch, so all records of a batch are processed together
db_writer = output_stream_df.writeStream.outputMode('append')
db_writer = db_writer.trigger(processingTime='10 seconds')
db_writer = db_writer.foreachBatch(process_batch)

# Console sink: appends the incoming rows to the console output
console_logger = output_stream_df.writeStream.outputMode('append').format('console')

In [None]:
# Imported here because StreamingQueryException is not among the notebook's top-level imports;
# without it the except clause below would itself fail with a NameError
from pyspark.sql.utils import StreamingQueryException

writer = db_writer
query = None  # Stays None if start() itself fails, so the cleanup below can tell
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # Only stop a query that was actually started
    if query is not None:
        query.stop()