# FIT3182 Assignment 3 Part B

- Name: Ong Di Sheng
- Student ID: 31109667
- Email: dong0009@student.monash.edu

## Task 1: Processing Data Stream 

### ***d. Streaming Application*** ###

*Write a streaming application using the Apache Spark Structured Streaming API which processes data in batches of 10 seconds. The streaming application will receive streaming data from all three producers and process it.*

In [1]:
# import libraries
import datetime as dt
import os
from json import loads
from pprint import pprint

# Kafka connector packages for Spark; set before the Spark session is created
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

!pip install pygeohash
import pygeohash as pgh
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
from pyspark.sql.utils import StreamingQueryException



In [2]:
# set global variables
# Kafka topic names, one per producer
CLIMATE_TOPIC = 'climate'
HOTSPOT_AQUA_TOPIC = 'aqua'
HOTSPOT_TERRA_TOPIC = 'terra'

# IP address used for both the Kafka broker and the MongoDB server.
# Set the FIT3182_HOST_IP environment variable to point at your own machine
# instead of editing the notebook; the previous hard-coded value is the default.
hostip = os.environ.get('FIT3182_HOST_IP', '192.168.1.110')

In [3]:
# initialize spark session
# local mode using every available core; an existing session is reused if present
spark = SparkSession.builder.master('local[*]').appName('Spark Streaming Application').getOrCreate()

In [4]:
# create streaming dataframe
# one Kafka source subscribed to the climate, aqua and terra topics at once
topic_name = ', '.join((CLIMATE_TOPIC, HOTSPOT_AQUA_TOPIC, HOTSPOT_TERRA_TOPIC))
topic_stream_df = spark.readStream.format('kafka').options(**{
    'kafka.bootstrap.servers': f'{hostip}:9092',
    'subscribe': topic_name,
}).load()

In [5]:
# observe columns of created streaming dataframe
# (`key` and `value` arrive as binary and are cast to strings in the next cell)
topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# generate output stream
# decode the binary Kafka columns to strings:
# the message key is exposed as `producer`, the message value as `data`
output_stream_df = topic_stream_df.select(
    topic_stream_df['key'].cast('string').alias('producer'),
    topic_stream_df['value'].cast('string').alias('data'),
)

# show schema of output stream
output_stream_df.printSchema()

root
 |-- producer: string (nullable = true)
 |-- data: string (nullable = true)



In [7]:
# create connection with mongodb
# replace with your own IP address and port number
client = MongoClient(host=hostip, port=27017)

# database and collection that receive the processed stream
db = client['fit3182_assignment_db']
collection = db['streaming']

# start from an empty collection: discard documents left by a previous run
collection.drop()

In [8]:
def compute_geohash(lat, long, precision):
    """Return the geohash of the point (`lat`, `long`) encoded with `precision` characters."""
    geohash = pgh.encode(lat, long, precision)
    return geohash

def process_batch(df, epoch_id):
    """Merge one micro-batch of climate and hotspot messages into one MongoDB document.

    The first message whose producer is 'climate' becomes the document; when the
    batch contains none, nothing is written. Messages from any other producer are
    treated as hotspots and kept only if their precision-3 geohash equals the
    climate record's. Kept hotspots are grouped by precision-5 geohash; each group
    is represented by its first hotspot, whose confidence and surface temperature
    are replaced by the group averages when the group has more than one member.
    Every embedded hotspot is tagged with the cause of the fire.

    Args:
        df: micro-batch DataFrame with the string columns `producer` and `data`
            (`data` holds JSON text).
        epoch_id: batch identifier passed in by `foreachBatch`; not used.
    """
    # bring the whole micro-batch to the driver and decode every JSON payload
    records = [row.asDict() for row in df.collect()]
    for record in records:
        record['data'] = loads(record['data'])

    # the first climate record in the batch is the parent document
    for record in records:
        if record['producer'] == 'climate':
            climate = record['data']
            break
    else:
        # no climate data in current batch
        return

    climate['date'] = dt.datetime.strptime(climate['date'], '%d-%m-%Y')
    climate['hotspots'] = []
    climate['station'] = 123456

    # coarse location of the climate reading (precision 3)
    climate_area = compute_geohash(climate['latitude'], climate['longitude'], 3)

    # keep the hotspots that fall in the same coarse location as the climate reading
    nearby = []
    for record in records:
        if record['producer'] == 'climate':
            continue
        spot = record['data']
        if compute_geohash(spot['latitude'], spot['longitude'], 3) != climate_area:
            continue

        # replace the time-of-day string with a full datetime on the climate date
        clock = dt.datetime.strptime(spot['time'], '%H:%M:%S')
        spot['date'] = climate['date']
        spot['datetime'] = climate['date'] + dt.timedelta(
            hours=clock.hour, minutes=clock.minute, seconds=clock.second
        )
        del spot['time']
        nearby.append(spot)

    if nearby:
        # cause of the fire event is decided from the climate reading alone
        is_natural = climate['air_temperature_celcius'] > 20 and climate['GHI_w/m2'] > 180
        cause = 'natural' if is_natural else 'other'

        # group hotspots that share a fine-grained location (precision 5)
        groups = {}
        for spot in nearby:
            fine_area = compute_geohash(spot['latitude'], spot['longitude'], 5)
            groups.setdefault(fine_area, []).append(spot)

        # one embedded hotspot per group, averaged when the group has several members
        for members in groups.values():
            merged = members[0]
            count = len(members)
            if count > 1:
                merged['confidence'] = sum(m['confidence'] for m in members) / count
                merged['surface_temperature_celcius'] = (
                    sum(m['surface_temperature_celcius'] for m in members) / count
                )
            merged['cause'] = cause
            climate['hotspots'].append(merged)

    # store the climate document (with or without embedded hotspots) in mongodb
    collection.insert_one(climate)


In [9]:
# process data in batches of 10 seconds
# define stream writer for mongodb sink: each micro-batch is handed to process_batch
writer = output_stream_df.writeStream.outputMode('append')
writer = writer.trigger(processingTime='10 seconds')
writer = writer.foreachBatch(process_batch)

In [10]:
# start streaming query
# `query` is pre-bound so the cleanup below is safe even if start() itself fails
query = None
try:
    query = writer.start()
    # blocks until the query stops, fails, or the cell is interrupted
    query.awaitTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # only stop a query that was actually started
    if query is not None:
        query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


In [11]:
# observe newly inserted streaming output in mongodb collection
# pretty-print every stored document
for document in collection.find():
    pprint(document)

{'GHI_w/m2': 162,
 '_id': ObjectId('648140b680ea83430dabd4c9'),
 'air_temperature_celcius': 19,
 'date': datetime.datetime(2023, 1, 5, 0, 0),
 'hotspots': [],
 'latitude': -36.9194,
 'longitude': 143.6131,
 'max_wind_speed': 11.1,
 'precipitation': 0.0,
 'precipitation_flag': 'I',
 'relative_humidity': 50.3,
 'station': 123456,
 'windspeed_knots': 7.7}
{'GHI_w/m2': 89,
 '_id': ObjectId('648140c080ea83430dabd4ca'),
 'air_temperature_celcius': 10,
 'date': datetime.datetime(2023, 1, 6, 0, 0),
 'hotspots': [],
 'latitude': -37.227,
 'longitude': 141.146,
 'max_wind_speed': 8.9,
 'precipitation': 0.0,
 'precipitation_flag': 'G',
 'relative_humidity': 44.9,
 'station': 123456,
 'windspeed_knots': 5.5}
{'GHI_w/m2': 129,
 '_id': ObjectId('648140ca80ea83430dabd4cb'),
 'air_temperature_celcius': 15,
 'date': datetime.datetime(2023, 1, 7, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 97,
               'date': datetime.datetime(2023, 1, 7, 0, 0),
               'datetime':