Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RtlsdrAio improvements #52

Merged
merged 23 commits into from Sep 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0b1c1d1
Put queue items in with a coroutine and empty them with task_done
nocarryr Sep 24, 2016
c7fc677
Make start/stop methods coroutines
nocarryr Sep 24, 2016
a38016b
Ensure func_start is awaited
nocarryr Sep 24, 2016
6218761
await the stop method
nocarryr Sep 24, 2016
1e0dbf6
Use pyenv to use current python3.5 release
nocarryr Sep 24, 2016
28221ed
Only install and use python3.5.2 in the 3.5 build
nocarryr Sep 24, 2016
89c4df1
Use pyenv shell
nocarryr Sep 24, 2016
8944c4c
Bash syntax fixes
nocarryr Sep 24, 2016
e85e789
Create and activate the virtualenv
nocarryr Sep 24, 2016
ff8bff5
Install missing pyenv-virtualenv
nocarryr Sep 24, 2016
f3d1d12
Remove shell reload
nocarryr Sep 24, 2016
6504200
Set pyenv environment vars
nocarryr Sep 24, 2016
78cbcfb
Try using the full path for pyenv-virtualenv
nocarryr Sep 25, 2016
3678b88
More mucking around with build environment hacks
nocarryr Sep 25, 2016
af4807d
Correct argument order
nocarryr Sep 25, 2016
5c82383
Try to force virtualenv to use the built python interpreter
nocarryr Sep 25, 2016
f07fa5f
Please work
nocarryr Sep 25, 2016
525970a
Correct filename
nocarryr Sep 25, 2016
6fdef60
If this works, I'm going to be very annoyed
nocarryr Sep 25, 2016
d4bc47f
Remove attempted python3.5.2 install script
nocarryr Sep 25, 2016
bc0c940
Add the `await` statement in README async example
nocarryr Sep 26, 2016
aab57f0
Separate the test functions to make things a bit more clear
nocarryr Sep 26, 2016
94207fb
Expand aio test to include read_bytes_async
nocarryr Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -4,7 +4,7 @@ dist: trusty
python:
- "2.7"
- "3.4"
- "3.5"
- "3.5.2"
addons:
apt:
packages:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -78,7 +78,7 @@ async def streaming():
# ...

# to stop streaming:
sdr.stop()
await sdr.stop()

# done
sdr.close()
Expand Down
42 changes: 28 additions & 14 deletions rtlsdr/rtlsdraio.py
Expand Up @@ -32,39 +32,53 @@ def __init__(self, func_start, func_stop=None, queue_size=20, *, loop=None):

self.running = False

def _callback(self, *args):
if self.running and not self.queue.full():
self.loop.call_soon_threadsafe(self.queue.put_nowait, args)
else:
async def add_to_queue(self, *args):
try:
self.queue.put_nowait(args)
except asyncio.QueueFull:
log.info('extra callback data lost')

def start(self):
def _callback(self, *args):
if not self.running:
return
asyncio.run_coroutine_threadsafe(self.add_to_queue(*args), self.loop)

async def start(self):
assert(not self.running)

# start legacy async function
future = self.loop.run_in_executor(None, self.func_start, self._callback)
asyncio.ensure_future(future, loop=self.loop)
self.executor_task = future
self.running = True

def stop(self):
async def stop(self):
assert(self.running)

# send a signal to stop
self.queue.put_nowait((StopAsyncIteration(),))
self.running = False

# send a signal to stop
iter_stopped = False
while not iter_stopped:
try:
self.queue.put_nowait((StopAsyncIteration(),))
iter_stopped = True
except asyncio.QueueFull:
try:
self.queue.task_done()
except ValueError:
pass
if self.func_stop:
# stop legacy async function
future = self.loop.run_in_executor(None, self.func_stop)
asyncio.ensure_future(future, loop=self.loop)

#self.func_stop()
await self.loop.run_in_executor(None, self.func_stop)
await self.executor_task

async def __aiter__(self):
return self

async def __anext__(self):
val = await self.queue.get()
self.queue.task_done()

if isinstance(val[0], StopAsyncIteration):
raise StopAsyncIteration
Expand Down Expand Up @@ -107,10 +121,10 @@ def stream(self, num_samples_or_bytes=DEFAULT_READ_SIZE, format='samples', loop=
self.async_iter = AsyncCallbackIter(func_start=lambda cb: func_start(cb, num_samples_or_bytes),
func_stop=self.cancel_read_async,
loop=loop)
self.async_iter.start()
asyncio.ensure_future(self.async_iter.start(), loop=loop)

return self.async_iter

def stop(self):
''' Stop async stream. '''
self.async_iter.stop()
return asyncio.ensure_future(self.async_iter.stop(), loop=self.async_iter.loop)
43 changes: 28 additions & 15 deletions tests/test_aio.py
@@ -1,10 +1,11 @@
import asyncio
import pytest

def test(rtlsdraio):
async def main():
import math
from utils import generic_test
import math
from utils import generic_test

async def main():
sdr = rtlsdraio.RtlSdrAio()
generic_test(sdr)

Expand All @@ -16,27 +17,39 @@ async def main():
print(' center frequency %0.6f MHz' % (sdr.fc/1e6))
print(' gain: %d dB' % sdr.gain)


print('Streaming samples...')
await process_samples(sdr, 'samples')
await sdr.stop()

print('Streaming bytes...')
await process_samples(sdr, 'bytes')
await sdr.stop()

# make sure our format parameter checks work
with pytest.raises(ValueError):
await process_samples(sdr, 'foo')

print('Done')

sdr.close()


async def process_samples(sdr, fmt):
async def packed_bytes_to_iq(samples):
return sdr.packed_bytes_to_iq(samples)

i = 0
async for samples in sdr.stream():
async for samples in sdr.stream(format=fmt):
if fmt == 'bytes':
samples = await packed_bytes_to_iq(samples)
power = sum(abs(s)**2 for s in samples) / len(samples)
print('Relative power:', 10*math.log10(power), 'dB')

i += 1

if i > 20:
sdr.stop()
break

print('Done')

sdr.close()

async def do_nothing():
for i in range(50):
await asyncio.sleep(0.1)
print('#')

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([main(), do_nothing()]))
loop.run_until_complete(main())