# Single Heart Rate Monitor Example

This is a minimal demonstration of how to set up and use BleakFSM.

You may choose to use raw BleakClient or Pycycling.

Click on "Run All" to run the full notebook (which should run to the end without errors).

Or, go at your own pace by running each cell.

## Tips

+ At any time, print `model.state` to confirm which state you're in. This will help you figure out what you can do from there.

In [30]:
!pip3 install -q -r requirements.txt


# Setup

In [31]:
import asyncio


In [32]:
from bleak_fsm import machine, BleakModel

In [33]:
model = BleakModel()
machine.add_model(model)

In [34]:
USE_PYCYCLING = True

In [35]:
# Define callbacks

if USE_PYCYCLING:
    from pycycling.heart_rate_service import HeartRateService
    model.wrap = lambda client: HeartRateService(client)
    model.enable_notifications = lambda client: client.enable_hr_measurement_notifications()
    model.disable_notifications = lambda client: client.disable_hr_measurement_notifications()
    def handle_hr_measurement(value):
        print("Using Pycycling wrapper around BleakClient")
        print(f"Heart Rate: {value}")
    model.set_measurement_handler = lambda client: client.set_hr_measurement_handler(handle_hr_measurement)
else:
    HEART_RATE_MEASUREMENT_CHARACTERISTIC_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
    model.enable_notifications = lambda client: client.start_notify(HEART_RATE_MEASUREMENT_CHARACTERISTIC_UUID, handle_hr_measurement)
    model.disable_notifications = lambda client: client.stop_notify(HEART_RATE_MEASUREMENT_CHARACTERISTIC_UUID)
    def handle_hr_measurement(sender, data):
        print("Using raw BleakClient")
        print(f"Data received from {sender}: {data}")
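        flags = data[0]
        # Bit 0 of the flags byte selects the heart rate format:
        # 0 = uint8 in data[1], 1 = uint16 (little-endian) in data[1:3]
        if flags & 0x01:
            heart_rate = int.from_bytes(data[1:3], byteorder="little")
        else:
            heart_rate = data[1]
        print(f"Heart Rate: {heart_rate} beats per minute")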
    model.set_measurement_handler = lambda client: handle_hr_measurement
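
The raw BleakClient branch above receives the Heart Rate Measurement payload as bytes. As a rough sketch of how that payload could be decoded more fully, the function below reads the flags byte and then the optional fields in order. `parse_hr_measurement` and its dictionary keys are hypothetical names chosen for this illustration; they are not part of BleakFSM, Bleak, or Pycycling. RR-intervals are left in their raw units of 1/1024 s.

```python
def parse_hr_measurement(data: bytes) -> dict:
    """Decode a Heart Rate Measurement (UUID 0x2A37) payload into a dict."""
    flags = data[0]
    offset = 1

    # Bit 0: heart rate value format (0 = uint8, 1 = uint16 little-endian).
    if flags & 0x01:
        bpm = int.from_bytes(data[offset:offset + 2], "little")
        offset += 2
    else:
        bpm = data[offset]
        offset += 1

    # Bit 3: Energy Expended field is present (uint16, kilojoules).
    energy_expended = None
    if flags & 0x08:
        energy_expended = int.from_bytes(data[offset:offset + 2], "little")
        offset += 2

    # Bit 4: one or more RR-intervals follow (uint16 each, units of 1/1024 s).
    rr_intervals = []
    if flags & 0x10:
        while offset + 1 < len(data):
            rr_intervals.append(int.from_bytes(data[offset:offset + 2], "little"))
            offset += 2

    return {
        "bpm": bpm,
        "energy_expended": energy_expended,
        "rr_intervals": rr_intervals,
    }


# Example payloads: an 8-bit reading, and a 16-bit reading with one RR-interval.
simple = parse_hr_measurement(bytes([0x00, 92]))
with_rr = parse_hr_measurement(bytes([0x11, 0x5C, 0x00, 0x99, 0x02]))
print(simple)
print(with_rr)
```

A handler for the raw branch could call this function on `data` instead of indexing the bytes directly.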


# Usage

In [36]:
# Jupyter already runs an event loop, so we schedule the scan on that loop
# with `create_task()` instead of calling `get_event_loop().run_until_complete()`.
asyncio.create_task(model.start_scan())
# `await model.start_scan()` would also work, but it would block until the scan is finished,
# which is not what we want: the cells below show the `model.bt_devices` dictionary
# being filled with the results of the scan.

<Task pending name='Task-26' coro=<AsyncEvent.trigger() running at /Users/tensorturtle/Library/Python/3.9/lib/python/site-packages/transitions/extensions/asyncio.py:166>>
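
To illustrate the difference between scheduling a coroutine with `create_task()` and awaiting it, here is a minimal sketch that does not need any Bluetooth hardware. `fake_scan` is a hypothetical stand-in for a scan and has nothing to do with BleakFSM's real scanner. The script uses `asyncio.run()` so it can run outside a notebook; inside Jupyter you would write `await main()` instead, because the event loop is already running.

```python
import asyncio


async def fake_scan(found: list, ticks: int = 4, tick_seconds: float = 0.05) -> None:
    """Hypothetical stand-in for a scan: 'discovers' one device per tick."""
    for i in range(ticks):
        await asyncio.sleep(tick_seconds)
        found.append(f"device-{i}")


async def main() -> tuple:
    found = []
    # create_task() only schedules the coroutine; control returns immediately.
    task = asyncio.create_task(fake_scan(found))
    seen_early = len(found)  # nothing has been discovered yet
    # Awaiting the task waits until the background work is done.
    await task
    return seen_early, len(found)


early, final = asyncio.run(main())
print(f"{early} devices right after scheduling, {final} after awaiting the task")
```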

In [37]:
model.state

'Scanning'

In [38]:
# Stop scan after delay
async def stop_after_delay(delay, model):
    print(f"Starting sleep for {delay} seconds (giving the scanner time to scan)")
    await asyncio.sleep(delay)
    print(f"Slept for {delay} seconds (sending end signal to scanner)")
    await model.stop_scan()
    print("Stopped scanning")
    

asyncio.create_task(stop_after_delay(2, model))

<Task pending name='Task-28' coro=<stop_after_delay() running at /var/folders/16/dbv855p93hx5m7mgrd8fwscw0000gn/T/ipykernel_86696/1686944154.py:2>>

In [39]:
async def print_as_we_go():
    for i in range(20):
        print(f"{len(model.bt_devices)} devices found in {round(i*0.05,2)} seconds.")
        await asyncio.sleep(0.05)

await print_as_we_go()

Starting sleep for 2 seconds (giving the scanner time to scan)
143 devices found in 0.0 seconds.
143 devices found in 0.05 seconds.
143 devices found in 0.1 seconds.
143 devices found in 0.15 seconds.
143 devices found in 0.2 seconds.
143 devices found in 0.25 seconds.
143 devices found in 0.3 seconds.
143 devices found in 0.35 seconds.
143 devices found in 0.4 seconds.
143 devices found in 0.45 seconds.
143 devices found in 0.5 seconds.
143 devices found in 0.55 seconds.
143 devices found in 0.6 seconds.
143 devices found in 0.65 seconds.
143 devices found in 0.7 seconds.
143 devices found in 0.75 seconds.
143 devices found in 0.8 seconds.
143 devices found in 0.85 seconds.
143 devices found in 0.9 seconds.
143 devices found in 0.95 seconds.


In [40]:
model.bt_devices

{'71236655-F4FC-D837-41B0-E892B06683BD': BLEDevice(71236655-F4FC-D837-41B0-E892B06683BD, Buds2 Pro),
 '25F0889C-F780-F707-F08C-E3D61B2FDD28': BLEDevice(25F0889C-F780-F707-F08C-E3D61B2FDD28, None),
 'E9568877-2EF4-2623-BC72-75F46B60844F': BLEDevice(E9568877-2EF4-2623-BC72-75F46B60844F, None),
 'FDCE05D9-92FD-46C5-05CC-F21094FF49DB': BLEDevice(FDCE05D9-92FD-46C5-05CC-F21094FF49DB, None),
 'D1F8CF76-8380-5BEB-96E9-61A97C8E5354': BLEDevice(D1F8CF76-8380-5BEB-96E9-61A97C8E5354, None),
 '765DF92E-3806-13CC-7D8D-FDB1ED983EA5': BLEDevice(765DF92E-3806-13CC-7D8D-FDB1ED983EA5, None),
 'C0C42CD0-51EA-8EF8-8E82-5E87024F8870': BLEDevice(C0C42CD0-51EA-8EF8-8E82-5E87024F8870, Logitech Pebble),
 '552F8AFB-F463-133D-0C77-222F9AFA2ECB': BLEDevice(552F8AFB-F463-133D-0C77-222F9AFA2ECB, None),
 '9F61C36A-D851-6DCD-0A62-926D93E0C6C9': BLEDevice(9F61C36A-D851-6DCD-0A62-926D93E0C6C9, 55445771,4,0,0),
 '03B447A1-E005-B088-6E5F-52A5D391BA92': BLEDevice(03B447A1-E005-B088-6E5F-52A5D391BA92, None),
 '06F5ABB5-584

In [41]:
# Pick the target device by name from the scan results above
target_name = "WHOOPDEDOO"

target_address = ""
for k,v in model.bt_devices.items():
    if v.name == target_name:
        target_address = k

if target_address == "":
    print(f"Fail. No device with name {target_name} found")
else:
    print(f"Success! Found device named {target_name}")

Success! Found device named WHOOPDEDOO
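
The lookup above can also be written as a small helper. The sketch below is one possible version: `find_address_by_name` and `FakeDevice` are hypothetical names used only for this illustration, with `FakeDevice` standing in for the `BLEDevice` objects stored in `model.bt_devices`. Unlike the loop above, which keeps the last match, this helper returns the first match and returns `None` when nothing matches.

```python
from collections import namedtuple

# Hypothetical stand-in for a BLEDevice, keeping only the fields used here.
FakeDevice = namedtuple("FakeDevice", ["address", "name"])


def find_address_by_name(devices: dict, target_name: str):
    """Return the address of the first device whose name matches, or None."""
    return next(
        (address for address, device in devices.items() if device.name == target_name),
        None,
    )


# Made-up scan results, shaped like the address -> device dictionary above.
devices = {
    "AAAA": FakeDevice("AAAA", None),
    "BBBB": FakeDevice("BBBB", "WHOOPDEDOO"),
}

print(find_address_by_name(devices, "WHOOPDEDOO"))
print(find_address_by_name(devices, "not-there"))
```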


In [42]:
await asyncio.sleep(3)  # give the background stop_after_delay task time to stop the scan
await model.set_target(target_address)

Slept for 2 seconds (sending end signal to scanner)
Stopped scanning


True

In [43]:
model.state

'TargetSet'

In [44]:
await asyncio.sleep(3) # wait for scan to finish

In [45]:
await model.connect() # first time connecting

Connected to 14863C4D-BF71-4EA3-C6B4-98001056AAF8


True

In [46]:
model.state

'Connected'

In [47]:
async def run_stream():
    await model.stream()

asyncio.create_task(run_stream())

<Task pending name='Task-36' coro=<run_stream() running at /var/folders/16/dbv855p93hx5m7mgrd8fwscw0000gn/T/ipykernel_86696/3697076873.py:1>>

In [48]:
await asyncio.sleep(5)

Started streaming
Using Pycycling wrapper around BleakClient
Heart Rate: HeartRateMeasurement(sensor_contact=False, bpm=92, rr_interval=[665, 671], energy_expended=None)
Using Pycycling wrapper around BleakClient
Heart Rate: HeartRateMeasurement(sensor_contact=False, bpm=94, rr_interval=[642], energy_expended=None)
Using Pycycling wrapper around BleakClient
Heart Rate: HeartRateMeasurement(sensor_contact=False, bpm=93, rr_interval=[681], energy_expended=None)
Using Pycycling wrapper around BleakClient
Heart Rate: HeartRateMeasurement(sensor_contact=False, bpm=92, rr_interval=[648, 662], energy_expended=None)
Using Pycycling wrapper around BleakClient
Heart Rate: HeartRateMeasurement(sensor_contact=False, bpm=90, rr_interval=[655], energy_expended=None)


In [49]:
await model.disconnect()

Stopped streaming
Disconnected from 14863C4D-BF71-4EA3-C6B4-98001056AAF8


True

In [50]:
model.state

'TargetSet'

In [51]:
await model.unset_target()

True

In [52]:
model.state

'Init'

At this point we have returned to the `Init` state, and you may run through `start_scan`, `set_target`, `connect`, etc. again.

# Again, with Errors

For a fuller understanding of BleakFSM, let's step through the same workflow as above, this time checking that the proper errors are raised.

We'll discuss how best to handle those errors.

In [53]:
model.state

'Init'

Let's try to skip a step in the Finite State Machine.

From `Init`, the only way toward a connection is through `TargetSet`.

If we call `connect()` directly from `Init`, we should get a `MachineError`:

In [54]:
from transitions import MachineError
try:
    await model.connect()
except MachineError as e:
    print(e)

"Can't trigger event connect from state Init!"
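
To make the idea of allowed transitions concrete, here is a toy transition table built only from the states printed in this notebook. It is an illustration, not BleakFSM's actual definition: `TRANSITIONS`, `ToyMachineError`, and `trigger` are hypothetical names, the state after `stop_scan` is assumed to be `Init` (the notebook never prints it), and streaming is left out.

```python
# Toy transition table inferred from the states shown in this notebook.
# Illustration only: this is NOT how BleakFSM defines its machine.
TRANSITIONS = {
    ("Init", "start_scan"): "Scanning",
    ("Scanning", "stop_scan"): "Init",  # assumption: not printed in the notebook
    ("Init", "set_target"): "TargetSet",
    ("TargetSet", "connect"): "Connected",
    ("Connected", "disconnect"): "TargetSet",
    ("TargetSet", "unset_target"): "Init",
}


class ToyMachineError(Exception):
    """Hypothetical stand-in for transitions.MachineError."""


def trigger(state: str, event: str) -> str:
    """Return the next state, or raise if the event is not allowed from `state`."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ToyMachineError(
            f"Can't trigger event {event} from state {state}!"
        ) from None


# Walk the happy path from the first half of the notebook.
state = "Init"
for event in ["start_scan", "stop_scan", "set_target", "connect", "disconnect", "unset_target"]:
    state = trigger(state, event)
print(state)

# Skipping a step fails, just like `connect()` from `Init` above.
try:
    trigger("Init", "connect")
except ToyMachineError as e:
    print(e)
```

Because every allowed step is an explicit table entry, an out-of-order call fails immediately with a clear error.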


In order to go to `TargetSet`, we need an address.

To get an address, we need to scan.

In [55]:
await model.start_scan()
await asyncio.sleep(3)
await model.stop_scan()

True

When scanning is done, the results are saved in `model.bt_devices`.

In [56]:
model.bt_devices

{'71236655-F4FC-D837-41B0-E892B06683BD': BLEDevice(71236655-F4FC-D837-41B0-E892B06683BD, Buds2 Pro),
 '25F0889C-F780-F707-F08C-E3D61B2FDD28': BLEDevice(25F0889C-F780-F707-F08C-E3D61B2FDD28, None),
 'E9568877-2EF4-2623-BC72-75F46B60844F': BLEDevice(E9568877-2EF4-2623-BC72-75F46B60844F, None),
 'FDCE05D9-92FD-46C5-05CC-F21094FF49DB': BLEDevice(FDCE05D9-92FD-46C5-05CC-F21094FF49DB, None),
 'D1F8CF76-8380-5BEB-96E9-61A97C8E5354': BLEDevice(D1F8CF76-8380-5BEB-96E9-61A97C8E5354, None),
 '765DF92E-3806-13CC-7D8D-FDB1ED983EA5': BLEDevice(765DF92E-3806-13CC-7D8D-FDB1ED983EA5, None),
 'C0C42CD0-51EA-8EF8-8E82-5E87024F8870': BLEDevice(C0C42CD0-51EA-8EF8-8E82-5E87024F8870, Logitech Pebble),
 '552F8AFB-F463-133D-0C77-222F9AFA2ECB': BLEDevice(552F8AFB-F463-133D-0C77-222F9AFA2ECB, None),
 '9F61C36A-D851-6DCD-0A62-926D93E0C6C9': BLEDevice(9F61C36A-D851-6DCD-0A62-926D93E0C6C9, 55445771,4,0,0),
 '03B447A1-E005-B088-6E5F-52A5D391BA92': BLEDevice(03B447A1-E005-B088-6E5F-52A5D391BA92, None),
 '06F5ABB5-584

The keys of this dictionary are the device addresses.