Skip to content
Permalink
Browse files

gpiodriver: implement agent interfaces using sysfs

This allows to export local GPIO resources for consumption by remote
clients.

It is implemented using the deprecated `sysfs` interface, because
the `libgpiod` interface does not retain its state once the process
exits. A behavior that is expected by labgrid.

Signed-off-by: Leif Middelschulte <leif.middelschulte@klsmartin.com>
[Jan Luebbe: squashed additional fixes into this commit]
Signed-off-by: Jan Luebbe <jlu@pengutronix.de>
  • Loading branch information...
leiflm authored and jluebbe committed Jul 8, 2019
1 parent aab70d5 commit 23d6aa4480570fe61c7de4cf5bec6aee45615305
@@ -4,7 +4,7 @@ Release 0.3.0 (unreleased)
New Features in 0.3.0
~~~~~~~~~~~~~~~~~~~~~

- The new `GpioDigitalOutputDriver` controls the state of a GPIO via libgpiod.
- The new `GpioDigitalOutputDriver` controls the state of a GPIO via the sysfs interface.
- Crossbar and autobahn have been updated to 19.3.3 and 19.3.5 respectively.
- The InfoDriver was removed. The functions have been integrated into the
labgridhelper library, please use the library for the old functionality.
@@ -1092,20 +1092,19 @@ GpioDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~
The GpioDigitalOutputDriver writes a digital signal to a GPIO line.

This driver uses the libgpiod python bindings.
Make sure to have them installed.
This driver configures GPIO lines via `the sysfs kernel interface <https://www.kernel.org/doc/html/latest/gpio/sysfs.html>`.
While the driver automatically exports the GPIO, it does not configure it in any other way than as an output.

Implements:
- :any:`DigitalOutputProtocol`

.. code-block:: yaml
GpioDigitalOutputDriver:
offset: 42
index: 42
Arguments:
- chip (str): A GPIO device's path, name, label or number (default: "0")
- offset (int): The offset to a GPIO line
- index (int): The index of a GPIO line

SerialPortDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -0,0 +1,3 @@
desk:
GpioDigitalOutputDriver:
index: 60
@@ -0,0 +1,9 @@
targets:
main:
resources:
RemotePlace:
name: gpio
drivers:
GpioDigitalOutputDriver: {}
options:
crossbar_url: 'ws://labgrid:20408/ws'
@@ -0,0 +1,32 @@
import sys
import labgrid
import logging
import time

from labgrid import Environment, StepReporter
from labgrid.driver.gpiodriver import GpioDigitalOutputDriver

# enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)7s: %(message)s',
stream=sys.stderr,
)

# show labgrid steps on the console
StepReporter()

t = labgrid.Target('main')
r = labgrid.resource.base.SysfsGPIO(t, name=None, index=60)
d = GpioDigitalOutputDriver(t, name=None)

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
@@ -0,0 +1,31 @@
import sys
import labgrid
import logging
import time

from labgrid import Environment, StepReporter
from labgrid.driver.gpiodriver import GpioDigitalOutputDriver

# enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)7s: %(message)s',
stream=sys.stderr,
)

# show labgrid steps on the console
StepReporter()

e = labgrid.Environment('import-gpio.yaml')
t = e.get_target()

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
@@ -3,49 +3,44 @@

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..resource.base import SysfsGPIO
from ..resource.remote import NetworkSysfsGPIO
from ..step import step
from .common import Driver
from ..util.agentwrapper import AgentWrapper


@target_factory.reg_driver
@attr.s(cmp=False)
class GpioDigitalOutputDriver(Driver, DigitalOutputProtocol):
"""
Controls the state of a GPIO using libgpiod.

Takes a string property 'chip' which refers to the GPIO device.
You can use its path, name, label or number (default: 0).
The offset to the GPIO line is set by the integer property 'offset'.
"""

bindings = {}
offset = attr.ib(validator=attr.validators.instance_of(int))
chip = attr.ib(default="0", validator=attr.validators.instance_of(str))
gpio_chip = None
gpio_state = True
bindings = {
"gpio": {SysfsGPIO, NetworkSysfsGPIO},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
import gpiod
self.gpio_chip = gpiod.Chip(self.chip)
gpio_line = self.gpio_chip.get_line(self.offset)
gpio_line.request("labgrid", gpiod.LINE_REQ_DIR_OUT)
if isinstance(self.gpio, NetworkSysfsGPIO):
host = self.gpio.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('sysfsgpio')

def on_deactivate(self):
gpio_line = self.gpio_chip.get_line(self.offset)
gpio_line.release()
self.gpio_chip.close()
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step()
def get(self):
return self.gpio_state
@step(args=['status'])
def set(self, status):
self.proxy.set(self.gpio.index, status)

@Driver.check_active
@step()
def set(self, status):
gpio_line = self.gpio_chip.get_line(self.offset)
gpio_line.set_value(int(status))
self.gpio_state = status
@step(result=True)
def get(self):
return self.proxy.get(self.gpio.index)
@@ -666,9 +666,11 @@ def digital_io(self):
from ..resource.modbus import ModbusTCPCoil
from ..resource.onewireport import OneWirePIO
from ..resource.remote import NetworkDeditecRelais8
from ..resource.remote import NetworkSysfsGPIO
from ..driver.modbusdriver import ModbusCoilDriver
from ..driver.onewiredriver import OneWirePIODriver
from ..driver.deditecrelaisdriver import DeditecRelaisDriver
from ..driver.gpiodriver import GpioDigitalOutputDriver
drv = None
for resource in target.resources:
if isinstance(resource, ModbusTCPCoil):
@@ -692,6 +694,13 @@ def digital_io(self):
target.set_binding_map({"relais": name})
drv = DeditecRelaisDriver(target, name=name)
break
elif isinstance(resource, NetworkSysfsGPIO):
try:
drv = target.get_driver(GpioDigitalOutputDriver, name=name)
except NoDriverFoundError:
target.set_binding_map({"gpio": name})
drv = GpioDigitalOutputDriver(target, name=name)
break
if not drv:
raise UserError("target has no compatible resource available")
target.activate(drv)
@@ -330,6 +330,28 @@ def _get_params(self):
exports["SNMPEthernetPort"] = EthernetPortExport


@attr.s(cmp=False)
class GPIOGenericExport(ResourceExport):
"""ResourceExport for GPIO lines accessed directly from userspace"""

def __attrs_post_init__(self):
super().__attrs_post_init__()
local_cls_name = self.cls
self.data['cls'] = "Network{}".format(self.cls)
from ..resource import udev
local_cls = getattr(udev, local_cls_name)
self.local = local_cls(target=None, name=None, **self.local_params)

def _get_params(self):
"""Helper function to return parameters"""
return {
'host': self.host,
'index': self.local.index,
}

exports["SysfsGPIO"] = GPIOGenericExport


class ExporterSession(ApplicationSession):
def onConnect(self):
"""Set up internal datastructures on successful connection:
@@ -33,3 +33,14 @@ class EthernetPort(Resource):
interface (str): name of the interface"""
switch = attr.ib(default=None)
interface = attr.ib(default=None)


@target_factory.reg_resource
@attr.s(cmp=False)
class SysfsGPIO(Resource):
"""The basic SysfsGPIO contains an index
Args:
index (int): index of target gpio line."""
index = attr.ib(default=None, validator=attr.validators.instance_of(int))

@@ -217,3 +217,15 @@ class NetworkDeditecRelais8(RemoteUSBResource):
def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()


@target_factory.reg_resource
@attr.s(cmp=False)
class NetworkSysfsGPIO(NetworkResource, ManagedResource):
manager_cls = RemotePlaceManager

"""The NetworkSysfsGPIO describes a remotely accessible gpio line"""
index = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(int)))
def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()
@@ -0,0 +1,83 @@
"""
This module implements switching GPIOs via sysfs GPIO kernel interface.
Takes an integer property 'index' which refers to the already exported GPIO device.
"""
import logging
import os

class GpioDigitalOutput:
_gpio_sysfs_path_prefix = '/sys/class/gpio'
_buffered_file_access=False

def _assert_gpio_line_is_exported(index):
gpio_sysfs_path = os.path.join(GpioDigitalOutput._gpio_sysfs_path_prefix, 'gpio{0}'.format(index))
if not os.path.exists(gpio_sysfs_path):
export_sysfs_path = os.path.join(GpioDigitalOutput._gpio_sysfs_path_prefix, 'export')
with open(export_sysfs_path, mode = 'r+', buffering = GpioDigitalOutput._buffered_file_access, closefd = True) as export:
export.write(str(index))
if not os.path.exists(gpio_sysfs_path):
raise ValueError("Device not found")

def __init__(self, **kwargs):
index = kwargs['index']
self._logger = logging.getLogger("Device: ")
GpioDigitalOutput._assert_gpio_line_is_exported(index)
gpio_sysfs_path = os.path.join(GpioDigitalOutput._gpio_sysfs_path_prefix, 'gpio{0}'.format(index))
gpio_sysfs_direction_path = os.path.join(gpio_sysfs_path, 'direction')
self._logger.debug("Configuring GPIO {idx} as output.".format(idx = index))
with open(gpio_sysfs_direction_path, 'wb') as direction_fd:
direction_fd.write(b'out')
gpio_sysfs_value_path = os.path.join(gpio_sysfs_path, 'value')
self.gpio_sysfs_value_fd = os.open(gpio_sysfs_value_path, flags=(os.O_RDWR | os.O_SYNC))

def __del__(self):
os.close(self.gpio_sysfs_value_fd)
self.gpio_sysfs_value_fd = None

def get(self):
os.lseek(self.gpio_sysfs_value_fd, 0, os.SEEK_SET)
literal_value = os.read(self.gpio_sysfs_value_fd, 1)
if literal_value == b'0':
return False
elif literal_value == b'1':
return True
raise ValueError("GPIO value is out of range.")

def set(self, status):
self._logger.debug(
"Setting GPIO to `{}`.".format(status))
binary_value = None
if status is True:
binary_value = b'1'
elif status is False:
binary_value = b'0'

if binary_value is None:
raise ValueError("GPIO value is out of range.")

os.write(self.gpio_sysfs_value_fd, binary_value)


_gpios = {}

def _get_gpio_line(_index):
if _index not in _gpios:
_gpios[_index] = GpioDigitalOutput(index = _index)
return _gpios[_index]

def handle_set(index, status):
gpio_line = _get_gpio_line(index)
gpio_line.set(status)


def handle_get(index):
gpio_line = _get_gpio_line(index)
return gpio_line.get()


methods = {
'set': handle_set,
'get': handle_get,
}
@@ -102,8 +102,13 @@ def test_all_modules():
methods = aw.list()
assert 'deditec_relais8.set' in methods
assert 'deditec_relais8.get' in methods
aw.load('sysfsgpio')
methods = aw.list()
assert 'sysfsgpio.set' in methods
assert 'sysfsgpio.get' in methods


def test_import_modules():
import labgrid.util.agents
import labgrid.util.agents.dummy
import labgrid.util.agents.deditec_relais8
from labgrid.util.agents import deditec_relais8, sysfsgpio

0 comments on commit 23d6aa4

Please sign in to comment.
You can’t perform that action at this time.