Skip to content

Commit

Permalink
Implement support for polling USB fds, alongside synchronous calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark committed Jul 21, 2018
1 parent 36563a7 commit 9dac8a6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
8 changes: 8 additions & 0 deletions docs/host_library.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Host-side library reference

.. autoclass:: FX2Device

.. automethod:: control_read
.. automethod:: control_write
.. automethod:: bulk_read
.. automethod:: bulk_write
.. automethod:: create_poller
.. automethod:: get_poller
.. automethod:: poll

.. automethod:: read_ram
.. automethod:: write_ram
.. automethod:: load_ram
Expand Down
35 changes: 34 additions & 1 deletion software/fx2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import struct
import usb1
import select
from . import poll_wrapper


__all__ = ["FX2Config", "FX2Device", "FX2DeviceError"]
Expand Down Expand Up @@ -181,7 +183,9 @@ def __init__(self, vendor_id=VID_CYPRESS, product_id=PID_FX2):
self.timeout = 1000

try:
self.usb = usb1.USBContext().openByVendorIDAndProductID(vendor_id, product_id)
self.usb_context = usb1.USBContext()
self.usb_poller = None
self.usb = self.usb_context.openByVendorIDAndProductID(vendor_id, product_id)
except usb1.USBErrorAccess:
raise FX2DeviceError("Cannot access device {:04x}:{:04x}"
.format(vendor_id, product_id))
Expand Down Expand Up @@ -224,6 +228,35 @@ def bulk_write(self, endpoint, data, timeout=None):
timeout = self.timeout
self.usb.bulkWrite(endpoint, data, timeout)

def create_poller(self, poller):
"""
Integrate waiting for USB transfers into an event loop, by taking an object
conforming to the Python ``poll`` interface and returning another conforming
to the same interface.
Note that if ``create_poller`` is called more than once, events will only be
delivered to the last returned poller instance.
"""
return usb1.USBPoller(self.usb_context, poll_wrapper.wrap_poller_for_libusb(poller))

def get_poller(self):
"""
Return a poller instance associated with this device, creating it using
``create_poller(select.poll())`` if necessary.
"""
if self.usb_poller is None:
self.usb_poller = self.create_poller(select.poll())
return self.usb_poller

def poll(self, timeout=None):
"""
Wait for USB transfers, as well as any other events registered on the poller
returned by ``get_poller()``, with timeout defaulting to ``self.timeout``.
"""
if timeout is None:
timeout = self.timeout
return self.get_poller().poll(timeout)

def read_ram(self, addr, length):
"""
Read ``length`` bytes at ``addr`` from internal RAM.
Expand Down
34 changes: 34 additions & 0 deletions software/fx2/poll_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import select


_cls_select_poll = type(select.poll())


class poll_wrapper:
def __init__(self, wrapped):
self.wrapped = wrapped

def register(self, *args, **kwargs):
self.wrapped.register(*args, **kwargs)

def modify(self, *args, **kwargs):
self.wrapped.modify(*args, **kwargs)

def unregister(self, *args, **kwargs):
self.wrapped.unregister(*args, **kwargs)

def poll(self, timeout=None):
if timeout is None:
return self.wrapped.poll()
else:
return self.wrapped.poll(round(timeout * 1000))


# As per libusb1 documentation, work around the mismatch of select.poll
# timeout argument (in milliseconds) and the one expected by USBPoller
# (in seconds).
def wrap_poller_for_libusb(poller):
if isinstance(poller, _cls_select_poll):
return poll_wrapper(poller)
else:
return poller

0 comments on commit 9dac8a6

Please sign in to comment.