In [1]:
# When streaming the data and storing it in MongoDB, we have considered the following scenarios:
# 1. If data is received from climate + aqua + terra, join the streams on a geo_hash match, send the climate data to the climate collection,
# calculate the average of aqua and terra, and send the corresponding record to the hotspot collection
# 2. If data is received from climate only, send it directly to the climate collection
# 3. If data is received from climate + aqua, send climate to the climate collection and aqua to the hotspot collection
# 4. If data is received from climate + terra, then send climate to the climate collection and terra to the hotspot collection
# 5. If data is received from aqua + terra, calculate the average of surface temperature and confidence and send it to the hotspot collection
# 6. If data is received from climate + aqua + terra and there is no geo_hash match between all three, check for a match between climate + aqua, climate + terra or aqua + terra,
# process the data and send it to the database

import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import pygeohash as gh
import sys
import time
import json
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pymongo import MongoClient

# Tag a record with the geohash of its coordinates (precision 3, i.e. a
# three-character cell such as 'r1w')
def add_geo_hash(record):
    """Add a ``geo_hash`` key to *record* in place and return the same dict.

    The hash is encoded from the record's ``latitude`` and ``longitude``
    values at precision 3.
    """
    record['geo_hash'] = gh.encode(
        record['latitude'], record['longitude'], precision=3
    )
    return record

# Predicate for the climate stream: keeps records whose sender_id mentions "climate"
def filterClimateData(iter):
    """Return the record itself if its ``sender_id`` contains "climate", else ``None``."""
    return iter if "climate" in iter['sender_id'] else None

# Predicate for the aqua stream: keeps records whose sender_id mentions "aqua"
def filterAquaData(iter):
    """Return the record itself if its ``sender_id`` contains "aqua", else ``None``."""
    return iter if "aqua" in iter['sender_id'] else None

# Predicate for the terra stream: keeps records whose sender_id mentions "terra"
def filterTerraData(iter):
    """Return the record itself if its ``sender_id`` contains "terra", else ``None``."""
    return iter if "terra" in iter['sender_id'] else None

# Build climate-collection documents from raw climate records
def convertToDbModelClimate(records):
    """Return one climate document per entry of *records*, in the same order.

    ``climate_date`` is the date part of ``created_time`` (parsed as
    ``dd-mm-YYYY HH:MM:SS``) re-formatted as ``dd/mm/YYYY``; the remaining
    fields are copied from the record unchanged.
    """
    def to_document(raw):
        created = datetime.strptime(raw['created_time'], '%d-%m-%Y %H:%M:%S')
        return {
            'climate_date': created.date().strftime('%d/%m/%Y'),
            'air_temperature_celcius': raw['air_temperature_celcius'],
            'relative_humidity': raw['relative_humidity'],
            'windspeed_knots': raw['windspeed_knots'],
            'max_wind_speed': raw['max_wind_speed'],
            # the source key is read with its trailing space; the stored key has none
            'precipitation': raw['precipitation '],
        }

    return [to_document(raw) for raw in records]


# Build hotspot-collection documents from raw aqua/terra records
def convertToDbModelHotspot(records):
    """Return one hotspot document per entry of *records*, in the same order.

    ``hotspot_id`` is the date part of ``created_time`` (parsed as
    ``dd-mm-YYYY HH:MM:SS``) re-formatted as ``dd/mm/YYYY``; ``datetime``
    keeps the original ``created_time`` string; the other fields are copied.
    """
    def to_document(raw):
        created_text = raw['created_time']
        created = datetime.strptime(created_text, '%d-%m-%Y %H:%M:%S')
        return {
            'hotspot_id': created.date().strftime('%d/%m/%Y'),
            'latitude': raw['latitude'],
            'longitude': raw['longitude'],
            'datetime': created_text,
            'confidence': raw['confidence'],
            'surface_temperature_celcius': raw['surface_temperature_celcius'],
        }

    return [to_document(raw) for raw in records]


# Pair up records from two collections that share the same geo_hash
def check_for_location_Match(T1, T2):
    """Return every ``(t1, t2)`` pair whose ``geo_hash`` values are equal.

    Pairs are produced in nested-loop order: for each record of *T1*, all
    matching records of *T2*. An empty list means no location matched.
    """
    print("Checking for location match")
    return [
        (left, right)
        for left in T1
        for right in T2
        if left['geo_hash'] == right['geo_hash']
    ]



# Merge matching hotspot readings into one document with averaged measurements
def averageValues(records):
    """Return a hotspot document averaging ``confidence`` and surface temperature.

    Location and timestamp fields are taken from the first entry of
    *records*; ``confidence`` and ``surface_temperature_celcius`` are the
    arithmetic means over all entries. *records* must be a non-empty
    sequence (it is indexed and its length is taken).
    """
    first = records[0]
    readings = [
        (item['confidence'], item['surface_temperature_celcius'])
        for item in records
    ]
    confidence_total = sum(confidence for confidence, _ in readings)
    temperature_total = sum(temperature for _, temperature in readings)
    count = len(records)
    created_text = first['created_time']
    hotspot_day = datetime.strptime(created_text, '%d-%m-%Y %H:%M:%S').date()
    return {
        'hotspot_id': hotspot_day.strftime('%d/%m/%Y'),
        'latitude': first['latitude'],
        'longitude': first['longitude'],
        'datetime': created_text,
        'confidence': confidence_total / count,
        'surface_temperature_celcius': temperature_total / count,
    }


# Store records for which climate, aqua and terra all share the same geo_hash
def joinAllAndSendToDb(iter):
    """Write one partition of fully joined records to MongoDB.

    Each element of *iter* is indexed as ``record[1][0]`` (an iterable of
    records, of which the climate and aqua ones are handled) and
    ``record[1][1]`` (the terra record). Climate records go to the
    ``climate`` collection; each aqua record is averaged with the terra
    record and the result goes to the ``hotspot`` collection.

    A MongoDB client is opened per call and is always closed, even if
    processing a record raises.
    """
    client = MongoClient()
    try:
        db = client.fit5148_assignment_db
        week11 = db.climate     # climate collection
        week12 = db.hotspot     # hotspot collection
        for record in iter:
            print("joined records::", record)
            joined_records = record[1]          # everything joined under this geo_hash
            terra_data = joined_records[1]      # the terra record
            for each in joined_records[0]:
                if 'climate' in each['sender_id']:
                    # climate records are stored as-is in the climate model
                    climate_data = convertToDbModelClimate([each])
                    insert_into_db(climate_data, week11)
                elif 'aqua' in each['sender_id']:
                    # aqua and terra readings are merged into one averaged hotspot
                    avg_values = averageValues((each, terra_data))
                    insert_into_db([avg_values], week12)
    finally:
        # release the connection even when a malformed record raises
        client.close()

# Store every geo_hash-matched (climate, hotspot) pair: the climate record goes
# to the climate collection, the other record to the hotspot collection
def _store_climate_hotspot_matches(climate_records, hotspot_records, source, week11, week12):
    match_records = check_for_location_Match(climate_records, hotspot_records)
    if not match_records:
        return
    print("matched following records for climate and {0}::".format(source), match_records)
    for pair in match_records:
        for rec in pair:
            if 'climate' in rec['sender_id']:
                insert_into_db(convertToDbModelClimate([rec]), week11)
            else:
                insert_into_db(convertToDbModelHotspot([rec]), week12)


# Store one averaged hotspot document per geo_hash-matched pair of hotspot records
def _store_averaged_matches(first_records, second_records, week12):
    match_records = check_for_location_Match(first_records, second_records)
    if not match_records:
        return
    print("matched following records for aqua and terra::", match_records)
    for pair in match_records:
        insert_into_db([averageValues(pair)], week12)


# Process all records of one RDD partition and store them in MongoDB
def sendDataToDB(iter):
    """Group a partition's records by sender and write them to MongoDB.

    Records are grouped by whether ``sender_id`` contains "climate", "aqua"
    or "terra", then handled according to which groups are present:

    * climate only: every climate record is stored.
    * climate + aqua, or climate + terra: only geo_hash-matched pairs are
      stored (climate record to ``climate``, the other to ``hotspot``).
    * aqua + terra: matched pairs are averaged and stored in ``hotspot``.
    * all three: the three pairwise checks above are run in turn.

    When only aqua or only terra records are present nothing is written.
    A MongoDB client is opened per call and is always closed, even if
    processing raises.
    """
    client = MongoClient()
    try:
        db = client.fit5148_assignment_db     # database name
        week11 = db.climate     # collection for climate data
        week12 = db.hotspot     # collection for hotspot data

        # Segregate the records by sender; the checks are independent, so a
        # sender_id containing several names lands in several groups.
        data_sep = {}
        for record in iter:
            for source in ("climate", "aqua", "terra"):
                if source in record['sender_id']:
                    data_sep.setdefault(source, []).append(record)
        print("#####data_separator#########")
        print(data_sep)

        has_climate = "climate" in data_sep
        has_aqua = "aqua" in data_sep
        has_terra = "terra" in data_sep

        if has_climate and not has_aqua and not has_terra:
            print("Data receive has only climate data")
            insert_into_db(convertToDbModelClimate(data_sep["climate"]), week11)

        elif has_climate and has_aqua and not has_terra:
            print("Data received is climate and Aqua")
            _store_climate_hotspot_matches(
                data_sep['climate'], data_sep['aqua'], "aqua", week11, week12)

        elif has_climate and has_terra and not has_aqua:
            print("Data received is climate and Terra")
            _store_climate_hotspot_matches(
                data_sep['climate'], data_sep['terra'], "terra", week11, week12)

        elif has_aqua and has_terra and not has_climate:
            print("Data received is Terra and Aqua")
            # terra is passed first, so its location/timestamp label the average
            _store_averaged_matches(data_sep['terra'], data_sep['aqua'], week12)

        elif has_climate and has_aqua and has_terra:
            print("Data received for climate, aqua and tera")
            # NOTE(review): a climate record matching both an aqua and a terra
            # record is inserted once per match -- confirm this is intended.
            _store_climate_hotspot_matches(
                data_sep['climate'], data_sep['aqua'], "aqua", week11, week12)
            _store_climate_hotspot_matches(
                data_sep['climate'], data_sep['terra'], "terra", week11, week12)
            # here aqua is passed first, so its location/timestamp label the average
            _store_averaged_matches(data_sep['aqua'], data_sep['terra'], week12)
    finally:
        # release the connection even when a malformed record raises
        client.close()


def insert_into_db(json_data, week):
    """Insert each document of *json_data* into the collection *week*.

    Insertion is best-effort: the first failure is printed and the remaining
    documents of this call are skipped; no exception reaches the caller.

    Args:
        json_data: iterable of documents (dicts) to store.
        week: target collection object exposing ``insert_one``.
    """
    try:
        for j_data in json_data:
            print("json_data received::", j_data)
            # Collection.insert() is deprecated since PyMongo 3.0 and removed
            # in 4.0; there the except below would swallow the failure and
            # silently drop every document.
            week.insert_one(j_data)
    except Exception as ex:
        print("Exception Occured. Message: {0}".format(str(ex)))


batch_interval = 10  # batch duration handed to StreamingContext
topic_1 = "climateHotspotData"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# getOrCreate() never returns None, so the previous `if sc is None` fallback
# was dead code and `conf` was never applied. Passing it here uses it whenever
# a new context has to be created.
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic_1], {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'spotFire-group1',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
# Group ID is completely arbitrary

# Convert each incoming message value to a dict and add its geo_hash
kvs = kafkaStream.map(lambda x: json.loads(x[1])).map(add_geo_hash)
kvs.pprint()
# foreachRDD returns nothing, so its result is not kept
kvs.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

# Split the stream by sender and key each record by its geo_hash
kvs_climate = kvs.filter(filterClimateData).map(lambda x : (x['geo_hash'], x))
kvs_aqua = kvs.filter(filterAquaData).map(lambda x : (x['geo_hash'], x))
kvs_terra = kvs.filter(filterTerraData).map(lambda x : (x['geo_hash'], x))

# Join climate, aqua and terra on geo_hash: (geo_hash, ((climate, aqua), terra))
kvs_jn_all = kvs_climate.join(kvs_aqua).join(kvs_terra)
kvs_jn_all.pprint()
kvs_jn_all.foreachRDD(lambda rdd: rdd.foreachPartition(joinAllAndSendToDb))



ssc.start()
try:
    # Run the stream for 10 minutes instead of blocking on awaitTermination(),
    # in case no producer is ever detected
    time.sleep(600)
finally:
    # Also stop when the sleep is interrupted (e.g. KeyboardInterrupt), so no
    # streaming context is left running in the JVM
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2019-11-14 22:48:20
-------------------------------------------
{'latitude': -36.3774, 'longitude': 143.7079, 'confidence': 91.0, 'surface_temperature_celcius': 68.0, 'created_time': '14-11-2019 22:48:15', 'sender_id': 'a4a45bce242846678985c368f9929965terra', 'geo_hash': 'r1w'}
{'latitude': -37.453, 'longitude': 148.099, 'air_temperature_celcius': 10, 'relative_humidity': 45.7, 'windspeed_knots': 3.6, 'max_wind_speed': 7.0, 'precipitation ': ' 0.01G', 'created_time': '14-11-2019 22:48:15', 'sender_id': '823d2f70dd754ccfa41ab48515635305climate', 'geo_hash': 'r33'}
{'latitude': -36.4466, 'longitude': 141.2471, 'confidence': 51.0, 'surface_temperature_celcius': 40.0, 'created_time': '14-11-2019 22:48:16', 'sender_id': '710eda3f5b32411fb607a1a69a182c69aqua', 'geo_hash': 'r1s'}

-------------------------------------------
Time: 2019-11-14 22:48:20
-------------------------------------------

-------------------------------------------
Time: 

-------------------------------------------
Time: 2019-11-14 22:49:10
-------------------------------------------

-------------------------------------------
Time: 2019-11-14 22:49:20
-------------------------------------------
{'latitude': -36.8202, 'longitude': 141.802, 'confidence': 80.0, 'surface_temperature_celcius': 54.0, 'created_time': '14-11-2019 22:49:12', 'sender_id': 'a4a45bce242846678985c368f9929965terra', 'geo_hash': 'r1k'}
{'latitude': -37.758, 'longitude': 144.693, 'air_temperature_celcius': 20, 'relative_humidity': 58.8, 'windspeed_knots': 11.5, 'max_wind_speed': 15.9, 'precipitation ': ' 0.00I', 'created_time': '14-11-2019 22:49:12', 'sender_id': '823d2f70dd754ccfa41ab48515635305climate', 'geo_hash': 'r1q'}
{'latitude': -37.5135, 'longitude': 142.7238, 'confidence': 93.0, 'surface_temperature_celcius': 72.0, 'created_time': '14-11-2019 22:49:13', 'sender_id': '710eda3f5b32411fb607a1a69a182c69aqua', 'geo_hash': 'r1m'}
{'latitude': -36.369, 'longitude': 143.7132, 'air_

-------------------------------------------
Time: 2019-11-14 22:50:10
-------------------------------------------

-------------------------------------------
Time: 2019-11-14 22:50:20
-------------------------------------------
{'latitude': -36.1104, 'longitude': 145.9829, 'confidence': 90.0, 'surface_temperature_celcius': 76.0, 'created_time': '14-11-2019 22:50:10', 'sender_id': '710eda3f5b32411fb607a1a69a182c69aqua', 'geo_hash': 'r1x'}
{'latitude': -37.462, 'longitude': 148.089, 'air_temperature_celcius': 8, 'relative_humidity': 40.7, 'windspeed_knots': 5.3, 'max_wind_speed': 12.0, 'precipitation ': ' 0.00G', 'created_time': '14-11-2019 22:50:13', 'sender_id': '823d2f70dd754ccfa41ab48515635305climate', 'geo_hash': 'r33'}
{'latitude': -37.333, 'longitude': 148.099, 'confidence': 94.0, 'surface_temperature_celcius': 43.0, 'created_time': '14-11-2019 22:50:17', 'sender_id': 'a4a45bce242846678985c368f9929965terra', 'geo_hash': 'r33'}
{'latitude': -35.6922, 'longitude': 143.1619, 'confid

-------------------------------------------
Time: 2019-11-14 22:51:20
-------------------------------------------

-------------------------------------------
Time: 2019-11-14 22:51:30
-------------------------------------------
{'latitude': -37.336, 'longitude': 148.073, 'air_temperature_celcius': 7, 'relative_humidity': 40.5, 'windspeed_knots': 8.1, 'max_wind_speed': 15.0, 'precipitation ': ' 0.12G', 'created_time': '14-11-2019 22:51:20', 'sender_id': '823d2f70dd754ccfa41ab48515635305climate', 'geo_hash': 'r33'}
{'latitude': -36.4061, 'longitude': 143.0987, 'confidence': 64.0, 'surface_temperature_celcius': 42.0, 'created_time': '14-11-2019 22:51:21', 'sender_id': '710eda3f5b32411fb607a1a69a182c69aqua', 'geo_hash': 'r1t'}
{'latitude': -36.2669, 'longitude': 143.1906, 'confidence': 61.0, 'surface_temperature_celcius': 45.0, 'created_time': '14-11-2019 22:51:22', 'sender_id': 'a4a45bce242846678985c368f9929965terra', 'geo_hash': 'r1t'}
{'latitude': -37.719, 'longitude': 142.154, 'air_te

-------------------------------------------
Time: 2019-11-14 22:52:20
-------------------------------------------

-------------------------------------------
Time: 2019-11-14 22:52:30
-------------------------------------------
{'latitude': -37.382, 'longitude': 149.341, 'air_temperature_celcius': 18, 'relative_humidity': 53.6, 'windspeed_knots': 7.2, 'max_wind_speed': 15.0, 'precipitation ': ' 0.00I', 'created_time': '14-11-2019 22:52:21', 'sender_id': '823d2f70dd754ccfa41ab48515635305climate', 'geo_hash': 'r36'}
{'latitude': -35.1113, 'longitude': 141.1635, 'confidence': 72.0, 'surface_temperature_celcius': 46.0, 'created_time': '14-11-2019 22:52:26', 'sender_id': '710eda3f5b32411fb607a1a69a182c69aqua', 'geo_hash': 'r1u'}
{'latitude': -36.0714, 'longitude': 145.7665, 'air_temperature_celcius': 18, 'relative_humidity': 54.6, 'windspeed_knots': 12.0, 'max_wind_speed': 25.1, 'precipitation ': ' 0.12G', 'created_time': '14-11-2019 22:52:27', 'sender_id': '823d2f70dd754ccfa41ab4851563530

KeyboardInterrupt: 