diff --git a/tests/test_api.py b/tests/test_api.py index 9ce5f77..29b14a6 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -398,3 +398,19 @@ async def test_version(protocol_ver, firmware_version, flags, api): def test_handle_version(api): api._handle_version([mock.sentinel.version]) + + +@pytest.mark.parametrize( + "data, network_state", + ((0x00, "OFFLINE"), (0x01, "JOINING"), (0x02, "CONNECTED"), (0x03, "LEAVING")), +) +def test_device_state_network_state(data, network_state): + """Test device state flag.""" + extra = b"the rest of the data\xaa\x55" + + for other_fields in (0x04, 0x08, 0x0C, 0x10, 0x24, 0x28, 0x30, 0x2C): + new_data = t.uint8_t(data | other_fields).serialize() + state, rest = deconz_api.DeviceState.deserialize(new_data + extra) + assert rest == extra + assert state.network_state == deconz_api.NetworkState[network_state] + assert state.serialize() == new_data diff --git a/tests/test_application.py b/tests/test_application.py index c5031b4..c2dd662 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -10,12 +10,14 @@ import zigpy_deconz.zigbee.application as application from zigpy.types import EUI64 from zigpy_deconz import types as t -from zigpy_deconz.api import Deconz +import zigpy_deconz.api as deconz_api @pytest.fixture def app(monkeypatch, database_file=None): - app = application.ControllerApplication(Deconz(), database_file=database_file) + app = application.ControllerApplication( + deconz_api.Deconz(), database_file=database_file + ) return app @@ -172,11 +174,11 @@ async def test_form_network(app): side_effect=asyncio.coroutine(mock.MagicMock()) ) - app._api.network_state = 2 + app._api._device_state = deconz_api.DeviceState(2) await app.form_network() assert app._api.device_state.call_count == 0 - app._api.network_state = 0 + app._api._device_state = deconz_api.DeviceState(0) application.CHANGE_NETWORK_WAIT = 0.001 with pytest.raises(Exception): await app.form_network() diff --git a/zigpy_deconz/api.py b/zigpy_deconz/api.py index 5b3c69d..977cc4c 100644 --- a/zigpy_deconz/api.py +++ b/zigpy_deconz/api.py @@ -2,6 +2,7 @@ import logging import enum import binascii +import typing from . import uart from . import types as t @@ -14,6 +15,46 @@ MIN_PROTO_VERSION = 0x010B +class Status(t.uint8_t, enum.Enum): + SUCCESS = 0 + FAILURE = 1 + BUSY = 2 + TIMEOUT = 3 + UNSUPPORTED = 4 + ERROR = 5 + NO_NETWORK = 6 + INVALID_VALUE = 7 + + +class DeviceState(enum.IntFlag): + APSDE_DATA_CONFIRM = 0x04 + APSDE_DATA_INDICATION = 0x08 + CONF_CHANGED = 0x10 + APSDE_DATA_REQUEST_SLOTS_AVAILABLE = 0x20 + + @classmethod + def deserialize(cls, data) -> typing.Tuple["DeviceState", bytes]: + """Deserialize DevceState.""" + state, data = t.uint8_t.deserialize(data) + return cls(state), data + + def serialize(self) -> bytes: + """Serialize data.""" + return t.uint8_t(self).serialize() + + @property + def network_state(self) -> "NetworkState": + """Return network state.""" + return NetworkState(self & 0x03) + + +class NetworkState(t.uint8_t, enum.Enum): + OFFLINE = 0 + JOINING = 1 + CONNECTED = 2 + LEAVING = 3 + + class Command(t.uint8_t, enum.Enum): aps_data_confirm = 0x04 device_state = 0x07 @@ -55,7 +96,7 @@ class Command(t.uint8_t, enum.Enum): Command.aps_data_confirm: ( ( t.uint16_t, - t.uint8_t, + DeviceState, t.uint8_t, t.DeconzAddressEndpoint, t.uint8_t, @@ -70,7 +111,7 @@ class Command(t.uint8_t, enum.Enum): Command.aps_data_indication: ( ( t.uint16_t, - t.uint8_t, + DeviceState, t.DeconzAddress, t.uint8_t, t.DeconzAddress, @@ -89,10 +130,10 @@ class Command(t.uint8_t, enum.Enum): ), True, ), - Command.aps_data_request: ((t.uint16_t, t.uint8_t, t.uint8_t), True), + Command.aps_data_request: ((t.uint16_t, DeviceState, t.uint8_t), True), Command.change_network_state: ((t.uint8_t,), True), - Command.device_state: ((t.uint8_t, t.uint8_t, t.uint8_t), True), - Command.device_state_changed: ((t.uint8_t, t.uint8_t), False), + Command.device_state: ((DeviceState, t.uint8_t, t.uint8_t), True), + Command.device_state_changed: ((DeviceState, t.uint8_t), False), Command.mac_poll: ((t.uint16_t, t.DeconzAddress, t.uint8_t, t.int8s), False), Command.read_parameter: ((t.uint16_t, t.uint8_t, t.Bytes), True), Command.simplified_beacon: ( @@ -142,36 +183,6 @@ class NetworkParameter(t.uint8_t, enum.Enum): } -class Status(t.uint8_t, enum.Enum): - SUCCESS = 0 - FAILURE = 1 - BUSY = 2 - TIMEOUT = 3 - UNSUPPORTED = 4 - ERROR = 5 - NO_NETWORK = 6 - INVALID_VALUE = 7 - - -class DeviceState(t.uint8_t, enum.Enum): - APSDE_DATA_CONFIRM = 0x04 - APSDE_DATA_INDICATION = 0x08 - CONF_CHANGED = 0x10 - APSDE_DATA_REQUEST = 0x20 - - @classmethod - def flags(cls, value: int): - """Make it into list of flags, until we deprecate py35 and py36.""" - return [flag for flag in cls if (value & flag) == flag] - - -class NetworkState(t.uint8_t, enum.Enum): - OFFLINE = 0 - JOINING = 1 - CONNECTED = 2 - LEAVING = 3 - - class Deconz: def __init__(self): self._uart = None @@ -179,12 +190,17 @@ def __init__(self): self._awaiting = {} self._app = None self._cmd_mode_future = None - self.network_state = NetworkState.OFFLINE + self._device_state = DeviceState(NetworkState.OFFLINE) self._data_indication = False self._data_confirm = False self._proto_ver = None self._aps_data_ind_flags = 0x01 + @property + def network_state(self) -> NetworkState: + """Return current network state.""" + return self._device_state.network_state + @property def protocol_version(self): """Protocol Version.""" @@ -433,20 +449,20 @@ def _handle_simplified_beacon(self, data): data[5], ) - def _handle_device_state_value(self, value): - flags = DeviceState.flags(value) - ns = NetworkState(value & 0x03) - if ns != self.network_state: + def _handle_device_state_value(self, state: DeviceState) -> None: + if state.network_state != self.network_state: LOGGER.debug( - "Network state transition: %s -> %s", self.network_state.name, ns.name + "Network state transition: %s -> %s", + self.network_state.name, + state.network_state.name, ) - self.network_state = ns - if DeviceState.APSDE_DATA_REQUEST not in flags: + self._device_state = state + if DeviceState.APSDE_DATA_REQUEST_SLOTS_AVAILABLE not in state: LOGGER.debug("Data request queue full.") - if DeviceState.APSDE_DATA_INDICATION in flags and not self._data_indication: + if DeviceState.APSDE_DATA_INDICATION in state and not self._data_indication: self._data_indication = True asyncio.ensure_future(self._aps_data_indication()) - if DeviceState.APSDE_DATA_CONFIRM in flags and not self._data_confirm: + if DeviceState.APSDE_DATA_CONFIRM in state and not self._data_confirm: self._data_confirm = True asyncio.ensure_future(self._aps_data_confirm())