# Streaming Application

In [1]:
import json
import os

from datetime import datetime
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pprint import pprint

import pygeohash as pgh

# Ask PySpark to fetch the Kafka connector packages when it launches.
# The value is assembled from adjacent string literals, one package per line;
# the resulting string is a comma-separated package list followed by the
# mandatory trailing "pyspark-shell" token.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,"
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 "
    "pyspark-shell"
)

In [2]:
# Streaming configuration constants used by the cells below
TOPIC: str = "climate,hotspot"  # comma-separated Kafka topics to subscribe to (2 topics)
HOST: str = "localhost"  # hostname of the Kafka broker
BATCH_INTERVAL: int = 10  # micro-batch trigger interval, in seconds

In [3]:
# Create (or reuse) a local Spark session running on all available cores
_session_builder = SparkSession.builder.master("local[*]").appName(
    "Climate-Hotspot-Analysis"
)
spark = _session_builder.getOrCreate()

# Streaming DataFrame fed by the Kafka topics listed in TOPIC;
# the broker is expected on port 9092 of HOST
_kafka_reader = spark.readStream.format("kafka")
_kafka_reader = _kafka_reader.option("kafka.bootstrap.servers", f"{HOST}:9092")
_kafka_reader = _kafka_reader.option("subscribe", TOPIC)
kafka_sdf = _kafka_reader.load()

:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/3.4.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/weichun/.ivy2/cache
The jars for the packages stored in: /Users/weichun/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a8dc0fa1-6f55-4875-a7c3-44f21167837f;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3

In [4]:
def submit_datapoint(measurement):
    """Insert one measurement document into the ``climate_historic`` collection.

    A new ``MongoClient`` (default connection settings) is opened for every
    call and the document is written to
    ``fit3182_assignment_db.climate_historic``.

    Insert failures are reported on stdout and otherwise swallowed, so a
    failed write does not raise to the caller. The client is closed in a
    ``finally`` clause so the connection is released even when something
    other than ``Exception`` (e.g. ``KeyboardInterrupt``) unwinds the call.

    Args:
        measurement: the document (dict) to insert.
    """
    # initialize client and connection to db
    client = MongoClient()
    try:
        collection = client.fit3182_assignment_db.climate_historic

        # insert document into db per batch
        try:
            collection.insert_one(measurement)
        except Exception as ex:
            print("Exception Occured. Message: {0}".format(str(ex)))
    finally:
        # always release the connection, whatever happened above
        client.close()


def group_hotspots(aqua, terra):
    """Group hotspot records by geohash and detect fire events.

    Records sharing a geohash are merged into a single hotspot whose
    ``surface_temperature_celcius`` and ``confidence`` are the group
    averages rounded to 2 decimal places; all other fields are taken from
    the first record of the group (aqua records come before terra records).

    A geohash is a fire event when it was reported by BOTH satellites, i.e.
    a terra record falls on a geohash that already has an aqua record.

    Args:
        aqua: list of record dicts from the aqua producer. Each needs a
            ``"geohash"`` key; records that end up in a group of more than
            one also need ``"surface_temperature_celcius"`` and
            ``"confidence"``.
        terra: list of record dicts from the terra producer, same schema.

    Returns:
        Tuple ``(fire_events, hotspots)``: a set of geohashes seen by both
        satellites, and a list with one dict per distinct geohash. The
        returned dicts are shallow copies; the input records are not
        modified.
    """
    groups = {}

    # group aqua records that share a geohash
    for record in aqua:
        groups.setdefault(record["geohash"], []).append(record)

    # Snapshot the aqua geohashes BEFORE adding terra records. A live
    # ``groups.keys()`` view would also contain geohashes introduced by
    # earlier terra records, so two terra-only records at the same location
    # would wrongly be reported as a fire event.
    aqua_geohashes = set(groups)

    fire_events = set()
    for record in terra:
        geohash = record["geohash"]
        # fire event: the other satellite already reported this location
        if geohash in aqua_geohashes:
            fire_events.add(geohash)
        groups.setdefault(geohash, []).append(record)

    hotspots = []
    for group in groups.values():
        # copy so that averaging never rewrites the caller's record
        datapoint = dict(group[0])

        if len(group) > 1:
            # average the measurements that refer to the same location
            number_of_records = len(group)
            datapoint["surface_temperature_celcius"] = round(
                sum(r["surface_temperature_celcius"] for r in group)
                / number_of_records,
                2,
            )
            datapoint["confidence"] = round(
                sum(r["confidence"] for r in group) / number_of_records, 2
            )

        hotspots.append(datapoint)

    return fire_events, hotspots


def find_correlation(climate, hotspots, fire_events):
    """Attach nearby hotspots to a climate record and shape it for storage.

    A hotspot is related to the climate record only when both share the same
    first 3 geohash characters. Hotspots that are further away are ignored:
    they are neither modified nor attached. (Previously the clean-up and
    append ran for every hotspot, so distant hotspots were attached as well,
    contradicting the proximity rule.)

    For a related hotspot whose geohash is in ``fire_events`` a ``"cause"``
    is added: ``"natural"`` when the air temperature is above 20 and the GHI
    is above 180, otherwise ``"other"``; a message is printed for each one.

    Both ``climate`` and the related hotspot dicts are modified in place:
      * hotspot: gains ``"timestamp"`` (``HH:MM:SS`` taken from
        ``"datetime"``), loses ``"producer_id"``, ``"geohash"`` and
        ``"datetime"``;
      * climate: gains ``"hotspots"`` (list of related hotspots), loses
        ``"producer_id"``, ``"latitude"``, ``"longitude"`` and ``"geohash"``;
        ``"date"`` is converted from a ``YYYY-MM-DD`` string to ``datetime``.

    Args:
        climate: climate record dict (must not be None).
        hotspots: list of hotspot dicts; may be empty or None.
        fire_events: collection of geohashes classified as fire events.

    Returns:
        The same ``climate`` dict, ready to be inserted as one document.

    Raises:
        KeyError: if an expected key is missing from a record.
        ValueError: if a date/datetime string does not match its format.
    """
    # precision-3 geohash prefix = the area covered by the climate reading
    climate_area = climate["geohash"][:3]

    related_hotspots = []
    for hotspot in hotspots or []:
        # relate climate and hotspot data only if they are close
        if hotspot["geohash"][:3] != climate_area:
            continue

        # hotspots are categorized only if they are fire events
        if hotspot["geohash"] in fire_events:
            is_natural = (
                climate["air_temperature_celcius"] > 20
                and climate["GHI_w/m2"] > 180
            )
            hotspot["cause"] = "natural" if is_natural else "other"
            print(f"{hotspot['cause']} fire detected!")

        # clean up hotspot data: keep only the time of day
        hotspot["timestamp"] = datetime.strptime(
            hotspot["datetime"], "%Y-%m-%dT%H:%M:%S"
        ).strftime("%H:%M:%S")
        for key in ("producer_id", "geohash", "datetime"):
            hotspot.pop(key)

        related_hotspots.append(hotspot)

    climate["hotspots"] = related_hotspots

    # clean up climate data
    for key in ("producer_id", "latitude", "longitude", "geohash"):
        climate.pop(key)

    climate["date"] = datetime.strptime(climate["date"], "%Y-%m-%d")
    return climate


def process_batch(df, epoch_id):
    """Handle one micro-batch of Kafka rows (``foreachBatch`` callback).

    Every row's ``value`` is parsed as JSON, tagged with a precision-5
    geohash computed from its latitude/longitude, and sorted by
    ``producer_id``. When the batch contains a climate record, the hotspot
    records are grouped, correlated with it and the resulting document is
    stored; without a climate record nothing is stored.

    If a batch holds several climate records only the last one is kept.

    Args:
        df: the batch DataFrame; its rows expose the message in ``value``.
        epoch_id: batch identifier supplied by Spark (unused).
    """
    latest_climate = None
    aqua_records = []
    terra_records = []

    # route every message of the batch to the list matching its producer
    for row in df.collect():
        payload = json.loads(row.value)

        # derive the geohash from the coordinates carried by the message
        payload["geohash"] = pgh.encode(
            payload["latitude"], payload["longitude"], precision=5
        )

        if payload["producer_id"] == "climate_producer":
            latest_climate = payload
        elif payload["producer_id"] == "aqua_producer":
            aqua_records.append(payload)
        elif payload["producer_id"] == "terra_producer":
            terra_records.append(payload)
        else:
            print("Invalid producer_id....Skipping")

    climate_count = 1 if latest_climate else 0
    print(
        f"received {climate_count} climate records, {len(aqua_records)} aqua records, {len(terra_records)} terra records"
    )

    # A climate record is the basis of a stored document, so a window
    # without one has nothing to persist.
    if not latest_climate:
        return

    fire_events, hotspots = group_hotspots(aqua_records, terra_records)
    measurement = find_correlation(latest_climate, hotspots, fire_events)
    submit_datapoint(measurement)

In [5]:
# Configure the streaming writer: every BATCH_INTERVAL seconds the rows that
# arrived from Kafka are handed to process_batch as one micro-batch.
# No .format(...) is set on purpose: foreachBatch installs its own sink and
# overrides any format chosen before it, so a "console" format here would be
# dead, misleading configuration.
writer = (
    kafka_sdf.writeStream.option("checkpointLocation", "climate-hotspots-checkpoint")
    .outputMode("append")
    .trigger(processingTime=f"{BATCH_INTERVAL} seconds")  # trigger action in batches
    .foreachBatch(process_batch)
)

In [6]:
# Start the streaming query and block until it terminates or the user
# interrupts the kernel.
# `query` is reset to None first so that, if writer.start() itself fails, the
# finally clause neither raises a NameError that hides the real error nor
# stops a stale query left in the kernel by an earlier run of this cell.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print("Interrupted by CTRL-C. Stopping query.")
finally:
    if query is not None:
        query.stop()

23/06/05 17:31:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/05 17:31:12 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
23/06/05 17:31:12 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
23/06/05 17:31:12 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
23/06/05 17:31:12 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
23/06/05 17:31:12 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
                                                                                

received 1 climate records, 153 aqua records, 56 terra records
received 1 climate records, 4 aqua records, 4 terra records


[Stage 2:>                                                          (0 + 2) / 2]                                                                                

received 1 climate records, 5 aqua records, 5 terra records


[Stage 3:>                                                          (0 + 2) / 2]                                                                                

received 1 climate records, 5 aqua records, 5 terra records


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/weichun/Desktop/fit3182/asgn_3/env/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/weichun/Desktop/fit3182/asgn_3/env/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/weichun/.pyenv/versions/3.9.16/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopping query.
