Name : Tan Sook Mun

ID   : 30695759
    
Email: stan0111@student.monash.edu

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import pygeohash as pgh

def sendDataToDB(iter):    
    #Store the message into local variable
    aqua_records=[]
    terra_records=[]
    climate={}
    for record in iter:
        key=record[0]
        if key == "producer1":
            #rstore the climate
            climate = json.loads(record[1])
        elif key == "producer2":
            #if is from aqua store in aqua_records
            aqua_records.append(json.loads(record[1]))
        else:
            #if is from aqua store in terra_records
            terra_records.append(json.loads(record[1]))

    if climate!={}: #if no climate data dont store anything in database
        # Data processing
        hotspot=[]
        fire_event=[]
        cli_long=climate.get('longitude')
        cli_lat=climate.get('latitude')
        #look at aqua hotspots
        for aqua in aqua_records:
            long=aqua.get('longitude')
            lat=aqua.get('latitude')
            #checking if the area is close to each other in precission 3
            hash_a=pgh.encode(long,lat,precision=3)
            hash_b=pgh.encode(cli_long,cli_lat,precision=3)
            if hash_a!=hash_b: #if is not close
                aqua_records.remove(aqua) #remove hotspots that are far
            else: #if is close
                aqua["date"]=climate.get('date')#add the date 
                hotspot.append(aqua)#append into the list of hotspots
                
        #look at aqua hotspots
        for terra in terra_records:
            long=terra.get('longitude')
            lat=terra.get('latitude')
            hash_a=pgh.encode(long,lat,precision=3)
            hash_b=pgh.encode(cli_long,cli_lat,precision=3)
            if hash_a!=hash_b:
                terra_records.remove(terra)#remove hotspots that are far                 
            else: #if is close
                terra["date"]=climate.get('date')#add the date 
                hotspot.append(terra)#append into the list of hotspots

        climate["hotspots"]=hotspot
        if (len(aqua_records)>0 and len(terra_records)>0): #ensure there is aqua and terra data to compare 
            for aqua in aqua_records: #for each aqau check if it matches with terra
                aqua_long=aqua.get('longitude')
                aqua_lat=aqua.get('latitude')
                for terra in terra_records:
                    terra_long=aqua.get('longitude')
                    terra_lat=aqua.get('latitude')
                    hash_a=pgh.encode(aqua_long,aqua_lat,precision=5)
                    hash_b=pgh.encode(terra_long,terra_lat,precision=5)
                    if hash_a==hash_b: #if in the same location
                        fire={} #create a fire object
                        #get the avg values
                        fire["avg_surface_temp"]=(aqua["surface_temperature_celcius"]+
                                                  terra["surface_temperature_celcius"])/2
                        fire["confidence"]=(aqua["confidence"]+terra["confidence"])/2
                        #determine if the fire is natural or others
                        if climate.get("air_temperature_celcius")>20 and climate.get("GHI_w/m2")>180:
                            fire["cause"]="natural"
                        else:
                            fire["cause"]="others"
                        #setting the new key hour for the data visualition
                        fire["hour"]=int(terra.get("time")[0:2])
                        fire_event.append(fire)#append the collection of fire events
                        
        climate["fire_events"]=fire_event #add it an array of fire even into the key "fire_events"

    #Storing data into DB
    print("STORING PART")
    client = MongoClient()
    db = client.fit3182_assignment_db
    partB = db.partB
    if climate !={}: #if not climate dont store anything
        try:
            partB.insert(climate) #insert climate data
            print("inserted",climate)
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    client.close()

    
##-------------------------------------------------------------------------------
n_secs = 10
topic = "partB"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate()
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'week11-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

lines = kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
time.sleep(600) # Run stream for 10 minutes just in case no detection of producer
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

KeyboardInterrupt: 

In [3]:
#to check the records that has been added into the collection partB
from pymongo import MongoClient
from pprint import pprint

client = MongoClient()
db = client.fit3182_assignment_db
partB = db.partB
m=partB.find()
count=0
for a in m:
    pprint(a)
    count+=1
print(count)

{'GHI_w/m2': 145,
 '_id': ObjectId('60aa08ce9343690ee21a8da0'),
 'air_temperature_celcius': 18,
 'date': '2019-01-05',
 'date:': '',
 'fire_events': [{'avg_surface_temp': 60.0,
                  'cause': 'others',
                  'confidence': 82.0,
                  'hour': 12},
                 {'avg_surface_temp': 45.0,
                  'cause': 'others',
                  'confidence': 68.0,
                  'hour': 22},
                 {'avg_surface_temp': 45.0,
                  'cause': 'others',
                  'confidence': 70.0,
                  'hour': 3},
                 {'avg_surface_temp': 82.0,
                  'cause': 'others',
                  'confidence': 97.0,
                  'hour': 12},
                 {'avg_surface_temp': 67.0,
                  'cause': 'others',
                  'confidence': 83.0,
                  'hour': 22},
                 {'avg_surface_temp': 67.0,
                  'cause': 'others',
                  'confidence': 85.0,

 'precip_flag': 'G',
 'precipitation': 0.83,
 'relative_humidity': 60.5,
 'windspeed_knots': 7.0}
{'GHI_w/m2': 90,
 '_id': ObjectId('60aa091e9343690ee21a8db0'),
 'air_temperature_celcius': 10,
 'date': '2019-01-13',
 'date:': '',
 'fire_events': [{'avg_surface_temp': 44.0,
                  'cause': 'others',
                  'confidence': 58.5,
                  'hour': 12},
                 {'avg_surface_temp': 49.0,
                  'cause': 'others',
                  'confidence': 72.0,
                  'hour': 17},
                 {'avg_surface_temp': 46.5,
                  'cause': 'others',
                  'confidence': 68.5,
                  'hour': 22},
                 {'avg_surface_temp': 45.5,
                  'cause': 'others',
                  'confidence': 64.0,
                  'hour': 12},
                 {'avg_surface_temp': 50.5,
                  'cause': 'others',
                  'confidence': 77.5,
                  'hour': 17},
                 {'a

               'date': '2019-01-19',
               'latitude': -35.4509,
               'longitude': 143.1522,
               'surface_temperature_celcius': 47,
               'time': '08:11:43'},
              {'confidence': 54,
               'date': '2019-01-19',
               'latitude': -35.7608,
               'longitude': 143.3545,
               'surface_temperature_celcius': 47,
               'time': '08:11:46'},
              {'confidence': 90,
               'date': '2019-01-19',
               'latitude': -36.4325,
               'longitude': 144.3142,
               'surface_temperature_celcius': 67,
               'time': '03:23:46'}],
 'latitude': -37.611,
 'longitude': 149.277,
 'max_wind_speed': 13.0,
 'precip_flag': 'I',
 'precipitation': 0.0,
 'relative_humidity': 49.4,
 'windspeed_knots': 9.3}
{'GHI_w/m2': 83,
 '_id': ObjectId('60aa09649343690ee21a8dbe'),
 'air_temperature_celcius': 9,
 'date': '2019-01-20',
 'date:': '',
 'fire_events': [{'avg_surface_temp': 36.

                  'confidence': 85.0,
                  'hour': 22}],
 'hotspots': [{'confidence': 86,
               'date': '2019-01-28',
               'latitude': -36.4291,
               'longitude': 141.2426,
               'surface_temperature_celcius': 61,
               'time': '22:35:43'},
              {'confidence': 92,
               'date': '2019-01-28',
               'latitude': -36.1367,
               'longitude': 145.2071,
               'surface_temperature_celcius': 71,
               'time': '08:11:43'},
              {'confidence': 73,
               'date': '2019-01-28',
               'latitude': -35.1707,
               'longitude': 143.2276,
               'surface_temperature_celcius': 47,
               'time': '12:59:46'},
              {'confidence': 93,
               'date': '2019-01-28',
               'latitude': -36.4046,
               'longitude': 141.1534,
               'surface_temperature_celcius': 73,
               'time': '03:23:46'}],
 'lat

                 {'avg_surface_temp': 41.0,
                  'cause': 'others',
                  'confidence': 55.0,
                  'hour': 22},
                 {'avg_surface_temp': 47.0,
                  'cause': 'others',
                  'confidence': 81.0,
                  'hour': 12},
                 {'avg_surface_temp': 47.5,
                  'cause': 'others',
                  'confidence': 64.0,
                  'hour': 22},
                 {'avg_surface_temp': 41.0,
                  'cause': 'others',
                  'confidence': 74.5,
                  'hour': 12},
                 {'avg_surface_temp': 41.5,
                  'cause': 'others',
                  'confidence': 57.5,
                  'hour': 22},
                 {'avg_surface_temp': 47.5,
                  'cause': 'others',
                  'confidence': 83.5,
                  'hour': 12},
                 {'avg_surface_temp': 48.0,
                  'cause': 'others',
                  '

                 {'avg_surface_temp': 67.0,
                  'cause': 'others',
                  'confidence': 86.5,
                  'hour': 3},
                 {'avg_surface_temp': 71.0,
                  'cause': 'others',
                  'confidence': 87.5,
                  'hour': 8},
                 {'avg_surface_temp': 70.0,
                  'cause': 'others',
                  'confidence': 78.5,
                  'hour': 17},
                 {'avg_surface_temp': 80.0,
                  'cause': 'others',
                  'confidence': 95.0,
                  'hour': 22},
                 {'avg_surface_temp': 70.0,
                  'cause': 'others',
                  'confidence': 86.5,
                  'hour': 3},
                 {'avg_surface_temp': 67.0,
                  'cause': 'others',
                  'confidence': 87.0,
                  'hour': 8},
                 {'avg_surface_temp': 66.0,
                  'cause': 'others',
                  'conf

                  'cause': 'others',
                  'confidence': 84.5,
                  'hour': 14},
                 {'avg_surface_temp': 39.0,
                  'cause': 'others',
                  'confidence': 59.5,
                  'hour': 19},
                 {'avg_surface_temp': 53.5,
                  'cause': 'others',
                  'confidence': 70.0,
                  'hour': 4},
                 {'avg_surface_temp': 50.0,
                  'cause': 'others',
                  'confidence': 87.0,
                  'hour': 14},
                 {'avg_surface_temp': 47.0,
                  'cause': 'others',
                  'confidence': 62.0,
                  'hour': 19}],
 'hotspots': [{'confidence': 69,
               'date': '2019-01-08',
               'latitude': -37.067,
               'longitude': 141.379,
               'surface_temperature_celcius': 36,
               'time': '19:00:57'},
              {'confidence': 74,
               'date': '2019-01-

                  'confidence': 79.5,
                  'hour': 23},
                 {'avg_surface_temp': 68.5,
                  'cause': 'others',
                  'confidence': 82.0,
                  'hour': 4},
                 {'avg_surface_temp': 64.0,
                  'cause': 'others',
                  'confidence': 81.0,
                  'hour': 9},
                 {'avg_surface_temp': 87.5,
                  'cause': 'others',
                  'confidence': 98.5,
                  'hour': 14},
                 {'avg_surface_temp': 99.0,
                  'cause': 'others',
                  'confidence': 100.0,
                  'hour': 19},
                 {'avg_surface_temp': 74.0,
                  'cause': 'others',
                  'confidence': 79.5,
                  'hour': 23},
                 {'avg_surface_temp': 75.0,
                  'cause': 'others',
                  'confidence': 82.0,
                  'hour': 4},
                 {'avg_surface_te

               'longitude': 141.1927,
               'surface_temperature_celcius': 58,
               'time': '09:25:00'}],
 'latitude': -36.942,
 'longitude': 143.282,
 'max_wind_speed': 13.0,
 'precip_flag': 'I',
 'precipitation': 0.0,
 'relative_humidity': 58.3,
 'windspeed_knots': 7.1}
{'GHI_w/m2': 122,
 '_id': ObjectId('60aa1ad09343690daa71f758'),
 'air_temperature_celcius': 13,
 'date': '2019-01-27',
 'fire_events': [{'avg_surface_temp': 58.0,
                  'cause': 'others',
                  'confidence': 82.0,
                  'hour': 4},
                 {'avg_surface_temp': 54.0,
                  'cause': 'others',
                  'confidence': 69.0,
                  'hour': 9},
                 {'avg_surface_temp': 46.5,
                  'cause': 'others',
                  'confidence': 61.0,
                  'hour': 14},
                 {'avg_surface_temp': 46.0,
                  'cause': 'others',
                  'confidence': 62.0,
                  'hou

                  'hour': 4},
                 {'avg_surface_temp': 46.5,
                  'cause': 'others',
                  'confidence': 71.5,
                  'hour': 14},
                 {'avg_surface_temp': 49.5,
                  'cause': 'others',
                  'confidence': 75.5,
                  'hour': 19}],
 'hotspots': [{'confidence': 68,
               'date': '2019-02-05',
               'latitude': -37.3344,
               'longitude': 149.3933,
               'surface_temperature_celcius': 47,
               'time': '14:12:57'},
              {'confidence': 75,
               'date': '2019-02-05',
               'latitude': -37.0704,
               'longitude': 145.3684,
               'surface_temperature_celcius': 49,
               'time': '19:00:57'},
              {'confidence': 79,
               'date': '2019-02-05',
               'latitude': -36.9221,
               'longitude': 141.9429,
               'surface_temperature_celcius': 52,
            