Skip to content

Commit

Permalink
auart_hd.py added. Tutorial amended to suit.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterhinch committed Apr 5, 2018
1 parent d7f8b09 commit 61c24c3
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 2 deletions.
3 changes: 1 addition & 2 deletions README.md
Expand Up @@ -2,8 +2,7 @@

This GitHub repository consists of the following parts:
* [A tutorial](./TUTORIAL.md) An introductory tutorial on asynchronous
programming and the use of the uasyncio library is offered. This is a work in
progress, not least because uasyncio is not yet complete.
programming and the use of the uasyncio library is offered.
* [Asynchronous device drivers](./DRIVERS.md). A module providing drivers for
devices such as switches and pushbuttons.
* [Synchronisation primitives](./PRIMITIVES.md). Provides commonly used
Expand Down
27 changes: 27 additions & 0 deletions TUTORIAL.md
Expand Up @@ -123,6 +123,8 @@ and rebuilding.

5.1 [The IORead mechnaism](./TUTORIAL.md#51-the-ioread-mechanism)

5.1.1 [A UART driver example](./TUTORIAL.md#511-a-uart-driver-example)

5.2 [Using a coro to poll hardware](./TUTORIAL.md#52-using-a-coro-to-poll-hardware)

5.3 [Using IORead to poll hardware](./TUTORIAL.md#53-using-ioread-to-poll-hardware)
Expand Down Expand Up @@ -211,6 +213,9 @@ results by accessing Pyboard hardware.
8. `aqtest.py` Demo of uasyncio `Queue` class.
9. `aremote.py` Example device driver for NEC protocol IR remote control.
10. `auart.py` Demo of streaming I/O via a Pyboard UART.
11. `auart_hd.py` Use of the Pyboard UART to communicate with a device using a
half-duplex protocol. Suits devices such as those using the 'AT' modem command
set.

**Test Programs**

Expand Down Expand Up @@ -1033,6 +1038,28 @@ The mechanism works because the device driver (written in C) implements the
following methods: `ioctl`, `read`, `write`, `readline` and `close`. See
section 5.3 for further discussion.

### 5.1.1 A UART driver example

The program `auart_hd.py` illustrates a method of communicating with a half
duplex device such as one responding to the modem 'AT' command set. Half duplex
means that the device never sends unsolicited data: its transmissions are
always in response to a command from the master.

The device is emulated, enabling the test to be run on a Pyboard with two wire
links.

The (highly simplified) emulated device responds to any command by sending four
lines of data with a pause between each, to simulate slow processing.

The master sends a command, but does not know in advance how many lines of data
will be returned. It starts a retriggerable timer, which is retriggered each
time a line is received. When the timer times out it is assumed that the device
has completed transmission, and a list of received lines is returned.

The case of device failure is also demonstrated. This is done by omitting the
transmission before awaiting a response. After the timeout an empty list is
returned. See the code comments for more details.

###### [Contents](./TUTORIAL.md#contents)

## 5.2 Using a coro to poll hardware
Expand Down
106 changes: 106 additions & 0 deletions auart_hd.py
@@ -0,0 +1,106 @@
# auart_hd.py
# Author: Peter Hinch
# Copyright Peter Hinch 2018 Released under the MIT license

# Demo of running a half-duplex protocol to a device. The device never sends
# unsolicited messages. An example is a communications device which responds
# to AT commands.
# The master sends a message to the device, which may respond with one or more
# lines of data. The master assumes that the device has sent all its data when
# a timeout has elapsed.

# In this test a physical device is emulated by the DEVICE class
# To test link X1-X4 and X2-X3

from pyb import UART
import uasyncio as asyncio
import aswitch

# Dummy device waits for any incoming line and responds with 4 lines at 1 second
# intervals.
class DEVICE():
def __init__(self, uart_no = 4):
self.uart = UART(uart_no, 9600)
self.loop = asyncio.get_event_loop()
self.swriter = asyncio.StreamWriter(self.uart, {})
self.sreader = asyncio.StreamReader(self.uart)
loop = asyncio.get_event_loop()
loop.create_task(self._run())

async def _run(self):
responses = ['Line 1', 'Line 2', 'Line 3', 'Goodbye']
while True:
res = await self.sreader.readline()
for response in responses:
await self.swriter.awrite("{}\r\n".format(response))
# Demo the fact that the master tolerates slow response.
await asyncio.sleep_ms(300)

# The master's send_command() method sends a command and waits for a number of
# lines from the device. The end of the process is signified by a timeout, when
# a list of lines is returned. This allows line-by-line processing.
# A special test mode demonstrates the behaviour with a non-responding device. If
# None is passed, no commend is sent. The master waits for a response which never
# arrives and returns an empty list.
class MASTER():
def __init__(self, uart_no = 2, timeout=4000):
self.uart = UART(uart_no, 9600)
self.timeout = timeout
self.loop = asyncio.get_event_loop()
self.swriter = asyncio.StreamWriter(self.uart, {})
self.sreader = asyncio.StreamReader(self.uart)
self.delay = aswitch.Delay_ms()
self.response = []
loop = asyncio.get_event_loop()
loop.create_task(self._recv())

async def _recv(self):
while True:
res = await self.sreader.readline()
self.response.append(res) # Append to list of lines
self.delay.trigger(self.timeout) # Got something, retrigger timer

async def send_command(self, command):
self.response = [] # Discard any pending messages
if command is None:
print('Timeout test.')
else:
await self.swriter.awrite("{}\r\n".format(command))
print('Command sent:', command)
self.delay.trigger(self.timeout) # Re-initialise timer
while self.delay.running():
await asyncio.sleep(1) # Wait for 4s after last msg received
return self.response

async def test():
print('This test takes 10s to complete.')
for cmd in ['Run', None]:
print()
res = await master.send_command(cmd)
# can use b''.join(res) if a single string is required.
if res:
print('Result is:')
for line in res:
print(line.decode('UTF8'), end='')
else:
print('Timed out waiting for result.')

loop = asyncio.get_event_loop()
master = MASTER()
device = DEVICE()
loop.run_until_complete(test())

# Expected output
# >>> import auart_hd
# This test takes 10s to complete.
#
# Command sent: Run
# Result is:
# Line 1
# Line 2
# Line 3
# Goodbye
#
# Timeout test.
# Timed out waiting for result.
# >>>

0 comments on commit 61c24c3

Please sign in to comment.