# build-metro-application

Builds the metro application that accepts Kafka messages from the edge(s).

Aggregates and analyzes the messages in order to:
- push them to storage
- generate events
- send notifications
- perform deep analysis
- host views that monitor the 'goings on'.

### Install and import required libraries

In [None]:
!pip install streamsx.eventstreams

In [None]:
import urllib3, time, json, os, sys, collections, warnings, ast
import numpy as np

from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
import streamsx.eventstreams as eventstreams
from streamsx.rest_primitives import Instance
from streamsx import rest
from streamsx.topology import context

if 'scripts' not in sys.path:
    sys.path.insert(0, 'scripts')
import digit_support

### Add credentials for the IBM Streams service

In [None]:
# Silence urllib3 warnings (e.g. about unverified HTTPS requests), since TLS
# certificate verification is turned off for the Streams instance below.
urllib3.disable_warnings()
# Cell to grab Streams instance config object and REST reference
from icpd_core import icpd_util
# Replace the placeholder with the name of your IBM Streams service instance.
STREAMS_INSTANCE_NAME = "<INSERT-STREAMS-INSTANCE-NAME-HERE>"
# Service details for the named instance; reused later for job submission.
streams_cfg=icpd_util.get_service_instance_details(name=STREAMS_INSTANCE_NAME)
# NOTE(review): disables TLS certificate verification for REST calls to the
# instance -- acceptable for a lab cluster with self-signed certs, not for production.
streams_cfg[context.ConfigParams.SSL_VERIFY] = False
# REST handle to the instance; used to configure Event Streams connections and display views.
streams_instance = Instance.of_service(streams_cfg)

In [None]:
## Define the EventStreams topics to access
# Topic with the uncertain predictions; subscribed to in build_metro().
UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC = 'UncertainPredictions'
# Topic with the classification metrics; subscribed to (and windowed) in build_metro().
METRICS_EVENTSTREAMS_TOPIC = 'ClassificationMetrics'

### Add IBM eventstreams credentials

In [None]:
import getpass

In [None]:
## Eventstreams for uncertain predictions

# Prompt (input is not echoed) for the Event Streams credentials JSON used for
# the uncertain-predictions topic, and register it with the Streams instance as
# the application configuration named 'UNCERTAIN'. The prompt names the topic
# so it can be told apart from the metrics prompt in the next cell.
uncertain_eventstreams_credentials_json = getpass.getpass('Your Event Streams credentials (uncertain predictions):')
eventstreams.configure_connection(streams_instance, name='UNCERTAIN', credentials=uncertain_eventstreams_credentials_json)
# build_metro() passes this application-configuration name to eventstreams.subscribe().
uncertain_eventstreams_credentials = 'UNCERTAIN'

In [None]:
## Eventstreams for all metrics

# Prompt (input is not echoed) for the Event Streams credentials JSON used for
# the classification-metrics topic, and register it with the Streams instance as
# the application configuration named 'METRICS'. The prompt names the topic so
# it can be told apart from the uncertain-predictions prompt.
metrics_eventstreams_credentials_json = getpass.getpass('Your Event Streams credentials (all metrics):')
eventstreams.configure_connection(streams_instance, name='METRICS', credentials=metrics_eventstreams_credentials_json)
# build_metro() passes this application-configuration name to eventstreams.subscribe().
metrics_eventstreams_credentials = 'METRICS'

### Create the application

In [None]:
class SlideWindow(object):
    """Sliding window over the most recent ``slide_length`` tuples.

    Each call appends the incoming tuple and returns the window contents as a
    list, oldest first. While the window is still filling, the returned list
    holds fewer than ``slide_length`` elements.

    The window is (re)created in ``__enter__``, so every run of the operator
    starts with an empty window. It is also created in ``__init__``, so the
    instance can be called directly (e.g. in local tests) without being
    entered first.

    Args:
        slide_length: maximum number of elements kept in the window.

    Returns:
        (from ``__call__``) list of up to ``slide_length`` of the most recent
        tuples.
    """

    def __init__(self, slide_length: int = 25):
        self.slide_length = slide_length
        # deque(maxlen=...) evicts the oldest element once full.
        self.chunk = collections.deque(maxlen=self.slide_length)

    def __enter__(self):
        # Start every run with an empty window.
        self.chunk = collections.deque(maxlen=self.slide_length)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # __enter__ and __exit__ must both be defined; nothing to release.
        pass

    def __call__(self, tup) -> list:
        self.chunk.append(tup)
        # Return a copy so downstream consumers never see later mutations.
        return list(self.chunk)

In [None]:
class convert_tuples(object):
    """Normalize JSON tuples arriving from the edge into strings for the views.

    Two tuple shapes are recognized, checked in this order:

    * ``{'metrics_json': value}`` -- ``str(value)`` is returned.
    * ``{'uncertain_metrics_json': payload}`` -- ``payload`` (a JSON string, or
      an already-decoded mapping) is decoded. Fields that Streams flows
      delivers as floats/strings are converted to Python types, a 28 x 28
      ``prepared_image`` is added, and the record is re-encoded with
      ``json.dumps``.

    Any other tuple yields ``None``. In a streamsx ``map``, a ``None`` return
    means no tuple is submitted, so such tuples are dropped.
    """

    @staticmethod
    def _as_list(value):
        # Streams flows delivers lists as their string form (e.g. "[0.1, 0.9]").
        # Decode those with literal_eval, which accepts literals only, and pass
        # through values that are already lists.
        if isinstance(value, str):
            return ast.literal_eval(value)
        return value

    def __call__(self, tup):
        # Unbox the stream from streams flow
        if 'metrics_json' in tup:
            return str(tup['metrics_json'])

        if 'uncertain_metrics_json' in tup:
            payload = tup['uncertain_metrics_json']
            if isinstance(payload, (str, bytes, bytearray)):
                record = json.loads(payload)
            else:
                # Copy, so the incoming tuple is not mutated.
                record = dict(payload)

            # Convert from streams flow types into python types
            record['result_class'] = int(record['result_class'])  # float -> int
            record['predictions'] = self._as_list(record['predictions'])  # String -> List
            record['image'] = self._as_list(record['image'])  # String -> List
            # Reshape the flat 784-element image into a 28 x 28 nested list
            # (np.reshape raises if the size does not match).
            record['prepared_image'] = np.reshape(record['image'], (28, 28)).tolist()

            return json.dumps(record)

        # Neither key present: the tuple is dropped by map() (explicit, not accidental).
        return None

### If you wish to use Kafka instead of IBM eventstreams, uncomment and fill in the following cell

In [None]:
## To learn more, go to https://streamsxkafka.readthedocs.io/en/latest/ 

# !pip install --user "streamsx.kafka>=1.9.0"   # quote the spec so the shell does not treat '>' as an output redirect
# import streamsx.kafka as kafka

# consumerProperties = dict()
# consumerProperties['bootstrap.servers'] = ...
# consumerProperties['fetch.min.bytes'] = ...
# consumerProperties['max.partition.fetch.bytes'] = ...
# consumerProperties['security.protocol'] = ...
# consumerProperties['sasl.jaas.config'] = ...

# # Certain
# consumer_certain = kafka.KafkaConsumer(config=consumerProperties,
#                          topic='<YOUR_TOPIC>',
#                          schema=CommonSchema.Json)

# # Uncertain
# consumer_uncertain = kafka.KafkaConsumer(config=consumerProperties,
#                          topic='<YOUR_TOPIC>',
#                          schema=CommonSchema.Json)

### If you wish to use Kafka instead of IBM eventstreams, uncomment the relevant lines in the following cell

In [None]:
# Silence urllib3 warnings (e.g. about unverified HTTPS requests) for this cell.
urllib3.disable_warnings()
def build_metro() -> tuple:
    """Build the metro application, which subscribes to two Event Streams topics.

    * Each subscribed topic is normalized with ``convert_tuples`` and exposed
      through a view.
    * The classification-metrics stream is also passed through a
      ``SlideWindow`` (default window length) and exposed through its own view.

    Uses the module-level topic names and application-configuration names
    (``METRICS_EVENTSTREAMS_TOPIC``, ``metrics_eventstreams_credentials``,
    ``UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC``,
    ``uncertain_eventstreams_credentials``) defined in earlier cells.

    Returns:
        tuple ``(metrics_view, metrics_windowSlide_view,
        uncertain_metrics_view, topo)``: the three views, followed by the
        ``Topology`` named 'EdgeMetroSubscribe' to submit.
    """
    topo = Topology('EdgeMetroSubscribe')

    # COLLECT METRICS
    from_evstr1 = eventstreams.subscribe(topo, schema=CommonSchema.Json, topic=METRICS_EVENTSTREAMS_TOPIC, credentials=metrics_eventstreams_credentials)
    ## --- If using kafka, comment out the above line and use the following line instead ---
    # from_evstr1 = topo.source(consumer_certain)

    from_evstr1_clean = from_evstr1.map(convert_tuples())
    metrics_view = from_evstr1_clean.view(name="ClassificationMetrics")
    # from_evstr1_clean.print(name="ClassificationMetrics")

    # WINDOW METRICS (Includes both certain and uncertain data)
    # NOTE(review): despite its name, the "WindowUncertain" view windows the
    # metrics stream. The name is kept in case other notebooks look it up.
    windowSlide = from_evstr1_clean.map(SlideWindow())
    metrics_windowSlide_view = windowSlide.view(name="WindowUncertain")
    # windowSlide.print(name="windowPrint")

    # COLLECT UNCERTAIN METRICS
    from_evstr2 = eventstreams.subscribe(topo, schema=CommonSchema.Json, topic=UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC, credentials=uncertain_eventstreams_credentials)
    ## --- If using kafka, comment out the above line and use the following line instead ---
    # from_evstr2 = topo.source(consumer_uncertain)

    from_evstr2_clean = from_evstr2.map(convert_tuples())
    uncertain_metrics_view = from_evstr2_clean.view(name="UncertainPredictions")
    # from_evstr2_clean.print(name="uncertainPrint")

    return metrics_view, metrics_windowSlide_view, uncertain_metrics_view, topo



# Build the topology and its views, then submit the job to the Streams instance.
metrics_view, metrics_windowSlide_view, un_certrain_metrics_view, topo = build_metro()
result = digit_support.submitToStreams(topo, streams_cfg)
# Echo the submission return code (the last expression in a notebook cell is displayed).
# NOTE(review): result's structure comes from digit_support; only 'return_code' is used here.
result['return_code']

In [None]:
# show the views that we've got up...
# Fixed delay before listing the job's views.
# NOTE(review): 15 s is a heuristic; the views may still be empty if the job starts slowly.
time.sleep(15)   # give the applications time to wake up and collect some images
digit_support.display_views(streams_instance, job_name="EdgeMetroSubscribe")

In [None]:
# Connect to the view and display sample data
# Connect to the view and display sample data.
# start_data_fetch() starts a background fetch and returns a queue.Queue that
# the view fills. Wait only a bounded time for one tuple, so this cell cannot
# hang forever when no uncertain predictions have been published yet.
import queue

VIEW_FETCH_TIMEOUT_SECS = 120

view_queue = un_certrain_metrics_view.start_data_fetch()
try:
    json_val = view_queue.get(timeout=VIEW_FETCH_TIMEOUT_SECS)
except queue.Empty:
    json_val = None
finally:
    # Always stop the background fetch, whether or not data arrived.
    un_certrain_metrics_view.stop_data_fetch()

if json_val is None:
    print('No data received from the UncertainPredictions view within {} seconds.'.format(VIEW_FETCH_TIMEOUT_SECS))
else:
    # Tuples on this view are the JSON strings produced by convert_tuples.
    val = json.loads(json_val)
    print('Predicted digit is {}'.format(val['result_class']))
    print('Predicted probability is {}\n'.format(val['result_probability']))
    print(val)

## Monitor from the metro

Once the metro is up and running, the render-metro-views.ipynb notebook can monitor and enhance the processing.