Skip to content

Commit

Permalink
feat: add AsyncProc class
Browse files Browse the repository at this point in the history
  • Loading branch information
jnoortheen committed Jan 10, 2022
1 parent 343ea33 commit 8df1c3c
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 33 deletions.
42 changes: 42 additions & 0 deletions tests/procs/test_async_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import contextlib

import pytest

from xonsh.procs import async_proc as ap
import sys

from xonsh.procs.specs import run_subproc


def test_ls(xession):
proc = ap.AsyncProc(["ls"], stdout=sys.stdout, stderr=sys.stderr)
assert proc.proc.pid


@pytest.fixture
def run_proc(tmp_path):
def factory(cmds: "list[str]", captured):
out_file = tmp_path / "stdout"
with out_file.open("wb") as fw:
with contextlib.redirect_stdout(fw):
return_val = run_subproc([cmds], captured)
return return_val, out_file.read_text()

return factory


@pytest.mark.parametrize(
"captured,exp_out,exp_rtn",
[
pytest.param(False, "hello", None, id="$[]"),
pytest.param("stdout", "", "hello", id="$()"),
],
)
def test_run_subproc(xession, run_proc, captured, exp_out, exp_rtn):
xession.env["XONSH_SHOW_TRACEBACK"] = True
cmds = ["echo", "hello"]

rtn, out = run_proc(cmds, captured)

assert rtn == exp_rtn
assert out.strip() == exp_out
3 changes: 0 additions & 3 deletions xonsh/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def _send_signal(job, signal):
except ProcessLookupError:
pass


elif ON_WINDOWS:
pass
elif ON_CYGWIN or ON_MSYS:
Expand All @@ -52,7 +51,6 @@ def _send_signal(job, signal):
except Exception:
pass


else:

def _send_signal(job, signal):
Expand Down Expand Up @@ -107,7 +105,6 @@ def wait_for_active_job(last_task=None, backgrounded=False, return_error=False):
pass # ignore error if process closed before we got here
return wait_for_active_job(last_task=active_task)


else:

def _continue(job):
Expand Down
74 changes: 74 additions & 0 deletions xonsh/procs/async_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import asyncio
import asyncio.subprocess as asp
import sys


class SubProcStreamProtocol(asp.SubprocessStreamProtocol):
"""store writes to streams"""


class XonshProcBase:
"""attributes and methods that are expected from a xonshProc implementation"""

@property
def pid(self):
raise NotImplementedError


class AsyncProc(XonshProcBase):
def __init__(
self,
args: "list[str]",
loop=None,
universal_newlines=False,
stdin=None,
stdout=None,
stderr=None,
**kwargs,
):
# todo: check if unbuffered read work all the time
kwargs["bufsize"] = 0

self.is_text = universal_newlines
if loop is None:
loop = asyncio.get_event_loop_policy().get_event_loop()
self.loop = loop
self.stdin = stdin
self.stdout = sys.stdout if stdout is None else stdout
self.stderr = sys.stdout if stderr is None else stderr

self.proc: asp.Process = self.loop.run_until_complete(
self.get_proc(*args, **kwargs)
)

async def get_proc(
self,
program: str,
*args,
limit=2 ** 16,
**kwargs,
):
"""wrap ``create_subprocess_exec`` call"""
protocol_factory = lambda: SubProcStreamProtocol(limit=limit, loop=self.loop)
transport, protocol = await self.loop.subprocess_exec(
protocol_factory,
program,
*args,
stdin=self.stdin,
stdout=self.stdout,
stderr=self.stderr,
**kwargs,
)
return asp.Process(transport, protocol, self.loop)

@property
def pid(self):
return self.proc.pid

def wait(self):
if self.proc:
self.loop.run_until_complete(self.proc.wait())

@property
def returncode(self):
return self.proc.returncode
16 changes: 9 additions & 7 deletions xonsh/procs/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,15 @@ def _end(self, tee_output):
"""Waits for the command to complete and then runs any closing and
cleanup procedures that need to be run.
"""
if tee_output:
for _ in self.tee_stdout():
pass
# if tee_output:
# for _ in self.tee_stdout():
# pass
if self.proc:
self.proc.wait()
self._endtime()
# since we are driven by getting output, input may not be available
# until the command has completed.
self._set_input()
# self._set_input()
self._close_prev_procs()
self._close_proc()
self._check_signal()
Expand Down Expand Up @@ -542,9 +544,9 @@ def _close_proc(self):
self._safe_close(s.captured_stderr)
if p is None:
return
self._safe_close(p.stdin)
self._safe_close(p.stdout)
self._safe_close(p.stderr)
# self._safe_close(p.stdin)
# self._safe_close(p.stdout)
# self._safe_close(p.stderr)

def _set_input(self):
"""Sets the input variable."""
Expand Down
53 changes: 30 additions & 23 deletions xonsh/procs/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pathlib
import subprocess
import contextlib
import typing as tp

from xonsh.built_ins import XSH
import xonsh.tools as xt
Expand All @@ -18,9 +19,9 @@
import xonsh.environ as xenv
import xonsh.lazyimps as xli
import xonsh.jobs as xj
from xonsh.procs.async_proc import AsyncProc

from xonsh.procs.readers import ConsoleParallelReader
from xonsh.procs.posix import PopenThread
from xonsh.procs.proxies import ProcProxy, ProcProxyThread
from xonsh.procs.pipelines import (
pause_call_resume,
Expand All @@ -29,6 +30,14 @@
STDOUT_CAPTURE_KINDS,
)

if tp.TYPE_CHECKING:
CapturedValue = tp.Literal[
"stdout", # $() - stdout capture
"object", # !() - full capture
"hiddenobject", # ![] - nocapture
False, # $[] - nocapture
]


@xl.lazyobject
def RE_SHEBANG():
Expand Down Expand Up @@ -283,7 +292,7 @@ def __init__(
stderr=None,
universal_newlines=False,
close_fds=False,
captured=False,
captured: "CapturedValue" = False,
env=None,
):
"""
Expand Down Expand Up @@ -349,7 +358,7 @@ def __init__(
self.close_fds = close_fds
self.captured = captured
if env is not None:
self.env = {
self.env: "None|dict" = {
k: v if not (isinstance(v, list)) or len(v) > 1 else v[0]
for (k, v) in env.items()
}
Expand Down Expand Up @@ -471,7 +480,7 @@ def _run_binary(self, kwargs):
cmd = [self.binary_loc] + self.cmd[1:]
else:
cmd = self.cmd
p = self.cls(cmd, bufsize=bufsize, **kwargs)
p = AsyncProc(cmd, bufsize=bufsize, **kwargs)
except PermissionError:
e = "xonsh: subprocess mode: permission denied: {0}"
raise xt.XonshError(e.format(self.cmd[0]))
Expand Down Expand Up @@ -731,30 +740,28 @@ def _safe_pipe_properties(fd, use_tty=False):


def _update_last_spec(last):
env = XSH.env
# env = XSH.env
captured = last.captured
last.last_in_pipeline = True
if not captured:
return
callable_alias = callable(last.alias)
if callable_alias:
pass
else:
cmds_cache = XSH.commands_cache
thable = (
env.get("THREAD_SUBPROCS")
and (captured != "hiddenobject" or env.get("XONSH_CAPTURE_ALWAYS"))
and cmds_cache.predict_threadable(last.args)
and cmds_cache.predict_threadable(last.cmd)
)
if captured and thable:
last.cls = PopenThread
elif not thable:
# foreground processes should use Popen
last.threadable = False
if captured == "object" or captured == "hiddenobject":
# CommandPipeline objects should not pipe stdout, stderr
return
# if not callable_alias:
# cmds_cache = XSH.commands_cache
# thable = (
# env.get("THREAD_SUBPROCS")
# and (captured != "hiddenobject" or env.get("XONSH_CAPTURE_ALWAYS"))
# and cmds_cache.predict_threadable(last.args)
# and cmds_cache.predict_threadable(last.cmd)
# )
# if captured and thable:
# last.cls = PopenThread
# elif not thable:
# # foreground processes should use Popen
# last.threadable = False
# if captured == "object" or captured == "hiddenobject":
# # CommandPipeline objects should not pipe stdout, stderr
# return
# cannot used PTY pipes for aliases, for some dark reason,
# and must use normal pipes instead.
use_tty = xp.ON_POSIX and not callable_alias
Expand Down

0 comments on commit 8df1c3c

Please sign in to comment.