diff --git a/examples/server_datamodel.py b/examples/server_datamodel.py index 4a7ce80ac..4b1c1c3b3 100755 --- a/examples/server_datamodel.py +++ b/examples/server_datamodel.py @@ -32,40 +32,41 @@ def define_datamodel(): assert SimData( 5, 10, 17, DataType.REGISTERS ) == SimData( - address=5, value=17, count=10, datatype=DataType.REGISTERS + address=5, values=17, count=10, datatype=DataType.REGISTERS ) # Define a group of coils/direct inputs non-shared (address=15..31 each 1 bit) - block1 = SimData(address=15, count=16, value=True, datatype=DataType.BITS) + #block1 = SimData(address=15, count=16, values=True, datatype=DataType.BITS) # Define a group of coils/direct inputs shared (address=15..31 each 16 bit) - block2 = SimData(address=15, count=16, value=0xFFFF, datatype=DataType.BITS) + #block2 = SimData(address=15, count=16, values=0xFFFF, datatype=DataType.BITS) # Define a group of holding/input registers (remark NO difference between shared and non-shared) - block3 = SimData(10, 1, 123.4, datatype=DataType.FLOAT32) - block4 = SimData(17, count=5, value=123, datatype=DataType.INT64) + #block3 = SimData(10, 1, 123.4, datatype=DataType.FLOAT32) + #block4 = SimData(17, count=5, values=123, datatype=DataType.INT64) block5 = SimData(27, 1, "Hello ", datatype=DataType.STRING) - block_def = SimData(0, count=1000, datatype=DataType.REGISTERS) + block_def = SimData(0, count=1000, datatype=DataType.REGISTERS, default=True) # SimDevice can be instantiated with positional or optional parameters: assert SimDevice( - 5,False, [block_def, block5] + 5, + [block_def, block5], ) == SimDevice( - id=5, type_check=False, block_shared=[block_def, block5] + id=5, type_check=False, registers=[block_def, block5] ) # SimDevice can define either a shared or a non-shared register model - SimDevice(1, False, block_shared=[block_def, block5]) - SimDevice(2, False, - block_coil=[block1], - block_direct=[block1], - block_holding=[block2], - block_input=[block3, block4]) + SimDevice(id=1, type_check=False, registers=[block_def, block5]) + #SimDevice(2, False, + # block_coil=[block1], + # block_direct=[block1], + # block_holding=[block2], + # block_input=[block3, block4]) # Remark: it is legal to reuse SimData, the object is only used for configuration, # not for runtime. # id=0 in a SimDevice act as a "catch all". Requests to an unknown id is executed in this SimDevice. - SimDevice(0, block_shared=[block2]) + #SimDevice(0, block_shared=[block2]) def main(): diff --git a/pymodbus/constants.py b/pymodbus/constants.py index 832fe31f3..9635e9dac 100644 --- a/pymodbus/constants.py +++ b/pymodbus/constants.py @@ -3,8 +3,9 @@ This is the single location for storing default values for the servers and clients. """ +from __future__ import annotations + import enum -from typing import Union INTERNAL_ERROR = "Pymodbus internal error" @@ -161,7 +162,7 @@ class DataType(enum.IntEnum): #: Registers == 2 bytes (identical to UINT16) REGISTERS = enum.auto() -DATATYPE_STRUCT: dict[DataType, tuple[Union[type, tuple[type, ...]], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass +DATATYPE_STRUCT: dict[DataType, tuple[type | tuple[type, type], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass DataType.INT16: (int, 1), DataType.UINT16: (int, 1), DataType.INT32: (int, 2), @@ -171,6 +172,6 @@ class DataType(enum.IntEnum): DataType.FLOAT32: (float, 2), DataType.FLOAT64: (float, 4), DataType.STRING: (str, -1), - DataType.BITS: ((list, int, bool), -2), + DataType.BITS: ((list, int), -2), DataType.REGISTERS: (int, 1), } diff --git a/pymodbus/simulator/__init__.py b/pymodbus/simulator/__init__.py index 26fe7ed12..3371d83a9 100644 --- a/pymodbus/simulator/__init__.py +++ b/pymodbus/simulator/__init__.py @@ -1,6 +1,7 @@ """Simulator.""" __all__ = [ + "SimAction", "SimCore", "SimData", "SimDevice", @@ -9,6 +10,7 @@ from .simcore import SimCore from .simdata import ( + SimAction, SimData, SimValueType, ) diff --git a/pymodbus/simulator/simcore.py b/pymodbus/simulator/simcore.py index 3f038f08f..7ef6a9bdc 100644 --- a/pymodbus/simulator/simcore.py +++ b/pymodbus/simulator/simcore.py @@ -5,8 +5,8 @@ from .simdevice import SimDevice -class SimCore: - """Datastore for the simulator/server.""" +class SimCore: # pylint: disable=too-few-public-methods + """Handler for the simulator/server.""" def __init__(self) -> None: """Build datastore.""" @@ -16,30 +16,3 @@ def __init__(self) -> None: def build_block(cls, _block: list[SimData]) -> tuple[int, int, int] | None: """Build registers for device.""" return None - - @classmethod - def build_config(cls, devices: list[SimDevice]) -> SimCore: - """Build devices/registers ready for server/simulator.""" - core = SimCore() - for dev in devices: - if dev.id in core.devices: - raise TypeError(f"device id {dev.id} defined multiple times.") - block_coil = block_direct = block_holding = block_input = block_shared = None - for cfg_block, _block in ( - (dev.block_coil, block_coil), - (dev.block_direct, block_direct), - (dev.block_holding, block_holding), - (dev.block_input, block_input), - (dev.block_shared, block_shared) - ): - if cfg_block: - cls.build_block(cfg_block) - - core.devices[dev.id] = dev - # block_coil, - # block_direct, - # block_holding, - # block_input, - # block_shared - #) - return core diff --git a/pymodbus/simulator/simdata.py b/pymodbus/simulator/simdata.py index b38e06c67..3d1e5af0e 100644 --- a/pymodbus/simulator/simdata.py +++ b/pymodbus/simulator/simdata.py @@ -4,17 +4,17 @@ import asyncio from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import TypeAlias +from typing import TypeAlias, cast from pymodbus.constants import DATATYPE_STRUCT, DataType from pymodbus.pdu import ExceptionResponse -SimValueTypeSimple: TypeAlias = int | float | str | bool | bytes -SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple] +SimValueTypeSimple: TypeAlias = int | float | str | bytes +SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple | bool] SimAction: TypeAlias = Callable[[int, int, list[int]], Awaitable[list[int] | ExceptionResponse]] -@dataclass(frozen=True) +@dataclass(order=True, frozen=True) class SimData: """Configure a group of continuous identical values/registers. @@ -83,7 +83,7 @@ class SimData: #: Value/Values of datatype, #: will automatically be converted to registers, according to datatype. - value: SimValueType = 0 + values: SimValueType = 0 #: Used to check access and convert value to/from registers. datatype: DataType = DataType.REGISTERS @@ -104,22 +104,68 @@ class SimData: #: .. tip:: use functools.partial to add extra parameters if needed. action: SimAction | None = None + #: Mark register(s) as readonly. + readonly: bool = False - def __post_init__(self): - """Define a group of registers.""" - if not isinstance(self.address, int) or not 0 <= self.address < 65535: + #: Mark register(s) as invalid. + #: **remark** only to be used with address= and count= + invalid: bool = False + + #: Use as default for undefined registers + #: Define legal register range as: + #: + #: address= <= legal addresses <= address= + count= + #: + #: **remark** only to be used with address= and count= + default: bool = False + + #: The following are internal variables + register_count: int = -1 + type_size: int = -1 + + def __check_default(self): + """Check use of default=.""" + if self.datatype != DataType.REGISTERS: + raise TypeError("default=True only works with datatype=DataType.REGISTERS") + if isinstance(self.values, list): + raise TypeError("default=True only works with values=") + + def __check_simple(self): + """Check simple parameters.""" + if not isinstance(self.address, int) or not 0 <= self.address <= 65535: raise TypeError("0 <= address < 65535") - if not isinstance(self.count, int) or not 0 <= self.count < 65535: - raise TypeError("0 <= count < 65535") + if not isinstance(self.count, int) or not 1 <= self.count <= 65536: + raise TypeError("1 <= count < 65536") + if not 1 <= self.address + self.count <= 65536: + raise TypeError("1 <= address + count < 65536") if not isinstance(self.datatype, DataType): - raise TypeError("datatype must by an DataType") - if isinstance(self.value, list): - if self.count > 1 or self.datatype == DataType.STRING: - raise TypeError("count > 1 cannot be combined with given values=") - for entry in self.value: - if not isinstance(entry, DATATYPE_STRUCT[self.datatype][0]) or isinstance(entry, str): - raise TypeError(f"elements in values must be {self.datatype!s} and not string") - elif not isinstance(self.value, DATATYPE_STRUCT[self.datatype][0]): - raise TypeError(f"value must be {self.datatype!s}") + raise TypeError("datatype= must by an DataType") if self.action and not (callable(self.action) and asyncio.iscoroutinefunction(self.action)): - raise TypeError("action not a async function") + raise TypeError("action= not a async function") + if self.register_count != -1: + raise TypeError("register_count= is illegal") + if self.type_size != -1: + raise TypeError("type_size= is illegal") + + def __post_init__(self): + """Define a group of registers.""" + self.__check_simple() + if self.default: + self.__check_default() + x_datatype: type | tuple[type, type] + if self.datatype == DataType.STRING: + if not isinstance(self.values, str): + raise TypeError("datatype=DataType.STRING only allows values=\"string\"") + x_datatype, x_len = str, int((len(self.values) +1) / 2) + else: + x_datatype, x_len = DATATYPE_STRUCT[self.datatype] + if not isinstance(self.values, list): + super().__setattr__("values", [self.values]) + for x_value in cast(list, self.values): + if not isinstance(x_value, x_datatype): + raise TypeError(f"value= can only contain {x_datatype!s}") + super().__setattr__("register_count", self.count * x_len) + super().__setattr__("type_size", x_len) + + + diff --git a/pymodbus/simulator/simdevice.py b/pymodbus/simulator/simdevice.py index a1049e5ab..22ecf55a4 100644 --- a/pymodbus/simulator/simdevice.py +++ b/pymodbus/simulator/simdevice.py @@ -6,109 +6,126 @@ from .simdata import SimData -@dataclass(frozen=True) +OFFSET_NONE = (-1, -1, -1, -1) + +@dataclass(order=True, frozen=True) class SimDevice: """Configure a device with parameters and registers. - Registers can be defined as shared or as 4 separate blocks. - - shared_block means all requests access the same registers, - e.g. read_input_register and read_holding_register - give the same result. + Registers are always defined as one block. - .. warning:: Shared block cannot be mixed with non-shared blocks ! + Some old devices uses 4 distinct blocks instead of 1 block, to + support these devices, define 1 large block consisting of the + 4 blocks and use the offset_*= parameters. - In shared mode, individual coils/direct input are not addressed directly ! - Instead the register address is used with count and each register contains 16 bit. - In non-shared mode coils/direct input can be addressed directly individually and - each register contain 1 bit. + When using distinct blocks, coils and direct inputs are addressed differently, + each register represent 1 coil/relay. **Device with shared registers**:: SimDevice( id=1, - block_shared=[SimData(...)] + registers=[SimData(...)] ) **Device with non-shared registers**:: SimDevice( id=1, - block_coil=[SimData(...)], - block_direct=[SimData(...)], - block_holding=[SimData(...)], - block_input=[SimData(...)], + registers=[SimData(...)], + non_shared_mode=True, + offset_coil=0, + offset_direct=10, + offset_holding=20, + offset_input=30, ) + Meaning registers: + + - 0-9 are coils + - 10-19 are relays + - 20-29 are holding registers + - 30-.. are input registers + A server can contain either a single :class:`SimDevice` or list of :class:`SimDevice` to simulate a multipoint line. + + .. warning:: each block is sorted by address !! """ - #: Address of device + #: Address/id of device #: #: Default 0 means accept all devices, except those specifically defined. - id: int = 0 + id: int - #: Enforce type checking, if True access are controlled to be conform with datatypes. + #: List of registers. #: - #: Used to control that e.g. INT32 are not read as INT16. - type_check: bool = False + registers: list[SimData] - #: Use this block for shared registers (Modern devices). - #: - #: Requests accesses all registers in this block. - #: - #: .. warning:: cannot be used together with other block_* parameters! - block_shared: list[SimData] | None = None - - #: Use this block for devices which are divided in 4 blocks. - #: - #: In this block an address is a single coil, there are no registers. + #: Use this for old devices with 4 blocks. #: - #: Request of type read/write_coil accesses this block. - #: - #: .. tip:: block_coil/direct/holding/input must all be defined - block_coil: list[SimData] | None = None + #: .. tip:: content is (coil, direct, holding, input) + offset_address: tuple[int, int, int, int] = OFFSET_NONE - #: Use this block for devices which are divided in 4 blocks. - #: - #: In this block an address is a single relay, there are no registers. - #: - #: Request of type read/write_direct_input accesses this block. + #: Enforce type checking, if True access are controlled to be conform with datatypes. #: - #: .. tip:: block_coil/direct/holding/input must all be defined - block_direct: list[SimData] | None = None + #: Type violations like e.g. reading INT32 as INT16 are returned as ExceptionResponses, + #: as well as being logged. + type_check: bool = False - #: Use this block for devices which are divided in 4 blocks. - #: - #: In this block an address is a register. - #: - #: Request of type read/write_holding accesses this block. - #: - #: .. tip:: block_coil/direct/holding/input must all be defined - block_holding: list[SimData] | None = None - #: Use this block for non-shared registers (very old devices). - #: - #: In this block an address is a register. - #: - #: Request of type read/write_input accesses this block. - #: - #: .. tip:: block_coil/direct/holding/input must all be defined - block_input: list[SimData] | None = None + def __check_block(self, block: list[SimData]) -> list[SimData]: + """Check block content.""" + for inx, entry in enumerate(block): + if not isinstance(entry, SimData): + raise TypeError(f"registers[{inx}]= is a SimData entry") + block.sort(key=lambda x: x.address) + return self.__check_block_entries(block) + + def __check_block_entries(self, block: list[SimData]) -> list[SimData]: + """Check block entries.""" + last_address = -1 + if len(block) > 1 and block[1].default: + temp = block[0] + block[0] = block[1] + block[1] = temp + first = True + for entry in block: + if entry.default: + if first: + first = False + continue + raise TypeError("Multiple default SimData, not allowed") + first = False + if entry.address <= last_address: + raise TypeError("SimData address {entry.address} is overlapping!") + last_address = entry.address + entry.register_count -1 + if not block[0].default: + default = SimData(address=block[0].address, count=last_address - block[0].address +1, default=True) + block.insert(0, default) + max_address = block[0].address + block[0].register_count -1 + if last_address > max_address: + raise TypeError("Default set max address {max_address} but {last_address} is defined?") + if len(block) > 1 and block[0].address > block[1].address: + raise TypeError("Default set lowest address to {block[0].address} but {block[1].address} is defined?") + return block def __post_init__(self): """Define a device.""" - if not isinstance(self.id, int) or 255 < self.id < 0: + if not isinstance(self.id, int) or not 0 <= self.id <= 255: raise TypeError("0 <= id < 255") - blocks = (self.block_coil, self.block_direct, self.block_holding, self.block_input) - if self.block_shared: - if not isinstance(self.block_shared, list): - raise TypeError("block_shared must be a list") - for entry in blocks: - if entry: - raise TypeError(f"{entry} cannot be combined with block_shared") - return - for entry in blocks: - if not entry or not isinstance(entry, list): - raise TypeError(f"{entry} not defined or not a list") + if not isinstance(self.registers, list) or not self.registers: + raise TypeError("registers= not a list") + if not isinstance(self.type_check, bool): + raise TypeError("type_check= not a bool") + super().__setattr__("registers", self.__check_block(self.registers)) + if self.offset_address != OFFSET_NONE: + if len(self.offset_address) != 4: + raise TypeError("offset_address= must have 4 addresses") + reg_start = self.registers[0].address + reg_end = self.registers[0].address + self.registers[0].register_count + for i in range(4): + if not (reg_start < self.offset_address[i] < reg_end): + raise TypeError(f"offset_address[{i}] outside defined range") + if i and self.offset_address[i-1] >= self.offset_address[i]: + raise TypeError("offset_address= must be ascending addresses") diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py new file mode 100644 index 000000000..286bc5951 --- /dev/null +++ b/pymodbus/simulator/simruntime.py @@ -0,0 +1,71 @@ +"""Simulator data model implementation.""" +from __future__ import annotations + +from dataclasses import dataclass + +from .simdata import SimAction, SimData +from .simdevice import SimDevice + + +FLAG_REGISTERS = 2^8 -1 # bits 0-3 is datatype size +FLAG_INVALID = 2^8 # bit 4, neither read nor write is allowed +FLAG_READONLY = 2^16 # bit 5, only read is allowed +FLAG_NO_DIRECT = 2^32 # bit 6, part of a Datatype e.g. INT32 +FLAG_ACTION = 2^64 # bit 7, Action defined + +@dataclass(order=True) +class SimRuntimeRegister: + """Datastore for a single register.""" + + flags: int = 0 + register: int = 0 + +@dataclass(order=True) +class SimRuntimeDefault(SimRuntimeRegister): + """Datastore for not defined registers.""" + + start_address: int = 0 + end_address: int = 0 + +@dataclass(order=True) +class SimRuntimeAction: + """Datastore for a single action.""" + + start_address: int + end_address: int + action: SimAction + +@dataclass(order=True) +class SimRuntimeBlock: + """Datastore for a continuous block of registers.""" + + start_address: int + end_address: int + registers: list[SimRuntimeRegister] + actions: list[SimRuntimeAction] + +@dataclass(order=True) +class SimRuntimeDevice: + """Datastore for a device.""" + + register_blocks: list[SimRuntimeBlock] + type_check: bool + offset_coil: int = 0 + offset_direct: int = 0 + offset_holding: int = 0 + offset_input: int = 0 + +class SimSetupRuntime: + """Helper class to convert SimData/SimDevice to runtime data.""" + + def __init__(self, devices: list[SimDevice]) -> None: + """Register SimDevice(s).""" + self.configDevices = devices + self.runtimeDevices: dict[int, SimRuntimeDevice] = {} + + def prepare_block(self, _block: list[SimData], _name: str, _device_id: int) -> tuple[list[SimRuntimeBlock], list[SimRuntimeAction]]: + """Prepare blocks.""" + return ([], []) + + def build_runtime(self): + """Build runtime classes.""" diff --git a/test/global/test_logging.py b/test/global/test_logging.py index c4d0d7da4..fbac628a0 100644 --- a/test/global/test_logging.py +++ b/test/global/test_logging.py @@ -20,7 +20,7 @@ class TestLogging: @classmethod def teardown_class(cls): """Remove test file.""" - if "CI" not in os.environ: + if "CI" not in os.environ: # pragma: no cover os.remove(cls.LOG_FILE) def test_log_dont_call_build_msg(self): diff --git a/test/simulator/test_simcore.py b/test/simulator/test_simcore.py index 99a7b5d6e..68f590bfb 100644 --- a/test/simulator/test_simcore.py +++ b/test/simulator/test_simcore.py @@ -12,10 +12,12 @@ def test_instanciate(self): """Test that simdata can be objects.""" SimCore() + @pytest.mark.skip def test_build_block(self): """Test that simdata can be objects.""" SimCore.build_block(None) + @pytest.mark.skip def test_build_config(self): """Test that simdata can be objects.""" device = SimDevice(17, block_shared=[SimData(0)]) diff --git a/test/simulator/test_simdata.py b/test/simulator/test_simdata.py index 87a22ce9d..b42666a9a 100644 --- a/test/simulator/test_simdata.py +++ b/test/simulator/test_simdata.py @@ -8,70 +8,96 @@ class TestSimData: """Test simulator data config.""" - def test_instanciate(self): - """Test that simdata can be objects.""" - SimData(0) + async def async_dummy_action(self): + """Set action.""" - @pytest.mark.parametrize("address", ["not ok", 1.0, -1, 70000]) - def test_simdata_address(self, address): - """Test simdata address.""" - with pytest.raises(TypeError): - SimData(address) - SimData(0) - SimData(2^16 -1) + def dummy_action(self): + """Set action.""" - @pytest.mark.parametrize("count", ["not ok", 1.0, -1, 70000]) - def test_simdata_count(self, count): - """Test simdata count.""" - with pytest.raises(TypeError): - SimData(0, count=count) - SimData(0, count=1) - SimData(0, count=2^16 -1) + @pytest.mark.parametrize("kwargs", [ + {"address": 0}, + {"address": 65535}, + {"address": 65535, "count": 1}, + {"address": 0, "count": 65536}, + {"address": 1, "count": 65535}, + {"address": 1, "count": 10, "invalid": True}, + {"address": 2, "count": 10, "default": True}, + {"address": 3, "count": 10, "readonly": True}, + {"address": 4, "datatype": DataType.INT16, "values": 17}, + {"address": 5, "datatype": DataType.INT16, "values": [17, 18]}, + {"address": 6, "count": 10, "datatype": DataType.INT16, "values": [17, 18]}, + {"address": 7, "datatype": DataType.STRING, "values": "test"}, + {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test"}, + {"address": 9, "action": async_dummy_action}, + {"address": 0, "datatype": DataType.REGISTERS, "values": 17, "count": 5, "default": True}, + {"address": 1, "datatype": DataType.INT16, "values": 17, "invalid": True}, + {"address": 3, "datatype": DataType.INT16, "values": 17, "readonly": True}, + {"address": 0, "count": 2^16 -1}, + {"address": 4, "datatype": DataType.BITS}, + {"address": 4, "datatype": DataType.BITS, "values": 117}, + {"address": 1, "datatype": DataType.BITS, "values": True}, + {"address": 4, "datatype": DataType.BITS, "values": [True, True]}, + {"address": 2, "values": 17, "default": True}, + ]) + def test_simdata_instanciate(self, kwargs): + """Test that simdata can be objects.""" + SimData(**kwargs) - @pytest.mark.parametrize("datatype", ["not ok", 1.0, 11]) - def test_simdata_datatype(self, datatype): - """Test simdata datatype.""" + @pytest.mark.parametrize("kwargs", [ + {"address": "not ok"}, + {"address": 1.0}, + {"address": -1}, + {"address": 70000}, + {"address": 1, "count": 65536}, + {"address": 65535, "count": 2}, + {"address": 1, "count": "not ok"}, + {"address": 1, "count": 1.0}, + {"address": 1, "count": -1}, + {"address": 1, "count": 70000}, + {"address": 1, "count": 0}, + {"address": 1, "datatype": "not ok"}, + {"address": 1, "datatype": 11}, + {"address": 1, "action": "not ok"}, + {"address": 1, "action": dummy_action}, + {"address": 1, "register_count": 117}, + {"address": 1, "type_size": 117}, + {"address": 2, "datatype": DataType.INT16, "count": 10, "default": True}, + {"address": 2, "values": [17], "count": 10, "default": True}, + ]) + def test_simdata_not_ok(self, kwargs): + """Test that simdata can be objects.""" with pytest.raises(TypeError): - SimData(0, datatype=datatype) - SimData(0, datatype=DataType.BITS) + SimData(**kwargs) @pytest.mark.parametrize(("value", "value_type"), [ ("ok str", DataType.STRING), (1.0, DataType.FLOAT32), + ([1.0, 2.0], DataType.FLOAT32), + (1, DataType.INT32), + ([1, 2], DataType.INT32), (11, DataType.REGISTERS), + ([11, 12], DataType.REGISTERS), (True, DataType.BITS), ([True, False], DataType.BITS), ]) def test_simdata_value_ok(self, value, value_type): """Test simdata value.""" - SimData(0, value=value, datatype=value_type) + SimData(0, values=value, datatype=value_type) @pytest.mark.parametrize(("value", "value_type"), [ - ({0: 1}, DataType.REGISTERS), - ((1, 0), DataType.REGISTERS), - (123, DataType.STRING), - (["ab", "cd"], DataType.STRING), - ("", DataType.INT16), - ([123, ""], DataType.INT16), - (123, DataType.FLOAT32), - (123.0, DataType.BITS), - (123.0, DataType.REGISTERS), + (["ok str", "ok2"], DataType.STRING), + (1, DataType.STRING), + (1, DataType.FLOAT32), + ([1.0, 2], DataType.FLOAT32), + (1.0, DataType.INT32), + ([1, 2.0], DataType.INT32), + ("not ok", DataType.REGISTERS), + (1.0, DataType.REGISTERS), + ([11, 12.0], DataType.REGISTERS), + (1.0, DataType.BITS), + ([True, 1.0], DataType.BITS), ]) def test_simdata_value_not_ok(self, value, value_type): """Test simdata value.""" with pytest.raises(TypeError): - SimData(0, value=value, datatype=value_type) - - def test_simdata_action(self): - """Test simdata action.""" - async def async_dummy_action(): - """Set action.""" - - def dummy_action(): - """Set action.""" - - with pytest.raises(TypeError): - SimData(0, action="not_ok") - with pytest.raises(TypeError): - SimData(0, action=dummy_action) - SimData(0, action=async_dummy_action) + SimData(0, values=value, datatype=value_type) diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index b3d89c8d6..f3e36693f 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -8,38 +8,73 @@ class TestSimDevice: """Test simulator device config.""" - def test_instanciate(self): - """Test that simdata can be objects.""" - a = SimData(0) - SimDevice(0, block_shared=[a]) - - @pytest.mark.parametrize("id", ["not ok", 1.0, 256]) - def test_simid(self, id): - """Test device id.""" - with pytest.raises(TypeError): - SimDevice(id=id) - SimDevice(id=1, block_shared=[SimData(0)]) + simdata1 = SimData(0, values=15) + simdata2 = SimData(1, values=16) + simdatadef = SimData(0, values=17, count=10, default=True) - def test_block_shared(self): - """Test shared block.""" - with pytest.raises(TypeError): - SimDevice(id=1, block_shared=[SimData(0)], block_coil=[SimData(0)]) - SimDevice(id=1, block_shared=[SimData(0)]) + @pytest.mark.parametrize("kwargs", [ + {"id": 0, "type_check": True, "registers": [simdata1]}, + {"id": 0, "registers": [simdatadef], "offset_address": (1, 4, 6, 8)}, + ]) + def test_simdevice_instanciate(self, kwargs): + """Test that simdata can be objects.""" + SimDevice(**kwargs) - def test_block_non_shared(self): - """Test non-shared block.""" + @pytest.mark.parametrize("kwargs", [ + {"registers": [simdata1]}, + {"id": 0}, + {"id": "not ok", "registers": [simdata1]}, + {"id": 1.0, "registers": [simdata1]}, + {"id": 256, "registers": [simdata1]}, + {"id": -1, "registers": [simdata1]}, + {"id": 1, "registers": []}, + {"id": 0, "registers": [SimData(1, 10, default=True), SimData(200)]}, + {"id": 0, "registers": [SimData(10, 10, default=True), SimData(2)]}, + {"id": 0, "registers": [SimData(1, 2), SimData(2)]}, + {"id": 0, "registers": [simdatadef, SimData(2, 10, default=True)]}, + {"id": 0, "registers": [simdata1], "type_check": "jan"}, + {"id": 0, "registers": [simdatadef], "offset_address": ()}, + {"id": 0, "registers": [simdatadef], "offset_address": (1, 2, 3)}, + {"id": 0, "registers": [simdatadef], "offset_address": (1, 3, 2, 4)}, + {"id": 0, "registers": [simdatadef], "offset_address": (1, 3, 2, 20)}, + {"id": 0, "registers": [SimData(1, 10, default=True)], "offset_address": (1, 3, 2, 15)}, + {"id": 0, "registers": [SimData(10, 10, default=True)], "offset_address": (1, 3, 2, 4)}, + ]) + def test_simdevice_not_ok(self, kwargs): + """Test that simdata can be objects.""" with pytest.raises(TypeError): - SimDevice(id=1, block_coil=[SimData(0)]) - SimDevice(id=1, - block_coil=[SimData(0)], - block_direct=[SimData(0)], - block_holding=[SimData(0)], - block_input=[SimData(0)], - ) + SimDevice(**kwargs) def test_wrong_block(self): """Test that simdata can be objects.""" with pytest.raises(TypeError): - SimDevice(id=1, block_shared=SimData(0)) + SimDevice(id=1, registers=SimData(0)) with pytest.raises(TypeError): - SimDevice(id=1, block_coil=["no valid"]) + SimDevice(id=1, registers=["no valid"]) + + @pytest.mark.parametrize(("block", "expect"), [ + ([simdata1], 0), + ([simdatadef], 1), + ([simdatadef, simdata1], 1), + ([simdata1, simdatadef], 1), + ([simdata1, simdata2, simdatadef], 1), + ]) + def test_simdevice_block(self, block, expect): + """Test that simdata can be objects.""" + if not expect: + a = SimDevice(id=0, registers=block) + assert a.registers[0].values == [0] + assert a.registers[0].default + assert not a.registers[1].default + elif expect == 1: + a = SimDevice(id=0, registers=block) + assert a.registers[0].values == [17] + assert a.registers[0].default + if len(a.registers) > 1: + assert not a.registers[1].default + else: # expect == 2: + with pytest.raises(TypeError): + SimDevice(id=0, registers=block) + + +# test BITS shared/non-shared !