## Overview

This notebook:

- creates a MessageHub instance in Bluemix
- produces some messages using vanilla Python
- consumes those messages using vanilla Python
- produces some more messages using vanilla Python
- consumes those messages using Scala Spark Streaming
- tears down the MessageHub instance

First, set up PixieDust

In [3]:
!pip install --user --upgrade --quiet pixiedust
import pixiedust

jars = ["http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar",
        "http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar",
        "http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar",
        "https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar",
        "https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar"]

for j in jars:
    pixiedust.installPackage(j)
    
print("\n** If this is the first time you have run this notebook, please restart the kernel and re-run this notebook now. **")

Pixiedust database opened successfully
Pixiedust version 0.58
Package already installed: http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.9.0.0/kafka-clients-0.9.0.0.jar
Package already installed: http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/kafka_2.10-0.9.0.0.jar
Package already installed: http://central.maven.org/maven2/org/apache/kafka/kafka-log4j-appender/0.9.0.0/kafka-log4j-appender-0.9.0.0.jar
Package already installed: https://github.com/ibm-messaging/message-hub-samples/raw/master/java/message-hub-login-library/messagehub.login-1.0.0.jar
Package already installed: https://github.com/ibm-messaging/iot-messgehub-spark-samples/releases/download/v0.1/streaming-kafka.jar

** If this is the first time you have run this notebook, please restart the kernel and re-run this notebook now. **
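The first three jar URLs follow the standard Maven repository layout. Each URL is built from the repository base, the group ID with dots turned into slashes, the artifact ID, the version, and finally `artifact-version.jar`. The helper below is a hypothetical sketch, not part of PixieDust. It builds such URLs from coordinates, so the Kafka version only needs to change in one place. The `central.maven.org` host used above may no longer resolve, and `https://repo1.maven.org/maven2` is the usual Maven Central base URL. For that reason the repository is left as a parameter.

```python
def maven_jar_url(group_id, artifact_id, version,
                  repo="http://central.maven.org/maven2"):
    """Build a jar URL using the standard Maven repository layout (hypothetical helper)."""
    group_path = group_id.replace(".", "/")
    # repo/group/path/artifact/version/artifact-version.jar
    return "{0}/{1}/{2}/{3}/{2}-{3}.jar".format(repo, group_path, artifact_id, version)


# Rebuild the three Kafka jar URLs from the cell above from their coordinates.
kafka_version = "0.9.0.0"
kafka_jars = [maven_jar_url("org.apache.kafka", artifact, kafka_version)
              for artifact in ("kafka-clients", "kafka_2.10", "kafka-log4j-appender")]
```

To point at another mirror, pass `repo="https://repo1.maven.org/maven2"`. The resulting list can then be handed to `pixiedust.installPackage` just like the hard-coded one.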


Enter your Bluemix ID

In [4]:
from getpass import getpass
ibm_id = raw_input("IBM ID: ")

IBM ID: chris.snow@uk.ibm.com


Enter your Bluemix password

In [5]:
from getpass import getpass
ibm_id_password = getpass("Password: ")

Password: ········
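Prompting for credentials works well interactively, but it blocks when the notebook runs unattended. The sketch below is a minimal alternative. It assumes you are willing to export the value as an environment variable; the helper `read_credential` and the variable name `IBM_ID_PASSWORD` are hypothetical. The helper returns the variable's value when it is set and otherwise falls back to the prompt.

```python
import os
from getpass import getpass


def read_credential(env_var, prompt, reader=getpass, environ=None):
    """Return env_var's value if set and non-empty, otherwise ask via reader (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    value = environ.get(env_var)
    if value:
        return value
    # Fall back to interactive input, e.g. getpass for secrets.
    return reader(prompt)

# Example usage in the notebook (IBM_ID_PASSWORD is a hypothetical variable name):
#   ibm_id_password = read_credential("IBM_ID_PASSWORD", "Password: ")
```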


Set up the variables that this notebook will use

In [6]:
# change this to point to your bluemix organization name
bluemix_organization_name = 'chris.snow@uk.ibm.com'

# change this to point to your bluemix space name
bluemix_space_name = 'dev'

# You may need to change the target_endpoint to:
#
#   https://api.ng.bluemix.net     - for the US South region
#   https://api.eu-gb.bluemix.net  - for the United Kingdom region
#   https://api.au-syd.bluemix.net - for the Sydney (Australia) region
target_endpoint = 'https://api.ng.bluemix.net'

# This is the name of the Message Hub service instance that will
# be created for you in Bluemix
messagehub_instance_name = 'my_messagehub'

# This is the name of a topic that will get created for you
messagehub_topic_name = 'my_topic'

# Do you want to delete any existing MessageHub instance named
# messagehub_instance_name before creating a new one? This flag
# also controls the teardown cell at the end of the notebook.
delete_messagehub_instance = True

# This is the guid of the Message Hub service plan. I found
# this using:
#
#   cf = CloudFoundryUtil(target_endpoint, ibm_id, ibm_id_password)
#   print(cf.search_plans('message hub'))
mh_service_plan_guid = 'fe959ac5-aa47-43a6-9c58-6fc265ee9b0e'
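Instead of editing `target_endpoint` by hand, you could look it up from a region name. The sketch below copies the three endpoints from the comments above. The region keys (`us-south`, `eu-gb`, `au-syd`) and the helper name are hypothetical labels chosen for illustration. The lookup fails loudly on an unknown region rather than silently defaulting to US South.

```python
# Endpoints copied from the comments in the cell above; the keys are hypothetical labels.
BLUEMIX_API_ENDPOINTS = {
    "us-south": "https://api.ng.bluemix.net",
    "eu-gb": "https://api.eu-gb.bluemix.net",
    "au-syd": "https://api.au-syd.bluemix.net",
}


def endpoint_for_region(region):
    """Return the Cloud Foundry API endpoint for a region label (hypothetical helper)."""
    try:
        return BLUEMIX_API_ENDPOINTS[region]
    except KeyError:
        raise ValueError("Unknown region %r; expected one of %s"
                         % (region, sorted(BLUEMIX_API_ENDPOINTS)))

# Example: target_endpoint = endpoint_for_region("eu-gb")
```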

Install a Python utility from GitHub for interacting with Cloud Foundry via its API

In [7]:
!pip install --user --upgrade protobuf
!pip install --user --upgrade --quiet git+https://github.com/snowch/cf_utils

Requirement already up-to-date: protobuf in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages
Requirement already up-to-date: setuptools in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s85d-88ebffb000cc3e-39ca506ba762/.local/lib/python2.7/site-packages (from protobuf)
Requirement already up-to-date: six>=1.9 in /usr/local/src/bluemix_jupyter_bundle.v22/notebook/lib/python2.7/site-packages (from protobuf)


Create a Python object for interacting with Cloud Foundry

In [8]:
from cf_utils import cf_utils
cf = cf_utils.CloudFoundryUtil(target_endpoint, ibm_id, ibm_id_password)

Remove any existing MessageHub instance named messagehub_instance_name

In [9]:
if delete_messagehub_instance:
    cf.delete_service(service_instance_name=messagehub_instance_name, force=True)




Create a MessageHub service instance

In [10]:
cf.create_service_instance(
    bluemix_organization_name, 
    bluemix_space_name,
    mh_service_plan_guid,
    messagehub_instance_name,
)

Extract the service credentials we need to work with the MessageHub instance

In [11]:
bootstrap_servers = cf.get_service_credential(messagehub_instance_name, 'kafka_brokers_sasl')
# print(bootstrap_servers)

sasl_username = cf.get_service_credential(messagehub_instance_name, 'user')
# print(sasl_username)

sasl_password = cf.get_service_credential(messagehub_instance_name, 'password')
# print(sasl_password)

api_key = cf.get_service_credential(messagehub_instance_name, 'api_key')
# print(api_key)

kafka_admin_url = cf.get_service_credential(messagehub_instance_name, 'kafka_admin_url')
# print(kafka_admin_url)

kafka_rest_url = cf.get_service_credential(messagehub_instance_name, 'kafka_rest_url')
# print(kafka_rest_url)
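Each `get_service_credential` call above appears to read one field from the service instance's credentials. The sketch below is hedged accordingly. It assumes the credentials arrive as a single JSON document with the keys used above, and it reads all six fields in one pass, failing early if any is missing. The broker hosts, user, password, and URLs in the example are made-up placeholders. The function name is hypothetical.

```python
import json

# Keys taken from the get_service_credential calls above.
REQUIRED_KEYS = ("kafka_brokers_sasl", "user", "password",
                 "api_key", "kafka_admin_url", "kafka_rest_url")


def parse_messagehub_credentials(credentials_json):
    """Parse a credentials JSON document and check every required field is present (hypothetical helper)."""
    creds = json.loads(credentials_json)
    missing = [key for key in REQUIRED_KEYS if key not in creds]
    if missing:
        raise KeyError("missing credential fields: %s" % ", ".join(missing))
    return dict((key, creds[key]) for key in REQUIRED_KEYS)


# Placeholder values only; real credentials come from the service instance.
example = json.dumps({
    "kafka_brokers_sasl": ["broker-0.example.com:9093", "broker-1.example.com:9093"],
    "user": "example-user",
    "password": "example-password",
    "api_key": "example-api-key",
    "kafka_admin_url": "https://admin.example.com",
    "kafka_rest_url": "https://rest.example.com",
})
creds = parse_messagehub_credentials(example)
```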

Create a topic in the MessageHub instance

In [12]:
import requests
import json

data = { 'name' : messagehub_topic_name }
headers = {
    'content-type': 'application/json',
    'X-Auth-Token' : api_key 
}
url = kafka_admin_url + '/admin/topics'

# create the topic (http POST)
response = requests.post(url, headers = headers, data = json.dumps(data))

# verify the topic was created (http GET; no request body is needed)
response = requests.get(url, headers = headers)
print (response.text)

[{"name":"my_topic","partitions":1,"retentionMs":"86400000","markedForDeletion":false}]
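The GET response is a JSON list of topic descriptions, so the check can be done in code rather than by eye. The small sketch below does that; its helper names are hypothetical. It also converts `retentionMs` into hours: 86400000 ms / (1000 × 3600) = 24 hours, so the topic listed above keeps messages for one day.

```python
import json


def topic_exists(topics_json, topic_name):
    """Return True if topic_name appears in the JSON list returned by GET /admin/topics."""
    return any(topic.get("name") == topic_name for topic in json.loads(topics_json))


def retention_hours(topic_info):
    """Convert a topic's retentionMs (a string of milliseconds) to hours."""
    return int(topic_info["retentionMs"]) / (1000.0 * 60 * 60)


# The response text printed by the cell above.
listing = '[{"name":"my_topic","partitions":1,"retentionMs":"86400000","markedForDeletion":false}]'

# In the notebook: assert topic_exists(response.text, messagehub_topic_name)
```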


Install a Python Kafka client (kafka-python)

In [13]:
!pip install --user --quiet kafka-python

Create a Kafka producer and send some messages

In [14]:
def produce_messages():
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import ssl

    sasl_plain_username = sasl_username
    sasl_plain_password = sasl_password 

    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'

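    # Create a new context using system defaults, then disable TLS 1.0 and
    # TLS 1.1 so that only TLS 1.2 (or later) can be negotiated. The flags
    # must be OR-ed in; "&=" would clear every other option instead.
    context = ssl.create_default_context()
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1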

    producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                             sasl_plain_username = sasl_plain_username,
                             sasl_plain_password = sasl_plain_password,
                             security_protocol = security_protocol,
                             ssl_context = context,
                             sasl_mechanism = sasl_mechanism,
                             api_version = (0,10))

    producer.send(messagehub_topic_name, b'some_raw_bytes_1')
    producer.send(messagehub_topic_name, b'some_raw_bytes_2')

    producer.flush()
    
produce_messages()
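Both the producer and the consumer build the same TLS context, so it could live in one helper. The sketch below is hedged; `make_tls12_context` is a hypothetical name. On Python 3.7+ it sets `minimum_version`, which is the preferred way to set a protocol floor there. On older interpreters, including the Python 2.7 this notebook runs on, it falls back to OR-ing the `OP_NO_TLSv1` and `OP_NO_TLSv1_1` flags into `context.options`. Using `|=` matters here: `options &= FLAG` keeps only the bits that `FLAG` shares with the existing options and clears all the others, including defaults such as `OP_NO_SSLv3`.

```python
import ssl


def make_tls12_context():
    """Default client context that refuses anything older than TLS 1.2 (hypothetical helper)."""
    context = ssl.create_default_context()
    if hasattr(ssl, "TLSVersion"):
        # Python 3.7+: set the protocol floor directly.
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    else:
        # Older Pythons: add the "disable" flags to the options bitmask.
        context.options |= ssl.OP_NO_TLSv1
        context.options |= ssl.OP_NO_TLSv1_1
    return context

# The result can be passed as ssl_context=make_tls12_context() to KafkaProducer or KafkaConsumer.
```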

Create a Kafka consumer to read the messages back

In [15]:
def consume_messages():
    from kafka import KafkaConsumer
    from kafka.errors import KafkaError
    import ssl

    sasl_plain_username = sasl_username
    sasl_plain_password = sasl_password 

    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'

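    # Create a new context using system defaults, then disable TLS 1.0 and
    # TLS 1.1 so that only TLS 1.2 (or later) can be negotiated. The flags
    # must be OR-ed in; "&=" would clear every other option instead.
    context = ssl.create_default_context()
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1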

    consumer = KafkaConsumer(messagehub_topic_name,
                             bootstrap_servers = bootstrap_servers,
                             sasl_plain_username = sasl_plain_username,
                             sasl_plain_password = sasl_plain_password,
                             security_protocol = security_protocol,
                             ssl_context = context,
                             sasl_mechanism = sasl_mechanism,
                             api_version = (0,10),
                             consumer_timeout_ms = 10000,
                             auto_offset_reset = 'earliest',
                             group_id = "python_client")

    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

consume_messages()

my_topic:0:0: key=None value=some_raw_bytes_1
my_topic:0:1: key=None value=some_raw_bytes_2
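Under Python 2.7, which this notebook runs on, `message.value` is a `str`, so it prints exactly as shown. Under Python 3, kafka-python returns raw `bytes` for keys and values when no deserializer is configured. In that case `%s` would print `b'some_raw_bytes_1'`. The sketch below shows the fix: it decodes bytes before formatting. `Record` is a stand-in holding only the printed fields of kafka-python's `ConsumerRecord`, and `format_record` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord, with only the fields printed above.
Record = namedtuple("Record", "topic partition offset key value")


def _text(raw):
    # Decode bytes for display; leave None (no key) and text values untouched.
    return raw.decode("utf-8", "replace") if isinstance(raw, bytes) else raw


def format_record(record):
    """Format a record in the same topic:partition:offset style as the consumer loop."""
    return "%s:%d:%d: key=%s value=%s" % (record.topic, record.partition, record.offset,
                                          _text(record.key), _text(record.value))

# In the consumer loop: print(format_record(message))
```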


The Scala code expects the bootstrap servers as a single comma-separated string

In [16]:
bootstrap_servers_string = ','.join(bootstrap_servers)
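Kafka's `bootstrap.servers` setting is a comma-separated list of `host:port` pairs. `','.join(...)` is only correct if `bootstrap_servers` is a list. If it were already a string, the join would put a comma between every character. The slightly defensive sketch below handles both cases and validates each entry. It uses a hypothetical helper name and placeholder hostnames.

```python
def to_bootstrap_string(brokers):
    """Return brokers as a comma-separated 'host:port' string (hypothetical helper)."""
    # A plain string is assumed to be already comma-separated; joining it
    # directly would insert a comma between every character.
    if isinstance(brokers, str):
        brokers = brokers.split(",")
    for broker in brokers:
        host, sep, port = broker.rpartition(":")
        if not (sep and host and port.isdigit()):
            raise ValueError("expected host:port, got %r" % (broker,))
    return ",".join(brokers)

# In the notebook: bootstrap_servers_string = to_bootstrap_string(bootstrap_servers)
```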

Put some more messages into Kafka

In [26]:
produce_messages()
produce_messages()
produce_messages()
produce_messages()
produce_messages()
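Each `produce_messages()` call opens a new connection to the brokers, sends the same two payloads, and flushes. Five calls therefore add 10 messages. The hedged alternative below reuses one producer for the whole batch. `send_payloads` is a hypothetical helper. `FakeProducer` is a test double that only records calls. In the notebook you would pass a real `KafkaProducer` configured as in `produce_messages`.

```python
def send_payloads(producer, topic, payloads):
    """Send every payload on one producer, then block until all are delivered."""
    futures = [producer.send(topic, payload) for payload in payloads]
    producer.flush()
    return futures


class FakeProducer(object):
    """Test double standing in for kafka.KafkaProducer; it only records calls."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value):
        self.sent.append((topic, value))
        return len(self.sent) - 1

    def flush(self):
        self.flushed = True


# Same payloads as five produce_messages() calls: 5 x 2 = 10 messages.
payloads = [b"some_raw_bytes_1", b"some_raw_bytes_2"] * 5
fake = FakeProducer()
send_payloads(fake, "my_topic", payloads)
```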

Now start the Kafka Spark Streaming listener.

**Important:** If you run the cell below more than once, re-run the previous cell each time before running it.

In [27]:
%%scala

import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.ibm.cds.spark.samples.config.MessageHubConfig
import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.UUID

val kafkaProps = new MessageHubConfig

kafkaProps.setConfig("bootstrap.servers",   bootstrap_servers_string.toString())
kafkaProps.setConfig("kafka.user.name",     sasl_username.toString())
kafkaProps.setConfig("kafka.user.password", sasl_password.toString())
kafkaProps.setConfig("kafka.topic",         messagehub_topic_name.toString())
kafkaProps.setConfig("api_key",             api_key.toString())
kafkaProps.setConfig("kafka_rest_url",      kafka_rest_url.toString())
kafkaProps.setConfig("auto.offset.reset",   "earliest") // "earliest" is the 0.9+ consumer value; the old 0.8 consumer used "smallest"
kafkaProps.setConfig("group.id",            UUID.randomUUID().toString())

kafkaProps.createConfiguration()

// kafkaProps.toImmutableMap.foreach { keyVal => println(keyVal._1 + "=" + keyVal._2) }
// println("group.id=" + kafkaProps.getConfig("group.id"))

val ssc = new StreamingContext( sc, Seconds(2) )

val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
                     kafkaProps,
                     List(kafkaProps.getConfig("kafka.topic"))
                     )
stream.print()

ssc.start()
ssc.awaitTerminationOrTimeout(30000)
ssc.stop(stopSparkContext=false, stopGracefully=true)

default location of ssl Trust store is: /usr/local/src/spark160master/ibm-java-x86_64-80/jre/lib/security/cacerts
Registering JaasConfiguration: /gpfs/fs01/user/s85d-88ebffb000cc3e-39ca506ba762/notebook/tmp/Ve2C6SkrQBr5UYH1/jaas.conf
group.id=5f33a695-80d4-4efe-8d48-3bc74c19d5ce
default location of ssl Trust store is: /usr/local/src/spark160master/ibm-java-x86_64-80/jre/lib/security/cacerts
-------------------------------------------
Time: 1477916592000 ms
-------------------------------------------
-------------------------------------------
Time: 1477916594000 ms
-------------------------------------------
(null,some_raw_bytes_1)
(null,some_raw_bytes_2)
(null,some_raw_bytes_1)
(null,some_raw_bytes_2)
(null,some_raw_bytes_1)
(null,some_raw_bytes_2)
(null,some_raw_bytes_1)
(null,some_raw_bytes_2)
(null,some_raw_bytes_1)
(null,some_raw_bytes_2)
-------------------------------------------
Time: 1477916596000 ms
-------------------------------------------
---------------------------------
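The `Time:` headers are batch times in epoch milliseconds. Consecutive headers are 2000 ms apart, which matches `Seconds(2)`. The 30000 ms passed to `awaitTerminationOrTimeout` allows roughly 30000 / 2000 = 15 batch intervals before `ssc.stop` is called. The short sketch below checks this arithmetic and converts the first timestamp to UTC. The timestamps are copied from the output above.

```python
from datetime import datetime, timedelta

# Batch timestamps (epoch milliseconds) copied from the stream.print() output above.
batch_times_ms = [1477916592000, 1477916594000, 1477916596000]


def batch_intervals_ms(times_ms):
    """Differences between consecutive batch timestamps."""
    return [later - earlier for earlier, later in zip(times_ms, times_ms[1:])]


def to_utc(epoch_ms):
    """Convert epoch milliseconds to a naive UTC datetime."""
    return datetime(1970, 1, 1) + timedelta(milliseconds=epoch_ms)


batch_interval_ms = 2 * 1000   # Seconds(2)
timeout_ms = 30000             # awaitTerminationOrTimeout(30000)
max_batches = timeout_ms // batch_interval_ms
```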

Remove the MessageHub instance from Bluemix

In [None]:
if delete_messagehub_instance:
    cf.delete_service(service_instance_name=messagehub_instance_name, force=True)