Skip to content

Commit

Permalink
driver/sigrok: add blocking capture methods,
Browse files Browse the repository at this point in the history
misc improvements to sigrok driver

this adds blocking capture functions where capture time or number of
samples are specified in advance.

Some API documentation was also added to clarify the usage of the sigrok driver.

Additionally, the following fixes were made:

- include time when converting the capture file to CSV explicitly, the `sigrok-cli` package on ubuntu 22.04
    apparently defaults to false
- wrong filename in the sigrok analyze function when the resource is remote
- capture file transferred to wrong (tmp) dir when the user-specified path is relative

Signed-off-by: Felix Zwettler <felix.zwettler@duagon.com>
  • Loading branch information
flxzt committed Apr 2, 2024
1 parent 110532f commit 1f26099
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 14 deletions.
3 changes: 3 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2710,6 +2710,9 @@ Arguments:
The driver can be used in test cases by calling its ``capture()``, ``stop()``
and ``analyze()`` methods.

To capture for a certain predetermined amount of time or number of samples,
methods ``capture_for_time()`` and ``capture_samples()`` can be used.

SigrokPowerDriver
~~~~~~~~~~~~~~~~~
The :any:`SigrokPowerDriver` uses a `SigrokUSBSerialDevice`_ resource to
Expand Down
177 changes: 166 additions & 11 deletions labgrid/driver/sigrokdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,37 @@ def _get_sigrok_prefix(self):
return self.sigrok.command_prefix + prefix

@Driver.check_active
@step(title='call', args=['args'])
@step(title='call_with_driver', args=['args'])
def _call_with_driver(self, *args):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
self._process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
text=True
)

@Driver.check_active
@step(title='call_with_driver_blocking', args=['args'])
def _call_with_driver_blocking(self, *args, log_output=False):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)
if process.returncode != 0:
raise OSError
return stdout, stderr


@Driver.check_active
@step(title='call', args=['args'])
Expand Down Expand Up @@ -134,6 +155,19 @@ class SigrokDriver(SigrokCommon):

@Driver.check_active
def capture(self, filename, samplerate="200k"):
"""
Starts to capture samples, in the background.
Args:
filename: the path to the file where the capture is saved.
samplerate: the sample-rate of the capture
Raises:
RuntimeError() if a capture is already running.
"""
if self._running:
raise RuntimeError("capture is already running")

self._filename = filename
self._basename = os.path.basename(self._filename)
self.logger.debug(
Expand All @@ -146,36 +180,138 @@ def capture(self, filename, samplerate="200k"):
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
self._call_with_driver(*cmd)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
sleep(0.1)

self._running = True

@Driver.check_active
def capture_for_time(self, filename, time_ms, samplerate="200k"):
"""
Captures samples for a specified time (ms).
Blocks while capturing.
Args:
filename: the path to the file where the capture is saved.
time: time (in ms) for capture duration
samplerate: the sample-rate of the capture
Returns:
The capture as a list containing dict's with fields:
{
"time": sample time (us),
"channel_name_1": Sample value for channel 1,
"channel_name_2": Sample value for channel 2,
...
}
Raises:
OSError() if the subprocess returned with non-zero return-code
"""
return self._capture_blocking(filename, ["--time", str(time_ms)], samplerate)

@Driver.check_active
def capture_samples(self, filename, samples, samplerate="200k"):
"""
Captures a specified number of samples.
Blocks while capturing.
Args:
filename: the path to the file where the capture is saved.
samples: number of samples to capture
samplerate: the sample-rate of the capture
Returns:
The capture as a list containing dict's with fields:
{
"time": sample time (us),
"channel_name_1": Sample value for channel 1,
"channel_name_2": Sample value for channel 2,
...
}
"""
return self._capture_blocking(filename, ["--samples", str(samples)], samplerate)

@Driver.check_active
def stop(self):
assert self._running
"""
Stops the capture and returns recorded samples.
Note that this method might block for several seconds because it needs
to wait for output, parse the capture file and prepare the list
containing the samples.
Returns:
The capture as a list containing dict's with fields:
{
"time": sample time (us),
"channel_name_1": Sample value for channel 1,
"channel_name_2": Sample value for channel 2,
...
}
Raises:
RuntimeError() if capture has not been started
"""
if not self._running:
raise RuntimeError("no capture started yet")
self._running = False
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'

self._process.send_signal(signal.SIGINT)
stdout, stderr = self._process.communicate()
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)

# Convert from .sr to .csv
self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _capture_blocking(self, filename, capture_args, samplerate):
self._filename = filename
self._basename = os.path.basename(self._filename)
csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'
self.logger.debug(
"Saving to: %s with basename: %s", self._filename, self._basename
)
cmd = [
"-l", "4", "--config", f"samplerate={samplerate}",
*capture_args, "-o"
]
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
stdout, stderr = self._call_with_driver_blocking(*cmd, log_output=True)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
sleep(0.1)

self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _convert_sr_csv(self, file_path_sr, file_path_scv):
cmd = [
'-i',
os.path.join(self._tmpdir, self._basename), '-O', 'csv:time=true', '-o',
os.path.join(self._tmpdir, csv_filename)
file_path_sr, '-O', 'csv:time=true', '-o',
file_path_scv
]
self._call(*cmd)
stdout, stderr = self._process.communicate()
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)


def _transfer_tmp_file(self, csv_filename):
if isinstance(self.sigrok, NetworkSigrokUSBDevice):
subprocess.call([
'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, self._basename)}',
Expand All @@ -192,16 +328,23 @@ def stop(self):
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)

def _process_csv(self, csv_filename):
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

if isinstance(self.sigrok, NetworkSigrokUSBDevice):
with open(os.path.join(self._local_tmpdir,
csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
next(csv_file)
return list(csv.DictReader(csv_file, fieldnames=fnames))
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)
with open(os.path.join(self._tmpdir, csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
Expand All @@ -210,6 +353,15 @@ def stop(self):

@Driver.check_active
def analyze(self, args, filename=None):
"""
Analyzes captured data through `sigrok-cli`'s command line interface
Args:
args: args to `sigrok-cli`
Returns:
A dictionary containing the matched groups.
"""
annotation_regex = re.compile(r'(?P<startnum>\d+)-(?P<endnum>\d+) (?P<decoder>[\w\-]+): (?P<annotation>[\w\-]+): (?P<data>".*)') # pylint: disable=line-too-long
if not filename and self._filename:
filename = os.path.join(self._tmpdir, self._basename)
Expand Down Expand Up @@ -399,12 +551,15 @@ def stop(self):
Raises:
RuntimeError() if capture has not been started
OSError() if the subprocess returned with non-zero return-code
"""
if not self._running:
raise RuntimeError("no capture started yet")
while not self._timeout.expired:
if self._process.poll() is not None:
# process has finished. no need to wait for the timeout
if self._process.returncode != 0:
raise OSError
break
time.sleep(0.1)
else:
Expand Down
34 changes: 31 additions & 3 deletions tests/test_sigrok.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
pytestmark = pytest.mark.skipif(not which("sigrok-cli"),
reason="sigrok not available")

VENDOR_ID = "0925"
PRODUCT_ID = "3881"

def test_sigrok_resource(target):
r = SigrokUSBDevice(target, name=None, match={"sys_name": "1-12"}, driver='fx2lafw', channels="D0,D1")
Expand All @@ -23,19 +25,45 @@ def test_sigrok_driver(target):


@pytest.mark.sigrokusb
def test_sigrok_usb_driver(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": "3881", "ID_VENDOR_ID": "0925"}, driver='fx2lafw', channels="D0,D1")
def test_sigrok_usb_driver_capture(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
d.capture(record)
sleep(5)
samples = d.stop()
assert os.path.getsize(record) > 0
assert samples != None
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']

@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_samples(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_samples(record, 100)
assert os.path.getsize(record) > 0
assert samples is not None
assert len(samples) == 100


@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_time(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_for_time(record, 101) # sigrok-cli captures 5ms less than specified.
assert os.path.getsize(record) > 0
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']
time = float(samples[-1]['time']) - float(samples[0]['time'])
assert time >= 100_000

def test_sigrok_power_driver(target):
r = SigrokUSBSerialDevice(target, name=None, driver='manson-hcs-3xxx')
r.avail = True
Expand Down

0 comments on commit 1f26099

Please sign in to comment.