Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

driver/sigrok: add blocking capture methods, misc improvements to sigrok driver #1345

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2710,6 +2710,9 @@ Arguments:
The driver can be used in test cases by calling its ``capture()``, ``stop()``
and ``analyze()`` methods.

To capture for a certain predetermined amount of time or number of samples,
methods ``capture_for_time()`` and ``capture_samples()`` can be used.

SigrokPowerDriver
~~~~~~~~~~~~~~~~~
The :any:`SigrokPowerDriver` uses a `SigrokUSBSerialDevice`_ resource to
Expand Down
182 changes: 166 additions & 16 deletions labgrid/driver/sigrokdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import re
import subprocess
import shutil
import signal
import tempfile
import time
import uuid
Expand Down Expand Up @@ -93,16 +92,38 @@ def _get_sigrok_prefix(self):
return self.sigrok.command_prefix + prefix

@Driver.check_active
@step(title='call', args=['args'])
@step(title='call_with_driver', args=['args'])
def _call_with_driver(self, *args):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
self._process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
text=True
)

@Driver.check_active
@step(title='call_with_driver_blocking', args=['args'])
def _call_with_driver_blocking(self, *args, log_output=False):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if log_output:
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)
if process.returncode != 0:
raise OSError
return stdout, stderr


@Driver.check_active
@step(title='call', args=['args'])
Expand Down Expand Up @@ -134,6 +155,19 @@ class SigrokDriver(SigrokCommon):

@Driver.check_active
def capture(self, filename, samplerate="200k"):
"""
Starts to capture samples, in the background.

Args:
filename: the path to the file where the capture is saved.
samplerate: the sample-rate of the capture

Raises:
RuntimeError() if a capture is already running.
"""
if self._running:
raise RuntimeError("capture is already running")

self._filename = filename
self._basename = os.path.basename(self._filename)
self.logger.debug(
Expand All @@ -146,40 +180,137 @@ def capture(self, filename, samplerate="200k"):
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
self._call_with_driver(*cmd)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
# in case the sigrok-cli call fails, this would wait forever.
# to avoid this, we also check the spawned sigrok process
if self._process.poll() is not None:
ret = self._process.returncode
if ret != 0:
stdout, stderr = self._process.communicate()
self.logger.debug("sigrok-cli call terminated prematurely with non-zero return-code")
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)
raise ExecutionError(f"sigrok-cli call terminated prematurely with return-code '{ret}'.")
sleep(0.1)

self._running = True

@Driver.check_active
def capture_for_time(self, filename, time_ms, samplerate="200k"):
"""
Captures samples for a specified time (ms).

Blocks while capturing.

Args:
filename: the path to the file where the capture is saved.
time: time (in ms) for capture duration
samplerate: the sample-rate of the capture

Returns:
The capture as a list containing dict's with fields "Time"
and the channel names

Raises:
OSError() if the subprocess returned with non-zero return-code
"""
return self._capture_blocking(filename, ["--time", str(time_ms)], samplerate)

@Driver.check_active
def capture_samples(self, filename, samples, samplerate="200k"):
"""
Captures a specified number of samples.

Blocks while capturing.

Args:
filename: the path to the file where the capture is saved.
samples: number of samples to capture
samplerate: the sample-rate of the capture

Returns:
The capture as a list containing dict's with fields "Time"
and the channel names
"""
return self._capture_blocking(filename, ["--samples", str(samples)], samplerate)

@Driver.check_active
def stop(self):
assert self._running
"""
Stops the capture and returns recorded samples.

Note that this method might block for several seconds because it needs
to wait for output, parse the capture file and prepare the list
containing the samples.

Returns:
The capture as a list containing dict's with fields "Time"
and the channel names

Raises:
RuntimeError() if capture has not been started
"""
if not self._running:
raise RuntimeError("no capture started yet")
self._running = False
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'

self._process.send_signal(signal.SIGINT)
stdout, stderr = self._process.communicate()
# sigrok-cli can be quit through any keypress
stdout, stderr = self._process.communicate(input="q")
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)

# Convert from .sr to .csv
self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _capture_blocking(self, filename, capture_args, samplerate):
self._filename = filename
self._basename = os.path.basename(self._filename)
csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'
self.logger.debug(
"Saving to: %s with basename: %s", self._filename, self._basename
)
cmd = [
"-l", "4", "--config", f"samplerate={samplerate}",
*capture_args, "-o"
]
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
self._call_with_driver_blocking(*cmd, log_output=True)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
sleep(0.1)

self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _convert_sr_csv(self, file_path_sr, file_path_scv):
cmd = [
'-i',
os.path.join(self._tmpdir, self._basename), '-O', 'csv', '-o',
os.path.join(self._tmpdir, csv_filename)
file_path_sr, '-O', 'csv:time=true', '-o',
file_path_scv
]
self._call(*cmd)
stdout, stderr = self._process.communicate()
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)


def _transfer_tmp_file(self, csv_filename):
if isinstance(self.sigrok, NetworkSigrokUSBDevice):
subprocess.call([
'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, self._basename)}',
os.path.join(self._local_tmpdir, self._filename)
os.path.abspath(self._filename)
],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
Expand All @@ -192,16 +323,23 @@ def stop(self):
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)

def _process_csv(self, csv_filename):
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

if isinstance(self.sigrok, NetworkSigrokUSBDevice):
with open(os.path.join(self._local_tmpdir,
csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
next(csv_file)
return list(csv.DictReader(csv_file, fieldnames=fnames))
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)
with open(os.path.join(self._tmpdir, csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
Expand All @@ -210,9 +348,18 @@ def stop(self):

@Driver.check_active
def analyze(self, args, filename=None):
"""
Analyzes captured data through `sigrok-cli`'s command line interface

Args:
args: args to `sigrok-cli`

Returns:
A dictionary containing the matched groups.
"""
annotation_regex = re.compile(r'(?P<startnum>\d+)-(?P<endnum>\d+) (?P<decoder>[\w\-]+): (?P<annotation>[\w\-]+): (?P<data>".*)') # pylint: disable=line-too-long
if not filename and self._filename:
filename = self._filename
filename = os.path.join(self._tmpdir, self._basename)
else:
filename = os.path.abspath(filename)
check_file(filename, command_prefix=self.sigrok.command_prefix)
Expand Down Expand Up @@ -399,12 +546,15 @@ def stop(self):

Raises:
RuntimeError() if capture has not been started
OSError() if the subprocess returned with non-zero return-code
"""
if not self._running:
raise RuntimeError("no capture started yet")
while not self._timeout.expired:
if self._process.poll() is not None:
# process has finished. no need to wait for the timeout
if self._process.returncode != 0:
raise OSError
break
time.sleep(0.1)
else:
Expand Down
34 changes: 31 additions & 3 deletions tests/test_sigrok.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
pytestmark = pytest.mark.skipif(not which("sigrok-cli"),
reason="sigrok not available")

VENDOR_ID = "0925"
PRODUCT_ID = "3881"

def test_sigrok_resource(target):
r = SigrokUSBDevice(target, name=None, match={"sys_name": "1-12"}, driver='fx2lafw', channels="D0,D1")
Expand All @@ -23,19 +25,45 @@ def test_sigrok_driver(target):


@pytest.mark.sigrokusb
def test_sigrok_usb_driver(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": "3881", "ID_VENDOR_ID": "0925"}, driver='fx2lafw', channels="D0,D1")
def test_sigrok_usb_driver_capture(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
d.capture(record)
sleep(5)
samples = d.stop()
assert os.path.getsize(record) > 0
assert samples != None
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']

@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_samples(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_samples(record, 100)
assert os.path.getsize(record) > 0
assert samples is not None
assert len(samples) == 100


@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_time(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_for_time(record, 101) # sigrok-cli captures 5ms less than specified.
assert os.path.getsize(record) > 0
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']
time = float(samples[-1]['time']) - float(samples[0]['time'])
assert time >= 100_000

def test_sigrok_power_driver(target):
r = SigrokUSBSerialDevice(target, name=None, driver='manson-hcs-3xxx')
r.avail = True
Expand Down