# In [63]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
import datetime as dt
import pygeohash as gh
from kafka import KafkaConsumer
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf 
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def _store_climate(taskc, data, arrival_time):
    """Upsert one climate reading into ``taskc``, keyed by its ``created_time``.

    The stored document starts with an empty ``hotspots`` list and carries the
    batch ``arrival_time`` so hotspot records can later be matched against it.
    """
    climate = {
        "_id": data.get("created_time"),
        "station_latitude": float(data.get("latitude")),
        "station_longitude": float(data.get("longitude")),
        "air_temperature_celcius": int(data.get("air_temperature_celcius")),
        "relative_humidity": float(data.get("relative_humidity")),
        "windspeed_knots": float(data.get("windspeed_knots")),
        "max_wind_speed": float(data.get("max_wind_speed")),
        "precipitation": data.get("precipitation"),
        "hotspots": [],
        "arrival_time": arrival_time,
    }
    try:
        taskc.replace_one({"_id": climate["_id"]}, climate, upsert=True)
    except Exception as ex:
        # Best effort: report the failure and carry on with the next record.
        print("Exception Occured. Message: {0}".format(str(ex)))


def _find_batch_climate_id(taskc, hotspot_geohash, arrival_time):
    """Return the ``_id`` of the climate record sharing ``hotspot_geohash``.

    Only climate records stamped with ``arrival_time`` are considered. Returns
    ``None`` when no station falls into the same precision-5 geohash cell; if
    several do, the last one returned by the query wins.
    """
    matched_id = None
    stations = taskc.find(
        {"arrival_time": arrival_time},
        {"_id": 1, "station_latitude": 1, "station_longitude": 1},
    )
    for station in stations:
        station_geohash = gh.encode(
            station["station_latitude"], station["station_longitude"], precision=5
        )
        # Both hashes are encoded with precision 5, so equality is the
        # same test as substring containment.
        if station_geohash == hotspot_geohash:
            matched_id = station["_id"]
    return matched_id


def _store_hotspot(taskc, data, arrival_time):
    """Attach one hotspot reading to the climate record in the same geohash cell.

    The hotspot is dropped when no climate record of the current batch matches.
    If the climate record already holds hotspots, the confidence and surface
    temperature of the first stored hotspot and of the new one are both set to
    the average of the two before the new hotspot is appended.
    """
    latitude = float(data.get("latitude"))
    longitude = float(data.get("longitude"))
    hotspot_geohash = gh.encode(latitude, longitude, precision=5)

    assigned_climate = _find_batch_climate_id(taskc, hotspot_geohash, arrival_time)
    if not assigned_climate:
        # No station in this geohash cell for the batch: nothing to attach to.
        return

    confidence = int(data.get("confidence"))
    surface_temp = int(data.get("surface_temperature_celcius"))

    # One read tells us whether hotspots exist and, if so, the values to
    # average with (replaces Cursor.count(), which PyMongo 4 removed).
    station = taskc.find_one(
        {"_id": assigned_climate},
        {"_id": 0, "hotspots.confidence": 1, "hotspots.surface_temperature_celcius": 1},
    )
    stored_hotspots = (station or {}).get("hotspots") or []

    previous_confidence = previous_surface_temp = None
    if stored_hotspots:
        previous_confidence = stored_hotspots[0]["confidence"]
        previous_surface_temp = stored_hotspots[0]["surface_temperature_celcius"]
        confidence = (confidence + previous_confidence) / 2
        surface_temp = (surface_temp + previous_surface_temp) / 2

    hotspot = {
        "created_time": data.get("created_time"),
        "hotspot_latitude": latitude,
        "hotspot_longitude": longitude,
        "confidence": confidence,
        "surface_temperature_celcius": surface_temp,
    }

    try:
        if stored_hotspots:
            # Overwrite the previously stored hotspot's values with the averages.
            taskc.update_one(
                {"_id": assigned_climate, "hotspots.confidence": previous_confidence},
                {"$set": {"hotspots.$.confidence": confidence}},
            )
            taskc.update_one(
                {
                    "_id": assigned_climate,
                    "hotspots.surface_temperature_celcius": previous_surface_temp,
                },
                {"$set": {"hotspots.$.surface_temperature_celcius": surface_temp}},
            )
        # Append this hotspot to the climate record that shares its geohash.
        taskc.update_one({"_id": assigned_climate}, {"$push": {"hotspots": hotspot}})
    except Exception as ex:
        # Best effort: report the failure and carry on with the next record.
        print("Exception Occured. Message: {0}".format(str(ex)))


def sendDataToDB(iter):
    """Write one partition of Kafka records into the ``taskc`` MongoDB collection.

    Each record's second element is parsed as JSON. Records with
    ``sender_id == 1`` are stored as climate documents; records with
    ``sender_id`` 2 or 3 are hotspot readings that get embedded into the
    climate document located in the same precision-5 geohash cell. Records
    with any other ``sender_id`` are ignored.

    Args:
        iter: iterable of records whose element ``[1]`` is a JSON string.
            (The name shadows the builtin but is kept for compatibility.)
    """
    client = MongoClient()
    try:
        taskc = client.fit5148_assignment_db.taskc
        # One timestamp per call identifies the batch.
        # NOTE(review): "%X" has one-second resolution and is computed per
        # call, so hotspots only match climate records stamped with the very
        # same string -- confirm this is the intended batch identity.
        arrival_time = dt.datetime.now().strftime("%X")
        for record in iter:
            data = json.loads(record[1])
            sender_id = data.get("sender_id")
            if sender_id == 1:
                _store_climate(taskc, data, arrival_time)
            elif sender_id in (2, 3):
                _store_hotspot(taskc, data, arrival_time)
    finally:
        # Always release the connection, even if a record fails to parse.
        client.close()

# Driver: consume the climate and hotspot Kafka topics in 10-second
# micro-batches and hand every partition to `sendDataToDB`.
batch_interval = 10   # seconds per micro-batch
run_duration = 3600   # seconds to keep the stream running (60 minutes)

# local[2]: run Spark locally with 2 execution threads
conf = SparkConf().setAppName("ReportFire").setMaster("local[2]")
# Pass the conf so the app name/master actually take effect; if a context
# already exists (e.g. the cell is re-run), getOrCreate returns it as-is.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
# entry point for spark streaming, a batch interval of 10 seconds
ssc = StreamingContext(sc, batch_interval)

topic_1 = "climate_data"
topic_2 = "hotspot_data"

# Both streams talk to the same broker with identical consumer settings.
kafka_params = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'reportfire-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest',
}

kafkaStream_climate = KafkaUtils.createDirectStream(ssc, [topic_1], dict(kafka_params))
kafkaStream_hotspot = KafkaUtils.createDirectStream(ssc, [topic_2], dict(kafka_params))

# for every batch, send the two RDDs and all partitions to the function 'sendDataToDB'
# (foreachRDD returns None, so there is nothing worth binding to a name)
kafkaStream_climate.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))
kafkaStream_hotspot.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    # Returns after `run_duration` seconds, or earlier if the stream stops;
    # re-raises any error reported by the streaming job instead of hiding it.
    ssc.awaitTerminationOrTimeout(run_duration)
finally:
    # Always shut down, even on error or interrupt.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)