Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions examples/server_datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,40 +32,41 @@ def define_datamodel():
assert SimData(
5, 10, 17, DataType.REGISTERS
) == SimData(
address=5, value=17, count=10, datatype=DataType.REGISTERS
address=5, values=17, count=10, datatype=DataType.REGISTERS
)

# Define a group of coils/direct inputs non-shared (address=15..31 each 1 bit)
block1 = SimData(address=15, count=16, value=True, datatype=DataType.BITS)
#block1 = SimData(address=15, count=16, values=True, datatype=DataType.BITS)
# Define a group of coils/direct inputs shared (address=15..31 each 16 bit)
block2 = SimData(address=15, count=16, value=0xFFFF, datatype=DataType.BITS)
#block2 = SimData(address=15, count=16, values=0xFFFF, datatype=DataType.BITS)

# Define a group of holding/input registers (remark NO difference between shared and non-shared)
block3 = SimData(10, 1, 123.4, datatype=DataType.FLOAT32)
block4 = SimData(17, count=5, value=123, datatype=DataType.INT64)
#block3 = SimData(10, 1, 123.4, datatype=DataType.FLOAT32)
#block4 = SimData(17, count=5, values=123, datatype=DataType.INT64)
block5 = SimData(27, 1, "Hello ", datatype=DataType.STRING)

block_def = SimData(0, count=1000, datatype=DataType.REGISTERS)
block_def = SimData(0, count=1000, datatype=DataType.REGISTERS, default=True)

# SimDevice can be instantiated with positional or optional parameters:
assert SimDevice(
5,False, [block_def, block5]
5,
[block_def, block5],
) == SimDevice(
id=5, type_check=False, block_shared=[block_def, block5]
id=5, type_check=False, registers=[block_def, block5]
)

# SimDevice can define either a shared or a non-shared register model
SimDevice(1, False, block_shared=[block_def, block5])
SimDevice(2, False,
block_coil=[block1],
block_direct=[block1],
block_holding=[block2],
block_input=[block3, block4])
SimDevice(id=1, type_check=False, registers=[block_def, block5])
#SimDevice(2, False,
# block_coil=[block1],
# block_direct=[block1],
# block_holding=[block2],
# block_input=[block3, block4])
# Remark: it is legal to reuse SimData, the object is only used for configuration,
# not for runtime.

# id=0 in a SimDevice act as a "catch all". Requests to an unknown id is executed in this SimDevice.
SimDevice(0, block_shared=[block2])
#SimDevice(0, block_shared=[block2])


def main():
Expand Down
7 changes: 4 additions & 3 deletions pymodbus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
This is the single location for storing default
values for the servers and clients.
"""
from __future__ import annotations

import enum
from typing import Union


INTERNAL_ERROR = "Pymodbus internal error"
Expand Down Expand Up @@ -161,7 +162,7 @@ class DataType(enum.IntEnum):
#: Registers == 2 bytes (identical to UINT16)
REGISTERS = enum.auto()

DATATYPE_STRUCT: dict[DataType, tuple[Union[type, tuple[type, ...]], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass
DATATYPE_STRUCT: dict[DataType, tuple[type | tuple[type, type], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass
DataType.INT16: (int, 1),
DataType.UINT16: (int, 1),
DataType.INT32: (int, 2),
Expand All @@ -171,6 +172,6 @@ class DataType(enum.IntEnum):
DataType.FLOAT32: (float, 2),
DataType.FLOAT64: (float, 4),
DataType.STRING: (str, -1),
DataType.BITS: ((list, int, bool), -2),
DataType.BITS: ((list, int), -2),
DataType.REGISTERS: (int, 1),
}
2 changes: 2 additions & 0 deletions pymodbus/simulator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Simulator."""

__all__ = [
"SimAction",
"SimCore",
"SimData",
"SimDevice",
Expand All @@ -9,6 +10,7 @@

from .simcore import SimCore
from .simdata import (
SimAction,
SimData,
SimValueType,
)
Expand Down
31 changes: 2 additions & 29 deletions pymodbus/simulator/simcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from .simdevice import SimDevice


class SimCore:
"""Datastore for the simulator/server."""
class SimCore: # pylint: disable=too-few-public-methods
"""Handler for the simulator/server."""

def __init__(self) -> None:
"""Build datastore."""
Expand All @@ -16,30 +16,3 @@ def __init__(self) -> None:
def build_block(cls, _block: list[SimData]) -> tuple[int, int, int] | None:
"""Build registers for device."""
return None

@classmethod
def build_config(cls, devices: list[SimDevice]) -> SimCore:
"""Build devices/registers ready for server/simulator."""
core = SimCore()
for dev in devices:
if dev.id in core.devices:
raise TypeError(f"device id {dev.id} defined multiple times.")
block_coil = block_direct = block_holding = block_input = block_shared = None
for cfg_block, _block in (
(dev.block_coil, block_coil),
(dev.block_direct, block_direct),
(dev.block_holding, block_holding),
(dev.block_input, block_input),
(dev.block_shared, block_shared)
):
if cfg_block:
cls.build_block(cfg_block)

core.devices[dev.id] = dev
# block_coil,
# block_direct,
# block_holding,
# block_input,
# block_shared
#)
return core
86 changes: 66 additions & 20 deletions pymodbus/simulator/simdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import TypeAlias
from typing import TypeAlias, cast

from pymodbus.constants import DATATYPE_STRUCT, DataType
from pymodbus.pdu import ExceptionResponse


SimValueTypeSimple: TypeAlias = int | float | str | bool | bytes
SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple]
SimValueTypeSimple: TypeAlias = int | float | str | bytes
SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple | bool]
SimAction: TypeAlias = Callable[[int, int, list[int]], Awaitable[list[int] | ExceptionResponse]]

@dataclass(frozen=True)
@dataclass(order=True, frozen=True)
class SimData:
"""Configure a group of continuous identical values/registers.

Expand Down Expand Up @@ -83,7 +83,7 @@ class SimData:

#: Value/Values of datatype,
#: will automatically be converted to registers, according to datatype.
value: SimValueType = 0
values: SimValueType = 0

#: Used to check access and convert value to/from registers.
datatype: DataType = DataType.REGISTERS
Expand All @@ -104,22 +104,68 @@ class SimData:
#: .. tip:: use functools.partial to add extra parameters if needed.
action: SimAction | None = None

#: Mark register(s) as readonly.
readonly: bool = False

def __post_init__(self):
"""Define a group of registers."""
if not isinstance(self.address, int) or not 0 <= self.address < 65535:
#: Mark register(s) as invalid.
#: **remark** only to be used with address= and count=
invalid: bool = False

#: Use as default for undefined registers
#: Define legal register range as:
#:
#: address= <= legal addresses <= address= + count=
#:
#: **remark** only to be used with address= and count=
default: bool = False

#: The following are internal variables
register_count: int = -1
type_size: int = -1

def __check_default(self):
"""Check use of default=."""
if self.datatype != DataType.REGISTERS:
raise TypeError("default=True only works with datatype=DataType.REGISTERS")
if isinstance(self.values, list):
raise TypeError("default=True only works with values=<integer>")

def __check_simple(self):
"""Check simple parameters."""
if not isinstance(self.address, int) or not 0 <= self.address <= 65535:
raise TypeError("0 <= address < 65535")
if not isinstance(self.count, int) or not 0 <= self.count < 65535:
raise TypeError("0 <= count < 65535")
if not isinstance(self.count, int) or not 1 <= self.count <= 65536:
raise TypeError("1 <= count < 65536")
if not 1 <= self.address + self.count <= 65536:
raise TypeError("1 <= address + count < 65536")
if not isinstance(self.datatype, DataType):
raise TypeError("datatype must by an DataType")
if isinstance(self.value, list):
if self.count > 1 or self.datatype == DataType.STRING:
raise TypeError("count > 1 cannot be combined with given values=")
for entry in self.value:
if not isinstance(entry, DATATYPE_STRUCT[self.datatype][0]) or isinstance(entry, str):
raise TypeError(f"elements in values must be {self.datatype!s} and not string")
elif not isinstance(self.value, DATATYPE_STRUCT[self.datatype][0]):
raise TypeError(f"value must be {self.datatype!s}")
raise TypeError("datatype= must by an DataType")
if self.action and not (callable(self.action) and asyncio.iscoroutinefunction(self.action)):
raise TypeError("action not a async function")
raise TypeError("action= not a async function")
if self.register_count != -1:
raise TypeError("register_count= is illegal")
if self.type_size != -1:
raise TypeError("type_size= is illegal")

def __post_init__(self):
"""Define a group of registers."""
self.__check_simple()
if self.default:
self.__check_default()
x_datatype: type | tuple[type, type]
if self.datatype == DataType.STRING:
if not isinstance(self.values, str):
raise TypeError("datatype=DataType.STRING only allows values=\"string\"")
x_datatype, x_len = str, int((len(self.values) +1) / 2)
else:
x_datatype, x_len = DATATYPE_STRUCT[self.datatype]
if not isinstance(self.values, list):
super().__setattr__("values", [self.values])
for x_value in cast(list, self.values):
if not isinstance(x_value, x_datatype):
raise TypeError(f"value= can only contain {x_datatype!s}")
super().__setattr__("register_count", self.count * x_len)
super().__setattr__("type_size", x_len)



Loading