Skip to content

Commit

Permalink
Merge pull request #1141 from SmithChart/sigrokdmm
Browse files Browse the repository at this point in the history
driver/sigrok: Add sigrok driver for DMMs
  • Loading branch information
Bastian-Krause committed Jul 25, 2023
2 parents 6872745 + e921a4a commit b409e1d
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 1 deletion.
34 changes: 34 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,40 @@ Arguments:
- max_current (float): optional, maximum allowed current for protection against
accidental damage (in ampere)

SigrokDmmDriver
~~~~~~~~~~~~~~~
The `SigrokDmmDriver` uses a `SigrokDevice` resource to record samples from a digital multimeter (DMM) and provides
them during test runs.

It is known to work with Unit-T `UT61B` and `UT61C` devices but should also work with other DMMs supported by *sigrok*.

Binds to:
sigrok:
- `SigrokUSBDevice`_
- `SigrokUSBSerialDevice`_
- `NetworkSigrokUSBDevice`_
- NetworkSigrokUSBSerialDevice

Implements:
- None yet

Arguments:
- None

Sampling can be started calling `capture(samples, timeout=None)`.
It sets up sampling and returns immediately.
The default timeout has been chosen to work with Unit-T `UT61B`.
Other devices may require a different timeout setting.

Samples can be obtained using `stop()`.
`stop()` will block until either *sigrok* terminates or `timeout` is reached.
This method returns a `(unit, samples)` tuple:
`unit` is the physical unit reported by the DMM;
samples is an iterable of samples.

This driver relies on buffering of the subprocess call.
Reading a few samples will very likely work - but obtaining a lot of samples may stall.

USBSDMuxDriver
~~~~~~~~~~~~~~
The :any:`USBSDMuxDriver` uses a USBSDMuxDevice resource to control a
Expand Down
2 changes: 1 addition & 1 deletion labgrid/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .qemudriver import QEMUDriver
from .modbusdriver import ModbusCoilDriver
from .modbusrtudriver import ModbusRTUDriver
from .sigrokdriver import SigrokDriver, SigrokPowerDriver
from .sigrokdriver import SigrokDriver, SigrokPowerDriver, SigrokDmmDriver
from .usbstoragedriver import USBStorageDriver, NetworkUSBStorageDriver, Mode
from .resetdriver import DigitalOutputResetDriver
from .gpiodriver import GpioDigitalOutputDriver
Expand Down
113 changes: 113 additions & 0 deletions labgrid/driver/sigrokdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .common import Driver, check_file
from .exception import ExecutionError
from .powerdriver import PowerResetMixin
from ..util import Timeout


@attr.s(eq=False)
Expand Down Expand Up @@ -329,3 +330,115 @@ def measure(self):
if len(res) != 2:
raise ExecutionError(f"Cannot parse --show output {out}")
return res

@target_factory.reg_driver
@attr.s(eq=False)
class SigrokDmmDriver(SigrokCommon):
"""
This driver wraps around a single channel DMM controlled by sigrok.
It has been tested with Unit-T UT61C and UT61B devices but probably also
works with other single chnnel DMMs.
This driver binds to a SigrokUsbDevice.
Make sure to select the correct driver for your DMM there.
Example usage:
> resources:
> - SigrokUSBDevice:
> driver: uni-t-ut61c
> match:
> 'ID_PATH': pci-0000:07:00.4-usb-0:2:1.0
> drivers:
> - SigrokDmmDriver: {}
Args:
bindings (dict): driver to use with sigrok
"""

bindings = {
"sigrok": {SigrokUSBSerialDevice, NetworkSigrokUSBSerialDevice, SigrokUSBDevice, NetworkSigrokUSBDevice},
}

@Driver.check_active
@step(result=True)
def capture(self, samples, timeout=None):
"""
Starts to read samples from the DMM.
This method returns once sampling has been started. Sampling continues in the background.
Note: We use subprocess.PIPE to buffer the samples.
When this buffer is too small for the number of samples requested sampling may stall.
Args:
samples: Number of samples to obtain
timeout: Timeout after which sampling should be stopped.
If None: timeout[s] = samples * 1s + 5s
If int: Timeout in [s]
Raises:
RuntimeError() if a capture is already running.
"""
if self._running:
raise RuntimeError("capture is already running")

if not timeout:
timeout = samples + 5.0

args = f"-O csv --samples {samples}".split(" ")
self._call_with_driver(*args)
self._timeout = Timeout(timeout)
self._running = True

@Driver.check_active
@step(result=True)
def stop(self):
"""
Waits for sigrok to complete and returns all samples obtained afterwards.
This function blocks until either sigrok has terminated or the timeout has been reached.
Returns:
(unit_spec, [sample, ...])
Raises:
RuntimeError() if capture has not been started
"""
if not self._running:
raise RuntimeError("no capture started yet")
while not self._timeout.expired:
if self._process.poll() is not None:
# process has finished. no need to wait for the timeout
break
time.sleep(0.1)
else:
# process did not finish in time
self.log.info("sigrok-cli did not finish in time, increase timeout?")
self._process.kill()

res = []
unit = ""
for line in self._process.stdout.readlines():
line = line.strip()
if b";" in line:
# discard header information
continue
if not unit:
# the first line after the header contains the unit information
unit = line.decode()
else:
# all other lines are actual values
res.append(float(line))
_, stderr = self._process.communicate()
self.log.debug("stderr: %s", stderr)

self._running = False
return unit, res

def on_activate(self):
# This driver does not use self._tmpdir from SigrockCommon.
# Overriding this function to inhibit the temp-dir creation.
pass

def on_deactivate(self):
# This driver does not use self._tmpdir from SigrockCommon.
# Overriding this function to inhibit the temp-dir creation.
pass

0 comments on commit b409e1d

Please sign in to comment.