-
Notifications
You must be signed in to change notification settings - Fork 881
/
client_async.py
executable file
·140 lines (122 loc) · 4.15 KB
/
client_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
"""Pymodbus asynchronous client example.
usage::
client_async.py [-h] [-c {tcp,udp,serial,tls}]
[-f {ascii,binary,rtu,socket,tls}]
[-l {critical,error,warning,info,debug}] [-p PORT]
[--baudrate BAUDRATE] [--host HOST]
-h, --help
show this help message and exit
-c, -comm {tcp,udp,serial,tls}
set communication, default is tcp
-f, --framer {ascii,binary,rtu,socket,tls}
set framer, default depends on --comm
-l, --log {critical,error,warning,info,debug}
set log level, default is info
-p, --port PORT
set port
--baudrate BAUDRATE
set serial device baud rate
--host HOST
set host, default is 127.0.0.1
The corresponding server must be started before e.g. as:
python3 server_sync.py
"""
import asyncio
import logging
import helper
import pymodbus.client as modbusClient
from pymodbus import ModbusException
_logger = logging.getLogger(__file__)
_logger.setLevel("DEBUG")
def setup_async_client(description=None, cmdline=None):
"""Run client setup."""
args = helper.get_commandline(
server=False, description=description, cmdline=cmdline
)
_logger.info("### Create client object")
if args.comm == "tcp":
client = modbusClient.AsyncModbusTcpClient(
args.host,
port=args.port, # on which port
# Common optional parameters:
framer=args.framer,
timeout=args.timeout,
retries=3,
reconnect_delay=1,
reconnect_delay_max=10,
# retry_on_empty=False,
# TCP setup parameters
# source_address=("localhost", 0),
)
elif args.comm == "udp":
client = modbusClient.AsyncModbusUdpClient(
args.host,
port=args.port,
# Common optional parameters:
framer=args.framer,
timeout=args.timeout,
# retries=3,
# retry_on_empty=False,
# UDP setup parameters
# source_address=None,
)
elif args.comm == "serial":
client = modbusClient.AsyncModbusSerialClient(
args.port,
# Common optional parameters:
# framer=ModbusRtuFramer,
timeout=args.timeout,
# retries=3,
# retry_on_empty=False,
# Serial setup parameters
baudrate=args.baudrate,
# bytesize=8,
# parity="N",
# stopbits=1,
# handle_local_echo=False,
)
elif args.comm == "tls":
client = modbusClient.AsyncModbusTlsClient(
args.host,
port=args.port,
# Common optional parameters:
framer=args.framer,
timeout=args.timeout,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# TLS setup parameters
# sslctx=sslctx,
certfile=helper.get_certificate("crt"),
keyfile=helper.get_certificate("key"),
# password="none",
server_hostname="localhost",
)
return client
async def run_async_client(client, modbus_calls=None):
"""Run sync client."""
_logger.info("### Client starting")
await client.connect()
assert client.connected
if modbus_calls:
await modbus_calls(client)
client.close()
_logger.info("### End of Program")
async def run_a_few_calls(client):
"""Test connection works."""
try:
rr = await client.read_coils(32, 1, slave=1)
assert len(rr.bits) == 8
rr = await client.read_holding_registers(4, 2, slave=1)
assert rr.registers[0] == 17
assert rr.registers[1] == 17
except ModbusException:
pass
async def main(cmdline=None):
"""Combine setup and run."""
testclient = setup_async_client(description="Run client.", cmdline=cmdline)
await run_async_client(testclient, modbus_calls=run_a_few_calls)
if __name__ == "__main__":
asyncio.run(main(), debug=True) # pragma: no cover