Python MQP Subscribe
====================


This example uses widely available Python libraries to subscribe to announcements and then retrieve the corresponding data. Apart from the standard library, the only dependency is the paho.mqtt.client library.

In [6]:
import json
import paho.mqtt.client as mqtt
import random
import urllib.request
# b64decode is used by getData() for base64-encoded inline content
from base64 import b64decode


host='localhost'
user='wis2box'
password='wis2box'

r = random.Random()
# a random suffix makes collisions between client ids unlikely
clientId='MyQueueName'+ f"{r.randint(1,1000):04d}"
# number of messages to receive before disconnecting.
messageCount = 0
messageCountMaximum = 5

# maximum size of data download to print.
sizeMaximumThreshold = 1023

The above imports the modules we need and sets the connection parameters. It assumes that a broker is set up on localhost and is publishing messages. Message queueing protocols provide real-time notification of the availability of products.

The Python library used here to subscribe to messages is paho.mqtt.client, a widely used third-party package rather than part of the standard library. The library is driven by callbacks.

The first callback needed is called when the connection is established, which is when we subscribe to the topics we are interested in. Here the topic is 'xpublic/#', where / is the topic separator and # is a wildcard matching any subtree of topics.

qos=1 refers to Quality of Service. A value of 1 requests that each message be delivered at least once, which means a message can occasionally arrive more than once. qos=1 is recommended.
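
Because QoS 1 is at-least-once delivery, a subscriber may occasionally see the same announcement twice. The sketch below shows one possible way to suppress such repeats. `RecentlySeen` and `message_key` are hypothetical helpers, not part of paho, and keying on `relPath` plus the integrity value is an assumption based on the fields visible in the sample messages.

```python
from collections import OrderedDict


class RecentlySeen:
    """Remember the most recent keys so redelivered messages can be skipped."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._keys = OrderedDict()

    def is_duplicate(self, key):
        """Return True if key was seen recently, otherwise record it."""
        if key in self._keys:
            return True
        self._keys[key] = None
        if len(self._keys) > self.capacity:
            # forget the oldest key to bound memory use
            self._keys.popitem(last=False)
        return False


def message_key(m):
    """Hypothetical identity for an announcement: path plus checksum value."""
    return (m.get('relPath'), m.get('integrity', {}).get('value'))


seen = RecentlySeen(capacity=2)
announcement = {'relPath': 'a/b.txt', 'integrity': {'method': 'md5', 'value': 'abc'}}
first = seen.is_duplicate(message_key(announcement))   # new message
second = seen.is_duplicate(message_key(announcement))  # redelivery
print(first, second)
```

A message handler could call `is_duplicate()` before downloading anything and return early when it reports a repeat.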

messageCount is used to limit the length of the demonstration. Without it the example would run indefinitely, because the messages are a continuous flow.

In [7]:
def sub_connect(client, userdata, flags, rc, properties=None):
    """
      subscribe to the topics of interest once the broker accepts the connection.
    """
    print( "on connection to subscribe:", mqtt.connack_string(rc) )
    for s in [ "xpublic/#" ]:
        client.subscribe( s , qos=1 )
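
To illustrate how the wildcard in 'xpublic/#' selects topics, here is a simplified matcher. `topic_matches` is a hypothetical helper written only for this illustration, and it ignores special cases such as topics beginning with `$`. For real use, paho.mqtt.client provides its own `topic_matches_sub` function.

```python
def topic_matches(pattern, topic):
    """Return True if topic is selected by an MQTT subscription pattern.

    Simplified sketch: '#' matches any remaining levels, '+' matches
    exactly one level, anything else must match literally.
    """
    pattern_levels = pattern.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(pattern_levels):
        if level == '#':
            # multi-level wildcard: everything from here down matches
            return True
        if i >= len(topic_levels):
            # the pattern is longer than the topic
            return False
        if level != '+' and level != topic_levels[i]:
            return False
    # without a trailing '#', both must have the same number of levels
    return len(pattern_levels) == len(topic_levels)


metar = 'xpublic/v03/WIS/us/mobile_rgnl_al/surface/aviation/metar/us'
print(topic_matches('xpublic/#', metar))
print(topic_matches('xpublic/+/WIS/pr/#', metar))
```

The first pattern selects the METAR topic, while the second only selects topics whose fourth level is 'pr', so narrower patterns can be used to reduce the flow to what is actually wanted.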



The second callback needed is called every time a message is received.
The message is decoded and printed. To keep the output short for the demonstration, we limit the subscriber to a few messages.

In [8]:
def sub_message(client, userdata, msg):
    """
      print messages received.  Disconnect once enough have been received.
    """
    global messageCount

    m = json.loads(msg.payload.decode('utf-8'))
    print( f"message {messageCount} topic: {msg.topic} received: {m}" )
    print( f"message {messageCount} data: {getData(m, sizeMaximumThreshold)}" )
    messageCount += 1
    # use >= so that we stop after messageCountMaximum messages, not one more
    if messageCount >= messageCountMaximum:
        # disconnecting makes loop_forever() return
        client.disconnect()


The message handler above calls getData(). The messages themselves are usually announcements of data availability, but when the data is small, they can include the data itself in the *content* field. Usually a message refers to the data using a link. Here is a routine to obtain the data given an announcement message:

In [9]:
def getData(m, sizeMaximum=1000):
    """
      given a message, return the data it refers to
    """
    if 'size' in m and m['size'] > sizeMaximum:
        # too large for the demonstration: report the size instead
        return f" data too large {m['size']} bytes"
    elif 'content' in m:
        # small data is embedded in the message itself
        if m['content']['encoding'] == 'base64':
            return b64decode(m['content']['value'])
        else:
            return m['content']['value'].encode('utf-8')
    else:
        # otherwise download the data from the link in the message
        url = m['baseUrl'] + '/' + m['relPath']
        with urllib.request.urlopen(url) as response:
            return response.read()
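
The announcements in the sample output also carry an *integrity* field. As a minimal sketch, downloaded data could be checked against it as follows. `verify_integrity` is a hypothetical helper, and treating the value as a base64-encoded digest is an assumption: it is consistent with the sample message, where the base64 value decodes to the hexadecimal string that appears in the file name, but it is not confirmed by this example.

```python
import hashlib
from base64 import b64decode, b64encode


def verify_integrity(m, data):
    """Return True if data matches the checksum announced in message m.

    Assumes m['integrity'] looks like {'method': 'md5', 'value': <base64 digest>}.
    """
    integrity = m['integrity']
    method = integrity['method']
    if method not in hashlib.algorithms_available:
        raise ValueError(f"unsupported integrity method: {method}")
    expected = b64decode(integrity['value'])
    actual = hashlib.new(method, data).digest()
    return actual == expected


# build a small announcement for some made-up data and check it
payload = b'example data'
digest = b64encode(hashlib.md5(payload).digest()).decode('ascii')
announcement = {'integrity': {'method': 'md5', 'value': digest}}
print(verify_integrity(announcement, payload))
print(verify_integrity(announcement, b'corrupted data'))
```

Such a check would fit naturally after the download, before the data is used.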

The mainline registers the callbacks, connects to the broker, and starts the event loop:

In [10]:
client = mqtt.Client( client_id=clientId, protocol=mqtt.MQTTv5 )
client.on_connect = sub_connect
client.on_message = sub_message
client.username_pw_set( user, password )
client.connect( host )

client.loop_forever()


on connection to subscribe: Connection Accepted.
message 0 topic: xpublic/v03/WIS/us/mobile_rgnl_al/surface/aviation/metar/us received: {'mode': '664', 'mtime': '20220224T052208.259097815', 'atime': '20220224T052208.259097815', 'pubTime': '20220224T052208.264983', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/us/mobile_rgnl_al/surface/aviation/metar/us/SAUS44_KMOB_240503_COR_8d674aab16213ac2b13fab2d79950456.txt', 'integrity': {'method': 'md5', 'value': 'jWdKqxYhOsKxP6steZUEVg=='}, 'size': 137}
message 0 data: b'SAUS44 KMOB 240503 COR\r\r\nMTRPRN\r\r\nMETAR KPRN 240458Z AUTO 20006G15KT 10SM OVC006 19/16 A3016 RMK AO2 \r\r\nSLP161 T01940161 402830183\r\r\n\r\r\n\x03'
message 1 topic: xpublic/v03/WIS/pr/tjgu/surface/miscellaneous/pr received: {'mode': '664', 'mtime': '20220224T052208.427098989', 'atime': '20220224T052208.427098989', 'pubTime': '20220224T052208.430775', 'baseUrl': 'http://localhost:8999/data/20220224T05', 'relPath': 'WIS/pr/tjgu/surface/miscellaneous
