diff --git a/doc/getting-started-with-gpio.md b/doc/getting-started-with-gpio.md index 9c200ec..67e7c25 100644 --- a/doc/getting-started-with-gpio.md +++ b/doc/getting-started-with-gpio.md @@ -22,63 +22,58 @@ group membership. Now Let's Write Some Code! -------------------------- -The GPIO pins are controlled by GPIOPin objects. Python program must -import the GPIOPin class from the quick2wire.gpio module, along with -constants to configure the pin: - - from quick2wire.gpio import GPIOPin, In, Out - -Then you can create a Pin. The Pin's constructor takes two arguments: -the header pin number and whether the pin is to be used for input or -output. - - in_pin = Pin(0, direction=In) - out_pin = Pin(1, direction=Out) - -When you have a Pin instance you can read or write its value. A value -of 1 is high, a value of 0 is low. - - out_pin.value = 1 - print(in_pin.value) - -When you have finished using the pin, you must unexport it: +The GPIO pins are controlled by Pin objects, and those Pin objects are +managed by a "pin bank". The simplest pin bank to use is called +`pins` and gives access to the pins labelled P0 to P7 on the +Quick2Wire interface board (or named GPIO0 to GPIO7 on the Raspberry +Pi's header number 1). There's also a bank called pi_header_1 that +gives access to all the header pins, but we don't need that for this +example. + +Python programs must import the `pins` pin bank from the +`quick2wire.gpio` module, along with constants to configure the pin: + + from quick2wire.gpio import pins, In, Out + +Then you can get a Pin by calling the pin bank's `pin` method. This +takes two arguments: the pin number and whether the pin is to be used +for input or output. + + in_pin = pins.pin(0, direction=In) + out_pin = pins.pin(1, direction=Out) + +You must open a pin before you can read or write its value and close +the pin when you no longer need it. The most convenient way to do +this is to use Python's `with` statement, which will open the pins at +the start of the statement and close them when the body of the +statement has finished running, even if the user kills the program or +failure makes the code throw an exception. + + with in_pin, out_pin: + out_pin.value = 1 + print(in_pin.value) - out_pin.unexport() - in_pin.unexport() +A pin has a value of 1 when high, a value of 0 when low. Putting it all together into a single program: - from quick2wire.gpio import Pin - - in_pin = Pin(0, direction=In) - out_pin = Pin(1, direction=Out) + from quick2wire.gpio import pins, In, Out - out_pin.value = 1 - print(in_pin.value) + in_pin = pins.pin(0, direction=In) + out_pin = pins.pin(1, direction=Out) - out_pin.unexport() - in_pin.unexport() - -To make sure you always unexport any pins you've exported, you can wrap the Pin objects -with `exported()`, a Python [context manager](http://docs.python.org/reference/datamodel.html#context-managers), -as part of a [with](http://docs.python.org/reference/compound_stmts.html#with) statement: - - from quick2wire.gpio import Pin, exported - - with exported(GPIOPin(0, direction=In)) as in_pin, exported(GPIOPin(1, direction=Out)) as out_pin: + with in_pin, out_pin: out_pin.value = 1 - print(in_pin.value) + print(in_pin.value) -This will unexport the pins when the program leaves the `with` statement, even -if the user kills the program or a bad piece of code throws an exception. Here's a slightly more complicated example that blinks an LED attached to pin 1. This will loop forever until the user stops it with a Control-C. from time import sleep - from quick2wire.gpio import GPIOPin, Out, exported + from quick2wire.gpio import pins, Out - with exported(GPIOPin(1, direction=Out)) as pin: + with pins.pin(1, direction=Out) as pin: while True: pin.value = 1 - pin.value sleep(1) diff --git a/examples/analogue-bar b/examples/analogue-bar deleted file mode 100755 index 4ead8da..0000000 --- a/examples/analogue-bar +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/python3 - -from contextlib import closing -from quick2wire.gpio import Pin -from quick2wire.helpers.display import AnalogueDisplay -import quick2wire.i2c as i2c -from quick2wire.parts.pcf8591 import PCF8591 - -with closing(i2c.I2CMaster()) as master: - pcf = PCF8591(master) - pins = [Pin(header_pin_number, Pin.Out) for header_pin_number in [11, 12, 13, 15, 16, 18, 22, 7]] - display = AnalogueDisplay(64, *pins) - pin = pcf.begin_analogue_read(0) - try: - while(True): - display.display(pin.read()) - except KeyboardInterrupt: - display.display(63) - diff --git a/examples/blink b/examples/blink index 748ac90..ba3b8e5 100755 --- a/examples/blink +++ b/examples/blink @@ -2,16 +2,12 @@ import sys from time import sleep -from quick2wire.gpio import GPIOPin, Out +from quick2wire.gpio import pins, Out -pin = GPIOPin(int(sys.argv[1]) if len(sys.argv) > 1 else 1, direction=Out) - - -try: +pin = pins.pin(int(sys.argv[1]) if len(sys.argv) > 1 else 1, direction=Out) +with pin: pin.value = 1 while True: sleep(1) pin.value = 1 - pin.value -except KeyboardInterrupt: - pin.value = 0 - pin.unexport() + diff --git a/examples/button-blink b/examples/button-blink index 53cd6af..22fcdd2 100755 --- a/examples/button-blink +++ b/examples/button-blink @@ -1,12 +1,13 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 import sys from time import sleep -from quick2wire.gpio import GPIOPin, In, Out +from quick2wire.gpio import pins, In, Out -led = GPIOPin(1, direction=Out) -button = GPIOPin(0, direction=In) +led = pins.pin(1, direction=Out) +button = pins.pin(0, direction=In) -while True: - led.value = button.value - sleep(0.1) +with led, button: + while True: + led.value = button.value + sleep(0.1) diff --git a/examples/counter b/examples/counter index 1b6b246..63a1ec4 100755 --- a/examples/counter +++ b/examples/counter @@ -1,19 +1,20 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 from itertools import cycle from time import sleep -from quick2wire.gpio import Pin +from quick2wire.gpio import gpio, Out -pins = [Pin(n, Pin.Out) for n in [7, 22, 18, 16, 15, 13, 12, 11]] +pins = [gpio.pin(i, Out) for i in range(8)] try: + for p in pins: + p.open() + for count in cycle(range(256)): bitset = [int(count & (1< 0) for n in range(8)] for (pin, value) in zip(pins, bitset): pin.value = value sleep(0.5) - -except KeyboardInterrupt: - for pin in pins: - pin.value = 0 - pin.unexport() +finally: + for p in pins: + p.close() diff --git a/examples/gpio-speed b/examples/gpio-speed index 04c879f..bb7c06b 100755 --- a/examples/gpio-speed +++ b/examples/gpio-speed @@ -1,12 +1,9 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 from datetime import datetime -from quick2wire.gpio import Pin +from quick2wire.gpio import pins, In, Out from timeit import Timer -outpin = Pin(12, Pin.Out) -inpin = Pin(11, Pin.In) -iterations = 10000 def nothin(): pass @@ -18,12 +15,16 @@ def onepass_toggle(): outpin.value = 1 outpin.value = 0 -overhead = Timer(nothin).timeit(iterations) -readresult = Timer(onepass_read).timeit(iterations) -toggleresult = Timer(onepass_toggle).timeit(iterations) +iterations = 10000 -print("The time to do nothing %d times is %4.3fsec" % (iterations, overhead)) -print("The time to read %d times is %4.3fsec" % (iterations, readresult)) -print("The time to toggle %d times is %4.3fsec" % (iterations, toggleresult)) +outpin = pins.pin(0, Out) +inpin = pins.pin(1, In) +with inpin, outpin: + overhead = Timer(nothin).timeit(iterations) + readresult = Timer(onepass_read).timeit(iterations) + toggleresult = Timer(onepass_toggle).timeit(iterations) + print("The time to do nothing %d times is %4.3fsec" % (iterations, overhead)) + print("The time to read %d times is %4.3fsec" % (iterations, readresult)) + print("The time to toggle %d times is %4.3fsec" % (iterations, toggleresult)) diff --git a/examples/pullup b/examples/pullup index d6997bd..3822076 100755 --- a/examples/pullup +++ b/examples/pullup @@ -5,16 +5,14 @@ # the Pi's GPIO pins. # -from quick2wire.gpio import Pin +from quick2wire.gpio import pins, In, PullUp, PullDown -pin = Pin(11, Pin.In) -print( "Pull up/down is unspecified, pin is now", pin.value) -pin.unexport() +with pins.pin(0, In) as pin: + print("Pull up/down is unspecified, pin is now", pin.value) -pin = Pin(11, Pin.In, pull=Pin.PullUp) -print( "Pin is set with pull up, pin in is now", pin.value) -pin.unexport() +with pins.pin(0, In, pull=PullUp) as pin: + print("Pin is set with pull up, pin in is now", pin.value) + +with pins.pin(0, In, pull=PullDown) as pin: + print("Pin is set with pull down, pin is now", pin.value) -pin = Pin(11, Pin.In, pull=Pin.PullDown) -print( "Pin is set with pull down, pin is now", pin.value) -pin.unexport() diff --git a/examples/selector-button-blink b/examples/selector-button-blink index 599d5d9..4fe184c 100755 --- a/examples/selector-button-blink +++ b/examples/selector-button-blink @@ -1,15 +1,15 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 import sys import os from contextlib import closing -from quick2wire.gpio import GPIOPin, Both, In, Out, exported +from quick2wire.gpio import pins, Both, In, Out from quick2wire.selector import Selector, Timer blink_rate=2.0 -with exported(GPIOPin(0, direction=In, interrupt=Both)) as button_pin, \ - exported(GPIOPin(1, direction=Out)) as led_pin, \ +with pins.pin(0, direction=In, interrupt=Both) as button_pin, \ + pins.pin(1, direction=Out) as led_pin, \ Timer(interval=1/blink_rate) as timer, \ Selector() as selector: diff --git a/quick2wire/board_revision.py b/quick2wire/board_revision.py index 2319ea6..ea5c623 100644 --- a/quick2wire/board_revision.py +++ b/quick2wire/board_revision.py @@ -8,3 +8,4 @@ def revision(): return 0 except: return 0 + diff --git a/quick2wire/gpio.py b/quick2wire/gpio.py index fe05461..89d7aaf 100644 --- a/quick2wire/gpio.py +++ b/quick2wire/gpio.py @@ -8,57 +8,6 @@ from quick2wire.board_revision import revision from quick2wire.selector import EDGE -# Maps header pin numbers to SoC GPIO numbers -# See http://elinux.org/RPi_Low-level_peripherals -# -# Note: - header pins are numbered from 1, SoC GPIO from zero -# - the Pi documentation identifies some header pins as GPIO0, -# GPIO1, etc., but these are not the same as the SoC GPIO -# numbers. -# -# Todo - different factory functions for creating Pins by SoC id, -# header id and Pi GPIO id. - -RaspberryPi_HeaderToSOC = { - 3: 0, - 5: 1, - 7: 4, - 8: 14, - 10: 15, - 11: 17, - 12: 18, - 13: 21, - 15: 22, - 16: 23, - 18: 24, - 19: 10, - 21: 9, - 22: 25, - 23: 11, - 24: 8, - 26: 7 -} - -if revision() > 1: - RaspberryPi_HeaderToSOC[3] = 2 - RaspberryPi_HeaderToSOC[5] = 3 - RaspberryPi_HeaderToSOC[13] = 27 - - -RaspberryPi_GPIOToHeader = [11, 12, 13, 15, 16, 18, 22, 7] - -def gpio_to_soc(gpio_pin_number): - if 0 <= gpio_pin_number < 8: - return header_to_soc(RaspberryPi_GPIOToHeader[gpio_pin_number]) - else: - raise ValueError(str(gpio_pin_number)+" is not a valid GPIO pin") - - -def header_to_soc(header_pin_number): - if header_pin_number in RaspberryPi_HeaderToSOC: - return RaspberryPi_HeaderToSOC[header_pin_number] - else: - raise ValueError(str(header_pin_number)+" is not a valid IO header pin") def gpio_admin(subcommand, pin, pull=None): if pull: @@ -67,200 +16,268 @@ def gpio_admin(subcommand, pin, pull=None): subprocess.check_call(["gpio-admin", subcommand, str(pin)]) +Out = "out" +In = "in" + +Rising = "rising" +Falling = "falling" +Both = "both" + +PullDown = "pulldown" +PullUp = "pullup" -def _pin_file(name, parser, doc): - def _read(self): - self._ensure_exported() - with open(self._pin_path(name), "r") as f: - return parser(f.read()) - - def _write(self, value): - self._ensure_exported() - with open(self._pin_path(name), "w") as f: - f.write(str(value)) - - return property(_read, _write, doc=doc) -class _IOPin(object): - """Controls a GPIO pin.""" +class PinAPI(object): + def __init__(self, bank, index): + self._bank = bank + self._index = index + + @property + def index(self): + return self._index + + @property + def bank(self): + return self._bank - Out = "out" - In = "in" + def __enter__(self): + self.open() + return self - Rising = "rising" - Falling = "falling" - Both = "both" + def __exit__(self, exc_type, exc_value, traceback): + self.close() - PullDown = "pulldown" - PullUp = "pullup" + value = property(lambda p: p.get(), + lambda p,v: p.set(v), + doc="""The value of the pin: 1 if the pin is high, 0 if the pin is low.""") + + +class PinBankAPI(object): + def __getitem__(self, n): + if 0 < n < len(self): + raise ValueError("no pin index {n} out of range", n=n) + return self.pin(n) + + def write(self): + pass + + def read(self): + pass + + + +class Pin(PinAPI): + """Controls a GPIO pin.""" __trigger__ = EDGE - def __init__(self, user_pin_number, soc_pin_number, direction=None, interrupt=None, pull=None): + def __init__(self, bank, index, soc_pin_number, direction=In, interrupt=None, pull=None): """Creates a pin - If the direction is specified, the pin is exported if - necessary and its direction is set. If the direction is not - specified, the pin is not exported and you must call export() - before you start using it. - Parameters: user_pin_number -- the identity of the pin used to create the derived class. soc_pin_number -- the pin on the header to control, identified by the SoC pin number. direction -- (optional) the direction of the pin, either In or Out. - interrupt -- (optional) + interrupt -- (optional) + pull -- (optional) Raises: IOError -- could not export the pin (if direction is given) """ - self.index = user_pin_number - self.soc_pin_number = soc_pin_number + super(Pin,self).__init__(None, index) + self._soc_pin_number = soc_pin_number self._file = None - self.pull = pull - if direction is not None: - self.direction = direction - if interrupt is not None: - self.interrupt = interrupt - - - def __repr__(self): - return self.__module__ + "." + str(self) + self._direction = direction + self._interrupt = interrupt + self._pull = pull - def __str__(self): - return "%s(%i)"%(self.__class__.__name__, self.index) @property - def is_exported(self): - """Has the pin been exported to user space?""" - return os.path.exists(self._pin_path()) - - def export(self): - """Export the pin to user space, making its control files visible in the filesystem. - - Raises: - IOError -- could not export the pin. - - - """ - gpio_admin("export", self.soc_pin_number, self.pull) + def soc_pin_number(self): + return self._soc_pin_number - def unexport(self): - """Unexport the pin, removing its control files from the filesystem. - - Raises: - IOError -- could not unexport the pin. - - """ - self._maybe_close() - gpio_admin("unexport", self.soc_pin_number) - - def _ensure_exported(self): - if not self.is_exported: - self.export() + def open(self): + gpio_admin("export", self.soc_pin_number, self._pull) + self._file = open(self._pin_path("value"), "r+") + self._write("direction", self._direction) + if self._direction == In: + self._write("edge", self._interrupt if self._interrupt is not None else "none") + + def close(self): + if not self.closed: + if self.direction == Out: + self.value = 0 + self._file.close() + self._file = None + self._write("direction", In) + self._write("edge", "none") + gpio_admin("unexport", self.soc_pin_number) - @property - def value(self): - """The current value of the pin: 1 if the pin is high or 0 if - the pin is low. + def get(self): + """The current value of the pin: 1 if the pin is high or 0 if the pin is low. The value can only be set if the pin's direction is Out. Raises: IOError -- could not read or write the pin's value. - """ - f = self._lazyopen() - f.seek(0) - v = f.read() + self._check_open() + self._file.seek(0) + v = self._file.read() return int(v) if v else 0 - @value.setter - def value(self, new_value): - f = self._lazyopen() - f.seek(0) - f.write(str(int(new_value))) - f.flush() + def set(self, new_value): + self._check_open() + if self._direction != Out: + raise ValueError("not an output pin") + self._file.seek(0) + self._file.write(str(int(new_value))) + self._file.flush() - direction = _pin_file("direction", str.strip, + @property + def direction(self): """The direction of the pin: either In or Out. The value of the pin can only be set if its direction is Out. Raises: - IOError -- could not read or set the pin's direction. + IOError -- could not set the pin's direction. + """ + return self._direction + + @direction.setter + def direction(self, new_value): + self._write("direction", new_value) + self._direction = new_value + + @property + def interrupt(self): + """The interrupt property specifies what event (if any) will raise an interrupt. - """) - - interrupt = _pin_file("edge", str.strip, - """The interrupt property specifies what event (if any) will trigger an interrupt. - - Raises: - IOError -- could not read or set the pin's interrupt trigger + One of: + Rising -- voltage changing from low to high + Falling -- voltage changing from high to low + Both -- voltage changing in either direction + None -- interrupts are not raised + + Raises: + IOError -- could not read or set the pin's interrupt trigger + """ + return self._interrupt + + @interrupt.setter + def interrupt(self, new_value): + self._write("edge", new_value) + self._interrupt = new_value - """) + @property + def pull(self): + return self._pull def fileno(self): - """ - Return the underlying fileno. Useful for calling select - """ - return self._lazyopen().fileno() - - def _lazyopen(self): - if self._file is None: - self._file = open(self._pin_path("value"), "r+") - return self._file + """Return the underlying file descriptor. Useful for select, epoll, etc.""" + return self._file.fileno() - def _maybe_close(self): - if self._file is not None: - self._file.close() - self._file = None + @property + def closed(self): + """Returns if this pin is closed""" + return self._file is None or self._file.closed + + def _check_open(self): + if self.closed: + raise IOError(str(self) + " is closed") + + def _write(self, filename, value): + with open(self._pin_path(filename), "w+") as f: + f.write(value) def _pin_path(self, filename=""): return "/sys/devices/virtual/gpio/gpio%i/%s" % (self.soc_pin_number, filename) + def __repr__(self): + return self.__module__ + "." + str(self) + + def __str__(self): + return "{type}({index})".format( + type=self.__class__.__name__, + index=self.index) -class HeaderPin(_IOPin): - def __init__(self, header_pin_number, *args, **kwargs): - return super(HeaderPin, self).__init__(header_pin_number, header_to_soc(header_pin_number), *args, **kwargs) -class GPIOPin(_IOPin): - def __init__(self, gpio_pin_number, *args, **kwargs): - return super(GPIOPin, self).__init__(gpio_pin_number, gpio_to_soc(gpio_pin_number), *args, **kwargs) +class PinBank(PinBankAPI): + def __init__(self, index_to_soc_fn, count=None): + super(PinBank,self).__init__() + self._index_to_soc = index_to_soc_fn + self._count = count + + def pin(self, index, *args, **kwargs): + return Pin(self, index, self._index_to_soc(index), *args, **kwargs) + + @property + def has_len(self): + return self._count is not None + + def __len__(self): + if self._count is not None: + return self._count + else: + raise TypeError(self.__class__.__name__ + " has no len") -Out = _IOPin.Out -In = _IOPin.In -Rising = _IOPin.Rising -Falling = _IOPin.Falling -Both = _IOPin.Both +_pi_revision = revision() -PullDown = _IOPin.PullDown -PullUp = _IOPin.PullUp +def by_revision(d): + return d[_pi_revision] +# Maps header pin numbers to SoC GPIO numbers +# See http://elinux.org/RPi_Low-level_peripherals +# +# Note: - header pins are numbered from 1, SoC GPIO from zero +# - the Pi documentation identifies some header pins as GPIO0, +# GPIO1, etc., but these are not the same as the SoC GPIO +# numbers. + +_pi_header_1_pins = { + 3: by_revision({1:0, 2:2}), + 5: by_revision({1:1, 2:3}), + 7: 4, + 8: 14, + 10: 15, + 11: 17, + 12: 18, + 13: by_revision({1:21, 2:27}), + 15: 22, + 16: 23, + 18: 24, + 19: 10, + 21: 9, + 22: 25, + 23: 11, + 24: 8, + 26: 7 +} -# Backwards compatability -Pin = HeaderPin +_pi_gpio_pins = [_pi_header_1_pins[i] for i in [11, 12, 13, 15, 16, 18, 22, 7]] -@contextmanager -def exported(pin): - """A context manager that automatically exports a pin if necessary - and unexports it at the end of the block. - - Example:: - - with exported(Pin(15)) as pin: - print(pin.value) - - """ - - if not pin.is_exported: - pin.export() + +def lookup(pin_mapping, i): try: - yield pin - finally: - pin.unexport() + if i >= 0: + return pin_mapping[i] + except LookupError: + pass + + raise IndexError(str(i) + " is not a valid pin index") + +def map_with(pin_mapping): + return lambda i: lookup(pin_mapping,i) + + +pi_broadcom_soc = PinBank(lambda p: p) +pi_header_1 = PinBank(map_with(_pi_header_1_pins)) +pins = PinBank(map_with(_pi_gpio_pins), len(_pi_gpio_pins)) + diff --git a/quick2wire/parts/mcp23x17.py b/quick2wire/parts/mcp23x17.py index 9ffd96e..fe55e53 100644 --- a/quick2wire/parts/mcp23x17.py +++ b/quick2wire/parts/mcp23x17.py @@ -9,6 +9,7 @@ import contextlib from warnings import warn +from quick2wire.gpio import PinAPI, PinBankAPI # TODO - import from GPIO or common definitions module In = "in" @@ -208,7 +209,7 @@ def immediate_write(f): f() -class PinBank(object): +class PinBank(PinBankAPI): """A bank of 8 GPIO pins""" def __init__(self, chip, bank_id): @@ -320,13 +321,12 @@ def _write(self, value): return property(_read, _write, doc=doc) -class Pin(object): +class Pin(PinAPI): """A digital Pin that can be used for input or output.""" def __init__(self, bank, index): """Called by the PinBank. Not used by application code.""" - self.bank = bank - self.index = index + super(Pin,self).__init__(bank,index) self._is_claimed = False def open(self): @@ -341,13 +341,6 @@ def open(self): def close(self): self._is_claimed = False - def __enter__(self): - self.open() - return self - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - def get(self): """Returns the value of the pin. @@ -361,8 +354,6 @@ def set(self, new_value): The same as pin.value, but a method so that it can easily be passed around as a function. """ self._set_register_bit(OLAT, new_value) - - value = property(get, set, doc="""The value of the pin: 1 if the pin is high, 0 if the pin is low.""") direction = _register_bit(IODIR, high_value=In, low_value=Out, doc="""The direction of the pin: In if the pin is used for input, Out if it is used for output.""") diff --git a/quick2wire/test_gpio.py b/quick2wire/test_gpio.py index 2c028a1..7901928 100644 --- a/quick2wire/test_gpio.py +++ b/quick2wire/test_gpio.py @@ -1,85 +1,102 @@ -from quick2wire.gpio import GPIOPin, exported, In, Out +import os +from quick2wire.gpio import pins, In, Out, PullDown, gpio_admin import pytest @pytest.mark.gpio @pytest.mark.loopback -class TestPin: - def setup_method(self, method): - self.pin = GPIOPin(0) - - def teardown_method(self, method): - if self.pin.is_exported: - self.pin.unexport() - - def test_pin_must_be_exported_before_use(self): - with pytest.raises(IOError): - self.pin.value - self.pin.export() - self.pin.value +class TestGPIO: + def test_pin_must_be_opened_before_use_and_is_unusable_after_being_closed(self): + pin = pins.pin(0) - def test_pin_can_be_unexported_and_made_unusable(self): - self.pin.export() - self.pin.unexport() with pytest.raises(IOError): - self.pin.value - - def test_can_set_and_query_direction_of_pin(self): - self.pin.export() + pin.value - self.pin.direction = Out - assert self.pin.direction == Out + pin.open() + try: + pin.value + finally: + pin.close() - self.pin.direction = In - assert self.pin.direction == In - + with pytest.raises(IOError): + pin.value + + + def test_opens_and_closes_itself_when_used_as_a_context_manager(self): + pin = pins.pin(0) - def test_can_set_value_of_output_pin(self): - self.pin.export() + with pin: + pin.value - self.pin.direction = Out + with pytest.raises(IOError): + pin.value + + + def test_exports_gpio_device_to_userspace_when_opened_and_unexports_when_closed(self): + with pins.pin(0) as pin: + assert os.path.exists('/sys/class/gpio/gpio17/value') - self.pin.value = 1 - assert self.pin.value == 1 + assert not os.path.exists('/sys/class/gpio/gpio17/value') + + + def test_can_set_and_query_direction_of_pin_when_open(self): + with pins.pin(0) as pin: + pin.direction = Out + assert pin.direction == Out + + assert content_of("/sys/class/gpio/gpio17/direction") == "out\n" + + pin.direction = In + assert pin.direction == In + + assert content_of("/sys/class/gpio/gpio17/direction") == "in\n" + + + def test_can_set_direction_on_construction(self): + pin = pins.pin(0, Out) - self.pin.value = 0 - assert self.pin.value == 0 + assert pin.direction == Out + assert not os.path.exists("/sys/class/gpio/gpio17/direction") - def test_can_export_pin_and_set_direction_on_construction(self): - p = GPIOPin(0, Out) + with pin: + assert content_of("/sys/class/gpio/gpio17/direction") == "out\n" + assert pin.direction == Out + + + def test_setting_value_of_output_pin_writes_to_device_file(self): + with pins.pin(0) as pin: + pin.direction = Out + + pin.value = 1 + assert pin.value == 1 + assert content_of('/sys/class/gpio/gpio17/value') == '1\n' + + pin.value = 0 + assert pin.value == 0 + assert content_of('/sys/class/gpio/gpio17/value') == '0\n' + + + def test_direction_and_value_of_pin_is_reset_when_closed(self): + with pins.pin(0, Out) as pin: + pin.value = 1 - assert p.is_exported - assert p.direction == Out + gpio_admin("export", 17, PullDown) + try: + assert content_of('/sys/class/gpio/gpio17/value') == '0\n' + assert content_of('/sys/class/gpio/gpio17/direction') == 'in\n' + finally: + gpio_admin("unexport", 17) - def test_can_read_after_write(self): - outpin = GPIOPin(0, Out) + def test_cannot_get_a_pin_with_an_invalid_index(self): + with pytest.raises(IndexError): + pins.pin(-1) - outpin.value = 1 - assert self.pin.value == 1 - with open('/sys/class/gpio/gpio17/value', 'r+') as f: - assert f.read() == '1\n' - - + with pytest.raises(IndexError): + pins.pin(len(pins)) -@pytest.mark.gpio -@pytest.mark.loopback -class TestExportedContextManager: - def test_can_automatically_unexport_pin_with_context_manager(self): - with exported(GPIOPin(0)) as p: - assert p.is_exported - - p = GPIOPin(0) - assert not p.is_exported - - def test_can_use_context_manager_with_pin_exported_by_constructor(self): - with exported(GPIOPin(0, Out)) as p: - assert p.is_exported - - p = GPIOPin(0) - assert not p.is_exported - - def test_can_use_context_manager_with_pin_already_exported(self): - GPIOPin(0).export() - self.test_can_automatically_unexport_pin_with_context_manager() +def content_of(filename): + with open(filename, 'r') as f: + return f.read() + diff --git a/quick2wire/test_gpio_loopback.py b/quick2wire/test_gpio_loopback.py index 92f1107..9593fe2 100644 --- a/quick2wire/test_gpio_loopback.py +++ b/quick2wire/test_gpio_loopback.py @@ -1,5 +1,5 @@ -from quick2wire.gpio import HeaderPin, GPIOPin, In, Out, exported +from quick2wire.gpio import pins, pi_header_1, In, Out from time import sleep import pytest @@ -12,7 +12,7 @@ def inverse(topology): @pytest.mark.gpio def test_gpio_loopback(): assert_outputs_seen_at_corresponding_inputs( - GPIOPin, + pins, [(i,i+4) for i in range(4)]) @@ -20,24 +20,24 @@ def test_gpio_loopback(): @pytest.mark.gpio def test_gpio_loopback_by_header_pin(): assert_outputs_seen_at_corresponding_inputs( - HeaderPin, + pi_header_1, [(11,16), (12,18), (13,22), (15,7)]) -def assert_outputs_seen_at_corresponding_inputs(pin_type, topology): + +def assert_outputs_seen_at_corresponding_inputs(pin_bank, topology): for (op, ip) in topology: - assert_output_seen_at_input(pin_type, op, ip) + assert_output_seen_at_input(pin_bank, op, ip) for (op, ip) in inverse(topology): - assert_output_seen_at_input(pin_type, op, ip) + assert_output_seen_at_input(pin_bank, op, ip) -def assert_output_seen_at_input(pin_type, op, ip): - with exported(pin_type(op, direction=Out)) as output_pin, exported(pin_type(ip, direction=In)) as input_pin: +def assert_output_seen_at_input(pin_bank, op, ip): + output_pin = pin_bank.pin(op, direction=Out) + input_pin = pin_bank.pin(ip, direction=In) + + with output_pin, input_pin: for value in [1, 0, 1, 0]: output_pin.value = value assert input_pin.value == value - print(op, "->", ip,": ok!") - - -