# In [1]:
import datetime
import os
from json import JSONDecodeError

import pymongo
import time

from bson import ObjectId
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pymongo import MongoClient
import json
import geohash2

# Spark submit arguments: request the spark-streaming-kafka-0-8 package
# (Scala 2.11 build, version 2.3.0) and run as pyspark-shell.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell',
})


# os.environ['PYSPARK_PYTHON'] = '/usr/local/Cellar/python/3.7.1/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = 'ipython3'


def insert_climate_to_db(client, climates):
    """
    Store a batch of climate records in the climate_streaming collection
    :param client: client object exposing the fit5148_assignment_db database
    :param climates: A list of climate records; nothing is written when it is empty
    """
    if len(climates) > 0:
        collection = client.fit5148_assignment_db.climate_streaming
        collection.insert_many(climates)


def insert_hotspot_to_db(client, hotspots):
    """
    Print and store a batch of hotspot records in the hotspot_streaming collection
    :param client: client object exposing the fit5148_assignment_db database
    :param hotspots: A list of hotspot records; nothing is printed or written when it is empty
    """
    if len(hotspots) > 0:
        database = client.fit5148_assignment_db
        # Echo every record to stdout before the bulk insert.
        for entry in hotspots:
            print(entry)
        database.hotspot_streaming.insert_many(hotspots)


def is_joinable(record1, record2, precision=5):
    """
    Use geo-hash to determine whether two records are joinable
    :param record1: hotspot or climate; must contain 'latitude' and 'longitude'
    :param record2: hotspot or climate; must contain 'latitude' and 'longitude'
    :param precision: number of geo-hash characters used for the comparison (default 5)
    :return: True if both locations encode to the same geo-hash at the given precision
    :raises ValueError: if either record has no 'latitude' or no 'longitude'
    """
    # Latitude is validated on both records before longitude, so the reported
    # field is the first one missing in that order.
    for field in ('latitude', 'longitude'):
        if field not in record1 or field not in record2:
            raise ValueError("record should have location information (" + field + ")")

    geo_hash_1 = geohash2.encode(record1['latitude'], record1['longitude'], precision)
    geo_hash_2 = geohash2.encode(record2['latitude'], record2['longitude'], precision)
    return geo_hash_1 == geo_hash_2


def handle_partition(iter):
    """
    Process one partition of a batch and store the result in MongoDB

    Steps:
      1. Parse every message as JSON and split the records into climate
         records (sender_id "climate") and hotspot records (sender_id
         "AQUA" or "TERRA"). Messages that cannot be parsed are printed
         and skipped.
      2. If the partition holds exactly two hotspots that are joinable,
         merge them into the first one, averaging surface temperature
         and confidence.
      3. Give every climate record an ObjectId and attach each hotspot to
         the first climate record it is joinable with. Hotspots that match
         no climate record are not saved.
      4. Insert the climate records and the attached hotspots.

    :param iter: An iterable of (key, value) pairs from the batch; value is a JSON string
    :return: None
    :raises Exception: if a record has a missing or unknown sender_id
    """
    time_format = '%Y-%m-%dT%H:%M:%S'
    climates = []
    hotspots = []

    # split climate and hotspot
    for _key, str_record in iter:
        try:
            record = json.loads(str_record)
        except (JSONDecodeError, TypeError):
            # TypeError covers a None (or otherwise non-text) message value.
            print("Cannot deserialize data -> " + str(str_record))
            continue

        # A payload that is not a JSON object, or has no sender_id, is
        # reported through the same "Invalid sender id" error below.
        sender_id = record.get('sender_id') if isinstance(record, dict) else None
        if sender_id == "climate":
            climates.append(record)
        elif sender_id in ("AQUA", "TERRA"):
            hotspots.append(record)
        else:
            raise Exception("Invalid sender id")

    # join two hotspots
    if len(hotspots) == 2 and is_joinable(hotspots[0], hotspots[1]):
        print("joinable hotspots")
        merged, other = hotspots
        merged['surface_temperature_celcius'] = (merged['surface_temperature_celcius']
                                                 + other['surface_temperature_celcius']) / 2
        merged['confidence'] = (merged['confidence'] + other['confidence']) / 2
        hotspots = [merged]
        print(hotspots)

    # prepare the records that will be saved to the database
    hotspots_to_save = []
    unbound_hotspots = list(hotspots)
    for climate in climates:
        climate['_id'] = ObjectId()
        climate['created_time'] = datetime.datetime.strptime(climate['created_time'], time_format)

        # Each hotspot is bound to the first joinable climate only. Binding it
        # again would re-parse an already converted created_time (TypeError)
        # and queue the same document twice for insertion.
        still_unbound = []
        for hotspot in unbound_hotspots:
            if is_joinable(climate, hotspot):
                print("joinable climate and hotspot")
                hotspot['climate_id'] = climate['_id']
                hotspot['created_time'] = datetime.datetime.strptime(hotspot['created_time'], time_format)
                hotspots_to_save.append(hotspot)
            else:
                still_unbound.append(hotspot)
        unbound_hotspots = still_unbound

    # Nothing to store: skip creating a client altogether.
    if not climates and not hotspots_to_save:
        return

    # save results to the database
    client = MongoClient("localhost", 27017)
    try:
        insert_climate_to_db(client, climates)
        insert_hotspot_to_db(client, hotspots_to_save)
    finally:
        # Close the client even when an insert fails.
        client.close()


n_secs = 10  # interval between each batch
run_secs = 600  # how long the stream is kept alive before it is stopped
topic = "temperature_analysis"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[2]")

# getOrCreate() never returns None, so the configuration has to be passed here.
# It only takes effect when no SparkContext is running yet.
sc = SparkContext.getOrCreate(conf=conf)


def _decode_key(raw):
    """Decode a Kafka message key to str; a missing key stays None."""
    return None if raw is None else raw.decode('utf-8')


def _decode_value(raw):
    """
    Decode a Kafka message value to a JSON string; a missing value stays None.

    NOTE(review): single quotes are swapped for double quotes, presumably
    because the producer sends Python-dict-style text. This also rewrites
    apostrophes inside values - confirm against the producer.
    """
    return None if raw is None else raw.decode('utf-8').replace("'", "\"")


ssc = StreamingContext(sc, n_secs)
consumer = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'week11-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'},
                                         valueDecoder=_decode_value,  # decode as JSON string
                                         keyDecoder=_decode_key)  # decode as string

consumer.foreachRDD(lambda rdd: rdd.foreachPartition(handle_partition))  # able to run concurrently

try:
    ssc.start()
    # Run stream for 10 minutes just in case no detection of producer.
    # Unlike time.sleep, this returns early if the stream terminates.
    ssc.awaitTerminationOrTimeout(run_secs)
finally:
    # Always shut down, including on interruption or a streaming failure.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
