
Unhandled BrokenPipeError in asyncio.streams #104340

Closed
TheTechromancer opened this issue May 9, 2023 · 7 comments
Labels: topic-asyncio, type-bug (An unexpected behavior, bug, or error)

Comments

TheTechromancer commented May 9, 2023

Bug report

Kind of a weird one here; I've been running into it for a while but only recently figured out how to reproduce it reliably.

Basically, if an asyncio subprocess is killed while a large amount of data remains to be written to its stdin, it fails to raise a ConnectionResetError and instead hits a BrokenPipeError inside the _drain_helper() method. Because the exception happens inside an internal task, it evades handling by the user.

Traceback (most recent call last):
  File "/brokenpipeerror_bug.py", line 28, in main
    await proc.stdin.drain()
  File "/usr/lib/python3.10/asyncio/streams.py", line 371, in drain
    await self._protocol._drain_helper()
  File "/usr/lib/python3.10/asyncio/streams.py", line 173, in _drain_helper
    await waiter
BrokenPipeError

Minimal reproducible example:

import asyncio
import traceback

async def main():
    proc = await asyncio.create_subprocess_exec("sleep", "999", stdin=asyncio.subprocess.PIPE)
    try:
        for _ in range(10000): # NOTE: only triggers if this is a high number
            i = b"www.blacklanternsecurity.com\n"
            proc.stdin.write(i)
        proc.kill()
        await proc.stdin.drain() # This triggers error
    except BrokenPipeError:
        print(f"Handled error: {traceback.format_exc()}")

asyncio.run(main())

$ python brokenpipeerror_bug.py 
Handled error: Traceback (most recent call last):
  File "/brokenpipeerror_bug.py", line 28, in main
    await proc.stdin.drain()
  File "/usr/lib/python3.10/asyncio/streams.py", line 371, in drain
    await self._protocol._drain_helper()
  File "/usr/lib/python3.10/asyncio/streams.py", line 173, in _drain_helper
    await waiter
BrokenPipeError

Future exception was never retrieved
future: <Future finished exception=BrokenPipeError()>
Traceback (most recent call last):
  File "/brokenpipeerror_bug.py", line 28, in main
    await proc.stdin.drain()
  File "/usr/lib/python3.10/asyncio/streams.py", line 371, in drain
    await self._protocol._drain_helper()
  File "/usr/lib/python3.10/asyncio/streams.py", line 173, in _drain_helper
    await waiter
BrokenPipeError

Tested on CPython 3.10.10 on Arch Linux, x86_64


gvanrossum (Member) commented

Ugh. I traced the code for a bit. From _drain_helper we end up in _UnixWritePipeTransport._write_ready, where the exception is raised by the os.write() call and caught by an except BaseException as exc: block. That calls _fatal_error, which calls _close, which calls self._loop.call_soon(self._call_connection_lost, exc). After some gyrations through the event loop this calls connection_lost, which calls self.proc._pipe_connection_lost(self.fd, exc), which schedules a call to SubprocessStreamProtocol.pipe_connection_lost. That takes us to FlowControlMixin.connection_lost, which ends up calling waiter.set_exception(exc). That exception is still the original BrokenPipeError, and this waiter is indeed the future on which _drain_helper is waiting, which explains what we see.

Maybe someone else wants to take over from here? The question is what to do about it. Should _write_ready catch BrokenPipeError? And if so, what should it do?

Maybe this is expected behavior after all? There's a comment in drain():

            # Raise connection closing error if any,
            # ConnectionResetError otherwise

It would seem that "connection closing error" here might include BrokenPipeError? There are some other comments suggesting that BrokenPipeError is a viable alternative to ConnectionResetError (_feed_stdin in asyncio/subprocess.py; see also _communicate in the non-asyncio subprocess.py).
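A minimal sketch of the user-facing implication, treating the two errors interchangeably (not from the thread; feed() and its body are illustrative):

import asyncio

async def feed(proc: asyncio.subprocess.Process, data: bytes) -> None:
    # Per the comments in asyncio/subprocess.py, BrokenPipeError and
    # ConnectionResetError both just mean the child went away before we
    # finished writing, so treat them the same.
    try:
        proc.stdin.write(data)
        await proc.stdin.drain()
    except (BrokenPipeError, ConnectionResetError):
        pass  # child exited; nothing left to feed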

TheTechromancer (Author) commented

> I traced the code for a bit. From _drain_helper we end up in _UnixWritePipeTransport._write_ready, where the exception is raised by the os.write() call [...]

Good Lord. Thanks for looking into this. If it makes the job any easier, I'm personally fine with the BrokenPipeError itself; the main reason I raised the issue was this message:

Future exception was never retrieved
future: <Future finished exception=BrokenPipeError()>

Maybe the best solution would be to retrieve the exception from waiter and re-raise it? I can handle it myself and that way we could avoid the obligatory warning message.
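A sketch of that retrieval idea on a standalone future (not from the thread; the hand-made future stands in for the internal waiter):

import asyncio

async def main():
    waiter = asyncio.get_running_loop().create_future()
    waiter.set_exception(BrokenPipeError())  # what pipe_connection_lost does
    exc = waiter.exception()                 # marks the exception as retrieved
    if exc is not None:
        raise exc                            # re-raise in the caller's context

asyncio.run(main())  # raises BrokenPipeError, with no "never retrieved" warning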

gvanrossum (Member) commented

> Good Lord. Thanks for looking into this.

:-)

> If it makes the job any easier, I'm personally fine with the BrokenPipeError itself; the main reason I raised the issue was this message:
>
> Future exception was never retrieved
> future: <Future finished exception=BrokenPipeError()>

Yeah. The confusing thing is that it appeared during my debugging explorations that the future was being awaited. But I noticed in passing that the same exception is also being set on a different future: self._stdin_closed.set_exception(exc) in pipe_connection_lost(). I could just imagine that that one isn't being awaited somehow.

> Maybe the best solution would be to retrieve the exception from waiter and re-raise it? I can handle it myself and that way we could avoid the obligatory warning message.

Let's first determine which of the two futures isn't being awaited. (Do you feel like digging into this yourself a bit? I didn't use anything more sophisticated than pdb, print() and breakpoint(). :-)

TheTechromancer (Author) commented May 12, 2023

To be honest, I'm very new to asyncio (just started using it last week), so it seems strange to me that the exception wouldn't include information on where it was thrown. Is this really the case?

EDIT: awaiting proc.stdin.wait_closed() seems to fix the issue in this specific case. So StreamReaderProtocol._closed is the future that's not being awaited.

async def main():
    proc = await asyncio.create_subprocess_exec("sleep", "999", stdin=asyncio.subprocess.PIPE)
    try:
        for _ in range(10000):
            i = b"www.blacklanternsecurity.com\n"
            proc.stdin.write(i)
        proc.kill()
        await proc.stdin.wait_closed() # <----
        await proc.stdin.drain()
    except BrokenPipeError:
        print(f"Handled error: {traceback.format_exc()}")

The question now is where would be the best place to await that future. My first instinct would be to do something like this in FlowControlMixin._drain_helper(), where both futures are awaited and any exceptions caught, then re-raised in the context of the current coroutine, so they can be handled by the user:

async def main():
    proc = await asyncio.create_subprocess_exec("sleep", "999", stdin=asyncio.subprocess.PIPE)
    for _ in range(10000):
        i = b"www.blacklanternsecurity.com\n"
        proc.stdin.write(i)
    proc.kill()
    exceptions = []  # collected across both awaits
    for t in asyncio.as_completed([proc.stdin.wait_closed(), proc.stdin.drain()]):
        try:
            await t
        except Exception as e:
            exceptions.append(e)
    if exceptions:
        raise exceptions[0]

This feels a bit icky to me but then again I'm new to error handling in asyncio. Do you see a better solution?

gvanrossum (Member) commented

> it seems strange to me that the exception wouldn't include information on where it was thrown. Is this really the case?

For this you can blame asyncio's technique for making sure that exceptions are generally raised where the user expects them. So if you create a task and the task raises, and then you await the task, the awaiter should get the exception. But at the point where the exception happens, the awaiter isn't even on the stack -- the event loop is. So the exception is caught and stored as an attribute on the Task object, and the logic invoked by await looks for that attribute and reraises it in the correct context. This can also happen to Futures, where something that is supposed to "complete" the future has an exception instead of a result.
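A minimal sketch of that store-and-re-raise machinery (not from the thread):

import asyncio

async def boom():
    # raised while the event loop, not the awaiter, is on the stack
    raise BrokenPipeError()

async def main():
    task = asyncio.create_task(boom())
    await asyncio.sleep(0)  # let the task run and fail; the exception is stored on it
    try:
        await task          # the await machinery re-raises it here, in our context
    except BrokenPipeError:
        print("re-raised where the awaiter expects it")

asyncio.run(main())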

The general problem with this technique is that if a task or future is never awaited, the exception just sits there until the object is GC'ed; at that point a finalizer runs that logs the exception preceded by an extra message ("Future exception was never retrieved"), which hopefully gives the user a hint about some task or future they should have awaited but didn't.
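And a minimal sketch of that finalizer behavior (not from the thread):

import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.set_exception(BrokenPipeError())
    # Nobody awaits fut or calls fut.exception(), so when it is GC'ed the
    # finalizer logs the stored exception, preceded by
    # "Future exception was never retrieved".
    del fut

asyncio.run(main())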

(It's kind of annoying that you don't get such a warning when the future or task completes normally and is GC'ed without anyone awaiting it -- if we did it that way, these things would probably be more easily debugged, because you'd get the warning for any task that you don't await, rather than only for those tasks that fail. But we didn't choose to do this and it's probably too late to change course -- users would hate us for those warnings about background tasks.)

The problem in our case is that we have two Futures that both get the same exception attached to them, on the assumption that someone is going to wait for each of them. That's not entirely unreasonable, but it means you can't just do await f1; await f2, because the first await will fail and the second will never be reached. In this case, if you do await proc.stdin.wait_closed() first, that raises the BrokenPipeError and you will never get to the second one; but that's fine, because if you don't call proc.stdin.drain() there is no second future (it's created on the spot).

The problem is then, how on earth would you know to wait for proc.stdin.wait_closed()? I'd expect to have to wait for the process, using await proc.wait(), but if I do that (instead of the drain call) that raises the BrokenPipeError and prints "Future exception was never retrieved". I wonder if we should use the standard trick to avoid that warning when we set it: self._stdin_closed._log_traceback = False (immediately after the set_exception() call in pipe_connection_lost()). That certainly prevents the dreaded message and traceback from being printed. And I think this is making good on a promise in the docs, that close() should be used along with wait_closed(), though not mandatory.
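A sketch of that suppression trick on a standalone future (not the actual patch; _log_traceback is a private attribute, which is exactly why this belongs inside asyncio rather than in user code):

import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.set_exception(BrokenPipeError())
    fut._log_traceback = False  # the future still carries the exception,
                                # but GC'ing it unawaited now stays silent
    del fut

asyncio.run(main())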

Thoughts, @kumaraditya303? Unless you have an objection to this solution, I'll implement it.

gvanrossum (Member) commented

Oh, and thanks for the research, @TheTechromancer!

miss-islington pushed a commit to miss-islington/cpython that referenced this issue May 17, 2023
…d pipe stdin (pythonGH-104586)

(cherry picked from commit 7fc8e2d)

Co-authored-by: Guido van Rossum <guido@python.org>
gvanrossum (Member) commented

Closing -- the 3.11 backport will take care of itself.

gvanrossum added a commit that referenced this issue May 18, 2023
…ed pipe stdin (GH-104586) (#104594)

(cherry picked from commit 7fc8e2d)

Co-authored-by: Guido van Rossum <guido@python.org>
JelleZijlstra pushed a commit to JelleZijlstra/cpython that referenced this issue May 18, 2023