# Stream Processing using Apache Spark Streaming

Submitted by : Mayank Bhardwaj, Neha Jain, Rishupal Singh Chabbra

Below is the implementation of a streaming application in Apache Spark Streaming, which uses a local streaming context with two execution threads and a batch interval of 10 seconds. The streaming application receives streaming data from all three producers, designated Producer 1, Producer 2 and Producer 3.

After receiving the data from three producers, 
* If we have data from at least P1, we join it with the incoming streams of P2 and P3 based on location (latitude and longitude).
* Additionally, geohash function is applied on the longitude and latitude values. 
* If the data from P2 and P3 is present, the average of both the values is taken. 
* Data for both P1 and (P2+P3) is stored in the database using the data model that was created as part of Task A (referencing model).
* If neither P2 nor P3 data exists, only the P1 stream data is sent to the database.

#### Topic "climate" is used for the Kafka stream similar to producers

In [None]:
# import libraries
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
import json
from pymongo import MongoClient
import pymongo
from pprint import pprint
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import geohash as gh

# join predicate: do two JSON records refer to the same latitude/longitude?
def joinjson(one, two):
    """Return True when both records carry equal 'latitude' and 'longitude'.

    The values are compared with ``==`` exactly as stored, so a string and a
    number never match. 'longitude' is only looked up when the latitudes
    already agree.

    Raises:
        KeyError: if a key that has to be compared is missing from either record.
    """
    return all(two[key] == one[key] for key in ('latitude', 'longitude'))


# loading the hotspot data to MongoDB after processing
def sendDataToDB_hotspot(data):
    """Upsert one processed hotspot record into the ``hotspot`` collection.

    The document is written to ``fit5148_assignment_db.hotspot`` through a
    ``MongoClient()`` created with no arguments. Its ``_id`` is the current
    document count plus one, as a string.

    Args:
        data: dict providing 'latitude', 'longitude', 'created_time',
            'confidence', 'surface_temperature_celcius' and 'climate'
            ('climate' is stored as-is; the caller sets it to the ``_id`` of
            the related climate document).

    Raises:
        KeyError: if one of the required keys is missing from ``data``.
    """
    # Build the document before touching the database so malformed input
    # fails without opening a connection.
    jsonData = {
        'latitude': data['latitude'],
        'longitude': data['longitude'],
        'datetime': data['created_time'],
        'confidence': data['confidence'],
        'surface_temperature_celcius': data['surface_temperature_celcius'],
        'climate': data['climate'],
    }

    client = MongoClient()
    try:
        hotspot = client.fit5148_assignment_db.hotspot

        # NOTE(review): a count-based _id is not unique when several writers
        # run at once (two callers can read the same count, and the upsert
        # below then overwrites one document) or after documents are deleted.
        # NOTE(review): Collection.count() is deprecated in newer PyMongo
        # releases in favour of count_documents({}) - confirm the installed
        # version before upgrading.
        index = str(hotspot.count() + 1)
        jsonData["_id"] = index

        try:
            hotspot.replace_one({"_id": index}, jsonData, upsert=True)
        except Exception as ex:
            # Best effort: report the failed write and carry on.
            print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even when count() or the write raises.
        client.close()

# loading the climate data to MongoDB after processing
def sendDataToDB_climate(data):
    """Upsert one processed climate record into the ``climate`` collection.

    The document is written to ``fit5148_assignment_db.climate`` through a
    ``MongoClient()`` created with no arguments. Its ``_id`` is the current
    document count plus one, as a string.

    Args:
        data: dict providing 'latitude', 'longitude',
            'air_temperature_celcius', 'relative_humidity', 'windspeed_knots',
            'max_wind_speed', 'precipitation', 'sender_id' and 'created_time'.
            Any other keys are ignored.

    Returns:
        The ``_id`` (str) chosen for the document. It is returned even when
        the write itself failed and was only reported.

    Raises:
        KeyError: if one of the required keys is missing from ``data``.
    """
    # Build the document before touching the database so malformed input
    # fails without opening a connection.
    jsonData = {
        'latitude': data['latitude'],
        'longitude': data['longitude'],
        'air_temperature_celcius': data['air_temperature_celcius'],
        'relative_humidity': data['relative_humidity'],
        'windspeed_knots': data['windspeed_knots'],
        'max_wind_speed': data['max_wind_speed'],
        'precipitation': data['precipitation'],
        'sender_id': data['sender_id'],
        'datetime': data['created_time'],
    }

    client = MongoClient()
    try:
        climate = client.fit5148_assignment_db.climate

        # NOTE(review): a count-based _id is not unique when several writers
        # run at once (two callers can read the same count, and the upsert
        # below then overwrites one document) or after documents are deleted.
        # NOTE(review): Collection.count() is deprecated in newer PyMongo
        # releases in favour of count_documents({}) - confirm the installed
        # version before upgrading.
        index = str(climate.count() + 1)
        jsonData["_id"] = index

        try:
            climate.replace_one({"_id": index}, jsonData, upsert=True)
        except Exception as ex:
            # Best effort: report the failed write and carry on.
            print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # Always release the connection, even when count() or the write raises.
        client.close()
    return index

# collapsing the hotspot records matched to a single climate record
def _average_hotspots(matches):
    """Merge several matched hotspot records into one averaged record.

    With zero or one record the list is returned unchanged. Otherwise the
    first record is kept, with 'surface_temperature_celcius' and 'confidence'
    replaced by the mean over all records (stored as strings).
    """
    count = len(matches)
    if count <= 1:
        return matches

    surface = sum(float(r["surface_temperature_celcius"]) for r in matches) / count
    confidence = sum(float(r['confidence']) for r in matches) / count

    merged = matches[0]
    merged["surface_temperature_celcius"] = str(surface)
    merged['confidence'] = str(confidence)
    return [merged]


# processing the stream of the data
def processStream(iter):
    """Process one partition of Kafka messages and store the results.

    Every P1 (climate) record is geohashed and stored. The P2/P3 (hotspot)
    records at the same latitude/longitude are linked to it through their
    'climate' field, averaged into a single record when more than one
    matches, and stored as well. Nothing is written when the partition holds
    no P1 message.

    Averaging is done per climate record. Pooling the matches of every P1
    record in the partition would blend hotspots from unrelated locations
    into one document.

    Args:
        iter: iterable of messages where element ``[1]`` is the JSON text of
            a record (name kept for compatibility; it shadows the builtin).
    """
    climate_msgs = []
    p2_msgs = []
    p3_msgs = []
    for message in iter:  # iterating over all the records in the stream
        payload = message[1]
        # Substring test on the raw JSON text - presumably this matches the
        # producer's sender_id; verify against the producers.
        if 'P1' in payload:
            climate_msgs.append(payload)
        if 'P2' in payload:
            p2_msgs.append(payload)
        if 'P3' in payload:
            p3_msgs.append(payload)

    if not climate_msgs:
        return

    # Parse the hotspot messages once (P2 first, then P3) instead of once per
    # climate record.
    hotspots = [json.loads(raw) for raw in p2_msgs + p3_msgs]

    for raw in climate_msgs:
        climate = json.loads(raw)
        lat = float(climate["latitude"])
        lon = float(climate["longitude"])
        # NOTE(review): sendDataToDB_climate does not persist 'geohash' -
        # confirm whether the data model expects it to be stored.
        climate["geohash"] = gh.encode(lat, lon, precision=5)
        index = sendDataToDB_climate(climate)

        # Copy each match so a hotspot that joins several climate records
        # keeps an independent 'climate' reference for each of them.
        matches = [
            dict(hotspot, climate=index)
            for hotspot in hotspots
            if joinjson(climate, hotspot)
        ]

        for record in _average_hotspots(matches):
            sendDataToDB_hotspot(record)

            
# batch interval of 10 seconds
n_secs = 10
topic = 'climate'  # Kafka topic, same as the one used by the producers

# Spark context with 2 local threads
conf = (SparkConf()
        .setAppName("KafkaStreamProcessor")
        .setMaster("local[2]")
        .set("spark.streaming.concurrentJobs", "3"))
# Pass the configuration to getOrCreate; without it the settings above are
# never applied. If a context is already running (e.g. the cell is re-run)
# that context is reused as-is.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

# Kafka object to fetch data in streams
kafkaStream1 = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers': '127.0.0.1:9092',
                        'fetch.message.max.bytes': '15728640',
                        'auto.offset.reset': 'largest'})

# processing for each batch, one partition at a time
# (foreachRDD registers the action and returns nothing worth keeping)
kafkaStream1.foreachRDD(lambda rdd: rdd.foreachPartition(processStream))

ssc.start()
try:
    # Run the stream for at most 10 minutes, in case no producer is detected.
    # Unlike a plain sleep this returns as soon as the stream terminates and
    # surfaces an error raised while it was running.
    ssc.awaitTerminationOrTimeout(600)
finally:
    # Always shut down, letting in-flight batches finish first.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)