/
bleson.py
126 lines (103 loc) · 3.66 KB
/
bleson.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import sys
import logging
from multiprocessing import Manager, Process
from queue import Queue
import time
from bleson import get_provider, Observer
from ruuvitag_sensor.adapters import BleCommunication
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class BleCommunicationBleson(BleCommunication):
    '''Bluetooth LE communication with Bleson'''

    @staticmethod
    def _run_get_data_background(queue, shared_data, bt_device):
        '''
        Background-process entry point: forward advertisements to ``queue``.

        Starts an observer, then for every received advertisement that
        carries manufacturer data and whose MAC is not blacklisted, puts a
        ``(mac, hex_string)`` tuple on ``queue``.

        Note: the ``stop`` flag is only checked after an advertisement has
        been received, because ``get_lines`` blocks until one arrives.

        Args:
            queue: Queue receiving ``(mac, data_hex)`` tuples.
            shared_data: Mapping with the keys ``'stop'`` (truthy to end the
                loop) and ``'blacklist'`` (collection of MACs to skip).
            bt_device (string): BLE device, passed on to ``start``.
        '''
        (observer, q) = BleCommunicationBleson.start(bt_device)
        # Linux returns bytearray for mfg_data, but macOS returns _NSInlineData
        # which casts to byte array. We need to explicitly cast it to use hex
        is_macos = sys.platform.startswith('darwin')

        try:
            for advertisement in BleCommunicationBleson.get_lines(q):
                log.debug('Data: %s', advertisement)
                if shared_data['stop']:
                    break
                try:
                    # macOS doesn't return address on advertised package
                    mac = advertisement.address.address \
                        if advertisement.address is not None else None
                    if mac and mac in shared_data['blacklist']:
                        log.debug('MAC blacklised: %s', mac)
                        continue
                    if advertisement.mfg_data is None:
                        continue
                    data = bytearray(advertisement.mfg_data) if is_macos \
                        else advertisement.mfg_data
                    queue.put((mac, data.hex()))
                except GeneratorExit:
                    # Defensive: kept from the original handler.
                    break
                except Exception:
                    # One malformed advertisement must not end the loop.
                    # KeyboardInterrupt/SystemExit are deliberately not
                    # caught here so the process can actually be stopped.
                    log.exception('Error in advertisement handling')
                    continue
        finally:
            # Stop the observer however the loop ended.
            BleCommunicationBleson.stop(observer)

    @staticmethod
    def start(bt_device=''):
        '''
        Start an observer that collects advertisements into a queue.

        Args:
            bt_device (string): BLE device, either a plain index such as
                ``'0'`` or the older ``'hci0'`` form. Empty means device 0.

        Returns:
            tuple: ``(observer, queue)`` where ``queue`` receives every
            advertisement reported by the observer.
        '''
        if not bt_device:
            bt_device = 0
        else:
            # Old communication used hci0 etc.
            bt_device = bt_device.replace('hci', '')

        log.info('Start receiving broadcasts (device %s)', bt_device)

        q = Queue()

        adapter = get_provider().get_adapter(int(bt_device))
        observer = Observer(adapter)
        # Every advertisement callback simply enqueues the advertisement.
        observer.on_advertising_data = q.put
        observer.start()

        return (observer, q)

    @staticmethod
    def stop(observer):
        '''Stop the given observer.'''
        observer.stop()

    @staticmethod
    def get_lines(queue):
        '''
        Yield items from ``queue`` as they become available.

        Blocks without timeout while waiting for the next item. The
        generator ends quietly on KeyboardInterrupt; any other exception is
        logged at info level and also ends the generator.
        '''
        try:
            while True:
                next_item = queue.get(True, None)
                yield next_item
        except KeyboardInterrupt:
            return
        except Exception as ex:
            log.info(ex)
            return

    @staticmethod
    def get_datas(blacklist=None, bt_device=''):
        '''
        Yield ``(mac, data_hex)`` tuples collected by a background process.

        Args:
            blacklist (list): MACs to ignore. Defaults to no MACs.
            bt_device (string): BLE device, passed on to the background
                process.

        Yields:
            tuple: ``(mac, data_hex)`` as queued by the background process.

        The background process is asked to stop and is joined when the
        generator is closed or exits through an exception.
        '''
        # A None sentinel avoids sharing one mutable default between calls.
        if blacklist is None:
            blacklist = []

        m = Manager()
        q = m.Queue()

        # Use Manager dict to share data between processes
        shared_data = m.dict()
        shared_data['blacklist'] = blacklist
        shared_data['stop'] = False

        # Start background process
        proc = Process(
            target=BleCommunicationBleson._run_get_data_background,
            args=[q, shared_data, bt_device])
        proc.start()

        try:
            while True:
                while not q.empty():
                    yield q.get()
                time.sleep(0.1)
        finally:
            # Runs on close() (GeneratorExit) and on any other exception,
            # so the background process is not left running.
            shared_data['stop'] = True
            proc.join()

    @staticmethod
    def get_data(mac, bt_device=''):
        '''
        Return the first advertised data received from ``mac``.

        Args:
            mac (string): MAC address to wait for.
            bt_device (string): BLE device, passed on to ``get_datas``.

        Returns:
            The hex data string of the first matching advertisement, or
            None if the data stream ends without a match.
        '''
        data = None
        data_iter = BleCommunicationBleson.get_datas([], bt_device)
        for found in data_iter:
            if mac == found[0]:
                log.info('Data found')
                data = found[1]
                # close() raises GeneratorExit inside get_datas so that it
                # stops the background process right away. Sending a value
                # would instead resume the generator and wait for more data.
                data_iter.close()
                break

        return data