# Lab Sample - Working with IBM Cloud Message Hub through the REST API

IBM Message Hub is a scalable, distributed, high-throughput message bus that integrates your on-premise and off-premise cloud technologies.
You can wire micro-services together using open protocols, connect stream data to analytics to realise powerful insight, and feed event data to multiple applications so they can react in real time.

This lab introduces you to working with Message Hub. It is written in Python and runs in IBM's Data Science Experience environment through a Jupyter notebook. While you work, it will be valuable to reference the [Apache Spark Documentation](http://spark.apache.org/docs/latest/programming-guide.html). Since it is Python, be careful of whitespace!


## The basics
Message Hub acts like a real-time messaging system or data pipeline used for all incoming and outgoing communication. The most important terms are:
- topic
- producer
- consumer
- cluster

A producer sends a message to a specific topic within a Kafka cluster and a consumer receives this message by subscribing to this topic.

Consumers can join what is called a consumer group. Within a group, each message is processed by only one consumer. Grouping consumers like that is useful for parallel processing of messages in high-throughput applications.
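
To make the consumer-group rule concrete, here is a minimal simulation in plain Python. It does not talk to Message Hub. The `deliver` function and the round-robin choice of consumer are illustrative assumptions, not a description of how Kafka assigns partitions.

```python
from collections import defaultdict
from itertools import cycle

def deliver(messages, groups):
    """Simulate delivery: every group sees each message, but only one
    consumer inside a group receives it (round-robin is an assumption)."""
    received = defaultdict(list)
    # One independent round-robin iterator per consumer group
    pickers = {group: cycle(consumers) for group, consumers in groups.items()}
    for message in messages:
        for group, picker in pickers.items():
            consumer = next(picker)
            received[(group, consumer)].append(message)
    return dict(received)

groups = {"group-a": ["a1", "a2"], "group-b": ["b1"]}
received = deliver(["m1", "m2", "m3", "m4"], groups)
for (group, consumer), msgs in sorted(received.items()):
    print(group, consumer, msgs)
```

In this sketch the two consumers in `group-a` split the messages between them, while the single consumer in `group-b` receives all of them.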

<img src='https://cdn-images-1.medium.com/max/1600/0*VD_oFFWFrE3QfDGu.png'></img>
(Fig 1: Basic architecture, Kafka Documentation)

The REST API allows you to define consumer instances and consumer groups.

The content of this notebook is adapted from the following IBM DSX blog post:
https://medium.com/ibm-data-science-experience/receive-messages-from-ibm-message-hub-c52ebdedcfb9

## Step:1 Get the service credentials for the provisioned Message Hub service into the notebook
You can do that from within a notebook: pick the connection in the Connections section of the Data panel,
then click Insert to Code.

<img src='https://github.com/smatlapudi/DSX-Training-Labs/raw/master/images/add_connection_to_notebook.png' width="60%" height="60%"></img>

In [None]:
# @hidden_cell
credentials_1 = {
  'instance_id':'d501d02b-234f-458a-8592-7080ad2a0e3b',
  'mqlight_lookup_url':'https://mqlight-lookup-prod02.messagehub.services.us-south.bluemix.net/Lookup?serviceId=d501d02b-234f-458a-8592-7080ad2a0e3b',
  'api_key':'9IyKpJoLKeB3zfPGQs3UUQHDzy6Oicx03EuDl9epGR8elWIi',
  'kafka_admin_url':'https://kafka-admin-prod02.messagehub.services.us-south.bluemix.net:443',
  'kafka_rest_url':'https://kafka-rest-prod02.messagehub.services.us-south.bluemix.net:443',
  'kafka_brokers_sasl':'kafka05-prod02.messagehub.services.us-south.bluemix.net:9093,kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,kafka03-prod02.messagehub.services.us-south.bluemix.net:9093',
  'user':'9IyKpJoLKeB3zfPG',
  'password':"""Qs3UUQHDzy6Oicx03EuDl9epGR8elWIi""",
  'topic':'sfo.lab'
}


## Step:2 Write Messages to Message Hub Topic


### 2.1) Use the Python requests library for making HTTP calls

In [None]:
!pip install --upgrade requests
import requests

### 2.2) Import the json library and set up HTTP header values for the REST call

In [None]:
import json

# credentials_1 in the lines below comes from the code inserted for the Message Hub connection.
# You may need to change this name depending on the credentials variable inserted in step 1.
authToken = credentials_1['api_key'] 
kafkaRestUrl = credentials_1['kafka_rest_url']
headers = {  
  'X-Auth-Token': authToken,
  'Content-Type': 'application/json',
  'Accept': 'application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json'
}

### 2.3) Prepare the HTTP body with messages to publish

The values submitted in the records array have to be binary encoded for our consumer instance. JSON and Avro encoding are not currently supported by the Message Hub REST API.

In [None]:
import binascii

body = json.dumps({  
  'records': [
    {'key': 'key-1','value': binascii.hexlify(b"value one").decode('utf-8')},
    {'key': 'key-2','value': binascii.hexlify(b"value two").decode('utf-8')},
    {'key': 'key-3','value': binascii.hexlify(b"value three").decode('utf-8')},
    {'key': 'key-4','value': binascii.hexlify(b"value four").decode('utf-8')},
    {'key': 'key-5','value': binascii.hexlify(b"value five").decode('utf-8')},
    {'key': 'key-6','value': binascii.hexlify(b"value six").decode('utf-8')},
    {'key': 'key-7','value': binascii.hexlify(b"value seven").decode('utf-8')},
    {'key': 'key-8','value': binascii.hexlify(b"value eight").decode('utf-8')},
    {'key': 'key-9','value': binascii.hexlify(b"value nine").decode('utf-8')},
    {'key': 'key-10','value': binascii.hexlify(b"value ten").decode('utf-8')},
  ]
}, ensure_ascii=False).encode('utf8')
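
The ten records above repeat the same encoding step, so the body can also be generated from a list of strings. The sketch below assumes the same hex encoding used in this notebook. The helper names `to_hex`, `from_hex` and `build_body` are hypothetical and not part of any Message Hub library.

```python
import binascii
import json

def to_hex(text):
    """Hex-encode a string the same way the records above are encoded."""
    return binascii.hexlify(text.encode('utf-8')).decode('utf-8')

def from_hex(hex_text):
    """Reverse of to_hex: turn the hex string back into readable text."""
    return binascii.unhexlify(hex_text).decode('utf-8')

def build_body(values):
    """Build a request body with keys key-1, key-2, ... for the given values."""
    records = [{'key': 'key-%d' % i, 'value': to_hex(v)}
               for i, v in enumerate(values, start=1)]
    return json.dumps({'records': records}, ensure_ascii=False).encode('utf8')

sample_body = build_body(["value one", "value two"])
print(sample_body)
print(from_hex(to_hex("value one")))
```

The round trip through `to_hex` and `from_hex` mirrors what the producer does here and what the consumer does later when it decodes each value.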

### 2.4) Make the REST call

In [None]:
kafkaTopic = 'sfo_mhub_lab'
url = kafkaRestUrl + "/topics/" + kafkaTopic
response = requests.post(url, data=body, headers=headers)
print(response.text)
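
The cell above builds the URL by string concatenation and prints the reply without checking whether the call succeeded. A slightly more defensive variant could look like the sketch below. The helpers `build_topic_url` and `ensure_success` are hypothetical names, and the host `kafka-rest.example.com` is a placeholder, not a real Message Hub endpoint.

```python
from urllib.parse import quote

def build_topic_url(rest_url, topic):
    """Join the REST base URL and a topic name, escaping unsafe characters."""
    return rest_url.rstrip('/') + '/topics/' + quote(topic, safe='')

def ensure_success(response):
    """Raise if the HTTP call failed. Works with any object exposing
    status_code and text, such as a requests.Response."""
    if not 200 <= response.status_code < 300:
        raise RuntimeError('Request failed: %s %s'
                           % (response.status_code, response.text))
    return response

print(build_topic_url('https://kafka-rest.example.com:443/', 'sfo_mhub_lab'))
```

In the notebook this would be used as `ensure_success(requests.post(build_topic_url(kafkaRestUrl, kafkaTopic), data=body, headers=headers))`, so that a rejected request stops the cell instead of passing silently.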

## Step:3 Read Messages from Message Hub Topic using Message Hub REST API

### 3.1) Create a Kafka consumer group & instance

In [None]:
# give a unique value for instance and group to avoid conflicts
consumerInstance = 'instance11'  
consumerGroup = 'group11'

authToken = credentials_1['api_key']  
kafkaRestUrl = credentials_1['kafka_rest_url']

In [None]:
body2 = json.dumps({  
  'name': consumerInstance,
  'format': 'binary',
  'auto.offset.reset': 'smallest'
})

The following method submits a POST request to the Kafka REST API to create the consumer instance. It expects the consumer group name, a request body that names the consumer instance, and valid authentication headers.

In [None]:
def setConsumerInstanceAndGroup(consumerGroup, body1, headers):
  response = requests.post(kafkaRestUrl + "/consumers/" + consumerGroup, data=body1, headers=headers)
  print(response.status_code, response.reason, response.text)
  result = response.json()
  print(result)
  consumerUrl = result['base_uri']
  print(consumerUrl)
  return response
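
The `base_uri` field returned on creation identifies the new consumer instance, and the URL for reading a topic can be derived from it. The sketch below works on a hypothetical response body: the sample values and the `topic_read_url` helper are assumptions for illustration, and only the `base_uri` field name is taken from the method above.

```python
import json

# Hypothetical response body, shaped like a consumer-creation reply
sample_response_text = json.dumps({
    'instance_id': 'instance11',
    'base_uri': 'https://kafka-rest.example.com:443/consumers/group11/instances/instance11'
})

def topic_read_url(response_text, topic):
    """Extract base_uri from the creation response and derive the
    URL used to read messages from a topic."""
    result = json.loads(response_text)
    return result['base_uri'].rstrip('/') + '/topics/' + topic

print(topic_read_url(sample_response_text, 'sfo_mhub_lab'))
```

The resulting URL has the same shape as the one this notebook later assembles by hand from the consumer group, the consumer instance and the topic name.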

The following method is useful for deleting the existing consumer instance and consumer group.

In [None]:
def deleteConsumerInstance(consumerGroup, consumerInstance, headers):
  response = requests.delete(kafkaRestUrl + "/consumers/" + consumerGroup + "/instances/" + consumerInstance, headers=headers)
  print(response.status_code, response.reason, response.text)
  return response

Execute the setConsumerInstanceAndGroup method once.

In [None]:
setConsumerInstanceAndGroup(consumerGroup, body2, headers)

The method below defines a polling loop that runs for a limited number of iterations, because we eventually have to stop it in order to process the results. This is not an actual streaming receiver of the kind Spark Streaming needs. It only reads messages from Message Hub and collects the decoded values in a Python list.

In [None]:
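def getMessageFromKafka(maxArrayLength, maxIterations, consumerUrl, headers):
  # Note: consumerUrl is currently unused; the request URL is built from the
  # globals kafkaRestUrl, consumerGroup, consumerInstance and kafkaTopic.
  results = []
  length = 0
  iteration = 0
  while (length < maxArrayLength):
    # Stop after maxIterations polls, even if fewer messages have arrived
    if (iteration >= maxIterations): break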

    response = requests.get(kafkaRestUrl + "/consumers/"+consumerGroup+"/instances/"+consumerInstance+"/topics/"+kafkaTopic, headers=headers)        

    print (response, response.reason, response.text)
    data = response.text 

    x = json.loads(data)
    length = length + len(x)
    iteration = iteration + 1

    print ('===============================')
    print ('Number of incoming messages: ', len(x))
    print ('===============================')

    for obj in x:
      value = binascii.unhexlify(obj['value']).decode('utf-8')

      print(value)
      results.append(value)

  return results
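
The decoding step inside the loop can be tried without a network call. The sketch below assumes the consumer reply is a JSON array of records with hex-encoded `value` fields, as in the loop above. The sample payload and the `decode_records` helper are hypothetical, and treating a non-array reply as an error is also an assumption.

```python
import binascii
import json

def decode_records(response_text):
    """Parse a consumer response and return the decoded message values.
    Assumes a JSON array of records with hex-encoded 'value' fields."""
    payload = json.loads(response_text)
    if not isinstance(payload, list):
        # Error replies are assumed to be JSON objects rather than arrays
        raise ValueError('Unexpected response: %r' % (payload,))
    return [binascii.unhexlify(record['value']).decode('utf-8')
            for record in payload]

# Hypothetical payload with a single record
sample = json.dumps([
    {'key': 'key-1',
     'value': binascii.hexlify(b'value one').decode('utf-8'),
     'partition': 0,
     'offset': 0},
])
print(decode_records(sample))
```

Checking the payload type first avoids a confusing failure inside the loop when the service returns an error object instead of a list of records.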

Execute the method to receive all messages currently on the topic. The other parameters limit the length of the message array and the number of iterations.

In [None]:
maxArrayLength = 100  
maxIterations = 5 

results = getMessageFromKafka (maxArrayLength, maxIterations, url, headers)

Finally, delete the consumer instance and group to avoid conflicts with other executions of this test code.

In [None]:
deleteConsumerInstance(consumerGroup, consumerInstance, headers)
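
If reading fails part way through, the delete cell may never run and the consumer instance stays registered. One way to guard against that is a small context manager that always performs the cleanup. This is a sketch under the assumption that creation and deletion are passed in as plain callables; `consumer_session` is a hypothetical helper, and the lambdas below only record calls instead of contacting Message Hub.

```python
from contextlib import contextmanager

@contextmanager
def consumer_session(create, delete):
    """Run create() on entry and always run delete() on exit,
    even if the body raises an error."""
    create()
    try:
        yield
    finally:
        delete()

calls = []
with consumer_session(lambda: calls.append('create'),
                      lambda: calls.append('delete')):
    calls.append('read')
print(calls)
```

In the notebook, `create` could wrap `setConsumerInstanceAndGroup(consumerGroup, body2, headers)` and `delete` could wrap `deleteConsumerInstance(consumerGroup, consumerInstance, headers)`, with the call to `getMessageFromKafka` placed inside the `with` block.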

## Next: Read Messages from Message Hub Topic using Spark Streaming Kafka client.
Advanced exercise. Code not included.