Skip to content

Commit

Permalink
Merge pull request #1224 from hnez/httpdigitaloutput
Browse files Browse the repository at this point in the history
httpdigitaloutput: support generic digital outputs via HTTP
  • Loading branch information
Emantor committed Jul 26, 2023
2 parents b409e1d + d8279dd commit 3e1c0df
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 1 deletion.
54 changes: 54 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,39 @@ Arguments:
Used by:
- `HIDRelayDriver`_

HttpDigitalOutput
+++++++++++++++++
A ``HttpDigitalOutput`` resource describes a generic digital output that can be
controlled via HTTP.

.. code-block:: yaml
HttpDigitalOutput:
url: http://host.example/some/endpoint
body_asserted: "On"
body_deasserted: "Off"
The example assumes a simple scenario where the same URL is used for PUT
requests that set the output state and GET requests to get the current state.
It also assumes that the returned state matches either "On" or "Off" exactly.

The `HttpDigitalOutputDriver`_ also supports more advanced use cases where the
current state is fetched from another URL and is interpreted using regular
expressions.

Arguments:
- url (str): URL to use for setting a new state
- body_asserted (str): Request body to send to assert the output
- body_deasserted (str): Request body to send to de-assert the output
- method (str, default="PUT"): HTTP method to set a new state

- url_get (str): URL to use instead of ``url`` for getting the state
- body_get_asserted (str): Regular Expression that matches an asserted response body
- body_get_deasserted (str): Regular Expression that matches a de-asserted response body

Used by:
- `HttpDigitalOutputDriver`_

NetworkHIDRelay
+++++++++++++++
A NetworkHIDRelay describes an `HIDRelay`_ exported over the network.
Expand Down Expand Up @@ -2799,6 +2832,27 @@ Implements:
Arguments:
- None


HttpDigitalOutputDriver
~~~~~~~~~~~~~~~~~~~~~~~
A HttpDigitalOutputDriver binds to a `HttpDigitalOutput` to set and get a
digital output state via HTTP.

Binds to:
http:
- `HttpDigitalOutput`_

.. code-block:: yaml
HttpDigitalOutputDriver: {}
Implements:
- :any:`DigitalOutputProtocol`

Arguments:
- None


PyVISADriver
~~~~~~~~~~~~
The PyVISADriver uses a PyVISADevice resource to control test equipment manageable by PyVISA.
Expand Down
1 change: 1 addition & 0 deletions labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@
from .usbtmcdriver import USBTMCDriver
from .deditecrelaisdriver import DeditecRelaisDriver
from .dediprogflashdriver import DediprogFlashDriver
from .httpdigitaloutput import HttpDigitalOutputDriver
71 changes: 71 additions & 0 deletions labgrid/driver/httpdigitaloutput.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import re
from importlib import import_module

import attr

from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..step import step
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError


@target_factory.reg_driver
@attr.s(eq=False)
class HttpDigitalOutputDriver(Driver, DigitalOutputProtocol):
bindings = { "http": "HttpDigitalOutput" }

def __attrs_post_init__(self):
super().__attrs_post_init__()
self._requests = import_module("requests")

def on_activate(self):
self._url_set = proxymanager.get_url(
self.http.url,
default_port=(443 if self.http.url.startswith("https") else 80),
)

if self.http.url_get:
self._url_get = proxymanager.get_url(
self.http.url_get,
default_port=(443 if self.http.url_get.startswith("https") else 80),
)

else:
self._url_get = self._url_set

@Driver.check_active
@step(args=["status"])
def set(self, status):
method = self.http.method or "PUT"
body = self.http.body_asserted if status else self.http.body_deasserted

res = self._requests.request(method, self._url_set, data=body)
res.raise_for_status()

@Driver.check_active
@step(result=["True"])
def get(self):
res = self._requests.get(self._url_get)
res.raise_for_status()

# Check if the response body matches an asserted state
if self.http.body_get_asserted:
if re.fullmatch(self.http.body_get_asserted, res.text) is not None:
return True

elif res.text == self.http.body_asserted:
return True

# Check if the response body matches a de-asserted state
if self.http.body_get_deasserted:
if re.fullmatch(self.http.body_get_deasserted, res.text) is not None:
return False

elif res.text == self.http.body_deasserted:
return False

raise ExecutionError(
f'response does not match asserted or deasserted state: "{res.text}"'
)
4 changes: 3 additions & 1 deletion labgrid/remote/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ def digital_io(self):
action = self.args.action
name = self.args.name
target = self._get_target(place)
from ..resource import ModbusTCPCoil, OneWirePIO
from ..resource import ModbusTCPCoil, OneWirePIO, HttpDigitalOutput
from ..resource.remote import (NetworkDeditecRelais8, NetworkSysfsGPIO, NetworkLXAIOBusPIO,
NetworkHIDRelay)

Expand All @@ -775,6 +775,8 @@ def digital_io(self):
drv = self._get_driver_or_new(target, "ModbusCoilDriver", name=name)
elif isinstance(resource, OneWirePIO):
drv = self._get_driver_or_new(target, "OneWirePIODriver", name=name)
elif isinstance(resource, HttpDigitalOutput):
drv = self._get_driver_or_new(target, "HttpDigitalOutputDriver", name=name)
elif isinstance(resource, NetworkDeditecRelais8):
drv = self._get_driver_or_new(target, "DeditecRelaisDriver", name=name)
elif isinstance(resource, NetworkSysfsGPIO):
Expand Down
1 change: 1 addition & 0 deletions labgrid/resource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
from .mqtt import TasmotaPowerPort
from .httpvideostream import HTTPVideoStream
from .dediprogflasher import DediprogFlasher, NetworkDediprogFlasher
from .httpdigitalout import HttpDigitalOutput
34 changes: 34 additions & 0 deletions labgrid/resource/httpdigitalout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import attr

from ..factory import target_factory
from .common import Resource


@target_factory.reg_resource
@attr.s(eq=False)
class HttpDigitalOutput(Resource):
"""This resource describes a generic HTTP-controlled output pin.
Args:
url (str): URL to use for setting a new state
body_asserted (str): Request body to send to assert the output
body_deasserted (str): Request body to send to de-assert the output
method (str): HTTP method to use instead of PUT (the default) to set a new state
url_get (str): URL to use for getting the state
body_get_asserted (str): Regular Expression that matches an asserted response body
body_get_deasserted (str): Regular Expression that matches a de-asserted response body
"""

url = attr.ib(validator=attr.validators.instance_of(str))
body_asserted = attr.ib(validator=attr.validators.instance_of(str))
body_deasserted = attr.ib(validator=attr.validators.instance_of(str))
method = attr.ib(default="PUT", validator=attr.validators.instance_of(str))

url_get = attr.ib(default="", validator=attr.validators.instance_of(str))
body_get_asserted = attr.ib(
default="", validator=attr.validators.instance_of(str)
)
body_get_deasserted = attr.ib(
default="", validator=attr.validators.instance_of(str)
)
97 changes: 97 additions & 0 deletions tests/test_httpdigitalout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pytest
import requests

from labgrid.driver import HttpDigitalOutputDriver
from labgrid.resource import HttpDigitalOutput


@pytest.fixture(scope="function")
def mock_server(mocker):
state = '"Unknown"'

def request(method, url, data=None):
nonlocal state
state = data
return mocker.MagicMock()

def get(url):
r = mocker.MagicMock()
r.text = state
return r

mock_request = mocker.patch("requests.request")
mock_request.side_effect = request
mock_get = mocker.patch("requests.get")
mock_get.side_effect = get

return (mock_request, mock_get)


def _make_http_driver(target, with_tls, with_regex, separate_get, match_error):
scheme = "https" if with_tls else "http"
url = f"{scheme}://host.example/set"
url_get = f"{scheme}://host.example/get" if separate_get else ""

body_get_asserted = ".*n.*" if with_regex else ""
body_get_deasserted = ".*ff.*" if with_regex else ""

if match_error:
body_get_asserted = "--- DOES NOT MATCH ---"
body_get_deasserted = "--- DOES NOT MATCH EITHER ---"

dig_out_res = HttpDigitalOutput(
target,
name=None,
url=url,
body_asserted='"On"',
body_deasserted='"Off"',
method="PUT",
url_get=url_get,
body_get_asserted=body_get_asserted,
body_get_deasserted=body_get_deasserted,
)

http_driver = HttpDigitalOutputDriver(target, name=None)
target.activate(http_driver)

return http_driver


@pytest.mark.parametrize(
"asserted,with_tls,with_regex,separate_get",
[
(False, False, False, False),
(True, False, False, False),
(True, True, False, False),
(True, False, True, False),
(False, False, True, False),
(True, False, False, True),
(True, True, True, True),
],
)
def test_set_get(asserted, with_tls, with_regex, separate_get, target, mock_server):
http_driver = _make_http_driver(target, with_tls, with_regex, separate_get, False)
mock_request, mock_get = mock_server

data = '"On"' if asserted else '"Off"'
scheme = "https" if with_tls else "http"
port = 443 if with_tls else 80
get_endpoint = "get" if separate_get else "set"

set_url = f"{scheme}://host.example:{port}/set"
get_url = f"{scheme}://host.example:{port}/{get_endpoint}"

http_driver.set(asserted)
mock_request.assert_called_once_with("PUT", set_url, data=data)

assert http_driver.get() == asserted
mock_get.assert_called_once_with(get_url)


def test_match_exception(target, mock_server):
http_driver = _make_http_driver(target, False, False, False, True)
mock_request, mock_get = mock_server

http_driver.set(True)
with pytest.raises(Exception):
http_driver.get()

0 comments on commit 3e1c0df

Please sign in to comment.