Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,30 @@ class SDWire(StorageMuxFlasherInterface, Driver):

storage_device: str | None = field(default=None)

def effective_storage_device(self):
if self.storage_device is None:
context = pyudev.Context()
for udevice in (
context.list_devices(subsystem="usb")
.match_attribute("busnum", self.dev.bus)
.match_attribute("devnum", self.dev.address)
):
# find siblings block device
for block in filter(lambda d: d.subsystem == "block", udevice.parent.children):
# find stable device link under by-diskseq
for storage_device in filter(
lambda link: link.startswith("/dev/disk/by-diskseq/"), block.device_links
):
return storage_device
return None
else:
return self.storage_device

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

for dev in usb.core.find(idVendor=0x04E8, idProduct=0x6001, find_all=True):
if self.storage_device is None:
context = pyudev.Context()
# find matching udev device
for udevice in (
context.list_devices(subsystem="usb")
.match_attribute("busnum", dev.bus)
.match_attribute("devnum", dev.address)
):
# find siblings block device
for block in filter(lambda d: d.subsystem == "block", udevice.parent.children):
# find stable device link under by-diskseq
for storage_device in filter(
lambda link: link.startswith("/dev/disk/by-diskseq/"), block.device_links
):
self.storage_device = storage_device

if self.storage_device is None:
raise FileNotFoundError("failed to find sdcard driver on sd-wire device")

product = usb.util.get_string(dev, dev.iProduct)
serial = usb.util.get_string(dev, dev.iSerialNumber)

Expand All @@ -60,6 +60,9 @@ def __post_init__(self):
bInterfaceProtocol=0xFF,
)

if self.effective_storage_device() is None:
raise FileNotFoundError("failed to find sdcard driver on sd-wire device")
Comment thread
NickCao marked this conversation as resolved.

return

raise FileNotFoundError("failed to find sd-wire device")
Expand Down Expand Up @@ -100,30 +103,43 @@ def off(self):

async def wait_for_storage_device(self):
with fail_after(10):
storage_device = self.effective_storage_device()

while True:
# https://stackoverflow.com/a/2774125
fd = os.open(self.storage_device, os.O_WRONLY)
try:
fd = os.open(storage_device, os.O_WRONLY)
if os.lseek(fd, 0, os.SEEK_END) > 0:
break
except OSError as e:
match e.errno:
case 123: # No medium found
pass
case 5: # Input/output error
pass
case _:
raise
finally:
os.close(fd)
if "fd" in locals():
os.close(fd)
await sleep(1)

return storage_device

@export
async def write(self, src: str):
self.host()
await self.wait_for_storage_device()
async with await FileWriteStream.from_path(self.storage_device) as stream:
storage_device = await self.wait_for_storage_device()
async with await FileWriteStream.from_path(storage_device) as stream:
async with self.resource(src) as res:
async for chunk in res:
await stream.send(chunk)

@export
async def read(self, dst: str):
self.host()
await self.wait_for_storage_device()
async with await FileReadStream.from_path(self.storage_device) as stream:
storage_device = await self.wait_for_storage_device()
async with await FileReadStream.from_path(storage_device) as stream:
async with self.resource(dst) as res:
async for chunk in stream:
await res.send(chunk)