forked from beveradb/pysonofflan
/
discover.py
78 lines (60 loc) · 2 KB
/
discover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import logging
import time
from datetime import datetime
from typing import Dict

from zeroconf import ServiceBrowser, Zeroconf
class Discover:
    """Find eWeLink devices advertised on the local network via mDNS."""

    @staticmethod
    async def discover(logger=None, seconds_to_wait=None) -> Dict[str, str]:
        """Browse for ``_ewelink._tcp.local.`` services and collect them.

        A zeroconf service browser is started, left running for the waiting
        period, and then shut down. Whatever the listener resolved in that
        window is returned.

        :param logger: logger used for progress messages and handed to the
            listener; defaults to this module's logger when ``None``.
        :param seconds_to_wait: how long to listen for announcements, in
            seconds; defaults to 4 when ``None``.
        :rtype: dict
        :return: mapping of device id to ``"ip:port"`` for each device found.
        """
        if logger is None:
            logger = logging.getLogger(__name__)

        logger.debug("Looking for all eWeLink devices on local network.")

        wait_seconds = 4 if seconds_to_wait is None else seconds_to_wait

        zeroconf = Zeroconf()
        try:
            listener = MyListener()
            listener.logger = logger
            ServiceBrowser(zeroconf, "_ewelink._tcp.local.", listener)

            # This is a coroutine: yield to the event loop while waiting
            # instead of blocking it with time.sleep().
            await asyncio.sleep(wait_seconds)
        finally:
            # Always release the zeroconf sockets/threads, even on error or
            # task cancellation.
            zeroconf.close()

        return listener.devices
class MyListener:
    """Zeroconf service listener that records discovered eWeLink devices.

    Resolved services are stored in ``devices`` as
    ``{device_id: "ip:port"}``. The ``logger`` attribute may be replaced by
    the owner after construction.
    """

    def __init__(self):
        # Maps device id -> "ip:port" for every service resolved so far.
        self.devices = {}
        # Default logger so the callbacks work even if the owner never
        # assigns one; it can still be overwritten from outside.
        self.logger = logging.getLogger(__name__)

    def remove_service(self, zeroconf, type, name):
        """Log that a service has gone away.

        Entries in ``devices`` are not pruned here.

        :param zeroconf: the zeroconf instance reporting the event (unused).
        :param type: service type string (unused).
        :param name: full service name that was removed.
        """
        self.logger.debug("%s - Service %s removed", datetime.now(), name)

    def update_service(self, zeroconf, type, name):
        """Re-resolve a service whose records changed.

        NOTE(review): newer zeroconf releases expect listeners to define
        this callback; older ones never call it - confirm against the
        pinned zeroconf version.

        :param zeroconf: the zeroconf instance reporting the event.
        :param type: service type string.
        :param name: full service name that was updated.
        """
        self.logger.debug("%s - Service %s updated", datetime.now(), name)
        self.add_service(zeroconf, type, name)

    def add_service(self, zeroconf, type, name):
        """Resolve a newly announced service and record its socket address.

        Services that cannot be resolved, that carry no ``id`` property, or
        that expose no address are logged and skipped.

        :param zeroconf: the zeroconf instance used to resolve the service.
        :param type: service type string.
        :param name: full service name that was announced.
        """
        self.logger.debug("%s - Service %s added", datetime.now(), name)
        info = zeroconf.get_service_info(type, name)
        self.logger.debug(info)

        if info is None:
            # Resolution can fail (e.g. the device vanished again).
            self.logger.warning(
                "Could not resolve service %s; skipping", name
            )
            return

        raw_id = (info.properties or {}).get(b"id")
        if raw_id is None:
            self.logger.warning(
                "Service %s has no 'id' property; skipping", name
            )
            return

        address = getattr(info, "address", None)
        if address is None:
            # NOTE(review): newer zeroconf exposes a list of packed
            # addresses as ``addresses`` - verify against the pinned version.
            candidates = getattr(info, "addresses", None) or []
            address = candidates[0] if candidates else None
        if address is None:
            self.logger.warning(
                "Service %s has no address; skipping", name
            )
            return

        device = raw_id.decode("ascii")
        ip = "%s:%s" % (self.parseAddress(address), info.port)
        self.logger.info(
            "Found Sonoff LAN Mode device %s at socket %s", device, ip
        )
        self.devices[device] = ip

    def parseAddress(self, address):
        """Convert a packed IPv4 address into dotted-decimal notation.

        Only the first four bytes are used.

        :param address: bytes-like object holding at least 4 bytes.
        :return: the address as a string such as ``"192.168.1.50"``.
        :raises ValueError: if fewer than 4 bytes are supplied.
        """
        octets = address[:4]
        if len(octets) < 4:
            raise ValueError(
                "expected at least 4 address bytes, got %d" % len(octets)
            )
        return ".".join(str(octet) for octet in octets)