Name: Haw Xiao Ying <br>Student ID: 29797918 <br>Email: xhaw0001@student.monash.edu

# Assignment Part B

## Task 2

### 1. Streaming data visualization

In [None]:
import json
from time import sleep
from kafka import KafkaConsumer
import datetime as dt
import matplotlib.pyplot as plt

# This line is needed for the inline display of graphs in Jupyter Notebook
%matplotlib notebook

topic = 'assignment'

def annotate_max(x, y, ax=None):
    """Label the highest reading in ``y`` with a red arrow.

    Args:
        x: Sequence of x-axis values, positionally aligned with ``y``.
        y: Non-empty list of readings (``max`` and ``.index`` are used on it).
        ax: Object whose ``annotate`` method receives the label. When it is
            falsy, the current axes returned by ``plt.gca()`` are used.
    """
    peak = max(y)
    # ``index`` returns the first match, so ties resolve to the earliest point.
    peak_x = x[y.index(peak)]
    label = 'Max: Time={}, Air Temperature={}'.format(peak_x, peak)
    target = ax or plt.gca()
    # The label text is placed 5 y-units above the annotated point.
    target.annotate(
        label,
        xy=(peak_x, peak),
        xytext=(peak_x, peak + 5),
        arrowprops={'facecolor': 'red', 'shrink': 0.05},
    )
    
def annotate_min(x, y, ax=None):
    """Label the lowest reading in ``y`` with an orange arrow.

    Args:
        x: Sequence of x-axis values, positionally aligned with ``y``.
        y: Non-empty list of readings (``min`` and ``.index`` are used on it).
        ax: Object whose ``annotate`` method receives the label. When it is
            falsy, the current axes returned by ``plt.gca()`` are used.
    """
    lowest = min(y)
    # First occurrence wins when the minimum appears more than once.
    position = y.index(lowest)
    lowest_x = x[position]
    caption = 'Min: Time={}, Air Temperature={}'.format(lowest_x, lowest)
    axes = ax if ax else plt.gca()
    arrow_style = dict(facecolor='orange', shrink=0.05)
    # The label text is placed 5 y-units above the annotated point.
    axes.annotate(
        caption,
        xy=(lowest_x, lowest),
        xytext=(lowest_x, lowest + 5),
        arrowprops=arrow_style,
    )

def connect_kafka_consumer():
    """Create a Kafka consumer subscribed to the module-level ``topic``.

    Returns:
        The ``KafkaConsumer`` instance, or ``None`` when constructing it
        raised an ``Exception`` (the error is printed, not propagated).
    """
    # NOTE: the result is returned from the try/except branches rather than
    # from a ``finally`` clause. A ``return`` inside ``finally`` silently
    # discards any in-flight exception, including KeyboardInterrupt and
    # SystemExit, which must be allowed to propagate.
    try:
        return KafkaConsumer(
            topic,
            consumer_timeout_ms=10000,  # stop iteration if no message after 10 sec
            auto_offset_reset='earliest',  # comment this out to skip the earliest available messages
            bootstrap_servers=['localhost:9092'],
            api_version=(0, 10),
        )
    except Exception as ex:
        # Best-effort: report the failure and let the caller deal with None.
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

def init_plots():
    """Create and show the figure used for the live air-temperature plot.

    Returns:
        A ``(fig, ax)`` tuple: the new figure and its single subplot axes.

    Raises:
        Exception: Any error raised while building the figure is printed and
            then re-raised.
    """
    try:
        width = 9.5
        height = 6
        fig = plt.figure(figsize=(width, height))  # create new figure
        ax = fig.add_subplot(111)  # adding the subplot axes to the given grid position
        fig.suptitle('Real-time uniform stream data visualization with interesting points')  # giving figure a title
        ax.set_xlabel('Time')
        ax.set_ylabel('Air Temperature')
        ax.set_ylim(0, 40)
        ax.set_yticks([0, 5, 10, 15, 20, 25, 30, 35, 40])
        fig.show()  # displaying the figure
        fig.canvas.draw()  # drawing on the canvas
        return fig, ax
    except Exception as ex:
        print(str(ex))
        # Re-raise instead of falling through: an implicit ``None`` return
        # would make a caller's ``fig, ax = init_plots()`` fail with an
        # unrelated "cannot unpack" TypeError that hides the real cause.
        raise
    
def consume_messages(consumer, fig, ax):
    """Plot air temperatures from ``consumer`` on ``ax`` as messages arrive.

    Each message value is decoded as UTF-8 JSON and its
    ``air_temperature_celcius`` field is plotted against the local arrival
    time. Once more than 10 points are buffered, the axes are redrawn with
    the maximum and minimum annotated, then the oldest point is dropped.

    Args:
        consumer: Iterable of messages exposing a bytes ``value`` attribute,
            or ``None`` (in which case nothing is consumed).
        fig: Figure whose canvas is redrawn after every update.
        ax: Axes that are cleared and redrawn on every update.

    Any ``Exception`` raised while consuming is printed and ends consumption.
    All figures are closed only when the message iteration ends normally.
    """
    if consumer is None:
        # Explicit message instead of "'NoneType' object is not iterable".
        print('No Kafka consumer available; nothing to consume')
        return

    try:
        # container for x and y values
        x, y = [], []
        for message in consumer:
            data = json.loads(message.value.decode('utf-8'))
            temperature = data['air_temperature_celcius']
            # Append both values together so x and y always stay aligned.
            x.append(dt.datetime.now().strftime("%X"))
            y.append(temperature)

            # we start plotting only once more than 10 data points are buffered
            if len(y) > 10:
                ax.clear()
                ax.plot(x, y)
                ax.set_xlabel('Time')
                ax.set_ylabel('Air Temperature')
                ax.set_ylim(0, 40)
                ax.set_yticks([0, 5, 10, 15, 20, 25, 30, 35, 40])
                # Annotate the axes we were given, not whatever plt.gca()
                # happens to be when several figures are open.
                annotate_max(x, y, ax)
                annotate_min(x, y, ax)
                fig.canvas.draw()
                x.pop(0)  # removing the item in the first position
                y.pop(0)
        plt.close('all')
    except Exception as ex:
        print(str(ex))
    
    
    
if __name__ == '__main__':

    consumer = connect_kafka_consumer()
    # connect_kafka_consumer() has already printed the error when it returns
    # None, so there is nothing to plot in that case.
    if consumer is not None:
        try:
            fig, ax = init_plots()
            consume_messages(consumer, fig, ax)
        finally:
            # Always release the consumer, even if plotting failed.
            consumer.close()
    
    

### 2. Static data visualization

In [None]:
# a. Plot a bar chart to visualize the total number of fire records based on each hour.

import matplotlib.pyplot as plt
from pymongo import MongoClient
from pprint import pprint

%matplotlib notebook

# Read every hotspot record from the local MongoDB instance.
client = MongoClient()
db = client.fit3182_assignment_db
cursor = db.hotspot_stream.find()

# One counter per hour of the day (0-23).
lst_time = list(range(24))
lst_freq = [0] * len(lst_time)
for record in cursor:
    # Characters 11-12 of the "datetime" value are read as the hour
    # (assumes an ISO-like "YYYY-MM-DDTHH:MM:SS" string -- TODO confirm).
    hour = int(record["datetime"][11:13])
    lst_freq[hour] += 1

plt.bar(lst_time, lst_freq)
plt.title("Total Number of Fire Records Based on Each Hour")
plt.xlabel("Hours")
plt.ylabel("Number of Fire Records")
plt.xticks(lst_time)
plt.show()


In [None]:
# b. In a map visualize fire locations as markers. Use a ‘blue’ marker if the cause of the fire was ‘natural’. 
#    Otherwise, use a ‘red’ marker. Display detailed information such as air temperature, surface temperature, 
#    relative humidity, and confidence with the marker tooltip. 

import folium
from pymongo import MongoClient

def map_plot(results):
    """Build a folium map with one marker per fire record.

    Fires whose ``cause`` is ``'natural'`` get a blue marker, all others a
    red one. Each marker shows air temperature, surface temperature, relative
    humidity and confidence both as a hover tooltip and as a click popup.
    Air temperature and humidity are looked up in the ``climate_stream``
    collection of the module-level ``db`` using the record's ``foreign_key``.

    Args:
        results: Iterable of hotspot documents providing the keys
            ``latitude``, ``longitude``, ``surface_temperature_celcius``,
            ``confidence``, ``foreign_key`` and ``cause``.

    Returns:
        The ``folium.Map`` with all markers added.
    """
    m = folium.Map(location=[-38.043995, 145.264296], zoom_start=8)  # starting on melbourne city

    # plot marker in the map
    for record in results:
        latitude = record['latitude']
        longitude = record['longitude']
        surf_temp = record['surface_temperature_celcius']
        confidence = record['confidence']

        # find_one() returns None when no climate document matches, instead
        # of raising IndexError the way indexing an empty cursor does.
        climate_data = db.climate_stream.find_one({'_id': record['foreign_key']})
        if climate_data is None:
            # Still plot the fire; only the climate readings are unavailable.
            air_temp = humidity = 'N/A'
        else:
            air_temp = climate_data['air_temperature_celcius']
            humidity = climate_data['relative_humidity']

        detailed_information = (
            "Air temperature: " + str(air_temp)
            + ", Surface temperature: " + str(surf_temp)
            + ", Relative humidity: " + str(humidity)
            + ", Confidence: " + str(confidence)
        )

        # Only the colour depends on the cause, so build the marker once.
        color = 'blue' if record['cause'] == 'natural' else 'red'
        folium.Marker(
            [latitude, longitude],
            popup=detailed_information,
            tooltip=detailed_information,  # the task asks for a marker tooltip
            icon=folium.Icon(color=color),
        ).add_to(m)

    return m


if __name__ == "__main__":

    # Open the assignment database. `db` is left at module level because
    # map_plot() reads it when looking up climate data.
    client = MongoClient()
    db = client["fit3182_assignment_db"]

    # Fetch every hotspot record and turn the records into a marker map.
    results = db["hotspot_stream"].find()
    plot_map = map_plot(results)

    # Render the map in the notebook output.
    display(plot_map)
