Skip to content

Commit

Permalink
[3.12] gh-109709: Fix asyncio test_stdin_broken_pipe() (GH-109710) (#…
Browse files Browse the repository at this point in the history
…109731)

gh-109709: Fix asyncio test_stdin_broken_pipe() (GH-109710)

Replace harcoded sleep of 500 ms with synchronization using a pipe.

Fix also Process._feed_stdin(): catch also BrokenPipeError on
stdin.write(input), not only on stdin.drain().
(cherry picked from commit cbbdf2c)

Co-authored-by: Victor Stinner <vstinner@python.org>
  • Loading branch information
miss-islington and vstinner committed Oct 2, 2023
1 parent 551aa6a commit c7cbd82
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
14 changes: 8 additions & 6 deletions Lib/asyncio/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,17 @@ def kill(self):

async def _feed_stdin(self, input):
debug = self._loop.get_debug()
if input is not None:
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))
try:
if input is not None:
self.stdin.write(input)
if debug:
logger.debug(
'%r communicate: feed stdin (%s bytes)', self, len(input))

await self.stdin.drain()
except (BrokenPipeError, ConnectionResetError) as exc:
# communicate() ignores BrokenPipeError and ConnectionResetError
# communicate() ignores BrokenPipeError and ConnectionResetError.
# write() and drain() can raise these exceptions.
if debug:
logger.debug('%r communicate: stdin got %r', self, exc)

Expand Down
52 changes: 42 additions & 10 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import signal
import sys
import textwrap
import unittest
import warnings
from unittest import mock
Expand All @@ -12,9 +13,14 @@
from test import support
from test.support import os_helper

if sys.platform != 'win32':

MS_WINDOWS = (sys.platform == 'win32')
if MS_WINDOWS:
import msvcrt
else:
from asyncio import unix_events


if support.check_sanitizer(address=True):
raise unittest.SkipTest("Exposes ASAN flakiness in GitHub CI")

Expand Down Expand Up @@ -270,26 +276,43 @@ async def send_signal(proc):
finally:
signal.signal(signal.SIGHUP, old_handler)

def prepare_broken_pipe_test(self):
def test_stdin_broken_pipe(self):
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
self.addCleanup(os.close, wfd)
if MS_WINDOWS:
handle = msvcrt.get_osfhandle(rfd)
os.set_handle_inheritable(handle, True)
code = textwrap.dedent(f'''
import os, msvcrt
handle = {handle}
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
os.read(fd, 1)
''')
from subprocess import STARTUPINFO
startupinfo = STARTUPINFO()
startupinfo.lpAttributeList = {"handle_list": [handle]}
kwargs = dict(startupinfo=startupinfo)
else:
code = f'import os; fd = {rfd}; os.read(fd, 1)'
kwargs = dict(pass_fds=(rfd,))

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
sys.executable, '-c', code,
stdin=subprocess.PIPE,
**kwargs
)
)

return (proc, large_data)

def test_stdin_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()

async def write_stdin(proc, data):
await asyncio.sleep(0.5)
proc.stdin.write(data)
# Only exit the child process once the write buffer is filled
os.write(wfd, b'go')
await proc.stdin.drain()

coro = write_stdin(proc, large_data)
Expand All @@ -300,7 +323,16 @@ async def write_stdin(proc, data):
self.loop.run_until_complete(proc.wait())

def test_communicate_ignore_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# buffer large enough to feed the whole pipe buffer
large_data = b'x' * support.PIPE_MAX_SIZE

# the program ends before the stdin can be fed
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(
sys.executable, '-c', 'pass',
stdin=subprocess.PIPE,
)
)

# communicate() must ignore BrokenPipeError when feeding stdin
self.loop.set_exception_handler(lambda loop, msg: None)
Expand Down

0 comments on commit c7cbd82

Please sign in to comment.