In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import Geohash
import json, ast
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

############################ sending data to mongodb

def sendDataToDB(list1):
    client = MongoClient()
    db = client.fit5148_assignment_db
    taskC = db.taskC
    for record in list1:
        try:
            taskC.insert(record)
        except Exception as exp:
            print('Exception Occurred. Message {0}'.format(str(exp)))
    client.close()
    
###########################geoHash generation

def genHash(lat,lon):
    
    hashLoc = Geohash.encode(lat, lon, precision = 5) #use 2 for better results
    return hashLoc

##########################c embedding generated geo hash to each json record 

def embedGenCode(alist,date):
    blist=([],[],[])
    for producerData in range(len(alist)):
        for record in alist[producerData]:
            jsonData = {}
            jsonData = ast.literal_eval(record)
            if producerData == 0:
                lat = jsonData['Climate']['latitude']
                lon = jsonData['Climate']['longitude']
            else:
                lat = jsonData['latitude']
                lon = jsonData['longitude']
            geoValue = genHash(lat,lon)
            jsonData['geocode']=geoValue
            jsonData['Date'] = date
            blist[producerData].append(jsonData)
    processData(blist)
        
#####################################################

def processData(blist):

    final_list = []
    
    for climateData in blist[0]: #checking climate data
        temp={}
        sumSurfacetemperature=0
        sumConfidence=0
        counter=0       
        for aquaData in blist[1]:
            if aquaData['geocode']==climateData['geocode']:
                sumSurfacetemperature = sumSurfacetemperature + aquaData['surface_temperature_celcius']
                sumConfidence = sumConfidence + aquaData['confidence']
                counter = counter +1
                temp['latitudeAqua']=aquaData['latitude'] 
                temp['longitudeAqua']=aquaData['longitude']
        for terradata in blist[2]:
            if terradata['geocode'] == climateData['geocode']:
                sumSurfacetemperature = sumSurfacetemperature + terradata['surface_temperature_celcius']
                sumConfidence = sumConfidence +terradata['confidence'] 
                counter = counter + 1            
                temp['latitudeTerra']=terradata['latitude'] 
                temp['longitudeTerra']=terradata['longitude']                
        if (sumSurfacetemperature != 0 and sumSurfacetemperature != 0): # calculating average if geocode matches 
            temp['surface_temperature_celcius'] = sumSurfacetemperature/counter #for aqua and terra data 
            temp['confidence'] = sumConfidence/counter #if multiple values of either Aqua or terra arrives, then also taking average
            if 'latitudeAqua' in temp.keys():
                temp['latitude'] = temp.pop('latitudeAqua')
                temp['longitude'] = temp.pop('longitudeAqua')
            elif 'latitudeTerra' in temp.keys():
                temp['latitude'] = temp.pop('latitudeTerra')
                temp['longitude'] = temp.pop('longitudeTerra')
            else:
                temp['latitude'] = temp.pop('latitudeAqua')
                temp['longitude'] = temp.pop('longitudeAqua')
                temp.pop('latitudeTerra')
                temp.pop('longitudeTerra')           
        if temp:
            climateData['hotspot_data'] = temp
        final_list.append(climateData)
    print(final_list)
    sendDataToDB(final_list)
           
################################## 3 buckets for three different producers

def createBucket(iter):
    alist = ([],[],[])
    for record in iter:
        data = record[1].split("&")
        if data[1] == "climate_data":
            alist[0].append(data[2])
        elif data[1] == "aqua_Data":
            alist[1].append(data[2])
        else:
            alist[2].append(data[2])                       
    try:       
        embedGenCode(alist,data[0])   
    except Exception as ex:
        print("Exception Occured. Message: {0}".format(str(ex)))
              
n_secs = 10
topic = "climate_hotspot"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]") #2 execution threads
sc = SparkContext.getOrCreate()
if sc is None:
    sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream_c = KafkaUtils.createDirectStream(ssc, ['climate_hotspot'], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'taskC-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary
    

kafkaStream_c.pprint()

lines = kafkaStream_c.foreachRDD(lambda rdd: rdd.foreachPartition(createBucket))
ssc.start()
time.sleep(600) 
ssc.stop(stopSparkContext=True,stopGraceFully=True)

-------------------------------------------
Time: 2019-05-28 22:52:50
-------------------------------------------
('parsed', "22:52:46&climate_data&{'Climate': {'longitude': 143.375, 'latitude': -37.332, 'relative_humidity': 51.7, 'precipitation ': ' 0.00I', 'windspeed_knots': 7.2, 'air_temperature_celcius': 21, 'max_wind_speed': 13.0}}")

-------------------------------------------
Time: 2019-05-28 22:53:00
-------------------------------------------
('parsed', "22:52:51&climate_data&{'Climate': {'longitude': 143.1666, 'latitude': -36.2212, 'relative_humidity': 44.2, 'precipitation ': ' 0.00I', 'windspeed_knots': 5.8, 'air_temperature_celcius': 11, 'max_wind_speed': 9.9}}")
('parsed', "22:52:56&climate_data&{'Climate': {'longitude': 143.791, 'latitude': -35.962, 'relative_humidity': 49.9, 'precipitation ': ' 0.00G', 'windspeed_knots': 6.8, 'air_temperature_celcius': 14, 'max_wind_speed': 15.9}}")

-------------------------------------------
Time: 2019-05-28 22:53:10
------------------

KeyboardInterrupt: 

-------------------------------------------
Time: 2019-05-28 22:54:30
-------------------------------------------
('parsed', "22:54:21&climate_data&{'Climate': {'longitude': 146.149, 'latitude': -36.294, 'relative_humidity': 57.7, 'precipitation ': ' 0.00I', 'windspeed_knots': 9.9, 'air_temperature_celcius': 20, 'max_wind_speed': 18.1}}")
('parsed', "22:54:26&climate_data&{'Climate': {'longitude': 143.1062, 'latitude': -37.8147, 'relative_humidity': 46.4, 'precipitation ': ' 0.00I', 'windspeed_knots': 9.5, 'air_temperature_celcius': 17, 'max_wind_speed': 20.0}}")

-------------------------------------------
Time: 2019-05-28 22:54:40
-------------------------------------------
('parsed', "22:54:31&climate_data&{'Climate': {'longitude': 149.297, 'latitude': -37.6, 'relative_humidity': 53.4, 'precipitation ': ' 0.00I', 'windspeed_knots': 10.9, 'air_temperature_celcius': 22, 'max_wind_speed': 19.0}}")

-------------------------------------------
Time: 2019-05-28 22:54:50
------------------