diff --git a/.gitignore b/.gitignore index 5a54e80..1ec0199 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ .cache # setuptools related +dist/* build/* .eggs/* SigMF.egg-info/* diff --git a/README.md b/README.md index 54cd4f2..0d2e62f 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,15 @@

-This repository contains a python module for interacting with SigMF Objects. +This repository contains a python module for interacting with SigMF Objects. This module works with Python 3.6 and higher. This module is distributed freely under the terms GNU Lesser GPL v3 License. # Signal Metadata Format (SigMF) -The [SigMF specification document](sigmf-spec.md), `sigmf-spec.md` is located in the +The [SigMF specification document](https://github.com/sigmf/SigMF/blob/HEAD/sigmf-spec.md), `sigmf-spec.md` is located in the https://github.com/gnuradio/SigMF repository. - # Installation To install the latest released version of the SigMF package, install it from pip: @@ -203,7 +202,7 @@ In [4]: arc.ndim Out[4]: 1 In [5]: arc[:10] -Out[5]: +Out[5]: array([-20.+11.j, -21. -6.j, -17.-20.j, -13.-52.j, 0.-75.j, 22.-58.j, 48.-44.j, 49.-60.j, 31.-56.j, 23.-47.j], dtype=complex64) ``` @@ -238,7 +237,7 @@ In [2]: sigmf_bytes = io.BytesIO(open('/src/LTE.sigmf', 'rb').read()) In [3]: arc = sigmf.SigMFArchiveReader(archive_buffer=sigmf_bytes) In [4]: arc[:10] -Out[4]: +Out[4]: array([-20.+11.j, -21. -6.j, -17.-20.j, -13.-52.j, 0.-75.j, 22.-58.j, 48.-44.j, 49.-60.j, 31.-56.j, 23.-47.j], dtype=complex64) ``` @@ -256,4 +255,4 @@ useful to anyone and everyone, regardless of tool or workflow. *No*, similar to the response, above, the goal is to create something that is generally applicable to _signal processing_, regardless of whether or not the -application is communications related. \ No newline at end of file +application is communications related. diff --git a/setup.py b/setup.py index c9d7712..1d43b36 100755 --- a/setup.py +++ b/setup.py @@ -3,50 +3,45 @@ import os import re -shortdesc = 'Signal Metadata Format Specification' -longdesc = ''' -The Signal Metadata Format (SigMF) specifies a way to describe -sets of recorded digital signal samples with metadata written in JSON. -SigMF can be used to describe general information about a collection -of samples, the characteristics of the system that generated the -samples, and features of the signal itself. -''' +short_description = "Python module for interacting with SigMF recordings." -with open(os.path.join('sigmf', '__init__.py')) as handle: +with open("README.md", encoding="utf-8") as handle: + long_description = handle.read() + +with open(os.path.join("sigmf", "__init__.py"), encoding="utf-8") as handle: version = re.search(r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', handle.read()).group(1) setup( - name='SigMF', + name="SigMF", version=version, - description=shortdesc, - long_description=longdesc, - url='https://github.com/sigmf/sigmf-python', - + description=short_description, + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/sigmf/sigmf-python", + license="GNU Lesser General Public License v3 or later (LGPLv3+)", classifiers=[ - 'License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', + "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", ], entry_points={ - 'console_scripts': [ - 'sigmf_validate = sigmf.validate:main', - 'sigmf_gui = sigmf.gui:main [gui]', + "console_scripts": [ + "sigmf_validate = sigmf.validate:main", + "sigmf_gui = sigmf.gui:main [gui]", ] }, - packages=['sigmf'], + packages=["sigmf"], package_data={ - 'sigmf': ['*.json'], - }, - install_requires=['numpy', 'jsonschema'], - extras_require={ - 'gui': 'pysimplegui==4.0.0' + "sigmf": ["*.json"], }, - setup_requires=['pytest-runner'], - tests_require=['pytest>3'], - zip_safe=False + install_requires=["numpy", "jsonschema"], + extras_require={"gui": "pysimplegui==4.0.0"}, + setup_requires=["pytest-runner"], + tests_require=["pytest>3", "hypothesis"], + zip_safe=False, ) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 42e2136..0641226 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -4,7 +4,7 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later -__version__ = '1.1.2' +__version__ = "1.1.3" from .archive import SigMFArchive from .sigmffile import SigMFFile, SigMFCollection diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 423ab49..bffd41f 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -204,10 +204,10 @@ def __getitem__(self, sli): if self._return_type is not None: # is_fixed_point and is_complex if self._memmap.ndim == 2: - # num_channels==1 + # num_channels == 1 a = a[:,0].astype(self._return_type) + 1.j * a[:,1].astype(self._return_type) elif self._memmap.ndim == 3: - # num_channels>1 + # num_channels > 1 a = a[:,:,0].astype(self._return_type) + 1.j * a[:,:,1].astype(self._return_type) else: raise ValueError("unhandled ndim in SigMFFile.__getitem__(); this shouldn't happen") diff --git a/tests/conftest.py b/tests/conftest.py index 9a8aa64..d25ad06 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,6 +29,7 @@ @pytest.fixture def test_data_file(): + """when called, yields temporary file""" with tempfile.NamedTemporaryFile() as temp: TEST_FLOAT32_DATA.tofile(temp.name) yield temp @@ -36,6 +37,7 @@ def test_data_file(): @pytest.fixture def test_sigmffile(test_data_file): + """If pytest uses this signature, will return valid SigMF file.""" sigf = SigMFFile() sigf.set_global_field("core:datatype", "rf32_le") sigf.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA)) diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index 2b5b449..9176e32 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -1,54 +1,75 @@ -import codecs -import json -import tarfile +# Copyright 2023 GNU Radio Foundation import tempfile -from os import path - import numpy as np -import pytest +import unittest -from sigmf import error +import sigmf from sigmf import SigMFFile, SigMFArchiveReader -from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT - -def test_access_data_without_untar(test_sigmffile): - global_info = { - "core:author": "Glen M", - "core:datatype": "ri16_le", - "core:license": "https://creativecommons.org/licenses/by-sa/4.0/", - "core:num_channels": 2, - "core:sample_rate": 48000, - "core:version": "1.0.0" - } - capture_info = { - "core:datetime": "2021-06-18T23:17:51.163959Z", - "core:sample_start": 0 + + +class TestArchiveReader(unittest.TestCase): + def setUp(self): + # in order to check shapes we need some positive number of samples to work with + # number of samples should be lowest common factor of num_channels + self.raw_count = 16 + self.lut = { + "i8": np.int8, + "u8": np.uint8, + "i16": np.int16, + "u16": np.uint16, + "u32": np.uint32, + "i32": np.int32, + "f32": np.float32, + "f64": np.float64, } - - NUM_ROWS = 5 - - for dt in "ri16_le", "ci16_le", "rf32_le", "rf64_le", "cf32_le", "cf64_le": - global_info["core:datatype"] = dt - for num_chan in 1,3: - global_info["core:num_channels"] = num_chan - base_filename = dt + '_' + str(num_chan) - archive_filename = base_filename + '.sigmf' - - a = np.arange(NUM_ROWS * num_chan * (2 if 'c' in dt else 1)) - if 'i16' in dt: - b = a.astype(np.int16) - elif 'f32' in dt: - b = a.astype(np.float32) - elif 'f64' in dt: - b = a.astype(np.float64) - else: - raise ValueError('whoops') - - test_sigmffile.data_file = None - with tempfile.NamedTemporaryFile() as temp: - b.tofile(temp.name) - meta = SigMFFile(data_file=temp.name, global_info=global_info) - meta.add_capture(0, metadata=capture_info) - meta.tofile(archive_filename, toarchive=True) - - archi = SigMFArchiveReader(archive_filename, skip_checksum=True) + + def test_access_data_without_untar(self): + """iterate through datatypes and verify IO is correct""" + _, temp_path = tempfile.mkstemp() + _, temp_archive = tempfile.mkstemp(suffix=".sigmf") + + for key, dtype in self.lut.items(): + # for each type of storage + temp_samples = np.arange(self.raw_count, dtype=dtype) + temp_samples.tofile(temp_path) + for num_channels in [1, 4, 8]: + # for single or 8 channel + for complex_prefix in ["r", "c"]: + # for real or complex + target_count = self.raw_count + temp_meta = SigMFFile( + data_file=temp_path, + global_info={ + SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", + SigMFFile.NUM_CHANNELS_KEY: num_channels, + SigMFFile.VERSION_KEY: sigmf.__version__, + }, + ) + temp_meta.tofile(temp_archive, toarchive=True) + + readback = SigMFArchiveReader(temp_archive) + readback_samples = readback[:] + + if complex_prefix == "c": + # complex data will be half as long + target_count //= 2 + self.assertTrue(np.all(np.iscomplex(readback_samples))) + if num_channels != 1: + # check expected # of channels + self.assertEqual( + readback_samples.ndim, + 2, + "Mismatch in shape of readback samples.", + ) + target_count //= num_channels + + self.assertEqual( + target_count, + temp_meta._count_samples(), + "Mismatch in expected metadata length.", + ) + self.assertEqual( + target_count, + len(readback), + "Mismatch in expected readback length", + ) diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index e371964..929106e 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -101,64 +101,68 @@ def test_add_multiple_captures_and_annotations(): simulate_capture(sigf, idx, 1024) -def test_multichannel_types(): - '''check that real & complex for all types is reading multiple channels correctly''' - lut = { - 'i8': np.int8, - 'u8': np.uint8, - 'i16': np.int16, - 'u16': np.uint16, - 'u32': np.uint32, - 'i32': np.int32, - 'f32': np.float32, - 'f64': np.float64, - } - raw_count = 16 - _, temp_path = tempfile.mkstemp() - for key, dtype in lut.items(): - # for each type of storage - np.arange(raw_count, dtype=dtype).tofile(temp_path) - for num_channels in [1, 8]: - # for single or 8 channel - for complex_prefix in ['r', 'c']: - # for real or complex - check_count = raw_count * 1 # deepcopy - temp_signal = SigMFFile( - data_file=temp_path, - global_info={ - SigMFFile.DATATYPE_KEY: f'{complex_prefix}{key}_le', - SigMFFile.NUM_CHANNELS_KEY: num_channels, - }, - ) - temp_samples = temp_signal.read_samples() - - if complex_prefix == 'c': - # complex data will be half as long - check_count //= 2 - assert np.all(np.iscomplex(temp_samples)) - if num_channels != 1: - assert temp_samples.ndim == 2 - check_count //= num_channels - - assert check_count == temp_signal._count_samples() - - -def test_multichannel_seek(): - '''assure that seeking is working correctly with multichannel files''' - _, temp_path = tempfile.mkstemp() - # write some dummy data and read back - np.arange(18, dtype=np.uint16).tofile(temp_path) - temp_signal = SigMFFile( - data_file=temp_path, - global_info={ - SigMFFile.DATATYPE_KEY: 'cu16_le', - SigMFFile.NUM_CHANNELS_KEY: 3, - }, - ) - # read after the first sample - temp_samples = temp_signal.read_samples(start_index=1, autoscale=False) - # assure samples are in the order we expect - assert np.all(temp_samples[:, 0] == np.array([6+7j, 12+13j])) +class TestMultichannel(unittest.TestCase): + def setUp(self): + # in order to check shapes we need some positive number of samples to work with + # number of samples should be lowest common factor of num_channels + self.raw_count = 16 + self.lut = { + "i8": np.int8, + "u8": np.uint8, + "i16": np.int16, + "u16": np.uint16, + "u32": np.uint32, + "i32": np.int32, + "f32": np.float32, + "f64": np.float64, + } + + def test_multichannel_types(self): + """check that real & complex for all types is reading multiple channels correctly""" + _, temp_path = tempfile.mkstemp() + for key, dtype in self.lut.items(): + # for each type of storage + np.arange(self.raw_count, dtype=dtype).tofile(temp_path) + for num_channels in [1, 4, 8]: + # for single or 8 channel + for complex_prefix in ["r", "c"]: + # for real or complex + check_count = self.raw_count + temp_signal = SigMFFile( + data_file=temp_path, + global_info={ + SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", + SigMFFile.NUM_CHANNELS_KEY: num_channels, + }, + ) + temp_samples = temp_signal.read_samples() + + if complex_prefix == "c": + # complex data will be half as long + check_count //= 2 + self.assertTrue(np.all(np.iscomplex(temp_samples))) + if num_channels != 1: + self.assertEqual(temp_samples.ndim, 2) + check_count //= num_channels + + self.assertEqual(check_count, temp_signal._count_samples()) + + def test_multichannel_seek(self): + """assure that seeking is working correctly with multichannel files""" + _, temp_path = tempfile.mkstemp() + # write some dummy data and read back + np.arange(18, dtype=np.uint16).tofile(temp_path) + temp_signal = SigMFFile( + data_file=temp_path, + global_info={ + SigMFFile.DATATYPE_KEY: "cu16_le", + SigMFFile.NUM_CHANNELS_KEY: 3, + }, + ) + # read after the first sample + temp_samples = temp_signal.read_samples(start_index=1, autoscale=False) + # assure samples are in the order we expect + self.assertTrue(np.all(temp_samples[:, 0] == np.array([6 + 7j, 12 + 13j]))) def test_key_validity():