Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,19 @@ async def test_version(protocol_ver, firmware_version, flags, api):

def test_handle_version(api):
api._handle_version([mock.sentinel.version])


@pytest.mark.parametrize(
"data, network_state",
((0x00, "OFFLINE"), (0x01, "JOINING"), (0x02, "CONNECTED"), (0x03, "LEAVING")),
)
def test_device_state_network_state(data, network_state):
"""Test device state flag."""
extra = b"the rest of the data\xaa\x55"

for other_fields in (0x04, 0x08, 0x0C, 0x10, 0x24, 0x28, 0x30, 0x2C):
new_data = t.uint8_t(data | other_fields).serialize()
state, rest = deconz_api.DeviceState.deserialize(new_data + extra)
assert rest == extra
assert state.network_state == deconz_api.NetworkState[network_state]
assert state.serialize() == new_data
10 changes: 6 additions & 4 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import zigpy_deconz.zigbee.application as application
from zigpy.types import EUI64
from zigpy_deconz import types as t
from zigpy_deconz.api import Deconz
import zigpy_deconz.api as deconz_api


@pytest.fixture
def app(monkeypatch, database_file=None):
app = application.ControllerApplication(Deconz(), database_file=database_file)
app = application.ControllerApplication(
deconz_api.Deconz(), database_file=database_file
)
return app


Expand Down Expand Up @@ -172,11 +174,11 @@ async def test_form_network(app):
side_effect=asyncio.coroutine(mock.MagicMock())
)

app._api.network_state = 2
app._api._device_state = deconz_api.DeviceState(2)
await app.form_network()
assert app._api.device_state.call_count == 0

app._api.network_state = 0
app._api._device_state = deconz_api.DeviceState(0)
application.CHANGE_NETWORK_WAIT = 0.001
with pytest.raises(Exception):
await app.form_network()
Expand Down
106 changes: 61 additions & 45 deletions zigpy_deconz/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import enum
import binascii
import typing

from . import uart
from . import types as t
Expand All @@ -14,6 +15,46 @@
MIN_PROTO_VERSION = 0x010B


class Status(t.uint8_t, enum.Enum):
SUCCESS = 0
FAILURE = 1
BUSY = 2
TIMEOUT = 3
UNSUPPORTED = 4
ERROR = 5
NO_NETWORK = 6
INVALID_VALUE = 7


class DeviceState(enum.IntFlag):
APSDE_DATA_CONFIRM = 0x04
APSDE_DATA_INDICATION = 0x08
CONF_CHANGED = 0x10
APSDE_DATA_REQUEST_SLOTS_AVAILABLE = 0x20

@classmethod
def deserialize(cls, data) -> typing.Tuple["DeviceState", bytes]:
"""Deserialize DevceState."""
state, data = t.uint8_t.deserialize(data)
return cls(state), data

def serialize(self) -> bytes:
"""Serialize data."""
return t.uint8_t(self).serialize()

@property
def network_state(self) -> "NetworkState":
"""Return network state."""
return NetworkState(self & 0x03)


class NetworkState(t.uint8_t, enum.Enum):
OFFLINE = 0
JOINING = 1
CONNECTED = 2
LEAVING = 3


class Command(t.uint8_t, enum.Enum):
aps_data_confirm = 0x04
device_state = 0x07
Expand Down Expand Up @@ -55,7 +96,7 @@ class Command(t.uint8_t, enum.Enum):
Command.aps_data_confirm: (
(
t.uint16_t,
t.uint8_t,
DeviceState,
t.uint8_t,
t.DeconzAddressEndpoint,
t.uint8_t,
Expand All @@ -70,7 +111,7 @@ class Command(t.uint8_t, enum.Enum):
Command.aps_data_indication: (
(
t.uint16_t,
t.uint8_t,
DeviceState,
t.DeconzAddress,
t.uint8_t,
t.DeconzAddress,
Expand All @@ -89,10 +130,10 @@ class Command(t.uint8_t, enum.Enum):
),
True,
),
Command.aps_data_request: ((t.uint16_t, t.uint8_t, t.uint8_t), True),
Command.aps_data_request: ((t.uint16_t, DeviceState, t.uint8_t), True),
Command.change_network_state: ((t.uint8_t,), True),
Command.device_state: ((t.uint8_t, t.uint8_t, t.uint8_t), True),
Command.device_state_changed: ((t.uint8_t, t.uint8_t), False),
Command.device_state: ((DeviceState, t.uint8_t, t.uint8_t), True),
Command.device_state_changed: ((DeviceState, t.uint8_t), False),
Command.mac_poll: ((t.uint16_t, t.DeconzAddress, t.uint8_t, t.int8s), False),
Command.read_parameter: ((t.uint16_t, t.uint8_t, t.Bytes), True),
Command.simplified_beacon: (
Expand Down Expand Up @@ -142,49 +183,24 @@ class NetworkParameter(t.uint8_t, enum.Enum):
}


class Status(t.uint8_t, enum.Enum):
SUCCESS = 0
FAILURE = 1
BUSY = 2
TIMEOUT = 3
UNSUPPORTED = 4
ERROR = 5
NO_NETWORK = 6
INVALID_VALUE = 7


class DeviceState(t.uint8_t, enum.Enum):
APSDE_DATA_CONFIRM = 0x04
APSDE_DATA_INDICATION = 0x08
CONF_CHANGED = 0x10
APSDE_DATA_REQUEST = 0x20

@classmethod
def flags(cls, value: int):
"""Make it into list of flags, until we deprecate py35 and py36."""
return [flag for flag in cls if (value & flag) == flag]


class NetworkState(t.uint8_t, enum.Enum):
OFFLINE = 0
JOINING = 1
CONNECTED = 2
LEAVING = 3


class Deconz:
def __init__(self):
self._uart = None
self._seq = 1
self._awaiting = {}
self._app = None
self._cmd_mode_future = None
self.network_state = NetworkState.OFFLINE
self._device_state = DeviceState(NetworkState.OFFLINE)
self._data_indication = False
self._data_confirm = False
self._proto_ver = None
self._aps_data_ind_flags = 0x01

@property
def network_state(self) -> NetworkState:
"""Return current network state."""
return self._device_state.network_state

@property
def protocol_version(self):
"""Protocol Version."""
Expand Down Expand Up @@ -433,20 +449,20 @@ def _handle_simplified_beacon(self, data):
data[5],
)

def _handle_device_state_value(self, value):
flags = DeviceState.flags(value)
ns = NetworkState(value & 0x03)
if ns != self.network_state:
def _handle_device_state_value(self, state: DeviceState) -> None:
if state.network_state != self.network_state:
LOGGER.debug(
"Network state transition: %s -> %s", self.network_state.name, ns.name
"Network state transition: %s -> %s",
self.network_state.name,
state.network_state.name,
)
self.network_state = ns
if DeviceState.APSDE_DATA_REQUEST not in flags:
self._device_state = state
if DeviceState.APSDE_DATA_REQUEST_SLOTS_AVAILABLE not in state:
LOGGER.debug("Data request queue full.")
if DeviceState.APSDE_DATA_INDICATION in flags and not self._data_indication:
if DeviceState.APSDE_DATA_INDICATION in state and not self._data_indication:
self._data_indication = True
asyncio.ensure_future(self._aps_data_indication())
if DeviceState.APSDE_DATA_CONFIRM in flags and not self._data_confirm:
if DeviceState.APSDE_DATA_CONFIRM in state and not self._data_confirm:
self._data_confirm = True
asyncio.ensure_future(self._aps_data_confirm())

Expand Down