-
Notifications
You must be signed in to change notification settings - Fork 18
/
application.py
346 lines (288 loc) · 12.1 KB
/
application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
from __future__ import annotations
import asyncio
import importlib.metadata
import logging
from typing import Any
import zigpy.application
import zigpy.config
import zigpy.device
import zigpy.exceptions
import zigpy.types
import zigpy.util
import zigpy.zdo
from zigpy_zigate import common as c, types as t
from zigpy_zigate.api import PDM_EVENT, NoResponseError, ResponseId, ZiGate
from zigpy_zigate.config import (
CONF_DEVICE,
CONF_DEVICE_PATH,
CONFIG_SCHEMA,
SCHEMA_DEVICE,
)
LOGGER = logging.getLogger(__name__)
class ControllerApplication(zigpy.application.ControllerApplication):
SCHEMA = CONFIG_SCHEMA
SCHEMA_DEVICE = SCHEMA_DEVICE
probe = ZiGate.probe
def __init__(self, config: dict[str, Any]):
super().__init__(zigpy.config.ZIGPY_SCHEMA(config))
self._api: ZiGate | None = None
self._pending = {}
self._pending_join = []
self.version: str = ""
async def connect(self):
api = await ZiGate.new(self._config[CONF_DEVICE], self)
await api.set_raw_mode()
await api.set_time()
(_, version), lqi = await api.version()
major, minor = version.to_bytes(2, "big")
self.version = f"{major:x}.{minor:x}"
self._api = api
if self.version < "3.21":
LOGGER.error(
"Old ZiGate firmware detected, you should upgrade to 3.21 or newer"
)
async def disconnect(self):
# TODO: how do you stop the network? Is it possible?
if self._api is not None:
try:
await self._api.reset(wait=False)
except Exception as e:
LOGGER.warning("Failed to reset before disconnect: %s", e)
finally:
self._api.close()
self._api = None
async def start_network(self):
# TODO: how do you start the network? Is it always automatically started?
dev = ZiGateDevice(self, self.state.node_info.ieee, self.state.node_info.nwk)
self.devices[dev.ieee] = dev
await dev.schedule_initialize()
async def load_network_info(self, *, load_devices: bool = False):
network_state, lqi = await self._api.get_network_state()
if not network_state or network_state[3] == 0 or network_state[0] == 0xFFFF:
raise zigpy.exceptions.NetworkNotFormed()
self.state.node_info = zigpy.state.NodeInfo(
nwk=zigpy.types.NWK(network_state[0]),
ieee=zigpy.types.EUI64(network_state[1]),
logical_type=zigpy.zdo.types.LogicalType.Coordinator,
)
epid, _ = zigpy.types.ExtendedPanId.deserialize(
zigpy.types.uint64_t(network_state[3]).serialize()
)
self.state.network_info = zigpy.state.NetworkInfo(
source=f"zigpy-zigate@{importlib.metadata.version('zigpy-zigate')}",
extended_pan_id=epid,
pan_id=zigpy.types.PanId(network_state[2]),
nwk_update_id=0,
nwk_manager_id=zigpy.types.NWK(0x0000),
channel=network_state[4],
channel_mask=zigpy.types.Channels.from_channel_list([network_state[4]]),
security_level=5,
# TODO: is it possible to read keys?
# network_key=zigpy.state.Key(),
# tc_link_key=zigpy.state.Key(),
children=[],
key_table=[],
nwk_addresses={},
stack_specific={},
metadata={
"zigate": {
"version": self.version,
}
},
)
self.state.network_info.tc_link_key.partner_ieee = self.state.node_info.ieee
if not load_devices:
return
for device in await self._api.get_devices_list():
if device.power_source != 0: # only battery-powered devices
continue
ieee = zigpy.types.EUI64(device.ieee_addr)
self.state.network_info.children.append(ieee)
self.state.network_info.nwk_addresses[ieee] = zigpy.types.NWK(
device.short_addr
)
async def reset_network_info(self):
await self._api.erase_persistent_data()
async def write_network_info(self, *, network_info, node_info):
LOGGER.warning("Setting the pan_id is not supported by ZiGate")
await self.reset_network_info()
await self._api.set_channel(network_info.channel)
epid, _ = zigpy.types.uint64_t.deserialize(
network_info.extended_pan_id.serialize()
)
await self._api.set_extended_panid(epid)
network_formed, lqi = await self._api.start_network()
if network_formed[0] not in (
t.Status.Success,
t.Status.IncorrectParams,
t.Status.Busy,
):
raise zigpy.exceptions.FormationFailure(
f"Unexpected error starting network: {network_formed!r}"
)
LOGGER.warning("Starting network got status %s, wait...", network_formed[0])
for attempt in range(3):
await asyncio.sleep(1)
try:
await self.load_network_info()
except zigpy.exceptions.NetworkNotFormed as e:
if attempt == 2:
raise zigpy.exceptions.FormationFailure() from e
async def permit_with_link_key(self, node, link_key, time_s=60):
LOGGER.warning("ZiGate does not support joins with link keys")
async def _move_network_to_channel(
self, new_channel: int, *, new_nwk_update_id: int
) -> None:
"""Moves the network to a new channel."""
await self._api.set_channel(new_channel)
async def energy_scan(
self, channels: zigpy.types.Channels, duration_exp: int, count: int
) -> dict[int, float]:
"""Runs an energy detection scan and returns the per-channel scan results."""
LOGGER.warning("Coordinator does not support energy scanning")
return {c: 0 for c in channels}
async def force_remove(self, dev):
await self._api.remove_device(self.state.node_info.ieee, dev.ieee)
async def add_endpoint(self, descriptor):
# ZiGate does not support adding new endpoints
pass
def zigate_callback_handler(self, msg, response, lqi):
LOGGER.debug("zigate_callback_handler %s %s", msg, response)
if msg == ResponseId.LEAVE_INDICATION:
nwk = 0
ieee = zigpy.types.EUI64(response[0])
self.handle_leave(nwk, ieee)
elif msg == ResponseId.DEVICE_ANNOUNCE:
nwk = response[0]
ieee = zigpy.types.EUI64(response[1])
parent_nwk = 0
self.handle_join(nwk, ieee, parent_nwk)
# Temporary disable two stages pairing due to firmware bug
# rejoin = response[3]
# if nwk in self._pending_join or rejoin:
# LOGGER.debug('Finish pairing {} (2nd device announce)'.format(nwk))
# if nwk in self._pending_join:
# self._pending_join.remove(nwk)
# self.handle_join(nwk, ieee, parent_nwk)
# else:
# LOGGER.debug('Start pairing {} (1st device announce)'.format(nwk))
# self._pending_join.append(nwk)
elif msg == ResponseId.DATA_INDICATION:
(
status,
profile_id,
cluster_id,
src_ep,
dst_ep,
src,
dst,
payload,
) = response
packet = zigpy.types.ZigbeePacket(
src=src.to_zigpy_type()[0],
src_ep=src_ep,
dst=dst.to_zigpy_type()[0],
dst_ep=dst_ep,
profile_id=profile_id,
cluster_id=cluster_id,
data=zigpy.types.SerializableBytes(payload),
lqi=lqi,
rssi=None,
)
self.packet_received(packet)
elif msg == ResponseId.ACK_DATA:
LOGGER.debug("ACK Data received %s %s", response[4], response[0])
# disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324
# self._handle_frame_failure(response[4], response[0])
elif msg == ResponseId.APS_DATA_CONFIRM:
LOGGER.debug(
"ZPS Event APS data confirm, message routed to %s %s",
response[3],
response[0],
)
elif msg == ResponseId.PDM_EVENT:
try:
event = PDM_EVENT(response[0]).name
except ValueError:
event = "Unknown event"
LOGGER.debug("PDM Event %s %s, record %s", response[0], event, response[1])
elif msg == ResponseId.APS_DATA_CONFIRM_FAILED:
LOGGER.debug("APS Data confirm Fail %s %s", response[4], response[0])
self._handle_frame_failure(response[4], response[0])
elif msg == ResponseId.EXTENDED_ERROR:
LOGGER.warning("Extended error code %s", response[0])
def _handle_frame_failure(self, message_tag, status):
try:
send_fut = self._pending.pop(message_tag)
send_fut.set_result(status)
except KeyError:
LOGGER.warning("Unexpected message send failure")
except asyncio.futures.InvalidStateError as exc:
LOGGER.debug(
"Invalid state on future - probably duplicate response: %s", exc
)
async def send_packet(self, packet):
LOGGER.debug("Sending packet %r", packet)
# Firmwares 3.1d and below allow a couple of _NO_ACK packets to send but all
# subsequent ones will fail. ACKs must be enabled.
ack = (
zigpy.types.TransmitOptions.ACK in packet.tx_options
or self.version <= "3.1d"
)
try:
(status, tsn, packet_type, _), _ = await self._api.raw_aps_data_request(
addr=packet.dst.address,
src_ep=(
1 if packet.dst_ep is None or packet.dst_ep > 0 else 0
), # ZiGate only support endpoint 1
dst_ep=packet.dst_ep or 0,
profile=packet.profile_id,
cluster=packet.cluster_id,
payload=packet.data.serialize(),
addr_mode=t.ZIGPY_TO_ZIGATE_ADDR_MODE[packet.dst.addr_mode, ack],
radius=packet.radius,
)
except NoResponseError:
raise zigpy.exceptions.DeliveryError("ZiGate did not respond to command")
self._pending[tsn] = asyncio.get_running_loop().create_future()
if status != t.Status.Success:
self._pending.pop(tsn)
# Firmwares 3.1d and below fail to send packets on every request
if status == t.Status.InvalidParameter and self.version <= "3.1d":
pass
else:
raise zigpy.exceptions.DeliveryError(
f"Failed to send packet: {status!r}", status=status
)
# disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324
# try:
# v = await asyncio.wait_for(send_fut, 120)
# except asyncio.TimeoutError:
# return 1, "timeout waiting for message %s send ACK" % (sequence, )
# finally:
# self._pending.pop(tsn)
# return v, "Message sent"
async def permit_ncp(self, time_s=60):
assert 0 <= time_s <= 254
status, lqi = await self._api.permit_join(time_s)
if status[0] != t.Status.Success:
await self._api.reset()
class ZiGateDevice(zigpy.device.Device):
def __init__(self, application, ieee, nwk):
"""Initialize instance."""
super().__init__(application, ieee, nwk)
port = application._config[CONF_DEVICE][CONF_DEVICE_PATH]
model = "ZiGate USB-TTL"
if c.is_zigate_wifi(port):
model = "ZiGate WiFi"
elif c.is_pizigate(port):
model = "PiZiGate"
elif c.is_zigate_din(port):
model = "ZiGate USB-DIN"
self._model = f"{model} {application.version}"
@property
def manufacturer(self):
return "ZiGate"
@property
def model(self):
return self._model