# IBM Streams Spark PMML scoring sample application

This sample demonstrates creating a Streams Python application to perform scoring with a Spark PMML model and viewing the results.


<ol>
    <li><a href="#setup">Setup</a>
        <ul style="list-style-type:none;">
            <li>1.1. <a href="#setupStreams">Connection to Streams Instance</a></li>
            <li>1.2. <a href="#setupPackages">Install additional packages</a></li>
            <li>1.3. <a href="#setupDataModel">Add data and model</a></li>
        </ul>
    </li>
    <li><a href="#create">Create the application</a>
        <ul style="list-style-type:none;">
            <li>2.1. <a href="#defineInput">Define Data Input</a></li>
            <li>2.2. <a href="#analyzeData">Analyze Data (ML scoring)</a>
                <ul style="list-style-type:none;">
                    <li>2.2.1. <a href="#prepareData">Prepare Data for scoring</a></li>
                    <li>2.2.2. <a href="#scoreData">Score Data</a></li>
                </ul>
            </li>
            <li>2.3. <a href="#defineOutput">Define Data Output</a></li>
        </ul>
    </li>
    <li><a href="#submit">Submit the application</a></li>
    <li><a href="#view">Connect to the running application to view data</a></li>
    <li><a href="#cancel">Stop the application</a></li>
</ol>

<a name="overview"></a>

# Overview

**About the sample**

This sample application adapts the model of the heart disease patients' health data to demonstrate the process of developing a Spark-based PMML model and then scoring with it in IBM Streams.

**Creating the model**

The ML scenario is a simple process of converting Apache Spark ML pipelines to PMML. For details, see https://openscoring.io/blog/2018/07/09/converting_sparkml_pipeline_pmml/


**How it works**

This sample builds on two previous samples: the Basic Streams Sample and the IBM Streams PMML scoring sample application. The main addition here is the introduction of Spark for in-memory processing.

<img src="https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2019/04/how-it-works.jpg" alt="How it works">

<br>
<br>


### Documentation & Information links

- [Streams Python development guide](https://ibmstreams.github.io/streamsx.documentation/docs/latest/python/)
- [streamsx.topology Python API](https://streamsxtopology.readthedocs.io/)
- [streamsx.pmml Python API](https://streamsxpmml.readthedocs.io/)
- [PMML at Wikipedia](https://en.wikipedia.org/wiki/Predictive_Model_Markup_Language)
- [PMML specification at Data Mining Group](http://dmg.org/pmml/v4-3/GeneralStructure.html)
- [pyspark2pmml](https://github.com/jpmml/pyspark2pmml)
- [JPMML-SparkML](https://github.com/jpmml/jpmml-sparkml)
- [wml-sample-models: spark/import-pmml](https://github.com/pmservice/wml-sample-models/tree/master/spark/import-pmml)


<a id="setup"></a>

# 1. Setup

  
  
<div class="alert alert-block alert-info">
<b>What do we need to set up for our sample application?</b>  
<ul>    
<li>A connection to a running <b>IBM Streams instance</b> <br> 
   This notebook uses a Streams instance in the IBM Cloud Streaming Analytics service.</li>
<li><b>streamsx.topology</b> - the Python package providing the IBM Streams standard/basic function set</li>
<li><b>streamsx.pmml</b> - the Python package providing the IBM Streams PMML scoring functionality</li>
<li><b>pyspark</b> - the Spark Python API, which exposes the Spark programming model to Python <br> 
    This library is not required if the model is developed in Watson Studio in a Spark and Python session.</li>
<li><b>pyspark2pmml</b> - the Python library for converting Apache Spark ML pipelines to PMML</li>
<li><b>Data set</b> - the Iris data set, or any data set that can be replayed to simulate a stream through the application</li>
<li><b>Trained PMML model</b> - a model that predicts the species of the streaming instances of the Iris data</li>
</ul>
</div>



<a id="setupStreams"></a>

## 1.1 Add credentials for the IBM Streams service

To submit a Streams application, you need to provide the credentials of the Streams service that will run it. This notebook targets the IBM Cloud Streaming Analytics service, so the cell below prompts for the service credentials instead of reading an instance name.

1. Copy the service credentials of your Streaming Analytics service (a JSON document).
2. Run the cell below and paste the JSON at the `Streaming Analytics credentials:` prompt.

In [None]:
## Paste snippet from development guide
import getpass
import json

from streamsx.topology import context
from streamsx.topology.context import ConfigParams

service_cfg = {}

# Prompt for the Streaming Analytics service credentials (JSON) without echoing them
SA_credentials = getpass.getpass('Streaming Analytics credentials:')
service_cfg[ConfigParams.SERVICE_DEFINITION] = json.loads(SA_credentials)

def submit_topology(topo):
    # Disable SSL certificate verification for the connection to the service
    service_cfg[ConfigParams.SSL_VERIFY] = False

    # This specifies how the application will be deployed
    contextType = context.ContextTypes.STREAMING_ANALYTICS_SERVICE

    return context.submit(contextType, topo, config=service_cfg)

<a id="setupPackages"></a>

## 1.2 Optional: Upgrade streamsx Python packages

As described in the [overview](#overview), we need the Python packages that provide the Streams functionality for creating streaming applications in Python, __streamsx__ and __streamsx.pmml__, as well as __pyspark__ and __pyspark2pmml__ for building and exporting the model. These packages should be installed in the notebook environment. <br>
Uncomment and run the cell below if you want to upgrade to the latest version of the streamsx.pmml package.
 

In [None]:
#!pip install --user --upgrade streamsx.pmml
#!pip install pyspark==2.3.3
#!pip install pyspark2pmml
# Download the JPMML-SparkML executable JAR that pyspark2pmml needs on the Spark classpath
!wget https://raw.githubusercontent.com/hisah/dataset/master/jpmml-sparkml-executable-1.4.11.jar

Packages installed with `--user` go to the user site-packages directory.<br/>
If Python does not pick up the latest version of a package, you can manually move the user site-packages directory to the front of the Python path, as in the commented-out cell below.<br/>
You can list the current Python path with this command: `import sys; print('\n'.join(sys.path))`
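
To confirm which copy of a package the notebook would actually import, you can ask Python where it resolves each module without importing it. The following is a minimal sketch that uses only the standard library; the helper name `locate_packages` is hypothetical, and the package list simply mirrors the packages named in this section.

```python
import importlib.util


def locate_packages(names):
    """Map each module name to the file it would be loaded from, or None if not found."""
    locations = {}
    for name in names:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # find_spec raises when the parent package of a dotted name is missing
            spec = None
        locations[name] = spec.origin if spec is not None else None
    return locations


# Packages named in this section
required = ["streamsx.topology", "streamsx.pmml", "pyspark", "pyspark2pmml"]
for name, origin in locate_packages(required).items():
    print(name, "->", origin or "NOT FOUND")
```

If a package resolves to a location outside the user site-packages directory, moving that directory to the front of `sys.path` (as in the cell below) is one way to make the upgraded version take precedence.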

In [None]:
#import sys
#sys.path.insert(0, '/home/wsuser/.local/lib/python3.6/site-packages')

In [None]:
# show the installed versions of both packages
import streamsx.topology.context
import streamsx.pmml as pmml
print("INFO: streamsx package version: " + streamsx.topology.context.__version__)
print("INFO: streamsx.pmml package version: " + pmml.__version__)

In [None]:
## Imports
from pyspark import SparkConf, SparkContext
# Run Spark locally and put the JPMML-SparkML JAR on the classpath for the PMML export
conf = SparkConf().setMaster("local").setAppName("SparkKMeans").set("spark.jars", "jpmml-sparkml-executable-1.4.11.jar")
sc = SparkContext(conf=conf)

In [None]:
## Import data
!wget https://raw.githubusercontent.com/hisah/dataset/master/Iris.csv

In [None]:
## Develop model and export as pmml
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import RFormula
from pyspark.sql.session import SparkSession
from pyspark2pmml import PMMLBuilder

# Load the Iris data into a Spark DataFrame
spark = SparkSession(sc)
df = spark.read.csv("Iris.csv", header=True, inferSchema=True)

# Predict Species from all other columns with a decision tree
formula = RFormula(formula="Species ~ .")
classifier = DecisionTreeClassifier()
pipeline = Pipeline(stages=[formula, classifier])
pipelineModel = pipeline.fit(df)

# Export the fitted pipeline model to a PMML file
pmmlBuilder = PMMLBuilder(sc, df, pipelineModel).putOption(classifier, "compact", True)
pmmlBuilder.buildFile("DecisionTreeIris.pmml")

<a id="setupDataModel"></a>

## 1.3 Add data and model to your project

The PMML model and the streaming data set need to be imported into the project. 
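
Before adding the model to the project, it can help to check that the exported file is well-formed XML and to see which fields it declares. The sketch below uses only the standard library. The helper name `pmml_data_fields` is hypothetical, and the inline document is a hand-written, trimmed stand-in whose field names are assumptions; it is not the actual content of `DecisionTreeIris.pmml`.

```python
import xml.etree.ElementTree as ET

# Hand-written stand-in for an exported model (assumed field names, no model element)
SAMPLE_PMML = """<PMML xmlns="http://www.dmg.org/PMML-4_3" version="4.3">
  <Header/>
  <DataDictionary>
    <DataField name="Species" optype="categorical" dataType="string"/>
    <DataField name="PetalLengthCm" optype="continuous" dataType="double"/>
    <DataField name="PetalWidthCm" optype="continuous" dataType="double"/>
  </DataDictionary>
</PMML>
"""


def local_name(tag):
    """Strip the '{namespace}' prefix that ElementTree puts in front of tag names."""
    return tag.rsplit("}", 1)[-1]


def pmml_data_fields(root):
    """Return (name, optype, dataType) for every DataField element in the document."""
    return [
        (el.get("name"), el.get("optype"), el.get("dataType"))
        for el in root.iter()
        if local_name(el.tag) == "DataField"
    ]


for field in pmml_data_fields(ET.fromstring(SAMPLE_PMML)):
    print(field)
```

For the real model, `ET.parse("DecisionTreeIris.pmml").getroot()` could be passed to the same helper. The listed names and data types are what the streaming tuples have to match before scoring.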

In [None]:
## To do

<a id="create"></a>

# 2. Create the application


All Streams applications start with a `Topology` object, so we start by creating one.<br>Additionally, we add our data set to the topology as a dependency, which means that it is bundled with the built application and is therefore available in the environment where the application runs.

In [None]:
## To Do

<a id="defineInput"></a>

## 2.1 Define Data Input 

To simulate an endless stream of data we will read the Iris data file repeatedly and submit each line in the file as a tuple. 
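
The repeated read can be written as a plain Python callable that returns an endless iterator; in streamsx.topology, `Topology.source` accepts a callable of this kind. The sketch below is one possible shape and not the sample's actual source. The class name `RepeatCsv` and the `max_passes` parameter (added only so the demonstration terminates) are hypothetical, and the demo writes a tiny stand-in file instead of reading the real `Iris.csv`.

```python
import csv
import itertools
import os
import tempfile


class RepeatCsv:
    """Callable that yields each CSV row as a dict, re-reading the file on every pass."""

    def __init__(self, path, max_passes=None):
        self.path = path
        self.max_passes = max_passes  # None means repeat forever

    def __call__(self):
        passes = itertools.count() if self.max_passes is None else range(self.max_passes)
        for _ in passes:
            with open(self.path, newline="") as f:
                for row in csv.DictReader(f):
                    yield dict(row)


# Demo with a two-row stand-in file (assumed column names)
demo_path = os.path.join(tempfile.mkdtemp(), "iris_demo.csv")
with open(demo_path, "w", newline="") as f:
    f.write("sepal_length,species\n5.1,setosa\n7.0,versicolor\n")

for tpl in RepeatCsv(demo_path, max_passes=2)():
    print(tpl)
```

All values arrive as strings, which is why a preparation step is needed before scoring. When the file is bundled with the application as a dependency, its path would have to be resolved against the application directory at runtime; that step is omitted here.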

In [None]:
## To do

<a id="analyzeData"></a>
## 2.2 Analyze and score data
Now that we have created a stream of data, we need to define the analytics we want to perform on it. We want to use the PMML model to predict the species of each data instance on the input stream.


<a id="prepareData"></a>
### 2.2.1 Prepare data
Before scoring, make sure that your data is in the format that the model expects.
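
As an illustration of such a preparation step, the sketch below converts the string values read from a CSV line into numbers and keeps only the fields used as model inputs. It is a plain function of the kind that could be applied to each tuple with `Stream.map`, where returning `None` means that no tuple is submitted. The function name `prepare` and the feature names are assumptions about the data set, not facts taken from `Iris.csv` or the exported model.

```python
# Assumed input field names; adjust them to the fields declared in the PMML model
FEATURES = ("SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm")


def prepare(tpl, features=FEATURES):
    """Return a dict of float features, or None if a field is missing or not numeric."""
    try:
        return {name: float(tpl[name]) for name in features}
    except (KeyError, TypeError, ValueError):
        return None


raw = {
    "Id": "1",
    "SepalLengthCm": "5.1",
    "SepalWidthCm": "3.5",
    "PetalLengthCm": "1.4",
    "PetalWidthCm": "0.2",
    "Species": "Iris-setosa",
}
print(prepare(raw))
print(prepare({"SepalLengthCm": "n/a"}))
```

Dropping malformed tuples instead of raising keeps one bad record from stopping the stream; whether that is the right policy depends on the application.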

In [None]:
## To do

<a id="scoreData"></a>
### 2.2.2 Score data
Invoke the `pmml.score` function on the `preprocess` stream to score the data.

In [None]:
## To do

<a id="defineOutput"></a>
## 2.3 Define Output

The `score` stream is our final result. We use the `score.publish()` function to make this stream available to other applications, and we create a `View` of the stream, `score_view`, so that its tuples can be inspected from the notebook in step 4.

If you want to send the stream to a database or another system, use a sink function (similar to the source function) and invoke it with, for example, `score.for_each`.
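
A sink function of this kind is simply a callable that receives one tuple at a time. The following sketch is a hypothetical example that appends each tuple to a CSV file; given a `score` stream, an instance could be attached with `score.for_each(...)`. The class name `CsvSink`, the output layout, and the field name `predicted_Species` are assumptions made for illustration.

```python
import csv
import os
import tempfile


class CsvSink:
    """Callable sink that appends every tuple (a dict) as one row of a CSV file."""

    def __init__(self, path, fieldnames):
        self.path = path
        self.fieldnames = list(fieldnames)

    def __call__(self, tpl):
        # Write the header only when the file does not exist yet or is still empty
        write_header = not os.path.exists(self.path) or os.path.getsize(self.path) == 0
        with open(self.path, "a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames, extrasaction="ignore")
            if write_header:
                writer.writeheader()
            writer.writerow(tpl)


# Demo: feed two hand-made tuples to the sink and print the resulting file
out_path = os.path.join(tempfile.mkdtemp(), "scores.csv")
sink = CsvSink(out_path, ["PetalLengthCm", "predicted_Species"])
sink({"PetalLengthCm": 1.4, "predicted_Species": "Iris-setosa"})
sink({"PetalLengthCm": 4.7, "predicted_Species": "Iris-versicolor", "extra": 1})
with open(out_path, newline="") as f:
    print(f.read())
```

The file is opened on every call so that the object holds no open handle, which keeps it simple to serialize along with the application. For high tuple rates, a sink that opens the file once and buffers writes would likely be preferable.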



In [None]:
## To do

<a name="submit"></a>

# 3. Submit the application
A running Streams application is called a *job*. This next cell submits the application for execution and prints the resulting job id.


In [None]:
# The submission_result object contains information about the running application, or job
print("Submitting Topology to Streams for execution..")
submission_result = submit_topology(topo)

if submission_result.job:
    streams_job = submission_result.job
    print("JobId: ", streams_job.id, "\nJob name: ", streams_job.name)
else:
    print("Submission failed: " + str(submission_result))

<a name="view"></a>

# 4. Use the `View` to access data from the job
Now that the job is started, use the `View` object you created in step 2.3 to start retrieving data from a `Stream`.   

<b> Display the results in real time</b>  

Calling `View.display()` from the notebook displays the results of the view in a table that is updated in real-time.

In [None]:
# Display the results for 30 seconds
score_view.display(duration=30)

<b> See job status </b>  

You can view job status and logs by going to **My Instances** > **Jobs**. Find your job based on the id printed above.
Retrieve job logs using the **Download logs** action from the job's context menu.

To view other information about the job, such as detailed metrics, access the graph. Go to **My Instances** > **Jobs** and select the **View graph** action for the running job.


<a name="cancel"></a>

# 5. Cancel the job

This cell generates a widget you can use to cancel the job.

In [None]:
#cancel the job in the IBM Streams service
submission_result.cancel_job_button()

You can also interact with the job through the [Job](https://streamsxtopology.readthedocs.io/en/stable/streamsx.rest_primitives.html#streamsx.rest_primitives.Job) object returned from `submission_result.job`.

For example, use `job.cancel()` to cancel the running job directly.

# Summary

We started with a `Stream` called `records`, which contained the data we wanted to analyze. Next, we used functions of the `Stream` object to perform simple preprocessing before we scored the data with an ML model and produced the `score` stream.

After submitting the application to the Streams service, we connected to the `score_view` view to see the results within the notebook.