Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion pybricksdev/ble/pybricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class UserProgramId(IntEnum):
"""


def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int]:
def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]:
"""
Unpacks the value read from the hub capabilities characteristic.

Expand Down Expand Up @@ -407,6 +407,18 @@ def _standard_uuid(short: int) -> str:
return f"{short:08x}-0000-1000-8000-00805f9b34fb"


def short_uuid(uuid: str) -> int:
"""Gets a 16-bit from a 128-bit UUID.

Args:
uuid: a 128-bit UUID as a string.

Returns:
The 16-bit UUID.
"""
return int(uuid[4:8], 16)


# Device Information Service: https://www.bluetooth.com/specifications/specs/device-information-service-1-1/

DI_SERVICE_UUID = _standard_uuid(0x180A)
Expand All @@ -415,6 +427,15 @@ def _standard_uuid(short: int) -> str:
.. availability:: Since Pybricks protocol v1.0.0.
"""

DEVICE_NAME_UUID = _standard_uuid(0x2A00)
"""Standard Device Name characteristic UUID.

We typically don't read this directly over BLE since some OSes block reading it.
Instead,we use the name from the advertising data.

.. availability:: Since Pybricks protocol v1.0.0.
"""

FW_REV_UUID = _standard_uuid(0x2A26)
"""Standard Firmware Revision String characteristic UUID

Expand Down
4 changes: 4 additions & 0 deletions pybricksdev/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ async def run(self, args: argparse.Namespace):

from pybricksdev.connections.pybricks import PybricksHubUSB
from pybricksdev.usb import (
EV3_USB_PID,
LEGO_USB_VID,
MINDSTORMS_INVENTOR_USB_PID,
NXT_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
SPIKE_PRIME_USB_PID,
)
Expand All @@ -208,6 +210,8 @@ def is_pybricks_usb(dev):
and (
dev.idProduct
in [
NXT_USB_PID,
EV3_USB_PID,
SPIKE_PRIME_USB_PID,
SPIKE_ESSENTIAL_USB_PID,
MINDSTORMS_INVENTOR_USB_PID,
Expand Down
120 changes: 84 additions & 36 deletions pybricksdev/connections/pybricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import struct
from typing import Awaitable, Callable, List, Optional, TypeVar
from uuid import UUID

import reactivex.operators as op
import semver
Expand All @@ -18,14 +17,23 @@
from reactivex.subject import BehaviorSubject, Subject
from tqdm.auto import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from usb.control import get_descriptor
from usb.core import Device as USBDevice
from usb.core import Endpoint, USBTimeoutError
from usb.util import ENDPOINT_IN, ENDPOINT_OUT, endpoint_direction, find_descriptor
from usb.core import Endpoint, Interface, USBTimeoutError
from usb.util import (
CTRL_IN,
CTRL_RECIPIENT_INTERFACE,
CTRL_TYPE_CLASS,
ENDPOINT_IN,
ENDPOINT_OUT,
build_request_type,
endpoint_direction,
find_descriptor,
)

from pybricksdev.ble.lwp3.bytecodes import HubKind
from pybricksdev.ble.nus import NUS_RX_UUID, NUS_TX_UUID
from pybricksdev.ble.pybricks import (
DEVICE_NAME_UUID,
FW_REV_UUID,
PNP_ID_UUID,
PYBRICKS_COMMAND_EVENT_UUID,
Expand All @@ -37,6 +45,7 @@
HubCapabilityFlag,
StatusFlag,
UserProgramId,
short_uuid,
unpack_hub_capabilities,
unpack_pnp_id,
)
Expand All @@ -45,7 +54,9 @@
from pybricksdev.tools import chunk
from pybricksdev.tools.checksum import xor_bytes
from pybricksdev.usb.pybricks import (
PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE,
PybricksUsbInEpMessageType,
PybricksUsbInterfaceClassRequest,
PybricksUsbOutEpMessageType,
)

Expand Down Expand Up @@ -786,6 +797,40 @@ async def start_notify(self, uuid: str, callback: Callable) -> None:
return await self._client.start_notify(uuid, callback)


def get_interface_class_data(
dev: USBDevice,
interface: Interface,
desc_size: int,
request: int = 0,
value: int = 0,
) -> bytes:
"""
Get a vendor-specific descriptor.

usb.util doesn't have a method like this, so we have to do it ourselves.

Args:
dev: The USB device.
vendor_code: The vendor code from the BOS descriptor.
value: The wValue field of the request.
index: The wIndex field of the request.
"""

bmRequestType = build_request_type(
CTRL_IN, CTRL_TYPE_CLASS, CTRL_RECIPIENT_INTERFACE
)

ret = dev.ctrl_transfer(
bmRequestType=bmRequestType,
bRequest=request,
wValue=value,
wIndex=interface.index,
data_or_wLength=desc_size,
)

return bytes(ret)


class PybricksHubUSB(PybricksHub):
_device: USBDevice
_ep_in: Endpoint
Expand Down Expand Up @@ -820,46 +865,49 @@ async def _client_connect(self) -> bool:
# There is 1 byte overhead for PybricksUsbMessageType
self._max_write_size = self._ep_out.wMaxPacketSize - 1

# Get length of BOS descriptor
bos_descriptor = get_descriptor(self._device, 5, 0x0F, 0)
(ofst, _, bos_len, _) = struct.unpack("<BBHB", bos_descriptor)

# Get full BOS descriptor
bos_descriptor = get_descriptor(self._device, bos_len, 0x0F, 0)
# This is the equivalent of reading GATT characteristics for BLE connections.

while ofst < bos_len:
(size, desc_type, cap_type) = struct.unpack_from(
"<BBB", bos_descriptor, offset=ofst
def read_data(req: PybricksUsbInterfaceClassRequest, value: int) -> bytes:
return get_interface_class_data(
self._device,
intf,
PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE,
req,
value,
)

if desc_type != 0x10:
logger.error("Expected Device Capability descriptor")
exit(1)
hub_name_desc = read_data(
PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC,
short_uuid(DEVICE_NAME_UUID),
)

# Look for platform descriptors
if cap_type == 0x05:
uuid_bytes = bos_descriptor[ofst + 4 : ofst + 4 + 16]
uuid_str = str(UUID(bytes_le=bytes(uuid_bytes)))
self._hub_name = str(hub_name_desc, "utf-8")

if uuid_str == FW_REV_UUID:
fw_version = bytearray(bos_descriptor[ofst + 20 : ofst + size])
self.fw_version = Version(fw_version.decode())
fw_version_desc = read_data(
PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC,
short_uuid(FW_REV_UUID),
)

elif uuid_str == SW_REV_UUID:
self._protocol_version = bytearray(
bos_descriptor[ofst + 20 : ofst + size]
)
self.fw_version = Version(str(fw_version_desc, "utf-8"))

elif uuid_str == PYBRICKS_HUB_CAPABILITIES_UUID:
caps = bytearray(bos_descriptor[ofst + 20 : ofst + size])
(
_,
self._capability_flags,
self._max_user_program_size,
self._num_of_slots,
) = unpack_hub_capabilities(caps)
sw_version_desc = read_data(
PybricksUsbInterfaceClassRequest.GATT_CHARACTERISTIC,
short_uuid(SW_REV_UUID),
)

self._protocol_version = str(sw_version_desc, "utf-8")

hub_caps_desc = read_data(
PybricksUsbInterfaceClassRequest.PYBRICKS_CHARACTERISTIC,
short_uuid(PYBRICKS_HUB_CAPABILITIES_UUID),
)

ofst += size
(
_,
self._capability_flags,
self._max_user_program_size,
self._num_of_slots,
) = unpack_hub_capabilities(hub_caps_desc)

self._monitor_task = asyncio.create_task(self._monitor_usb())

Expand Down
1 change: 1 addition & 0 deletions pybricksdev/usb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# Official LEGO USB identifiers
LEGO_USB_VID = 0x0694
NXT_USB_PID = 0x0002
EV3_USB_PID = 0x0005
EV3_BOOTLOADER_USB_PID = 0x0006
SPIKE_PRIME_DFU_USB_PID = 0x0008
Expand Down
40 changes: 40 additions & 0 deletions pybricksdev/usb/pybricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,51 @@

"""
Pybricks-specific USB protocol.

This is generally a wrapper around the Pybricks BLE protocol adapted for USB.
"""

from enum import IntEnum


class PybricksUsbInterfaceClassRequest(IntEnum):
"""
Request type for the Pybricks USB interface class.

This is passed as bRequest in the USB control transfer where wIndex is the
interface number of the Pybricks USB interface and bmRequestType has type
of Class (1) and Recipient of Interface (1).
"""

GATT_CHARACTERISTIC = 1
"""
Analogous to standard BLE GATT characteristics.

bValue is the 16-bit UUID of the characteristic.
"""

PYBRICKS_CHARACTERISTIC = 2
"""
Analogous to custom BLE characteristics in the Pybricks Service.

bValue is the 16-bit UUID of the characteristic (3rd and 4th bytes of the 128-bit UUID).
"""


PYBRICKS_USB_INTERFACE_CLASS_REQUEST_MAX_SIZE = 20
"""
The maximum size of data that can be sent or received in a single control transfer
using the PybricksUsbInterfaceClassRequest interface class requests.

This limit comes from the smallest MTU of BLE (23 bytes) minus the 3-byte ATT header.

The Pybricks interface just uses data and doesn't use USB-style descriptors that
include the length. We can get away with this by limiting the size of the data
for each characteristic to be less than or equal to this value. Then, we can
always pass this as the wLength when reading.
"""


class PybricksUsbInEpMessageType(IntEnum):
RESPONSE = 1
"""
Expand Down