-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update, document and test SerialData #180
Changes from 2 commits
25c4c1d
3587039
e984024
3129a01
75aedeb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
import pytest | ||
import serial | ||
|
||
from pocs.utils import error | ||
from pocs.utils import rs232 | ||
from pocs.utils.config import load_config | ||
|
||
|
@@ -11,6 +12,23 @@ | |
from pocs.tests.serial_handlers import protocol_hooked | ||
|
||
|
||
def test_missing_port(): | ||
with pytest.raises(ValueError): | ||
rs232.SerialData() | ||
|
||
|
||
def test_non_existent_device(): | ||
"""Doesn't complain if it can't find the device.""" | ||
port = '/dev/tty12345698765' | ||
ser = rs232.SerialData(port=port) | ||
assert not ser.is_connected | ||
assert port == ser.name | ||
# Can't connect to that device. | ||
with pytest.raises(error.BadSerialConnection): | ||
ser.connect() | ||
assert not ser.is_connected | ||
|
||
|
||
def test_detect_uninstalled_scheme(): | ||
"""If our handlers aren't installed, will detect unknown scheme.""" | ||
with pytest.raises(ValueError): | ||
|
@@ -32,9 +50,16 @@ def test_detect_bogus_scheme(handler): | |
rs232.SerialData(port='bogus://') | ||
|
||
|
||
def test_detect_bogus_scheme(handler): | ||
"""When our handlers are installed, will still detect unknown scheme.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are valid schemes? Have we documented these handlers and schemes as they apply to us? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than document all of those (mostly provided by PySerial), I documented what the scheme must be, and then violated that so that the exception will be raised. |
||
with pytest.raises(ValueError): | ||
rs232.SerialData(port='bogus://') | ||
|
||
|
||
def test_basic_no_op(handler): | ||
# Confirm we can create the SerialData object. | ||
ser = rs232.SerialData(port='no_op://', open_delay=0, threaded=False) | ||
ser = rs232.SerialData(port='no_op://', name='a name', open_delay=0) | ||
assert ser.name == 'a name' | ||
|
||
# Peek inside, it should have a NoOpSerial instance as member ser. | ||
assert ser.ser | ||
|
@@ -43,34 +68,51 @@ def test_basic_no_op(handler): | |
# Open is automatically called by SerialData. | ||
assert ser.is_connected | ||
|
||
# no_op handler doesn't do any reading, analogous to /dev/null, which | ||
# never produces any output. | ||
assert '' == ser.read(retry_delay=0.01, retry_limit=2) | ||
assert 0 == ser.write('abcdef') | ||
|
||
# Disconnect from the serial port. | ||
# connect() is idempotent. | ||
ser.connect() | ||
assert ser.is_connected | ||
ser.disconnect() | ||
assert not ser.is_connected | ||
|
||
# Should no longer be able to read or write. | ||
with pytest.raises(AssertionError): | ||
ser.read(retry_delay=0.01, retry_limit=1) | ||
with pytest.raises(AssertionError): | ||
ser.write('a') | ||
# Several passes of reading, writing, disconnecting and connecting. | ||
for _ in range(3): | ||
# no_op handler doesn't do any reading, analogous to /dev/null, which | ||
# never produces any output. | ||
assert '' == ser.read(retry_delay=0.01, retry_limit=2) | ||
assert b'' == ser.read_bytes(size=1) | ||
assert 0 == ser.write('abcdef') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The no_op handler never returns anything, but other handles do report the size written. |
||
ser.reset_input_buffer() | ||
|
||
# Disconnect from the serial port. | ||
assert ser.is_connected | ||
ser.disconnect() | ||
assert not ser.is_connected | ||
|
||
# Should no longer be able to read or write. | ||
with pytest.raises(AssertionError): | ||
ser.read(retry_delay=0.01, retry_limit=1) | ||
with pytest.raises(AssertionError): | ||
ser.read_bytes(size=1) | ||
with pytest.raises(AssertionError): | ||
ser.write('a') | ||
ser.reset_input_buffer() | ||
|
||
# And we should be able to reconnect. | ||
assert not ser.is_connected | ||
ser.connect() | ||
assert ser.is_connected | ||
|
||
|
||
def test_basic_io(handler): | ||
protocol_buffers.ResetBuffers(b'abc\r\n') | ||
ser = rs232.SerialData(port='buffers://', open_delay=0, threaded=False) | ||
ser = rs232.SerialData(port='buffers://', open_delay=0.01, retry_delay=0.01, | ||
retry_limit=2) | ||
|
||
# Peek inside, it should have a BuffersSerial instance as member ser. | ||
assert isinstance(ser.ser, protocol_buffers.BuffersSerial) | ||
|
||
# Can read one line, "abc\r\n", from the read buffer. | ||
assert 'abc\r\n' == ser.read(retry_delay=0.1, retry_limit=10) | ||
# Another read will fail, having exhausted the contents of the read buffer. | ||
assert '' == ser.read(retry_delay=0.01, retry_limit=2) | ||
assert '' == ser.read() | ||
|
||
# Can write to the "device", the handler will accumulate the results. | ||
assert 5 == ser.write('def\r\n') | ||
|
@@ -138,7 +180,7 @@ def write(self, data): | |
|
||
def test_hooked_io(handler): | ||
protocol_hooked.Serial = HookedSerialHandler | ||
ser = rs232.SerialData(port='hooked://', open_delay=0, threaded=False) | ||
ser = rs232.SerialData(port='hooked://', open_delay=0) | ||
|
||
# Peek inside, it should have a PySerial instance as member ser. | ||
assert ser.ser | ||
|
@@ -157,6 +199,8 @@ def test_hooked_io(handler): | |
else: | ||
first_line = line | ||
assert 'message' in line | ||
reading = ser.get_reading() | ||
assert reading[1] == line | ||
|
||
# Can write to the "device" many times. | ||
line = 'abcdefghijklmnop' * 30 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is why I was a little skeptical about creating the
SerialData
object regardless. Theerror.BadSerialConnection
is a good replacement, but it is a big annoying when in use viapocs_shell
, because the error right here will trigger before anything else is set up and tell the user to use a simulator.However, I think that could also be handled in a better manner. I think we could simple remove the
try/except
here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed that (and will send another PR).
I'll create an issue to discuss the issue you raise.