/
counter.py
64 lines (57 loc) · 2.38 KB
/
counter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python3
from modules.common import simcount
from modules.common.component_state import CounterState
from modules.common.fault_state import ComponentInfo
from modules.common.store import get_counter_value_store
from modules.http.api import create_request_function
def get_default_config() -> dict:
    """Build the default configuration of an HTTP counter component.

    A new dictionary is created on every call, so callers may modify the
    result freely. Every request path under "configuration" starts out
    as None.

    Returns:
        The default component configuration with the keys "name", "id",
        "type" and "configuration".
    """
    path_keys = (
        "power_path",
        "imported_path",
        "exported_path",
        "current_l1_path",
        "current_l2_path",
        "current_l3_path",
    )
    defaults = {
        "name": "HTTP Zähler",
        "id": 0,
        "type": "counter",
    }
    # dict.fromkeys without a value argument maps every key to None.
    defaults["configuration"] = dict.fromkeys(path_keys)
    return defaults
class HttpCounter:
    """Counter component whose readings come from HTTP request functions.

    The request functions are built with create_request_function from the
    device URL and the paths found in component_config["configuration"].
    """

    def __init__(self, device_id: int, component_config: dict, url: str) -> None:
        """Set up request functions, simulation counter and value store.

        Args:
            device_id: Id of the owning device; used in the topic handed to
                the simulation counter.
            component_config: Component configuration; its "id" and
                "configuration" entries are read here.
            url: Passed to create_request_function together with each path.
        """
        paths = component_config["configuration"]
        self.__get_power = create_request_function(url, paths["power_path"])
        self.__get_imported = create_request_function(url, paths["imported_path"])
        self.__get_exported = create_request_function(url, paths["exported_path"])

        # One request function per phase, in the order L1, L2, L3.
        current_getters = []
        for path_key in ("current_l1_path", "current_l2_path", "current_l3_path"):
            current_getters.append(create_request_function(url, paths[path_key]))
        self.__get_currents = current_getters

        self.__device_id = device_id
        self.component_config = component_config

        # get_sim_counter() returns a callable; calling it yields the counter.
        make_sim_counter = simcount.SimCountFactory().get_sim_counter()
        self.__sim_count = make_sim_counter()
        self.simulation = {}

        self.__store = get_counter_value_store(component_config["id"])
        self.component_info = ComponentInfo.from_component_config(component_config)

    def update(self):
        """Fetch the current readings and write them to the value store.

        If the imported or the exported reading is None, both values are
        taken from the simulation counter's sim_count instead, which is
        given the power reading.
        """
        imported_reading = self.__get_imported()
        exported_reading = self.__get_exported()
        power_reading = self.__get_power()

        readings_missing = any(
            reading is None for reading in (imported_reading, exported_reading)
        )
        if readings_missing:
            topic_template = "openWB/set/system/device/{}/component/{}/"
            topic = topic_template.format(self.__device_id, self.component_config["id"])
            imported_reading, exported_reading = self.__sim_count.sim_count(
                power_reading,
                topic=topic,
                data=self.simulation,
                prefix="bezug",
            )

        # The currents are requested only after the simulation fallback.
        phase_currents = [fetch_current() for fetch_current in self.__get_currents]
        state = CounterState(
            currents=phase_currents,
            imported=imported_reading,
            exported=exported_reading,
            power=power_reading,
        )
        self.__store.set(state)