Skip to content

Commit

Permalink
Update mqtt_as.py
Browse files Browse the repository at this point in the history
  • Loading branch information
rroemhild committed Oct 31, 2019
1 parent 210a6da commit eccafcb
Showing 1 changed file with 113 additions and 65 deletions.
178 changes: 113 additions & 65 deletions lib/mqtt_as.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# (C) Copyright Peter Hinch 2017-2019.
# (C) Copyright Kevin Köck 2018-2019.
# Released under the MIT licence.
# Support for Sonoff removed.
# Support for Sonoff removed. Newer Sonoff devices work reliable without any workarounds.
# ESP32 hacks removed to reflect improvements to firmware.
# Pyboard D support added
# Patch for retained message support supplied by Kevin Köck.
Expand Down Expand Up @@ -140,11 +140,9 @@ def __init__(self, client_id, server, port, user, password, keepalive, ping_inte
self._sta_if.active(True)

self.pid = 0
self.rcv_pid = 0
self.suback = False
self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
self.last_rx = ticks_ms() # Time of last communication from broker
self.lock = Lock()
self.lock_operation = Lock()

def _set_last_will(self, topic, msg, retain=False, qos=0):
qos_check(qos)
Expand Down Expand Up @@ -319,42 +317,57 @@ def disconnect(self):
self._sock.write(b"\xe0\0")
except OSError:
pass
self._has_connected = False
self.close()

def close(self):
if self._sock is not None:
self._sock.close()

async def _await_pid(self, pid):
t = ticks_ms()
self.dprint("awaiting pid", pid)
while pid in self.rcv_pids: # local copy
if self._timeout(t) or not self.isconnected():
self.dprint("timeout pid", pid, "isconnected", self.isconnected())
break # Must repub or bail out
await asyncio.sleep_ms(100) # putting sleep at the end will solve the issue
# of not recognizing received pids in the event of disconnect
else:
self.dprint("pid", pid, "not in rcv_pids anymore")
return True # PID received. All done.
return False

# qos == 1: coro blocks until wait_msg gets correct PID.
# If WiFi fails completely subclass re-publishes with new PID.
async def publish(self, topic, msg, retain, qos):
self.pid = newpid(self.pid) # Why did I only update self.pid if qos == 1 ??
pid = self.pid # Keep local in case self.pid is updated
if qos:
async with self.lock_operation:
self.pid = newpid(self.pid)
self.rcv_pid = 0
count = 0
async with self.lock:
await self._publish(topic, msg, retain, qos, 0)
while 1: # Await PUBACK, republish on timeout
t = ticks_ms()
while self.pid != self.rcv_pid:
await asyncio.sleep_ms(200)
if self._timeout(t) or not self.isconnected():
break # Must repub or bail out
else:
return # PID's match. All done.
# No match
if count >= self._max_repubs or not self.isconnected():
raise OSError(-1) # Subclass to re-publish with new PID
async with self.lock:
await self._publish(topic, msg, retain, qos, dup=1)
count += 1
self.REPUB_COUNT += 1
else:
self.rcv_pids.add(pid)
self.dprint("publish", topic, msg, "pid", pid)
async with self.lock:
self.dprint("publish lock", topic, pid)
await self._publish(topic, msg, retain, qos, 0,
pid) # ._publish adapted to use passed pid
if qos == 0:
return

count = 0
while 1: # Await PUBACK, republish on timeout
if await self._await_pid(pid):
return
# No match
if count >= self._max_repubs or not self.isconnected(): # **** see note below ****
self.dprint("timeout or max repubs pid", pid)
raise OSError(-1) # Subclass to re-publish with new PID
self.dprint("republish pid", pid)
async with self.lock:
await self._publish(topic, msg, retain, qos, 0)
await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid
count += 1
self.REPUB_COUNT += 1

async def _publish(self, topic, msg, retain, qos, dup):
async def _publish(self, topic, msg, retain, qos, dup, pid):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain | dup << 3
sz = 2 + len(topic) + len(msg)
Expand All @@ -371,46 +384,40 @@ async def _publish(self, topic, msg, retain, qos, dup):
await self._as_write(pkt, i + 1)
await self._send_str(topic)
if qos > 0:
struct.pack_into("!H", pkt, 0, self.pid)
struct.pack_into("!H", pkt, 0, pid)
await self._as_write(pkt, 2)
await self._as_write(msg)

# Can raise OSError if WiFi fails. Subclass traps
async def subscribe(self, topic, qos):
async with self.lock_operation:
self.suback = False
pkt = bytearray(b"\x82\0\0\0")
self.pid = newpid(self.pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
self.pkt = pkt
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)
await self._as_write(qos.to_bytes(1, "little"))
pkt = bytearray(b"\x82\0\0\0")
pid = newpid(self.pid)
self.pid = pid # will otherwise result in multiple operations having the same pid
self.rcv_pids.add(pid)
self.dprint("subscribe", topic, "pid", pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid)
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)
await self._as_write(qos.to_bytes(1, "little"))

t = ticks_ms()
while not self.suback:
await asyncio.sleep_ms(200)
if self._timeout(t):
raise OSError(-1)
if not await self._await_pid(pid):
raise OSError(-1)

# Can raise OSError if WiFi fails. Subclass traps
async def unsubscribe(self, topic):
async with self.lock_operation:
self.suback = False
pkt = bytearray(b"\xa2\0\0\0")
self.pid = newpid(self.pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), self.pid)
self.pkt = pkt
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)
pkt = bytearray(b"\xa2\0\0\0")
pid = newpid(self.pid)
self.pid = pid # will otherwise result in multiple operations having the same pid
self.rcv_pids.add(pid)
self.dprint("unsubscribe", topic, "pid", pid)
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid)
async with self.lock:
await self._as_write(pkt)
await self._send_str(topic)

t = ticks_ms()
while not self.suback:
await asyncio.sleep_ms(200)
if self._timeout(t):
raise OSError(-1)
if not await self._await_pid(pid):
raise OSError(-1)

# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
Expand All @@ -434,19 +441,49 @@ async def wait_msg(self):
if sz != b"\x02":
raise OSError(-1)
rcv_pid = await self._as_read(2)
self.rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
pid = rcv_pid[0] << 8 | rcv_pid[1]
self.dprint("received PUBACK", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("PUBACK unknown pid", pid)
raise OSError(-1)
self.dprint("rcv_pids:", self.rcv_pids)
# Discard old pid's: can arise if a publish fails before PUBACK arrives
# self.rcv_pids = {x for x in self.rcv_pids if (self.pid - x) % 65536 < 50}
# not needed as reinitializing set at reconnect.
# self.dprint("rcv_pids after pruning:", self.rcv_pids)

if op == 0x90: # SUBACK
self.dprint("received suback")
resp = await self._as_read(4)
if resp[1] != self.pkt[2] or resp[2] != self.pkt[3] or resp[3] == 0x80:
if resp[3] == 0x80:
self.dprint("suback oserror")
raise OSError(-1)
self.suback = True
pid = resp[2] | (resp[1] << 8)
self.dprint("got suback pid", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("SUBACK unknown pid", pid)
raise OSError(-1)
self.dprint("rcv_pids:", self.rcv_pids)
# Discard old pid's. Can arise if subacks lost and no subscribe timeout.
# self.rcv_pids = {x for x in self.rcv_pids if (self.pid - x) % 65536 < 50}
# not needed as reinitializing set at reconnect
# self.dprint("rcv_pids after pruning:", self.rcv_pids)

if op == 0xB0: # UNSUBACK
self.dprint("received unsuback")
resp = await self._as_read(3)
if resp[1] != self.pkt[2] or resp[2] != self.pkt[3]:
pid = resp[2] | (resp[1] << 8)
self.dprint("got unsuback pid", pid)
if pid in self.rcv_pids:
self.rcv_pids.discard(pid)
else:
self.dprint("UNSUBACK unknown pid", pid)
raise OSError(-1)
self.suback = True
self.dprint("rcv_pids:", self.rcv_pids)

if op & 0xf0 != 0x30:
return
Expand Down Expand Up @@ -520,7 +557,12 @@ async def wifi_connect(self):
return
s.active(True)
s.connect() # ESP8266 remembers connection.
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
for _ in range(60):
if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
break
await asyncio.sleep(1)
if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else
s.disconnect()
await asyncio.sleep(1)
if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None:
s.connect(self._ssid, self._wifi_pw)
Expand Down Expand Up @@ -566,6 +608,8 @@ async def connect(self):
except Exception:
self.close()
raise
self.dprint("rcv_pids before pruning", self.rcv_pids)
self.rcv_pids = set()
# If we get here without error broker/LAN must be up.
self._isconnected = True
self._in_connect = False # Low level code can now check connectivity.
Expand Down Expand Up @@ -647,7 +691,7 @@ async def _connection(self):
# Scheduled on 1st successful connection. Runs forever maintaining wifi and
# broker connection. Must handle conditions at edge of WiFi range.
async def _keep_connected(self):
while True:
while self._has_connected:
if self.isconnected(): # Pause for 1 second
await asyncio.sleep(1)
gc.collect()
Expand All @@ -663,6 +707,9 @@ async def _keep_connected(self):
await self.wifi_connect()
except OSError:
continue
if not self._has_connected:
self.dprint('Disconnected, exiting _keep_connected')
break
try:
await self.connect()
# Now has set ._isconnected and scheduled _connect_handler().
Expand All @@ -673,6 +720,7 @@ async def _keep_connected(self):
self.close() # Disconnect and try again.
self._in_connect = False
self._isconnected = False
self.dprint('Disconnected, exited _keep_connected')

async def subscribe(self, topic, qos=0):
qos_check(qos)
Expand Down

0 comments on commit eccafcb

Please sign in to comment.