-
-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples/bluetooth: Add simple UART demo with central and peripheral.
- Loading branch information
Showing
2 changed files
with
343 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
# This example finds and connects to a peripheral running the | ||
# UART service (e.g. ble_simple_peripheral.py). | ||
|
||
import bluetooth | ||
import random | ||
import struct | ||
import time | ||
import micropython | ||
|
||
from ble_advertising import decode_services, decode_name | ||
|
||
from micropython import const | ||
|
||
_IRQ_CENTRAL_CONNECT = const(1) | ||
_IRQ_CENTRAL_DISCONNECT = const(2) | ||
_IRQ_GATTS_WRITE = const(3) | ||
_IRQ_GATTS_READ_REQUEST = const(4) | ||
_IRQ_SCAN_RESULT = const(5) | ||
_IRQ_SCAN_DONE = const(6) | ||
_IRQ_PERIPHERAL_CONNECT = const(7) | ||
_IRQ_PERIPHERAL_DISCONNECT = const(8) | ||
_IRQ_GATTC_SERVICE_RESULT = const(9) | ||
_IRQ_GATTC_SERVICE_DONE = const(10) | ||
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11) | ||
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12) | ||
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13) | ||
_IRQ_GATTC_DESCRIPTOR_DONE = const(14) | ||
_IRQ_GATTC_READ_RESULT = const(15) | ||
_IRQ_GATTC_READ_DONE = const(16) | ||
_IRQ_GATTC_WRITE_DONE = const(17) | ||
_IRQ_GATTC_NOTIFY = const(18) | ||
_IRQ_GATTC_INDICATE = const(19) | ||
|
||
_ADV_IND = const(0x00) | ||
_ADV_DIRECT_IND = const(0x01) | ||
_ADV_SCAN_IND = const(0x02) | ||
_ADV_NONCONN_IND = const(0x03) | ||
|
||
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") | ||
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") | ||
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") | ||
|
||
|
||
class BLESimpleCentral: | ||
def __init__(self, ble): | ||
self._ble = ble | ||
self._ble.active(True) | ||
self._ble.irq(handler=self._irq) | ||
|
||
self._reset() | ||
|
||
def _reset(self): | ||
# Cached name and address from a successful scan. | ||
self._name = None | ||
self._addr_type = None | ||
self._addr = None | ||
|
||
# Callbacks for completion of various operations. | ||
# These reset back to None after being invoked. | ||
self._scan_callback = None | ||
self._conn_callback = None | ||
self._read_callback = None | ||
|
||
# Persistent callback for when new data is notified from the device. | ||
self._notify_callback = None | ||
|
||
# Connected device. | ||
self._conn_handle = None | ||
self._start_handle = None | ||
self._end_handle = None | ||
self._tx_handle = None | ||
self._rx_handle = None | ||
|
||
def _irq(self, event, data): | ||
if event == _IRQ_SCAN_RESULT: | ||
addr_type, addr, adv_type, rssi, adv_data = data | ||
if adv_type in (_ADV_IND, _ADV_DIRECT_IND,) and _UART_SERVICE_UUID in decode_services( | ||
adv_data | ||
): | ||
# Found a potential device, remember it and stop scanning. | ||
self._addr_type = addr_type | ||
self._addr = bytes( | ||
addr | ||
) # Note: addr buffer is owned by caller so need to copy it. | ||
self._name = decode_name(adv_data) or "?" | ||
self._ble.gap_scan(None) | ||
|
||
elif event == _IRQ_SCAN_DONE: | ||
if self._scan_callback: | ||
if self._addr: | ||
# Found a device during the scan (and the scan was explicitly stopped). | ||
self._scan_callback(self._addr_type, self._addr, self._name) | ||
self._scan_callback = None | ||
else: | ||
# Scan timed out. | ||
self._scan_callback(None, None, None) | ||
|
||
elif event == _IRQ_PERIPHERAL_CONNECT: | ||
# Connect successful. | ||
conn_handle, addr_type, addr, = data | ||
if addr_type == self._addr_type and addr == self._addr: | ||
self._conn_handle = conn_handle | ||
self._ble.gattc_discover_services(self._conn_handle) | ||
|
||
elif event == _IRQ_PERIPHERAL_DISCONNECT: | ||
# Disconnect (either initiated by us or the remote end). | ||
conn_handle, _, _, = data | ||
if conn_handle == self._conn_handle: | ||
# If it was initiated by us, it'll already be reset. | ||
self._reset() | ||
|
||
elif event == _IRQ_GATTC_SERVICE_RESULT: | ||
# Connected device returned a service. | ||
conn_handle, start_handle, end_handle, uuid = data | ||
print("service", data) | ||
if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID: | ||
self._start_handle, self._end_handle = start_handle, end_handle | ||
|
||
elif event == _IRQ_GATTC_SERVICE_DONE: | ||
# Service query complete. | ||
if self._start_handle and self._end_handle: | ||
self._ble.gattc_discover_characteristics( | ||
self._conn_handle, self._start_handle, self._end_handle | ||
) | ||
else: | ||
print("Failed to find uart service.") | ||
|
||
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: | ||
# Connected device returned a characteristic. | ||
conn_handle, def_handle, value_handle, properties, uuid = data | ||
if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID: | ||
self._rx_handle = value_handle | ||
if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID: | ||
self._tx_handle = value_handle | ||
|
||
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: | ||
# Characteristic query complete. | ||
if self._tx_handle is not None and self._rx_handle is not None: | ||
# We've finished connecting and discovering device, fire the connect callback. | ||
if self._conn_callback: | ||
self._conn_callback() | ||
else: | ||
print("Failed to find uart rx characteristic.") | ||
|
||
elif event == _IRQ_GATTC_WRITE_DONE: | ||
conn_handle, value_handle, status = data | ||
print("TX complete") | ||
|
||
elif event == _IRQ_GATTC_NOTIFY: | ||
conn_handle, value_handle, notify_data = data | ||
if conn_handle == self._conn_handle and value_handle == self._tx_handle: | ||
if self._notify_callback: | ||
self._notify_callback(notify_data) | ||
|
||
# Returns true if we've successfully connected and discovered characteristics. | ||
def is_connected(self): | ||
return ( | ||
self._conn_handle is not None | ||
and self._tx_handle is not None | ||
and self._rx_handle is not None | ||
) | ||
|
||
# Find a device advertising the environmental sensor service. | ||
def scan(self, callback=None): | ||
self._addr_type = None | ||
self._addr = None | ||
self._scan_callback = callback | ||
self._ble.gap_scan(2000, 30000, 30000) | ||
|
||
# Connect to the specified device (otherwise use cached address from a scan). | ||
def connect(self, addr_type=None, addr=None, callback=None): | ||
self._addr_type = addr_type or self._addr_type | ||
self._addr = addr or self._addr | ||
self._conn_callback = callback | ||
if self._addr_type is None or self._addr is None: | ||
return False | ||
self._ble.gap_connect(self._addr_type, self._addr) | ||
return True | ||
|
||
# Disconnect from current device. | ||
def disconnect(self): | ||
if not self._conn_handle: | ||
return | ||
self._ble.gap_disconnect(self._conn_handle) | ||
self._reset() | ||
|
||
# Send data over the UART | ||
def write(self, v, response=False): | ||
if not self.is_connected(): | ||
return | ||
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0) | ||
|
||
# Set handler for when data is received over the UART. | ||
def on_notify(self, callback): | ||
self._notify_callback = callback | ||
|
||
|
||
def demo(): | ||
ble = bluetooth.BLE() | ||
central = BLESimpleCentral(ble) | ||
|
||
not_found = False | ||
|
||
def on_scan(addr_type, addr, name): | ||
if addr_type is not None: | ||
print("Found peripheral:", addr_type, addr, name) | ||
central.connect() | ||
else: | ||
nonlocal not_found | ||
not_found = True | ||
print("No peripheral found.") | ||
|
||
central.scan(callback=on_scan) | ||
|
||
# Wait for connection... | ||
while not central.is_connected(): | ||
time.sleep_ms(100) | ||
if not_found: | ||
return | ||
|
||
print("Connected") | ||
|
||
def on_rx(v): | ||
print("RX", v) | ||
|
||
central.on_notify(on_rx) | ||
|
||
with_response = False | ||
|
||
i = 0 | ||
while central.is_connected(): | ||
try: | ||
v = str(i) + "_" | ||
print("TX", v) | ||
central.write(v, with_response) | ||
except: | ||
print("TX failed") | ||
i += 1 | ||
time.sleep_ms(400 if with_response else 30) | ||
|
||
print("Disconnected") | ||
|
||
|
||
if __name__ == "__main__": | ||
demo() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
# This example demonstrates a UART periperhal. | ||
|
||
import bluetooth | ||
import random | ||
import struct | ||
import time | ||
from ble_advertising import advertising_payload | ||
|
||
from micropython import const | ||
|
||
_IRQ_CENTRAL_CONNECT = const(1) | ||
_IRQ_CENTRAL_DISCONNECT = const(2) | ||
_IRQ_GATTS_WRITE = const(3) | ||
|
||
_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") | ||
_UART_TX = ( | ||
bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"), | ||
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, | ||
) | ||
_UART_RX = ( | ||
bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"), | ||
bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE, | ||
) | ||
_UART_SERVICE = ( | ||
_UART_UUID, | ||
(_UART_TX, _UART_RX,), | ||
) | ||
|
||
|
||
class BLESimplePeripheral: | ||
def __init__(self, ble, name="mpy-uart"): | ||
self._ble = ble | ||
self._ble.active(True) | ||
self._ble.irq(handler=self._irq) | ||
((self._handle_tx, self._handle_rx,),) = self._ble.gatts_register_services( | ||
(_UART_SERVICE,) | ||
) | ||
self._connections = set() | ||
self._write_callback = None | ||
self._payload = advertising_payload(name=name, services=[_UART_UUID],) | ||
self._advertise() | ||
|
||
def _irq(self, event, data): | ||
# Track connections so we can send notifications. | ||
if event == _IRQ_CENTRAL_CONNECT: | ||
conn_handle, _, _, = data | ||
print("New connection", conn_handle) | ||
self._connections.add(conn_handle) | ||
elif event == _IRQ_CENTRAL_DISCONNECT: | ||
conn_handle, _, _, = data | ||
print("Disconnected", conn_handle) | ||
self._connections.remove(conn_handle) | ||
# Start advertising again to allow a new connection. | ||
self._advertise() | ||
elif event == _IRQ_GATTS_WRITE: | ||
conn_handle, value_handle = data | ||
value = self._ble.gatts_read(value_handle) | ||
if value_handle == self._handle_rx and self._write_callback: | ||
self._write_callback(value) | ||
|
||
def send(self, data): | ||
for conn_handle in self._connections: | ||
self._ble.gatts_notify(conn_handle, self._handle_tx, data) | ||
|
||
def is_connected(self): | ||
return len(self._connections) > 0 | ||
|
||
def _advertise(self, interval_us=500000): | ||
print("Starting advertising") | ||
self._ble.gap_advertise(interval_us, adv_data=self._payload) | ||
|
||
def on_write(self, callback): | ||
self._write_callback = callback | ||
|
||
|
||
def demo(): | ||
ble = bluetooth.BLE() | ||
p = BLESimplePeripheral(ble) | ||
|
||
def on_rx(v): | ||
print("RX", v) | ||
|
||
p.on_write(on_rx) | ||
|
||
i = 0 | ||
while True: | ||
if p.is_connected(): | ||
# Short burst of queued notifications. | ||
for _ in range(3): | ||
data = str(i) + "_" | ||
print("TX", data) | ||
p.send(data) | ||
i += 1 | ||
time.sleep_ms(100) | ||
|
||
|
||
if __name__ == "__main__": | ||
demo() |