Skip to content

Commit

Permalink
Responses with MQTT wildcarded topics. (#873)
Browse files Browse the repository at this point in the history
  • Loading branch information
pearmaster committed Jun 25, 2023
1 parent 463099f commit 93188d1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
25 changes: 25 additions & 0 deletions example/mqtt/test_mqtt.tavern.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ stages:

---

test_name: Test mqtt wildcard subscription

includes:
- !include common.yaml


paho-mqtt: *mqtt_spec

stages:
- *setup_device_for_test

- name: Echo json
mqtt_publish:
topic: /device/{random_device_id}/echo
json:
message: hello world
mqtt_response:
topic: /device/+/echo/response
json:
message: hello world
timeout: 5
qos: 1

---

test_name: Test mqtt message echo json formatted topic name

marks:
Expand Down
28 changes: 10 additions & 18 deletions tavern/_plugins/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
logger = logging.getLogger(__name__)


def root_topic(topic):
return topic.split("+")[0].split("#")[0]


@dataclasses.dataclass
class _Subscription:
topic: str
Expand Down Expand Up @@ -304,16 +300,15 @@ def _on_message(client, userdata, message) -> None:

logger.info("Received mqtt message on %s", message.topic)

sanitised = root_topic(message.topic)

try:
userdata["_subscribed"][
userdata["_subscription_mappings"][sanitised]
].queue.put(message)
except KeyError as e:
raise exceptions.MQTTTopicException(
"Message received on unregistered topic: {}".format(message.topic)
) from e
for sub_topic, sub_id in userdata["_subscription_mappings"].items():
if paho.topic_matches_sub(sub_topic, message.topic):
userdata["_subscribed"][sub_id].queue.put(message)
break
else:
raise exceptions.MQTTTopicException(
"Message received on unregistered topic: {}".format(message.topic)
)
except Full:
logger.exception("message queue full")

Expand Down Expand Up @@ -369,11 +364,9 @@ def message_received(self, topic: str, timeout: int = 1):
Allow regexes for topic names? Better validation for mqtt payloads
"""

sanitised = root_topic(topic)

try:
with self._subscribe_lock:
queue = self._subscribed[self._subscription_mappings[sanitised]].queue
queue = self._subscribed[self._subscription_mappings[topic]].queue
except KeyError as e:
raise exceptions.MQTTTopicException(
"Unregistered topic: {}".format(topic)
Expand Down Expand Up @@ -457,9 +450,8 @@ def subscribe(self, topic: str, *args, **kwargs) -> None:
(status, mid) = self._client.subscribe(topic, *args, **kwargs)

if status == 0:
sanitised = root_topic(topic)
with self._subscribe_lock:
self._subscription_mappings[sanitised] = mid
self._subscription_mappings[topic] = mid
self._subscribed[mid] = _Subscription(topic)
else:
raise exceptions.MQTTError(
Expand Down

0 comments on commit 93188d1

Please sign in to comment.