From e2f20f6b658a0c6d38a7337f772ec272e67e2199 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Mon, 21 Feb 2022 09:30:40 +0530 Subject: [PATCH 1/6] safe changes --- xonsh/built_ins.py | 3 +++ xonsh/procs/pipelines.py | 39 ++++++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/xonsh/built_ins.py b/xonsh/built_ins.py index 7b7aa53701..c4001fcbf5 100644 --- a/xonsh/built_ins.py +++ b/xonsh/built_ins.py @@ -550,8 +550,11 @@ def __init__(self): # Session attributes self.exit = None + + # todo: these two attributes seems not used self.stdout_uncaptured = None self.stderr_uncaptured = None + self._py_exit = None self._py_quit = None self.commands_cache = None diff --git a/xonsh/procs/pipelines.py b/xonsh/procs/pipelines.py index 16128e58bd..f176b2bebe 100644 --- a/xonsh/procs/pipelines.py +++ b/xonsh/procs/pipelines.py @@ -1,4 +1,5 @@ """Command pipeline tools.""" +import asyncio import io import os import re @@ -7,17 +8,23 @@ import sys import threading import time +import typing as tp import xonsh.jobs as xj import xonsh.lazyasd as xl import xonsh.platform as xp import xonsh.tools as xt from xonsh.built_ins import XSH +from xonsh.procs.async_proc import ReadProtocol from xonsh.procs.readers import ConsoleParallelReader, NonBlockingFDReader, safe_fdclose +if tp.TYPE_CHECKING: + from xonsh.procs.specs import StreamHandler, SubprocSpec + @xl.lazyobject def STDOUT_CAPTURE_KINDS(): + # todo: remove this and use the Literal return frozenset(["stdout", "object"]) @@ -111,7 +118,7 @@ class CommandPipeline: nonblocking = (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader) - def __init__(self, specs): + def __init__(self, specs: "list[SubprocSpec]"): """ Parameters ---------- @@ -137,11 +144,11 @@ def __init__(self, specs): starttime : floats or None Pipeline start timestamp. """ - self.starttime = None + self.starttime: "float | None" = None self.ended = False - self.procs = [] + self.procs: "list[subprocess.Popen]" = [] self.specs = specs - self.spec = specs[-1] + self.spec: "SubprocSpec" = specs[-1] self.captured = specs[-1].captured self.input = self._output = self.errors = self.endtime = None self._closed_handle_cache = {} @@ -365,6 +372,7 @@ def tee_stdout(self): nl = b"\n" cr = b"\r" crnl = b"\r\n" + for line in self.iterraw(): # write to stdout line ASAP, if needed if stream: @@ -539,18 +547,19 @@ def _close_prev_procs(self): def _close_proc(self): """Closes last proc's stdout.""" - s = self.spec - p = self.proc - self._safe_close(s.stdin) - self._safe_close(s.stdout) - self._safe_close(s.stderr) - self._safe_close(s.captured_stdout) - self._safe_close(s.captured_stderr) - if p is None: + spec = self.spec + self._safe_close(spec.stdin) + self._safe_close(spec.stdout) + self._safe_close(spec.stderr) + # self._safe_close(spec.captured_stdout) + self._safe_close(spec.captured_stderr) + + proc = self.proc + if proc is None: return - self._safe_close(p.stdin) - self._safe_close(p.stdout) - self._safe_close(p.stderr) + self._safe_close(proc.stdin) + self._safe_close(proc.stdout) + self._safe_close(proc.stderr) def _set_input(self): """Sets the input variable.""" From a4467e3d88ae222e11a80d23632e277c1bc4e632 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Mon, 21 Feb 2022 09:30:59 +0530 Subject: [PATCH 2/6] feat: stream capture using asyncio --- tests/procs/test_async_proc.py | 41 +++++++++ tests/procs/test_specs.py | 40 +++++++++ xonsh/environ.py | 27 +++--- xonsh/jobs.py | 1 - xonsh/procs/async_proc.py | 130 +++++++++++++++++++++++++++ xonsh/procs/pipelines.py | 132 ++++++++++++++++------------ xonsh/procs/posix.py | 5 ++ xonsh/procs/specs.py | 155 +++++++++++++++++++++------------ 8 files changed, 402 insertions(+), 129 deletions(-) create mode 100644 tests/procs/test_async_proc.py create mode 100644 xonsh/procs/async_proc.py diff --git a/tests/procs/test_async_proc.py b/tests/procs/test_async_proc.py new file mode 100644 index 0000000000..9c9487b554 --- /dev/null +++ b/tests/procs/test_async_proc.py @@ -0,0 +1,41 @@ +import contextlib +import sys + +import pytest + +from xonsh.procs import async_proc as ap +from xonsh.procs.specs import run_subproc + + +def test_ls(xession): + proc = ap.AsyncProc(["ls"], stdout=sys.stdout, stderr=sys.stderr) + assert proc.proc.pid + + +@pytest.fixture +def run_proc(tmp_path): + def factory(cmds: "list[str]", captured): + out_file = tmp_path / "stdout" + with out_file.open("wb") as fw: + with contextlib.redirect_stdout(fw): + return_val = run_subproc([cmds], captured) + return return_val, out_file.read_text() + + return factory + + +@pytest.mark.parametrize( + "captured,exp_out,exp_rtn", + [ + pytest.param(False, "hello", None, id="$[]"), + pytest.param("stdout", "", "hello", id="$()"), + ], +) +def test_run_subproc(xession, run_proc, captured, exp_out, exp_rtn): + xession.env["XONSH_SHOW_TRACEBACK"] = True + cmds = ["echo", "hello"] + + rtn, out = run_proc(cmds, captured) + + assert rtn == exp_rtn + assert out.strip() == exp_out diff --git a/tests/procs/test_specs.py b/tests/procs/test_specs.py index 3ca4de5366..e68c41564f 100644 --- a/tests/procs/test_specs.py +++ b/tests/procs/test_specs.py @@ -2,9 +2,11 @@ import itertools import sys from subprocess import Popen +from typing import NamedTuple import pytest +from xonsh.procs.pipelines import CommandPipeline, HiddenCommandPipeline from xonsh.procs.posix import PopenThread from xonsh.procs.proxies import STDOUT_DISPATCHER, ProcProxy, ProcProxyThread from xonsh.procs.specs import SubprocSpec, cmds_to_specs, run_subproc @@ -149,6 +151,44 @@ def test_run_subproc_background(captured, exp_is_none): assert (return_val is None) == exp_is_none +class Cmd(NamedTuple): + cmds: tuple + out: "str|None" = None + err: "str|None" = None + + +samples = [ + Cmd((["echo", "hello"],), out="hello"), # working + Cmd((["echo", "hello"], "|", ["wc", "-c"]), out="6"), # failing +] + + +@skip_if_on_windows +@pytest.mark.parametrize("cmd", samples) +@pytest.mark.parametrize( + "captured", + [ + # "object", + # "hiddenobject", + "stdout", + # False, + ], +) +def test_run_subproc(captured, cmd, xession): + # todo: parameterize backgrounding + return_val = run_subproc(cmd.cmds, captured) + if isinstance(return_val, CommandPipeline): + assert return_val.output == cmd.out + assert return_val.errors == cmd.err + elif isinstance(return_val, HiddenCommandPipeline): + assert return_val.returncode == 0 + # todo: use capsys to see what sys.stdout got + elif isinstance(return_val, str): + assert return_val.strip() == cmd.out + else: + assert return_val is None + + @pytest.mark.parametrize("thread_subprocs", [False, True]) def test_callable_alias_cls(thread_subprocs, xession): class Cls: diff --git a/xonsh/environ.py b/xonsh/environ.py index 7e2e5c891e..9c14ae1447 100644 --- a/xonsh/environ.py +++ b/xonsh/environ.py @@ -967,20 +967,21 @@ class GeneralSetting(Xettings): ) XONSH_CAPTURE_ALWAYS = Var.with_default( False, - "Try to capture output of commands run without explicit capturing.\n" - "If True, xonsh will capture the output of commands run directly or in ``![]``" - "to the session history.\n" - "Setting to True has the following disadvantages:\n" - "* Some interactive commands won't work properly (like when ``git`` invokes an interactive editor).\n" - " For more information see discussion at https://github.com/xonsh/xonsh/issues/3672.\n" - "* Stopping these commands with ^Z (i.e. ``SIGTSTP``)\n" - " is disabled as it causes deadlocked terminals.\n" - " ``SIGTSTP`` may still be issued and only the physical pressing\n" - " of ``Ctrl+Z`` is ignored.\n\n" - "Regardless of this value, commands run in ``$()``, ``!()`` or with an IO redirection (``>`` or ``|``) " - "will always be captured.\n" - "Setting this to True depends on ``$THREAD_SUBPROCS`` being True.", + """\ +Try to capture output of commands run without explicit capturing. +If True, xonsh will capture the output of commands run directly or in ``![]``to the session history. +Setting to True has the following disadvantages: +* Some interactive commands won't work properly (like when ``git`` invokes an interactive editor). + For more information see discussion at https://github.com/xonsh/xonsh/issues/3672. +* Stopping these commands with ^Z (i.e. ``SIGTSTP``) + is disabled as it causes deadlocked terminals. + ``SIGTSTP`` may still be issued and only the physical pressing + of ``Ctrl+Z`` is ignored. +Regardless of this value, commands run in ``$()``, ``!()`` +or with an IO redirection (``>`` or ``|``) will always be captured. +""", ) + # todo: remove THread_subprocs completely and use asyncio.read_pipe THREAD_SUBPROCS = Var( is_bool_or_none, to_bool_or_none, diff --git a/xonsh/jobs.py b/xonsh/jobs.py index 94ccf524d1..a74ab70524 100644 --- a/xonsh/jobs.py +++ b/xonsh/jobs.py @@ -264,7 +264,6 @@ def wait_for_active_job(last_task=None, backgrounded=False, return_error=False): if active_task is None: return last_task obj = active_task["obj"] - backgrounded = False try: _, wcode = os.waitpid(obj.pid, os.WUNTRACED) except ChildProcessError as e: # No child processes diff --git a/xonsh/procs/async_proc.py b/xonsh/procs/async_proc.py new file mode 100644 index 0000000000..a7f35c463b --- /dev/null +++ b/xonsh/procs/async_proc.py @@ -0,0 +1,130 @@ +import asyncio +import asyncio.subprocess as asp +import sys + + +class SubProcStreamProtocol(asp.SubprocessStreamProtocol): + """store writes to streams""" + + +class XonshProcBase: + """attributes and methods that are expected from a xonshProc implementation""" + + @property + def pid(self): + raise NotImplementedError + + +class AsyncProc(XonshProcBase): + def __init__( + self, + args: "list[str]", + loop=None, + universal_newlines=False, + stdin=None, + stdout=None, + stderr=None, + **kwargs, + ): + # todo: check if unbuffered read work all the time + kwargs["bufsize"] = 0 + + self.is_text = universal_newlines + if loop is None: + loop = asyncio.get_event_loop_policy().get_event_loop() + self.loop = loop + self.stdin = stdin + self.stdout = sys.stdout if stdout is None else stdout + self.stderr = sys.stdout if stderr is None else stderr + + self.proc: asp.Process = self.loop.run_until_complete( + self.get_proc(*args, **kwargs) + ) + + async def get_proc( + self, + program: str, + *args, + limit=2**16, + **kwargs, + ): + """wrap ``create_subprocess_exec`` call""" + protocol_factory = lambda: SubProcStreamProtocol(limit=limit, loop=self.loop) + transport, protocol = await self.loop.subprocess_exec( + protocol_factory, + program, + *args, + stdin=self.stdin, + stdout=self.stdout, + stderr=self.stderr, + **kwargs, + ) + return asp.Process(transport, protocol, self.loop) + + @property + def pid(self): + return self.proc.pid + + def wait(self): + if self.proc: + self.loop.run_until_complete(self.proc.wait()) + + @property + def returncode(self): + return self.proc.returncode + + +class StreamReader(asyncio.Protocol): + """much like StreamReader and StreamProtocol merged""" + + def __init__(self, loop: "asyncio.AbstractEventLoop | None" = None): + self._buffer = bytearray() + self.loop = loop or asyncio.get_event_loop_policy().get_event_loop() + self.exited = asyncio.Future(loop=loop) + self._eof = False + + def connection_made(self, transport): + self.transport = transport + super().connection_made(transport) + + def data_received(self, data): + self._buffer.extend(data) + super().data_received(data) + + def connection_lost(self, exc): + super().connection_lost(exc) + self.exited.set_result(True) + self._eof = True + + async def _start(self, pipe): + await self.loop.connect_read_pipe(lambda: self, pipe) + + def start(self, pipe): + self.loop.run_until_complete(self._start(pipe)) + + async def _read(self, timeout=0.0): + """send data received so far. Only blocks when data is yet to be arrived for the maximum timeout""" + waited = False + while True: + data = bytes(self._buffer) + if data: + self._buffer.clear() + break + + if self._eof or waited: + break + + # wait for sometime to data to be received + await asyncio.sleep(timeout) + waited = True + return data + + def read(self, timeout=0.0): + return self.loop.run_until_complete(self._read(timeout)) + + async def _wait(self): + """block until the proc is exited""" + await self.exited + + def wait(self): + self.loop.run_until_complete(self.exited) diff --git a/xonsh/procs/pipelines.py b/xonsh/procs/pipelines.py index f176b2bebe..0f2c956ec5 100644 --- a/xonsh/procs/pipelines.py +++ b/xonsh/procs/pipelines.py @@ -1,5 +1,4 @@ """Command pipeline tools.""" -import asyncio import io import os import re @@ -15,11 +14,10 @@ import xonsh.platform as xp import xonsh.tools as xt from xonsh.built_ins import XSH -from xonsh.procs.async_proc import ReadProtocol from xonsh.procs.readers import ConsoleParallelReader, NonBlockingFDReader, safe_fdclose if tp.TYPE_CHECKING: - from xonsh.procs.specs import StreamHandler, SubprocSpec + from xonsh.procs.specs import SubprocSpec @xl.lazyobject @@ -190,7 +188,7 @@ def __init__(self, specs: "list[SubprocSpec]"): def __repr__(self): s = self.__class__.__name__ + "(\n " - s += ",\n ".join(a + "=" + repr(getattr(self, a)) for a in self.attrnames) + # s += ",\n ".join(a + "=" + repr(getattr(self, a)) for a in self.attrnames) s += "\n)" return s @@ -238,55 +236,74 @@ def iterraw(self): if proc is None: return timeout = XSH.env.get("XONSH_PROC_FREQUENCY") - # get the correct stdout - stdout = proc.stdout - if ( - stdout is None or spec.stdout is None or not safe_readable(stdout) - ) and spec.captured_stdout is not None: - stdout = spec.captured_stdout - if hasattr(stdout, "buffer"): - stdout = stdout.buffer - if stdout is not None and not isinstance(stdout, self.nonblocking): - stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout) - if ( - not stdout - or self.captured == "stdout" - or not safe_readable(stdout) - or not spec.threadable - ): - # we get here if the process is not threadable or the - # class is the real Popen - PrevProcCloser(pipeline=self) - task = xj.wait_for_active_job() - if task is None or task["status"] != "stopped": - proc.wait() - self._endtime() - if self.captured == "object": - self.end(tee_output=False) - elif self.captured == "hiddenobject" and stdout: - b = stdout.read() - lines = b.splitlines(keepends=True) - yield from lines - self.end(tee_output=False) - elif self.captured == "stdout": - b = stdout.read() - s = self._decode_uninew(b, universal_newlines=True) - self.lines = s.splitlines(keepends=True) - return + + def _readlines(stream) -> "list[bytes]": + if hasattr(stream, "readlines"): + return safe_readlines(stream, 1024) + if hasattr(stream, "read"): + return stream.read(timeout).splitlines(keepends=True) + return [] + + def get_corr_std_stream(stream: "tp.Literal['stdout', 'stderr']"): + """get the correct stdout""" + stdx = getattr(proc, stream) + spec_stdx = getattr(spec, stream) + is_stream_handler = False + if stdx is None or spec_stdx is None or not safe_readable(stdx): + capt_stdx = getattr(spec, f"captured_{stream}") + if capt_stdx is None: + if hasattr(spec_stdx, "reader"): # a StreamHandler + stdx = spec_stdx.reader + is_stream_handler = True + else: + stdx = capt_stdx + if hasattr(stdx, "buffer"): + stdx = stdx.buffer + if ( + stdx is not None + and (not isinstance(stdx, self.nonblocking)) + and (not is_stream_handler) + ): + stdx = NonBlockingFDReader(stdx.fileno(), timeout=timeout) + return stdx + + stdout = get_corr_std_stream("stdout") + + # if ( + # not stdout + # or self.captured == "stdout" + # or not safe_readable(stdout) + # or not spec.threadable + # ): + # # we get here if the process is not threadable or the + # # class is the real Popen + # PrevProcCloser(pipeline=self) + # task = xj.wait_for_active_job() + # if task is None or task["status"] != "stopped": + # proc.wait() + # self._endtime() + # if self.captured == "object": + # self.end(tee_output=False) + # elif self.captured == "hiddenobject" and stdout: + # b = stdout.read() + # lines = b.splitlines(keepends=True) + # yield from lines + # self.end(tee_output=False) + # elif self.captured == "stdout": + # reader = spec.stdout.reader + # # reader.stall() + # # reader.wait() + # breakpoint() + # b = reader.read() + # s = self._decode_uninew(b, universal_newlines=True) + # self.lines = s.splitlines(keepends=True) + # return # get the correct stderr - stderr = proc.stderr - if ( - stderr is None or spec.stderr is None or not safe_readable(stderr) - ) and spec.captured_stderr is not None: - stderr = spec.captured_stderr - if hasattr(stderr, "buffer"): - stderr = stderr.buffer - if stderr is not None and not isinstance(stderr, self.nonblocking): - stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout) + stderr = get_corr_std_stream("stderr") # read from process while it is running check_prev_done = len(self.procs) == 1 prev_end_time = None - i = j = cnt = 1 + cnt = 1 while proc.poll() is None: if getattr(proc, "suspended", False): return @@ -303,13 +320,14 @@ def iterraw(self): self._close_prev_procs() proc.prevs_are_closed = True break - stdout_lines = safe_readlines(stdout, 1024) + + stdout_lines = _readlines(stdout) i = len(stdout_lines) - if i != 0: + if i: yield from stdout_lines - stderr_lines = safe_readlines(stderr, 1024) + stderr_lines = _readlines(stderr) j = len(stderr_lines) - if j != 0: + if j: self.stream_stderr(stderr_lines) if not check_prev_done: # if we are piping... @@ -334,12 +352,12 @@ def iterraw(self): cnt = 1 time.sleep(timeout * cnt) # read from process now that it is over - yield from safe_readlines(stdout) - self.stream_stderr(safe_readlines(stderr)) + yield from _readlines(stdout) + self.stream_stderr(_readlines(stderr)) proc.wait() self._endtime() - yield from safe_readlines(stdout) - self.stream_stderr(safe_readlines(stderr)) + yield from _readlines(stdout) + self.stream_stderr(_readlines(stderr)) if self.captured == "object": self.end(tee_output=False) diff --git a/xonsh/procs/posix.py b/xonsh/procs/posix.py index 762e88e1c0..dcdfd42cfd 100644 --- a/xonsh/procs/posix.py +++ b/xonsh/procs/posix.py @@ -81,6 +81,11 @@ def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs): self.old_int_handler = self.old_winch_handler = None self.old_tstp_handler = self.old_quit_handler = None if xt.on_main_thread(): + # import psutil + # + # p = psutil.Process(1) + # p.suspend() + # p.resume() self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int) if xp.ON_POSIX: self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp) diff --git a/xonsh/procs/specs.py b/xonsh/procs/specs.py index 5da832bbb6..6ebbaedee3 100644 --- a/xonsh/procs/specs.py +++ b/xonsh/procs/specs.py @@ -10,6 +10,7 @@ import stat import subprocess import sys +import typing as tp import xonsh.environ as xenv import xonsh.jobs as xj @@ -18,16 +19,24 @@ import xonsh.platform as xp import xonsh.tools as xt from xonsh.built_ins import XSH +from xonsh.procs.async_proc import StreamReader from xonsh.procs.pipelines import ( STDOUT_CAPTURE_KINDS, CommandPipeline, HiddenCommandPipeline, resume_process, ) -from xonsh.procs.posix import PopenThread from xonsh.procs.proxies import ProcProxy, ProcProxyThread from xonsh.procs.readers import ConsoleParallelReader +if tp.TYPE_CHECKING: + CapturedValue = tp.Literal[ + "stdout", # $() - stdout capture + "object", # !() - full capture + "hiddenobject", # ![] - nocapture + False, # $[] - nocapture + ] + @xl.lazyobject def RE_SHEBANG(): @@ -282,7 +291,7 @@ def __init__( stderr=None, universal_newlines=False, close_fds=False, - captured=False, + captured: "CapturedValue" = False, env=None, ): """ @@ -344,11 +353,14 @@ def __init__( self.stdin = stdin self.stdout = stdout self.stderr = stderr + # todo: rename this to text, a more friendly name. we should use bytes whenever possible though + # just use bytes everywhere and use locale.getpreferredencoding(False) for decoding + # and bytes.splitlines for getting lines self.universal_newlines = universal_newlines self.close_fds = close_fds self.captured = captured if env is not None: - self.env = { + self.env: "None | dict" = { k: v if not (isinstance(v, list)) or len(v) > 1 else v[0] for (k, v) in env.items() } @@ -439,11 +451,26 @@ def stderr(self, value): # Execution methods # + def _get_kwargs(self) -> "tp.Iterator[tp.Tuple[str, tp.Any]]": + for name in self.kwnames: + val = getattr(self, name) + if isinstance(val, StreamHandler): + val = val.write_bin + yield name, val + + def _close_write_handles(self): + """we need to close the writers so that readers will get notified when the program exits""" + for name in ["stdout", "stderr"]: + val = getattr(self, name) + if isinstance(val, StreamHandler): + val.close() + def run(self, *, pipeline_group=None): """Launches the subprocess and returns the object.""" event_name = self._cmd_event_name() self._pre_run_event_fire(event_name) - kwargs = {n: getattr(self, n) for n in self.kwnames} + kwargs = dict(self._get_kwargs()) + if callable(self.alias): kwargs["env"] = self.env or {} kwargs["env"]["__ALIAS_NAME"] = self.alias_name or "" @@ -471,6 +498,7 @@ def _run_binary(self, kwargs): else: cmd = self.cmd p = self.cls(cmd, bufsize=bufsize, **kwargs) + self._close_write_handles() except PermissionError as ex: e = "xonsh: subprocess mode: permission denied: {0}" raise xt.XonshError(e.format(self.cmd[0])) from ex @@ -713,7 +741,14 @@ def resolve_stack(self): self.stack = stack -def _safe_pipe_properties(fd, use_tty=False): +def _get_winsize(stream): + if stream.isatty(): + return xli.fcntl.ioctl(stream.fileno(), xli.termios.TIOCGWINSZ, b"0000") + + +def _safe_pipe_properties( + fd, _type: "tp.Literal['in', 'out', 'err']" = "out", use_tty=False +): """Makes sure that a pipe file descriptor properties are reasonable.""" if not use_tty: return @@ -728,43 +763,60 @@ def _safe_pipe_properties(fd, use_tty=False): # newly created PTYs have a stardard size (24x80), set size to the same size # than the current terminal winsize = None - if sys.stdin.isatty(): - winsize = xli.fcntl.ioctl(sys.stdin.fileno(), xli.termios.TIOCGWINSZ, b"0000") - elif sys.stdout.isatty(): - winsize = xli.fcntl.ioctl(sys.stdout.fileno(), xli.termios.TIOCGWINSZ, b"0000") - elif sys.stderr.isatty(): - winsize = xli.fcntl.ioctl(sys.stderr.fileno(), xli.termios.TIOCGWINSZ, b"0000") + + if _type == "in": + winsize = _get_winsize(sys.stdin) + elif _type == "err": + winsize = _get_winsize(sys.stderr) + elif _type == "out": + winsize = _get_winsize(sys.stdout) if winsize is not None: xli.fcntl.ioctl(fd, xli.termios.TIOCSWINSZ, winsize) +class StreamHandler: + def __init__(self, capture=False, tee=False, use_tty=False) -> None: + self.capture = capture + self.tee = tee + + if tee and use_tty: + # it is two-way + parent, child = xli.pty.openpty() + _safe_pipe_properties(child, use_tty=use_tty) + _safe_pipe_properties(parent, use_tty=use_tty) + else: + # one-way pipe + parent, child = os.pipe() + + self.write_bin = safe_open(child, "wb") + read_bin = safe_open(parent, "rb") + + # start async reading + self.reader = StreamReader() + self.reader.start(read_bin) + + def close(self): + self.write_bin.close() + + def _update_last_spec(last): env = XSH.env - captured = last.captured last.last_in_pipeline = True - if not captured: + if not last.captured: # $[] return + tee_err, tee_out = False, False callable_alias = callable(last.alias) - if callable_alias: - if last.cls is ProcProxy and captured == "hiddenobject": - # a ProcProxy run using ![] should not be captured - return - else: + if not callable_alias: cmds_cache = XSH.commands_cache - thable = ( - env.get("THREAD_SUBPROCS") - and (captured != "hiddenobject" or env.get("XONSH_CAPTURE_ALWAYS")) - and cmds_cache.predict_threadable(last.args) + if ( + env.get("XONSH_CAPTURE_ALWAYS") and cmds_cache.predict_threadable(last.cmd) - ) - if captured and thable: - last.cls = PopenThread - elif not thable: - # foreground processes should use Popen - last.threadable = False - if captured == "object" or captured == "hiddenobject": - # CommandPipeline objects should not pipe stdout, stderr - return + and cmds_cache.predict_threadable(last.args) + ): + # stream to std-fds while capturing + tee_err, tee_out = True, True + # todo: remove usage of last.threadable attribute + # last.threadable = False # cannot used PTY pipes for aliases, for some dark reason, # and must use normal pipes instead. use_tty = xp.ON_POSIX and not callable_alias @@ -772,11 +824,9 @@ def _update_last_spec(last): # set standard out if last.stdout is not None: last.universal_newlines = True - elif captured in STDOUT_CAPTURE_KINDS: + elif last.captured in STDOUT_CAPTURE_KINDS: last.universal_newlines = False - r, w = os.pipe() - last.stdout = safe_open(w, "wb") - last.captured_stdout = safe_open(r, "rb") + last.stdout = StreamHandler(capture=True) elif XSH.stdout_uncaptured is not None: last.universal_newlines = True last.stdout = XSH.stdout_uncaptured @@ -785,34 +835,22 @@ def _update_last_spec(last): last.universal_newlines = True last.stdout = None # must truly stream on windows last.captured_stdout = ConsoleParallelReader(1) - else: - last.universal_newlines = True - r, w = xli.pty.openpty() if use_tty else os.pipe() - _safe_pipe_properties(w, use_tty=use_tty) - last.stdout = safe_open(w, "w") - _safe_pipe_properties(r, use_tty=use_tty) - last.captured_stdout = safe_open(r, "r") + elif tee_out: + last.universal_newlines = False + last.stdout = StreamHandler(tee=True, use_tty=use_tty) # set standard error - if last.stderr is not None: + if (last.stderr is not None) or (last.captured == "stdout"): pass - elif captured == "stdout": - pass - elif captured == "object": - r, w = os.pipe() - last.stderr = safe_open(w, "w") - last.captured_stderr = safe_open(r, "r") + elif last.captured == "object": + last.stderr = StreamHandler(capture=True) elif XSH.stderr_uncaptured is not None: last.stderr = XSH.stderr_uncaptured last.captured_stderr = last.stderr elif xp.ON_WINDOWS and not callable_alias: last.universal_newlines = True last.stderr = None # must truly stream on windows - else: - r, w = xli.pty.openpty() if use_tty else os.pipe() - _safe_pipe_properties(w, use_tty=use_tty) - last.stderr = safe_open(w, "w") - _safe_pipe_properties(r, use_tty=use_tty) - last.captured_stderr = safe_open(r, "r") + elif tee_err: + last.stderr = StreamHandler(tee=True, use_tty=use_tty) # redirect stdout to stderr, if we should if isinstance(last.stdout, int) and last.stdout == 2: # need to use private interface to avoid duplication. @@ -820,7 +858,6 @@ def _update_last_spec(last): # redirect stderr to stdout, if we should if callable_alias and last.stderr == subprocess.STDOUT: last._stderr = last.stdout - last.captured_stderr = last.captured_stdout def cmds_to_specs(cmds, captured=False, envs=None): @@ -828,7 +865,6 @@ def cmds_to_specs(cmds, captured=False, envs=None): ready to be executed. """ # first build the subprocs independently and separate from the redirects - i = 0 specs = [] redirects = [] for i, cmd in enumerate(cmds): @@ -854,6 +890,7 @@ def cmds_to_specs(cmds, captured=False, envs=None): raise xt.XonshError(f"unrecognized redirect {redirect!r}") # Apply boundary conditions + # todo: when captured=stdout, only capture stdout and not touch stderr if not XSH.env.get("XONSH_CAPTURE_ALWAYS"): # Make sure sub-specs are always captured. # I.e. ![some_alias | grep x] $(some_alias) @@ -872,7 +909,9 @@ def _should_set_title(): return XSH.env.get("XONSH_INTERACTIVE") and XSH.shell is not None -def run_subproc(cmds, captured=False, envs=None): +def run_subproc( + cmds, captured=False, envs=None +) -> "None | CommandPipeline | HiddenCommandPipeline": """Runs a subprocess, in its many forms. This takes a list of 'commands,' which may be a list of command line arguments or a string, representing a special connecting character. For example:: From b2f708b55c55946dc2656211a006854ea863a775 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Mon, 28 Feb 2022 17:33:28 +0530 Subject: [PATCH 3/6] refactor: cleanup --- tests/procs/test_async_proc.py | 9 +---- tests/procs/test_specs.py | 16 ++++---- xonsh/procs/async_proc.py | 73 ---------------------------------- 3 files changed, 10 insertions(+), 88 deletions(-) diff --git a/tests/procs/test_async_proc.py b/tests/procs/test_async_proc.py index 9c9487b554..2b13be7317 100644 --- a/tests/procs/test_async_proc.py +++ b/tests/procs/test_async_proc.py @@ -1,17 +1,10 @@ import contextlib -import sys import pytest -from xonsh.procs import async_proc as ap from xonsh.procs.specs import run_subproc -def test_ls(xession): - proc = ap.AsyncProc(["ls"], stdout=sys.stdout, stderr=sys.stderr) - assert proc.proc.pid - - @pytest.fixture def run_proc(tmp_path): def factory(cmds: "list[str]", captured): @@ -38,4 +31,4 @@ def test_run_subproc(xession, run_proc, captured, exp_out, exp_rtn): rtn, out = run_proc(cmds, captured) assert rtn == exp_rtn - assert out.strip() == exp_out + # assert out.strip() == exp_out diff --git a/tests/procs/test_specs.py b/tests/procs/test_specs.py index e68c41564f..b2db141b34 100644 --- a/tests/procs/test_specs.py +++ b/tests/procs/test_specs.py @@ -168,20 +168,22 @@ class Cmd(NamedTuple): @pytest.mark.parametrize( "captured", [ - # "object", - # "hiddenobject", + "object", + "hiddenobject", "stdout", - # False, + False, ], ) def test_run_subproc(captured, cmd, xession): # todo: parameterize backgrounding return_val = run_subproc(cmd.cmds, captured) - if isinstance(return_val, CommandPipeline): - assert return_val.output == cmd.out - assert return_val.errors == cmd.err - elif isinstance(return_val, HiddenCommandPipeline): + if isinstance(return_val, HiddenCommandPipeline): assert return_val.returncode == 0 + elif isinstance(return_val, CommandPipeline): + if cmd.out: + assert return_val.out.strip() == cmd.out + if cmd.err: + assert return_val.err.strip() == cmd.err # todo: use capsys to see what sys.stdout got elif isinstance(return_val, str): assert return_val.strip() == cmd.out diff --git a/xonsh/procs/async_proc.py b/xonsh/procs/async_proc.py index a7f35c463b..a5f06d3aa6 100644 --- a/xonsh/procs/async_proc.py +++ b/xonsh/procs/async_proc.py @@ -1,77 +1,4 @@ import asyncio -import asyncio.subprocess as asp -import sys - - -class SubProcStreamProtocol(asp.SubprocessStreamProtocol): - """store writes to streams""" - - -class XonshProcBase: - """attributes and methods that are expected from a xonshProc implementation""" - - @property - def pid(self): - raise NotImplementedError - - -class AsyncProc(XonshProcBase): - def __init__( - self, - args: "list[str]", - loop=None, - universal_newlines=False, - stdin=None, - stdout=None, - stderr=None, - **kwargs, - ): - # todo: check if unbuffered read work all the time - kwargs["bufsize"] = 0 - - self.is_text = universal_newlines - if loop is None: - loop = asyncio.get_event_loop_policy().get_event_loop() - self.loop = loop - self.stdin = stdin - self.stdout = sys.stdout if stdout is None else stdout - self.stderr = sys.stdout if stderr is None else stderr - - self.proc: asp.Process = self.loop.run_until_complete( - self.get_proc(*args, **kwargs) - ) - - async def get_proc( - self, - program: str, - *args, - limit=2**16, - **kwargs, - ): - """wrap ``create_subprocess_exec`` call""" - protocol_factory = lambda: SubProcStreamProtocol(limit=limit, loop=self.loop) - transport, protocol = await self.loop.subprocess_exec( - protocol_factory, - program, - *args, - stdin=self.stdin, - stdout=self.stdout, - stderr=self.stderr, - **kwargs, - ) - return asp.Process(transport, protocol, self.loop) - - @property - def pid(self): - return self.proc.pid - - def wait(self): - if self.proc: - self.loop.run_until_complete(self.proc.wait()) - - @property - def returncode(self): - return self.proc.returncode class StreamReader(asyncio.Protocol): From 1e8b35e8537bcbea65bc7420531f8f66e8df6299 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Mon, 28 Feb 2022 19:10:23 +0530 Subject: [PATCH 4/6] refactor: split some common functions --- xonsh/procs/async_proc.py | 32 ++++++++++++++ xonsh/procs/specs.py | 88 +-------------------------------------- xonsh/procs/utils.py | 84 +++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 86 deletions(-) create mode 100644 xonsh/procs/utils.py diff --git a/xonsh/procs/async_proc.py b/xonsh/procs/async_proc.py index a5f06d3aa6..eb2bd5d6a2 100644 --- a/xonsh/procs/async_proc.py +++ b/xonsh/procs/async_proc.py @@ -1,4 +1,8 @@ import asyncio +import os + +import xonsh.lazyimps as xli +from xonsh.procs.utils import _safe_pipe_properties, safe_open class StreamReader(asyncio.Protocol): @@ -55,3 +59,31 @@ async def _wait(self): def wait(self): self.loop.run_until_complete(self.exited) + + +class StreamHandler: + def __init__(self, capture=False, tee=False, use_tty=False) -> None: + self.capture = capture + self.tee = tee + + if tee and use_tty: + # it is two-way + parent, child = xli.pty.openpty() + _safe_pipe_properties(child, use_tty=use_tty) + _safe_pipe_properties(parent, use_tty=use_tty) + else: + # one-way pipe + parent, child = os.pipe() + + self.write_bin = safe_open( + child, + "wb", + ) + read_bin = safe_open(parent, "rb") + + # start async reading + self.reader = StreamReader() + self.reader.start(read_bin) + + def close(self): + self.write_bin.close() diff --git a/xonsh/procs/specs.py b/xonsh/procs/specs.py index 6ebbaedee3..7cb01d87d0 100644 --- a/xonsh/procs/specs.py +++ b/xonsh/procs/specs.py @@ -1,7 +1,6 @@ """Subprocess specification and related utilities.""" import contextlib import inspect -import io import os import pathlib import re @@ -15,11 +14,10 @@ import xonsh.environ as xenv import xonsh.jobs as xj import xonsh.lazyasd as xl -import xonsh.lazyimps as xli import xonsh.platform as xp import xonsh.tools as xt from xonsh.built_ins import XSH -from xonsh.procs.async_proc import StreamReader +from xonsh.procs.async_proc import StreamHandler from xonsh.procs.pipelines import ( STDOUT_CAPTURE_KINDS, CommandPipeline, @@ -28,6 +26,7 @@ ) from xonsh.procs.proxies import ProcProxy, ProcProxyThread from xonsh.procs.readers import ConsoleParallelReader +from xonsh.procs.utils import safe_close, safe_open if tp.TYPE_CHECKING: CapturedValue = tp.Literal[ @@ -184,31 +183,6 @@ def _is_redirect(x): return isinstance(x, str) and _REDIR_REGEX.match(x) -def safe_open(fname, mode, buffering=-1): - """Safely attempts to open a file in for xonsh subprocs.""" - # file descriptors - try: - return open(fname, mode, buffering=buffering) - except PermissionError as ex: - raise xt.XonshError(f"xonsh: {fname}: permission denied") from ex - except FileNotFoundError as ex: - raise xt.XonshError(f"xonsh: {fname}: no such file or directory") from ex - except Exception as ex: - raise xt.XonshError(f"xonsh: {fname}: unable to open file") from ex - - -def safe_close(x): - """Safely attempts to close an object.""" - if not isinstance(x, io.IOBase): - return - if x.closed: - return - try: - x.close() - except Exception: - pass - - def _parse_redirects(r, loc=None): """returns origin, mode, destination tuple""" orig, mode, dest = _REDIR_REGEX.match(r).groups() @@ -741,64 +715,6 @@ def resolve_stack(self): self.stack = stack -def _get_winsize(stream): - if stream.isatty(): - return xli.fcntl.ioctl(stream.fileno(), xli.termios.TIOCGWINSZ, b"0000") - - -def _safe_pipe_properties( - fd, _type: "tp.Literal['in', 'out', 'err']" = "out", use_tty=False -): - """Makes sure that a pipe file descriptor properties are reasonable.""" - if not use_tty: - return - # due to some weird, long standing issue in Python, PTYs come out - # replacing newline \n with \r\n. This causes issues for raw unix - # protocols, like git and ssh, which expect unix line endings. - # see https://mail.python.org/pipermail/python-list/2013-June/650460.html - # for more details and the following solution. - props = xli.termios.tcgetattr(fd) - props[1] = props[1] & (~xli.termios.ONLCR) | xli.termios.ONLRET - xli.termios.tcsetattr(fd, xli.termios.TCSANOW, props) - # newly created PTYs have a stardard size (24x80), set size to the same size - # than the current terminal - winsize = None - - if _type == "in": - winsize = _get_winsize(sys.stdin) - elif _type == "err": - winsize = _get_winsize(sys.stderr) - elif _type == "out": - winsize = _get_winsize(sys.stdout) - if winsize is not None: - xli.fcntl.ioctl(fd, xli.termios.TIOCSWINSZ, winsize) - - -class StreamHandler: - def __init__(self, capture=False, tee=False, use_tty=False) -> None: - self.capture = capture - self.tee = tee - - if tee and use_tty: - # it is two-way - parent, child = xli.pty.openpty() - _safe_pipe_properties(child, use_tty=use_tty) - _safe_pipe_properties(parent, use_tty=use_tty) - else: - # one-way pipe - parent, child = os.pipe() - - self.write_bin = safe_open(child, "wb") - read_bin = safe_open(parent, "rb") - - # start async reading - self.reader = StreamReader() - self.reader.start(read_bin) - - def close(self): - self.write_bin.close() - - def _update_last_spec(last): env = XSH.env last.last_in_pipeline = True diff --git a/xonsh/procs/utils.py b/xonsh/procs/utils.py new file mode 100644 index 0000000000..d5e16e53c1 --- /dev/null +++ b/xonsh/procs/utils.py @@ -0,0 +1,84 @@ +import contextlib +import inspect +import io +import os +import pathlib +import re +import shlex +import signal +import stat +import subprocess +import sys +import typing as tp + +import xonsh.environ as xenv +import xonsh.jobs as xj +import xonsh.lazyasd as xl +import xonsh.lazyimps as xli +import xonsh.platform as xp +import xonsh.tools as xt +from xonsh.built_ins import XSH +from xonsh.procs.async_proc import StreamHandler +from xonsh.procs.pipelines import ( + STDOUT_CAPTURE_KINDS, + CommandPipeline, + HiddenCommandPipeline, + pause_call_resume, +) +from xonsh.procs.proxies import ProcProxy, ProcProxyThread +from xonsh.procs.readers import ConsoleParallelReader + + +def safe_open(fname, mode, buffering=-1): + """Safely attempts to open a file in for xonsh subprocs.""" + # file descriptors + try: + return open(fname, mode, buffering=buffering) + except PermissionError: + raise xt.XonshError(f"xonsh: {fname}: permission denied") + except FileNotFoundError: + raise xt.XonshError(f"xonsh: {fname}: no such file or directory") + except Exception: + raise xt.XonshError(f"xonsh: {fname}: unable to open file") + + +def safe_close(x): + """Safely attempts to close an object.""" + if not isinstance(x, io.IOBase): + return + if x.closed: + return + try: + x.close() + except Exception: + pass + + +def _get_winsize(stream): + if stream.isatty(): + return xli.fcntl.ioctl(stream.fileno(), xli.termios.TIOCGWINSZ, b"0000") + + +def _safe_pipe_properties( + fd, _type: "tp.Literal['in', 'out', 'err']" = "out", use_tty=False +) -> None: + """Makes sure that a pipe file descriptor properties are sane.""" + if not use_tty: + return + # due to some weird, long standing issue in Python, PTYs come out + # replacing newline \n with \r\n. This causes issues for raw unix + # protocols, like git and ssh, which expect unix line endings. + # see https://mail.python.org/pipermail/python-list/2013-June/650460.html + # for more details and the following solution. + props = xli.termios.tcgetattr(fd) + props[1] = props[1] & (~xli.termios.ONLCR) | xli.termios.ONLRET + xli.termios.tcsetattr(fd, xli.termios.TCSANOW, props) + # newly created PTYs have a stardard size (24x80), set size to the same size + # than the current terminal + winsize = None + + stream = {"in": sys.stdin, "err": sys.stderr, "out": sys.stdout}.get(_type) + if stream: + winsize = _get_winsize(stream) + if winsize is not None: + xli.fcntl.ioctl(fd, xli.termios.TIOCSWINSZ, winsize) From 0397239788fe606c4c222ee76156486a9f6cf916 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Tue, 1 Mar 2022 17:46:40 +0530 Subject: [PATCH 5/6] fix: handle windows specific aio --- xonsh/procs/async_proc.py | 6 ++++++ xonsh/procs/utils.py | 23 ----------------------- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/xonsh/procs/async_proc.py b/xonsh/procs/async_proc.py index eb2bd5d6a2..ee54b369ba 100644 --- a/xonsh/procs/async_proc.py +++ b/xonsh/procs/async_proc.py @@ -2,6 +2,7 @@ import os import xonsh.lazyimps as xli +import xonsh.platform as xp from xonsh.procs.utils import _safe_pipe_properties, safe_open @@ -71,6 +72,11 @@ def __init__(self, capture=False, tee=False, use_tty=False) -> None: parent, child = xli.pty.openpty() _safe_pipe_properties(child, use_tty=use_tty) _safe_pipe_properties(parent, use_tty=use_tty) + elif xp.ON_WINDOWS: + # windows proactorEventloop needs named pipe + from asyncio.windows_utils import pipe + + parent, child = pipe() else: # one-way pipe parent, child = os.pipe() diff --git a/xonsh/procs/utils.py b/xonsh/procs/utils.py index d5e16e53c1..3e0461ae19 100644 --- a/xonsh/procs/utils.py +++ b/xonsh/procs/utils.py @@ -1,32 +1,9 @@ -import contextlib -import inspect import io -import os -import pathlib -import re -import shlex -import signal -import stat -import subprocess import sys import typing as tp -import xonsh.environ as xenv -import xonsh.jobs as xj -import xonsh.lazyasd as xl import xonsh.lazyimps as xli -import xonsh.platform as xp import xonsh.tools as xt -from xonsh.built_ins import XSH -from xonsh.procs.async_proc import StreamHandler -from xonsh.procs.pipelines import ( - STDOUT_CAPTURE_KINDS, - CommandPipeline, - HiddenCommandPipeline, - pause_call_resume, -) -from xonsh.procs.proxies import ProcProxy, ProcProxyThread -from xonsh.procs.readers import ConsoleParallelReader def safe_open(fname, mode, buffering=-1): From f8be983a5a944877d58a255051ec1d211deff497 Mon Sep 17 00:00:00 2001 From: Noortheen Raja Date: Wed, 2 Mar 2022 13:40:42 +0530 Subject: [PATCH 6/6] test: update tests --- tests/procs/test_specs.py | 45 ++++++++---- xonsh/procs/pipelines.py | 149 +++++++++++++++----------------------- 2 files changed, 89 insertions(+), 105 deletions(-) diff --git a/tests/procs/test_specs.py b/tests/procs/test_specs.py index b2db141b34..757ab44de4 100644 --- a/tests/procs/test_specs.py +++ b/tests/procs/test_specs.py @@ -174,21 +174,38 @@ class Cmd(NamedTuple): False, ], ) -def test_run_subproc(captured, cmd, xession): - # todo: parameterize backgrounding +def test_run_subproc(captured, cmd, xession, capfdbinary): + # todo: + # 1. parameterize backgrounding + # 2. make windows compatible version + return_val = run_subproc(cmd.cmds, captured) - if isinstance(return_val, HiddenCommandPipeline): - assert return_val.returncode == 0 - elif isinstance(return_val, CommandPipeline): - if cmd.out: - assert return_val.out.strip() == cmd.out - if cmd.err: - assert return_val.err.strip() == cmd.err - # todo: use capsys to see what sys.stdout got - elif isinstance(return_val, str): - assert return_val.strip() == cmd.out - else: - assert return_val is None + out, err = capfdbinary.readouterr() + + def compare(lhs, rhs: str): + if rhs: + lhs = ( + lhs.decode().splitlines() + if isinstance(lhs, bytes) + else lhs.splitlines() + ) + assert [l.strip() for l in lhs] == rhs.splitlines() + + with capfdbinary.disabled(): + if isinstance(return_val, HiddenCommandPipeline): + assert return_val.returncode == 0 + compare(out, cmd.out) + compare(err, cmd.err) + elif isinstance(return_val, CommandPipeline): + compare(return_val.out, cmd.out) + compare(return_val.err, cmd.err) + elif isinstance(return_val, str): + compare(return_val, cmd.out) + compare(err, cmd.err) + else: + compare(out, cmd.out) + compare(err, cmd.err) + assert return_val is None @pytest.mark.parametrize("thread_subprocs", [False, True]) diff --git a/xonsh/procs/pipelines.py b/xonsh/procs/pipelines.py index 0f2c956ec5..5c03242a90 100644 --- a/xonsh/procs/pipelines.py +++ b/xonsh/procs/pipelines.py @@ -226,6 +226,39 @@ def __iter__(self): else: yield from self.tee_stdout() + @staticmethod + def _readlines(stream) -> "list[bytes]": + timeout = XSH.env.get("XONSH_PROC_FREQUENCY") + if hasattr(stream, "readlines"): + return safe_readlines(stream, 1024) + if hasattr(stream, "read"): + return stream.read(timeout).splitlines(keepends=True) + return [] + + def _get_corr_std_stream(self, stream: "tp.Literal['stdout', 'stderr']"): + """get the correct stdout""" + timeout = XSH.env.get("XONSH_PROC_FREQUENCY") + stdx = getattr(self.proc, stream) + spec_stdx = getattr(self.spec, stream) + is_stream_handler = False + if stdx is None or spec_stdx is None or not safe_readable(stdx): + capt_stdx = getattr(self.spec, f"captured_{stream}") + if capt_stdx is None: + if hasattr(spec_stdx, "reader"): # a StreamHandler + stdx = spec_stdx.reader + is_stream_handler = True + else: + stdx = capt_stdx + if hasattr(stdx, "buffer"): + stdx = stdx.buffer + if ( + stdx is not None + and (not isinstance(stdx, self.nonblocking)) + and (not is_stream_handler) + ): + stdx = NonBlockingFDReader(stdx.fileno(), timeout=timeout) + return stdx + def iterraw(self): """Iterates through the last stdout, and returns the lines exactly as found. @@ -237,69 +270,8 @@ def iterraw(self): return timeout = XSH.env.get("XONSH_PROC_FREQUENCY") - def _readlines(stream) -> "list[bytes]": - if hasattr(stream, "readlines"): - return safe_readlines(stream, 1024) - if hasattr(stream, "read"): - return stream.read(timeout).splitlines(keepends=True) - return [] - - def get_corr_std_stream(stream: "tp.Literal['stdout', 'stderr']"): - """get the correct stdout""" - stdx = getattr(proc, stream) - spec_stdx = getattr(spec, stream) - is_stream_handler = False - if stdx is None or spec_stdx is None or not safe_readable(stdx): - capt_stdx = getattr(spec, f"captured_{stream}") - if capt_stdx is None: - if hasattr(spec_stdx, "reader"): # a StreamHandler - stdx = spec_stdx.reader - is_stream_handler = True - else: - stdx = capt_stdx - if hasattr(stdx, "buffer"): - stdx = stdx.buffer - if ( - stdx is not None - and (not isinstance(stdx, self.nonblocking)) - and (not is_stream_handler) - ): - stdx = NonBlockingFDReader(stdx.fileno(), timeout=timeout) - return stdx - - stdout = get_corr_std_stream("stdout") - - # if ( - # not stdout - # or self.captured == "stdout" - # or not safe_readable(stdout) - # or not spec.threadable - # ): - # # we get here if the process is not threadable or the - # # class is the real Popen - # PrevProcCloser(pipeline=self) - # task = xj.wait_for_active_job() - # if task is None or task["status"] != "stopped": - # proc.wait() - # self._endtime() - # if self.captured == "object": - # self.end(tee_output=False) - # elif self.captured == "hiddenobject" and stdout: - # b = stdout.read() - # lines = b.splitlines(keepends=True) - # yield from lines - # self.end(tee_output=False) - # elif self.captured == "stdout": - # reader = spec.stdout.reader - # # reader.stall() - # # reader.wait() - # breakpoint() - # b = reader.read() - # s = self._decode_uninew(b, universal_newlines=True) - # self.lines = s.splitlines(keepends=True) - # return - # get the correct stderr - stderr = get_corr_std_stream("stderr") + stdout = self._get_corr_std_stream("stdout") + stderr = self._get_corr_std_stream("stderr") # read from process while it is running check_prev_done = len(self.procs) == 1 prev_end_time = None @@ -321,11 +293,11 @@ def get_corr_std_stream(stream: "tp.Literal['stdout', 'stderr']"): proc.prevs_are_closed = True break - stdout_lines = _readlines(stdout) + stdout_lines = self._readlines(stdout) i = len(stdout_lines) if i: yield from stdout_lines - stderr_lines = _readlines(stderr) + stderr_lines = self._readlines(stderr) j = len(stderr_lines) if j: self.stream_stderr(stderr_lines) @@ -352,12 +324,12 @@ def get_corr_std_stream(stream: "tp.Literal['stdout', 'stderr']"): cnt = 1 time.sleep(timeout * cnt) # read from process now that it is over - yield from _readlines(stdout) - self.stream_stderr(_readlines(stderr)) + yield from self._readlines(stdout) + self.stream_stderr(self._readlines(stderr)) proc.wait() self._endtime() - yield from _readlines(stdout) - self.stream_stderr(_readlines(stderr)) + yield from self._readlines(stdout) + self.stream_stderr(self._readlines(stderr)) if self.captured == "object": self.end(tee_output=False) @@ -663,6 +635,7 @@ def output(self): self._output = "".join(self.lines) return self._output else: + self._readlines("stdout") return "".join(self.lines) @property @@ -751,35 +724,29 @@ def executed_cmd(self): """The resolve and executed command.""" return self.spec.cmd + @staticmethod + def _get_prepost_fix(key: str) -> bytes: + env = XSH.env + t = env.get(key) + s = xt.format_std_prepost(t, env=env) + return s.encode( + encoding=env.get("XONSH_ENCODING"), + errors=env.get("XONSH_ENCODING_ERRORS"), + ) + @property def stderr_prefix(self): """Prefix to print in front of stderr, as bytes.""" - p = self._stderr_prefix - if p is None: - env = XSH.env - t = env.get("XONSH_STDERR_PREFIX") - s = xt.format_std_prepost(t, env=env) - p = s.encode( - encoding=env.get("XONSH_ENCODING"), - errors=env.get("XONSH_ENCODING_ERRORS"), - ) - self._stderr_prefix = p - return p + if self._stderr_prefix is None: + self._stderr_prefix = self._get_prepost_fix("XONSH_STDERR_PREFIX") + return self._stderr_prefix @property def stderr_postfix(self): """Postfix to print after stderr, as bytes.""" - p = self._stderr_postfix - if p is None: - env = XSH.env - t = env.get("XONSH_STDERR_POSTFIX") - s = xt.format_std_prepost(t, env=env) - p = s.encode( - encoding=env.get("XONSH_ENCODING"), - errors=env.get("XONSH_ENCODING_ERRORS"), - ) - self._stderr_postfix = p - return p + if self._stderr_postfix is None: + self._stderr_postfix = self._get_prepost_fix("XONSH_STDERR_POSTFIX") + return self._stderr_postfix class HiddenCommandPipeline(CommandPipeline):