# Explore Event Streams Reader

This notebook shows, as an illustrative example, how to read the Kafka topic published by the Streams job from within a notebook.

**This project contains Sample Materials, provided under license.  
Licensed Materials - Property of IBM.  
© Copyright IBM Corp. 2019. All Rights Reserved.  
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.**


## Setup

### Ensure appropriate Python packages are available

In [None]:
!pip install kafka-python

### Create EventStreams Credentials File and Topic
Place your Event Streams credentials file (JSON copied from the Event Streams service on the cloud; see the README notebook) in the `datasources/credentials/` directory under the project.  Update the EVENTSTREAMS_CREDENTIALS variable with its name.  For simplicity and consistency with other DataSource credentials files, naming it `eventstreams_USERID.json` is a good pattern.

If you haven't already, create the Event Streams topic you'd like to use from the cloud Event Streams service interface, and set `SIGNIFICANT_EVENTS_EVENTSTREAMS_TOPIC` below to match.
This topic name must also match the topic name set in the `streaming_analytics_lfe_pipeline` notebook.
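
As a quick sanity check before connecting, the credentials JSON can be inspected for the keys that the consumer cell in this notebook reads (`kafka_brokers_sasl`, `user`, `api_key`). The following is a minimal sketch; the helper name `missing_credential_keys` and the placeholder values are hypothetical and used only for illustration.

```python
import json

# Keys read by the consumer cell in this notebook
REQUIRED_KEYS = ("kafka_brokers_sasl", "user", "api_key")

def missing_credential_keys(creds):
    """Return the required keys that are absent from a credentials dict."""
    return [key for key in REQUIRED_KEYS if key not in creds]

# Placeholder values for illustration only; these are not real credentials
example = json.loads('{"kafka_brokers_sasl": ["broker-0.example.com:9093"], "user": "token"}')
missing = missing_credential_keys(example)
print("Missing keys:", missing)
```

In practice the dict would come from `json.load` on the file named by `EVENTSTREAMS_CREDENTIALS`, and an empty list would mean the file has everything the consumer needs.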

In [None]:
import os
import getpass
userName=getpass.getuser()

EVENTSTREAMS_CREDENTIALS = os.environ['DSX_PROJECT_DIR']+'/datasources/credentials/eventstreams_'+userName+'.json' 
SIGNIFICANT_EVENTS_EVENTSTREAMS_TOPIC = "SIGNIFICANT_LFE_SCORES"


## Connect to EventStreams with a new Consumer

In [None]:
import kafka
import json
import ssl
import time

# Load the Event Streams credentials
with open(EVENTSTREAMS_CREDENTIALS) as f:
    creds = json.load(f)

# Connect to Event Streams with the loaded credentials and attach to the requested topic.
cons = None
while cons is None:
    try:
        cons = kafka.KafkaConsumer(SIGNIFICANT_EVENTS_EVENTSTREAMS_TOPIC, \
                                   bootstrap_servers=creds["kafka_brokers_sasl"], \
                                   security_protocol="SASL_SSL", \
                                   sasl_mechanism="PLAIN", \
                                   sasl_plain_username=creds["user"], \
                                   sasl_plain_password=creds["api_key"], \
                                   ssl_cafile=ssl.get_default_verify_paths().cafile, \
                                   auto_offset_reset='latest')

    except kafka.errors.NoBrokersAvailable:
        print("No Brokers Available. Retrying ...")
        time.sleep(1)
        cons = None


## Support Code for Interactive and Dynamic Widgets

In [None]:
import ipywidgets as widgets
import threading
import pandas as pd
import collections
import math

# Handle button widget clicks, to start and stop the thread
def toggler(button, out, restart, event):
    if restart:
        # Re-start the thread.  This will also set up the button for stopping.
        t = threading.Thread(target=threadfunc, args=(button, out, event), name="Kafka Poller")
        t.start()
    else:
        # Ask the thread to quit.  This will also set up the button for restarting.
        button.disabled = True
        button.description = "Stopping ..."
        event.set()

old_cb = None

# Actual thread code, to refresh the significant events table data periodically
def threadfunc(button, out, event):
    # Clear the event so somebody else has to ask us to stop again
    event.clear()
    
    # Set the button's onclick to cancel behavior
    button.disabled = True
    button.description = "Stop Updating"
    global old_cb
    if old_cb is not None:
        button.on_click(old_cb, remove=True)
    old_cb = lambda w: toggler(button, out, False, event)
    button.on_click(old_cb)
    button.disabled = False

    current_lines = collections.deque(maxlen=10)
    
    # Loop until asked to quit, fetching events
    while not event.is_set():
        global cons
        try:
            parts = cons.poll(10000, max_records=2)
            for tp in parts:
                for item in parts[tp]:
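                    # Kafka values are bytes; decode as UTF-8 (a superset of ASCII)
                    m = json.loads(item.value.decode('utf-8'))
                    # Fall back to NaN when a score is missing or has an unexpected shape
                    try:
                        lhp = m["event"]["lfe_home_purchase"]["probabilities"][0][1]
                    except (KeyError, IndexError, TypeError):
                        lhp = math.nan
                    try:
                        lr = m["event"]["lfe_relocation"]["probabilities"][0][1]
                    except (KeyError, IndexError, TypeError):
                        lr = math.nan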
                        
                    try:
                        mstr = "%-20s   %6d  %-10s  %-20s %-18.9f %-18.9f %s\n" % \
                            (time.strftime("%FT%TZ", time.gmtime(item.timestamp/1000)), \
                             m["cust_id"], \
                             m["event"]["sc_end_date"], \
                             m["event"]["event_type_id"], \
                             lhp, \
                             lr, \
                             m["message"])
                    except Exception:
                        mstr = str(m["event"]) + "\n"
                    current_lines.append(mstr)
            # Re-render
            out.append_stdout("%-20s   %6s  %-10s  %-20s %-18s %-18s %s\n" % \
                              ("Event Time", \
                               "CustId", \
                               "Scored To", \
                               "Last Event", \
                               "Home Purchase Prob", \
                               "Relocation Prob", \
                               "Significance"))
            out.append_stdout(("="*160)+"\n")
            for x in reversed(current_lines):
                out.append_stdout(x)
            out.clear_output(wait=True)
            time.sleep(0.25)
        except Exception as e:
            out.append_stderr("Got exception:\n")
            out.append_stderr(str(e))
            out.append_stderr("\n\n")
            raise
    
    # Set up the button for restarting
    button.disabled = True
    button.on_click(old_cb, remove=True)
    old_cb = lambda w: toggler(button, out, True, event)
    button.on_click(old_cb)
    button.description = "Start Updating"
    button.disabled = False


## Widget Layout and Background Thread Startup

In [None]:
# Create an output widget, with an HTML label on top to say what it is, and a button to stop fetching events
label = widgets.HTML(value='<h3>Live Significant Events</h3>')
o = widgets.Output(layout={'border': '1px solid black', 'height': '220px'})
b = widgets.Button(description='', button_style='danger', layout={'width': '25%'}, disabled=True)
vbox = widgets.VBox([label, o, b])

# Call the toggler function to (re)start the thread for us, with the event handle to control stopping the thread later.
halt_event = threading.Event()
toggler(b, o, True, halt_event)


## Live Significant Events Feed

Here we can watch as new Significant Events are generated by the Streams jobs.  New alerts appear at the top, and only the 10 most recent alerts are shown.  Each row includes the most recent scores for that customer, the significance of the alert, and information about the event that caused the customer to be re-scored, resulting in the significant event.
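
The "10 most recent, newest first" behavior comes from a bounded `collections.deque` that is rendered in reverse. A minimal standalone sketch of that idea follows; the alert strings are made up for illustration.

```python
import collections

# A bounded deque silently drops the oldest entry once it is full
recent = collections.deque(maxlen=10)
for alert_id in range(12):
    recent.append("alert %d" % alert_id)

# Reverse so the newest alert is listed first, as in the feed
newest_first = list(reversed(recent))
print(newest_first[0], "...", newest_first[-1])
```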

Significant events are generated for a customer whenever that customer's score changes from below 40% to above 50%, or vice versa.  That is, moving from 55% to 45% back to 55% would not be seen as 'significant', and neither would moving from 35% to 45% and back to 35%.  However, for this demo, the first time a customer's score moves above 50%, even if it wasn't previously below 40%, a significant event will be generated.
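
The two-threshold rule can be sketched as a small per-customer state machine. This is an illustration only, not the `SignificantEvents` class from the `streaming_analytics_lfe_pipeline` notebook. The class name `SignificanceTracker` is hypothetical, and it assumes that a customer's first score below 40% does not raise an event, which the description above does not state either way.

```python
class SignificanceTracker:
    """Track the last threshold band each customer was in and flag crossings."""

    def __init__(self, low=0.40, high=0.50):
        self.low = low
        self.high = high
        self.band = {}  # cust_id -> "low" or "high"

    def update(self, cust_id, score):
        """Return True if this score counts as a significant event."""
        previous = self.band.get(cust_id)
        if score > self.high:
            self.band[cust_id] = "high"
            # Fires on low -> high, and the first time a customer goes above `high`
            return previous != "high"
        if score < self.low:
            self.band[cust_id] = "low"
            # Fires only on high -> low (assumption: a first low score is silent)
            return previous == "high"
        # Scores between the two thresholds leave the band unchanged
        return False

tracker = SignificanceTracker()
scores = (0.55, 0.45, 0.55, 0.35, 0.45, 0.35, 0.55)
flags = [tracker.update(42, s) for s in scores]
print(flags)
```

The gap between the two thresholds is what keeps a score hovering around a single cutoff from generating a stream of alerts.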

The 'significance' of a new customer score is determined in the `streaming_analytics_lfe_pipeline` notebook, in the `SignificantEvents` class definition.  Changing that cell to use a different definition of 'significance' and re-submitting the `significant_event_generation` Streams job from within that notebook will change when these alerts are generated.  This could be a change to the thresholds, or a completely different way of determining 'significance'.

Below the significant events feed view is a button that will stop (and/or re-start) the background thread updating the feed from Event Streams.
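
The stop button works by setting a `threading.Event` that the polling loop checks on each pass. A stripped-down sketch of that pattern, without Kafka or widgets, is shown here; `poll_until_stopped` and its counter are stand-ins invented for illustration.

```python
import threading
import time

def poll_until_stopped(stop_event, results, interval=0.01):
    """Append a counter to `results` until `stop_event` is set."""
    n = 0
    while not stop_event.is_set():
        results.append(n)
        n += 1
        # wait() returns early if the event is set during the pause
        stop_event.wait(interval)

stop_event = threading.Event()
results = []
worker = threading.Thread(target=poll_until_stopped,
                          args=(stop_event, results),
                          name="Demo Poller")
worker.start()
time.sleep(0.05)       # let the loop run briefly
stop_event.set()       # what the stop button does
worker.join(timeout=1)
print("Worker alive:", worker.is_alive())
```

Restarting amounts to clearing the event and starting a new thread, which is what `toggler` and `threadfunc` do in this notebook.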


In [None]:
# Actually display the widgets
display(vbox)
