# 3.4 Analyzing and visualizing PM2.5 data streams (Step-3)

This Jupyter notebook represents a **Kafka consumer** that analyses the data stream generated in the previous step. 
After initializing the Kafka consumer we will perform two types of analysis:

-  **Event Detection:** We'll detect events where the 3-day average value of PM2.5 concentrations for a location exceeds the critical value of 20.0 (event of interest). As soon as the event is detected we'll invoke a warning action.

-  **Monitoring:** We'll create a map that informs us about the location and the current status (active, not active) of sensors.
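
As a minimal sketch of the event-detection rule, assuming one reading per location per day, a rolling mean over three readings per location can be compared against the critical value. The readings and the box names `A` and `B` below are made up for illustration; only the column names and the threshold of 20.0 come from this notebook.

```python
import pandas as pd

# Illustrative readings only: two hypothetical sensor boxes, five daily values each
readings = pd.DataFrame({
    "boxId": ["A"] * 5 + ["B"] * 5,
    "day": list(range(1, 6)) * 2,
    "value": [10.0, 12.0, 31.0, 14.0, 9.0,    # box A: a single spike
              18.0, 22.0, 25.0, 30.0, 12.0],  # box B: sustained high values
})

THRESHOLD = 20.0  # critical value from the text

# Mean of the current and the two preceding readings, computed per box
readings["avg_3d"] = (
    readings.groupby("boxId")["value"]
    .transform(lambda s: s.rolling(3).mean())
)

# An event occurs wherever the three-reading average exceeds the threshold
events = readings[readings["avg_3d"] > THRESHOLD]
print(events[["boxId", "day", "avg_3d"]])
```

In this sketch only box `B` produces events (days 3 to 5). Box `A` shows that a single spike of 31.0 does not trigger an event on its own, because its three-reading averages stay below 20.0.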




In [1]:
## Import Libraries

import warnings ## ignore warnings that might be shown due to older python libraries
warnings.filterwarnings('ignore') 

import geopandas as gpd # to read files with spatial information like raster or vector
import json, math # handle json and mathematical operations
import numpy as np # handle matrix type operations and manipulation on numerical data
import geojson # handle json files with spatial information 
import pandas as pd # handle tabular data
import sys # output error messages
import time # provides time related functions
import socket #  to get network properties for kafka communication
from confluent_kafka import Consumer, KafkaError, KafkaException # Kafka library for kafka consumer components

from ipyleaflet import Map, basemaps, WidgetControl, Marker, basemap_to_tiles, DrawControl, GeoJSON, MarkerCluster, AwesomeIcon # widget to enable map interactions
from ipywidgets import IntSlider, ColorPicker, jslink # widget to enable map interactions

## 3.4.1 Define and Initialize the Kafka Consumer 

In [2]:
### START: AVOID MAKING CHANGES ###

'''
"auto.offset.reset" decides where reading starts when no committed offset exists for this consumer group.
"smallest" starts at the oldest available message and reads forward in order.
"largest" starts at the end of the topic, so only messages sent after the consumer has started are read.
'''

conf = {'bootstrap.servers': 'kafka:9093',
        'default.topic.config': {'auto.offset.reset': 'smallest'},
        'group.id': socket.gethostname()}

### END: AVOID MAKING CHANGES ###
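
The effect of the offset setting can be sketched with a small helper that builds the configuration dict for either behaviour. `build_consumer_conf` is a hypothetical name, not part of this notebook or of `confluent_kafka`. The values `earliest` and `latest` are aliases of `smallest` and `largest`. The sketch also assumes a client version that accepts `auto.offset.reset` at the top level of the dict rather than inside `default.topic.config`.

```python
import socket

def build_consumer_conf(bootstrap_servers, from_beginning=True, group_id=None):
    """Return a configuration dict for confluent_kafka.Consumer (hypothetical helper)."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # Only applies when the group has no committed offset for a partition
        "auto.offset.reset": "earliest" if from_beginning else "latest",
        # Default mirrors the notebook: one consumer group per host
        "group.id": group_id or socket.gethostname(),
    }

conf_replay = build_consumer_conf("kafka:9093", from_beginning=True)   # read the topic from its start
conf_live = build_consumer_conf("kafka:9093", from_beginning=False)    # read only newly arriving messages
print(conf_replay["auto.offset.reset"], conf_live["auto.offset.reset"])
```

For this tutorial the replay variant matches the intent of the cell above, since the stream has usually been produced before the consumer starts.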

In [3]:
## Set topic name as defined in file: step_2_producer.ipynb
topic = "pm25_stream"

## Kafka streamed data will be stored here
#df = pd.DataFrame(columns=['lat','lon','value','day','boxId'])
df = pd.DataFrame(columns=['lat','lon','day','value','boxId'])

In [4]:
## Initialize the consumer and subscribe to the topic

consumer = Consumer(conf)
consumer.subscribe([topic])

running = True

## 3.4.2 Event Detection

a) Define functions that will be used for real-time processing of sensor data

In [5]:

def warning_action(sensebox, timestamp, pm_3d_average):

    '''
    This function defines what actions should be performed when the PM 2.5 levels have exceeded the defined threshold above
    In this case we are simply printing a message
    '''
    
    notification = str(sensebox)+" : "+str(timestamp)+" : !!! WARNING !!! PM 2.5 three-day average ("+str(pm_3d_average)+") exceeds critical value"
    print(notification)

    '''
    You can insert an email trigger script after this comment
    '''
    
def event_filter(df):
    
    '''
    Function to handle events in real-time. This function is used to define the operations that should be 
    triggered for every data point that is received by the consumer.
    '''
    
    try:
                
        ## Get rolling average of pm value by lat/lon over last 3 days
        rolling_average = df.groupby(['lat','lon']).rolling(3)['value'].mean().reset_index()
        rolling_average.dropna(inplace=True)
        
        current_lat = df.iloc[df.shape[0] - 1,:].lat
        current_lon = df.iloc[df.shape[0] - 1,:].lon
        current_value = df.iloc[df.shape[0] - 1,:].value
        
        ## Get the rolling average value for each unique lat/lon. This is required because the stream has data for multiple locations
        rolling_average_index = rolling_average[(rolling_average['lat'] == current_lat) & (rolling_average['lon'] == current_lon)].index[-1]
        
        ## Get the details for the pm value, time and senseboxid for the current lat/lon that was received
        pm_3d_average = rolling_average.loc[rolling_average_index, 'value']
        timestamp = df.iloc[df.shape[0] - 1,:]['day']
        sensebox = df.iloc[df.shape[0] - 1,:]['boxId']
            
        ## Filter events
        if pm_3d_average > 20.0 : # three-day average exceeds critical value of 20
    
            ## Trigger action
            warning_action(sensebox, timestamp, round(pm_3d_average,2))    
        
        else:
            
            ## PM value is ok
            print(str(sensebox)+" : "+str(timestamp)+" : PM 2.5 value ("+ str(round(current_value,2)) +") is ok")

    except Exception:

        ## In the beginning, when the consumer has just started, there are not enough data points to calculate the 3-day rolling average,
        ## hence this logic fails in the first 2 iterations for each location; a try-except block handles this
        
        pass # do nothing and continue
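
`event_filter` recomputes the rolling average over the whole dataframe for every incoming message. A lighter alternative, shown here only as a sketch and not as the method used in this notebook, keeps the last three readings per sensor in a fixed-length buffer. The names `windows` and `update_average` are hypothetical, and keying the buffers by box ID instead of lat/lon is an assumption.

```python
from collections import defaultdict, deque

WINDOW = 3         # number of most recent readings to average
THRESHOLD = 20.0   # critical value used in event_filter

# One fixed-length buffer per sensor box; the oldest reading drops out automatically
windows = defaultdict(lambda: deque(maxlen=WINDOW))

def update_average(box_id, value):
    """Store a reading and return the window average, or None while the window is not full."""
    window = windows[box_id]
    window.append(value)
    if len(window) < WINDOW:
        return None
    return sum(window) / WINDOW

# Illustrative stream of (boxId, value) pairs
for box_id, value in [("A", 18.0), ("A", 22.0), ("A", 25.0), ("A", 9.0)]:
    avg = update_average(box_id, value)
    if avg is not None and avg > THRESHOLD:
        print(box_id, "exceeds threshold with average", round(avg, 2))
```

Returning `None` for the first two readings makes the warm-up phase explicit, so no exception handling is needed for it.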

b) Poll and process sensor data messages 

In [6]:
try:

    ## Infinite loop which breaks if no message is received for more than **10 seconds**
    
    while running:

        msg = consumer.poll(timeout=10) # wait 10 seconds before exit. If no messages are received for 10 seconds, consuming will stop 
        
        if msg is None:
            break # if no messages are received, exit this loop
            
        if msg.error():

            ## handle different errors that can come up (specific to kafka)

            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                    (msg.topic(), msg.partition(), msg.offset()))
                
            elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                sys.stderr.write('Topic unknown, creating %s topic\n' %
                                    (topic))
            elif msg.error():
                raise KafkaException(msg.error())
                
        else:
            
            ## This block is executed when everything is working fine and a message was successfully received

            ## Load the message in JSON format or dictionary format
            ## (named "payload" to avoid shadowing the Python built-in input())
            payload = json.loads(msg.value())
            
            ## The actual PM 2.5 value is the "key" of the above dictionary
            key = list(payload.keys())[0]
            
            ## Create a temporary dictionary with received values for each data point. Dict to Pandas conversion is easier
            ## Each of these dicts is appended to the pandas dataframe as a row

            stream = {
                'lat': payload[key][0],  # latitude of the sensor
                'lon': payload[key][1],  # longitude of the sensor
                'day': payload[key][2],  # day of the value recording
                'value':  float(key),  # PM 2.5 Value
                'boxId': payload[key][3] # Sensebox ID
            }

                    
            ## Append the above dict to a pandas table
            stream_df = pd.DataFrame([stream])
            df = pd.concat([df, stream_df], ignore_index=True)
            
            ### EVENT NOTIFICATION SECTION: START ###
            
            event_filter(df)
            
            ### EVENT NOTIFICATION SECTION: END ###
            
        ## Commit the offset of the consumed message so that this consumer group does not receive it again, which avoids
        ## duplicates. Note that a message whose processing failed is then not retried. Removing this command is possible
        ## but would require further changes to this script to perform manual de-duplication
        consumer.commit()

except KeyboardInterrupt:
    pass

finally:
    consumer.close()
    
    ## Note: Re-running this cell will not pull the data again, as it has already been pulled and the consumer is closed. You should
    ## re-run the 'sendStream.py' file to send the data again and then restart this notebook

591f578c51d34600116a8ea5 : 2024-01-16T13:00:00.000Z : PM 2.5 value (6.49) is ok
591f578c51d34600116a8ea5 : 2024-01-17T14:00:00.000Z : PM 2.5 value (9.89) is ok
591f578c51d34600116a8ea5 : 2024-01-18T15:00:00.000Z : PM 2.5 value (10.08) is ok
591f578c51d34600116a8ea5 : 2024-01-19T16:00:00.000Z : PM 2.5 value (16.39) is ok
591f578c51d34600116a8ea5 : 2024-01-20T17:00:00.000Z : PM 2.5 value (5.12) is ok
591f578c51d34600116a8ea5 : 2024-01-21T18:00:00.000Z : PM 2.5 value (31.15) is ok
59ad958fd67eb50011b85f6d : 2024-01-16T13:00:00.000Z : PM 2.5 value (6.32) is ok
59ad958fd67eb50011b85f6d : 2024-01-17T14:00:00.000Z : PM 2.5 value (6.89) is ok
59ad958fd67eb50011b85f6d : 2024-01-18T15:00:00.000Z : PM 2.5 value (5.79) is ok
59ad958fd67eb50011b85f6d : 2024-01-19T16:00:00.000Z : PM 2.5 value (6.45) is ok
59ad958fd67eb50011b85f6d : 2024-01-20T17:00:00.000Z : PM 2.5 value (6.22) is ok
59ad958fd67eb50011b85f6d : 2024-01-21T18:00:00.000Z : PM 2.5 value (6.88) is ok
59ad958fd67eb50011b85f6d : 2024-01-22
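
The message layout that the polling loop relies on is a JSON object with a single key, the PM 2.5 value, mapped to a list of latitude, longitude, day and box ID. It can be isolated in a small helper for testing. `parse_message` is a hypothetical name. The example payload is reconstructed from the values shown in this notebook, and its exact wire format (for instance numbers versus strings for the coordinates) is an assumption.

```python
import json

def parse_message(raw):
    """Turn one message payload into a flat row dict (hypothetical helper)."""
    payload = json.loads(raw)
    # The single key is the PM 2.5 value; its list holds lat, lon, day, boxId
    key = next(iter(payload))
    lat, lon, day, box_id = payload[key][:4]
    return {"lat": lat, "lon": lon, "day": day, "value": float(key), "boxId": box_id}

# Assumed payload shape, filled with values that appear in the notebook output
raw = b'{"6.492572": [51.96422, 7.645218, "2024-01-16T13:00:00.000Z", "591f578c51d34600116a8ea5"]}'
row = parse_message(raw)
print(row)
```

Keeping the parsing separate from the Kafka loop allows the row construction to be checked without a running broker.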

In [7]:
## Read the output of the streamed file
df.head()

| | lat | lon | day | value | boxId |
|---|---|---|---|---|---|
| 0 | 51.96422 | 7.645218 | 2024-01-14T11:00:00.000Z | 11.232186 | 591f578c51d34600116a8ea5 |
| 1 | 51.96422 | 7.645218 | 2024-01-15T12:00:00.000Z | 4.7794 | 591f578c51d34600116a8ea5 |
| 2 | 51.96422 | 7.645218 | 2024-01-16T13:00:00.000Z | 6.492572 | 591f578c51d34600116a8ea5 |
| 3 | 51.96422 | 7.645218 | 2024-01-17T14:00:00.000Z | 9.893489 | 591f578c51d34600116a8ea5 |
| 4 | 51.96422 | 7.645218 | 2024-01-18T15:00:00.000Z | 10.077617 | 591f578c51d34600116a8ea5 |


The dataframe contains one row per location and day, that is, one daily value for each of the **n** locations.

In [8]:
## Check how many values are present for each day
df['day'].value_counts()

day
2024-01-15T12:00:00.000Z    10
2024-01-16T13:00:00.000Z    10
2024-01-17T14:00:00.000Z    10
2024-01-18T15:00:00.000Z    10
2024-01-14T11:00:00.000Z     9
2024-01-19T16:00:00.000Z     9
2024-01-20T17:00:00.000Z     9
2024-01-21T18:00:00.000Z     9
2024-01-22T19:00:00.000Z     9
Name: count, dtype: int64

In [9]:
# Convert Pandas to GeoPandas
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.lon, df.lat))
gdf.set_crs(epsg=4326, inplace=True, allow_override=True)
gdf.drop(['lon','lat'], axis=1, inplace=True)
gdf.head()

| | day | value | boxId | geometry |
|---|---|---|---|---|
| 0 | 2024-01-14T11:00:00.000Z | 11.232186 | 591f578c51d34600116a8ea5 | POINT (7.64522 51.96422) |
| 1 | 2024-01-15T12:00:00.000Z | 4.7794 | 591f578c51d34600116a8ea5 | POINT (7.64522 51.96422) |
| 2 | 2024-01-16T13:00:00.000Z | 6.492572 | 591f578c51d34600116a8ea5 | POINT (7.64522 51.96422) |
| 3 | 2024-01-17T14:00:00.000Z | 9.893489 | 591f578c51d34600116a8ea5 | POINT (7.64522 51.96422) |
| 4 | 2024-01-18T15:00:00.000Z | 10.077617 | 591f578c51d34600116a8ea5 | POINT (7.64522 51.96422) |


## 3.4.3 Monitoring the Status of Sensors

In this section, we will create a map of the SenseBoxes that provides information about the location and current status of the sensors:

- Green: SenseBoxes that are live/returned values on the last date 
- Gray: SenseBoxes that are down/did not return values for the last date
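
The status rule can be sketched with plain pandas before any map is drawn. The data below is made up. Unlike the cells that follow, this sketch also treats a box that sent no message at all on the most recent day as inactive, which is an assumption about the intended rule.

```python
import pandas as pd

# Illustrative data: box "B" reports NaN and box "C" reports nothing on the most recent day
obs = pd.DataFrame({
    "boxId": ["A", "A", "B", "B", "C"],
    "day":   ["2024-01-21", "2024-01-22", "2024-01-21", "2024-01-22", "2024-01-21"],
    "value": [6.9, 7.1, 5.8, float("nan"), 8.0],
})

valid = obs[obs["value"].notna()]
recent_day = valid["day"].max()  # ISO-formatted date strings sort chronologically

# Boxes with a valid reading on the most recent day are active, all others inactive
active_ids = set(valid.loc[valid["day"] == recent_day, "boxId"])
status = {
    box: ("active" if box in active_ids else "inactive")
    for box in sorted(obs["boxId"].unique())
}
print(status)
```

Here only box `A` is classified as active, while `B` and `C` are inactive for different reasons (a missing value versus a missing message).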


In [10]:
## Define Icons for SenseBoxes

icon_active = AwesomeIcon(
    name='map-marker',
    marker_color='green',
    icon_color='green',
    spin=False
)

icon_inactive = AwesomeIcon(
    name='map-marker',
    marker_color='gray',
    icon_color='gray',
    spin=False
)

In [11]:
gdf['valid'] = gdf['value'].notna()  # True where a PM 2.5 value is present, False where it is NaN

In [12]:
## Get most recent date

valid_boxes = gdf[gdf['valid'] == True]
recent_date = valid_boxes['day'].max()

In [13]:
active_boxes = gdf[gdf['day'] == recent_date][['boxId','geometry']]
active_boxes.drop_duplicates(subset=['boxId'], inplace=True)
active_boxes = active_boxes[['geometry']]
active_boxes['status'] = 'active'
active_boxes

| | geometry | status |
|---|---|---|
| 8 | POINT (7.64522 51.96422) | active |
| 17 | POINT (7.63528 51.90300) | active |
| 26 | POINT (7.62061 51.92106) | active |
| 35 | POINT (7.64153 51.97302) | active |
| 44 | POINT (7.64668 51.98850) | active |
| 53 | POINT (7.56908 51.99415) | active |
| 62 | POINT (13.36913 52.52015) | active |
| 71 | POINT (7.63194 51.95434) | active |
| 84 | POINT (7.72341 51.91065) | active |


In [14]:
inactive_boxes = gdf[gdf['valid'] == False].drop_duplicates(subset=['geometry'])[['geometry']]
inactive_boxes['status'] = 'inactive'
inactive_boxes

| | geometry | status |
|---|---|---|


Set up markers to display the inactive and active sensors separately

In [15]:
## Create a cluster of active senseboxes as points

active_markers = []
for coords in active_boxes['geometry']:
    
    active_markers.append(
        Marker(location=(coords.y, coords.x), icon=icon_active, draggable=False)
    )

## Create a cluster of inactive senseboxes as points

inactive_markers = []
for coords in inactive_boxes['geometry']:
    
    inactive_markers.append(
        Marker(location=(coords.y, coords.x), icon=icon_inactive, draggable=False)
    )


In [16]:
lat = 51.9500023
lng = 7.6240147

center = (lat, lng)

m = Map(center=center, zoom=11)

active_boxes_cluster = MarkerCluster(
    markers=tuple(active_markers)
)

inactive_boxes_cluster = MarkerCluster(
    markers=tuple(inactive_markers)
)

m.add_layer(active_boxes_cluster)
m.add_layer(inactive_boxes_cluster)

display(m)

Map(center=[51.9500023, 7.6240147], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title'…

This marks the end of our Kafka streaming workflow. You should now see a map showing the locations of all active and inactive SenseBoxes.

#### END OF TUTORIAL
