diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..2e20093 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,12 @@ +repos: +- repo: https://github.com/psf/black + rev: 19.3b0 + hooks: + - id: black + args: + - --safe + - --quiet +- repo: https://gitlab.com/pycqa/flake8 + rev: 3.7.8 + hooks: + - id: flake8 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..af878e0 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,31 @@ +[flake8] +exclude = .venv,.git,.tox,docs,venv,bin,lib,deps,build +# To work with Black +max-line-length = 88 +# W503: Line break occurred before a binary operator +# E203: Whitespace before ':' +# D202 No blank lines allowed after function docstring +ignore = + W503, + E203, + D202 + +[isort] +# https://github.com/timothycrosley/isort +# https://github.com/timothycrosley/isort/wiki/isort-Settings +# splits long import on multiple lines indented by 4 spaces +multi_line_output = 3 +include_trailing_comma=True +force_grid_wrap=0 +use_parentheses=True +line_length=88 +indent = " " +# by default isort don't check module indexes +not_skip = __init__.py +# will group `import x` and `from x import` of the same module. +force_sort_within_sections = true +sections = FUTURE,STDLIB,INBETWEENS,THIRDPARTY,FIRSTPARTY,LOCALFOLDER +default_section = THIRDPARTY +known_first_party = zigpy_deconz,tests +forced_separate = tests +combine_as_imports = true diff --git a/setup.py b/setup.py index 285f4ae..fde3208 100644 --- a/setup.py +++ b/setup.py @@ -12,12 +12,7 @@ author="Daniel Schmidt", author_email="schmidt.d@aon.at", license="GPL-3.0", - packages=find_packages(exclude=['*.tests']), - install_requires=[ - 'pyserial-asyncio', - 'zigpy-homeassistant>=0.10.0', - ], - tests_require=[ - 'pytest', - ], + packages=find_packages(exclude=["*.tests"]), + install_requires=["pyserial-asyncio", "zigpy-homeassistant>=0.10.0"], + tests_require=["pytest"], ) diff --git a/tests/test_api.py b/tests/test_api.py index eb06558..9ce5f77 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -24,8 +24,8 @@ async def test_connect(monkeypatch): api = deconz_api.Deconz() dev = mock.MagicMock() monkeypatch.setattr( - uart, 'connect', - mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock()))) + uart, "connect", mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) + ) await api.connect(dev, 115200) @@ -52,12 +52,14 @@ def test_commands(): async def test_command(api, monkeypatch): def mock_api_frame(name, *args): return mock.sentinel.api_frame_data, api._seq + api._api_frame = mock.MagicMock(side_effect=mock_api_frame) api._uart.send = mock.MagicMock() async def mock_fut(): return mock.sentinel.cmd_result - monkeypatch.setattr(asyncio, 'Future', mock_fut) + + monkeypatch.setattr(asyncio, "Future", mock_fut) for cmd, cmd_opts in deconz_api.TX_COMMANDS.items(): ret = await api._command(cmd, mock.sentinel.cmd_data) @@ -75,10 +77,11 @@ async def mock_fut(): async def test_command_timeout(api, monkeypatch): def mock_api_frame(name, *args): return mock.sentinel.api_frame_data, api._seq + api._api_frame = mock.MagicMock(side_effect=mock_api_frame) api._uart.send = mock.MagicMock() - monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', 0.1) + monkeypatch.setattr(deconz_api, "COMMAND_TIMEOUT", 0.1) for cmd, cmd_opts in deconz_api.TX_COMMANDS.items(): with pytest.raises(asyncio.TimeoutError): @@ -99,21 +102,27 @@ def test_api_frame(api): addr.endpoint = t.uint8_t(0) for cmd, schema in deconz_api.TX_COMMANDS.items(): if schema: - args = [addr if isinstance(a(), t.DeconzAddressEndpoint) else a() for a in schema] + args = [ + addr if isinstance(a(), t.DeconzAddressEndpoint) else a() + for a in schema + ] api._api_frame(cmd, *args) else: api._api_frame(cmd) def test_data_received(api, monkeypatch): - monkeypatch.setattr(t, 'deserialize', mock.MagicMock( - return_value=(mock.sentinel.deserialize_data, b''))) + monkeypatch.setattr( + t, + "deserialize", + mock.MagicMock(return_value=(mock.sentinel.deserialize_data, b"")), + ) my_handler = mock.MagicMock() for cmd, cmd_opts in deconz_api.RX_COMMANDS.items(): - payload = b'\x01\x02\x03\x04' - data = cmd.serialize() + b'\x00\x00\x00\x00' + payload - setattr(api, '_handle_{}'.format(cmd.name), my_handler) + payload = b"\x01\x02\x03\x04" + data = cmd.serialize() + b"\x00\x00\x00\x00" + payload + setattr(api, "_handle_{}".format(cmd.name), my_handler) api._awaiting[0] = mock.MagicMock() api.data_received(data) assert t.deserialize.call_count == 1 @@ -125,16 +134,19 @@ def test_data_received(api, monkeypatch): def test_data_received_unk_status(api, monkeypatch): - monkeypatch.setattr(t, 'deserialize', mock.MagicMock( - return_value=(mock.sentinel.deserialize_data, b''))) + monkeypatch.setattr( + t, + "deserialize", + mock.MagicMock(return_value=(mock.sentinel.deserialize_data, b"")), + ) my_handler = mock.MagicMock() for cmd, cmd_opts in deconz_api.RX_COMMANDS.items(): _, unsolicited = cmd_opts - payload = b'\x01\x02\x03\x04' - status = t.uint8_t(0xfe).serialize() - data = cmd.serialize() + b'\x00' + status + b'\x00\x00' + payload - setattr(api, '_handle_{}'.format(cmd.name), my_handler) + payload = b"\x01\x02\x03\x04" + status = t.uint8_t(0xFE).serialize() + data = cmd.serialize() + b"\x00" + status + b"\x00\x00" + payload + setattr(api, "_handle_{}".format(cmd.name), my_handler) api._awaiting[0] = mock.MagicMock() api.data_received(data) assert t.deserialize.call_count == 1 @@ -148,37 +160,36 @@ def test_data_received_unk_status(api, monkeypatch): def test_data_received_unk_cmd(api, monkeypatch): - monkeypatch.setattr(t, 'deserialize', mock.MagicMock( - return_value=(mock.sentinel.deserialize_data, b''))) + monkeypatch.setattr( + t, + "deserialize", + mock.MagicMock(return_value=(mock.sentinel.deserialize_data, b"")), + ) for cmd_id in range(253, 255): - payload = b'\x01\x02\x03\x04' + payload = b"\x01\x02\x03\x04" status = t.uint8_t(0x00).serialize() - data = cmd_id.to_bytes(1, 'big') + b'\x00' + \ - status + b'\x00\x00' + payload - api._awaiting[0] = (mock.MagicMock(), ) + data = cmd_id.to_bytes(1, "big") + b"\x00" + status + b"\x00\x00" + payload + api._awaiting[0] = (mock.MagicMock(),) api.data_received(data) assert t.deserialize.call_count == 0 t.deserialize.reset_mock() def test_simplified_beacon(api): - api._handle_simplified_beacon( - (0x0007, 0x1234, 0x5678, 0x19, 0x00, 0x01) - ) + api._handle_simplified_beacon((0x0007, 0x1234, 0x5678, 0x19, 0x00, 0x01)) @pytest.mark.asyncio async def test_aps_data_confirm(api, monkeypatch): - monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', 0.1) + monkeypatch.setattr(deconz_api, "COMMAND_TIMEOUT", 0.1) success = True def mock_cmd(*args, **kwargs): res = asyncio.Future() if success: - res.set_result([7, 0x22, 0x11, mock.sentinel.dst_addr, 1, 0x00, - 0, 0, 0, 0]) + res.set_result([7, 0x22, 0x11, mock.sentinel.dst_addr, 1, 0x00, 0, 0, 0, 0]) return asyncio.wait_for(res, timeout=deconz_api.COMMAND_TIMEOUT) api._command = mock_cmd @@ -196,7 +207,7 @@ def mock_cmd(*args, **kwargs): @pytest.mark.asyncio async def test_aps_data_ind(api, monkeypatch): - monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', 0.1) + monkeypatch.setattr(deconz_api, "COMMAND_TIMEOUT", 0.1) success = True @@ -204,8 +215,19 @@ def mock_cmd(*args, **kwargs): res = asyncio.Future() s = mock.sentinel if success: - res.set_result([s.len, 0x22, t.DeconzAddress(), 1, - t.DeconzAddress(), 1, 0x0104, 0x0000, b'\x00\x01\x02']) + res.set_result( + [ + s.len, + 0x22, + t.DeconzAddress(), + 1, + t.DeconzAddress(), + 1, + 0x0104, + 0x0000, + b"\x00\x01\x02", + ] + ) return asyncio.wait_for(res, timeout=deconz_api.COMMAND_TIMEOUT) api._command = mock_cmd @@ -225,16 +247,14 @@ def mock_cmd(*args, **kwargs): async def test_aps_data_request(api): params = [ 0x00, # req id - t.DeconzAddressEndpoint.deserialize( - b'\x02\xaa\x55\x01')[0], # dst + ep + t.DeconzAddressEndpoint.deserialize(b"\x02\xaa\x55\x01")[0], # dst + ep 0x0104, # profile id 0x0007, # cluster id 0x01, # src ep - b'aps payload' + b"aps payload", ] - mock_cmd = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + mock_cmd = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) api._command = mock_cmd await api.aps_data_request(*params) @@ -245,18 +265,19 @@ async def test_aps_data_request(api): async def test_aps_data_request_timeout(api, monkeypatch): params = [ 0x00, # req id - t.DeconzAddressEndpoint.deserialize( - b'\x02\xaa\x55\x01')[0], # dst + ep + t.DeconzAddressEndpoint.deserialize(b"\x02\xaa\x55\x01")[0], # dst + ep 0x0104, # profile id 0x0007, # cluster id 0x01, # src ep - b'aps payload' + b"aps payload", ] - monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', .1) + monkeypatch.setattr(deconz_api, "COMMAND_TIMEOUT", 0.1) mock_cmd = mock.MagicMock( - return_value=asyncio.wait_for(asyncio.Future(), - timeout=deconz_api.COMMAND_TIMEOUT)) + return_value=asyncio.wait_for( + asyncio.Future(), timeout=deconz_api.COMMAND_TIMEOUT + ) + ) api._command = mock_cmd with pytest.raises(asyncio.TimeoutError): @@ -268,23 +289,22 @@ async def test_aps_data_request_timeout(api, monkeypatch): async def test_aps_data_request_busy(api, monkeypatch): params = [ 0x00, # req id - t.DeconzAddressEndpoint.deserialize( - b'\x02\xaa\x55\x01')[0], # dst + ep + t.DeconzAddressEndpoint.deserialize(b"\x02\xaa\x55\x01")[0], # dst + ep 0x0104, # profile id 0x0007, # cluster id 0x01, # src ep - b'aps payload' + b"aps payload", ] res = asyncio.Future() - exc = zigpy_deconz.exception.CommandError(deconz_api.Status.BUSY, 'busy') + exc = zigpy_deconz.exception.CommandError(deconz_api.Status.BUSY, "busy") res.set_exception(exc) mock_cmd = mock.MagicMock(return_value=res) api._command = mock_cmd - monkeypatch.setattr(deconz_api, 'COMMAND_TIMEOUT', .1) + monkeypatch.setattr(deconz_api, "COMMAND_TIMEOUT", 0.1) sleep = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) - monkeypatch.setattr(asyncio, 'sleep', sleep) + monkeypatch.setattr(asyncio, "sleep", sleep) with pytest.raises(zigpy_deconz.exception.CommandError): await api.aps_data_request(*params) @@ -299,23 +319,24 @@ def test_handle_read_parameter(api): async def test_read_parameter(api): api._command = mock.MagicMock() api._command.side_effect = asyncio.coroutine( - mock.MagicMock(return_value=(mock.sentinel.len, - mock.sentinel.param_id, - b'\xaa\x55'))) + mock.MagicMock( + return_value=(mock.sentinel.len, mock.sentinel.param_id, b"\xaa\x55") + ) + ) r = await api.read_parameter(deconz_api.NetworkParameter.nwk_panid) assert api._command.call_count == 1 - assert r[0] == 0x55aa + assert r[0] == 0x55AA api._command.reset_mock() r = await api.read_parameter(0x05) assert api._command.call_count == 1 - assert r[0] == 0x55aa + assert r[0] == 0x55AA with pytest.raises(KeyError): - await api.read_parameter('unknown_param') + await api.read_parameter("unknown_param") - unk_param = 0xff + unk_param = 0xFF assert unk_param not in list(deconz_api.NetworkParameter) with pytest.raises(KeyError): await api.read_parameter(unk_param) @@ -325,7 +346,7 @@ def test_handle_write_parameter(api): param_id = 0x05 api._handle_write_parameter([mock.sentinel.len, param_id]) - unk_param = 0xff + unk_param = 0xFF assert unk_param not in list(deconz_api.NetworkParameter) api._handle_write_parameter([mock.sentinel.len, unk_param]) @@ -333,42 +354,43 @@ def test_handle_write_parameter(api): @pytest.mark.asyncio async def test_write_parameter(api): api._command = mock.MagicMock() - api._command.side_effect = asyncio.coroutine( - mock.MagicMock()) + api._command.side_effect = asyncio.coroutine(mock.MagicMock()) - await api.write_parameter(deconz_api.NetworkParameter.nwk_panid, 0x55aa) + await api.write_parameter(deconz_api.NetworkParameter.nwk_panid, 0x55AA) assert api._command.call_count == 1 api._command.reset_mock() - await api.write_parameter(0x05, 0x55aa) + await api.write_parameter(0x05, 0x55AA) assert api._command.call_count == 1 with pytest.raises(KeyError): - await api.write_parameter('unknown_param', 0x55aa) + await api.write_parameter("unknown_param", 0x55AA) - unk_param = 0xff + unk_param = 0xFF assert unk_param not in list(deconz_api.NetworkParameter) with pytest.raises(KeyError): - await api.write_parameter(unk_param, 0x55aa) + await api.write_parameter(unk_param, 0x55AA) @pytest.mark.parametrize( "protocol_ver, firmware_version, flags", [ - (0x010A, 0x123405dd, 0x01), - (0x010B, 0x123405dd, 0x04), - (0x010A, 0x123407dd, 0x01), - (0x010B, 0x123407dd, 0x01), + (0x010A, 0x123405DD, 0x01), + (0x010B, 0x123405DD, 0x04), + (0x010A, 0x123407DD, 0x01), + (0x010B, 0x123407DD, 0x01), ], ) @pytest.mark.asyncio async def test_version(protocol_ver, firmware_version, flags, api): api.read_parameter = mock.MagicMock() api.read_parameter.side_effect = asyncio.coroutine( - mock.MagicMock(return_value=[protocol_ver])) + mock.MagicMock(return_value=[protocol_ver]) + ) api._command = mock.MagicMock() api._command.side_effect = asyncio.coroutine( - mock.MagicMock(return_value=[firmware_version])) + mock.MagicMock(return_value=[firmware_version]) + ) r = await api.version() assert r == firmware_version assert api._aps_data_ind_flags == flags diff --git a/tests/test_application.py b/tests/test_application.py index 1ca8761..c5031b4 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -15,14 +15,13 @@ @pytest.fixture def app(monkeypatch, database_file=None): - app = application.ControllerApplication( - Deconz(), database_file=database_file) + app = application.ControllerApplication(Deconz(), database_file=database_file) return app @pytest.fixture def ieee(): - return EUI64.deserialize(b'\x00\x01\x02\x03\x04\x05\x06\x07')[0] + return EUI64.deserialize(b"\x00\x01\x02\x03\x04\x05\x06\x07")[0] @pytest.fixture @@ -57,7 +56,7 @@ def addr_nwk_and_ieee(nwk, ieee): def _test_rx(app, addr_ieee, addr_nwk, device, data): app.get_device = mock.MagicMock(return_value=device) - app.devices = (EUI64(addr_ieee.address), ) + app.devices = (EUI64(addr_ieee.address),) app.handle_rx( addr_nwk, @@ -127,7 +126,7 @@ def test_rx_wrong_addr_mode(app, addr_ieee, addr_nwk, caplog): app.handle_message = mock.MagicMock() app.get_device = mock.MagicMock(return_value=device) - app.devices = (EUI64(addr_ieee.address), ) + app.devices = (EUI64(addr_ieee.address),) with pytest.raises(Exception): # TODO: don't use broad exceptions addr_nwk.address_mode = 0x22 @@ -137,7 +136,7 @@ def test_rx_wrong_addr_mode(app, addr_ieee, addr_nwk, caplog): mock.sentinel.dst_ep, mock.sentinel.profile_id, mock.sentinel.cluster_id, - b'', + b"", mock.sentinel.lqi, mock.sentinel.rssi, ) @@ -155,7 +154,7 @@ def test_rx_unknown_device(app, addr_ieee, addr_nwk, caplog): mock.sentinel.dst_ep, mock.sentinel.profile_id, mock.sentinel.cluster_id, - b'', + b"", mock.sentinel.lqi, mock.sentinel.rssi, ) @@ -167,9 +166,11 @@ def test_rx_unknown_device(app, addr_ieee, addr_nwk, caplog): @pytest.mark.asyncio async def test_form_network(app): app._api.change_network_state = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + side_effect=asyncio.coroutine(mock.MagicMock()) + ) app._api.device_state = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + side_effect=asyncio.coroutine(mock.MagicMock()) + ) app._api.network_state = 2 await app.form_network() @@ -183,36 +184,29 @@ async def test_form_network(app): @pytest.mark.parametrize( - "protocol_ver, watchdog_cc", - [ - (0x0107, False,), - (0x0108, True, ), - (0x010B, True, ), - ], + "protocol_ver, watchdog_cc", [(0x0107, False), (0x0108, True), (0x010B, True)] ) @pytest.mark.asyncio async def test_startup(protocol_ver, watchdog_cc, app, monkeypatch, version=0): - async def _version(): app._api._proto_ver = protocol_ver return [version] app._reset_watchdog = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) - app.form_network = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) - app._api._command = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + side_effect=asyncio.coroutine(mock.MagicMock()) + ) + app.form_network = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) + app._api._command = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) app._api.read_parameter = mock.MagicMock( - side_effect=asyncio.coroutine( - mock.MagicMock(return_value=[[0]]))) - app._api.version = mock.MagicMock( - side_effect=_version) + side_effect=asyncio.coroutine(mock.MagicMock(return_value=[[0]])) + ) + app._api.version = mock.MagicMock(side_effect=_version) app._api.write_parameter = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + side_effect=asyncio.coroutine(mock.MagicMock()) + ) new_mock = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) - monkeypatch.setattr(application.ConBeeDevice, 'new', new_mock) + monkeypatch.setattr(application.ConBeeDevice, "new", new_mock) await app.startup(auto_form=False) assert app.form_network.call_count == 0 assert app._reset_watchdog.call_count == watchdog_cc @@ -223,15 +217,15 @@ async def _version(): @pytest.mark.asyncio async def test_permit(app, nwk): app._api.write_parameter = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock())) + side_effect=asyncio.coroutine(mock.MagicMock()) + ) time_s = 30 await app.permit_ncp(time_s) assert app._api.write_parameter.call_count == 1 assert app._api.write_parameter.call_args_list[0][0][1] == time_s -async def _test_request(app, send_success=True, aps_data_error=False, - **kwargs): +async def _test_request(app, send_success=True, aps_data_error=False, **kwargs): seq = 123 async def req_mock(req_id, dst_addr_ep, profile, cluster, src_ep, data): @@ -246,7 +240,7 @@ async def req_mock(req_id, dst_addr_ep, profile, cluster, src_ep, data): device = zigpy.device.Device(app, mock.sentinel.ieee, 0x1122) app.get_device = mock.MagicMock(return_value=device) - return await app.request(device, 0x0260, 1, 2, 3, seq, b'\x01\x02\x03', **kwargs) + return await app.request(device, 0x0260, 1, 2, 3, seq, b"\x01\x02\x03", **kwargs) @pytest.mark.asyncio @@ -276,8 +270,7 @@ async def test_request_send_aps_data_error(app): assert r[0] != 0 -async def _test_broadcast(app, send_success=True, aps_data_error=False, - **kwargs): +async def _test_broadcast(app, send_success=True, aps_data_error=False, **kwargs): seq = mock.sentinel.req_id async def req_mock(req_id, dst_addr_ep, profile, cluster, src_ep, data): @@ -292,14 +285,21 @@ async def req_mock(req_id, dst_addr_ep, profile, cluster, src_ep, data): app.get_device = mock.MagicMock(spec_set=zigpy.device.Device) r = await app.broadcast( - mock.sentinel.profile, mock.sentinel.cluster, 2, - mock.sentinel.dst_ep, mock.sentinel.grp_id, mock.sentinel.radius, - seq, b'\x01\x02\x03', **kwargs) + mock.sentinel.profile, + mock.sentinel.cluster, + 2, + mock.sentinel.dst_ep, + mock.sentinel.grp_id, + mock.sentinel.radius, + seq, + b"\x01\x02\x03", + **kwargs + ) assert app._api.aps_data_request.call_count == 1 assert app._api.aps_data_request.call_args[0][0] is seq assert app._api.aps_data_request.call_args[0][2] is mock.sentinel.profile assert app._api.aps_data_request.call_args[0][3] is mock.sentinel.cluster - assert app._api.aps_data_request.call_args[0][5] == b'\x01\x02\x03' + assert app._api.aps_data_request.call_args[0][5] == b"\x01\x02\x03" return r @@ -337,7 +337,7 @@ def _handle_reply(app, tsn): mock.sentinel.dst_ep, tsn, mock.sentinel.command_id, - mock.sentinel.args + mock.sentinel.args, ) @@ -359,10 +359,10 @@ def test_rx_device_annce(app, addr_ieee, addr_nwk): app._handle_reply = mock.MagicMock() app.handle_message = mock.MagicMock() - data = t.uint8_t(0xaa).serialize() + data = t.uint8_t(0xAA).serialize() data += addr_nwk.address.serialize() data += addr_ieee.address.serialize() - data += t.uint8_t(0x8e).serialize() + data += t.uint8_t(0x8E).serialize() app.handle_rx( addr_nwk, @@ -389,10 +389,11 @@ async def test_conbee_dev_add_to_group(app, nwk): app._groups.add_group.return_value = group conbee = application.ConBeeDevice(app, mock.sentinel.ieee, nwk) - conbee.endpoints = {0: mock.sentinel.zdo, - 1: mock.sentinel.ep1, - 2: mock.sentinel.ep2 - } + conbee.endpoints = { + 0: mock.sentinel.zdo, + 1: mock.sentinel.ep1, + 2: mock.sentinel.ep2, + } await conbee.add_to_group(mock.sentinel.grp_id, mock.sentinel.grp_name) assert group.add_member.call_count == 2 @@ -406,12 +407,12 @@ async def test_conbee_dev_add_to_group(app, nwk): async def test_conbee_dev_remove_from_group(app, nwk): group = mock.MagicMock() app.groups[mock.sentinel.grp_id] = group - conbee = application.ConBeeDevice(app, - mock.sentinel.ieee, nwk) - conbee.endpoints = {0: mock.sentinel.zdo, - 1: mock.sentinel.ep1, - 2: mock.sentinel.ep2 - } + conbee = application.ConBeeDevice(app, mock.sentinel.ieee, nwk) + conbee.endpoints = { + 0: mock.sentinel.zdo, + 1: mock.sentinel.ep1, + 2: mock.sentinel.ep2, + } await conbee.remove_from_group(mock.sentinel.grp_id) assert group.remove_member.call_count == 2 @@ -425,10 +426,8 @@ def test_conbee_props(nwk): @pytest.mark.asyncio async def test_conbee_new(app, nwk, monkeypatch): - mock_init = mock.MagicMock( - side_effect=asyncio.coroutine(mock.MagicMock()) - ) - monkeypatch.setattr(zigpy.device.Device, '_initialize', mock_init) + mock_init = mock.MagicMock(side_effect=asyncio.coroutine(mock.MagicMock())) + monkeypatch.setattr(zigpy.device.Device, "_initialize", mock_init) conbee = await application.ConBeeDevice.new(app, mock.sentinel.ieee, nwk) assert isinstance(conbee, application.ConBeeDevice) @@ -436,9 +435,11 @@ async def test_conbee_new(app, nwk, monkeypatch): mock_init.reset_mock() mock_dev = mock.MagicMock() - mock_dev.endpoints = {0: mock.MagicMock(), - 1: mock.MagicMock(), - 22: mock.MagicMock()} + mock_dev.endpoints = { + 0: mock.MagicMock(), + 1: mock.MagicMock(), + 22: mock.MagicMock(), + } app.devices[mock.sentinel.ieee] = mock_dev conbee = await application.ConBeeDevice.new(app, mock.sentinel.ieee, nwk) assert isinstance(conbee, application.ConBeeDevice) @@ -461,18 +462,17 @@ def test_tx_confirm_dup(app, caplog): app.handle_tx_confirm(tsn, mock.sentinel.status) assert req.result.set_result.call_count == 1 assert req.result.set_result.call_args[0][0] is mock.sentinel.status - assert any(r.levelname == 'DEBUG' for r in caplog.records) + assert any(r.levelname == "DEBUG" for r in caplog.records) assert "probably duplicate response" in caplog.text def test_tx_confirm_unexpcted(app, caplog): app.handle_tx_confirm(123, 0x00) - assert any(r.levelname == 'WARNING' for r in caplog.records) + assert any(r.levelname == "WARNING" for r in caplog.records) assert "Unexpected transmit confirm for request id" in caplog.text -async def _test_mrequest(app, send_success=True, aps_data_error=False, - **kwargs): +async def _test_mrequest(app, send_success=True, aps_data_error=False, **kwargs): seq = 123 req_id = mock.sentinel.req_id app.get_sequence = mock.MagicMock(return_value=req_id) @@ -489,7 +489,7 @@ async def req_mock(req_id, dst_addr_ep, profile, cluster, src_ep, data): device = zigpy.device.Device(app, mock.sentinel.ieee, 0x1122) app.get_device = mock.MagicMock(return_value=device) - return await app.mrequest(0x55aa, 0x0260, 1, 2, seq, b'\x01\x02\x03', **kwargs) + return await app.mrequest(0x55AA, 0x0260, 1, 2, seq, b"\x01\x02\x03", **kwargs) @pytest.mark.asyncio diff --git a/tests/test_exception.py b/tests/test_exception.py index e07a7ff..559eab8 100644 --- a/tests/test_exception.py +++ b/tests/test_exception.py @@ -4,6 +4,7 @@ def test_command_error(): - ex = zigpy_deconz.exception.CommandError(mock.sentinel.status, - mock.sentinel.message) + ex = zigpy_deconz.exception.CommandError( + mock.sentinel.status, mock.sentinel.message + ) assert ex.status is mock.sentinel.status diff --git a/tests/test_types.py b/tests/test_types.py index 022c069..03482ed 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -6,54 +6,54 @@ def test_deconz_address_group(): - data = b'\x01\x55\xaa' - extra = b'the rest of the owl' + data = b"\x01\x55\xaa" + extra = b"the rest of the owl" addr, rest = t.DeconzAddress.deserialize(data + extra) assert rest == extra assert addr.address_mode == t.ADDRESS_MODE.GROUP assert addr.address_mode == 1 - assert addr.address == 0xaa55 + assert addr.address == 0xAA55 assert addr.serialize() == data def test_deconz_address_nwk(): - data = b'\x02\x55\xaa' - extra = b'the rest of the owl' + data = b"\x02\x55\xaa" + extra = b"the rest of the owl" addr, rest = t.DeconzAddress.deserialize(data + extra) assert rest == extra assert addr.address_mode == t.ADDRESS_MODE.NWK assert addr.address_mode == 2 - assert addr.address == 0xaa55 + assert addr.address == 0xAA55 assert addr.serialize() == data def test_deconz_address_ieee(): - data = b'\x03\x55\xaa\xbb\xcc\xdd\xee\xef\xbe' - extra = b'the rest of the owl' + data = b"\x03\x55\xaa\xbb\xcc\xdd\xee\xef\xbe" + extra = b"the rest of the owl" addr, rest = t.DeconzAddress.deserialize(data + extra) assert rest == extra assert addr.address_mode == t.ADDRESS_MODE.IEEE assert addr.address_mode == 3 assert addr.address[0] == 0x55 - assert addr.address[1] == 0xaa - assert addr.address[2] == 0xbb - assert addr.address[3] == 0xcc - assert addr.address[4] == 0xdd - assert addr.address[5] == 0xee - assert addr.address[6] == 0xef - assert addr.address[7] == 0xbe + assert addr.address[1] == 0xAA + assert addr.address[2] == 0xBB + assert addr.address[3] == 0xCC + assert addr.address[4] == 0xDD + assert addr.address[5] == 0xEE + assert addr.address[6] == 0xEF + assert addr.address[7] == 0xBE assert addr.serialize() == data def test_deconz_address_nwk_and_ieee(): - data = b'\x04\x55\xaa\x88\x99\xbb\xcc\xdd\xee\xef\xbe' - extra = b'the rest of the owl' + data = b"\x04\x55\xaa\x88\x99\xbb\xcc\xdd\xee\xef\xbe" + extra = b"the rest of the owl" addr, rest = t.DeconzAddress.deserialize(data + extra) assert rest == extra @@ -61,13 +61,13 @@ def test_deconz_address_nwk_and_ieee(): assert addr.address_mode == 4 assert addr.ieee[0] == 0x88 assert addr.ieee[1] == 0x99 - assert addr.ieee[2] == 0xbb - assert addr.ieee[3] == 0xcc - assert addr.ieee[4] == 0xdd - assert addr.ieee[5] == 0xee - assert addr.ieee[6] == 0xef - assert addr.ieee[7] == 0xbe - assert addr.address == 0xaa55 + assert addr.ieee[2] == 0xBB + assert addr.ieee[3] == 0xCC + assert addr.ieee[4] == 0xDD + assert addr.ieee[5] == 0xEE + assert addr.ieee[6] == 0xEF + assert addr.ieee[7] == 0xBE + assert addr.address == 0xAA55 assert addr.serialize() == data @@ -81,8 +81,8 @@ def test_extended_pan_id(): def test_key(): - data = b'\x31\x39\x63\x32\x30\x65\x61\x63\x36\x36\x32\x63\x61\x38\x30\x35' - extra = b'extra data' + data = b"\x31\x39\x63\x32\x30\x65\x61\x63\x36\x36\x32\x63\x61\x38\x30\x35" + extra = b"extra data" key, rest = t.Key.deserialize(data + extra) assert rest == extra @@ -92,18 +92,18 @@ def test_key(): def test_bytes(): - data = b'abcde\x00\xff' + data = b"abcde\x00\xff" r, rest = t.Bytes.deserialize(data) - assert rest == b'' + assert rest == b"" assert r == data assert r.serialize() == data def test_lvbytes(): - data = b'abcde\x00\xff' - extra = b'\xffrest of the data\x00' + data = b"abcde\x00\xff" + extra = b"\xffrest of the data\x00" r, rest = t.LVBytes.deserialize(len(data).to_bytes(2, "little") + data + extra) assert rest == extra @@ -141,8 +141,8 @@ def test_list(): class TestList(t.List): _itemtype = t.uint16_t - r = TestList([1, 2, 3, 0x55aa]) - assert r.serialize() == b'\x01\x00\x02\x00\x03\x00\xaa\x55' + r = TestList([1, 2, 3, 0x55AA]) + assert r.serialize() == b"\x01\x00\x02\x00\x03\x00\xaa\x55" def test_list_deserialize(): @@ -153,11 +153,11 @@ class TestList(t.List): extra = b"\x00\xff" r, rest = TestList.deserialize(data + extra) - assert rest == b'' + assert rest == b"" assert r[0] == 0x1234 - assert r[1] == 0xaa55 - assert r[2] == 0xab89 - assert r[3] == 0xff00 + assert r[1] == 0xAA55 + assert r[2] == 0xAB89 + assert r[3] == 0xFF00 def test_fixed_list(): @@ -166,7 +166,7 @@ class TestList(t.FixedList): _itemtype = t.uint16_t with pytest.raises(AssertionError): - r = TestList([1, 2, 3, 0x55aa]) + r = TestList([1, 2, 3, 0x55AA]) r.serialize() with pytest.raises(AssertionError): @@ -175,7 +175,7 @@ class TestList(t.FixedList): r = TestList([1, 2, 3]) - assert r.serialize() == b'\x01\x00\x02\x00\x03\x00' + assert r.serialize() == b"\x01\x00\x02\x00\x03\x00" def test_fixed_list_deserialize(): @@ -189,13 +189,13 @@ class TestList(t.FixedList): r, rest = TestList.deserialize(data + extra) assert rest == extra assert r[0] == 0x1234 - assert r[1] == 0xaa55 - assert r[2] == 0xab89 + assert r[1] == 0xAA55 + assert r[2] == 0xAB89 def test_eui64(): - r = t.EUI64([0x0f, 0x0e, 0x0d, 0x0c, 0x0b, 0x0a, 0x09, 0x08]) - ieee = '08:09:0a:0b:0c:0d:0e:0f' + r = t.EUI64([0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0x09, 0x08]) + ieee = "08:09:0a:0b:0c:0d:0e:0f" assert repr(r) == ieee i = {} i[r] = mock.sentinel.data @@ -205,7 +205,7 @@ def test_hexrepr(): class TestHR(t.HexRepr, t.uint16_t): pass - i = TestHR(0xaa55) + i = TestHR(0xAA55) assert repr(i) == "0xaa55" assert str(i) == "0xaa55" @@ -217,8 +217,8 @@ def test_addr_ep_nwk(): r, rest = t.DeconzAddressEndpoint.deserialize(data + extra) assert rest == extra assert r.address_mode == t.ADDRESS_MODE.NWK - assert r.address == 0x55aa - assert r.endpoint == 0xcc + assert r.address == 0x55AA + assert r.endpoint == 0xCC def test_addr_ep_ieee(): @@ -229,49 +229,49 @@ def test_addr_ep_ieee(): assert rest == extra assert r.address_mode == t.ADDRESS_MODE.IEEE assert repr(r.address) == "31:32:33:34:35:36:37:38" - assert r.endpoint == 0xcc + assert r.endpoint == 0xCC def test_deconz_addr_ep(): - data = b'\x01\xaa\x55' - extra = b'the rest of the owl' + data = b"\x01\xaa\x55" + extra = b"the rest of the owl" r, rest = t.DeconzAddressEndpoint.deserialize(data + extra) assert rest == extra assert r.address_mode == t.ADDRESS_MODE.GROUP - assert r.address == 0x55aa + assert r.address == 0x55AA assert r.serialize() == data a = t.DeconzAddressEndpoint() a.address_mode = 1 - a.address = 0x55aa + a.address = 0x55AA assert a.serialize() == data - data = b'\x02\xaa\x55\xcc' + data = b"\x02\xaa\x55\xcc" r, rest = t.DeconzAddressEndpoint.deserialize(data + extra) assert rest == extra assert r.address_mode == t.ADDRESS_MODE.NWK - assert r.address == 0x55aa - assert r.endpoint == 0xcc + assert r.address == 0x55AA + assert r.endpoint == 0xCC assert r.serialize() == data a = t.DeconzAddressEndpoint() a.address_mode = 2 - a.address = 0x55aa + a.address = 0x55AA with pytest.raises(AttributeError): a.serialize() - a.endpoint = 0xcc + a.endpoint = 0xCC assert a.serialize() == data - data = b'\x03\x31\x32\x33\x34\x35\x36\x37\x38\xcc' + data = b"\x03\x31\x32\x33\x34\x35\x36\x37\x38\xcc" r, rest = t.DeconzAddressEndpoint.deserialize(data + extra) assert rest == extra assert r.address_mode == t.ADDRESS_MODE.IEEE assert r.address == [0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38] - assert r.endpoint == 0xcc + assert r.endpoint == 0xCC assert r.serialize() == data a = t.DeconzAddressEndpoint() a.address_mode = 3 a.address = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38] with pytest.raises(AttributeError): a.serialize() - a.endpoint = 0xcc + a.endpoint = 0xCC assert a.serialize() == data diff --git a/tests/test_uart.py b/tests/test_uart.py index 44bae50..3fb06cb 100644 --- a/tests/test_uart.py +++ b/tests/test_uart.py @@ -22,13 +22,14 @@ async def mock_conn(loop, protocol_factory, **kwargs): protocol = protocol_factory() loop.call_soon(protocol.connection_made, None) return None, protocol - monkeypatch.setattr(serial_asyncio, 'create_serial_connection', mock_conn) + + monkeypatch.setattr(serial_asyncio, "create_serial_connection", mock_conn) await uart.connect(portmock, 57600, api) def test_send(gw): - data = b'\x00' + data = b"\x00" gw.send(data) assert gw._transport.write.call_count == 1 assert gw._transport.write.called_once_with(data) @@ -40,7 +41,7 @@ def test_close(gw): def test_data_received_chunk_frame(gw): - data = b'\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0' + data = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0" gw.data_received(data[:-4]) assert gw._api.data_received.call_count == 0 gw.data_received(data[-4:]) @@ -49,60 +50,60 @@ def test_data_received_chunk_frame(gw): def test_data_received_full_frame(gw): - data = b'\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0' + data = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0" gw.data_received(data) assert gw._api.data_received.call_count == 1 assert gw._api.data_received.call_args[0][0] == data[:-3] def test_data_received_incomplete_frame(gw): - data = b'~\x00\x00' + data = b"~\x00\x00" gw.data_received(data) assert gw._api.data_received.call_count == 0 def test_data_received_runt_frame(gw): - data = b'\x02\x44\xC0' + data = b"\x02\x44\xC0" gw.data_received(data) assert gw._api.data_received.call_count == 0 def test_data_received_extra(gw): - data = b'\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0\x00' + data = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFF\xC0\x00" gw.data_received(data) assert gw._api.data_received.call_count == 1 assert gw._api.data_received.call_args[0][0] == data[:-4] - assert gw._buffer == b'\x00' + assert gw._buffer == b"\x00" def test_data_received_wrong_checksum(gw): - data = b'\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFE\xC0' + data = b"\x07\x01\x00\x08\x00\xaa\x00\x02\x44\xFE\xC0" gw.data_received(data) assert gw._api.data_received.call_count == 0 def test_unescape(gw): - data = b'\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00' - data_unescaped = b'\x00\xC0\x00\xDB\x00\x00\x00' + data = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00" + data_unescaped = b"\x00\xC0\x00\xDB\x00\x00\x00" r = gw._unescape(data) assert r == data_unescaped def test_unescape_error(gw): - data = b'\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00\xDB' + data = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00\xDB" r = gw._unescape(data) assert r is None def test_escape(gw): - data = b'\x00\xC0\x00\xDB\x00\x00\x00' - data_escaped = b'\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00' + data = b"\x00\xC0\x00\xDB\x00\x00\x00" + data_escaped = b"\x00\xDB\xDC\x00\xDB\xDD\x00\x00\x00" r = gw._escape(data) assert r == data_escaped def test_checksum(gw): - data = b'\x07\x01\x00\x08\x00\xaa\x00\x02' - checksum = b'\x44\xFF' + data = b"\x07\x01\x00\x08\x00\xaa\x00\x02" + checksum = b"\x44\xFF" r = gw._checksum(data) assert r == checksum diff --git a/tox.ini b/tox.ini index 88b2674..f0c57f7 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py35, py36, py37, lint +envlist = py35, py36, py37, lint, black skip_missing_interpreters = True [testenv] @@ -22,6 +22,10 @@ basepython = python3 deps = flake8 commands = flake8 -[flake8] -ignore = E501 -max-complexity = 16 +[testenv:black] +deps=black +setenv = + LC_ALL=C.UTF-8 + LANG=C.UTF-8 +commands= + black --check --fast {toxinidir}/zigpy_deconz {toxinidir}/tests {toxinidir}/setup.py diff --git a/zigpy_deconz/__init__.py b/zigpy_deconz/__init__.py index 3d0c874..be2e9aa 100644 --- a/zigpy_deconz/__init__.py +++ b/zigpy_deconz/__init__.py @@ -1,6 +1,6 @@ # coding: utf-8 MAJOR_VERSION = 0 MINOR_VERSION = 6 -PATCH_VERSION = '0' -__short_version__ = '{}.{}'.format(MAJOR_VERSION, MINOR_VERSION) -__version__ = '{}.{}'.format(__short_version__, PATCH_VERSION) +PATCH_VERSION = "0" +__short_version__ = "{}.{}".format(MAJOR_VERSION, MINOR_VERSION) +__version__ = "{}.{}".format(__short_version__, PATCH_VERSION) diff --git a/zigpy_deconz/api.py b/zigpy_deconz/api.py index ea9ae23..5b3c69d 100644 --- a/zigpy_deconz/api.py +++ b/zigpy_deconz/api.py @@ -30,13 +30,21 @@ class Command(t.uint8_t, enum.Enum): TX_COMMANDS = { - Command.aps_data_confirm: (t.uint16_t, ), + Command.aps_data_confirm: (t.uint16_t,), Command.aps_data_indication: (t.uint16_t, t.uint8_t), Command.aps_data_request: ( - t.uint16_t, t.uint8_t, t.uint8_t, t.DeconzAddressEndpoint, - t.uint16_t, t.uint16_t, t.uint8_t, t.LVBytes, t.uint8_t, t.uint8_t, + t.uint16_t, + t.uint8_t, + t.uint8_t, + t.DeconzAddressEndpoint, + t.uint16_t, + t.uint16_t, + t.uint8_t, + t.LVBytes, + t.uint8_t, + t.uint8_t, ), - Command.change_network_state: (t.uint8_t, ), + Command.change_network_state: (t.uint8_t,), Command.device_state: (t.uint8_t, t.uint8_t, t.uint8_t), Command.read_parameter: (t.uint16_t, t.uint8_t, t.Bytes), Command.version: (), @@ -45,27 +53,55 @@ class Command(t.uint8_t, enum.Enum): RX_COMMANDS = { Command.aps_data_confirm: ( - (t.uint16_t, t.uint8_t, t.uint8_t, t.DeconzAddressEndpoint, - t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t), - True), + ( + t.uint16_t, + t.uint8_t, + t.uint8_t, + t.DeconzAddressEndpoint, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + ), + True, + ), Command.aps_data_indication: ( - (t.uint16_t, t.uint8_t, t.DeconzAddress, t.uint8_t, t.DeconzAddress, - t.uint8_t, t.uint16_t, t.uint16_t, t.LVBytes, t.uint8_t, - t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t, t.uint8_t, - t.int8s), - True), + ( + t.uint16_t, + t.uint8_t, + t.DeconzAddress, + t.uint8_t, + t.DeconzAddress, + t.uint8_t, + t.uint16_t, + t.uint16_t, + t.LVBytes, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.uint8_t, + t.int8s, + ), + True, + ), Command.aps_data_request: ((t.uint16_t, t.uint8_t, t.uint8_t), True), - Command.change_network_state: ((t.uint8_t, ), True), + Command.change_network_state: ((t.uint8_t,), True), Command.device_state: ((t.uint8_t, t.uint8_t, t.uint8_t), True), Command.device_state_changed: ((t.uint8_t, t.uint8_t), False), Command.mac_poll: ((t.uint16_t, t.DeconzAddress, t.uint8_t, t.int8s), False), Command.read_parameter: ((t.uint16_t, t.uint8_t, t.Bytes), True), Command.simplified_beacon: ( (t.uint16_t, t.uint16_t, t.uint16_t, t.uint8_t, t.uint8_t, t.uint8_t), - False), - Command.version: ((t.uint32_t, ), True), + False, + ), + Command.version: ((t.uint32_t,), True), Command.write_parameter: ((t.uint16_t, t.uint8_t), True), - Command.zigbee_green_power: ((t.LVBytes, ), False), + Command.zigbee_green_power: ((t.LVBytes,), False), } @@ -88,21 +124,21 @@ class NetworkParameter(t.uint8_t, enum.Enum): NETWORK_PARAMETER_SCHEMA = { - NetworkParameter.mac_address: (t.EUI64, ), - NetworkParameter.nwk_panid: (t.PanId, ), - NetworkParameter.nwk_address: (t.NWK, ), - NetworkParameter.nwk_extended_panid: (t.ExtendedPanId, ), - NetworkParameter.aps_designed_coordinator: (t.uint8_t, ), - NetworkParameter.channel_mask: (t.uint32_t, ), - NetworkParameter.aps_extended_panid: (t.ExtendedPanId, ), - NetworkParameter.trust_center_address: (t.EUI64, ), - NetworkParameter.security_mode: (t.uint8_t, ), - NetworkParameter.network_key: (t.uint8_t, t.Key, ), - NetworkParameter.current_channel: (t.uint8_t, ), - NetworkParameter.permit_join: (t.uint8_t, ), - NetworkParameter.protocol_version: (t.uint16_t, ), - NetworkParameter.nwk_update_id: (t.uint8_t, ), - NetworkParameter.watchdog_ttl: (t.uint32_t, ), + NetworkParameter.mac_address: (t.EUI64,), + NetworkParameter.nwk_panid: (t.PanId,), + NetworkParameter.nwk_address: (t.NWK,), + NetworkParameter.nwk_extended_panid: (t.ExtendedPanId,), + NetworkParameter.aps_designed_coordinator: (t.uint8_t,), + NetworkParameter.channel_mask: (t.uint32_t,), + NetworkParameter.aps_extended_panid: (t.ExtendedPanId,), + NetworkParameter.trust_center_address: (t.EUI64,), + NetworkParameter.security_mode: (t.uint8_t,), + NetworkParameter.network_key: (t.uint8_t, t.Key), + NetworkParameter.current_channel: (t.uint8_t,), + NetworkParameter.permit_join: (t.uint8_t,), + NetworkParameter.protocol_version: (t.uint16_t,), + NetworkParameter.nwk_update_id: (t.uint8_t,), + NetworkParameter.watchdog_ttl: (t.uint32_t,), } @@ -212,11 +248,11 @@ def data_received(self, data): fut = self._awaiting.pop(seq) if status != Status.SUCCESS: fut.set_exception( - CommandError(status, '%s, status: %s' % (command, - status, ))) + CommandError(status, "%s, status: %s" % (command, status)) + ) return fut.set_result(data) - getattr(self, '_handle_%s' % (command.name, ))(data) + getattr(self, "_handle_%s" % (command.name,))(data) def device_state(self): return self._command(Command.device_state, 0, 0, 0) @@ -238,7 +274,7 @@ async def read_parameter(self, id_, *args): else: param = NetworkParameter(id_) except (KeyError, ValueError): - raise KeyError("Unknown parameter id: %s" % (id_, )) + raise KeyError("Unknown parameter id: %s" % (id_,)) data = t.serialize(args, NETWORK_PARAMETER_SCHEMA[param]) r = await self._command(Command.read_parameter, 1 + len(data), param, data) @@ -266,16 +302,17 @@ def _handle_write_parameter(self, data): try: param = NetworkParameter(data[1]) except ValueError: - LOGGER.error("Received unknown network param id '%s' response", - data[1]) + LOGGER.error("Received unknown network param id '%s' response", data[1]) return LOGGER.debug("Write parameter %s: SUCCESS", param.name) async def version(self): - self._proto_ver, = await self[NetworkParameter.protocol_version] + (self._proto_ver,) = await self[NetworkParameter.protocol_version] version = await self._command(Command.version) - if self.protocol_version >= MIN_PROTO_VERSION and \ - (version[0] & 0x0000FF00) == 0x00000500: + if ( + self.protocol_version >= MIN_PROTO_VERSION + and (version[0] & 0x0000FF00) == 0x00000500 + ): self._aps_data_ind_flags = 0x04 return version[0] @@ -288,10 +325,20 @@ def _handle_device_state_changed(self, data): async def _aps_data_indication(self): try: - r = await self._command(Command.aps_data_indication, 1, self._aps_data_ind_flags) - LOGGER.debug(("'aps_data_indication' response from %s, ep: %s, " - "profile: 0x%04x, cluster_id: 0x%04x, data: %s"), - r[4], r[5], r[6], r[7], binascii.hexlify(r[8])) + r = await self._command( + Command.aps_data_indication, 1, self._aps_data_ind_flags + ) + LOGGER.debug( + ( + "'aps_data_indication' response from %s, ep: %s, " + "profile: 0x%04x, cluster_id: 0x%04x, data: %s" + ), + r[4], + r[5], + r[6], + r[7], + binascii.hexlify(r[8]), + ) return r except asyncio.TimeoutError: self._data_indication = False @@ -301,24 +348,38 @@ def _handle_aps_data_indication(self, data): self._data_indication = False self._handle_device_state_value(data[1]) if self._app: - self._app.handle_rx(data[4], # src_addr - data[5], # src_ep - data[3], # dst_ep - data[6], # profile_id - data[7], # cluster_id - data[8], # APS payload - data[11], # lqi - data[16]) # rssi - - async def aps_data_request(self, req_id, dst_addr_ep, profile, cluster, src_ep, aps_payload): + self._app.handle_rx( + data[4], # src_addr + data[5], # src_ep + data[3], # dst_ep + data[6], # profile_id + data[7], # cluster_id + data[8], # APS payload + data[11], # lqi + data[16], + ) # rssi + + async def aps_data_request( + self, req_id, dst_addr_ep, profile, cluster, src_ep, aps_payload + ): dst = dst_addr_ep.serialize() length = len(dst) + len(aps_payload) + 11 delays = (0.5, 1.0, 1.5, None) for delay in delays: try: - return await self._command(Command.aps_data_request, length, - req_id, 0, dst_addr_ep, profile, - cluster, src_ep, aps_payload, 2, 0) + return await self._command( + Command.aps_data_request, + length, + req_id, + 0, + dst_addr_ep, + profile, + cluster, + src_ep, + aps_payload, + 2, + 0, + ) except CommandError as ex: LOGGER.debug("'aps_data_request' failure: %s", ex) if delay is not None and ex.status == Status.BUSY: @@ -334,14 +395,20 @@ def _handle_aps_data_request(self, data): async def _aps_data_confirm(self): try: r = await self._command(Command.aps_data_confirm, 0) - LOGGER.debug(("Request id: 0x%02x 'aps_data_confirm' for %s, " - "status: 0x%02x"), r[2], r[3], r[5]) + LOGGER.debug( + ("Request id: 0x%02x 'aps_data_confirm' for %s, " "status: 0x%02x"), + r[2], + r[3], + r[5], + ) return r except asyncio.TimeoutError: self._data_confirm = False def _handle_aps_data_confirm(self, data): - LOGGER.debug("APS data confirm response for request with id %s: %02x", data[2], data[5]) + LOGGER.debug( + "APS data confirm response for request with id %s: %02x", data[2], data[5] + ) self._data_confirm = False self._handle_device_state_value(data[1]) self._app.handle_tx_confirm(data[2], data[5]) @@ -353,17 +420,26 @@ def _handle_zigbee_green_power(self, data): pass def _handle_simplified_beacon(self, data): - LOGGER.debug(("Received simplified beacon frame: source=0x%04x, " - "pan_id=0x%04x, channel=%s, flags=0x%02x, " - "update_id=0x%02x"), - data[1], data[2], data[3], data[4], data[5]) + LOGGER.debug( + ( + "Received simplified beacon frame: source=0x%04x, " + "pan_id=0x%04x, channel=%s, flags=0x%02x, " + "update_id=0x%02x" + ), + data[1], + data[2], + data[3], + data[4], + data[5], + ) def _handle_device_state_value(self, value): flags = DeviceState.flags(value) ns = NetworkState(value & 0x03) if ns != self.network_state: - LOGGER.debug("Network state transition: %s -> %s", - self.network_state.name, ns.name) + LOGGER.debug( + "Network state transition: %s -> %s", self.network_state.name, ns.name + ) self.network_state = ns if DeviceState.APSDE_DATA_REQUEST not in flags: LOGGER.debug("Data request queue full.") diff --git a/zigpy_deconz/types.py b/zigpy_deconz/types.py index 72f4793..46351f1 100644 --- a/zigpy_deconz/types.py +++ b/zigpy_deconz/types.py @@ -10,7 +10,7 @@ def deserialize(data, schema): def serialize(data, schema): - return b''.join(t(v).serialize() for t, v in zip(schema, data)) + return b"".join(t(v).serialize() for t, v in zip(schema, data)) class Bytes(bytes): @@ -19,7 +19,7 @@ def serialize(self): @classmethod def deserialize(cls, data): - return cls(data), b'' + return cls(data), b"" class LVBytes(bytes): @@ -27,7 +27,7 @@ def serialize(self): return uint16_t(len(self)).serialize() + self @classmethod - def deserialize(cls, data, byteorder='little'): + def deserialize(cls, data, byteorder="little"): length, data = uint16_t.deserialize(data) return cls(data[:length]), data[length:] @@ -36,14 +36,14 @@ class int_t(int): _signed = True _size = 0 - def serialize(self, byteorder='little'): + def serialize(self, byteorder="little"): return self.to_bytes(self._size, byteorder, signed=self._signed) @classmethod - def deserialize(cls, data, byteorder='little'): + def deserialize(cls, data, byteorder="little"): # Work around https://bugs.python.org/issue23640 - r = cls(int.from_bytes(data[:cls._size], byteorder, signed=cls._signed)) - data = data[cls._size:] + r = cls(int.from_bytes(data[: cls._size], byteorder, signed=cls._signed)) + data = data[cls._size :] return r, data @@ -135,7 +135,7 @@ def __init__(self, *args, **kwargs): setattr(self, field[0], getattr(args[0], field[0])) def serialize(self): - r = b'' + r = b"" for field in self._fields: if hasattr(self, field[0]): r += getattr(self, field[0]).serialize() @@ -150,11 +150,11 @@ def deserialize(cls, data): return r, data def __repr__(self): - r = '<%s ' % (self.__class__.__name__, ) - r += ' '.join( - ['%s=%s' % (f[0], getattr(self, f[0], None)) for f in self._fields] + r = "<%s " % (self.__class__.__name__,) + r += " ".join( + ["%s=%s" % (f[0], getattr(self, f[0], None)) for f in self._fields] ) - r += '>' + r += ">" return r @@ -195,7 +195,7 @@ class EUI64(FixedList): _itemtype = uint8_t def __repr__(self): - return ':'.join('%02x' % i for i in self[::-1]) + return ":".join("%02x" % i for i in self[::-1]) def __hash__(self): return hash(repr(self)) @@ -203,10 +203,10 @@ def __hash__(self): class HexRepr: def __repr__(self): - return ('0x{:0' + str(self._size * 2) + 'x}').format(self) + return ("0x{:0" + str(self._size * 2) + "x}").format(self) def __str__(self): - return ('0x{:0' + str(self._size * 2) + 'x}').format(self) + return ("0x{:0" + str(self._size * 2) + "x}").format(self) class GroupId(HexRepr, uint16_t): @@ -228,8 +228,8 @@ class ExtendedPanId(EUI64): class DeconzAddress(Struct): _fields = [ # The address format (AddressMode) - ('address_mode', ADDRESS_MODE), - ('address', EUI64), + ("address_mode", ADDRESS_MODE), + ("address", EUI64), ] @classmethod @@ -237,9 +237,7 @@ def deserialize(cls, data): r = cls() mode, data = ADDRESS_MODE.deserialize(data) r.address_mode = mode - if mode in [ADDRESS_MODE.GROUP, - ADDRESS_MODE.NWK, - ADDRESS_MODE.NWK_AND_IEEE]: + if mode in [ADDRESS_MODE.GROUP, ADDRESS_MODE.NWK, ADDRESS_MODE.NWK_AND_IEEE]: r.address, data = NWK.deserialize(data) elif mode == ADDRESS_MODE.IEEE: r.address, data = EUI64.deserialize(data) @@ -257,9 +255,9 @@ def serialize(self): class DeconzAddressEndpoint(Struct): _fields = [ # The address format (AddressMode) - ('address_mode', ADDRESS_MODE), - ('address', EUI64), - ('endpoint', uint8_t) + ("address_mode", ADDRESS_MODE), + ("address", EUI64), + ("endpoint", uint8_t), ] @classmethod @@ -288,7 +286,7 @@ def serialize(self): r += GroupId(self.address).serialize() elif self.address_mode == ADDRESS_MODE.IEEE: r += EUI64(self.address).serialize() - if self.address_mode in (ADDRESS_MODE.NWK, ADDRESS_MODE.IEEE, ): + if self.address_mode in (ADDRESS_MODE.NWK, ADDRESS_MODE.IEEE): r += uint8_t(self.endpoint).serialize() return r diff --git a/zigpy_deconz/uart.py b/zigpy_deconz/uart.py index d0482da..fbdd9a1 100644 --- a/zigpy_deconz/uart.py +++ b/zigpy_deconz/uart.py @@ -9,13 +9,13 @@ class Gateway(asyncio.Protocol): - END = b'\xC0' - ESC = b'\xDB' - ESC_END = b'\xDC' - ESC_ESC = b'\xDD' + END = b"\xC0" + ESC = b"\xDB" + ESC_END = b"\xDC" + ESC_ESC = b"\xDD" def __init__(self, api, connected_future=None): - self._buffer = b'' + self._buffer = b"" self._connected_future = connected_future self._api = api @@ -45,18 +45,20 @@ def data_received(self, data): return None frame = self._buffer[:end] - self._buffer = self._buffer[(end + 1):] + self._buffer = self._buffer[(end + 1) :] frame = self._unescape(frame) - if (len(frame) < 4): + if len(frame) < 4: continue checksum = frame[-2:] frame = frame[:-2] if self._checksum(frame) != checksum: - LOGGER.warning("Invalid checksum: 0x%s, data: 0x%s", - binascii.hexlify(checksum).decode(), - binascii.hexlify(frame).decode()) + LOGGER.warning( + "Invalid checksum: 0x%s, data: 0x%s", + binascii.hexlify(checksum).decode(), + binascii.hexlify(frame).decode(), + ) continue LOGGER.debug("Frame received: 0x%s", binascii.hexlify(frame).decode()) diff --git a/zigpy_deconz/zigbee/application.py b/zigpy_deconz/zigbee/application.py index 31c6dcf..a96607d 100644 --- a/zigpy_deconz/zigbee/application.py +++ b/zigpy_deconz/zigbee/application.py @@ -44,7 +44,7 @@ async def startup(self, auto_form=False): """Perform a complete application startup""" self.version = await self._api.version() await self._api.device_state() - ieee, = await self._api[NetworkParameter.mac_address] + (ieee,) = await self._api[NetworkParameter.mac_address] self._ieee = zigpy.types.EUI64(ieee) await self._api[NetworkParameter.nwk_panid] await self._api[NetworkParameter.nwk_address] @@ -63,8 +63,7 @@ async def startup(self, auto_form=False): if auto_form: await self.form_network() - self.devices[self.ieee] = await ConBeeDevice.new(self, - self.ieee, self.nwk) + self.devices[self.ieee] = await ConBeeDevice.new(self, self.ieee, self.nwk) async def force_remove(self, dev): """Forcibly remove device from NCP.""" @@ -112,8 +111,12 @@ async def mrequest( has more context to provide a more meaningful error message """ req_id = self.get_sequence() - LOGGER.debug("Sending Zigbee multicast with tsn %s under %s request id, data: %s", - sequence, req_id, binascii.hexlify(data)) + LOGGER.debug( + "Sending Zigbee multicast with tsn %s under %s request id, data: %s", + sequence, + req_id, + binascii.hexlify(data), + ) dst_addr_ep = t.DeconzAddressEndpoint() dst_addr_ep.address_mode = t.ADDRESS_MODE.GROUP dst_addr_ep.address = group_id @@ -121,12 +124,7 @@ async def mrequest( with self._pending.new(req_id) as req: try: await self._api.aps_data_request( - req_id, - dst_addr_ep, - profile, - cluster, - min(1, src_ep), - data + req_id, dst_addr_ep, profile, cluster, min(1, src_ep), data ) except zigpy_deconz.exception.CommandError as ex: return ex.status, "Couldn't enqueue send data request: {}".format(ex) @@ -139,11 +137,25 @@ async def mrequest( return Status.SUCCESS, "message send success" @zigpy.util.retryable_request - async def request(self, device, profile, cluster, src_ep, dst_ep, sequence, data, - expect_reply=True, use_ieee=False): + async def request( + self, + device, + profile, + cluster, + src_ep, + dst_ep, + sequence, + data, + expect_reply=True, + use_ieee=False, + ): req_id = self.get_sequence() - LOGGER.debug("Sending Zigbee request with tsn %s under %s request id, data: %s", - sequence, req_id, binascii.hexlify(data)) + LOGGER.debug( + "Sending Zigbee request with tsn %s under %s request id, data: %s", + sequence, + req_id, + binascii.hexlify(data), + ) dst_addr_ep = t.DeconzAddressEndpoint() dst_addr_ep.endpoint = t.uint8_t(dst_ep) if use_ieee: @@ -156,12 +168,7 @@ async def request(self, device, profile, cluster, src_ep, dst_ep, sequence, data with self._pending.new(req_id) as req: try: await self._api.aps_data_request( - req_id, - dst_addr_ep, - profile, - cluster, - min(1, src_ep), - data + req_id, dst_addr_ep, profile, cluster, min(1, src_ep), data ) except zigpy_deconz.exception.CommandError as ex: return ex.status, "Couldn't enqueue send data request: {}".format(ex) @@ -174,12 +181,25 @@ async def request(self, device, profile, cluster, src_ep, dst_ep, sequence, data return r, "message send success" - async def broadcast(self, profile, cluster, src_ep, dst_ep, grpid, radius, - sequence, data, - broadcast_address=zigpy.types.BroadcastAddress.RX_ON_WHEN_IDLE): + async def broadcast( + self, + profile, + cluster, + src_ep, + dst_ep, + grpid, + radius, + sequence, + data, + broadcast_address=zigpy.types.BroadcastAddress.RX_ON_WHEN_IDLE, + ): req_id = self.get_sequence() - LOGGER.debug("Sending Zigbee broadcast with tsn %s under %s request id, data: %s", - sequence, req_id, binascii.hexlify(data)) + LOGGER.debug( + "Sending Zigbee broadcast with tsn %s under %s request id, data: %s", + sequence, + req_id, + binascii.hexlify(data), + ) dst_addr_ep = t.DeconzAddressEndpoint() dst_addr_ep.address_mode = t.uint8_t(t.ADDRESS_MODE.GROUP.value) dst_addr_ep.address = t.uint16_t(broadcast_address) @@ -187,21 +207,20 @@ async def broadcast(self, profile, cluster, src_ep, dst_ep, grpid, radius, with self._pending.new(req_id) as req: try: await self._api.aps_data_request( - req_id, - dst_addr_ep, - profile, - cluster, - min(1, src_ep), - data + req_id, dst_addr_ep, profile, cluster, min(1, src_ep), data ) except zigpy_deconz.exception.CommandError as ex: - return ex.status, "Couldn't enqueue send data request for broadcast: {}".format(ex) + return ( + ex.status, + "Couldn't enqueue send data request for broadcast: {}".format(ex), + ) r = await asyncio.wait_for(req.result, SEND_CONFIRM_TIMEOUT) if r: - LOGGER.warning("Error while sending %s req id broadcast: 0x%02x", - req_id, r) + LOGGER.warning( + "Error while sending %s req id broadcast: 0x%02x", req_id, r + ) return r, "broadcast send failure" return r, "broadcast send success" @@ -209,7 +228,9 @@ async def permit_ncp(self, time_s=60): assert 0 <= time_s <= 254 await self._api.write_parameter(NetworkParameter.permit_join, time_s) - def handle_rx(self, src_addr, src_ep, dst_ep, profile_id, cluster_id, data, lqi, rssi): + def handle_rx( + self, src_addr, src_ep, dst_ep, profile_id, cluster_id, data, lqi, rssi + ): # intercept ZDO device announce frames if dst_ep == 0 and cluster_id == 0x13: nwk, rest = t.uint16_t.deserialize(data[1:]) @@ -225,7 +246,10 @@ def handle_rx(self, src_addr, src_ep, dst_ep, profile_id, cluster_id, data, lqi, elif src_addr.address_mode == t.ADDRESS_MODE.IEEE.value: device = self.get_device(ieee=src_addr.address) else: - raise Exception("Unsupported address mode in handle_rx: %s" % (src_addr.address_mode)) + raise Exception( + "Unsupported address mode in handle_rx: %s" + % (src_addr.address_mode) + ) except KeyError: LOGGER.debug("Received frame from unknown device: 0x%04x", src_addr.address) return @@ -238,16 +262,22 @@ def handle_tx_confirm(self, req_id, status): self._pending[req_id].result.set_result(status) return except KeyError as exc: - LOGGER.warning("Unexpected transmit confirm for request id %s, Status: 0x%02x, %s", req_id, status, exc) + LOGGER.warning( + "Unexpected transmit confirm for request id %s, Status: 0x%02x, %s", + req_id, + status, + exc, + ) except asyncio.futures.InvalidStateError as exc: - LOGGER.debug("Invalid state on future - probably duplicate response: %s", exc) + LOGGER.debug( + "Invalid state on future - probably duplicate response: %s", exc + ) class ConBeeDevice(zigpy.device.Device): """Zigpy Device representing Coordinator.""" - async def add_to_group(self, grp_id: int, - name: str = None) -> None: + async def add_to_group(self, grp_id: int, name: str = None) -> None: group = self.application.groups.add_group(grp_id, name) for epid in self.endpoints: @@ -269,7 +299,7 @@ def manufacturer(self): @property def model(self): - return 'ConBee' + return "ConBee" @classmethod async def new(cls, application, ieee, nwk):