In [1]:
# !pip install python-geohash
import json
from pymongo import MongoClient
from pyspark.sql import SparkSession
import geohash
import os
#Ask PySpark to pull in the Kafka connector packages (Scala 2.12 builds, version 3.3.0).
#This has to be in the environment before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,'
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 '
    'pyspark-shell'
)

In [2]:
#Address used for both the Kafka broker (port 9092) and MongoDB (port 27017) in this notebook.
host_ip = "192.168.0.129"

#getOrCreate() hands back the active SparkSession when there is one and only
#builds a new local session (using all available cores) when there is not.
spark = SparkSession.builder.master('local[*]').appName('Streaming Application').getOrCreate()

In [3]:
#Streaming DataFrame over the Kafka topic "Scenario01".
#failOnDataLoss is disabled so the query keeps running if offsets are no longer available.
kafka_sdf = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': f'{host_ip}:9092',
        'failOnDataLoss': 'false',
        'subscribe': 'Scenario01',
    })
    .load()
)

In [4]:
#Keep only the Kafka message payload column. Despite the name, this frame carries
#both climate and hotspot records; they are told apart later by their "station" field.
climate_sdf = kafka_sdf.select(kafka_sdf['value'])

In [5]:
def write_db(data):
    """Insert one document into the ``PartB`` collection and print it.

    The document is written to ``fit3182_assignment_db.PartB`` on the MongoDB
    server at ``host_ip:27017``. The connection is opened for this call only
    and is closed before returning, so repeated calls from the streaming
    batches do not accumulate open clients.

    Parameters
    ----------
    data : dict
        Document to store. ``insert_one`` adds the generated ``_id`` to this
        dict in place, which is why the printed document includes ``_id``.
    """
    #MongoClient is a context manager: leaving the block closes its connections.
    with MongoClient(host_ip, 27017) as client:
        db = client.fit3182_assignment_db
        climate_collection = db.PartB
        climate_collection.insert_one(data)
    print(data)

In [6]:
#Empty the PartB collection so this run starts from a clean slate.
#The trailing expression is left bare so the cell displays the DeleteResult.
client = MongoClient(host=host_ip, port=27017)
db = client["fit3182_assignment_db"]
climate_collection = db["PartB"]
climate_collection.delete_many({})

<pymongo.results.DeleteResult at 0x7f3c7d226af0>

In [7]:
#Station id that marks a record as climate data; every other record is treated as hotspot data.
CLIMATE_STATION_ID = 111


def _decode_batch(rows):
    """Parse the Kafka ``value`` bytes of each collected row as UTF-8 JSON.

    Rows with a null value (e.g. Kafka tombstone records) carry no payload and
    are skipped instead of crashing the whole batch on ``.decode``.
    """
    return [json.loads(row.value.decode("utf-8")) for row in rows if row.value is not None]


def _build_climate_reference(climate_data, climate_geohash_value):
    """Build the MongoDB document for a climate record, coercing the numeric fields."""
    return {
        "geohash_value": climate_geohash_value,
        "latitude": float(climate_data['latitude']),
        "longitude": float(climate_data['longitude']),
        "air_temperature_celcius": int(climate_data['air_temperature_celcius']),
        "relative_humidity": float(climate_data["relative_humidity"]),
        "windspeed_knots": float(climate_data["windspeed_knots"]),
        "max_wind_speed": float(climate_data["max_wind_speed"]),
        "precipitation": climate_data['precipitation'],
        "GHI_w/m2": int(climate_data["GHI_w/m2"]),
        "date": climate_data['date'],
        "datetime": climate_data['datetime'],
        "station": climate_data['station']
    }


def _group_by_geohash(records, precision):
    """Group records by the geohash of their latitude/longitude, keeping first-seen order."""
    groups = {}
    for record in records:
        cell = geohash.encode(record.get("latitude"), record.get("longitude"), precision=precision)
        groups.setdefault(cell, []).append(record)
    return groups


def _mean(records, field):
    """Arithmetic mean of ``field`` over a non-empty list of records."""
    return sum(record[field] for record in records) / len(records)


def stream_process_batch(df, epoch_id):
    """Join one micro-batch of climate and hotspot records and store the result in MongoDB.

    Steps:
      1. Collect the batch to the driver and decode every Kafka value as JSON.
      2. Take the first record whose station is ``CLIMATE_STATION_ID`` as the
         climate record; if there is none, print "NO CLIMATE FOUND" and stop.
      3. Keep the hotspot records whose precision-3 geohash equals the climate
         record's precision-3 geohash.
      4. Write the climate document. If no hotspot matched, stop here.
      5. Otherwise group the matching hotspots by precision-5 geohash and write
         one document per group with averaged confidence and surface
         temperature, tagged with the fire-event classification.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Micro-batch with a binary ``value`` column holding UTF-8 JSON.
    epoch_id : int
        Batch id supplied by ``foreachBatch``; not used.
    """
    json_data = _decode_batch(df.collect())

    #Find climate data (only the first climate record of the batch is used)
    climate_data = next(
        (elem for elem in json_data if elem.get("station") == CLIMATE_STATION_ID),
        None,
    )
    #Exit function if climate doesn't exist
    if climate_data is None:
        print("NO CLIMATE FOUND")
        return

    climate_geohash_value = geohash.encode(
        climate_data.get("latitude"), climate_data.get("longitude"), precision=3
    )

    #Hotspot records that fall into the same precision-3 geohash cell as the climate record
    matching_hotspots = [
        elem for elem in json_data
        if elem.get("station") != CLIMATE_STATION_ID
        and geohash.encode(elem.get("latitude"), elem.get("longitude"), precision=3) == climate_geohash_value
    ]

    climate_reference = _build_climate_reference(climate_data, climate_geohash_value)

    #If there is no matching hotspot data, push climate data only
    if not matching_hotspots:
        write_db(climate_reference)
        return

    #Hotspots sharing a precision-5 geohash are merged into a single averaged record
    geohash_groups_p5 = _group_by_geohash(matching_hotspots, precision=5)

    #Compare on coerced numbers so the result does not depend on whether the
    #producer sent the readings as numbers or as numeric strings.
    is_natural = (
        float(climate_data['air_temperature_celcius']) > 20
        and float(climate_data["GHI_w/m2"]) > 180
    )
    fire_event = "natural" if is_natural else "other"

    #Push climate data onto mongo, tagged with its precision-3 geohash value
    write_db(climate_reference)

    #Average confidence and surface temperature for hotspot data with same precision 5 key
    for geo_grps in geohash_groups_p5.values():
        first = geo_grps[0]
        hotspot_reference = {
            #NOTE(review): this stores the climate record's precision-3 geohash, not the
            #precision-5 key of the group -- presumably to link hotspots to their climate
            #document; confirm this is intended.
            "hotspot_geohash_value": climate_geohash_value,
            "latitude": first['latitude'],
            "longitude": first['longitude'],
            "confidence": _mean(geo_grps, 'confidence'),
            "surface_temperature_celcius": _mean(geo_grps, 'surface_temperature_celcius'),
            "date": first['date'],
            "datetime": first['datetime'],
            "station": first['station'],
            "fire_event": fire_event
        }
        write_db(hotspot_reference)
            
    


In [8]:
#Configures (but does not start) the streaming query: every 10 seconds the newly arrived
#records are handed to stream_process_batch as one micro-batch.
#foreachBatch is the sink, so nothing is written to the console; a format(...) call would be
#overridden by foreachBatch and is therefore omitted. failOnDataLoss is a Kafka source option
#(set on the reader) and has no effect on the writer, so it is not repeated here.
#NOTE(review): the checkpoint directory keeps the consumed Kafka offsets across restarts while
#an earlier cell empties the MongoDB collection -- confirm that combination is intended.
writer = (
    climate_sdf.writeStream
    .option("checkpointLocation", "./parking_sdf_checkpoints")
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .foreachBatch(stream_process_batch)
)


In [None]:
#Starts the streaming query and blocks until it terminates. A manual interrupt (CTRL-C /
#kernel interrupt) is reported; any other exception propagates. The query is always stopped.
#query starts as None so that a failure inside writer.start() is not masked by a NameError
#raised from the finally block.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
finally:
    if query is not None:
        query.stop()

{'geohash_value': 'r36', 'latitude': -37.608, 'longitude': 149.282, 'air_temperature_celcius': 22, 'relative_humidity': 62.7, 'windspeed_knots': 7.6, 'max_wind_speed': 18.1, 'precipitation': ' 0.00I', 'GHI_w/m2': 169, 'date': '10/2/2023', 'datetime': '2023-02-10T00:00:00', 'station': 111, '_id': ObjectId('6481509da76fa647081ee9a4')}
{'hotspot_geohash_value': 'r36', 'latitude': -37.623, 'longitude': 149.323, 'confidence': 68.25, 'surface_temperature_celcius': 52.5, 'date': '1/1/2023', 'datetime': '2023-01-01T04:48:00', 'station': 112, 'fire_event': 'other', '_id': ObjectId('6481509da76fa647081ee9a6')}
{'hotspot_geohash_value': 'r36', 'latitude': -37.636, 'longitude': 149.33, 'confidence': 75.25, 'surface_temperature_celcius': 48.25, 'date': '2/1/2023', 'datetime': '2023-01-02T14:24:00', 'station': 113, 'fire_event': 'other', '_id': ObjectId('6481509da76fa647081ee9a8')}
{'hotspot_geohash_value': 'r36', 'latitude': -37.608, 'longitude': 149.328, 'confidence': 88.85714285714286, 'surface_t

{'geohash_value': 'r36', 'latitude': -37.63, 'longitude': 149.232, 'air_temperature_celcius': 18, 'relative_humidity': 57.0, 'windspeed_knots': 7.4, 'max_wind_speed': 15.0, 'precipitation': ' 0.00I', 'GHI_w/m2': 145, 'date': '29/1/2023', 'datetime': '2023-01-29T00:00:00', 'station': 111, '_id': ObjectId('6481513da76fa647081ee9da')}
{'geohash_value': 'r36', 'latitude': -37.644, 'longitude': 149.233, 'air_temperature_celcius': 22, 'relative_humidity': 58.0, 'windspeed_knots': 6.9, 'max_wind_speed': 12.0, 'precipitation': ' 0.00I', 'GHI_w/m2': 176, 'date': '30/1/2023', 'datetime': '2023-01-30T00:00:00', 'station': 111, '_id': ObjectId('64815146a76fa647081ee9dc')}
{'geohash_value': 'r36', 'latitude': -37.642, 'longitude': 149.263, 'air_temperature_celcius': 20, 'relative_humidity': 55.8, 'windspeed_knots': 10.5, 'max_wind_speed': 15.9, 'precipitation': ' 0.01G', 'GHI_w/m2': 163, 'date': '31/1/2023', 'datetime': '2023-01-31T00:00:00', 'station': 111, '_id': ObjectId('64815151a76fa647081ee9d