# In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'


import sys
import time
import json
import pygeohash as gh
from pprint import pprint
from pymongo import MongoClient
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def join(t1, t2):  # Define a "join" method for calling in function "sendDataToDB"
    """Embed fire record ``t2`` into climate record ``t1`` and return ``t1``.

    The keys ``geohash``, ``fire``, ``latitude`` and ``longitude`` are left out
    of the embedded copy to avoid repeating them. The Producer1 climate
    document already carries them. ``t2`` itself is NOT modified; earlier
    versions popped those keys from the caller's dict.

    Args:
        t1: climate document; its ``fire_data`` list is created if missing.
        t2: fire document (from Producer2/Producer3) to embed.

    Returns:
        ``t1``, mutated in place: the filtered copy of ``t2`` is appended to
        ``t1["fire_data"]`` and ``t1["fire"]`` is set to ``'true'``.
    """
    redundant = ("geohash", "fire", "latitude", "longitude")
    embedded = {key: value for key, value in t2.items() if key not in redundant}
    # Appending to a list on the climate document forms the embedded data model.
    t1.setdefault("fire_data", []).append(embedded)
    # Joining with any Producer2/Producer3 record means a fire was detected.
    t1["fire"] = "true"
    return t1

def _parse_record(raw_value):
    """Decode one Kafka message value into the flat document stored in MongoDB.

    Producer1 messages become climate documents with an empty ``fire_data``
    list and ``fire='false'``. Every other sender is treated as a fire reading
    with ``fire='true'``. Both get a 5-character geohash of their position.
    """
    message = json.loads(raw_value)
    payload = message.get("data") or {}
    doc = {"sender_id": message.get("sender_id"), "time": message.get("create_time")}
    is_climate = doc["sender_id"] == "Producer1"
    if is_climate:
        # NOTE(review): the original only read "precipitation " (trailing space);
        # presumably that is the producer's key, so it is tried first, with the
        # unpadded spelling as a fallback.
        doc["precipitation"] = payload.get("precipitation ", payload.get("precipitation"))
        for key in ("relative_humidity", "max_wind_speed", "longitude",
                    "windspeed_knots", "latitude", "air_temperature_celcius"):
            doc[key] = payload.get(key)
    else:
        for key in ("surface_temperature_celcius", "latitude", "confidence", "longitude"):
            doc[key] = payload.get(key)
    # pygeohash.encode expects (latitude, longitude); these used to be swapped.
    doc["geohash"] = gh.encode(doc["latitude"], doc["longitude"], precision=5)
    if is_climate:
        doc["fire_data"] = []
        doc["fire"] = "false"
    else:
        doc["fire"] = "true"
    return doc


def _average_fire_readings(producer2, producer3):
    """Average temperature and confidence, in place, for Producer2/3 pairs sharing a geohash."""
    for record2 in producer2:
        for record3 in producer3:
            if record2["geohash"] != record3["geohash"]:
                continue
            avg_temp = 0.5 * (record2["surface_temperature_celcius"]
                              + record3["surface_temperature_celcius"])
            avg_conf = 0.5 * (record2["confidence"] + record3["confidence"])
            # Records are updated, not removed, so both keep the averaged values.
            record2["surface_temperature_celcius"] = avg_temp
            record3["surface_temperature_celcius"] = avg_temp
            record2["confidence"] = avg_conf
            record3["confidence"] = avg_conf


def _attach_fire_records(climate_record, fire_pool):
    """Embed each matching-geohash fire record into climate_record and drop it from fire_pool."""
    unmatched = []
    for fire_record in fire_pool:
        # Compare before calling join(): join() may strip 'geohash' from the fire record.
        if fire_record["geohash"] == climate_record["geohash"]:
            join(climate_record, fire_record)
        else:
            unmatched.append(fire_record)
    # Rebuild the list rather than calling remove() while iterating over it.
    fire_pool[:] = unmatched


def _insert_each(collection, documents):
    """Insert documents one at a time, printing (not raising) per-document failures."""
    for document in documents:
        try:
            collection.insert_one(document)
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))


def sendDataToDB(iter):
    """Process one Spark partition of Kafka messages and store the results in MongoDB.

    ``iter`` yields pairs whose second element is the JSON message value.
    Presumably these are the (key, value) tuples produced by
    ``KafkaUtils.createDirectStream``.

    Three collections of ``fit5148_assignment_db`` are written:
      * ``all_data``: every climate document (with embedded fire readings),
        plus every fire reading that matched no climate document.
      * ``fire_data``: a copy of every Producer2/Producer3 reading, taken
        before averaging and joining.
      * ``join_data``: each climate document that received at least one
        fire reading, inserted once.
    """
    producer1, producer2, producer3 = [], [], []
    fire_list = []
    for record in iter:
        doc = _parse_record(record[1])
        sender = doc["sender_id"]
        if sender == "Producer1":
            producer1.append(doc)
        elif sender in ("Producer2", "Producer3"):
            (producer2 if sender == "Producer2" else producer3).append(doc)
            # Copy so that later averaging/joining does not alter the stored fire record.
            fire_list.append(doc.copy())

    # Average Producer2/Producer3 readings that fall in the same geohash cell.
    _average_fire_readings(producer2, producer3)

    # Join each climate record with matching fire records (Producer2 first, then
    # Producer3); matched fire records are consumed so each joins at most once.
    for record1 in producer1:
        _attach_fire_records(record1, producer2)
        _attach_fire_records(record1, producer3)
    join_list = [record1 for record1 in producer1 if record1["fire_data"]]

    client = MongoClient()
    try:
        db = client.fit5148_assignment_db  # Using fit5148_assignment_db in mongoDB
        _insert_each(db.all_data, producer1 + producer2 + producer3)
        _insert_each(db.fire_data, fire_list)
        _insert_each(db.join_data, join_list)
    finally:
        client.close()

batch_interval = 10  # Micro-batch interval (seconds) for the StreamingContext
topic = "Producer"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")  # Set 2 execution threads
# getOrCreate() never returns None. Pass conf so it is applied when no context
# exists yet (an already-running context keeps its own settings).
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, batch_interval)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1:9092',
                        'group.id':'week11-group',  # Group ID is completely arbitrary
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})

# foreachRDD is an output operation and returns None, so its result is not kept.
kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendDataToDB))

ssc.start()
try:
    time.sleep(600)  # Run stream for 10 minutes just in case no detection of producer
    # ssc.awaitTermination()
finally:
    # Always stop, even when the sleep is interrupted (e.g. KeyboardInterrupt),
    # so the streaming and Spark contexts are not left running.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

# KeyboardInterrupt:  (captured output of the interrupted cell above)

# In [3]:
# Drop the collections in mongoDB when necessary (e.g. to reset between runs).
client = MongoClient()
try:
    db = client.fit5148_assignment_db
    db.all_data.drop()
    db.fire_data.drop()
    db.join_data.drop()
finally:
    client.close()  # Release the connection opened above