Skip to content

Commit

Permalink
add a global option for the pailgun timeout and add an integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmicexplorer committed Mar 19, 2019
1 parent 5fb8b18 commit 61b09e1
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 44 deletions.
26 changes: 7 additions & 19 deletions src/python/pants/bin/remote_pants_runner.py
Expand Up @@ -5,7 +5,6 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import logging
import signal
import sys
import time
from builtins import object, str
Expand All @@ -25,11 +24,6 @@
logger = logging.getLogger(__name__)


def _make_interrupted_by_user_error():
"""Create an exception which is set by the signal handler and/or a socket timeout!"""
return KeyboardInterrupt('Interrupted by user over pailgun client!')


class PailgunClientSignalHandler(SignalHandler):

def __init__(self, pailgun_client, timeout=1, *args, **kwargs):
Expand All @@ -41,7 +35,7 @@ def __init__(self, pailgun_client, timeout=1, *args, **kwargs):
def _forward_signal_with_timeout(self, signum):
self._pailgun_client.set_exit_timeout(
timeout=self._timeout,
reason=_make_interrupted_by_user_error())
reason=KeyboardInterrupt('Interrupted by user over pailgun client!'))
self._pailgun_client.maybe_send_signal(signum)

def handle_sigint(self, signum, _frame):
Expand Down Expand Up @@ -90,19 +84,13 @@ def __init__(self, exiter, args, env, options_bootstrapper, stdin=None, stdout=N
self._stderr = stderr or (sys.stderr.buffer if PY3 else sys.stderr)

@contextmanager
def _trapped_signals(self, client, timeout=None):
def _trapped_signals(self, client):
"""A contextmanager that handles SIGINT (control-c) and SIGQUIT (control-\\) remotely."""
signal_handler = PailgunClientSignalHandler(client, timeout=timeout)
signal_handler = PailgunClientSignalHandler(
client,
timeout=self._bootstrap_options.for_global_scope().pantsd_pailgun_quit_timeout)
with ExceptionSink.trapped_signals(signal_handler):
try:
yield
except NailgunProtocol.ProcessStreamTimeout as e:
logger.warn("timed out when attempting to gracefully shut down the remote client. "
"sending SIGKILL to the remote client at pid: {}. "
"original timeout seconds: {}, message: {}"
.format(client._maybe_last_pid(), timeout, e))
client.maybe_send_signal(signal.SIGKILL)
raise _make_interrupted_by_user_error()
yield

def _setup_logging(self):
"""Sets up basic stdio logging for the thin client."""
Expand Down Expand Up @@ -174,7 +162,7 @@ def _connect_and_execute(self, pantsd_handle):
exit_on_broken_pipe=True,
metadata_base_dir=pantsd_handle.metadata_base_dir)

with self._trapped_signals(client, timeout=1), STTYSettings.preserved():
with self._trapped_signals(client), STTYSettings.preserved():
# Execute the command on the pailgun.
try:
result = client.execute(self.PANTS_COMMAND, *self._args, **modified_env)
Expand Down
25 changes: 21 additions & 4 deletions src/python/pants/java/nailgun_client.py
Expand Up @@ -13,20 +13,21 @@
import time
from builtins import object, str

import psutil
from future.utils import PY3

from pants.java.nailgun_io import NailgunStreamWriter
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol
from pants.util.dirutil import safe_file_dump
from pants.util.osutil import safe_kill
from pants.util.socket import RecvBufferedSocket
from pants.util.strutil import ensure_binary
from pants.util.strutil import ensure_binary, safe_shlex_join


logger = logging.getLogger(__name__)


class NailgunClientSession(NailgunProtocol, NailgunProtocol.TimeoutObject):
class NailgunClientSession(NailgunProtocol, NailgunProtocol.TimeoutProvider):
"""Handles a single nailgun client session."""

def __init__(self, sock, in_file, out_file, err_file, exit_on_broken_pipe=False,
Expand All @@ -49,6 +50,7 @@ def __init__(self, sock, in_file, out_file, err_file, exit_on_broken_pipe=False,
self._stderr = err_file
self._exit_on_broken_pipe = exit_on_broken_pipe
self.remote_pid = None
self.remote_process_cmdline = None
self.remote_pgrp = None
self._remote_pid_callback = remote_pid_callback
self._remote_pgrp_callback = remote_pgrp_callback
Expand All @@ -74,9 +76,9 @@ def _set_exit_timeout(self, timeout, reason):
self._exit_reason = reason

def maybe_timeout_options(self):
"""Implements the NailgunProtocol.TimeoutObject interface."""
"""Implements the NailgunProtocol.TimeoutProvider interface."""
if self._exit_timeout_start_time:
return (self._exit_timeout_start_time, self._exit_timeout)
return NailgunProtocol.TimeoutOptions(self._exit_timeout_start_time, self._exit_timeout)
else:
return None

Expand All @@ -102,6 +104,9 @@ def _write_flush(self, fd, payload=None):
# Otherwise, re-raise.
raise

class ExitTimedOut(Exception):
"""Raised when a timeout for the remote client exit was breached."""

def _process_session(self):
"""Process the outputs of the nailgun session.
Expand All @@ -125,6 +130,7 @@ def _process_session(self):
return int(payload)
elif chunk_type == ChunkType.PID:
self.remote_pid = int(payload)
self.remote_process_cmdline = psutil.Process(self.remote_pid).cmdline()
if self._remote_pid_callback:
self._remote_pid_callback(self.remote_pid)
elif chunk_type == ChunkType.PGRP:
Expand All @@ -135,6 +141,17 @@ def _process_session(self):
self._maybe_start_input_writer()
else:
raise self.ProtocolError('received unexpected chunk {} -> {}'.format(chunk_type, payload))
except NailgunProtocol.ProcessStreamTimeout as e:
assert(self.remote_pid is not None)
# NB: We overwrite the process title in the pantsd-runner process, which causes its argv to
# contain many empty-string entries for some reason. We filter those out and pretty-print the
# rest here.
filtered_remote_cmdline = safe_shlex_join(
arg for arg in self.remote_process_cmdline if arg != '')
logger.warning(
"timed out when attempting to gracefully shut down the remote client executing \"{}\". "
"sending SIGKILL to the remote client at pid: {}. message: {}"
.format(filtered_remote_cmdline, self.remote_pid, e))
finally:
# Bad chunk types received from the server can throw NailgunProtocol.ProtocolError in
# NailgunProtocol.iter_chunks(). This ensures the NailgunStreamWriter is always stopped.
Expand Down
27 changes: 16 additions & 11 deletions src/python/pants/java/nailgun_protocol.py
Expand Up @@ -4,6 +4,7 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import datetime
import os
import socket
import struct
Expand All @@ -13,6 +14,7 @@
from contextlib import contextmanager

from pants.util.meta import AbstractClass
from pants.util.objects import datatype
from pants.util.osutil import IntegerForPid


Expand Down Expand Up @@ -234,17 +236,18 @@ def _set_socket_timeout(cls, sock, timeout=None):
if timeout is not None:
sock.settimeout(prev_timeout)

class TimeoutObject(AbstractClass):
class TimeoutOptions(datatype([('start_time', float), ('interval', float)])): pass

class TimeoutProvider(AbstractClass):

@abstractmethod
def maybe_timeout_options(self):
"""Called on every stream iteration to obtain a possible specification for a timeout.
If this method returns non-None, it should return a 2-tuple of the time the timeout began, and
the length of the timeout, both as positive floats.
If this method returns non-None, it should return an instance of `cls.TimeoutOptions`, which
then initiates a timeout after which the stream will raise `cls.ProcessStreamTimeout`.
:returns: a 2-tuple of (timeout_start_time, timeout_interval), or None
:rtype: tuple of float
:rtype: :class:`cls.TimeoutOptions`, or None
"""

@classmethod
Expand All @@ -257,25 +260,27 @@ def iter_chunks(cls, sock, return_bytes=False, timeout_object=None):
:param sock: the socket to read from.
:param bool return_bytes: If False, decode the payload into a utf-8 string.
:param cls.TimeoutObject timeout_object: If provided, will be checked every iteration for a
:param cls.TimeoutProvider timeout_object: If provided, will be checked every iteration for a
possible timeout.
:raises: :class:`cls.ProcessStreamTimeout`
"""
assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutObject))
assert(timeout_object is None or isinstance(timeout_object, cls.TimeoutProvider))
orig_timeout_time = None
timeout_interval = None
while 1:
if orig_timeout_time is not None:
remaining_time = time.time() - (orig_timeout_time + timeout_interval)
if remaining_time < 0:
if remaining_time > 0:
original_timestamp = datetime.datetime.fromtimestamp(orig_timeout_time).isoformat()
raise cls.ProcessStreamTimeout(
"iterating over bytes timed out with timeout interval {} starting at {}, "
"iterating over bytes from nailgun timed out with timeout interval {} starting at {}, "
"overtime seconds: {}"
.format(timeout_interval, orig_timeout_time, (-1 * remaining_time)))
.format(timeout_interval, original_timestamp, remaining_time))
elif timeout_object is not None:
opts = timeout_object.maybe_timeout_options()
if opts:
orig_timeout_time, timeout_interval = opts
orig_timeout_time = opts.start_time
timeout_interval = opts.interval
continue
remaining_time = None
else:
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/option/global_options.py
Expand Up @@ -276,6 +276,9 @@ def register_bootstrap_options(cls, register):
help='The host to bind the pants nailgun server to.')
register('--pantsd-pailgun-port', advanced=True, type=int, default=0,
help='The port to bind the pants nailgun server to. Defaults to a random port.')
register('--pantsd-pailgun-quit-timeout', advanced=True, type=float, default=1.0,
help='The length of time (in seconds) to wait for further output after sending a '
'signal to the remote pantsd-runner process before killing it.')
register('--pantsd-log-dir', advanced=True, default=None,
help='The directory to log pantsd output to.')
register('--pantsd-invalidation-globs', advanced=True, type=list, fromfile=True, default=[],
Expand Down
43 changes: 33 additions & 10 deletions tests/python/pants_test/pantsd/test_pantsd_integration.py
Expand Up @@ -4,8 +4,10 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import datetime
import itertools
import os
import re
import signal
import threading
import time
Expand Down Expand Up @@ -416,17 +418,24 @@ def test_pantsd_multiple_parallel_runs(self):
self.assert_success(creator_handle.join())
self.assert_success(waiter_handle.join())

def _assert_pantsd_keyboardinterrupt_signal(self, signum, message):
def _assert_pantsd_keyboardinterrupt_signal(self, signum, messages, regexps=[],
quit_timeout=None):
# NB: This tests that pantsd-runner processes actually die after the thin client receives the
# specified signal.
with self.pantsd_test_context() as (workdir, config, checker):
# Launch a run that will wait for a file to be created (but do not create that file).
file_to_make = os.path.join(workdir, 'some_magic_file')
waiter_handle = self.run_pants_with_workdir_without_waiting(
['run', 'testprojects/src/python/coordinated_runs:waiter', '--', file_to_make],
workdir,
config,
)

# Assuming the config provided already has some global options, set the quit timeout just for
# this pants run, not the kill-pantsd runs before or after.
if quit_timeout is not None:
timeout_args = ['--pantsd-pailgun-quit-timeout={}'.format(quit_timeout)]
else:
timeout_args = []
argv = timeout_args + [
'run', 'testprojects/src/python/coordinated_runs:waiter', '--', file_to_make
]
waiter_handle = self.run_pants_with_workdir_without_waiting(argv, workdir, config)
client_pid = waiter_handle.process.pid

checker.assert_started()
Expand All @@ -439,7 +448,10 @@ def _assert_pantsd_keyboardinterrupt_signal(self, signum, message):
waiter_run = waiter_handle.join()
self.assert_failure(waiter_run)

self.assertIn(message, waiter_run.stderr_data)
for msg in messages:
self.assertIn(msg, waiter_run.stderr_data)
for regexp in regexps:
assertRegex(self, waiter_run.stderr_data, regexp)

time.sleep(1)
for proc in pantsd_runner_processes:
Expand All @@ -451,12 +463,23 @@ def _assert_pantsd_keyboardinterrupt_signal(self, signum, message):
def test_pantsd_sigterm(self):
self._assert_pantsd_keyboardinterrupt_signal(
signal.SIGTERM,
'Signal {signum} (SIGTERM) was raised. Exiting with failure.'.format(signum=signal.SIGTERM))
['Signal {signum} (SIGTERM) was raised. Exiting with failure.'.format(signum=signal.SIGTERM)])

def test_keyboardinterrupt_signals_with_pantsd(self):
for interrupt_signal in [signal.SIGINT, signal.SIGQUIT]:
self._assert_pantsd_keyboardinterrupt_signal(interrupt_signal,
'\nInterrupted by user over pailgun client!\n')
self._assert_pantsd_keyboardinterrupt_signal(
interrupt_signal,
['\nInterrupted by user over pailgun client!\n'])

def test_signal_pailgun_stream_timeout(self):
today = datetime.date.today().isoformat()
self._assert_pantsd_keyboardinterrupt_signal(
signal.SIGINT,
messages=['\nInterrupted by user over pailgun client!\n'],
regexps=["""WARN\\] timed out when attempting to gracefully shut down the remote client executing "'pantsd-runner.*'"\\. sending SIGKILL to the remote client at pid: [0-9]+\\. message: iterating over bytes from nailgun timed out with timeout interval 0\\.01 starting at {today}T[^\n]+, overtime seconds: [^\n]+"""
.format(today=re.escape(today))],
quit_timeout=0.01,
)

def test_pantsd_environment_scrubbing(self):
# This pair of JVM options causes the JVM to always crash, so the command will fail if the env
Expand Down

0 comments on commit 61b09e1

Please sign in to comment.