diff --git a/source/bdDetect.py b/source/bdDetect.py index 6c489b86b45..9f213464691 100644 --- a/source/bdDetect.py +++ b/source/bdDetect.py @@ -106,6 +106,12 @@ class DeviceMatch(NamedTuple): _driverDevices = OrderedDict[str, DriverDictT]() +fallBackDevices: set[tuple[str, DeviceType, str]] = set() +""" +Used to store fallback devices. +When registered as a fallback device, it will be yielded last among the connected USB devices. +""" + scanForDevices = extensionPoints.Chain[Tuple[str, DeviceMatch]]() """ A Chain that can be iterated to scan for devices. @@ -153,13 +159,18 @@ def getDriversForConnectedUsbDevices( for port in deviceInfoFetcher.hidDevices if port["provider"] == "usb" )) + + fallbackDriversAndMatches: list[set[str, DeviceMatch]] = [] for match in itertools.chain(usbCustomDeviceMatches, usbHidDeviceMatchesForCustom, usbComDeviceMatches): for driver, devs in _driverDevices.items(): if limitToDevices and driver not in limitToDevices: continue for type, ids in devs.items(): if match.type == type and match.id in ids: - yield driver, match + if (driver, match.type, match.id) in fallBackDevices: + fallbackDriversAndMatches.append({driver, match}) + else: + yield driver, match hidName = _getStandardHidDriverName() if limitToDevices and hidName not in limitToDevices: @@ -169,7 +180,13 @@ def getDriversForConnectedUsbDevices( # This ensures that a vendor specific driver is preferred over the braille HID protocol. # This preference may change in the future. if _isHIDBrailleMatch(match): - yield (hidName, match) + if (driver, match.type, match.id) in fallBackDevices: + fallbackDriversAndMatches.append({hidName, match}) + else: + yield hidName, match + + for driver, match in fallbackDriversAndMatches: + yield driver, match def _getStandardHidDriverName() -> str: @@ -446,6 +463,8 @@ def terminate(self): appModuleHandler.post_appSwitch.unregister(self.pollBluetoothDevices) messageWindow.pre_handleWindowMessage.unregister(self.handleWindowMessage) self._stopBgScan() + # Clear the fallback devices + fallBackDevices.clear() # Clear the cache of bluetooth devices so new devices can be picked up with a new instance. deviceInfoFetcher.btDevsCache = None self._executor.shutdown(wait=False) @@ -471,15 +490,27 @@ def getConnectedUsbDevicesForDriver(driver: str) -> Iterator[DeviceMatch]: for port in deviceInfoFetcher.usbComPorts ) ) + + fallbackMatches: list[DeviceMatch] = [] + for match in usbDevs: if driver == _getStandardHidDriverName(): if _isHIDBrailleMatch(match): - yield match + if (driver, match.type, match.id) in fallBackDevices: + fallbackMatches.append(match) + else: + yield match else: devs = _driverDevices[driver] for type, ids in devs.items(): if match.type == type and match.id in ids: - yield match + if (driver, match.type, match.id) in fallBackDevices: + fallbackMatches.append(match) + else: + yield match + + for match in fallbackMatches: + yield match def getPossibleBluetoothDevicesForDriver(driver: str) -> Iterator[DeviceMatch]: @@ -605,11 +636,18 @@ def _getDriverDict(self) -> DriverDictT: ret = _driverDevices[self._driver] = DriverDictT(set) return ret - def addUsbDevices(self, type: DeviceType, ids: Set[str]): + def addUsbDevices(self, type: DeviceType, ids: set[str], useAsFallBack: bool = False): """Associate USB devices with the driver on this instance. :param type: The type of the driver. :param ids: A set of USB IDs in the form C{"VID_xxxx&PID_XXXX"}. Note that alphabetical characters in hexadecimal numbers should be uppercase. + :param useAsFallBack: A boolean flag to determine how USB devices are associated with the driver. + + If False (default), the devices are added directly to the primary driver list for the specified type, + meaning they are immediately available for use with the driver. + If True, the devices are added to a fallback list and are used only if the primary driver cannot use + the initial devices, serving as a backup option in case of compatibility issues. + This provides flexibility and robustness in managing driver-device connections. :raise ValueError: When one of the provided IDs is malformed. """ malformedIds = [id for id in ids if not isinstance(id, str) or not USB_ID_REGEX.match(id)] @@ -618,6 +656,9 @@ def addUsbDevices(self, type: DeviceType, ids: Set[str]): f"Invalid IDs provided for driver {self._driver!r}, type {type!r}: " f"{', '.join(malformedIds)}" ) + if useAsFallBack: + fallBackDevices.update((self._driver, type, id) for id in ids) + devs = self._getDriverDict() driverUsb = devs[type] driverUsb.update(ids) diff --git a/source/brailleDisplayDrivers/hims.py b/source/brailleDisplayDrivers/hims.py index bea0d651b73..f595de6c2ac 100644 --- a/source/brailleDisplayDrivers/hims.py +++ b/source/brailleDisplayDrivers/hims.py @@ -4,7 +4,7 @@ # Copyright (C) 2010-2023 Gianluca Casalino, NV Access Limited, Babbage B.V., Leonard de Ruijter, # Bram Duvigneau -from typing import List +from typing import List, Iterator import serial from io import BytesIO @@ -247,22 +247,31 @@ class BrailleDisplayDriver(braille.BrailleDisplayDriver): @classmethod def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar): - # Hid device - driverRegistrar.addUsbDevices(bdDetect.DeviceType.HID, { - "VID_045E&PID_940A", # Braille Edge3S 40 - }) - - # Bulk devices - driverRegistrar.addUsbDevices(bdDetect.DeviceType.CUSTOM, { - "VID_045E&PID_930A", # Braille Sense & Smart Beetle - "VID_045E&PID_930B", # Braille EDGE 40 - }) + deviceTypes = { + bdDetect.DeviceType.HID: ( + { + "VID_045E&PID_940A" # Braille Edge3S 40 + }, + True + ), + bdDetect.DeviceType.CUSTOM: ( + { + "VID_045E&PID_930A", # Braille Sense & Smart Beetle + "VID_045E&PID_930B" # Braille EDGE 40 + }, + False + ), + bdDetect.DeviceType.SERIAL: ( + { + "VID_0403&PID_6001", + "VID_1A86&PID_55D3" # Braille Edge2S 40 + }, + False + ) + } - # Sync Braille, serial device - driverRegistrar.addUsbDevices(bdDetect.DeviceType.SERIAL, { - "VID_0403&PID_6001", - "VID_1A86&PID_55D3", # Braille Edge2S 40 - }) + for deviceType, (ids, useAsFallback) in deviceTypes.items(): + driverRegistrar.addUsbDevices(deviceType, ids, useAsFallback) driverRegistrar.addBluetoothDevices(lambda m: any(m.id.startswith(prefix) for prefix in ( "BrailleSense", @@ -271,13 +280,14 @@ def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar): ))) @classmethod - def getManualPorts(cls): - return braille.getSerialPorts(filterFunc=lambda info: "bluetoothName" in info) + def getManualPorts(cls) -> Iterator[tuple[str, str]]: + return braille.getSerialPorts() def __init__(self, port="auto"): super(BrailleDisplayDriver, self).__init__() self.numCells = 0 self._model = None + self._serialData = b'' for match in self._getTryPorts(port): portType, portId, port, portInfo = match @@ -291,7 +301,7 @@ def __init__(self, port="auto"): case bdDetect.DeviceType.CUSTOM: # onReceiveSize based on max packet size according to USB endpoint information. self._dev = hwIo.Bulk(port, 0, 1, self._onReceive, onReceiveSize=64) - case _: + case bdDetect.DeviceType.SERIAL: self._dev = hwIo.Serial( port, baudrate=BAUD_RATE, @@ -300,6 +310,8 @@ def __init__(self, port="auto"): writeTimeout=self.timeout, onReceive=self._onReceive ) + case _: + log.error(f"No matching case for portType found: {portType}") except EnvironmentError: log.debugWarning("", exc_info=True) continue @@ -489,37 +501,61 @@ def _hidOnReceive(self, data: bytes): def _onReceive(self, data: bytes): if self.isBulk: - # data contains the entire packet. stream = BytesIO(data) - firstByte:bytes = data[0:1] + firstByte: bytes = data[0:1] stream.seek(1) else: firstByte = data - # data only contained the first byte. Read the rest from the device. stream = self._dev - if firstByte == b"\x1c": + + # sometimes serial data is received in fragments. + # so accumulate data until it reaches 10 bytes. + if not self._accumulateSerialData(data): + return + + if firstByte == b"\xfa": + self._processSerialData(firstByte, stream) + elif firstByte == b"\x1c": # A device is identifying itself deviceId: bytes = stream.read(2) # When a device identifies itself, the packets ends with 0x1f assert stream.read(1) == b"\x1f" self._handleIdentification(deviceId) - elif firstByte == b"\xfa": - # Command packets are ten bytes long - packet = firstByte + stream.read(9) - assert packet[2] == 0x01 # Fixed value - CHECKSUM_INDEX = 8 - checksum: int = packet[CHECKSUM_INDEX] - assert packet[9] == 0xfb # Command End - calcCheckSum: int = 0xff & sum( - c for index, c in enumerate(packet) if( - index != CHECKSUM_INDEX) - ) - assert(calcCheckSum == checksum) - self._handlePacket(packet) else: - log.debug("Unknown first byte received: 0x%x"%ord(firstByte)) + log.debug(f"Unknown first byte received: 0x{ord(firstByte):x}") return + def _accumulateSerialData(self, data: bytes) -> bool: + if self._serialData: + self._serialData += data + return len(self._serialData) == 10 + + return True + + def _processSerialData(self, firstByte: bytes, stream): + # serial data first received + if not self._serialData: + try: + # Command packets are ten bytes long + packet = firstByte + stream.read(9) + except IOError: + # remaining data will be received next onReceive + self._serialData = firstByte + return + else: + packet = self._serialData + self._serialData = b"" + + assert packet[2] == 0x01 # Fixed value + CHECKSUM_INDEX = 8 + checksum: int = packet[CHECKSUM_INDEX] + assert packet[9] == 0xfb # Command End + calcCheckSum: int = 0xff & sum( + c for index, c in enumerate(packet) if index != CHECKSUM_INDEX + ) + assert calcCheckSum == checksum + self._handlePacket(packet) + def _sendPacket( self, packetType: bytes,