## Overview

This notebook:

- creates a MessageHub instance in Bluemix
- produces some messages using vanilla python
- consumes those messages using vanilla python
- produces some more messages using vanilla python
- consumes those messages using scala spark streaming (requires step 9)
- tears down the MessageHub instance

Enter your bluemix ID

In [7]:
from getpass import getpass
ibm_id = raw_input("IBM ID: ")

IBM ID: chris.snow@uk.ibm.com


Enter your bluemix ID password

In [8]:
from getpass import getpass
ibm_id_password = getpass("Password: ")

Password: ········


Setup some variables that this notebook will use

In [9]:
# change this to point to your bluemix organization name
bluemix_organization_name = 'chris.snow@uk.ibm.com'

# change this to point to your bluemix space name
bluemix_space_name = 'dev'

# You may need to change the target_endpoint to:
#
#   https://api.ng.bluemix.net     - for the US South Region
#   https://api.eu-gb.bluemix.net  - for the UK
#   https://api.au-syd.bluemix.net - for Australia
target_endpoint = 'https://api.ng.bluemix.net'

# This is the name of the Message Hub service instance that will
# be created for you in Bluemix
messagehub_instance_name = 'my_messagehub'

# This is the name of a topic that will get created for you
messagehub_topic_name = 'my_topic'

# Do you want to delete any existing MessageHub instance with
# messagehub_instance_name for before creating a new one?
delete_messagehub_instance = True

# This is the guid of the Message Hub service plan. I found
# this using:
#
#   cf = CloudFoundryUtil(target_endpoint, ibm_id, ibm_id_password)
#   print(cf.search_plans('message hub'))
mh_service_plan_guid = 'fe959ac5-aa47-43a6-9c58-6fc265ee9b0e'

Install a python utility script from github for interacting with cloud foundry via the API

In [49]:
!pip install --user protobuf==2.6.1
!pip install --user --upgrade git+https://github.com/snowch/cf-python-client
!pip install --user --upgrade git+https://github.com/snowch/cf_utils

Collecting git+https://github.com/snowch/cf-python-client
  Cloning https://github.com/snowch/cf-python-client to /gpfs/fs01/user/s85d-88ebffb000cc3e-39ca506ba762/notebook/tmp/pip-iK0EJD-build
Requirement already up-to-date: protobuf==2.6.1 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages (from cloudfoundry-client==0.0.17)
Requirement already up-to-date: oauth2-client==0.0.12 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages (from cloudfoundry-client==0.0.17)
Requirement already up-to-date: setuptools in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages (from protobuf==2.6.1->cloudfoundry-client==0.0.17)
Requirement already up-to-date: requests>=2.5.0 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages (from oauth2-client==0.0

Create a python object for interacting with cloud foundry

In [50]:
from cf_utils import cf_utils
cf = cf_utils.CloudFoundryUtil(target_endpoint, ibm_id, ibm_id_password)

Remove any MessageHub instances with the messagehub_instance_name

In [51]:
if delete_messagehub_instance:
    cf.delete_service(service_instance_name=messagehub_instance_name, force=True)

Create a MessageHub service instance

In [52]:
cf.create_service_instance(
    bluemix_organization_name, 
    bluemix_space_name,
    mh_service_plan_guid,
    messagehub_instance_name,
)

Extract some variables that we will need to work with the MessageHub instance

In [53]:
bootstrap_servers = cf.get_service_credential(messagehub_instance_name, 'kafka_brokers_sasl')
# print(bootstrap_servers)

sasl_username = cf.get_service_credential(messagehub_instance_name, 'user')
# print(sasl_username)

sasl_password = cf.get_service_credential(messagehub_instance_name, 'password')
# print(sasl_password)

api_key = cf.get_service_credential(messagehub_instance_name, 'api_key')
# print(api_key)

kafka_admin_url = cf.get_service_credential(messagehub_instance_name, 'kafka_admin_url')
# print(kafka_admin_url)

kafka_rest_url = cf.get_service_credential(messagehub_instance_name, 'kafka_rest_url')
# print(kafka_rest_url)

with open('messagehub.properties', 'w') as w:
    w.write('bootstrap_servers:{0}\n'.format(','.join(bootstrap_servers)))
    w.write('sasl_username={0}\n'.format(sasl_username))
    w.write('sasl_password={0}\n'.format(sasl_password))
    w.write('api_key={0}\n'.format(api_key))
    w.write('kafka_admin_url={0}\n'.format(kafka_admin_url))
    w.write('kafka_rest_url={0}\n'.format(kafka_rest_url))
    w.write('messagehub_topic_name:{0}\n'.format(messagehub_topic_name))

Create a topic in the MessageHub instance

In [15]:
def create_topic(topic_name):
    import requests
    import json

    data = { 'name' : topic_name }
    headers = {
        'content-type': 'application/json',
        'X-Auth-Token' : api_key 
    }
    url = kafka_admin_url + '/admin/topics'

    # create the topic (http POST)
    response = requests.post(url, headers = headers, data = json.dumps(data))

    # verify the topic was created (http GET)
    response = requests.get(url, headers = headers, data = json.dumps(data))
    print (response.text)
    
create_topic(messagehub_topic_name)
create_topic(messagehub_topic_name + "_responses")

[{"name":"my_topic","partitions":1,"retentionMs":"86400000","markedForDeletion":false}]
[{"name":"my_topic","partitions":1,"retentionMs":"86400000","markedForDeletion":false},{"name":"my_topic_responses","partitions":1,"retentionMs":"86400000","markedForDeletion":false}]


Install a python kafka client

In [16]:
!pip install --user --quiet kafka-python

Create a kafka producer

In [17]:
def produce_message(message):
    
    # FIXME:this is a lot of overhead - creating a new KafkaProducer each time
    # a message is produced
    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import ssl

    sasl_plain_username = sasl_username
    sasl_plain_password = sasl_password 

    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'

    # Create a new context using system defaults, disable all but TLS1.2
    context = ssl.create_default_context()
    context.options &= ssl.OP_NO_TLSv1
    context.options &= ssl.OP_NO_TLSv1_1

    producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                             sasl_plain_username = sasl_plain_username,
                             sasl_plain_password = sasl_plain_password,
                             security_protocol = security_protocol,
                             ssl_context = context,
                             sasl_mechanism = sasl_mechanism,
                             api_version = (0,10))

    producer.send(messagehub_topic_name, message)
    producer.flush()

Send some messages

In [18]:
produce_message(b'some_raw_bytes_1')
produce_message(b'some_raw_bytes_2')

Create a kafka consumer to consume the messages

In [40]:
def consume_messages(topic_name):
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    from uuid import uuid1
    import ssl

    sasl_plain_username = sasl_username
    sasl_plain_password = sasl_password 

    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'

    # Create a new context using system defaults, disable all but TLS1.2
    context = ssl.create_default_context()
    context.options &= ssl.OP_NO_TLSv1
    context.options &= ssl.OP_NO_TLSv1_1

    consumer = KafkaConsumer(topic_name,
                             bootstrap_servers = bootstrap_servers,
                             sasl_plain_username = sasl_plain_username,
                             sasl_plain_password = sasl_plain_password,
                             security_protocol = security_protocol,
                             ssl_context = context,
                             sasl_mechanism = sasl_mechanism,
                             api_version = (0,10),
                             consumer_timeout_ms = 10000,
                             auto_offset_reset = 'earliest',
                             group_id = uuid1() # consume all messages
                            )

    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

consume_messages(messagehub_topic_name)

my_topic:0:0: key=None value=some_raw_bytes_1
my_topic:0:1: key=None value=some_raw_bytes_2
my_topic:0:2: key=None value=1,500
my_topic:0:3: key=None value=1,501
my_topic:0:4: key=None value=1,502
my_topic:0:5: key=None value=1,503
my_topic:0:6: key=None value=100000,503
my_topic:0:7: key=None value=1,500
my_topic:0:8: key=None value=1,501
my_topic:0:9: key=None value=1,502
my_topic:0:10: key=None value=1,503
my_topic:0:11: key=None value=100000,503
my_topic:0:12: key=None value=1,500
my_topic:0:13: key=None value=1,501
my_topic:0:14: key=None value=1,502
my_topic:0:15: key=None value=1,503
my_topic:0:16: key=None value=100000,503
my_topic:0:17: key=None value=1,500
my_topic:0:18: key=None value=1,501
my_topic:0:19: key=None value=1,502
my_topic:0:20: key=None value=1,503
my_topic:0:21: key=None value=100000,503
my_topic:0:22: key=None value=1,500
my_topic:0:23: key=None value=1,501
my_topic:0:24: key=None value=1,502
my_topic:0:25: key=None value=1,503
my_topic:0:26: key=None value=10

Now open the notebook **"Step 9 - Scala spark streaming on DSX"** in another browser window.<br>
Run the notebook and while it is running, put some more messages in kafka by running the cell below.

In [43]:
produce_message('1,500') # user_id | movie_id
produce_message('1,501') # user_id | movie_id
produce_message('1,502') # user_id | movie_id
produce_message('1,503') # user_id | movie_id
produce_message('1000000,503') # user_id | movie_id : invalid user

You should see some predictions output in the Step 9 notebook.<br>
The predictions should have been put on the responses topic.<br>
Run the following cell to retrieve the responses from kafka.

In [44]:
consume_messages(messagehub_topic_name + "_responses")

my_topic_responses:0:0: key=None value=1, 500, 3.685349066437224
my_topic_responses:0:1: key=None value=1, 501, 3.2725849355746823
my_topic_responses:0:2: key=None value=1, 502, 1.9849899602592473
my_topic_responses:0:3: key=None value=1, 503, 2.5721063844976557
my_topic_responses:0:4: key=None value=100000, 503, -1
my_topic_responses:0:5: key=None value=1, 500, 3.685349066437224
my_topic_responses:0:6: key=None value=1, 501, 3.2725849355746823
my_topic_responses:0:7: key=None value=1, 502, 1.9849899602592473
my_topic_responses:0:8: key=None value=1, 503, 2.5721063844976557
my_topic_responses:0:9: key=None value=100000, 503, -1
my_topic_responses:0:10: key=None value=1, 500, 3.685349066437224
my_topic_responses:0:11: key=None value=1, 501, 3.2725849355746823
my_topic_responses:0:12: key=None value=1, 502, 1.9849899602592473
my_topic_responses:0:13: key=None value=1, 503, 2.5721063844976557
my_topic_responses:0:14: key=None value=1000000, 503, -1
my_topic_responses:0:15: key=None value=

Finally, when done you can remove the MessageHub instance from Bluemix

In [45]:
if delete_messagehub_instance:
    cf.delete_service(service_instance_name=messagehub_instance_name, force=True)


