Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test api module."""

import asyncio
import binascii
import logging

import pytest
Expand All @@ -18,12 +19,18 @@


@pytest.fixture
def api(event_loop):
def uart_gw():
gw = MagicMock(auto_spec=uart.Gateway(MagicMock()))
return gw


@pytest.fixture
def api(event_loop, uart_gw):
controller = MagicMock(
spec_set=zigpy_deconz.zigbee.application.ControllerApplication
)
api = deconz_api.Deconz(controller, {zigpy.config.CONF_DEVICE_PATH: "/dev/null"})
api._uart = MagicMock()
api._uart = uart_gw
return api


Expand Down Expand Up @@ -192,18 +199,18 @@ def test_data_received_unk_status(api, monkeypatch):
my_handler = MagicMock()

for cmd, cmd_opts in deconz_api.RX_COMMANDS.items():
_, unsolicited = cmd_opts
_, solicited = cmd_opts
payload = b"\x01\x02\x03\x04"
status = t.uint8_t(0xFE).serialize()
data = cmd.serialize() + b"\x00" + status + b"\x00\x00" + payload
setattr(api, "_handle_{}".format(cmd.name), my_handler)
api._awaiting[0] = MagicMock()
api.data_received(data)
assert t.deserialize.call_count == 1
assert t.deserialize.call_args[0][0] == payload
if unsolicited:
if solicited:
assert my_handler.call_count == 0
assert t.deserialize.call_count == 0
else:
assert t.deserialize.call_count == 1
assert my_handler.call_count == 1
t.deserialize.reset_mock()
my_handler.reset_mock()
Expand Down Expand Up @@ -566,3 +573,30 @@ def test_tx_status(value, name):
def test_handle_add_neighbour(api):
"""Test handle_add_neighbour."""
api._handle_add_neighbour((12, 1, 0x1234, sentinel.ieee, 0x80))


@pytest.mark.parametrize("status", (0x00, 0x05))
async def test_aps_data_req_deserialize_error(api, uart_gw, status, caplog):
"""Test deserialization error."""

device_state = (
deconz_api.DeviceState.APSDE_DATA_INDICATION
| deconz_api.DeviceState.APSDE_DATA_REQUEST_SLOTS_AVAILABLE
| deconz_api.NetworkState.CONNECTED
)
api._handle_device_state_value(device_state)
await asyncio.sleep(0)
await asyncio.sleep(0)
await asyncio.sleep(0)
assert uart_gw.send.call_count == 1
assert api._data_indication is True

api.data_received(
uart_gw.send.call_args[0][0][0:2]
+ bytes([status])
+ binascii.unhexlify("0800010022")
)
await asyncio.sleep(0)
await asyncio.sleep(0)
await asyncio.sleep(0)
assert api._data_indication is False
29 changes: 19 additions & 10 deletions zigpy_deconz/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import serial
from zigpy.config import CONF_DEVICE_PATH
import zigpy.exceptions
from zigpy.types import APSStatus, Channels

from zigpy_deconz.exception import APIException, CommandError
Expand Down Expand Up @@ -332,21 +333,29 @@ def data_received(self, data):
status = Status(data[2])
except ValueError:
status = data[2]
try:
data, _ = t.deserialize(data[5:], schema)
except Exception as exc:
LOGGER.warning("Failed to deserialize frame: %s", binascii.hexlify(data))
if solicited and seq in self._awaiting:
fut = self._awaiting.pop(seq)
fut.set_exception(exc)
return

fut = None
if solicited and seq in self._awaiting:
fut = self._awaiting.pop(seq)
if status != Status.SUCCESS:
fut.set_exception(
CommandError(status, "%s, status: %s" % (command, status))
)
return

try:
data, _ = t.deserialize(data[5:], schema)
except Exception:
LOGGER.warning("Failed to deserialize frame: %s", binascii.hexlify(data))
if fut is not None:
fut.set_exception(
APIException(
f"Failed to deserialize frame: {binascii.hexlify(data)}"
)
)
return

if fut is not None:
fut.set_result(data)
getattr(self, "_handle_%s" % (command.name,))(data)

Expand Down Expand Up @@ -465,7 +474,7 @@ async def _aps_data_indication(self):
binascii.hexlify(r[8]),
)
return r
except asyncio.TimeoutError:
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException):
self._data_indication = False

def _handle_aps_data_indication(self, data):
Expand Down Expand Up @@ -527,7 +536,7 @@ async def _aps_data_confirm(self):
r[5],
)
return r
except asyncio.TimeoutError:
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException):
self._data_confirm = False

def _handle_add_neighbour(self, data) -> None:
Expand Down