From 520e7eac045b09125ad0911facf3c03d28517c8f Mon Sep 17 00:00:00 2001 From: Jacopo Sartini <51260522+jacoposartini@users.noreply.github.com> Date: Mon, 21 Oct 2019 17:50:19 +0200 Subject: [PATCH 1/4] Update broker.py --- hbmqtt/broker.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 98bab73..e014775 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -440,7 +440,8 @@ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterA client_session, client_session.will_topic, client_session.will_message, - client_session.will_qos) + client_session.will_qos + ) if client_session.will_retain: self.retain_message(client_session, client_session.will_topic, @@ -493,7 +494,7 @@ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterA yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED, client_id=client_session.client_id, message=app_message) - yield from self._broadcast_message(client_session, app_message.topic, app_message.data) + yield from self._broadcast_message_acl(client_session, app_message.topic, app_message.data) if app_message.publish_packet.retain_flag: self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos) wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop) @@ -598,6 +599,7 @@ def topic_filtering(self, session: Session, topic): return topic_result def retain_message(self, source_session, topic_name, data, qos=None): + print("RETAIN M") if data is not None and data != b'': # If retained flag set, store the message for further subscriptions self.logger.debug("Retaining message on topic %s" % topic_name) @@ -725,8 +727,22 @@ def _broadcast_loop(self): if running_tasks: yield from asyncio.wait(running_tasks, loop=self._loop) + + + + @asyncio.coroutine + def _broadcast_message_acl(self, session, topic, data, force_qos=None): + permitted = yield from self.topic_filtering(session, topic=topic) + + if permitted: + yield from self._broadcast_message(session, topic, data, force_qos) + + @asyncio.coroutine def _broadcast_message(self, session, topic, data, force_qos=None): + + print("data"+str(data)) + broadcast = { 'session': session, 'topic': topic, @@ -738,6 +754,8 @@ def _broadcast_message(self, session, topic, data, force_qos=None): @asyncio.coroutine def publish_session_retained_messages(self, session): + print("111111#############################") + print(session) self.logger.debug("Publishing %d messages retained for session %s" % (session.retained_messages.qsize(), format_client_message(session=session)) ) From b4c26b6942ddb29f66146de31b8aa8fced9cb890 Mon Sep 17 00:00:00 2001 From: Jacopo Sartini <51260522+jacoposartini@users.noreply.github.com> Date: Mon, 28 Oct 2019 14:47:38 +0100 Subject: [PATCH 2/4] Deleting my comments --- hbmqtt/broker.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index e014775..a62dad1 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -440,8 +440,7 @@ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterA client_session, client_session.will_topic, client_session.will_message, - client_session.will_qos - ) + client_session.will_qos) if client_session.will_retain: self.retain_message(client_session, client_session.will_topic, @@ -599,7 +598,6 @@ def topic_filtering(self, session: Session, topic): return topic_result def retain_message(self, source_session, topic_name, data, qos=None): - print("RETAIN M") if data is not None and data != b'': # If retained flag set, store the message for further subscriptions self.logger.debug("Retaining message on topic %s" % topic_name) @@ -727,9 +725,6 @@ def _broadcast_loop(self): if running_tasks: yield from asyncio.wait(running_tasks, loop=self._loop) - - - @asyncio.coroutine def _broadcast_message_acl(self, session, topic, data, force_qos=None): permitted = yield from self.topic_filtering(session, topic=topic) @@ -737,12 +732,8 @@ def _broadcast_message_acl(self, session, topic, data, force_qos=None): if permitted: yield from self._broadcast_message(session, topic, data, force_qos) - @asyncio.coroutine def _broadcast_message(self, session, topic, data, force_qos=None): - - print("data"+str(data)) - broadcast = { 'session': session, 'topic': topic, From 806dd0776617962748700ac493d33cb048fa71ba Mon Sep 17 00:00:00 2001 From: Jacopo Sartini <51260522+jacoposartini@users.noreply.github.com> Date: Mon, 28 Oct 2019 14:48:20 +0100 Subject: [PATCH 3/4] Update broker.py --- hbmqtt/broker.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index a62dad1..962ab38 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -745,8 +745,6 @@ def _broadcast_message(self, session, topic, data, force_qos=None): @asyncio.coroutine def publish_session_retained_messages(self, session): - print("111111#############################") - print(session) self.logger.debug("Publishing %d messages retained for session %s" % (session.retained_messages.qsize(), format_client_message(session=session)) ) From 213454535f25923fff79be162237c3f5337ee097 Mon Sep 17 00:00:00 2001 From: Jacopo Sartini <51260522+jacoposartini@users.noreply.github.com> Date: Tue, 5 Nov 2019 18:22:44 +0100 Subject: [PATCH 4/4] Update broker.py --- hbmqtt/broker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py index 962ab38..31981e6 100644 --- a/hbmqtt/broker.py +++ b/hbmqtt/broker.py @@ -728,7 +728,6 @@ def _broadcast_loop(self): @asyncio.coroutine def _broadcast_message_acl(self, session, topic, data, force_qos=None): permitted = yield from self.topic_filtering(session, topic=topic) - if permitted: yield from self._broadcast_message(session, topic, data, force_qos)