-
Notifications
You must be signed in to change notification settings - Fork 1k
Properly process incomplete RTU frames, improve test coverage #608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,7 +60,7 @@ def __init__(self, decoder, client=None): | |
| :param decoder: The decoder factory implementation to use | ||
| """ | ||
| self._buffer = b'' | ||
| self._header = {'uid': 0x00, 'len': 0, 'crc': '0000'} | ||
| self._header = {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
| self._hsize = 0x01 | ||
| self._end = b'\x0d\x0a' | ||
| self._min_frame_size = 4 | ||
|
|
@@ -89,14 +89,9 @@ def checkFrame(self): | |
| self.populateHeader() | ||
| frame_size = self._header['len'] | ||
| data = self._buffer[:frame_size - 2] | ||
| crc = self._buffer[frame_size - 2:frame_size] | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| crc = self._header['crc'] | ||
| crc_val = (byte2int(crc[0]) << 8) + byte2int(crc[1]) | ||
| if checkCRC(data, crc_val): | ||
| return True | ||
| else: | ||
| _logger.debug("CRC invalid, discarding header!!") | ||
| self.resetFrame() | ||
| return False | ||
| return checkCRC(data, crc_val) | ||
| except (IndexError, KeyError, struct.error): | ||
| return False | ||
|
|
||
|
|
@@ -107,13 +102,10 @@ def advanceFrame(self): | |
| it or determined that it contains an error. It also has to reset the | ||
| current frame header handle | ||
| """ | ||
| try: | ||
| self._buffer = self._buffer[self._header['len']:] | ||
| except KeyError: | ||
| # Error response, no header len found | ||
| self.resetFrame() | ||
|
|
||
| self._buffer = self._buffer[self._header['len']:] | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this pull request |
||
| _logger.debug("Frame advanced, resetting header!!") | ||
| self._header = {} | ||
| self._header = {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change makes |
||
|
|
||
| def resetFrame(self): | ||
| """ | ||
|
|
@@ -127,7 +119,7 @@ def resetFrame(self): | |
| _logger.debug("Resetting frame - Current Frame in " | ||
| "buffer - {}".format(hexlify_packets(self._buffer))) | ||
| self._buffer = b'' | ||
| self._header = {} | ||
| self._header = {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change makes |
||
|
|
||
| def isFrameReady(self): | ||
| """ | ||
|
|
@@ -137,31 +129,38 @@ def isFrameReady(self): | |
|
|
||
| :returns: True if ready, False otherwise | ||
| """ | ||
| if len(self._buffer) > self._hsize: | ||
| if not self._header: | ||
| self.populateHeader() | ||
| if len(self._buffer) <= self._hsize: | ||
| return False | ||
|
|
||
| return self._header and len(self._buffer) >= self._header['len'] | ||
| else: | ||
| try: | ||
| # Frame is ready only if populateHeader() successfully populates crc field which finishes RTU frame | ||
| # Otherwise, if buffer is not yet long enough, populateHeader() raises IndexError | ||
| self.populateHeader() | ||
| except IndexError: | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case of incomplete frame |
||
| return False | ||
|
|
||
| return True | ||
|
|
||
| def populateHeader(self, data=None): | ||
| """ | ||
| Try to set the headers `uid`, `len` and `crc`. | ||
|
|
||
| This method examines `self._buffer` and writes meta | ||
| information into `self._header`. It calculates only the | ||
| values for headers that are not already in the dictionary. | ||
| information into `self._header`. | ||
|
|
||
| Beware that this method will raise an IndexError if | ||
| `self._buffer` is not yet long enough. | ||
| """ | ||
| data = data if data else self._buffer | ||
| data = data if data is not None else self._buffer | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some tests pass |
||
| self._header['uid'] = byte2int(data[0]) | ||
| func_code = byte2int(data[1]) | ||
| pdu_class = self.decoder.lookupPduClass(func_code) | ||
| size = pdu_class.calculateRtuFrameSize(data) | ||
| self._header['len'] = size | ||
|
|
||
| if len(data) < size: | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Slice operation |
||
| # crc yet not available | ||
| raise IndexError | ||
| self._header['crc'] = data[size - 2:size] | ||
|
|
||
| def addToFrame(self, message): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,7 @@ | |
| from pymodbus.exceptions import ModbusIOException | ||
| from pymodbus.compat import IS_PYTHON3 | ||
| if IS_PYTHON3: | ||
| from unittest.mock import Mock | ||
| from unittest.mock import Mock, patch | ||
| else: # Python 2 | ||
| from mock import Mock | ||
|
|
||
|
|
@@ -44,7 +44,7 @@ def test_framer_initialization(framer): | |
| assert framer._start == b':' | ||
| assert framer._end == b"\r\n" | ||
| elif isinstance(framer, ModbusRtuFramer): | ||
| assert framer._header == {'uid': 0x00, 'len': 0, 'crc': '0000'} | ||
| assert framer._header == {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
| assert framer._hsize == 0x01 | ||
| assert framer._end == b'\x0d\x0a' | ||
| assert framer._min_frame_size == 4 | ||
|
|
@@ -64,47 +64,78 @@ def test_decode_data(rtu_framer, data): | |
| assert decoded == expected | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("data", [(b'', False), | ||
| (b'\x02\x01\x01\x00Q\xcc', True)]) | ||
| @pytest.mark.parametrize("data", [ | ||
| (b'', False), | ||
| (b'\x02\x01\x01\x00Q\xcc', True), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD', True), # valid frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC', False), # invalid frame CRC | ||
| ]) | ||
| def test_check_frame(rtu_framer, data): | ||
| data, expected = data | ||
| rtu_framer._buffer = data | ||
| assert expected == rtu_framer.checkFrame() | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("data", [b'', b'abcd']) | ||
| def test_advance_framer(rtu_framer, data): | ||
| rtu_framer._buffer = data | ||
| @pytest.mark.parametrize("data", [ | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Useless tests (which do not test anything but only improve code coverage) are replaced with new tests. |
||
| (b'', {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'}, b''), | ||
| (b'abcd', {'uid': 0x00, 'len': 2, 'crc': b'\x00\x00'}, b'cd'), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x12\x03', # real case, frame size is 11 | ||
| {'uid': 0x00, 'len': 11, 'crc': b'\x00\x00'}, b'\x12\x03'), | ||
| ]) | ||
| def test_rtu_advance_framer(rtu_framer, data): | ||
| before_buf, before_header, after_buf = data | ||
|
|
||
| rtu_framer._buffer = before_buf | ||
| rtu_framer._header = before_header | ||
| rtu_framer.advanceFrame() | ||
| assert rtu_framer._header == {} | ||
| assert rtu_framer._buffer == data | ||
| assert rtu_framer._header == {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
| assert rtu_framer._buffer == after_buf | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("data", [b'', b'abcd']) | ||
| def test_reset_framer(rtu_framer, data): | ||
| def test_rtu_reset_framer(rtu_framer, data): | ||
| rtu_framer._buffer = data | ||
| rtu_framer.resetFrame() | ||
| assert rtu_framer._header == {} | ||
| assert rtu_framer._header == {'uid': 0x00, 'len': 0, 'crc': b'\x00\x00'} | ||
| assert rtu_framer._buffer == b'' | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("data", [ | ||
| (b'', False), | ||
| (b'\x11', False), | ||
| (b'\x11\x03', False), | ||
| (b'\x11\x03\x06', False), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49', False), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD', True), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD', True) | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\xAB\xCD', True), | ||
| ]) | ||
| def test_is_frame_ready(rtu_framer, data): | ||
| data, expected = data | ||
| rtu_framer._buffer = data | ||
| rtu_framer.advanceFrame() | ||
| # rtu_framer.advanceFrame() | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is useless here and only adds some complexity to the code. It is removed, and some additional test cases are added to be sure that this function works in other cases |
||
| assert rtu_framer.isFrameReady() == expected | ||
|
|
||
|
|
||
| def test_populate_header(rtu_framer): | ||
| rtu_framer.populateHeader(b'abcd') | ||
| assert rtu_framer._header == {'crc': b'd', 'uid': 97, 'len': 5} | ||
| @pytest.mark.parametrize("data", [ | ||
| b'', | ||
| b'\x11', | ||
| b'\x11\x03', | ||
| b'\x11\x03\x06', | ||
| b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x43', | ||
| ]) | ||
| def test_rtu_populate_header_fail(rtu_framer, data): | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure that |
||
| with pytest.raises(IndexError): | ||
| rtu_framer.populateHeader(data) | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("data", [ | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD', {'crc': b'\x49\xAD', 'uid': 17, 'len': 11}), | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03', {'crc': b'\x49\xAD', 'uid': 17, 'len': 11}) | ||
| ]) | ||
| def test_rtu_populate_header(rtu_framer, data): | ||
| buffer, expected = data | ||
| rtu_framer.populateHeader(buffer) | ||
| assert rtu_framer._header == expected | ||
|
|
||
|
|
||
| def test_add_to_frame(rtu_framer): | ||
|
|
@@ -126,12 +157,26 @@ def test_populate_result(rtu_framer): | |
| assert result.unit_id == 255 | ||
|
|
||
|
|
||
| @pytest.mark.parametrize('framer', [ascii_framer, rtu_framer, binary_framer]) | ||
| def test_process_incoming_packet(framer): | ||
| def cb(res): | ||
| return res | ||
| # data = b'' | ||
| # framer.processIncomingPacket(data, cb, unit=1, single=False) | ||
| @pytest.mark.parametrize("data", [ | ||
| (b'\x11', 17, False, False), # not complete frame | ||
| (b'\x11\x03', 17, False, False), # not complete frame | ||
| (b'\x11\x03\x06', 17, False, False), # not complete frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43', 17, False, False), # not complete frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40', 17, False, False), # not complete frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49', 17, False, False), # not complete frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAC', 17, True, False), # bad crc | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD', 17, False, True), # good frame | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD', 16, True, False), # incorrect unit id | ||
| (b'\x11\x03\x06\xAE\x41\x56\x52\x43\x40\x49\xAD\x11\x03', 17, False, True), # good frame + part of next frame | ||
| ]) | ||
| def test_rtu_process_incoming_packet(rtu_framer, data): | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve coverage and ensure that RTU framer properly deals with incomplete frames |
||
| buffer, units, reset_called, process_called = data | ||
|
|
||
| with patch.object(rtu_framer, '_process') as mock_process, \ | ||
| patch.object(rtu_framer, 'resetFrame') as mock_reset: | ||
| rtu_framer.processIncomingPacket(buffer, Mock(), units) | ||
| assert mock_process.call_count == (1 if process_called else 0) | ||
| assert mock_reset.call_count == (1 if reset_called else 0) | ||
|
|
||
|
|
||
| def test_build_packet(rtu_framer): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other parts of the code expect that
crcisbytesobject