## Name: KO KO WIN

In [1]:
#import os 
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0'
import sys 
import json 
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import StructType 
import pygeohash as gh
from pymongo import MongoClient

In [2]:
# Kafka topics to consume: kept as a list, and as the comma-separated string
# that is passed to the Kafka source's `subscribe` option.
topic = ["Producer01", "Producer02", "Producer03"]
topics = ",".join(topic)

In [3]:
# Create (or reuse) the Spark session for this notebook, running in local mode.
spark = SparkSession.builder.master("local[*]").appName('Streaming Application').getOrCreate()


In [4]:
# Streaming DataFrame backed by the Kafka broker at localhost:9092 and
# subscribed to every topic listed in `topics`.
topic_stream_df = spark.readStream.format("kafka").options(
    **{'kafka.bootstrap.servers': 'localhost:9092', 'subscribe': topics}
).load()


In [5]:
# Show the columns the Kafka source exposes (key, value, topic, partition, ...).
topic_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Project only the binary `value` column; the result is displayed, not stored.
topic_stream_df.select('value')

DataFrame[value: binary]

In [7]:
# Schema used when parsing the JSON payload: every field is read as a string.
doc_schema = StructType()
for field_name in (
    "latitude",
    "longitude",
    "air_temperature_celcius",
    "relative_humidity",
    "windspeed_knots",
    "max_wind_speed",
    "precipitation",
    "GHI_w/m2",
    "precipitation_flag",
    "precipitation_value",
    "time",
    "created_date",
    "producer_information",
    "confidence",
    "surface_temperature_celcius",
    "created_time",
):
    # StructType.add appends the field to the schema in place.
    doc_schema.add(field_name, 'string')

In [8]:
# Preview only: cast the binary Kafka `value` column to a string, parse it as
# JSON using doc_schema, and print the resulting schema. Nothing is stored.
(
    topic_stream_df
    .select( col('value').cast("string"))
    .select(from_json('value', doc_schema))
    .printSchema()

)

root
 |-- from_json(value): struct (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- longitude: string (nullable = true)
 |    |-- air_temperature_celcius: string (nullable = true)
 |    |-- relative_humidity: string (nullable = true)
 |    |-- windspeed_knots: string (nullable = true)
 |    |-- max_wind_speed: string (nullable = true)
 |    |-- precipitation: string (nullable = true)
 |    |-- GHI_w/m2: string (nullable = true)
 |    |-- precipitation_flag: string (nullable = true)
 |    |-- precipitation_value: string (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- created_date: string (nullable = true)
 |    |-- producer_information: string (nullable = true)
 |    |-- confidence: string (nullable = true)
 |    |-- surface_temperature_celcius: string (nullable = true)
 |    |-- created_time: string (nullable = true)



In [16]:
def _to_float(value):
    """Return ``value`` as a float, or None when it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _row_to_record(row):
    """Turn one collected Spark Row into a plain dict of producer fields.

    Rows that already expose ``producer_information`` are used as they are.
    Raw Kafka rows have their ``value`` payload decoded and parsed as a JSON
    object. Returns None when no JSON object can be recovered from the row.
    """
    record = row.asDict()
    if "producer_information" in record:
        return record

    payload = record.get("value")
    if payload is None:
        return None
    if isinstance(payload, (bytes, bytearray)):
        payload = bytes(payload).decode("utf-8", errors="replace")
    try:
        parsed = json.loads(payload)
    except (TypeError, ValueError):
        # Malformed message: skip it rather than failing the whole batch.
        return None
    return parsed if isinstance(parsed, dict) else None


def _geohash(record, precision):
    """Geohash a record's position, or None when its coordinates are unusable."""
    latitude = _to_float(record.get("latitude"))
    longitude = _to_float(record.get("longitude"))
    if latitude is None or longitude is None:
        return None
    # pygeohash expects latitude first, then longitude.
    return gh.encode(latitude, longitude, precision=precision)


def _fire_events(climate, aqua_records, terra_records):
    """Build one fire-event dict per Aqua/Terra pair sharing a precision-5 geohash.

    Each event holds the pair's average surface temperature and confidence,
    plus an ``event`` label: "natural" when the climate record reports an air
    temperature above 20 and a GHI above 180, otherwise "other" (also used
    when either climate reading is missing or not numeric). Pairs whose
    readings cannot be converted to numbers are skipped.
    """
    air_temperature = _to_float(climate.get("air_temperature_celcius"))
    ghi = _to_float(climate.get("GHI_w/m2"))
    is_natural = (
        air_temperature is not None
        and ghi is not None
        and air_temperature > 20
        and ghi > 180
    )
    event_label = "natural" if is_natural else "other"

    # Hash the Terra records once instead of once per Aqua record.
    terra_hashed = [(_geohash(terra, 5), terra) for terra in terra_records]

    events = []
    for aqua in aqua_records:
        aqua_hash = _geohash(aqua, 5)
        if aqua_hash is None:
            continue
        for terra_hash, terra in terra_hashed:
            if terra_hash != aqua_hash:
                continue
            readings = [
                _to_float(aqua.get("surface_temperature_celcius")),
                _to_float(terra.get("surface_temperature_celcius")),
                _to_float(aqua.get("confidence")),
                _to_float(terra.get("confidence")),
            ]
            if None in readings:
                continue
            # A fresh dict per pair, so events never alias each other.
            events.append({
                "avg_surface_temperature": (readings[0] + readings[1]) / 2,
                "avg_confidence": (readings[2] + readings[3]) / 2,
                "event": event_label,
            })
    return events


def process_batch(batch_df, batch_id):
    """Merge one micro-batch of climate and satellite records and store it in MongoDB.

    Rows are grouped by their ``producer_information`` field into a climate
    record (the last one seen in the batch wins) and lists of Aqua and Terra
    satellite records. Satellite records whose precision-3 geohash matches the
    climate record's are attached to it as ``hotspots``, each stamped with the
    climate record's ``created_date`` under ``date``. When there is at least
    one hotspot, a ``fire`` list is attached as well (see ``_fire_events``).
    The climate document is then inserted into the ``stream`` collection of
    the ``fit3182_assignment_db`` database.

    Args:
        batch_df: DataFrame for the micro-batch. Either raw Kafka rows with a
            JSON ``value`` payload, or rows already parsed into the producer
            fields.
        batch_id: Identifier of the micro-batch; only used in the error message.

    Returns:
        None. Batches without a climate record write nothing. Insert errors
        are printed rather than raised, so one failed write does not stop the
        streaming query.
    """
    climate = {}
    aqua_arr = []
    terra_arr = []

    for row in batch_df.collect():
        record = _row_to_record(row)
        if record is None:
            continue

        prod_id = record.get("producer_information")
        if prod_id == "Event Producer 1 (Climate)":
            climate = record
        elif prod_id == "Event Producer 2 (Aqua)":
            aqua_arr.append(record)
        elif prod_id == "Event Producer 3 (Terra)":
            terra_arr.append(record)

    # Without a climate record there is no document to build for this batch.
    if not climate:
        return

    hotspot = []
    climate_hash = _geohash(climate, precision=3)
    if climate_hash is not None:
        # Aqua matches first, then Terra matches.
        for satellite_record in aqua_arr + terra_arr:
            if _geohash(satellite_record, precision=3) == climate_hash:
                satellite_record["date"] = climate.get("created_date")
                hotspot.append(satellite_record)

    if hotspot:
        climate["hotspots"] = hotspot
        climate["fire"] = _fire_events(climate, aqua_arr, terra_arr)

    client = MongoClient()
    try:
        # insert_one replaces Collection.insert, which PyMongo 4 removed.
        client.fit3182_assignment_db.stream.insert_one(climate)
    except Exception as exc:
        print(f"batch {batch_id}: failed to insert climate document: {exc}")
    finally:
        client.close()

In [17]:
# Stream writer that hands each appended micro-batch of the Kafka stream to
# process_batch.
db_writer = topic_stream_df.writeStream.outputMode('append').foreachBatch(process_batch)

In [18]:
# Launch the streaming query; each micro-batch is passed to process_batch.
# NOTE(review): the returned StreamingQuery is displayed but not bound to a
# name, so this cell keeps no handle for stopping or awaiting the query.
db_writer.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f2e1c799190>

In [19]:
# Stream writer that would echo the raw Kafka stream to the console; this
# cell only configures it and does not start it.
console_logger = topic_stream_df.writeStream.outputMode('append').format('console')

In [20]:
# Print every document currently stored in the `stream` collection.
client = MongoClient()
db = client.fit3182_assignment_db
collections = db.stream

# The loop variable is deliberately not named `col`: that name is pyspark's
# column function (star-imported at the top of the notebook), and rebinding it
# would break the JSON-parsing cell on a later re-run.
for document in collections.find({}):
    print(document)