# FIT3182 - Big data management and processing

Name: Cheong Karr Kei

Student ID: 30091497

Email: kche0070@student.monash.edu


# Assignment Part B #

**Task 1. Processing Stream Data**

**Streaming application**

This notebook contains the streaming application, which uses a local streaming context with two execution threads and a batch interval of 10 seconds. The application receives data from all three of our producers.

In [None]:
import os
# Must be set before the SparkContext is created (the JVM launch reads it):
# it adds the Kafka 0.8 streaming connector package that KafkaUtils below needs.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from pprint import pprint
import pygeohash as pgh
import datetime as dt



def compare_location(location1, location2, precision):
    """
    Report whether two locations fall into the same geohash cell.

    Params:
    1. location1: tuple of (latitude, longitude)
    2. location2: tuple of (latitude, longitude)
    3. precision: integer geohash precision passed to pgh.encode (higher = finer cells)

    Returns True when both locations encode to the same geohash at the given
    precision, otherwise False. Note that two nearby points lying on opposite
    sides of a cell boundary will compare as not close.
    """
    # Encode the first location before the second, then compare the two hashes.
    first_cell = pgh.encode(location1[0], location1[1], precision)
    second_cell = pgh.encode(location2[0], location2[1], precision)
    return first_cell == second_cell

def geo_hash(location, precision):
    """
    Return the geohash of a location.

    Params:
    1. location: tuple of (latitude, longitude)
    2. precision: integer geohash precision passed to pgh.encode
    """
    latitude, longitude = location[0], location[1]
    return pgh.encode(latitude, longitude, precision)



def _split_by_producer(records):
    """Decode Kafka (key, value) records and bucket them by producer key.

    Returns (climate_data, aqua_data, terra_data): lists of dicts decoded from
    the JSON values of records keyed "P1", "P2" and "P3" respectively. Records
    with any other key are ignored.
    """
    climate_data, aqua_data, terra_data = [], [], []
    buckets = {"P1": climate_data, "P2": aqua_data, "P3": terra_data}
    for record in records:
        bucket = buckets.get(record[0])
        if bucket is not None:
            bucket.append(json.loads(record[1]))
    return climate_data, aqua_data, terra_data


def _find_close_fires(climate, fires, precision=3):
    """Return the fires whose geohash at `precision` equals the climate location's.

    Input order is preserved.
    """
    climate_loc = (climate["latitude"], climate["longitude"])
    return [
        fire
        for fire in fires
        if compare_location(climate_loc, (fire["latitude"], fire["longitude"]), precision)
    ]


def _merge_fires(fires, precision=5):
    """Collapse fires that share a geohash at `precision` into one record per cell.

    A cell holding a single fire keeps that fire dict unchanged. A cell holding
    several fires is replaced by a new dict with:
      - latitude/longitude averaged and rounded to 3 decimals (float),
      - confidence and surface_temperature_celcius averaged and rounded (int),
      - the datetime of the last fire in that cell.
    Cells are emitted in first-seen order.
    """
    groups = {}
    for fire in fires:
        cell = geo_hash((fire["latitude"], fire["longitude"]), precision)
        groups.setdefault(cell, []).append(fire)

    merged = []
    for group in groups.values():
        if len(group) == 1:
            merged.append(group[0])
            continue
        count = len(group)
        merged.append({
            "latitude": float(round(sum(f["latitude"] for f in group) / count, 3)),
            "longitude": float(round(sum(f["longitude"] for f in group) / count, 3)),
            "confidence": int(round(sum(f["confidence"] for f in group) / count)),
            "surface_temperature_celcius": int(
                round(sum(f["surface_temperature_celcius"] for f in group) / count)
            ),
            # No single "true" timestamp exists for a merged cell; use the latest-arrived one.
            "datetime": group[-1]["datetime"],
        })
    return merged


def _fire_cause(climate):
    """Return "natural" when air temperature > 20 and GHI > 180, otherwise "other"."""
    if climate["air_temperature_celcius"] > 20 and climate["GHI_w/m2"] > 180:
        return "natural"
    return "other"


def _restamp_fire(fire, climate_date, cause):
    """Move a fire record onto the climate record's date and tag it, in place.

    Keeps the time of day from fire["datetime"] (format "%Y-%m-%dT%H:%M:%S")
    and replaces its date with `climate_date` (format "%d/%m/%Y") rewritten as
    "%Y-%m-%d". Also sets the "cause" and "date" fields.
    """
    # Use an explicit "%H:%M:%S" rather than the locale-dependent "%X".
    fire_time = dt.datetime.strptime(fire["datetime"], "%Y-%m-%dT%H:%M:%S").strftime("%H:%M:%S")
    climate_day = dt.datetime.strptime(climate_date, "%d/%m/%Y").strftime("%Y-%m-%d")
    fire["datetime"] = climate_day + "T" + fire_time
    fire["cause"] = cause
    fire["date"] = climate_date


def sendDataToDB(iter):
    """Process one partition of Kafka records and store the result in MongoDB.

    `iter` (name kept for compatibility; it shadows the builtin) is an iterable
    of (key, json_string) records. Keys "P1", "P2" and "P3" mark climate, AQUA
    and TERRA hotspot data.

    When the partition contains climate data, the first climate record is
    processed as follows:
      1. Hotspots in the same precision-3 geohash cell are kept.
      2. The kept hotspots are merged per precision-5 cell.
      3. Each merged fire is re-dated to the climate date and tagged with a cause.
      4. The fires are inserted into fit3182_assignment_db.fire_stream_test.
      5. The climate record is inserted into climate_stream_test, with a "fires"
         field holding the _ids of exactly the fires inserted here.

    NOTE(review): additional climate records in the same partition, and
    hotspots arriving without any climate record, are discarded (same as the
    original behaviour). Confirm this is intended.
    """
    climate_data, aqua_data, terra_data = _split_by_producer(iter)

    print("Climate data: " + str(len(climate_data)))
    print("AQUA data: " + str(len(aqua_data)))
    print("TERRA data: " + str(len(terra_data)))
    print("\n")

    final_climate = None
    final_fires = []

    if climate_data:
        final_climate = climate_data[0]
        close_fires = _find_close_fires(final_climate, aqua_data + terra_data)
        final_fires = _merge_fires(close_fires)
        final_climate["fires"] = []

        if final_fires:
            cause = _fire_cause(final_climate)
            for fire in final_fires:
                _restamp_fire(fire, final_climate["date"], cause)

        client = MongoClient()
        try:
            db = client.fit3182_assignment_db
            # Test collections; the production ones are db.climate_stream / db.fire_stream.
            climate_collection = db.climate_stream_test
            fire_collection = db.fire_stream_test

            if final_fires:
                # Link only the documents inserted now. Querying by date would also
                # pick up fires stored earlier with the same date.
                result = fire_collection.insert_many(final_fires)
                final_climate["fires"] = list(result.inserted_ids)

            climate_collection.insert_one(final_climate)
        finally:
            client.close()

    print("Climate data:")
    pprint(final_climate)
    print("Fire data:")
    for fire in final_fires:
        pprint(fire)
    print("\n")

# Batch interval in seconds, and the Kafka topic all three producers publish to.
n_secs = 10
topic = "FIT3182_Assignment"

# Pass the conf to getOrCreate so the app name and the "local[2]" master
# (two execution threads) actually take effect. Previously the conf was built
# but never used. If a SparkContext already exists in this kernel,
# getOrCreate returns it unchanged.
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Direct (receiver-less) Kafka 0.8 stream of (key, value) records.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'fit3182-group',  # group ID is arbitrary
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',  # start from the latest offsets
})

# foreachRDD is an output action returning None, so its result is not bound.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext=True, stopGraceFully=True)

In [None]:
# from pymongo import MongoClient
# from pprint import pprint
# client = MongoClient()
# #Create DB
# db = client.fit3182_assignment_db
# #Create climates collection
# partb_climate = db.partb_climate
# #Create fires collection
# partb_fire = db.partb_fire

# partb_climate.drop()
# partb_fire.drop()