Skip to content

Commit

Permalink
Read stop signals from the process and update the process state. (#5361)
Browse files Browse the repository at this point in the history
Read stop signals from the process and update the process state.

### The issue

Technically: in a couple of places that are critical for signal processing
we call `os.waitpid()`. The behavior of this function is far from obvious;
in particular, the return code has to be processed by hand after a signal
is caught. We had no proper signal handling around these calls, and this PR
adds it. See also the `proc_untraced_waitpid` function description.

From the user's perspective: suppose a process waits for user input from
the terminal, e.g. `python -c "input()"` or `fzf`. If this process runs in a
captured pipeline, e.g. `!(echo 1 | fzf | head)`, it is suspended by the OS
and the pipeline spins in an endless loop, eventually crashing and
corrupting the standard streams. This PR fixes this.

### The solution

Technically. The key function is `proc_untraced_waitpid` - it catches
the stop signals and updates the process state.

From the user's perspective: first of all, we expect users to use the
captured object `!()` only for capturable processes. Because of this, our
goal here is just to make the behavior in this case stable.
In this PR we detect that a process in the pipeline is suspended, and we
finish the command pipeline carefully:
* Show a message about the suspended process.
* Keep the suspended process in `jobs`. Bash behaves the same way. This is
good because we don't know which process was suspended or why; an
experienced user may want to continue it manually.
* Finish the CommandPipeline with returncode=None and suspended=True.

### Before

```xsh
!(fzf) # or !(python -c "input()")
# Hanging / Exceptions / OSError / No way to end the command.
# After exception:
$(echo 1)
# OSError / IO error
```

### After

```xsh
!(fzf) # or `!(ls | fzf | head)` or `!(python -c "input()")`
# Process ['fzf'] with pid 60000 suspended with signal 22 SIGTTOU and stay in `jobs`.
# This happends when process start waiting for input but there is no terminal attached in captured mode.
# CommandPipeline(returncode=None, suspended=True, ...)

$(echo 1)
# Success.
```
Closes #4752 #4577

### Notes

* There is an edge case in which the process terminates so quickly that we
cannot observe a live pid and check the signal
([src](https://github.com/xonsh/xonsh/blob/67d672783db6397bdec7ae44a9d9727b1e20a772/xonsh/jobs.py#L71-L80)).
I am leaving it as is for now.

### Mentions

#2159

## For community
⬇️ **Please click the 👍 reaction instead of leaving a `+1` or 👍
comment**

---------

Co-authored-by: a <1@1.1>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Gil Forsyth <gforsyth@users.noreply.github.com>
  • Loading branch information
4 people committed May 22, 2024
1 parent 635d783 commit 0f25a5a
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 50 deletions.
23 changes: 23 additions & 0 deletions news/fix_interactive_suspended_subproc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
**Added:**

* Read stop signals from the process and update the process state (#5361).

**Changed:**

* <news item>

**Deprecated:**

* <news item>

**Removed:**

* <news item>

**Fixed:**

* <news item>

**Security:**

* <news item>
37 changes: 34 additions & 3 deletions tests/procs/test_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
from xonsh.tools import XonshError


def cmd_sig(sig):
    """Build a ``python -c`` command whose process sends signal ``sig`` to itself.

    ``sig`` is the attribute name inside the ``signal`` module (for example
    ``"SIGINT"``); it is interpolated verbatim into the one-line script.
    The result is an argv-style list suitable for use as one command of a
    pipeline.
    """
    script = f"import os, signal; os.kill(os.getpid(), signal.{sig})"
    return ["python", "-c", script]


@skip_if_on_windows
def test_cmds_to_specs_thread_subproc(xession):
env = xession.env
Expand Down Expand Up @@ -141,15 +149,37 @@ def test_capture_always(


@skip_if_on_windows
@pytest.mark.flaky(reruns=3, reruns_delay=1)
@pytest.mark.parametrize("captured", ["stdout", "object"])
@pytest.mark.parametrize("interactive", [True, False])
def test_interrupted_process_returncode(xonsh_session, captured, interactive):
    """A child that kills itself with SIGINT must report ``-SIGINT``.

    The check is repeated for every combination of capture mode and
    interactive flag given by the parametrization above.
    """
    xonsh_session.env["XONSH_INTERACTIVE"] = interactive
    xonsh_session.env["RAISE_SUBPROC_ERROR"] = False
    cmd = [cmd_sig("SIGINT")]
    # Pass the parametrized capture mode through; a hard-coded "stdout" here
    # would make both ``captured`` cases exercise the very same scenario.
    specs = cmds_to_specs(cmd, captured=captured)
    (p := _run_command_pipeline(specs, cmd)).end()
    assert p.proc.returncode == -signal.SIGINT


@skip_if_on_windows
@pytest.mark.parametrize(
    "suspended_pipeline",
    [
        [cmd_sig("SIGTTIN")],
        [["echo", "1"], "|", cmd_sig("SIGTTIN")],
        [["echo", "1"], "|", cmd_sig("SIGTTIN"), "|", ["head"]],
    ],
)
def test_specs_with_suspended_captured_process_pipeline(
    xonsh_session, suspended_pipeline
):
    """A captured pipeline containing a self-SIGTTIN'ed process ends as suspended.

    Each parametrized pipeline places the SIGTTIN-raising command alone, at
    the end of a pipe, or in the middle of one.
    """
    xonsh_session.env["XONSH_INTERACTIVE"] = True
    pipeline = _run_command_pipeline(
        cmds_to_specs(suspended_pipeline, captured="object"),
        suspended_pipeline,
    )
    # NOTE(review): SIGCONT is sent before finishing the pipeline, presumably
    # so the stopped child does not linger after the test - confirm.
    pipeline.proc.send_signal(signal.SIGCONT)
    pipeline.end()
    assert pipeline.suspended


@skip_if_on_windows
@pytest.mark.parametrize(
"cmds, exp_stream_lines, exp_list_lines",
Expand All @@ -162,6 +192,7 @@ def test_interrupted_process_returncode(xonsh_session):
([["echo", "-n", "1\n2 3"]], "1\n2 3", ["1", "2 3"]),
],
)
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_subproc_output_format(cmds, exp_stream_lines, exp_list_lines, xonsh_session):
xonsh_session.env["XONSH_SUBPROC_OUTPUT_FORMAT"] = "stream_lines"
output = run_subproc(cmds, "stdout")
Expand Down
21 changes: 15 additions & 6 deletions tests/test_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1239,17 +1239,26 @@ def test_catching_system_exit():
out, err, ret = run_xonsh(
cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3
)
if ON_WINDOWS:
assert ret == 1
else:
assert ret == 2
assert ret > 0


@skip_if_on_windows
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_catching_exit_signal():
    """An interactive xonsh that sends SIGHUP to itself exits with a non-zero code."""
    # The short sleep delays the kill slightly after the command starts.
    stdin_cmd = "sleep 0.2; kill -SIGHUP @(__import__('os').getpid())\n"
    out, err, ret = run_xonsh(
        cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=3
    )
    assert ret > 0


@skip_if_on_windows
def test_suspended_captured_process_pipeline():
    """See also test_specs.py:test_specs_with_suspended_captured_process_pipeline

    Runs a captured ``!()`` command whose child raises SIGTTIN on itself in an
    interactive xonsh and expects ``suspended=True`` to show up in the output.
    """
    stdin_cmd = "!(python -c 'import os, signal, time; time.sleep(0.2); os.kill(os.getpid(), signal.SIGTTIN)')\n"
    out, err, ret = run_xonsh(
        cmd=None, stdin_cmd=stdin_cmd, interactive=True, single_command=False, timeout=5
    )
    pattern = ".*suspended=True.*"
    found = re.match(pattern, out, re.MULTILINE | re.DOTALL)
    assert (
        found
    ), f"\nFailed:\n```\n{stdin_cmd.strip()}\n```,\nresult: {out!r}\nexpected: {pattern!r}."
134 changes: 100 additions & 34 deletions xonsh/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from xonsh.completers.tools import RichCompletion
from xonsh.lazyasd import LazyObject
from xonsh.platform import FD_STDERR, LIBC, ON_CYGWIN, ON_DARWIN, ON_MSYS, ON_WINDOWS
from xonsh.tools import on_main_thread, unthreadable
from xonsh.tools import get_signal_name, on_main_thread, unthreadable

# Track time stamp of last exit command, so that two consecutive attempts to
# exit can kill all jobs and exit.
Expand All @@ -39,18 +39,84 @@
_tasks_main: collections.deque[int] = collections.deque()


def proc_untraced_waitpid(proc, hang, task=None, raise_child_process_error=False):
    """
    Read a stop signal from the process and update the process state.

    Parameters
    ----------
    proc
        Popen-like object. ``proc.pid`` is read; depending on the wait status,
        ``proc.returncode``, ``proc.signal`` and ``proc.suspended`` are written.
    hang : bool
        When true, block in ``os.waitpid`` until the child changes state;
        otherwise only poll (``os.WNOHANG`` is added to the options).
    task : dict, optional
        Job record; its ``"status"`` is set to ``"stopped"`` if the child
        turns out to be stopped.
    raise_child_process_error : bool
        When true, raise ``ChildProcessError`` if the process has no pid or
        ``os.waitpid`` reports that there is no such child. When false these
        situations are swallowed and the default ``info`` state is kept.

    Returns
    -------
    dict
        ``{"backgrounded": bool, "signal": int or None, "signal_name": str or None}``.
        ``signal_name`` stays ``None`` on the early-return paths (Windows,
        missing pid); otherwise it is the signal number followed by the
        result of ``get_signal_name``.

    Return code
    ===========

    Basically ``p = subprocess.Popen()`` populates ``p.returncode`` after ``p.wait()``, ``p.poll()``
    or ``p.communicate()`` (https://docs.python.org/3/library/os.html#os.waitpid).
    But if you're using `os.waitpid()` BEFORE these functions you're capturing return code
    from a signal subsystem and ``p.returncode`` will be ``0``.
    After ``os.waitpid`` call you need to set return code and process signal manually.
    See also ``xonsh.tools.describe_waitpid_status()``.

    Signals
    =======

    The command that is waiting for input can be suspended by OS in case there is no terminal attached
    because without terminal command will never end. Read more about SIGTTOU and SIGTTIN signals:
    * https://www.linusakesson.net/programming/tty/
    * http://curiousthing.org/sigttin-sigttou-deep-dive-linux
    * https://www.gnu.org/software/libc/manual/html_node/Job-Control-Signals.html
    """
    info = {"backgrounded": False, "signal": None, "signal_name": None}

    if ON_WINDOWS:
        # The POSIX wait-status API used below is not available on Windows.
        return info

    if getattr(proc, "pid", None) is None:
        # When the process stopped before os.waitpid it has no pid.
        # Note that in this case there is high probability that we will have
        # return code 0 instead of the real return code.
        # ``proc is None`` takes this path as well instead of failing later
        # with an AttributeError on ``proc.pid``.
        if raise_child_process_error:
            raise ChildProcessError("Process Identifier (PID) not found.")
        return info

    # WUNTRACED makes os.waitpid also report children that have been stopped
    # (suspended and waiting to be resumed or terminated), not only children
    # that have terminated. WNOHANG turns the call into a non-blocking poll.
    opt = os.WUNTRACED if hang else (os.WUNTRACED | os.WNOHANG)
    try:
        wpid, wcode = os.waitpid(proc.pid, opt)
    except ChildProcessError:
        if raise_child_process_error:
            raise
        # Treat "no such child" as "no change in state".
        wpid, wcode = 0, 0

    if wpid == 0:
        # Process has no changes in state.
        pass

    elif os.WIFSTOPPED(wcode):
        if task is not None:
            task["status"] = "stopped"
        info["backgrounded"] = True
        proc.signal = (os.WSTOPSIG(wcode), os.WCOREDUMP(wcode))
        info["signal"] = os.WSTOPSIG(wcode)
        proc.suspended = True

    elif os.WIFSIGNALED(wcode):
        print()  # get a newline because ^C will have been printed
        proc.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
        proc.returncode = -os.WTERMSIG(wcode)  # Popen default.
        info["signal"] = os.WTERMSIG(wcode)

    else:
        proc.returncode = os.WEXITSTATUS(wcode)
        proc.signal = None
        info["signal"] = None

    # NOTE(review): when no signal was recorded this string starts with
    # "None" - confirm that callers only use it when ``info["signal"]`` is set.
    info["signal_name"] = f'{info["signal"]} {get_signal_name(info["signal"])}'.strip()
    return info


@contextlib.contextmanager
Expand Down Expand Up @@ -144,7 +210,7 @@ def _continue(job):

def _kill(job):
    """Forcefully terminate the job's process and its child processes.

    Runs ``taskkill /F /T /PID <pid>`` for ``job["proc"].pid``; stderr is
    merged into the captured stdout. ``subprocess.check_output`` raises
    ``subprocess.CalledProcessError`` when ``taskkill`` exits non-zero.
    """
    subprocess.check_output(
        ["taskkill", "/F", "/T", "/PID", str(job["proc"].pid)],
        stderr=subprocess.STDOUT,
    )

Expand All @@ -165,11 +231,11 @@ def wait_for_active_job(last_task=None, backgrounded=False, return_error=False):
# Return when there are no foreground active task
if active_task is None:
return last_task
obj = active_task["obj"]
proc = active_task["proc"]
_continue(active_task)
while obj.returncode is None:
while proc.returncode is None:
try:
obj.wait(0.01)
proc.wait(0.01)
except subprocess.TimeoutExpired:
pass
except KeyboardInterrupt:
Expand Down Expand Up @@ -278,31 +344,23 @@ def wait_for_active_job(last_task=None, backgrounded=False, return_error=False):
# Return when there are no foreground active task
if active_task is None:
return last_task
thread = active_task["obj"]
backgrounded = False
proc = active_task["proc"]
info = {"backgrounded": False}

try:
if thread.pid is None:
# When the process stopped before os.waitpid it has no pid.
raise ChildProcessError("The process PID not found.")
_, wcode = waitpid(thread.pid, os.WUNTRACED)
except ChildProcessError as e: # No child processes
info = proc_untraced_waitpid(
proc, hang=True, task=active_task, raise_child_process_error=True
)
except ChildProcessError as e:
if return_error:
return e
else:
return _safe_wait_for_active_job(
last_task=active_task, backgrounded=backgrounded
last_task=active_task, backgrounded=info["backgrounded"]
)
if os.WIFSTOPPED(wcode):
active_task["status"] = "stopped"
backgrounded = True
elif os.WIFSIGNALED(wcode):
print() # get a newline because ^C will have been printed
thread.signal = (os.WTERMSIG(wcode), os.WCOREDUMP(wcode))
thread.returncode = -os.WTERMSIG(wcode) # Default Popen
else:
thread.returncode = os.WEXITSTATUS(wcode)
thread.signal = None
return wait_for_active_job(last_task=active_task, backgrounded=backgrounded)
return wait_for_active_job(
last_task=active_task, backgrounded=info["backgrounded"]
)


def _safe_wait_for_active_job(last_task=None, backgrounded=False):
Expand Down Expand Up @@ -344,8 +402,8 @@ def _clear_dead_jobs():
to_remove = set()
tasks = get_tasks()
for tid in tasks:
obj = get_task(tid)["obj"]
if obj is None or obj.poll() is not None:
proc = get_task(tid)["proc"]
if proc is None or proc.poll() is not None:
to_remove.add(tid)
for job in to_remove:
tasks.remove(job)
Expand All @@ -364,13 +422,13 @@ def format_job_string(num: int, format="dict") -> str:
"cmd": " ".join(
[" ".join(i) if isinstance(i, list) else i for i in job["cmds"]]
),
"pid": int(job["pids"][-1]) if job["pids"] else None,
"pids": job["pids"] if "pids" in job else None,
}

if format == "posix":
r["pos"] = "+" if tasks[0] == num else "-" if tasks[1] == num else " "
r["bg"] = " &" if job["bg"] else ""
r["pid"] = f"({r['pid']})" if r["pid"] else ""
r["pid"] = f"({','.join(str(pid) for pid in r['pids'])})" if r["pids"] else ""
return "[{num}]{pos} {status}: {cmd}{bg} {pid}".format(**r)
else:
return repr(r)
Expand All @@ -396,13 +454,21 @@ def add_job(info):
"""Add a new job to the jobs dictionary."""
num = get_next_job_number()
info["started"] = time.time()
info["status"] = "running"
info["status"] = info["status"] if "status" in info else "running"
get_tasks().appendleft(num)
get_jobs()[num] = info
if info["bg"] and XSH.env.get("XONSH_INTERACTIVE"):
print_one_job(num)


def update_job_attr(pid, name, value):
    """Update job attribute.

    Sets ``job[name] = value`` on every job whose ``"pids"`` entry contains
    ``pid``. Jobs without a ``"pids"`` key are left untouched.
    """
    # Mutate the job records in place; the jobs mapping itself is not resized,
    # so iterating over its values while assigning is safe.
    for job in get_jobs().values():
        if "pids" in job and pid in job["pids"]:
            job[name] = value


def clean_jobs():
"""Clean up jobs for exiting shell
Expand Down
Loading

0 comments on commit 0f25a5a

Please sign in to comment.