### Task 1: Processing Data Stream
d. **Streaming Application**: Write a streaming application using the Apache Spark
Structured Streaming API which processes data in batches of 10 seconds. The
streaming application will receive streaming data from all three producers and
process it as follows:
- Group the streams based on location (i.e., latitude and longitude) and create the data model developed in Part A.
- To decide whether two locations are close to each other, implement the geohashing algorithm or use a library that does it for you. The precision number in the algorithm determines the number of characters in the geohash. Please use precision 3. If the climate data and hotspot data are not close to each other, ignore the hotspot data and store only the climate data.
- If the streaming application has data from only one producer (Producer 1), there was no fire at that time and the climate data can be stored in MongoDB straight away.
- If data is received from two different satellites, AQUA and TERRA, for the same location (use geohash with precision 5 to determine whether two locations are the same), then average the ‘surface temperature’ and ‘confidence’ from the two satellites and save the result as a fire event.
- If a fire was detected with an air temperature greater than 20 °C and a GHI greater than 180 W/m², report the cause of the fire event as ‘natural’. Otherwise, report the cause as ‘other’.
Save the file as **Assignment_PartB_Streaming_Application.ipynb**.
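
The proximity rule above can be illustrated with a minimal, dependency-free sketch of geohash encoding. The notebook itself relies on the `geohash2` library; the `geohash_encode` and `are_close` helpers below are hypothetical names written only for illustration. Two points are treated as close when their hashes at the chosen precision are identical. This is an approximation: points on either side of a cell boundary receive different hashes even when they are physically near each other.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode(latitude, longitude, precision):
    """Encode a point as a geohash with `precision` characters (illustrative helper)."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    bits = 0
    bit_count = 0
    use_longitude = True  # bits alternate, starting with longitude
    while len(chars) < precision:
        if use_longitude:
            rng, value = lon_range, longitude
        else:
            rng, value = lat_range, latitude
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:
            bits = (bits << 1) | 1  # upper half of the interval
            rng[0] = mid
        else:
            bits = bits << 1        # lower half of the interval
            rng[1] = mid
        use_longitude = not use_longitude
        bit_count += 1
        if bit_count == 5:          # every 5 bits become one base-32 character
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


def are_close(point_a, point_b, precision):
    """Treat two (latitude, longitude) points as close if their geohashes match."""
    return geohash_encode(*point_a, precision) == geohash_encode(*point_b, precision)


print(geohash_encode(57.64911, 10.40744, 3))  # u4p
```

Because a longer geohash only refines a shorter one, the precision-5 hash of a point always starts with its precision-3 hash. Precision 3 therefore serves as the coarse "nearby" test and precision 5 as the finer "same location" test.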

In [1]:
# Import the necessary libraries
from pymongo import MongoClient
from pyspark.sql import SparkSession
import json
from pprint import pprint
from pyspark.sql.functions import col, split, element_at, when
import geohash2 as gh
from datetime import datetime as dt

Set a global variable to store the Kafka topic name.

In [2]:
topic_name = 'assignment'

Initialize the Spark session with all available processor cores and set the application name.

In [3]:
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Assignment')
    .getOrCreate()
)

Create a streaming dataframe with options providing the bootstrap server(s) and topic name.

In [4]:
topic_stream_df = (
    spark.readStream.format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', topic_name)
    .load()
)

Print the schema for the topic streaming dataframe 

In [5]:
topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Cast the "value" column in the streaming dataframe to string datatype and name the column "data"

In [6]:
output_stream_df = (
    topic_stream_df.select(
        topic_stream_df.value.cast("string").alias("data")
    )
)
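
Each value in the `data` column is a JSON string. The following is a minimal sketch of how such payloads might be parsed and split by producer, using the `producer` values that appear later in this notebook (`climate_producer`, `aqua_producer`, `terra_producer`). The helper name `split_by_producer` and the sample field values are hypothetical.

```python
import json

HOTSPOT_PRODUCERS = {"aqua_producer", "terra_producer"}


def split_by_producer(payloads):
    """Return (climate_record_or_None, list_of_hotspot_records) for a batch of JSON strings."""
    climate, hotspots = None, []
    for payload in payloads:
        record = json.loads(payload)
        producer = record.get("producer")
        if producer == "climate_producer":
            climate = record  # a later climate record overwrites an earlier one
        elif producer in HOTSPOT_PRODUCERS:
            hotspots.append(record)
        # records from any other producer are ignored
    return climate, hotspots


sample = [
    '{"producer": "climate_producer", "latitude": -36.1, "longitude": 142.1}',
    '{"producer": "aqua_producer", "latitude": -36.114, "longitude": 142.1377}',
    '{"producer": "unknown"}',
]
climate, hotspots = split_by_producer(sample)
print(climate["producer"], len(hotspots))  # climate_producer 1
```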

Print the schema for the output streaming dataframe 

In [7]:
output_stream_df.printSchema()

root
 |-- data: string (nullable = true)



Create two new collections in MongoDB to store the climate streaming data and the hotspot streaming data. They live in the assignment_db database, the same database as Part A, but in separate collections.

In [8]:
client = MongoClient()
db = client.assignment_db
db.climate_streaming.drop()
db.hotspot_streaming.drop()

# Create 2 collections for the climate streaming data and the hotspot streaming data
climate_streaming = db.climate_streaming
hotspot_streaming = db.hotspot_streaming

While processing each batch, we assume that the climate data and the hotspot data in that batch share the same date, so the date of each hotspot record is taken from the climate record in the same batch.
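
Under this assumption, a fire event's timestamp can be built from the climate record's date and the time-of-day portion of the hotspot's `datetime` string. The following is a minimal sketch, assuming the `%d/%m/%Y` and `%d/%m/%Y, %H:%M:%S` formats used in this notebook. `combine_date_and_time` is a hypothetical helper and the sample values are made up.

```python
from datetime import datetime


def combine_date_and_time(climate_date, hotspot_datetime):
    """Take the calendar date from the climate record and the clock time from the hotspot record."""
    day = datetime.strptime(climate_date, "%d/%m/%Y").date()
    time_of_day = datetime.strptime(hotspot_datetime, "%d/%m/%Y, %H:%M:%S").time()
    return datetime.combine(day, time_of_day)


print(combine_date_and_time("2/01/2022", "1/01/2022, 07:39:29"))
# 2022-01-02 07:39:29
```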

In [9]:
def process_batch_data(batch_df, batch_id):
    raw_data = batch_df.collect()
    climate_data = None  # at most one climate record is expected per batch
    hotspot_data = []
    hotspots_matches = []
    
    print("#################################################################")
    print("Total data in this batch: " + str(len(raw_data)))
    
    for raw in raw_data:
        
        data = raw.asDict() # Get the data like a Dictionary object
        data = data["data"] # Get only the data key-value pair
        data = json.loads(data)
        
        # separate the climate data and hotspot data
        if data["producer"] == "climate_producer":
            climate_data = data
        elif data["producer"] == "aqua_producer" or data["producer"] == "terra_producer":
            hotspot_data.append(data)
            
#     print("Climate Data")
#     pprint(climate_data)
#     print("Hotspot Data in this batch")
#     pprint(hotspot_data)
    
    hotspot_locations = []
    
    # if there is climate data, continue
    if climate_data:

        # preprocess the climate data to fit the data model in Part A; since climate_streaming.csv has no
        # station field, a station key is added with a default value of 948700
        del climate_data['producer']
        climate_data['station'] = 948700 
        climate_data["date"] = dt.strptime(climate_data["date"], "%d/%m/%Y")

        
        # get the geohash encoding of the climate data
        climate_location = gh.encode(climate_data["latitude"], climate_data["longitude"], 3)
          
        # loop through each hotspot data in the batch
        for hotspot in hotspot_data:
            
            # get the geohash encoding of the hotspot data
            hotspot_location = gh.encode(hotspot["latitude"], hotspot["longitude"], 3)
            hotspot_locations.append(hotspot_location)
            
            # if the climate geohash matches the hotspot geohash,
            # the climate data and the hotspot data are close to each other
            if climate_location == hotspot_location:
                hotspots_matches.append(hotspot)
          

    
        
        # if no hotspot data is close to the climate data,
        # insert only the climate data into the MongoDB database
        if len(hotspots_matches) == 0:
            print("There's no hotspot data that is close to the climate data")
            del climate_data['longitude']
            del climate_data['latitude']
            climate_streaming.insert_one(climate_data)
            print("Successfully inserted the climate data into database")
            return
        
        # otherwise, at least one hotspot record is close to the climate data
        else: 
            hotspots_group = {}
            
            # Loop over the hotspot records that are close to the climate data
            # and group those that are at the same location.
            # Each group is stored as a list in a dictionary: the key is the
            # precision-5 geohash of the hotspot location and the value is the
            # list of hotspot records at that location
            for hotspot in hotspots_matches:
                
                # get the geohash encoding of the hotspot data
                hotspot_location = gh.encode(hotspot["latitude"], hotspot["longitude"], 5)
                try: 
                    hotspots_group[hotspot_location].append(hotspot)
                except KeyError:
                    hotspots_group[hotspot_location] = [hotspot]
            
            # Loop through every group in the hotspot dictionary
            for key in hotspots_group:
                hotspot_arr = hotspots_group[key]
                accum_surface_temp = 0
                accum_confidence = 0
                
                # The latitude will be the first hotspot data's latitude element
                latitude = hotspot_arr[0]["latitude"]
                
                # The longitude will be the first hotspot data's longitude element
                longitude = hotspot_arr[0]["longitude"]
                
                # The date will be the climate data's date
                date = climate_data["date"]
                
                # Combine the climate data's date with the time of day of the first hotspot record
                datetime = climate_data["date"].strftime("%d/%m/%Y") + ", " + hotspot_arr[0]["datetime"].split(", ")[1]
                cause_of_fire = ""
                
                # Loop through the array in each hotspot group to calculate the average surface temperature
                # and the average confidence
                for hotspot in hotspot_arr:
                    accum_surface_temp += hotspot["surface_temperature_celcius"]
                    accum_confidence += hotspot["confidence"]
                    
                avg_surface_temp = accum_surface_temp/len(hotspot_arr)
                avg_confidence = accum_confidence/len(hotspot_arr)
                
                # Check for the cause of fire
                if climate_data["air_temperature_celcius"] > 20 and climate_data["GHI_w/m2"] > 180:
                    cause_of_fire = "natural"
                else: 
                    cause_of_fire = "other"
               
                # Create the JSON object to be stored into the MongoDB database
                hotspot_new_doc = { 
                    'confidence': avg_confidence,
                    'date': date,
                    'datetime': dt.strptime(datetime, '%d/%m/%Y, %H:%M:%S'),
                    'latitude': latitude,
                    'longitude': longitude,
                    'surface_temperature_celcius': avg_surface_temp,
                    'cause_of_fire': cause_of_fire
                }
    
                hotspot_streaming.insert_one(hotspot_new_doc)
#                 pprint(hotspot_new_doc)
                print("Succesfully inserted the hotspot data into database")
                
            del climate_data['longitude']
            del climate_data['latitude']
            climate_streaming.insert_one(climate_data)
            print("Succesfully inserted the climate data into database")
            
    # otherwise there is nothing to store for this batch
    else:
        print("There's no climate data in this batch")
        return
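
The averaging and cause-of-fire steps inside `process_batch_data` can be isolated as a pure function, which makes them easier to check without Spark or MongoDB. The following is a minimal sketch using the field names and thresholds from this notebook. `summarise_fire_event` is a hypothetical helper and the sample readings are invented for illustration.

```python
def summarise_fire_event(hotspots, climate):
    """Average the hotspot readings of one location and classify the cause of fire."""
    count = len(hotspots)
    avg_temp = sum(h["surface_temperature_celcius"] for h in hotspots) / count
    avg_conf = sum(h["confidence"] for h in hotspots) / count
    # 'natural' only when both thresholds from the task description are exceeded
    is_natural = climate["air_temperature_celcius"] > 20 and climate["GHI_w/m2"] > 180
    return {
        "surface_temperature_celcius": avg_temp,
        "confidence": avg_conf,
        "cause_of_fire": "natural" if is_natural else "other",
    }


readings = [
    {"surface_temperature_celcius": 60, "confidence": 80},
    {"surface_temperature_celcius": 70, "confidence": 90},
]
climate = {"air_temperature_celcius": 12, "GHI_w/m2": 109}
print(summarise_fire_event(readings, climate))
# {'surface_temperature_celcius': 65.0, 'confidence': 85.0, 'cause_of_fire': 'other'}
```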
            
        
    

In [10]:
db_writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .trigger(processingTime='10 seconds') # process data in batches of 10 seconds
    .foreachBatch(process_batch_data) # for each batch of the data call the process_batch_data function above
)

In [11]:
writer = db_writer

In [12]:
# Start the query outside the try block so that `query` is always defined in `finally`
query = writer.start()
try:
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
finally:
    query.stop()

#################################################################
Total data in this batch: 0
There's no climate data in this batch
#################################################################
Total data in this batch: 8
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 10
There's no hotspot data that is close to the climate data
Successfully inserted the climate data into database
#################################################################
Total data in this batch: 11
There's no hotspot data that is close to the climate data
Successfully inserted the climate data into database
#################################################################
Total data in this batch: 11
Succesfully inserted the hotspot data into database
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 10
There's no hotspot data that is close to the climate data
Successfully inserted the climate data into database
#################################################################
Total data in this batch: 11
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 10
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 11
There's no hotspot data that is close to the climate data
Successfully inserted the climate data into database
#################################################################
Total data in this batch: 11
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 11
Succesfully inserted the hotspot data into database
Succesfully inserted the hotspot data into database
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 11
Succesfully inserted the hotspot data into database
Succesfully inserted the climate data into database
#################################################################
Total data in this batch: 1

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/student/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


Show an example climate document from the climate_streaming collection in assignment_db.

In [13]:
climate_doc = climate_streaming.find({}).limit(1)
for x in climate_doc: 
    pprint(x)

{'GHI_w/m2': 109,
 '_id': ObjectId('628a6a1d813d13723d9aaa63'),
 'air_temperature_celcius': 12,
 'date': datetime.datetime(2022, 1, 2, 0, 0),
 'max_wind_speed': 15.9,
 'precipitation': {'amount': 0.01, 'flag': 'G'},
 'relative_humidity': 42.7,
 'station': 948700,
 'windspeed_knots': 10.0}


Show an example hotspot document from the hotspot_streaming collection in assignment_db.

In [14]:
hotspot_doc = hotspot_streaming.find({}).limit(1)
for x in hotspot_doc: 
    pprint(x)

{'_id': ObjectId('628a6a1d813d13723d9aaa62'),
 'cause_of_fire': 'other',
 'confidence': 100.0,
 'date': datetime.datetime(2022, 1, 2, 0, 0),
 'datetime': datetime.datetime(2022, 1, 2, 7, 39, 29),
 'latitude': -36.114,
 'longitude': 142.1377,
 'surface_temperature_celcius': 118.0}
