Permalink
Browse files

Add 'Subscribe' and 'Unsubscribe' keywords

Subscribe keyword returns a list of messages returned within the
specified period. Unsubscribe removes the client's subscription so that
no further messages are received.
  • Loading branch information...
randomsync committed Feb 25, 2015
1 parent 49c268d commit 63dea9a6ef5e0ff92fc76297519a5045c7986032
Showing with 149 additions and 2 deletions.
  1. +1 −1 doc/MQTTLibrary.html
  2. +56 −0 src/MQTTLibrary/MQTTKeywords.py
  3. +1 −0 src/MQTTLibrary/__init__.py
  4. +1 −1 src/MQTTLibrary/version.py
  5. +21 −0 tests/keywords.txt
  6. +69 −0 tests/pubsub.txt
View
@@ -457,7 +457,7 @@
jQuery.extend({highlight:function(e,t,n,r){if(e.nodeType===3){var i=e.data.match(t);if(i){var s=document.createElement(n||"span");s.className=r||"highlight";var o=e.splitText(i.index);o.splitText(i[0].length);var u=o.cloneNode(true);s.appendChild(u);o.parentNode.replaceChild(s,o);return 1}}else if(e.nodeType===1&&e.childNodes&&!/(script|style)/i.test(e.tagName)&&!(e.tagName===n.toUpperCase()&&e.className===r)){for(var a=0;a<e.childNodes.length;a++){a+=jQuery.highlight(e.childNodes[a],t,n,r)}}return 0}});jQuery.fn.unhighlight=function(e){var t={className:"highlight",element:"span"};jQuery.extend(t,e);return this.find(t.element+"."+t.className).each(function(){var e=this.parentNode;e.replaceChild(this.firstChild,this);e.normalize()}).end()};jQuery.fn.highlight=function(e,t){var n={className:"highlight",element:"span",caseSensitive:false,wordsOnly:false};jQuery.extend(n,t);if(e.constructor===String){e=[e]}e=jQuery.grep(e,function(e,t){return e!=""});e=jQuery.map(e,function(e,t){return e.replace(/[-[\]{}()*+?.,\\^$|#\s]/g,"\\$&")});if(e.length==0){return this}var r=n.caseSensitive?"":"i";var i="("+e.join("|")+")";if(n.wordsOnly){i="\\b"+i+"\\b"}var s=new RegExp(i,r);return this.each(function(){jQuery.highlight(this,s,n.element,n.className)})}
</script>
<script type="text/javascript">
libdoc = {"doc":"<p>A keyword library for Robot Framework. It provides keywords for performing various operations on an MQTT broker. See <a href=\"http://mqtt.org/\">http://mqtt.org/\x3c/a> for more details on MQTT specification.\x3c/p>","generated":"2015-02-22 21:37:30","inits":[],"keywords":[{"args":"broker, port=1883, client_id=, clean_session=True","doc":"<p>Connect to an MQTT broker. This is a pre-requisite step for publish and subscribe keywords.\x3c/p>\n<p><span class=\"name\">broker\x3c/span> MQTT broker host\x3c/p>\n<p><span class=\"name\">port\x3c/span> broker port (default 1883)\x3c/p>\n<p><span class=\"name\">client_id\x3c/span> if not specified, a random id is generated\x3c/p>\n<p><span class=\"name\">clean_session\x3c/span> specifies the clean session flag for the connection\x3c/p>\n<p>Examples:\x3c/p>\n<p>Connect to a broker with default port and client id\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n\x3c/tr>\n\x3c/table>\n<p>Connect to a broker by specifying the port and client id explicitly\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n<td>1883\x3c/td>\n<td>test.client\x3c/td>\n\x3c/tr>\n\x3c/table>\n<p>Connect to a broker with clean session flag set to false\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n<td>clean_session=${false}\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Connect","shortdoc":"Connect to an MQTT broker. This is a pre-requisite step for publish"},{"args":"","doc":"<p>Disconnect from MQTT Broker.\x3c/p>\n<p>Example:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Disconnect\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Disconnect","shortdoc":"Disconnect from MQTT Broker."},{"args":"topic, message=None, qos=0, retain=False","doc":"<p>Publish a message to a topic with specified qos and retained flag. It is required that a connection has been established using <a href=\"#Connect\" class=\"name\">Connect\x3c/a> keyword before using this keyword.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to which the message will be published\x3c/p>\n<p><span class=\"name\">message\x3c/span> message payload to publish\x3c/p>\n<p><span class=\"name\">qos\x3c/span> qos of the message\x3c/p>\n<p><span class=\"name\">retain\x3c/span> retained flag\x3c/p>\n<p>Examples:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Publish\x3c/td>\n<td>test/test\x3c/td>\n<td>test message\x3c/td>\n<td>1\x3c/td>\n<td>${false}\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Publish","shortdoc":"Publish a message to a topic with specified qos and retained flag."},{"args":"topic, qos, payload, timeout=1","doc":"<p>Subscribe to a topic and validate that the specified payload is received within timeout. It is required that a connection has been established using <a href=\"#Connect\" class=\"name\">Connect\x3c/a> keyword. The payload can be specified as a python regular expression. If the specified payload is not received within timeout, an AssertionError is thrown.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to subscribe to\x3c/p>\n<p><span class=\"name\">qos\x3c/span> quality of service for the subscription\x3c/p>\n<p><span class=\"name\">payload\x3c/span> payload (message) that is expected to arrive\x3c/p>\n<p><span class=\"name\">timeout\x3c/span> time to wait for the payload to arrive\x3c/p>\n<p>Examples:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Subscribe And Validate\x3c/td>\n<td>test/test\x3c/td>\n<td>1\x3c/td>\n<td>test message\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Subscribe And Validate","shortdoc":"Subscribe to a topic and validate that the specified payload is"}],"name":"MQTTLibrary","named_args":true,"scope":"global","version":""};
libdoc = {"doc":"<p>A keyword library for Robot Framework. It provides keywords for performing various operations on an MQTT broker. See <a href=\"http://mqtt.org/\">http://mqtt.org/\x3c/a> for more details on MQTT specification.\x3c/p>","generated":"2015-02-24 23:11:57","inits":[],"keywords":[{"args":"broker, port=1883, client_id=, clean_session=True","doc":"<p>Connect to an MQTT broker. This is a pre-requisite step for publish and subscribe keywords.\x3c/p>\n<p><span class=\"name\">broker\x3c/span> MQTT broker host\x3c/p>\n<p><span class=\"name\">port\x3c/span> broker port (default 1883)\x3c/p>\n<p><span class=\"name\">client_id\x3c/span> if not specified, a random id is generated\x3c/p>\n<p><span class=\"name\">clean_session\x3c/span> specifies the clean session flag for the connection\x3c/p>\n<p>Examples:\x3c/p>\n<p>Connect to a broker with default port and client id\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n\x3c/tr>\n\x3c/table>\n<p>Connect to a broker by specifying the port and client id explicitly\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n<td>1883\x3c/td>\n<td>test.client\x3c/td>\n\x3c/tr>\n\x3c/table>\n<p>Connect to a broker with clean session flag set to false\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Connect\x3c/td>\n<td>127.0.0.1\x3c/td>\n<td>clean_session=${false}\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Connect","shortdoc":"Connect to an MQTT broker. This is a pre-requisite step for publish"},{"args":"","doc":"<p>Disconnect from MQTT Broker.\x3c/p>\n<p>Example:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Disconnect\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Disconnect","shortdoc":"Disconnect from MQTT Broker."},{"args":"topic, message=None, qos=0, retain=False","doc":"<p>Publish a message to a topic with specified qos and retained flag. It is required that a connection has been established using <a href=\"#Connect\" class=\"name\">Connect\x3c/a> keyword before using this keyword.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to which the message will be published\x3c/p>\n<p><span class=\"name\">message\x3c/span> message payload to publish\x3c/p>\n<p><span class=\"name\">qos\x3c/span> qos of the message\x3c/p>\n<p><span class=\"name\">retain\x3c/span> retained flag\x3c/p>\n<p>Examples:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Publish\x3c/td>\n<td>test/test\x3c/td>\n<td>test message\x3c/td>\n<td>1\x3c/td>\n<td>${false}\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Publish","shortdoc":"Publish a message to a topic with specified qos and retained flag."},{"args":"topic, qos, timeout=1, limit=1","doc":"<p>Subscribe to a topic and return a list of message payloads received within the specified time.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to subscribe to\x3c/p>\n<p><span class=\"name\">qos\x3c/span> quality of service for the subscription\x3c/p>\n<p><span class=\"name\">timeout\x3c/span> duration of subscription\x3c/p>\n<p><span class=\"name\">limit\x3c/span> the max number of payloads that will be returned. Specify 0 for no limit\x3c/p>\n<p>Examples:\x3c/p>\n<p>Subscribe and get a list of all messages received within 5 seconds\x3c/p>\n<table border=\"1\">\n<tr>\n<td>${messages}=\x3c/td>\n<td>Subscribe\x3c/td>\n<td>test/test\x3c/td>\n<td>qos=1\x3c/td>\n<td>timeout=5\x3c/td>\n<td>limit=0\x3c/td>\n\x3c/tr>\n\x3c/table>\n<p>Subscribe and get 1st message received within 60 seconds\x3c/p>\n<table border=\"1\">\n<tr>\n<td>@{messages}=\x3c/td>\n<td>Subscribe\x3c/td>\n<td>test/test\x3c/td>\n<td>qos=1\x3c/td>\n<td>timeout=60\x3c/td>\n<td>limit=1\x3c/td>\n\x3c/tr>\n<tr>\n<td>Length should be\x3c/td>\n<td>${messages}\x3c/td>\n<td>1\x3c/td>\n<td>\x3c/td>\n<td>\x3c/td>\n<td>\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Subscribe","shortdoc":"Subscribe to a topic and return a list of message payloads received"},{"args":"topic, qos, payload, timeout=1","doc":"<p>Subscribe to a topic and validate that the specified payload is received within timeout. It is required that a connection has been established using <a href=\"#Connect\" class=\"name\">Connect\x3c/a> keyword. The payload can be specified as a python regular expression. If the specified payload is not received within timeout, an AssertionError is thrown.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to subscribe to\x3c/p>\n<p><span class=\"name\">qos\x3c/span> quality of service for the subscription\x3c/p>\n<p><span class=\"name\">payload\x3c/span> payload (message) that is expected to arrive\x3c/p>\n<p><span class=\"name\">timeout\x3c/span> time to wait for the payload to arrive\x3c/p>\n<p>Examples:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Subscribe And Validate\x3c/td>\n<td>test/test\x3c/td>\n<td>1\x3c/td>\n<td>test message\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Subscribe And Validate","shortdoc":"Subscribe to a topic and validate that the specified payload is"},{"args":"topic","doc":"<p>Unsubscribe the client from the specified topic.\x3c/p>\n<p><span class=\"name\">topic\x3c/span> topic to unsubscribe from\x3c/p>\n<p>Example:\x3c/p>\n<table border=\"1\">\n<tr>\n<td>Unsubscribe\x3c/td>\n<td>test/mqtt_test\x3c/td>\n\x3c/tr>\n\x3c/table>","name":"Unsubscribe","shortdoc":"Unsubscribe the client from the specified topic."}],"name":"MQTTLibrary","named_args":true,"scope":"global","version":"0.3.0"};
</script>
<title></title>
</head>
@@ -66,6 +66,45 @@ def publish(self, topic, message=None, qos=0, retain=False):
% (topic, message, qos, retain), 'INFO')
self._mqttc.publish(topic, message, int(qos), retain)
def subscribe(self, topic, qos, timeout=1, limit=1):
""" Subscribe to a topic and return a list of message payloads received
within the specified time.
`topic` topic to subscribe to
`qos` quality of service for the subscription
`timeout` duration of subscription
`limit` the max number of payloads that will be returned. Specify 0
for no limit
Examples:
Subscribe and get a list of all messages received within 5 seconds
| ${messages}= | Subscribe | test/test | qos=1 | timeout=5 | limit=0 |
Subscribe and get 1st message received within 60 seconds
| @{messages}= | Subscribe | test/test | qos=1 | timeout=60 | limit=1 |
| Length should be | ${messages} | 1 |
"""
seconds = convert_time(timeout)
self._messages = []
limit = int(limit)
self.builtin.log('Subscribing to topic: %s' % topic, 'INFO')
self._mqttc.subscribe(str(topic), int(qos))
self._mqttc.on_message = self._on_message_list
timer_start = time.time()
while time.time() < timer_start + seconds:
if limit == 0 or len(self._messages) < limit:
self._mqttc.loop()
else:
break
return self._messages
def subscribe_and_validate(self, topic, qos, payload, timeout=1):
""" Subscribe to a topic and validate that the specified payload is
@@ -104,6 +143,18 @@ def subscribe_and_validate(self, topic, qos, payload, timeout=1):
if not self._verified:
raise AssertionError("The expected payload didn't arrive in the topic")
def unsubscribe(self, topic):
""" Unsubscribe the client from the specified topic.
`topic` topic to unsubscribe from
Example:
| Unsubscribe | test/mqtt_test |
"""
self._mqttc.unsubscribe(str(topic))
def disconnect(self):
""" Disconnect from MQTT Broker.
@@ -118,3 +169,8 @@ def _on_message(self, client, userdata, message):
self.builtin.log('Received message: %s on topic: %s with QoS: %s'
% (str(message.payload), message.topic, str(message.qos)), 'DEBUG')
self._verified = re.match(self._payload, str(message.payload))
def _on_message_list(self, client, userdata, message):
self.builtin.log('Received message: %s on topic: %s with QoS: %s'
% (str(message.payload), message.topic, str(message.qos)), 'DEBUG')
self._messages.append(message.payload)
@@ -11,3 +11,4 @@ class MQTTLibrary(MQTTKeywords):
"""
ROBOT_LIBRARY_SCOPE = 'GLOBAL'
ROBOT_LIBRARY_VERSION = __version__
@@ -1 +1 @@
VERSION = '0.2.0.dev1'
VERSION = '0.3.0'
View
@@ -29,3 +29,24 @@
| | Subscribe and Validate
| | ... | ${topic} | ${qos} | ${message} | ${timeout}
| | [Teardown] | Disconnect
| Subscribe and Get Messages
| | [Arguments] | ${broker.uri}=${broker.uri} | ${port}=${broker.port}
| | ... | ${client.id}=${client.id} | ${topic}=${topic}
| | ... | ${qos}=1 | ${timeout}=1s
| | ... | ${limit}=1
| | Connect | ${broker.uri} | ${broker.port} | ${client.id} | ${false}
| | @{messages} | Subscribe | ${topic} | ${qos} | ${timeout} | ${limit}
| | [Teardown] | Disconnect
| | [Return] | @{messages}
| Subscribe and Unsubscribe
| | [Arguments] | ${broker.uri}=${broker.uri} | ${port}=${broker.port}
| | ... | ${client.id}=${client.id} | ${topic}=${topic}
| | ... | ${qos}=1 | ${timeout}=1s
| | ... | ${limit}=1
| | Connect | ${broker.uri} | ${broker.port} | ${client.id} | ${false}
| | @{messages} | Subscribe | ${topic} | ${qos} | ${timeout} | ${limit}
| | Unsubscribe | ${topic}
| | [Teardown] | Disconnect
| | [Return] | @{messages}
View
@@ -59,3 +59,72 @@
| | ... | Subscribe to MQTT Broker and Validate | client.id=${client} | topic=${topic} | message=whatever
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=${message} | qos=1
| | Subscribe to MQTT Broker and Validate | client.id=${client} | topic=${topic} | message=${regex}
| Subscribe for the first time and validate that no messages are received
| | Sleep | 1s
| | ${time} | Get Time | epoch
| | ${client} | Catenate | SEPARATOR=. | robot.mqtt | ${time}
| | ${topic} | Set Variable | test/mqtt_test_sub
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic} | timeout=5s
| | LOG | ${messages}
| | Length Should Be | ${messages} | 0
| Subscribe, publish 1 message and validate it is received
| | Sleep | 1s
| | ${time} | Get Time | epoch
| | ${client} | Catenate | SEPARATOR=. | robot.mqtt | ${time}
| | ${topic} | Set Variable | test/mqtt_test_sub
| | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message | qos=1
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | LOG | ${messages}
| | Length Should Be | ${messages} | 1
| | Should Be Equal As Strings | @{messages}[0] | test message
| Subscribe with no limit, publish multiple messages and validate they are received
| | Sleep | 1s
| | ${time} | Get Time | epoch
| | ${client} | Catenate | SEPARATOR=. | robot.mqtt | ${time}
| | ${topic} | Set Variable | test/mqtt_test_sub
| | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message1 | qos=1
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message2 | qos=1
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message3 | qos=1
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic} | limit=0
| | LOG | ${messages}
| | Length Should Be | ${messages} | 3
| | Should Be Equal As Strings | @{messages}[0] | test message1
| | Should Be Equal As Strings | @{messages}[1] | test message2
| | Should Be Equal As Strings | @{messages}[2] | test message3
| Subscribe with limit
| | Sleep | 1s
| | ${time} | Get Time | epoch
| | ${client} | Catenate | SEPARATOR=. | robot.mqtt | ${time}
| | ${topic} | Set Variable | test/mqtt_test_sub
| | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message1 | qos=1
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message2 | qos=1
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message3 | qos=1
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic} | limit=1
| | LOG | ${messages}
| | Length Should Be | ${messages} | 1
| | Should Be Equal As Strings | @{messages}[0] | test message1
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic} | limit=2
| | LOG | ${messages}
| | Length Should Be | ${messages} | 2
| | Should Be Equal As Strings | @{messages}[0] | test message2
| | Should Be Equal As Strings | @{messages}[1] | test message3
| Unsubscribe and validate no messages are received
| | Sleep | 1s
| | ${time} | Get Time | epoch
| | ${client} | Catenate | SEPARATOR=. | robot.mqtt | ${time}
| | ${topic} | Set Variable | test/mqtt_test_sub
| | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message1 | qos=1
| | @{messages} | Subscribe and Unsubscribe | client.id=${client} | topic=${topic}
| | Publish to MQTT Broker and Disconnect | topic=${topic} | message=test message2 | qos=1
| | @{messages} | Subscribe and Get Messages | client.id=${client} | topic=${topic}
| | LOG | ${messages}
| | Length Should Be | ${messages} | 0

0 comments on commit 63dea9a

Please sign in to comment.