Skip to content

Commit

Permalink
Merge pull request #116 from nocarryr/update-test-script
Browse files Browse the repository at this point in the history
Update test script
  • Loading branch information
nocarryr committed Mar 6, 2020
2 parents 2e4b57e + 7ea2b83 commit 1dc9e9e
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 21 deletions.
136 changes: 115 additions & 21 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,140 @@

from __future__ import division
from __future__ import print_function

import sys
import argparse

try:
import numpy as np
HAVE_NP = True
except ImportError:
np = None
HAVE_NP = False

from rtlsdr import *

def main():
def get_mean(samples):
if HAVE_NP:
r = samples.mean()
else:
r = sum(samples)/len(samples)
return r

def main(**opts):
rs = opts.get('rs', 2.4e6)
fc = opts.get('fc', 100e6)
gain = opts.get('gain', 10)
num_reads = opts.get('num_reads', 2)
num_samples = opts.get('num_samples', 256*1024)
nfft = opts.get('nfft', 1024)
plot_enabled = opts.get('plot_enabled', False)

@limit_calls(2)
@limit_calls(num_reads)
def test_callback(samples, rtlsdr_obj):
print(' in callback')
print(' signal mean:', sum(samples)/len(samples))
print(' in callback, index:', rtlsdr_obj._read_index)
print(' signal mean:', get_mean(samples))

all_samples = rtlsdr_obj._all_samples
if HAVE_NP:
all_samples[rtlsdr_obj._read_index,:] = samples
else:
all_samples.extend(samples)
rtlsdr_obj._read_index += 1

sdr = RtlSdr()

print('Configuring SDR...')
sdr.rs = 2.4e6
sdr.fc = 100e6
sdr.gain = 10
sdr.rs = rs
sdr.fc = fc
sdr.gain = gain
print(' sample rate: %0.6f MHz' % (sdr.rs/1e6))
print(' center frequency %0.6f MHz' % (sdr.fc/1e6))
print(' center frequency: %0.6f MHz' % (sdr.fc/1e6))
print(' gain: %d dB' % sdr.gain)

print('Reading samples...')
samples = sdr.read_samples(256*1024)
print(' signal mean:', sum(samples)/len(samples))
samples = sdr.read_samples(num_samples)
print(' signal mean:', get_mean(samples))

print('Testing callback...')
sdr.read_samples_async(test_callback, 256*1024)

try:
import pylab as mpl
if HAVE_NP:
all_samples = np.zeros((num_reads, num_samples), dtype=np.complex128)
else:
all_samples = []

sdr._all_samples = all_samples
sdr._read_index = 0
sdr._num_reads = num_reads

sdr.read_samples_async(test_callback, num_samples)

if HAVE_NP:
all_samples = all_samples.flatten()

print('Testing spectrum plotting...')
mpl.figure()
mpl.psd(samples, NFFT=1024, Fc=sdr.fc/1e6, Fs=sdr.rs/1e6)
print('Total sample count={}, mean={}'.format(len(all_samples), get_mean(all_samples)))

mpl.show()
except:
# matplotlib not installed/working
pass
if plot_enabled:
try:
import pylab as mpl

print('Testing spectrum plotting...')
mpl.figure()
mpl.psd(all_samples, NFFT=nfft, Fc=sdr.fc/1e6, Fs=sdr.rs/1e6)

mpl.show()
except:
# matplotlib not installed/working
pass

print('Done\n')
sdr.close()

class ArgParseFormatter(argparse.ArgumentDefaultsHelpFormatter):
def _get_default_metavar_for_optional(self, action):
return action.type.__name__

def _get_default_metavar_for_positional(self, action):
return action.type.__name__

def parse_args(argv=None):
if argv is None:
argv = sys.argv[1:]

p = argparse.ArgumentParser(formatter_class=ArgParseFormatter)
p.add_argument(
'--rs', dest='rs', type=float, default=2.4e6,
help='Device sample rate',
)
p.add_argument(
'--fc', dest='fc', type=float, default=100e6,
help='Device center frequency',
)
p.add_argument(
'--gain', dest='gain', type=float, default=10.,
help='Device gain (dB)',
)
p.add_argument(
'--num-samples', dest='num_samples', type=int, default=256*1024,
help='Number of samples to read from device per iteration',
)
p.add_argument(
'--num-reads', dest='num_reads', type=int, default=2,
help='Number of times to read samples (total samples: num_reads * num_samples)',
)
p.add_argument(
'--nfft', dest='nfft', type=int, default=1024,
help='Number of FFT bins to use for plotting (if matplotlib is available)',
)
p.add_argument(
'--plot', dest='plot_enabled', action='store_true',
help='Plot the PSD results (if matplotlib is installed)',
)

args = p.parse_args(argv)
opts = vars(args)
return opts

if __name__ == '__main__':
main()
opts = parse_args()
main(**opts)
83 changes: 83 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import sys
import pytest
import itertools

def test_pkg_version():
import subprocess
Expand All @@ -16,6 +19,86 @@ def test(sdr_cls, use_numpy):
generic_test(sdr, use_numpy=use_numpy)
sdr.close()

@pytest.mark.skipif(sys.version_info < (3, 5), reason="requires python3.5 or higher")
def test_example_script(capsys, use_numpy):
from pathlib import Path
import rtlsdr

SAMPLE_RATES = [1.024e6, 2.048e6]
CENTER_FREQS = [100e6, 200e6, 300e6]
GAINS = [10, 20, 30]
NUM_SAMPLES = 4096
NUM_READS = 8

here = Path(__file__).resolve().parent
script_fn = here.parent / 'test.py'
assert script_fn.exists()

mod_dict = {'__builtins__':__builtins__, 'rtlsdr':rtlsdr}
mod = compile(script_fn.read_text(), str(script_fn), mode='exec')
exec(mod, mod_dict)

mod_dict['HAVE_NP'] = use_numpy
mod_dict['HAVE_NP'] = use_numpy
parse_args = mod_dict['parse_args']
main = mod_dict['main']


def iter_output_sections(outstr):
cur_section = None
content = []
for line in outstr.splitlines():
content.append(line)
if line.startswith('Configuring SDR'):
assert cur_section is None
cur_section = 'config'
elif line.startswith('Reading samples'):
assert cur_section == 'config'
yield cur_section, content
content = []
cur_section = 'read_sync'
elif line.startswith('Testing callback'):
assert cur_section == 'read_sync'
yield cur_section, content
content = []
cur_section = 'read_async'
elif line.startswith('Total sample count'):
assert cur_section == 'read_async'
yield cur_section, content
content = []
cur_section = 'totals'
elif line.startswith('Done'):
break

for rs, fc, gain in itertools.product(SAMPLE_RATES, CENTER_FREQS, GAINS):
argv = [
'--rs', '{:.0f}'.format(rs),
'--fc', '{:.0f}'.format(fc),
'--gain', str(gain),
'--num-samples', str(NUM_SAMPLES),
'--num-reads', str(NUM_READS),
]
opts = parse_args(argv)
main(**opts)
captured = capsys.readouterr()
for section, content in iter_output_sections(captured.out):
if section == 'config':
for line in section:
if 'sample rate' in line:
val = float(line.split(':')[1])
assert val == rs
elif 'center frequency' in line:
val = float(line.split(':')[1])
assert val == fc
elif 'gain' in line:
val = float(line.split(':').rstip('dB'))
assert val == gain
elif section == 'totals':
line = content[0]
total_count = int(line.split('sample count=')[1].split(',')[0])
assert total_count == NUM_SAMPLES * NUM_READS


def test_serial_addressing(sdr_cls, use_numpy):
for i, serial in enumerate(sdr_cls.get_device_serial_addresses()):
assert sdr_cls.get_device_index_by_serial(serial) == i
Expand Down

0 comments on commit 1dc9e9e

Please sign in to comment.