From e3a2fbe6da1007b3989cdf2ff3789a67313d52a2 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Thu, 27 Nov 2025 23:52:43 +0000 Subject: [PATCH 01/12] Add subprocess.run_pipeline() for command pipe chaining Add a new run_pipeline() function to the subprocess module that enables running multiple commands connected via pipes, similar to shell pipelines. New API: - run_pipeline(*commands, ...) - Run a pipeline of commands - PipelineResult - Return type with commands, returncodes, stdout, stderr - PipelineError - Raised when check=True and any command fails Features: - Supports arbitrary number of commands (minimum 2) - capture_output, input, timeout, and check parameters like run() - stdin= connects to first process, stdout= connects to last process - Text mode support via text=True, encoding, errors - All processes share a single stderr pipe for simplicity - "pipefail" semantics: check=True fails if any command fails Unlike run(), this function does not accept universal_newlines. Use text=True instead. Example: result = subprocess.run_pipeline( ['cat', 'file.txt'], ['grep', 'pattern'], ['wc', '-l'], capture_output=True, text=True ) Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 304 +++++++++++++++++++++++++++++++++++- Lib/test/test_subprocess.py | 211 +++++++++++++++++++++++++ 2 files changed, 514 insertions(+), 1 deletion(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 4d5ab6fbff0a46..d89876fa4d1c5c 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -62,7 +62,8 @@ __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL", - "SubprocessError", "TimeoutExpired", "CompletedProcess"] + "SubprocessError", "TimeoutExpired", "CompletedProcess", + "run_pipeline", "PipelineResult", "PipelineError"] # NOTE: We intentionally exclude list2cmdline as it is # considered an internal implementation detail. issue10838. @@ -194,6 +195,36 @@ def stdout(self, value): self.output = value +class PipelineError(SubprocessError): + """Raised when run_pipeline() is called with check=True and one or more + commands in the pipeline return a non-zero exit status. + + Attributes: + commands: List of commands in the pipeline (each a list of strings). + returncodes: List of return codes corresponding to each command. + stdout: Standard output from the final command (if captured). + stderr: Standard error output (if captured). + failed: List of (index, command, returncode) tuples for failed commands. + """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = commands + self.returncodes = returncodes + self.stdout = stdout + self.stderr = stderr + self.failed = [ + (i, cmd, rc) + for i, (cmd, rc) in enumerate(zip(commands, returncodes)) + if rc != 0 + ] + + def __str__(self): + failed_info = ", ".join( + f"command {i} {cmd!r} returned {rc}" + for i, cmd, rc in self.failed + ) + return f"Pipeline failed: {failed_info}" + + if _mswindows: class STARTUPINFO: def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None, @@ -508,6 +539,47 @@ def check_returncode(self): self.stderr) +class PipelineResult: + """A pipeline of processes that have finished running. + + This is returned by run_pipeline(). + + Attributes: + commands: List of commands in the pipeline (each command is a list). + returncodes: List of return codes for each command in the pipeline. + returncode: The return code of the final command (for convenience). 
+ stdout: The standard output of the final command (None if not captured). + stderr: The standard error output (None if not captured). + """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = list(commands) + self.returncodes = list(returncodes) + self.stdout = stdout + self.stderr = stderr + + @property + def returncode(self): + """Return the exit code of the final command in the pipeline.""" + return self.returncodes[-1] if self.returncodes else None + + def __repr__(self): + args = [f'commands={self.commands!r}', + f'returncodes={self.returncodes!r}'] + if self.stdout is not None: + args.append(f'stdout={self.stdout!r}') + if self.stderr is not None: + args.append(f'stderr={self.stderr!r}') + return f"{type(self).__name__}({', '.join(args)})" + + __class_getitem__ = classmethod(types.GenericAlias) + + def check_returncodes(self): + """Raise PipelineError if any command's exit code is non-zero.""" + if any(rc != 0 for rc in self.returncodes): + raise PipelineError(self.commands, self.returncodes, + self.stdout, self.stderr) + + def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs): """Run command with arguments and return a CompletedProcess instance. @@ -578,6 +650,236 @@ def run(*popenargs, return CompletedProcess(process.args, retcode, stdout, stderr) +def run_pipeline(*commands, input=None, capture_output=False, timeout=None, + check=False, **kwargs): + """Run a pipeline of commands connected via pipes. + + Each positional argument should be a command (list of strings or a string + if shell=True) to execute. The stdout of each command is connected to the + stdin of the next command in the pipeline, similar to shell pipelines. + + Returns a PipelineResult instance with attributes commands, returncodes, + stdout, and stderr. By default, stdout and stderr are not captured, and + those attributes will be None. Pass capture_output=True to capture both + the final command's stdout and stderr from all commands. + + If check is True and any command's exit code is non-zero, it raises a + PipelineError. This is similar to shell "pipefail" behavior. + + If timeout (seconds) is given and the pipeline takes too long, a + TimeoutExpired exception will be raised and all processes will be killed. + + The optional "input" argument allows passing bytes or a string to the + first command's stdin. If you use this argument, you may not also specify + stdin in kwargs. + + By default, all communication is in bytes. Use text=True, encoding, or + errors to enable text mode, which affects the input argument and stdout/ + stderr outputs. + + .. note:: + When using text=True with capture_output=True or stderr=PIPE, be aware + that stderr output from multiple processes may be interleaved in ways + that produce invalid character sequences when decoded. For reliable + text decoding, avoid text=True when capturing stderr from pipelines, + or handle decoding errors appropriately. + + Other keyword arguments are passed to each Popen call, except for stdin, + stdout which are managed by the pipeline. 
+ + Example: + # Equivalent to: cat file.txt | grep pattern | wc -l + result = run_pipeline( + ['cat', 'file.txt'], + ['grep', 'pattern'], + ['wc', '-l'], + capture_output=True, text=True + ) + print(result.stdout) # "42\\n" + print(result.returncodes) # [0, 0, 0] + """ + if len(commands) < 2: + raise ValueError('run_pipeline requires at least 2 commands') + + # Reject universal_newlines - use text= instead + if kwargs.get('universal_newlines') is not None: + raise TypeError( + "run_pipeline() does not support 'universal_newlines'. " + "Use 'text=True' instead." + ) + + # Validate no conflicting arguments + if input is not None: + if kwargs.get('stdin') is not None: + raise ValueError('stdin and input arguments may not both be used.') + + if capture_output: + if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None: + raise ValueError('stdout and stderr arguments may not be used ' + 'with capture_output.') + + # Determine stderr handling - all processes share the same stderr pipe + # When capturing, we create one pipe and all processes write to it + stderr_arg = kwargs.pop('stderr', None) + capture_stderr = capture_output or stderr_arg == PIPE + + # stdin is for the first process, stdout is for the last process + stdin_arg = kwargs.pop('stdin', None) + stdout_arg = kwargs.pop('stdout', None) + + processes = [] + stderr_read_fd = None # Read end of shared stderr pipe (for parent) + stderr_write_fd = None # Write end of shared stderr pipe (for children) + + try: + # Create a single stderr pipe that all processes will share + if capture_stderr: + stderr_read_fd, stderr_write_fd = os.pipe() + + for i, cmd in enumerate(commands): + is_first = (i == 0) + is_last = (i == len(commands) - 1) + + # Determine stdin for this process + if is_first: + if input is not None: + proc_stdin = PIPE + else: + proc_stdin = stdin_arg # Could be None, PIPE, fd, or file + else: + proc_stdin = processes[-1].stdout + + # Determine stdout for this process + if is_last: + if capture_output: + proc_stdout = PIPE + else: + proc_stdout = stdout_arg # Could be None, PIPE, fd, or file + else: + proc_stdout = PIPE + + # All processes share the same stderr pipe (write end) + if capture_stderr: + proc_stderr = stderr_write_fd + else: + proc_stderr = stderr_arg + + proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout, + stderr=proc_stderr, **kwargs) + processes.append(proc) + + # Close the parent's copy of the previous process's stdout + # to allow the pipe to signal EOF when the previous process exits + if not is_first and processes[-2].stdout is not None: + processes[-2].stdout.close() + + # Close the write end of stderr pipe in parent - children have it + if stderr_write_fd is not None: + os.close(stderr_write_fd) + stderr_write_fd = None + + first_proc = processes[0] + last_proc = processes[-1] + + # Handle communication with timeout + start_time = _time() if timeout is not None else None + + # Write input to first process if provided + if input is not None and first_proc.stdin is not None: + try: + first_proc.stdin.write(input) + except BrokenPipeError: + pass # First process may have exited early + finally: + first_proc.stdin.close() + + # Determine if we're in text mode + text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors') + + # Read output from the last process + stdout = None + stderr = None + + # Read stdout if we created a pipe for it (capture_output or stdout=PIPE) + if last_proc.stdout is not None: + stdout = last_proc.stdout.read() + + # Read stderr from the shared 
pipe + if stderr_read_fd is not None: + stderr = os.read(stderr_read_fd, 1024 * 1024 * 10) # Up to 10MB + # Keep reading until EOF + while True: + chunk = os.read(stderr_read_fd, 65536) + if not chunk: + break + stderr += chunk + + # Calculate remaining timeout + def remaining_timeout(): + if timeout is None: + return None + elapsed = _time() - start_time + remaining = timeout - elapsed + if remaining <= 0: + raise TimeoutExpired(commands, timeout, stdout, stderr) + return remaining + + # Wait for all processes to complete + returncodes = [] + for proc in processes: + try: + proc.wait(timeout=remaining_timeout()) + except TimeoutExpired: + # Kill all processes on timeout + for p in processes: + if p.poll() is None: + p.kill() + for p in processes: + p.wait() + raise TimeoutExpired(commands, timeout, stdout, stderr) + returncodes.append(proc.returncode) + + # Handle text mode conversion for stderr (stdout is already handled + # by Popen when text=True). stderr is always read as bytes since + # we use os.pipe() directly. + if text_mode and stderr is not None: + encoding = kwargs.get('encoding') + errors = kwargs.get('errors', 'strict') + if encoding is None: + encoding = locale.getencoding() + stderr = stderr.decode(encoding, errors) + + result = PipelineResult(commands, returncodes, stdout, stderr) + + if check and any(rc != 0 for rc in returncodes): + raise PipelineError(commands, returncodes, stdout, stderr) + + return result + + finally: + # Ensure all processes are cleaned up + for proc in processes: + if proc.poll() is None: + proc.kill() + proc.wait() + # Close any open file handles + if proc.stdin and not proc.stdin.closed: + proc.stdin.close() + if proc.stdout and not proc.stdout.closed: + proc.stdout.close() + # Close stderr pipe file descriptors + if stderr_read_fd is not None: + try: + os.close(stderr_read_fd) + except OSError: + pass + if stderr_write_fd is not None: + try: + os.close(stderr_write_fd) + except OSError: + pass + + def list2cmdline(seq): """ Translate a sequence of arguments into a command line diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 806a1e3fa303eb..01aa6c02dc26bb 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1984,6 +1984,217 @@ def test_encoding_warning(self): self.assertStartsWith(lines[1], b":3: EncodingWarning: ") +class PipelineTestCase(BaseTestCase): + """Tests for subprocess.run_pipeline()""" + + def test_pipeline_basic(self): + """Test basic two-command pipeline""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello world")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'HELLO WORLD') + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.returncode, 0) + + def test_pipeline_three_commands(self): + """Test pipeline with three commands""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("one\\ntwo\\nthree")'], + [sys.executable, '-c', 'import sys; print("".join(sorted(sys.stdin.readlines())))'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().strip().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'ONE\nTHREE\nTWO') + self.assertEqual(result.returncodes, [0, 0, 0]) + + def test_pipeline_with_input(self): + """Test pipeline with input data""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, 
'-c', 'import sys; print(len(sys.stdin.read().strip()))'], + input='hello', capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '5') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_bytes_mode(self): + """Test pipeline in binary mode""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(b"hello")'], + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + capture_output=True + ) + self.assertEqual(result.stdout, b'HELLO') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_error_check(self): + """Test that check=True raises PipelineError on failure""" + with self.assertRaises(subprocess.PipelineError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(1)'], + capture_output=True, check=True + ) + exc = cm.exception + self.assertEqual(len(exc.failed), 1) + self.assertEqual(exc.failed[0][0], 1) # Second command failed + self.assertEqual(exc.returncodes, [0, 1]) + + def test_pipeline_first_command_fails(self): + """Test pipeline where first command fails""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(42)'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True + ) + self.assertEqual(result.returncodes[0], 42) + + def test_pipeline_requires_two_commands(self): + """Test that pipeline requires at least 2 commands""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + capture_output=True + ) + self.assertIn('at least 2 commands', str(cm.exception)) + + def test_pipeline_stdin_and_input_conflict(self): + """Test that stdin and input cannot both be specified""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + input='data', stdin=subprocess.PIPE + ) + self.assertIn('stdin', str(cm.exception)) + self.assertIn('input', str(cm.exception)) + + def test_pipeline_capture_output_conflict(self): + """Test that capture_output conflicts with stdout/stderr""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + capture_output=True, stdout=subprocess.PIPE + ) + self.assertIn('capture_output', str(cm.exception)) + + def test_pipeline_rejects_universal_newlines(self): + """Test that universal_newlines is not supported""" + with self.assertRaises(TypeError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + universal_newlines=True + ) + self.assertIn('universal_newlines', str(cm.exception)) + self.assertIn('text=True', str(cm.exception)) + + def test_pipeline_result_repr(self): + """Test PipelineResult string representation""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("test")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, text=True + ) + repr_str = repr(result) + self.assertIn('PipelineResult', repr_str) + self.assertIn('commands=', repr_str) + self.assertIn('returncodes=', repr_str) + + def test_pipeline_check_returncodes_method(self): + """Test PipelineResult.check_returncodes() method""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(5)'], + capture_output=True + ) + with 
self.assertRaises(subprocess.PipelineError) as cm: + result.check_returncodes() + self.assertEqual(cm.exception.returncodes[1], 5) + + def test_pipeline_no_capture(self): + """Test pipeline without capturing output""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + ) + self.assertEqual(result.stdout, None) + self.assertEqual(result.stderr, None) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_stderr_capture(self): + """Test that stderr is captured from all processes""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print("err1", file=sys.stderr); print("out1")'], + [sys.executable, '-c', 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'], + capture_output=True, text=True + ) + self.assertIn('err1', result.stderr) + self.assertIn('err2', result.stderr) + + @unittest.skipIf(mswindows, "POSIX specific test") + def test_pipeline_timeout(self): + """Test pipeline with timeout""" + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + [sys.executable, '-c', 'import time; time.sleep(10); print("done")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, timeout=0.1 + ) + + def test_pipeline_error_str(self): + """Test PipelineError string representation""" + try: + subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(1)'], + [sys.executable, '-c', 'import sys; sys.exit(2)'], + capture_output=True, check=True + ) + except subprocess.PipelineError as e: + error_str = str(e) + self.assertIn('Pipeline failed', error_str) + + def test_pipeline_explicit_stdout_pipe(self): + """Test pipeline with explicit stdout=PIPE""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + stdout=subprocess.PIPE + ) + self.assertEqual(result.stdout.strip(), b'HELLO') + self.assertIsNone(result.stderr) + + def test_pipeline_stdin_from_file(self): + """Test pipeline with stdin from file""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write('file content\n') + f.flush() + fname = f.name + try: + with open(fname, 'r') as f: + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + stdin=f, capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '12') # "FILE CONTENT" + finally: + os.unlink(fname) + + def test_pipeline_stdout_to_devnull(self): + """Test pipeline with stdout to DEVNULL""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + stdout=subprocess.DEVNULL + ) + self.assertIsNone(result.stdout) + self.assertEqual(result.returncodes, [0, 0]) + + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): if grp: From 4feb2a80e5ceb459079909e1b0bfbdfd7f670d65 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Thu, 27 Nov 2025 23:52:57 +0000 Subject: [PATCH 02/12] Add documentation for subprocess.run_pipeline() Document the new run_pipeline() function, PipelineResult class, and PipelineError exception in the subprocess module documentation. Includes: - Function signature with stdin, stdout, stderr, capture_output, etc. 
- Note about shared stderr pipe and text mode caveat for interleaved multi-byte character sequences - Note that universal_newlines is not supported (use text=True) - Explanation that stdin connects to first process, stdout to last - Usage examples showing basic pipelines, multi-command pipelines, input handling, and error handling with check=True - PipelineResult attributes: commands, returncodes, returncode, stdout, stderr, and check_returncodes() method - PipelineError attributes: commands, returncodes, stdout, stderr, and failed list Co-authored-by: Claude Opus 4.5 --- Doc/library/subprocess.rst | 179 +++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst index 43da804b62beb1..da1d4047851567 100644 --- a/Doc/library/subprocess.rst +++ b/Doc/library/subprocess.rst @@ -264,6 +264,185 @@ underlying :class:`Popen` interface can be used directly. *stdout* and *stderr* attributes added +.. function:: run_pipeline(*commands, stdin=None, input=None, \ + stdout=None, stderr=None, capture_output=False, \ + timeout=None, check=False, encoding=None, \ + errors=None, text=None, env=None, \ + **other_popen_kwargs) + + Run a pipeline of commands connected via pipes, similar to shell pipelines. + Wait for all commands to complete, then return a :class:`PipelineResult` + instance. + + Each positional argument should be a command (a list of strings, or a string + if ``shell=True``) to execute. The standard output of each command is + connected to the standard input of the next command in the pipeline. + + This function requires at least two commands. For a single command, use + :func:`run` instead. + + If *capture_output* is true, the standard output of the final command and + the standard error of all commands will be captured. All processes in the + pipeline share a single stderr pipe, so their error output will be + interleaved. The *stdout* and *stderr* arguments may not be supplied at + the same time as *capture_output*. + + A *timeout* may be specified in seconds. If the timeout expires, all + child processes will be killed and waited for, and then a + :exc:`TimeoutExpired` exception will be raised. + + The *input* argument is passed to the first command's stdin. If used, it + must be a byte sequence, or a string if *encoding* or *errors* is specified + or *text* is true. + + If *check* is true, and any process in the pipeline exits with a non-zero + exit code, a :exc:`PipelineError` exception will be raised. This behavior + is similar to the shell's ``pipefail`` option. + + If *encoding* or *errors* are specified, or *text* is true, file objects + are opened in text mode using the specified encoding and errors. + + .. note:: + + When using ``text=True`` with ``capture_output=True`` or ``stderr=PIPE``, + be aware that stderr output from multiple processes may be interleaved + in ways that produce incomplete multi-byte character sequences. For + reliable text decoding of stderr, consider capturing in binary mode + and decoding manually with appropriate error handling, or use + ``errors='replace'`` or ``errors='backslashreplace'``. + + If *stdin* is specified, it is connected to the first command's standard + input. If *stdout* is specified, it is connected to the last command's + standard output. When *stdout* is :data:`PIPE`, the output is available + in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout` + attribute. Other keyword arguments are passed to each :class:`Popen` call. 
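+
+   For example, the two ends of a pipeline can be connected to ordinary
+   files (the file names here are purely illustrative)::
+
+      with open('app.log', 'rb') as src, open('errors.txt', 'wb') as dst:
+          subprocess.run_pipeline(
+              ['grep', 'ERROR'],
+              ['sort'],
+              stdin=src, stdout=dst,
+          )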
+ + Unlike :func:`run`, this function does not accept *universal_newlines*. + Use ``text=True`` instead. + + Examples:: + + >>> import subprocess + >>> # Equivalent to: echo "hello world" | tr a-z A-Z + >>> result = subprocess.run_pipeline( + ... ['echo', 'hello world'], + ... ['tr', 'a-z', 'A-Z'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'HELLO WORLD\n' + >>> result.returncodes + [0, 0] + + >>> # Pipeline with three commands + >>> result = subprocess.run_pipeline( + ... ['echo', 'one\ntwo\nthree'], + ... ['sort'], + ... ['head', '-n', '2'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'one\nthree\n' + + >>> # Using input parameter + >>> result = subprocess.run_pipeline( + ... ['cat'], + ... ['wc', '-l'], + ... input='line1\nline2\nline3\n', + ... capture_output=True, text=True + ... ) + >>> result.stdout.strip() + '3' + + >>> # Error handling with check=True + >>> subprocess.run_pipeline( + ... ['echo', 'hello'], + ... ['false'], # exits with status 1 + ... check=True + ... ) + Traceback (most recent call last): + ... + subprocess.PipelineError: Pipeline failed: command 1 ['false'] returned 1 + + .. versionadded:: next + + +.. class:: PipelineResult + + The return value from :func:`run_pipeline`, representing a pipeline of + processes that have finished. + + .. attribute:: commands + + The list of commands used to launch the pipeline. Each command is a list + of strings (or a string if ``shell=True`` was used). + + .. attribute:: returncodes + + List of exit status codes for each command in the pipeline. Typically, + an exit status of 0 indicates that the command ran successfully. + + A negative value ``-N`` indicates that the command was terminated by + signal ``N`` (POSIX only). + + .. attribute:: returncode + + Exit status of the final command in the pipeline. This is a convenience + property equivalent to ``returncodes[-1]``. + + .. attribute:: stdout + + Captured stdout from the final command in the pipeline. A bytes sequence, + or a string if :func:`run_pipeline` was called with an encoding, errors, + or ``text=True``. ``None`` if stdout was not captured. + + .. attribute:: stderr + + Captured stderr from all commands in the pipeline, combined. A bytes + sequence, or a string if :func:`run_pipeline` was called with an + encoding, errors, or ``text=True``. ``None`` if stderr was not captured. + + .. method:: check_returncodes() + + If any command's :attr:`returncode` is non-zero, raise a + :exc:`PipelineError`. + + .. versionadded:: next + + +.. exception:: PipelineError + + Subclass of :exc:`SubprocessError`, raised when a pipeline run by + :func:`run_pipeline` (with ``check=True``) contains one or more commands + that returned a non-zero exit status. This is similar to the shell's + ``pipefail`` behavior. + + .. attribute:: commands + + List of commands that were used in the pipeline. + + .. attribute:: returncodes + + List of exit status codes for each command in the pipeline. + + .. attribute:: stdout + + Output of the final command if it was captured. Otherwise, ``None``. + + .. attribute:: stderr + + Combined stderr output of all commands if it was captured. + Otherwise, ``None``. + + .. attribute:: failed + + List of ``(index, command, returncode)`` tuples for each command + that returned a non-zero exit status. The *index* is the position + of the command in the pipeline (0-based). + + .. versionadded:: next + + .. 
_frequently-used-arguments: Frequently Used Arguments From 2a11d4bf53ccf3721833cc41e2b6e9d0a487e024 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Fri, 28 Nov 2025 01:12:12 +0000 Subject: [PATCH 03/12] Refactor run_pipeline() to use multiplexed I/O Add _communicate_streams() helper function that properly multiplexes read/write operations to prevent pipe buffer deadlocks. The helper uses selectors on POSIX and threads on Windows, similar to Popen.communicate(). This fixes potential deadlocks when large amounts of data flow through the pipeline and significantly improves performance. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 318 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 269 insertions(+), 49 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index d89876fa4d1c5c..5a6a7086db115e 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -320,6 +320,220 @@ def _cleanup(): DEVNULL = -3 +# Helper function for multiplexed I/O, used by run_pipeline() +def _remaining_time_helper(endtime): + """Calculate remaining time until deadline.""" + if endtime is None: + return None + return endtime - _time() + + +def _communicate_streams(stdin=None, input_data=None, read_streams=None, + timeout=None, cmd_for_timeout=None): + """ + Multiplex I/O: write input_data to stdin, read from read_streams. + + Works with both file objects and raw file descriptors. + All I/O is done in binary mode; caller handles text encoding. + + Args: + stdin: Writable file object for input, or None + input_data: Bytes to write to stdin, or None + read_streams: List of readable file objects or raw fds to read from + timeout: Timeout in seconds, or None for no timeout + cmd_for_timeout: Value to use for TimeoutExpired.cmd + + Returns: + Dict mapping each item in read_streams to its bytes data + + Raises: + TimeoutExpired: If timeout expires (with partial data) + """ + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + + read_streams = read_streams or [] + + if _mswindows: + return _communicate_streams_windows( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout) + else: + return _communicate_streams_posix( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout) + + +if _mswindows: + def _reader_thread_func(fh, buffer): + """Thread function to read from a file handle into a buffer list.""" + try: + buffer.append(fh.read()) + except OSError: + buffer.append(b'') + + def _communicate_streams_windows(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout): + """Windows implementation using threads.""" + threads = [] + buffers = {} + fds_to_close = [] + + # Start reader threads + for stream in read_streams: + buf = [] + buffers[stream] = buf + # Wrap raw fds in file objects + if isinstance(stream, int): + fobj = os.fdopen(os.dup(stream), 'rb') + fds_to_close.append(stream) + else: + fobj = stream + t = threading.Thread(target=_reader_thread_func, args=(fobj, buf)) + t.daemon = True + t.start() + threads.append((stream, t, fobj)) + + # Write stdin + if stdin and input_data: + try: + stdin.write(input_data) + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + raise + if stdin: + try: + stdin.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + raise + + # Join threads with timeout + for stream, t, fobj in threads: + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 
0: + remaining = 0 + t.join(remaining) + if t.is_alive(): + # Collect partial results + results = {s: (b[0] if b else b'') for s, b in buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + + # Close any raw fds we duped + for fd in fds_to_close: + try: + os.close(fd) + except OSError: + pass + + # Collect results + return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()} + +else: + def _communicate_streams_posix(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout): + """POSIX implementation using selectors.""" + # Normalize read_streams: build mapping of fd -> (original_key, chunks) + fd_info = {} # fd -> (original_stream, chunks_list) + for stream in read_streams: + if isinstance(stream, int): + fd = stream + else: + fd = stream.fileno() + fd_info[fd] = (stream, []) + + # Prepare stdin + stdin_fd = None + if stdin: + try: + stdin.flush() + except BrokenPipeError: + pass + if input_data: + stdin_fd = stdin.fileno() + else: + try: + stdin.close() + except BrokenPipeError: + pass + + # Prepare input data + input_offset = 0 + input_view = memoryview(input_data) if input_data else None + + with _PopenSelector() as selector: + if stdin_fd is not None and input_data: + selector.register(stdin_fd, selectors.EVENT_WRITE) + for fd in fd_info: + selector.register(fd, selectors.EVENT_READ) + + while selector.get_map(): + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 0: + # Timed out - collect partial results + results = {orig: b''.join(chunks) + for fd, (orig, chunks) in fd_info.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + + ready = selector.select(remaining) + + # Check timeout after select + if endtime is not None and _time() > endtime: + results = {orig: b''.join(chunks) + for fd, (orig, chunks) in fd_info.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + + for key, events in ready: + if key.fd == stdin_fd: + # Write chunk to stdin + chunk = input_view[input_offset:input_offset + _PIPE_BUF] + try: + input_offset += os.write(key.fd, chunk) + except BrokenPipeError: + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + else: + if input_offset >= len(input_data): + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + elif key.fd in fd_info: + # Read chunk from output stream + data = os.read(key.fd, 32768) + if not data: + selector.unregister(key.fd) + else: + fd_info[key.fd][1].append(data) + + # Build results: map original stream keys to joined data + results = {} + for fd, (orig_stream, chunks) in fd_info.items(): + results[orig_stream] = b''.join(chunks) + # Close file objects (but not raw fds - caller manages those) + if not isinstance(orig_stream, int): + try: + orig_stream.close() + except OSError: + pass + + return results + + # XXX This function is only used by multiprocessing and the test suite, # but it's here so that it can be imported when Python is compiled without # threads. 
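A minimal sketch of how run_pipeline() is expected to drive this helper
(one process with PIPE ends is shown; the pipeline applies the same
pattern across all of its processes, and the 1 MiB payload is chosen
only to exceed a typical pipe buffer):

    proc = Popen([sys.executable, '-c',
                  'import sys; sys.stdout.write(sys.stdin.read())'],
                 stdin=PIPE, stdout=PIPE, stderr=PIPE)
    results = _communicate_streams(
        stdin=proc.stdin,
        input_data=b'x' * (1 << 20),
        read_streams=[proc.stdout, proc.stderr],
        timeout=30,
        cmd_for_timeout=proc.args,
    )
    data = results[proc.stdout]  # full 1 MiB echoed back, no deadlock
    proc.wait()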
@@ -781,54 +995,70 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, first_proc = processes[0] last_proc = processes[-1] - # Handle communication with timeout - start_time = _time() if timeout is not None else None - - # Write input to first process if provided - if input is not None and first_proc.stdin is not None: - try: - first_proc.stdin.write(input) - except BrokenPipeError: - pass # First process may have exited early - finally: - first_proc.stdin.close() + # Calculate deadline for timeout (used throughout) + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None # Determine if we're in text mode text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors') + encoding = kwargs.get('encoding') + errors_param = kwargs.get('errors', 'strict') + if text_mode and encoding is None: + encoding = locale.getencoding() + + # Encode input if in text mode + input_data = input + if input_data is not None and text_mode: + input_data = input_data.encode(encoding, errors_param) + + # Build list of streams to read from + read_streams = [] + if last_proc.stdout is not None: + read_streams.append(last_proc.stdout) + if stderr_read_fd is not None: + read_streams.append(stderr_read_fd) - # Read output from the last process - stdout = None - stderr = None + # Use multiplexed I/O to handle stdin/stdout/stderr concurrently + # This avoids deadlocks from pipe buffer limits + stdin_stream = first_proc.stdin if input is not None else None - # Read stdout if we created a pipe for it (capture_output or stdout=PIPE) - if last_proc.stdout is not None: - stdout = last_proc.stdout.read() + try: + results = _communicate_streams( + stdin=stdin_stream, + input_data=input_data, + read_streams=read_streams, + timeout=_remaining_time_helper(endtime), + cmd_for_timeout=commands, + ) + except TimeoutExpired: + # Kill all processes on timeout + for p in processes: + if p.poll() is None: + p.kill() + for p in processes: + p.wait() + raise - # Read stderr from the shared pipe - if stderr_read_fd is not None: - stderr = os.read(stderr_read_fd, 1024 * 1024 * 10) # Up to 10MB - # Keep reading until EOF - while True: - chunk = os.read(stderr_read_fd, 65536) - if not chunk: - break - stderr += chunk - - # Calculate remaining timeout - def remaining_timeout(): - if timeout is None: - return None - elapsed = _time() - start_time - remaining = timeout - elapsed - if remaining <= 0: - raise TimeoutExpired(commands, timeout, stdout, stderr) - return remaining + # Extract results + stdout = results.get(last_proc.stdout) + stderr = results.get(stderr_read_fd) - # Wait for all processes to complete + # Decode stdout if in text mode (Popen text mode only applies to + # streams it creates, but we read via _communicate_streams which + # always returns bytes) + if text_mode and stdout is not None: + stdout = stdout.decode(encoding, errors_param) + if text_mode and stderr is not None: + stderr = stderr.decode(encoding, errors_param) + + # Wait for all processes to complete (use remaining time from deadline) returncodes = [] for proc in processes: try: - proc.wait(timeout=remaining_timeout()) + remaining = _remaining_time_helper(endtime) + proc.wait(timeout=remaining) except TimeoutExpired: # Kill all processes on timeout for p in processes: @@ -839,16 +1069,6 @@ def remaining_timeout(): raise TimeoutExpired(commands, timeout, stdout, stderr) returncodes.append(proc.returncode) - # Handle text mode conversion for stderr (stdout is already handled - # by Popen when 
text=True). stderr is always read as bytes since - # we use os.pipe() directly. - if text_mode and stderr is not None: - encoding = kwargs.get('encoding') - errors = kwargs.get('errors', 'strict') - if encoding is None: - encoding = locale.getencoding() - stderr = stderr.decode(encoding, errors) - result = PipelineResult(commands, returncodes, stdout, stderr) if check and any(rc != 0 for rc in returncodes): @@ -867,7 +1087,7 @@ def remaining_timeout(): proc.stdin.close() if proc.stdout and not proc.stdout.closed: proc.stdout.close() - # Close stderr pipe file descriptors + # Close stderr pipe file descriptor if stderr_read_fd is not None: try: os.close(stderr_read_fd) From 2470e14a70ca4ed6c6ea230476c80b04b1f72bd0 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Fri, 28 Nov 2025 06:39:05 +0000 Subject: [PATCH 04/12] Add deadlock prevention tests for run_pipeline() Add three tests that verify the multiplexed I/O implementation properly handles large data volumes that would otherwise cause pipe buffer deadlocks: - test_pipeline_large_data_no_deadlock: 256KB through 2-stage pipeline - test_pipeline_large_data_three_stages: 128KB through 3-stage pipeline - test_pipeline_large_data_with_stderr: 64KB with concurrent stderr These tests would timeout or deadlock without proper multiplexing. Co-authored-by: Claude Opus 4.5 --- Lib/test/test_subprocess.py | 94 +++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 01aa6c02dc26bb..5ab791918a2029 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2194,6 +2194,100 @@ def test_pipeline_stdout_to_devnull(self): self.assertIsNone(result.stdout) self.assertEqual(result.returncodes, [0, 0]) + def test_pipeline_large_data_no_deadlock(self): + """Test that large data doesn't cause pipe buffer deadlock. + + This test verifies that the multiplexed I/O implementation properly + handles cases where pipe buffers would fill up. Without proper + multiplexing, this would deadlock because: + 1. First process outputs large data filling stdout pipe buffer + 2. Middle process reads some, processes, writes to its stdout + 3. If stdout pipe buffer fills, middle process blocks on write + 4. But first process is blocked waiting for middle to read more + 5. Classic deadlock + + The test uses data larger than typical pipe buffer size (64KB on Linux) + to ensure the multiplexed I/O is working correctly. + """ + # Generate data larger than typical pipe buffer (64KB) + # Use 256KB to ensure we exceed buffer on most systems + large_data = 'x' * (256 * 1024) + + # Pipeline: input -> double the data -> count chars + # The middle process outputs twice as much, increasing buffer pressure + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; data = sys.stdin.read(); print(data + data)'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read().strip()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + # Original data doubled = 512KB = 524288 chars + # Second process strips whitespace (removes trailing newline) then counts + expected_len = 256 * 1024 * 2 # doubled data, newline stripped + self.assertEqual(result.stdout.strip(), str(expected_len)) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_large_data_three_stages(self): + """Test large data through a three-stage pipeline. 
+ + This is a more complex deadlock scenario with three processes, + where buffer pressure can occur at multiple points. + """ + # Use 128KB of data + large_data = 'y' * (128 * 1024) + + # Pipeline: input -> uppercase -> add prefix to each line -> count + # We use line-based processing to create more buffer churn + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', + 'import sys; print("".join("PREFIX:" + line for line in sys.stdin))'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.returncodes, [0, 0, 0]) + # Just verify we got a reasonable numeric output without deadlock + output_len = int(result.stdout.strip()) + self.assertGreater(output_len, len(large_data)) + + def test_pipeline_large_data_with_stderr(self): + """Test large data with stderr output from multiple processes. + + Ensures stderr collection doesn't interfere with the main data flow + and doesn't cause deadlocks when multiple processes write stderr. + """ + # 64KB of data + data_size = 64 * 1024 + large_data = 'z' * data_size + + result = subprocess.run_pipeline( + [sys.executable, '-c', ''' +import sys +sys.stderr.write("stage1 processing\\n") +data = sys.stdin.read() +sys.stderr.write(f"stage1 read {len(data)} bytes\\n") +print(data) +'''], + [sys.executable, '-c', ''' +import sys +sys.stderr.write("stage2 processing\\n") +data = sys.stdin.read() +sys.stderr.write(f"stage2 read {len(data)} bytes\\n") +print(len(data.strip())) +'''], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.stdout.strip(), str(data_size)) + self.assertIn('stage1 processing', result.stderr) + self.assertIn('stage2 processing', result.stderr) + self.assertEqual(result.returncodes, [0, 0]) + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): From e22d1da9bccb0c1cb40ec6e0d21b6ce16316cd1c Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Fri, 28 Nov 2025 07:04:28 +0000 Subject: [PATCH 05/12] Simplify _communicate_streams() to only accept file objects Remove support for raw file descriptors in _communicate_streams(), requiring all streams to be file objects. This simplifies both the Windows and POSIX implementations by removing isinstance() checks and fd-wrapping logic. The run_pipeline() function now wraps the stderr pipe's read end with os.fdopen() immediately after creation. This change makes _communicate_streams() more compatible with Popen.communicate() which already uses file objects, enabling potential future refactoring to share the multiplexed I/O logic. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 80 ++++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 49 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 5a6a7086db115e..7d497cc102adf9 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -333,18 +333,19 @@ def _communicate_streams(stdin=None, input_data=None, read_streams=None, """ Multiplex I/O: write input_data to stdin, read from read_streams. - Works with both file objects and raw file descriptors. + All streams must be file objects (not raw file descriptors). All I/O is done in binary mode; caller handles text encoding. 
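+
+    Callers that start from a raw pipe are expected to wrap its read end
+    in a file object first, as run_pipeline() now does for the shared
+    stderr pipe; roughly:
+
+        read_fd, write_fd = os.pipe()   # write end is handed to children
+        reader = os.fdopen(read_fd, 'rb')
+        data = _communicate_streams(read_streams=[reader])[reader]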
Args: - stdin: Writable file object for input, or None + stdin: Writable binary file object for input, or None input_data: Bytes to write to stdin, or None - read_streams: List of readable file objects or raw fds to read from + read_streams: List of readable binary file objects to read from timeout: Timeout in seconds, or None for no timeout cmd_for_timeout: Value to use for TimeoutExpired.cmd Returns: - Dict mapping each item in read_streams to its bytes data + Dict mapping each file object in read_streams to its bytes data. + All file objects in read_streams will be closed. Raises: TimeoutExpired: If timeout expires (with partial data) @@ -377,22 +378,15 @@ def _communicate_streams_windows(stdin, input_data, read_streams, """Windows implementation using threads.""" threads = [] buffers = {} - fds_to_close = [] - # Start reader threads + # Start reader threads for each stream for stream in read_streams: buf = [] buffers[stream] = buf - # Wrap raw fds in file objects - if isinstance(stream, int): - fobj = os.fdopen(os.dup(stream), 'rb') - fds_to_close.append(stream) - else: - fobj = stream - t = threading.Thread(target=_reader_thread_func, args=(fobj, buf)) + t = threading.Thread(target=_reader_thread_func, args=(stream, buf)) t.daemon = True t.start() - threads.append((stream, t, fobj)) + threads.append((stream, t)) # Write stdin if stdin and input_data: @@ -413,7 +407,7 @@ def _communicate_streams_windows(stdin, input_data, read_streams, raise # Join threads with timeout - for stream, t, fobj in threads: + for stream, t in threads: remaining = _remaining_time_helper(endtime) if remaining is not None and remaining < 0: remaining = 0 @@ -425,13 +419,6 @@ def _communicate_streams_windows(stdin, input_data, read_streams, cmd_for_timeout, orig_timeout, output=results.get(read_streams[0]) if read_streams else None) - # Close any raw fds we duped - for fd in fds_to_close: - try: - os.close(fd) - except OSError: - pass - # Collect results return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()} @@ -439,14 +426,10 @@ def _communicate_streams_windows(stdin, input_data, read_streams, def _communicate_streams_posix(stdin, input_data, read_streams, endtime, orig_timeout, cmd_for_timeout): """POSIX implementation using selectors.""" - # Normalize read_streams: build mapping of fd -> (original_key, chunks) - fd_info = {} # fd -> (original_stream, chunks_list) + # Build mapping of fd -> (file_object, chunks_list) + fd_info = {} for stream in read_streams: - if isinstance(stream, int): - fd = stream - else: - fd = stream.fileno() - fd_info[fd] = (stream, []) + fd_info[stream.fileno()] = (stream, []) # Prepare stdin stdin_fd = None @@ -477,8 +460,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams, remaining = _remaining_time_helper(endtime) if remaining is not None and remaining < 0: # Timed out - collect partial results - results = {orig: b''.join(chunks) - for fd, (orig, chunks) in fd_info.items()} + results = {stream: b''.join(chunks) + for fd, (stream, chunks) in fd_info.items()} raise TimeoutExpired( cmd_for_timeout, orig_timeout, output=results.get(read_streams[0]) if read_streams else None) @@ -487,8 +470,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams, # Check timeout after select if endtime is not None and _time() > endtime: - results = {orig: b''.join(chunks) - for fd, (orig, chunks) in fd_info.items()} + results = {stream: b''.join(chunks) + for fd, (stream, chunks) in fd_info.items()} raise TimeoutExpired( cmd_for_timeout, orig_timeout, 
output=results.get(read_streams[0]) if read_streams else None) @@ -520,16 +503,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams, else: fd_info[key.fd][1].append(data) - # Build results: map original stream keys to joined data + # Build results and close all file objects results = {} - for fd, (orig_stream, chunks) in fd_info.items(): - results[orig_stream] = b''.join(chunks) - # Close file objects (but not raw fds - caller manages those) - if not isinstance(orig_stream, int): - try: - orig_stream.close() - except OSError: - pass + for fd, (stream, chunks) in fd_info.items(): + results[stream] = b''.join(chunks) + try: + stream.close() + except OSError: + pass return results @@ -942,13 +923,14 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, stdout_arg = kwargs.pop('stdout', None) processes = [] - stderr_read_fd = None # Read end of shared stderr pipe (for parent) + stderr_reader = None # File object for reading shared stderr (for parent) stderr_write_fd = None # Write end of shared stderr pipe (for children) try: # Create a single stderr pipe that all processes will share if capture_stderr: stderr_read_fd, stderr_write_fd = os.pipe() + stderr_reader = os.fdopen(stderr_read_fd, 'rb') for i, cmd in enumerate(commands): is_first = (i == 0) @@ -1017,8 +999,8 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, read_streams = [] if last_proc.stdout is not None: read_streams.append(last_proc.stdout) - if stderr_read_fd is not None: - read_streams.append(stderr_read_fd) + if stderr_reader is not None: + read_streams.append(stderr_reader) # Use multiplexed I/O to handle stdin/stdout/stderr concurrently # This avoids deadlocks from pipe buffer limits @@ -1043,7 +1025,7 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, # Extract results stdout = results.get(last_proc.stdout) - stderr = results.get(stderr_read_fd) + stderr = results.get(stderr_reader) # Decode stdout if in text mode (Popen text mode only applies to # streams it creates, but we read via _communicate_streams which @@ -1087,10 +1069,10 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, proc.stdin.close() if proc.stdout and not proc.stdout.closed: proc.stdout.close() - # Close stderr pipe file descriptor - if stderr_read_fd is not None: + # Close stderr pipe (reader is a file object, writer is a raw fd) + if stderr_reader is not None and not stderr_reader.closed: try: - os.close(stderr_read_fd) + stderr_reader.close() except OSError: pass if stderr_write_fd is not None: From a3e98a73be3d1f3e260ca5cba518306a325c1953 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Fri, 28 Nov 2025 07:07:43 +0000 Subject: [PATCH 06/12] Improve test_pipeline_large_data_with_stderr to use large stderr Update the test to write 64KB to stderr from each process (128KB total) instead of just small status messages. This better tests that the multiplexed I/O handles concurrent large data on both stdout and stderr without deadlocking. 
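A condensed form of the scenario the updated test exercises (the 64 KiB
sizes are arbitrary, picked only to fill a typical pipe buffer):

    result = subprocess.run_pipeline(
        [sys.executable, '-c',
         'import sys; sys.stderr.write("E" * 65536); print(sys.stdin.read())'],
        [sys.executable, '-c', 'import sys; print(len(sys.stdin.read()))'],
        input='z' * 65536, capture_output=True, text=True, timeout=30,
    )

Without multiplexed I/O, the first child could block writing its 64 KiB
of stderr while the parent is still blocked feeding its stdin.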
Co-authored-by: Claude Opus 4.5 --- Lib/test/test_subprocess.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 5ab791918a2029..58d8c1385b2df4 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2256,36 +2256,46 @@ def test_pipeline_large_data_three_stages(self): self.assertGreater(output_len, len(large_data)) def test_pipeline_large_data_with_stderr(self): - """Test large data with stderr output from multiple processes. + """Test large data with large stderr output from multiple processes. Ensures stderr collection doesn't interfere with the main data flow - and doesn't cause deadlocks when multiple processes write stderr. + and doesn't cause deadlocks when multiple processes write large + amounts to stderr concurrently with stdin/stdout data flow. """ - # 64KB of data + # 64KB of data through the pipeline data_size = 64 * 1024 large_data = 'z' * data_size + # Each process writes 64KB to stderr as well + stderr_size = 64 * 1024 result = subprocess.run_pipeline( - [sys.executable, '-c', ''' + [sys.executable, '-c', f''' import sys -sys.stderr.write("stage1 processing\\n") +# Write large stderr output +sys.stderr.write("E" * {stderr_size}) +sys.stderr.write("\\nstage1 done\\n") +# Pass through stdin to stdout data = sys.stdin.read() -sys.stderr.write(f"stage1 read {len(data)} bytes\\n") print(data) '''], - [sys.executable, '-c', ''' + [sys.executable, '-c', f''' import sys -sys.stderr.write("stage2 processing\\n") +# Write large stderr output +sys.stderr.write("F" * {stderr_size}) +sys.stderr.write("\\nstage2 done\\n") +# Count input size data = sys.stdin.read() -sys.stderr.write(f"stage2 read {len(data)} bytes\\n") print(len(data.strip())) '''], input=large_data, capture_output=True, text=True, timeout=30 ) self.assertEqual(result.stdout.strip(), str(data_size)) - self.assertIn('stage1 processing', result.stderr) - self.assertIn('stage2 processing', result.stderr) + # Verify both processes wrote to stderr + self.assertIn('stage1 done', result.stderr) + self.assertIn('stage2 done', result.stderr) + # Verify large stderr was captured (at least most of it) + self.assertGreater(len(result.stderr), stderr_size) self.assertEqual(result.returncodes, [0, 0]) From 3c28ed6e93313b91365780441cfc1f90ff330a94 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Sat, 29 Nov 2025 00:34:04 +0000 Subject: [PATCH 07/12] Remove obsolete XXX comment about non-blocking I/O The comment suggested rewriting Popen._communicate() to use non-blocking I/O on file objects now that Python 3's io module is used instead of C stdio. This is unnecessary - the current approach using select() to detect ready fds followed by os.read()/os.write() is correct and efficient. The selector already solves "when is data ready?" so non-blocking mode would add complexity with no benefit. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 7d497cc102adf9..4c92d17da93342 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -2655,9 +2655,6 @@ def _communicate(self, input, endtime, orig_timeout): ready = selector.select(timeout) self._check_timeout(endtime, orig_timeout, stdout, stderr) - # XXX Rewrite these to use non-blocking I/O on the file - # objects; they are no longer using C stdio! 
- for key, events in ready: if key.fileobj is self.stdin: chunk = input_view[self._input_offset : From 9f53a8e883e059cd78cea3dc58200ed09348297f Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Sat, 29 Nov 2025 00:54:27 +0000 Subject: [PATCH 08/12] Refactor POSIX communicate I/O into shared _communicate_io_posix() Extract the core selector-based I/O loop into a new _communicate_io_posix() function that is shared by both _communicate_streams_posix() (used by run_pipeline) and Popen._communicate() (used by Popen.communicate). The new function: - Takes a pre-configured selector and output buffers - Supports resume via input_offset parameter (for Popen timeout retry) - Returns (new_offset, completed) instead of raising TimeoutExpired - Does not close streams (caller decides based on use case) This reduces code duplication and ensures both APIs use the same well-tested I/O multiplexing logic. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 216 ++++++++++++++++++++++++++-------------------- 1 file changed, 122 insertions(+), 94 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 4c92d17da93342..50b437e3ee5ae7 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -320,7 +320,7 @@ def _cleanup(): DEVNULL = -3 -# Helper function for multiplexed I/O, used by run_pipeline() +# Helper function for multiplexed I/O def _remaining_time_helper(endtime): """Calculate remaining time until deadline.""" if endtime is None: @@ -328,6 +328,76 @@ def _remaining_time_helper(endtime): return endtime - _time() +def _communicate_io_posix(selector, stdin, input_view, input_offset, + output_buffers, endtime): + """ + Low-level POSIX I/O multiplexing loop. + + This is the common core used by both _communicate_streams() and + Popen._communicate(). It handles the select loop for reading/writing + but does not manage stream lifecycle or raise timeout exceptions. 
+ + Args: + selector: A _PopenSelector with streams already registered + stdin: Writable file object for input, or None + input_view: memoryview of input bytes, or None + input_offset: Starting offset into input_view (for resume support) + output_buffers: Dict {file_object: list} to append read chunks to + endtime: Deadline timestamp, or None for no timeout + + Returns: + (new_input_offset, completed) + - new_input_offset: How many bytes of input were written + - completed: True if all I/O finished, False if timed out + + Note: + - Does NOT close any streams (caller decides) + - Does NOT raise TimeoutExpired (caller handles) + - Appends to output_buffers lists in place + """ + stdin_fd = stdin.fileno() if stdin else None + + while selector.get_map(): + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 0: + return (input_offset, False) # Timed out + + ready = selector.select(remaining) + + # Check timeout after select (may have woken spuriously) + if endtime is not None and _time() > endtime: + return (input_offset, False) # Timed out + + for key, events in ready: + if key.fd == stdin_fd: + # Write chunk to stdin + chunk = input_view[input_offset:input_offset + _PIPE_BUF] + try: + input_offset += os.write(key.fd, chunk) + except BrokenPipeError: + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + else: + if input_offset >= len(input_view): + selector.unregister(key.fd) + try: + stdin.close() + except BrokenPipeError: + pass + elif key.fileobj in output_buffers: + # Read chunk from output stream + data = os.read(key.fd, 32768) + if not data: + selector.unregister(key.fileobj) + else: + output_buffers[key.fileobj].append(data) + + return (input_offset, True) # Completed + + def _communicate_streams(stdin=None, input_data=None, read_streams=None, timeout=None, cmd_for_timeout=None): """ @@ -426,86 +496,46 @@ def _communicate_streams_windows(stdin, input_data, read_streams, def _communicate_streams_posix(stdin, input_data, read_streams, endtime, orig_timeout, cmd_for_timeout): """POSIX implementation using selectors.""" - # Build mapping of fd -> (file_object, chunks_list) - fd_info = {} - for stream in read_streams: - fd_info[stream.fileno()] = (stream, []) + # Build output buffers for each stream + output_buffers = {stream: [] for stream in read_streams} # Prepare stdin - stdin_fd = None if stdin: try: stdin.flush() except BrokenPipeError: pass - if input_data: - stdin_fd = stdin.fileno() - else: + if not input_data: try: stdin.close() except BrokenPipeError: pass + stdin = None # Don't register with selector # Prepare input data - input_offset = 0 input_view = memoryview(input_data) if input_data else None with _PopenSelector() as selector: - if stdin_fd is not None and input_data: - selector.register(stdin_fd, selectors.EVENT_WRITE) - for fd in fd_info: - selector.register(fd, selectors.EVENT_READ) - - while selector.get_map(): - remaining = _remaining_time_helper(endtime) - if remaining is not None and remaining < 0: - # Timed out - collect partial results - results = {stream: b''.join(chunks) - for fd, (stream, chunks) in fd_info.items()} - raise TimeoutExpired( - cmd_for_timeout, orig_timeout, - output=results.get(read_streams[0]) if read_streams else None) - - ready = selector.select(remaining) - - # Check timeout after select - if endtime is not None and _time() > endtime: - results = {stream: b''.join(chunks) - for fd, (stream, chunks) in fd_info.items()} - raise TimeoutExpired( - cmd_for_timeout, orig_timeout, - 
output=results.get(read_streams[0]) if read_streams else None) - - for key, events in ready: - if key.fd == stdin_fd: - # Write chunk to stdin - chunk = input_view[input_offset:input_offset + _PIPE_BUF] - try: - input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fd) - try: - stdin.close() - except BrokenPipeError: - pass - else: - if input_offset >= len(input_data): - selector.unregister(key.fd) - try: - stdin.close() - except BrokenPipeError: - pass - elif key.fd in fd_info: - # Read chunk from output stream - data = os.read(key.fd, 32768) - if not data: - selector.unregister(key.fd) - else: - fd_info[key.fd][1].append(data) + if stdin and input_data: + selector.register(stdin, selectors.EVENT_WRITE) + for stream in read_streams: + selector.register(stream, selectors.EVENT_READ) + + # Run the common I/O loop + _, completed = _communicate_io_posix( + selector, stdin, input_view, 0, output_buffers, endtime) + + if not completed: + # Timed out - collect partial results + results = {stream: b''.join(chunks) + for stream, chunks in output_buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) # Build results and close all file objects results = {} - for fd, (stream, chunks) in fd_info.items(): + for stream, chunks in output_buffers.items(): results[stream] = b''.join(chunks) try: stream.close() @@ -2633,6 +2663,10 @@ def _communicate(self, input, endtime, orig_timeout): input_view = memoryview(self._input) else: input_view = self._input.cast("b") # byte input required + input_offset = self._input_offset + else: + input_view = None + input_offset = 0 with _PopenSelector() as selector: if self.stdin and not self.stdin.closed and self._input: @@ -2642,38 +2676,32 @@ def _communicate(self, input, endtime, orig_timeout): if self.stderr and not self.stderr.closed: selector.register(self.stderr, selectors.EVENT_READ) - while selector.get_map(): - timeout = self._remaining_time(endtime) - if timeout is not None and timeout < 0: - self._check_timeout(endtime, orig_timeout, - stdout, stderr, - skip_check_and_raise=True) - raise RuntimeError( # Impossible :) - '_check_timeout(..., skip_check_and_raise=True) ' - 'failed to raise TimeoutExpired.') - - ready = selector.select(timeout) - self._check_timeout(endtime, orig_timeout, stdout, stderr) - - for key, events in ready: - if key.fileobj is self.stdin: - chunk = input_view[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - self._input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fileobj) - key.fileobj.close() - else: - if self._input_offset >= len(input_view): - selector.unregister(key.fileobj) - key.fileobj.close() - elif key.fileobj in (self.stdout, self.stderr): - data = os.read(key.fd, 32768) - if not data: - selector.unregister(key.fileobj) - key.fileobj.close() - self._fileobj2output[key.fileobj].append(data) + # Use the common I/O loop (supports resume via _input_offset) + stdin_to_write = (self.stdin if self.stdin and self._input + and not self.stdin.closed else None) + new_offset, completed = _communicate_io_posix( + selector, + stdin_to_write, + input_view, + input_offset, + self._fileobj2output, + endtime) + if self._input: + self._input_offset = new_offset + + if not completed: + self._check_timeout(endtime, orig_timeout, stdout, stderr, + skip_check_and_raise=True) + raise RuntimeError( # Impossible :) + '_check_timeout(..., skip_check_and_raise=True) ' + 'failed to 
raise TimeoutExpired.') + + # Close streams now that we're done reading + if self.stdout: + self.stdout.close() + if self.stderr: + self.stderr.close() + try: self.wait(timeout=self._remaining_time(endtime)) except TimeoutExpired as exc: From d420f29e2bc04b97f82c20e3822cd3b6e68cf4f4 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Sat, 29 Nov 2025 08:41:25 +0000 Subject: [PATCH 09/12] Fix _communicate_streams_windows to avoid blocking with large input Move stdin writing to a background thread in _communicate_streams_windows to avoid blocking indefinitely when writing large input to a pipeline where the subprocess doesn't consume stdin quickly. This mirrors the fix made to Popen._communicate() for Windows in commit 5b1862b (gh-87512). Add test_pipeline_timeout_large_input to verify that TimeoutExpired is raised promptly when run_pipeline() is called with large input and a timeout, even when the first process is slow to consume stdin. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 71 +++++++++++++++++++++++++++---------- Lib/test/test_subprocess.py | 33 +++++++++++++++++ 2 files changed, 86 insertions(+), 18 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 50b437e3ee5ae7..d360af52323138 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -443,11 +443,48 @@ def _reader_thread_func(fh, buffer): except OSError: buffer.append(b'') + def _writer_thread_func(fh, data, result): + """Thread function to write data to a file handle and close it.""" + try: + if data: + fh.write(data) + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + result.append(exc) + try: + fh.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL and not result: + result.append(exc) + def _communicate_streams_windows(stdin, input_data, read_streams, endtime, orig_timeout, cmd_for_timeout): """Windows implementation using threads.""" threads = [] buffers = {} + writer_thread = None + writer_result = [] + + # Start writer thread to send input to stdin + if stdin and input_data: + writer_thread = threading.Thread( + target=_writer_thread_func, + args=(stdin, input_data, writer_result)) + writer_thread.daemon = True + writer_thread.start() + elif stdin: + # No input data, just close stdin + try: + stdin.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + raise # Start reader threads for each stream for stream in read_streams: @@ -458,25 +495,23 @@ def _communicate_streams_windows(stdin, input_data, read_streams, t.start() threads.append((stream, t)) - # Write stdin - if stdin and input_data: - try: - stdin.write(input_data) - except BrokenPipeError: - pass - except OSError as exc: - if exc.errno != errno.EINVAL: - raise - if stdin: - try: - stdin.close() - except BrokenPipeError: - pass - except OSError as exc: - if exc.errno != errno.EINVAL: - raise + # Join writer thread with timeout first + if writer_thread is not None: + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 0: + remaining = 0 + writer_thread.join(remaining) + if writer_thread.is_alive(): + # Timed out during write - collect partial results + results = {s: (b[0] if b else b'') for s, b in buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + # Check for write errors + if writer_result: + raise writer_result[0] - # Join threads with timeout + # Join reader 
threads with timeout for stream, t in threads: remaining = _remaining_time_helper(endtime) if remaining is not None and remaining < 0: diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 58d8c1385b2df4..b0bc13e3d11f58 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2298,6 +2298,39 @@ def test_pipeline_large_data_with_stderr(self): self.assertGreater(len(result.stderr), stderr_size) self.assertEqual(result.returncodes, [0, 0]) + def test_pipeline_timeout_large_input(self): + """Test that timeout is enforced with large input to a slow pipeline. + + This verifies that run_pipeline() doesn't block indefinitely when + writing large input to a pipeline where the first process is slow + to consume stdin. The timeout should be enforced promptly. + + This is particularly important on Windows where stdin writing could + block without proper threading. + """ + # Input larger than typical pipe buffer (64KB) + input_data = 'x' * (128 * 1024) + + start = time.monotonic() + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + # First process sleeps before reading - simulates slow consumer + [sys.executable, '-c', + 'import sys, time; time.sleep(30); print(sys.stdin.read())'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=input_data, capture_output=True, text=True, timeout=0.5 + ) + elapsed = time.monotonic() - start + + # Timeout should occur close to the specified timeout value, + # not after waiting for the subprocess to finish sleeping. + # Allow generous margin for slow CI, but must be well under + # the subprocess sleep time. + self.assertLess(elapsed, 5.0, + f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. " + "Input writing may have blocked without checking timeout.") + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): From df8f082f5971febe17ea3a9d0534f76b905e0556 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Sat, 29 Nov 2025 08:46:22 +0000 Subject: [PATCH 10/12] Fix memoryview and closed stdin handling in _communicate_streams_posix Apply the same fixes from Popen._communicate() to _communicate_streams_posix for run_pipeline(): 1. Handle non-byte memoryview input by casting to byte view (gh-134453): Non-byte memoryviews (e.g., int32 arrays) had incorrect length tracking because len() returns element count, not byte count. Now cast to "b" view for correct progress tracking. 2. Handle ValueError on stdin.flush() when stdin is closed (gh-74389): Ignore ValueError from flush() if stdin is already closed, matching the BrokenPipeError handling. Add tests for memoryview input to run_pipeline: - test_pipeline_memoryview_input: basic byte memoryview - test_pipeline_memoryview_input_nonbyte: int32 array memoryview Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 14 +++++++++++-- Lib/test/test_subprocess.py | 42 +++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index d360af52323138..b783b3b3397180 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -540,6 +540,10 @@ def _communicate_streams_posix(stdin, input_data, read_streams, stdin.flush() except BrokenPipeError: pass + except ValueError: + # ignore ValueError: I/O operation on closed file. 
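+            # gh-74389: swallow the error only when stdin really is
+            # closed; a ValueError from an open file is unrelated.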
+ if not stdin.closed: + raise if not input_data: try: stdin.close() @@ -547,8 +551,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams, pass stdin = None # Don't register with selector - # Prepare input data - input_view = memoryview(input_data) if input_data else None + # Prepare input data - cast to bytes view for correct length tracking + if input_data: + if not isinstance(input_data, memoryview): + input_view = memoryview(input_data) + else: + input_view = input_data.cast("b") # byte view required + else: + input_view = None with _PopenSelector() as selector: if stdin and input_data: diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index b0bc13e3d11f58..f91d1f3522f902 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2019,6 +2019,48 @@ def test_pipeline_with_input(self): self.assertEqual(result.stdout.strip(), '5') self.assertEqual(result.returncodes, [0, 0]) + def test_pipeline_memoryview_input(self): + """Test pipeline with memoryview input (byte elements)""" + test_data = b"Hello, memoryview pipeline!" + mv = memoryview(test_data) + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, test_data.upper()) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_memoryview_input_nonbyte(self): + """Test pipeline with non-byte memoryview input (e.g., int32). + + This tests the fix for gh-134453 where non-byte memoryviews + had incorrect length tracking on POSIX, causing data truncation. + """ + import array + # Create an array of 32-bit integers large enough to trigger + # chunked writing behavior (> PIPE_BUF) + pipe_buf = getattr(select, 'PIPE_BUF', 512) + # Each 'i' element is 4 bytes, need more than pipe_buf bytes total + num_elements = (pipe_buf // 4) + 100 + test_array = array.array('i', [0x41424344 for _ in range(num_elements)]) + expected_bytes = test_array.tobytes() + mv = memoryview(test_array) + + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; data = sys.stdin.buffer.read(); ' + 'sys.stdout.buffer.write(data)'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, expected_bytes, + msg=f"{len(result.stdout)=} != {len(expected_bytes)=}") + self.assertEqual(result.returncodes, [0, 0]) + def test_pipeline_bytes_mode(self): """Test pipeline in binary mode""" result = subprocess.run_pipeline( From 978cd76cd8929e98af69895497c7688cab0fce35 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Sat, 29 Nov 2025 08:56:34 +0000 Subject: [PATCH 11/12] Factor out _flush_stdin() and _make_input_view() helpers Extract common stdin preparation logic into shared helper functions used by both _communicate_streams_posix() and Popen._communicate(): - _flush_stdin(stdin): Flush stdin, ignoring BrokenPipeError and ValueError (for closed files) - _make_input_view(input_data): Convert input data to a byte memoryview, handling non-byte memoryviews by casting to "b" view This ensures consistent behavior and makes the fixes for gh-134453 (memoryview) and gh-74389 (closed stdin) shared in one place. 
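To make the intent concrete, a minimal sketch of the helper semantics
(hypothetical usage; both helpers are private and are defined in the
diff below):

    import array, os

    data = array.array('i', range(2048))            # non-byte buffer
    view = _make_input_view(memoryview(data))       # cast to a "b" view
    assert len(view) == len(data) * data.itemsize   # length is in bytes
    assert _make_input_view(b'') is None            # falsy input -> None

    f = open(os.devnull, 'wb'); f.close()
    _flush_stdin(f)                                 # closed file: no error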
Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 65 +++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index b783b3b3397180..088a8ebb93631a 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -328,6 +328,32 @@ def _remaining_time_helper(endtime): return endtime - _time() +def _flush_stdin(stdin): + """Flush stdin, ignoring BrokenPipeError and closed file ValueError.""" + try: + stdin.flush() + except BrokenPipeError: + pass + except ValueError: + # Ignore ValueError: I/O operation on closed file. + if not stdin.closed: + raise + + +def _make_input_view(input_data): + """Convert input data to a byte memoryview for writing. + + Handles the case where input_data is already a memoryview with + non-byte elements (e.g., int32 array) by casting to a byte view. + This ensures len(view) returns the byte count, not element count. + """ + if not input_data: + return None + if isinstance(input_data, memoryview): + return input_data.cast("b") # ensure byte view for correct len() + return memoryview(input_data) + + def _communicate_io_posix(selector, stdin, input_view, input_offset, output_buffers, endtime): """ @@ -536,14 +562,7 @@ def _communicate_streams_posix(stdin, input_data, read_streams, # Prepare stdin if stdin: - try: - stdin.flush() - except BrokenPipeError: - pass - except ValueError: - # ignore ValueError: I/O operation on closed file. - if not stdin.closed: - raise + _flush_stdin(stdin) if not input_data: try: stdin.close() @@ -551,14 +570,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams, pass stdin = None # Don't register with selector - # Prepare input data - cast to bytes view for correct length tracking - if input_data: - if not isinstance(input_data, memoryview): - input_view = memoryview(input_data) - else: - input_view = input_data.cast("b") # byte view required - else: - input_view = None + # Prepare input data + input_view = _make_input_view(input_data) with _PopenSelector() as selector: if stdin and input_data: @@ -2671,14 +2684,7 @@ def _communicate(self, input, endtime, orig_timeout): if self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - try: - self.stdin.flush() - except BrokenPipeError: - pass # communicate() must ignore BrokenPipeError. - except ValueError: - # ignore ValueError: I/O operation on closed file. - if not self.stdin.closed: - raise + _flush_stdin(self.stdin) if not input: try: self.stdin.close() @@ -2703,15 +2709,8 @@ def _communicate(self, input, endtime, orig_timeout): self._save_input(input) - if self._input: - if not isinstance(self._input, memoryview): - input_view = memoryview(self._input) - else: - input_view = self._input.cast("b") # byte input required - input_offset = self._input_offset - else: - input_view = None - input_offset = 0 + input_view = _make_input_view(self._input) + input_offset = self._input_offset if self._input else 0 with _PopenSelector() as selector: if self.stdin and not self.stdin.closed and self._input: From 15f8a93bcba2f4888a5f32b987d7182dc0a39504 Mon Sep 17 00:00:00 2001 From: "Gregory P. 
Smith using claude.ai/code" Date: Sat, 29 Nov 2025 09:27:29 +0000 Subject: [PATCH 12/12] Support universal_newlines and use _translate_newlines in run_pipeline - Factor out _translate_newlines() as a module-level function, have Popen's method delegate to it for code sharing - Remove rejection of universal_newlines kwarg in run_pipeline(), treat it the same as text=True (consistent with Popen behavior) - Use _translate_newlines() for text mode decoding in run_pipeline() to properly handle \r\n and \r newline sequences - Update documentation to remove mention of universal_newlines rejection - Update test to verify universal_newlines=True works like text=True Co-authored-by: Claude --- Doc/library/subprocess.rst | 3 --- Lib/subprocess.py | 29 +++++++++++++---------------- Lib/test/test_subprocess.py | 20 ++++++++++---------- 3 files changed, 23 insertions(+), 29 deletions(-) diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst index da1d4047851567..95014e2b66d92f 100644 --- a/Doc/library/subprocess.rst +++ b/Doc/library/subprocess.rst @@ -317,9 +317,6 @@ underlying :class:`Popen` interface can be used directly. in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout` attribute. Other keyword arguments are passed to each :class:`Popen` call. - Unlike :func:`run`, this function does not accept *universal_newlines*. - Use ``text=True`` instead. - Examples:: >>> import subprocess diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 088a8ebb93631a..702918b8c38f84 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -354,6 +354,12 @@ def _make_input_view(input_data): return memoryview(input_data) +def _translate_newlines(data, encoding, errors): + """Decode bytes to str and translate newlines to \n.""" + data = data.decode(encoding, errors) + return data.replace("\r\n", "\n").replace("\r", "\n") + + def _communicate_io_posix(selector, stdin, input_view, input_offset, output_buffers, endtime): """ @@ -984,13 +990,6 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, if len(commands) < 2: raise ValueError('run_pipeline requires at least 2 commands') - # Reject universal_newlines - use text= instead - if kwargs.get('universal_newlines') is not None: - raise TypeError( - "run_pipeline() does not support 'universal_newlines'. " - "Use 'text=True' instead." 
- ) - # Validate no conflicting arguments if input is not None: if kwargs.get('stdin') is not None: @@ -1071,8 +1070,9 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, else: endtime = None - # Determine if we're in text mode - text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors') + # Determine if we're in text mode (text= or universal_newlines=) + text_mode = (kwargs.get('text') or kwargs.get('universal_newlines') + or kwargs.get('encoding') or kwargs.get('errors')) encoding = kwargs.get('encoding') errors_param = kwargs.get('errors', 'strict') if text_mode and encoding is None: @@ -1115,13 +1115,11 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, stdout = results.get(last_proc.stdout) stderr = results.get(stderr_reader) - # Decode stdout if in text mode (Popen text mode only applies to - # streams it creates, but we read via _communicate_streams which - # always returns bytes) + # Translate newlines if in text mode (decode and convert \r\n to \n) if text_mode and stdout is not None: - stdout = stdout.decode(encoding, errors_param) + stdout = _translate_newlines(stdout, encoding, errors_param) if text_mode and stderr is not None: - stderr = stderr.decode(encoding, errors_param) + stderr = _translate_newlines(stderr, encoding, errors_param) # Wait for all processes to complete (use remaining time from deadline) returncodes = [] @@ -1686,8 +1684,7 @@ def universal_newlines(self, universal_newlines): self.text_mode = bool(universal_newlines) def _translate_newlines(self, data, encoding, errors): - data = data.decode(encoding, errors) - return data.replace("\r\n", "\n").replace("\r", "\n") + return _translate_newlines(data, encoding, errors) def __enter__(self): return self diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index f91d1f3522f902..4e43ea7bbfda90 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2123,16 +2123,16 @@ def test_pipeline_capture_output_conflict(self): ) self.assertIn('capture_output', str(cm.exception)) - def test_pipeline_rejects_universal_newlines(self): - """Test that universal_newlines is not supported""" - with self.assertRaises(TypeError) as cm: - subprocess.run_pipeline( - [sys.executable, '-c', 'pass'], - [sys.executable, '-c', 'pass'], - universal_newlines=True - ) - self.assertIn('universal_newlines', str(cm.exception)) - self.assertIn('text=True', str(cm.exception)) + def test_pipeline_universal_newlines(self): + """Test that universal_newlines=True works like text=True""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, universal_newlines=True + ) + self.assertIsInstance(result.stdout, str) + self.assertIn('HELLO', result.stdout) + self.assertEqual(result.returncodes, [0, 0]) def test_pipeline_result_repr(self): """Test PipelineResult string representation"""
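For reference, a sketch of the user-visible behavior after this patch
(assumes the run_pipeline() API added earlier in this series):

    import subprocess, sys

    result = subprocess.run_pipeline(
        [sys.executable, '-c',
         r'import sys; sys.stdout.buffer.write(b"x\r\ny\r")'],
        [sys.executable, '-c',
         'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'],
        capture_output=True, universal_newlines=True)

    # universal_newlines=True now behaves like text=True: stdout is a
    # str, with \r\n and \r translated to \n by _translate_newlines().
    assert result.stdout == 'x\ny\n'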