Skip to content

Commit

Permalink
Added chunk_size argument + refactored test + improved docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-DEMCON authored and bilderbuchi committed Mar 6, 2023
1 parent 90516c2 commit 8bf3c29
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 78 deletions.
12 changes: 7 additions & 5 deletions pymeasure/adapters/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ def _read_bytes(self, count, break_on_termchar, **kwargs):
# For -1 we empty the buffer completely
return self._read_bytes_until_timeout()

def _read_bytes_until_timeout(self, **kwargs):
"""Read from the serial until a timeout occurs, regardless of how many bytes."""
# `Serial.readlines()` sounds appropriate instead but it turns out it has an
# unpredictable timeout
def _read_bytes_until_timeout(self, chunk_size=256, **kwargs):
"""Read from the serial until a timeout occurs, regardless of the number of bytes.
:chunk_size: The number of bytes attempted to in a single transaction.
Multiple of these transactions will occur.
"""
# `Serial.readlines()` has an unpredictable timeout, see PR #866
data = bytes()
chunk_size = 256
while True:
chunk = self.connection.read(chunk_size, **kwargs)
data += chunk
Expand Down
134 changes: 61 additions & 73 deletions tests/adapters/test_serial_with_loopback.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,68 @@
#
# Test the serial adapter using looped COM ports.
# This looping needs to be done outside this script. One suggestion is
# using the application `com0com`.
#

import pytest

from serial import Serial
from time import time
from pymeasure.adapters import SerialAdapter


ADAPTER_TIMEOUT = 3.0 # Seconds


@pytest.fixture()
def adapter(connected_device_address):
"""Call with `--device-address="COM3,COM4"` to select two devices."""
device = connected_device_address.split(",")[0]
return SerialAdapter(
device,
baudrate=19200,
timeout=ADAPTER_TIMEOUT,
read_termination=chr(0x0F),
)


@pytest.fixture()
def loopback(connected_device_address):
"""See `adapter()`."""
device = connected_device_address.split(",")[1]
return Serial(device, baudrate=19200)


def test_read(adapter, loopback):
"""Regular read with fixed number of bytes."""
loopback.write(b"abc")
result = adapter.read_bytes(3)

assert len(result) == 3


def test_read_all_short(adapter, loopback):
"""Read with unlimited number of bytes"""
loopback.write(b"a")
start = time()
result = adapter.read_bytes(-1)
elapsed = time() - start

assert len(result) == 1
assert ADAPTER_TIMEOUT == pytest.approx(elapsed, abs=0.1)


def test_read_all(adapter, loopback):
loopback.write(b"aaaaabbbbbccccceeeee" * 50)
start = time()
result = adapter.read_bytes(-1)
elapsed = time() - start

assert len(result) == 20 * 50
assert ADAPTER_TIMEOUT == pytest.approx(elapsed, abs=0.1)


def test_read_all_exact_chunck_size(adapter, loopback):
loopback.write(b"a" * 256)
start = time()
result = adapter.read_bytes(-1)
elapsed = time() - start

assert len(result) == 256
assert ADAPTER_TIMEOUT == pytest.approx(elapsed, abs=0.1)


def test_read_all_with_newline(adapter, loopback):
data = b"aaaaabbbbb\nccccceeeee" * 50
loopback.write(data)
result = adapter.read_bytes(-1)

assert len(result) == 21 * 50
class TestSerialLoopback:
"""Test serial adapter with two real but looped COM ports.
A pair of COM ports that are looped will read the written data from the other.
This looping needs to be done outside this script. One suggestion is using the application
`com0com <https://com0com.sourceforge.net/>`_.
Alternatively two UART adapters could be used, that have their RX/TX physically connected.
Call PyTest with the argument``--device-address="COM1,COM2"`` to specify the serial addresses,
minding the comma for the separation of the two ports.
"""

ADAPTER_TIMEOUT = 1.0 # Seconds

@pytest.fixture()
def adapter(self, connected_device_address):
device = connected_device_address.split(",")[0]
return SerialAdapter(
device,
baudrate=19200,
timeout=self.ADAPTER_TIMEOUT,
read_termination=chr(0x0F),
)

@pytest.fixture()
def loopback(self, connected_device_address):
"""See `adapter()`."""
device = connected_device_address.split(",")[1]
return Serial(device, baudrate=19200)

def test_read(self, adapter, loopback):
"""Regular read with fixed number of bytes."""
loopback.write(b"abc")
result = adapter.read_bytes(3)

assert len(result) == 3

@pytest.mark.parametrize("data", [
b"a", # Short data
b"aaaaabbbbbccccceeeee" * 50, # A lot data
b"a" * 256, # Exactly one chunk size
b"aaaaabbbbb\nccccceeeee" * 50, # With a newline (should be ignored)
])
def test_read_all(self, adapter, loopback, data):
"""Read with undefined number of bytes - also confirm timeout duration is correct."""
loopback.write(data)
start = time()
result = adapter.read_bytes(-1)
elapsed = time() - start

assert result == data
assert self.ADAPTER_TIMEOUT == pytest.approx(elapsed, abs=0.1)

@pytest.mark.parametrize("chunk", [1, 256, 10000])
def test_read_varied_chunk_size(self, adapter, loopback, chunk):
"""Read with undefined number of bytes with non-default chunk size."""
data = b"abcde" * 10
loopback.write(data)
result = adapter.read_bytes(-1)

assert result == data

0 comments on commit 8bf3c29

Please sign in to comment.