Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bluegiga adapter support #126

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,35 @@ __NOTE:__ On macOS only Data Format 5 works as macOS doesn't advertise MAC-addre

__NOTE:__ On Windows Bleson requires _Python 3.6_. Unfortunately on Windows, Bleson doesn't send any payload for advertised package, so it is still unusable.

## Bluegiga

Use Bluegiga's BGAPI, which is compatible with USB adapters like the BLED112. Bluegiga should work with Linux, macOS and Windows.

Requires pygatt and pexpect, which are not installed automatically with `ruuvitag_sensor` package. You can install those manually e.g. via pip.

```sh
$ pip install pygatt pexpect
```

Add environment variable `RUUVI_BLE_ADAPTER` with value `Bluegiga`. E.g.

```sh
$ export RUUVI_BLE_ADAPTER="Bluegiga"
```
By default, pygatt will automatically detect USB adapters serial port, but if you have multiple Bluegiga adapters installed or pygatt can not find correct serial port automatically, serial port can be passed with `bt_device` parameter.

```sh
bt_device='/dev/ttyACM0'
```

Pygatt reset Bluegiga USB adapter during start, which might cause issues e.g. in VM environment. Reset can be disabled by environment variable `BLUEGIGA_RESET` with value `False`. E.g.

```sh
$ export BLUEGIGA_RESET="False"
```

Any other value is interpreted as a True.

## Examples

Examples are in [examples](https://github.com/ttu/ruuvitag-sensor/tree/master/examples) directory, e.g.
Expand Down
129 changes: 129 additions & 0 deletions ruuvitag_sensor/adapters/bluegiga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import logging
import os
import sys
import time
import pygatt
import binascii
import threading
import os

from multiprocessing import Manager
from queue import Queue

from ruuvitag_sensor.adapters import BleCommunication

log = logging.getLogger(__name__)

class BleCommunicationBluegiga(BleCommunication):
"""Bluetooth LE communication for Bluegiga"""

@staticmethod
def get_data(mac, bt_device=''):
if not bt_device:
adapter = pygatt.BGAPIBackend()
else:
adapter = pygatt.BGAPIBackend(bt_device)

def scan_received(devices, addr, packet_type):
if mac and mac == addr:
log.debug('Received data from device: %s %s', addr, packet_type)
return True # stop scan

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
adapter.start(reset=reset)
log.debug('Start receiving broadcasts (device %s)', bt_device)
try:
devices = adapter.scan(timeout=60, active=False, scan_cb=scan_received)
for dev in devices:
if mac and mac == dev['address']:
log.debug('Result found for device %s', mac )
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data']
hexa = binascii.hexlify(rawdata).decode("ascii").upper()
log.debug('Data found: %s', hexa)
return hexa
finally:
adapter.stop()

@staticmethod
def get_datas(blacklist=[], bt_device=''):
m = Manager()
q = m.Queue()

# Use Manager dict to share data between processes
shared_data = m.dict()
shared_data['blacklist'] = blacklist
shared_data['stop'] = False

# Start background process
scanner = threading.Thread(
name='Bluegiga scanner',
target=BleCommunicationBluegiga._run_get_data_background,
args=[q, shared_data, bt_device])
scanner.start()

try:
while True:
while not q.empty():
data = q.get()
log.debug('Found data: %s', data)
yield data
time.sleep(0.1)
if not scanner.is_alive():
raise Exception('Bluegiga scanner is not alive')
except GeneratorExit:
pass
except KeyboardInterrupt as ex:
pass
except Exception as ex:
log.info(ex)

log.debug('Stop')
shared_data['stop'] = True
scanner.join()
log.debug('Exit')
return

@staticmethod
def _run_get_data_background(queue, shared_data, bt_device):
"""
Attributes:
device (string): BLE device (default auto)
"""

if bt_device:
adapter = pygatt.BGAPIBackend(bt_device)
else:
adapter = pygatt.BGAPIBackend()

reset = False if os.environ.get('BLUEGIGA_RESET', '').upper() == 'FALSE' else True
ttu marked this conversation as resolved.
Show resolved Hide resolved
adapter.start(reset=reset)
try:
while True:
try:
if shared_data['stop']:
break
devices = adapter.scan(timeout=0.5, active=False, )
for dev in devices:
log.debug('received: %s', dev)
mac = str(dev['address'])
if mac and mac in shared_data['blacklist']:
log.debug('MAC blacklisted: %s', mac)
continue
try:
rawdata = dev['packet_data']['non-connectable_advertisement_packet']['manufacturer_specific_data']
log.debug('Received manufacturer data from %s: %s', mac, rawdata)
hexa = binascii.hexlify(rawdata).decode("ascii").upper()
queue.put((mac, hexa))
except KeyError:
pass

# Prevent endless loop if device data never found
# Remove when #81 is fixed
queue.put(('', ''))
except GeneratorExit:
return
except KeyboardInterrupt as ex:
return
finally:
log.debug('Stop scan')
adapter.stop()
11 changes: 11 additions & 0 deletions ruuvitag_sensor/ruuvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
if os.environ.get('RUUVI_BLE_ADAPTER') == 'Bleson':
from ruuvitag_sensor.adapters.bleson import BleCommunicationBleson
ble = BleCommunicationBleson()
elif os.environ.get('RUUVI_BLE_ADAPTER') == 'Bluegiga':
def get_raw_bluegiga(raw, data_format):
# Bluegiga drops FF from the raw data and it is required for DF 3 and 5
# TODO: Move convert_data to adaptor specific code
if data_format in (2, 4):
ttu marked this conversation as resolved.
Show resolved Hide resolved
return raw
return 'FF' + raw

DataFormats._parse_raw = get_raw_bluegiga # pylint: disable=protected-access
from ruuvitag_sensor.adapters.bluegiga import BleCommunicationBluegiga
ble = BleCommunicationBluegiga()
elif "RUUVI_NIX_FROMFILE" in os.environ:
# Emulate BleCommunicationNix by reading hcidump data from a file
from ruuvitag_sensor.adapters.nix_hci_file import BleCommunicationNixFile
Expand Down