# FIT5148 - Distributed Databases and Big Data

# Assignment 2 - Task C2 Solution Workbook


**Student Details**
- Name: Pushan Mukerjee
- Student ID: 29052971


Load relevant libraries

In [1]:
#Libraries
from pyspark import SparkContext # spark
from pyspark.streaming import StreamingContext # spark streaming
import sys
import pymongo #mongo db driver
from pymongo import MongoClient #mongodb client
from pprint import pprint #for printing streamed output
import json #for constructing json documents to load into mongo db
import datetime as dt #for manipulating  
import pandas as pd #for constructing dataframes

**Optional step**

Clean out and prepare fresh MongoDb **as2TaskB** database


In [2]:
client = MongoClient () #defining the Mongodb client.
result = client.drop_database('as2TaskB') #ensure that the as2TaskB database 
                                          #doesn't already exist

db = client.as2TaskB #defining the db

fireCollection = db.fire #define a new collection for fire data. 
                         #This will store fire data plus the 
                         #the embedded climate data associated 
                         #with each fire

climateCollection = db.climate #define a new collection 
                               #for the climate data

result = fireCollection.drop() #Ensure that the fire collection 
                               #does not already exist in Mongo db

result = climateCollection.drop() #Ensure that the fire collection 
                                  #does not already exist in Mongo db

**Send Partition** function for loading climate and fire JSON documents into **as2TaskB** MongoDB database

Processes the following RDD: (Datestamp, (climateDataRecord, fireDataRecord))

**Note:** FireDataRecord consists of 5 fire records bundled into one list, joined with the climate record on **event** datestamp ingested in Task C1

**Sample RDD**

('2018-10-08 12:52:57', (["'cdata'", '948700', '26', '60.8', '6.9', '12.0', "' 0.00I'", "'2018-10-08 12:52:57'"], 
                         ["'fdata'", '-38.038000000000004', '142.986', '313.4', '32.8', '65', '40', "'2018-10-08 12:52:57']
                         ['fdata'", '-37.95', '142.366', '343.2', '58.7', '92', '70', "'2018-10-08 12:52:57']
                         ['fdata'", '-38.231', '147.172', '333.3', '17.9', '83', '60', "'2018-10-08 12:52:57']
                         ['fdata'", '-37.903', '145.25', '317.7', '10.4', '53', '44', "'2018-10-08 12:52:57']
                         ['fdata'", '-37.986999999999995', '144.005', '311.2', '10.4', '50', '38', "'2018-10-08 12:52:57'"]))


In [3]:
#Processes the following RDD, sample below:     
    
#('2018-10-08 12:52:57', (["'cdata'", '948700', '26', '60.8', '6.9', '12.0', "' 0.00I'", "'2018-10-08 12:52:57'"], 
#                         ["'fdata'", '-38.038000000000004', '142.986', '313.4', '32.8', '65', '40', "'2018-10-08 12:52:57']
#                         ['fdata'", '-37.95', '142.366', '343.2', '58.7', '92', '70', "'2018-10-08 12:52:57']
#                         ['fdata'", '-38.231', '147.172', '333.3', '17.9', '83', '60', "'2018-10-08 12:52:57']
#                         ['fdata'", '-37.903', '145.25', '317.7', '10.4', '53', '44', "'2018-10-08 12:52:57']
#                         ['fdata'", '-37.986999999999995', '144.005', '311.2', '10.4', '50', '38', "'2018-10-08 12:52:57'"]))

def sendPartition(iter):
    #setup MongoDB connection and fire and climate collections
    connection = MongoClient()
    db = connection.get_database('as2TaskB')
    climateCollection = db.climate
    fireCollection = db.fire
    
    #loop through the streamed records
    for record in iter:
        datestamp = record[0]
        date = datestamp[0:10]
        climateEvent = record[1][0]
        fireEvents = record[1][1]

        #process and insert climate record
        station = int(climateEvent[1])
        air_temp = int(climateEvent[2])
        rel_humidity = float(climateEvent[3])
        windspeed_knots = float(climateEvent[4])
        maxwind_speed = float(climateEvent[5])
        precipitation = climateEvent[6]
        climateDf = pd.DataFrame({'Date':date, 'Station':station, 'Air_Temperature_Celcius':air_temp, 'Relative_Humidity':rel_humidity, 'WindSpeed_knots':windspeed_knots, 'Max_Wind_Speed':maxwind_speed, 'Precipitation ':precipitation}, index=[0])
        embeddedDf = pd.DataFrame({'Station':station, 'Air_Temperature_Celcius':air_temp, 'Relative_Humidity':rel_humidity, 'WindSpeed_knots':windspeed_knots, 'Max_Wind_Speed':maxwind_speed, 'Precipitation ':precipitation}, index=[0])
        climateRecord = json.loads(climateDf.to_json(orient='records')) 
        embeddedClimateRecord = json.loads(embeddedDf.to_json(orient='records')) 
        climateCollection.insert_many(climateRecord)

        #process and insert fireRecord 1
        latitude1 = float(fireEvents[1])
        longitude1 = float(fireEvents[2])
        surfacetemp_kelvin1 = float(fireEvents[3])
        power1 = float(fireEvents[4])
        confidence1 = int(fireEvents[5])
        surfacetemp_celcius1 = int(fireEvents[6])
        fireDf1 = pd.DataFrame({'Date':date, 'Datetime':datestamp, 'Latitiude':latitude1, 'Longitude':longitude1, 'Surface_Temperature_Kelvin':surfacetemp_kelvin1, 'Power':power1, 'Confidence':confidence1, 'Surface_Temperature_Celcius ':surfacetemp_celcius1, 'Climate':embeddedClimateRecord}, index=[0])
        fireRecord1 = json.loads(fireDf1.to_json(orient='records'))
        fireCollection.insert_many(fireRecord1)  
        
        #process and insert fireRecord 2
        latitude2 = float(fireEvents[8])
        longitude2 = float(fireEvents[9])
        surfacetemp_kelvin2 = float(fireEvents[10])
        power2 = float(fireEvents[11])
        confidence2 = int(fireEvents[12])
        surfacetemp_celcius2 = int(fireEvents[13])
        fireDf2 = pd.DataFrame({'Date':date, 'Datetime':datestamp, 'Latitiude':latitude2, 'Longitude':longitude2, 'Surface_Temperature_Kelvin':surfacetemp_kelvin2, 'Power':power2, 'Confidence':confidence2, 'Surface_Temperature_Celcius ':surfacetemp_celcius2, 'Climate':embeddedClimateRecord}, index=[0])
        fireRecord2 = json.loads(fireDf2.to_json(orient='records'))
        fireCollection.insert_many(fireRecord2)  

        #process and insert fireRecord 3
        latitude3 = float(fireEvents[15])
        longitude3 = float(fireEvents[16])
        surfacetemp_kelvin3 = float(fireEvents[17])
        power3 = float(fireEvents[18])
        confidence3 = int(fireEvents[19])
        surfacetemp_celcius3 = int(fireEvents[20])
        fireDf3 = pd.DataFrame({'Date':date, 'Datetime':datestamp, 'Latitiude':latitude3, 'Longitude':longitude3, 'Surface_Temperature_Kelvin':surfacetemp_kelvin3, 'Power':power3, 'Confidence':confidence3, 'Surface_Temperature_Celcius ':surfacetemp_celcius3, 'Climate':embeddedClimateRecord}, index=[0])
        fireRecord3 = json.loads(fireDf3.to_json(orient='records'))
        fireCollection.insert_many(fireRecord3)
        
        #process and insert fireRecord 4
        latitude4 = float(fireEvents[22])
        longitude4 = float(fireEvents[23])
        surfacetemp_kelvin4 = float(fireEvents[24])
        power4 = float(fireEvents[25])
        confidence4 = int(fireEvents[26])
        surfacetemp_celcius4 = int(fireEvents[27])
        fireDf4 = pd.DataFrame({'Date':date, 'Datetime':datestamp, 'Latitiude':latitude4, 'Longitude':longitude4, 'Surface_Temperature_Kelvin':surfacetemp_kelvin4, 'Power':power4, 'Confidence':confidence4, 'Surface_Temperature_Celcius ':surfacetemp_celcius4, 'Climate':embeddedClimateRecord}, index=[0])
        fireRecord4 = json.loads(fireDf4.to_json(orient='records'))
        fireCollection.insert_many(fireRecord4)
        
        #process and insert fireRecord 5
        latitude5 = float(fireEvents[29])
        longitude5 = float(fireEvents[30])
        surfacetemp_kelvin5 = float(fireEvents[31])
        power5 = float(fireEvents[32])
        confidence5 = int(fireEvents[33])
        surfacetemp_celcius5 = int(fireEvents[34])
        fireDf5 = pd.DataFrame({'Date':date, 'Datetime':datestamp, 'Latitiude':latitude5, 'Longitude':longitude5, 'Surface_Temperature_Kelvin':surfacetemp_kelvin5, 'Power':power5, 'Confidence':confidence5, 'Surface_Temperature_Celcius ':surfacetemp_celcius5, 'Climate':embeddedClimateRecord}, index=[0])
        fireRecord5 = json.loads(fireDf5.to_json(orient='records'))
        fireCollection.insert_many(fireRecord5)

    #close the MongoDb connection    
    connection.close()
    

**Stream the fireData and climateData sent from TCP server as2taskC1.ipynb**

* Below code streams climate data from Port 9999 and fire data on Port 9997 in 2 seperate streams.

* Both streams use a time based window of 5 seconds and join fireData tuples with climateData tuples matching the **event** datestamp within the window. 

* climateData is first split into an array of records and mapped to RDD: **(datestamp, climateRecord)** tuples. 

* Sample climateRecord is: ["'cdata'", '948700', '26', '60.8', '6.9', '12.0', "' 0.00I'", "'2018-10-08 12:52:57'"]).

* fireData is first split into an array of records and mapped to RDD: **(datestamp, fireRecord)** tuples.

* Sample fireRecord is: ["'fdata'", '-38.038000000000004', '142.986', '313.4', '32.8', '65', '40', "'2018-10-08 12:52:57']

* The climateData and fireData tuples are joined to create a stream of joint RDD: **(Datestamp, (climateDataRecord, fireDataRecord))**. See Above SendPartition code for sample. 

* Once joined, the RDDs are sent to SendPartition for processing and writing to **as2TaskB** MongoDb database in the schema defined in as2TaskB. 



In [4]:
# We add this line to avoid an error : "Cannot run multiple SparkContexts at once". If there is an existing spark context, we will reuse it instead of creating a new context.
sc = SparkContext.getOrCreate()

# Create a local StreamingContext with as many working processors as possible and a batch interval of 10 seconds            
batch_interval = 5

# If there is no existing spark context, we now create a new context
if (sc is None):
    sc = SparkContext(master="local[3]", appName = "WordCountApp")
ssc = StreamingContext(sc, batch_interval)

#defining the connection host and ports
host = "localhost"
port1 = "9999"
port2 = "9997"

#creating the 2 streams
climateStream = ssc.socketTextStream(host, int(port1))
fireStream = ssc.socketTextStream(host, int(port2))

#Create 5 second windows for each stream 
wclimateStream = climateStream.window(5)
wfireStream = fireStream.window(5)

#Map the windowed streams to (Datestamp, EventRecord) RDD's so they can be joined on datestamp.
climateRecs = wclimateStream.map(lambda climateRec: climateRec[1:-1].split(", ")).map(lambda climateRec: (climateRec[7][1:20], climateRec))
fireRecs = wfireStream.map(lambda fireRec: fireRec[1:-1].split(", ")).map(lambda fireRec: (fireRec[7][1:20], fireRec))

#create a joined stream to create RDDs of the form (Datestamp, (climateRec, fireRec))
joinedStream = climateRecs.join(fireRecs) #join the 2 streams on datestamp. 
                                          #In practice leftOuterJoin required as a climatic day may contain 0 or many fires.

#Store the event RDDs in the MongoDB database using sendPartition procedure
joinedStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

# Print the result                            
joinedStream.pprint()

#Start Streaming
ssc.start()
try:
    ssc.awaitTermination(timeout=60)
except KeyboardInterrupt:
    ssc.stop()
    sc.stop()


-------------------------------------------
Time: 2018-10-09 00:31:25
-------------------------------------------

-------------------------------------------
Time: 2018-10-09 00:31:30
-------------------------------------------
('2018-10-09 00:31:27', (["'cdata'", '948700', '15', '50.7', '9.2', '13.0', "' 0.02G'", "'2018-10-09 00:31:27'"], ["'fdata'", '-35.646', '142.282', '305.6', '11.8', '65', '32', "'2018-10-09 00:31:27']['fdata'", '-38.075', '143.76', '320.1', '17.6', '72', '46', "'2018-10-09 00:31:27']['fdata'", '-37.635999999999996', '149.33', '316.7', '37.1', '94', '43', "'2018-10-09 00:31:27']['fdata'", '-37.624', '149.332', '306.8', '20.8', '69', '33', "'2018-10-09 00:31:27']['fdata'", '-37.82', '142.32299999999998', '327.6', '18.9', '62', '54', "'2018-10-09 00:31:27'"]))
('2018-10-09 00:31:24', (["'cdata'", '948700', '19', '56.8', '7.9', '11.1', "' 0.00I'", "'2018-10-09 00:31:24'"], ["'fdata'", '-37.966', '145.05100000000002', '341.8', '26.7', '78', '68', "'2018-10-09 00:31:

-------------------------------------------
Time: 2018-10-09 00:32:10
-------------------------------------------
('2018-10-09 00:32:06', (["'cdata'", '948700', '16', '50.8', '5.8', '12.0', "' 0.00I'", "'2018-10-09 00:32:06'"], ["'fdata'", '-37.382', '149.341', '314.9', '10.8', '64', '41', "'2018-10-09 00:32:06']['fdata'", '-37.611', '149.27700000000002', '311.7', '16.1', '53', '38', "'2018-10-09 00:32:06']['fdata'", '-37.608000000000004', '149.292', '328.7', '42.3', '100', '55', "'2018-10-09 00:32:06']['fdata'", '-37.605', '149.308', '316.9', '22.7', '68', '43', "'2018-10-09 00:32:06']['fdata'", '-34.282', '142.121', '327.8', '39.3', '52', '54', "'2018-10-09 00:32:06'"]))

-------------------------------------------
Time: 2018-10-09 00:32:15
-------------------------------------------
('2018-10-09 00:32:09', (["'cdata'", '948700', '24', '55.4', '5.9', '9.9', "' 0.00I'", "'2018-10-09 00:32:09'"], ["'fdata'", '-37.606', '149.312', '360.7', '100.7', '100', '87', "'2018-10-09 00:32:09']['

-------------------------------------------
Time: 2018-10-09 00:32:55
-------------------------------------------
('2018-10-09 00:32:51', (["'cdata'", '948700', '20', '55.8', '10.5', '15.9', "' 0.01G'", "'2018-10-09 00:32:51'"], ["'fdata'", '-35.891999999999996', '145.62', '322.8', '19.0', '71', '49', "'2018-10-09 00:32:51']['fdata'", '-37.247', '141.27', '317.0', '17.1', '94', '43', "'2018-10-09 00:32:51']['fdata'", '-35.937', '145.607', '324.2', '13.5', '73', '51', "'2018-10-09 00:32:51']['fdata'", '-37.247', '141.278', '341.7', '88.2', '91', '68', "'2018-10-09 00:32:51']['fdata'", '-37.586999999999996', '142.47899999999998', '314.5', '11.7', '58', '41', "'2018-10-09 00:32:51'"]))

-------------------------------------------
Time: 2018-10-09 00:33:00
-------------------------------------------
('2018-10-09 00:32:54', (["'cdata'", '948700', '16', '48.4', '8.1', '15.9', "' 0.00G'", "'2018-10-09 00:32:54'"], ["'fdata'", '-37.805', '144.15', '329.1', '28.8', '82', '55', "'2018-10-09 00:3

-------------------------------------------
Time: 2018-10-09 00:33:35
-------------------------------------------
('2018-10-09 00:33:30', (["'cdata'", '948700', '16', '48.3', '14.4', '19.0', "' 0.00G'", "'2018-10-09 00:33:30'"], ["'fdata'", '-37.086999999999996', '145.37', '306.7', '17.5', '64', '33', "'2018-10-09 00:33:30']['fdata'", '-37.758', '148.721', '308.8', '11.2', '76', '35', "'2018-10-09 00:33:30']['fdata'", '-37.561', '148.032', '307.8', '11.1', '73', '34', "'2018-10-09 00:33:30']['fdata'", '-37.446', '148.102', '338.2', '46.8', '100', '65', "'2018-10-09 00:33:30']['fdata'", '-37.448', '148.114', '332.5', '35.5', '72', '59', "'2018-10-09 00:33:30'"]))

-------------------------------------------
Time: 2018-10-09 00:33:40
-------------------------------------------
('2018-10-09 00:33:33', (["'cdata'", '948700', '16', '53.6', '12.1', '16.9', "' 0.16G'", "'2018-10-09 00:33:33'"], ["'fdata'", '-37.453', '148.118', '307.4', '10.9', '71', '34', "'2018-10-09 00:33:33']['fdata'", '-

**Verify records inserted into MongoDB**

In [6]:
numClimateRecs = climateCollection.count() #count the number of JSON documents
                                           #inserted into the climate Collection
print("Number of Climate records:", numClimateRecs) #print the count. Should be 366.


numFireRecs = fireCollection.count() #count the number of JSON documents 
                                     #inserted into the fire Collection
print("Number of Fire records:", numFireRecs) #print the count. Should be 2668.


Number of Climate records: 49
Number of Fire records: 245


**Check the schema of the inserted records**

In [7]:
climateCollection.find()[0]

{'Air_Temperature_Celcius': 15,
 'Date': '2018-10-09',
 'Max_Wind_Speed': 13.0,
 'Precipitation ': "' 0.02G'",
 'Relative_Humidity': 50.7,
 'Station': 948700,
 'WindSpeed_knots': 9.2,
 '_id': ObjectId('5bbb5c3293436973dad26958')}

In [8]:
fireCollection.find()[0]

{'Climate': {'Air_Temperature_Celcius': 15,
  'Max_Wind_Speed': 13.0,
  'Precipitation ': "' 0.02G'",
  'Relative_Humidity': 50.7,
  'Station': 948700,
  'WindSpeed_knots': 9.2},
 'Confidence': 65,
 'Date': '2018-10-09',
 'Datetime': '2018-10-09 00:31:27',
 'Latitiude': -35.646,
 'Longitude': 142.282,
 'Power': 11.8,
 'Surface_Temperature_Celcius ': 32,
 'Surface_Temperature_Kelvin': 305.6,
 '_id': ObjectId('5bbb5c3293436973dad26959')}