# Build the IBM Streams Application for the Edge

Pulls the pre-built HandwrittenDigits_Model into the edge application bundle.
Input images are read in from an IBM Streams local topic (named 'camera_images'), with tuples containing:
- image - the binary PNG blob of the image
- camera - a camera id
- timestamp - the timestamp of the image being captured

Each image is prepared for scoring, then scored using the pre-built model that is loaded at application startup.
The prediction is recorded, and the certainty of the prediction is used to decide whether the raw image needs to be sent back home for training improvements.
Periodically, prediction metrics are also sent home for higher level analysis.


In [None]:
# Notebook-side dependencies; streamsx is installed into the user site (--user).
!pip install --upgrade --user 'streamsx>=1.15.8'
# The scikit-learn pin below is the same version that is bundled into the edge
# application (topo.add_pip_package and the edgeConfig 'pipPackages' entry).
# NOTE(review): presumably all three must stay in sync so the model loads with
# the same scikit-learn version everywhere - confirm before changing one.
!pip install --upgrade scikit-learn==0.21.3
!pip install streamsx.eventstreams


In [None]:
import os
import sys
import json
import datetime
import getpass
import numpy as np
import time
import base64
import socket

# Make sure this is first in the list...
# NOTE(review): presumably this is the user site-packages directory written by
# the `pip install --user` of streamsx, placed first so that copy is the one
# imported below. The path hard-codes python3.6 - confirm it matches the runtime.
sys.path.insert(0, '/home/wsuser/.local/lib/python3.6/site-packages')

from streamsx.topology.topology import Topology
from streamsx.topology import context
import streamsx.ec
import streamsx.eventstreams as eventstreams
print("Streamsx version:",streamsx.ec.__version__)

# Put the project data-asset directory on the import path (only once) before
# importing the helper modules below.
# NOTE(review): assumes image_source.py and image_classifier.py live in
# /project_data/data_asset - confirm.
if '/project_data/data_asset' not in sys.path:
    sys.path.insert(0, '/project_data/data_asset')

from image_source import ImageSource
from image_classifier import DigitPredictor, compute_metrics, ImagePrep

# Grab Streams instance config object and REST reference
from icpd_core import icpd_util
# Name of the Streams service instance whose details are looked up below.
STREAMS_INSTANCE_NAME = "edge"
streams_cfg=icpd_util.get_service_instance_details(name=STREAMS_INSTANCE_NAME)

from streamsx.rest_primitives import Instance
# NOTE(review): SSL certificate verification is switched off for this
# configuration - confirm that is acceptable for the target cluster.
streams_cfg[context.ConfigParams.SSL_VERIFY] = False
streams_instance = Instance.of_service(streams_cfg)

# Model Name
# File name of the pre-built model; it is joined onto /project_data/data_asset
# when the model is added to the application bundle.
MODEL_NAME = 'HandwrittenDigits_Model'

# How confident we have to be in the prediction to not send it home.
# This is just the default. Can be changed at submission time.
# Used as the default of the 'confidence' submission parameter; predictions
# whose result_probability is <= the threshold are sent home.
CONFIDENCE_THRESHOLD = 0.70

# Metrics aggregation window duration (in seconds)
# Used both as the batch window size and as an argument to compute_metrics.
METRICS_DURATION = 10

# Eventstreams topics
# Both the uncertain predictions and the metrics currently go to the same topic.
UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC = 'DefaultTopic'
METRICS_EVENTSTREAMS_TOPIC = 'DefaultTopic'


In [None]:
# Enter in your Eventstreams credentials as JSON
# getpass is used so the pasted credentials are not echoed into the notebook output.
eventstreams_credentials_json = getpass.getpass('Your Event Streams credentials:')
# The parsed credentials are passed to eventstreams.publish when the topology
# is declared. json.loads raises json.JSONDecodeError (a ValueError) if the
# pasted text is not valid JSON.
eventstreams_credentials = json.loads(eventstreams_credentials_json)


In [None]:
class Enricher(object):
    """
    Callable map function that stamps each image tuple with metadata.

    For every tuple, the raw image bytes are replaced by their base64 text
    form, the camera name (camera id + "-" + host name) is filled in, and a
    UTC ISO-format timestamp with a trailing 'Z' is attached.
    """

    def __init__(self, get_camera_id):
        # Runs once, when the topology is declared. The camera name is not
        # resolved here; only the callable that yields the camera id is kept.
        self._cam_name = None
        self._uid = None
        self.get_camera_id = get_camera_id

    def __enter__(self):
        # Runs inside the IBM Streams job before the first tuple is processed:
        # derive the camera name from the camera id and this host's name.
        self._uid = socket.gethostname()
        camera_id = self.get_camera_id()
        self._cam_name = camera_id + "-" + self._uid
        print("Camera name:", self._cam_name, flush=True)

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to release; defined only because __enter__ needs its pair.
        return None

    def __call__(self, t):
        # The tuple is updated in place and the same object is returned.
        encoded_image = base64.b64encode(t['image'])
        t['image'] = encoded_image.decode('utf-8')
        t['camera'] = self._cam_name
        now_utc = datetime.datetime.utcnow()
        t['timestamp'] = now_utc.isoformat() + 'Z'
        return t
        

In [None]:
# Declare the operator graph (topology) of the edge camera classifier application
def createEdgeCameraClassifierTopology():
    """Declare the "EdgeCameraClassifier" topology and return it.

    Flow: image source -> enrich -> prepare -> (parallel) predict -> recombine.
    The recombined stream then feeds an uncertain-prediction publisher and a
    simplified stream, which in turn feeds a placeholder processing step and a
    time-windowed metrics publisher.

    Reads the module-level names MODEL_NAME, CONFIDENCE_THRESHOLD,
    METRICS_DURATION, UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC,
    METRICS_EVENTSTREAMS_TOPIC and eventstreams_credentials.

    Returns:
        The declared Topology. Nothing is built or submitted here.
    """
    topo = Topology(name="EdgeCameraClassifier")

    # Python packages to be added to the edge application bundle
    for package in ('scikit-learn==0.21.3', 'numpy', 'Pillow', 'joblib'):
        topo.add_pip_package(package)

    # Ship the pre-built model with the bundle, under 'etc'
    model_source = os.path.join('/project_data/data_asset', MODEL_NAME)
    bundled_model = topo.add_file_dependency(model_source, 'etc')

    # Submission-time parameters; each one is a callable that is invoked
    # inside the operators to obtain the submitted value.
    # - certainty threshold below (or at) which a prediction is sent home
    confidence_threshold = topo.create_submission_parameter('confidence', default=CONFIDENCE_THRESHOLD)
    # - initial width of the parallel scoring region
    scoring_width = topo.create_submission_parameter('parallelism', default=1)
    # - how many times to repeat the dataset; 0 means repeat forever
    repeat_count = topo.create_submission_parameter('repeat', default=0)
    # - delay between images, in seconds; 0 means no delay
    send_delay = topo.create_submission_parameter('delay', default=0.0)
    # - camera id reported by this source
    camera_id = topo.create_submission_parameter('camera', default='Camera')

    # The source type is fixed here, but some operators expect it so that a
    # different sample image source can be chosen. 0 is the MNIST test dataset.
    source_type = lambda: 0

    # MNIST test images and index files that the source pushes through
    dataset_dirs = [
        topo.add_file_dependency('/project_data/data_asset/mnist-test-images', 'etc'),
    ]

    # Start sending images
    image_feed = ImageSource(source_type,
                             dataset_dirs,
                             delay=send_delay,
                             repeat=repeat_count)
    images = topo.source(image_feed, name="ImageSource")

    # Add camera name and timestamp, then convert to the form the model expects
    enriched = images.map(Enricher(camera_id), name="EnrichImages")
    prepared = enriched.map(ImagePrep(), name="PrepareImages")

    # Classification with DigitPredictor runs in a parallel region
    scoring_region = prepared.parallel(scoring_width)
    scored = scoring_region.map(DigitPredictor(bundled_model), name='PredictDigit')
    classified = scored.end_parallel()

    # Pass-through operator that only makes the graph easier to read
    recombined = classified.map(lambda t: t, name="RecombineClassified")

    # Keep the predictions we are not sure about; for testing, everything
    # from cameras whose name starts with "Test" is kept as well.
    uncertain = recombined.filter(
        lambda t: (t['result_probability'] <= confidence_threshold()
                   or t['camera'].startswith("Test")),
        name='CertaintyFilter')

    # Reduced tuples (no image data) used for the aggregated metrics
    metric_fields = ('camera',
                     'result_class',
                     'result_probability',
                     'prep_time',
                     'predict_time',
                     'timestamp')
    simplified = recombined.map(
        lambda t: {field: t[field] for field in metric_fields},
        name='SimplifyClassifications')

    # Send the uncertain predictions (with their images) home via a kafka topic
    eventstreams.publish(uncertain.as_json(),
                         topic=UNCERTAIN_PREDICTIONS_EVENTSTREAMS_TOPIC,
                         credentials=eventstreams_credentials,
                         name="SendHomeUncertainImages")

    # Placeholder for further per-prediction processing; the mapped function
    # returns None for every tuple.
    simplified.map(lambda x: None, name='FurtherProcessing')

    # Aggregate the classifications over fixed time windows
    window = simplified.batch(size=datetime.timedelta(seconds=METRICS_DURATION))
    metrics = window.aggregate(
        lambda v: compute_metrics(v,
                                  confidence_threshold(),
                                  METRICS_DURATION,
                                  send_delay(),
                                  repeat_count(),
                                  scoring_width(),
                                  source_type()),
        name='ComputeDigitMetrics')

    # Periodically send the metrics home via a kafka topic, and expose a view
    metrics_json = metrics.as_json()
    metrics_json.view(name="metrics_view")
    eventstreams.publish(metrics_json,
                         topic=METRICS_EVENTSTREAMS_TOPIC,
                         credentials=eventstreams_credentials,
                         name="SendHomeClassificationMetrics")

    return topo


In [None]:
# Build the topology into a bundle file for later submission
topo = createEdgeCameraClassifierTopology()

# Set the job config
# The job is named after the topology and traced at debug level.
job_config = context.JobConfig(job_name = topo.name, tracing = "debug")
# The edgeConfig overlay names and tags the image to build and lists the pip
# packages / rpms for it (scikit-learn uses the same pin as the topology).
job_config.raw_overlay = {'edgeConfig': {'imageName':'edge-camera-classifier-app', 'imageTag': 'v1', 'pipPackages': ['scikit-learn==0.21.3'], 'rpms': []}}
# Fold the job config into the submission config handed to submit() below.
job_config.add(streams_cfg)

# Actually build the job, and push to edge image repo.
print("Building new job:", topo.name)

submission_result = context.submit('EDGE', topo, streams_cfg)
if submission_result.return_code == 0:
    print("Job Bundle built successfully.")
    print("  Image:       %s" % (submission_result['image'],))
else:
    # Report a failed build explicitly instead of ending the cell silently.
    print("Job Bundle build failed, return code:", submission_result.return_code)
