From 52ea7caba7215e9b91b92f497f08cf3c3b524830 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Wed, 22 Nov 2017 20:35:13 +0100 Subject: [PATCH 1/2] mqtt: Support TLS connections Activated by specifying mqtts:// scheme instead of mqtt:// By default uses trusted CA certs from system location, but can be overridden with ca_certs MSGFLO_BROKER=mqtts://iot.example.com?ca_certs=/etc/ssl/cert.pem --- msgflo/msgflo.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index 33df999..b430ca3 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -5,8 +5,10 @@ try: from urllib.parse import urlparse + from urllib.parse import parse_qsl except ImportError: - from urlparse import urlparse + from urlparse import urlparse + from urlparse import parse_qsl from optparse import OptionParser @@ -248,6 +250,12 @@ def __init__(self, broker): self.participants = [] self.connected = False + params = dict(parse_qsl(self.broker_info.query)) + default_port = 1883 + if self.broker_info.scheme == 'mqtts': + default_port = 8883 + ca_certs = params.get('ca_certs') + self._client.tls_set(ca_certs=ca_certs) if self.broker_info.username: self._client.username_pw_set(self.broker_info.username, self.broker_info.password) @@ -257,9 +265,7 @@ def __init__(self, broker): self._client.on_subscribe = lambda c, u, m, q: self._on_subscribe(c, u, m, q) host = self.broker_info.hostname - port = self.broker_info.port - if port is None: - port = 1883 + port = self.broker_info.port or default_port self._client.connect(host, port, 60) def add_participant(self, participant, iips={}): @@ -387,9 +393,9 @@ def run(participants, broker=None, done_cb=None, iips={}): engine = None broker_info = urlparse(broker) - if broker_info.scheme == 'amqp': + if broker_info.scheme in ('amqp', 'amqps'): engine = AmqpEngine(broker) - elif broker_info.scheme == 'mqtt': + elif broker_info.scheme in ('mqtt', 'mqtts'): engine = MqttEngine(broker) else: raise ValueError("msgflo: No engine implementation found for broker URL scheme %s" % (broker_info.scheme,)) From c00156e8cfa205be6cf1574ebc3cd10b6a256dd9 Mon Sep 17 00:00:00 2001 From: Jon Nordby Date: Wed, 22 Nov 2017 20:37:57 +0100 Subject: [PATCH 2/2] mqtt: Also support PEM encoded client cert with privatekey Can be used for authentication, but not supported by all MQTT brokers --- msgflo/msgflo.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/msgflo/msgflo.py b/msgflo/msgflo.py index b430ca3..f8f4a98 100755 --- a/msgflo/msgflo.py +++ b/msgflo/msgflo.py @@ -255,7 +255,9 @@ def __init__(self, broker): if self.broker_info.scheme == 'mqtts': default_port = 8883 ca_certs = params.get('ca_certs') - self._client.tls_set(ca_certs=ca_certs) + certfile = params.get('certfile') + keyfile = params.get('keyfile') + self._client.tls_set(ca_certs=ca_certs, certfile=certfile, keyfile=keyfile) if self.broker_info.username: self._client.username_pw_set(self.broker_info.username, self.broker_info.password)