Nurseries cannot propagate StopAsyncIteration exceptions #393

Closed
matham opened this issue Jan 5, 2018 · 4 comments · Fixed by #612

@matham
Contributor

matham commented Jan 5, 2018

I'm trying to implement an async version of zip using the following code, but I'm getting an error when one of the iterators passed to async_zip raises StopAsyncIteration before the others are finished. Normally I'd expect to be able to catch it using MultiError or a try/except; instead, I'm getting a RuntimeError.

I don't understand why this is happening and it's not clear if this is a bug in my code or in trio.

import trio

class async_zip(object):

    def __init__(self, *largs):
        self.nexts = [obj.__anext__ for obj in largs]

    async def _accumulate(self, f, items, i):
        items[i] = await f()

    def __aiter__(self):
        return self

    async def __anext__(self):
        nexts = self.nexts
        items = [None, ] * len(nexts)
        got_stop = False

        def handle(exc):
            nonlocal got_stop
            if isinstance(exc, StopAsyncIteration):
                got_stop = True
                return None
            else:
                return exc

        with trio.MultiError.catch(handle):
            async with trio.open_nursery() as nursery:
                for i, f in enumerate(nexts):
                    nursery.start_soon(self._accumulate, f, items, i)

        if got_stop:
            raise StopAsyncIteration
        return items

# an async iterable
class it(object):
    def __init__(self, count):
        self.count = count
        self.val = 0
    def __aiter__(self):
        return self
    async def __anext__(self):
        await trio.sleep(1)
        val = self.val
        if val >= self.count:
            raise StopAsyncIteration
        self.val += 1
        return val

# test the iterable
async def runner():
    async for val in it(4):
        print('got', val)

# let all iterables finish together
async def zipper():
    async for vals in async_zip(it(4), it(4), it(4), it(4)):
        print('got', vals)

# have one iterable finish early
async def zipper_short():
    async for vals in async_zip(it(4), it(4), it(4), it(2)):
        print('got', vals)

# run them
trio.run(runner)
trio.run(zipper)
trio.run(zipper_short)

This prints

got 0
got 1
got 2
got 3
got [0, 0, 0, 0]
got [1, 1, 1, 1]
got [2, 2, 2, 2]
got [3, 3, 3, 3]
got [0, 0, 0, 0]
got [1, 1, 1, 1]
Traceback (most recent call last):
  File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 199, in _invoke
    result = fn(*args)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 320, in open_nursery
    await nursery._clean_up(pending_exc)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 519, in _clean_up
    raise mexc
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
    msg = task.coro.send(next_send)
  File "<ipython-input-2-e43ada780e94>", line 9, in _accumulate
    items[i] = await f()
  File "<ipython-input-2-e43ada780e94>", line 46, in __anext__
    raise StopAsyncIteration
StopAsyncIteration
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "E:\Python\Python35-x64\lib\site-packages\IPython\core\interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-e43ada780e94>", line 64, in <module>
    trio.run(zipper_short)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1397, in run
    return result.unwrap()
  File "E:\Python\Python35-x64\lib\contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_ki.py", line 194, in ki_manager
    yield
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1397, in run
    return result.unwrap()
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_result.py", line 119, in unwrap
    raise self.error
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
    msg = task.coro.send(next_send)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1085, in init
    return system_nursery._reap_and_unwrap(task)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 447, in _reap_and_unwrap
    return task._result.unwrap()
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_result.py", line 119, in unwrap
    raise self.error
  File "E:\Python\Python35-x64\lib\site-packages\trio\_core\_run.py", line 1502, in run_impl
    msg = task.coro.send(next_send)
  File "<ipython-input-2-e43ada780e94>", line 59, in zipper_short
    async for vals in async_zip(it(4), it(4), it(4), it(2)):
  File "<ipython-input-2-e43ada780e94>", line 30, in __anext__
    nursery.start_soon(self._accumulate, f, items, i)
  File "E:\Python\Python35-x64\lib\site-packages\trio\_util.py", line 109, in __aexit__
    await self._agen.asend(None)
  File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 261, in asend
    return await self._do_it(self._it.send, value)
  File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 277, in _do_it
    return await ANextIter(self._it, start_fn, *args)
  File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 192, in send
    return self._invoke(self._it.send, value)
  File "E:\Python\Python35-x64\lib\site-packages\async_generator\impl.py", line 211, in _invoke
    ) from e
RuntimeError: async_generator raise StopAsyncIteration
@njsmith
Member

njsmith commented Jan 5, 2018

Oh wow, this is a tricky bug you found. At first I thought it was #229, but it's actually different. (BTW, that concurrent zip is really cool! Even if it does seem to be uniquely suited to tickling this very weird bug :-).) Anyway, here's what's happening:

A popular way to implement context managers is by writing a generator, and then decorating it with contextlib.contextmanager. This same pattern can be used to write async context managers, by writing an async generator and then using an appropriately-implemented async context manager decorator. Trio has an internal implementation of this decorator (trio._util.acontextmanager), and that's how trio.open_nursery is implemented. So open_nursery looks roughly like:

@acontextmanager
async def open_nursery():
    # ... do setup stuff
    try:
        yield nursery
    except BaseException as exc:
        nursery._record_exception(exc)
    await nursery._wait_for_all_tasks_to_exit()
    raise nursery._combine_exceptions()

Here, exactly one of the background tasks raised StopAsyncIteration, so the last line ends up being raise that_stop_async_iteration_object.

However, there's a problem: you're not allowed to raise StopAsyncIteration inside an async generator! If you try, it gets replaced by a RuntimeError, following the same rule that PEP 479 introduced for StopIteration in ordinary generators. This is actually a good thing, because the way generators work, a regular return is converted into a StopAsyncIteration (just like falling off the end of a generator function causes StopIteration to be raised), so if it weren't converted into a RuntimeError, it would have been swallowed by the decorator and misinterpreted as a bare return, which would have been even more confusing. But... either way, this isn't going to work.
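
The conversion is easy to see in isolation. The following is a minimal sketch, assuming Python 3.7+ native async generators (rather than the async_generator backport that appears in the traceback above) and using asyncio purely as a standard-library way to drive the coroutine; the names `agen` and `main` are made up for illustration.

```python
import asyncio


async def agen():
    yield 1
    # Explicitly raising StopAsyncIteration inside an async generator body...
    raise StopAsyncIteration


async def main():
    seen = []
    try:
        async for value in agen():
            seen.append(value)
    except RuntimeError as exc:
        # ...reaches the consumer as a RuntimeError instead of quietly ending the loop.
        return seen, exc
    return seen, None


seen, error = asyncio.run(main())
print(seen, type(error).__name__)
```

The loop still receives the first value; only the explicit raise is converted.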

Our @acontextmanager decorator has some special code to detect this problem when the exception was actually raised inside the body of the async with block, and fix things up again – which means that code like this works the way you'd expect:

async with trio.open_nursery() as nursery:
    raise StopAsyncIteration

But this only works because @acontextmanager catches the StopAsyncIteration first, before throwing it into the open_nursery code, so it can recognize the special RuntimeError when it comes back out. In your case, the StopAsyncIteration is being raised by a background task and gets routed into the nursery via the nursery machinery, so @acontextmanager doesn't have any way to recognize it, and we can't use this special trick.
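
The difference between the two paths can be sketched with the standard library's `contextlib.asynccontextmanager` (Python 3.7+), which is assumed here to behave analogously to trio's internal decorator; it is a stand-in, not trio's code. The generator that raises after its `yield` only imitates a nursery re-raising a background task's exception, and all function names are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def body_raises_cm():
    # Any exception from the `async with` body is thrown in at this yield,
    # so the decorator has already seen the exception object.
    yield


@asynccontextmanager
async def generator_raises_cm():
    yield
    # Stand-in for the nursery re-raising a background task's exception:
    # the decorator never saw this object before it was raised.
    raise StopAsyncIteration


async def outcome(cm_factory, raise_in_body):
    try:
        async with cm_factory():
            if raise_in_body:
                raise StopAsyncIteration
    except StopAsyncIteration:
        return "StopAsyncIteration"
    except RuntimeError:
        return "RuntimeError"
    return "no exception"


async def main():
    from_body = await outcome(body_raises_cm, raise_in_body=True)
    from_generator = await outcome(generator_raises_cm, raise_in_body=False)
    return from_body, from_generator


from_body, from_generator = asyncio.run(main())
print(from_body, from_generator)
```

In the first case the decorator can match the RuntimeError's cause against the exception it threw in and let the original propagate; in the second it has nothing to match against, so the RuntimeError escapes.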

So this really depends on a bunch of arcane details of how async context managers, async generators, and the nursery implementation all interact. Neat!

But, uh... what do we do about it?

I think the only option is to give up on using @acontextmanager to implement open_nursery, and switch to implementing the async context manager protocol directly. This is a bit tricky, but I've actually done most of the work already (see also #340). I guess this will be the thing that pushes us over into actually switching to that! (And this will also fix #229, so that's nice.)
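
To illustrate why implementing the protocol directly sidesteps the problem, here is a rough sketch with a hypothetical `BackgroundErrorScope` class. It is not trio's actual implementation and runs no tasks; it only shows that an ordinary `__aexit__` coroutine, unlike an async generator, can raise StopAsyncIteration without it being rewritten. asyncio is used only to run the example with the standard library.

```python
import asyncio


class BackgroundErrorScope:
    """Hypothetical stand-in for a nursery; it stores one 'background' error."""

    def __init__(self):
        self.background_error = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # __aexit__ is a plain coroutine, not an async generator, so raising
        # StopAsyncIteration here is not converted into a RuntimeError.
        if self.background_error is not None:
            raise self.background_error
        return False


async def main():
    try:
        async with BackgroundErrorScope() as scope:
            # Pretend a child task failed with StopAsyncIteration.
            scope.background_error = StopAsyncIteration()
    except StopAsyncIteration:
        return "propagated"
    except RuntimeError:
        return "converted"
    return "lost"


result = asyncio.run(main())
print(result)
```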

While you're waiting, though, here's a workaround: the problem is specifically with having a function called by nursery.start_soon that raises StopAsyncIteration. If you handle the exception inside the function, instead of letting it escape, then that should work. So something like (untested):

class async_zip(object):

    def __init__(self, *largs):
        # I added the __aiter__() call here; it's nothing to do with this bug, but you should have it :-)
        self.nexts = [obj.__aiter__().__anext__ for obj in largs]
        # We put the "are we done?" state on the object so _accumulate can see it
        self.done = False

    async def _accumulate(self, f, items, i):
        try:
            items[i] = await f()
        except StopAsyncIteration:
            self.done = True

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.done:
            # catch nasty users who try to re-iterate us after we said we were finished
            # (another advantage of having it on the object)
            raise StopAsyncIteration

        nexts = self.nexts
        items = [None, ] * len(nexts)

        async with trio.open_nursery() as nursery:
            for i, f in enumerate(nexts):
                nursery.start_soon(self._accumulate, f, items, i)

        if self.done:
            raise StopAsyncIteration
        # I also added the call to tuple() here because that's how zip() is supposed to work :-)
        return tuple(items)

Alternatively you could implement it as an async generator yourself (this is using the native async generator syntax, so requires Python 3.6+). Maybe this is clearer? I dunno:

async def async_zip(*aiterables):
    aiterators = [aiterable.__aiter__() for aiterable in aiterables]
    done = False
    while True:
        items = [None] * len(aiterators)

        async def fill_in(i):
            try:
                items[i] = await aiterators[i].__anext__()
            except StopAsyncIteration:
                nonlocal done
                done = True

        async with trio.open_nursery() as nursery:
            for i in range(len(aiterators)):
                nursery.start_soon(fill_in, i)

        if done:
            break
        yield tuple(items)

@njsmith njsmith changed the title Issue with async iterators Nurseries cannot propagate StopAsyncIteration exceptions Jan 5, 2018
@imrn

imrn commented Jan 5, 2018 via email

@njsmith
Member

njsmith commented Jan 5, 2018 via email

@matham
Contributor Author

matham commented Jan 5, 2018

Ahh, thanks for explaining that (and for the little code fixes). Your first workaround will work nicely (I'm still on 3.5.2).
