Skip to content

Commit

Permalink
[pantsd] Robustify client connection logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
kwlzn committed Jun 12, 2018
1 parent 79d358a commit 7e1e39c
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 28 deletions.
6 changes: 3 additions & 3 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ def post_fork_child(self):
# Set context in the process title.
set_process_title('pantsd-runner [{}]'.format(' '.join(self._args)))

# Setup a SIGINT signal handler.
self._setup_sigint_handler()

# Broadcast our process group ID (in PID form - i.e. negated) to the remote client so
# they can send signals (e.g. SIGINT) to all processes in the runners process group.
NailgunProtocol.send_pid(self._socket, bytes(os.getpgrp() * -1))

# Setup a SIGINT signal handler.
self._setup_sigint_handler()

# Invoke a Pants run with stdio redirected and a proxied environment.
with self._nailgunned_stdio(self._socket) as finalizer, hermetic_environment_as(**self._env):
try:
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/bin/pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def run(self):
try:
return RemotePantsRunner(self._exiter, self._args, self._env, bootstrap_options).run()
except RemotePantsRunner.Fallback as e:
logger.debug('caught client exception: {!r}, falling back to non-daemon mode'.format(e))
logger.warn('caught client exception: {!r}, falling back to non-daemon mode'.format(e))

# N.B. Inlining this import speeds up the python thin client run by about 100ms.
from pants.bin.local_pants_runner import LocalPantsRunner
Expand Down
60 changes: 50 additions & 10 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ class RemotePantsRunner(object):
class Fallback(Exception):
"""Raised when fallback to an alternate execution mode is requested."""

class PortNotFound(Exception):
"""Raised when the pailgun port can't be found."""
class Terminated(Exception):
"""Raised when an active run is terminated mid-flight."""

PANTS_COMMAND = 'pants'
RECOVERABLE_EXCEPTIONS = (PortNotFound, NailgunClient.NailgunConnectionError)
RECOVERABLE_EXCEPTIONS = (
NailgunClient.NailgunConnectionError,
NailgunClient.NailgunExecutionError
)

def __init__(self, exiter, args, env, bootstrap_options, stdin=None, stdout=None, stderr=None):
"""
Expand Down Expand Up @@ -86,6 +89,44 @@ def _setup_logging(self):
root.setLevel(log_level)
root.addHandler(handler)

@staticmethod
def _backoff(attempt):
"""Minimal backoff strategy for daemon restarts."""
time.sleep(attempt + (attempt - 1))

def _run_pants_with_retry(self, port, retries=3):
"""Runs pants remotely with retry and recovery for nascent executions."""
attempt = 1
while 1:
logger.debug(
'connecting to pantsd on port {} (attempt {}/{})'.format(port, attempt, retries)
)
try:
return self._connect_and_execute(port)
except self.RECOVERABLE_EXCEPTIONS as e:
if attempt > retries:
raise self.Fallback(e)

self._backoff(attempt)
logger.warn(
'pantsd was unresponsive on port {}, retrying ({}/{})'
.format(port, attempt, retries)
)

# One possible cause of the daemon being non-responsive during an attempt might be if a
# another lifecycle operation is happening concurrently (incl teardown). To account for
# this, we won't begin attempting restarts until at least 1 second has passed (1 attempt).
if attempt > 1:
port = self._restart_pantsd()
attempt += 1
except NailgunClient.NailgunError as e:
# Ensure a newline.
logger.fatal('')
logger.fatal('lost active connection to pantsd!')
raise self.Terminated, (
'abruptly lost active connection to pantsd runner: {!r}'.format(e)
), e.traceback

def _connect_and_execute(self, port):
# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = NailgunProtocol.isatty_to_env(self._stdin, self._stdout, self._stderr)
Expand All @@ -99,7 +140,8 @@ def _connect_and_execute(self, port):
ins=self._stdin,
out=self._stdout,
err=self._stderr,
exit_on_broken_pipe=True)
exit_on_broken_pipe=True,
expects_pid=True)

with self._trapped_signals(client), STTYSettings.preserved():
# Execute the command on the pailgun.
Expand All @@ -108,15 +150,13 @@ def _connect_and_execute(self, port):
# Exit.
self._exiter.exit(result)

def _restart_pantsd(self):
return PantsDaemon.Factory.restart(bootstrap_options=self._bootstrap_options)

def _maybe_launch_pantsd(self):
return PantsDaemon.Factory.maybe_launch(bootstrap_options=self._bootstrap_options)

def run(self, args=None):
self._setup_logging()
port = self._maybe_launch_pantsd()

logger.debug('connecting to pailgun on port {}'.format(port))
try:
self._connect_and_execute(port)
except self.RECOVERABLE_EXCEPTIONS as e:
raise self.Fallback(e)
self._run_pants_with_retry(port)
4 changes: 3 additions & 1 deletion src/python/pants/core_tasks/pantsd_kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class PantsDaemonKill(Task):

def execute(self):
try:
PantsDaemon.Factory.create(self.context.options, full_init=False).terminate()
pantsd = PantsDaemon.Factory.create(self.context.options, full_init=False)
with pantsd.lifecycle_lock:
pantsd.terminate()
except ProcessManager.NonResponsiveProcess as e:
raise TaskError('failure while terminating pantsd: {}'.format(e))
68 changes: 60 additions & 8 deletions src/python/pants/java/nailgun_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,38 @@ class NailgunClient(object):
class NailgunError(Exception):
"""Indicates an error interacting with a nailgun server."""

DESCRIPTION = 'Problem talking to nailgun server'

def __init__(self, address, pid, wrapped_exc, traceback):
self.address = address
self.pid = pid
self.wrapped_exc = wrapped_exc
self.traceback = traceback
super(NailgunClient.NailgunError, self).__init__(
'{} (address: {}{}): {!r}'
.format(
self.DESCRIPTION,
address,
', remote_pid: {}'.format(pid) if pid is not None else '',
self.wrapped_exc
)
)

class NailgunConnectionError(NailgunError):
"""Indicates an error upon initial connect to the nailgun server."""
DESCRIPTION = 'Problem connecting to nailgun server'

class NailgunExecutionError(NailgunError):
"""Indicates an error upon initial command execution on the nailgun server."""
DESCRIPTION = 'Problem executing command on nailgun server'

# For backwards compatibility with nails expecting the ng c client special env vars.
ENV_DEFAULTS = dict(NAILGUN_FILESEPARATOR=os.sep, NAILGUN_PATHSEPARATOR=os.pathsep)
DEFAULT_NG_HOST = '127.0.0.1'
DEFAULT_NG_PORT = 2113

def __init__(self, host=DEFAULT_NG_HOST, port=DEFAULT_NG_PORT, ins=sys.stdin, out=None, err=None,
workdir=None, exit_on_broken_pipe=False):
workdir=None, exit_on_broken_pipe=False, expects_pid=False):
"""Creates a nailgun client that can be used to issue zero or more nailgun commands.
:param string host: the nailgun server to contact (defaults to '127.0.0.1')
Expand All @@ -117,17 +139,30 @@ def __init__(self, host=DEFAULT_NG_HOST, port=DEFAULT_NG_PORT, ins=sys.stdin, ou
:param file out: a stream to write command standard output to (defaults to stdout)
:param file err: a stream to write command standard error to (defaults to stderr)
:param string workdir: the default working directory for all nailgun commands (defaults to CWD)
:param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are encountered.
:param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are encountered
:param bool expect_pid: Whether or not to expect a PID from the server (only true for pantsd)
"""
self._host = host
self._port = port
self._address = (host, port)
self._address_string = ':'.join(str(i) for i in self._address)
self._stdin = ins
self._stdout = out or sys.stdout
self._stderr = err or sys.stderr
self._workdir = workdir or os.path.abspath(os.path.curdir)
self._exit_on_broken_pipe = exit_on_broken_pipe
self._expects_pid = expects_pid
self._session = None

@property
def pid(self):
if not self._expects_pid:
return None
try:
return self._session.remote_pid
except AttributeError:
return None

def try_connect(self):
"""Creates a socket, connects it to the nailgun and returns the connected socket.
Expand All @@ -136,12 +171,16 @@ def try_connect(self):
"""
sock = RecvBufferedSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
try:
sock.connect((self._host, self._port))
sock.connect(self._address)
except (socket.error, socket.gaierror) as e:
logger.debug('Encountered socket exception {!r} when attempting connect to nailgun'.format(e))
sock.close()
raise self.NailgunConnectionError(
'Problem connecting to nailgun server at {}:{}: {!r}'.format(self._host, self._port, e))
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
else:
return sock

Expand Down Expand Up @@ -178,11 +217,24 @@ def execute(self, main_class, cwd=None, *args, **environment):
try:
return self._session.execute(cwd, main_class, *args, **environment)
except socket.error as e:
raise self.NailgunError('Problem communicating with nailgun server at {}:{}: {!r}'
.format(self._host, self._port, e))
raise self.NailgunError(
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
except NailgunProtocol.ProtocolError as e:
raise self.NailgunError('Problem in nailgun protocol with nailgun server at {}:{}: {!r}'
.format(self._host, self._port, e))
# If we get to a `ProtocolError` and we don't yet have a pid, then
# the daemon has not yet achieved a successful fork - so we can
# treat that as a separate, retryable error (usually indicating that
# the daemon is in a bad state).
exc_type = self.NailgunExecutionError if self.pid is None else self.NailgunError
raise exc_type(
address=self._address_string,
pid=self.pid,
wrapped_exc=e,
traceback=sys.exc_info()[2]
)
finally:
sock.close()
self._session = None
Expand Down
25 changes: 20 additions & 5 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ def maybe_launch(cls, bootstrap_options=None):
else:
return stub_pantsd.read_named_socket('pailgun', int)

@classmethod
def restart(cls, bootstrap_options=None):
"""Restarts a running daemon instance.
:param Options bootstrap_options: The bootstrap options, if available.
:returns: The pailgun port number of the new pantsd instance.
:rtype: int
"""
pantsd = cls.create(bootstrap_options)
with pantsd.lifecycle_lock:
# N.B. This will call `pantsd.terminate()` before starting.
return pantsd.launch()

@classmethod
def create(cls, bootstrap_options=None, full_init=True):
"""
Expand Down Expand Up @@ -402,11 +415,13 @@ def launch(self):
return listening_port

def terminate(self, include_watchman=True):
"""Terminates pantsd and watchman."""
with self.lifecycle_lock:
super(PantsDaemon, self).terminate()
if include_watchman:
self.watchman_launcher.terminate()
"""Terminates pantsd and watchman.
N.B. This should always be called under care of `self.lifecycle_lock`.
"""
super(PantsDaemon, self).terminate()
if include_watchman:
self.watchman_launcher.terminate()


def launch():
Expand Down

0 comments on commit 7e1e39c

Please sign in to comment.