Skip to content

Commit

Permalink
Convert to newer 1.0 API of pyusb.
Browse files Browse the repository at this point in the history
  • Loading branch information
Carl Ritson authored and Tat-Chee Wan (USM) committed Jul 25, 2011
1 parent a69dc31 commit b681779
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 21 deletions.
27 changes: 7 additions & 20 deletions pynxt/nxt/lowlevel.py
Expand Up @@ -4,22 +4,12 @@
USB_BULK_OUT_EP = 0x1
USB_BULK_IN_EP = 0x82

def enumerate_usb():
"""Return a generator yielding all attached USB devices."""
busses = usb.busses()
for bus in busses:
devices = bus.devices
for dev in devices:
yield dev

def get_device(vendor_id, product_id, version=0, timeout=None):
"""Return the first device matching the given vendor/product ID."""
while True:
for dev in enumerate_usb():
if (dev.idVendor == vendor_id and
dev.idProduct == product_id and
dev.iSerialNumber == version):
return UsbBrick(dev)
dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
if dev is not None:
return UsbBrick(dev)
if timeout is None or timeout <= 0:
return None
sleep_time = min(1.0, timeout)
Expand All @@ -39,25 +29,22 @@ def __del__(self):
def open(self, interface, configuration=1):
self._iface = interface
self._config = configuration
self._hdl = self._dev.open()
self._hdl.setConfiguration(configuration)
self._hdl.claimInterface(interface)
self._dev.set_configuration(configuration=self._config)

def close(self):
self._hdl.releaseInterface()
del self._hdl
del self._dev

def read(self, size, timeout = 100):
"""Read the given amount of data from the device and return it."""
# For some reason, bulkRead returns a tuple of longs. This is
# dumb, so we convert it back to a string before returning,
# kthx.
try:
data = self._hdl.bulkRead(USB_BULK_IN_EP, size, timeout)
data = self._dev.read(USB_BULK_IN_EP, size, self._iface, timeout)
except usb.USBError:
return None
return ''.join(chr(x) for x in data)

def write(self, data, timeout = 100):
"""Write the given amount of data to the device."""
return self._hdl.bulkWrite(USB_BULK_OUT_EP, data, timeout)
return self._dev.write(USB_BULK_OUT_EP, data, self._iface, timeout)
8 changes: 7 additions & 1 deletion pynxt/nxt/samba.py
Expand Up @@ -9,6 +9,9 @@
class SambaOpenError(Exception):
"""An error occured while opening a connection to SAM-BA"""

class SambaWriteError(Exception):
"""An error occured writing a buffer via SAM-BA"""

def _command(code, address):
return '%c%08X#' % (code, address)

Expand All @@ -30,6 +33,7 @@ def open(self, timeout=None):
ATMEL_VENDOR_ID, SAMBA_PRODUCT_ID, timeout=timeout)
if not self.usb:
raise SambaOpenError("Could not find a SAM-BA brick to connect to")

self.usb.open(SAMBA_USB_INTERFACE)

# Initial SAM-BA handshake.
Expand All @@ -56,7 +60,9 @@ def write_word(self, address, word):

def write_buffer(self, address, data):
self.usb.write(_command2('S', address, len(data)))
self.usb.write(data)
ret = self.usb.write(data)
if ret != len(data):
raise SambaWriteError()

def _read_common(self, code, address, size, struct_code):
assert size in (1,2,4)
Expand Down

0 comments on commit b681779

Please sign in to comment.