diff --git a/README.md b/README.md index 58244fc..c647ba8 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,31 @@ The complete code for the Channels Chat tutorial application (up to part 3) with the channels-mqtt-proxy additions is here: https://github.com/lbt/channels-mqtt-proxy/tree/main/examples +### Secure connections + +Additional settings in `site/settings.py` to connect to a MQTT broker with a secure connection: + +```python +# Local mqtt settings +MQTT_HOST = "mqtt.example.com" +MQTT_USER = "mqtt-test" +MQTT_PASSWORD = "mqtt-test" +MQTT_VERSION = 311 # defaults to 50 + +# TLS settings +MQTT_USE_SSL = True # enable ssl connection (bool) +MQTT_PORT = 8883 # override the port to connect to (int) +## Stop here if your server certificate has been properly signed. + +## Optional +## Settings to connect to a server with self-signed certificates +MQTT_SSL_VERIFY = False # set to False to connect to a server + # with a self signed certificate +MQTT_SSL_CA = "" # ca file from server +MQTT_SSL_CERT = "" # client specific cert file +MQTT_SSL_KEY = "" # client specific key file +``` + ## Example Usage Create and activate a suitable venv. diff --git a/chanmqttproxy/channelsmqttproxy.py b/chanmqttproxy/channelsmqttproxy.py index dcd1dde..7aa7c3e 100644 --- a/chanmqttproxy/channelsmqttproxy.py +++ b/chanmqttproxy/channelsmqttproxy.py @@ -5,8 +5,10 @@ import os import signal import socket +import ssl from gmqtt import Client as MQTTClient +from gmqtt.mqtt.handler import MQTTConnectError from gmqtt.mqtt.constants import MQTTv311, MQTTv50 LOGGER = logging.getLogger(__name__) @@ -20,9 +22,41 @@ def __init__(self, channel_layer, settings): # Creating the client does not connect. self.mqtt = MQTTClient( f"ChannelsMQTTProxy@{socket.gethostname()}.{os.getpid()}") - self.mqtt.set_auth_credentials(username=settings.MQTT_USER, - password=settings.MQTT_PASSWORD) + try: + self.mqtt.set_auth_credentials(username=settings.MQTT_USER, + password=settings.MQTT_PASSWORD) + except AttributeError: + # Settings are not defined. Try anonymous connection + pass self.mqtt_host = settings.MQTT_HOST + try: + self.mqtt_port = settings.MQTT_PORT + except AttributeError: + # Setting not defined. Use default unsecured. + self.mqtt_port = 1883 + # Set ssl + try: + self.mqtt_usessl = settings.MQTT_USE_SSL + except AttributeError: + # Setting is not defined. Assume false + self.mqtt_usessl = False + + try: + self.mqtt_ssl_ca = settings.MQTT_SSL_CA + self.mqtt_ssl_cert = settings.MQTT_SSL_CERT + self.mqtt_ssl_key = settings.MQTT_SSL_KEY + try: + self.mqtt_ssl_verify = settings.MQTT_SSL_VERIFY + except AttributeError: + # Assume True on error + self.mqtt_ssl_verify = True + except AttributeError: + # Setting is not defined. Set safe values. + self.mqtt_ssl_ca = None + self.mqtt_ssl_cert = None + self.mqtt_ssl_key = None + self.mqtt_ssl_verify = True + try: self.mqtt_version = settings.MQTT_VERSION except AttributeError: @@ -63,7 +97,47 @@ async def connect(self): while not self.mqtt.is_connected: try: - await self.mqtt.connect(self.mqtt_host, version=version) + LOGGER.debug('Connecting to mqtt%s://%s:%s using v%s', + "s" if self.mqtt_usessl else "", + self.mqtt_host, + self.mqtt_port, + self.mqtt_version) + use_ssl = self.mqtt_usessl + if (self.mqtt_usessl) and (self.mqtt_ssl_ca is not None): + LOGGER.debug('Using CA: %s Cert: %s Key: %s Verify: %s', + self.mqtt_ssl_ca, + self.mqtt_ssl_cert, + self.mqtt_ssl_key, + self.mqtt_ssl_verify) + try: + use_ssl = ssl.create_default_context( + ssl.Purpose.SERVER_AUTH, + cafile = self.mqtt_ssl_ca) + use_ssl.check_hostname = self.mqtt_ssl_verify + if self.mqtt_ssl_verify: + use_ssl.verify_mode = ssl.CERT_REQUIRED + else: + use_ssl.verify_mode =ssl.CERT_NONE + use_ssl.load_cert_chain( + certfile=self.mqtt_ssl_cert, + keyfile=self.mqtt_ssl_key) + except ssl.SSLError as e: + LOGGER.error('Error initialising ssl: %s. Retrying.',e) + await asyncio.sleep(1) + continue + await self.mqtt.connect( + self.mqtt_host, + port=self.mqtt_port, + ssl=use_ssl, + version=version) + except MQTTConnectError as e: + # Mqtt server returned an error. + # Back off as to not spam the server + LOGGER.info('MQTT Error trying to connect: %s. Retrying.',e) + # Close the connection since it is running and gmqtt will + # still retry to complete the connection. + await self.mqtt.disconnect() + await asyncio.sleep(30) except Exception as e: LOGGER.warn(f"Error trying to connect: {e}. Retrying.") await asyncio.sleep(1) @@ -77,10 +151,10 @@ async def finish(self): LOGGER.debug("MQTT client disconnected") def _on_connect(self, _client, _flags, _rc, _properties): + LOGGER.debug('Connected') for s in self.subscriptions.keys(): LOGGER.debug(f"Re-subscribing to {s}") self.mqtt.subscribe(s) - LOGGER.debug('Connected and subscribed') def _on_disconnect(self, _client, _packet, _exc=None): LOGGER.debug('Disconnected')