Skip to content

Commit

Permalink
[pantsd] Repair console interactivity in pantsd runs. (#5352)
Browse files Browse the repository at this point in the history
Problem
Currently, in the case of pantsd-based runs, because we marshall stdio across a socket via the nailgun protocol we have no way for subprocesses to indicate tty settings like they normally would when directly connected to a tty device.

This causes interactive things like ./pants repl (which rely on libraries like jline/readline) to behave strangely - not responding to up/down/left/right/^U/^K/^D, echoing double lines or double chars, etc. The same case is present for ./pants run of interactive programs. These are all things normally controlled via the tty.

Solution
Send the fully qualified path to the tty via the /dev filesystem as nailgun environment variables. If we detect the thin client as connected to the same tty on all 3 stdio descriptors, we'll now directly open the TTY in the pantsd-runner process and redirect stdio to it. This is inherited by subprocesses, too.

In order to ensure subprocesses potentially connected to the thin clients tty don't outlive the pants run, we now also send SIGINT on ^C/^\ to the runners entire process group to ensure any subprocesses also receive the signal (they may still ignore it tho, in some cases - see below).

Additionally, in the case of the python repl (which ignores ^C by default) - we now wrap that with a new signal_handler_as contextmanager to ignore ^C in the pants-side process to mimic the correct behavior of a vanilla python repl (which is to log a handled KeyboardInterrupt).

I've also ported STTYSettings to use of tcgetattr/tcsetattr (to avoid subprocess invokes of stty) and wrapped the thin client invoke in that.

Result
Both the python and scala repl cases behave as expected in the daemon.

Fixes #5174
Fixes #5058
  • Loading branch information
kwlzn authored and stuhood committed Jan 27, 2018
1 parent 50bf568 commit afe3edd
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 68 deletions.
15 changes: 11 additions & 4 deletions src/python/pants/backend/python/tasks2/python_repl.py
Expand Up @@ -6,13 +6,15 @@
unicode_literals, with_statement)

import os
import signal

from pex.pex_info import PexInfo

from pants.backend.python.targets.python_requirement_library import PythonRequirementLibrary
from pants.backend.python.targets.python_target import PythonTarget
from pants.backend.python.tasks2.python_execution_task_base import PythonExecutionTaskBase
from pants.task.repl_task_mixin import ReplTaskMixin
from pants.util.contextutil import signal_handler_as


class PythonRepl(ReplTaskMixin, PythonExecutionTaskBase):
Expand Down Expand Up @@ -49,8 +51,13 @@ def setup_repl_session(self, targets):
pex_info.entry_point = entry_point
return self.create_pex(pex_info)

# NB: **pex_run_kwargs is used by tests only.
# N.B. **pex_run_kwargs is used by tests only.
def launch_repl(self, pex, **pex_run_kwargs):
env = pex_run_kwargs.pop('env', os.environ).copy()
po = pex.run(blocking=False, env=env, **pex_run_kwargs)
po.wait()
# While the repl subprocess is synchronously spawned, we rely on process group
# signalling for a SIGINT to reach the repl subprocess directly - and want to
# do nothing in response on the parent side.
def ignore_control_c(signum, frame): pass

with signal_handler_as(signal.SIGINT, ignore_control_c):
env = pex_run_kwargs.pop('env', os.environ).copy()
pex.run(env=env, **pex_run_kwargs)
72 changes: 45 additions & 27 deletions src/python/pants/bin/daemon_pants_runner.py
Expand Up @@ -9,6 +9,7 @@
import os
import signal
import sys
import termios
import time
from contextlib import contextmanager

Expand Down Expand Up @@ -110,31 +111,47 @@ def _nailgunned_stdio(self, sock):
# Determine output tty capabilities from the environment.
stdin_isatty, stdout_isatty, stderr_isatty = NailgunProtocol.isatty_from_env(self._env)

# Launch a thread to read stdin data from the socket (the only messages expected from the client
# for the remainder of the protocol), and threads to copy from stdout/stderr pipes onto the
# socket.
with NailgunStreamWriter.open_multi(
sock,
(ChunkType.STDOUT, ChunkType.STDERR),
None,
(stdout_isatty, stderr_isatty)
) as ((stdout_fd, stderr_fd), writer),\
NailgunStreamStdinReader.open(sock, stdin_isatty) as stdin_fd,\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
# exit chunk, to avoid any socket shutdown vs write races.
stdout, stderr = sys.stdout, sys.stderr
def finalizer():
try:
stdout.flush()
stderr.flush()
finally:
time.sleep(.001) # HACK: Sleep 1ms in the main thread to free the GIL.
writer.stop()
writer.join()
stdout.close()
stderr.close()
yield finalizer
# If all stdio is a tty, there's only one logical I/O device (the tty device). This happens to
# be addressable as a file in OSX and Linux, so we take advantage of that and directly open the
# character device for output redirection - eliminating the need to directly marshall any
# interactive stdio back/forth across the socket and permitting full, correct tty control with
# no middle-man.
if all((stdin_isatty, stdout_isatty, stderr_isatty)):
stdin_ttyname, stdout_ttyname, stderr_ttyname = NailgunProtocol.ttynames_from_env(self._env)
assert stdin_ttyname == stdout_ttyname == stderr_ttyname, (
'expected all stdio ttys to be the same, but instead got: {}\n'
'please file a bug at http://github.com/pantsbuild/pants'
.format([stdin_ttyname, stdout_ttyname, stderr_ttyname])
)
with open(stdin_ttyname, 'rb+wb', 0) as tty:
tty_fileno = tty.fileno()
with stdio_as(stdin_fd=tty_fileno, stdout_fd=tty_fileno, stderr_fd=tty_fileno):
def finalizer():
termios.tcdrain(tty_fileno)
yield finalizer
else:
stdio_writers = (
(ChunkType.STDOUT, stdout_isatty),
(ChunkType.STDERR, stderr_isatty)
)
types, ttys = zip(*(stdio_writers))
with NailgunStreamStdinReader.open(sock, stdin_isatty) as stdin_fd,\
NailgunStreamWriter.open_multi(sock, types, ttys) as ((stdout_fd, stderr_fd), writer),\
stdio_as(stdout_fd=stdout_fd, stderr_fd=stderr_fd, stdin_fd=stdin_fd):
# N.B. This will be passed to and called by the `DaemonExiter` prior to sending an
# exit chunk, to avoid any socket shutdown vs write races.
stdout, stderr = sys.stdout, sys.stderr
def finalizer():
try:
stdout.flush()
stderr.flush()
finally:
time.sleep(.001) # HACK: Sleep 1ms in the main thread to free the GIL.
writer.stop()
writer.join()
stdout.close()
stderr.close()
yield finalizer

def _setup_sigint_handler(self):
"""Sets up a control-c signal handler for the daemon runner context."""
Expand Down Expand Up @@ -185,8 +202,9 @@ def post_fork_child(self):
# Set context in the process title.
set_process_title('pantsd-runner [{}]'.format(' '.join(self._args)))

# Broadcast our pid to the remote client so they can send us signals (i.e. SIGINT).
NailgunProtocol.send_pid(self._socket, bytes(os.getpid()))
# Broadcast our process group ID (in PID form - i.e. negated) to the remote client so
# they can send signals (e.g. SIGINT) to all processes in the runners process group.
NailgunProtocol.send_pid(self._socket, bytes(os.getpgrp() * -1))

# Setup a SIGINT signal handler.
self._setup_sigint_handler()
Expand Down
3 changes: 2 additions & 1 deletion src/python/pants/bin/remote_pants_runner.py
Expand Up @@ -10,6 +10,7 @@
import sys
from contextlib import contextmanager

from pants.console.stty_utils import STTYSettings
from pants.java.nailgun_client import NailgunClient
from pants.java.nailgun_protocol import NailgunProtocol
from pants.pantsd.pants_daemon import PantsDaemon
Expand Down Expand Up @@ -102,7 +103,7 @@ def _connect_and_execute(self, port):
err=self._stderr,
exit_on_broken_pipe=True)

with self._trapped_signals(client):
with self._trapped_signals(client), STTYSettings.preserved():
# Execute the command on the pailgun.
result = client.execute(self.PANTS_COMMAND, *self._args, **modified_env)

Expand Down
51 changes: 30 additions & 21 deletions src/python/pants/console/stty_utils.py
Expand Up @@ -5,33 +5,42 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import logging
import sys
import termios
from contextlib import contextmanager

from pants.util.process_handler import subprocess


@contextmanager
def preserve_stty_settings():
"""Run potentially stty-modifying operations, e.g., REPL execution, in this contextmanager."""
stty_settings = STTYSettings()
stty_settings.save_stty_options()
yield
stty_settings.restore_ssty_options()
logger = logging.getLogger(__name__)


class STTYSettings(object):
"""Saves/restores stty settings, e.g., during REPL execution."""

def __init__(self):
self._stty_options = None

def save_stty_options(self):
self._stty_options = self._run_cmd('stty -g 2>/dev/null')
@classmethod
@contextmanager
def preserved(cls):
"""Run potentially stty-modifying operations, e.g., REPL execution, in this contextmanager."""
inst = cls()
inst.save_tty_flags()
try:
yield
finally:
inst.restore_tty_flags()

def restore_ssty_options(self):
self._run_cmd('stty ' + self._stty_options)

def _run_cmd(self, cmd):
po = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
stdout, _ = po.communicate()
return stdout
def __init__(self):
self._tty_flags = None

def save_tty_flags(self):
# N.B. `stty(1)` operates against stdin.
try:
self._tty_flags = termios.tcgetattr(sys.stdin.fileno())
except termios.error as e:
logger.debug('masking tcgetattr exception: {!r}'.format(e))

def restore_tty_flags(self):
if self._tty_flags:
try:
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, self._tty_flags)
except termios.error as e:
logger.debug('masking tcsetattr exception: {!r}'.format(e))
16 changes: 12 additions & 4 deletions src/python/pants/java/nailgun_io.py
Expand Up @@ -85,6 +85,8 @@ def open(cls, sock, isatty=False):
with _pipe(isatty) as (read_fd, write_fd):
reader = NailgunStreamStdinReader(sock, os.fdopen(write_fd, 'wb'))
with reader.running():
# Instruct the thin client to begin reading and sending stdin.
NailgunProtocol.send_start_reading_input(sock)
yield read_fd

def run(self):
Expand Down Expand Up @@ -140,14 +142,20 @@ def _assert_aligned(self, *iterables):

@classmethod
@contextmanager
def open(cls, sock, chunk_type, chunk_eof_type, isatty, buf_size=None, select_timeout=None):
"""Yields the write side of a pipe that will copy appropriately chunked values to the socket."""
with cls.open_multi(sock, (chunk_type,), chunk_eof_type, isattys=(isatty,)) as ctx:
def open(cls, sock, chunk_type, isatty, chunk_eof_type=None, buf_size=None, select_timeout=None):
"""Yields the write side of a pipe that will copy appropriately chunked values to a socket."""
with cls.open_multi(sock,
(chunk_type,),
(isatty,),
chunk_eof_type,
buf_size,
select_timeout) as ctx:
yield ctx

@classmethod
@contextmanager
def open_multi(cls, sock, chunk_types, chunk_eof_type, isattys, buf_size=None, select_timeout=None):
def open_multi(cls, sock, chunk_types, isattys, chunk_eof_type=None, buf_size=None,
select_timeout=None):
"""Yields the write sides of pipes that will copy appropriately chunked values to the socket."""
cls._assert_aligned(chunk_types, isattys)

Expand Down
29 changes: 24 additions & 5 deletions src/python/pants/java/nailgun_protocol.py
Expand Up @@ -5,11 +5,15 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import os
import struct

import six


STDIO_DESCRIPTORS = (0, 1, 2)


class ChunkType(object):
"""Nailgun protocol chunk types.
Expand Down Expand Up @@ -63,6 +67,7 @@ class NailgunProtocol(object):

ENVIRON_SEP = '='
TTY_ENV_TMPL = 'NAILGUN_TTY_{}'
TTY_PATH_ENV = 'NAILGUN_TTY_PATH_{}'
HEADER_FMT = b'>Ic'
HEADER_BYTES = 5

Expand Down Expand Up @@ -237,10 +242,13 @@ def isatty_to_env(cls, stdin, stdout, stderr):
:param file stderr: The stream to check for stderr tty capabilities.
:returns: A dict containing the tty capability environment variables.
"""
fds = (stdin, stdout, stderr)
return {
cls.TTY_ENV_TMPL.format(fd_id): bytes(int(fd.isatty())) for fd_id, fd in enumerate(fds)
}
def gen_env_vars():
for fd_id, fd in zip(STDIO_DESCRIPTORS, (stdin, stdout, stderr)):
is_atty = fd.isatty()
yield (cls.TTY_ENV_TMPL.format(fd_id), bytes(int(is_atty)))
if is_atty:
yield (cls.TTY_PATH_ENV.format(fd_id), os.ttyname(fd.fileno()) or '')
return dict(gen_env_vars())

@classmethod
def isatty_from_env(cls, env):
Expand All @@ -252,4 +260,15 @@ def isatty_from_env(cls, env):
def str_int_bool(i):
return i.isdigit() and bool(int(i)) # Environment variable values should always be strings.

return tuple(str_int_bool(env.get(cls.TTY_ENV_TMPL.format(fd_id), '0')) for fd_id in range(3))
return tuple(
str_int_bool(env.get(cls.TTY_ENV_TMPL.format(fd_id), '0')) for fd_id in STDIO_DESCRIPTORS
)

@classmethod
def ttynames_from_env(cls, env):
"""Determines the ttynames for remote file descriptors (if ttys).
:param dict env: A dictionary representing the environment.
:returns: A tuple of boolean values indicating ttyname paths or None for (stdin, stdout, stderr).
"""
return tuple(env.get(cls.TTY_PATH_ENV.format(fd_id)) for fd_id in STDIO_DESCRIPTORS)
3 changes: 0 additions & 3 deletions src/python/pants/pantsd/pailgun_server.py
Expand Up @@ -73,9 +73,6 @@ def handle(self):
self.logger.info('handling pailgun request: `{}`'.format(' '.join(arguments)))
self.logger.debug('pailgun request environment: %s', environment)

# Instruct the client to send stdin (if applicable).
NailgunProtocol.send_start_reading_input(self.request)

# Execute the requested command with optional daemon-side profiling.
with maybe_profiled(environment.get('PANTSD_PROFILE')):
self._run_pants(self.request, arguments, environment)
Expand Down
4 changes: 2 additions & 2 deletions src/python/pants/task/repl_task_mixin.py
Expand Up @@ -8,7 +8,7 @@
from abc import abstractmethod

from pants.base.workunit import WorkUnitLabel
from pants.console import stty_utils
from pants.console.stty_utils import STTYSettings
from pants.task.mutex_task_mixin import MutexTaskMixin


Expand Down Expand Up @@ -50,7 +50,7 @@ def launch_repl(self, session_setup):
def execute_for(self, targets):
session_setup = self.setup_repl_session(targets)
self.context.release_lock()
with stty_utils.preserve_stty_settings():
with STTYSettings.preserved():
with self.context.new_workunit(name='repl', labels=[WorkUnitLabel.RUN]):
print('') # Start REPL output on a new line.
try:
Expand Down
15 changes: 15 additions & 0 deletions src/python/pants/util/contextutil.py
Expand Up @@ -8,6 +8,7 @@
import logging
import os
import shutil
import signal
import sys
import tempfile
import time
Expand Down Expand Up @@ -98,6 +99,20 @@ def stdio_as(stdout_fd, stderr_fd, stdin_fd):
yield


@contextmanager
def signal_handler_as(sig, handler):
"""Temporarily replaces a signal handler for the given signal and restores the old handler.
:param int sig: The target signal to replace the handler for (e.g. signal.SIGINT).
:param func handler: The new temporary handler.
"""
old_handler = signal.signal(sig, handler)
try:
yield
finally:
signal.signal(sig, old_handler)


@contextmanager
def temporary_dir(root_dir=None, cleanup=True, suffix=str(), permissions=None, prefix=tempfile.template):
"""
Expand Down
20 changes: 19 additions & 1 deletion tests/python/pants_test/util/test_contextutil.py
Expand Up @@ -8,6 +8,7 @@
import os
import pstats
import shutil
import signal
import sys
import unittest
import uuid
Expand All @@ -18,7 +19,8 @@

from pants.util.contextutil import (HardSystemExit, InvalidZipPath, Timer, environment_as,
exception_logging, hard_exit_handler, maybe_profiled, open_zip,
pushd, stdio_as, temporary_dir, temporary_file)
pushd, signal_handler_as, stdio_as, temporary_dir,
temporary_file)
from pants.util.process_handler import subprocess


Expand Down Expand Up @@ -251,6 +253,22 @@ def test_stdio_as_dev_null(self):
print('garbage', file=sys.stdout)
print('garbage', file=sys.stderr)

def test_signal_handler_as(self):
mock_initial_handler = 1
mock_new_handler = 2
with mock.patch('signal.signal', **PATCH_OPTS) as mock_signal:
mock_signal.return_value = mock_initial_handler
try:
with signal_handler_as(signal.SIGUSR2, mock_new_handler):
raise NotImplementedError('blah')
except NotImplementedError:
pass
self.assertEquals(mock_signal.call_count, 2)
mock_signal.assert_has_calls([
mock.call(signal.SIGUSR2, mock_new_handler),
mock.call(signal.SIGUSR2, mock_initial_handler)
])

def test_permissions(self):
with temporary_file(permissions=0700) as f:
self.assertEquals(0700, os.stat(f.name)[0] & 0777)
Expand Down

0 comments on commit afe3edd

Please sign in to comment.