In [1]:
import os

# pyspark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so the Kafka
# connector package must be requested here, before any SparkSession exists.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

# Host running Kafka (port 9092); MongoDB is reached at the same address.
# Set the HOST_IP environment variable to switch networks (e.g. 192.168.1.101
# at home) instead of editing this cell; the default is the library address.
hostip = os.environ.get('HOST_IP', '10.192.0.143')

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import pymongo
from pymongo import MongoClient


In [2]:
# import statements
from time import sleep
from json import dumps
from kafka3 import KafkaProducer
import random
import pandas as pd
import datetime as dt
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at, when
import pygeohash as pgh
import json
from pprint import pprint
import datetime

# Local Spark session using every logical CPU core ('local[*]').
# getOrCreate() hands back the already-running session when the cell is re-run.
spark = SparkSession.builder \
    .appName('Spark Streaming from Kafka into MongoDB') \
    .master('local[*]') \
    .getOrCreate()

In [3]:
# Streaming DataFrame subscribed to the 'PartB1' Kafka topic on {hostip}:9092.
# failOnDataLoss='false': do not fail the query when data may have been lost
# (e.g. requested offsets are no longer available in Kafka).
topic_stream_df1 = spark.readStream.format('kafka').options(**{
    'kafka.bootstrap.servers': f'{hostip}:9092',
    'subscribe': 'PartB1',
    'failOnDataLoss': 'false',
}).load()

# Only a single topic is consumed, so the "union" is just that one stream.
union_stream = topic_stream_df1

# Kafka key/value arrive as binary; expose both as strings for the batch handler.
stream = union_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


In [4]:


def _parse_record(raw):
    """Decode one Kafka row into a dict with its numeric fields cast to float.

    The row's 'value' column holds a JSON object. 'producer1' records are
    climate readings; every other producer sends fire-hotspot readings.
    'created_time' is reformatted from 'YYYY-MM-DDTHH:MM:SS' to
    'YYYY-MM-DD HH:MM:SS'.
    """
    data = json.loads(raw.asDict()['value'])

    if data['producer'] == 'producer1':
        float_fields = ('latitude', 'longitude', 'air_temperature_celcius',
                        'relative_humidity', 'windspeed_knots', 'max_wind_speed',
                        'GHI_w/m2')
    else:
        float_fields = ('latitude', 'longitude', 'confidence',
                        'surface_temperature_celcius')

    for field in float_fields:
        data[field] = float(data[field])

    created = datetime.datetime.strptime(data['created_time'], "%Y-%m-%dT%H:%M:%S")
    data['created_time'] = created.strftime("%Y-%m-%d %H:%M:%S")
    return data


def _split_batch(rows):
    """Parse every row and split the records into (climate, AQUA, TERRA) lists.

    'producer1' is climate, 'producer2' is AQUA, and any other producer is
    treated as TERRA.
    """
    climates, aqua, terra = [], [], []
    for raw in rows:
        data = _parse_record(raw)
        if data['producer'] == 'producer1':
            climates.append(data)
        elif data['producer'] == 'producer2':
            aqua.append(data)
        else:
            terra.append(data)
    return climates, aqua, terra


def _merge_satellite_pairs(hotspots):
    """Merge AQUA/TERRA hotspots that fall in the same precision-5 geohash cell.

    Each matching (AQUA, TERRA) pair yields one record with the averaged
    surface temperature and confidence, positioned at the AQUA coordinates.
    Hotspots not part of any pair are kept unchanged, in their original
    order, followed by the merged records.
    """
    # Hash every hotspot once instead of re-encoding inside the nested loop.
    cells = [pgh.encode(latitude=h['latitude'], longitude=h['longitude'], precision=5)
             for h in hotspots]

    merged, used = [], set()
    for i, a_rec in enumerate(hotspots):
        if a_rec['sate'] != 'AQUA':
            continue
        for j, t_rec in enumerate(hotspots):
            if t_rec['sate'] == 'AQUA' or cells[i] != cells[j]:
                continue
            merged.append({
                'avg_temperature': (a_rec['surface_temperature_celcius'] + t_rec['surface_temperature_celcius']) / 2,
                'avg_confidence': (a_rec['confidence'] + t_rec['confidence']) / 2,
                'latitude': a_rec['latitude'],
                'longitude': a_rec['longitude'],
            })
            used.update((i, j))

    return [h for k, h in enumerate(hotspots) if k not in used] + merged


def _hotspots_for_climate(climate, aqua, terra):
    """Build the 'hotspot' list for one climate record.

    Hotspots sharing the climate record's precision-3 geohash cell are tagged
    with 'sate' ('AQUA' or 'TERRA'), AQUA/TERRA pairs in the same precision-5
    cell are merged, and every resulting entry gets an 'event' label:
    'natural' when air temperature > 20 and GHI > 180, otherwise 'other'.
    """
    cell = pgh.encode(latitude=climate['latitude'], longitude=climate['longitude'], precision=3)

    matches = []
    for satellite, records in (('AQUA', aqua), ('TERRA', terra)):
        for record in records:
            if pgh.encode(latitude=record['latitude'], longitude=record['longitude'], precision=3) == cell:
                # Copy so a hotspot matched by several climate records in the
                # same batch is tagged per document instead of sharing one dict.
                matches.append(dict(record, sate=satellite))

    hotspots = _merge_satellite_pairs(matches)

    is_natural = climate['air_temperature_celcius'] > 20 and climate['GHI_w/m2'] > 180
    event = 'natural' if is_natural else 'other'
    for hotspot in hotspots:
        hotspot['event'] = event
    return hotspots


def process_data(batch_df, batch_id):
    """foreachBatch handler: store each climate record of a batch with its hotspots.

    Parameters
    ----------
    batch_df : pyspark DataFrame
        Micro-batch whose string 'value' column holds the JSON payload.
    batch_id : int
        Micro-batch id supplied by Spark (unused).

    The whole batch is parsed first; only then is every climate ('producer1')
    record inserted into ``db.hotspot``, exactly once, with a 'hotspot' list
    built from all AQUA/TERRA records of the batch. Hotspot records in a batch
    without any climate record are not stored. Errors are printed rather than
    raised so that one bad batch does not stop the streaming query.
    """
    try:
        climates, aqua, terra = _split_batch(batch_df.collect())

        for climate in climates:
            climate['hotspot'] = _hotspots_for_climate(climate, aqua, terra)
            try:
                db.hotspot.insert_one(climate)
            except pymongo.errors.DuplicateKeyError:
                # A document with this _id is already stored; skip it.
                pass
            except Exception as ex:
                print("An error occurred:", ex)

    except Exception as ex:
        print(ex)

In [5]:
# process_data writes through the module-level `db`. Open the MongoDB handle
# here, before the query starts, so no micro-batch can run while `db` is still
# undefined (the connection cell that follows this one only runs after start()).
client = MongoClient(f'mongodb://{hostip}:27017/')
db = client.fit3182_assignment1_db

# Run process_data on each micro-batch, triggered every 10 seconds.
db_writer = (
    stream
    .writeStream
    .foreachBatch(process_data)
    .outputMode('append')
    .trigger(processingTime='10 seconds')
    .start()
)


In [6]:
# MongoDB runs on the same host as Kafka (hostip, set in the first cell).
client = MongoClient(f'mongodb://{hostip}:27017/')

# Names of the existing databases; this round trip also fails fast if the
# server is unreachable.
result = client.list_database_names()

# Database the stream writes into (MongoDB creates it on first insert).
db = client.fit3182_assignment1_db

# Uncomment to start from an empty collection:
# db.hotspot.drop()


In [7]:
import threading

# Stop the streaming query automatically after this many seconds (600 s = 10 min).
STREAM_TIMEOUT_SECONDS = 600


def stop_db_writer():
    """Timer callback: stop the streaming query once the timeout has elapsed."""
    print('Stopping query after timeout.')
    db_writer.stop()


timer = threading.Timer(STREAM_TIMEOUT_SECONDS, stop_db_writer)
# A pending timer must never keep the interpreter alive on its own.
timer.daemon = True

try:
    timer.start()
    # Block this cell until the query stops (timeout, CTRL-C, or failure).
    db_writer.awaitTermination()

except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping query.')
    db_writer.stop()

finally:
    # If the query ended early, cancel the timer so its callback does not
    # fire minutes later; this is a no-op if it has already fired.
    timer.cancel()

Stopping query after timeout.


Inserting a document whose key (`_id`) collides with one already in the collection raises an error, so the insert is wrapped in a `try`/`except` that catches `DuplicateKeyError`.

In [8]:
# Inspect what the stream stored: each climate document with its embedded
# 'hotspot' list. pprint keeps the nested structure readable.
cursor = db.hotspot.find({})
for document in cursor:
    pprint(document)

{'_id': ObjectId('6654124cc91afa9b100d87b5'), 'latitude': -38.038, 'longitude': 142.986, 'air_temperature_celcius': 15.0, 'relative_humidity': 50.7, 'windspeed_knots': 9.2, 'max_wind_speed': 13.0, 'precipitation ': ' 0.02G', 'GHI_w/m2': 128.0, 'created_time': '2024-03-09 00:00:00', 'producer': 'producer1', 'hotspot': [{'latitude': -38.0795, 'longitude': 143.2449, 'confidence': 100.0, 'surface_temperature_celcius': 91.0, 'created_time': '2024-01-01 23:59:22', 'producer': 'producer3', 'sate': 'TERRA', 'event': 'other'}]}
{'_id': ObjectId('66541256c91afa9b100d87b6'), 'latitude': -36.398, 'longitude': 145.286, 'air_temperature_celcius': 11.0, 'relative_humidity': 45.1, 'windspeed_knots': 11.5, 'max_wind_speed': 18.1, 'precipitation ': ' 0.20G', 'GHI_w/m2': 98.0, 'created_time': '2024-03-10 00:00:00', 'producer': 'producer1', 'hotspot': []}
{'_id': ObjectId('66541260c91afa9b100d87b7'), 'latitude': -37.886, 'longitude': 147.207, 'air_temperature_celcius': 19.0, 'relative_humidity': 54.3, 'wi

The function below, `process_data2`, is a fallback to use in case merging does not work.