-
Notifications
You must be signed in to change notification settings - Fork 11
/
cli.py
executable file
·206 lines (184 loc) · 8.66 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""Command-line interface."""
import logging
import sys
from time import time, sleep
import click
from paho.mqtt.client import Client as MQTTClient
from samil import InverterListener
from samil.inverter import InverterNotFoundError
# @click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.group()
@click.option('--debug', is_flag=True, help="Enable debug output.")
@click.version_option()
def cli(debug: bool):
    """Samil Power inverter command-line tool."""
    # Root command group: subcommands register themselves via @cli.command().
    # The docstring above is what click shows as the tool's help text.
    if not debug:
        return
    # --debug was passed: configure the root logger at DEBUG level.
    logging.basicConfig(level=logging.DEBUG)
@cli.command()
@click.option('--interval',
              default=5.0,
              help="Status interval.",
              show_default=True,
              type=click.FloatRange(min=0, max=20))
@click.option('--interface', help="IP address of local network interface to bind to.")
def monitor(interval: float, interface: str):
    """Print model and status info for an inverter.

    When you have multiple inverters, run this command multiple times to
    connect to all inverters.
    """
    # NOTE: the docstring above is rendered by click as the --help text, so
    # implementation notes are kept in comments.
    #
    # interval:  seconds between two status printouts (click restricts it to
    #            the range 0-20).
    # interface: local IP address to bind the listener to; a missing value is
    #            passed on as ''.
    #
    # Exits with status 1 when no inverter is found; otherwise loops until the
    # process is interrupted.

    # Display labels for the fields of the dict returned by inverter.model().
    _model_keys = {
        'device_type': 'Device type',
        'va_rating': 'VA rating',
        'firmware_version': 'Firmware version',
        'model_name': 'Model name',
        'manufacturer': 'Manufacturer',
        'serial_number': 'Serial number',
        'communication_version': 'Communication version',
        'other_version': 'Other version',
        'general': 'General',
    }
    # (display label, unit suffix) for the fields of the dict returned by
    # inverter.status(). An empty unit means the value is printed bare.
    _status_keys = {
        'operation_mode': ('Operation mode', ''),
        'total_operation_time': ('Total operation time', 'h'),
        'pv1_input_power': ('PV1 input power', 'W'),
        'pv2_input_power': ('PV2 input power', 'W'),
        'pv1_voltage': ('PV1 voltage', 'V'),
        'pv2_voltage': ('PV2 voltage', 'V'),
        'pv1_current': ('PV1 current', 'A'),
        'pv2_current': ('PV2 current', 'A'),
        'output_power': ('Output power', 'W'),
        'energy_today': ('Energy today', 'kWh'),
        'energy_total': ('Energy total', 'kWh'),
        'grid_current': ('Grid current', 'A'),
        'grid_voltage': ('Grid voltage', 'V'),
        'grid_frequency': ('Grid frequency', 'Hz'),
        'grid_current_r_phase': ('Grid current R-phase', 'A'),
        'grid_voltage_r_phase': ('Grid voltage R-phase', 'V'),
        'grid_frequency_r_phase': ('Grid frequency R-phase', 'Hz'),
        'grid_current_s_phase': ('Grid current S-phase', 'A'),
        'grid_voltage_s_phase': ('Grid voltage S-phase', 'V'),
        'grid_frequency_s_phase': ('Grid frequency S-phase', 'Hz'),
        'grid_current_t_phase': ('Grid current T-phase', 'A'),
        'grid_voltage_t_phase': ('Grid voltage T-phase', 'V'),
        'grid_frequency_t_phase': ('Grid frequency T-phase', 'Hz'),
        'internal_temperature': ('Internal temperature', '°C'),
        'heatsink_temperature': ('Heatsink temperature', '°C'),
    }

    def _format_two_tuple(t):
        """Render a list of (label, value) pairs as dot-padded, aligned rows."""
        # default=0 keeps an empty list from raising ValueError in max().
        width = max((len(k) for k, _ in t), default=0)
        rows = ['{:.<{width}}...{}'.format(k, v, width=width) for k, v in t]
        return '\n'.join(rows)

    def _format_model(d):
        """Format the model dict using the labels from _model_keys."""
        # Fields without a known label are shown under their raw key instead
        # of aborting the whole printout with a KeyError.
        t = [(_model_keys.get(k, k), v) for k, v in d.items()]
        return _format_two_tuple(t)

    def _format_status(d):
        """Format the status dict using labels and units from _status_keys."""
        t = []
        for k, v in d.items():
            # Unknown fields fall back to the raw key and no unit.
            label, unit = _status_keys.get(k, (k, ''))
            t.append((label, '{}{}{}'.format(v, ' ' if unit else '', unit)))
        return _format_two_tuple(t)

    with InverterListener(interface_ip=interface or '') as listener:
        print("Searching for inverter")
        try:
            inverter = listener.accept_inverter()
        except InverterNotFoundError:
            print("Could not find inverter")
            # Non-zero status so calling scripts can detect the failure.
            sys.exit(1)

    # NOTE(review): the reviewed copy had lost its indentation; this assumes
    # the listener is released before monitoring starts -- confirm against the
    # original layout.
    with inverter:
        print("Found inverter on address {}".format(inverter.addr))
        model_dict = inverter.model()
        print()
        print("Model info")
        print(_format_model(model_dict))

        n = 1
        # t tracks the scheduled time of the next printout; advancing it by a
        # fixed step (rather than sleeping `interval` each time) keeps the
        # time spent in inverter.status() from accumulating as drift.
        t = time()
        while True:
            status_dict = inverter.status()
            print()
            print("Status data #{}".format(n))
            print(_format_status(status_dict))
            t += interval
            n += 1
            # Never pass a negative duration to sleep() when running late.
            sleep(max(t - time(), 0))
@cli.command()
@click.option(
    '--host', '-h',
    default="localhost",
    help="MQTT broker hostname/IP address.",
    show_default=True,
)
@click.option(
    '--port', '-p',
    default=1883,
    help="MQTT broker port.",
    show_default=True,
)
@click.option(
    '--client-id',
    default='',
    help="Client ID used when connecting to the broker. If not provided, one will be randomly generated.",
)
@click.option('--tls', is_flag=True, default=False, help="Enable SSL/TLS support.")
@click.option('--username', help="MQTT username.")
@click.option('--password', help="MQTT password.")
@click.option('--interface', help="IP address of local network interface to bind to.")
def mqtt(host, port: int, client_id, tls: bool, username, password, interface):
    """Publish inverter data to an MQTT broker."""
    # The docstring above is what click shows as this command's help text.

    # Local address for the outgoing connection; '' when none was given.
    bind_address = interface if interface else ''

    client = MQTTClient(client_id=client_id)
    if tls:
        client.tls_set()
    if username:
        # Credentials are only configured when a username was supplied.
        client.username_pw_set(username, password)

    client.connect(host=host, port=port, bind_address=bind_address)
    # NOTE(review): fixed message on the "inverter" topic; looks like a
    # placeholder rather than real inverter data -- confirm.
    client.publish("inverter", payload="hihi")
    # client.loop_start()  # Starts handling MQTT traffic in separate thread
    client.loop_forever()
# def pvoutput(args):
# if args.num < 1:
# raise ValueError('Invalid number of inverters')
# # Set logging
# loglevel = logging.ERROR if args.quiet else logging.DEBUG if args.verbose else logging.INFO
# logging.basicConfig(format='%(levelname)s:%(module)s:%(message)s', level=loglevel)
# # Search for the right inverter
# with InverterListener(interface_ip=args.interface) as listener:
# selected_inverters = []
# ignored_inverters = []
# while True:
# inverter = listener.accept_inverter()
# if not inverter:
# raise ConnectionError('Could not find inverter')
# model = inverter.model()
# logging.info('Inverter serial number: %s', model['serial_number'])
# # Check if inverter is selected
# selected = True
# if args.serial_number and model['serial_number'] not in args.serial_number:
# selected = False
# if args.ip and inverter.addr[0] not in args.ip:
# selected = False
# logging.info('Inverter is %s', 'selected' if selected else 'ignored')
# if selected:
# selected_inverters.append(inverter)
# else:
# ignored_inverters.append(inverter)
#
# # PVOutput
# parser_pvoutput = subparsers.add_parser('pvoutput', help='upload status data to PVOutput',
# description='Upload status data to PVOutput.')
# parser_pvoutput.add_argument('-i', '--interface', help='bind interface IP (default: all interfaces)', default='')
# parser_pvoutput.add_argument('-n', '--inverters', type=int, default=1,
# help='number of inverters (default: %(default)s)', dest='num')
# matcher_group = parser_pvoutput.add_mutually_exclusive_group()
# matcher_group.add_argument('--only-serial', nargs='*', dest='serial_number',
# help='only match inverters with one of the given serial numbers')
# matcher_group.add_argument('--only-ip', nargs='*', dest='ip',
# help='only match inverters with one of the given IPs')
# parser_pvoutput.add_argument('-s', '--system', type=int, help='PVOutput system ID')
# parser_pvoutput.add_argument('-k', '--api-key', type=int, help='PVOutput system API key')
# parser_pvoutput.set_defaults(func=pvoutput)
#
# # History
# parser_history = subparsers.add_parser('history', help='fetch historical generation data from inverter',
# description='Fetch historical generation data from inverter.')
# parser_history.add_argument('start', type=int, help='start year')
# parser_history.add_argument('end', type=int, help='end year')
# parser_history.add_argument('-i', '--interface', help='bind interface IP (default: all interfaces)', default='')
# matcher_group = parser_history.add_mutually_exclusive_group()
# matcher_group.add_argument('--serial', help='match only inverter with given serial number', dest='serial_number')
# matcher_group.add_argument('--ip', help='match only inverter with given IP address', dest='ip')
# parser_history.set_defaults(func=history)
#
# args = parser.parse_args()
# # Debug output
# if args.debug:
# logging.basicConfig(level=logging.DEBUG)
#
# if not args.subcommand:
# parser.print_help()
# parser.exit()
# args.func(args)