Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QCells #1037

Merged
merged 1 commit into from Jul 19, 2023
Merged

QCells #1037

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file.
43 changes: 43 additions & 0 deletions packages/modules/devices/qcells/bat.py
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
from typing import Dict, Union

from dataclass_utils import dataclass_from_dict
from modules.common import modbus
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo
from modules.common.modbus import ModbusDataType
from modules.common.store import get_bat_value_store
from modules.devices.qcells.config import QCellsBatSetup


class QCellsBat:
def __init__(self,
component_config: Union[Dict, QCellsBatSetup],
tcp_client: modbus.ModbusTcpClient_,
modbus_id: int) -> None:
self.__modbus_id = modbus_id
self.component_config = dataclass_from_dict(QCellsBatSetup, component_config)
self.__tcp_client = tcp_client
self.store = get_bat_value_store(self.component_config.id)
self.component_info = ComponentInfo.from_component_config(self.component_config)

def update(self) -> None:
with self.__tcp_client:
power = self.__tcp_client.read_input_registers(0x0016, ModbusDataType.INT_16, unit=self.__modbus_id)
soc = self.__tcp_client.read_input_registers(0x001C, ModbusDataType.UINT_16, unit=self.__modbus_id)
imported = self.__tcp_client.read_input_registers(
0x0021, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100
exported = self.__tcp_client.read_input_registers(
0x001D, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100

bat_state = BatState(
power=power,
soc=soc,
imported=imported,
exported=exported
)
self.store.set(bat_state)


component_descriptor = ComponentDescriptor(configuration_factory=QCellsBatSetup)
63 changes: 63 additions & 0 deletions packages/modules/devices/qcells/config.py
@@ -0,0 +1,63 @@
from typing import Optional

from modules.common.component_setup import ComponentSetup


class QCellsConfiguration:
def __init__(self, modbus_id: int = 1, ip_address: Optional[str] = None,):
self.modbus_id = modbus_id
self.ip_address = ip_address


class QCells:
def __init__(self,
name: str = "QCells ESS",
type: str = "qcells",
id: int = 0,
configuration: QCellsConfiguration = None) -> None:
self.name = name
self.type = type
self.id = id
self.configuration = configuration or QCellsConfiguration()


class QCellsBatConfiguration:
def __init__(self):
pass


class QCellsBatSetup(ComponentSetup[QCellsBatConfiguration]):
def __init__(self,
name: str = "QCells Speicher",
type: str = "bat",
id: int = 0,
configuration: QCellsBatConfiguration = None) -> None:
super().__init__(name, type, id, configuration or QCellsBatConfiguration())


class QCellsCounterConfiguration:
def __init__(self):
pass


class QCellsCounterSetup(ComponentSetup[QCellsCounterConfiguration]):
def __init__(self,
name: str = "QCells Zähler",
type: str = "counter",
id: int = 0,
configuration: QCellsCounterConfiguration = None) -> None:
super().__init__(name, type, id, configuration or QCellsCounterConfiguration())


class QCellsInverterConfiguration:
def __init__(self):
pass


class QCellsInverterSetup(ComponentSetup[QCellsInverterConfiguration]):
def __init__(self,
name: str = "QCells Wechselrichter",
type: str = "inverter",
id: int = 0,
configuration: QCellsInverterConfiguration = None) -> None:
super().__init__(name, type, id, configuration or QCellsInverterConfiguration())
72 changes: 72 additions & 0 deletions packages/modules/devices/qcells/counter.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python3
from typing import Dict, Union
from pymodbus.constants import Endian

from dataclass_utils import dataclass_from_dict
from modules.common import modbus
from modules.common.component_state import CounterState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo
from modules.common.modbus import ModbusDataType
from modules.common.store import get_counter_value_store
from modules.devices.qcells.config import QCellsCounterSetup


class QCellsCounter:
def __init__(self,
component_config: Union[Dict, QCellsCounterSetup],
tcp_client: modbus.ModbusTcpClient_,
modbus_id: int) -> None:

self.component_config = dataclass_from_dict(QCellsCounterSetup, component_config)
self.__modbus_id = modbus_id
self.__tcp_client = tcp_client
self.store = get_counter_value_store(self.component_config.id)
self.component_info = ComponentInfo.from_component_config(self.component_config)

def update(self):
with self.__tcp_client:
power = self.__tcp_client.read_input_registers(0x0046, ModbusDataType.INT_32, wordorder=Endian.Little,
unit=self.__modbus_id) * -1
frequency = self.__tcp_client.read_input_registers(
0x0007, ModbusDataType.UINT_16, unit=self.__modbus_id) / 100
try:
powers = [-value for value in self.__tcp_client.read_input_registers(
0x0082, [ModbusDataType.INT_32] * 3, wordorder=Endian.Little, unit=self.__modbus_id
)]
except Exception:
powers = None
try:
voltages = [self.__tcp_client.read_input_registers(
0x006A, ModbusDataType.UINT_16, unit=self.__modbus_id
) / 10, self.__tcp_client.read_input_registers(
0x006E, ModbusDataType.UINT_16, unit=self.__modbus_id
) / 10, self.__tcp_client.read_input_registers(
0x0072, ModbusDataType.UINT_16, unit=self.__modbus_id
) / 10]
if voltages[0] < 1:
voltages[0] = 230
if voltages[1] < 1:
voltages[1] = 230
if voltages[2] < 1:
voltages[2] = 230
except Exception:
voltages = [230, 230, 230]
exported, imported = [value * 10
for value in self.__tcp_client.read_input_registers(
0x0048, [ModbusDataType.UINT_32] * 2,
wordorder=Endian.Little, unit=self.__modbus_id
)]

counter_state = CounterState(
imported=imported,
exported=exported,
power=power,
powers=powers,
frequency=frequency,
voltages=voltages,
)
self.store.set(counter_state)


component_descriptor = ComponentDescriptor(configuration_factory=QCellsCounterSetup)
54 changes: 54 additions & 0 deletions packages/modules/devices/qcells/device.py
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
import logging
from typing import Iterable, Union

from modules.common.abstract_device import DeviceDescriptor
from modules.common.component_context import SingleComponentUpdateContext
from modules.common.configurable_device import ConfigurableDevice, ComponentFactoryByType, MultiComponentUpdater
from modules.common.modbus import ModbusTcpClient_
from modules.devices.qcells.bat import QCellsBat
from modules.devices.qcells.config import QCells, QCellsBatSetup, QCellsCounterSetup, QCellsInverterSetup
from modules.devices.qcells.counter import QCellsCounter
from modules.devices.qcells.inverter import QCellsInverter

log = logging.getLogger(__name__)


def create_device(device_config: QCells):
def create_bat_component(component_config: QCellsBatSetup):
return QCellsBat(component_config,
device_config.configuration.ip_address,
device_config.configuration.modbus_id)

def create_counter_component(component_config: QCellsCounterSetup):
return QCellsCounter(component_config,
device_config.configuration.ip_address,
device_config.configuration.modbus_id)

def create_inverter_component(component_config: QCellsInverterSetup):
return QCellsInverter(component_config,
device_config.configuration.ip_address,
device_config.configuration.modbus_id)

def update_components(components: Iterable[Union[QCellsBat, QCellsCounter, QCellsInverter]]):
with client as c:
for component in components:
with SingleComponentUpdateContext(component.component_info):
component.update(c)

try:
client = ModbusTcpClient_(device_config.configuration.ip_address, 502)
except Exception:
log.exception("Fehler in create_device")
return ConfigurableDevice(
device_config=device_config,
component_factory=ComponentFactoryByType(
bat=create_bat_component,
counter=create_counter_component,
inverter=create_inverter_component,
),
component_updater=MultiComponentUpdater(update_components)
)


device_descriptor = DeviceDescriptor(configuration_factory=QCells)
46 changes: 46 additions & 0 deletions packages/modules/devices/qcells/inverter.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python3
from typing import Dict, Union
from pymodbus.constants import Endian

from dataclass_utils import dataclass_from_dict
from modules.common import modbus
from modules.common.component_state import InverterState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo
from modules.common.modbus import ModbusDataType
from modules.common.store import get_inverter_value_store
from modules.devices.qcells.config import QCellsInverterSetup


class QCellsInverter:
def __init__(self,
component_config: Union[Dict, QCellsInverterSetup],
tcp_client: modbus.ModbusTcpClient_,
modbus_id: int) -> None:
self.component_config = dataclass_from_dict(QCellsInverterSetup, component_config)
self.__modbus_id = modbus_id
self.__tcp_client = tcp_client
self.store = get_inverter_value_store(self.component_config.id)
self.component_info = ComponentInfo.from_component_config(self.component_config)

def update(self) -> None:
with self.__tcp_client:
power_string1 = (self.__tcp_client.read_input_registers(
0x0003, ModbusDataType.INT_16, unit=self.__modbus_id) / 10) * \
(self.__tcp_client.read_input_registers(0x0005, ModbusDataType.INT_16, unit=self.__modbus_id) / 10)
power_string2 = (self.__tcp_client.read_input_registers(
0x0004, ModbusDataType.INT_16, unit=self.__modbus_id) / 10) * \
(self.__tcp_client.read_input_registers(0x0006, ModbusDataType.INT_16, unit=self.__modbus_id) / 10)
power = (power_string1 + power_string2) * -1

exported = self.__tcp_client.read_input_registers(0x0094, ModbusDataType.UINT_32, wordorder=Endian.Little,
unit=self.__modbus_id) * 100

inverter_state = InverterState(
power=power,
exported=exported
)
self.store.set(inverter_state)


component_descriptor = ComponentDescriptor(configuration_factory=QCellsInverterSetup)