In [1]:
!export BLEAK_LOGGING=1

In [2]:
# jupyter: patches asyncio to allow nested use of asyncio.run and loop.run_until_complete.
import nest_asyncio
nest_asyncio.apply()

In [3]:
import asyncio
import traceback

from bleak import BleakScanner, BleakClient
from bleak.exc import BleakError

from pprint import pprint


loop = asyncio.get_event_loop()

## 1. find ble device

In [4]:
# async def discover():
#     devices = await BleakScanner.discover()
#     for d in devices:
#         print(d)

In [5]:
# find_device_task = loop.create_task(discover())

## 2. ble device service

In [6]:
ble_address = 'BA:33:DC:F6:46:6D'

In [7]:
async def get_device_services(ble_address: str):
    # 查找设备是否存在
    device = await BleakScanner.find_device_by_address(ble_address, timeout=20.0)
    if not device:
        raise BleakError(f"A device with address {ble_address} could not be found.")
    async with BleakClient(device) as client:
        svcs = await client.get_services()
        print("Services:")
        svc_data = []
        for service in svcs:
            obj = dict(service.obj, description=service.description)
            obj['characteristics'] = [dict(c.obj, description=c.description, handle=c.handle) for c in service.characteristics]
            svc_data.append(obj)
        pprint(svc_data)
        return svc_data

In [8]:
task_service = loop.run_until_complete(get_device_services(ble_address))

Services:
[{'Device': '/org/bluez/hci0/dev_BA_33_DC_F6_46_6D',
  'Includes': [],
  'Primary': True,
  'UUID': 'fa879af4-d601-420c-b2b4-07ffb528dde3',
  'characteristics': [{'Flags': ['write'],
                       'Service': '/org/bluez/hci0/dev_BA_33_DC_F6_46_6D/service0010',
                       'UUID': 'b02eaeaa-f6bc-4a7e-bc94-f7b7fc8ded0b',
                       'Value': b'',
                       'description': 'Unknown',
                       'handle': 20},
                      {'Flags': ['notify'],
                       'NotifyAcquired': False,
                       'Notifying': False,
                       'Service': '/org/bluez/hci0/dev_BA_33_DC_F6_46_6D/service0010',
                       'UUID': '10e2fde2-d7fe-4845-b3f3-a32010ebb095',
                       'Value': b'\xfe\x01\x00\x0eP\x12\x02\x07WW0001\x040.7',
                       'description': 'Unknown',
                       'handle': 17}],
  'description': 'Unknown'},
 {'Device': '/org/bluez/hci0/dev_BA_

In [9]:
# notebook 不需要，创建时立即就执行了
# loop.run_until_complete(task_service)
# print('done')

## 3. characteristics write and notification

In [12]:
def notification_handler(sender, data):
    """Simple notification handler which prints the data received."""
    print(f"--- notification_handler {sender}: {data}")


async def notify_char(address: str, notify_uuid: str, write_list: List):
    print(f'{address=}, {notify_uuid=}, {write_list=}')
    async with BleakClient(address) as client:
        try:
            print(f"Connected: {client.is_connected}")

            # await client.start_notify(char_uuid, notification_handler)
            # print("started notify")

            for data in write_list:
                encode_data = bytes.fromhex(data['data'])
                resp = await client.write_gatt_char(data['uuid'], encode_data)
                print(f"request: {data}, response: {resp}")
                await asyncio.sleep(0.5)

            # await asyncio.sleep(5.0)
            # await client.stop_notify(char_uuid)
        except Exception as e:
            print(f'error: {e}')
            traceback.print_exc()
            return False
    return True

In [None]:
ble_address = 'BA:33:DC:F6:46:6D'
notify_uuid = '10e2fde2-d7fe-4845-b3f3-a32010ebb095'
write_list = [{
    'uuid': 'b02eaeaa-f6bc-4a7e-bc94-f7b7fc8ded0b',
    'data': 'fe0100025011'
}, {
    'uuid': 'b02eaeaa-f6bc-4a7e-bc94-f7b7fc8ded0b',
    'data': 'fe0100023004'
}, {
    'uuid': 'b02eaeaa-f6bc-4a7e-bc94-f7b7fc8ded0b',
    'data': 'fe01001050013230323230323032313434313132'
}, {
    'uuid': 'b02eaeaa-f6bc-4a7e-bc94-f7b7fc8ded0b',
    'data': 'fe010006320101807f12'
}]

task_write_char = loop.run_until_complete(notify_char(ble_address, notify_uuid, write_list))
print(f'res: {task_write_char}')

## 4. characteristics read

In [31]:
async def read_gatt_char(address, char_uuid: str):
    print(f'{address=}, {char_uuid=}')
    async with BleakClient(address) as client:
        try:
            char_res = await client.read_gatt_char(char_uuid)
            print('done')
            print(f'{char_res=}')
        except Exception as e:
            print(f'error: {e}')
            traceback.print_exc()


In [32]:
read_char_uuid = '00002a01-0000-1000-8000-00805f9b34fb'
task_write_char = loop.create_task(read_gatt_char(ble_address, read_char_uuid))

address='BA:33:DC:F6:46:6D', char_uuid='00002a01-0000-1000-8000-00805f9b34fb'
error: Multiple Characteristics with this UUID, refer to your desired characteristic by the `handle` attribute instead.


Traceback (most recent call last):
  File "/tmp/ipykernel_512338/99780459.py", line 5, in read_gatt_char
    char_res = await client.read_gatt_char(char_uuid)
  File "/home/code/.local/lib/python3.8/site-packages/bleak/backends/bluezdbus/client.py", line 621, in read_gatt_char
    characteristic = self.services.get_characteristic(char_specifier)
  File "/home/code/.local/lib/python3.8/site-packages/bleak/backends/service.py", line 190, in get_characteristic
    raise BleakError(
bleak.exc.BleakError: Multiple Characteristics with this UUID, refer to your desired characteristic by the `handle` attribute instead.


In [11]:
asyncio.all_tasks()

set()