diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst index 43da804b62beb1..95014e2b66d92f 100644 --- a/Doc/library/subprocess.rst +++ b/Doc/library/subprocess.rst @@ -264,6 +264,182 @@ underlying :class:`Popen` interface can be used directly. *stdout* and *stderr* attributes added +.. function:: run_pipeline(*commands, stdin=None, input=None, \ + stdout=None, stderr=None, capture_output=False, \ + timeout=None, check=False, encoding=None, \ + errors=None, text=None, env=None, \ + **other_popen_kwargs) + + Run a pipeline of commands connected via pipes, similar to shell pipelines. + Wait for all commands to complete, then return a :class:`PipelineResult` + instance. + + Each positional argument should be a command (a list of strings, or a string + if ``shell=True``) to execute. The standard output of each command is + connected to the standard input of the next command in the pipeline. + + This function requires at least two commands. For a single command, use + :func:`run` instead. + + If *capture_output* is true, the standard output of the final command and + the standard error of all commands will be captured. All processes in the + pipeline share a single stderr pipe, so their error output will be + interleaved. The *stdout* and *stderr* arguments may not be supplied at + the same time as *capture_output*. + + A *timeout* may be specified in seconds. If the timeout expires, all + child processes will be killed and waited for, and then a + :exc:`TimeoutExpired` exception will be raised. + + The *input* argument is passed to the first command's stdin. If used, it + must be a byte sequence, or a string if *encoding* or *errors* is specified + or *text* is true. + + If *check* is true, and any process in the pipeline exits with a non-zero + exit code, a :exc:`PipelineError` exception will be raised. This behavior + is similar to the shell's ``pipefail`` option. + + If *encoding* or *errors* are specified, or *text* is true, file objects + are opened in text mode using the specified encoding and errors. + + .. note:: + + When using ``text=True`` with ``capture_output=True`` or ``stderr=PIPE``, + be aware that stderr output from multiple processes may be interleaved + in ways that produce incomplete multi-byte character sequences. For + reliable text decoding of stderr, consider capturing in binary mode + and decoding manually with appropriate error handling, or use + ``errors='replace'`` or ``errors='backslashreplace'``. + + If *stdin* is specified, it is connected to the first command's standard + input. If *stdout* is specified, it is connected to the last command's + standard output. When *stdout* is :data:`PIPE`, the output is available + in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout` + attribute. Other keyword arguments are passed to each :class:`Popen` call. + + Examples:: + + >>> import subprocess + >>> # Equivalent to: echo "hello world" | tr a-z A-Z + >>> result = subprocess.run_pipeline( + ... ['echo', 'hello world'], + ... ['tr', 'a-z', 'A-Z'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'HELLO WORLD\n' + >>> result.returncodes + [0, 0] + + >>> # Pipeline with three commands + >>> result = subprocess.run_pipeline( + ... ['echo', 'one\ntwo\nthree'], + ... ['sort'], + ... ['head', '-n', '2'], + ... capture_output=True, text=True + ... ) + >>> result.stdout + 'one\nthree\n' + + >>> # Using input parameter + >>> result = subprocess.run_pipeline( + ... ['cat'], + ... ['wc', '-l'], + ... 
input='line1\nline2\nline3\n',
+      ...     capture_output=True, text=True
+      ... )
+      >>> result.stdout.strip()
+      '3'
+
+      >>> # Error handling with check=True
+      >>> subprocess.run_pipeline(
+      ...     ['echo', 'hello'],
+      ...     ['false'],  # exits with status 1
+      ...     check=True
+      ... )
+      Traceback (most recent call last):
+        ...
+      subprocess.PipelineError: Pipeline failed: command 1 ['false'] returned 1
+
+   .. versionadded:: next
+
+
+.. class:: PipelineResult
+
+   The return value from :func:`run_pipeline`, representing a pipeline of
+   processes that have finished.
+
+   .. attribute:: commands
+
+      The list of commands used to launch the pipeline. Each command is a list
+      of strings (or a string if ``shell=True`` was used).
+
+   .. attribute:: returncodes
+
+      List of exit status codes for each command in the pipeline. Typically,
+      an exit status of 0 indicates that the command ran successfully.
+
+      A negative value ``-N`` indicates that the command was terminated by
+      signal ``N`` (POSIX only).
+
+   .. attribute:: returncode
+
+      Exit status of the final command in the pipeline. This is a convenience
+      property equivalent to ``returncodes[-1]``.
+
+   .. attribute:: stdout
+
+      Captured stdout from the final command in the pipeline. A bytes sequence,
+      or a string if :func:`run_pipeline` was called with an encoding, errors,
+      or ``text=True``. ``None`` if stdout was not captured.
+
+   .. attribute:: stderr
+
+      Captured stderr from all commands in the pipeline, combined. A bytes
+      sequence, or a string if :func:`run_pipeline` was called with an
+      encoding, errors, or ``text=True``. ``None`` if stderr was not captured.
+
+   .. method:: check_returncodes()
+
+      If any of the exit statuses in :attr:`returncodes` is non-zero, raise a
+      :exc:`PipelineError`.
+
+   .. versionadded:: next
+
+
+.. exception:: PipelineError
+
+   Subclass of :exc:`SubprocessError`, raised when a pipeline run by
+   :func:`run_pipeline` (with ``check=True``) contains one or more commands
+   that returned a non-zero exit status. This is similar to the shell's
+   ``pipefail`` behavior.
+
+   .. attribute:: commands
+
+      List of commands that were used in the pipeline.
+
+   .. attribute:: returncodes
+
+      List of exit status codes for each command in the pipeline.
+
+   .. attribute:: stdout
+
+      Output of the final command if it was captured. Otherwise, ``None``.
+
+   .. attribute:: stderr
+
+      Combined stderr output of all commands if it was captured.
+      Otherwise, ``None``.
+
+   .. attribute:: failed
+
+      List of ``(index, command, returncode)`` tuples for each command
+      that returned a non-zero exit status. The *index* is the position
+      of the command in the pipeline (0-based).
+
+   .. versionadded:: next
+
 
 .. _frequently-used-arguments:
 
 Frequently Used Arguments
 
diff --git a/Lib/subprocess.py b/Lib/subprocess.py
index 4d5ab6fbff0a46..702918b8c38f84 100644
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -62,7 +62,8 @@ __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
            "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL",
-           "SubprocessError", "TimeoutExpired", "CompletedProcess"]
+           "SubprocessError", "TimeoutExpired", "CompletedProcess",
+           "run_pipeline", "PipelineResult", "PipelineError"]
 
 # NOTE: We intentionally exclude list2cmdline as it is
 # considered an internal implementation detail. issue10838.
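
A usage sketch of the error handling documented above (the commands and
filename are illustrative, not part of the patch)::

    import subprocess

    try:
        subprocess.run_pipeline(
            ['cat', 'data.txt'],   # fails if the file is missing
            ['grep', 'needle'],    # grep exits 1 when nothing matches
            ['wc', '-l'],
            capture_output=True, text=True, check=True,
        )
    except subprocess.PipelineError as exc:
        # exc.failed lists (index, command, returncode) for every stage
        # that exited non-zero, mirroring the shell's pipefail option.
        for index, command, returncode in exc.failed:
            print(f'stage {index} ({command}) exited with {returncode}')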
@@ -194,6 +195,36 @@ def stdout(self, value): self.output = value +class PipelineError(SubprocessError): + """Raised when run_pipeline() is called with check=True and one or more + commands in the pipeline return a non-zero exit status. + + Attributes: + commands: List of commands in the pipeline (each a list of strings). + returncodes: List of return codes corresponding to each command. + stdout: Standard output from the final command (if captured). + stderr: Standard error output (if captured). + failed: List of (index, command, returncode) tuples for failed commands. + """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = commands + self.returncodes = returncodes + self.stdout = stdout + self.stderr = stderr + self.failed = [ + (i, cmd, rc) + for i, (cmd, rc) in enumerate(zip(commands, returncodes)) + if rc != 0 + ] + + def __str__(self): + failed_info = ", ".join( + f"command {i} {cmd!r} returned {rc}" + for i, cmd, rc in self.failed + ) + return f"Pipeline failed: {failed_info}" + + if _mswindows: class STARTUPINFO: def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None, @@ -289,6 +320,295 @@ def _cleanup(): DEVNULL = -3 +# Helper function for multiplexed I/O +def _remaining_time_helper(endtime): + """Calculate remaining time until deadline.""" + if endtime is None: + return None + return endtime - _time() + + +def _flush_stdin(stdin): + """Flush stdin, ignoring BrokenPipeError and closed file ValueError.""" + try: + stdin.flush() + except BrokenPipeError: + pass + except ValueError: + # Ignore ValueError: I/O operation on closed file. + if not stdin.closed: + raise + + +def _make_input_view(input_data): + """Convert input data to a byte memoryview for writing. + + Handles the case where input_data is already a memoryview with + non-byte elements (e.g., int32 array) by casting to a byte view. + This ensures len(view) returns the byte count, not element count. + """ + if not input_data: + return None + if isinstance(input_data, memoryview): + return input_data.cast("b") # ensure byte view for correct len() + return memoryview(input_data) + + +def _translate_newlines(data, encoding, errors): + """Decode bytes to str and translate newlines to \n.""" + data = data.decode(encoding, errors) + return data.replace("\r\n", "\n").replace("\r", "\n") + + +def _communicate_io_posix(selector, stdin, input_view, input_offset, + output_buffers, endtime): + """ + Low-level POSIX I/O multiplexing loop. + + This is the common core used by both _communicate_streams() and + Popen._communicate(). It handles the select loop for reading/writing + but does not manage stream lifecycle or raise timeout exceptions. 
+
+    Args:
+        selector: A _PopenSelector with streams already registered
+        stdin: Writable file object for input, or None
+        input_view: memoryview of input bytes, or None
+        input_offset: Starting offset into input_view (for resume support)
+        output_buffers: Dict {file_object: list} to append read chunks to
+        endtime: Deadline timestamp, or None for no timeout
+
+    Returns:
+        (new_input_offset, completed)
+        - new_input_offset: How many bytes of input were written
+        - completed: True if all I/O finished, False if timed out
+
+    Note:
+        - Closes *stdin* once all input has been written, or on
+          BrokenPipeError; does NOT close the output streams (caller decides)
+        - Does NOT raise TimeoutExpired (caller handles)
+        - Appends to output_buffers lists in place
+    """
+    stdin_fd = stdin.fileno() if stdin else None
+
+    while selector.get_map():
+        remaining = _remaining_time_helper(endtime)
+        if remaining is not None and remaining < 0:
+            return (input_offset, False)  # Timed out
+
+        ready = selector.select(remaining)
+
+        # Check timeout after select (may have woken spuriously)
+        if endtime is not None and _time() > endtime:
+            return (input_offset, False)  # Timed out
+
+        for key, events in ready:
+            if key.fd == stdin_fd:
+                # Write chunk to stdin
+                chunk = input_view[input_offset:input_offset + _PIPE_BUF]
+                try:
+                    input_offset += os.write(key.fd, chunk)
+                except BrokenPipeError:
+                    selector.unregister(key.fd)
+                    try:
+                        stdin.close()
+                    except BrokenPipeError:
+                        pass
+                else:
+                    if input_offset >= len(input_view):
+                        selector.unregister(key.fd)
+                        try:
+                            stdin.close()
+                        except BrokenPipeError:
+                            pass
+            elif key.fileobj in output_buffers:
+                # Read chunk from output stream
+                data = os.read(key.fd, 32768)
+                if not data:
+                    selector.unregister(key.fileobj)
+                else:
+                    output_buffers[key.fileobj].append(data)
+
+    return (input_offset, True)  # Completed
+
+
+def _communicate_streams(stdin=None, input_data=None, read_streams=None,
+                         timeout=None, cmd_for_timeout=None):
+    """
+    Multiplex I/O: write input_data to stdin, read from read_streams.
+
+    All streams must be file objects (not raw file descriptors).
+    All I/O is done in binary mode; caller handles text encoding.
+
+    Args:
+        stdin: Writable binary file object for input, or None
+        input_data: Bytes to write to stdin, or None
+        read_streams: List of readable binary file objects to read from
+        timeout: Timeout in seconds, or None for no timeout
+        cmd_for_timeout: Value to use for TimeoutExpired.cmd
+
+    Returns:
+        Dict mapping each file object in read_streams to its bytes data.
+        All file objects in read_streams will be closed.
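+
+    Example (illustrative sketch; assumes ``proc`` is a Popen created with
+    ``stdin=PIPE, stdout=PIPE, stderr=PIPE`` and binary streams):
+
+        data = _communicate_streams(
+            stdin=proc.stdin, input_data=b'payload',
+            read_streams=[proc.stdout, proc.stderr],
+            timeout=5, cmd_for_timeout=proc.args)
+        out, err = data[proc.stdout], data[proc.stderr]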
+ + Raises: + TimeoutExpired: If timeout expires (with partial data) + """ + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + + read_streams = read_streams or [] + + if _mswindows: + return _communicate_streams_windows( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout) + else: + return _communicate_streams_posix( + stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout) + + +if _mswindows: + def _reader_thread_func(fh, buffer): + """Thread function to read from a file handle into a buffer list.""" + try: + buffer.append(fh.read()) + except OSError: + buffer.append(b'') + + def _writer_thread_func(fh, data, result): + """Thread function to write data to a file handle and close it.""" + try: + if data: + fh.write(data) + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + result.append(exc) + try: + fh.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL and not result: + result.append(exc) + + def _communicate_streams_windows(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout): + """Windows implementation using threads.""" + threads = [] + buffers = {} + writer_thread = None + writer_result = [] + + # Start writer thread to send input to stdin + if stdin and input_data: + writer_thread = threading.Thread( + target=_writer_thread_func, + args=(stdin, input_data, writer_result)) + writer_thread.daemon = True + writer_thread.start() + elif stdin: + # No input data, just close stdin + try: + stdin.close() + except BrokenPipeError: + pass + except OSError as exc: + if exc.errno != errno.EINVAL: + raise + + # Start reader threads for each stream + for stream in read_streams: + buf = [] + buffers[stream] = buf + t = threading.Thread(target=_reader_thread_func, args=(stream, buf)) + t.daemon = True + t.start() + threads.append((stream, t)) + + # Join writer thread with timeout first + if writer_thread is not None: + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 0: + remaining = 0 + writer_thread.join(remaining) + if writer_thread.is_alive(): + # Timed out during write - collect partial results + results = {s: (b[0] if b else b'') for s, b in buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + # Check for write errors + if writer_result: + raise writer_result[0] + + # Join reader threads with timeout + for stream, t in threads: + remaining = _remaining_time_helper(endtime) + if remaining is not None and remaining < 0: + remaining = 0 + t.join(remaining) + if t.is_alive(): + # Collect partial results + results = {s: (b[0] if b else b'') for s, b in buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + + # Collect results + return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()} + +else: + def _communicate_streams_posix(stdin, input_data, read_streams, + endtime, orig_timeout, cmd_for_timeout): + """POSIX implementation using selectors.""" + # Build output buffers for each stream + output_buffers = {stream: [] for stream in read_streams} + + # Prepare stdin + if stdin: + _flush_stdin(stdin) + if not input_data: + try: + stdin.close() + except BrokenPipeError: + pass + stdin = None # Don't register with selector + + # Prepare input data + input_view = _make_input_view(input_data) + + with 
_PopenSelector() as selector: + if stdin and input_data: + selector.register(stdin, selectors.EVENT_WRITE) + for stream in read_streams: + selector.register(stream, selectors.EVENT_READ) + + # Run the common I/O loop + _, completed = _communicate_io_posix( + selector, stdin, input_view, 0, output_buffers, endtime) + + if not completed: + # Timed out - collect partial results + results = {stream: b''.join(chunks) + for stream, chunks in output_buffers.items()} + raise TimeoutExpired( + cmd_for_timeout, orig_timeout, + output=results.get(read_streams[0]) if read_streams else None) + + # Build results and close all file objects + results = {} + for stream, chunks in output_buffers.items(): + results[stream] = b''.join(chunks) + try: + stream.close() + except OSError: + pass + + return results + + # XXX This function is only used by multiprocessing and the test suite, # but it's here so that it can be imported when Python is compiled without # threads. @@ -508,6 +828,47 @@ def check_returncode(self): self.stderr) +class PipelineResult: + """A pipeline of processes that have finished running. + + This is returned by run_pipeline(). + + Attributes: + commands: List of commands in the pipeline (each command is a list). + returncodes: List of return codes for each command in the pipeline. + returncode: The return code of the final command (for convenience). + stdout: The standard output of the final command (None if not captured). + stderr: The standard error output (None if not captured). + """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = list(commands) + self.returncodes = list(returncodes) + self.stdout = stdout + self.stderr = stderr + + @property + def returncode(self): + """Return the exit code of the final command in the pipeline.""" + return self.returncodes[-1] if self.returncodes else None + + def __repr__(self): + args = [f'commands={self.commands!r}', + f'returncodes={self.returncodes!r}'] + if self.stdout is not None: + args.append(f'stdout={self.stdout!r}') + if self.stderr is not None: + args.append(f'stderr={self.stderr!r}') + return f"{type(self).__name__}({', '.join(args)})" + + __class_getitem__ = classmethod(types.GenericAlias) + + def check_returncodes(self): + """Raise PipelineError if any command's exit code is non-zero.""" + if any(rc != 0 for rc in self.returncodes): + raise PipelineError(self.commands, self.returncodes, + self.stdout, self.stderr) + + def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs): """Run command with arguments and return a CompletedProcess instance. @@ -578,6 +939,235 @@ def run(*popenargs, return CompletedProcess(process.args, retcode, stdout, stderr) +def run_pipeline(*commands, input=None, capture_output=False, timeout=None, + check=False, **kwargs): + """Run a pipeline of commands connected via pipes. + + Each positional argument should be a command (list of strings or a string + if shell=True) to execute. The stdout of each command is connected to the + stdin of the next command in the pipeline, similar to shell pipelines. + + Returns a PipelineResult instance with attributes commands, returncodes, + stdout, and stderr. By default, stdout and stderr are not captured, and + those attributes will be None. Pass capture_output=True to capture both + the final command's stdout and stderr from all commands. + + If check is True and any command's exit code is non-zero, it raises a + PipelineError. This is similar to shell "pipefail" behavior. 
+
+    If timeout (seconds) is given and the pipeline takes too long, a
+    TimeoutExpired exception will be raised and all processes will be killed.
+
+    The optional "input" argument allows passing bytes or a string to the
+    first command's stdin. If you use this argument, you may not also specify
+    stdin in kwargs.
+
+    By default, all communication is in bytes. Use text=True, encoding, or
+    errors to enable text mode, which affects the input argument and the
+    stdout/stderr outputs.
+
+    .. note::
+        When using text=True with capture_output=True or stderr=PIPE, be aware
+        that stderr output from multiple processes may be interleaved in ways
+        that produce invalid character sequences when decoded. For reliable
+        text decoding, avoid text=True when capturing stderr from pipelines,
+        or handle decoding errors appropriately.
+
+    Other keyword arguments are passed to each Popen call. The stdin, stdout
+    and stderr arguments are managed by the pipeline itself: stdin feeds the
+    first command, stdout is taken from the last command, and stderr (or the
+    shared capture pipe) applies to every command.
+
+    Example:
+        # Equivalent to: cat file.txt | grep pattern | wc -l
+        result = run_pipeline(
+            ['cat', 'file.txt'],
+            ['grep', 'pattern'],
+            ['wc', '-l'],
+            capture_output=True, text=True
+        )
+        print(result.stdout)       # "42\\n"
+        print(result.returncodes)  # [0, 0, 0]
+    """
+    if len(commands) < 2:
+        raise ValueError('run_pipeline requires at least 2 commands')
+
+    # Validate no conflicting arguments
+    if input is not None:
+        if kwargs.get('stdin') is not None:
+            raise ValueError('stdin and input arguments may not both be used.')
+
+    if capture_output:
+        if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
+            raise ValueError('stdout and stderr arguments may not be used '
+                             'with capture_output.')
+
+    # Determine stderr handling - all processes share the same stderr pipe
+    # When capturing, we create one pipe and all processes write to it
+    stderr_arg = kwargs.pop('stderr', None)
+    capture_stderr = capture_output or stderr_arg == PIPE
+
+    # stdin is for the first process, stdout is for the last process
+    stdin_arg = kwargs.pop('stdin', None)
+    stdout_arg = kwargs.pop('stdout', None)
+
+    processes = []
+    stderr_reader = None  # File object for reading shared stderr (for parent)
+    stderr_write_fd = None  # Write end of shared stderr pipe (for children)
+
+    try:
+        # Create a single stderr pipe that all processes will share
+        if capture_stderr:
+            stderr_read_fd, stderr_write_fd = os.pipe()
+            stderr_reader = os.fdopen(stderr_read_fd, 'rb')
+
+        for i, cmd in enumerate(commands):
+            is_first = (i == 0)
+            is_last = (i == len(commands) - 1)
+
+            # Determine stdin for this process
+            if is_first:
+                if input is not None:
+                    proc_stdin = PIPE
+                else:
+                    proc_stdin = stdin_arg  # Could be None, PIPE, fd, or file
+            else:
+                proc_stdin = processes[-1].stdout
+
+            # Determine stdout for this process
+            if is_last:
+                if capture_output:
+                    proc_stdout = PIPE
+                else:
+                    proc_stdout = stdout_arg  # Could be None, PIPE, fd, or file
+            else:
+                proc_stdout = PIPE
+
+            # All processes share the same stderr pipe (write end)
+            if capture_stderr:
+                proc_stderr = stderr_write_fd
+            else:
+                proc_stderr = stderr_arg
+
+            proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout,
+                         stderr=proc_stderr, **kwargs)
+            processes.append(proc)
+
+            # Close the parent's copy of the previous process's stdout
+            # to allow the pipe to signal EOF when the previous process exits
+            if not is_first and processes[-2].stdout is not None:
+                processes[-2].stdout.close()
+
+        # Close the write end of stderr pipe in parent - children have it
+        if stderr_write_fd is not None:
+            os.close(stderr_write_fd)
+            stderr_write_fd = None
+
+        first_proc = processes[0]
+        last_proc = processes[-1]
+
+        # Calculate deadline for timeout (used throughout)
+        if timeout is not None:
+            endtime = _time() + timeout
+        else:
+            endtime = None
+
+        # Determine if we're in text mode (text=, universal_newlines=,
+        # encoding= or errors=)
+        text_mode = (kwargs.get('text') or kwargs.get('universal_newlines')
+                     or kwargs.get('encoding') or kwargs.get('errors'))
+        encoding = kwargs.get('encoding')
+        # Treat an explicit errors=None like the default, as Popen's text
+        # wrapper does; str.encode()/bytes.decode() reject errors=None.
+        errors_param = kwargs.get('errors') or 'strict'
+        if text_mode and encoding is None:
+            encoding = locale.getencoding()
+
+        # Encode input if in text mode
+        input_data = input
+        if input_data is not None and text_mode:
+            input_data = input_data.encode(encoding, errors_param)
+
+        # Build list of streams to read from
+        read_streams = []
+        if last_proc.stdout is not None:
+            read_streams.append(last_proc.stdout)
+        if stderr_reader is not None:
+            read_streams.append(stderr_reader)
+
+        # Use multiplexed I/O to handle stdin/stdout/stderr concurrently
+        # This avoids deadlocks from pipe buffer limits
+        stdin_stream = first_proc.stdin if input is not None else None
+
+        try:
+            results = _communicate_streams(
+                stdin=stdin_stream,
+                input_data=input_data,
+                read_streams=read_streams,
+                timeout=_remaining_time_helper(endtime),
+                cmd_for_timeout=commands,
+            )
+        except TimeoutExpired:
+            # Kill all processes on timeout
+            for p in processes:
+                if p.poll() is None:
+                    p.kill()
+            for p in processes:
+                p.wait()
+            raise
+
+        # Extract results
+        stdout = results.get(last_proc.stdout)
+        stderr = results.get(stderr_reader)
+
+        # Translate newlines if in text mode (decode and convert \r\n to \n)
+        if text_mode and stdout is not None:
+            stdout = _translate_newlines(stdout, encoding, errors_param)
+        if text_mode and stderr is not None:
+            stderr = _translate_newlines(stderr, encoding, errors_param)
+
+        # Wait for all processes to complete (use remaining time from deadline)
+        returncodes = []
+        for proc in processes:
+            try:
+                remaining = _remaining_time_helper(endtime)
+                proc.wait(timeout=remaining)
+            except TimeoutExpired:
+                # Kill all processes on timeout
+                for p in processes:
+                    if p.poll() is None:
+                        p.kill()
+                for p in processes:
+                    p.wait()
+                raise TimeoutExpired(commands, timeout, stdout, stderr)
+            returncodes.append(proc.returncode)
+
+        result = PipelineResult(commands, returncodes, stdout, stderr)
+
+        if check and any(rc != 0 for rc in returncodes):
+            raise PipelineError(commands, returncodes, stdout, stderr)
+
+        return result
+
+    finally:
+        # Ensure all processes are cleaned up
+        for proc in processes:
+            if proc.poll() is None:
+                proc.kill()
+                proc.wait()
+            # Close any open file handles
+            if proc.stdin and not proc.stdin.closed:
+                proc.stdin.close()
+            if proc.stdout and not proc.stdout.closed:
+                proc.stdout.close()
+        # Close stderr pipe (reader is a file object, writer is a raw fd)
+        if stderr_reader is not None and not stderr_reader.closed:
+            try:
+                stderr_reader.close()
+            except OSError:
+                pass
+        if stderr_write_fd is not None:
+            try:
+                os.close(stderr_write_fd)
+            except OSError:
+                pass
+
+
 def list2cmdline(seq):
     """
     Translate a sequence of arguments into a command line
@@ -1094,8 +1684,7 @@ def universal_newlines(self, universal_newlines):
         self.text_mode = bool(universal_newlines)
 
     def _translate_newlines(self, data, encoding, errors):
-        data = data.decode(encoding, errors)
-        return data.replace("\r\n", "\n").replace("\r", "\n")
+        return _translate_newlines(data, encoding, errors)
 
     def __enter__(self):
         return self
 
@@ -2092,14 +2681,7 @@ def _communicate(self, input, endtime, orig_timeout):
         if
self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. - try: - self.stdin.flush() - except BrokenPipeError: - pass # communicate() must ignore BrokenPipeError. - except ValueError: - # ignore ValueError: I/O operation on closed file. - if not self.stdin.closed: - raise + _flush_stdin(self.stdin) if not input: try: self.stdin.close() @@ -2124,11 +2706,8 @@ def _communicate(self, input, endtime, orig_timeout): self._save_input(input) - if self._input: - if not isinstance(self._input, memoryview): - input_view = memoryview(self._input) - else: - input_view = self._input.cast("b") # byte input required + input_view = _make_input_view(self._input) + input_offset = self._input_offset if self._input else 0 with _PopenSelector() as selector: if self.stdin and not self.stdin.closed and self._input: @@ -2138,41 +2717,32 @@ def _communicate(self, input, endtime, orig_timeout): if self.stderr and not self.stderr.closed: selector.register(self.stderr, selectors.EVENT_READ) - while selector.get_map(): - timeout = self._remaining_time(endtime) - if timeout is not None and timeout < 0: - self._check_timeout(endtime, orig_timeout, - stdout, stderr, - skip_check_and_raise=True) - raise RuntimeError( # Impossible :) - '_check_timeout(..., skip_check_and_raise=True) ' - 'failed to raise TimeoutExpired.') - - ready = selector.select(timeout) - self._check_timeout(endtime, orig_timeout, stdout, stderr) - - # XXX Rewrite these to use non-blocking I/O on the file - # objects; they are no longer using C stdio! - - for key, events in ready: - if key.fileobj is self.stdin: - chunk = input_view[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - self._input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fileobj) - key.fileobj.close() - else: - if self._input_offset >= len(input_view): - selector.unregister(key.fileobj) - key.fileobj.close() - elif key.fileobj in (self.stdout, self.stderr): - data = os.read(key.fd, 32768) - if not data: - selector.unregister(key.fileobj) - key.fileobj.close() - self._fileobj2output[key.fileobj].append(data) + # Use the common I/O loop (supports resume via _input_offset) + stdin_to_write = (self.stdin if self.stdin and self._input + and not self.stdin.closed else None) + new_offset, completed = _communicate_io_posix( + selector, + stdin_to_write, + input_view, + input_offset, + self._fileobj2output, + endtime) + if self._input: + self._input_offset = new_offset + + if not completed: + self._check_timeout(endtime, orig_timeout, stdout, stderr, + skip_check_and_raise=True) + raise RuntimeError( # Impossible :) + '_check_timeout(..., skip_check_and_raise=True) ' + 'failed to raise TimeoutExpired.') + + # Close streams now that we're done reading + if self.stdout: + self.stdout.close() + if self.stderr: + self.stderr.close() + try: self.wait(timeout=self._remaining_time(endtime)) except TimeoutExpired as exc: diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 806a1e3fa303eb..4e43ea7bbfda90 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1984,6 +1984,396 @@ def test_encoding_warning(self): self.assertStartsWith(lines[1], b":3: EncodingWarning: ") +class PipelineTestCase(BaseTestCase): + """Tests for subprocess.run_pipeline()""" + + def test_pipeline_basic(self): + """Test basic two-command pipeline""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello 
world")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'HELLO WORLD') + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.returncode, 0) + + def test_pipeline_three_commands(self): + """Test pipeline with three commands""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("one\\ntwo\\nthree")'], + [sys.executable, '-c', 'import sys; print("".join(sorted(sys.stdin.readlines())))'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().strip().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'ONE\nTHREE\nTWO') + self.assertEqual(result.returncodes, [0, 0, 0]) + + def test_pipeline_with_input(self): + """Test pipeline with input data""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + input='hello', capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '5') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_memoryview_input(self): + """Test pipeline with memoryview input (byte elements)""" + test_data = b"Hello, memoryview pipeline!" + mv = memoryview(test_data) + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, test_data.upper()) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_memoryview_input_nonbyte(self): + """Test pipeline with non-byte memoryview input (e.g., int32). + + This tests the fix for gh-134453 where non-byte memoryviews + had incorrect length tracking on POSIX, causing data truncation. 
+ """ + import array + # Create an array of 32-bit integers large enough to trigger + # chunked writing behavior (> PIPE_BUF) + pipe_buf = getattr(select, 'PIPE_BUF', 512) + # Each 'i' element is 4 bytes, need more than pipe_buf bytes total + num_elements = (pipe_buf // 4) + 100 + test_array = array.array('i', [0x41424344 for _ in range(num_elements)]) + expected_bytes = test_array.tobytes() + mv = memoryview(test_array) + + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'], + [sys.executable, '-c', + 'import sys; data = sys.stdin.buffer.read(); ' + 'sys.stdout.buffer.write(data)'], + input=mv, capture_output=True + ) + self.assertEqual(result.stdout, expected_bytes, + msg=f"{len(result.stdout)=} != {len(expected_bytes)=}") + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_bytes_mode(self): + """Test pipeline in binary mode""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(b"hello")'], + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + capture_output=True + ) + self.assertEqual(result.stdout, b'HELLO') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_error_check(self): + """Test that check=True raises PipelineError on failure""" + with self.assertRaises(subprocess.PipelineError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(1)'], + capture_output=True, check=True + ) + exc = cm.exception + self.assertEqual(len(exc.failed), 1) + self.assertEqual(exc.failed[0][0], 1) # Second command failed + self.assertEqual(exc.returncodes, [0, 1]) + + def test_pipeline_first_command_fails(self): + """Test pipeline where first command fails""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(42)'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True + ) + self.assertEqual(result.returncodes[0], 42) + + def test_pipeline_requires_two_commands(self): + """Test that pipeline requires at least 2 commands""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + capture_output=True + ) + self.assertIn('at least 2 commands', str(cm.exception)) + + def test_pipeline_stdin_and_input_conflict(self): + """Test that stdin and input cannot both be specified""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + input='data', stdin=subprocess.PIPE + ) + self.assertIn('stdin', str(cm.exception)) + self.assertIn('input', str(cm.exception)) + + def test_pipeline_capture_output_conflict(self): + """Test that capture_output conflicts with stdout/stderr""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + capture_output=True, stdout=subprocess.PIPE + ) + self.assertIn('capture_output', str(cm.exception)) + + def test_pipeline_universal_newlines(self): + """Test that universal_newlines=True works like text=True""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, universal_newlines=True + ) + self.assertIsInstance(result.stdout, str) + self.assertIn('HELLO', result.stdout) + 
self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_result_repr(self): + """Test PipelineResult string representation""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("test")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, text=True + ) + repr_str = repr(result) + self.assertIn('PipelineResult', repr_str) + self.assertIn('commands=', repr_str) + self.assertIn('returncodes=', repr_str) + + def test_pipeline_check_returncodes_method(self): + """Test PipelineResult.check_returncodes() method""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(5)'], + capture_output=True + ) + with self.assertRaises(subprocess.PipelineError) as cm: + result.check_returncodes() + self.assertEqual(cm.exception.returncodes[1], 5) + + def test_pipeline_no_capture(self): + """Test pipeline without capturing output""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + ) + self.assertEqual(result.stdout, None) + self.assertEqual(result.stderr, None) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_stderr_capture(self): + """Test that stderr is captured from all processes""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print("err1", file=sys.stderr); print("out1")'], + [sys.executable, '-c', 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'], + capture_output=True, text=True + ) + self.assertIn('err1', result.stderr) + self.assertIn('err2', result.stderr) + + @unittest.skipIf(mswindows, "POSIX specific test") + def test_pipeline_timeout(self): + """Test pipeline with timeout""" + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + [sys.executable, '-c', 'import time; time.sleep(10); print("done")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, timeout=0.1 + ) + + def test_pipeline_error_str(self): + """Test PipelineError string representation""" + try: + subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(1)'], + [sys.executable, '-c', 'import sys; sys.exit(2)'], + capture_output=True, check=True + ) + except subprocess.PipelineError as e: + error_str = str(e) + self.assertIn('Pipeline failed', error_str) + + def test_pipeline_explicit_stdout_pipe(self): + """Test pipeline with explicit stdout=PIPE""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + stdout=subprocess.PIPE + ) + self.assertEqual(result.stdout.strip(), b'HELLO') + self.assertIsNone(result.stderr) + + def test_pipeline_stdin_from_file(self): + """Test pipeline with stdin from file""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write('file content\n') + f.flush() + fname = f.name + try: + with open(fname, 'r') as f: + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + stdin=f, capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '12') # "FILE CONTENT" + finally: + os.unlink(fname) + + def test_pipeline_stdout_to_devnull(self): + """Test pipeline with stdout to DEVNULL""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; 
print(sys.stdin.read())'], + stdout=subprocess.DEVNULL + ) + self.assertIsNone(result.stdout) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_large_data_no_deadlock(self): + """Test that large data doesn't cause pipe buffer deadlock. + + This test verifies that the multiplexed I/O implementation properly + handles cases where pipe buffers would fill up. Without proper + multiplexing, this would deadlock because: + 1. First process outputs large data filling stdout pipe buffer + 2. Middle process reads some, processes, writes to its stdout + 3. If stdout pipe buffer fills, middle process blocks on write + 4. But first process is blocked waiting for middle to read more + 5. Classic deadlock + + The test uses data larger than typical pipe buffer size (64KB on Linux) + to ensure the multiplexed I/O is working correctly. + """ + # Generate data larger than typical pipe buffer (64KB) + # Use 256KB to ensure we exceed buffer on most systems + large_data = 'x' * (256 * 1024) + + # Pipeline: input -> double the data -> count chars + # The middle process outputs twice as much, increasing buffer pressure + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; data = sys.stdin.read(); print(data + data)'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read().strip()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + # Original data doubled = 512KB = 524288 chars + # Second process strips whitespace (removes trailing newline) then counts + expected_len = 256 * 1024 * 2 # doubled data, newline stripped + self.assertEqual(result.stdout.strip(), str(expected_len)) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_large_data_three_stages(self): + """Test large data through a three-stage pipeline. + + This is a more complex deadlock scenario with three processes, + where buffer pressure can occur at multiple points. + """ + # Use 128KB of data + large_data = 'y' * (128 * 1024) + + # Pipeline: input -> uppercase -> add prefix to each line -> count + # We use line-based processing to create more buffer churn + result = subprocess.run_pipeline( + [sys.executable, '-c', + 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', + 'import sys; print("".join("PREFIX:" + line for line in sys.stdin))'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.returncodes, [0, 0, 0]) + # Just verify we got a reasonable numeric output without deadlock + output_len = int(result.stdout.strip()) + self.assertGreater(output_len, len(large_data)) + + def test_pipeline_large_data_with_stderr(self): + """Test large data with large stderr output from multiple processes. + + Ensures stderr collection doesn't interfere with the main data flow + and doesn't cause deadlocks when multiple processes write large + amounts to stderr concurrently with stdin/stdout data flow. 
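+
+        Background (not asserted here): POSIX guarantees atomicity only for
+        pipe writes of at most PIPE_BUF bytes, so the large "E"/"F" bursts
+        from the two stages may interleave arbitrarily in the shared stderr
+        pipe; the assertions below therefore check only totals and the
+        per-stage sentinel lines.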
+ """ + # 64KB of data through the pipeline + data_size = 64 * 1024 + large_data = 'z' * data_size + # Each process writes 64KB to stderr as well + stderr_size = 64 * 1024 + + result = subprocess.run_pipeline( + [sys.executable, '-c', f''' +import sys +# Write large stderr output +sys.stderr.write("E" * {stderr_size}) +sys.stderr.write("\\nstage1 done\\n") +# Pass through stdin to stdout +data = sys.stdin.read() +print(data) +'''], + [sys.executable, '-c', f''' +import sys +# Write large stderr output +sys.stderr.write("F" * {stderr_size}) +sys.stderr.write("\\nstage2 done\\n") +# Count input size +data = sys.stdin.read() +print(len(data.strip())) +'''], + input=large_data, capture_output=True, text=True, timeout=30 + ) + + self.assertEqual(result.stdout.strip(), str(data_size)) + # Verify both processes wrote to stderr + self.assertIn('stage1 done', result.stderr) + self.assertIn('stage2 done', result.stderr) + # Verify large stderr was captured (at least most of it) + self.assertGreater(len(result.stderr), stderr_size) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_timeout_large_input(self): + """Test that timeout is enforced with large input to a slow pipeline. + + This verifies that run_pipeline() doesn't block indefinitely when + writing large input to a pipeline where the first process is slow + to consume stdin. The timeout should be enforced promptly. + + This is particularly important on Windows where stdin writing could + block without proper threading. + """ + # Input larger than typical pipe buffer (64KB) + input_data = 'x' * (128 * 1024) + + start = time.monotonic() + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + # First process sleeps before reading - simulates slow consumer + [sys.executable, '-c', + 'import sys, time; time.sleep(30); print(sys.stdin.read())'], + [sys.executable, '-c', + 'import sys; print(len(sys.stdin.read()))'], + input=input_data, capture_output=True, text=True, timeout=0.5 + ) + elapsed = time.monotonic() - start + + # Timeout should occur close to the specified timeout value, + # not after waiting for the subprocess to finish sleeping. + # Allow generous margin for slow CI, but must be well under + # the subprocess sleep time. + self.assertLess(elapsed, 5.0, + f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. " + "Input writing may have blocked without checking timeout.") + + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): if grp:
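
For comparison, the manual Popen chaining that run_pipeline() automates is
easy to get wrong; forgetting to close the parent's copy of the upstream
stdout prevents EOF/SIGPIPE from propagating. A minimal sketch of that
status-quo idiom (illustrative commands)::

    from subprocess import PIPE, Popen

    p1 = Popen(['cat', 'file.txt'], stdout=PIPE)
    p2 = Popen(['wc', '-l'], stdin=p1.stdout, stdout=PIPE)
    p1.stdout.close()           # let p1 receive SIGPIPE if p2 exits first
    out, _ = p2.communicate()   # drains only the final stage
    returncodes = [p1.wait(), p2.returncode]   # no pipefail aggregation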