In [1]:
#uncomment this to install the library
# !pip3 install pygeohash

## Libraries and auxiliary functions

In [2]:
#load the libraries
from time import sleep
from kafka import KafkaConsumer
import datetime as dt
import pygeohash as pgh

In [3]:
#fuctions to check the location based on the geo hash (precision =5)
#function to check location between 2 data
def close_location (data1,data2):
    print("checking location...of sender",data1.get("id")," and  sender" , data2.get("id"))

    #with the precision =5 , we find the location that close together with the radius around 2.4km
    if data1.get("geohash")== data2.get("geohash"): 
        print("=>>>>>sender",str(data1.get("id")),"location near   ",  "sender",str(data2.get("id")),"location")
    else:
        print('>>>not close together<<<')
        
#function to check location between the joined data and another data (e.g hotspot data)
def close_location_2 (data1,data2): 
    print("checking location...of joined data id:",data1.get("id")," and  sender" , data2.get("id"))
    
    #with the precision =5 , we find the location that close together with the radius 2.4km
    if data1.get("geohash")== data2.get("geohash"): 
        print("=>>>> location",str(data1.get("geohash")),"location near   ",  str(data2.get("geohash")),"location")
    else:
        print('>>>not close together<<<')


# check location of 2 climate data stored in the list
def close_location_in_list(a_list):
    print('check 2 climate location data')
    data_1 = a_list[0]
    data_2 = a_list[1]
    close_location (data_1,data_2)

In [4]:
#auxilary function to handle the average and join of the json file
#function to merge satellite data
def merge_sat(data1,data2):
    result ={}
    
    result["_id"] = data1.get("_id") # take satellite _id ,we will store this joined data to the hotspot collection
    result["created_time"] = data1.get("created_time")
    
    
    #average the result of the location
    result['surface_temperature_celsius'] = (float(data1.get("surface_temperature_celsius"))+float(data2.get("surface_temperature_celsius")))/2    
    result["confidence"] = (float(data1.get("confidence"))+float(data2.get("confidence")))/2
    
    #reassign the location like the initial data structure
    result['geohash'] = data2.get('geohash')
    result["location"] = data1.get("location")

        
    return result

# function to join climate data and satellite data
def join_data_cli_sat(climData,satData):
    result={}

    #get location and id of the join data
    result["_id"] = climData.get("_id") # take climate _id ,we will store this joined data to the climate collection
    result['geohash'] = climData.get('geohash')
    result["location"] = climData.get("location")
    result["created_time"] = climData.get("created_time")
    

    #get climate data
    result["air_temperature_celsius"] = climData.get("air_temperature_celsius")
    result["relative_humidity"] = climData.get("relative_humidity")
    result["max_wind_speed"] = climData.get("max_wind_speed")
    result["windspeed_knots"] = climData.get("windspeed_knots")
    result["precipitation"] = climData.get("precipitation")
   
    #get satellite data
    result["surface_temperature_celsius"] = satData.get("surface_temperature_celsius")
    result["confidence"] = satData.get("confidence")
    result["hotspots"] = satData.get("_id") #reference to the hotspot data like in the task A_B
        
    return result
        


## Streaming Application

In [5]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



def sendDataToDB(iter):
    client = MongoClient()
    db = client.fit5148_assignment_db
    
    # MongoDB design
    sat_col = db.hotspot #to store satellite data and joined satellite data 
     
    # to store the join between climate and satellite
    clim_col = db.climate #to store the climate data
    
    #list of senders per iter
    sender = []
    
    #variable to store the data from 3 unique senders per iter
    climList = []
    satData_2 = {}
    satData_3 = {}
#####################################  PARSING THE DATA FROM SENDERS  PER ITER###########################################
    for record in iter: 
            sender.append(record[0])
            data_id = json.loads(record[1])
            data = data_id.get('data')
            


            if record[0] == "sender_2" : #parse AQUA satelite data

                
                #main data
                #add "AQUA" string to the "_id" to handle the case when 2 satellite data come at the same time
                #to make sure the incomming data from AQUA at a specific time is unique
                satData_2["_id"] = "AQUA" +str(dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S"))
                satData_2["id"] = data_id.get("sender_id") #unique sender_id
                
                #use datetime as ISO format for readable in mongoDB
                satData_2["created_time"] = dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S")
                
                # parse other data
                satData_2["location"] = {"latitude" : float(data.get("lat")), "longitude" : float(data.get("lon"))}
                satData_2["surface_temperature_celsius"] = float(data.get("surface_temp"))
                satData_2["confidence"] = float(data.get("confidence"))
                geohash = pgh.encode(float(data.get("lat")),float(data.get("lon")),precision=5)            
                satData_2["geohash"] = geohash #unique_location               



            if record[0] == "sender_3": #parse TERRA satelite data

                #main data
                #add "TERRA" string to the "_id" to handle the case when 2 satellite data come at the same time
                #to make sure the incomming data for TERRA at a specific time is unique
                satData_3["_id"] = "TERRA" +str(dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S"))
                satData_3["id"] = data_id.get("sender_id") #unique sender_id
                
                #use datetime as ISO format for readable in mongoDB
                satData_3["created_time"] = dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S")
                # parse other data
                satData_3["location"] = {"latitude" : float(data.get("lat")), "longitude" : float(data.get("lon"))}
                satData_3["surface_temperature_celsius"] = float(data.get("surface_temp"))
                satData_3["confidence"] = float(data.get("confidence"))
                geohash = pgh.encode(float(data.get("lat")),float(data.get("lon")),precision=5)               
                satData_3["geohash"] = geohash #unique_location     



            if record[0] == "sender_1": #parse climate data
                climData = {}

                
                
                #main data
                #add "CLIM" string to the "_id" to handle to make sure the incomming data for 
                #climate at a specific time is unique
                climData["_id"] = "CLIM" + str(dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S"))
                climData["id"] = data_id.get("sender_id") #unique sender_id
                
                #use datetime as ISO format for readable in mongoDB
                climData["created_time"] = dt.datetime.strptime(str(data_id.get("created_time")), "%Y-%m-%dT%H:%M:%S")
                climData["location"] = {"latitude" : float(data.get("lat")), "longitude" :  float(data.get("lon"))}

                
                climData["air_temperature_celsius"] = float(data.get("air_temp"))
                climData["relative_humidity"] = float(data.get("relative_humid"))
                climData["max_wind_speed"] = float(data.get("max_wind_speed"))
                climData["windspeed_knots"] = float(data.get("windspeed"))
                climData["precipitation"] = data.get("prep")
                geohash = pgh.encode(float(data.get("lat")),float(data.get("lon")),precision=5) 
                climData["geohash"] = geohash
                climList.append(climData)

    uniq_sender_id = set(sender) #check unique sender for each iter

################################ PERFOMING JOIN AND CHECK LOCATION THEN PUSH TO MONGODB ##################################

####################### Received only from unique one sender
    
    #for climate data, there will be the case with on 2 streams of climate data go throught the app
    if len(uniq_sender_id) == 1 and "sender_1" in uniq_sender_id:#store to climate data to mongoDB
        print("---------------------received CLIMATE data------------------------")
        try:
            #find close location in climate data and print out
            if len(climList) > 1:
                #check 2 climate location data
                close_location_in_list(climList)
                
            for data in climList:
                clim_col.insert(data)
        
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    
    # if there is one satellite data (AQUA), there will be no case with 2 same satelite data
    if len(uniq_sender_id) == 1 and "sender_2" in uniq_sender_id:#store to climate data to mongoDB
        print("---------------------received AQUA data------------------------")
        try:

            sat_col.insert(satData_2)
        
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
            
    # if there is one satellite data (TERRA) , there will be no case with 2 same satelite data
    if len(uniq_sender_id) == 1 and "sender_3" in uniq_sender_id:#store to climate data to mongoDB
        print("---------------------received TERRA data------------------------")
        try:

            sat_col.insert(satData_3)
        
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    
########################## Received  from 2 unique  senders

    elif len(sender) == 2 and len(uniq_sender_id) == 2:
        print("---------------------received 2 streams------------------------")
        #will have 1 case, because there will be at least 1 climate data 
        #if the consummer received 2, that will be the climat data and one sat data
        #or 2 climate data because we assume that there is at least 1 climate data in the stream
        
       
        try:

            for climate in climList:


                if len(satData_3)!=0:
                    
                    #check location
                    close_location(climate,satData_3)

                    #check lat lon first!!!
                    print('---checking TERRA and Climate location---')
                    if satData_3["location"] == climate["location"]:
                        print('joining....')
                        join_cli_sat = join_data_cli_sat(climate,satData_3)
                        clim_col.insert(join_cli_sat)
                        sat_col.insert(satData_3)
                    else:
                        print('no join')
                        sat_col.insert(satData_3)
                        clim_col.insert(climate)

                elif len(satData_2)!=0:
                    #check close location
                    close_location(climate,satData_2)

                    print('---checking AQUA and Climate location---')
                    #check lat lon first!!!
                    if satData_2["location"] == climate["location"]:
                        print('joining....')
                        join_cli_sat = join_data_cli_sat(climate,satData_2)
                        clim_col.insert(join_cli_sat)
                        sat_col.insert(satData_2)
                    else:
                        print('no join')
                        sat_col.insert(satData_2)
                        clim_col.insert(climate)
                else: #received only 2 climate data

                    print('received 2 climate data')
                    clim_col.insert(climate)

            # if we received 2 sattelite data only (rare case, we ran out of climate data)
            if len(climList) == 0:
                if len(satData_3)!=0 and len(satData_2)!=0:
                    #check location
                    close_location(satData_3,satData_2)
                    print('---checking AQUA and TERRA location---')
                    if satData_2["location"] == satData_3["location"]:
                        print('joining....')
                        sat_data = merge_sat(satData_2,satData_3)

                        #insert the data into the mongo with handling the exceptions : duplicate
                        sat_col.insert(sat_data)
                        
                    else:
                        sat_col.update(satData_3, satData_3, upsert=True)
                        sat_col.update(satData_2, satData_2, upsert=True)



                    
            
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex))) #exception will occur with empty satelite data

            
#########################################################Received 3 stream
########################## Received  from 2 unique  sender            

#we assume that there is at least 1 climate data in the stream , so if we have 3 streams of data
# there will be 2 climate data and 1 satelite data because the app process 10 secs batch
# if received 3 streams, there will be 2 climate data and 1 satellite data

    if len(sender) == 3: 
        print("---------------------received 3 streams------------------------")
        try:
            
            if len(climList) > 1:
                #check 2 climate location data
                close_location_in_list(climList)

            for climate2 in climList:

                if len(satData_3)!=0:
                    
                    
    
                    #check location
                    close_location(climate2,satData_3)
            
                    print('---checking TERRA and Climate location---')
                    if satData_3["location"] == climate2["location"]:
                        print('joining....')

                        join_data = join_data_cli_sat(climate2,satData_3)
                        clim_col.insert(join_data)
                        sat_col.update(satData_3, satData_3, upsert=True)
                        

                    else:
                        print('no join')

                        clim_col.insert(climate2)
                        
                        #insert the data into the mongo with handling the exceptions : duplicate
                        sat_col.update(satData_3, satData_3, upsert=True)


                elif len(satData_2)!=0:
                    
                    
                    #check location
                    close_location(climate2,satData_2)
                    
                    print('---checking AQUA and Climate location---')
                    
                    if satData_2["location"] == climate2["location"]:
                        print('joining....')
                        
                        join_data = join_data_cli_sat(climate2,satData_2)
                        clim_col.insert(join_data)
                        sat_col.update(satData_2, satData_2, upsert=True)
                        
                    else:
                        print('no join')
                       
                        clim_col.insert(climate2)
                        #insert the data into the mongo with handling the exceptions : duplicate
                        sat_col.update(satData_2, satData_2, upsert=True)



        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
            
 ########################################Received 4 streams of data#################################   
# There will be 2 climate data and 2 satellite data from AQUA and TERRA

    elif len(sender) ==4 : # 4 will have 2 climate data and 2 sat data
        print("---------------------received 4 streams------------------------")
        try:
            
            if len(climList) > 1:
                #check 2 climate location data
                close_location_in_list(climList)

            for climate2 in climList:
                print('---checking AQUA , TERRA and Climate location---')
                #location sat2=sat3=climate
                if (satData_2["location"] == satData_3["location"])\
                and  (satData_2["location"] == climate2["location"]):
                    print('joining....')
                    
                    #join 2 satellite data
                    sat_data = merge_sat(satData_2,satData_3)
                    sat_col.update(sat_data, sat_data, upsert=True)
                    
                    #join with the climate file
                    final_data = join_data_cli_sat(climate2,sat_data)
                    clim_col.insert(final_data)
                    
                
                #location sat2=sat3
                elif (satData_2["location"] == satData_3["location"])\
                and (satData_2["location"] != climate2["location"]):
                    print('joining....')
                    sat_data = merge_sat(satData_2,satData_3)
                    
                    #insert the data into the mongo with handling the exceptions : duplicate
                    sat_col.update(sat_data, sat_data, upsert=True)
                    clim_col.insert(climate2)
                    
                    #check location
                    close_location_2(sat_data,climate2)
                
                #location sat2=climate 
                elif (satData_2["location"] != satData_3["location"])\
                and (satData_2["location"] == climate2["location"]):
                    print('joining....')
                   
                    join_data = join_data_cli_sat(climate2,satData_2)
                    clim_col.insert(join_data)
                    
                    #insert the data into the mongo with handling the exceptions : duplicate
                    sat_col.update(satData_3, satData_3, upsert=True)
                    sat_col.update(satData_2, satData_2, upsert=True)
#
                    #check location
                    close_location_2(join_data,satData_3)
                
                #location sat3 =climate
                elif (satData_2["location"] != satData_3["location"])\
                and (satData_3["location"] == climate2["location"]):
                    print('joining....')
                    
                    join_data = join_data_cli_sat(climate2,satData_3)
                    clim_col.insert(join_data)
                    
                    #insert the data into the mongo with handling the exceptions : duplicate
                    sat_col.update(satData_3, satData_3, upsert=True)
                    sat_col.update(satData_2, satData_2, upsert=True)
#                
                    #check location
                    close_location_2(join_data,satData_2)
                
                #if nothing to merge
                else:
                    

                    print('no join')

                    #check location
                    close_location(climate2,satData_2)
                    close_location(climate2,satData_3)
                    close_location(satData_2,satData_3)
                    clim_col.insert(climate2)
                    
                    #insert the data into the mongo with handling the exceptions
                    sat_col.update(satData_3, satData_3, upsert=True)
                    sat_col.update(satData_2, satData_2, upsert=True)
                    


               
            
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
          
    client.close()


    
################################################  INITIATE THE STREAM ################################################
n_secs = 10 # set batch to 10 seconds
topic = 'TaskC'

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]") #set 2 processors
sc = SparkContext.getOrCreate()
if sc is None:
    sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092', 
                        'group.id':'taskC-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary
    
lines= kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))


# this line print to check the data IDs has gone through the app for a specific time
a = kafkaStream.map(lambda x:x[0])
a.pprint()


ssc.start()  

# ssc.awaitTermination()

# ssc.start()
time.sleep(3000) # Run stream for 20 mins just to get the data for visualisation
# # ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

-------------------------------------------
Time: 2019-05-24 17:45:20
-------------------------------------------
sender_2
sender_3
sender_1
sender_1

-------------------------------------------
Time: 2019-05-24 17:45:30
-------------------------------------------
sender_3
sender_1
sender_1

-------------------------------------------
Time: 2019-05-24 17:45:40
-------------------------------------------
sender_2
sender_1
sender_1

-------------------------------------------
Time: 2019-05-24 17:45:50
-------------------------------------------
sender_1
sender_2
sender_1

-------------------------------------------
Time: 2019-05-24 17:46:00
-------------------------------------------
sender_3
sender_1
sender_1

-------------------------------------------
Time: 2019-05-24 17:46:10
-------------------------------------------
sender_1
sender_2
sender_3
sender_1

-------------------------------------------
Time: 2019-05-24 17:46:20
-------------------------------------------
sender_1
sender_

KeyboardInterrupt: 

-------------------------------------------
Time: 2019-05-24 17:48:50
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:00
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:10
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:20
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:30
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:40
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:49:50
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:50:00
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 17:50:10
----------