Name: Sohan Pujar
Student Code: 30567556
Email: spuj0001@student.monash.edu

In [None]:
# Import appropriate dependencies and libraries
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
import pandas
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pprint import pprint

# Geohash encoder adapted from the pygeohash source code, because pygeohash itself could not be imported
def encode(latitude, longitude, precision=12):
    """
    Build the geohash string for a latitude/longitude pair.

    The longitude and latitude ranges are bisected alternately (longitude
    first). Every bisection yields one bit, and each run of five bits is
    mapped to one base-32 character until `precision` characters exist.
    """
    alphabet = '0123456789bcdefghjkmnpqrstuvwxyz'  # geohash base-32 symbols
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    symbols = []
    top_bit = 16          # weight of the first bit inside a 5-bit group
    mask = top_bit        # weight of the bit currently being decided
    value = 0             # 5-bit accumulator for the current character
    use_longitude = True  # bisections alternate, starting with longitude
    while len(symbols) < precision:
        if use_longitude:
            middle = (lon_lo + lon_hi) / 2
            if longitude > middle:
                value |= mask
                lon_lo = middle
            else:
                lon_hi = middle
        else:
            middle = (lat_lo + lat_hi) / 2
            if latitude > middle:
                value |= mask
                lat_lo = middle
            else:
                lat_hi = middle
        use_longitude = not use_longitude
        if mask > 1:
            mask >>= 1
        else:
            # Five bits collected: emit one character and start a new group.
            symbols.append(alphabet[value])
            mask = top_bit
            value = 0
    return ''.join(symbols)

# Build (or reuse) the SparkSession with spark.archives set to the
# pyspark_venv archive.
spark = (
    SparkSession.builder
    .config("spark.archives", "pyspark_venv.tar.gz#environment")
    .getOrCreate()
)



def sendDataToDB(iter):
    """
    Store one partition of consumed Kafka records in the `hardquestion`
    collection of `fit3182_assignment_db`.

    Each record's second element is parsed as JSON. Records whose
    "producer" field equals 1 are inserted as climate documents; every
    other record is treated as a hotspot reading. Hotspots whose
    precision-3 geohash equals that of the climate record are grouped by
    precision-5 geohash plus a slice of their date string, averaged per
    group, and pushed onto the `hotspot_location` array of the document
    whose `date` equals the climate record's date.

    If a partition contains several producer-1 records, each one is
    inserted, but only the last one is used for matching and for the
    natural/other classification.

    Parameters:
        iter: iterable of records; index 1 of each record must be a JSON
              string (presumably Kafka (key, value) pairs - confirm
              against the stream definition).
    """
    # Keys copied from the climate payload, in the order they are stored.
    climate_fields = (
        'latitude', 'longitude', 'air_temperature_celcius',
        'relative_humidity', 'windspeed_knots', 'max_wind_speed',
        'GHI_w/m2', 'report', 'precipitation ', 'producer', 'date',
    )

    client = MongoClient()
    try:
        hardquestion = client.fit3182_assignment_db.hardquestion

        climate_hash = None  # precision-3 geohash of the climate record
        date = air_temp = GHI = None
        hotspots = []  # the partition iterator is single-pass, so buffer these

        for record in iter:
            data = json.loads(record[1])
            if data.get("producer") == 1:
                jsonData = {field: data.get(field) for field in climate_fields}
                date = data.get('date')
                air_temp = data.get('air_temperature_celcius')
                GHI = data.get('GHI_w/m2')
                climate_hash = encode(data.get('latitude'), data.get('longitude'), precision=3)
                hardquestion.insert_one(jsonData)
            else:
                hotspots.append(data)

        if climate_hash is None:
            # No climate record in this partition: there is no document to
            # attach hotspots to, so nothing further is written.
            return

        # Keep hotspots in the climate record's precision-3 cell and group
        # them by precision-5 cell plus date suffix.
        groups = {}
        for data in hotspots:
            latitude = data.get('latitude')
            longitude = data.get('longitude')
            if encode(latitude, longitude, precision=3) != climate_hash:
                continue
            # NOTE(review): [12:] presumably isolates the time-of-day part of
            # the date string - confirm against the producers' date format.
            key = encode(latitude, longitude, precision=5) + data.get('date')[12:]
            groups.setdefault(key, []).append(data)

        # GHI above 180 together with air temperature above 20 marks the fire
        # as natural; the climate reading is the same for every group.
        cause = "natural" if GHI > 180 and air_temp > 20 else "other"

        for members in groups.values():
            # The first member represents the group's position and time.
            first = members[0]
            surface_temps = [m.get('surface_temperature_celcius') for m in members]
            confidences = [m.get('confidence') for m in members]
            json_doc = {
                # Fixed: these two values were previously swapped, so
                # "longitude" held the latitude and vice versa.
                "longitude": first.get('longitude'),
                "latitude": first.get('latitude'),
                "datetime": first.get('date'),
                "surface_temp_avg": sum(surface_temps) / len(surface_temps),
                "confidence_avg": sum(confidences) / len(confidences),
                "cause_of_fire": cause,
            }
            hardquestion.update_one({'date': date}, {'$push': {'hotspot_location': json_doc}})
    finally:
        # Always release the connection, even when a record fails to parse
        # or a database call raises.
        client.close()

n_secs = 10  # batch interval of the streaming context, in seconds
topic = "data"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Hand the conf to getOrCreate so the app name and master are applied when no
# context is active yet. getOrCreate always returns a context, so the former
# `if sc is None` fallback could never run and the conf was silently unused.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)  # batch window in the spark context

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'week12-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

# Each partition of every batch is written to MongoDB by sendDataToDB.
lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))
kafkaStream.pprint()  # printing the records consumed
ssc.start()
try:
    time.sleep(600)  # Run stream for 10 minutes just in case no detection of producer
finally:
    # Stop the stream even if the wait is interrupted (e.g. kernel interrupt),
    # so the streaming context is not left running.
    ssc.stop(stopSparkContext=True,stopGraceFully=True)

In [None]:
from pymongo import MongoClient
# Reset step: remove the whole hardquestion collection from the assignment database.
client = MongoClient()
db = client["fit3182_assignment_db"]
db["hardquestion"].drop()

In [None]:
# Manual check of the encoder at latitude -30, longitude -30 with three characters.
encode(latitude=-30, longitude=-30, precision=3)