-
Notifications
You must be signed in to change notification settings - Fork 900
/
transaction.py
580 lines (511 loc) · 23.3 KB
/
transaction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
"""Collection of transaction based abstractions."""
__all__ = [
"FifoTransactionManager",
"DictTransactionManager",
"ModbusSocketFramer",
"ModbusTlsFramer",
"ModbusRtuFramer",
"ModbusAsciiFramer",
"ModbusBinaryFramer",
]
# pylint: disable=missing-type-doc
import struct
import time
from contextlib import suppress
from functools import partial
from threading import RLock
from pymodbus.constants import Defaults
from pymodbus.exceptions import (
InvalidMessageReceivedException,
ModbusIOException,
NotImplementedException,
)
from pymodbus.framer.ascii_framer import ModbusAsciiFramer
from pymodbus.framer.binary_framer import ModbusBinaryFramer
from pymodbus.framer.rtu_framer import ModbusRtuFramer
from pymodbus.framer.socket_framer import ModbusSocketFramer
from pymodbus.framer.tls_framer import ModbusTlsFramer
from pymodbus.logging import Log
from pymodbus.utilities import ModbusTransactionState, hexlify_packets
# --------------------------------------------------------------------------- #
# The Global Transaction Manager
# --------------------------------------------------------------------------- #
class ModbusTransactionManager:
"""Implement a transaction for a manager.
The transaction protocol can be represented by the following pseudo code::
count = 0
do
result = send(message)
if (timeout or result == bad)
count++
else break
while (count < 3)
This module helps to abstract this away from the framer and protocol.
"""
def __init__(self, client, **kwargs):
"""Initialize an instance of the ModbusTransactionManager.
:param client: The client socket wrapper
:param retry_on_empty: Should the client retry on empty
:param retries: The number of retries to allow
"""
self.tid = Defaults.TransactionId
self.client = client
self.backoff = kwargs.get("backoff", Defaults.Backoff) or 0.3
self.retry_on_empty = kwargs.get("retry_on_empty", Defaults.RetryOnEmpty)
self.retry_on_invalid = kwargs.get("retry_on_invalid", Defaults.RetryOnInvalid)
self.retries = kwargs.get("retries", Defaults.Retries) or 1
self.reset_socket = kwargs.get("reset_socket", True)
self._transaction_lock = RLock()
self._no_response_devices = []
if client:
self._set_adu_size()
def _set_adu_size(self):
"""Set adu size."""
# base ADU size of modbus frame in bytes
if isinstance(self.client.framer, ModbusSocketFramer):
self.base_adu_size = 7 # tid(2), pid(2), length(2), uid(1)
elif isinstance(self.client.framer, ModbusRtuFramer):
self.base_adu_size = 3 # address(1), CRC(2)
elif isinstance(self.client.framer, ModbusAsciiFramer):
self.base_adu_size = 7 # start(1)+ Address(2), LRC(2) + end(2)
elif isinstance(self.client.framer, ModbusBinaryFramer):
self.base_adu_size = 5 # start(1) + Address(1), CRC(2) + end(1)
elif isinstance(self.client.framer, ModbusTlsFramer):
self.base_adu_size = 0 # no header and footer
else:
self.base_adu_size = -1
def _calculate_response_length(self, expected_pdu_size):
"""Calculate response length."""
if self.base_adu_size == -1:
return None
return self.base_adu_size + expected_pdu_size
def _calculate_exception_length(self):
"""Return the length of the Modbus Exception Response according to the type of Framer."""
if isinstance(self.client.framer, (ModbusSocketFramer, ModbusTlsFramer)):
return self.base_adu_size + 2 # Fcode(1), ExceptionCode(1)
if isinstance(self.client.framer, ModbusAsciiFramer):
return self.base_adu_size + 4 # Fcode(2), ExceptionCode(2)
if isinstance(self.client.framer, (ModbusRtuFramer, ModbusBinaryFramer)):
return self.base_adu_size + 2 # Fcode(1), ExceptionCode(1)
return None
def _validate_response(self, request, response, exp_resp_len):
"""Validate Incoming response against request.
:param request: Request sent
:param response: Response received
:param exp_resp_len: Expected response length
:return: New transactions state
"""
if not response:
return False
mbap = self.client.framer.decode_data(response)
if (
mbap.get("slave") != request.slave_id
or mbap.get("fcode") & 0x7F != request.function_code
):
return False
if "length" in mbap and exp_resp_len:
return mbap.get("length") == exp_resp_len
return True
def execute(self, request): # pylint: disable=too-complex
"""Start the producer to send the next request to consumer.write(Frame(request))."""
with self._transaction_lock:
try:
Log.debug(
"Current transaction state - {}",
ModbusTransactionState.to_string(self.client.state),
)
retries = self.retries
request.transaction_id = self.getNextTID()
Log.debug("Running transaction {}", request.transaction_id)
if _buffer := hexlify_packets(
self.client.framer._buffer # pylint: disable=protected-access
):
Log.debug("Clearing current Frame: - {}", _buffer)
self.client.framer.resetFrame()
if broadcast := (
self.client.params.broadcast_enable and not request.slave_id
):
self._transact(request, None, broadcast=True)
response = b"Broadcast write sent - no response expected"
else:
expected_response_length = None
if not isinstance(self.client.framer, ModbusSocketFramer):
if hasattr(request, "get_response_pdu_size"):
response_pdu_size = request.get_response_pdu_size()
if isinstance(self.client.framer, ModbusAsciiFramer):
response_pdu_size = response_pdu_size * 2
if response_pdu_size:
expected_response_length = (
self._calculate_response_length(response_pdu_size)
)
if ( # pylint: disable=simplifiable-if-statement
request.slave_id in self._no_response_devices
):
full = True
else:
full = False
c_str = str(self.client)
if "modbusudpclient" in c_str.lower().strip():
full = True
if not expected_response_length:
expected_response_length = Defaults.ReadSize
response, last_exception = self._transact(
request,
expected_response_length,
full=full,
broadcast=broadcast,
)
while retries > 0:
valid_response = self._validate_response(
request, response, expected_response_length
)
if valid_response:
if (
request.slave_id in self._no_response_devices
and response
):
self._no_response_devices.remove(request.slave_id)
Log.debug("Got response!!!")
break
if not response:
if request.slave_id not in self._no_response_devices:
self._no_response_devices.append(request.slave_id)
if self.retry_on_empty:
response, last_exception = self._retry_transaction(
retries,
"empty",
request,
expected_response_length,
full=full,
)
retries -= 1
else:
# No response received and retries not enabled
break
elif self.retry_on_invalid:
response, last_exception = self._retry_transaction(
retries,
"invalid",
request,
expected_response_length,
full=full,
)
retries -= 1
else:
break
# full = False
addTransaction = partial( # pylint: disable=invalid-name
self.addTransaction,
tid=request.transaction_id,
)
self.client.framer.processIncomingPacket(
response,
addTransaction,
request.slave_id,
tid=request.transaction_id,
)
if not (response := self.getTransaction(request.transaction_id)):
if len(self.transactions):
response = self.getTransaction(tid=0)
else:
last_exception = last_exception or (
"No Response received from the remote slave"
"/Unable to decode response"
)
response = ModbusIOException(
last_exception, request.function_code
)
if self.reset_socket:
self.client.close()
if hasattr(self.client, "state"):
Log.debug(
"Changing transaction state from "
'"PROCESSING REPLY" to '
'"TRANSACTION_COMPLETE"'
)
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
return response
except ModbusIOException as exc:
# Handle decode errors in processIncomingPacket method
Log.error("Modbus IO exception {}", exc)
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
if self.reset_socket:
self.client.close()
return exc
def _retry_transaction(self, retries, reason, packet, response_length, full=False):
"""Retry transaction."""
Log.debug("Retry on {} response - {}", reason, retries)
Log.debug('Changing transaction state from "WAITING_FOR_REPLY" to "RETRYING"')
self.client.state = ModbusTransactionState.RETRYING
if self.backoff:
delay = 2 ** (self.retries - retries) * self.backoff
time.sleep(delay)
Log.debug("Sleeping {}", delay)
self.client.connect()
if hasattr(self.client, "_in_waiting"):
if (
in_waiting := self.client._in_waiting() # pylint: disable=protected-access
):
if response_length == in_waiting:
result = self._recv(response_length, full)
return result, None
return self._transact(packet, response_length, full=full)
def _transact(self, packet, response_length, full=False, broadcast=False):
"""Do a Write and Read transaction.
:param packet: packet to be sent
:param response_length: Expected response length
:param full: the target device was notorious for its no response. Dont
waste time this time by partial querying
:param broadcast:
:return: response
"""
last_exception = None
try:
self.client.connect()
packet = self.client.framer.buildPacket(packet)
Log.debug("SEND: {}", packet, ":hex")
size = self._send(packet)
if (
isinstance(size, bytes)
and self.client.state == ModbusTransactionState.RETRYING
):
Log.debug(
"Changing transaction state from "
'"RETRYING" to "PROCESSING REPLY"'
)
self.client.state = ModbusTransactionState.PROCESSING_REPLY
return size, None
if broadcast:
if size:
Log.debug(
'Changing transaction state from "SENDING" '
'to "TRANSACTION_COMPLETE"'
)
self.client.state = ModbusTransactionState.TRANSACTION_COMPLETE
return b"", None
if size:
Log.debug(
'Changing transaction state from "SENDING" '
'to "WAITING FOR REPLY"'
)
self.client.state = ModbusTransactionState.WAITING_FOR_REPLY
if (
hasattr(self.client, "handle_local_echo")
and self.client.handle_local_echo is True
):
if self._recv(size, full) != packet:
return b"", "Wrong local echo"
result = self._recv(response_length, full)
# result2 = self._recv(response_length, full)
Log.debug("RECV: {}", result, ":hex")
except (OSError, ModbusIOException, InvalidMessageReceivedException) as msg:
if self.reset_socket:
self.client.close()
Log.debug("Transaction failed. ({}) ", msg)
last_exception = msg
result = b""
return result, last_exception
def _send(self, packet, _retrying=False):
"""Send."""
return self.client.framer.sendPacket(packet)
def _recv(self, expected_response_length, full): # pylint: disable=too-complex
"""Receive."""
total = None
if not full:
exception_length = self._calculate_exception_length()
if isinstance(self.client.framer, ModbusSocketFramer):
min_size = 8
elif isinstance(self.client.framer, ModbusRtuFramer):
min_size = 4
elif isinstance(self.client.framer, ModbusAsciiFramer):
min_size = 5
elif isinstance(self.client.framer, ModbusBinaryFramer):
min_size = 3
else:
min_size = expected_response_length
read_min = self.client.framer.recvPacket(min_size)
if len(read_min) != min_size:
msg_start = "Incomplete message" if read_min else "No response"
raise InvalidMessageReceivedException(
f"{msg_start} received, expected at least {min_size} bytes "
f"({len(read_min)} received)"
)
if read_min:
if isinstance(self.client.framer, ModbusSocketFramer):
func_code = int(read_min[-1])
elif isinstance(self.client.framer, ModbusRtuFramer):
func_code = int(read_min[1])
elif isinstance(self.client.framer, ModbusAsciiFramer):
func_code = int(read_min[3:5], 16)
elif isinstance(self.client.framer, ModbusBinaryFramer):
func_code = int(read_min[-1])
else:
func_code = -1
if func_code < 0x80: # Not an error
if isinstance(self.client.framer, ModbusSocketFramer):
# Omit UID, which is included in header size
h_size = (
self.client.framer._hsize # pylint: disable=protected-access
)
length = struct.unpack(">H", read_min[4:6])[0] - 1
expected_response_length = h_size + length
elif expected_response_length is None and isinstance(
self.client.framer, ModbusRtuFramer
):
with suppress(
IndexError # response length indeterminate with available bytes
):
expected_response_length = (
self.client.framer.get_expected_response_length(
read_min
)
)
if expected_response_length is not None:
expected_response_length -= min_size
total = expected_response_length + min_size
else:
expected_response_length = exception_length - min_size
total = expected_response_length + min_size
else:
total = expected_response_length
else:
read_min = b""
total = expected_response_length
result = self.client.framer.recvPacket(expected_response_length)
result = read_min + result
actual = len(result)
if total is not None and actual != total:
msg_start = "Incomplete message" if actual else "No response"
Log.debug(
"{} received, Expected {} bytes Received {} bytes !!!!",
msg_start,
total,
actual,
)
elif not actual:
# If actual == 0 and total is not None then the above
# should be triggered, so total must be None here
Log.debug("No response received to unbounded read !!!!")
if self.client.state != ModbusTransactionState.PROCESSING_REPLY:
Log.debug(
"Changing transaction state from "
'"WAITING FOR REPLY" to "PROCESSING REPLY"'
)
self.client.state = ModbusTransactionState.PROCESSING_REPLY
return result
def addTransaction(self, request, tid=None):
"""Add a transaction to the handler.
This holds the request in case it needs to be resent.
After being sent, the request is removed.
:param request: The request to hold on to
:param tid: The overloaded transaction id to use
:raises NotImplementedException:
"""
raise NotImplementedException("addTransaction")
def getTransaction(self, tid):
"""Return a transaction matching the referenced tid.
If the transaction does not exist, None is returned
:param tid: The transaction to retrieve
:raises NotImplementedException:
"""
raise NotImplementedException("getTransaction")
def delTransaction(self, tid):
"""Remove a transaction matching the referenced tid.
:param tid: The transaction to remove
:raises NotImplementedException:
"""
raise NotImplementedException("delTransaction")
def getNextTID(self):
"""Retrieve the next unique transaction identifier.
This handles incrementing the identifier after
retrieval
:returns: The next unique transaction identifier
"""
self.tid = (self.tid + 1) & 0xFFFF
return self.tid
def reset(self):
"""Reset the transaction identifier."""
self.tid = Defaults.TransactionId
self.transactions = type( # pylint: disable=attribute-defined-outside-init
self.transactions
)()
class DictTransactionManager(ModbusTransactionManager):
"""Implements a transaction for a manager.
Where the results are keyed based on the supplied transaction id.
"""
def __init__(self, client, **kwargs):
"""Initialize an instance of the ModbusTransactionManager.
:param client: The client socket wrapper
"""
self.transactions = {}
super().__init__(client, **kwargs)
def __iter__(self):
"""Iterate over the current managed transactions.
:returns: An iterator of the managed transactions
"""
return iter(self.transactions.keys())
def addTransaction(self, request, tid=None):
"""Add a transaction to the handler.
This holds the requests in case it needs to be resent.
After being sent, the request is removed.
:param request: The request to hold on to
:param tid: The overloaded transaction id to use
"""
tid = tid if tid is not None else request.transaction_id
Log.debug("Adding transaction {}", tid)
self.transactions[tid] = request
def getTransaction(self, tid):
"""Return a transaction matching the referenced tid.
If the transaction does not exist, None is returned
:param tid: The transaction to retrieve
"""
Log.debug("Getting transaction {}", tid)
if not tid:
if self.transactions:
return self.transactions.popitem()[1]
return None
return self.transactions.pop(tid, None)
def delTransaction(self, tid):
"""Remove a transaction matching the referenced tid.
:param tid: The transaction to remove
"""
Log.debug("deleting transaction {}", tid)
self.transactions.pop(tid, None)
class FifoTransactionManager(ModbusTransactionManager):
"""Implements a transaction.
For a manager where the results are returned in a FIFO manner.
"""
def __init__(self, client, **kwargs):
"""Initialize an instance of the ModbusTransactionManager.
:param client: The client socket wrapper
"""
super().__init__(client, **kwargs)
self.transactions = []
def __iter__(self):
"""Iterate over the current managed transactions.
:returns: An iterator of the managed transactions
"""
return iter(self.transactions)
def addTransaction(self, request, tid=None):
"""Add a transaction to the handler.
This holds the requests in case it needs to be resent.
After being sent, the request is removed.
:param request: The request to hold on to
:param tid: The overloaded transaction id to use
"""
tid = tid if tid is not None else request.transaction_id
Log.debug("Adding transaction {}", tid)
self.transactions.append(request)
def getTransaction(self, tid):
"""Return a transaction matching the referenced tid.
If the transaction does not exist, None is returned
:param tid: The transaction to retrieve
"""
return self.transactions.pop(0) if self.transactions else None
def delTransaction(self, tid):
"""Remove a transaction matching the referenced tid.
:param tid: The transaction to remove
"""
Log.debug("Deleting transaction {}", tid)
if self.transactions:
self.transactions.pop(0)