Better way to wait for all tasks from a stream to complete? #15

Closed
Joker-vD opened this issue May 15, 2018 · 2 comments

@Joker-vD

Joker-vD commented May 15, 2018

import asyncio
import aiostream
import random

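async def hack_to_wait(task):
    # Await the task, then emit one dummy item so flatmap has something to yield
    await task
    yield None

async def wait_for_tasks(tasks):
    # flatmap runs hack_to_wait concurrently for each task pulled from the stream
    async with aiostream.stream.flatmap(aiostream.stream.iterate(tasks), hack_to_wait).stream() as streamer:
        async for _ in streamer:
            pass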

# A silly example
async def produce_tasks():
    for _ in range(5):
        await asyncio.sleep(random.randint(1, 10))
        yield asyncio.sleep(random.randint(1, 10)) 

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_tasks(produce_tasks()))
loop.close()

Playing with delays and debug prints shows that this code does what is intended: it waits for the tasks already pulled from the tasks stream to complete, while more tasks are still being produced by that same stream. Basically, it's an asyncio.wait() that supports async iterables, or rather an ugly hack to that effect. It doesn't handle exceptions well and can only wait for all tasks to complete at once. Still, it's somewhat useful for tasks that have to be completed but don't return a result and have no reasonable place to be awaited.

Is there a better way to do this with aiostream, or can the underlying problem (awaiting every task from an asynchronous collection) be avoided entirely?
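For comparison, here is a minimal sketch of the same "wait for everything" behaviour using only the standard library, with no aiostream involved. It assumes the source is an async iterable of coroutines, like produce_tasks above. Each coroutine is wrapped in a task as soon as it arrives, so it starts running while the source is still producing, and asyncio.gather then waits for all of them and re-raises the first exception. The names wait_for_all and produce are hypothetical, chosen for illustration.

```python
import asyncio


async def wait_for_all(source):
    """Hypothetical helper: schedule every awaitable from an async iterable
    as soon as it arrives, then wait for all of them.

    Results are returned in the order the source yielded them."""
    tasks = []
    async for aw in source:
        # ensure_future wraps a coroutine in a Task, so it starts running now
        # while we keep pulling new items from the source.
        tasks.append(asyncio.ensure_future(aw))
    # gather waits for every task and re-raises the first exception, if any.
    return await asyncio.gather(*tasks)


async def produce(delays):
    # Hypothetical producer: yields one sleeping coroutine per delay.
    for d in delays:
        await asyncio.sleep(0.01)
        yield asyncio.sleep(d, result=d)


if __name__ == "__main__":
    print(asyncio.run(wait_for_all(produce([0.03, 0.01, 0.02]))))
    # prints [0.03, 0.01, 0.02]
```

Unlike the flatmap hack, a failing task surfaces its exception here. Note that gather does not cancel the remaining tasks when one fails; passing return_exceptions=True collects every outcome instead.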

@vxgmichel
Owner

Hi @Joker-vD, thanks for creating an issue!

I can think of three different ways to implement the feature you're looking for:

import random
import asyncio
from aiostream import stream, pipe


async def do_something(x):
    print(f'Doing something for {x:.2f} s...')
    await asyncio.sleep(x)
    print(f'Done something for {x:.2f} s...')


async def produce_coros():
    for _ in range(5):
        await asyncio.sleep(random.random())
        yield do_something(random.random())


async def _await(arg):
    return await arg


async def main():
    # Version 1
    await stream.combine.amap(produce_coros(), lambda x: x)
    print()
    # Version 2
    await stream.map(produce_coros(), _await)
    print()
    # Version 3
    await stream.flatmap(produce_coros(), stream.just)
    print()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

The last one is probably the simplest, although flatmap is meant to be an advanced operator. Maybe the library is missing a dedicated helper for this, but I can't quite figure out what it should look like. Maybe something like asyncio.as_completed:

from aiostream import stream, pipe, operator

@operator(pipable=True)
def as_completed(source, task_limit=None):
    return stream.flatmap.raw(source, stream.just.raw, task_limit=task_limit)

async def main():
    results = await (as_completed(produce_coros()) | pipe.list())

@vxgmichel
Owner

Now that async_ and await_ have been added to the aiostream namespace (see PR #21), the simplest solution is:

from aiostream import stream, await_

async def main():
    await stream.map(produce_coros(), await_)

PR #21 will be available in version 0.3.1.
