diff --git a/README.md b/README.md index 46654cd..5b7bdc3 100644 --- a/README.md +++ b/README.md @@ -16,14 +16,14 @@ operations from within a Python script. `pybricksdev` requires Python 3.10 or higher. -- For Windows, use the [official Python installer][py-dl] or the [Windows Store][py38-win]. +- For Windows, use the [official Python installer][py-dl] or the [Windows Store][py312-win]. - For Mac, use the [official Python installer][py-dl] or Homebrew (`brew install python@3.12`). - For Linux, use the distro provided `python3.12` or if not available, use a Python runtime version manager such as [asdf][asdf] or [pyenv][pyenv]. [py-dl]: https://www.python.org/downloads/ -[py38-win]: https://www.microsoft.com/en-us/p/python-38/9mssztt1n39l +[py312-win]: https://apps.microsoft.com/detail/9ncvdn91xzqp [asdf]: https://asdf-vm.com [pyenv]: https://github.com/pyenv/pyenv diff --git a/docs/conf.py b/docs/conf.py index 46b4455..9e7abab 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -65,7 +65,6 @@ ("py:class", "bleak.backends.device.BLEDevice"), ("py:exc", "asyncio.TimeoutError"), ("py:class", "bleak.BleakClient"), - ("py:obj", "typing.Union"), ("py:class", "os.PathLike"), ("py:obj", "typing.BinaryIO"), ("py:class", "BinaryIO"), # yes, we need both! diff --git a/pybricksdev/ble/__init__.py b/pybricksdev/ble/__init__.py index a097fcd..010d4d5 100644 --- a/pybricksdev/ble/__init__.py +++ b/pybricksdev/ble/__init__.py @@ -3,7 +3,6 @@ import asyncio import logging -from typing import Optional from bleak import BleakClient, BleakScanner from bleak.backends.device import BLEDevice @@ -16,7 +15,7 @@ async def find_device( - name: Optional[str] = None, + name: str | None = None, service: str = PYBRICKS_SERVICE_UUID, timeout: float = 10, ) -> BLEDevice: diff --git a/pybricksdev/ble/lwp3/bytecodes.py b/pybricksdev/ble/lwp3/bytecodes.py index f1c0ee2..d7f9228 100644 --- a/pybricksdev/ble/lwp3/bytecodes.py +++ b/pybricksdev/ble/lwp3/bytecodes.py @@ -11,10 +11,9 @@ """ from enum import IntEnum, IntFlag, unique -from typing import Type, Union -def _create_pseudo_member_(cls: Type[IntEnum], value: int) -> IntEnum: +def _create_pseudo_member_(cls: type[IntEnum], value: int) -> IntEnum: """ Creates a new enum member at runtime for ``IntEnum``s. """ @@ -122,7 +121,7 @@ class BluetoothAddress(bytes): identifying individual Bluetooth devices instead of network cards. """ - def __new__(cls, value: Union[str, bytes]) -> "BluetoothAddress": + def __new__(cls, value: str | bytes) -> "BluetoothAddress": if isinstance(value, str): # if it is a string, assume the format "XX:XX:XX:XX:XX:XX" value = [int(x, 16) for x in value.split(":")] diff --git a/pybricksdev/ble/lwp3/messages.py b/pybricksdev/ble/lwp3/messages.py index a2dcf5f..e8a1ec0 100644 --- a/pybricksdev/ble/lwp3/messages.py +++ b/pybricksdev/ble/lwp3/messages.py @@ -13,7 +13,7 @@ import abc import struct from enum import IntEnum -from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type, Union, overload +from typing import Any, NamedTuple, overload from pybricksdev.ble.lwp3.bytecodes import ( MAX_NAME_SIZE, @@ -144,7 +144,7 @@ def __repr__(self) -> str: class _HubPropertyType(NamedTuple): type: type fmt: str - max_size: Optional[int] = None + max_size: int | None = None # specifies payload type information for each property @@ -861,7 +861,7 @@ def __repr__(self) -> str: class PortFormatSetupComboMessage(AbstractPortFormatSetupComboMessage): - def __init__(self, port: PortID, modes_and_datasets: List[Tuple[int, int]]) -> None: + def __init__(self, port: PortID, modes_and_datasets: list[tuple[int, int]]) -> None: super().__init__( 5 + len(modes_and_datasets), port, PortInfoFormatSetupCommand.SET ) @@ -870,7 +870,7 @@ def __init__(self, port: PortID, modes_and_datasets: List[Tuple[int, int]]) -> N self._data[i] = ((mode & 0xF) << 4) | (dataset & 0xF) @property - def modes_and_datasets(self) -> List[Tuple[int, int]]: + def modes_and_datasets(self) -> list[tuple[int, int]]: return [(x >> 4, x & 0xF) for x in self._data[5:]] def __repr__(self) -> str: @@ -923,8 +923,8 @@ def __init__( port: PortID, capabilities: ModeCapabilities, num_modes: int, - input_modes: List[int], - output_modes: List[int], + input_modes: list[int], + output_modes: list[int], ) -> None: super().__init__(11, port, InfoKind.MODE_INFO) @@ -955,12 +955,12 @@ def num_modes(self) -> int: return self._data[6] @property - def input_modes(self) -> List[int]: + def input_modes(self) -> list[int]: (flags,) = struct.unpack_from(" List[int]: + def output_modes(self) -> list[int]: (flags,) = struct.unpack_from(" None: super().__init__(5 + len(combos) * 2, port, InfoKind.COMBOS) @@ -988,7 +988,7 @@ def __init__( struct.pack_into(f"<{len(flags)}H", self._data, 5, *flags) @property - def combos(self) -> List[List[int]]: + def combos(self) -> list[list[int]]: count = (len(self._data) - 5) // 2 return [ [m for m in range(16) if flags & (1 << m)] @@ -1211,7 +1211,7 @@ def __repr__(self) -> str: class PortValueMessage(AbstractMessage): - def __init__(self, port: PortID, fmt: str, *values: Union[int, float]) -> None: + def __init__(self, port: PortID, fmt: str, *values: int | float) -> None: super().__init__(4 + struct.calcsize(fmt), MessageKind.PORT_VALUE) self._data[3] = port @@ -1221,7 +1221,7 @@ def __init__(self, port: PortID, fmt: str, *values: Union[int, float]) -> None: def port(self) -> PortID: return PortID(self._data[3]) - def unpack(self, fmt: str) -> Tuple[Union[int, float], ...]: + def unpack(self, fmt: str) -> tuple[int | float, ...]: return struct.unpack_from(fmt, self._data, 4) def __repr__(self) -> str: @@ -1232,7 +1232,7 @@ def __repr__(self) -> str: class PortValueComboMessage(AbstractMessage): def __init__( - self, port: PortID, modes: List[int], fmt: str, *values: Union[int, float] + self, port: PortID, modes: list[int], fmt: str, *values: int | float ) -> None: super().__init__(6 + struct.calcsize(fmt), MessageKind.PORT_VALUE_COMBO) @@ -1249,11 +1249,11 @@ def port(self) -> PortID: return PortID(self._data[3]) @property - def modes(self) -> List[int]: + def modes(self) -> list[int]: (flags,) = struct.unpack_from(" Tuple[Union[int, float], ...]: + def unpack(self, fmt: str) -> tuple[int | float, ...]: return struct.unpack_from(fmt, self._data, 6) def __repr__(self) -> str: @@ -1294,7 +1294,7 @@ def __init__( port: PortID, combo: int, multi_update: bool, - modes_and_datasets: List[int], + modes_and_datasets: list[int], ) -> None: super().__init__(7, MessageKind.PORT_INPUT_FMT_COMBO) @@ -1321,7 +1321,7 @@ def multi_update(self) -> bool: return bool(self._data[4] & 0x80) @property - def modes_and_datasets(self) -> List[int]: + def modes_and_datasets(self) -> list[int]: (flags,) = struct.unpack_from(" None: super().__init__( 7 + struct.calcsize(fmt), @@ -1467,7 +1467,7 @@ def __init__( def mode(self) -> int: return self._data[6] - def unpack(self, fmt: str) -> Tuple[Union[int, float], ...]: + def unpack(self, fmt: str) -> tuple[int | float, ...]: return struct.unpack_from(fmt, self._data, 7) def __repr__(self) -> str: @@ -1500,10 +1500,10 @@ def __init__( self, port1: PortID, feedback1: Feedback, - port2: Optional[PortID] = None, - feedback2: Optional[Feedback] = None, - port3: Optional[PortID] = None, - feedback3: Optional[Feedback] = None, + port2: PortID | None = None, + feedback2: Feedback | None = None, + port3: PortID | None = None, + feedback3: Feedback | None = None, ) -> None: length = 5 @@ -1577,7 +1577,7 @@ class _Lookup(NamedTuple): index: int """The index of the bytecode that determines the type.""" - value: Union[Dict[IntEnum, Type[AbstractMessage]], Dict[IntEnum, "_Lookup"]] + value: dict[IntEnum, type[AbstractMessage]] | dict[IntEnum, "_Lookup"] """ A dictionary mapping a bytecode to the cooresponding Python type if the type can be determined or a dictionary mapping a bytecode to another lookup if more discrimination is required. diff --git a/pybricksdev/ble/pybricks.py b/pybricksdev/ble/pybricks.py index fcaf5ac..1b0fdc0 100644 --- a/pybricksdev/ble/pybricks.py +++ b/pybricksdev/ble/pybricks.py @@ -21,7 +21,7 @@ from enum import IntEnum, IntFlag from struct import unpack -from typing import Literal, Tuple +from typing import Literal import semver @@ -351,7 +351,7 @@ class UserProgramId(IntEnum): """ -def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]: +def unpack_hub_capabilities(data: bytes) -> tuple[int, HubCapabilityFlag, int, int]: """ Unpacks the value read from the hub capabilities characteristic. @@ -373,7 +373,7 @@ def unpack_hub_capabilities(data: bytes) -> Tuple[int, HubCapabilityFlag, int, i return max_char_size, HubCapabilityFlag(flags), max_user_prog_size, num_of_slots -def unpack_hub_capabilities_v15(data: bytes) -> Tuple[int, HubCapabilityFlag, int, int]: +def unpack_hub_capabilities_v15(data: bytes) -> tuple[int, HubCapabilityFlag, int, int]: """ Unpacks the value read from the hub capabilities characteristic. (Pybricks protocol v1.5) @@ -459,7 +459,7 @@ def short_uuid(uuid: str) -> int: """ -def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]: +def unpack_pnp_id(data: bytes) -> tuple[Literal["BT", "USB"], int, int, int]: """ Unpacks raw data from the PnP ID characteristic. diff --git a/pybricksdev/cli/flash.py b/pybricksdev/cli/flash.py index 0b708a6..ac7e3d8 100644 --- a/pybricksdev/cli/flash.py +++ b/pybricksdev/cli/flash.py @@ -8,7 +8,7 @@ import sys import zlib from tempfile import NamedTemporaryFile -from typing import BinaryIO, Dict, Optional +from typing import BinaryIO from bleak import BleakClient, BleakScanner from bleak.backends.device import BLEDevice @@ -273,7 +273,7 @@ async def flash_ble(hub_kind: HubKind, firmware: bytes, metadata: dict): # as return value from find_device_by_filter() # https://github.com/hbldh/bleak/issues/1277 - device_adv_map: Dict[str, AdvertisementData] = {} + device_adv_map: dict[str, AdvertisementData] = {} def map_and_match(device: BLEDevice, adv: AdvertisementData): # capture the adv data for later use @@ -391,17 +391,19 @@ async def tick(callback): callback(CHUNK) print("Erasing memory and preparing firmware download...") - with logging_redirect_tqdm(), tqdm( - total=len(firmware), unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=len(firmware), unit="B", unit_scale=True) as pbar, + ): await asyncio.gather( bootloader.erase_and_begin_download(len(firmware)), tick(pbar.update) ) print("Downloading firmware...") - with logging_redirect_tqdm(), tqdm( - total=len(firmware), unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=len(firmware), unit="B", unit_scale=True) as pbar, + ): await bootloader.download(firmware, pbar.update) print("Verifying...", end="", flush=True) @@ -419,7 +421,7 @@ async def tick(callback): print("Done.") -async def flash_firmware(firmware_zip: BinaryIO, new_name: Optional[str]) -> None: +async def flash_firmware(firmware_zip: BinaryIO, new_name: str | None) -> None: """ Command line tool for flashing firmware. diff --git a/pybricksdev/cli/oad.py b/pybricksdev/cli/oad.py index c162c97..5dbc5f7 100644 --- a/pybricksdev/cli/oad.py +++ b/pybricksdev/cli/oad.py @@ -73,11 +73,12 @@ def on_disconnect(_): disconnect_event.set() # long timeout in case pairing is needed - async with asyncio.timeout(60), BleakClient( - device, on_disconnect - ) as client, OADImageIdentify(client) as image_identify, OADControlPoint( - client - ) as control_point: + async with ( + asyncio.timeout(60), + BleakClient(device, on_disconnect) as client, + OADImageIdentify(client) as image_identify, + OADControlPoint(client) as control_point, + ): image_block = OADImageBlock(client) print(f"Connected to {device.name}") @@ -107,9 +108,10 @@ def on_disconnect(_): print("Flashing...") - with logging_redirect_tqdm(), tqdm( - total=header.image_length, unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=header.image_length, unit="B", unit_scale=True) as pbar, + ): async with asyncio.TaskGroup() as group: try: async for ( @@ -161,9 +163,11 @@ async def dump_oad_info(): return # long timeout in case pairing is needed - async with asyncio.timeout(30), BleakClient(device) as client, OADControlPoint( - client - ) as control_point: + async with ( + asyncio.timeout(30), + BleakClient(device) as client, + OADControlPoint(client) as control_point, + ): sw_ver = await control_point.get_software_version() print( f"Software version: app={sw_ver.app.major}.{sw_ver.app.minor}, stack={sw_ver.stack.major}.{sw_ver.stack.minor}" diff --git a/pybricksdev/compile.py b/pybricksdev/compile.py index 72816cd..75d2ca7 100644 --- a/pybricksdev/compile.py +++ b/pybricksdev/compile.py @@ -5,7 +5,6 @@ import logging import os from modulefinder import ModuleFinder -from typing import List, Optional, Tuple, Union import mpy_cross_v5 import mpy_cross_v6 @@ -30,7 +29,7 @@ def make_build_dir(): async def compile_file( - proj_dir: str, proj_path: str, abi: int, compile_args: Optional[List[str]] = None + proj_dir: str, proj_path: str, abi: int, compile_args: list[str] | None = None ): """Compiles a Python file with ``mpy-cross``. @@ -81,7 +80,7 @@ async def compile_file( return mpy -async def compile_multi_file(path: str, abi: Union[int, Tuple[int, int]]): +async def compile_multi_file(path: str, abi: int | tuple[int, int]): """Compiles a Python file and its dependencies with ``mpy-cross``. On the hub, all dependencies behave as independent modules. Any (leading) @@ -134,7 +133,7 @@ async def compile_multi_file(path: str, abi: Union[int, Tuple[int, int]]): logger.debug("missing modules: %r", finder.any_missing()) # Get a data blob with all scripts. - parts: List[bytes] = [] + parts: list[bytes] = [] abi_major, abi_minor = (abi, None) if isinstance(abi, int) else abi diff --git a/pybricksdev/connections/ev3.py b/pybricksdev/connections/ev3.py index b5a3573..9c4e2ac 100644 --- a/pybricksdev/connections/ev3.py +++ b/pybricksdev/connections/ev3.py @@ -5,7 +5,7 @@ import enum import itertools import struct -from typing import Callable, Optional, Tuple +from typing import Callable import hid @@ -75,7 +75,7 @@ def close(self) -> None: """ self._device.close() - def _send_command(self, command: Command, payload: Optional[bytes] = None) -> int: + def _send_command(self, command: Command, payload: bytes | None = None) -> int: length = 4 if payload is not None: @@ -145,7 +145,7 @@ def _receive_reply( def download_sync( self, data: bytes, - progress: Optional[Callable[[int], None]] = None, + progress: Callable[[int], None] | None = None, ) -> None: """ Blocking version of :meth:`download`. @@ -169,7 +169,7 @@ def download_sync( async def download( self, data: bytes, - progress: Optional[Callable[[int], None]] = None, + progress: Callable[[int], None] | None = None, ) -> None: """ Downloads a firmware blob to the EV3. @@ -244,7 +244,7 @@ async def get_checksum(self, address: int, size: int) -> int: None, self.get_checksum_sync, address, size ) - def get_version_sync(self) -> Tuple[int, int]: + def get_version_sync(self) -> tuple[int, int]: """ Blocking version of :meth:`get_version`. """ @@ -257,7 +257,7 @@ def get_version_sync(self) -> Tuple[int, int]: payload = self._receive_reply(Command.GET_VERSION, num, force_length=13) return struct.unpack(" Tuple[int, int]: + async def get_version(self) -> tuple[int, int]: """ Gets the bootloader firmware version and the hardware version. diff --git a/pybricksdev/connections/pybricks.py b/pybricksdev/connections/pybricks.py index 709f238..8b76c1d 100644 --- a/pybricksdev/connections/pybricks.py +++ b/pybricksdev/connections/pybricks.py @@ -6,7 +6,7 @@ import logging import os import struct -from typing import Awaitable, Callable, List, Optional, TypeVar +from typing import Awaitable, Callable, TypeVar import reactivex.operators as op import semver @@ -76,7 +76,7 @@ class HubPowerButtonPressedError(RuntimeError): class PybricksHub: EOL = b"\r\n" # MicroPython EOL - fw_version: Optional[Version] + fw_version: Version | None """ Firmware version of the connected hub or ``None`` if not connected yet. """ @@ -152,7 +152,7 @@ def __init__(self): # REVISIT: It would be better to be able to subscribe to output instead # of always capturing it even if it is not used. This is currently # used in motor test code in pybricks-micropython. - self.output: List[bytes] = [] + self.output: list[bytes] = [] """ Contains lines printed to stdout of the hub as a a list of bytes. @@ -473,9 +473,10 @@ async def download_user_program(self, program: bytes) -> None: payload_size = self._max_write_size - 5 # write program data with progress bar - with logging_redirect_tqdm(), tqdm( - total=len(program), unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=len(program), unit="B", unit_scale=True) as pbar, + ): for i, c in enumerate(chunk(program, payload_size)): await self.write_gatt_char( PYBRICKS_COMMAND_EVENT_UUID, @@ -579,7 +580,7 @@ async def download(self, script_path: str) -> None: async def run( self, - py_path: Optional[str] = None, + py_path: str | None = None, wait: bool = True, print_output: bool = True, line_handler: bool = True, @@ -678,9 +679,10 @@ async def send_block(data: bytes) -> None: await send_block(length) # Send the data chunk by chunk - with logging_redirect_tqdm(), tqdm( - total=len(mpy), unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=len(mpy), unit="B", unit_scale=True) as pbar, + ): for c in chunk(mpy, 100): await send_block(c) pbar.update(len(c)) diff --git a/pybricksdev/firmware.py b/pybricksdev/firmware.py index 8a77979..7913afe 100644 --- a/pybricksdev/firmware.py +++ b/pybricksdev/firmware.py @@ -11,7 +11,7 @@ import struct import sys import zipfile -from typing import BinaryIO, List, Literal, Optional, Tuple, TypedDict, Union +from typing import BinaryIO, Literal, TypedDict if sys.version_info < (3, 10): from typing_extensions import TypeGuard @@ -33,7 +33,7 @@ class FirmwareMetadataV100( "device-id": Literal[0x40, 0x41, 0x80, 0x81], "checksum-type": Literal["sum", "crc32"], "mpy-abi-version": int, - "mpy-cross-options": List[str], + "mpy-cross-options": list[str], "user-mpy-offset": int, "max-firmware-size": int, }, @@ -98,17 +98,17 @@ class FirmwareMetadataV210( """ -AnyFirmwareV1Metadata = Union[FirmwareMetadataV100, FirmwareMetadataV110] +AnyFirmwareV1Metadata = FirmwareMetadataV100 | FirmwareMetadataV110 """ Type for data contained in ``firmware.metadata.json`` files of any 1.x version. """ -AnyFirmwareV2Metadata = Union[FirmwareMetadataV200, FirmwareMetadataV210] +AnyFirmwareV2Metadata = FirmwareMetadataV200 | FirmwareMetadataV210 """ Type for data contained in ``firmware.metadata.json`` files of any 2.x version. """ -AnyFirmwareMetadata = Union[AnyFirmwareV1Metadata, AnyFirmwareV2Metadata] +AnyFirmwareMetadata = AnyFirmwareV1Metadata | AnyFirmwareV2Metadata """ Type for data contained in ``firmware.metadata.json`` files of any version. """ @@ -127,7 +127,7 @@ def _firmware_metadata_is_v2( async def _create_firmware_v1( - metadata: AnyFirmwareV1Metadata, archive: zipfile.ZipFile, name: Optional[str] + metadata: AnyFirmwareV1Metadata, archive: zipfile.ZipFile, name: str | None ) -> bytearray: base = archive.open("firmware-base.bin").read() @@ -189,7 +189,7 @@ async def _create_firmware_v1( async def _create_firmware_v2( - metadata: AnyFirmwareV2Metadata, archive: zipfile.ZipFile, name: Optional[str] + metadata: AnyFirmwareV2Metadata, archive: zipfile.ZipFile, name: str | None ) -> bytearray: base = archive.open("firmware-base.bin").read() @@ -233,8 +233,8 @@ async def _create_firmware_v2( async def create_firmware_blob( - firmware_zip: Union[str, os.PathLike, BinaryIO], name: Optional[str] = None -) -> Tuple[bytes, AnyFirmwareMetadata, str]: + firmware_zip: str | os.PathLike | BinaryIO, name: str | None = None +) -> tuple[bytes, AnyFirmwareMetadata, str]: """Creates a firmware blob from base firmware and an optional custom name. .. note:: The firmware.zip file must contain the following files:: diff --git a/pybricksdev/flash.py b/pybricksdev/flash.py index 5371631..9f1f873 100644 --- a/pybricksdev/flash.py +++ b/pybricksdev/flash.py @@ -7,7 +7,6 @@ import platform import struct from collections import namedtuple -from typing import Dict, List, Optional, Tuple from tqdm.auto import tqdm from tqdm.contrib.logging import logging_redirect_tqdm @@ -20,7 +19,7 @@ # NAME, PAYLOAD_SIZE requirement -HUB_INFO: Dict[HubKind, Tuple[str, int]] = { +HUB_INFO: dict[HubKind, tuple[str, int]] = { HubKind.BOOST: ("Move Hub", 14), HubKind.CITY: ("City Hub", 32), HubKind.TECHNIC: ("Technic Hub", 32), @@ -34,7 +33,7 @@ def __init__( self, command: BootloaderCommand, name: str, - request_format: List[str], + request_format: list[str], data_format: str, request_reply: bool = True, write_with_response: bool = True, @@ -47,7 +46,7 @@ def __init__( self.reply_len += 1 self.write_with_response = write_with_response - def make_request(self, payload: Optional[bytes] = None) -> bytearray: + def make_request(self, payload: bytes | None = None) -> bytearray: request = bytearray([self.command]) if payload is not None: request += payload @@ -220,9 +219,10 @@ async def flash(self, firmware, metadata): logger.debug("Begin update.") # Maintain progress using tqdm - with logging_redirect_tqdm(), tqdm( - total=firmware_size, unit="B", unit_scale=True - ) as pbar: + with ( + logging_redirect_tqdm(), + tqdm(total=firmware_size, unit="B", unit_scale=True) as pbar, + ): def reader(): while True: diff --git a/pyproject.toml b/pyproject.toml index ad1b1cf..8898b7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.black] -target-version = ['py38'] +target-version = ['py310'] [tool.isort] profile = "black"