Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
15 changed files
with
701 additions
and
1,067 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
from pathlib import Path | ||
|
||
RAMDSIK_PATH = Path(__file__).resolve().parents[2] / "ramdisk" | ||
|
||
|
||
def is_ramdisk_in_use() -> bool: | ||
""" prüft, ob die Daten in der Ramdisk liegen (v1.x), sonst wird mit dem Broker (2.x) gearbeitet. | ||
""" | ||
return (Path(__file__).resolve().parents[2] / "ramdisk" / "bootinprogress").is_file() | ||
return (RAMDSIK_PATH / "bootinprogress").is_file() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
#!/usr/bin/env python3 | ||
from typing import List, Tuple | ||
|
||
from modules.common import modbus | ||
from modules.common.modbus import ModbusDataType | ||
|
||
|
||
class B32: | ||
def __init__(self, modbus_id: int, client: modbus.ModbusTcpClient_) -> None: | ||
self.client = client | ||
self.id = modbus_id | ||
|
||
def get_imported(self) -> float: | ||
return self.client.read_holding_registers(0x5000, ModbusDataType.UINT_64, unit=self.id) / 1000 | ||
|
||
def get_frequency(self) -> float: | ||
return self.client.read_holding_registers(0x5B2C, ModbusDataType.INT_16, unit=self.id) / 100 | ||
|
||
def get_currents(self) -> List[float]: | ||
return [val / 10 for val in self.client.read_holding_registers( | ||
0x5B0C, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
|
||
def get_power(self) -> Tuple[List[float], float]: | ||
power = self.client.read_input_registers(0x0C, ModbusDataType.INT_32, unit=self.id) / 100 | ||
return [0]*3, power | ||
|
||
def get_voltages(self) -> List[float]: | ||
return [val / 10 for val in self.client.read_holding_registers( | ||
0x5B00, [ModbusDataType.UINT_32]*3, unit=self.id)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
#!/usr/bin/env python3 | ||
import logging | ||
from enum import IntEnum | ||
from typing import Optional, Tuple | ||
|
||
from modules.common import modbus | ||
from modules.common.fault_state import FaultState | ||
from modules.common.modbus import ModbusDataType | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class EvseState(IntEnum): | ||
READY = (1, False, False) | ||
EV_PRESENT = (2, True, False) | ||
CHARGING = (3, True, True) | ||
CHARGING_WITH_VENTILATION = (4, True, True) | ||
FAILURE = (5, None, None) | ||
|
||
def __new__(cls, num: int, plugged: Optional[bool], charge_enabled: Optional[bool]): | ||
member = int.__new__(cls, num) | ||
member._value_ = num | ||
member.plugged = plugged | ||
member.charge_enabled = charge_enabled | ||
return member | ||
|
||
|
||
class Evse: | ||
def __init__(self, modbus_id: int, client: modbus.ModbusSerialClient_) -> None: | ||
self.client = client | ||
self.id = modbus_id | ||
|
||
def get_plug_charge_state(self) -> Tuple[bool, bool, float]: | ||
set_current, _, state_number = self.client.read_holding_registers( | ||
1000, [ModbusDataType.UINT_16]*3, unit=self.id) | ||
log.debug("Gesetzte Stromstärke EVSE: "+str(set_current) + | ||
", Status: "+str(state_number)+", Modbus-ID: "+str(self.id)) | ||
state = EvseState(state_number) | ||
if state == EvseState.FAILURE: | ||
raise FaultState.error("Unbekannter Zustand der EVSE: State " + | ||
str(state)+", Sollstromstärke: "+str(set_current)) | ||
plugged = state.plugged | ||
charging = set_current > 0 if state.charge_enabled else False | ||
return plugged, charging, set_current | ||
|
||
def get_firmware_version(self) -> None: | ||
log.debug( | ||
"FW-Version: "+str(self.client.read_holding_registers(1005, ModbusDataType.UINT_16, unit=self.id))) | ||
|
||
def set_current(self, current: int) -> None: | ||
self.client.delegate.write_registers(1000, current, unit=self.id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,68 +1,41 @@ | ||
#!/usr/bin/env python3 | ||
from modules.common.fault_state import FaultState | ||
from typing import List, Tuple | ||
|
||
from modules.common import modbus | ||
from modules.common.modbus import ModbusDataType | ||
from typing import List, Tuple | ||
|
||
|
||
class Mpm3pm: | ||
def __init__(self, modbus_id: int, client: modbus.ModbusTcpClient_) -> None: | ||
self.client = client | ||
self.id = modbus_id | ||
|
||
def __process_error(self, e): | ||
if isinstance(e, FaultState): | ||
raise | ||
else: | ||
raise FaultState.error(__name__+" "+str(type(e))+" " + str(e)) from e | ||
|
||
def get_voltages(self) -> List[float]: | ||
try: | ||
return [val / 10 for val in self.client.read_input_registers( | ||
0x08, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
except Exception as e: | ||
self.__process_error(e) | ||
return [val / 10 for val in self.client.read_input_registers( | ||
0x08, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
|
||
def get_imported(self) -> float: | ||
try: | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return self.client.read_input_registers(0x0002, ModbusDataType.UINT_32, unit=self.id) * 10 | ||
except Exception as e: | ||
self.__process_error(e) | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return self.client.read_input_registers(0x0002, ModbusDataType.UINT_32, unit=self.id) * 10 | ||
|
||
def get_power(self) -> Tuple[List[float], float]: | ||
try: | ||
powers = [val / 100 for val in self.client.read_input_registers( | ||
0x14, [ModbusDataType.INT_32]*3, unit=self.id)] | ||
power = self.client.read_input_registers(0x26, ModbusDataType.INT_32, unit=self.id) / 100 | ||
return powers, power | ||
except Exception as e: | ||
self.__process_error(e) | ||
powers = [val / 100 for val in self.client.read_input_registers( | ||
0x14, [ModbusDataType.INT_32]*3, unit=self.id)] | ||
power = self.client.read_input_registers(0x26, ModbusDataType.INT_32, unit=self.id) / 100 | ||
return powers, power | ||
|
||
def get_exported(self) -> float: | ||
try: | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return self.client.read_input_registers(0x0004, ModbusDataType.UINT_32, unit=self.id) * 10 | ||
except Exception as e: | ||
self.__process_error(e) | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return self.client.read_input_registers(0x0004, ModbusDataType.UINT_32, unit=self.id) * 10 | ||
|
||
def get_power_factors(self) -> List[float]: | ||
try: | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return [val / 10 for val in self.client.read_input_registers( | ||
0x20, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
except Exception as e: | ||
self.__process_error(e) | ||
# Faktorisierung anders als in der Dokumentation angegeben | ||
return [val / 10 for val in self.client.read_input_registers( | ||
0x20, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
|
||
def get_frequency(self) -> float: | ||
try: | ||
return self.client.read_input_registers(0x2c, ModbusDataType.UINT_32, unit=self.id) / 100 | ||
except Exception as e: | ||
self.__process_error(e) | ||
return self.client.read_input_registers(0x2c, ModbusDataType.UINT_32, unit=self.id) / 100 | ||
|
||
def get_currents(self) -> List[float]: | ||
try: | ||
return [val / 100 for val in self.client.read_input_registers( | ||
0x0E, [ModbusDataType.UINT_32]*3, unit=self.id)] | ||
except Exception as e: | ||
self.__process_error(e) | ||
return [val / 100 for val in self.client.read_input_registers( | ||
0x0E, [ModbusDataType.UINT_32]*3, unit=self.id)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,81 +1,51 @@ | ||
#!/usr/bin/env python3 | ||
from typing import List, Tuple | ||
|
||
from modules.common import modbus | ||
from modules.common.fault_state import FaultState | ||
from modules.common.modbus import ModbusDataType | ||
from typing import List, Tuple | ||
|
||
|
||
class Sdm: | ||
def __init__(self, modbus_id: int, client: modbus.ModbusTcpClient_) -> None: | ||
self.client = client | ||
self.id = modbus_id | ||
|
||
def process_error(self, e): | ||
if isinstance(e, FaultState): | ||
raise | ||
else: | ||
raise FaultState.error(__name__+" "+str(type(e))+" "+str(e)) from e | ||
|
||
def get_imported(self) -> float: | ||
try: | ||
return self.client.read_input_registers(0x0048, ModbusDataType.FLOAT_32, unit=self.id) * 1000 | ||
except Exception as e: | ||
self.process_error(e) | ||
return self.client.read_input_registers(0x0048, ModbusDataType.FLOAT_32, unit=self.id) * 1000 | ||
|
||
def get_exported(self) -> float: | ||
try: | ||
return self.client.read_input_registers(0x004a, ModbusDataType.FLOAT_32, unit=self.id) * 1000 | ||
except Exception as e: | ||
self.process_error(e) | ||
return self.client.read_input_registers(0x004a, ModbusDataType.FLOAT_32, unit=self.id) * 1000 | ||
|
||
def get_frequency(self) -> float: | ||
try: | ||
frequency = self.client.read_input_registers(0x46, ModbusDataType.FLOAT_32, unit=self.id) | ||
if frequency > 100: | ||
frequency = frequency / 10 | ||
return frequency | ||
except Exception as e: | ||
self.process_error(e) | ||
frequency = self.client.read_input_registers(0x46, ModbusDataType.FLOAT_32, unit=self.id) | ||
if frequency > 100: | ||
frequency = frequency / 10 | ||
return frequency | ||
|
||
|
||
class Sdm630(Sdm): | ||
def __init__(self, modbus_id: int, client: modbus.ModbusTcpClient_) -> None: | ||
super().__init__(modbus_id, client) | ||
|
||
def get_currents(self) -> List[float]: | ||
try: | ||
return self.client.read_input_registers(0x06, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
except Exception as e: | ||
self.process_error(e) | ||
return self.client.read_input_registers(0x06, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
|
||
def get_power_factors(self) -> List[float]: | ||
try: | ||
return self.client.read_input_registers(0x1E, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
except Exception as e: | ||
self.process_error(e) | ||
return self.client.read_input_registers(0x1E, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
|
||
def get_power(self) -> Tuple[List[float], float]: | ||
try: | ||
powers = self.client.read_input_registers(0x0C, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
power = sum(powers) | ||
return powers, power | ||
except Exception as e: | ||
self.process_error(e) | ||
powers = self.client.read_input_registers(0x0C, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
power = sum(powers) | ||
return powers, power | ||
|
||
def get_voltages(self) -> List[float]: | ||
try: | ||
return self.client.read_input_registers(0x00, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
except Exception as e: | ||
self.process_error(e) | ||
return self.client.read_input_registers(0x00, [ModbusDataType.FLOAT_32]*3, unit=self.id) | ||
|
||
|
||
class Sdm120(Sdm): | ||
def __init__(self, modbus_id: int, client: modbus.ModbusTcpClient_) -> None: | ||
super().__init__(modbus_id, client) | ||
|
||
def get_power(self) -> Tuple[List[float], float]: | ||
try: | ||
power = self.client.read_input_registers(0x0C, ModbusDataType.FLOAT_32, unit=self.id) | ||
return [power, 0, 0], power | ||
except Exception as e: | ||
self.process_error(e) | ||
power = self.client.read_input_registers(0x0C, ModbusDataType.FLOAT_32, unit=self.id) | ||
return [power, 0, 0], power |
Oops, something went wrong.