# Stream Processing using Apache Spark Streaming
## (a) Streaming application
### Importing libraries

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import pygeohash as gh     # to calculat geo-hash value
import pprint

### Merging data
* Merge the data coming from three different streams based on the following conditions:
  * Join the streams based on the location (i.e. latitude and longitude)
and create the data model developed in Task A.
  * Find whether two locations are close to each other. You can do this
by implementing the geo-hashing algorithm or by using a library that
does the job for you. Use precision 5. The precision determines the
number of characters in the Geohash.
  * If we receive the data from two different satellites AQUA and
TERRA for the same location, then average the ‘surface
temperature’ and ‘confidence’.
  * If the streaming application has data from only one producer
(Producer 1), there was no fire at that time, and we can
store the climate data in MongoDB straight away.

In [2]:
def _geo_hash(doc, precision=5):
    """Return the geohash string of ``doc['location']['coordinates']``.

    The two coordinates are passed to ``gh.encode`` in the order they are
    stored, i.e. they are treated as (latitude, longitude).
    NOTE(review): GeoJSON order is [longitude, latitude] -- confirm what the
    producers actually send.
    """
    points = doc['location']['coordinates']
    return gh.encode(points[0], points[1], precision=precision)


def _store_hotspots(aqua, terra, hotspot_coll):
    """Insert the AQUA/TERRA hotspot document(s) and return a geohash lookup.

    Returns a dict mapping geohash -> (hotspot document, inserted _id), used to
    link the climate documents of the same batch to the matching hotspot.
    Either satellite document may be None.
    """
    hotspots = {}
    if aqua is not None and terra is not None and aqua['geo-hash'] == terra['geo-hash']:
        # Both satellites saw the same cell: store one averaged document whose
        # location is the centre of the shared geohash cell.
        geo_hash = aqua['geo-hash']
        centre = gh.decode(geo_hash)
        merged = {
            'confidence': (aqua['confidence'] + terra['confidence']) / 2,
            'surface_temperature': (aqua['surface_temperature'] + terra['surface_temperature']) / 2,
            'location': {"coordinates": [centre[0], centre[1]], "type": "Point"},
        }
        merge_id = hotspot_coll.insert_one(merged).inserted_id
        print("Document inserted common {}\n\n".format(merge_id))
        hotspots[geo_hash] = (merged, merge_id)
        return hotspots

    # Otherwise every satellite reading that arrived is stored on its own
    # (including the case where AQUA and TERRA are in different cells).
    for label, sat in (('aqua', aqua), ('terra', terra)):
        if sat is None:
            continue
        # The hash is needed for the climate join but is not persisted.
        geo_hash = sat.pop('geo-hash')
        sat_id = hotspot_coll.insert_one(sat).inserted_id
        print("Document inserted {} {}\n\n".format(label, sat_id))
        hotspots.setdefault(geo_hash, (sat, sat_id))
    return hotspots


def merge_data(batch, db):
    """Merge one batch of Kafka records from the three producers into MongoDB.

    Parameters
    ----------
    batch : iterable of (key, value) pairs
        Records from ``KafkaUtils.createDirectStream``. ``value`` is a JSON
        document with a ``location.coordinates`` pair. The key selects the
        source: ``'climate'``, ``'AQUA'``, and anything else is treated as
        TERRA. NOTE(review): assumes the producers set the message key to the
        source name.
    db : pymongo database
        Climate documents are written to ``db['ch']`` and hotspot documents
        to ``db['hh']``.

    Records are matched on their precision-5 geohash. If AQUA and TERRA
    report the same cell, their ``surface_temperature`` and ``confidence``
    are averaged into one hotspot document. A climate document in a hotspot's
    cell gets that hotspot's ``surface_temperature``, ``confidence`` and a
    ``hotspot_data`` list holding the hotspot ``_id``. If only climate data
    arrived (no fire), it is stored as-is. Only the last AQUA and the last
    TERRA record of the batch are used.
    """
    climate_coll = db['ch']
    hotspot_coll = db['hh']
    climate_list = []
    aqua = None
    terra = None

    # Tag every record with its geohash and split by source.
    for key, value in batch:
        doc = json.loads(value)
        doc['geo-hash'] = _geo_hash(doc)
        if key == 'climate':
            climate_list.append(doc)
        elif key == 'AQUA':
            aqua = doc
        else:
            terra = doc

    # Producer 1 only: no fire, so store the climate data straight away.
    if aqua is None and terra is None:
        if climate_list:
            climate_coll.insert_many(climate_list, ordered=False)
            print("Inserted climate list1 {}\n\n".format(climate_list))
        return

    hotspots = _store_hotspots(aqua, terra, hotspot_coll)

    # Enrich climate documents located in a hotspot's geohash cell.
    for c in climate_list:
        match = hotspots.get(c['geo-hash'])
        if match is None:
            continue
        sat, sat_id = match
        c['surface_temperature'] = sat['surface_temperature']
        c['confidence'] = sat['confidence']
        c['hotspot_data'] = [sat_id]

    if climate_list:
        climate_coll.insert_many(climate_list, ordered=False)
        print("Inserted climate list2 {}\n\n".format(climate_list))

In [3]:
def sendDataToDB(iter_):
    """Merge one RDD partition of Kafka records into MongoDB.

    Called through ``foreachPartition``, so it opens its own ``MongoClient``
    (default host/port) and writes to the ``fit5148_assignment_db`` database
    via ``merge_data``. Errors are printed, not raised, so that one bad batch
    does not stop the streaming job. The client is closed on every path.
    """
    client = MongoClient()
    try:
        db = client['fit5148_assignment_db']
        merge_data(iter_, db)
    except Exception as ex:
        import traceback
        print("Exception Occured. Message: {0}".format(str(ex)))
        # str(ex) alone (e.g. a bare KeyError key) is rarely enough to debug.
        traceback.print_exc()
    finally:
        client.close()

# Batch interval in seconds: each micro-batch covers 10 s of Kafka data.
batch_interval = 10
topic = "climate"

# Local streaming context with two execution threads. getOrCreate never
# returns None, so the conf must be passed to it to take effect when no
# SparkContext exists yet.
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)
topic_list = [topic, "AQUA", "TERRA"]
kafkaStream = KafkaUtils.createDirectStream(ssc, topic_list, {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'week11-group',  # group ID is completely arbitrary
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})

# The direct stream creates one RDD partition per Kafka topic-partition.
# Without coalescing, each sendDataToDB call would see only a single topic,
# and the climate/AQUA/TERRA merge could never match anything. coalesce(1)
# (no shuffle) hands the whole micro-batch to one call.
kafkaStream.foreachRDD(lambda rdd: rdd.coalesce(1).foreachPartition(sendDataToDB))

ssc.start()
# Run the stream for 10 minutes (instead of awaitTermination(), which would
# block indefinitely), then shut down gracefully.
time.sleep(600)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

None


In [11]:
# Cleanup cell: stop the streaming context gracefully together with its
# SparkContext, then stop the SparkContext explicitly (redundant if the
# call above already stopped it).
ssc.stop(stopGraceFully=True, stopSparkContext=True)
sc.stop()