Skip to content

Commit

Permalink
improv: add blocking capture methods, misc improvements to sigrok dri…
Browse files Browse the repository at this point in the history
…ver impl

This adds blocking capture methods where capture time or number of
samples are specified in advance.

Some of the internals are refactored to allow for code re-use
and API documentation is added to clarify the usage of the sigrok driver.

Signed-off-by: Felix Zwettler <felix.zwettler@duagon.com>
  • Loading branch information
flxzt committed May 6, 2024
1 parent cc5e1f0 commit 31ebd5f
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 14 deletions.
3 changes: 3 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2710,6 +2710,9 @@ Arguments:
The driver can be used in test cases by calling its ``capture()``, ``stop()``
and ``analyze()`` methods.

To capture for a certain predetermined amount of time or number of samples,
methods ``capture_for_time()`` and ``capture_samples()`` can be used.

SigrokPowerDriver
~~~~~~~~~~~~~~~~~
The :any:`SigrokPowerDriver` uses a `SigrokUSBSerialDevice`_ resource to
Expand Down
163 changes: 152 additions & 11 deletions labgrid/driver/sigrokdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,38 @@ def _get_sigrok_prefix(self):
return self.sigrok.command_prefix + prefix

@Driver.check_active
@step(title='call', args=['args'])
@step(title='call_with_driver', args=['args'])
def _call_with_driver(self, *args):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
self._process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE
stderr=subprocess.PIPE,
text=True
)

@Driver.check_active
@step(title='call_with_driver_blocking', args=['args'])
def _call_with_driver_blocking(self, *args, log_output=False):
combined = self._get_sigrok_prefix() + list(args)
self.logger.debug("Combined command: %s", " ".join(combined))
process = subprocess.Popen(
combined,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if log_output:
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)
if process.returncode != 0:
raise OSError
return stdout, stderr


@Driver.check_active
@step(title='call', args=['args'])
Expand Down Expand Up @@ -133,6 +155,19 @@ class SigrokDriver(SigrokCommon):

@Driver.check_active
def capture(self, filename, samplerate="200k"):
"""
Starts to capture samples, in the background.
Args:
filename: the path to the file where the capture is saved.
samplerate: the sample-rate of the capture
Raises:
RuntimeError() if a capture is already running.
"""
if self._running:
raise RuntimeError("capture is already running")

self._filename = filename
self._basename = os.path.basename(self._filename)
self.logger.debug(
Expand All @@ -145,6 +180,7 @@ def capture(self, filename, samplerate="200k"):
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
self._call_with_driver(*cmd)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
Expand All @@ -162,29 +198,115 @@ def capture(self, filename, samplerate="200k"):

self._running = True

@Driver.check_active
def capture_for_time(self, filename, time_ms, samplerate="200k"):
"""
Captures samples for a specified time (ms).
Blocks while capturing.
Args:
filename: the path to the file where the capture is saved.
time: time (in ms) for capture duration
samplerate: the sample-rate of the capture
Returns:
The capture as a list containing dict's with fields "Time"
and the channel names
Raises:
OSError() if the subprocess returned with non-zero return-code
"""
return self._capture_blocking(filename, ["--time", str(time_ms)], samplerate)

@Driver.check_active
def capture_samples(self, filename, samples, samplerate="200k"):
"""
Captures a specified number of samples.
Blocks while capturing.
Args:
filename: the path to the file where the capture is saved.
samples: number of samples to capture
samplerate: the sample-rate of the capture
Returns:
The capture as a list containing dict's with fields "Time"
and the channel names
"""
return self._capture_blocking(filename, ["--samples", str(samples)], samplerate)

@Driver.check_active
def stop(self):
assert self._running
"""
Stops the capture and returns recorded samples.
Note that this method might block for several seconds because it needs
to wait for output, parse the capture file and prepare the list
containing the samples.
Returns:
The capture as a list containing dict's with fields "Time"
and the channel names
Raises:
RuntimeError() if capture has not been started
"""
if not self._running:
raise RuntimeError("no capture started yet")
self._running = False
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'

# sigrok-cli can be quit through any keypress
stdout, stderr = self._process.communicate(input="q")
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)

# Convert from .sr to .csv
self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _capture_blocking(self, filename, capture_args, samplerate):
self._filename = filename
self._basename = os.path.basename(self._filename)
csv_filename = f'{os.path.splitext(self._basename)[0]}.csv'
self.logger.debug(
"Saving to: %s with basename: %s", self._filename, self._basename
)
cmd = [
"-l", "4", "--config", f"samplerate={samplerate}",
*capture_args, "-o"
]
filename = os.path.join(self._tmpdir, self._basename)
cmd.append(filename)
self._call_with_driver_blocking(*cmd, log_output=True)

args = self.sigrok.command_prefix + ['test', '-e', filename]

while subprocess.call(args):
sleep(0.1)

self._convert_sr_csv(os.path.join(self._tmpdir, self._basename),
os.path.join(self._tmpdir, csv_filename))
self._transfer_tmp_file(csv_filename)
return self._process_csv(csv_filename)

def _convert_sr_csv(self, file_path_sr, file_path_scv):
cmd = [
'-i',
os.path.join(self._tmpdir, self._basename), '-O', 'csv:time=true', '-o',
os.path.join(self._tmpdir, csv_filename)
file_path_sr, '-O', 'csv:time=true', '-o',
file_path_scv
]
self._call(*cmd)
stdout, stderr = self._process.communicate()
self.logger.debug("stdout: %s", stdout)
self.logger.debug("stderr: %s", stderr)


def _transfer_tmp_file(self, csv_filename):
if isinstance(self.sigrok, NetworkSigrokUSBDevice):
subprocess.call([
'scp', f'{self.sigrok.host}:{os.path.join(self._tmpdir, self._basename)}',
Expand All @@ -201,16 +323,23 @@ def stop(self):
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)

def _process_csv(self, csv_filename):
fnames = ['time']
fnames.extend(self.sigrok.channels.split(','))

if isinstance(self.sigrok, NetworkSigrokUSBDevice):
with open(os.path.join(self._local_tmpdir,
csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
next(csv_file)
return list(csv.DictReader(csv_file, fieldnames=fnames))
else:
shutil.copyfile(
os.path.join(self._tmpdir, self._basename), self._filename
)
with open(os.path.join(self._tmpdir, csv_filename)) as csv_file:
# skip first 5 lines of the csv output, contains metadata and fieldnames
for _ in range(0, 5):
Expand All @@ -219,6 +348,15 @@ def stop(self):

@Driver.check_active
def analyze(self, args, filename=None):
"""
Analyzes captured data through `sigrok-cli`'s command line interface
Args:
args: args to `sigrok-cli`
Returns:
A dictionary containing the matched groups.
"""
annotation_regex = re.compile(r'(?P<startnum>\d+)-(?P<endnum>\d+) (?P<decoder>[\w\-]+): (?P<annotation>[\w\-]+): (?P<data>".*)') # pylint: disable=line-too-long
if not filename and self._filename:
filename = os.path.join(self._tmpdir, self._basename)
Expand Down Expand Up @@ -408,12 +546,15 @@ def stop(self):
Raises:
RuntimeError() if capture has not been started
OSError() if the subprocess returned with non-zero return-code
"""
if not self._running:
raise RuntimeError("no capture started yet")
while not self._timeout.expired:
if self._process.poll() is not None:
# process has finished. no need to wait for the timeout
if self._process.returncode != 0:
raise OSError
break
time.sleep(0.1)
else:
Expand Down
34 changes: 31 additions & 3 deletions tests/test_sigrok.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
pytestmark = pytest.mark.skipif(not which("sigrok-cli"),
reason="sigrok not available")

VENDOR_ID = "0925"
PRODUCT_ID = "3881"

def test_sigrok_resource(target):
r = SigrokUSBDevice(target, name=None, match={"sys_name": "1-12"}, driver='fx2lafw', channels="D0,D1")
Expand All @@ -23,19 +25,45 @@ def test_sigrok_driver(target):


@pytest.mark.sigrokusb
def test_sigrok_usb_driver(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": "3881", "ID_VENDOR_ID": "0925"}, driver='fx2lafw', channels="D0,D1")
def test_sigrok_usb_driver_capture(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
d.capture(record)
sleep(5)
samples = d.stop()
assert os.path.getsize(record) > 0
assert samples != None
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']

@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_samples(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_samples(record, 100)
assert os.path.getsize(record) > 0
assert samples is not None
assert len(samples) == 100


@pytest.mark.sigrokusb
def test_sigrok_usb_driver_blocking_time(target, tmpdir):
r = SigrokUSBDevice(target, name=None, match={"ID_MODEL_ID": PRODUCT_ID, "ID_VENDOR_ID": VENDOR_ID}, driver='fx2lafw', channels="D0,D1")
d = SigrokDriver(target, name=None)
target.activate(d)
record = tmpdir.join("output.sr")
samples = d.capture_for_time(record, 101) # sigrok-cli captures 5ms less than specified.
assert os.path.getsize(record) > 0
assert samples is not None
assert list(samples[0].keys()) == ['time', 'D0', 'D1']
assert list(samples[-1].keys()) == ['time', 'D0', 'D1']
time = float(samples[-1]['time']) - float(samples[0]['time'])
assert time >= 100_000

def test_sigrok_power_driver(target):
r = SigrokUSBSerialDevice(target, name=None, driver='manson-hcs-3xxx')
r.avail = True
Expand Down

0 comments on commit 31ebd5f

Please sign in to comment.