-
Notifications
You must be signed in to change notification settings - Fork 882
/
client_performance.py
executable file
·83 lines (69 loc) · 2.41 KB
/
client_performance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python3
"""Test performance of client: sync vs. async.
This example show how much faster the async version is.
example run:
(pymodbus) % ./client_performance.py
--- Testing sync client v3.4.1
running 1000 call (each 10 registers), took 114.10 seconds
Averages 114.10 ms pr call and 11.41 ms pr register.
--- Testing async client v3.4.1
running 1000 call (each 10 registers), took 0.33 seconds
Averages 0.33 ms pr call and 0.03 ms pr register.
"""
import asyncio
import time
from pymodbus import Framer
from pymodbus.client import AsyncModbusSerialClient, ModbusSerialClient
LOOP_COUNT = 1000
REGISTER_COUNT = 10
def run_sync_client_test():
"""Run sync client."""
print("--- Testing sync client v3.4.1")
client = ModbusSerialClient(
"/dev/ttys007",
framer_name=Framer.RTU,
baudrate=9600,
)
client.connect()
assert client.connected
start_time = time.time()
for _i in range(LOOP_COUNT):
rr = client.read_input_registers(1, REGISTER_COUNT, slave=1)
if rr.isError():
print(f"Received Modbus library error({rr})")
break
client.close()
run_time = time.time() - start_time
avg_call = (run_time / LOOP_COUNT) * 1000
avg_register = avg_call / REGISTER_COUNT
print(
f"running {LOOP_COUNT} call (each {REGISTER_COUNT} registers), took {run_time:.2f} seconds"
)
print(f"Averages {avg_call:.2f} ms pr call and {avg_register:.2f} ms pr register.")
async def run_async_client_test():
"""Run async client."""
print("--- Testing async client v3.4.1")
client = AsyncModbusSerialClient(
"/dev/ttys007",
framer_name=Framer.RTU,
baudrate=9600,
)
await client.connect()
assert client.connected
start_time = time.time()
for _i in range(LOOP_COUNT):
rr = await client.read_input_registers(1, REGISTER_COUNT, slave=1)
if rr.isError():
print(f"Received Modbus library error({rr})")
break
client.close()
run_time = time.time() - start_time
avg_call = (run_time / LOOP_COUNT) * 1000
avg_register = avg_call / REGISTER_COUNT
print(
f"running {LOOP_COUNT} call (each {REGISTER_COUNT} registers), took {run_time:.2f} seconds"
)
print(f"Averages {avg_call:.2f} ms pr call and {avg_register:.2f} ms pr register.")
if __name__ == "__main__":
run_sync_client_test()
asyncio.run(run_async_client_test())