Skip to content

Commit

Permalink
Update mqtt_as
Browse files Browse the repository at this point in the history
  • Loading branch information
rroemhild committed Nov 6, 2019
1 parent 444ad73 commit b8ff29a
Showing 1 changed file with 26 additions and 57 deletions.
83 changes: 26 additions & 57 deletions lib/mqtt_as.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
# (C) Copyright Peter Hinch 2017-2019.
# (C) Copyright Kevin Köck 2018-2019.
# Released under the MIT licence.
# Support for Sonoff removed. Newer Sonoff devices work reliable without any workarounds.
# ESP32 hacks removed to reflect improvements to firmware.

# Pyboard D support added
# Patch for retained message support supplied by Kevin Köck.
# Various improvements contributed by Kevin Köck.

import gc
import usocket as socket
Expand All @@ -25,6 +24,8 @@
gc.collect()
from sys import platform

VERSION = (0, 5, 0)

# Default short delay for good SynCom throughput (avoid sleep(0) with SynCom).
_DEFAULT_MS = const(20)
_SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency
Expand Down Expand Up @@ -59,8 +60,11 @@ class MQTTException(Exception):
pass


def newpid(pid):
return pid + 1 if pid < 65535 else 1
def pid_gen():
pid = 0
while True:
pid = pid + 1 if pid < 65535 else 1
yield pid


def qos_check(qos):
Expand Down Expand Up @@ -117,7 +121,7 @@ def __init__(self, client_id, server, port, user, password, keepalive, ping_inte
else:
self._set_last_will(*will)
# WiFi config
self._ssid = ssid # For ESP32 / Pyboard D
self._ssid = ssid # Required ESP32 / Pyboard D
self._wifi_pw = wifi_pw
self._ssl = ssl
self._ssl_params = ssl_params
Expand All @@ -139,7 +143,7 @@ def __init__(self, client_id, server, port, user, password, keepalive, ping_inte
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)

self.pid = 0
self.newpid = pid_gen()
self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
self.last_rx = ticks_ms() # Time of last communication from broker
self.lock = Lock()
Expand Down Expand Up @@ -174,7 +178,7 @@ async def _as_read(self, n, sock=None): # OSError caught by superclass
msg = None
if e.args[0] not in BUSY_ERRORS:
raise
if msg == b'': # Connection closed by host (?)
if msg == b'': # Connection closed by host
raise OSError(-1)
if msg is not None: # data received
data = b''.join((data, msg))
Expand Down Expand Up @@ -232,7 +236,7 @@ async def _connect(self, clean):
import ussl
self._sock = ussl.wrap_socket(self._sock, **self._ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\0\0\0")
msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1

sz = 10 + 2 + len(self._client_id)
msg[6] = clean << 1
Expand Down Expand Up @@ -312,9 +316,10 @@ async def broker_up(self): # Test broker connectivity
return True
return False

def disconnect(self):
async def disconnect(self):
try:
self._sock.write(b"\xe0\0")
async with self.lock:
self._sock.write(b"\xe0\0")
except OSError:
pass
self._has_connected = False
Expand All @@ -326,30 +331,22 @@ def close(self):

async def _await_pid(self, pid):
t = ticks_ms()
self.dprint("awaiting pid", pid)
while pid in self.rcv_pids: # local copy
if self._timeout(t) or not self.isconnected():
self.dprint("timeout pid", pid, "isconnected", self.isconnected())
break # Must repub or bail out
await asyncio.sleep_ms(100) # putting sleep at the end will solve the issue
# of not recognizing received pids in the event of disconnect
await asyncio.sleep_ms(100)
else:
self.dprint("pid", pid, "not in rcv_pids anymore")
return True # PID received. All done.
return False

# qos == 1: coro blocks until wait_msg gets correct PID.
# If WiFi fails completely subclass re-publishes with new PID.
async def publish(self, topic, msg, retain, qos):
self.pid = newpid(self.pid) # Why did I only update self.pid if qos == 1 ??
pid = self.pid # Keep local in case self.pid is updated
pid = next(self.newpid)
if qos:
self.rcv_pids.add(pid)
self.dprint("publish", topic, msg, "pid", pid)
async with self.lock:
self.dprint("publish lock", topic, pid)
await self._publish(topic, msg, retain, qos, 0,
pid) # ._publish adapted to use passed pid
await self._publish(topic, msg, retain, qos, 0, pid)
if qos == 0:
return

Expand All @@ -358,10 +355,8 @@ async def publish(self, topic, msg, retain, qos):
if await self._await_pid(pid):
return
# No match
if count >= self._max_repubs or not self.isconnected(): # **** see note below ****
self.dprint("timeout or max repubs pid", pid)
if count >= self._max_repubs or not self.isconnected():
raise OSError(-1) # Subclass to re-publish with new PID
self.dprint("republish pid", pid)
async with self.lock:
await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid
count += 1
Expand Down Expand Up @@ -391,10 +386,8 @@ async def _publish(self, topic, msg, retain, qos, dup, pid):
# Can raise OSError if WiFi fails. Subclass traps
async def subscribe(self, topic, qos):
pkt = bytearray(b"\x82\0\0\0")
pid = newpid(self.pid)
self.pid = pid # will otherwise result in multiple operations having the same pid
pid = next(self.newpid)
self.rcv_pids.add(pid)
self.dprint("subscribe", topic, "pid", pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid)
async with self.lock:
await self._as_write(pkt)
Expand All @@ -407,10 +400,8 @@ async def subscribe(self, topic, qos):
# Can raise OSError if WiFi fails. Subclass traps
async def unsubscribe(self, topic):
pkt = bytearray(b"\xa2\0\0\0")
pid = newpid(self.pid)
self.pid = pid # will otherwise result in multiple operations having the same pid
pid = next(self.newpid)
self.rcv_pids.add(pid)
self.dprint("unsubscribe", topic, "pid", pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid)
async with self.lock:
await self._as_write(pkt)
Expand Down Expand Up @@ -442,48 +433,28 @@ async def wait_msg(self):
raise OSError(-1)
rcv_pid = await self._as_read(2)
pid = rcv_pid[0] << 8 | rcv_pid[1]
self.dprint("received PUBACK", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("PUBACK unknown pid", pid)
raise OSError(-1)
self.dprint("rcv_pids:", self.rcv_pids)
# Discard old pid's: can arise if a publish fails before PUBACK arrives
# self.rcv_pids = {x for x in self.rcv_pids if (self.pid - x) % 65536 < 50}
# not needed as reinitializing set at reconnect.
# self.dprint("rcv_pids after pruning:", self.rcv_pids)

if op == 0x90: # SUBACK
self.dprint("received suback")
resp = await self._as_read(4)
if resp[3] == 0x80:
self.dprint("suback oserror")
raise OSError(-1)
pid = resp[2] | (resp[1] << 8)
self.dprint("got suback pid", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("SUBACK unknown pid", pid)
raise OSError(-1)
self.dprint("rcv_pids:", self.rcv_pids)
# Discard old pid's. Can arise if subacks lost and no subscribe timeout.
# self.rcv_pids = {x for x in self.rcv_pids if (self.pid - x) % 65536 < 50}
# not needed as reinitializing set at reconnect
# self.dprint("rcv_pids after pruning:", self.rcv_pids)

if op == 0xB0: # UNSUBACK
self.dprint("received unsuback")
resp = await self._as_read(3)
pid = resp[2] | (resp[1] << 8)
self.dprint("got unsuback pid", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("UNSUBACK unknown pid", pid)
raise OSError(-1)
self.dprint("rcv_pids:", self.rcv_pids)

if op & 0xf0 != 0x30:
return
Expand Down Expand Up @@ -608,16 +579,16 @@ async def connect(self):
except Exception:
self.close()
raise
self.dprint("rcv_pids before pruning", self.rcv_pids)
self.rcv_pids = set()
self.rcv_pids.clear()
# If we get here without error broker/LAN must be up.
self._isconnected = True
self._in_connect = False # Low level code can now check connectivity.
loop = asyncio.get_event_loop()
loop.create_task(self._wifi_handler(True)) # User handler.
if not self._has_connected:
self._has_connected = True # Use normal clean flag on reconnect.
loop.create_task(self._keep_connected()) # Runs forever.
loop.create_task(
self._keep_connected()) # Runs forever unless user issues .disconnect()

loop.create_task(self._handle_msg()) # Tasks quit on connection fail.
loop.create_task(self._keep_alive())
Expand Down Expand Up @@ -700,14 +671,12 @@ async def _keep_connected(self):
self._sta_isconnected = False
else:
self._sta_if.disconnect()
# if PYBOARD:
# self._sta_if.deinit()
await asyncio.sleep(1)
try:
await self.wifi_connect()
except OSError:
continue
if not self._has_connected:
if not self._has_connected: # User has issued the terminal .disconnect()
self.dprint('Disconnected, exiting _keep_connected')
break
try:
Expand Down

0 comments on commit b8ff29a

Please sign in to comment.