In [1]:
import os

from pymongo import MongoClient
from pyspark.sql import SparkSession
from json import loads, JSONDecodeError
from datetime import datetime
from bson import ObjectId

# Ask PySpark to fetch the Kafka connector packages when it launches; this has to be in
# place before the SparkSession is built further down in the notebook.
# NOTE(review): the coordinates are pinned to Spark 3.3.0 / Scala 2.12 - confirm they match
# the PySpark version installed in this environment.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'


In [2]:
from typing import Tuple

"""
    Convert lat-lng from tuple of floating points to tuple of binary string presentations of the values
    !!!IMPORTANT!!!
        To avoid converting floating points to binary directly, all values are x100 and converted to integer
        THE FINAL VALUE IS 100X THE ORIGINAL VALUE (but it doesnt matter for our use case)
    !!!END OF IMPORTANT!!!
"""
def latlng_to_binstr(lat_lng: Tuple[float, float]) -> Tuple[str, str]:
    """
    Convert a (latitude, longitude) pair of floats into a pair of binary strings.

    Each coordinate is multiplied by 100 and truncated to an integer before being
    encoded, so the encoded value is 100x the original coordinate.

    Negative coordinates keep a leading '-' in front of the digits
    (e.g. -0.5 -> -50 -> '-110010'); the result never contains the '0b' marker.
    The strings are not padded, so their length depends on the magnitude of the value.

    :param lat_lng: (latitude, longitude) as floats
    :return: (latitude_bits, longitude_bits) as strings of '0'/'1', optionally prefixed by '-'
    """
    # float(25.5), float(10.25) -> int(2550), int(1025) -> '100111110110', '10000000001'
    # format(n, 'b') is used instead of bin(n)[2:] because bin() puts the sign BEFORE the
    # '0b' marker ('-0b101'), so slicing off two characters would leave 'b101'.
    return format(int(lat_lng[0] * 100), 'b'), format(int(lat_lng[1] * 100), 'b')

"""
    Return True if two given latitude-longitude pairs are close together
                                                          within 3 precision
"""
def are_close(lat_lng_1: Tuple[float, float], lat_lng_2: Tuple[float, float], precision: int = 3) -> bool:
    """
    Return True if two latitude-longitude pairs are close together.

    Two pairs count as close when the binary strings produced by ``latlng_to_binstr``
    start with the same ``precision`` characters, for the latitude and for the longitude.

    NOTE(review): the encodings are not padded to a common width, so the compared prefix
    is the leading characters of each value regardless of its magnitude - confirm this is
    the intended notion of closeness.

    :param lat_lng_1: first (latitude, longitude) pair
    :param lat_lng_2: second (latitude, longitude) pair
    :param precision: number of leading characters that must match (default 3)
    :return: True when both coordinates share the prefix, False otherwise
    """
    lat_1, lng_1 = latlng_to_binstr(lat_lng_1)
    lat_2, lng_2 = latlng_to_binstr(lat_lng_2)
    # Slices are compared instead of indexing character by character: an encoding shorter
    # than `precision` (a coordinate very close to 0) would raise IndexError when indexed.
    return lat_1[:precision] == lat_2[:precision] and lng_1[:precision] == lng_2[:precision]

"""
    Return True if two given latitude-longitude pairs are the same
                                                          within 5 precision
"""
def are_same(lat_lng_1: Tuple[float, float], lat_lng_2: Tuple[float, float], precision: int = 5) -> bool:
    """
    Return True if two latitude-longitude pairs are treated as the same location.

    Two pairs count as the same when the binary strings produced by ``latlng_to_binstr``
    start with the same ``precision`` characters, for the latitude and for the longitude.

    NOTE(review): the encodings are not padded to a common width, so the compared prefix
    is the leading characters of each value regardless of its magnitude - confirm this is
    the intended notion of equality.

    :param lat_lng_1: first (latitude, longitude) pair
    :param lat_lng_2: second (latitude, longitude) pair
    :param precision: number of leading characters that must match (default 5)
    :return: True when both coordinates share the prefix, False otherwise
    """
    lat_1, lng_1 = latlng_to_binstr(lat_lng_1)
    lat_2, lng_2 = latlng_to_binstr(lat_lng_2)
    # Slices are compared instead of indexing character by character: an encoding shorter
    # than `precision` (a coordinate very close to 0) would raise IndexError when indexed.
    return lat_1[:precision] == lat_2[:precision] and lng_1[:precision] == lng_2[:precision]


In [3]:
# Kafka topic name.
# NOTE(review): the stream cells visible in this notebook define their own p*_topic_name
# and do not read this constant - confirm whether it is still needed.
TOPIC_NAME = "topic_1"
# Address of the Kafka broker host; the stream readers append ":9092" to it.
HOST_IP = "192.168.20.6"

In [4]:
"""
    EXCEPT errno 61 connection refused:
        RESTART ipynb kernel
"""
# Local Spark session shared by every streaming cell below; local[*] is the master setting
# and getOrCreate() hands back an already-running session when there is one.
spark = SparkSession.builder.master('local[*]').appName('[Demo] Spark Streaming from Kafka into MongoDB').getOrCreate()

:: loading settings :: url = jar:file:/Users/petermok/opt/miniconda3/envs/fit3182/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/petermok/.ivy2/cache
The jars for the packages stored in: /Users/petermok/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-867621aa-051a-43d2-ae91-5a245bf1ce7d;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305

In [174]:
from pyspark.sql import DataFrame

# Kafka topic read for the climate stream.
p1_topic_name = "climate_producer"

# Streaming DataFrame over the climate topic on the broker at HOST_IP:9092.
p1_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{HOST_IP}:9092')
    .option('subscribe', p1_topic_name)
    .load()
)
p1_stream_df.printSchema()

# Keep only the message key (producer, date) and value (data). The columns are selected
# by name so the projection does not depend on the order in which the source lists them.
p1_output_stream_df = p1_stream_df.select('key', 'value')
p1_output_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)



In [6]:
# Kafka topic read for the first hotspot stream.
p2_topic_name = "aqua_producer"

# Streaming DataFrame over the aqua topic on the broker at HOST_IP:9092.
p2_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{HOST_IP}:9092')
    .option('subscribe', p2_topic_name)
    .load()
)
p2_stream_df.printSchema()

# Keep only the message key (producer, datetime) and value (data). The columns are selected
# by name so the projection does not depend on the order in which the source lists them.
p2_output_stream_df = p2_stream_df.select('key', 'value')
p2_output_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)



In [7]:
# Kafka topic read for the second hotspot stream.
p3_topic_name = "terra_producer"

# Streaming DataFrame over the terra topic on the broker at HOST_IP:9092.
p3_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{HOST_IP}:9092')
    .option('subscribe', p3_topic_name)
    .load()
)
p3_stream_df.printSchema()

# Keep only the message key (producer, datetime) and value (data). The columns are selected
# by name so the projection does not depend on the order in which the source lists them.
p3_output_stream_df = p3_stream_df.select('key', 'value')
p3_output_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)



In [152]:
class ClimateWriter:
    """
    Row-by-row ``foreach`` sink that stores climate records in MongoDB.

    Every row carries a ``key`` and a ``value`` column, both bytes holding a single-quoted
    dict string. The key supplies ``producer`` and ``date`` (``%Y-%m-%d``), the value
    supplies the climate readings. Each row that decodes successfully is inserted into
    the ``dates`` collection of ``fit3182_assignment_db`` as one document whose ``_id``
    is the record date; rows that cannot be decoded are reported and skipped.
    """

    def __init__(self):
        self.client = None
        self.db = None
        self.col = None
        # Fields decoded from the row currently being processed (reset for every row).
        self.producer = None
        self.date = None
        self.data = None

    @staticmethod
    def _parse_payload(raw):
        """Turn a bytes payload holding a single-quoted dict string into a dict."""
        return dict(loads(raw.decode().replace("\'", "\"")))      # dict-in-str -> json -> dict

    # called at the start of processing each partition in each output micro-batch
    def open(self, partition_id, epoch_id):
        """Connect to MongoDB on localhost:27017 and select the ``dates`` collection."""
        self.client = MongoClient(
            host='localhost',
            port=27017
        )
        self.db = self.client.fit3182_assignment_db
        self.col = self.db.dates
        return True

    # called once per row of the result dataframe
    def process(self, row):
        """
        Decode one row and insert it as a climate document.

        :param row: object indexable by ``"key"`` and ``"value"``, each yielding bytes
        """
        print("CLIMATE Processing")
        # Start from a clean slate so a row that fails to decode can never be combined
        # with the key or value left over from the previous row.
        self.producer = None
        self.date = None
        self.data = None

        # ValueError covers JSONDecodeError, UnicodeDecodeError and a date that does not match
        # the format; TypeError covers a missing "date"; AttributeError covers a null column.
        try:
            key = self._parse_payload(row["key"])
            self.producer = key.get("producer")
            # strptime with a date-only format already yields a datetime at midnight
            self.date = datetime.strptime(key.get("date"), "%Y-%m-%d")      # str -> datetime
        except (AttributeError, TypeError, ValueError) as e:
            print("CLIMATE Process skipped: \n" + str(e) + " in decoding key (Don't worry about it, it works 50% of the time)")
        try:
            self.data = self._parse_payload(row["value"])
        except (AttributeError, TypeError, ValueError) as e:
            print("CLIMATE Process skipped: \n" + str(e) + " in decoding value (Don't worry about it, it works 50% of the time)")

        if not (self.date and self.data):
            return
        if self.producer:
            print("CLIMATE Process Done")
        # Insert here, once per row: writing from close() would keep only the last row
        # of the partition and silently drop the others.
        self._insert_current()

    def _insert_current(self):
        """Insert the row held in ``self.date`` / ``self.data``; insertion errors are printed, not raised."""
        db_obj = {
            "_id": self.date,
            "climate": {
                "air_temperature": self.data.get("air_temperature_celcius"),
                "ghi": self.data.get("GHI_w/m2"),
                "max_wind_speed": self.data.get("max_wind_speed"),
                "precipitation": self.data.get("precipitation"),
                "relative_humidity": self.data.get("humidity"),
                "windspeed_knots": self.data.get("windspeed_knots")
            },
            "hotspots": []
        }

        try:
            self.col.insert_one(db_obj)
        except Exception as e:
            # Best effort: a failed insert must not stop the stream.
            print("Exception in inserting CLIMATE data to DB: " + str(e))
        else:
            print("---------------------------")
            print("CLIMATE data inserted")
            print("Hotspots count " + str(len(db_obj.get("hotspots"))))
            print("Collection Size: " + str(self.col.count_documents({})))
            print("---------------------------")

    # called once all rows have been processed (possibly with error)
    def close(self, err):
        """
        Report ``err`` if there is one and release the MongoDB connection.

        :param err: the error that interrupted processing, or a falsy value
        """
        if err:
            print("Error in closing Climate Writer: " + str(err))

        # Always release the connection, including for partitions that stored nothing.
        if self.client is not None:
            self.client.close()


In [153]:
class HotspotWriter:
    """
    Row-by-row ``foreach`` sink that collects hotspot records in ``local_hotspots``.

    Every row carries a ``key`` and a ``value`` column, both bytes holding a single-quoted
    dict string. The key supplies ``producer`` and ``datetime``
    (``%Y-%m-%d %H:%M:%S.%f``), the value supplies the hotspot readings. Each row that
    decodes successfully is appended to the ``local_hotspots`` list as one document;
    rows that cannot be decoded are reported and skipped.

    NOTE(review): ``local_hotspots`` is not defined in this class and must exist at
    notebook level. Presumably this code runs outside the driver process, in which case
    the appended items would not be visible in the notebook's own list - confirm.
    NOTE(review): the MongoDB connection opened in ``open`` is not used by any method
    of this class other than being closed again.
    """

    def __init__(self):
        self.db = None
        self.col = None
        self.client = None
        # Fields decoded from the row currently being processed (reset for every row).
        self.data = None
        self.datetime = None
        self.producer = None

    @staticmethod
    def _parse_payload(raw):
        """Turn a bytes payload holding a single-quoted dict string into a dict."""
        return dict(loads(raw.decode().replace("\'", "\"")))      # dict-in-str -> json -> dict

    # called at the start of processing each partition in each output micro-batch
    def open(self, partition_id, epoch_id):
        """Connect to MongoDB on localhost:27017 and select the ``hotspots`` collection."""
        self.client = MongoClient(
            host='localhost',
            port=27017
        )
        self.db = self.client.fit3182_assignment_db
        self.col = self.db.hotspots
        return True

    # called once per row of the result dataframe
    def process(self, row):
        """
        Decode one row and append it to ``local_hotspots``.

        :param row: object indexable by ``"key"`` and ``"value"``, each yielding bytes
        """
        print("HOTSPOT Processing")
        # Start from a clean slate so a row that fails to decode can never be combined
        # with the key or value left over from the previous row.
        self.producer = None
        self.datetime = None
        self.data = None

        # ValueError covers JSONDecodeError, UnicodeDecodeError and a datetime that does not
        # match the format; TypeError covers a missing "datetime"; AttributeError a null column.
        try:
            key = self._parse_payload(row["key"])
            self.producer = key.get("producer")
            self.datetime = datetime.strptime(key.get("datetime"), "%Y-%m-%d %H:%M:%S.%f")
        except (AttributeError, TypeError, ValueError) as e:
            print("Process skipped: \n" + str(e) + " in decoding key (Don't worry about it, it works 50% of the time)")
        try:
            self.data = self._parse_payload(row["value"])
        except (AttributeError, TypeError, ValueError) as e:
            print("Process skipped: \n" + str(e) + " in decoding value (Don't worry about it, it works 50% of the time)")

        if not (self.datetime and self.data):
            return
        if self.producer:
            print("Process Done")
        # Store here, once per row: doing it from close() would keep only the last row
        # of the partition and silently drop the others.
        self._store_current()

    def _store_current(self):
        """Append the row held in ``self.datetime`` / ``self.data``; errors are printed, not raised."""
        db_obj = {
            "_id": ObjectId(),
            "confidence": self.data.get("confidence"),
            "datetime": self.datetime,
            # same calendar day at midnight
            "date": datetime.combine(self.datetime.date(), datetime.min.time()),
            "lat": self.data.get("latitude"),
            "lng": self.data.get("longitude"),
            "surface_temperature": self.data.get("surface_temperature_celcius")
        }
        try:
            local_hotspots.append(db_obj)
        except Exception as e:
            # Best effort: this also reports the case where local_hotspots does not exist.
            print("Exception in inserting HOTSPOT data to Memory: " + str(e))
        else:
            print("---------------------------")
            print("HOTSPOT data inserted to local memory")
            print("Local HOTSPOT data size: " + str(len(local_hotspots)))
            print("---------------------------")

    # called once all rows have been processed (possibly with error)
    def close(self, err):
        """
        Report ``err`` if there is one and release the MongoDB connection.

        :param err: the error that interrupted processing, or a falsy value
        """
        if err:
            print("Error: " + str(err))

        # Always release the connection, including for partitions that stored nothing.
        if self.client is not None:
            self.client.close()


In [154]:
def _foreach_writer(stream_df, interval, writer):
    """
    Build (without starting) an append-mode streaming write that hands every row to ``writer``.

    :param stream_df: streaming DataFrame to write
    :param interval: interval string passed to the continuous trigger, e.g. "2 seconds"
    :param writer: object providing open / process / close
    :return: the configured stream writer; call ``.start()`` on it to run the query
    """
    return (
        stream_df
        .writeStream
        .outputMode('append')
        .trigger(continuous=interval)
        .foreach(writer)
    )


# One write per topic; the two hotspot topics each get their own HotspotWriter instance.
climate_writer = _foreach_writer(p1_output_stream_df, "10 seconds", ClimateWriter())
aqua_writer = _foreach_writer(p2_output_stream_df, "2 seconds", HotspotWriter())
terra_writer = _foreach_writer(p3_output_stream_df, "2 seconds", HotspotWriter())

# Console sink for debugging the climate stream. It uses 'append' because 'complete'
# output mode is only accepted for streams that contain an aggregation, and this one is
# a plain projection of key and value.
console_logger = (
    p1_output_stream_df
    .writeStream
    .outputMode('append')
    .format('console')
)


In [155]:
from pyspark.errors import StreamingQueryException

# Start the three streaming queries and block until one of them ends; whatever happens,
# every query that was started is stopped before the cell finishes.
queries = []
try:
    # Forget queries that ended during earlier runs of this cell; otherwise
    # awaitAnyTermination() below would return straight away.
    spark.streams.resetTerminated()
    queries.append(climate_writer.start())
    queries.append(aqua_writer.start())
    queries.append(terra_writer.start())
    # Wait on all queries at once. Awaiting them one after another blocks on the first
    # query, so a failure in the second or third would go unnoticed.
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    for query in queries:
        query.stop()


23/06/05 23:21:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/nk/pq_9ypcs6_x5jdx99mrtszc80000gn/T/temporary-aa7d9684-8aeb-47aa-9d58-21d11322f746. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/06/05 23:21:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/05 23:21:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/nk/pq_9ypcs6_x5jdx99mrtszc80000gn/T/temporary-5550a8c4-9255-443a-9d16-d20d3b214f4b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folde

Interrupted by CTRL-C. Stopped query


                                                                                