Skip to content
Permalink
Browse files

Merge pull request #432 from jluebbe/topic/relais8

Add agent module, resource and driver for Dedidec Relais 8 device
  • Loading branch information...
jluebbe committed May 28, 2019
2 parents 712d1bd + 487703a commit 9091a772d94caa6a73a3a7fe31df5d5a7d14a56e
@@ -0,0 +1,33 @@
import sys
import labgrid
import logging
import time

from labgrid import Environment, StepReporter
from labgrid.strategy.bareboxstrategy import Status
from labgrid.driver.deditecrelaisdriver import DeditecRelaisDriver

# enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)7s: %(message)s',
stream=sys.stderr,
)

# show labgrid steps on the console
StepReporter()

t = labgrid.Target('main')
r = labgrid.resource.udev.DeditecRelais8(t, name=None, index=1)
d = DeditecRelaisDriver(t, name=None)

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
@@ -0,0 +1,32 @@
import sys
import labgrid
import logging
import time

from labgrid import Environment, StepReporter
from labgrid.strategy.bareboxstrategy import Status
from labgrid.driver.deditecrelaisdriver import DeditecRelaisDriver

# enable debug logging
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)7s: %(message)s',
stream=sys.stderr,
)

# show labgrid steps on the console
StepReporter()

e = labgrid.Environment('import-dedicontrol.yaml')
t = e.get_target()

p = t.get_driver("DigitalOutputProtocol")
print(t.resources)
p.set(True)
print(p.get())
time.sleep(2)
p.set(False)
print(p.get())
time.sleep(2)
p.set(True)
print(p.get())
@@ -0,0 +1,3 @@
desk:
DeditecRelais8:
index: 2
@@ -0,0 +1,9 @@
targets:
main:
resources:
RemotePlace:
name: dedi
drivers:
DeditecRelaisDriver: {}
options:
crossbar_url: 'ws://labgrid:20408/ws'
@@ -0,0 +1,46 @@
# pylint: disable=no-member
import subprocess
import attr

from .common import Driver
from ..factory import target_factory
from ..resource.udev import DeditecRelais8
from ..resource.remote import NetworkDeditecRelais8
from ..step import step
from ..protocol import DigitalOutputProtocol
from ..util.agentwrapper import AgentWrapper


@target_factory.reg_driver
@attr.s(cmp=False)
class DeditecRelaisDriver(Driver, DigitalOutputProtocol):
bindings = {
"relais": {DeditecRelais8, NetworkDeditecRelais8},
}

def __attrs_post_init__(self):
super().__attrs_post_init__()
self.wrapper = None

def on_activate(self):
if isinstance(self.relais, NetworkDeditecRelais8):
host = self.relais.host
else:
host = None
self.wrapper = AgentWrapper(host)
self.proxy = self.wrapper.load('deditec_relais8')

def on_deactivate(self):
self.wrapper.close()
self.wrapper = None
self.proxy = None

@Driver.check_active
@step(args=['status'])
def set(self, status):
self.proxy.set(self.relais.busnum, self.relais.devnum, self.relais.index, status)

@Driver.check_active
@step(result=True)
def get(self):
return self.proxy.get(self.relais.busnum, self.relais.devnum, self.relais.index)
@@ -660,8 +660,10 @@ def digital_io(self):
target = self._get_target(place)
from ..resource.modbus import ModbusTCPCoil
from ..resource.onewireport import OneWirePIO
from ..resource.remote import NetworkDeditecRelais8
from ..driver.modbusdriver import ModbusCoilDriver
from ..driver.onewiredriver import OneWirePIODriver
from ..driver.deditecrelaisdriver import DeditecRelaisDriver
drv = None
for resource in target.resources:
if isinstance(resource, ModbusTCPCoil):
@@ -678,6 +680,13 @@ def digital_io(self):
target.set_binding_map({"port": name})
drv = OneWirePIODriver(target, name=name)
break
elif isinstance(resource, NetworkDeditecRelais8):
try:
drv = target.get_driver(DeditecRelaisDriver, name=name)
except NoDriverFoundError:
target.set_binding_map({"relais": name})
drv = DeditecRelaisDriver(target, name=name)
break
if not drv:
raise UserError("target has no compatible resource available")
target.activate(drv)
@@ -274,6 +274,25 @@ def _get_params(self):
'index': self.local.index,
}

@attr.s(cmp=False)
class USBDeditecRelaisExport(USBGenericExport):
"""ResourceExport for outputs on deditec relais"""

def __attrs_post_init__(self):
super().__attrs_post_init__()

def _get_params(self):
"""Helper function to return parameters"""
return {
'host': self.host,
'busnum': self.local.busnum,
'devnum': self.local.devnum,
'path': self.local.path,
'vendor_id': self.local.vendor_id,
'model_id': self.local.model_id,
'index': self.local.index,
}


exports["AndroidFastboot"] = USBGenericExport
exports["IMXUSBLoader"] = USBGenericExport
@@ -286,6 +305,7 @@ def _get_params(self):
exports["USBVideo"] = USBGenericExport
exports["USBTMC"] = USBGenericExport
exports["USBPowerPort"] = USBPowerPortExport
exports["DeditecRelais8"] = USBDeditecRelaisExport


@attr.s
@@ -199,3 +199,13 @@ class NetworkUSBTMC(RemoteUSBResource):
def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()


@target_factory.reg_resource
@attr.s(cmp=False)
class NetworkDeditecRelais8(RemoteUSBResource):
"""The NetworkDeditecRelais8 describes a remotely accessible USB relais port"""
index = attr.ib(default=None, validator=attr.validators.instance_of(int))
def __attrs_post_init__(self):
self.timeout = 10.0
super().__attrs_post_init__()
@@ -385,3 +385,21 @@ def path(self):
return self.device.device_node

return None

@target_factory.reg_resource
@attr.s(cmp=False)
class DeditecRelais8(USBResource):
index = attr.ib(default=None, validator=attr.validators.instance_of(int))

def __attrs_post_init__(self):
self.match['ID_VENDOR'] = 'DEDITEC'
# the serial is the same for all boards with the same model
self.match['ID_SERIAL_SHORT'] = 'DT000014'
super().__attrs_post_init__()

@property
def path(self):
if self.device is not None:
return self.device.device_path

return None
@@ -16,11 +16,18 @@ class Agent:
def __init__(self):
self.methods = {}
self.register('load', self.load)
self.register('list', self.list)

@staticmethod
def _send(data):
sys.stdout.write(json.dumps(data)+'\n')
sys.stdout.flush()
# use real stdin/stdout
self.stdin = sys.stdin
self.stdout = sys.stdout

# use stderr for normal prints
sys.stdout = sys.stderr

def send(self, data):
self.stdout.write(json.dumps(data)+'\n')
self.stdout.flush()

def register(self, name, func):
assert name not in self.methods
@@ -32,15 +39,18 @@ def load(self, name, source):
for k, v in module.methods.items():
self.register('{}.{}'.format(name, k), v)

def list(self):
return list(self.methods.keys())

def run(self):
for line in sys.stdin:
for line in self.stdin:
if not line:
continue

try:
request = json.loads(line)
except json.JSONDecodeError:
Agent._send({'error': 'request parsing failed for {}'.format(repr(line))})
self.send({'error': 'request parsing failed for {}'.format(repr(line))})
break

if request.get('close', False):
@@ -51,14 +61,14 @@ def run(self):
kwargs = request['kwargs']
try:
response = self.methods[name](*args, **kwargs)
Agent._send({'result': response})
self.send({'result': response})
except Exception as e: # pylint: disable=broad-except
import traceback
try:
tb = [list(x) for x in traceback.extract_tb(sys.exc_info()[2])]
except:
tb = None
Agent._send({'exception': repr(e), 'tb': tb})
self.send({'exception': repr(e), 'tb': tb})

def handle_test(*args, **kwargs): # pylint: disable=unused-argument
return args[::-1]

0 comments on commit 9091a77

Please sign in to comment.
You can’t perform that action at this time.