/
cli.py
72 lines (49 loc) · 1.75 KB
/
cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import asyncio
import logging
from typing import List, Tuple
import click
from victron_ble.scanner import DebugScanner, DiscoveryScanner, Scanner
# Install the default root logging configuration, then bind the logger named
# "victron_ble" that the rest of this module adjusts and writes to.
logging.basicConfig()
logger = logging.getLogger("victron_ble")
class DeviceKeyParam(click.ParamType):
    """Click parameter type for a ``<addr>@<key>`` command-line argument.

    A valid value is converted to an ``(addr, key)`` tuple of strings with
    surrounding whitespace removed from each half.
    """

    name = "device_key"

    def convert(self, value, param, ctx):
        """Convert *value* to an ``(addr, key)`` tuple.

        Args:
            value: The raw ``<addr>@<key>`` string, or a 2-item tuple of
                strings that has already been converted.
            param: The click parameter being processed; forwarded to
                ``self.fail`` for error reporting.
            ctx: The current click context; forwarded to ``self.fail``.

        Returns:
            A ``(addr, key)`` tuple of non-empty, stripped strings.

        Raises:
            Whatever ``self.fail`` raises (``click.BadParameter`` according
            to Click's documentation) when *value* does not contain exactly
            one ``@`` separating a non-empty address from a non-empty key.
        """
        if isinstance(value, str):
            parts = value.split("@")
        elif isinstance(value, tuple):
            # Click may hand convert() a value that is already of the target
            # type; validate it with the same rules instead of rejecting it.
            parts = list(value)
        else:
            parts = []

        if len(parts) == 2 and all(isinstance(part, str) for part in parts):
            addr = parts[0].strip()
            key = parts[1].strip()
            # An empty half (e.g. "addr@" or "@key") is not a usable pair;
            # fall through to the error below rather than returning it.
            if addr and key:
                return (addr, key)

        self.fail(f"{value} is not a valid <addr>@<key> pair", param, ctx)
@click.group()
@click.option("-v", "--verbose", is_flag=True, help="Increase logging output")
def cli(verbose):
    # Root command group: pick the module logger's level from the --verbose
    # flag. Documented with comments on purpose: Click uses a command's
    # docstring as its help text when no explicit help= is given, so adding
    # a docstring here would change the --help output.
    if verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logger.setLevel(level)
@cli.command(help="Discover Victron devices with Instant Readout")
def discover():
    """Start a ``DiscoveryScanner`` and keep the event loop running.

    Scanner startup is run to completion before the loop is left running, so
    an exception raised while starting propagates to the caller instead of
    being parked on an un-awaited task while the process idles forever.
    """

    async def scan():
        # Build and start the scanner inside the coroutine so both steps
        # happen while the event loop is running.
        scanner = DiscoveryScanner()
        await scanner.start()
        return scanner

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # current loop is deprecated and is rejected by newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep the started scanner referenced for as long as the loop runs.
    _scanner = loop.run_until_complete(scan())
    loop.run_forever()
@cli.command(help="Dump all advertisements matching the given device ID")
@click.argument("id", type=str)
def dump(id: str):
    """Start a ``DebugScanner`` for *id* and keep the event loop running.

    Scanner startup is run to completion before the loop is left running, so
    an exception raised while starting propagates to the caller instead of
    being parked on an un-awaited task while the process idles forever.

    Args:
        id: The device ID given on the command line; passed unchanged to
            ``DebugScanner``. The name mirrors the click argument name and
            therefore shadows the ``id`` builtin inside this function.
    """

    async def scan():
        # Build and start the scanner inside the coroutine so both steps
        # happen while the event loop is running.
        scanner = DebugScanner(id)
        await scanner.start()
        return scanner

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # current loop is deprecated and is rejected by newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep the started scanner referenced for as long as the loop runs.
    _scanner = loop.run_until_complete(scan())
    loop.run_forever()
@cli.command(help="Read data from specified devices")
@click.argument("device_keys", nargs=-1, type=DeviceKeyParam())
def read(device_keys: List[Tuple[str, str]]):
    """Start a ``Scanner`` for the given devices and keep the loop running.

    Scanner startup is run to completion before the loop is left running, so
    an exception raised while starting propagates to the caller instead of
    being parked on an un-awaited task while the process idles forever.

    Args:
        device_keys: ``(addr, key)`` pairs produced by ``DeviceKeyParam``.
            They are collapsed into an ``{addr: key}`` mapping for
            ``Scanner``; if an address is repeated, the last key wins.
    """

    async def scan(keys):
        # Build and start the scanner inside the coroutine so both steps
        # happen while the event loop is running.
        scanner = Scanner(keys)
        await scanner.start()
        return scanner

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # current loop is deprecated and is rejected by newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep the started scanner referenced for as long as the loop runs.
    _scanner = loop.run_until_complete(scan(dict(device_keys)))
    loop.run_forever()
# Script entry point: invoke the click command group only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    cli()