**install geohash packages**

In [1]:
#pip install python-geohash

**import libraries**

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

import json
import geohash
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
from pyspark.sql.utils import StreamingQueryException
from pprint import pprint

**Topic & IP** <br>
Create variables to store the Kafka topic name and the host IP address.

In [3]:
# Kafka topic the stream subscribes to.
topic_name = "climate"
# Address of the host machine; replace it with your own IP.
hostip = '192.168.1.27'

In [4]:
# MongoDB connection. Reuse `hostip` so that changing the address in the
# configuration cell also updates the database connection.
client = MongoClient(hostip, 27017)

In [5]:
# Handle to the assignment database on the connected MongoDB client.
db = client["fit3182_assignment_db"]

**Drop the collection**
I will drop the collection first

In [6]:
# Remove the climate_hotspot collection before streaming starts.
db["climate_hotspot"].drop()

In [7]:
# Collection that process_data inserts its documents into.
climate_hot = db["climate_hotspot"]

**Spark** <br>
Create Spark 

In [8]:
# Local-mode Spark session used by the streaming reader below.
spark = SparkSession.builder.master('local[*]').appName('fire event app').getOrCreate()

**kafka** <br>
Create a streaming DataFrame using the topic name and IP address. This Kafka source subscribes to the topic "climate".

In [9]:
# Streaming DataFrame backed by the Kafka topic named in `topic_name`.
fire_event_df = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': f'{hostip}:9092',
        'subscribe': topic_name,
    })
    .load()
)

**Value** <br>
Only the `value` column of `fire_event_df` is used downstream. The schema also contains other columns such as key, topic and offset, but here we only need `value`.

In [10]:
output_fire_event_df = fire_event_df # passed through unchanged; process_data reads only each row's `value` (the message sent by the producer)

In [11]:
# Show the column names and types of the streaming DataFrame.
print(output_fire_event_df)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]


**Manipulate the data according to the question** <br>
Create a function that transforms each batch of data as the question requires before storing it in the database.

In [14]:
def _merge_hotspots_by_geohash5(hotspots):
    """Average hotspot readings that share a geohash5 cell.

    The first record of each cell keeps the cell's mean
    ``surface_temperature_celcius`` and ``confidence``; the other records of
    that cell are flagged ``matched = True`` and left out of the result.

    Returns the surviving records in order of first appearance.
    """
    cells = {}
    for record in hotspots:
        cells.setdefault(record["geohash5"], []).append(record)

    merged = []
    for members in cells.values():
        keeper = members[0]
        if len(members) > 1:
            # Divide by the full group size: the keeper's own reading is part
            # of the sum, so it must be part of the count as well.
            for field in ("surface_temperature_celcius", "confidence"):
                keeper[field] = sum(m[field] for m in members) / len(members)
            for duplicate in members[1:]:
                duplicate["matched"] = True
        merged.append(keeper)
    return merged


def process_data(data, epoch_id):
    """Join one micro-batch of climate and hotspot messages and store the result.

    Parameters
    ----------
    data : streaming micro-batch DataFrame
        Each row's ``value`` holds a UTF-8 encoded JSON message with a
        ``producer`` field.
    epoch_id : int
        Batch identifier supplied by ``foreachBatch``; not used.

    Behaviour
    ---------
    * The first ``climate_streaming`` message of the batch becomes the
      document to store; any further climate messages are ignored. A batch
      without climate data stores nothing.
    * Every other message is treated as a hotspot reading. Readings in the
      same geohash5 cell are merged into one record holding their average
      surface temperature and confidence.
    * Merged readings whose geohash3 equals the climate record's geohash3 are
      embedded under ``fire_event`` with ``newdate`` aligned to the climate
      record.
    * ``cause_fire_event`` is ``"natural"`` when air temperature > 20 and
      GHI > 180, ``"other"`` otherwise, and ``""`` when there is no fire event.
    * The document is inserted into the ``climate_hot`` collection.
    """
    # Decode each message from binary JSON into a dict; rows without a
    # payload are skipped because there is nothing to decode.
    json_data = [
        json.loads(row.value.decode('utf-8'))
        for row in data.collect()
        if row.value is not None
    ]

    climate_doc = None
    hotspots = []
    for record in json_data:
        if record["producer"] == 'climate_streaming':
            # Only the first climate record of a batch is used.
            if climate_doc is None:
                climate_doc = record
        else:
            hotspots.append(record)

    # Some batches contain no climate data at all; nothing to store then.
    if climate_doc is None:
        return

    climate_doc["geohash3"] = geohash.encode(
        latitude=climate_doc["latitude"],
        longitude=climate_doc["longitude"],
        precision=3,
    )

    for record in hotspots:
        record["geohash3"] = geohash.encode(
            latitude=record["latitude"], longitude=record["longitude"], precision=3
        )
        record["geohash5"] = geohash.encode(
            latitude=record["latitude"], longitude=record["longitude"], precision=5
        )
        # Set to True later for readings folded into an earlier record.
        record["matched"] = False

    fire_events = []
    for record in _merge_hotspots_by_geohash5(hotspots):
        if record["geohash3"] == climate_doc["geohash3"]:
            # Use the climate record's date when the two dates differ.
            record["newdate"] = climate_doc["newdate"]
            fire_events.append(record)
    climate_doc["fire_event"] = fire_events

    # Classify the cause of the fire event, if there is one.
    if not fire_events:
        climate_doc["cause_fire_event"] = ""
    elif climate_doc["air_temperature_celcius"] > 20 and climate_doc["GHI_w/m2"] > 180:
        climate_doc["cause_fire_event"] = "natural"
    else:
        climate_doc["cause_fire_event"] = "other"

    climate_hot.insert_one(climate_doc)
      
            

In [15]:

# Stream writer: every 10 seconds hand the new micro-batch to process_data.
writer = (
    output_fire_event_df.writeStream
    .foreachBatch(process_data)
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", "./fire_event_sdf_checkpoints")
)

**streaming query**

In [16]:
# Start the streaming query and block until it stops or is interrupted.
query = None
try:
    query = writer.start()
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # writer.start() may fail before `query` is assigned; only stop a query
    # that was actually started.
    if query is not None:
        query.stop()

<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>
<class 'list'>


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


**print out the result from db to have a look**

In [17]:
# Reconnect through `hostip` so this check follows the configured address
# instead of a second hard-coded copy of it.
client = MongoClient(hostip, 27017) # 27017 is the MongoDB port
db = client.fit3182_assignment_db

# List the collections present in the database.
result = db.list_collection_names()
print(result)

# Print every document stored by the streaming job.
cursor = db.climate_hotspot.find({})

for doc in cursor:
    pprint(doc)

['climate_hotspot', 'hotspot', 'climate', 'clihot']
{'GHI_w/m2': 122,
 '_id': ObjectId('647f5861092d2423e8aac052'),
 'air_temperature_celcius': 14,
 'cause_fire_event': '',
 'cre_date_time': '14:41:21',
 'fire_event': [],
 'geohash3': 'r36',
 'latitude': -37.602,
 'longitude': 149.311,
 'max_wind_speed': 14.0,
 'newdate': '2022-01-16 00:00:00',
 'precipitation ': ' 0.00G',
 'producer': 'climate_streaming',
 'relative_humidity': 48.5,
 'windspeed_knots': 7.3}
{'GHI_w/m2': 89,
 '_id': ObjectId('647f58aa092d2423e8aac053'),
 'air_temperature_celcius': 10,
 'cause_fire_event': '',
 'cre_date_time': '16:02:46',
 'fire_event': [],
 'geohash3': 'r1k',
 'latitude': -37.293,
 'longitude': 141.245,
 'max_wind_speed': 8.0,
 'newdate': '2021-12-31 00:00:00',
 'precipitation ': ' 0.01G',
 'producer': 'climate_streaming',
 'relative_humidity': 45.7,
 'windspeed_knots': 3.7}
{'GHI_w/m2': 108,
 '_id': ObjectId('647f58b4092d2423e8aac054'),
 'air_temperature_celcius': 12,
 'cause_fire_event': '',
 'cre_d