-
Notifications
You must be signed in to change notification settings - Fork 11
/
radio.py
239 lines (189 loc) · 7.19 KB
/
radio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
from __future__ import annotations
import asyncio
import collections
import importlib
import importlib.util
import itertools
import json
import logging
import click
import zigpy.state
import zigpy.types
import zigpy.zdo
import zigpy.zdo.types
from zigpy_cli.cli import cli, click_coroutine
from zigpy_cli.const import RADIO_LOGGING_CONFIGS, RADIO_TO_PACKAGE, RADIO_TO_PYPI
LOGGER = logging.getLogger(__name__)
@cli.group()
@click.pass_context
@click.argument("radio", type=click.Choice(list(RADIO_TO_PACKAGE.keys())))
@click.argument("port", type=str)
@click.option("--baudrate", type=int, default=None)
@click.option("--database", type=str, default=None)
@click_coroutine
async def radio(ctx, radio, port, baudrate=None, database=None):
# Setup logging for the radio
verbose = ctx.parent.params["verbose"]
logging_configs = RADIO_LOGGING_CONFIGS[radio]
logging_config = logging_configs[min(verbose, len(logging_configs) - 1)]
for logger, level in logging_config.items():
logging.getLogger(logger).setLevel(level)
module = RADIO_TO_PACKAGE[radio] + ".zigbee.application"
# Catching just `ImportError` masks dependency errors and is annoying
if importlib.util.find_spec(module) is None:
raise click.ClickException(
f"Radio module for {radio!r} is not installed."
f" Install it with `pip install {RADIO_TO_PYPI[radio]}`."
)
# Import the radio library
radio_module = importlib.import_module(module)
# Start the radio
app_cls = radio_module.ControllerApplication
config = app_cls.SCHEMA(
{
"device": {"path": port},
"backup_enabled": False,
"startup_energy_scan": False,
"database_path": database,
"use_thread": False,
}
)
if baudrate is not None:
config["device"]["baudrate"] = baudrate
app = app_cls(config)
ctx.obj = app
ctx.call_on_close(radio_cleanup)
@click.pass_obj
@click_coroutine
async def radio_cleanup(app):
try:
await app.shutdown()
except RuntimeError:
LOGGER.warning("Caught an exception when shutting down app", exc_info=True)
@radio.command()
@click.pass_obj
@click_coroutine
async def info(app):
await app.connect()
await app.load_network_info(load_devices=False)
print(f"PAN ID: 0x{app.state.network_info.pan_id:04X}")
print(f"Extended PAN ID: {app.state.network_info.extended_pan_id}")
print(f"Channel: {app.state.network_info.channel}")
print(f"Channel mask: {list(app.state.network_info.channel_mask)}")
print(f"NWK update ID: {app.state.network_info.nwk_update_id}")
print(f"Device IEEE: {app.state.node_info.ieee}")
print(f"Device NWK: 0x{app.state.node_info.nwk:04X}")
print(f"Network key: {app.state.network_info.network_key.key}")
print(f"Network key sequence: {app.state.network_info.network_key.seq}")
print(f"Network key counter: {app.state.network_info.network_key.tx_counter}")
@radio.command()
@click.option("-z", "--zigpy-format", is_flag=True, type=bool, default=False)
@click.option(
"--i-understand-i-can-update-eui64-only-once-and-i-still-want-to-do-it",
is_flag=True,
type=bool,
default=False,
)
@click.argument("output", type=click.File("w"), default="-")
@click.pass_obj
@click_coroutine
async def backup(
app,
zigpy_format,
i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it,
output,
):
await app.connect()
backup = await app.backups.create_backup(load_devices=True)
if i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it:
backup.network_info.stack_specific.setdefault("ezsp", {})[
"i_understand_i_can_update_eui64_only_once_and_i_still_want_to_do_it"
] = True
if zigpy_format:
obj = backup.as_dict()
else:
obj = backup.as_open_coordinator_json()
output.write(json.dumps(obj, indent=4) + "\n")
@radio.command()
@click.argument("input", type=click.File("r"))
@click.option("-c", "--frame-counter-increment", type=int, default=5000)
@click.pass_obj
@click_coroutine
async def restore(app, frame_counter_increment, input):
obj = json.load(input)
backup = zigpy.backups.NetworkBackup.from_dict(obj)
await app.connect()
await app.backups.restore_backup(backup, counter_increment=frame_counter_increment)
@radio.command()
@click.pass_obj
@click_coroutine
async def form(app):
await app.connect()
await app.form_network()
@radio.command()
@click.pass_obj
@click_coroutine
async def reset(app):
await app.connect()
await app.reset_network_info()
@radio.command()
@click.pass_obj
@click.option("-t", "--join-time", type=int, default=250)
@click_coroutine
async def permit(app, join_time):
await app.startup(auto_form=True)
await app.permit(join_time)
await asyncio.sleep(join_time)
@radio.command()
@click.pass_obj
@click.option("-n", "--num-scans", type=int, default=-1)
@click_coroutine
async def energy_scan(app, num_scans):
await app.startup()
LOGGER.info("Running scan...")
# We compute an average over the last 5 scans
channel_energies = collections.defaultdict(lambda: collections.deque([], maxlen=5))
for scan in itertools.count():
if num_scans != -1 and scan > num_scans:
break
results = await app.energy_scan(
channels=zigpy.types.Channels.ALL_CHANNELS, duration_exp=2, count=1
)
for channel, energy in results.items():
energies = channel_energies[channel]
energies.append(energy)
total = 0xFF * len(energies)
print(f"Channel energy (mean of {len(energies)} / {energies.maxlen}):")
print("------------------------------------------------")
print(" ! Different radios compute channel energy differently")
print()
print(" + Lower energy is better")
print(" + Active Zigbee networks on a channel may still cause congestion")
print(" + TX on 26 in North America may be with lower power due to regulations")
print(" + Zigbee channels 15, 20, 25 fall between WiFi channels 1, 6, 11")
print(" + Some Zigbee devices only join networks on channels 15, 20, and 25")
print(" + Current channel is enclosed in [square brackets]")
print("------------------------------------------------")
for channel, energies in channel_energies.items():
count = sum(energies)
asterisk = "*" if channel == 26 else " "
if channel == app.state.network_info.channel:
bracket_open = "["
bracket_close = "]"
else:
bracket_open = " "
bracket_close = " "
print(
f" - {bracket_open}{channel:>02}{asterisk}{bracket_close}"
+ f" {count / total:>7.2%} "
+ "#" * int(100 * count / total)
)
print()
@radio.command()
@click.pass_obj
@click.option("-c", "--channel", type=int)
@click_coroutine
async def change_channel(app, channel):
await app.startup()
LOGGER.info("Current channel is %s", app.state.network_info.channel)
await app.move_network_to_channel(channel)