# FIT3182 Assignment Part B

- Name: Tan Chong Ern
- Student ID: 31435661
- Email: ctan0119@student.monash.edu
---

## Task 1. Processing Data Stream

### *d. Streaming Application* 
*Write a streaming application using the Apache Spark Structured Streaming API which processes data in batches of 10 seconds. The streaming application will receive streaming data from all three producers and process it.*

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, FloatType
from pprint import pprint
import pymongo
import datetime as dt
import pygeohash as pgh


# build (or reuse) the local Spark session that hosts the streaming application
session_builder = SparkSession.builder.appName('Streaming Application')
spark = session_builder.master('local[*]').getOrCreate()

# configure a Kafka source on the local broker, subscribed to the three
# producer topics; load() turns the configured reader into a streaming dataframe
kafka_reader = spark.readStream.format('kafka')
kafka_reader = kafka_reader.option('kafka.bootstrap.servers', 'localhost:9092')
kafka_reader = kafka_reader.option('subscribe', 'climate_producer, aqua_producer, terra_producer')
topics_df = kafka_reader.load()

topics_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [2]:
# layout of the JSON payloads, in column order: the flag and the date/time
# fields are read as strings, every other attribute as a float
string_columns = {'precipitation_flag', 'date', 'datetime'}
column_order = [
    'latitude',
    'longitude',
    'air_temperature_celcius',
    'relative_humidity',
    'windspeed_knots',
    'max_wind_speed',
    'precipitation',
    'precipitation_flag',
    'GHI_w/m2',
    'date',
    'datetime',
    'confidence',
    'surface_temperature_celcius',
]

schema = StructType()
for column_name in column_order:
    column_type = StringType() if column_name in string_columns else FloatType()
    schema.add(column_name, column_type)

# step 1: cast the Kafka value (payload) and key (producer name) to strings
raw_strings_df = topics_df.select(
    topics_df['value'].cast('string').alias('temp'),
    topics_df['key'].cast('string').alias('producer'),
)
# step 2: parse the payload string into a struct that follows the schema
parsed_df = raw_strings_df.select(from_json('temp', schema).alias('temp'), 'producer')
# step 3: flatten the struct so each attribute becomes its own column
output_stream_df = parsed_df.select('temp.*', 'producer')

output_stream_df.printSchema()

root
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- air_temperature_celcius: float (nullable = true)
 |-- relative_humidity: float (nullable = true)
 |-- windspeed_knots: float (nullable = true)
 |-- max_wind_speed: float (nullable = true)
 |-- precipitation: float (nullable = true)
 |-- precipitation_flag: string (nullable = true)
 |-- GHI_w/m2: float (nullable = true)
 |-- date: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- confidence: float (nullable = true)
 |-- surface_temperature_celcius: float (nullable = true)
 |-- producer: string (nullable = true)



In [3]:
def eval_geohash(doc1, doc2, precision):
    """Return True when both documents' coordinates share the same geohash.

    Each document is read through its 'latitude' and 'longitude' entries and
    `precision` is forwarded to `pgh.encode` as the geohash precision.
    """
    first_hash = pgh.encode(doc1['latitude'], doc1['longitude'], precision)
    second_hash = pgh.encode(doc2['latitude'], doc2['longitude'], precision)
    return first_hash == second_hash

def filter_hotspots(df):
    """Split one collected micro-batch into its climate record and nearby hotspots.

    Parameters
    ----------
    df : list
        Collected rows of the batch. Every row must support ``row['producer']``
        and ``row.asDict()``. The list itself is left untouched.

    Returns
    -------
    tuple
        ``(climate, hotspots)``. ``climate`` is ``None`` when no row comes from
        'climate_producer'; otherwise it is a dict of the row's non-null
        attributes with 'date' parsed to a datetime, an empty 'hotspots' list
        and the fixed 'station' id. ``hotspots`` holds one dict per remaining
        row whose precision-3 geohash matches the climate record's.
    """
    climate, hotspots, candidates = None, [], []

    for row in df:
        if row['producer'] != 'climate_producer':
            candidates.append(row)
            continue
        if climate is not None:
            # only the first climate record of a batch is used; later ones are
            # skipped so they are not mistaken for hotspot rows further down
            continue
        # drop nulls only: `is not None` keeps legitimate zero readings (e.g. 0.0 precipitation)
        climate = {k: v for k, v in row.asDict().items() if v is not None}
        climate['date'] = dt.datetime.strptime(climate['date'], '%Y-%m-%d %H:%M:%S')
        climate['hotspots'] = []
        climate['station'] = 420420

    if climate is None:
        return climate, hotspots

    # keeps the hotspots that are close to the climate data (geohash precision 3)
    for row in candidates:
        if not eval_geohash(row, climate, 3):
            continue  # ignore hotspot data that is not close to the climate data
        hotspot = {k: v for k, v in row.asDict().items() if v is not None}
        clock = dt.datetime.strptime(hotspot.pop('datetime'), '%H:%M:%S')
        # combine the climate date with the hotspot's time of day (hours and minutes only)
        hotspot['datetime'] = climate['date'] + dt.timedelta(hours=clock.hour, minutes=clock.minute)
        hotspots.append(hotspot)

    return climate, hotspots
        
def agg_climate_hotspots(climate, hotspots):
    """Attach the (merged) hotspots to the climate document and return it.

    Parameters
    ----------
    climate : dict or None
        Climate document with a 'hotspots' list; a falsy value returns None.
    hotspots : list of dict
        Hotspot documents with 'latitude' and 'longitude'.

    Returns
    -------
    dict or None
        The same `climate` dict, mutated in place. Hotspots sharing a
        precision-5 geohash are merged into the first document of their group,
        whose 'surface_temperature_celcius' and 'confidence' become the group
        mean. Every attached hotspot gets a 'cause' of 'natural' when the air
        temperature exceeds 20 and the GHI exceeds 180, else 'other'.
    """
    if not climate:
        return None

    # a reading that is absent cannot confirm a natural cause, so fall back to
    # 'other' instead of raising KeyError on a record without the attribute
    air_temperature = climate.get('air_temperature_celcius')
    ghi = climate.get('GHI_w/m2')
    is_natural = (
        air_temperature is not None
        and ghi is not None
        and air_temperature > 20
        and ghi > 180
    )
    cause = 'natural' if is_natural else 'other'

    # bucket each of the (filtered) hotspots by its precision-5 geohash
    table = {}
    for doc in hotspots:
        ghash = pgh.encode(doc['latitude'], doc['longitude'], 5)
        table.setdefault(ghash, []).append(doc)

    # merge every bucket into one hotspot and add it to the climate document
    for group in table.values():
        hotspot = group[0]
        if len(group) > 1:
            for field in ('surface_temperature_celcius', 'confidence'):
                # average over the members that actually carry the reading
                readings = [x[field] for x in group if x.get(field) is not None]
                if readings:
                    hotspot[field] = sum(readings) / len(readings)
        hotspot['cause'] = cause
        climate['hotspots'].append(hotspot)

    return climate


def process(df, epoch_id):
    """Store one micro-batch in MongoDB as a climate document with its hotspots.

    Used as the `foreachBatch` callback: `df` is the micro-batch dataframe and
    `epoch_id` the batch identifier (not used here). Batches without a climate
    record are skipped without opening a database connection.
    """
    climate, hotspots = filter_hotspots(df.collect())
    if not climate:
        return

    document = agg_climate_hotspots(climate, hotspots)

    # the context manager closes the client even if the insert raises
    with pymongo.MongoClient() as client:
        client.fit_3182_assignment_db.streaming.insert_one(document)

In [4]:
# open a handle on the collection the stream writes to, so that the stored
# results can be inspected later on
client = pymongo.MongoClient()
db = client['fit_3182_assignment_db']
collection = db['streaming']

# start from an empty collection; comment this line out to append the new data
# to the existing collection data instead
collection.drop()

In [5]:
# StreamingQueryException is caught below, so it has to be in scope: without
# this import the except clause itself raises NameError for any error other
# than KeyboardInterrupt
from pyspark.sql.utils import StreamingQueryException

# hand every 10-second micro-batch to process()
writer = (
    output_stream_df
    .writeStream
    .outputMode('append')
    .foreachBatch(process)
    .trigger(processingTime='10 seconds')
)

query = None  # stays None when start() itself fails, so cleanup can tell
try:
    query = writer.start()
    query.awaitTermination()

except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopped query')
except StreamingQueryException as exc:
    print(exc)
finally:
    # only stop a query that was actually started
    if query is not None:
        query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/student/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/student/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Interrupted by CTRL-C. Stopped query


In [6]:
# run this to pretty-print every document the stream has stored in the MongoDB collection
stored_documents = collection.find()
for document in stored_documents:
    pprint(document)

{'GHI_w/m2': 198.0,
 '_id': ObjectId('628950ac3c1c13b95179263a'),
 'air_temperature_celcius': 23.0,
 'date': datetime.datetime(2022, 1, 1, 0, 0),
 'hotspots': [],
 'latitude': -37.60499954223633,
 'longitude': 149.3260040283203,
 'max_wind_speed': 19.0,
 'precipitation_flag': 'I',
 'producer': 'climate_producer',
 'relative_humidity': 49.599998474121094,
 'station': 420420,
 'windspeed_knots': 10.0}
{'GHI_w/m2': 150.0,
 '_id': ObjectId('628950b53c1c13b95179263c'),
 'air_temperature_celcius': 18.0,
 'date': datetime.datetime(2022, 1, 2, 0, 0),
 'hotspots': [{'cause': 'other',
               'confidence': 67.0,
               'datetime': datetime.datetime(2022, 1, 2, 14, 24),
               'latitude': -36.17509841918945,
               'longitude': 142.78309631347656,
               'producer': 'aqua_producer',
               'surface_temperature_celcius': 43.0}],
 'latitude': -35.77899932861328,
 'longitude': 143.10569763183594,
 'max_wind_speed': 13.0,
 'precipitation_flag': 'I',
 'pr

---

Here is an alternative way of processing the streaming data. Instead of using the StructType schema to convert the data into a typed streaming dataframe, it casts the streaming data to strings and later loads them into dictionaries during processing, before insertion into the MongoDB collection. One notable advantage of this approach is that it avoids null values in the dataframe, so it requires less preprocessing and is hence more efficient overall.

```py
# keep only what the processing needs: the message key (producer name) and the
# raw message payload, both cast to strings
producer_col = topics_df['key'].cast('string').alias('producer')
data_col = topics_df['value'].cast('string').alias('data')
output_stream_df = topics_df.select(producer_col, data_col)
output_stream_df.printSchema()


def eval_geohash(doc1, doc2, precision):
    """Tell whether two documents' coordinates fall in the same geohash cell.

    Both documents are read through their 'latitude' and 'longitude' entries;
    `precision` is handed to `pgh.encode` as the geohash precision.
    """
    hash_of_first = pgh.encode(doc1['latitude'], doc1['longitude'], precision)
    hash_of_second = pgh.encode(doc2['latitude'], doc2['longitude'], precision)
    return hash_of_first == hash_of_second

def filter_hotspots(df):
    """Split one collected micro-batch into its climate record and nearby hotspots.

    Parameters
    ----------
    df : list
        Collected rows of the batch. Every row must support ``row['producer']``
        and ``row['data']``, where 'data' is the JSON payload as a string. The
        list itself is left untouched.

    Returns
    -------
    tuple
        ``(climate, hotspots)``. ``climate`` is ``None`` when no row comes from
        'climate_producer'; otherwise it is the decoded payload with 'date'
        parsed to a datetime, an empty 'hotspots' list and the fixed 'station'
        id. ``hotspots`` holds the decoded payload of every remaining row whose
        precision-3 geohash matches the climate record's.
    """
    # imported locally because json is not among the imports visible at the
    # top of the notebook, and the payloads cannot be decoded without it
    import json

    climate, hotspots, candidates = None, [], []

    for row in df:
        if row['producer'] != 'climate_producer':
            candidates.append(row)
            continue
        if climate is not None:
            # only the first climate record of a batch is used; later ones are
            # skipped so they are not mistaken for hotspot rows further down
            continue
        climate = json.loads(row['data'])
        climate['date'] = dt.datetime.strptime(climate['date'], '%Y-%m-%d %H:%M:%S')
        climate['hotspots'] = []
        climate['station'] = 420420

    if climate is None:
        return climate, hotspots

    # keeps the hotspots that are close to the climate data (geohash precision 3)
    for row in candidates:
        hotspot = json.loads(row['data'])
        if not eval_geohash(hotspot, climate, 3):
            continue  # ignore hotspot data that is not close to the climate data
        clock = dt.datetime.strptime(hotspot['datetime'], '%H:%M:%S')
        # combine the climate date with the hotspot's time of day (hours and minutes only)
        hotspot['datetime'] = climate['date'] + dt.timedelta(hours=clock.hour, minutes=clock.minute)
        hotspots.append(hotspot)

    return climate, hotspots
        
def agg_climate_hotspots(climate, hotspots):
    """Attach the (merged) hotspots to the climate document and return it.

    Parameters
    ----------
    climate : dict or None
        Climate document with a 'hotspots' list; a falsy value returns None.
    hotspots : list of dict
        Hotspot documents with 'latitude' and 'longitude'.

    Returns
    -------
    dict or None
        The same `climate` dict, mutated in place. Hotspots sharing a
        precision-5 geohash are merged into the first document of their group,
        whose 'surface_temperature_celcius' and 'confidence' become the group
        mean. Every attached hotspot gets a 'cause' of 'natural' when the air
        temperature exceeds 20 and the GHI exceeds 180, else 'other'.
    """
    if not climate:
        return None

    # a reading that is absent cannot confirm a natural cause, so fall back to
    # 'other' instead of raising KeyError on a record without the attribute
    air_temperature = climate.get('air_temperature_celcius')
    ghi = climate.get('GHI_w/m2')
    is_natural = (
        air_temperature is not None
        and ghi is not None
        and air_temperature > 20
        and ghi > 180
    )
    cause = 'natural' if is_natural else 'other'

    # bucket each of the (filtered) hotspots by its precision-5 geohash
    table = {}
    for doc in hotspots:
        ghash = pgh.encode(doc['latitude'], doc['longitude'], 5)
        table.setdefault(ghash, []).append(doc)

    # merge every bucket into one hotspot and add it to the climate document
    for group in table.values():
        hotspot = group[0]
        if len(group) > 1:
            for field in ('surface_temperature_celcius', 'confidence'):
                # average over the members that actually carry the reading
                readings = [x[field] for x in group if x.get(field) is not None]
                if readings:
                    hotspot[field] = sum(readings) / len(readings)
        hotspot['cause'] = cause
        climate['hotspots'].append(hotspot)

    return climate

def process(df, epoch_id):
    """Store one micro-batch in MongoDB as a climate document with its hotspots.

    `df` is the micro-batch dataframe, which is collected before processing;
    `epoch_id` is accepted but not used here. Batches without a climate record
    are skipped without opening a database connection.
    """
    # process the data first; the database is only touched if a climate entry is present
    climate, hotspots = filter_hotspots(df.collect())
    if not climate:
        return

    document = agg_climate_hotspots(climate, hotspots)

    # the context manager closes the client even if the insert raises
    with pymongo.MongoClient() as client:
        client.fit_3182_assignment_db.streaming.insert_one(document)
```