diff --git a/README.md b/README.md index d28f6aa..1c144cb 100644 --- a/README.md +++ b/README.md @@ -1 +1,340 @@ -# micropython-mqtt \ No newline at end of file +# Bringing MQTT to MicroPython targets + +MQTT is an easily used networking protocol designed for IOT (internet of +things) applications. It is well suited for controlling hardware devices and +for reading sensors across a local network or the internet. + +The aim of this project is to bring the protocol via WiFi to generic host +devices running MicroPython but lacking a WiFi interface. A cheap ESP8266 board +running firmware from this repository supplies WiFi connectivity. Connection +between the host and the ESP8266 is via five GPIO lines. The means of +communication, and justification for it, is documented +[here](https://github.com/peterhinch/micropython-async/tree/master/syncom_as). +It is designed to be hardware independent requiring three output lines and two +inputs. It uses no hardware-specific features like timers, interrupts or +machine code, nor does it make assumptions about processor speed. It should be +compatible with any hardware running MicroPython and having five free GPIO +lines. + +The driver is event driven using uasyncio for asynchronous programming. +Applications continue to run unaffected by delays experienced on the ESP8266. + +This document assumes familiarity with the umqtt and uasyncio libraries. +Unofficial guides may be found via these links: +[umqtt FAQ](https://forum.micropython.org/viewtopic.php?f=16&t=2239&p=12694). +[uasyncio tutorial](https://github.com/peterhinch/micropython-async/blob/master/TUTORIAL.md). + +The ESP8266 operates in station mode. The host interface aims to support all +the MQTT functionality provided in the umqtt library. It tries to keep the link +to the broker open continuously, allowing applications which seldom or never +publish. The ESP8266 firmware and the host interface are designed to be +resilient under error conditions. The ESP8266 is rebooted in the event of fatal +errors or crashes. + +# Project status + +At the time of writing (June 2017) the code is best described as being of beta +quality. Testing has been done with a local broker. It is untested with a +broker on the WAN, also with SSL. If anyone is in a position to test these I +would welcome reports. Please raise an issue - including to report positive +outcomes :). + +Testing was performed using a Pyboard V1.0 as the host. The following boards +have run as ESP8266 targets: Adafruit Feather Huzzah, Adafruit Huzzah and WeMos +D1 Mini. + +Originally written when the ESP8266 firmware was error-prone it has substantial +provision for error recovery. Much of the code deals with events like blocked +sockets, ESP8266 crashes and WiFi and broker outages. The underlying stability +of MicroPython on the ESP8266 is now excellent, but wireless networking remains +vulnerable to error conditions such as radio frequency interference or the host +moving out of range of the access point. + +# 1. Wiring + +Connections to the ESP8266 are as follows. + +In the table below Feather refers to the Adafruit Feather Huzzah reference board +or to the Huzzah with serial rather than USB connectivity. Mini refers to the +WeMos D1 Mini. Pyboard refers to any Pyboard version. Pins are for the Pyboard +test programs, but host pins may be changed at will in the application code. + + +| Signal | Feather | Mini | Pyboard | Signal | +|:-------:|:-------:|:----:|:-------:|:-------:| +| mckin | 12 | D6 | Y6 | sckout | +| mrx | 13 | D7 | Y5 | stx | +| mtx | 14 | D5 | Y7 | srx | +| mckout | 15 | D8 | Y8 | sckin | +| reset | reset | rst | Y4 | reset | + +Host and target must share a sigle power supply with a common ground. + +# 2. The Host + +The MQTT API is via the ``MQTTlink`` class described below. + +## 2.1 Files + +### 2.1.1 Dependencies + +The following files originate from the [micropython-async](https://github.com/peterhinch/micropython-async.git) library. +``aswitch.py`` Provides a retriggerable delay class. +``asyn.py`` Synchronisation primitives. +``syncom.py`` Communication library. + +From this library: +``pbmqtt.py`` Python MQTT interface. +``status_values.py`` Numeric constants shared between user code, the ESP8266 +firmware and ``pbmqtt.py``; including status values sent to host from ESP8266. +``net_local.py`` Local network credentials and MQTT parameters. Edit this. + +### 2.1.2 Test programs + +``pb_simple.py`` Minimal publish/subscribe test. A remote unit can turn the +Pyboard green LED on and off and can display regular publications from the +host. +``pb_status.py`` Demonstrates interception of status messages. +``pbmqtt_test.py`` Tests coroutines which must shut down on ESP8266 reboot. +Also demonstrates the ramcheck facility. + +## 2.2 Quick start guide + +Ensure you have a working MQTT broker on a known IP address, and that you have +PC client software. This document assumes mosquitto as broker, mosquitto_pub +and mosquitto_sub as clients. For test purposes it's best to start with the +broker on the local network. Clients may be run on any connected PC. + +Modify ``net_local.py`` to match your MQTT broker address, WiFi SSID and +password. + +Copy the above dependencies to the Pyboard. Install the supplied firmware to +the ESP8266 (section 3.1). Copy ``pb_simple.py`` to the Pyboard and run it. +Assuming the broker is on 192.168.0.9, on a PC run: + +mosquitto_sub -h 192.168.0.9 -t result + +The test program publishes an incrementing count every 10 seconds under the +"result" topic. It subscribes to a topic "green" and responds to messages "on" +or "off" to control the state of the Pyboard green LED. To test this run + +mosquitto_pub -h 192.168.0.9 -t green -m on +mosquitto_pub -h 192.168.0.9 -t green -m off + +## 2.3 The MQTTlink class + +### 2.3.1 Constructor + +This takes the following positional args: + 1. ``reset`` A ``Signal`` instance associated with the reset output pin. The + test programs instantiate the pin with ``Pin.OPEN_DRAIN`` because some boards + have a capacitor to ground. On a low to high transition a push-pull pin could + cause spikes on the power supply. The truly paranoid might replace the reset + wire with a 100Ω resistor to limit current. + 2. ``sckin`` Initialised input pin. + 3. ``sckout`` Initialised output pin with value 0. + 4. ``srx`` Initialised input pin. + 5. ``stx`` Initialised output pin with value 0. + 6. ``init`` A tuple of initialisation values. See below. + 7. ``user_start=None`` A callback to run when communication link is up. + 8. ``args=()`` Optional args for above. + 9. ``local_time_offset=0`` If the host's RTC is to be synchronised to an NTP + server, this allows an offset to be added. Unit is hours. + 10. ``verbose=True`` Print diagnostic messages. + 11. ``timeout=10`` Duration of ESP8266 watchdog (secs). This limits how long a + socket can block before the ESP8266 is rebooted. If the broker is on a slow + internet connection this should be increased. + +The ``user_start`` callback runs when the link between the boards has +initialised. If the ESP8266 crashes the link times out and the driver resets +the ESP8266. When the link is re-initialised ``user_start`` runs again. Typical +use is to define subscriptions. The callback can launch coroutines but these +should run to completion promptly (less than a few seconds). If such coros run +forever, precautions apply. See section 2.3.5. + +INIT: data to be sent to ESP8266 after a reset. +Init elements. Entries 0-6 are strings, 7-12 are integers: + 0. 'init' + 1. SSID for WiFi. + 2. WiFi password. + 3. Broker IP address. + 4. MQTT user ('' for none). + 5. MQTT password ('' for none). + 6. SSL params (repr of a dictionary) + 7. 1/0 Use default network if possible. If 1, tries to connect to the network + stored on the ESP8266. If this fails, it will connect to the specified + network. If 0, ignores the saved LAN. The specified LAN becomes the new + default. + 8. port: use 0 for default. + 9. 1/0 ssl + 10. 1/0 fast: if 1, clock ESP8266 at 160MHz + 11. RTC resync interval (secs). 0 == disable. If interval > 0 the ESP8266 will + periodically retrieve the time from an NTP server and send the result to the + host, which will adjust its RTC. The local time offset specified to the + constructor will be applied. If interval == -1 synchronisation will occur once + only. + 12. keepalive time (secs) (0 = disable). Sets the broker keepalive time. + +### 2.3.2 Methods + + 1. ``publish`` Args topic (str), message (str), retain (bool), qos (0/1). Puts + publication on a queue and returns immediately. Defaults: retain ``False``, + qos 0. ``publish`` can be called at any time, even if an ESP8266 reboot is in + progress. + 2. ``subscribe`` Args topic (str), callback, qos (0/1). Subscribes to the + topic. The callback will run when a publication is received. The callback args + are the topic and message. It should be called from the ``user_start`` + callback to re-subscribe after an ESP8266 reboot. + 3. ``pubq_len`` No args. Returns the length of the publication queue. + 4. ``rtc_syn`` No args. Returns ``True`` if the RTC has been synchronised to + an NTP time server. + 5. ``status_handler`` Arg: a coroutine. Overrides the default status handler. + 6. ``running`` No args. Returns ``True`` if WiFi and broker are up and system + is running normally. + 7. ``command`` Intended for test/debug. Takes an arbitrary number of + positional args, formats them and sends them to the ESP8266. + +### 2.3.3 Class Method + + 1. ``will`` Args topic (str), msg (str), retain, qos. Set the last will. Must + be called before instantiating the ``MQTTlink``. Defaults: retain ``False``, + qos 0. + +### 2.3.4 Intercepting status values + +The ESP8266 can send numeric status values to the host. These are defined and +documented in ``status_values.py``. The default handler specifies how a network +connection +is performed after a reset. Initially, if the ESP8266 fails to connect to the +default LAN stored in its flash memory, it attempts to connect to the network +specified in INIT. On ESP8266 reboots it avoids flash wear by avoiding the +specified LAN. By default it waits 30 seconds and reboots again. + +The behaviour in response to status messages may be modified by replacing the +default handler with a user supplied coroutine as described in 2.3.2 above; +the test program ``pb_status.py`` illustrates this. + +The driver waits for the handler to terminate, then responds in a way dependent +on the status value. If it was a fatal error the ESP8266 will be rebooted. For +informational values execution will continue. + +The return value from the coroutine is ignored except in response to a +``SPECNET`` message. If it returns 1 the driver will attempt to connect to the +specified network. If it returns 0 it will reboot the ESP8266. + +A typical reason for interception is to handle fatal errors, prompting for user +intervention or reporting and issuing long delays before rebooting. + +### 2.3.5 The ``user_start`` callback + +This callback runs each time the ESP8266 is reset. The callback should return +promptly, and any coroutines launched by it should terminate quickly. If the +callback launches coroutines which run forever beware of the following hazard. + +After an error which causes the ESP8266 to be rebooted the callback runs again +causing coroutines launched by the callback to be re-created. This will cause +the scheduler's task queue to grow. To avoid unconstrained growth such a coro +should be launched on the first run only. + +If this is impossible it should be designed to quit prior to an ESP8266 reboot +(the MicroPython asyncio subset has no way to kill a running coro). For +internal use the MQTTLink has an ``ExitGate`` instance ``exit_gate``. A +continuously running coroutine should use the ``exit_gate`` context manager and +poll ``exit_gate.ending()``. If it returns ``True`` the coro should terminate. +If it needs to pause it should issue + +```python +result = await mqtt_link.exit_gate.sleep(time) +``` + +and quit if result is ``False``. + +See ``pbmqtt_test.py`` and the +[synchronisation primitives docs](https://github.com/peterhinch/micropython-async/blob/master/PRIMITIVES.md). + +### 2.3.6 Notes on behaviour + +The implicit characteristics of radio links mean that WiFi is subject to +outages of arbitrary duration: RF interference may occur, or the unit may move +out of range of the access point. + +In the event of a WiFi outage the ESP8266, running the umqtt library, may +respond in two ways. If a socket is blocking it may block forever; it will also +wait forever if a publication with qos == 1 has taken place and the MQTT +protocol is waiting on a PUBACK response. In this case the serial link will +time out and the ESP8266 will be rebooted. If a socket operation is not in +progress a WIFI_DOWN status will be sent to the Pyboard; if the outage is brief +WIFI_UP will be sent and operation will continue. Outages longer than the link +timeout will result in an ESP8266 reboot. + +An implication of this behaviour is that publications with qos == 1 may not be +delivered if the WiFi fails and causes a reboot while waiting for the PUBACK. +In my view qos == 1 is not strictly achievable with the current umqtt library +over a WiFi link. On a reliable network it is effective. + +The driver aims to keep the link to the broker open at all times. It does this +by sending MQTT pings to the broker: four pings are sent during the keepalive +time. If no response is achieved in this period the broker is presumed to have +failed or timed out. The ``NO_NET`` status is reported (to enable user action) +and the ESP8266 rebooted. + +# 3. The ESP8266 + +To use the precompiled build, follow the instructions in 3.1 below. The +remainder of the ESP8266 documentation is for those wishing to modify the +ESP8266 code. Since the Pyboard and the ESP8266 communicate via GPIO pins the +UART/USB interface are available for debugging. + +# 3.1 Installing the precompiled build + +Erase the flash with +``esptool.py erase_flash`` +Then, from the project directory, issue +``esptool.py --port /dev/ttyUSB0 --baud 115200 write_flash --verify --flash_size=detect -fm dio 0 firmware-combined.bin`` +These args for the reference board may need amending for other hardware. + +# 3.2 Files + +In the precompiled build all modules are implemented as frozen bytecode. For +development purposes you may wish to run mqtt.py (or others) as regular source +files. The precompiled build's modules directory comprises the following: + + 1. The uasyncio library. + 2. The umqtt.robust library. + 3. ``mqtt.py`` Main module. + 4. ``syncom.py`` Bitbanged communications driver. + 5. ``aswitch.py`` Module has a retriggerable delay class. + 6. ``asyn.py`` Synchronisation primitives. + 7. ``status_values.py`` Numeric status codes. + +To conserve space unused drivers are removed from the project's ``modules`` +directory, leaving the following standard files: + + 1. ``ntptime.py`` + 2. ``flashbdev.py`` + 3. ``inisetup.py`` + 4. ``_boot.py`` Optionally replaced by modified version. + +The ``mqtt`` module needs to auto-start after a hard reset. This requires a +``main.py`` file. If the standard ``_boot.py`` is used you will need to create +the file as below and copy it to the filesystem: + +```python +import mqtt +``` + +The modified ``_boot.py`` in this repository removes the need for this step +enabling the firmware image to be flashed to an erased flash chip. After boot +if ``main.py`` does not exist it is created in the filesystem. + +# 3.3 Pinout + +This is defined in mqtt.py (``Channel`` constructor). Pin 15 is used for mckout +because this has an on-board pull down resistor. This ensures that the ESP8266 +clock line is zero while the host asserts Reset: at that time GPIO lines are +high impedance. If the pin lacks a pull down one should be supplied. A value of +10KΩ or thereabouts will suffice. + +# 3.4 Mode of operation + +TODO. diff --git a/_boot.py b/_boot.py new file mode 100644 index 0000000..d99478f --- /dev/null +++ b/_boot.py @@ -0,0 +1,21 @@ +import gc +gc.threshold((gc.mem_free() + gc.mem_alloc()) // 4) +import uos +from flashbdev import bdev + +try: + if bdev: + uos.mount(bdev, '/') +except OSError: + import inisetup + inisetup.setup() + +try: + uos.stat('/main.py') +except OSError: + with open("/main.py", "w") as f: + f.write("""\ +import mqtt +""") + +gc.collect() diff --git a/aswitch.py b/aswitch.py new file mode 100644 index 0000000..d3bef73 --- /dev/null +++ b/aswitch.py @@ -0,0 +1,159 @@ +# aswitch.py Switch and pushbutton classes for asyncio +# Delay_ms A retriggerable delay class. Can schedule a coro on timeout. +# Switch Simple debounced switch class for normally open grounded switch. +# Pushbutton extend the above to support logical state, long press and +# double-click events +# Tested on Pyboard but should run on other microcontroller platforms +# running MicroPython and uasyncio. +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +try: + import asyncio_priority as asyncio +except ImportError: + import uasyncio as asyncio +import utime as time +from asyn import launch +# launch: run a callback or initiate a coroutine depending on which is passed. + + +class Delay_ms(object): + def __init__(self, func=None, args=()): + self.func = func + self.args = args + self._running = False + + def stop(self): + self._running = False + + def trigger(self, duration): # Update end time + loop = asyncio.get_event_loop() + self.tstop = time.ticks_add(loop.time(), duration) + if not self._running: + # Start a task which stops the delay after its period has elapsed + loop.create_task(self.killer()) + self._running = True + + def running(self): + return self._running + + async def killer(self): + loop = asyncio.get_event_loop() + twait = time.ticks_diff(self.tstop, loop.time()) + while twait > 0 and self._running: # Return if stop() called during wait + # Must loop here: might be retriggered + await asyncio.sleep_ms(twait) + twait = time.ticks_diff(self.tstop, loop.time()) + if self._running and self.func is not None: + launch(self.func, self.args) # Execute callback + self._running = False + + +class Switch(object): + debounce_ms = 50 + def __init__(self, pin): + self.pin = pin # Should be initialised for input with pullup + self._open_func = False + self._close_func = False + self.switchstate = self.pin.value() # Get initial state + loop = asyncio.get_event_loop() + loop.create_task(self.switchcheck()) # Thread runs forever + + def open_func(self, func, args=()): + self._open_func = func + self._open_args = args + + def close_func(self, func, args=()): + self._close_func = func + self._close_args = args + + # Return current state of switch (0 = pressed) + def __call__(self): + return self.switchstate + + async def switchcheck(self): + loop = asyncio.get_event_loop() + while True: + state = self.pin.value() + if state != self.switchstate: + # State has changed: act on it now. + self.switchstate = state + if state == 0 and self.close_func: + launch(self._close_func, self._close_args) + elif state == 1 and self._open_func: + launch(self._open_func, self._open_args) + # Ignore further state changes until switch has settled + await asyncio.sleep_ms(Switch.debounce_ms) + +class Pushbutton(object): + debounce_ms = 50 + long_press_ms = 1000 + double_click_ms = 400 + def __init__(self, pin): + self.pin = pin # Initialise for input + self._true_func = False + self._false_func = False + self._double_func = False + self._long_func = False + self.sense = pin.value() # Convert from electrical to logical value + self.buttonstate = self.rawstate() # Initial state + loop = asyncio.get_event_loop() + loop.create_task(self.buttoncheck()) # Thread runs forever + + def press_func(self, func, args=()): + self._true_func = func + self._true_args = args + + def release_func(self, func, args=()): + self._false_func = func + self._false_args = args + + def double_func(self, func, args=()): + self._double_func = func + self._double_args = args + + def long_func(self, func, args=()): + self._long_func = func + self._long_args = args + + # Current non-debounced logical button state: True == pressed + def rawstate(self): + return bool(self.pin.value() ^ self.sense) + + # Current debounced state of button (True == pressed) + def __call__(self): + return self.buttonstate + + async def buttoncheck(self): + loop = asyncio.get_event_loop() + if self._long_func: + longdelay = Delay_ms(self._long_func, self._long_args) + if self._double_func: + doubledelay = Delay_ms() + while True: + state = self.rawstate() + # State has changed: act on it now. + if state != self.buttonstate: + self.buttonstate = state + if state: + # Button is pressed + if self._long_func and not longdelay.running(): + # Start long press delay + longdelay.trigger(Pushbutton.long_press_ms) + if self._double_func: + if doubledelay.running(): + launch(self._double_func, self._double_args) + else: + # First click: start doubleclick timer + doubledelay.trigger(Pushbutton.double_click_ms) + if self._true_func: + launch(self._true_func, self._true_args) + else: + # Button release + if self._long_func and longdelay.running(): + # Avoid interpreting a second click as a long push + longdelay.stop() + if self._false_func: + launch(self._false_func, self._false_args) + # Ignore state changes until switch has settled + await asyncio.sleep_ms(Pushbutton.debounce_ms) diff --git a/asyn.py b/asyn.py new file mode 100644 index 0000000..76eb776 --- /dev/null +++ b/asyn.py @@ -0,0 +1,226 @@ +# asyn.py 'micro' synchronisation primitives for uasyncio +# Author: Peter Hinch +# Copyright Peter Hinch 2017 Released under the MIT license +# Test/demo programs asyntest.py, barrier_test.py +# Provides Lock, Event and Barrier classes and launch function +# Now uses low_priority where available and appropriate. + +# CPython 3.5 compatibility +# (ignore RuntimeWarning: coroutine '_g' was never awaited) + +# Check availability of 'priority' version +try: + import asyncio_priority as asyncio + p_version = True +except ImportError: + p_version = False + try: + import uasyncio as asyncio + except ImportError: + import asyncio + +after = asyncio.after if p_version else asyncio.sleep + +async def _g(): + pass +type_coro = type(_g()) + +# If a callback is passed, run it and return. +# If a coro is passed initiate it and return. +# coros are passed by name i.e. not using function call syntax. +def launch(func, tup_args): + res = func(*tup_args) + if isinstance(res, type_coro): + loop = asyncio.get_event_loop() + loop.create_task(res) + + +# To access a lockable resource a coro should issue +# async with lock_instance: +# access the locked resource + +# Alternatively: +# await lock.acquire() +# try: +# do stuff with locked resource +# finally: +# lock.release +# Uses normal scheduling on assumption that locks are held briefly. +class Lock(): + def __init__(self): + self._locked = False + + def locked(self): + return self._locked + + async def __aenter__(self): + await self.acquire() + + async def __aexit__(self, *args): + self.release() + await asyncio.sleep(0) + + async def acquire(self): + while True: + if self._locked: + await asyncio.sleep(0) + else: + self._locked = True + break + + def release(self): + if not self._locked: + raise RuntimeError('Attempt to release a lock which has not been set') + self._locked = False + + +# A coro waiting on an event issues await event +# A coro rasing the event issues event.set() +# When all waiting coros have run +# event.clear() should be issued +# Use of low_priority may be specified in the constructor +# when it will be used if available. +class Event(): + def __init__(self, lp=False): + self.after = after if (p_version and lp) else asyncio.sleep + self.clear() + + def clear(self): + self._flag = False + self._data = None + + def __await__(self): + while not self._flag: + yield from self.after(0) + + __iter__ = __await__ + + def is_set(self): + return self._flag + + def set(self, data=None): + self._flag = True + self._data = data + + def value(self): + return self._data + +# A Barrier synchronises N coros. Each issues await barrier +# execution pauses until all other participant coros are waiting on it. +# At that point the callback is executed. Then the barrier is 'opened' and +# excution of all participants resumes. +# Uses low_priority if available +class Barrier(): + def __init__(self, participants, func=None, args=()): + self._participants = participants + self._func = func + self._args = args + self._reset(True) + + def __await__(self): + self._update() + if self._at_limit(): # All other threads are also at limit + if self._func is not None: + launch(self._func, self._args) + self._reset(not self._down) + return + + direction = self._down + while True: # Wait until last waiting thread changes the direction + if direction != self._down: + return + yield from after(0) + + __iter__ = __await__ + + def _reset(self, down): + self._down = down + self._count = self._participants if down else 0 + + def _at_limit(self): + limit = 0 if self._down else self._participants + return self._count == limit + + def _update(self): + self._count += -1 if self._down else 1 + if self._count < 0 or self._count > self._participants: + raise ValueError('Too many threads accessing Barrier') + +# A Semaphore is typically used to limit the number of coros running a +# particular piece of code at once. The number is defined in the constructor. +class Semaphore(): + def __init__(self, value=1): + self._count = value + + async def __aenter__(self): + await self.acquire() + + async def __aexit__(self, *args): + self.release() + await asyncio.sleep(0) + + async def acquire(self): + while self._count == 0: + await after(0) + self._count -= 1 + + def release(self): + self._count += 1 + +class BoundedSemaphore(Semaphore): + def __init__(self, value=1): + super().__init__(value) + self._initial_value = value + + def release(self): + if self._count < self._initial_value: + self._count += 1 + else: + raise ValueError('Semaphore released more than acquired') + +# uasyncio does not have a mechanism whereby a task can be terminated by another. +# This can cause an issue if a task instantiates other tasks then terminates: +# the child tasks continue to run, which may not be desired. The ExitGate helps +# in managing this. The parent instantiates an ExitGate and makes it available +# to the children. The latter use it as a context manager and can poll the +# ending method to check if it's necessary to terminate. The parent issues +# await exit_gate_instance +# before terminating. Termination will pause until all children have completed. + +class ExitGate(): + def __init__(self, granularity=100): + self._ntasks = 0 + self._ending = False + self._granularity = granularity + + def ending(self): + return self._ending + + def __await__(self): + self._ending = True + while self._ntasks: + yield from asyncio.sleep_ms(self._granularity) + self._ending = False # May want to re-use + + __iter__ = __await__ + + async def __aenter__(self): + self._ntasks += 1 + await asyncio.sleep_ms(0) + + async def __aexit__(self, *args): + self._ntasks -= 1 + await asyncio.sleep_ms(0) + + # Sleep while checking for premature ending. Return True on normal ending, + # False if premature. + async def sleep(self, t): + n, rem = divmod(t * 1000, self._granularity) + for _ in range(n): + if self._ending: + return False + await asyncio.sleep_ms(self._granularity) + if self._ending: + return False + await asyncio.sleep_ms(rem) + return not self._ending diff --git a/firmware-combined.bin b/firmware-combined.bin new file mode 100644 index 0000000..db17891 Binary files /dev/null and b/firmware-combined.bin differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..84b042a --- /dev/null +++ b/main.py @@ -0,0 +1,3 @@ +#import webrepl +#webrepl.start() +import mqtt diff --git a/mqtt.py b/mqtt.py new file mode 100644 index 0000000..8b8d9bf --- /dev/null +++ b/mqtt.py @@ -0,0 +1,247 @@ +# mqtt.py MQTT library for the micropython board using an ESP8266. +# asyncio version + +# Author: Peter Hinch +# Copyright Peter Hinch 2017 Released under the MIT license + +# Accessed via pbmqtt.py on Pyboard + +import gc +import ubinascii +from umqtt.robust import MQTTClient +from machine import Pin, unique_id, freq +import uasyncio as asyncio +gc.collect() +from network import WLAN, STA_IF, AP_IF +gc.collect() +from syncom import SynCom +gc.collect() +import ntptime +gc.collect() +from status_values import * # Numeric status values shared with user code. + +_WIFI_DELAY = 15 # Time (s) to wait for default network + +# Format an arbitrary list of positional args as a comma separated string +def argformat(*a): + return ','.join(['{}' for x in range(len(a))]).format(*a) + +async def heartbeat(): + led = Pin(0, Pin.OUT) + while True: + await asyncio.sleep_ms(500) + led(not led()) + +class Client(MQTTClient): + lw_parms = None + @classmethod + def will(cls, parms): + cls.lw_parms = parms + + def __init__(self, channel, client_id, server, callback, t_resync, + port, user, password, keepalive, ssl, ssl_params): + self.channel = channel + self.t_resync = t_resync + super().__init__(client_id, server, port, user, password, keepalive, ssl, ssl_params) + self.set_callback(callback) + if self.lw_parms is not None: + self.set_last_will(self.lw_parms[0], self.lw_parms[1], + bool(self.lw_parms[2]), int(self.lw_parms[3])) + channel.send(argformat('status', BROKER_CHECK)) + super().connect(clean_session=True) # Throws OSError if broker down. + channel.send(argformat('status', BROKER_OK)) + loop = asyncio.get_event_loop() + loop.create_task(self.message_task()) # Respond to messages from broker + if self.t_resync: + loop.create_task(self.rtc_task()) + self.channel.send(argformat('status', RUNNING)) + + # If t_resync > 0, at intervals get time from timeserver, send time to channel + # If t_resync < 0 synchronise once only. + # If t_rsync == 0 no synch (task never runs) + async def rtc_task(self): + while True: + try: + t = ntptime.time() # secs since Y2K + except OSError: + t = 0 + await asyncio.sleep(30) + if t > 16 * 365 * 24 * 3600: + self.channel.send(argformat('time', t)) + if self.t_resync > 0: + await asyncio.sleep(self.t_resync) + else: + await asyncio.sleep(0) + return + else: + await asyncio.sleep(30) + + async def message_task(self): + while True: + try: + self.check_msg() # Be responsive to subscriptions + except OSError: + pass + await asyncio.sleep_ms(50) + + # Subclassed for ping response. + def wait_msg(self): + res = self.sock.read(1) +# self.sock.settimeout(_SOCKET_TIMEOUT) + self.sock.setblocking(True) # reverted + if res is None: + return None + if res == b"": + raise OSError(-1) + if res == b"\xd0": # PINGRESP + sz = self.sock.read(1)[0] + self.channel.send('pingresp') + assert sz == 0 + return None + op = res[0] + if op & 0xf0 != 0x30: + return op + sz = self._recv_len() + topic_len = self.sock.read(2) + topic_len = (topic_len[0] << 8) | topic_len[1] + topic = self.sock.read(topic_len) + sz -= topic_len + 2 + if op & 6: + pid = self.sock.read(2) + pid = pid[0] << 8 | pid[1] + sz -= 2 + msg = self.sock.read(sz) + self.cb(topic, msg) + if op & 6 == 2: + pkt = bytearray(b"\x40\x02\0\0") + struct.pack_into("!H", pkt, 2, pid) + self.sock.write(pkt) + elif op & 6 == 4: + assert 0 + + +class Channel(SynCom): + def __init__(self): + mtx = Pin(14, Pin.OUT) # Define pins + mckout = Pin(15, Pin.OUT, value=0) # clocks must be initialised to 0 + mrx = Pin(13, Pin.IN) + mckin = Pin(12, Pin.IN) + super().__init__(True, mckin, mckout, mrx, mtx, string_mode = True) + self.cstatus = False # Connection status + self.client = None + + def callback(self, topic, msg): # Triggered by client.message_task + self.send(argformat('subs', topic.decode('UTF8'), msg.decode('UTF8'))) + +# Task runs continuously. Process incoming Pyboard messages. +# Started by main_task() after client instantiated. + async def from_pyboard(self): + client = self.client + while True: + istr = await self.await_obj(100) # wait for string (poll interval 100ms) + s = istr.split(',') + command = s[0] + if command == 'publish': + client.publish(s[1], s[2], bool(s[3]), int(s[4])) + # If qos == 1 only returns once PUBACK received. + self.send(argformat('status', PUBOK)) + elif command == 'subscribe': + client.subscribe(s[1], int(s[2])) + elif command == 'mem': + gc.collect() + gc.threshold(gc.mem_free() // 4 + gc.mem_alloc()) + self.send(argformat('mem', gc.mem_free(), gc.mem_alloc())) + elif command == 'ping': + client.ping() + else: + self.send(argformat('status', UNKNOWN, 'Unknown command:', ostr)) + +# Get or set connected status. Note that sta_if.isconnected() has a latency in +# detecting WiFi down on the order of a second. There doesn't seem to be a +# reliable way to stop the code attemting to access a downed LAN. + def connected(self, val): + if self.cstatus != val: + self.cstatus = val + if val: + self.send(argformat('status', WIFI_UP)) + else: + self.send(argformat('status', WIFI_DOWN)) + return self.cstatus + +# Runs when channel has synchronised. No return: Pyboard resets ESP on fail. +# Get parameters from Pyboard. Process them. Connect. Instantiate client. Start +# from_pyboard() task. Wait forever, updating connected status. + async def main_task(self, _): + got_params = False + + # Await connection parameters (init record) + while not got_params: + istr = await self.await_obj(100) + ilst = istr.split(',') + command = ilst[0] + if command == 'init': + got_params = True + ssid, pw, broker, m_user, m_pw, ssl_params = ilst[1:7] + use_default, port, ssl, fast, t_resync, keepalive = [int(x) for x in ilst[7:]] + m_user = m_user if m_user else None + m_pw = m_pw if m_pw else None + elif command == 'will': + Client.will(ilst[1:5]) + self.send(argformat('status', WILLOK)) + else: + self.send(argformat('status', UNKNOWN, 'Expected init, got: ', istr)) + + # Got parameters + if fast: + freq(160000000) + + # try default LAN if required + sta_if = WLAN(STA_IF) + if use_default: + self.send(argformat('status', DEFNET)) + secs = _WIFI_DELAY + while secs >= 0 and not sta_if.isconnected(): + await asyncio.sleep(1) + secs -= 1 + + # If can't use default, use specified LAN + if not sta_if.isconnected(): + self.send(argformat('status', SPECNET)) + # Pause for confirmation. User may opt to reboot instead. + istr = await self.await_obj(100) + ap = WLAN(AP_IF) # create access-point interface + ap.active(False) # deactivate the interface + sta_if.active(True) + sta_if.connect(ssid, pw) + while not sta_if.isconnected(): + await asyncio.sleep(1) + + # WiFi is up: connect to the broker + await asyncio.sleep(5) # Let WiFi stabilise before connecting + client_id = ubinascii.hexlify(unique_id()) + try: + self.client = Client(self, client_id, broker, self.callback, + t_resync, port, m_user, m_pw, keepalive, ssl, + eval(ssl_params)) + # Sends BROKER_OK and RUNNING + except OSError: + # Cause Pyboard to reboot us when application requires it. + self.send(argformat('status', BROKER_FAIL)) + while True: + await asyncio.sleep(60) # Twiddle our thumbs... + + # Set channel running + loop = asyncio.get_event_loop() + loop.create_task(self.from_pyboard()) + while True: + gc.collect() + gc.threshold(gc.mem_free() // 4 + gc.mem_alloc()) + self.connected(sta_if.isconnected()) # Message Pyboard on WiFi up/down + await asyncio.sleep(1) + +loop = asyncio.get_event_loop() +loop.create_task(heartbeat()) +# Comms channel to Pyboard +channel = Channel() +loop.create_task(channel.start(channel.main_task)) +loop.run_forever() diff --git a/net_local.py b/net_local.py new file mode 100644 index 0000000..4ada8fc --- /dev/null +++ b/net_local.py @@ -0,0 +1,8 @@ +# net_local.py Local network parameters for Pyboard MQTT link +# Edit for your network. See README.md for this structure. + +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +INIT = ('init', 'my_SSID', 'my_password', 'broker_ip_address', '', '', repr({}), + 1, 0, 0, 1, 3600, 60) diff --git a/pb_simple.py b/pb_simple.py new file mode 100644 index 0000000..17e16c1 --- /dev/null +++ b/pb_simple.py @@ -0,0 +1,53 @@ +# pb_simple.py Minimal publish/subscribe test program for Pyboard MQTT link + +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +# From PC issue (for example) +# Turn the Pyboard green LED on (or off): +# mosquitto_pub -h 192.168.0.9 -t green -m on +# Check the publications from the Pyboard: +# mosquitto_sub -h 192.168.0.9 -t result + +from machine import Pin, Signal +import pyb +import uasyncio as asyncio +from pbmqtt import MQTTlink +from net_local import INIT # Local network details + +green = pyb.LED(2) # Green +qos = 1 # for test all messages have the same qos + +async def publish(mqtt_link, tim): + count = 1 + while True: + mqtt_link.publish('result', str(count), 0, qos) + count += 1 + await asyncio.sleep(tim) + +def cbgreen(command, text): + if text == 'on': + green.on() + elif text == 'off': + green.off() + else: + print('led value must be "on" or "off"') + +def start(mqtt_link): + mqtt_link.subscribe('green', cbgreen, qos) # LED control qos 1 + +def test(): + stx = Pin(Pin.board.Y5, Pin.OUT_PP) # Define pins + sckout = Pin(Pin.board.Y6, Pin.OUT_PP, value = 0) + srx = Pin(Pin.board.Y7, Pin.IN) + sckin = Pin(Pin.board.Y8, Pin.IN) + reset = Pin(Pin.board.Y4, Pin.OPEN_DRAIN) + sig_reset = Signal(reset, invert = True) + + MQTTlink.will('result', 'simple client died') + mqtt_link = MQTTlink(sig_reset, sckin, sckout, srx, stx, INIT, start, local_time_offset = 1) + loop = asyncio.get_event_loop() + loop.create_task(publish(mqtt_link, 10)) # Publish a count every 10 seconds + loop.run_forever() + +test() diff --git a/pb_status.py b/pb_status.py new file mode 100644 index 0000000..eed2b40 --- /dev/null +++ b/pb_status.py @@ -0,0 +1,78 @@ +# pb_status.py Demonstrate status interception for Pyboard MQTT link + +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +# From PC issue (for example) +# Turn the Pyboard green LED on (or off): +# mosquitto_pub -h 192.168.0.9 -t green -m on +# Check the publications from the Pyboard: +# mosquitto_sub -h 192.168.0.9 -t result + +from machine import Pin, Signal +import pyb +import uasyncio as asyncio +from pbmqtt import MQTTlink +from net_local import INIT # Local network details +from status_values import * + +green = pyb.LED(2) +blue = pyb.LED(4) +qos = 1 # for test all messages have the same qos +reset_count = 0 + +async def status_handler(mqtt_link, status): + await asyncio.sleep_ms(0) + if status == SPECNET: # Couldn't start default network. + if mqtt_link.first_run: + mqtt_link.first_run = False + return 1 # Try specified network on 1st run only + print('Unable to start network. Pausing for 1 minute before reboot') + await asyncio.sleep(60) + return 0 + if status == BROKER_FAIL: + print('Broker is down. Pausing for 1 minute before reboot') + await asyncio.sleep(60) + return + if status == WIFI_UP: + blue.on() + mqtt_link.publish('result', 'WiFi up', 0, qos) + return + if status == WIFI_DOWN: + blue.off() + return + +async def publish(mqtt_link, tim): + count = 1 + while True: + mqtt_link.publish('result', str(count), 0, qos) + count += 1 + await asyncio.sleep(tim) + +def cbgreen(command, text): + if text == 'on': + green.on() + elif text == 'off': + green.off() + else: + print('led value must be "on" or "off"') + +def start(mqtt_link): + mqtt_link.subscribe('green', cbgreen, qos) # LED control qos 1 + +def test(): + stx = Pin(Pin.board.Y5, Pin.OUT_PP) # Define pins + sckout = Pin(Pin.board.Y6, Pin.OUT_PP, value = 0) + srx = Pin(Pin.board.Y7, Pin.IN) + sckin = Pin(Pin.board.Y8, Pin.IN) + reset = Pin(Pin.board.Y4, Pin.OPEN_DRAIN) + sig_reset = Signal(reset, invert = True) + + MQTTlink.will('result', 'simple client died') + mqtt_link = MQTTlink(sig_reset, sckin, sckout, srx, stx, INIT, start, local_time_offset = 1) + mqtt_link.status_handler(status_handler) # Override the default + loop = asyncio.get_event_loop() + loop.create_task(publish(mqtt_link, 10)) # Publish a count every 10 seconds + loop.run_forever() + +test() diff --git a/pbmqtt.py b/pbmqtt.py new file mode 100644 index 0000000..2eddcd8 --- /dev/null +++ b/pbmqtt.py @@ -0,0 +1,304 @@ +# pbmqtt.py Implement MQTT on Pyboard using an ESP8266 +# The ESP8266 runs mqtt.py on startup. +# Boards are electrically connected as per the README. +# asyncio version +# On fatal error performs hardware reset on ESP8266. + +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +# Latency ~500ms tested using echo.py running on PC with broker running on Pi +# (half time for round-trip) + +import uasyncio as asyncio +from utime import localtime, time +import pyb +from syncom import SynCom +from aswitch import Delay_ms +from asyn import ExitGate +from status_values import * # Numeric status values shared with user code. + +# _WIFI_DOWN is bad during initialisation +_BAD_STATUS = (BROKER_FAIL, WIFI_DOWN, UNKNOWN) +_DIRE_STATUS = (BROKER_FAIL, UNKNOWN) # Always fatal + +# Format an arbitrary list of positional args as a comma separated string +def argformat(*a): + return ','.join(['{}' for x in range(len(a))]).format(*a) + +def printtime(): + print('{:02d}:{:02d}:{:02d} '.format(localtime()[3], localtime()[4], localtime()[5]), end='') + +async def heartbeat(): + led = pyb.LED(1) + while True: + await asyncio.sleep_ms(500) + led.toggle() + +# Subclass to handle status changes. In the case of fatal status values the +# ESP8266 will be rebooted on return. You may want to pause for remedial +# action before the reboot. Information statuses can be ignored with rapid +# return. Cases which may require attention: +# SPECNET return 1 to try specified network or 0 to reboot ESP. Code below +# tries specified LAN (if default LAN fails) on first run only to limit +# flash wear. +# BROKER_FAIL Pause for server fix? Return (and so reboot) after delay? +# NO_NET WiFi has timed out. +# Pauses must be implemented with the following to ensure task quits on fail +# if not await self.sleep_while_running(delay_in_secs): +# return + +async def default_status_handler(mqtt_link, status): + await asyncio.sleep_ms(0) + if status == SPECNET: + if mqtt_link.first_run: + mqtt_link.first_run = False + return 1 # By default try specified network on 1st run only + asyncio.sleep(30) # Pause before reboot. + return 0 + +class MQTTlink(object): + lw_topic = None + lw_msg = None + lw_retain = False + lw_qos = 0 + @classmethod + def will(cls, topic, msg, retain=False, qos=0): + cls.lw_topic = topic + cls.lw_msg = msg + cls.lw_retain = retain + cls.lw_qos = qos + status_msgs = ('connected to broker', 'awaiting broker', + 'awaiting default network', 'awaiting specified network', + 'publish OK', 'running', 'unk', 'Will registered', 'Fail to connect to broker', + 'WiFi up', 'WiFi down') + + def __init__(self, reset, sckin, sckout, srx, stx, init, user_start=None, + args=(), local_time_offset=0, verbose=True, timeout=10): + self.user_start = (user_start, args) + self.local_time_offset = local_time_offset + self.init_str = argformat(*init) + self.keepalive = init[12] + self.ping_period = self.keepalive // 4 + self._rtc_syn = False # RTC persists over ESP8266 reboots + self._running = False + self.verbose = verbose + wdog = timeout * 1000 # Blocking timeout for ESP8266 + # SynCom string mode + self.channel = SynCom(False, sckin, sckout, srx, stx, reset, wdog, True, verbose) + loop = asyncio.get_event_loop() + loop.create_task(heartbeat()) + self.exit_gate = ExitGate() + loop.create_task(self.channel.start(self.start, self.exit_gate)) + self.subs = {} # Callbacks indexed by topic + self.pubs = [] + self._pub_free = True # No publication in progress + self.pub_delay = Delay_ms(self.quit) # Wait for qos 1 response + self.s_han = default_status_handler + # Only specify network on first run + self.first_run = True + + +# API + def publish(self, topic, msg, retain=False, qos=0): + self.pubs.append((topic, msg, retain, qos)) + + def pubq_len(self): + return len(self.pubs) + + def subscribe(self, topic, callback, qos=0): + self.subs[topic] = callback + self.channel.send(argformat('subscribe', topic, qos)) + + # Command handled directly by mqtt.py on ESP8266 e.g. 'mem' + def command(self, *argsend): + self.channel.send(argformat(*argsend)) + + # Is Pyboard RTC synchronised? + def rtc_syn(self): + return self._rtc_syn + + def status_handler(self, coro): + self.s_han = coro + + def running(self): + return self._running +# API END + +# Convenience method allows return self.quit() on error + def quit(self, *args): + if args is not (): + self.vbprint(*args) + self._running = False + +# On publication of a qos 1 message this is called with False. This prevents +# further pubs until the response is received, ensuring we wait for the PUBACK +# that corresponds to the last publication. +# So publication starts the pub response timer, which on timeout sets _running +# False +# A puback message calls pub_free(True) which stops the timer. +# IOW if the PUBACK is not received within the keepalive period assume we're +# broken and restart. + def pub_free(self, val = None): + if val is not None: + self._pub_free = val + if val: + self.pub_delay.stop() + else: + self.pub_delay.trigger(self.keepalive * 1000) + return self._pub_free + + + def vbprint(self, *args): + if self.verbose: + print(*args) + + def get_cmd(self, istr): + ilst = istr.split(',') + return ilst[0], ilst[1:] + + def do_status(self, action, last_status): + try: + iact = int(action[0]) + except ValueError: # Gibberish from ESP8266: caller reboots + return UNKNOWN + if iact == PUBOK: + self.pub_free(True) # Unlock so we can do another pub. + elif iact == RUNNING: + self._running = True + if self.verbose: + if iact != UNKNOWN: + if iact != last_status: # Ignore repeats + printtime() + print('Status: ', self.status_msgs[iact]) + else: + printtime() + print('Unknown status: {} {}'.format(action[1], action[2])) + return iact + + def do_time(self, action): + try: + t = int(action[0]) + except ValueError: + self._running = False + return + rtc = pyb.RTC() + tm = localtime(t) + hours = (tm[3] + self.local_time_offset) % 24 + tm = tm[0:3] + (tm[6] + 1,) + (hours,) + tm[4:6] + (0,) + rtc.datetime(tm) + self._rtc_syn = True + self.vbprint('time', localtime()) + + async def _ping(self): + egate = self.exit_gate + async with egate: + while True: + if not await egate.sleep(self.ping_period): + break # self.start() has returned + self.channel.send('ping') + + async def _publish(self): + egate = self.exit_gate + async with egate: + while not egate.ending(): # Until parent task has died + await asyncio.sleep_ms(100) + if len(self.pubs) and self._pub_free: + args = self.pubs.pop(0) + self.channel.send(argformat('publish', *args)) + # if qos == 1 set _pub_free False and start timer. If no response + # received in the keepalive period _running is set False and we + # reboot the ESP8266 + self.pub_free(args[3] == 0) + +# start() is run each time the channel acquires sync i.e. on startup and also +# after an ESP8266 crash and reset. +# Behaviour after fatal error with ESP8266: +# It sets _running False to cause local tasks to terminate, then returns. +# A return causes the local channel instance to wait for fail_delay to let +# tasks in this program terminate, then issues a hardware reset to the ESP8266. +# After acquiring sync, start() gets rescheduled. + async def start(self, channel): + self.vbprint('Starting...') + self.subs = {} # Callbacks indexed by topic + self._pub_free = True # No publication in progress + self._running = False + if self.lw_topic is not None: + channel.send(argformat('will', self.lw_topic, self.lw_msg, self.lw_retain, self.lw_qos)) + res = await channel.await_obj() + if res is None: + await self.s_han(self, ESP_FAIL) + return self.quit('ESP8266 fail 1. Resetting.') + command, action = self.get_cmd(res) + if command == 'status': + iact = self.do_status(action, -1) + await self.s_han(self, iact) + if iact in _BAD_STATUS: + return self.quit('Bad status. Resetting.') + else: + self.vbprint('Expected will OK got: ', command, action) + channel.send(self.init_str) + while not self._running: # Until RUNNING status received + res = await channel.await_obj() + if res is None: + await self.s_han(self, ESP_FAIL) + return self.quit('ESP8266 fail 2. Resetting.') + command, action = self.get_cmd(res) + if command == 'status': + iact = self.do_status(action, -1) + result = await self.s_han(self, iact) + if iact == SPECNET: + if result == 1: + channel.send('1') # Any string will do + else: + return self.quit() + if iact in _BAD_STATUS: + return self.quit('Bad status. Resetting.') + elif command == 'time': + self.do_time(action) + else: + self.vbprint('Init got: ', command, action) + + self.vbprint('About to run user program.') + if self.user_start is not None: + self.user_start[0](self, *self.user_start[1]) # User start function + + loop = asyncio.get_event_loop() + loop.create_task(self._ping()) + loop.create_task(self._publish()) + lastping = time() # Prevent premature fail + iact = -1 # Invalidate last status for change detection + while True: # print incoming messages, handle subscriptions + res = await channel.await_obj() + if res is None: + await self.s_han(self, ESP_FAIL) + return self.quit('ESP8266 fail 3. Resetting.') # ESP8266 fail + + command, action = self.get_cmd(res) + if command == 'subs': # It's a subscription + if action[0] in self.subs: # topic found + self.subs[action[0]](*action) # Run the callback + elif command == 'status': # 1st arg of status is an integer + iact = self.do_status(action, iact) # Update pub q and wifi status + await self.s_han(self, iact) + if iact in _DIRE_STATUS: # Tolerate brief WiFi outages: let channel timeout + return self.quit('Fatal status. Resetting.') # or ping response handle it. + elif command == 'time': + self.do_time(action) + lastping = time() # RTC clock may have changed. In any event, LAN is up. + elif command == 'mem': # Wouldn't ask for this if we didn't want a printout + print('ESP8266 RAM free: {} allocated: {}'.format(action[0], action[1])) + elif command == 'pingresp': + lastping = time() + else: + await self.s_han(self, UNKNOWN) + return self.quit('Got unhandled command, resetting ESP8266:', command, action) # ESP8266 has failed + + if time() - lastping > self.keepalive: + await self.s_han(self, NO_NET) + return self.quit('Ping response timeout, resetting ESP8266') + + if not self._running: # puback not received? + await self.s_han(self, NO_NET) + return self.quit('Not running, resetting ESP8266') + diff --git a/pbmqtt_test.py b/pbmqtt_test.py new file mode 100644 index 0000000..4155fa6 --- /dev/null +++ b/pbmqtt_test.py @@ -0,0 +1,84 @@ +# pbmqtt_test.py TEST PROGRAM for Pyboard MQTT link +# This tests the ramcheck facility and the mechanism for launching continuously +# running coroutines from the user_start program. Note this is not normally +# recommended. + +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. + +# From PC issue (for example) +# mosquitto_pub -h 192.168.0.9 -t green -m on +# mosquitto_sub -h 192.168.0.9 -t result + +from machine import Pin, Signal +import pyb +import uasyncio as asyncio +from pbmqtt import MQTTlink +from net_local import INIT # Local network details + +green = pyb.LED(2) # Green +qos = 1 # for test all messages have the same qos +reset_count = 0 + +# User tasks. Must terminate as soon as link stops running +async def ramcheck(mqtt_link): + egate = mqtt_link.exit_gate + async with egate: + while True: + mqtt_link.command('mem') + if not await egate.sleep(1800): + break + +async def publish(mqtt_link, tim): + count = 1 + egate = mqtt_link.exit_gate + async with egate: + while True: + mqtt_link.publish('result', '{} {}'.format(count, reset_count), 0, qos) + count += 1 + if not await egate.sleep(tim): + break + +async def pulse_blue(): + global reset_count + reset_count += 1 + blue = pyb.LED(4) + blue.on() + await asyncio.sleep(3) + blue.off() + +def cbgreen(command, text): + if text == 'on': + green.on() + elif text == 'off': + green.off() + else: + print('led value must be "on" or "off"') + +# start() is run once communication with the broker has been established and before +# the MQTTlink main loop commences. User tasks should be added here: this ensures +# that they will be restarted if the ESP8266 times out. +# User tasks which run forever must quit on a failure. This is done by waiting on +# the mqtt_link's ExitGate's sleep method, quitting if it returns False + +def start(mqtt_link): + mqtt_link.subscribe('green', cbgreen, qos) # LED control qos 1 + loop = asyncio.get_event_loop() + loop.create_task(ramcheck(mqtt_link)) # Check RAM every 30 minutes + loop.create_task(publish(mqtt_link, 10)) # Publish a count every 10 seconds + loop.create_task(pulse_blue()) # Flash blue LED each time we restart ESP8266 + +def test(): + stx = Pin(Pin.board.Y5, Pin.OUT_PP) # Define pins + sckout = Pin(Pin.board.Y6, Pin.OUT_PP, value = 0) + srx = Pin(Pin.board.Y7, Pin.IN) + sckin = Pin(Pin.board.Y8, Pin.IN) + reset = Pin(Pin.board.Y4, Pin.OPEN_DRAIN) + sig_reset = Signal(reset, invert = True) + + MQTTlink.will('result', 'client died') + mqtt_link = MQTTlink(sig_reset, sckin, sckout, srx, stx, INIT, start, local_time_offset = 1) + loop = asyncio.get_event_loop() + loop.run_forever() + +test() diff --git a/pubtest b/pubtest new file mode 100755 index 0000000..4ffc85c --- /dev/null +++ b/pubtest @@ -0,0 +1,9 @@ +#! /bin/bash + +while : +do + mosquitto_pub -h 192.168.0.9 -t green -m on + sleep 5 + mosquitto_pub -h 192.168.0.9 -t green -m off + sleep 5 +done diff --git a/status_values.py b/status_values.py new file mode 100644 index 0000000..abe6cb6 --- /dev/null +++ b/status_values.py @@ -0,0 +1,35 @@ +# Author: Peter Hinch. +# Copyright Peter Hinch 2017 Released under the MIT license. +# status values for Pyboard MQTT via ESP8266 + +# ESP8266 has connected to the broker. +BROKER_OK = 0 +# ESP8266 is about to connect to the broker. +BROKER_CHECK = 1 +# ESP8266 is about to connect to the default network. +DEFNET = 2 +# ESP8266 is about to connect to LAN specified in INIT. +SPECNET = 3 +# ESP8266 has completed a publication. +PUBOK = 4 +# ESP8266 initialisation is complete. +RUNNING = 5 +# Fatal. ESP8266 has received an unknown command from host. +UNKNOWN = 6 +# ESP8266 has executed a will command. +WILLOK = 7 +# Fatal. ESP8266 failed to connect to the broker. +BROKER_FAIL = 8 +# ESP8266 reports WiFi up. +WIFI_UP = 9 +# ESP8266 reports WiFi down. +# Fatal only during initialisation i.e. when running() returns False. +WIFI_DOWN = 10 + +# Local status values + +# Fatal. ESP8266 has crashed. +ESP_FAIL = 11 +# Fatal. keepalive period has elapsed with ESP8266 unable to get a ping +# response from broker. Or it elapsed with no PUBACK from a qos == 1 publish. +NO_NET = 12 \ No newline at end of file diff --git a/syncom.py b/syncom.py new file mode 100644 index 0000000..1d101e2 --- /dev/null +++ b/syncom.py @@ -0,0 +1,236 @@ +# syncom.py Synchronous communication channel between two MicroPython +# platforms. 4 June 2017 +# Uses uasyncio. + +# The MIT License (MIT) +# +# Copyright (c) 2017 Peter Hinch +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +# Timing: was 4.5mS per char between Pyboard and ESP8266 i.e. ~1.55Kbps. But +# this version didn't yield on every bit, invalidating t/o detection. +# New asyncio version yields on every bit. +# Instantaneous bit rate running ESP8266 at 160MHz: 1.6Kbps +# Mean throughput running test programs 8.8ms per char (800bps). + +from utime import ticks_diff, ticks_ms +import uasyncio as asyncio +from micropython import const + +_BITS_PER_CH = const(7) +_BITS_SYN = const(8) +_SYN = const(0x9d) + +class SynComError(Exception): + pass + +class SynCom(object): + def __init__(self, passive, ckin, ckout, din, dout, sig_reset=None, + timeout=0, string_mode=False, verbose=True): + self.passive = passive + self.string_mode = string_mode + if not string_mode: + global pickle + import pickle + self._running = False # _run coro is down + self._synchronised = False + self.verbose = verbose + if verbose: + self.idstr = 'passive' if self.passive else 'initiator' + + self.ckin = ckin # Interface pins + self.ckout = ckout + self.din = din + self.dout = dout + self.sig_reset = sig_reset + + self._timeout = timeout # In ms. 0 == No timeout. + self.lsttx = [] # Queue of strings to send + self.lstrx = [] # Queue of received strings + +# Start interface and initiate an optional user task. If a timeout and reset +# signal are specified and the target times out, the target is reset and the +# interface restarted. If a user task is provided, this must return if a +# timeout occurs (i.e. not running() or await_obj returns None). +# If it returns for other (error) reasons, a timeout event is forced. + async def start(self, user_task=None, awaitable=None): + loop = asyncio.get_event_loop() + while True: + if not self._running: # Restarting + self.lstrx = [] # Clear down queues + self.lsttx = [] + self._synchronised = False + loop.create_task(self._run()) # Reset target (if possible) + while not self._synchronised: # Wait for sync + await asyncio.sleep_ms(100) + if user_task is None: + while self._running: + await asyncio.sleep_ms(100) + else: + await user_task(self) # User task must quit on timeout + # If it quit for other reasons force a t/o exception + self.stop() + await asyncio.sleep_ms(0) + if awaitable is not None: # User code may use an ExitGate + await awaitable # to ensure all coros have quit + +# Can be used to force a failure + def stop(self): + self._running = False + self.dout(0) + self.ckout(0) + +# Queue an object for tx. Convert to string NOW: snapshot of current +# object state + def send(self, obj): + if self.string_mode: + self.lsttx.append(obj) + else: + self.lsttx.append(pickle.dumps(obj)) + + def any(self): + return len(self.lstrx) + +# Wait for an object. Return None on timeout. +# If in string mode returns a string (or None on t/o) + async def await_obj(self, t_ms=10): + while self._running: + await asyncio.sleep_ms(t_ms) + if len(self.lstrx): + return self.lstrx.pop(0) + +# running() is False if the target has timed out. + def running(self): + return self._running + +# Private methods + def _vbprint(self, *args): + if self.verbose: + print(*args) + + async def _run(self): + self.indata = 0 # Current data bits + self.inbits = 0 + self.odata = _SYN + self.phase = 0 # Interface initial conditions + if self.passive: + self.dout(0) + self.ckout(0) + else: + self.dout(self.odata & 1) + self.ckout(1) + self.odata >>= 1 # we've sent that bit + self.phase = 1 + if self.sig_reset is not None: + self._vbprint(self.idstr, ' resetting target...') + self.sig_reset.on() + await asyncio.sleep_ms(100) + self.sig_reset.off() + await asyncio.sleep(1) # let target settle down + + self._vbprint(self.idstr, ' awaiting sync...') + try: + self._running = True # False on failure: can be cleared by other tasks + while self.indata != _SYN: # Don't hog CPU while waiting for start + await self._synchronise() + self._synchronised = True + self._vbprint(self.idstr, ' synchronised.') + + sendstr = '' # string for transmission + send_idx = None # character index. None: no current string + getstr = '' # receive string + while True: + if send_idx is None: + if len(self.lsttx): + sendstr = self.lsttx.pop(0) # oldest first + send_idx = 0 + if send_idx is not None: + if send_idx < len(sendstr): + self.odata = ord(sendstr[send_idx]) + send_idx += 1 + else: + send_idx = None + if send_idx is None: # send zeros when nothing to send + self.odata = 0 + if self.passive: + await self._get_byte_passive() + else: + await self._get_byte_active() + if self.indata: + getstr = ''.join((getstr, chr(self.indata))) + else: # Got 0: + if len(getstr): # if there's a string, it's complete + if self.string_mode: + self.lstrx.append(getstr) + else: + try: + self.lstrx.append(pickle.loads(getstr)) + except: # Pickle fail means target has crashed + raise SynComError + getstr = '' + + except SynComError: + if self._running: + self._vbprint('SynCom Timeout.') + else: + self._vbprint('SynCom was stopped.') + finally: + self.stop() + + async def _get_byte_active(self): + inbits = 0 + for _ in range(_BITS_PER_CH): + inbits = await self._get_bit(inbits) # LSB first + self.indata = inbits + + async def _get_byte_passive(self): + self.indata = await self._get_bit(self.inbits) # MSB is outstanding + inbits = 0 + for _ in range(_BITS_PER_CH - 1): + inbits = await self._get_bit(inbits) + self.inbits = inbits + + async def _synchronise(self): # wait for clock + t = ticks_ms() + while self.ckin() == self.phase ^ self.passive ^ 1: + # Other tasks can clear self._running by calling stop() + if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running: + raise SynComError + await asyncio.sleep_ms(0) + self.indata = (self.indata | (self.din() << _BITS_SYN)) >> 1 + odata = self.odata + self.dout(odata & 1) + self.odata = odata >> 1 + self.phase ^= 1 + self.ckout(self.phase) # set clock + + async def _get_bit(self, dest): + t = ticks_ms() + while self.ckin() == self.phase ^ self.passive ^ 1: + if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running: + raise SynComError + yield # Faster than await asyncio.sleep_ms() + dest = (dest | (self.din() << _BITS_PER_CH)) >> 1 + obyte = self.odata + self.dout(obyte & 1) + self.odata = obyte >> 1 + self.phase ^= 1 + self.ckout(self.phase) + return dest