Skip to content

Commit

Permalink
Implement asynchronous USB I/O via get_port(async=True).
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark committed Jul 21, 2018
1 parent 6dd57c3 commit 228549f
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 29 deletions.
153 changes: 125 additions & 28 deletions software/glasgow/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def __init__(self, firmware_file=None, vendor_id=VID_QIHW, product_id=PID_GLASGO
logger.debug("found device with serial %s",
self.usb.getDevice().getSerialNumber())

self.claimed_ports = []

def _read_eeprom_raw(self, idx, addr, length, chunk_size=0x1000):
"""
Read ``length`` bytes at ``addr`` from EEPROM at index ``idx``
Expand Down Expand Up @@ -359,21 +361,33 @@ def poll_alert(self):
except usb1.USBErrorPipe:
raise GlasgowDeviceError("Cannot poll alert status")

def get_port(self, port):
def get_port(self, port, async=False):
if port == "A":
interface_num = 0
elif port == "B":
interface_num = 1
else:
raise GlasgowDeviceError("Unknown I/O port {}".format(port))

return GlasgowPort(self, port, interface_num)
port = GlasgowPort(self, port, async, interface_num)
self.claimed_ports.append(port)
return port

def poll(self, timeout=None):
if any([len(port.buffer_in) for port in self.claimed_ports]):
# If we have data in IN endpoint buffers, always return right away, but also
# peek at what other fds might have become ready, for efficiency.
return super().poll(0)
else:
# Otherwise, just wait on USB transfers and any other registered fds.
return super().poll(timeout)


class GlasgowPort:
def __init__(self, device, port, interface_num):
def __init__(self, device, port, async, interface_num):
self.device = device
self.port = port
self.async = async

config_num = device.usb.getConfiguration()
for config in device.usb.getDevice().iterConfigurations():
Expand Down Expand Up @@ -407,44 +421,127 @@ def __init__(self, device, port, interface_num):
self.buffer_in = bytearray()
self.buffer_out = bytearray()

def read(self, length):
self.flush()

while len(self.buffer_in) < length:
packet = self.device.bulk_read(self.endpoint_in, self.in_packet_size)
logger.trace("USB EP%x IN: %s", self.endpoint_in, packet.hex())
self.buffer_in += packet
if self.async:
self.in_transfer = self.device.usb.getTransfer()
def callback(transfer):
logger.trace("USB EP%x IN (completed)", self.endpoint_in & 0x7f)
self.in_transfer.setBulk(self.endpoint_in, self.in_packet_size, callback)
self.in_transfer.submit()
logger.trace("USB EP%x IN (submit)", self.endpoint_in & 0x7f)

self.out_transfer = self.device.usb.getTransfer()
def callback(transfer):
logger.trace("USB EP%x OUT (completed)", self.endpoint_out)
self._write_packet_async()
self.out_transfer.setBulk(self.endpoint_out, 0, callback)

def _append_in_packet(self, packet):
logger.trace("USB EP%x IN: %s", self.endpoint_in & 0x7f, packet.hex())
self.buffer_in += packet

def _read_packet_async(self):
if self.in_transfer.isSubmitted():
return False
elif self.in_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
usb1.raiseUSBError(self.in_transfer.getStatus())
else:
packet = self.in_transfer.getBuffer()[:self.in_transfer.getActualLength()]
self._append_in_packet(packet)
logger.trace("USB EP%x IN (submit)", self.endpoint_in & 0x7f)
self.in_transfer.submit()
return True

def _read_packet_sync(self):
packet = self.device.bulk_read(self.endpoint_in, self.in_packet_size)
self._append_in_packet(packet)

def read(self, length=None):
if len(self.buffer_out) > 0:
# Flush the buffer, so that everything written before the read reaches the device.
self.flush()

if length is None and len(self.buffer_in) > 0:
# Just return whatever is in the buffer.
length = len(self.buffer_in)
if self.async:
# Always check if we have new data waiting to be read, and rearm the transfer.
# This ensures that the poll call will have something to wait on.
self._read_packet_async()
if length is None:
# Return whatever is in the buffer, even if it's nothing.
length = len(self.buffer_in)
elif len(self.buffer_in) >= length:
# Return exactly the requested length if we have it.
pass
else:
# Return None if we can't fulfill the request.
return None
else:
if length is None:
# Sync reads with no requested length always block if there's nothing
# in the buffer, or we'll never get a chance to refill the buffer if
# the application code only issues reads with no requested length.
self._read_packet_sync()
length = len(self.buffer_in)
else:
# Sync reads always return exactly the requested length, if any.
while len(self.buffer_in) < length:
self._read_packet_sync()

result = self.buffer_in[:length]
self.buffer_in = self.buffer_in[length:]
logger.trace("port %s read: %s", self.port, result.hex())
return result

def read_str(self, length, encoding="utf-8"):
return self.read(length).decode(encoding)

def write(self, data):
def read_str(self, *args, encoding="utf-8", **kwargs):
result = self.read(*args, **kwargs)
if result is None:
return None
else:
return result.decode(encoding)

def _slice_out_packet(self):
packet = self.buffer_out[:self.out_packet_size]
self.buffer_out = self.buffer_out[self.out_packet_size:]
logger.trace("USB EP%x OUT: %s", self.endpoint_out, packet.hex())
return packet

def _write_packet_async(self):
if self.out_transfer.isSubmitted():
pass
elif self.out_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
usb1.raiseUSBError(self.out_transfer.getStatus())
elif len(self.buffer_out) > 0:
packet = self._slice_out_packet()
self.out_transfer.setBuffer(packet)
logger.trace("USB EP%x OUT (submit)", self.endpoint_out)
self.out_transfer.submit()

def _write_packet_sync(self):
packet = self._slice_out_packet()
self.device.bulk_write(self.endpoint_out, packet)

def write(self, data, async=False):
logger.trace("port %s write: %s", self.port, data.hex())
self.buffer_out += bytearray(data)

# You can only write around 16 MB into an USB endpoint in one call,
# better just packetize it here.
while len(self.buffer_out) > self.out_packet_size:
packet = self.buffer_out[:self.out_packet_size]
logger.trace("USB EP%x OUT: %s", self.endpoint_out, packet.hex())
self.device.bulk_write(self.endpoint_out, packet)
self.buffer_out = self.buffer_out[self.out_packet_size:]
if self.async:
if len(self.buffer_out) > self.out_packet_size:
self._write_packet_async()
else:
while len(self.buffer_out) > self.out_packet_size:
self._write_packet_sync()

def write_str(self, data, encoding="utf-8"):
return self.write(data.encode(encoding))
def write_str(self, data, *args, encoding="utf-8", **kwargs):
return self.write(data.encode(encoding), *args, **kwargs)

def flush(self):
logger.trace("port %s flush", self.port)
while len(self.buffer_out) > 0:
packet = self.buffer_out[:self.out_packet_size]
logger.trace("USB EP%x OUT: %s", self.endpoint_out, packet.hex())
self.device.bulk_write(self.endpoint_out, packet)
self.buffer_out = self.buffer_out[self.out_packet_size:]
if self.async:
self._write_packet_async()
else:
while len(self.buffer_out) > 0:
self._write_packet_sync()

def __del__(self):
self.flush()
2 changes: 1 addition & 1 deletion vendor/libfx2

0 comments on commit 228549f

Please sign in to comment.