-
Notifications
You must be signed in to change notification settings - Fork 0
/
device_manager.py
77 lines (62 loc) · 2.84 KB
/
device_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio
from typing import Optional
from bleak import BleakScanner
from loguru import logger
from service_manager import ServiceManager
class DeviceManager:
def __init__(self, labels_by_address: Optional[dict[str, dict[str, str]]] = None):
self._service_managers: dict[str, ServiceManager] = {}
self.device_labels = labels_by_address or {}
self.lock = asyncio.Lock()
def _get_or_create_manager(self, address: str) -> ServiceManager:
manager = self._service_managers.get(address)
labels_by_address = self.device_labels.get(address, {})
if manager is None:
manager = ServiceManager(address, labels={**labels_by_address, 'device': address})
logger.info(f'Created manager for {address}')
self._service_managers[address] = manager
return manager
def get_expanders(self):
for service_manager_address, service_manager in self._service_managers.items():
spi_expander = service_manager.get_expander()
if spi_expander is not None:
yield service_manager_address, service_manager, spi_expander
def remove_manager(self, address: str):
self._service_managers.pop(address, None)
async def discover(self):
while True:
devices = await BleakScanner.discover()
logger.info(f'Discovered {len(devices)} devices')
for device in devices:
# if device.address.lower() != 'FA:6F:EC:EE:4B:36'.lower():
# continue
if device.name != 'Sensor Hub BLE':
continue
if device.address in self._service_managers:
continue
await self.subscribe(device.address)
async def subscribe(self, device_address: str):
async with self.lock:
try:
manager = self._get_or_create_manager(device_address)
except Exception as e:
logger.exception('Failed to initialize manager: {}', e)
self.remove_manager(device_address)
return
try:
await manager.subscribe_all()
except Exception as e:
logger.exception('Failed to subscribe: {}', e)
self.remove_manager(device_address)
return
async def task():
try:
logger.info('Waiting for device {} to disconnect', device_address)
await manager.block()
except Exception as ex:
logger.exception('Exception: {}', ex, exc_info=True)
logger.error('Disconnected from {}', device_address)
self.remove_manager(device_address)
async with self.lock:
manager.reset_service_metrics()
asyncio.create_task(task())