-
-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Simplify discovery query, refactor dump-devinfo (#147)
* Use only system:get_sysinfo for discovery * move dump-devinfo into a separate tool under devtools * Fix linting
- Loading branch information
Showing
3 changed files
with
144 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
"""This script generates devinfo files for the test suite. | ||
If you have new, yet unsupported device or a device with no devinfo file under kasa/tests/fixtures, | ||
feel free to run this script and create a PR to add the file to the repository. | ||
Executing this script will several modules and methods one by one, | ||
and finally execute a query to query all of them at once. | ||
""" | ||
import asyncio | ||
import collections.abc | ||
import json | ||
import logging | ||
import re | ||
from collections import defaultdict, namedtuple | ||
from pprint import pprint | ||
|
||
import click | ||
|
||
from kasa import TPLinkSmartHomeProtocol | ||
|
||
Call = namedtuple("Call", "module method") | ||
|
||
|
||
def scrub(res): | ||
"""Remove identifiers from the given dict.""" | ||
keys_to_scrub = [ | ||
"deviceId", | ||
"fwId", | ||
"hwId", | ||
"oemId", | ||
"mac", | ||
"mic_mac", | ||
"latitude_i", | ||
"longitude_i", | ||
"latitude", | ||
"longitude", | ||
] | ||
|
||
for k, v in res.items(): | ||
if isinstance(v, collections.abc.Mapping): | ||
res[k] = scrub(res.get(k)) | ||
else: | ||
if k in keys_to_scrub: | ||
if k in ["latitude_i", "longitude_i"]: | ||
v = 0 | ||
else: | ||
v = re.sub(r"\w", "0", v) | ||
|
||
res[k] = v | ||
return res | ||
|
||
|
||
def default_to_regular(d): | ||
"""Convert nested defaultdicts to regular ones. | ||
From https://stackoverflow.com/a/26496899 | ||
""" | ||
if isinstance(d, defaultdict): | ||
d = {k: default_to_regular(v) for k, v in d.items()} | ||
return d | ||
|
||
|
||
@click.command() | ||
@click.argument("host") | ||
@click.option("--debug") | ||
def cli(host, debug): | ||
"""Generate devinfo file for given device.""" | ||
if debug: | ||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
items = [ | ||
Call(module="system", method="get_sysinfo"), | ||
Call(module="emeter", method="get_realtime"), | ||
Call(module="smartlife.iot.dimmer", method="get_dimmer_parameters"), | ||
Call(module="smartlife.iot.common.emeter", method="get_realtime"), | ||
Call( | ||
module="smartlife.iot.smartbulb.lightingservice", method="get_light_state" | ||
), | ||
] | ||
|
||
protocol = TPLinkSmartHomeProtocol() | ||
|
||
successes = [] | ||
|
||
for test_call in items: | ||
try: | ||
click.echo(f"Testing {test_call}..", nl=False) | ||
info = asyncio.run( | ||
protocol.query(host, {test_call.module: {test_call.method: None}}) | ||
) | ||
resp = info[test_call.module] | ||
except Exception as ex: | ||
click.echo(click.style(f"FAIL {ex}", fg="red")) | ||
else: | ||
if "err_msg" in resp: | ||
click.echo(click.style(f"FAIL {resp['err_msg']}", fg="red")) | ||
else: | ||
click.echo(click.style("OK", fg="green")) | ||
successes.append((test_call, info)) | ||
|
||
final_query = defaultdict(defaultdict) | ||
final = defaultdict(defaultdict) | ||
|
||
for succ, resp in successes: | ||
final_query[succ.module][succ.method] = None | ||
final[succ.module][succ.method] = resp | ||
|
||
final = default_to_regular(final) | ||
|
||
try: | ||
final = asyncio.run(protocol.query(host, final_query)) | ||
except Exception as ex: | ||
click.echo( | ||
click.style( | ||
f"Unable to query all successes at once: {ex}", bold=True, fg="red" | ||
) | ||
) | ||
|
||
click.echo("Got %s successes" % len(successes)) | ||
click.echo(click.style("## device info file ##", bold=True)) | ||
|
||
sysinfo = final["system"]["get_sysinfo"] | ||
model = sysinfo["model"] | ||
hw_version = sysinfo["hw_ver"] | ||
sw_version = sysinfo["sw_ver"] | ||
sw_version = sw_version.split(" ", maxsplit=1)[0] | ||
save_to = f"{model}_{hw_version}_{sw_version}.json" | ||
pprint(scrub(final)) | ||
save = click.prompt(f"Do you want to save the above content to {save_to} (y/n)") | ||
if save == "y": | ||
click.echo(f"Saving info to {save_to}") | ||
|
||
with open(save_to, "w") as f: | ||
json.dump(final, f, sort_keys=True, indent=4) | ||
f.write("\n") | ||
else: | ||
click.echo("Not saving.") | ||
|
||
|
||
if __name__ == "__main__": | ||
cli() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters