diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py index 224b1883808a41..9cd3fe3ed4f504 100644 --- a/Lib/asyncio/base_subprocess.py +++ b/Lib/asyncio/base_subprocess.py @@ -22,7 +22,7 @@ def __init__(self, loop, protocol, args, shell, self._proc = None self._pid = None self._returncode = None - self._exit_waiters = [] + self._exit_waiters = set() self._pending_calls = collections.deque() self._pipes = {} self._finished = False @@ -209,6 +209,14 @@ async def _connect_pipes(self, waiter): except (SystemExit, KeyboardInterrupt): raise except BaseException as exc: + # Close any pipes that were already connected before the + # error/cancellation to avoid leaking file descriptors. + for proto in self._pipes.values(): + if proto is not None: + proto.pipe.close() + for raw_pipe in (proc.stdin, proc.stdout, proc.stderr): + if raw_pipe is not None: + raw_pipe.close() if waiter is not None and not waiter.cancelled(): waiter.set_exception(exc) else: @@ -251,8 +259,11 @@ async def _wait(self): return self._returncode waiter = self._loop.create_future() - self._exit_waiters.append(waiter) - return await waiter + self._exit_waiters.add(waiter) + try: + return await waiter + finally: + self._exit_waiters.discard(waiter) def _try_finish(self): assert not self._finished @@ -280,7 +291,6 @@ def _call_connection_lost(self, exc): for waiter in self._exit_waiters: if not waiter.done(): waiter.set_result(self._returncode) - self._exit_waiters = None self._loop = None self._proc = None self._protocol = None diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index 043359bbd03f8a..5efd0d54973706 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -160,10 +160,10 @@ async def _feed_stdin(self, input): # write() and drain() can raise these exceptions. if debug: logger.debug('%r communicate: stdin got %r', self, exc) - - if debug: - logger.debug('%r communicate: close stdin', self) - self.stdin.close() + finally: + if debug: + logger.debug('%r communicate: close stdin', self) + self.stdin.close() async def _noop(self): return None @@ -178,12 +178,14 @@ async def _read_stream(self, fd): if self._loop.get_debug(): name = 'stdout' if fd == 1 else 'stderr' logger.debug('%r communicate: read %s', self, name) - output = await stream.read() - if self._loop.get_debug(): - name = 'stdout' if fd == 1 else 'stderr' - logger.debug('%r communicate: close %s', self, name) - transport.close() - return output + try: + output = await stream.read() + if self._loop.get_debug(): + name = 'stdout' if fd == 1 else 'stderr' + logger.debug('%r communicate: close %s', self, name) + return output + finally: + transport.close() async def communicate(self, input=None): if self.stdin is not None: @@ -198,8 +200,13 @@ async def communicate(self, input=None): stderr = self._read_stream(2) else: stderr = self._noop() - stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) - await self.wait() + try: + stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) + except: + self.kill() + raise + finally: + await self.wait() return (stdout, stderr) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 49e8067ee7b4e5..0706b0c15087de 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -211,7 +211,7 @@ async def _make_subprocess_transport(self, protocol, args, shell, raise except BaseException: transp.close() - await transp._wait() + await tasks.shield(transp._wait()) raise return transp diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 5f75b17d8ca649..0939b015f3108a 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -408,7 +408,7 @@ async def _make_subprocess_transport(self, protocol, args, shell, raise except BaseException: transp.close() - await transp._wait() + await tasks.shield(transp._wait()) raise return transp diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index c08eb7cf261568..0657aa39eb9fdc 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -11,7 +11,7 @@ from asyncio import subprocess from test.test_asyncio import utils as test_utils from test import support -from test.support import os_helper, warnings_helper, gc_collect +from test.support import os_helper, gc_collect if not support.has_subprocess_support: raise unittest.SkipTest("test module requires subprocess") @@ -125,7 +125,7 @@ def test_proc_exited_no_invalid_state_error_on_exit_waiters(self): # Simulate a waiter registered via _wait() before the process exits. exit_waiter = self.loop.create_future() - transport._exit_waiters.append(exit_waiter) + transport._exit_waiters.add(exit_waiter) # _connect_pipes hasn't completed, so _pipes_connected is False. self.assertFalse(transport._pipes_connected) @@ -910,7 +910,137 @@ async def main(): self.loop.run_until_complete(main()) - @warnings_helper.ignore_warnings(category=ResourceWarning) + def test_communicate_cancellation_kills_process(self): + async def run(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stdout=subprocess.PIPE, + ) + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(proc.communicate(), 0.1) + + returncode = await asyncio.wait_for(proc.wait(), 5.0) + self.assertIsNotNone(returncode) + + self.loop.run_until_complete(run()) + + def test_communicate_cancellation_closes_stdout_transport(self): + async def run(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stdout=subprocess.PIPE, + ) + try: + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(proc.communicate(), 0.1) + + await asyncio.sleep(0) + + stdout_transport = proc._transport.get_pipe_transport(1) + self.assertTrue( + stdout_transport is None or stdout_transport.is_closing(), + "stdout pipe transport not closed after cancellation") + finally: + if proc.returncode is None: + proc.kill() + await proc.wait() + + self.loop.run_until_complete(run()) + + def test_communicate_cancellation_closes_stdin(self): + async def run(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + try: + large_input = b'x' * (1024 * 1024) + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for( + proc.communicate(large_input), 0.5) + + await asyncio.sleep(0) + + stdin_transport = proc._transport.get_pipe_transport(0) + self.assertTrue( + stdin_transport is None or stdin_transport.is_closing(), + "stdin pipe transport not closed after cancellation") + finally: + if proc.returncode is None: + proc.kill() + await proc.wait() + + self.loop.run_until_complete(run()) + + def test_communicate_cancellation_closes_stderr_transport(self): + async def run(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stderr=subprocess.PIPE, + ) + try: + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(proc.communicate(), 0.1) + + await asyncio.sleep(0) + + stderr_transport = proc._transport.get_pipe_transport(2) + self.assertTrue( + stderr_transport is None or stderr_transport.is_closing(), + "stderr pipe transport not closed after cancellation") + finally: + if proc.returncode is None: + proc.kill() + await proc.wait() + + self.loop.run_until_complete(run()) + + def test_wait_cancellation_removes_exit_waiters(self): + async def run(): + proc = await asyncio.create_subprocess_exec(*PROGRAM_BLOCKED) + try: + for _ in range(5): + task = self.loop.create_task(proc.wait()) + self.loop.call_soon(task.cancel) + try: + await task + except asyncio.CancelledError: + pass + + self.assertEqual(len(proc._transport._exit_waiters), 0) + finally: + proc.kill() + await proc.wait() + + self.loop.run_until_complete(run()) + + def test_communicate_cancellation_all_pipes(self): + async def run(): + proc = await asyncio.create_subprocess_exec( + *PROGRAM_BLOCKED, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + large_input = b'x' * (1024 * 1024) + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for( + proc.communicate(large_input), 0.5) + + await asyncio.sleep(0) + + for fd, name in [(0, 'stdin'), (1, 'stdout'), (2, 'stderr')]: + transport = proc._transport.get_pipe_transport(fd) + self.assertTrue( + transport is None or transport.is_closing(), + f"{name} pipe transport not closed after cancellation") + + returncode = await asyncio.wait_for(proc.wait(), 5.0) + self.assertIsNotNone(returncode) + + self.loop.run_until_complete(run()) + def test_subprocess_read_pipe_cancelled(self): async def main(): loop = asyncio.get_running_loop() @@ -921,7 +1051,6 @@ async def main(): asyncio.run(main()) gc_collect() - @warnings_helper.ignore_warnings(category=ResourceWarning) def test_subprocess_write_pipe_cancelled(self): async def main(): loop = asyncio.get_running_loop() @@ -932,7 +1061,6 @@ async def main(): asyncio.run(main()) gc_collect() - @warnings_helper.ignore_warnings(category=ResourceWarning) def test_subprocess_read_write_pipe_cancelled(self): async def main(): loop = asyncio.get_running_loop()