# cmd.py
#
# Builds a command line for /usr/bin/diagslave from environment variables
# (PROTOCOL plus protocol-specific settings) and runs it.
import os
import subprocess
# PROTOCOL is mandatory: it selects which set of diagslave options is built
# further down, so the script stops immediately when it is missing.
protocol = os.environ.get('PROTOCOL')
if protocol is None:
    print('Protocol is not set')
    # SystemExit is raised directly because the bare ``exit`` helper is only
    # injected by the ``site`` module and is not guaranteed to exist.
    raise SystemExit(1)
def serial_port():
    """Return the value of the ``SERIAL_PORT`` environment variable.

    The variable is mandatory. When it is not set, a message is printed and
    the process terminates with exit status 1.

    Raises:
        SystemExit: with code 1 if ``SERIAL_PORT`` is not set.
    """
    port = os.environ.get('SERIAL_PORT')
    if port is None:
        print('Serial port is not set')
        # Raised directly instead of calling ``exit``, which is only provided
        # by the ``site`` module and may be absent.
        raise SystemExit(1)
    return port
def slave_address():
    """Return the value of ``SLAVE_ADDRESS``, or ``None`` when it is not set."""
    return os.environ.get('SLAVE_ADDRESS')
def parity():
    """Return the parity setting taken from the ``PARITY`` environment variable.

    Falls back to ``"even"`` when the variable is unset or empty, so an empty
    value never reaches the command line as a blank option argument.
    """
    return os.environ.get('PARITY') or "even"
def baudrate():
    """Return the value of ``BAUDRATE``, or ``None`` when it is not set."""
    return os.environ.get('BAUDRATE')
def tcp_port():
    """Return the port taken from the ``TCP_PORT`` environment variable.

    Returns the variable's string value when it is set and non-empty;
    otherwise the integer default ``502``. Callers must be prepared for
    either type.
    """
    return os.environ.get('TCP_PORT') or 502
def connection_timeout():
    """Return the timeout taken from ``CONNECTION_TIMEOUT``.

    Returns the variable's string value when it is set and non-empty;
    otherwise the integer default ``60``. Callers must be prepared for
    either type.
    """
    return os.environ.get('CONNECTION_TIMEOUT') or 60
def data_bits():
    """Return the data-bits setting taken from ``DATA_BITS``.

    Returns the variable's string value when it is set and non-empty;
    otherwise the integer default ``8``. Callers must be prepared for
    either type.
    """
    return os.environ.get('DATA_BITS') or 8
def stop_bits():
    """Return the stop-bits setting taken from ``STOP_BITS``.

    Returns the variable's string value when it is set and non-empty;
    otherwise the integer default ``1``. Callers must be prepared for
    either type.
    """
    return os.environ.get('STOP_BITS') or 1
def master_activity_timeout():
    """Return the timeout taken from ``MASTER_ACTIVITY_TIMEOUT``.

    Returns the variable's string value when it is set and non-empty;
    otherwise the integer default ``3``. Callers must be prepared for
    either type.
    """
    return os.environ.get('MASTER_ACTIVITY_TIMEOUT') or 3
def add_network_arguments(a):
    """Append the network-transport options to the argument list ``a``.

    The list is modified in place and nothing is returned. Three
    flag/value pairs are added, in this order: ``-p`` with ``tcp_port()``,
    ``-c`` with ``connection_timeout()`` and ``-o`` with
    ``master_activity_timeout()``.

    Args:
        a: Mutable list of command-line arguments to extend.
    """
    a.extend([
        '-p', tcp_port(),
        '-c', connection_timeout(),
        '-o', master_activity_timeout(),
    ])
# Every invocation starts with the binary and the -m switch; the protocol
# name is appended right after it in each branch below.
arguments = ['/usr/bin/diagslave', '-m']

if protocol in ('tcp', 'udp'):
    # Both network transports take the same option set.
    arguments.append(protocol)
    add_network_arguments(arguments)
elif protocol in ('ascii', 'rtu'):
    arguments.append(protocol)

    # Slave address and baud rate are optional: their flags are left out
    # entirely when the value is unset or empty.
    sa = slave_address()
    if sa:
        arguments.extend(['-a', sa])
    b = baudrate()
    if b:
        arguments.extend(['-b', b])

    # The remaining serial options are always passed. serial_port() comes
    # last because it is the trailing positional argument, and it terminates
    # the script itself when SERIAL_PORT is missing.
    arguments.extend([
        '-p', parity(),
        '-d', data_bits(),
        '-s', stop_bits(),
        '-o', master_activity_timeout(),
        serial_port(),
    ])
else:
    print(f'Unknown protocol {protocol}')
    # Raised directly instead of calling ``exit``, which is only provided by
    # the ``site`` module and may be absent.
    raise SystemExit(1)

# Flush so the message is emitted before the child process starts writing,
# even when stdout is block-buffered (e.g. redirected to a pipe).
print(f'Start process with parameters {arguments}', flush=True)

# Defaults may be ints, so every argument is converted to str for the call.
# The child's return code becomes this script's exit status instead of being
# discarded.
raise SystemExit(subprocess.call([str(a) for a in arguments]))