Skip to content

Commit

Permalink
Attempt to fix deadlock in subscribe locks (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelboulton committed Feb 16, 2023
1 parent 0e40a5f commit 1014f1b
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions tavern/_plugins/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,20 @@ def message_received(self, topic: str, timeout: int = 1):
sanitised = root_topic(topic)

try:
msg = self._subscribed[self._subscription_mappings[sanitised]].queue.get(
block=True, timeout=timeout
)
with self._subscribe_lock:
queue = self._subscribed[self._subscription_mappings[sanitised]].queue
except KeyError as e:
raise exceptions.MQTTTopicException(
"Unregistered topic: {}".format(topic)
) from e

try:
msg = queue.get(block=True, timeout=timeout)
except Empty:
logger.error("Message not received after %d seconds", timeout)
return None
else:
return msg

return msg

def publish(self, topic, payload=None, qos=None, retain=None):
"""publish message using paho library"""
Expand Down Expand Up @@ -445,17 +447,17 @@ def subscribe(self, topic: str, *args, **kwargs) -> None:
"""
logger.debug("Subscribing to topic '%s'", topic)

with self._subscribe_lock:
(status, mid) = self._client.subscribe(topic, *args, **kwargs)
(status, mid) = self._client.subscribe(topic, *args, **kwargs)

if status == 0:
sanitised = root_topic(topic)
if status == 0:
sanitised = root_topic(topic)
with self._subscribe_lock:
self._subscription_mappings[sanitised] = mid
self._subscribed[mid] = _Subscription(topic)
else:
raise exceptions.MQTTError(
"Error subscribing to '{}' (err code {})".format(topic, status)
)
else:
raise exceptions.MQTTError(
"Error subscribing to '{}' (err code {})".format(topic, status)
)

def unsubscribe_all(self) -> None:
"""Unsubscribe from all topics"""
Expand Down

0 comments on commit 1014f1b

Please sign in to comment.