forked from kcbam/dbus-huaweisun2000-pvinverter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbus-huaweisun2000-pvinverter.py
201 lines (164 loc) · 8.9 KB
/
dbus-huaweisun2000-pvinverter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3
"""
A class to put a simple service on the dbus, according to victron standards, with constantly updating
paths. See example usage below. It is used to generate dummy data for other processes that rely on the
dbus. See files in dbus_vebus_to_pvinverter/test and dbus_vrm/test for other usage examples.
To change a value while testing, without stopping your dummy script and changing its initial value, write
to the dummy data via the dbus. See example.
https://github.com/victronenergy/dbus_vebus_to_pvinverter/tree/master/test
"""
from gi.repository import GLib
import platform
import logging
import sys
import time
import os
import config
from dbus.mainloop.glib import DBusGMainLoop
from connector_modbus import ModbusDataCollector2000Delux
from settings import HuaweiSUN2000Settings
# our own packages from victron
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
class DbusSun2000Service:
    """Expose a Huawei SUN2000 inverter as a ``pvinverter`` service on the D-Bus.

    The constructor registers the management, identification and measurement
    paths on a ``VeDbusService`` and schedules ``_update`` as a repeating GLib
    timeout. Each ``_update`` run copies the readings returned by the data
    connector onto the service.
    """

    def __init__(self, servicename, settings, paths, data_connector, serialnumber='X',
                 productname='Huawei Sun2000 PV-Inverter'):
        """Register all D-Bus paths and start the periodic refresh.

        Args:
            servicename: D-Bus service name handed to ``VeDbusService``.
            settings: Settings object; ``get_vrm_instance()`` and
                ``get(key)`` are used here, with the keys ``custom_name``,
                ``position`` and ``update_time_ms``.
            paths: Mapping of D-Bus path to a dict holding ``'initial'`` and,
                optionally, a ``'textformat'`` callable taking ``(path, value)``.
            data_connector: Object whose ``getData()`` result is iterated with
                ``.items()`` as ``path -> value`` pairs in ``_update``.
            serialnumber: Value published under ``/Serial``.
            productname: Value published under ``/ProductName``.
        """
        self._dbusservice = VeDbusService(servicename)
        self._data_connector = data_connector

        instance_message = "%s /DeviceInstance = %d" % (servicename, settings.get_vrm_instance())
        logging.debug(instance_message)

        register = self._dbusservice.add_path

        # Management objects, as specified in the ccgx dbus-api document
        register('/Mgmt/ProcessName', __file__)
        register('/Mgmt/ProcessVersion',
                 'Unkown version, and running on Python ' + platform.python_version())
        register('/Mgmt/Connection', 'Internal Wifi Modbus TCP')

        # Mandatory identification objects
        register('/DeviceInstance', settings.get_vrm_instance())
        register('/ProductId', 0)  # Huawei does not have a product id
        register('/ProductName', productname)
        register('/CustomName', settings.get("custom_name"))
        register('/FirmwareVersion', 1.0)
        register('/HardwareVersion', 0)
        register('/Connected', 1, writeable=True)

        # PV-inverter specific objects
        register('/Latency', None)
        register('/Role', "pvinverter")
        register('/Position', settings.get("position"))  # 0 = AC Input, 1 = AC-Out 1, AC-Out 2
        register('/Serial', serialnumber)
        register('/ErrorCode', 0)
        register('/UpdateIndex', 0)
        # NOTE(review): 7 presumably maps to "running" in Victron's status codes - confirm
        register('/StatusCode', 7)

        # Measurement paths supplied by the caller. A path without its own
        # 'textformat' gets a formatter that hands the value back unchanged.
        for dbus_path, spec in paths.items():
            initial_value = spec['initial']
            text_callback = spec.get('textformat', lambda p, v: v)
            register(
                dbus_path,
                initial_value,
                gettextcallback=text_callback,
                writeable=True,
                onchangecallback=self._handlechangedvalue,
            )

        # pause in ms before the next request
        refresh_interval_ms = settings.get('update_time_ms')
        GLib.timeout_add(refresh_interval_ms, self._update)

    def _update(self):
        """Copy the connector's latest readings onto the D-Bus service.

        Any exception raised while fetching or publishing is logged at
        critical level and swallowed.

        Returns:
            Always ``True``, so the GLib timeout keeps calling this method.
        """
        with self._dbusservice as service:
            try:
                readings = self._data_connector.getData()
                for dbus_path, reading in readings.items():
                    logging.info(f"set {dbus_path} to {reading}")
                    service[dbus_path] = reading

                # Bump UpdateIndex (wrapping at 256) to signal that new data is available
                next_index = (service['/UpdateIndex'] + 1) % 256
                service['/UpdateIndex'] = next_index

                # Remember when the last successful refresh happened
                self._lastUpdate = time.time()
            except Exception as e:
                logging.critical('Error at %s', '_update', exc_info=e)
        return True

    def _handlechangedvalue(self, path, value):
        """Log a write made by another D-Bus client and accept it.

        Returns:
            Always ``True``, which accepts the change.
        """
        change_message = "someone else updated %s to %s" % (path, value)
        logging.debug(change_message)
        return True  # accept the change
def exit_mainloop(mainloop):
    """Stop the given main loop by calling its ``quit()`` method.

    Used as a GLib timeout callback; it returns ``None``.
    """
    mainloop.quit()
def _build_dbus_paths():
    """Return the measurement paths to publish, each with its initial value and text formatter.

    Every formatter takes ``(path, value)`` and returns the display text.
    """

    def _kwh(p, v):
        return str(round(v, 2)) + ' kWh'

    def _a(p, v):
        return str(round(v, 1)) + ' A'

    def _w(p, v):
        return str(round(v, 1)) + ' W'

    def _v(p, v):
        return str(round(v, 1)) + ' V'

    def _hz(p, v):
        return f"{v:.4f}Hz"

    def _n(p, v):
        # The previous f"{v:i}" used an invalid format code and raised ValueError.
        return str(int(v))

    paths = {
        '/Ac/Power': {'initial': 0, 'textformat': _w},
        '/Ac/Current': {'initial': 0, 'textformat': _a},
        '/Ac/Voltage': {'initial': 0, 'textformat': _v},
        '/Ac/Energy/Forward': {'initial': None, 'textformat': _kwh},
        '/Ac/MaxPower': {'initial': 20000, 'textformat': _w},
        '/Ac/StatusCode': {'initial': 0, 'textformat': _n},
        '/Dc/Power': {'initial': 0, 'textformat': _w},
        '/Status': {'initial': ""},
    }
    # The three phases expose the same set of per-phase measurements.
    for phase in ('L1', 'L2', 'L3'):
        paths[f'/Ac/{phase}/Power'] = {'initial': 0, 'textformat': _w}
        paths[f'/Ac/{phase}/Current'] = {'initial': 0, 'textformat': _a}
        paths[f'/Ac/{phase}/Voltage'] = {'initial': 0, 'textformat': _v}
        paths[f'/Ac/{phase}/Frequency'] = {'initial': None, 'textformat': _hz}
        paths[f'/Ac/{phase}/Energy/Forward'] = {'initial': None, 'textformat': _kwh}
    return paths


def main():
    """Configure logging, wait for usable settings and inverter data, then serve on the D-Bus.

    Steps, in order:
      1. Set up root logging and make the GLib main loop the D-Bus default.
      2. Load the settings; while the modbus host still contains "255",
         warn and park in a main loop.
      3. Create the modbus collector and retry ``getStaticData()`` every
         10 seconds until it returns something other than ``None``.
      4. Register the D-Bus service and run the main loop. Any exception in
         this step is logged at critical level and the function returns.
    """
    # FIXME: This should be a proper private logger, instead of trying to configure the root logger,
    #        which doesn't work unless force=True is specified and then leads to all sorts of libraries
    #        logging lots of debug data
    logging.basicConfig(format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=config.LOGGING,
                        handlers=[
                            logging.StreamHandler()
                        ])

    # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
    DBusGMainLoop(set_as_default=True)

    settings = HuaweiSUN2000Settings()
    logging.info(f"VRM pvinverter instance: {settings.get_vrm_instance()}")
    logging.info(f"Settings: ModbusHost '{settings.get('modbus_host')}', ModbusPort '{settings.get('modbus_port')}', ModbusUnit '{settings.get('modbus_unit')}'")
    logging.info(f"Settings: CustomName '{settings.get('custom_name')}', Position '{settings.get('position')}', UpdateTimeMS '{settings.get('update_time_ms')}'")
    logging.info(f"Settings: PowerCorrectionFactor '{settings.get('power_correction_factor')}'")

    while "255" in settings.get("modbus_host"):
        # This catches the initial setting and allows the service to be installed without configuring it first
        logging.warning(f"Please configure the modbus host and other settings in the VenusOS GUI (current setting: {settings.get('modbus_host')})")
        # Running a mainloop means we'll be notified about config changes and exit in that case (which restarts the service)
        mainloop = GLib.MainLoop()
        mainloop.run()

    modbus = ModbusDataCollector2000Delux(host=settings.get("modbus_host"),
                                          port=settings.get("modbus_port"),
                                          modbus_unit=settings.get("modbus_unit"),
                                          power_correction_factor=settings.get("power_correction_factor"))

    staticdata = modbus.getStaticData()
    while staticdata is None:
        logging.error("Didn't receive static data from modbus, error is above. Sleeping 10 seconds before retrying.")
        # Again we sleep in the mainloop in order to pick up config changes
        mainloop = GLib.MainLoop()
        GLib.timeout_add(10000, exit_mainloop, mainloop)
        mainloop.run()
        staticdata = modbus.getStaticData()

    try:
        logging.info("Starting up")

        # Keep a named reference to the service while the main loop runs.
        pvac_output = DbusSun2000Service(
            servicename='com.victronenergy.pvinverter.sun2000',
            settings=settings,
            paths=_build_dbus_paths(),
            productname=staticdata['Model'],
            serialnumber=staticdata['SN'],
            data_connector=modbus
        )

        logging.info('Connected to dbus, and switching over to GLib.MainLoop() (= event based)')
        mainloop = GLib.MainLoop()
        mainloop.run()
    except Exception as e:
        logging.critical('Error at %s', 'main', exc_info=e)
# Start the service only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()