Install the geohash package

In [1]:
pip install python-geohash

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'
from datetime import datetime, timedelta
import random
import re
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import types as T
# import statements
import json 
import geohash 

topic_name = 'Assignment3'  # NOTE(review): not used by the Kafka subscription below
# Host running Kafka (port 9092) and MongoDB (port 27017). Set the
# KAFKA_MONGO_HOST environment variable to override it instead of editing this cell.
hostip = os.environ.get("KAFKA_MONGO_HOST", "192.168.0.108")

# Instantiate a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('[Demo] Spark Streaming from Kafka into MongoDB')
    .getOrCreate()
)

# Configure reading from the Kafka streams: the result is a streaming DataFrame
# whose `value` column holds each message's raw bytes.
producer_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', f'{hostip}:9092')
    # Listen to all three producer topics (comma-separated list).
    .option('subscribe', "Producer1, Producer2, Producer3")
    .load()
)


In [3]:
# Peek at the schema of the raw Kafka payload column.
producer_df.select(['value'])

DataFrame[value: binary]

In [12]:
import copy

"""
Input
batch: a list of json records
collection: string representation of database's collection name to append add to

Explanation
This pushes records to mongodb databaase
"""
def push_batch_tomongodb(batch, collection):    
    mongo_client = MongoClient(host=f'{hostip}', port=27017)
    db = mongo_client['fit3182_db']
    collection_db = db[collection]
    for record in batch:
        collection_db.insert_one(record)
    
    mongo_client.close()
    
    
"""
Input
df: dataframe

Explanation
This is called for a batch of new records received every 10 seconds (1 day)
"""
def foreach_batch_function(df, epoch_id):   
    #map dataframe to a list 
    data = df.collect()
    temp_aqua = []
    temp_terra = []
    climate = None
    climate_lat = None
    climate_long = None
    #seperate data based on stream producer
    for row in data:
        decoded_data = row.value.decode('utf-8')
        json_value = json.loads(decoded_data)
        producer = json_value["producer"]
        
        if producer== "Producer1":
            result = re.match(r"(\d+\.\d+)([A-Za-z]+)", json_value['precipitation '].strip()) #seperate precipitation data into seperate numeric and aphabert part
            numeric_part_precipitation = float(result.group(1)) #measurement of precipitation
            alphabetic_part_precipitation = result.group(2) #flag representation info
            datetime_obj = datetime.fromisoformat(json_value['created_on']) 

            #map climate schema
            climateSchema = {}
            climateSchema["date"] = datetime_obj
            climateSchema["station"] = random.randint(1, 10000) #randomly created station number
            climateSchema["relative_humidity"] = json_value["relative_humidity"]
            climateSchema["air_temperature_celsius"] = json_value["air_temperature_celcius"]
            climateSchema["windspeed_knots"] = json_value["windspeed_knots"]
            climateSchema["max_wind_speed"] = json_value["max_wind_speed"]
            climateSchema["measure"] = numeric_part_precipitation
            climateSchema["flag"] = alphabetic_part_precipitation
            climateSchema["GHI_w/m2"] = json_value["GHI_w/m2"]
            climate_lat = json_value["latitude"]
            climate_long = json_value["longitude"]
            climate = climateSchema
            
        else:
            datetime_obj = datetime.fromisoformat(json_value['created_on'])
            date_obj = datetime_obj.date()  # Extract the date part
            date_part = datetime(date_obj.year, date_obj.month, date_obj.day)  # Create new datetime object using the extracted date
            
            #map hotspot schema
            hotspotSchema = {}
            hotspotSchema["date"] = date_part
            hotspotSchema["datetime"] = datetime_obj
            hotspotSchema["latitude"] = json_value["latitude"]
            hotspotSchema["longitude"] = json_value["longitude"]
            hotspotSchema["confidence"] = json_value["confidence"]
            hotspotSchema["surface_temperature_celsius"] = json_value["surface_temperature_celcius"]
            
            if producer == "Producer2":
                temp_aqua.append(hotspotSchema)
            elif producer == "Producer3":
                temp_terra.append(hotspotSchema)
            
    #handle when there was no climate data is found
    if climate == None:
        print("-----------NO CLIMATE DATA FOUND-----------")
        return
    
    #encode3 for climate loc
    geohash3Climate = geohash.encode(climate_lat, climate_long, 3)
    
    fire_aqua = []
    temp_fires = []
    
    #find all nearby (geohash precision 3) fires to the station
    for aqua in temp_aqua:
        geohash3 = geohash.encode(aqua['latitude'], aqua['longitude'], 3)
        if geohash3 == geohash3Climate:
            temp_fires.append(aqua)
    
    for terra in temp_terra:
        geohash3 = geohash.encode(terra['latitude'], terra['longitude'], 3)
        if geohash3 == geohash3Climate:
            temp_fires.append(terra)
    
    
    if len(temp_fires) < 1: #if there are no fires nearby found
        #insert climate data   
        print("---------------NO FIRES NEARBY------------")
        print(climate)
        push_batch_tomongodb([climate], "climate")
        return
    
    hashMap = {}
    fires = []
    
    #create hashmap of fires hashed by geohash 5
    #this essentially groups fires with the same geohash 5
    for fire in temp_fires: 
        geohash5 = geohash.encode(float(fire['latitude']), float(fire['longitude']), 5)
        if geohash5 in hashMap.keys():
            hashMap[geohash5].append(fire)
        else:
            hashMap[geohash5] = [fire]
    
    for key in hashMap.keys():
        record = {}
        match_hotspots = hashMap[key]
        avg_surface_temp = 0
        avg_confidence = 0
        earlier_date = None
        earlier_datetime = None
        centre_lat = 0
        centre_long = 0
        isFirstItr = True
        #this iterates through every fire in each hash partition 
        #and combines the records if they are the same fire
        for fire in match_hotspots:   
            
            #check if this is the first fire
            if isFirstItr == True:
                avg_surface_temp = fire['surface_temperature_celsius']
                avg_confidence = fire['confidence']
                centre_lat = fire['latitude']
                centre_long = fire['longitude']
                isFirstItr = False
            else:
                avg_surface_temp = float(fire['surface_temperature_celsius'] + avg_surface_temp) / 2 
                avg_confidence = float(fire['confidence'] + avg_confidence) / 2               
                #find centre point of location
                centre_lat = float((centre_lat + fire['latitude']) /2)
                centre_long = float((centre_long + fire['longitude']) /2)
            
            #this takes the earliest date 
            if earlier_date == None or earlier_date > fire['datetime']:
                #use earlier date to signify the earliest recorded date and time of fire
                earlier_date = fire['date']
                earlier_datetime = fire['datetime']

        
        #determine the type of fire
        if climate['air_temperature_celsius'] > 20 and climate['GHI_w/m2'] > 180:
            record['type'] = "natural"
        else:
            record['type'] = "other"
        
        record['date'] = earlier_date
        record["datetime"] = earlier_datetime
        record['surface_temperature_celsius'] = avg_surface_temp
        record['confidence'] = avg_confidence
        record['latitude'] = centre_lat
        record['longitude'] = centre_long
        fires.append(record)
    
    #once fire records have been processed, the records are added to mongodb
    print("--------------CLIMATE-------------")
    print(climate)
    push_batch_tomongodb([climate], "climate")
    print("--------------FIRES-------------")
    print(fires)
    push_batch_tomongodb(fires, "hotspots")    

            

In [13]:
# Build (but do not start) the streaming sink: every 10 seconds the newly
# arrived Kafka rows are handed to foreach_batch_function as one micro-batch.
writer = (
    producer_df.writeStream
    .outputMode('append')
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime="10 seconds")
)

In [None]:
# Start the streaming query and block until it terminates or is interrupted.
# `query` is reset first so `finally` never touches an unbound name (if start()
# raises on a fresh kernel) or a stale query left over from a previous run.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    if query is not None:
        query.stop()

-----------NO CLIMATE DATA FOUND-----------
--------------CLIMATE-------------
{'date': datetime.datetime(2025, 3, 14, 0, 0), 'station': 7570, 'relative_humidity': 48.1, 'air_temperature_celsius': 14, 'windspeed_knots': 13.7, 'max_wind_speed': 19.0, 'measure': 0.35, 'flag': 'G', 'GHI_w/m2': 122}
--------------FIRES-------------
[{'type': 'other', 'date': datetime.datetime(2025, 3, 12, 0, 0), 'datetime': datetime.datetime(2025, 3, 12, 9, 36), 'surface_temperature_celsius': 56.0, 'confidence': 83.0, 'latitude': -36.7217, 'longitude': 141.6378}]
--------------CLIMATE-------------
{'date': datetime.datetime(2025, 3, 15, 0, 0), 'station': 7941, 'relative_humidity': 43.6, 'air_temperature_celsius': 10, 'windspeed_knots': 9.7, 'max_wind_speed': 14.0, 'measure': 0.2, 'flag': 'G', 'GHI_w/m2': 90}
--------------FIRES-------------
[{'type': 'other', 'date': datetime.datetime(2025, 3, 13, 0, 0), 'datetime': datetime.datetime(2025, 3, 13, 19, 12), 'surface_temperature_celsius': 42.0, 'confidence': 