# Streaming Application

In [None]:
import os
# Have PySpark pull in the Kafka 0.8 streaming connector (Scala 2.11 build for
# Spark 2.3.0). PySpark reads this variable when it launches its JVM gateway,
# so it must be set before the SparkContext further down is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0"
    " pyspark-shell"
)


import sys
import time
import json
import pymongo
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import Geohash

def _decode_record(raw_value):
    """Decode one Kafka message value into a dict.

    The value is JSON-encoded text whose content is apparently a Python-style
    dict repr using single quotes (hence the quote swap): it is decoded once,
    single quotes are replaced by double quotes, and the result is decoded
    as JSON again.
    """
    # NOTE(review): the quote swap breaks on field values that contain an
    # apostrophe; having the producer send real JSON would remove this step.
    inner = json.loads(raw_value)
    return json.loads(inner.replace("'", "\""))


def _geohash(record, precision=5):
    """Return the geohash of ``record``'s ``latitude``/``longitude`` fields."""
    return Geohash.encode(
        float(record["latitude"]), float(record["longitude"]), precision=precision
    )


def _floored_mean(records, field):
    """Return, as a string, the floored mean of ``int(record[field])`` over ``records``."""
    values = [int(record[field]) for record in records]
    return str(sum(values) // len(values))


def _merge_aqua_terra(aqua_records, terra_records):
    """Merge co-located aqua (P02) and terra (P03) satellite records.

    Each aqua record whose geohash cell (precision 5) matches one or more
    terra records has ``surface_temperature_celcius`` and ``confidence``
    overwritten in place, as strings, with the floored mean over itself and
    every matching terra record.

    Returns:
        The aqua records in their original order, followed by the terra
        records that matched no aqua record, also in original order.
    """
    if not terra_records:
        return list(aqua_records)

    # Hash every terra record once rather than once per aqua record.
    terra_cells = [_geohash(terra) for terra in terra_records]
    matched_terra = set()
    merged = []
    for aqua in aqua_records:
        cell = _geohash(aqua)
        matches = [i for i, terra_cell in enumerate(terra_cells) if terra_cell == cell]
        if matches:
            group = [aqua] + [terra_records[i] for i in matches]
            # Average over the exact sum instead of folding matches in one at
            # a time, which would floor (and lose precision) at every step.
            aqua["surface_temperature_celcius"] = _floored_mean(
                group, "surface_temperature_celcius"
            )
            aqua["confidence"] = _floored_mean(group, "confidence")
            matched_terra.update(matches)
        merged.append(aqua)

    merged.extend(
        terra for i, terra in enumerate(terra_records) if i not in matched_terra
    )
    return merged


def process_stream(iter):
    """Join one micro-batch of station and satellite records and store it in MongoDB.

    Used as the ``foreachRDD`` callback of the Kafka stream. Each element of
    the batch is a Kafka message whose value (index 1) is decoded with
    ``_decode_record``. Records are grouped by ``sender_id``:

    * ``"P01"``: station records. One document is inserted per station record.
    * ``"P02"`` (aqua) and ``"P03"`` (terra): satellite records, merged by
      ``_merge_aqua_terra`` when they share a geohash cell.
    * Any other sender id is passed through as an extra, unmerged satellite
      source.

    Each station record receives a ``satellite_data`` list with the (merged)
    satellite records in the same precision-5 geohash cell. It is then
    inserted into ``streamingDataDB.streams`` on ``localhost:27017``. A batch
    without station records writes nothing and opens no DB connection.

    Args:
        iter: the batch RDD. The name shadows the ``iter`` builtin but is kept
            so the callback's signature is unchanged.
    """
    rdd = iter
    # collect() pulls the whole micro-batch onto the driver.
    records = [_decode_record(message[1]) for message in rdd.collect()]

    stations, aqua, terra, others = [], [], [], []
    for record in records:
        sender_id = record["sender_id"]
        if sender_id == "P01":
            stations.append(record)
        elif sender_id == "P02":
            aqua.append(record)
        elif sender_id == "P03":
            terra.append(record)
        else:
            others.append(record)

    # Satellite data is only stored embedded in station documents.
    if not stations:
        return

    # NOTE(review): sender ids other than P01/P02/P03 are treated as extra
    # satellite sources without aqua/terra merging. Confirm this is intended.
    satellites = _merge_aqua_terra(aqua, terra) + others
    # Hash each satellite record once instead of once per station.
    satellite_cells = [(_geohash(sat), sat) for sat in satellites]

    client = pymongo.MongoClient("mongodb://localhost:27017/")
    try:
        streams = client["streamingDataDB"]["streams"]
        for station in stations:
            if satellite_cells:
                cell = _geohash(station)
                station["satellite_data"] = [
                    sat for sat_cell, sat in satellite_cells if sat_cell == cell
                ]
            else:
                station["satellite_data"] = []
            streams.insert_one(station)
    finally:
        # A client is opened per batch; close it so connections do not
        # accumulate over the lifetime of the stream.
        client.close()
                                                                             
    
         
n_secs = 10  # micro-batch interval in seconds
topic = "AssignmentTaskC"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")
# Pass conf so a newly created context actually uses it; if a context is
# already running (e.g. this cell is re-run), getOrCreate returns that one.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092', 
                        'group.id':'taskc-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

# foreachRDD registers the output action and returns None, so nothing is bound.
kafkaStream.foreachRDD(process_stream)

try:
    ssc.start()
    time.sleep(600) # Run stream for 10 minutes just in case no detection of producer
finally:
    # Always shut the stream down, even if the wait is interrupted
    # (e.g. a notebook kernel interrupt), so it does not keep running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
