Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rm: threaded subprocs #4634

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 34 additions & 0 deletions tests/procs/test_async_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import contextlib

import pytest

from xonsh.procs.specs import run_subproc


@pytest.fixture
def run_proc(tmp_path):
def factory(cmds: "list[str]", captured):
out_file = tmp_path / "stdout"
with out_file.open("wb") as fw:
with contextlib.redirect_stdout(fw):
return_val = run_subproc([cmds], captured)
return return_val, out_file.read_text()

return factory


@pytest.mark.parametrize(
"captured,exp_out,exp_rtn",
[
pytest.param(False, "hello", None, id="$[]"),
pytest.param("stdout", "", "hello", id="$()"),
],
)
def test_run_subproc(xession, run_proc, captured, exp_out, exp_rtn):
xession.env["XONSH_SHOW_TRACEBACK"] = True
cmds = ["echo", "hello"]

rtn, out = run_proc(cmds, captured)

assert rtn == exp_rtn
# assert out.strip() == exp_out
59 changes: 59 additions & 0 deletions tests/procs/test_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import itertools
import sys
from subprocess import Popen
from typing import NamedTuple

import pytest

from xonsh.procs.pipelines import CommandPipeline, HiddenCommandPipeline
from xonsh.procs.posix import PopenThread
from xonsh.procs.proxies import STDOUT_DISPATCHER, ProcProxy, ProcProxyThread
from xonsh.procs.specs import SubprocSpec, cmds_to_specs, run_subproc
Expand Down Expand Up @@ -149,6 +151,63 @@ def test_run_subproc_background(captured, exp_is_none):
assert (return_val is None) == exp_is_none


class Cmd(NamedTuple):
cmds: tuple
out: "str|None" = None
err: "str|None" = None


samples = [
Cmd((["echo", "hello"],), out="hello"), # working
Cmd((["echo", "hello"], "|", ["wc", "-c"]), out="6"), # failing
]


@skip_if_on_windows
@pytest.mark.parametrize("cmd", samples)
@pytest.mark.parametrize(
"captured",
[
"object",
"hiddenobject",
"stdout",
False,
],
)
def test_run_subproc(captured, cmd, xession, capfdbinary):
# todo:
# 1. parameterize backgrounding
# 2. make windows compatible version

return_val = run_subproc(cmd.cmds, captured)
out, err = capfdbinary.readouterr()

def compare(lhs, rhs: str):
if rhs:
lhs = (
lhs.decode().splitlines()
if isinstance(lhs, bytes)
else lhs.splitlines()
)
assert [l.strip() for l in lhs] == rhs.splitlines()

with capfdbinary.disabled():
if isinstance(return_val, HiddenCommandPipeline):
assert return_val.returncode == 0
compare(out, cmd.out)
compare(err, cmd.err)
elif isinstance(return_val, CommandPipeline):
compare(return_val.out, cmd.out)
compare(return_val.err, cmd.err)
elif isinstance(return_val, str):
compare(return_val, cmd.out)
compare(err, cmd.err)
else:
compare(out, cmd.out)
compare(err, cmd.err)
assert return_val is None


@pytest.mark.parametrize("thread_subprocs", [False, True])
def test_callable_alias_cls(thread_subprocs, xession):
class Cls:
Expand Down
3 changes: 3 additions & 0 deletions xonsh/built_ins.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,11 @@ def __init__(self):

# Session attributes
self.exit = None

# todo: these two attributes seems not used
self.stdout_uncaptured = None
self.stderr_uncaptured = None

self._py_exit = None
self._py_quit = None
self.commands_cache = None
Expand Down
27 changes: 14 additions & 13 deletions xonsh/environ.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,20 +967,21 @@ class GeneralSetting(Xettings):
)
XONSH_CAPTURE_ALWAYS = Var.with_default(
False,
"Try to capture output of commands run without explicit capturing.\n"
"If True, xonsh will capture the output of commands run directly or in ``![]``"
"to the session history.\n"
"Setting to True has the following disadvantages:\n"
"* Some interactive commands won't work properly (like when ``git`` invokes an interactive editor).\n"
" For more information see discussion at https://github.com/xonsh/xonsh/issues/3672.\n"
"* Stopping these commands with ^Z (i.e. ``SIGTSTP``)\n"
" is disabled as it causes deadlocked terminals.\n"
" ``SIGTSTP`` may still be issued and only the physical pressing\n"
" of ``Ctrl+Z`` is ignored.\n\n"
"Regardless of this value, commands run in ``$()``, ``!()`` or with an IO redirection (``>`` or ``|``) "
"will always be captured.\n"
"Setting this to True depends on ``$THREAD_SUBPROCS`` being True.",
"""\
Try to capture output of commands run without explicit capturing.
If True, xonsh will capture the output of commands run directly or in ``![]``to the session history.
Setting to True has the following disadvantages:
* Some interactive commands won't work properly (like when ``git`` invokes an interactive editor).
For more information see discussion at https://github.com/xonsh/xonsh/issues/3672.
* Stopping these commands with ^Z (i.e. ``SIGTSTP``)
is disabled as it causes deadlocked terminals.
``SIGTSTP`` may still be issued and only the physical pressing
of ``Ctrl+Z`` is ignored.
Regardless of this value, commands run in ``$()``, ``!()``
or with an IO redirection (``>`` or ``|``) will always be captured.
""",
)
# todo: remove THread_subprocs completely and use asyncio.read_pipe
THREAD_SUBPROCS = Var(
is_bool_or_none,
to_bool_or_none,
Expand Down
1 change: 0 additions & 1 deletion xonsh/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ def wait_for_active_job(last_task=None, backgrounded=False, return_error=False):
if active_task is None:
return last_task
obj = active_task["obj"]
backgrounded = False
try:
_, wcode = os.waitpid(obj.pid, os.WUNTRACED)
except ChildProcessError as e: # No child processes
Expand Down
95 changes: 95 additions & 0 deletions xonsh/procs/async_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import os

import xonsh.lazyimps as xli
import xonsh.platform as xp
from xonsh.procs.utils import _safe_pipe_properties, safe_open


class StreamReader(asyncio.Protocol):
"""much like StreamReader and StreamProtocol merged"""

def __init__(self, loop: "asyncio.AbstractEventLoop | None" = None):
self._buffer = bytearray()
self.loop = loop or asyncio.get_event_loop_policy().get_event_loop()
self.exited = asyncio.Future(loop=loop)
self._eof = False

def connection_made(self, transport):
self.transport = transport
super().connection_made(transport)

def data_received(self, data):
self._buffer.extend(data)
super().data_received(data)

def connection_lost(self, exc):
super().connection_lost(exc)
self.exited.set_result(True)
self._eof = True

async def _start(self, pipe):
await self.loop.connect_read_pipe(lambda: self, pipe)

def start(self, pipe):
self.loop.run_until_complete(self._start(pipe))

async def _read(self, timeout=0.0):
"""send data received so far. Only blocks when data is yet to be arrived for the maximum timeout"""
waited = False
while True:
data = bytes(self._buffer)
if data:
self._buffer.clear()
break

if self._eof or waited:
break

# wait for sometime to data to be received
await asyncio.sleep(timeout)
waited = True
return data

def read(self, timeout=0.0):
return self.loop.run_until_complete(self._read(timeout))

async def _wait(self):
"""block until the proc is exited"""
await self.exited

def wait(self):
self.loop.run_until_complete(self.exited)


class StreamHandler:
def __init__(self, capture=False, tee=False, use_tty=False) -> None:
self.capture = capture
self.tee = tee

if tee and use_tty:
# it is two-way
parent, child = xli.pty.openpty()
_safe_pipe_properties(child, use_tty=use_tty)
_safe_pipe_properties(parent, use_tty=use_tty)
elif xp.ON_WINDOWS:
# windows proactorEventloop needs named pipe
from asyncio.windows_utils import pipe

parent, child = pipe()
else:
# one-way pipe
parent, child = os.pipe()

self.write_bin = safe_open(
child,
"wb",
)
read_bin = safe_open(parent, "rb")

# start async reading
self.reader = StreamReader()
self.reader.start(read_bin)

def close(self):
self.write_bin.close()