# In [1]:
#import statements
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import geohash as geh
import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

#Function that sends data to mongodb of each batch every 10 seconds
def sendDataToDB(iter):
    client = MongoClient()
    db = client.FIT5148 #Specifying the name of the database to create in mongo db
    week11 = db.Streaming_Data #Specifyting the name of the collection to create
    
    #Creating 3 lists to store climate, aqua and terra data separately
    climate_data = []
    aqua_data = []
    terra_data = []
    
    #For every record in each iteration(from each batch), do the following:
    for record in iter:
        data = record[1] #store the useful record
        my1 = json.loads(data)
        var1 = geh.encode(float(my1['latitude']),float(my1['longitude']), precision = 5) #calculate geohash with precision = 5
        my1['geohash'] = var1 #Insert geohash value into the dict with a new key
        #using sender id we determine which record belongs to which data and append to respective list
        if my1['sender_id'] == 'id_1':
            climate_data.append(my1)
        elif my1['sender_id'] =='id_2':
            aqua_data.append(my1)
        else:
            terra_data.append(my1)
    
    a = len(aqua_data)
    t = len(terra_data)
    if t == 1 and a == 1: #If aqua and terra data exist (There can be at most 1 record from aqua and terra each)
        for c in climate_data:
            g1 = c['geohash']  
            my_arr = []
            my_dict = c
            for i in aqua_data:
                for j in terra_data:
                    id1 = i['geohash']
                    id2 = j['geohash']  
                    if id1 == id2: #If geo hash of aqua and terra match, we calculate the averages
                        d1 = dict()
                        sur1 = i['surface_temperature_celcius']
                        sur2 = j['surface_temperature_celcius']
                        con1 = i['confidence']
                        con2 = j['confidence']
                        avg_sur = (sur1 + sur2) / 2.0
                        avg_con = (con1 + con2) / 2.0
                        d1['average_surface_temp'] = avg_sur
                        d1['average_confidence'] = avg_con
                        my_arr.append(d1)
                        if g1 == id1: #append list with average values if there exists a nearest climate data
                            my_dict['hotspot_data'] = my_arr
                    else: #if aqua and terra geohashes don't match
                        if g1 == id1:
                            my_arr.append(i)
                            my_dict['hotspot_data'] = my_arr #add aqua data if it matches geohash of climate data
                        elif g1 == id2:
                            my_arr.append(j)
                            my_dict['hotspot_data'] = my_arr #add terra data if it matches geohash of climate data
                        else:
                            print("Just climate data")

    elif t == 1 and a != 1: #if aqua data doesn't exist
        for i in climate_data:
            my_arr = []
            my_dict = i
            for j in terra_data: 
                id1 = i['geohash']
                id2 = j['geohash']
                if id1 == id2: #check if the geohash match with the climate data and add terra info
                    my_arr.append(j)
                    my_dict['hotspot_data'] = my_arr            
                else:
                    print("Just climate data")
                                
    elif a == 1 and t != 1: #if terra data doesn't exist
        for i in climate_data:
            my_arr = []
            my_dict = i
            for j in aqua_data:
                id1 = i['geohash']
                id2 = j['geohash']
                if id1 == id2: #check if aqua data location is closer to climate data location and add aqua info
                    my_arr.append(j)
                    my_dict['hotspot_data'] = my_arr
                else:
                    print("Just climate data")
    else: 
        print("Just climate data")
    for rec in climate_data: #insert records from climate data
        week11.insert(rec)        
        
    #print("--------------The end-------------------------")
    client.close()

n_secs = 10 #Batch interval of 10 seconds
topic = "streaming02" #Name of the topic from producers 1, 2 and 3

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]") #2 Master threads
sc = SparkContext.getOrCreate()
if sc is None:
    sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'week11-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
time.sleep(600) # Run stream for 10 minutes just in case no detection of producer
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

# Cell output: KeyboardInterrupt