Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,64 @@ async def test_aps_data_request_busy(api, monkeypatch):
with pytest.raises(zigpy_deconz.exception.CommandError):
await api.aps_data_request(*params)
assert mock_cmd.call_count == 4


def test_handle_read_parameter(api):
api._handle_read_parameter(mock.sentinel.data)


@pytest.mark.asyncio
async def test_read_parameter(api):
api._command = mock.MagicMock()
api._command.side_effect = asyncio.coroutine(
mock.MagicMock(return_value=(mock.sentinel.len,
mock.sentinel.param_id,
b'\xaa\x55')))

r = await api.read_parameter(deconz_api.NetworkParameter.nwk_panid)
assert api._command.call_count == 1
assert r == 0x55aa

api._command.reset_mock()
r = await api.read_parameter(0x05)
assert api._command.call_count == 1
assert r == 0x55aa

with pytest.raises(KeyError):
await api.read_parameter('unknown_param')

unk_param = 0xff
assert unk_param not in list(deconz_api.NetworkParameter)
with pytest.raises(KeyError):
await api.read_parameter(unk_param)


def test_handle_write_parameter(api):
param_id = 0x05
api._handle_write_parameter([mock.sentinel.len, param_id])

unk_param = 0xff
assert unk_param not in list(deconz_api.NetworkParameter)
api._handle_write_parameter([mock.sentinel.len, unk_param])


@pytest.mark.asyncio
async def test_write_parameter(api):
api._command = mock.MagicMock()
api._command.side_effect = asyncio.coroutine(
mock.MagicMock())

await api.write_parameter(deconz_api.NetworkParameter.nwk_panid, 0x55aa)
assert api._command.call_count == 1

api._command.reset_mock()
await api.write_parameter(0x05, 0x55aa)
assert api._command.call_count == 1

with pytest.raises(KeyError):
await api.write_parameter('unknown_param', 0x55aa)

unk_param = 0xff
assert unk_param not in list(deconz_api.NetworkParameter)
with pytest.raises(KeyError):
await api.write_parameter(unk_param, 0x55aa)
4 changes: 4 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ async def _version():
side_effect=asyncio.coroutine(mock.MagicMock()))
app._api._command = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock()))
app._api.read_parameter = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock()))
app._api.version = mock.MagicMock(
side_effect=_version)
app._api.write_parameter = mock.MagicMock(
side_effect=asyncio.coroutine(mock.MagicMock()))

new_mock = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock()))
monkeypatch.setattr(application.ConBeeDevice, 'new', new_mock)
Expand Down
96 changes: 72 additions & 24 deletions zigpy_deconz/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,42 @@ class Command(t.uint8_t, enum.Enum):
Command.zigbee_green_power: ((t.LVBytes, ), False),
}

NETWORK_PARAMETER = {
'mac_address': (0x01, t.uint64_t),
'nwk_panid': (0x05, t.uint16_t),
'nwk_address': (0x07, t.uint16_t),
'nwk_extended_panid': (0x08, t.uint64_t),
'aps_designed_coordinator': (0x09, t.uint8_t),
'channel_mask': (0x0A, t.uint32_t),
'aps_extended_panid': (0x0B, t.uint64_t),
'trust_center_address': (0x0E, t.uint64_t),
'security_mode': (0x10, t.uint8_t),
'network_key': (0x18, t.uint8_t),
'current_channel': (0x1C, t.uint8_t),
'permit_join': (0x21, t.uint8_t),
'protocol_version': (0x22, t.uint16_t),
'nwk_update_id': (0x24, t.uint8_t),
'watchdog_ttl': (0x26, t.uint32_t),
}

NETWORK_PARAMETER_BY_ID = {v[0]: (k, v[1]) for k, v in NETWORK_PARAMETER.items()}
class NetworkParameter(t.uint8_t, enum.Enum):
mac_address = 0x01
nwk_panid = 0x05
nwk_address = 0x07
nwk_extended_panid = 0x08
aps_designed_coordinator = 0x09
channel_mask = 0x0A
aps_extended_panid = 0x0B
trust_center_address = 0x0E
security_mode = 0x10
network_key = 0x18
current_channel = 0x1C
permit_join = 0x21
protocol_version = 0x22
nwk_update_id = 0x24
watchdog_ttl = 0x26


NETWORK_PARAMETER_SCHEMA = {
NetworkParameter.mac_address: t.EUI64,
NetworkParameter.nwk_panid: t.uint16_t,
NetworkParameter.nwk_address: t.uint16_t,
NetworkParameter.nwk_extended_panid: t.uint64_t,
NetworkParameter.aps_designed_coordinator: t.uint8_t,
NetworkParameter.channel_mask: t.uint32_t,
NetworkParameter.aps_extended_panid: t.uint64_t,
NetworkParameter.trust_center_address: t.uint64_t,
NetworkParameter.security_mode: t.uint8_t,
NetworkParameter.network_key: t.uint8_t,
NetworkParameter.current_channel: t.uint8_t,
NetworkParameter.permit_join: t.uint8_t,
NetworkParameter.protocol_version: t.uint16_t,
NetworkParameter.nwk_update_id: t.uint8_t,
NetworkParameter.watchdog_ttl: t.uint32_t,
}


class Status(t.uint8_t, enum.Enum):
Expand Down Expand Up @@ -206,19 +223,44 @@ def change_network_state(self, state):
def _handle_change_network_state(self, data):
LOGGER.debug("Change network state response: %s", NetworkState(data[0]).name)

def read_parameter(self, id_):
return self._command(Command.read_parameter, 1, id_)
async def read_parameter(self, id_):
try:
if isinstance(id_, str):
param = NetworkParameter[id_]
else:
param = NetworkParameter(id_)
except (KeyError, ValueError):
raise KeyError("Unknown parameter id: %s" % (id_, ))

r = await self._command(Command.read_parameter, 1, param)
data = NETWORK_PARAMETER_SCHEMA[param].deserialize(r[2])[0]
LOGGER.debug("Read parameter %s response: %s", param.name, data)
return data

def _handle_read_parameter(self, data):
LOGGER.debug("Read parameter %s response: %s", NETWORK_PARAMETER_BY_ID[data[1]][0], data[2])
pass

def write_parameter(self, id_, value):
v = NETWORK_PARAMETER_BY_ID[id_][1](value).serialize()
try:
if isinstance(id_, str):
param = NetworkParameter[id_]
else:
param = NetworkParameter(id_)
except (KeyError, ValueError):
raise KeyError("Unknown parameter id: %s write request" % (id_,))

v = NETWORK_PARAMETER_SCHEMA[param](value).serialize()
length = len(v) + 1
return self._command(Command.write_parameter, length, id_, v)
return self._command(Command.write_parameter, length, param, v)

def _handle_write_parameter(self, data):
LOGGER.debug("Write parameter %s: SUCCESS", NETWORK_PARAMETER_BY_ID[data[1]][0])
try:
param = NetworkParameter(data[1])
except ValueError:
LOGGER.error("Received unknown network param id '%s' response",
data[1])
return
LOGGER.debug("Write parameter %s: SUCCESS", param.name)

def version(self):
return self._command(Command.version)
Expand Down Expand Up @@ -317,3 +359,9 @@ def _handle_device_state_value(self, value):
if DeviceState.APSDE_DATA_CONFIRM in flags and not self._data_confirm:
self._data_confirm = True
asyncio.ensure_future(self._aps_data_confirm())

def __getitem__(self, key):
return self.read_parameter(key)

def __setitem__(self, key, value):
return asyncio.ensure_future(self.write_parameter(key, value))
35 changes: 16 additions & 19 deletions zigpy_deconz/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import zigpy.util
import zigpy_deconz.exception
from zigpy_deconz import types as t
from zigpy_deconz.api import NETWORK_PARAMETER, NetworkState
from zigpy_deconz.api import NetworkParameter, NetworkState

LOGGER = logging.getLogger(__name__)

Expand All @@ -32,7 +32,7 @@ def __init__(self, api, database_file=None):

async def _reset_watchdog(self):
while True:
await self._api.write_parameter(NETWORK_PARAMETER['watchdog_ttl'][0], 3600)
await self._api.write_parameter(NetworkParameter.watchdog_ttl, 3600)
await asyncio.sleep(1200)

async def shutdown(self):
Expand All @@ -44,19 +44,19 @@ async def startup(self, auto_form=False):
r = await self._api.version()
self.version = r[0]
await self._api.device_state()
r = await self._api.read_parameter(NETWORK_PARAMETER['mac_address'][0])
self._ieee = zigpy.types.EUI64([zigpy.types.uint8_t(r[2][i]) for i in range(7, -1, -1)])
await self._api.read_parameter(NETWORK_PARAMETER['nwk_panid'][0])
await self._api.read_parameter(NETWORK_PARAMETER['nwk_address'][0])
await self._api.read_parameter(NETWORK_PARAMETER['nwk_extended_panid'][0])
await self._api.read_parameter(NETWORK_PARAMETER['channel_mask'][0])
await self._api.read_parameter(NETWORK_PARAMETER['aps_extended_panid'][0])
await self._api.read_parameter(NETWORK_PARAMETER['trust_center_address'][0])
await self._api.read_parameter(NETWORK_PARAMETER['security_mode'][0])
await self._api.read_parameter(NETWORK_PARAMETER['current_channel'][0])
await self._api.read_parameter(NETWORK_PARAMETER['protocol_version'][0])
await self._api.read_parameter(NETWORK_PARAMETER['nwk_update_id'][0])
await self._api.write_parameter(NETWORK_PARAMETER['aps_designed_coordinator'][0], 1)
ieee = await self._api[NetworkParameter.mac_address]
self._ieee = zigpy.types.EUI64(ieee)
await self._api[NetworkParameter.nwk_panid]
await self._api[NetworkParameter.nwk_address]
await self._api[NetworkParameter.nwk_extended_panid]
await self._api[NetworkParameter.channel_mask]
await self._api[NetworkParameter.aps_extended_panid]
await self._api[NetworkParameter.trust_center_address]
await self._api[NetworkParameter.security_mode]
await self._api[NetworkParameter.current_channel]
await self._api[NetworkParameter.protocol_version]
await self._api[NetworkParameter.nwk_update_id]
self._api[NetworkParameter.aps_designed_coordinator] = 1

if self.version > 0x261f0500:
asyncio.ensure_future(self._reset_watchdog())
Expand Down Expand Up @@ -152,10 +152,7 @@ async def broadcast(self, profile, cluster, src_ep, dst_ep, grpid, radius,

async def permit_ncp(self, time_s=60):
assert 0 <= time_s <= 254
await self._api.write_parameter(
NETWORK_PARAMETER['permit_join'][0],
time_s
)
await self._api.write_parameter(NetworkParameter.permit_join, time_s)

def handle_rx(self, src_addr, src_ep, dst_ep, profile_id, cluster_id, data, lqi, rssi):
# intercept ZDO device announce frames
Expand Down