From 5b394a76f5ec09c84a0978b274d1ccb0bae48625 Mon Sep 17 00:00:00 2001 From: PHammond Date: Tue, 22 Nov 2022 09:35:02 -0700 Subject: [PATCH] handle username and pw for mqtt source --- streamz/sources.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streamz/sources.py b/streamz/sources.py index 3c291603..777f9181 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -896,12 +896,15 @@ class from_mqtt(from_q): :param client_kwargs: Passed to the client's ``connect()`` method """ - def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs): + def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, + user=None, pw=None, **kwargs): self.host = host self.port = port self.keepalive = keepalive self.topic = topic self.client_kwargs = client_kwargs + self.user = user + self.pw = pw super().__init__(q=queue.Queue(), **kwargs) def _on_connect(self, client, userdata, flags, rc): @@ -913,6 +916,8 @@ def _on_message(self, client, userdata, msg): async def run(self): import paho.mqtt.client as mqtt client = mqtt.Client() + if self.user: + client.username_pw_set(self.user, self.pw) client.on_connect = self._on_connect client.on_message = self._on_message client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {}))