Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions pymodbus/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,12 @@ def _recv(self, size):
""" Reads data from the underlying descriptor

:param size: The number of bytes to read
:return: The bytes read
:return: The bytes read if the peer sent a response, or a zero-length
response if no data packets were received from the client at
all.
:raises: ConnectionException if the socket is not initialized, or the
peer either has closed the connection before this method is
invoked or closes it before sending any data before timeout.
"""
if not self.socket:
raise ConnectionException(self.__str__())
Expand All @@ -256,9 +261,9 @@ def _recv(self, size):

timeout = self.timeout

# If size isn't specified read 1 byte at a time.
# If size isn't specified read up to 4096 bytes at a time.
if size is None:
recv_size = 1
recv_size = 4096
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value is debatable. This would essentially block the recv till all 4096 bytes are received or timeout. Adding further to delay in reads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not how recv works. From the documentation:

$ pydoc socket.socket.recv | more
Help on method_descriptor in socket.socket:

socket.socket.recv = recv(...)
    recv(buffersize[, flags]) -> data
    
    Receive up to buffersize bytes from the socket.  For the optional flags
    argument, see the Unix manual.  When no data is available, block until
    at least one byte is available or until the remote end is closed.  When
    the remote end is closed and all data is read, return the empty string.

So it never blocks when a single byte is available, regardless of what buffersize is provided. I.e. it's a maximum limit, the minimum limit is always 1 regardless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thormick That matches the underlying recv man page too:

https://man7.org/linux/man-pages/man2/recv.2.html

If no messages are available at the socket, the receive calls wait for a message to arrive, unless the socket is nonblocking (see fcntl(2)), in which case the value -1 is returned and the external variable errno is set to EAGAIN or EWOULDBLOCK. The receive calls normally return any data available, up to the requested amount, rather than waiting for receipt of the full amount requested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I got this confused with Serial.Apologies for the mistake. Looks good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it's good that you picked on it, it made me gave it a quick look over and I realize now that ModbusTlsClient does the same, and now those two will be inconsistent. I don't have a setup ready for testing modbus TLS, so I'm not sure about touching that part of the code. Not necessarily a problem, though?

else:
recv_size = size

Expand All @@ -270,6 +275,9 @@ def _recv(self, size):
ready = select.select([self.socket], [], [], end - time_)
if ready[0]:
recv_data = self.socket.recv(recv_size)
if recv_data == b'':
return self._handle_abrupt_socket_close(
size, data, time.time() - time_)
data.append(recv_data)
data_length += len(recv_data)
time_ = time.time()
Expand All @@ -286,6 +294,35 @@ def _recv(self, size):

return b"".join(data)

def _handle_abrupt_socket_close(self, size, data, duration):
""" Handle unexpected socket close by remote end

Intended to be invoked after determining that the remote end
has unexpectedly closed the connection, to clean up and handle
the situation appropriately.

:param size: The number of bytes that was attempted to read
:param data: The actual data returned
:param duration: Duration from the read was first attempted
until it was determined that the remote closed the
socket
:return: The more than zero bytes read from the remote end
:raises: ConnectionException If the remote end didn't send any
data at all before closing the connection.
"""
self.close()
readsize = ("read of %s bytes" % size if size
else "unbounded read")
msg = ("%s: Connection unexpectedly closed "
"%.6f seconds into %s" % (self, duration, readsize))
if data:
result = b"".join(data)
msg += " after returning %s bytes" % len(result)
_logger.warning(msg)
return result
msg += " without response from unit before it closed connection"
raise ConnectionException(msg)

def is_socket_open(self):
return True if self.socket is not None else False

Expand Down
167 changes: 167 additions & 0 deletions pymodbus/client/sync_diag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import socket
import logging
import time

from pymodbus.constants import Defaults
from pymodbus.client.sync import ModbusTcpClient
from pymodbus.transaction import ModbusSocketFramer
from pymodbus.exceptions import ConnectionException

_logger = logging.getLogger(__name__)

LOG_MSGS = {
'conn_msg': 'Connecting to modbus device %s',
'connfail_msg': 'Connection to (%s, %s) failed: %s',
'discon_msg': 'Disconnecting from modbus device %s',
'timelimit_read_msg':
'Modbus device read took %.4f seconds, '
'returned %s bytes in timelimit read',
'timeout_msg':
'Modbus device timeout after %.4f seconds, '
'returned %s bytes %s',
'delay_msg':
'Modbus device read took %.4f seconds, '
'returned %s bytes of %s expected',
'read_msg':
'Modbus device read took %.4f seconds, '
'returned %s bytes of %s expected',
'unexpected_dc_msg': '%s %s'}


class ModbusTcpDiagClient(ModbusTcpClient):
"""
Variant of pymodbus.client.sync.ModbusTcpClient with additional
logging to diagnose network issues.

The following events are logged:

+---------+-----------------------------------------------------------------+
| Level | Events |
+=========+=================================================================+
| ERROR | Failure to connect to modbus unit; unexpected disconnect by |
| | modbus unit |
+---------+-----------------------------------------------------------------+
| WARNING | Timeout on normal read; read took longer than warn_delay_limit |
+---------+-----------------------------------------------------------------+
| INFO | Connection attempt to modbus unit; disconnection from modbus |
| | unit; each time limited read |
+---------+-----------------------------------------------------------------+
| DEBUG | Normal read with timing information |
+---------+-----------------------------------------------------------------+

Reads are differentiated between "normal", which reads a specified number of
bytes, and "time limited", which reads all data for a duration equal to the
timeout period configured for this instance.
"""

# pylint: disable=no-member

def __init__(self, host='127.0.0.1', port=Defaults.Port,
framer=ModbusSocketFramer, **kwargs):
""" Initialize a client instance

The keys of LOG_MSGS can be used in kwargs to customize the messages.

:param host: The host to connect to (default 127.0.0.1)
:param port: The modbus port to connect to (default 502)
:param source_address: The source address tuple to bind to (default ('', 0))
:param timeout: The timeout to use for this socket (default Defaults.Timeout)
:param warn_delay_limit: Log reads that take longer than this as warning.
Default True sets it to half of "timeout". None never logs these as
warning, 0 logs everything as warning.
:param framer: The modbus framer to use (default ModbusSocketFramer)

.. note:: The host argument will accept ipv4 and ipv6 hosts
"""
self.warn_delay_limit = kwargs.get('warn_delay_limit', True)
super().__init__(host, port, framer, **kwargs)
if self.warn_delay_limit is True:
self.warn_delay_limit = self.timeout / 2

# Set logging messages, defaulting to LOG_MSGS
for (k, v) in LOG_MSGS.items():
self.__dict__[k] = kwargs.get(k, v)

def connect(self):
""" Connect to the modbus tcp server

:returns: True if connection succeeded, False otherwise
"""
if self.socket:
return True
try:
_logger.info(self.conn_msg, self)
self.socket = socket.create_connection(
(self.host, self.port),
timeout=self.timeout,
source_address=self.source_address)
except socket.error as msg:
_logger.error(self.connfail_msg, self.host, self.port, msg)
self.close()
return self.socket is not None

def close(self):
""" Closes the underlying socket connection
"""
if self.socket:
_logger.info(self.discon_msg, self)
self.socket.close()
self.socket = None

def _recv(self, size):
try:
start = time.time()

result = super()._recv(size)

delay = time.time() - start
if self.warn_delay_limit is not None and delay >= self.warn_delay_limit:
self._log_delayed_response(len(result), size, delay)
elif not size:
_logger.debug(self.timelimit_read_msg, delay, len(result))
else:
_logger.debug(self.read_msg, delay, len(result), size)

return result
except ConnectionException as ex:
# Only log actual network errors, "if not self.socket" then it's a internal code issue
if 'Connection unexpectedly closed' in ex.string:
_logger.error(self.unexpected_dc_msg, self, ex)
raise ex

def _log_delayed_response(self, result_len, size, delay):
if not size and result_len > 0:
_logger.info(self.timelimit_read_msg, delay, result_len)
elif (result_len == 0 or (size and result_len < size)) and delay >= self.timeout:
read_type = ("of %i expected" % size) if size else "in timelimit read"
_logger.warning(self.timeout_msg, delay, result_len, read_type)
else:
_logger.warning(self.delay_msg, delay, result_len, size)

def __str__(self):
""" Builds a string representation of the connection

:returns: The string representation
"""
return "ModbusTcpDiagClient(%s:%s)" % (self.host, self.port)


def get_client():
""" Returns an appropriate client based on logging level

This will be ModbusTcpDiagClient by default, or the parent class
if the log level is such that the diagnostic client will not log
anything.

:returns: ModbusTcpClient or a child class thereof
"""
return ModbusTcpDiagClient if _logger.isEnabledFor(logging.ERROR) else ModbusTcpClient


# --------------------------------------------------------------------------- #
# Exported symbols
# --------------------------------------------------------------------------- #

__all__ = [
"ModbusTcpDiagClient", "get_client"
]
16 changes: 12 additions & 4 deletions pymodbus/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def execute(self, request):
"/Unable to decode response")
response = ModbusIOException(last_exception,
request.function_code)
self.client.close()
if hasattr(self.client, "state"):
_logger.debug("Changing transaction state from "
"'PROCESSING REPLY' to "
Expand All @@ -209,6 +210,7 @@ def execute(self, request):
return response
except ModbusIOException as ex:
# Handle decode errors in processIncomingPacket method
self.client.close()
_logger.exception(ex)
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
return ex
Expand Down Expand Up @@ -275,9 +277,10 @@ def _recv(self, expected_response_length, full):

read_min = self.client.framer.recvPacket(min_size)
if len(read_min) != min_size:
msg_start = "Incomplete message" if read_min else "No response"
raise InvalidMessageReceivedException(
"Incomplete message received, expected at least %d bytes "
"(%d received)" % (min_size, len(read_min))
"%s received, expected at least %d bytes "
"(%d received)" % (msg_start, min_size, len(read_min))
)
if read_min:
if isinstance(self.client.framer, ModbusSocketFramer):
Expand Down Expand Up @@ -312,9 +315,14 @@ def _recv(self, expected_response_length, full):
result = read_min + result
actual = len(result)
if total is not None and actual != total:
_logger.debug("Incomplete message received, "
msg_start = "Incomplete message" if actual else "No response"
_logger.debug("{} received, "
"Expected {} bytes Recieved "
"{} bytes !!!!".format(total, actual))
"{} bytes !!!!".format(msg_start, total, actual))
elif actual == 0:
# If actual == 0 and total is not None then the above
# should be triggered, so total must be None here
_logger.debug("No response received to unbounded read !!!!")
if self.client.state != ModbusTransactionState.PROCESSING_REPLY:
_logger.debug("Changing transaction state from "
"'WAITING FOR REPLY' to 'PROCESSING REPLY'")
Expand Down
Loading