diff --git a/src/python/pants/bin/remote_pants_runner.py b/src/python/pants/bin/remote_pants_runner.py index 4d706750827b..467903d6d42b 100644 --- a/src/python/pants/bin/remote_pants_runner.py +++ b/src/python/pants/bin/remote_pants_runner.py @@ -5,7 +5,6 @@ from __future__ import absolute_import, division, print_function, unicode_literals import logging -import signal import sys import time from builtins import object, str @@ -25,11 +24,6 @@ logger = logging.getLogger(__name__) -def _make_interrupted_by_user_error(): - """Create an exception which is set by the signal handler and/or a socket timeout!""" - return KeyboardInterrupt('Interrupted by user over pailgun client!') - - class PailgunClientSignalHandler(SignalHandler): def __init__(self, pailgun_client, timeout=1, *args, **kwargs): @@ -41,7 +35,7 @@ def __init__(self, pailgun_client, timeout=1, *args, **kwargs): def _forward_signal_with_timeout(self, signum): self._pailgun_client.set_exit_timeout( timeout=self._timeout, - reason=_make_interrupted_by_user_error()) + reason=KeyboardInterrupt('Interrupted by user over pailgun client!')) self._pailgun_client.maybe_send_signal(signum) def handle_sigint(self, signum, _frame): @@ -90,19 +84,13 @@ def __init__(self, exiter, args, env, options_bootstrapper, stdin=None, stdout=N self._stderr = stderr or (sys.stderr.buffer if PY3 else sys.stderr) @contextmanager - def _trapped_signals(self, client, timeout=None): + def _trapped_signals(self, client): """A contextmanager that handles SIGINT (control-c) and SIGQUIT (control-\\) remotely.""" - signal_handler = PailgunClientSignalHandler(client, timeout=timeout) + signal_handler = PailgunClientSignalHandler( + client, + timeout=self._bootstrap_options.for_global_scope().pantsd_pailgun_quit_timeout) with ExceptionSink.trapped_signals(signal_handler): - try: - yield - except NailgunProtocol.ProcessStreamTimeout as e: - logger.warn("timed out when attempting to gracefully shut down the remote client. " - "sending SIGKILL to the remote client at pid: {}. " - "original timeout seconds: {}, message: {}" - .format(client._maybe_last_pid(), timeout, e)) - client.maybe_send_signal(signal.SIGKILL) - raise _make_interrupted_by_user_error() + yield def _setup_logging(self): """Sets up basic stdio logging for the thin client.""" @@ -174,7 +162,7 @@ def _connect_and_execute(self, pantsd_handle): exit_on_broken_pipe=True, metadata_base_dir=pantsd_handle.metadata_base_dir) - with self._trapped_signals(client, timeout=1), STTYSettings.preserved(): + with self._trapped_signals(client), STTYSettings.preserved(): # Execute the command on the pailgun. try: result = client.execute(self.PANTS_COMMAND, *self._args, **modified_env) diff --git a/src/python/pants/java/nailgun_client.py b/src/python/pants/java/nailgun_client.py index 4a2983d51e8f..0e7b76e61e2c 100644 --- a/src/python/pants/java/nailgun_client.py +++ b/src/python/pants/java/nailgun_client.py @@ -13,6 +13,7 @@ import time from builtins import object, str +import psutil from future.utils import PY3 from pants.java.nailgun_io import NailgunStreamWriter @@ -20,13 +21,13 @@ from pants.util.dirutil import safe_file_dump from pants.util.osutil import safe_kill from pants.util.socket import RecvBufferedSocket -from pants.util.strutil import ensure_binary +from pants.util.strutil import ensure_binary, safe_shlex_join logger = logging.getLogger(__name__) -class NailgunClientSession(NailgunProtocol, NailgunProtocol.TimeoutObject): +class NailgunClientSession(NailgunProtocol, NailgunProtocol.TimeoutProvider): """Handles a single nailgun client session.""" def __init__(self, sock, in_file, out_file, err_file, exit_on_broken_pipe=False, @@ -49,6 +50,7 @@ def __init__(self, sock, in_file, out_file, err_file, exit_on_broken_pipe=False, self._stderr = err_file self._exit_on_broken_pipe = exit_on_broken_pipe self.remote_pid = None + self.remote_process_cmdline = None self.remote_pgrp = None self._remote_pid_callback = remote_pid_callback self._remote_pgrp_callback = remote_pgrp_callback @@ -74,9 +76,9 @@ def _set_exit_timeout(self, timeout, reason): self._exit_reason = reason def maybe_timeout_options(self): - """Implements the NailgunProtocol.TimeoutObject interface.""" + """Implements the NailgunProtocol.TimeoutProvider interface.""" if self._exit_timeout_start_time: - return (self._exit_timeout_start_time, self._exit_timeout) + return NailgunProtocol.TimeoutOptions(self._exit_timeout_start_time, self._exit_timeout) else: return None @@ -102,6 +104,9 @@ def _write_flush(self, fd, payload=None): # Otherwise, re-raise. raise + class ExitTimedOut(Exception): + """Raised when a timeout for the remote client exit was breached.""" + def _process_session(self): """Process the outputs of the nailgun session. @@ -125,6 +130,7 @@ def _process_session(self): return int(payload) elif chunk_type == ChunkType.PID: self.remote_pid = int(payload) + self.remote_process_cmdline = psutil.Process(self.remote_pid).cmdline() if self._remote_pid_callback: self._remote_pid_callback(self.remote_pid) elif chunk_type == ChunkType.PGRP: @@ -135,6 +141,17 @@ def _process_session(self): self._maybe_start_input_writer() else: raise self.ProtocolError('received unexpected chunk {} -> {}'.format(chunk_type, payload)) + except NailgunProtocol.ProcessStreamTimeout as e: + assert(self.remote_pid is not None) + # NB: We overwrite the process title in the pantsd-runner process, which causes it to have an + # argv with lots of empty spaces for some reason. We filter those out and pretty-print the + # rest here. + filtered_remote_cmdline = safe_shlex_join( + arg for arg in self.remote_process_cmdline if arg != '') + logger.warning( + "timed out when attempting to gracefully shut down the remote client executing \"{}\". " + "sending SIGKILL to the remote client at pid: {}. message: {}" + .format(filtered_remote_cmdline, self.remote_pid, e)) finally: # Bad chunk types received from the server can throw NailgunProtocol.ProtocolError in # NailgunProtocol.iter_chunks(). This ensures the NailgunStreamWriter is always stopped. diff --git a/src/python/pants/java/nailgun_protocol.py b/src/python/pants/java/nailgun_protocol.py index 7156e4fae167..4bb51fdf4a56 100644 --- a/src/python/pants/java/nailgun_protocol.py +++ b/src/python/pants/java/nailgun_protocol.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function, unicode_literals +import datetime import os import socket import struct @@ -13,6 +14,7 @@ from contextlib import contextmanager from pants.util.meta import AbstractClass +from pants.util.objects import datatype from pants.util.osutil import IntegerForPid @@ -234,17 +236,18 @@ def _set_socket_timeout(cls, sock, timeout=None): if timeout is not None: sock.settimeout(prev_timeout) - class TimeoutObject(AbstractClass): + class TimeoutOptions(datatype([('start_time', float), ('interval', float)])): pass + + class TimeoutProvider(AbstractClass): @abstractmethod def maybe_timeout_options(self): """Called on every stream iteration to obtain a possible specification for a timeout. - If this method returns non-None, it should return a 2-tuple of the time the timeout began, and - the length of the timeout, both as positive floats. + If this method returns non-None, it should return an instance of `cls.TimeoutOptions`, which + then initiates a timeout after which the stream will raise `cls.ProcessStreamTimeout`. - :returns: a 2-tuple of (timeout_start_time, timeout_interval), or None - :rtype: tuple of float + :rtype: :class:`cls.TimeoutOptions`, or None """ @classmethod @@ -257,25 +260,27 @@ def iter_chunks(cls, sock, return_bytes=False, timeout_object=None): :param sock: the socket to read from. :param bool return_bytes: If False, decode the payload into a utf-8 string. - :param cls.TimeoutObject timeout_object: If provided, will be checked every iteration for a + :param cls.TimeoutProvider timeout_object: If provided, will be checked every iteration for a possible timeout. :raises: :class:`cls.ProcessStreamTimeout` """ - assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutObject)) + assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutProvider)) orig_timeout_time = None timeout_interval = None while 1: if orig_timeout_time is not None: remaining_time = time.time() - (orig_timeout_time + timeout_interval) - if remaining_time < 0: + if remaining_time > 0: + original_timestamp = datetime.datetime.fromtimestamp(orig_timeout_time).isoformat() raise cls.ProcessStreamTimeout( - "iterating over bytes timed out with timeout interval {} starting at {}, " + "iterating over bytes from nailgun timed out with timeout interval {} starting at {}, " "overtime seconds: {}" - .format(timeout_interval, orig_timeout_time, (-1 * remaining_time))) + .format(timeout_interval, original_timestamp, remaining_time)) elif timeout_object is not None: opts = timeout_object.maybe_timeout_options() if opts: - orig_timeout_time, timeout_interval = opts + orig_timeout_time = opts.start_time + timeout_interval = opts.interval continue remaining_time = None else: diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 330d022e3409..c2d4795dee19 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -276,6 +276,9 @@ def register_bootstrap_options(cls, register): help='The host to bind the pants nailgun server to.') register('--pantsd-pailgun-port', advanced=True, type=int, default=0, help='The port to bind the pants nailgun server to. Defaults to a random port.') + register('--pantsd-pailgun-quit-timeout', advanced=True, type=float, default=1.0, + help='The length of time (in seconds) to wait for further output after sending a ' + 'signal to the remote pantsd-runner process before killing it.') register('--pantsd-log-dir', advanced=True, default=None, help='The directory to log pantsd output to.') register('--pantsd-invalidation-globs', advanced=True, type=list, fromfile=True, default=[], diff --git a/tests/python/pants_test/pantsd/test_pantsd_integration.py b/tests/python/pants_test/pantsd/test_pantsd_integration.py index 05a4e8767402..73704e0ad504 100644 --- a/tests/python/pants_test/pantsd/test_pantsd_integration.py +++ b/tests/python/pants_test/pantsd/test_pantsd_integration.py @@ -4,8 +4,10 @@ from __future__ import absolute_import, division, print_function, unicode_literals +import datetime import itertools import os +import re import signal import threading import time @@ -416,17 +418,24 @@ def test_pantsd_multiple_parallel_runs(self): self.assert_success(creator_handle.join()) self.assert_success(waiter_handle.join()) - def _assert_pantsd_keyboardinterrupt_signal(self, signum, message): + def _assert_pantsd_keyboardinterrupt_signal(self, signum, messages, regexps=[], + quit_timeout=None): # TODO: This tests that pantsd-runner processes actually die after the thin client receives the # specified signal. with self.pantsd_test_context() as (workdir, config, checker): # Launch a run that will wait for a file to be created (but do not create that file). file_to_make = os.path.join(workdir, 'some_magic_file') - waiter_handle = self.run_pants_with_workdir_without_waiting( - ['run', 'testprojects/src/python/coordinated_runs:waiter', '--', file_to_make], - workdir, - config, - ) + + # Assuming the config provided already has some global options, set the quit timeout just for + # this pants run, not the kill-pantsd runs before or after. + if quit_timeout is not None: + timeout_args = ['--pantsd-pailgun-quit-timeout={}'.format(quit_timeout)] + else: + timeout_args = [] + argv = timeout_args + [ + 'run', 'testprojects/src/python/coordinated_runs:waiter', '--', file_to_make + ] + waiter_handle = self.run_pants_with_workdir_without_waiting(argv, workdir, config) client_pid = waiter_handle.process.pid checker.assert_started() @@ -439,7 +448,10 @@ def _assert_pantsd_keyboardinterrupt_signal(self, signum, message): waiter_run = waiter_handle.join() self.assert_failure(waiter_run) - self.assertIn(message, waiter_run.stderr_data) + for msg in messages: + self.assertIn(msg, waiter_run.stderr_data) + for regexp in regexps: + assertRegex(self, waiter_run.stderr_data, regexp) time.sleep(1) for proc in pantsd_runner_processes: @@ -451,12 +463,23 @@ def _assert_pantsd_keyboardinterrupt_signal(self, signum, message): def test_pantsd_sigterm(self): self._assert_pantsd_keyboardinterrupt_signal( signal.SIGTERM, - 'Signal {signum} (SIGTERM) was raised. Exiting with failure.'.format(signum=signal.SIGTERM)) + ['Signal {signum} (SIGTERM) was raised. Exiting with failure.'.format(signum=signal.SIGTERM)]) def test_keyboardinterrupt_signals_with_pantsd(self): for interrupt_signal in [signal.SIGINT, signal.SIGQUIT]: - self._assert_pantsd_keyboardinterrupt_signal(interrupt_signal, - '\nInterrupted by user over pailgun client!\n') + self._assert_pantsd_keyboardinterrupt_signal( + interrupt_signal, + ['\nInterrupted by user over pailgun client!\n']) + + def test_signal_pailgun_stream_timeout(self): + today = datetime.date.today().isoformat() + self._assert_pantsd_keyboardinterrupt_signal( + signal.SIGINT, + messages=['\nInterrupted by user over pailgun client!\n'], + regexps=["""WARN\\] timed out when attempting to gracefully shut down the remote client executing "'pantsd-runner.*'"\\. sending SIGKILL to the remote client at pid: [0-9]+\\. message: iterating over bytes from nailgun timed out with timeout interval 0\\.01 starting at {today}T[^\n]+, overtime seconds: [^\n]+""" + .format(today=re.escape(today))], + quit_timeout=0.01, + ) def test_pantsd_environment_scrubbing(self): # This pair of JVM options causes the JVM to always crash, so the command will fail if the env