diff --git a/README.md b/README.md index 084185f..ab23053 100644 --- a/README.md +++ b/README.md @@ -434,6 +434,35 @@ Install from pypi $ python -m pip install ruuvitag-sensor==1.2.1 ``` +## Bluegiga + +Use Bluegiga's BGAPI, which is compatible with USB adapters like the BLED112. Bluegiga should work with Linux, macOS and Windows. + +Requires pygatt and pexpect, which are not installed automatically with `ruuvitag_sensor` package. You can install those manually e.g. via pip. + +```sh +$ pip install pygatt pexpect +``` + +Add environment variable `RUUVI_BLE_ADAPTER` with value `Bluegiga`. E.g. + +```sh +$ export RUUVI_BLE_ADAPTER="Bluegiga" +``` +By default, pygatt will automatically detect USB adapters serial port, but if you have multiple Bluegiga adapters installed or pygatt can not find correct serial port automatically, serial port can be passed with `bt_device` parameter. + +```sh +bt_device='/dev/ttyACM0' +``` + +Pygatt reset Bluegiga USB adapter during start, which might cause issues e.g. in VM environment. Reset can be disabled by environment variable `BLUEGIGA_RESET` with value `False`. E.g. + +```sh +$ export BLUEGIGA_RESET="False" +``` + +Any other value is interpreted as a True. + ## Examples Examples are in [examples](https://github.com/ttu/ruuvitag-sensor/tree/master/examples) directory, e.g. diff --git a/ruuvitag_sensor/adapters/__init__.py b/ruuvitag_sensor/adapters/__init__.py index 13ae454..f581e51 100644 --- a/ruuvitag_sensor/adapters/__init__.py +++ b/ruuvitag_sensor/adapters/__init__.py @@ -13,6 +13,9 @@ def get_ble_adapter(): elif 'bleson' in os.environ.get('RUUVI_BLE_ADAPTER', '').lower(): from ruuvitag_sensor.adapters.bleson import BleCommunicationBleson return BleCommunicationBleson() + elif 'bluegiga' in os.environ.get('RUUVI_BLE_ADAPTER', '').lower(): + from ruuvitag_sensor.adapters.bluegiga import BleCommunicationBluegiga + return BleCommunicationBluegiga() elif 'RUUVI_NIX_FROMFILE' in os.environ: # Emulate BleCommunicationNix by reading hcidump data from a file from ruuvitag_sensor.adapters.nix_hci_file import BleCommunicationNixFile diff --git a/ruuvitag_sensor/adapters/bluegiga.py b/ruuvitag_sensor/adapters/bluegiga.py new file mode 100644 index 0000000..ead6478 --- /dev/null +++ b/ruuvitag_sensor/adapters/bluegiga.py @@ -0,0 +1,154 @@ +import logging +from multiprocessing import Manager +from multiprocessing.managers import DictProxy +from multiprocessing.queues import Queue +import os +import time +from typing import Iterator, List +import pygatt +import binascii +import threading + + +from ruuvitag_sensor.adapters import BleCommunication +from ruuvitag_sensor.ruuvi_types import MacAndRawData, RawData + +log = logging.getLogger(__name__) + + +class BleCommunicationBluegiga(BleCommunication): + """Bluetooth LE communication for Bluegiga""" + + @staticmethod + def get_first_data(mac: str, bt_device: str = '') -> RawData: + if not bt_device: + adapter = pygatt.BGAPIBackend() + else: + adapter = pygatt.BGAPIBackend(bt_device) + + def scan_received(devices, addr, packet_type): + if mac and mac == addr: + log.debug('Received data from device: %s %s', addr, packet_type) + return True # stop scan + + reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True + adapter.start(reset=reset) + log.debug('Start receiving broadcasts (device %s)', bt_device) + try: + devices = adapter.scan(timeout=60, active=False, scan_cb=scan_received) + for dev in devices: + if mac and mac == dev['address']: + log.debug('Result found for device %s', mac) + rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data'] # noqa: E501 + hexa = binascii.hexlify(rawdata).decode('ascii').upper() + hexa_formatted = BleCommunicationBluegiga._fix_payload(hexa) + log.debug('Data found: %s', hexa_formatted) + return hexa_formatted + finally: + adapter.stop() + + @staticmethod + def get_data(blacklist: List[str] = [], bt_device: str = '') -> Iterator[MacAndRawData]: + m = Manager() + q = m.Queue() + + # Use Manager dict to share data between processes + shared_data = m.dict() + shared_data['blacklist'] = blacklist + shared_data['stop'] = False + + # Start background process + scanner = threading.Thread( + name='Bluegiga scanner', + target=BleCommunicationBluegiga._run_get_data_background, + args=[q, shared_data, bt_device]) + scanner.start() + + try: + while True: + while not q.empty(): + data = q.get() + log.debug('Found data: %s', data) + yield data + time.sleep(0.1) + if not scanner.is_alive(): + raise Exception('Bluegiga scanner is not alive') + except GeneratorExit: + pass + except KeyboardInterrupt: + pass + except Exception as ex: + log.info(ex) + + log.debug('Stop') + shared_data['stop'] = True + scanner.join() + log.debug('Exit') + return + + @staticmethod + def _run_get_data_background(queue: Queue, shared_data: DictProxy, bt_device: str): + """ + Attributes: + device (string): BLE device (default auto) + """ + + if bt_device: + adapter = pygatt.BGAPIBackend(bt_device) + else: + adapter = pygatt.BGAPIBackend() + + reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True + adapter.start(reset=reset) + try: + while True: + try: + if shared_data['stop']: + break + devices = adapter.scan(timeout=0.5, active=False, ) + for dev in devices: + log.debug('received: %s', dev) + mac = str(dev['address']) + if mac and mac in shared_data['blacklist']: + log.debug('MAC blacklisted: %s', mac) + continue + try: + rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data'] # noqa: E501 + log.debug('Received manufacturer data from %s: %s', mac, rawdata) + hexa = binascii.hexlify(rawdata).decode('ascii').upper() + hexa_formatted = BleCommunicationBluegiga._fix_payload(hexa) + queue.put((mac, hexa_formatted)) + except KeyError: + pass + + # Prevent endless loop if device data never found + # Remove when #81 is fixed + queue.put(('', '')) + except GeneratorExit: + return + except KeyboardInterrupt: + return + finally: + log.debug('Stop scan') + adapter.stop() + + @staticmethod + def _fix_payload(data: str) -> str: + # Adapter returns data in a different format than the nix_hci + # adapter. Since the rest of the processing pipeline is + # somewhat reliant on the additional data, add to the + # beginning of the actual data: + # + # - An FF type marker + # - A length marker, covering the vendor specific data + # - Another length marker, covering the length-marked + # vendor data. + # + # Thus extended, the result can be parsed by the rest of + # the pipeline. + # + # TODO: This is kinda awkward, and should be handled better. + data = f'FF{data.hex()}' + data = f'{(len(data) >> 1):02x}{data}' + data = f'{(len(data) >> 1):02x}{data}' + return data