### Processing the two data streams to conform to the data model built in Task B

In [1]:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import pymongo
from pymongo import MongoClient
from bson.objectid import ObjectId
    
    
# Connect to the default local MongoDB server and drop any existing 'fire'
# and 'climate' collections so this run starts with empty collections.
client = MongoClient()
db = client["fit5148_db"]
for stale_collection in ("fire", "climate"):
    db.drop_collection(stale_collection)
    
    
def climate_operation(record):
    """Insert one streamed climate record into the ``climate`` collection.

    The record is stored as soon as it arrives, following the schema of the
    climate collection.

    Args:
        record: One comma-separated line from the climate stream. Field 0 is
            not used; fields 1-8 are stored as station, air temperature,
            relative humidity, wind speed, max wind speed, MAX, MIN and
            precipitation.

    Raises:
        IndexError: If the record has fewer than nine comma-separated fields.
        ValueError: If a numeric field cannot be converted.
    """
    fields = record.split(',')

    # Build the document before connecting, so a malformed record fails
    # without ever opening a connection.
    document = {
        "Station": int(fields[1]),
        "Date": None,
        "Air Temperature(Celcius)": int(fields[2]),
        "Relative Humidity": float(fields[3]),
        "WindSpeed  (knots)": float(fields[4]),
        "Max Wind Speed": float(fields[5]),
        # MAX, MIN and Precipitation are kept as raw strings: the stored
        # values carry flag characters (e.g. '72.0*', '0.00I').
        "MAX": fields[6],
        "MIN": fields[7],
        "Precipitation": fields[8],
    }

    # The context manager closes the client even if the insert raises.
    with MongoClient() as client:
        client.fit5148_db.climate.insert_one(document)
    
def fire_operation(record):
    """Insert one streamed fire record and link it to the newest climate doc.

    The fire document is stored according to the Task B schema, using the
    last field of the record as its ``_id``. That id is then pushed onto the
    ``fires`` array of the climate document with the highest ``_id``
    (presumably the climate record that arrived in the same time batch).

    Args:
        record: One comma-separated line from the fire stream. Field 0 is not
            used; fields 1-6 are latitude, longitude, surface temperature
            (kelvin), power, confidence and surface temperature (Celsius);
            the last field is the fire id.

    Raises:
        IndexError: If the record has fewer than seven comma-separated fields.
        ValueError: If a numeric field cannot be converted.
    """
    fields = record.split(',')
    fire_id = fields[-1]

    # Build the document before connecting, so a malformed record fails
    # without ever opening a connection.
    document = {
        "_id": fire_id,
        "Latitude": float(fields[1]),
        "Longitude": float(fields[2]),
        "Surface Temperature (kelvin)": float(fields[3]),
        "Datetime": None,
        "Power": float(fields[4]),
        "Confidence": int(fields[5]),
        "Date": None,
        "Surface Temperature (Celcius)": int(fields[6]),
    }

    # The context manager closes the client even if a database call raises.
    with MongoClient() as client:
        db = client.fit5148_db
        db.fire.insert_one(document)
        # Select the climate document with the highest _id and append the
        # child reference in one atomic server-side operation. If no climate
        # document exists yet, nothing is updated and the fire stays unlinked.
        db.climate.find_one_and_update(
            {},
            {'$push': {"fires": fire_id}},
            sort=[('_id', -1)],
        )
       
    
    
    
# Reuse the running SparkContext if there is one. getOrCreate() creates a new
# context otherwise, which avoids the error
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)  # batch interval of 1 second
ssc.checkpoint("checkpoint")


host = 'localhost'
climate = ssc.socketTextStream(host, 9999)
fire = ssc.socketTextStream(host, 8080)

# Split each line into records (words1 holds fire records, words2 holds
# climate records).
words1 = fire.flatMap(lambda line: line.split("\n"))
words2 = climate.flatMap(lambda line: line.split("\n"))

# Store every record of every batch in MongoDB.
words1.foreachRDD(lambda rdd: rdd.foreach(fire_operation))
words2.foreachRDD(lambda rdd: rdd.foreach(climate_operation))

ssc.start()
try:
    # Let the stream run for at most 60 seconds.
    ssc.awaitTermination(timeout=60)
except KeyboardInterrupt:
    # A manual interrupt only ends the run early; shutdown happens below.
    pass
finally:
    # Stop exactly once, whether the run timed out, was interrupted or failed.
    ssc.stop()
    sc.stop()


### Test that the data has been loaded successfully into the database according to our data model in Task B

In [2]:
# Open a fresh connection and take handles to both collections.
client = MongoClient()
db = client["fit5148_db"]
climate, fire = db["climate"], db["fire"]

### Test the results by using a revised version of the A6 query after setting up the data model in Task B

In [42]:
from pprint import pprint

pipeline = [
    # One output document per fire id referenced by a climate document.
    {"$unwind": "$fires"},
    # Resolve the referenced fire id into the matching fire document.
    {"$lookup": {
        "from": "fire",
        "localField": "fires",
        "foreignField": "_id",
        "as": "Fire",
    }},
    # Flatten the joined fire and the climate document into one document.
    # $$ROOT is merged last, so climate values win for keys present in both.
    {"$replaceRoot": {
        "newRoot": {
            "$mergeObjects": [{"$arrayElemAt": ["$Fire", 0]}, "$$ROOT"],
        },
    }},
    # Remove the join helper fields and the id from the output.
    {"$project": {"Fire": 0, "_id": 0, "fires": 0}},
    # Keep the ten documents with the highest surface temperature.
    {"$sort": {"Surface Temperature (Celcius)": -1}},
    {"$limit": 10},
]

results = climate.aggregate(pipeline)
for document in results:
    pprint(document)

{'Air Temperature(Celcius)': 15,
 'Confidence': 95,
 'Date': None,
 'Datetime': None,
 'Latitude': -37.613,
 'Longitude': 149.305,
 'MAX': '',
 'MIN': '58.3',
 'Max Wind Speed': 13.0,
 'Power': 160.9,
 'Precipitation': '0.02G',
 'Relative Humidity': 50.7,
 'Station': 948700,
 'Surface Temperature (Celcius)': 75,
 'Surface Temperature (kelvin)': 348.2,
 'WindSpeed  (knots)': 9.2}
{'Air Temperature(Celcius)': 16,
 'Confidence': 93,
 'Date': None,
 'Datetime': None,
 'Latitude': -38.057,
 'Longitude': 144.211,
 'MAX': '68.4*',
 'MIN': '54.3*',
 'Max Wind Speed': 15.0,
 'Power': 48.8,
 'Precipitation': '0.00G',
 'Relative Humidity': 53.6,
 'Station': 948700,
 'Surface Temperature (Celcius)': 73,
 'Surface Temperature (kelvin)': 346.3,
 'WindSpeed  (knots)': 8.1}
{'Air Temperature(Celcius)': 15,
 'Confidence': 93,
 'Date': None,
 'Datetime': None,
 'Latitude': -37.875,
 'Longitude': 142.51,
 'MAX': '',
 'MIN': '58.3',
 'Max Wind Speed': 13.0,
 'Power': 46.0,
 'Precipitation': '0.02G',
 'Rel

In [3]:
# Fetch a single document from the climate collection to inspect its stored
# shape (the notebook displays the returned document).
climate.find_one()

{'_id': ObjectId('5b0168c87c376e3793901e3c'),
 'Station': 948700,
 'Date': None,
 'Air Temperature(Celcius)': 19,
 'Relative Humidity': 56.8,
 'WindSpeed  (knots)': 7.9,
 'Max Wind Speed': 11.1,
 'MAX': '72.0*',
 'MIN': '61.9*',
 'Precipitation': '0.00I',
 'fires': ['sk1xDm4OZy3L',
  'LxkG5b2u5SRy',
  '7bNi1dqgdTxy',
  'dGWw4xkXLamh',
  'QVJm2GBz7huz']}

In [39]:
# Fetch a single document from the fire collection to inspect its stored
# shape (the notebook displays the returned document).
fire.find_one()

{'_id': 'D65jrlXbhPUt',
 'Latitude': -37.966,
 'Longitude': 145.051,
 'Surface Temperature (kelvin)': 341.8,
 'Datetime': None,
 'Power': 26.7,
 'Confidence': 78,
 'Date': None,
 'Surface Temperature (Celcius)': 68}

In [None]:
# Clean up: drop the whole fit5148_db database, including every collection
# in it. This is destructive and cannot be undone.
client.drop_database('fit5148_db')