In [113]:
# Install geohash2 for geohashing algorithm
!pip install geohash2



In [114]:
# Import libraries
import json
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, struct, when, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType, TimestampType
from pyspark.sql.functions import udf, to_timestamp
import geohash2
import os

# Kafka connector packages for Spark; PySpark reads PYSPARK_SUBMIT_ARGS when it
# launches the JVM, so this must be set before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

# Host IP address used for both Kafka (port 9092) and MongoDB (port 27017).
# Set the HOST_IP_ADDRESS environment variable to override it without editing code.
ip_address = os.environ.get("HOST_IP_ADDRESS", "192.168.224.1")

In [115]:
# # Drop the data from collection that will be used
# client = MongoClient(ip_address, 27017)
# db = client["A3_db"]
# collection = db["ClimateHotspot"]
# collection.drop()

In [116]:
# User-defined function for calculating geohash
def to_geohash(lat, lon, precision=3):
    """Encode a latitude/longitude pair as a geohash string.

    Args:
        lat: Latitude in decimal degrees, or None.
        lon: Longitude in decimal degrees, or None.
        precision: Number of geohash characters; 3 by default, which is the
            cell size used to match climate and hotspot records.

    Returns:
        The geohash string, or None when either coordinate is missing.
    """
    # Spark hands SQL NULLs to Python UDFs as None (e.g. a Kafka message that
    # from_json could not parse). Encoding None would raise inside the UDF and
    # fail the whole streaming task, so propagate NULL instead.
    if lat is None or lon is None:
        return None
    return geohash2.encode(lat, lon, precision)
# Spark SQL wrapper around to_geohash returning a string column; calls through
# this UDF always use to_geohash's default precision.
geohash_udf = udf(f=to_geohash, returnType=StringType())

In [120]:
# Function to stream a DataFrame to the console (for debugging purposes)
def receive_data(producer, output_mode="append"):
    """Print a streaming DataFrame to the console until it ends or is interrupted.

    Debugging helper. The query is always stopped before returning. Errors are
    reported rather than raised so the notebook keeps running.

    Args:
        producer: A streaming DataFrame (anything exposing ``writeStream``).
        output_mode: Output mode passed to ``outputMode``; "append" by default.
    """
    # Bound up front so the ``finally`` clause never reads an unbound name when
    # ``start()`` itself fails.
    query = None
    try:
        query = (
            producer.writeStream
            .outputMode(output_mode)
            .format("console")
            .start()
        )
        query.awaitTermination()
    except KeyboardInterrupt:
        # Interrupting the kernel is the normal way to end a debug session.
        print("Interrupted")
    except Exception as exc:
        # Report what went wrong instead of a bare "ERROR".
        print(f"ERROR: {exc!r}")
    finally:
        if query is not None:
            query.stop()

In [121]:
# Local SparkSession (all cores) used by both Kafka readers below; getOrCreate
# returns an already-running session if one exists.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('FireHotspotConsumer')
    .getOrCreate()
)

In [122]:
# Schemas for the JSON payloads published to Kafka; every field is declared
# nullable.

# Climate messages (consumed from climate_topic).
climate_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("date", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("air_temperature", DoubleType()),
        ("relative_humidity", DoubleType()),
        ("windspeed_knots", DoubleType()),
        ("max_wind_speed", DoubleType()),
        ("precipitation_flag", StringType()),
        ("precipitation", DoubleType()),
        ("GHI", DoubleType()),
        ("producer", StringType()),
    )
])

# Hotspot messages (consumed from hotspot_topic).
hotspot_schema = StructType([
    StructField(field_name, field_type, nullable=True)
    for field_name, field_type in (
        ("date", StringType()),
        ("datetime", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("confidence", DoubleType()),
        ("surface_temperature", DoubleType()),
        ("producer", StringType()),
    )
])

In [123]:
# Subscribe to climate_topic, cast each Kafka message's `value` to a JSON
# string, and parse it into a `data` struct using climate_schema.
climate_data = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{ip_address}:9092')
    .option('subscribe', 'climate_topic')
    .option("failOnDataLoss", False)
    .load()
    .withColumn("value", col("value").cast(StringType()))
    .withColumn("data", from_json(col("value"), climate_schema))
)

In [124]:
# Flatten the parsed climate payload, add its precision-3 geohash, turn the
# yyyy/MM/dd `date` string into a timestamp, and put a 1-day watermark on it
# (event time for the stream-stream join).
climate_stream = (
    climate_data
    .select(*[
        f"data.{field}"
        for field in (
            "date", "latitude", "longitude", "air_temperature",
            "relative_humidity", "windspeed_knots", "max_wind_speed",
            "precipitation_flag", "precipitation", "GHI", "producer",
        )
    ])
    .withColumn("geo_hash", geohash_udf(col("latitude"), col("longitude")))
    .withColumn("date", to_timestamp("date", "yyyy/MM/dd"))
    .withWatermark("date", "1 day")
)

In [125]:
# print("starting")
# receive_data(climate_stream)

In [126]:
# Subscribe to hotspot_topic, cast each Kafka message's `value` to a JSON
# string, and parse it into a `data` struct using hotspot_schema.
hotspot_data = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', f'{ip_address}:9092')
    .option('subscribe', 'hotspot_topic')
    .option("failOnDataLoss", False)
    .load()
    .withColumn("value", col("value").cast(StringType()))
    .withColumn("data", from_json(col("value"), hotspot_schema))
)

In [127]:
# Flatten the parsed hotspot payload. Coordinates and producer get a hotspot_
# prefix so they do not collide with the climate columns after the join. Then:
#   - add the precision-3 geohash,
#   - convert `date` (yyyy/MM/dd) and `datetime` (yyyy/MM/dd HH:mm:ss) to
#     timestamps,
#   - apply a 1-day watermark on `date`.
hotspot_stream = (
    hotspot_data
    .select(
        "data.date",
        "data.datetime",
        col("data.latitude").alias("hotspot_latitude"),
        col("data.longitude").alias("hotspot_longitude"),
        "data.confidence",
        "data.surface_temperature",
        col("data.producer").alias("hotspot_producer"),
    )
    .withColumn(
        "geo_hash",
        geohash_udf(col("hotspot_latitude"), col("hotspot_longitude")),
    )
    .withColumn("date", to_timestamp("date", "yyyy/MM/dd"))
    .withColumn("datetime", to_timestamp("datetime", "yyyy/MM/dd HH:mm:ss"))
    .withWatermark("date", "1 day")
)

In [128]:
# print("Starting")
# receive_data(hotspot_stream)

In [129]:
# Left-join climate records to hotspots sharing the same date and precision-3
# geohash cell; climate rows with no matching hotspot are kept with nulls in the
# hotspot columns.
joined_data = climate_stream.join(
    other=hotspot_stream,
    on=["date", "geo_hash"],
    how="left",
)

In [130]:
# joined_data.printSchema()

In [131]:
# Shape each joined row into the MongoDB document layout. Climate fields sit at
# the top level; hotspot columns are nested under `hotspot_data` together with a
# derived `cause`:
#   null      if surface_temperature is null (e.g. no matching hotspot),
#   "natural" if surface_temperature > 20 and GHI > 180,
#   "other"   otherwise.
# NOTE(review): the temperature test reads the hotspot's surface_temperature,
# not the climate air_temperature — confirm which one the rule intends.
output_data = joined_data.select(
    *[
        "date", "latitude", "longitude", "air_temperature",
        "relative_humidity", "windspeed_knots", "max_wind_speed",
        "precipitation_flag", "precipitation", "GHI", "geo_hash",
    ],
    struct(
        "datetime",
        "hotspot_producer",
        "hotspot_latitude",
        "hotspot_longitude",
        "confidence",
        "surface_temperature",
        when(col("surface_temperature").isNull(), None)
        .when((col("surface_temperature") > 20) & (col("GHI") > 180), "natural")
        .otherwise("other")
        .alias("cause"),
    ).alias("hotspot_data"),
)


In [132]:
# output_data.printSchema()

In [133]:
# print("Starting")
# receive_data(output_data)

In [134]:
# MongoDB writer used as the foreachBatch sink of the streaming query.
class WriteToMongoDB:
    """Upsert each micro-batch of joined climate/hotspot rows into MongoDB.

    One document is kept per (date, geo_hash). Climate fields are written only
    when that document is first created. Each matched hotspot is added to the
    document's ``hotspot_data`` array.
    """

    def __init__(self, host=None, port=27017, db_name="A3_db",
                 collection_name="ClimateHotspot", verbose=False):
        """Open the MongoDB connection.

        Args:
            host: MongoDB host; defaults to the notebook-level ``ip_address``.
            port: MongoDB port (27017 by default).
            db_name: Database name ("A3_db" by default).
            collection_name: Target collection ("ClimateHotspot" by default).
            verbose: If True, print every document before writing it
                (debugging aid; off by default to keep the output readable).
        """
        self.client = MongoClient(host if host is not None else ip_address, port)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]
        self.verbose = verbose

    def __call__(self, batch_df, batch_id):
        """Write one micro-batch (signature required by ``foreachBatch``).

        Args:
            batch_df: DataFrame holding this micro-batch's rows.
            batch_id: Spark's micro-batch id (unused).
        """
        # Rows are serialised to JSON and collected on the driver. toJSON drops
        # null fields, so a hotspot struct with no data is expected to arrive as
        # {} — falsy, so it takes the climate-only branch below.
        for json_doc in batch_df.toJSON().collect():
            document = json.loads(json_doc)
            if self.verbose:
                print(document)
                print()
            match_query = {"date": document["date"], "geo_hash": document["geo_hash"]}
            # Tolerate the key being absent as well as an empty struct.
            document_hotspot = document.pop("hotspot_data", None)

            if document_hotspot:
                # Add the hotspot to the existing climate document, or create the
                # document (climate fields + this hotspot) if it does not exist.
                # $addToSet rather than $push: foreachBatch is at-least-once, so a
                # replayed micro-batch must not append the same hotspot twice.
                update = {
                    "$addToSet": {"hotspot_data": document_hotspot},
                    "$setOnInsert": document,
                }
            else:
                # No hotspot: create the climate document only if it is missing.
                update = {"$setOnInsert": document}
            self.collection.update_one(match_query, update, upsert=True)

    def close(self):
        """Close the MongoDB connection."""
        self.client.close()

In [135]:
# Write the joined stream to MongoDB. foreachBatch makes the WriteToMongoDB
# instance the sink. No .format(...) is set: foreachBatch would override it, and
# a "console" format only suggested a console sink that never ran.
output_query = (
    output_data.writeStream
    .outputMode("append")
    .foreachBatch(WriteToMongoDB())
)

In [136]:
# Start the MongoDB streaming query and block until it ends or the kernel is
# interrupted; the query is always stopped afterwards.
query = None  # if start() fails, `finally` must not hit a NameError or stop a stale query
try:
    query = output_query.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print("Interrupted")
finally:
    if query is not None:
        query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 269, in call
    self.func(Da

Interrupted
