Skip to content

Commit

Permalink
Enhancements on keepalive functionality (#105)
Browse files Browse the repository at this point in the history
* Enhancements on keepalive functionality

* Fix case when no messages from broker for a long time

* Up version
  • Loading branch information
Lenka42 committed Apr 8, 2020
1 parent bf46bad commit c622160
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
2 changes: 1 addition & 1 deletion gmqtt/__init__.py
Expand Up @@ -13,7 +13,7 @@
"Mikhail Turchunovich",
"Elena Nikolaichik"
]
__version__ = "0.6.3"
__version__ = "0.6.4"


__all__ = [
Expand Down
25 changes: 21 additions & 4 deletions gmqtt/mqtt/connection.py
Expand Up @@ -28,15 +28,17 @@ async def create_connection(cls, host, port, ssl, clean_session, keepalive, loop
return MQTTConnection(transport, protocol, clean_session, keepalive)

def _keep_connection(self):
if self.is_closing():
if self.is_closing() or not self._keepalive:
return

if time.monotonic() - self._last_data_in >= 2 * self._keepalive:
time_ = time.monotonic()
if time_ - self._last_data_in >= 2 * self._keepalive:
logger.warning("[LOST HEARTBEAT FOR %s SECONDS, GOING TO CLOSE CONNECTION]", 2 * self._keepalive)
asyncio.ensure_future(self.close())
return

if time.monotonic() - self._last_data_in >= 0.8 * self._keepalive:
if time_ - self._last_data_out >= 0.8 * self._keepalive or \
time_ - self._last_data_in >= 0.8 * self._keepalive:
self._send_ping_request()
self._keep_connection_callback = asyncio.get_event_loop().call_later(self._keepalive / 2, self._keep_connection)

Expand All @@ -47,6 +49,7 @@ def put_package(self, pkg):
def send_package(self, package):
# This is not blocking operation, because transport place the data
# to the buffer, and this buffer flushing async
self._last_data_out = time.monotonic()
if isinstance(package, (bytes, bytearray)):
package = package
else:
Expand Down Expand Up @@ -83,8 +86,22 @@ def set_handler(self, handler):
self._handler = handler

async def close(self):
self._keep_connection_callback.cancel()
if self._keep_connection_callback:
self._keep_connection_callback.cancel()
self._transport.close()

def is_closing(self):
return self._transport.is_closing()

@property
def keepalive(self):
return self._keepalive

@keepalive.setter
def keepalive(self, value):
if self._keepalive == value:
return
self._keepalive = value
if self._keep_connection_callback:
self._keep_connection_callback.cancel()
self._keep_connection_callback = asyncio.get_event_loop().call_later(self._keepalive / 2, self._keep_connection)
7 changes: 7 additions & 0 deletions gmqtt/mqtt/handler.py
Expand Up @@ -243,6 +243,12 @@ def _parse_properties(self, packet):
properties_dict = dict(properties_dict)
return properties_dict, left_packet

def _update_keepalive_if_needed(self):
if not self._connack_properties.get('server_keep_alive'):
return
self._keepalive = self._connack_properties['server_keep_alive']
self._connection.keepalive = self._keepalive

def _handle_connack_packet(self, cmd, packet):
self._connected.set()

Expand Down Expand Up @@ -270,6 +276,7 @@ def _handle_connack_packet(self, cmd, packet):
self._error = MQTTConnectError(10)
asyncio.ensure_future(self.disconnect())
self._connack_properties = properties
self._update_keepalive_if_needed()

# TODO: Implement checking for the flags and results
# see 3.2.2.3 Connect Return code of the http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf
Expand Down
3 changes: 2 additions & 1 deletion gmqtt/mqtt/protocol.py
@@ -1,6 +1,6 @@
import asyncio
import logging
import struct
import time

from . import package
from .constants import MQTTv50, MQTTCommands
Expand Down Expand Up @@ -42,6 +42,7 @@ def data_received(self, data):
super(BaseMQTTProtocol, self).data_received(data)

def write_data(self, data: bytes):
self._connection._last_data_out = time.monotonic()
if self._transport and not self._transport.is_closing():
self._transport.write(data)
else:
Expand Down

0 comments on commit c622160

Please sign in to comment.