In [1]:
import os
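# Load the Spark Streaming Kafka 0.8 connector (Scala 2.11 build, version 2.3.0).
# This must be set before the SparkContext is created, otherwise it is not picked up.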
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

  

n_secs = 10        # micro-batch interval of the streaming context, in seconds
topic = 'climate'  # Kafka topic the direct stream subscribes to

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate() never returns None, so the configuration has to be passed here;
# otherwise the app name and master above are silently ignored.
# If a SparkContext already exists in this kernel it is reused as-is.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    


def _climate_document(record):
    """Convert one parsed climate message (sender_id 1) into its MongoDB document."""
    return {
        'latitude': float(record['latitude']),
        'longitude': float(record['longitude']),
        'air_temperature_celcius': int(record['air_temperature_celcius']),
        'relative_humidity': float(record['relative_humidity']),
        'windspeed_knots': float(record['windspeed_knots']),
        'max_wind_speed': float(record['max_wind_speed']),
        'precipitation': record['precipitation'],
        'datetime': record['created_time'],
    }


def _hotspot_document(record):
    """Convert one parsed hotspot message into the sub-document embedded in a climate document."""
    return {
        'latitude': float(record['latitude']),
        'longitude': float(record['longitude']),
        'confidence': int(record['confidence']),
        # created_time is "<date> <time>"; only the time part is kept
        'time': record['created_time'].split(' ')[1],
        'surface_temperature_celcius': float(record['surface_temperature_celcius']),
    }


def _merge_hotspots(hotspot_records):
    """Collapse the hotspot messages of one location into a single sub-document.

    Position and time come from the first message. When several messages share
    the location, confidence and surface temperature are replaced by their
    arithmetic mean, so the result does not depend on arrival order.
    """
    merged = _hotspot_document(hotspot_records[0])
    count = len(hotspot_records)
    if count > 1:
        merged['confidence'] = sum(float(r['confidence']) for r in hotspot_records) / count
        merged['surface_temperature_celcius'] = (
            sum(float(r['surface_temperature_celcius']) for r in hotspot_records) / count)
    return merged


def sendDataToDB(messages, collection=None):
    """Join one partition of Kafka messages by geohash and store the result in MongoDB.

    Messages with sender_id 1 are climate readings; every other sender is
    treated as a hotspot reading. For each geohash in the partition:

    * every climate reading is stored as its own document;
    * if hotspot readings share the geohash, they are merged into one
      sub-document (see _merge_hotspots) and embedded under 'hotspot' in each
      climate document of that geohash;
    * hotspot readings without a climate reading at the same geohash are not stored.

    Insert failures are reported with print() and do not abort the remaining
    locations. 'success' is printed once per location whose documents were stored.

    Parameters
    ----------
    messages : iterable of (key, value)
        Kafka records; value is a JSON string with at least 'sender_id',
        'geohash' and the fields read by the document builders.
    collection : optional
        Object with an insert_one(document) method. When omitted, a
        MongoClient is opened on the default host, documents go to
        fit5148_db.new, and the client is closed before returning.

    Raises
    ------
    ValueError, KeyError
        If a message is not valid JSON or lacks a required field.
    """
    records = [json.loads(message[1]) for message in messages]
    if not records:
        # empty partitions are common in streaming; do not open a connection for them
        return

    # join the streams: {geohash: [record, ...]}
    by_location = {}
    for record in records:
        by_location.setdefault(record['geohash'], []).append(record)

    client = None
    if collection is None:
        client = MongoClient()
        collection = client.fit5148_db.new
    try:
        for location_records in by_location.values():
            climates = [r for r in location_records if int(r['sender_id']) == 1]
            if not climates:
                continue  # hotspot data without climate data is not stored
            hotspots = [r for r in location_records if int(r['sender_id']) != 1]

            documents = [_climate_document(r) for r in climates]
            if hotspots:
                merged = _merge_hotspots(hotspots)
                for document in documents:
                    # each climate document gets its own copy of the hotspot
                    document['hotspot'] = [dict(merged)]

            try:
                for document in documents:
                    collection.insert_one(document)
                print('success')
            except Exception as ex:
                print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # close the connection on every path, including errors
        if client is not None:
            client.close()

        
        
        
        
        
# How long to keep the stream running before shutting it down, in seconds
# (10 minutes, in case a producer takes a while to be detected).
STREAM_RUN_SECONDS = 600

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': '127.0.0.1:9092',
                        'group.id': 'week11-group',  # group ID is completely arbitrary
                        'fetch.message.max.bytes': '15728640',
                        'auto.offset.reset': 'largest'})
kafkaStream.pprint()
# foreachRDD only registers an output operation and returns None, so there is nothing to keep.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))


ssc.start()
try:
    # Unlike a blind sleep, this returns as soon as the context stops and
    # re-raises any error reported by the streaming job.
    ssc.awaitTerminationOrTimeout(STREAM_RUN_SECONDS)
finally:
    # always release the streaming and Spark contexts, even on error or interrupt
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2019-05-24 15:58:50
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 15:59:00
-------------------------------------------
(None, '{"geohash": "r1me7", "longitude": "142.8935", "sender_id": "2", "latitude": "-37.3847", "surface_temperature_celcius": "88", "created_time": "2019-05-24 15:58:54", "confidence": "100"}')

-------------------------------------------
Time: 2019-05-24 15:59:10
-------------------------------------------

-------------------------------------------
Time: 2019-05-24 15:59:20
-------------------------------------------
(None, '{"latitude": "-36.6942", "longitude": "143.8021", "geohash": "r1qr2", "created_time": "2019-05-24 15:59:12", "confidence": "93", "sender_id": "3", "surface_temperature_celcius": "72"}')
(None, '{"geohash": "r1jr8", "longitude": "142.3959", "sender_id": "2", "latitude": "-38.0254", "surface_temperature_celcius": "99", "created_time": "20

-------------------------------------------
Time: 2019-05-24 16:02:10
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:02:00", "longitude": "148.067", "sender_id": "1", "latitude": "-37.401", "max_wind_speed": "19", "windspeed_knots": "14", "relative_humidity": "42.5", "precipitation": "0.04G", "geohash": "r3371", "air_temperature_celcius": "13"}')
(None, '{"created_time": "2019-05-24 16:02:05", "longitude": "148.042", "sender_id": "1", "latitude": "-37.376", "max_wind_speed": "8", "windspeed_knots": "5.5", "relative_humidity": "43.2", "precipitation": "0.00G", "geohash": "r3372", "air_temperature_celcius": "11"}')

-------------------------------------------
Time: 2019-05-24 16:02:20
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:02:10", "longitude": "141.5355", "sender_id": "1", "latitude": "-37.013", "max_wind_speed": "12", "windspeed_knots": "7.3", "relative_humidity": "44.1", "precipitation": "0.39G", "geohash

-------------------------------------------
Time: 2019-05-24 16:03:50
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:03:40", "longitude": "143.4996", "sender_id": "1", "latitude": "-36.9364", "max_wind_speed": "18.1", "windspeed_knots": "12.3", "relative_humidity": "41.7", "precipitation": "0.00G", "geohash": "r1qjc", "air_temperature_celcius": "14"}')
(None, '{"created_time": "2019-05-24 16:03:45", "longitude": "143.352", "sender_id": "1", "latitude": "-37.477", "max_wind_speed": "13", "windspeed_knots": "6", "relative_humidity": "50.6", "precipitation": "0.00G", "geohash": "r1mfy", "air_temperature_celcius": "18"}')

-------------------------------------------
Time: 2019-05-24 16:04:00
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:03:50", "longitude": "148.104", "sender_id": "1", "latitude": "-37.437", "max_wind_speed": "21", "windspeed_knots": "16.7", "relative_humidity": "40.7", "precipitation": "0.12G", "ge

-------------------------------------------
Time: 2019-05-24 16:05:30
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:05:21", "longitude": "146.7777", "sender_id": "1", "latitude": "-37.1815", "max_wind_speed": "16.9", "windspeed_knots": "10.8", "relative_humidity": "50.4", "precipitation": "0.00I", "geohash": "r32kk", "air_temperature_celcius": "17"}')
(None, '{"latitude": "-36.4489", "longitude": "144.1445", "geohash": "r1w88", "created_time": "2019-05-24 16:05:22", "confidence": "69", "sender_id": "3", "surface_temperature_celcius": "45"}')
(None, '{"created_time": "2019-05-24 16:05:26", "longitude": "143.28", "sender_id": "1", "latitude": "-36.939", "max_wind_speed": "14", "windspeed_knots": "8.5", "relative_humidity": "67.4", "precipitation": "0.63G", "geohash": "r1mvu", "air_temperature_celcius": "20"}')

-------------------------------------------
Time: 2019-05-24 16:05:40
-------------------------------------------
(None, '{"created_time": "2

-------------------------------------------
Time: 2019-05-24 16:07:20
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:07:11", "longitude": "149.331", "sender_id": "1", "latitude": "-37.58", "max_wind_speed": "12", "windspeed_knots": "6", "relative_humidity": "59", "precipitation": "0.00I", "geohash": "r364n", "air_temperature_celcius": "23"}')
(None, '{"created_time": "2019-05-24 16:07:16", "longitude": "148.153", "sender_id": "1", "latitude": "-37.465", "max_wind_speed": "21", "windspeed_knots": "13.1", "relative_humidity": "43.5", "precipitation": "0.24G", "geohash": "r336g", "air_temperature_celcius": "11"}')

-------------------------------------------
Time: 2019-05-24 16:07:30
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:07:21", "longitude": "144.39", "sender_id": "1", "latitude": "-37.288", "max_wind_speed": "13", "windspeed_knots": "6.1", "relative_humidity": "49.9", "precipitation": "0.02G", "geohash": "

-------------------------------------------
Time: 2019-05-24 16:09:00
-------------------------------------------
(None, '{"created_time": "2019-05-24 16:08:51", "longitude": "148.111", "sender_id": "1", "latitude": "-37.453", "max_wind_speed": "16.9", "windspeed_knots": "10.6", "relative_humidity": "45.3", "precipitation": "0.08G", "geohash": "r336f", "air_temperature_celcius": "11"}')
(None, '{"created_time": "2019-05-24 16:08:56", "longitude": "143.428", "sender_id": "1", "latitude": "-37.858", "max_wind_speed": "12", "windspeed_knots": "6", "relative_humidity": "49.5", "precipitation": "0.00G", "geohash": "r1mbx", "air_temperature_celcius": "14"}')
(None, '{"geohash": "r1tcj", "longitude": "143.3479", "sender_id": "2", "latitude": "-36.343", "surface_temperature_celcius": "40", "created_time": "2019-05-24 16:08:59", "confidence": "50"}')



In [11]:
#### Offline check of the join logic -- no streaming involved.


### Sample (key, value) messages in the same shape as the stream output:
### two climate readings (sender 1) and one hotspot reading (sender 2) that
### shares geohash "r1qkb" with the second climate reading.
test = [
    (
        None,
        '{"created_time": "2019-05-24 16:05:01", "longitude": "145.2096", "sender_id": "1", "latitude": "-36.1462", "max_wind_speed": "14", "windspeed_knots": "9.7", "relative_humidity": "43.6", "precipitation": "0.20G", "geohash": "r1x62", "air_temperature_celcius": "10"}',
    ),
    (
        None,
        '{"created_time": "2019-05-24 16:05:06", "longitude": "143.841", "sender_id": "1", "latitude": "-38.167", "max_wind_speed": "11.1", "windspeed_knots": "5.3", "relative_humidity": "58.1", "precipitation": "0.00G", "geohash": "r1qkb", "air_temperature_celcius": "21"}',
    ),
    (
        None,
        '{"geohash": "r1qkb", "longitude": "143.8311","sender_id": "2", "latitude": "-37.0927", "surface_temperature_celcius": "53", "created_time": "2019-05-24 16:05:07", "confidence": "80"}',
    ),
]

### same function as above
def _climate_document(record):
    """Convert one parsed climate message (sender_id 1) into its MongoDB document."""
    return {
        'latitude': float(record['latitude']),
        'longitude': float(record['longitude']),
        'air_temperature_celcius': int(record['air_temperature_celcius']),
        'relative_humidity': float(record['relative_humidity']),
        'windspeed_knots': float(record['windspeed_knots']),
        'max_wind_speed': float(record['max_wind_speed']),
        'precipitation': record['precipitation'],
        'datetime': record['created_time'],
    }


def _hotspot_document(record):
    """Convert one parsed hotspot message into the sub-document embedded in a climate document."""
    return {
        'latitude': float(record['latitude']),
        'longitude': float(record['longitude']),
        'confidence': int(record['confidence']),
        # created_time is "<date> <time>"; only the time part is kept
        'time': record['created_time'].split(' ')[1],
        'surface_temperature_celcius': float(record['surface_temperature_celcius']),
    }


def _merge_hotspots(hotspot_records):
    """Collapse the hotspot messages of one location into a single sub-document.

    Position and time come from the first message. When several messages share
    the location, confidence and surface temperature are replaced by their
    arithmetic mean, so the result does not depend on arrival order.
    """
    merged = _hotspot_document(hotspot_records[0])
    count = len(hotspot_records)
    if count > 1:
        merged['confidence'] = sum(float(r['confidence']) for r in hotspot_records) / count
        merged['surface_temperature_celcius'] = (
            sum(float(r['surface_temperature_celcius']) for r in hotspot_records) / count)
    return merged


def sendDataToDB(messages, collection=None):
    """Join a batch of Kafka messages by geohash and store the result in MongoDB.

    Messages with sender_id 1 are climate readings; every other sender is
    treated as a hotspot reading. For each geohash in the batch:

    * every climate reading is stored as its own document;
    * if hotspot readings share the geohash, they are merged into one
      sub-document (see _merge_hotspots) and embedded under 'hotspot' in each
      climate document of that geohash;
    * hotspot readings without a climate reading at the same geohash are not stored.

    Insert failures are reported with print() and do not abort the remaining
    locations. 'success' is printed once per location whose documents were stored.

    Parameters
    ----------
    messages : iterable of (key, value)
        Kafka-style records; value is a JSON string with at least 'sender_id',
        'geohash' and the fields read by the document builders.
    collection : optional
        Object with an insert_one(document) method. When omitted, a
        MongoClient is opened on the default host, documents go to
        fit5148_db.new, and the client is closed before returning.

    Raises
    ------
    ValueError, KeyError
        If a message is not valid JSON or lacks a required field.
    """
    records = [json.loads(message[1]) for message in messages]
    if not records:
        # nothing to store; do not open a connection for an empty batch
        return

    # join the streams: {geohash: [record, ...]}
    by_location = {}
    for record in records:
        by_location.setdefault(record['geohash'], []).append(record)

    client = None
    if collection is None:
        client = MongoClient()
        collection = client.fit5148_db.new
    try:
        for location_records in by_location.values():
            climates = [r for r in location_records if int(r['sender_id']) == 1]
            if not climates:
                continue  # hotspot data without climate data is not stored
            hotspots = [r for r in location_records if int(r['sender_id']) != 1]

            documents = [_climate_document(r) for r in climates]
            if hotspots:
                merged = _merge_hotspots(hotspots)
                for document in documents:
                    # each climate document gets its own copy of the hotspot
                    document['hotspot'] = [dict(merged)]

            try:
                for document in documents:
                    collection.insert_one(document)
                print('success')
            except Exception as ex:
                print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # close the connection on every path, including errors
        if client is not None:
            client.close()

        
        
# Apply the function to the test data.
# NOTE: this is not a dry run -- when called like this, sendDataToDB opens a
# MongoClient and inserts the resulting documents into fit5148_db.new.
sendDataToDB(test)

success


In [10]:
#### results of running sendDataToDB(test)



# { "_id" : ObjectId("5ce78b219343692aec31d045"), 
#      "hotspot" : [ { "surface_temperature_celcius" : 53, 
#                     "longitude" : 143.8311, 
#                     "confidence" : 80, 
#                     "latitude" : -37.0927,
#                     "time": "16:05:07"} ], 
#      "max_wind_speed" : 11.1, 
#      "air_temperature_celcius" : 21, 
#      "windspeed_knots" : 5.3, 
#      "datetime" : "2019-05-24 16:05:06", 
#      "longitude" : 143.841, 
#      "relative_humidity" : 58.1, 
#      "precipitation" : "0.00G", 
#      "latitude" : -38.167 }

# { "_id" : ObjectId("5ce78b219343692aec31d046"), 
#  "max_wind_speed" : 14, 
#  "air_temperature_celcius" : 10, 
#  "windspeed_knots" : 9.7, 
#  "datetime" : "2019-05-24 16:05:01", 
#  "longitude" : 145.2096, 
#  "relative_humidity" : 43.6, 
#  "precipitation" : "0.20G",
#  "latitude" : -36.1462 }


In [None]:
# from pprint import pprint 
# client = MongoClient()
# db = client.fit5148_db
# week11 = db.new

# for record in week11.find():
#     pprint (record)
#     break