From 8ef2d56a5366a69bc932abd5651a8f1d43a50ea1 Mon Sep 17 00:00:00 2001 From: David Lechner Date: Sun, 20 Jul 2025 12:28:52 -0500 Subject: [PATCH 1/4] ble.pybricks: fix wrong return type Fix wrong return type of unpack_hub_capabilities(). This was likely missed when we added a new field in Pybricks Profile v1.5. --- pybricksdev/ble/pybricks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pybricksdev/ble/pybricks.py b/pybricksdev/ble/pybricks.py index 2540c8d..90ada0e 100644 --- a/pybricksdev/ble/pybricks.py +++ b/pybricksdev/ble/pybricks.py @@ -351,7 +351,7 @@ class UserProgramId(IntEnum): """ -def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int]: +def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]: """ Unpacks the value read from the hub capabilities characteristic. From db63a39cb1d69f50188f19459124189445e5be94 Mon Sep 17 00:00:00 2001 From: David Lechner Date: Sun, 20 Jul 2025 12:33:24 -0500 Subject: [PATCH 2/4] usb: add NXT USB PID Pybricks is working on supporting the NXT brick, so we will need the USB PID for it. --- pybricksdev/usb/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pybricksdev/usb/__init__.py b/pybricksdev/usb/__init__.py index 55cfd47..dac2c1c 100644 --- a/pybricksdev/usb/__init__.py +++ b/pybricksdev/usb/__init__.py @@ -3,6 +3,7 @@ # Official LEGO USB identifiers LEGO_USB_VID = 0x0694 +NXT_USB_PID = 0x0002 EV3_USB_PID = 0x0005 EV3_BOOTLOADER_USB_PID = 0x0006 SPIKE_PRIME_DFU_USB_PID = 0x0008 From 3eb37544e68b24c25c81eb145991064de98e3710 Mon Sep 17 00:00:00 2001 From: David Lechner Date: Sun, 20 Jul 2025 15:24:46 -0500 Subject: [PATCH 3/4] usb: change how reading characteristics works In the firmware, we changed how characteristics are read over USB. This adapts pybricksdev to match. --- pybricksdev/ble/pybricks.py | 21 +++++ pybricksdev/connections/pybricks.py | 120 +++++++++++++++++++--------- pybricksdev/usb/pybricks.py | 40 ++++++++++ 3 files changed, 145 insertions(+), 36 deletions(-) diff --git a/pybricksdev/ble/pybricks.py b/pybricksdev/ble/pybricks.py index 90ada0e..fcaf5ac 100644 --- a/pybricksdev/ble/pybricks.py +++ b/pybricksdev/ble/pybricks.py @@ -407,6 +407,18 @@ def _standard_uuid(short: int) -> str: return f"{short:08x}-0000-1000-8000-00805f9b34fb" +def short_uuid(uuid: str) -> int: + """Gets a 16-bit from a 128-bit UUID. + + Args: + uuid: a 128-bit UUID as a string. + + Returns: + The 16-bit UUID. + """ + return int(uuid[4:8], 16) + + # Device Information Service: https://www.bluetooth.com/specifications/specs/device-information-service-1-1/ DI_SERVICE_UUID = _standard_uuid(0x180A) @@ -415,6 +427,15 @@ def _standard_uuid(short: int) -> str: .. availability:: Since Pybricks protocol v1.0.0. """ +DEVICE_NAME_UUID = _standard_uuid(0x2A00) +"""Standard Device Name characteristic UUID. + +We typically don't read this directly over BLE since some OSes block reading it. +Instead,we use the name from the advertising data. + +.. availability:: Since Pybricks protocol v1.0.0. +""" + FW_REV_UUID = _standard_uuid(0x2A26) """Standard Firmware Revision String characteristic UUID diff --git a/pybricksdev/connections/pybricks.py b/pybricksdev/connections/pybricks.py index 1fcd3c9..fcdfee9 100644 --- a/pybricksdev/connections/pybricks.py +++ b/pybricksdev/connections/pybricks.py @@ -7,7 +7,6 @@ import os import struct from typing import Awaitable, Callable, List, Optional, TypeVar -from uuid import UUID import reactivex.operators as op import semver @@ -18,14 +17,23 @@ from reactivex.subject import BehaviorSubject, Subject from tqdm.auto import tqdm from tqdm.contrib.logging import logging_redirect_tqdm -from usb.control import get_descriptor from usb.core import Device as USBDevice -from usb.core import Endpoint, USBTimeoutError -from usb.util import ENDPOINT_IN, ENDPOINT_OUT, endpoint_direction, find_descriptor +from usb.core import Endpoint, Interface, USBTimeoutError +from usb.util import ( + CTRL_IN, + CTRL_RECIPIENT_INTERFACE, + CTRL_TYPE_CLASS, + ENDPOINT_IN, + ENDPOINT_OUT, + build_request_type, + endpoint_direction, + find_descriptor, +) from pybricksdev.ble.lwp3.bytecodes import HubKind from pybricksdev.ble.nus import NUS_RX_UUID, NUS_TX_UUID from pybricksdev.ble.pybricks import ( + DEVICE_NAME_UUID, FW_REV_UUID, PNP_ID_UUID, PYBRICKS_COMMAND_EVENT_UUID, @@ -37,6 +45,7 @@ HubCapabilityFlag, StatusFlag, UserProgramId, + short_uuid, unpack_hub_capabilities, unpack_pnp_id, ) @@ -45,7 +54,9 @@ from pybricksdev.tools import chunk from pybricksdev.tools.checksum import xor_bytes from pybricksdev.usb.pybricks import ( + PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE, PybricksUsbInEpMessageType, + PybricksUsbInterfaceClassRequest, PybricksUsbOutEpMessageType, ) @@ -786,6 +797,40 @@ async def start_notify(self, uuid: str, callback: Callable) -> None: return await self._client.start_notify(uuid, callback) +def get_interface_class_data( + dev: USBDevice, + interface: Interface, + desc_size: int, + request: int = 0, + value: int = 0, +) -> bytes: + """ + Get a vendor-specific descriptor. + + usb.util doesn't have a method like this, so we have to do it ourselves. + + Args: + dev: The USB device. + vendor_code: The vendor code from the BOS descriptor. + value: The wValue field of the request. + index: The wIndex field of the request. + """ + + bmRequestType = build_request_type( + CTRL_IN, CTRL_TYPE_CLASS, CTRL_RECIPIENT_INTERFACE + ) + + ret = dev.ctrl_transfer( + bmRequestType=bmRequestType, + bRequest=request, + wValue=value, + wIndex=interface.index, + data_or_wLength=desc_size, + ) + + return bytes(ret) + + class PybricksHubUSB(PybricksHub): _device: USBDevice _ep_in: Endpoint @@ -820,46 +865,49 @@ async def _client_connect(self) -> bool: # There is 1 byte overhead for PybricksUsbMessageType self._max_write_size = self._ep_out.wMaxPacketSize - 1 - # Get length of BOS descriptor - bos_descriptor = get_descriptor(self._device, 5, 0x0F, 0) - (ofst, _, bos_len, _) = struct.unpack(" bytes: + return get_interface_class_data( + self._device, + intf, + PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE, + req, + value, ) - if desc_type != 0x10: - logger.error("Expected Device Capability descriptor") - exit(1) + hub_name_desc = read_data( + PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC, + short_uuid(DEVICE_NAME_UUID), + ) - # Look for platform descriptors - if cap_type == 0x05: - uuid_bytes = bos_descriptor[ofst + 4 : ofst + 4 + 16] - uuid_str = str(UUID(bytes_le=bytes(uuid_bytes))) + self._hub_name = str(hub_name_desc, "utf-8") - if uuid_str == FW_REV_UUID: - fw_version = bytearray(bos_descriptor[ofst + 20 : ofst + size]) - self.fw_version = Version(fw_version.decode()) + fw_version_desc = read_data( + PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC, + short_uuid(FW_REV_UUID), + ) - elif uuid_str == SW_REV_UUID: - self._protocol_version = bytearray( - bos_descriptor[ofst + 20 : ofst + size] - ) + self.fw_version = Version(str(fw_version_desc, "utf-8")) - elif uuid_str == PYBRICKS_HUB_CAPABILITIES_UUID: - caps = bytearray(bos_descriptor[ofst + 20 : ofst + size]) - ( - _, - self._capability_flags, - self._max_user_program_size, - self._num_of_slots, - ) = unpack_hub_capabilities(caps) + sw_version_desc = read_data( + PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC, + short_uuid(SW_REV_UUID), + ) + + self._protocol_version = str(sw_version_desc, "utf-8") + + hub_caps_desc = read_data( + PybricksUsbInterfaceClassRequest.PYBRICKS_CHARACTERISTIC, + short_uuid(PYBRICKS_HUB_CAPABILITIES_UUID), + ) - ofst += size + ( + _, + self._capability_flags, + self._max_user_program_size, + self._num_of_slots, + ) = unpack_hub_capabilities(hub_caps_desc) self._monitor_task = asyncio.create_task(self._monitor_usb()) diff --git a/pybricksdev/usb/pybricks.py b/pybricksdev/usb/pybricks.py index bfbdbf7..5f5aafb 100644 --- a/pybricksdev/usb/pybricks.py +++ b/pybricksdev/usb/pybricks.py @@ -3,11 +3,51 @@ """ Pybricks-specific USB protocol. + +This is generally a wrapper around the Pybricks BLE protocol adapted for USB. """ from enum import IntEnum +class PybricksUsbInterfaceClassRequest(IntEnum): + """ + Request type for the Pybricks USB interface class. + + This is passed as bRequest in the USB control transfer where wIndex is the + interface number of the Pybricks USB interface and bmRequestType has type + of Class (1) and Recipient of Interface (1). + """ + + GATT_CHARACTERISTIC = 1 + """ + Analogous to standard BLE GATT characteristics. + + bValue is the 16-bit UUID of the characteristic. + """ + + PYBRICKS_CHARACTERISTIC = 2 + """ + Analogous to custom BLE characteristics in the Pybricks Service. + + bValue is the 16-bit UUID of the characteristic (3rd and 4th bytes of the 128-bit UUID). + """ + + +PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE = 20 +""" +The maximum size of data that can be sent or received in a single control transfer +using the PybricksUsbInterfaceClassRequest interface class requests. + +This limit comes from the smallest MTU of BLE (23 bytes) minus the 3-byte ATT header. + +The Pybricks interface just uses data and doesn't use USB-style descriptors that +include the length. We can get away with this by limiting the size of the data +for each characteristic to be less than or equal to this value. Then, we can +always pass this as the wLength when reading. +""" + + class PybricksUsbInEpMessageType(IntEnum): RESPONSE = 1 """ From 86d312fd63b80943ed3861d2dfb1de1df42fed6c Mon Sep 17 00:00:00 2001 From: David Lechner Date: Sun, 20 Jul 2025 20:12:32 -0500 Subject: [PATCH 4/4] cli: add NXT and EV3 to USB match for run command Add the NXT and EV3 USB product IDs to the list of supported devices for the `run` command in the CLI. --- pybricksdev/cli/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pybricksdev/cli/__init__.py b/pybricksdev/cli/__init__.py index fc449a1..aa68d9d 100644 --- a/pybricksdev/cli/__init__.py +++ b/pybricksdev/cli/__init__.py @@ -196,8 +196,10 @@ async def run(self, args: argparse.Namespace): from pybricksdev.connections.pybricks import PybricksHubUSB from pybricksdev.usb import ( + EV3_USB_PID, LEGO_USB_VID, MINDSTORMS_INVENTOR_USB_PID, + NXT_USB_PID, SPIKE_ESSENTIAL_USB_PID, SPIKE_PRIME_USB_PID, ) @@ -208,6 +210,8 @@ def is_pybricks_usb(dev): and ( dev.idProduct in [ + NXT_USB_PID, + EV3_USB_PID, SPIKE_PRIME_USB_PID, SPIKE_ESSENTIAL_USB_PID, MINDSTORMS_INVENTOR_USB_PID,