<table style="border: none" align="left">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="5" color="black"><b>Twitter sentiment analysis with IBM Watson Machine Learning</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr>
   <tr style="border: none">
       <th style="border: none"><img src="https://github.com/pmservice/wml-sample-models/blob/master/spark/sentiment-prediction/images/sentiment_banner_3-04.png?raw=true" width="600" alt="Icon"> </th>
   </tr>
</table>

This notebook contains steps and code to develop a predictive model and start scoring new data. It introduces commands for getting data, basic data cleaning and exploration, pipeline creation, model training, model persistence to the Watson Machine Learning repository, model deployment, and scoring.

Some familiarity with Python is helpful. This notebook uses Python 2 (the code relies on `print` statements) and Apache® Spark 2.0.

You will use a data set named **Twitter Sentiment**, which includes many tweets and their binary labels (e.g. POSITIVE or NEGATIVE). Use the details of this data set to predict sentiment of tweets.

## Learning goals

The learning goals of this notebook are:

-  Load a CSV file into an Apache® Spark DataFrame.
-  Explore data.
-  Prepare data for training and evaluation.
-  Create an Apache® Spark machine learning pipeline.
-  Train and evaluate a model.
-  Persist a pipeline and model in the Watson Machine Learning repository.
-  Explore and visualize prediction results using the plotly package.
-  Deploy a model for streaming using the Watson Machine Learning API.
-  Work with Bluemix MessageHub.


## Contents

This notebook contains the following parts:

1. [Setup](#setup)
2. [Load and explore data](#load)
3. [Create an Apache® Spark machine learning model](#model)
4. [Persist model](#persistence)
5. [Predict locally and visualize](#visualization)
6. [Deploy and score in a Cloud](#scoring)
7. [Working with Bluemix MessageHub](#messagehub)
8. [Summary and next steps](#summary)

<a id="setup"></a>
## 1. Setup

Before you use the sample code in this notebook, you must perform the following setup tasks:

-  Create a [Watson Machine Learning Service](https://console.ng.bluemix.net/catalog/services/ibm-watson-machine-learning/) instance (a free plan is offered). 
-  Make sure that you are using a Spark 2.0 kernel.

### Create the Twitter Sentiment Data Asset  

The Twitter Sentiment data is available on the project's GitHub page.

1. Go to the [Sentiment Prediction project page](https://github.com/pmservice/wml-sample-models/tree/master/spark/sentiment-prediction) and copy the link for the dataset in the [data folder](https://github.com/pmservice/wml-sample-models/tree/master/spark/sentiment-prediction/data) by right-clicking the file name and selecting "Copy Link Address".
2. Paste the link into any cell and replace "blob" with "raw". This is the link you will use to download the data in the next section.
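The "blob" to "raw" substitution described in step 2 can also be done in code. The following is a minimal sketch; the helper name `to_raw_url` is hypothetical, and the blob link is inferred from the raw link used in the next section.

```python
def to_raw_url(blob_url):
    """Turn a GitHub 'blob' page link into the matching 'raw' download link."""
    marker = "/blob/"
    if marker not in blob_url:
        raise ValueError("not a GitHub blob link: " + blob_url)
    # Replace only the first occurrence so a later 'blob' in the path is untouched.
    return blob_url.replace(marker, "/raw/", 1)


# Assumed blob link for the training file (inferred, not copied from the page).
blob_link = ("https://github.com/pmservice/wml-sample-models/blob/master/"
             "spark/sentiment-prediction/data/trainingTweets.csv")
raw_link = to_raw_url(blob_link)
print(raw_link)
```

Replacing only the first `/blob/` segment keeps the rest of the path intact even if a folder or file name happens to contain the same word.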

<a id="load"></a>
## 2. Load and explore data

In this section you will load the data as an Apache® Spark DataFrame and perform a basic exploration.

Use wget to download the data to the notebook's file system (gpfs), then load it into a Spark DataFrame with the `read` method.

First, install the required package by running the following code. Run it only one time.

!pip install wget --user 

In [3]:
import wget

link_to_data = 'https://github.com/pmservice/wml-sample-models/raw/master/spark/sentiment-prediction/data/trainingTweets.csv'
filename = wget.download(link_to_data)

print filename

trainingTweets.csv


The CSV file trainingTweets.csv is now available on gpfs. Load the file into an Apache® Spark DataFrame using the code below.

In [4]:
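from pyspark.sql import SparkSession

# Reuse the notebook's Spark session, or create one if none exists.
spark = SparkSession.builder.getOrCreate()

df_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load(filename)

# Preview the first five rows.
df_data.take(5)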

[Row(id=1, text=u'realdonaldtrump sse slashes offshore wind investmentwants british government to pay for its losses on these monstrosities   ', label=1),
 Row(id=2, text=u'cnnvideo hillaryclinton dammit why she missed', label=1),
 Row(id=3, text=u'realdonaldtrump thepodiumeffect without passion you dont have energy without energy you have nothing donald trump ', label=1),
 Row(id=4, text=u'sensanders the american higher education system is beginning to fall behind those in the rest of the world fairshot', label=1),
 Row(id=5, text=u'billburton realdonaldtrump as someone who demonstrated such bravery in combat i can see why youd have so many questions oh wait i mean stfu', label=0)]

Explore the loaded data by using the following Apache® Spark DataFrame methods:
-  print schema
-  print the top twenty records (the default for `show()`)
-  count all records

In [5]:
df_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



As you can see, the data contains three fields. The "label" field is the one we want to predict for each tweet.

In [6]:
df_data.show()

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|realdonaldtrump s...|    1|
|  2|cnnvideo hillaryc...|    1|
|  3|realdonaldtrump t...|    1|
|  4|sensanders the am...|    1|
|  5|billburton realdo...|    0|
|  6|reince hillarycli...|    0|
|  7|bentechpro realdo...|    1|
|  8|dahbigj hopeobama...|    0|
|  9|theosmelek thuddy...|    0|
| 10|realdonaldtrump r...|    0|
| 11|jnjinnovation nee...|    0|
| 12|sen tedcruz halbi...|    1|
| 13|progressiveswin t...|    1|
| 14|mclem retward rea...|    1|
| 15|sometimes the onl...|    0|
| 16|cassandra17lina e...|    1|
| 17|realdonaldtrump d...|    1|
| 18|willmcavoyacn hey...|    0|
| 19|hillaryclinton i ...|    0|
| 20|realdonaldtrump d...|    0|
+---+--------------------+-----+
only showing top 20 rows



In [7]:
print "Total number of records: " + str(df_data.count())

Total number of records: 5987

The data set contains 5987 records.

<a id="model"></a>
## 3. Create an Apache® Spark machine learning model

In this section you will learn how to prepare data, create an Apache® Spark machine learning pipeline, and train a model.

### 3.1: Prepare data

In this subsection you will split your data into train, test, and predict data sets.

In [8]:
splitted_data = df_data.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print "Number of training records: " + str(train_data.count())
print "Number of testing records : " + str(test_data.count())
print "Number of prediction records : " + str(predict_data.count())

Number of training records: 4783
Number of testing records : 1076
Number of prediction records : 128


As you can see our data has been successfully split into three datasets: 

-  The train data set, which is the largest group, is used for training.
-  The test data set will be used for model evaluation and is used to test the assumptions of the model.
-  The predict data set will be used for prediction.
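The split sizes above are close to, but not exactly, 80%, 18%, and 2% of 5987 records, because each record is assigned to a split at random. The following pure-Python sketch illustrates that idea under simplifying assumptions; it is not Spark's `randomSplit` implementation, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split with probability
    proportional to its weight (simplified; not Spark's implementation)."""
    total = float(sum(weights))
    # Cumulative upper bounds, roughly [0.8, 0.98, 1.0] for 0.8/0.18/0.02.
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test, predict = random_split(range(5987), [0.8, 0.18, 0.02], seed=24)
print("%d %d %d" % (len(train), len(test), len(predict)))
```

Fixing the seed makes the split reproducible, which is also why the notebook passes a seed (24) to `randomSplit`. The counts from this sketch will differ from Spark's because the random number generators differ.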

### 3.2: Create pipeline and train a model

In this section you will create an Apache® Spark machine learning pipeline and then train the model.

In the first step you need to import the Apache® Spark machine learning packages that will be needed in the subsequent steps.

In [9]:
from pyspark.ml.feature import Tokenizer, OneHotEncoder, StringIndexer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # TrainValidationSplit
from pyspark.ml import Pipeline, Model

In the following step, split each tweet into words with the Tokenizer transformer, then convert the words into a numeric feature vector with the HashingTF transformer.

In [10]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
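To make the two transformers more concrete, here is a rough pure-Python sketch of what tokenizing and feature hashing do. It is an illustration only: `zlib.crc32` is a stand-in hash chosen for this example, Spark uses a different hash function, so the bucket indices will not match Spark's output. The vector size 262144 is taken from the prediction output shown later in this notebook.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the vector size visible in the notebook output


def tokenize(text):
    # Lowercase and split on whitespace; punctuation stays attached to words.
    return text.lower().split()


def hashing_tf(words, num_features=NUM_FEATURES):
    """Map each word to a bucket index and count occurrences per bucket.
    zlib.crc32 is a stand-in hash, so indices differ from Spark's."""
    counts = Counter()
    for word in words:
        index = (zlib.crc32(word.encode("utf-8")) & 0xFFFFFFFF) % num_features
        counts[index] += 1
    return dict(counts)


words = tokenize("My first streaming deployment! It feels so awesome!!")
features = hashing_tf(words)
print(words)
print(sorted(features.items()))
```

Because words are mapped to a fixed number of buckets, no vocabulary has to be stored, at the cost that two different words can occasionally share a bucket.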

Next, define estimators you want to use for classification. Logistic Regression is used in the following example.

In [11]:
lr = LogisticRegression(maxIter=10, regParam=0.01)

Let's build the pipeline now. A pipeline consists of transformers and an estimator. 

The ML pipeline will consist of three stages: tokenizer, hashingTF, and lr.

In [12]:
pipeline_lr = Pipeline(stages=[tokenizer, hashingTF, lr])

Now, you can train your Logistic Regression model by using previously defined **pipeline** and **train data**.

In [13]:
model_lr = pipeline_lr.fit(train_data)

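You can now evaluate your model on the **test data**. Note that the evaluator below computes the area under the ROC curve (`areaUnderROC`) from the 0/1 `prediction` column, and the code reports that value as **model accuracy**.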

In [14]:
predictions = model_lr.transform(test_data)
evaluatorRF = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
accuracy = evaluatorRF.evaluate(predictions)

print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy = 0.761854
Test Error = 0.238146
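The value printed above comes from an evaluator configured with `metricName="areaUnderROC"` and the 0/1 `prediction` column, so it is not the plain fraction of correct predictions. The sketch below uses made-up toy labels (not the notebook's data) to show that, for hard 0/1 predictions, the area under the ROC curve equals the average of the true positive and true negative rates, which can differ from plain accuracy. All function names here are hypothetical.

```python
def accuracy(labels, preds):
    correct = sum(1 for y, p in zip(labels, preds) if y == p)
    return correct / float(len(labels))


def balanced_accuracy(labels, preds):
    pos = [p for y, p in zip(labels, preds) if y == 1]
    neg = [p for y, p in zip(labels, preds) if y == 0]
    tpr = sum(1 for p in pos if p == 1) / float(len(pos))
    tnr = sum(1 for p in neg if p == 0) / float(len(neg))
    return (tpr + tnr) / 2.0


def auc(labels, scores):
    """Area under ROC as the chance that a random positive outscores a
    random negative, with ties counted as one half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for sp in pos:
        for sn in neg:
            if sp > sn:
                wins += 1.0
            elif sp == sn:
                wins += 0.5
    return wins / (len(pos) * len(neg))


labels = [1, 1, 1, 0, 0, 0, 0, 0]  # made-up toy data
preds = [1, 1, 0, 0, 0, 0, 0, 1]

print("accuracy          = %g" % accuracy(labels, preds))
print("balanced accuracy = %g" % balanced_accuracy(labels, preds))
print("AUC on 0/1 preds  = %g" % auc(labels, preds))
```

If a threshold-independent ROC value is wanted instead, one option is to point the evaluator at the `rawPrediction` column; that would change the number reported above.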


You can now tune your model to achieve better accuracy. To keep this notebook simple, the tuning section is omitted.

<a id="persistence"></a>
## 4. Persist model

In this section you will learn how to store your pipeline and model in the Watson Machine Learning repository by using Python client libraries.

First, you must import client libraries.

**Note**: Apache® Spark 2.0 or higher is required.

In [15]:
from repository.mlrepositoryclient import MLRepositoryClient
from repository.mlrepositoryartifact import MLRepositoryArtifact
from repository.mlrepository import MetaProps

Authenticate to Watson Machine Learning service on Bluemix.

**Action**: Put authentication information from your instance of Watson Machine Learning service here.

In [16]:
wml_credentials={
  "url": "https://ibm-watson-ml.mybluemix.net",
  "access_key": "***",
  "username": "***",
  "password": "***",
  "instance_id": "***"
}

**Tip**: The credential values (such as `url`, `username`, and `password`) can be found on the **Service Credentials** tab of the service instance created in Bluemix.

In [17]:
ml_repository_client = MLRepositoryClient(wml_credentials['url'])
ml_repository_client.authorize(wml_credentials['username'], wml_credentials['password'])

Create model artifact (abstraction layer).

In [18]:
model_artifact = MLRepositoryArtifact(model_lr, training_data=train_data, name="Sentiment Prediction Model" )

**Tip**: The MLRepositoryArtifact method expects a trained model object, training data, and a model name. (It is this model name that is displayed by the Watson Machine Learning service).

### 4.1: Save pipeline and model

In this subsection you will learn how to save pipeline and model artifacts to your Watson Machine Learning instance.

In [19]:
saved_model = ml_repository_client.models.save(model_artifact)

Get saved model metadata from Watson Machine Learning.

**Tip**: Use *meta.available_props()* to get the list of available props.

In [20]:
saved_model.meta.available_props()

['inputDataSchema',
 'evaluationMetrics',
 'pipelineVersionHref',
 'modelVersionHref',
 'trainingDataRef',
 'pipelineType',
 'creationTime',
 'lastUpdated',
 'label',
 'authorEmail',
 'trainingDataSchema',
 'authorName',
 'version',
 'modelType',
 'runtime',
 'evaluationMethod']

In [21]:
print "modelType: " + saved_model.meta.prop("modelType")
print "trainingDataSchema: " + str(saved_model.meta.prop("trainingDataSchema"))
print "creationTime: " + str(saved_model.meta.prop("creationTime"))
print "modelVersionHref: " + saved_model.meta.prop("modelVersionHref")
print "label: " + saved_model.meta.prop("label")

modelType: sparkml-model-2.0
trainingDataSchema: {u'fields': [{u'nullable': True, u'type': u'integer', u'name': u'id', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'text', u'metadata': {}}, {u'nullable': True, u'type': u'integer', u'name': u'label', u'metadata': {}}], u'type': u'struct'}
creationTime: 2017-09-11 21:54:06.675000+00:00
modelVersionHref: https://ibm-watson-ml.mybluemix.net/v2/artifacts/models/ddc9f747-de2e-411b-892b-e9bd9b7846a3/versions/7f42543b-4617-433c-a489-00fa6cac304a
label: label


**Tip**: **modelVersionHref** is the unique identifier of our model in the Watson Machine Learning repository.

### 4.2: Load model

In this subsection you will learn how to load a saved model back from a specified instance of Watson Machine Learning.

In [24]:
loadedModelArtifact = ml_repository_client.models.get(saved_model.uid)

You can print, for example, the model name to make sure that the model has been loaded correctly.

In [25]:
print str(loadedModelArtifact.name)

Sentiment Prediction Model


As you can see, the name is correct. You have now learned how to save a model to and load it from the Watson Machine Learning repository.

<a id="visualization"></a>
## 5. Predict locally and visualize

In this section you will learn how to score data using the loaded model and visualize the prediction results with the plotly package.

### 5.1: Make local prediction using previously loaded model and test data

In this subsection you will score *predict_data* data set.

In [26]:
predictions = loadedModelArtifact.model_instance().transform(predict_data)

Preview the results by calling the *show()* method on the predictions DataFrame.

In [27]:
predictions.show(5)

+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| id|                text|label|               words|            features|       rawPrediction|         probability|prediction|
+---+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|254|realdonaldtrump t...|    1|[realdonaldtrump,...|(262144,[4312,961...|[-1.3150612511415...|[0.21164114610918...|       1.0|
|256|realdonaldtrump j...|    0|[realdonaldtrump,...|(262144,[14,13396...|[5.48207446382385...|[0.99585655006955...|       0.0|
|296|realdonaldtrump t...|    1|[realdonaldtrump,...|(262144,[15889,21...|[-1.3175186267989...|[0.21123142555360...|       1.0|
|312|sensanders keep t...|    0|[sensanders, keep...|(262144,[32890,91...|[3.29411163264101...|[0.96422625174639...|       0.0|
|362|katiedaviscourt i...|    0|[katiedaviscourt,...|(262144,[16332,21...|[3.83815328742005...|[0.978920

Tabulating a count by `label` shows how the actual sentiment labels are split in the prediction data set.

In [28]:
predictions.select("label").groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1|   49|
|    0|   79|
+-----+-----+



### 5.2: Sample visualization of data with Plotly package

In this subsection you will explore prediction results with Plotly, which is an online analytics and data visualization tool.

**Example**: First, install the required packages by running the following code. Run it only one time.

!pip install plotly --user
!pip install cufflinks --user

In [29]:
import os  # needed for os.environ below
import sys
import pandas
import plotly.plotly as py
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import cufflinks as cf
import plotly.graph_objs as go

init_notebook_mode(connected=True)
sys.path.append("".join([os.environ["HOME"]])) 

Convert the Apache Spark DataFrame to a Pandas DataFrame.

In [30]:
predictions_pdf = predictions.select("prediction", "label", "text").toPandas()
cumulative_stats = predictions_pdf.groupby(['label']).count()
labels_data_plot = cumulative_stats.index
values_data_plot = cumulative_stats['text']

Plot a pie chart that shows how the records in the prediction data set are distributed across the sentiment labels.

In [31]:
product_data = [go.Pie(
            labels=labels_data_plot,
            values=values_data_plot,
    )]

product_layout = go.Layout(
    title='Sentiment',
)

fig = go.Figure(data=product_data, layout=product_layout)
iplot(fig)

<a id="scoring"></a>
## 6. Deploy and score in a Cloud

In this section you will learn how to create a streaming deployment by using the Watson Machine Learning REST API and how to work with Bluemix MessageHub.
For more information about the REST APIs, see the [Swagger Documentation](http://watson-ml-api.mybluemix.net/).

To work with the Watson Machine Learning REST API, you must generate an access token. To do that, you can use the following sample code:

In [32]:
import urllib3, requests, json, base64

headers = urllib3.util.make_headers(basic_auth='{username}:{password}'.format(username=wml_credentials['username'], password=wml_credentials['password']))
url = '{url}/v3/identity/token'.format(url=wml_credentials['url'])
response = requests.get(url, headers=headers)
mltoken = json.loads(response.text).get('token')
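The token request above authenticates with HTTP Basic authentication, which `urllib3.util.make_headers(basic_auth=...)` prepares. As a minimal sketch of what such a header contains, the following builds one by hand; the helper name `basic_auth_header` and the credentials are placeholders for illustration.

```python
import base64


def basic_auth_header(username, password):
    """Build an HTTP Basic Authorization header value by hand."""
    raw = "{0}:{1}".format(username, password).encode("utf-8")
    return {"authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


# Placeholder credentials for illustration only.
headers = basic_auth_header("user", "pass")
print(headers)
```

Base64 is an encoding, not encryption, so a header like this should only be sent over HTTPS, as the notebook does.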

### 6.1: Create streaming deployment

#### Get published_models url from instance details

In [33]:
endpoint_instance = "{url}/v3/wml_instances/{instance_id}".format(url=wml_credentials['url'], instance_id=wml_credentials['instance_id'])
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + mltoken}

response_get_instance = requests.get(endpoint_instance, headers=header)

print response_get_instance
print response_get_instance.text

<Response [200]>
{"metadata":{"guid":"360c510b-012c-4793-ae3f-063410081c3e","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/360c510b-012c-4793-ae3f-063410081c3e","created_at":"2017-08-04T09:15:48.344Z","modified_at":"2017-09-11T21:54:06.800Z"},"entity":{"source":"Bluemix","published_models":{"url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/360c510b-012c-4793-ae3f-063410081c3e/published_models"},"usage":{"expiration_date":"2017-10-01T00:00:00.000Z","computation_time":{"current":24},"model_count":{"limit":1000,"current":0},"prediction_count":{"current":39},"deployment_count":{"limit":1000,"current":7}},"plan_id":"0f2a3c2c-456b-40f3-9b19-726d2740b11c","status":"Active","organization_guid":"b0e61605-a82e-4f03-9e9f-2767973c084d","region":"us-south","account":{"id":"f52968f3dbbe7b0b53e15743d45e5e90","name":"Umit Cakmak's Account","type":"TRIAL"},"owner":{"ibm_id":"31000292EV","email":"umit.cakmak@pl.ibm.com","user_id":"43e0ee0e-6bfb-48fc-bcd8-d61e40d19253","country_cod

In [34]:
endpoint_published_models = json.loads(response_get_instance.text).get('entity').get('published_models').get('url')

print endpoint_published_models

https://ibm-watson-ml.mybluemix.net/v3/wml_instances/360c510b-012c-4793-ae3f-063410081c3e/published_models


Execute the following sample code, which uses the published_models endpoint to get the deployments URL.

#### Get the list of published models

In [35]:
header = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + mltoken}
response_get = requests.get(endpoint_published_models, headers=header)

print response_get
print response_get.text

<Response [200]>
{"count":1,"resources":[{"metadata":{"guid":"ddc9f747-de2e-411b-892b-e9bd9b7846a3","url":"https://ibm-watson-ml.mybluemix.net/v3/wml_instances/360c510b-012c-4793-ae3f-063410081c3e/published_models/ddc9f747-de2e-411b-892b-e9bd9b7846a3","created_at":"2017-09-11T21:54:06.675Z","modified_at":"2017-09-11T21:54:06.814Z"},"entity":{"runtime_environment":"spark-2.0","author":{},"name":"Sentiment Prediction Model","label_col":"label","training_data_schema":{"fields":[{"metadata":{},"type":"integer","name":"id","nullable":true},{"metadata":{},"type":"string","name":"text","nullable":true},{"metadata":{},"type":"integer","name":"label","nullable":true}],"type":"struct"},"latest_version":{"url":"https://ibm-watson-ml.mybluemix.net/v2/artifacts/models/ddc9f747-de2e-411b-892b-e9bd9b7846a3/versions/7f42543b-4617-433c-a489-00fa6cac304a","guid":"7f42543b-4617-433c-a489-00fa6cac304a","created_at":"2017-09-11T21:54:06.814Z"},"model_type":"sparkml-model-2.0","deployments":{"count":0,"url"

In [36]:
[endpoint_deployments] = [x.get('entity').get('deployments').get('url') for x in json.loads(response_get.text).get('resources') if x.get('metadata').get('guid') == saved_model.uid]

print endpoint_deployments

https://ibm-watson-ml.mybluemix.net/v3/wml_instances/360c510b-012c-4793-ae3f-063410081c3e/published_models/ddc9f747-de2e-411b-892b-e9bd9b7846a3/deployments
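The list comprehension above fails with an unpacking error if no model (or more than one) matches the GUID. A hedged alternative is a small lookup helper with an explicit error message. The function name `find_deployments_url` and the sample response are hypothetical; the sample only mirrors the nesting of the real response shown above.

```python
def find_deployments_url(published_models, model_guid):
    """Return the deployments URL of the model whose metadata.guid matches."""
    for resource in published_models.get("resources", []):
        if resource.get("metadata", {}).get("guid") == model_guid:
            return resource["entity"]["deployments"]["url"]
    raise LookupError("no published model with guid " + model_guid)


# Hypothetical response body with the same nesting as the output above.
sample = {
    "count": 1,
    "resources": [{
        "metadata": {"guid": "model-guid-1"},
        "entity": {"deployments": {
            "count": 0,
            "url": "https://example.invalid/published_models/model-guid-1/deployments",
        }},
    }],
}

print(find_deployments_url(sample, "model-guid-1"))
```

In the notebook, the first argument would be `json.loads(response_get.text)` and the second `saved_model.uid`.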


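#### Encode the Spark credentials

The deployment request passes the Spark service credentials as a base64-encoded JSON string in the `X-Spark-Service-Instance` header. First, define the credentials, then encode them.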

In [None]:
spark_credentials = {
      "tenant_id": "***",
      "tenant_id_full": "***",
      "cluster_master_url": "https://spark.bluemix.net",
      "tenant_secret": "***",
      "instance_id": "***",
      "plan": "***"
    }

In [38]:
spark_instance = {
    "credentials": spark_credentials,
    "version": "2.0"
}
encoded_spark_instance = base64.b64encode(json.dumps(spark_instance))
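The encoding step above works as written on Python 2. As a sketch that also runs on Python 3 (where `base64.b64encode` requires bytes), the helpers below encode a dictionary and decode it again to check the round trip. The helper names and the example values are placeholders, not part of the Watson Machine Learning API.

```python
import base64
import json


def encode_instance(instance):
    """Serialize a dict to JSON and base64-encode it as an ASCII string."""
    raw = json.dumps(instance).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_instance(encoded):
    """Reverse encode_instance, e.g. to inspect what a header carries."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


# Placeholder values; real credentials come from the Spark service instance.
example_instance = {"credentials": {"tenant_id": "***"}, "version": "2.0"}
encoded = encode_instance(example_instance)
print(encoded)
print(decode_instance(encoded) == example_instance)
```

Decoding the value back is a quick way to confirm that the header carries exactly the credentials you intended before sending the request.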

#### Create streaming deployment for published model

To create a streaming deployment, you need to provision a MessageHub instance from the [Bluemix Catalog](https://console.bluemix.net/catalog).

Once your instance is provisioned, click it in your Services list to open the instance's page.

The streaming deployment needs topics to work with. Create two topics named "streamingi" (input) and "streamingo" (output).

After the input and output topics are created, get your credentials from the "Manage Credentials" section.

In [33]:
header_streaming = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + mltoken, 'X-Spark-Service-Instance': encoded_spark_instance}
payload_streaming = {"type":"stream", "name": "Sentiment Prediction", "description": "Streaming Deployment", "input": { "connection": { "kafka_brokers_sasl": ["kafka01-prod01.messagehub.services.us-south.bluemix.net:9093","kafka02-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093"], "kafka_admin_url": "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443", "api_key": "gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk", "mqlight_lookup_url": "https://mqlight-lookup-prod01.messagehub.services.us-south.bluemix.net/Lookup?serviceId=2e9e624d-967b-4bf3-8eda-504e10fbc14d", "kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443", "user": "gDic7zCjvBJQjMjB", "password": "MGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk"}, "source": { "type": "kafka", "topic": "streamingi" }}, "output": { "connection": { "kafka_brokers_sasl": [ "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093", "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093"], "kafka_admin_url": "https://kafka-admin-prod01.messagehub.services.us-south.bluemix.net:443", "api_key": "gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk", "mqlight_lookup_url": "https://mqlight-lookup-prod01.messagehub.services.us-south.bluemix.net/Lookup?serviceId=2e9e624d-967b-4bf3-8eda-504e10fbc14d", "kafka_rest_url": "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443", "user": "gDic7zCjvBJQjMjB", "password": "MGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk"},"target": {"type": "kafka", "topic": "streamingo"}}}

response_streaming = requests.post(endpoint_deployments, json=payload_streaming, headers=header_streaming)

print response_streaming
print response_streaming.text

<Response [201]>
{"metadata":{"guid":"78ed5757-6654-4322-bb78-1f276f7087c7","href":"https://ibm-watson-ml.mybluemix.net/v2/published_models/044564b4-5f12-4a2f-b4b8-9f638b7a359a/deployments/78ed5757-6654-4322-bb78-1f276f7087c7","created_at":"2017-06-20T08:09:01.592Z","modified_at":"2017-06-20T08:09:02.609Z"},"entity":{"runtime_environment":"spark-2.0","name":"Sentiment Prediction","instance_href":"https://ibm-watson-ml.mybluemix.net/v2/streaming/deployments/699480ef-9df5-42d6-b851-85f6af443ff2","description":"Streaming Deployment","published_model":{"author":{},"name":"Sentiment Prediction Model","guid":"044564b4-5f12-4a2f-b4b8-9f638b7a359a","created_at":"2017-06-20T08:09:01.572Z","href":"https://ibm-watson-ml.mybluemix.net/v2/published_models/044564b4-5f12-4a2f-b4b8-9f638b7a359a"},"model_type":"sparkml-model-2.0","status":"INITIALIZING","output":{"connection":{"kafka_brokers_sasl":["kafka01-prod01.messagehub.services.us-south.bluemix.net:9093","kafka02-prod01.messagehub.services.us-sou

Now you can start producing messages to your topics in MessageHub to make predictions.
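The request body used for the streaming deployment repeats the same MessageHub connection for the input and the output side. One way to keep it readable is to assemble the body from a single connection dictionary, as in this sketch. The helper name `build_stream_payload` is hypothetical and the connection values are placeholders; the key names follow the payload shown above.

```python
import json


def build_stream_payload(name, description, connection, input_topic, output_topic):
    """Assemble a streaming-deployment request body from one shared connection."""
    return {
        "type": "stream",
        "name": name,
        "description": description,
        "input": {
            "connection": connection,
            "source": {"type": "kafka", "topic": input_topic},
        },
        "output": {
            "connection": connection,
            "target": {"type": "kafka", "topic": output_topic},
        },
    }


# Placeholder connection; the real keys come from the MessageHub credentials.
connection = {"kafka_rest_url": "***", "api_key": "***", "user": "***", "password": "***"}
payload = build_stream_payload("Sentiment Prediction", "Streaming Deployment",
                               connection, "streamingi", "streamingo")
print(json.dumps(payload, indent=2, sort_keys=True))
```

Keeping the connection in one place also makes it easier to load credentials from a variable instead of pasting them into the notebook.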

<a id="messagehub"></a>
## 7. Working with Bluemix MessageHub

In this section, we will use simple curl calls to work with MessageHub. We will use the API key from the MessageHub credentials and the input and output topic names.

#### Get the list of topics from MessageHub

In [35]:
!curl -H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" "https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/topics/"

["streamingi","streamingo"]

### Produce message

#### Encode input data

In [36]:
!echo "{'id': 1, 'text':'My first streaming deployment! It feels so awesome!!'}" | base64

eydpZCc6IDEsICd0ZXh0JzonTXkgZmlyc3Qgc3RyZWFtaW5nIGRlcGxveW1lbnQhIEl0IGZlZWxz
IHNvIGF3ZXNvbWUhISd9Cg==
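The same encoding can be done in Python instead of the shell. This sketch mirrors the `echo ... | base64` command above, including the trailing newline that `echo` appends, and then wraps the value in the `records` body that the produce call in this section sends. Unlike the shell `base64` tool, `base64.b64encode` does not wrap its output across lines.

```python
import base64
import json

message = "{'id': 1, 'text':'My first streaming deployment! It feels so awesome!!'}"

# echo appends a newline, so it is added here to mirror the shell command.
value = base64.b64encode((message + "\n").encode("utf-8")).decode("ascii")

# Request body in the shape used by the curl call that produces the message.
body = json.dumps({"records": [{"key": "null", "value": value}]})

print(value)
print(body)
```

Building the body with `json.dumps` avoids hand-editing quotes when the message text changes.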


To push this message to the input topic, use the following curl call.

In [38]:
!curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
-H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" \
-d '{"records":[{ "key": "null", "value": "eydpZCc6IDEsICd0ZXh0JzonTXkgZmlyc3Qgc3RyZWFtaW5nIGRlcGxveW1lbnQhIEl0IGZlZWxzIHNvIGF3ZXNvbWUhISd9Cg=="}]}' \
https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/topics/streamingi

{"offsets":[{"partition":0,"offset":8,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

### Create Input consumer

We can read messages from the input topic by creating a consumer.

In [39]:
!curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
-H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" \
--data '{"name": "my_input_consumer", "format": "binary", "auto.offset.reset": "smallest"}' \
https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/my_json_consumer 

{"instance_id":"my_input_consumer","base_uri":"https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/my_json_consumer/instances/my_input_consumer"}

#### Read messages from input topic

In [41]:
!curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
-H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" \
https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/my_json_consumer/instances/my_input_consumer/topics/streamingi

[{"key":"null","value":"eydpZCc6IDEsICd0ZXh0JzonTXkgZmlyc3Qgc3RyZWFtaW5nIGRlcGxveW1lbnQhIEl0IGZlZWxzIHNvIGF3ZXNvbWUhISd9Cg==","partition":0,"offset":8}]

#### Decode the input message

In [44]:
!echo "eydpZCc6IDEsICd0ZXh0JzonTXkgZmlyc3Qgc3RyZWFtaW5nIGRlcGxveW1lbnQhIEl0IGZlZWxzIHNvIGF3ZXNvbWUhISd9Cg==" | base64 -d

{'id': 1, 'text':'My first streaming deployment! It feels so awesome!!'}


### Create Output consumer

When we push a message to the input topic, the streaming deployment processes it and writes the result to the output topic. As before, we create a consumer to read from the output topic.

In [40]:
!curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
-H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" \
--data '{"name": "my_output_consumer", "format": "binary", "auto.offset.reset": "smallest"}' \
https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/my_json_consumer 

{"instance_id":"my_output_consumer","base_uri":"https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/my_json_consumer/instances/my_output_consumer"}

#### Read messages from output topic

In [46]:
!curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
-H "X-Auth-Token: gDic7zCjvBJQjMjBMGcVHeBLI5VkqELVQsiuMx8Ka0XtddRk" \
https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/my_json_consumer/instances/my_output_consumer/topics/streamingo

[{"key":null,"value":"eyJpZCI6MSwidGV4dCI6Ik15IGZpcnN0IHN0cmVhbWluZyBkZXBsb3ltZW50ISBJdCBmZWVscyBzbyBhd2Vzb21lISEiLCJ3b3JkcyI6WyJteSIsImZpcnN0Iiwic3RyZWFtaW5nIiwiZGVwbG95bWVudCEiLCJpdCIsImZlZWxzIiwic28iLCJhd2Vzb21lISEiXSwiZmVhdHVyZXMiOnsidHlwZSI6MCwic2l6ZSI6MjYyMTQ0LCJpbmRpY2VzIjpbMzc4NTIsODIxMTEsODYxNzUsMTQxMDYzLDE1MzA3NywxODg0MjQsMTk2NzY2LDI1NjE3N10sInZhbHVlcyI6WzEuMCwxLjAsMS4wLDEuMCwxLjAsMS4wLDEuMCwxLjBdfSwicmF3UHJlZGljdGlvbiI6eyJ0eXBlIjoxLCJ2YWx1ZXMiOlswLjk1MTU4MDk2MDUzNzY0OTcsLTAuOTUxNTgwOTYwNTM3NjQ5N119LCJwcm9iYWJpbGl0eSI6eyJ0eXBlIjoxLCJ2YWx1ZXMiOlswLjcyMTQzMzAxMDc4NTY3MTEsMC4yNzg1NjY5ODkyMTQzMjg5XX0sInByZWRpY3Rpb24iOjAuMH0=","partition":0,"offset":5}]

#### Decode the output message

In [47]:
!echo "eyJpZCI6MSwidGV4dCI6Ik15IGZpcnN0IHN0cmVhbWluZyBkZXBsb3ltZW50ISBJdCBmZWVscyBzbyBhd2Vzb21lISEiLCJ3b3JkcyI6WyJteSIsImZpcnN0Iiwic3RyZWFtaW5nIiwiZGVwbG95bWVudCEiLCJpdCIsImZlZWxzIiwic28iLCJhd2Vzb21lISEiXSwiZmVhdHVyZXMiOnsidHlwZSI6MCwic2l6ZSI6MjYyMTQ0LCJpbmRpY2VzIjpbMzc4NTIsODIxMTEsODYxNzUsMTQxMDYzLDE1MzA3NywxODg0MjQsMTk2NzY2LDI1NjE3N10sInZhbHVlcyI6WzEuMCwxLjAsMS4wLDEuMCwxLjAsMS4wLDEuMCwxLjBdfSwicmF3UHJlZGljdGlvbiI6eyJ0eXBlIjoxLCJ2YWx1ZXMiOlswLjk1MTU4MDk2MDUzNzY0OTcsLTAuOTUxNTgwOTYwNTM3NjQ5N119LCJwcm9iYWJpbGl0eSI6eyJ0eXBlIjoxLCJ2YWx1ZXMiOlswLjcyMTQzMzAxMDc4NTY3MTEsMC4yNzg1NjY5ODkyMTQzMjg5XX0sInByZWRpY3Rpb24iOjAuMH0=" | base64 -d

{"id":1,"text":"My first streaming deployment! It feels so awesome!!","words":["my","first","streaming","deployment!","it","feels","so","awesome!!"],"features":{"type":0,"size":262144,"indices":[37852,82111,86175,141063,153077,188424,196766,256177],"values":[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]},"rawPrediction":{"type":1,"values":[0.9515809605376497,-0.9515809605376497]},"probability":{"type":1,"values":[0.7214330107856711,0.2785669892143289]},"prediction":0.0}
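Once decoded, the output message is ordinary JSON and can be inspected in Python. The sketch below parses an abridged copy of the message above (the `words` and `features` fields are left out for brevity) and checks that the reported class probability is the sigmoid of the raw score, as expected for binary logistic regression.

```python
import json
import math

# Abridged copy of the decoded output message shown above.
decoded = ('{"id":1,"text":"My first streaming deployment! It feels so awesome!!",'
           '"rawPrediction":{"type":1,"values":[0.9515809605376497,-0.9515809605376497]},'
           '"probability":{"type":1,"values":[0.7214330107856711,0.2785669892143289]},'
           '"prediction":0.0}')

result = json.loads(decoded)
raw_score = result["rawPrediction"]["values"][0]
prob_class0 = result["probability"]["values"][0]

# For binary logistic regression the class probability is the sigmoid of the raw score.
sigmoid = 1.0 / (1.0 + math.exp(-raw_score))

print("prediction: %s" % result["prediction"])
print("probability of class 0: %s" % prob_class0)
print("sigmoid(raw score): %s" % sigmoid)
```

The model assigns class 0 the larger probability, which is why the `prediction` field is 0.0.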

<a id="summary"></a>
## 8. Summary and next steps

You successfully completed this notebook! You learned how to use Apache Spark machine learning as well as Watson Machine Learning for model creation and deployment. Check out our _[Online Documentation](https://console.ng.bluemix.net/docs/services/PredictiveModeling/index.html?pos=2)_ for more samples, tutorials, documentation, how-tos, and blog posts.

### Authors

**Umit Mert Cakmak** is a Data Scientist at IBM with a track record of developing enterprise-level applications that substantially increase clients' ability to turn data into actionable knowledge.

Copyright © 2017 IBM. This notebook and its source code are released under the terms of the MIT License.