-
-
Notifications
You must be signed in to change notification settings - Fork 7.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
read() & readline() on rp2 port don't work/work as expected #6851
Comments
Also, any() does not return the number of characters in the input buffer, but just a flag, whether character(s) are present. |
I looked in the source a while back (when the source of the port was still raspberry pi private repo) and there was a comment in the any() function in machine_uart.c mentioning that due to a hardware limitation any() only returned a boolean. I can't find that comment in a commit now but if that's the case we can work with it. not as nice as returning chars/bytes waiting but hey ho |
Tracing down the sources i got to the getchar() function in the pico_stdio.c file of the pcio-sdk, which indeed has a preview scope of one character. A common technique. That is a software limitation. I did not look further, whether the hardware has the same limitation. But accessing that would require a change of the code structure in the lib. |
Section 4.2 of https://datasheets.raspberrypi.org/rp2040/rp2040-datasheet.pdf says that each UART has a 32-byte TX FIFO and a 32-byte RX FIFO.
If pico-sdk only has a preview scope of one character, maybe it has turned off the FIFOs as described in sections 4.2.2.4 and 4.2.2.5 ? Although from a quick skim through of the registers (in section 4.2.8) it seems you can only tell if the FIFOs are full or empty, not how many entries they contain? 🤷 (with interrupts being available when the FIFOs cross particular thresholds) |
Some ports like the ESP32 and it's drivers have a software buffer in addition to the UART FIFO, such that the UART FIFO mainly serves to cope with IRQ response time variation. |
Glancing through the code, I noticed a function called uart_is_readable_within_us(), which is like uart_is_readable(), but waits for the given time. That could be used to support the timeout. Since it does an internal comparison between two 32 bit unsigned integers, it may get into a wrap-around freeze. Alternatively there is a 64 bit timer which can be used in a similar function. |
lots of googling and faffing around & I found an example that sort of gives us at least a readline() equivalent (until timeout is implemented and any other improvements to machine_uart.c ) that mostly does what I need. might be of use to you. the readUntil() serves as a good starting point to work from `from machine import Pin, UART def readUntil(uartObject, termination): print(uart.write("mj mjver\r\n")) |
The same using class looks a little bit more neat, including a timeout option and replacing readline. from machine import UART, Pin
class myUART(UART):
from time import sleep_us, ticks_diff, ticks_ms
def readUntil(self, termination, timeout=None):
result = ''
start = self.ticks_ms()
while timeout is None or self.ticks_diff(self.ticks_ms(), start) < timeout:
if self.any():
byte = self.read(1)
result = result + chr(byte[0])
if chr(byte[0]) == termination:
break
self.sleep_us(10)
return result
def readline(self, timeout=None):
return self.readUntil('\n', timeout)
uart = myUART(1, baudrate=115200, tx=Pin(8), rx=Pin(9), bits=8, parity=None, stop=1)
print(uart.write("mj mjver\r\n"))
print(uart.readUntil('\n'))
print(uart.write("mj apl\r\n"))
print(uart.readline())
print(uart.readUntil('\n', 2000)) |
Or, slightly more compactly: from machine import UART, Pin
from time import sleep_us
class myUART(UART):
def readUntil(self, termination):
result = ''
while True:
if self.any():
result += chr(self.read(1)[0])
if result.endswith(termination):
break
sleep_us(10)
return result
uart = myUART(1, baudrate=115200, tx=Pin(8), rx=Pin(9), bits=8, parity=None, stop=1)
print(uart.write("mj mjver\r\n"))
print(uart.readUntil('\n'))
print(uart.write("mj apl\r\n"))
print(uart.readUntil('\n')) Added bonus: using Could also add an optional keyword-arg to allow from machine import UART, Pin
from time import sleep_us
class myUART(UART):
def readUntil(self, termination, maxlen=-1, includeTermination=True):
result = ''
while maxlen < 0 or len(result) < maxlen:
if self.any():
result += chr(self.read(1)[0])
if result.endswith(termination):
if not includeTermination:
result = result[:-len(termination)]
break
sleep_us(10)
return result
uart = myUART(1, baudrate=115200, tx=Pin(8), rx=Pin(9), bits=8, parity=None, stop=1)
print(uart.write("mj mjver\r\n"))
print(uart.readUntil('\n'))
print(uart.write("mj apl\r\n"))
print(uart.readUntil('\n')) (note that these examples have just been typed in ad-hoc onto GitHub, I've not actually tested them on MicroPython, so apologies if I've made any syntax errors or used Python features that MP doesn't support!) |
Just to keep the initial return types and overloading readline: from machine import UART, Pin
class myUART(UART):
from time import sleep_us, ticks_diff, ticks_ms
def readUntil(self, termination, timeout=None):
result = b''
start = self.ticks_ms()
while timeout is None or self.ticks_diff(self.ticks_ms(), start) < timeout:
if self.any():
byte = self.read(1)
result = result + byte
if byte == termination:
break
self.sleep_us(10)
return result
def readline(self, timeout=None):
return self.readUntil(b'\n', timeout)
uart = myUART(1, baudrate=115200, tx=Pin(8), rx=Pin(9), bits=8, parity=None, stop=1)
print(uart.write("mj mjver\r\n"))
print(uart.readline())
print(uart.write("mj apl\r\n"))
print(uart.readUntil(b'\n'))
print(uart.readline(2000)) |
Typically you would put your own UART extension into a separate module, making it more smooth to use. from machine import UART
from time import sleep_us, ticks_diff, ticks_ms
class myUART(UART):
def readUntil(self, termination, timeout=None):
result = b''
start = ticks_ms()
while timeout is None or ticks_diff(ticks_ms(), start) < timeout:
if self.any():
byte = self.read(1)
result = result + byte
if byte == termination:
break
sleep_us(10)
return result
def readline(self, timeout=None):
return self.readUntil(b'\n', timeout) Test code, which uses it: from machine import Pin
from uart import myUART as UART
uart = UART(1, baudrate=115200, tx=Pin(8), rx=Pin(9), bits=8, parity=None, stop=1)
print(uart.write("mj mjver\r\n"))
print(uart.readline())
print(uart.write("mj apl\r\n"))
print(uart.readUntil(b'\n'))
print(uart.readline(2000)) |
Is it worth moving the |
I hope timeout support will be part of the c++ part (uart.c) of the firmware. I use it in my current project (on an esp32 at the moment but final goal will be the Pico). And I use it in the constructor/init method as per documentation. |
@lurch That could be done. The problem seems to be a bug in MP, where the keyword parameters are checked against the super class, and using the timeout keyword in the derived class gives an error. Things like: class myUART(UART):
def __init__(self, *args, **kwargs):
self.timeout = kwargs.pop("timeout", None)
UART(*args, **kwargs) should work, but do not. Whenever I use timeout=xxx in the constructor, I get an "extra keyword" error. |
OK. So a little bit less elegant it works. And you can extend that to read and readinto with timeout as well. from machine import UART
from time import sleep_us, ticks_diff, ticks_ms
class myUART():
def __init__(self, id, *args, **kwargs):
self.timeout = kwargs.pop("timeout", None)
self.uart = UART(id, *args, **kwargs)
self.read = self.uart.read
self.write = self.uart.write
self.readinto = self.uart.readinto
self.any = self.uart.any
self.sendbreak = self.uart.sendbreak
def readUntil(self, termination):
result = b''
start = ticks_ms()
while self.timeout is None or ticks_diff(ticks_ms(), start) < self.timeout:
if self.uart.any():
byte = self.uart.read(1)
result = result + byte
if byte == termination:
break
sleep_us(10)
return result
def readline(self):
return self.readUntil(b'\n') |
So I made a first version of machine_uart.c with timeout. That works to a certain extend. So Timeout can be set, it will be printed, and when a timeout occurs it will return with a proper error code. The problem is, when .e.g at uart.read() the numbers requested do not match the ones in the FIFO. And that is different to e.g. the esp32 port.
What happens is, that on the RP2, if machine_uart_read() returns less than the initially requested bytes, it will be called again with the remaining number, which will then run into a timeout. On ESP32, the caller is just happy with what it got and returns to the top level. Edit: For uart.read() without a size, the size presented to machine_uart_read() is 256. |
Yeah, http://docs.micropython.org/en/latest/library/machine.UART.html?highlight=uart#machine.UART.read says "If |
The way I've made it for now works, but never raises an error on timeout. Instead it silently returns without data. That seems acceptable so far. See #6870 |
@dpgeorge Is there a good reason for the RP2 micropython UART API working differently to the ESP32 UART API? If not, would it make sense to try and harmonise them? 🤷♂️ |
I think the only (?) difference is that timeout is not implemented, which @robert-hh has done in #6870. |
the UART on the rp2 port mostly works. read([num of bytes]) works mostly as expected although it seems to drop bytes occasionally and "hangs" if you try to read more bytes than currently available however read() with no number of bytes to read set and readline() just sit there indefinitely. I suspect it's to do with the fact that timeout isn't implemented
The text was updated successfully, but these errors were encountered: