# FIT3182 Big data management and processing - S1 2021 Assignment
***

Name: Wong Kai Lin

Student ID: 30507588

Email: kwon0061@student.monash.edu
***


## Part B
## Task 1. Processing Data Stream 
### d. Streaming Application:

Write a streaming application in Apache Spark Streaming 
which has a local streaming context with two execution threads and a batch interval 
of 10 seconds. The streaming application will receive streaming data from all three 
producers. If the streaming application has data from all or at least two of the 
producers, do the processing as follows:

- Group the streams based on the location (i.e., latitude and longitude) and 
create the data model developed in Part A.

- You can determine whether two locations are close to each other by implementing 
the geohashing algorithm or by using a library that does the job for you. The 
precision number in the algorithm determines the number of characters in the 
geohash. Please use precision 3. If the climate data and hotspot data are not 
close to each other, we can ignore the hotspot data and just store the climate 
data.

- If the streaming application has the data from only one producer (Producer 
1), it implies that there was no fire at that time and we can store the climate 
data into MongoDB straight away. 

- If we receive the data from two different satellites AQUA and TERRA for the 
same location (to determine whether the two locations are the same or not 
please use geohash with precision 5), then average the ‘surface temperature’ 
and ‘confidence’ from the two satellites and save it as a fire event.

- If a fire was detected with an air temperature greater than 20 (°C) and a GHI 
greater than 180 (W/m²), then report the cause of the fire event as ‘natural’. 
Otherwise, report the cause of the fire event as ‘other’.
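
The geohash comparison described in the list above can also be written without a library. The following is a minimal sketch of the standard geohash encoding using only plain Python. The names `geohash_encode` and `is_close` are illustrative and are not part of the assignment code, which relies on `pygeohash` instead.

```python
# Standard geohash base-32 alphabet (omits a, i, l, o).
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(lat, lon, precision=3):
    """Encode a latitude/longitude pair as a geohash of `precision` characters."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0          # number of bits collected for the current character
    value = 0         # the 5-bit value being built
    use_lon = True    # bits alternate, starting with longitude

    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                value = (value << 1) | 1
                lon_lo = mid
            else:
                value <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                value = (value << 1) | 1
                lat_lo = mid
            else:
                value <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bits += 1
        if bits == 5:  # every 5 bits become one base-32 character
            chars.append(_BASE32[value])
            bits = 0
            value = 0
    return "".join(chars)


def is_close(lat1, lon1, lat2, lon2, precision):
    """Treat two locations as close when their geohashes match."""
    return (geohash_encode(lat1, lon1, precision)
            == geohash_encode(lat2, lon2, precision))
```

Because each extra character subdivides the cell further, precision 3 gives a coarse match (cells on the order of 150 km across) while precision 5 gives a much finer one (a few kilometres). One limitation of comparing hashes for equality is that two nearby points on opposite sides of a cell boundary receive different hashes.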


In [1]:
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pygeohash as pgh
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

# returns whether 2 locations are close to one another,
# i.e. whether they share the same geohash at the given precision
def closeness(lat1, lon1, lat2, lon2, prec):
    hash_a = pgh.encode(lat1, lon1, precision=prec)
    hash_b = pgh.encode(lat2, lon2, precision=prec)

    return hash_a == hash_b

def sendDataToDB(iter):

    # iter - iterator over the (key, value) messages in one partition of the 10-second batch
    
    # First part. Store the message into local variable
    climate_data = {}
    aqua_data = []
    terra_data = []
    
    for record in iter:
        
        key = record[0]
        
        if key == "P1": # producer 1: climate data
            climate_data = json.loads(record[1])
            
        elif key == "P2": # producer 2: hotspot_aqua data
            aqua_data.append(json.loads(record[1]))
            
        elif key == "P3": # producer 3: hotspot_terra data
            terra_data.append(json.loads(record[1]))
    
    
    # Second part. Data processing  
    
    if climate_data == {}: # end the process if there is no climate data
        print("There is no Climate data.")
        return
        
    # Otherwise, process the hotspot data (steps below are not implemented yet):

    # 1. Compare the climate location with each hotspot location (precision=3)
    #    If they are not close, discard the hotspot record

    # hotspot = []
    # fire_event = []

    # 2. Combine the hotspot data that are close to each other (precision=5)
    #    Example: 2 AQUA hotspots and 1 TERRA hotspot with the same geohash
    #    are combined into 1 record:
    #      - average the surface temperature and confidence
    #      - average the latitude and longitude
    #      - take the date from the climate data and the time from any one record
    #    Hint: for loop, hash partition, merge (group records with the same attribute)

    # 3. Decide whether the fire event is 'natural' or 'other' based on the
    #    air temperature and GHI in the climate data
    #      - get the air temperature and GHI from the climate data
    #      - add a new field to each hotspot record holding 'natural' or 'other'
        
    
    
    
    # Last part. Store the data into mongodb
    client = MongoClient()
    db = client.fit3182_assignment_db # database
    streaming = db.streaming_data # create new collection
    
    #json_data = {'date': date, 'climate': climate_data, 'hotspot': hotspot, 'fire_event': fire_event}
    
    # The partition iterator was already consumed by the first loop, so build
    # the document from the parsed records instead of iterating over it again.
    json_data = {'climate': climate_data, 'hotspot': aqua_data + terra_data}

    try:
        streaming.insert_one(json_data) # insert data into MongoDB
    except Exception as ex:
        print("Exception occurred. Message: {0}".format(str(ex)))
                
    client.close()

n_secs = 10
topic = "hotspot"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf) # pass conf so the app name and local[2] master are applied
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
# Direct stream to receive data from Kafka - Establish connection    
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092', 
                        'group.id':'streaming_data', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary 

# send every partition of data in each batch of data (data every 10secs) into sendDataToDB()
# (foreachRDD returns None, so its result is not assigned)
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))
 
ssc.start()
print('Awaiting message from producer...')
time.sleep(600) # Keep the stream running for 10 minutes, then stop it even if no producer data arrived
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)
print('Process stopped.')

Awaiting message from producer...


KeyboardInterrupt: 
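
The processing steps that are left as comments in `sendDataToDB` (merging hotspots that share a geohash, then labelling the cause of the fire) could be sketched roughly as follows. This is only an illustration under stated assumptions: the field names `geohash`, `surface_temperature` and `confidence`, and the helper names `fire_cause` and `merge_hotspots`, are hypothetical and would need to be replaced by the names used in the Part A data model.

```python
from collections import defaultdict


def fire_cause(air_temperature, ghi):
    """Return 'natural' if air temperature > 20 (°C) and GHI > 180 (W/m²), else 'other'."""
    return "natural" if air_temperature > 20 and ghi > 180 else "other"


def merge_hotspots(hotspots, key_fn):
    """Average surface temperature and confidence of hotspots sharing a key.

    `key_fn` maps a hotspot record to its grouping key, for example its
    precision-5 geohash. Field names here are assumed, not taken from Part A.
    """
    groups = defaultdict(list)
    for hotspot in hotspots:
        groups[key_fn(hotspot)].append(hotspot)

    merged = []
    for key, members in groups.items():
        n = len(members)
        merged.append({
            "geohash": key,
            "surface_temperature": sum(m["surface_temperature"] for m in members) / n,
            "confidence": sum(m["confidence"] for m in members) / n,
        })
    return merged
```

A possible use inside the batch handler would be to call `merge_hotspots(aqua_data + terra_data, key_fn=...)` with a key function that computes the precision-5 geohash of each record, and then attach `fire_cause(...)` computed from the climate record to every merged hotspot before it is stored.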