Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
import zigpy_deconz.exception


COMMANDS = [*deconz_api.TX_COMMANDS.items(), *deconz_api.RX_COMMANDS.items()]


@pytest.fixture
def api():
api = deconz_api.Deconz()
Expand Down Expand Up @@ -41,7 +38,7 @@ def test_close(api):
def test_commands():
import string
anum = string.ascii_letters + string.digits + '_'
for cmd_name, cmd_opts in COMMANDS:
for cmd_name, cmd_opts in deconz_api.RX_COMMANDS.items():
assert isinstance(cmd_name, str) is True
assert all([c in anum for c in cmd_name]), cmd_name
assert len(cmd_opts) == 3
Expand All @@ -50,12 +47,19 @@ def test_commands():
assert isinstance(schema, tuple) is True
assert reply is None or isinstance(reply, bool)

for cmd_name, cmd_opts in deconz_api.TX_COMMANDS.items():
assert isinstance(cmd_name, str) is True
assert all([c in anum for c in cmd_name]), cmd_name
assert len(cmd_opts) == 2
cmd_id, schema = cmd_opts
assert isinstance(cmd_id, int) is True
assert isinstance(schema, tuple) is True


@pytest.mark.asyncio
async def test_command(api, monkeypatch):
def mock_api_frame(name, *args):
c = deconz_api.TX_COMMANDS[name]
return mock.sentinel.api_frame_data, c[2]
return mock.sentinel.api_frame_data
api._api_frame = mock.MagicMock(side_effect=mock_api_frame)
api._uart.send = mock.MagicMock()

Expand All @@ -64,12 +68,29 @@ async def mock_fut():
monkeypatch.setattr(asyncio, 'Future', mock_fut)

for cmd_name, cmd_opts in deconz_api.TX_COMMANDS.items():
_, _, expect_reply = cmd_opts
ret = await api._command(cmd_name, mock.sentinel.cmd_data)
if expect_reply:
assert ret is mock.sentinel.cmd_result
else:
assert ret is None
assert ret is mock.sentinel.cmd_result
assert api._api_frame.call_count == 1
assert api._api_frame.call_args[0][0] == cmd_name
assert api._api_frame.call_args[0][1] == mock.sentinel.cmd_data
assert api._uart.send.call_count == 1
assert api._uart.send.call_args[0][0] == mock.sentinel.api_frame_data
api._api_frame.reset_mock()
api._uart.send.reset_mock()


@pytest.mark.asyncio
async def test_command_timeout(api, monkeypatch):
def mock_api_frame(name, *args):
return mock.sentinel.api_frame_data
api._api_frame = mock.MagicMock(side_effect=mock_api_frame)
api._uart.send = mock.MagicMock()

monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', 0.1)

for cmd_name, cmd_opts in deconz_api.TX_COMMANDS.items():
with pytest.raises(asyncio.TimeoutError):
await api._command(cmd_name, mock.sentinel.cmd_data)
assert api._api_frame.call_count == 1
assert api._api_frame.call_args[0][0] == cmd_name
assert api._api_frame.call_args[0][1] == mock.sentinel.cmd_data
Expand All @@ -85,7 +106,7 @@ def test_api_frame(api):
addr.address = t.uint8_t(0)
addr.endpoint = t.uint8_t(0)
for cmd_name, cmd_opts in deconz_api.TX_COMMANDS.items():
_, schema, _ = cmd_opts
_, schema = cmd_opts
if schema:
args = [addr if isinstance(a(), t.DeconzAddressEndpoint) else a() for a in schema]
api._api_frame(cmd_name, *args)
Expand All @@ -103,7 +124,7 @@ def test_data_received(api, monkeypatch):
payload = b'\x01\x02\x03\x04'
data = cmd_id.to_bytes(1, 'big') + b'\x00\x00\x00\x00' + payload
setattr(api, '_handle_{}'.format(cmd), my_handler)
api._awaiting[0] = (mock.MagicMock(), )
api._awaiting[0] = mock.MagicMock()
api.data_received(data)
assert t.deserialize.call_count == 1
assert t.deserialize.call_args[0][0] == payload
Expand All @@ -125,7 +146,7 @@ def test_data_received_unk_status(api, monkeypatch):
data = cmd_id.to_bytes(1, 'big') + b'\x00' + \
status + b'\x00\x00' + payload
setattr(api, '_handle_{}'.format(cmd), my_handler)
api._awaiting[0] = (mock.MagicMock(), )
api._awaiting[0] = mock.MagicMock()
api.data_received(data)
assert t.deserialize.call_count == 1
assert t.deserialize.call_args[0][0] == payload
Expand Down
39 changes: 19 additions & 20 deletions zigpy_deconz/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
DECONZ_BAUDRATE = 38400

TX_COMMANDS = {
'device_state': (0x07, (t.uint8_t, t.uint8_t, t.uint8_t), True),
'change_network_state': (0x08, (t.uint8_t, ), True),
'read_parameter': (0x0A, (t.uint16_t, t.uint8_t), True),
'write_parameter': (0x0B, (t.uint16_t, t.uint8_t, t.Bytes), True),
'version': (0x0D, (), True),
'aps_data_indication': (0x17, (t.uint16_t, t.uint8_t), True),
'device_state': (0x07, (t.uint8_t, t.uint8_t, t.uint8_t), ),
'change_network_state': (0x08, (t.uint8_t, ), ),
'read_parameter': (0x0A, (t.uint16_t, t.uint8_t), ),
'write_parameter': (0x0B, (t.uint16_t, t.uint8_t, t.Bytes), ),
'version': (0x0D, (), ),
'aps_data_indication': (0x17, (t.uint16_t, t.uint8_t), ),
'aps_data_request': (
0x12,
(t.uint16_t, t.uint8_t, t.uint8_t, t.DeconzAddressEndpoint,
t.uint16_t, t.uint16_t, t.uint8_t, t.LVBytes, t.uint8_t,
t.uint8_t),
True),
'aps_data_confirm': (0x04, (t.uint16_t, ), True),
),
'aps_data_confirm': (0x04, (t.uint16_t, ), ),
}

RX_COMMANDS = {
Expand Down Expand Up @@ -126,28 +126,27 @@ def close(self):

async def _command(self, name, *args):
LOGGER.debug("Command %s %s", name, args)
data, needs_response = self._api_frame(name, *args)
self._uart.send(data)
fut = None
if needs_response:
fut = asyncio.Future()
self._awaiting[self._seq] = (fut, )
self._seq = (self._seq % 255) + 1
data = self._api_frame(name, *args)
self._uart.send(data)
fut = asyncio.Future()
self._awaiting[self._seq] = fut
try:
return await asyncio.wait_for(fut, timeout=COMMAND_TIMEOUT)
except asyncio.TimeoutError:
LOGGER.warning("No response to '%s' command", name)
self._awaiting.pop(self._seq)
raise

def _api_frame(self, name, *args):
c = TX_COMMANDS[name]
d = t.serialize(args, c[1])
data = t.uint8_t(c[0]).serialize()
cmd_id, schema = TX_COMMANDS[name]
d = t.serialize(args, schema)
data = t.uint8_t(cmd_id).serialize()
data += t.uint8_t(self._seq).serialize()
data += t.uint8_t(0).serialize()
data += t.uint16_t(len(d) + 5).serialize()
data += d
return data, c[2]
return data

def data_received(self, data):
if data[0] not in self._commands_by_id:
Expand All @@ -163,8 +162,8 @@ def data_received(self, data):
data, _ = t.deserialize(data[5:], RX_COMMANDS[command][1])
except Exception:
LOGGER.warning("Failed to deserialize frame: %s", binascii.hexlify(data))
if RX_COMMANDS[command][2]:
fut, = self._awaiting.pop(seq)
if RX_COMMANDS[command][2] and seq in self._awaiting:
fut = self._awaiting.pop(seq)
if status != STATUS.SUCCESS:
fut.set_exception(
CommandError(status, '%s, status: %s' % (command,
Expand Down