Permalink
Browse files

Change create_subscription_queue to not swallow errors; add a test

  • Loading branch information...
osteele committed Feb 10, 2018
1 parent 4d3dc33 commit ba3d9379ee2453bddaa2c3d318d6548f6d0c0416
Showing with 48 additions and 25 deletions.
  1. +6 −4 mqtt_json/receive_mqtt_messages.py
  2. +1 −2 setup.cfg
  3. +41 −19 tests/mqtt_json_test.py
@@ -1,7 +1,6 @@
import json
import logging
import socket
import sys
from queue import Queue

import click
@@ -14,6 +13,10 @@


def create_subscription_queue(topic):
"""Generate messages. These are the decoded JSON payloads of MQTT messages.
Raises a socket.error if the connection fails.
"""
messages = Queue()

def on_connect(client, userdata, flags, rc):
@@ -46,10 +49,9 @@ def on_disconnect(client, userdata, other):
try:
client.connect(config.hostname, 1883, 60)
client.loop_start()
logger.info('subscribed to %s', config.hostname)
except socket.error as err:
print('MQTT:', err, file=sys.stderr)
print('Continuing without subscriptions', file=sys.stderr)
logger.error('MQTT:', err)
raise

while True:
payload = json.loads(messages.get().payload.decode('utf-8'))
@@ -17,8 +17,7 @@ max-line-length = 120

[coverage:report]
exclude_lines =
logger.info
except socket.error as err
if __name__ == '__main__':
omit =
tests/*

@@ -1,34 +1,56 @@
import os
import socket
import sys
from unittest.mock import MagicMock, patch

import pytest

# noqa: I003
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from mqtt_json import Client # noqa: E402,I001,I003
import mqtt_json # noqa: E402,I001,I003


@patch('paho.mqtt.publish.single')
def test_config(publish):
client = Client()
client.publish('topic', k1=1, k2=2)
assert publish.calleds


@patch('paho.mqtt.client.Client')
def test_create_subscription_queue(mqtt_factory):
def make_mqtt_client(connect_error=False):
# Configure the mock object to send a message to the subscriber.
def mock_on_loop_start():
mqtt_client.on_connect(mqtt_client, None, None, None)
if connect_error:
raise socket.error('simulated socket error')
msg = MagicMock()
msg.payload = '{"key":1}'.encode()
mqtt_client.on_connect(mqtt_client, None, None, None)
mqtt_client.on_message(mqtt_client, None, msg)
mqtt_client = mqtt_factory()
mqtt_client = MagicMock()
mqtt_client.connect = MagicMock()
mqtt_client.loop_start = MagicMock(side_effect=mock_on_loop_start)
return mqtt_client


@patch('paho.mqtt.publish.single')
def test_config(publish):
client = mqtt_json.Client()
client.publish('topic', k1=1, k2=2)
assert publish.calleds


def test_create_subscription_queue():
mqtt_client = make_mqtt_client()
with patch('paho.mqtt.client.Client', return_value=mqtt_client):
client = mqtt_json.Client()
queue = client.create_subscription_queue('topic')
# Since create_subscription_queue is a generator, its initial code isn't run
# until the generator's first ("next") value is requested.
msg = next(queue)
mqtt_client.connect.assert_called()
mqtt_client.subscribe.assert_called_with('topic', 0)
mqtt_client.loop_start.assert_called_with()
assert msg == {'key': 1}


client = Client()
queue = client.create_subscription_queue('topic')
msg = next(queue, None)
mqtt_client.connect.assert_called()
mqtt_client.subscribe.assert_called()
mqtt_client.loop_start.assert_called()
assert msg == {'key': 1}
def test_create_subscription_queue_connection_error():
mqtt_client = make_mqtt_client(connect_error=True)
with patch('paho.mqtt.client.Client', return_value=mqtt_client):
client = mqtt_json.Client()
with pytest.raises(socket.error):
queue = client.create_subscription_queue('topic')
# See the comment in `test_create_subscription_queue`.
next(queue)

0 comments on commit ba3d937

Please sign in to comment.