diff --git a/pymodbus/client/sync.py b/pymodbus/client/sync.py index 8b0b832b6..471626e11 100644 --- a/pymodbus/client/sync.py +++ b/pymodbus/client/sync.py @@ -239,7 +239,12 @@ def _recv(self, size): """ Reads data from the underlying descriptor :param size: The number of bytes to read - :return: The bytes read + :return: The bytes read if the peer sent a response, or a zero-length + response if no data packets were received from the client at + all. + :raises: ConnectionException if the socket is not initialized, or the + peer either has closed the connection before this method is + invoked or closes it before sending any data before timeout. """ if not self.socket: raise ConnectionException(self.__str__()) @@ -256,9 +261,9 @@ def _recv(self, size): timeout = self.timeout - # If size isn't specified read 1 byte at a time. + # If size isn't specified read up to 4096 bytes at a time. if size is None: - recv_size = 1 + recv_size = 4096 else: recv_size = size @@ -270,6 +275,9 @@ def _recv(self, size): ready = select.select([self.socket], [], [], end - time_) if ready[0]: recv_data = self.socket.recv(recv_size) + if recv_data == b'': + return self._handle_abrupt_socket_close( + size, data, time.time() - time_) data.append(recv_data) data_length += len(recv_data) time_ = time.time() @@ -286,6 +294,35 @@ def _recv(self, size): return b"".join(data) + def _handle_abrupt_socket_close(self, size, data, duration): + """ Handle unexpected socket close by remote end + + Intended to be invoked after determining that the remote end + has unexpectedly closed the connection, to clean up and handle + the situation appropriately. + + :param size: The number of bytes that was attempted to read + :param data: The actual data returned + :param duration: Duration from the read was first attempted + until it was determined that the remote closed the + socket + :return: The more than zero bytes read from the remote end + :raises: ConnectionException If the remote end didn't send any + data at all before closing the connection. + """ + self.close() + readsize = ("read of %s bytes" % size if size + else "unbounded read") + msg = ("%s: Connection unexpectedly closed " + "%.6f seconds into %s" % (self, duration, readsize)) + if data: + result = b"".join(data) + msg += " after returning %s bytes" % len(result) + _logger.warning(msg) + return result + msg += " without response from unit before it closed connection" + raise ConnectionException(msg) + def is_socket_open(self): return True if self.socket is not None else False diff --git a/pymodbus/client/sync_diag.py b/pymodbus/client/sync_diag.py new file mode 100644 index 000000000..90521de97 --- /dev/null +++ b/pymodbus/client/sync_diag.py @@ -0,0 +1,167 @@ +import socket +import logging +import time + +from pymodbus.constants import Defaults +from pymodbus.client.sync import ModbusTcpClient +from pymodbus.transaction import ModbusSocketFramer +from pymodbus.exceptions import ConnectionException + +_logger = logging.getLogger(__name__) + +LOG_MSGS = { + 'conn_msg': 'Connecting to modbus device %s', + 'connfail_msg': 'Connection to (%s, %s) failed: %s', + 'discon_msg': 'Disconnecting from modbus device %s', + 'timelimit_read_msg': + 'Modbus device read took %.4f seconds, ' + 'returned %s bytes in timelimit read', + 'timeout_msg': + 'Modbus device timeout after %.4f seconds, ' + 'returned %s bytes %s', + 'delay_msg': + 'Modbus device read took %.4f seconds, ' + 'returned %s bytes of %s expected', + 'read_msg': + 'Modbus device read took %.4f seconds, ' + 'returned %s bytes of %s expected', + 'unexpected_dc_msg': '%s %s'} + + +class ModbusTcpDiagClient(ModbusTcpClient): + """ + Variant of pymodbus.client.sync.ModbusTcpClient with additional + logging to diagnose network issues. + + The following events are logged: + + +---------+-----------------------------------------------------------------+ + | Level | Events | + +=========+=================================================================+ + | ERROR | Failure to connect to modbus unit; unexpected disconnect by | + | | modbus unit | + +---------+-----------------------------------------------------------------+ + | WARNING | Timeout on normal read; read took longer than warn_delay_limit | + +---------+-----------------------------------------------------------------+ + | INFO | Connection attempt to modbus unit; disconnection from modbus | + | | unit; each time limited read | + +---------+-----------------------------------------------------------------+ + | DEBUG | Normal read with timing information | + +---------+-----------------------------------------------------------------+ + + Reads are differentiated between "normal", which reads a specified number of + bytes, and "time limited", which reads all data for a duration equal to the + timeout period configured for this instance. + """ + + # pylint: disable=no-member + + def __init__(self, host='127.0.0.1', port=Defaults.Port, + framer=ModbusSocketFramer, **kwargs): + """ Initialize a client instance + + The keys of LOG_MSGS can be used in kwargs to customize the messages. + + :param host: The host to connect to (default 127.0.0.1) + :param port: The modbus port to connect to (default 502) + :param source_address: The source address tuple to bind to (default ('', 0)) + :param timeout: The timeout to use for this socket (default Defaults.Timeout) + :param warn_delay_limit: Log reads that take longer than this as warning. + Default True sets it to half of "timeout". None never logs these as + warning, 0 logs everything as warning. + :param framer: The modbus framer to use (default ModbusSocketFramer) + + .. note:: The host argument will accept ipv4 and ipv6 hosts + """ + self.warn_delay_limit = kwargs.get('warn_delay_limit', True) + super().__init__(host, port, framer, **kwargs) + if self.warn_delay_limit is True: + self.warn_delay_limit = self.timeout / 2 + + # Set logging messages, defaulting to LOG_MSGS + for (k, v) in LOG_MSGS.items(): + self.__dict__[k] = kwargs.get(k, v) + + def connect(self): + """ Connect to the modbus tcp server + + :returns: True if connection succeeded, False otherwise + """ + if self.socket: + return True + try: + _logger.info(self.conn_msg, self) + self.socket = socket.create_connection( + (self.host, self.port), + timeout=self.timeout, + source_address=self.source_address) + except socket.error as msg: + _logger.error(self.connfail_msg, self.host, self.port, msg) + self.close() + return self.socket is not None + + def close(self): + """ Closes the underlying socket connection + """ + if self.socket: + _logger.info(self.discon_msg, self) + self.socket.close() + self.socket = None + + def _recv(self, size): + try: + start = time.time() + + result = super()._recv(size) + + delay = time.time() - start + if self.warn_delay_limit is not None and delay >= self.warn_delay_limit: + self._log_delayed_response(len(result), size, delay) + elif not size: + _logger.debug(self.timelimit_read_msg, delay, len(result)) + else: + _logger.debug(self.read_msg, delay, len(result), size) + + return result + except ConnectionException as ex: + # Only log actual network errors, "if not self.socket" then it's a internal code issue + if 'Connection unexpectedly closed' in ex.string: + _logger.error(self.unexpected_dc_msg, self, ex) + raise ex + + def _log_delayed_response(self, result_len, size, delay): + if not size and result_len > 0: + _logger.info(self.timelimit_read_msg, delay, result_len) + elif (result_len == 0 or (size and result_len < size)) and delay >= self.timeout: + read_type = ("of %i expected" % size) if size else "in timelimit read" + _logger.warning(self.timeout_msg, delay, result_len, read_type) + else: + _logger.warning(self.delay_msg, delay, result_len, size) + + def __str__(self): + """ Builds a string representation of the connection + + :returns: The string representation + """ + return "ModbusTcpDiagClient(%s:%s)" % (self.host, self.port) + + +def get_client(): + """ Returns an appropriate client based on logging level + + This will be ModbusTcpDiagClient by default, or the parent class + if the log level is such that the diagnostic client will not log + anything. + + :returns: ModbusTcpClient or a child class thereof + """ + return ModbusTcpDiagClient if _logger.isEnabledFor(logging.ERROR) else ModbusTcpClient + + +# --------------------------------------------------------------------------- # +# Exported symbols +# --------------------------------------------------------------------------- # + +__all__ = [ + "ModbusTcpDiagClient", "get_client" +] diff --git a/pymodbus/transaction.py b/pymodbus/transaction.py index 0da18a607..614da2b1d 100644 --- a/pymodbus/transaction.py +++ b/pymodbus/transaction.py @@ -200,6 +200,7 @@ def execute(self, request): "/Unable to decode response") response = ModbusIOException(last_exception, request.function_code) + self.client.close() if hasattr(self.client, "state"): _logger.debug("Changing transaction state from " "'PROCESSING REPLY' to " @@ -209,6 +210,7 @@ def execute(self, request): return response except ModbusIOException as ex: # Handle decode errors in processIncomingPacket method + self.client.close() _logger.exception(ex) self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE return ex @@ -275,9 +277,10 @@ def _recv(self, expected_response_length, full): read_min = self.client.framer.recvPacket(min_size) if len(read_min) != min_size: + msg_start = "Incomplete message" if read_min else "No response" raise InvalidMessageReceivedException( - "Incomplete message received, expected at least %d bytes " - "(%d received)" % (min_size, len(read_min)) + "%s received, expected at least %d bytes " + "(%d received)" % (msg_start, min_size, len(read_min)) ) if read_min: if isinstance(self.client.framer, ModbusSocketFramer): @@ -312,9 +315,14 @@ def _recv(self, expected_response_length, full): result = read_min + result actual = len(result) if total is not None and actual != total: - _logger.debug("Incomplete message received, " + msg_start = "Incomplete message" if actual else "No response" + _logger.debug("{} received, " "Expected {} bytes Recieved " - "{} bytes !!!!".format(total, actual)) + "{} bytes !!!!".format(msg_start, total, actual)) + elif actual == 0: + # If actual == 0 and total is not None then the above + # should be triggered, so total must be None here + _logger.debug("No response received to unbounded read !!!!") if self.client.state != ModbusTransactionState.PROCESSING_REPLY: _logger.debug("Changing transaction state from " "'WAITING FOR REPLY' to 'PROCESSING REPLY'") diff --git a/test/test_client_sync.py b/test/test_client_sync.py old mode 100644 new mode 100755 index 1014a9382..00543f83f --- a/test/test_client_sync.py +++ b/test/test_client_sync.py @@ -1,5 +1,7 @@ #!/usr/bin/env python import unittest +from itertools import count +from io import StringIO from pymodbus.compat import IS_PYTHON3 if IS_PYTHON3: # Python 3 @@ -17,6 +19,8 @@ from pymodbus.exceptions import ParameterException from pymodbus.transaction import ModbusAsciiFramer, ModbusRtuFramer from pymodbus.transaction import ModbusBinaryFramer +from pymodbus.transaction import ModbusSocketFramer +from pymodbus.utilities import hexlify_packets # ---------------------------------------------------------------------------# @@ -62,8 +66,8 @@ def testBaseModbusClient(self): client = BaseModbusClient(None) client.transaction = None self.assertRaises(NotImplementedException, lambda: client.connect()) - self.assertRaises(NotImplementedException, lambda: client._send(None)) - self.assertRaises(NotImplementedException, lambda: client._recv(None)) + self.assertRaises(NotImplementedException, lambda: client.send(None)) + self.assertRaises(NotImplementedException, lambda: client.recv(None)) self.assertRaises(NotImplementedException, lambda: client.__enter__()) self.assertRaises(NotImplementedException, lambda: client.execute()) self.assertRaises(NotImplementedException, lambda: client.is_socket_open()) @@ -71,6 +75,20 @@ def testBaseModbusClient(self): client.close() client.__exit__(0, 0, 0) + # Test information methods + client.last_frame_end = 2 + client.silent_interval = 2 + self.assertEqual(4, client.idle_time()) + client.last_frame_end = None + self.assertEqual(0, client.idle_time()) + + # Test debug/trace/_dump methods + self.assertEqual(False, client.debug_enabled()) + writable = StringIO() + client.trace(writable) + client._dump([0, 1, 2], None) + self.assertEqual(hexlify_packets([0, 1, 2]), writable.getvalue()) + # a successful execute client.connect = lambda: True client.transaction = Mock(**{'execute.return_value': True}) @@ -133,6 +151,11 @@ def settimeout(self, *a, **kwa): client = ModbusUdpClient() self.assertFalse(client.connect()) + def testUdpClientIsSocketOpen(self): + ''' Test the udp client is_socket_open method''' + client = ModbusUdpClient() + self.assertFalse(client.is_socket_open()) + def testUdpClientSend(self): ''' Test the udp client send method''' client = ModbusUdpClient() @@ -201,6 +224,11 @@ def testTcpClientConnect(self): client = ModbusTcpClient() self.assertFalse(client.connect()) + def testTcpClientIsSocketOpen(self): + ''' Test the tcp client is_socket_open method''' + client = ModbusTcpClient() + self.assertFalse(client.is_socket_open()) + def testTcpClientSend(self): ''' Test the tcp client send method''' client = ModbusTcpClient() @@ -210,11 +238,13 @@ def testTcpClientSend(self): self.assertEqual(0, client._send(None)) self.assertEqual(4, client._send('1234')) + @patch('pymodbus.client.sync.time') @patch('pymodbus.client.sync.select') - def testTcpClientRecv(self, mock_select): + def testTcpClientRecv(self, mock_select, mock_time): ''' Test the tcp client receive method''' mock_select.select.return_value = [True] + mock_time.time.side_effect = count() client = ModbusTcpClient() self.assertRaises(ConnectionException, lambda: client._recv(1024)) @@ -225,7 +255,7 @@ def testTcpClientRecv(self, mock_select): mock_socket = MagicMock() mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) client.socket = mock_socket - client.timeout = 1 + client.timeout = 3 self.assertEqual(b'\x00\x01\x02', client._recv(3)) mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) self.assertEqual(b'\x00\x01', client._recv(2)) @@ -235,7 +265,16 @@ def testTcpClientRecv(self, mock_select): mock_select.select.return_value = [True] self.assertIn(b'\x00', client._recv(None)) - def testSerialClientRpr(self): + mock_socket = MagicMock() + mock_socket.recv.return_value = b'' + client.socket = mock_socket + self.assertRaises(ConnectionException, lambda: client._recv(1024)) + + mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02', b'']) + client.socket = mock_socket + self.assertEqual(b'\x00\x01\x02', client._recv(1024)) + + def testTcpClientRpr(self): client = ModbusTcpClient() rep = "<{} at {} socket={}, ipaddr={}, port={}, timeout={}>".format( client.__class__.__name__, hex(id(client)), client.socket, @@ -307,19 +346,25 @@ def testTlsClientSend(self): self.assertEqual(0, client._send(None)) self.assertEqual(4, client._send('1234')) - def testTlsClientRecv(self): + @patch('pymodbus.client.sync.time') + def testTlsClientRecv(self, mock_time): ''' Test the tls client receive method''' client = ModbusTlsClient() self.assertRaises(ConnectionException, lambda: client._recv(1024)) + mock_time.time.side_effect = count() + client.socket = mockSocket() self.assertEqual(b'', client._recv(0)) self.assertEqual(b'\x00' * 4, client._recv(4)) + client.timeout = 2 + self.assertIn(b'\x00', client._recv(None)) + mock_socket = MagicMock() mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) client.socket = mock_socket - client.timeout = 1 + client.timeout = 3 self.assertEqual(b'\x00\x01\x02', client._recv(3)) mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) self.assertEqual(b'\x00\x01', client._recv(2)) @@ -354,6 +399,8 @@ def testSyncSerialClientInstantiation(self): ModbusRtuFramer)) self.assertTrue(isinstance(ModbusSerialClient(method='binary').framer, ModbusBinaryFramer)) + self.assertTrue(isinstance(ModbusSerialClient(method='socket').framer, + ModbusSocketFramer)) self.assertRaises(ParameterException, lambda: ModbusSerialClient(method='something')) @@ -384,6 +431,12 @@ def testBasicSyncSerialClient(self, mock_serial): self.assertTrue(client.connect()) client.close() + # rtu connect/disconnect + rtu_client = ModbusSerialClient(method='rtu') + self.assertTrue(rtu_client.connect()) + self.assertEqual(rtu_client.socket.interCharTimeout, rtu_client.inter_char_timeout) + rtu_client.close() + # already closed socket client.socket = False client.close() @@ -402,6 +455,14 @@ def testSerialClientConnect(self): client = ModbusSerialClient() self.assertFalse(client.connect()) + @patch("serial.Serial") + def testSerialClientIsSocketOpen(self, mock_serial): + ''' Test the serial client is_socket_open method''' + client = ModbusSerialClient() + self.assertFalse(client.is_socket_open()) + client.socket = mock_serial + self.assertTrue(client.is_socket_open()) + @patch("serial.Serial") def testSerialClientSend(self, mock_serial): ''' Test the serial client send method''' @@ -444,6 +505,8 @@ def testSerialClientRecv(self): self.assertEqual(b'', client._recv(None)) client.socket.timeout = 0 self.assertEqual(b'', client._recv(0)) + client.timeout = None + self.assertEqual(b'', client._recv(None)) def testSerialClientRepr(self): client = ModbusSerialClient() diff --git a/test/test_client_sync_diag.py b/test/test_client_sync_diag.py new file mode 100755 index 000000000..c32d283e7 --- /dev/null +++ b/test/test_client_sync_diag.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +import unittest +from itertools import count +from pymodbus.compat import IS_PYTHON3 + +if IS_PYTHON3: # Python 3 + from unittest.mock import patch, Mock, MagicMock +else: # Python 2 + from mock import patch, Mock, MagicMock +import socket + +from pymodbus.client.sync_diag import ModbusTcpDiagClient, get_client +from pymodbus.exceptions import ConnectionException, NotImplementedException +from pymodbus.exceptions import ParameterException +from test.test_client_sync import mockSocket + + +# ---------------------------------------------------------------------------# +# Fixture +# ---------------------------------------------------------------------------# +class SynchronousDiagnosticClientTest(unittest.TestCase): + ''' + This is the unittest for the pymodbus.client.sync_diag module. It is + a copy of parts of the test for the TCP class in the pymodbus.client.sync + module, as it should operate identically and only log some additional + lines. + ''' + + # -----------------------------------------------------------------------# + # Test TCP Diagnostic Client + # -----------------------------------------------------------------------# + + def testSyncTcpDiagClientInstantiation(self): + client = get_client() + self.assertNotEqual(client, None) + + def testBasicSyncTcpDiagClient(self): + ''' Test the basic methods for the tcp sync diag client''' + + # connect/disconnect + client = ModbusTcpDiagClient() + client.socket = mockSocket() + self.assertTrue(client.connect()) + client.close() + + def testTcpDiagClientConnect(self): + ''' Test the tcp sync diag client connection method''' + with patch.object(socket, 'create_connection') as mock_method: + mock_method.return_value = object() + client = ModbusTcpDiagClient() + self.assertTrue(client.connect()) + + with patch.object(socket, 'create_connection') as mock_method: + mock_method.side_effect = socket.error() + client = ModbusTcpDiagClient() + self.assertFalse(client.connect()) + + @patch('pymodbus.client.sync.time') + @patch('pymodbus.client.sync_diag.time') + @patch('pymodbus.client.sync.select') + def testTcpDiagClientRecv(self, mock_select, mock_diag_time, mock_time): + ''' Test the tcp sync diag client receive method''' + + mock_select.select.return_value = [True] + mock_time.time.side_effect = count() + mock_diag_time.time.side_effect = count() + client = ModbusTcpDiagClient() + self.assertRaises(ConnectionException, lambda: client._recv(1024)) + + client.socket = mockSocket() + # Test logging of non-delayed responses + self.assertIn(b'\x00', client._recv(None)) + self.assertEqual(b'\x00', client._recv(1)) + + # Fool diagnostic logger into thinking we're running late, + # test logging of delayed responses + mock_diag_time.time.side_effect = count(step=3) + self.assertEqual(b'', client._recv(0)) + self.assertEqual(b'\x00' * 4, client._recv(4)) + + mock_socket = MagicMock() + mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) + client.socket = mock_socket + client.timeout = 3 + self.assertEqual(b'\x00\x01\x02', client._recv(3)) + mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02']) + self.assertEqual(b'\x00\x01', client._recv(2)) + mock_select.select.return_value = [False] + self.assertEqual(b'', client._recv(2)) + client.socket = mockSocket() + mock_select.select.return_value = [True] + self.assertIn(b'\x00', client._recv(None)) + + mock_socket = MagicMock() + mock_socket.recv.return_value = b'' + client.socket = mock_socket + self.assertRaises(ConnectionException, lambda: client._recv(1024)) + + mock_socket.recv.side_effect = iter([b'\x00', b'\x01', b'\x02', b'']) + client.socket = mock_socket + self.assertEqual(b'\x00\x01\x02', client._recv(1024)) + + def testTcpDiagClientRpr(self): + client = ModbusTcpDiagClient() + rep = "<{} at {} socket={}, ipaddr={}, port={}, timeout={}>".format( + client.__class__.__name__, hex(id(client)), client.socket, + client.host, client.port, client.timeout + ) + self.assertEqual(repr(client), rep) + + +# ---------------------------------------------------------------------------# +# Main +# ---------------------------------------------------------------------------# +if __name__ == "__main__": + unittest.main() diff --git a/test/test_transaction.py b/test/test_transaction.py old mode 100644 new mode 100755 index a3c469da1..172b8d796 --- a/test/test_transaction.py +++ b/test/test_transaction.py @@ -1,6 +1,14 @@ #!/usr/bin/env python import pytest import unittest +from itertools import count +from pymodbus.compat import IS_PYTHON3 + +if IS_PYTHON3: # Python 3 + from unittest.mock import patch, Mock, MagicMock +else: # Python 2 + from mock import patch, Mock, MagicMock + from binascii import a2b_hex from pymodbus.pdu import * from pymodbus.transaction import * @@ -82,7 +90,10 @@ def testCalculateExceptionLength(self): self.assertEqual(self._tm._calculate_exception_length(), exception_length) - def testExecute(self): + @patch('pymodbus.transaction.time') + def testExecute(self, mock_time): + mock_time.time.side_effect = count() + client = MagicMock() client.framer = self._ascii client.framer._buffer = b'deadbeef' @@ -123,6 +134,12 @@ def testExecute(self): response = tm.execute(request) self.assertIsInstance(response, ModbusIOException) + # wrong handle_local_echo + tm._recv = MagicMock(side_effect=iter([b'abcdef', b'deadbe', b'123456'])) + client.handle_local_echo = True + self.assertEqual(tm.execute(request).message, '[Input/Output] Wrong local echo') + client.handle_local_echo = False + # retry on invalid response tm.retry_on_invalid = True tm._recv = MagicMock(side_effect=iter([b'', b'abcdef', b'deadbe', b'123456'])) @@ -136,6 +153,12 @@ def testExecute(self): client.framer.processIncomingPacket.side_effect = MagicMock(side_effect=ModbusIOException()) self.assertIsInstance(tm.execute(request), ModbusIOException) + # broadcast + request.unit_id = 0 + client.broadcast_enable = True + self.assertEqual(tm.execute(request), + b'Broadcast write sent - no response expected') + # ----------------------------------------------------------------------- # # Dictionary based transaction manager # ----------------------------------------------------------------------- #