Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes taboo topic checking without session username. #151

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions hbmqtt/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def start(self):
yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)

#Start broadcast loop
self._broadcast_task = asyncio.ensure_future(self._broadcast_loop(), loop=self._loop)
self._broadcast_task = asyncio.async(self._broadcast_loop(), loop=self._loop)

self.logger.debug("Broker started")
except Exception as e:
Expand Down Expand Up @@ -416,10 +416,10 @@ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterA
yield from self.publish_session_retained_messages(client_session)

# Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
disconnect_waiter = asyncio.ensure_future(handler.wait_disconnect(), loop=self._loop)
subscribe_waiter = asyncio.ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
unsubscribe_waiter = asyncio.ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
wait_deliver = asyncio.ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
disconnect_waiter = asyncio.async(handler.wait_disconnect(), loop=self._loop)
subscribe_waiter = asyncio.async(handler.get_next_pending_subscription(), loop=self._loop)
unsubscribe_waiter = asyncio.async(handler.get_next_pending_unsubscription(), loop=self._loop)
wait_deliver = asyncio.async(handler.mqtt_deliver_next_message(), loop=self._loop)
connected = True
while connected:
try:
Expand Down Expand Up @@ -708,7 +708,7 @@ def _broadcast_loop(self):
(format_client_message(session=broadcast['session']),
broadcast['topic'], format_client_message(session=target_session)))
handler = self._get_handler(target_session)
task = asyncio.ensure_future(
task = asyncio.async(
handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
loop=self._loop)
running_tasks.append(task)
Expand Down Expand Up @@ -744,7 +744,7 @@ def publish_session_retained_messages(self, session):
handler = self._get_handler(session)
while not session.retained_messages.empty():
retained = yield from session.retained_messages.get()
publish_tasks.append(asyncio.ensure_future(
publish_tasks.append(asyncio.async(
handler.mqtt_publish(
retained.topic, retained.data, retained.qos, True), loop=self._loop))
if publish_tasks:
Expand Down
4 changes: 2 additions & 2 deletions hbmqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def reconnect(self, cleansession=None):
@asyncio.coroutine
def _do_connect(self):
return_code = yield from self._connect_coro()
self._disconnect_task = asyncio.ensure_future(self.handle_connection_close(), loop=self._loop)
self._disconnect_task = asyncio.async(self.handle_connection_close(), loop=self._loop)
return return_code

@mqtt_connected
Expand Down Expand Up @@ -326,7 +326,7 @@ def deliver_message(self, timeout=None):
:return: instance of :class:`hbmqtt.session.ApplicationMessage` containing received message information flow.
:raises: :class:`asyncio.TimeoutError` if timeout occurs before a message is delivered
"""
deliver_task = asyncio.ensure_future(self._handler.mqtt_deliver_next_message(), loop=self._loop)
deliver_task = asyncio.async(self._handler.mqtt_deliver_next_message(), loop=self._loop)
self.client_tasks.append(deliver_task)
self.logger.debug("Waiting message delivery")
done, pending = yield from asyncio.wait([deliver_task], loop=self._loop, return_when=asyncio.FIRST_EXCEPTION, timeout=timeout)
Expand Down
2 changes: 1 addition & 1 deletion hbmqtt/mqtt/protocol/client_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def handle_write_timeout(self):
try:
if not self._ping_task:
self.logger.debug("Scheduling Ping")
self._ping_task = asyncio.ensure_future(self.mqtt_ping())
self._ping_task = asyncio.async(self.mqtt_ping())
except BaseException as be:
self.logger.debug("Exception ignored in ping task: %r" % be)

Expand Down
26 changes: 13 additions & 13 deletions hbmqtt/mqtt/protocol/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,31 +382,31 @@ def _reader_loop(self):
EVENT_MQTT_PACKET_RECEIVED, packet=packet, session=self.session)
task = None
if packet.fixed_header.packet_type == CONNACK:
task = asyncio.ensure_future(self.handle_connack(packet), loop=self._loop)
task = asyncio.async(self.handle_connack(packet), loop=self._loop)
elif packet.fixed_header.packet_type == SUBSCRIBE:
task = asyncio.ensure_future(self.handle_subscribe(packet), loop=self._loop)
task = asyncio.async(self.handle_subscribe(packet), loop=self._loop)
elif packet.fixed_header.packet_type == UNSUBSCRIBE:
task = asyncio.ensure_future(self.handle_unsubscribe(packet), loop=self._loop)
task = asyncio.async(self.handle_unsubscribe(packet), loop=self._loop)
elif packet.fixed_header.packet_type == SUBACK:
task = asyncio.ensure_future(self.handle_suback(packet), loop=self._loop)
task = asyncio.async(self.handle_suback(packet), loop=self._loop)
elif packet.fixed_header.packet_type == UNSUBACK:
task = asyncio.ensure_future(self.handle_unsuback(packet), loop=self._loop)
task = asyncio.async(self.handle_unsuback(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PUBACK:
task = asyncio.ensure_future(self.handle_puback(packet), loop=self._loop)
task = asyncio.async(self.handle_puback(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PUBREC:
task = asyncio.ensure_future(self.handle_pubrec(packet), loop=self._loop)
task = asyncio.async(self.handle_pubrec(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PUBREL:
task = asyncio.ensure_future(self.handle_pubrel(packet), loop=self._loop)
task = asyncio.async(self.handle_pubrel(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PUBCOMP:
task = asyncio.ensure_future(self.handle_pubcomp(packet), loop=self._loop)
task = asyncio.async(self.handle_pubcomp(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PINGREQ:
task = asyncio.ensure_future(self.handle_pingreq(packet), loop=self._loop)
task = asyncio.async(self.handle_pingreq(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PINGRESP:
task = asyncio.ensure_future(self.handle_pingresp(packet), loop=self._loop)
task = asyncio.async(self.handle_pingresp(packet), loop=self._loop)
elif packet.fixed_header.packet_type == PUBLISH:
task = asyncio.ensure_future(self.handle_publish(packet), loop=self._loop)
task = asyncio.async(self.handle_publish(packet), loop=self._loop)
elif packet.fixed_header.packet_type == DISCONNECT:
task = asyncio.ensure_future(self.handle_disconnect(packet), loop=self._loop)
task = asyncio.async(self.handle_disconnect(packet), loop=self._loop)
elif packet.fixed_header.packet_type == CONNECT:
self.handle_connect(packet)
else:
Expand Down
2 changes: 1 addition & 1 deletion hbmqtt/plugins/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def plugins(self):
return self._plugins

def _schedule_coro(self, coro):
return asyncio.ensure_future(coro, loop=self._loop)
return asyncio.async(coro, loop=self._loop)

@asyncio.coroutine
def fire_event(self, event_name, wait=False, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion hbmqtt/plugins/sys/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _broadcast_sys_topic(self, topic_basename, data):
return (yield from self.context.broadcast_message(topic_basename, data))

def schedule_broadcast_sys_topic(self, topic_basename, data):
return asyncio.ensure_future(
return asyncio.async(
self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
loop=self.context.loop
)
Expand Down
8 changes: 4 additions & 4 deletions hbmqtt/plugins/topic_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ def topic_filtering(self, *args, **kwargs):
if filter_result:
session = kwargs.get('session', None)
topic = kwargs.get('topic', None)
if session.username and topic:
if session.username != 'admin' and topic in self._taboo:
return False
if session.username and session.username == 'admin':
return True
else:
if topic and (topic in self._taboo):
return False
else:
return True
return filter_result