Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ operations from within a Python script.

`pybricksdev` requires Python 3.10 or higher.

- For Windows, use the [official Python installer][py-dl] or the [Windows Store][py38-win].
- For Windows, use the [official Python installer][py-dl] or the [Windows Store][py312-win].
- For Mac, use the [official Python installer][py-dl] or Homebrew (`brew install python@3.12`).
- For Linux, use the distro provided `python3.12` or if not available, use a Python
runtime version manager such as [asdf][asdf] or [pyenv][pyenv].


[py-dl]: https://www.python.org/downloads/
[py38-win]: https://www.microsoft.com/en-us/p/python-38/9mssztt1n39l
[py312-win]: https://apps.microsoft.com/detail/9ncvdn91xzqp
[asdf]: https://asdf-vm.com
[pyenv]: https://github.com/pyenv/pyenv

Expand Down
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
("py:class", "bleak.backends.device.BLEDevice"),
("py:exc", "asyncio.TimeoutError"),
("py:class", "bleak.BleakClient"),
("py:obj", "typing.Union"),
("py:class", "os.PathLike"),
("py:obj", "typing.BinaryIO"),
("py:class", "BinaryIO"), # yes, we need both!
Expand Down
3 changes: 1 addition & 2 deletions pybricksdev/ble/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import asyncio
import logging
from typing import Optional

from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
Expand All @@ -16,7 +15,7 @@


async def find_device(
name: Optional[str] = None,
name: str | None = None,
service: str = PYBRICKS_SERVICE_UUID,
timeout: float = 10,
) -> BLEDevice:
Expand Down
5 changes: 2 additions & 3 deletions pybricksdev/ble/lwp3/bytecodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
"""

from enum import IntEnum, IntFlag, unique
from typing import Type, Union


def _create_pseudo_member_(cls: Type[IntEnum], value: int) -> IntEnum:
def _create_pseudo_member_(cls: type[IntEnum], value: int) -> IntEnum:
"""
Creates a new enum member at runtime for ``IntEnum``s.
"""
Expand Down Expand Up @@ -122,7 +121,7 @@ class BluetoothAddress(bytes):
identifying individual Bluetooth devices instead of network cards.
"""

def __new__(cls, value: Union[str, bytes]) -> "BluetoothAddress":
def __new__(cls, value: str | bytes) -> "BluetoothAddress":
if isinstance(value, str):
# if it is a string, assume the format "XX:XX:XX:XX:XX:XX"
value = [int(x, 16) for x in value.split(":")]
Expand Down
48 changes: 24 additions & 24 deletions pybricksdev/ble/lwp3/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import abc
import struct
from enum import IntEnum
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type, Union, overload
from typing import Any, NamedTuple, overload

from pybricksdev.ble.lwp3.bytecodes import (
MAX_NAME_SIZE,
Expand Down Expand Up @@ -144,7 +144,7 @@ def __repr__(self) -> str:
class _HubPropertyType(NamedTuple):
type: type
fmt: str
max_size: Optional[int] = None
max_size: int | None = None


# specifies payload type information for each property
Expand Down Expand Up @@ -861,7 +861,7 @@ def __repr__(self) -> str:


class PortFormatSetupComboMessage(AbstractPortFormatSetupComboMessage):
def __init__(self, port: PortID, modes_and_datasets: List[Tuple[int, int]]) -> None:
def __init__(self, port: PortID, modes_and_datasets: list[tuple[int, int]]) -> None:
super().__init__(
5 + len(modes_and_datasets), port, PortInfoFormatSetupCommand.SET
)
Expand All @@ -870,7 +870,7 @@ def __init__(self, port: PortID, modes_and_datasets: List[Tuple[int, int]]) -> N
self._data[i] = ((mode & 0xF) << 4) | (dataset & 0xF)

@property
def modes_and_datasets(self) -> List[Tuple[int, int]]:
def modes_and_datasets(self) -> list[tuple[int, int]]:
return [(x >> 4, x & 0xF) for x in self._data[5:]]

def __repr__(self) -> str:
Expand Down Expand Up @@ -923,8 +923,8 @@ def __init__(
port: PortID,
capabilities: ModeCapabilities,
num_modes: int,
input_modes: List[int],
output_modes: List[int],
input_modes: list[int],
output_modes: list[int],
) -> None:
super().__init__(11, port, InfoKind.MODE_INFO)

Expand Down Expand Up @@ -955,12 +955,12 @@ def num_modes(self) -> int:
return self._data[6]

@property
def input_modes(self) -> List[int]:
def input_modes(self) -> list[int]:
(flags,) = struct.unpack_from("<H", self._data, 7)
return [n for n in range(16) if flags & (1 << n)]

@property
def output_modes(self) -> List[int]:
def output_modes(self) -> list[int]:
(flags,) = struct.unpack_from("<H", self._data, 9)
return [n for n in range(16) if flags & (1 << n)]

Expand All @@ -972,7 +972,7 @@ class PortInfoCombosMessage(AbstractPortInfoMessage):
def __init__(
self,
port: PortID,
combos: List[List[int]],
combos: list[list[int]],
) -> None:
super().__init__(5 + len(combos) * 2, port, InfoKind.COMBOS)

Expand All @@ -988,7 +988,7 @@ def __init__(
struct.pack_into(f"<{len(flags)}H", self._data, 5, *flags)

@property
def combos(self) -> List[List[int]]:
def combos(self) -> list[list[int]]:
count = (len(self._data) - 5) // 2
return [
[m for m in range(16) if flags & (1 << m)]
Expand Down Expand Up @@ -1211,7 +1211,7 @@ def __repr__(self) -> str:


class PortValueMessage(AbstractMessage):
def __init__(self, port: PortID, fmt: str, *values: Union[int, float]) -> None:
def __init__(self, port: PortID, fmt: str, *values: int | float) -> None:
super().__init__(4 + struct.calcsize(fmt), MessageKind.PORT_VALUE)

self._data[3] = port
Expand All @@ -1221,7 +1221,7 @@ def __init__(self, port: PortID, fmt: str, *values: Union[int, float]) -> None:
def port(self) -> PortID:
return PortID(self._data[3])

def unpack(self, fmt: str) -> Tuple[Union[int, float], ...]:
def unpack(self, fmt: str) -> tuple[int | float, ...]:
return struct.unpack_from(fmt, self._data, 4)

def __repr__(self) -> str:
Expand All @@ -1232,7 +1232,7 @@ def __repr__(self) -> str:

class PortValueComboMessage(AbstractMessage):
def __init__(
self, port: PortID, modes: List[int], fmt: str, *values: Union[int, float]
self, port: PortID, modes: list[int], fmt: str, *values: int | float
) -> None:
super().__init__(6 + struct.calcsize(fmt), MessageKind.PORT_VALUE_COMBO)

Expand All @@ -1249,11 +1249,11 @@ def port(self) -> PortID:
return PortID(self._data[3])

@property
def modes(self) -> List[int]:
def modes(self) -> list[int]:
(flags,) = struct.unpack_from("<H", self._data, 4)
return [m for m in range(16) if flags & (1 << m)]

def unpack(self, fmt: str) -> Tuple[Union[int, float], ...]:
def unpack(self, fmt: str) -> tuple[int | float, ...]:
return struct.unpack_from(fmt, self._data, 6)

def __repr__(self) -> str:
Expand Down Expand Up @@ -1294,7 +1294,7 @@ def __init__(
port: PortID,
combo: int,
multi_update: bool,
modes_and_datasets: List[int],
modes_and_datasets: list[int],
) -> None:
super().__init__(7, MessageKind.PORT_INPUT_FMT_COMBO)

Expand All @@ -1321,7 +1321,7 @@ def multi_update(self) -> bool:
return bool(self._data[4] & 0x80)

@property
def modes_and_datasets(self) -> List[int]:
def modes_and_datasets(self) -> list[int]:
(flags,) = struct.unpack_from("<H", self._data, 5)
return [m for m in range(16) if flags & (1 << m)]

Expand Down Expand Up @@ -1450,7 +1450,7 @@ def __init__(
end: EndInfo,
mode: int,
fmt: str,
*values: Union[int, float],
*values: int | float,
) -> None:
super().__init__(
7 + struct.calcsize(fmt),
Expand All @@ -1467,7 +1467,7 @@ def __init__(
def mode(self) -> int:
return self._data[6]

def unpack(self, fmt: str) -> Tuple[Union[int, float], ...]:
def unpack(self, fmt: str) -> tuple[int | float, ...]:
return struct.unpack_from(fmt, self._data, 7)

def __repr__(self) -> str:
Expand Down Expand Up @@ -1500,10 +1500,10 @@ def __init__(
self,
port1: PortID,
feedback1: Feedback,
port2: Optional[PortID] = None,
feedback2: Optional[Feedback] = None,
port3: Optional[PortID] = None,
feedback3: Optional[Feedback] = None,
port2: PortID | None = None,
feedback2: Feedback | None = None,
port3: PortID | None = None,
feedback3: Feedback | None = None,
) -> None:
length = 5

Expand Down Expand Up @@ -1577,7 +1577,7 @@ class _Lookup(NamedTuple):
index: int
"""The index of the bytecode that determines the type."""

value: Union[Dict[IntEnum, Type[AbstractMessage]], Dict[IntEnum, "_Lookup"]]
value: dict[IntEnum, type[AbstractMessage]] | dict[IntEnum, "_Lookup"]
"""
A dictionary mapping a bytecode to the cooresponding Python type if the type can be determined or
a dictionary mapping a bytecode to another lookup if more discrimination is required.
Expand Down
8 changes: 4 additions & 4 deletions pybricksdev/ble/pybricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from enum import IntEnum, IntFlag
from struct import unpack
from typing import Literal, Tuple
from typing import Literal

import semver

Expand Down Expand Up @@ -351,7 +351,7 @@ class UserProgramId(IntEnum):
"""


def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]:
def unpack_hub_capabilities(data: bytes) -> tuple[int, HubCapabilityFlag, int, int]:
"""
Unpacks the value read from the hub capabilities characteristic.

Expand All @@ -373,7 +373,7 @@ def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, i
return max_char_size, HubCapabilityFlag(flags), max_user_prog_size, num_of_slots


def unpack_hub_capabilities_v15(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]:
def unpack_hub_capabilities_v15(data: bytes) -> tuple[int, HubCapabilityFlag, int, int]:
"""
Unpacks the value read from the hub capabilities characteristic. (Pybricks protocol v1.5)

Expand Down Expand Up @@ -459,7 +459,7 @@ def short_uuid(uuid: str) -> int:
"""


def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
def unpack_pnp_id(data: bytes) -> tuple[Literal["BT", "USB"], int, int, int]:
"""
Unpacks raw data from the PnP ID characteristic.

Expand Down
20 changes: 11 additions & 9 deletions pybricksdev/cli/flash.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
import zlib
from tempfile import NamedTemporaryFile
from typing import BinaryIO, Dict, Optional
from typing import BinaryIO

from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
Expand Down Expand Up @@ -273,7 +273,7 @@ async def flash_ble(hub_kind: HubKind, firmware: bytes, metadata: dict):
# as return value from find_device_by_filter()
# https://github.com/hbldh/bleak/issues/1277

device_adv_map: Dict[str, AdvertisementData] = {}
device_adv_map: dict[str, AdvertisementData] = {}

def map_and_match(device: BLEDevice, adv: AdvertisementData):
# capture the adv data for later use
Expand Down Expand Up @@ -391,17 +391,19 @@ async def tick(callback):
callback(CHUNK)

print("Erasing memory and preparing firmware download...")
with logging_redirect_tqdm(), tqdm(
total=len(firmware), unit="B", unit_scale=True
) as pbar:
with (
logging_redirect_tqdm(),
tqdm(total=len(firmware), unit="B", unit_scale=True) as pbar,
):
await asyncio.gather(
bootloader.erase_and_begin_download(len(firmware)), tick(pbar.update)
)

print("Downloading firmware...")
with logging_redirect_tqdm(), tqdm(
total=len(firmware), unit="B", unit_scale=True
) as pbar:
with (
logging_redirect_tqdm(),
tqdm(total=len(firmware), unit="B", unit_scale=True) as pbar,
):
await bootloader.download(firmware, pbar.update)

print("Verifying...", end="", flush=True)
Expand All @@ -419,7 +421,7 @@ async def tick(callback):
print("Done.")


async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> None:
async def flash_firmware(firmware_zip: BinaryIO, new_name: str | None) -> None:
"""
Command line tool for flashing firmware.

Expand Down
26 changes: 15 additions & 11 deletions pybricksdev/cli/oad.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ def on_disconnect(_):
disconnect_event.set()

# long timeout in case pairing is needed
async with asyncio.timeout(60), BleakClient(
device, on_disconnect
) as client, OADImageIdentify(client) as image_identify, OADControlPoint(
client
) as control_point:
async with (
asyncio.timeout(60),
BleakClient(device, on_disconnect) as client,
OADImageIdentify(client) as image_identify,
OADControlPoint(client) as control_point,
):
image_block = OADImageBlock(client)

print(f"Connected to {device.name}")
Expand Down Expand Up @@ -107,9 +108,10 @@ def on_disconnect(_):

print("Flashing...")

with logging_redirect_tqdm(), tqdm(
total=header.image_length, unit="B", unit_scale=True
) as pbar:
with (
logging_redirect_tqdm(),
tqdm(total=header.image_length, unit="B", unit_scale=True) as pbar,
):
async with asyncio.TaskGroup() as group:
try:
async for (
Expand Down Expand Up @@ -161,9 +163,11 @@ async def dump_oad_info():
return

# long timeout in case pairing is needed
async with asyncio.timeout(30), BleakClient(device) as client, OADControlPoint(
client
) as control_point:
async with (
asyncio.timeout(30),
BleakClient(device) as client,
OADControlPoint(client) as control_point,
):
sw_ver = await control_point.get_software_version()
print(
f"Software version: app={sw_ver.app.major}.{sw_ver.app.minor}, stack={sw_ver.stack.major}.{sw_ver.stack.minor}"
Expand Down
Loading