/
nailgun_client.py
382 lines (332 loc) · 15.6 KB
/
nailgun_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# coding=utf-8
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import absolute_import, division, print_function, unicode_literals
import errno
import logging
import os
import signal
import socket
import sys
import time
from builtins import object, str
import psutil
from future.utils import PY3
from pants.java.nailgun_io import NailgunStreamWriter
from pants.java.nailgun_protocol import ChunkType, NailgunProtocol
from pants.util.dirutil import safe_file_dump
from pants.util.osutil import safe_kill
from pants.util.socket import RecvBufferedSocket
from pants.util.strutil import ensure_binary, safe_shlex_join
# Module-level logger: used for connection-failure debug output and the graceful-shutdown timeout warning.
logger = logging.getLogger(__name__)
class NailgunClientSession(NailgunProtocol, NailgunProtocol.TimeoutProvider):
    """Handles a single nailgun client session.

    A session sends one nailgun request over an already-connected socket, then relays the server's
    chunks to local streams until an EXIT chunk (or an error/timeout) ends the session.
    """

    def __init__(self, sock, in_file, out_file, err_file, exit_on_broken_pipe=False,
                 remote_pid_callback=None, remote_pgrp_callback=None):
        """
        :param sock: The connected socket to the nailgun server.
        :param in_file: A file whose fileno() is forwarded to the server as STDIN chunks once the
                        server sends START_READING_INPUT. May be None, in which case no input is
                        forwarded.
        :param out_file: The stream STDOUT chunk payloads are written to (as bytes).
        :param err_file: The stream STDERR chunk payloads are written to (as bytes).
        :param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are
                                         encountered
        :param remote_pid_callback: Callback to run when a pid chunk is received from a remote client.
        :param remote_pgrp_callback: Callback to run when a pgrp (process group) chunk is received from
                                     a remote client.
        """
        self._sock = sock
        self._input_writer = None if not in_file else NailgunStreamWriter(
            (in_file.fileno(),),
            self._sock,
            (ChunkType.STDIN,),
            ChunkType.STDIN_EOF
        )
        self._stdout = out_file
        self._stderr = err_file
        self._exit_on_broken_pipe = exit_on_broken_pipe
        self.remote_pid = None
        # Command line of the remote pid as seen locally via psutil; None if it could not be read.
        self.remote_process_cmdline = None
        self.remote_pgrp = None
        self._remote_pid_callback = remote_pid_callback
        self._remote_pgrp_callback = remote_pgrp_callback
        # NB: These variables are set in a signal handler to implement graceful shutdown.
        self._exit_timeout_start_time = None
        self._exit_timeout = None
        self._exit_reason = None

    def _set_exit_timeout(self, timeout, reason):
        """Set a timeout for the remainder of the session, along with an exception to raise.

        This method may be called by a signal handler. The timeout is exposed through
        maybe_timeout_options(); this session is handed to NailgunProtocol.iter_chunks() as its
        `timeout_object`. Once an exit reason is set, ._process_session() ends by sending SIGKILL to
        the remote pgrp/pid and raising `reason`. This happens whether the session completes first or
        the timeout expires first; in the latter case the `NailgunProtocol.ProcessStreamTimeout` is
        only logged.

        :param float timeout: The length of time to time out, in seconds.
        :param Exception reason: The exception raised from ._process_session() once the session ends.
        """
        self._exit_timeout_start_time = time.time()
        self._exit_timeout = timeout
        self._exit_reason = reason

    def maybe_timeout_options(self):
        """Implements the NailgunProtocol.TimeoutProvider interface.

        :returns: A NailgunProtocol.TimeoutOptions built from the values recorded by
                  ._set_exit_timeout(), or None if no exit timeout has been set.
        """
        if self._exit_timeout_start_time is not None:
            return NailgunProtocol.TimeoutOptions(self._exit_timeout_start_time, self._exit_timeout)
        return None

    def _maybe_start_input_writer(self):
        """Start forwarding stdin, unless there is no input file or the writer is already running."""
        if self._input_writer and not self._input_writer.is_alive():
            self._input_writer.start()

    def _maybe_stop_input_writer(self):
        """Stop the stdin-forwarding writer (if it is running) and wait for it to finish."""
        if self._input_writer and self._input_writer.is_alive():
            self._input_writer.stop()
            self._input_writer.join()

    def _write_flush(self, fd, payload=None):
        """Write a payload to a given fd (if provided) and flush the fd.

        :raises: The underlying IOError/OSError, unless it is a broken pipe and exit_on_broken_pipe
                 was requested, in which case the process exits via sys.exit().
        """
        try:
            if payload:
                fd.write(ensure_binary(payload))
            fd.flush()
        except (IOError, OSError) as e:
            # If a `Broken Pipe` is encountered during a stdio fd write, we're headless - bail.
            if e.errno == errno.EPIPE and self._exit_on_broken_pipe:
                sys.exit()
            # Otherwise, re-raise.
            raise

    class ExitTimedOut(Exception):
        """Raised when a timeout for the remote client exit was breached."""

    def _record_remote_process_cmdline(self):
        """Best-effort lookup of the local command line of `self.remote_pid`, for diagnostics.

        The remote process may already have exited (or be inaccessible) by the time its PID chunk is
        processed, so a psutil failure must not abort an otherwise successful session.
        """
        try:
            self.remote_process_cmdline = psutil.Process(self.remote_pid).cmdline()
        except psutil.Error as e:
            logger.debug('Unable to read the command line of remote pid {}: {!r}'
                         .format(self.remote_pid, e))
            self.remote_process_cmdline = None

    def _process_session(self):
        """Process the outputs of the nailgun session.

        STDOUT/STDERR chunks are relayed to the local streams, PID/PGRP chunks are recorded and
        reported to the registered callbacks, and START_READING_INPUT starts forwarding stdin.

        :returns: The remote exit code carried by the EXIT chunk.
        :raises: The `reason` passed to ._set_exit_timeout(), whenever one was set. The remote pgrp
                 and pid are first sent SIGKILL. This happens whether the session ended with an EXIT
                 chunk, with an error, or with a `NailgunProtocol.ProcessStreamTimeout`; the timeout
                 is logged rather than propagated.
        :raises: :class:`NailgunProtocol.ProtocolError` on an unexpected chunk type (when no exit
                 reason was set).
        """
        try:
            for chunk_type, payload in self.iter_chunks(self._sock, return_bytes=True,
                                                        timeout_object=self):
                # TODO(#6579): assert that we have at this point received all the chunk types in
                # ChunkType.REQUEST_TYPES, then require PID and PGRP (exactly once?), and then allow any of
                # ChunkType.EXECUTION_TYPES.
                if chunk_type == ChunkType.STDOUT:
                    self._write_flush(self._stdout, payload)
                elif chunk_type == ChunkType.STDERR:
                    self._write_flush(self._stderr, payload)
                elif chunk_type == ChunkType.EXIT:
                    self._write_flush(self._stdout)
                    self._write_flush(self._stderr)
                    return int(payload)
                elif chunk_type == ChunkType.PID:
                    self.remote_pid = int(payload)
                    self._record_remote_process_cmdline()
                    if self._remote_pid_callback:
                        self._remote_pid_callback(self.remote_pid)
                elif chunk_type == ChunkType.PGRP:
                    self.remote_pgrp = int(payload)
                    if self._remote_pgrp_callback:
                        self._remote_pgrp_callback(self.remote_pgrp)
                elif chunk_type == ChunkType.START_READING_INPUT:
                    self._maybe_start_input_writer()
                else:
                    raise self.ProtocolError('received unexpected chunk {} -> {}'.format(chunk_type, payload))
        except NailgunProtocol.ProcessStreamTimeout as e:
            if self.remote_process_cmdline is not None:
                # NB: We overwrite the process title in the pantsd-runner process, which causes it to have an
                # argv with lots of empty spaces for some reason. We filter those out and pretty-print the
                # rest here.
                filtered_remote_cmdline = safe_shlex_join(
                    arg for arg in self.remote_process_cmdline if arg != '')
            else:
                # The PID chunk was never received, or its command line could not be read.
                filtered_remote_cmdline = '<unknown>'
            logger.warning(
                "timed out when attempting to gracefully shut down the remote client executing \"{}\". "
                "sending SIGKILL to the remote client at pid: {}. message: {}"
                .format(filtered_remote_cmdline, self.remote_pid, e))
        finally:
            # Bad chunk types received from the server can throw NailgunProtocol.ProtocolError in
            # NailgunProtocol.iter_chunks(). This ensures the NailgunStreamWriter is always stopped.
            self._maybe_stop_input_writer()
            # If an asynchronous error was set at any point (such as in a signal handler), we want to make
            # sure we clean up the remote process before exiting with error.
            if self._exit_reason:
                if self.remote_pgrp:
                    safe_kill(self.remote_pgrp, signal.SIGKILL)
                if self.remote_pid:
                    safe_kill(self.remote_pid, signal.SIGKILL)
                raise self._exit_reason

    def execute(self, working_dir, main_class, *arguments, **environment):
        """Send the nailgun request, then process the session until it ends.

        :returns: The remote exit code (see ._process_session()).
        """
        # Send the nailgun request.
        self.send_request(self._sock, working_dir, main_class, *arguments, **environment)
        # Process the remainder of the nailgun session.
        return self._process_session()
class NailgunClient(object):
    """A python nailgun client (see http://martiansoftware.com/nailgun for more info)."""

    class NailgunError(Exception):
        """Indicates an error interacting with a nailgun server."""

        DESCRIPTION = 'Problem talking to nailgun server'

        # NB: A single-line format string; the formatted result is the exception's message text.
        _MSG_FMT = ('{description} (address: {address}, remote_pid={pid}, remote_pgrp={pgrp}): '
                    '{wrapped_exc!r}')

        def __init__(self, address, pid, pgrp, wrapped_exc, traceback):
            """
            :param string address: The server address string (NailgunClient passes "host:port").
            :param pid: The remote PID, or None if the PID chunk has not been received.
            :param pgrp: The remote PGRP, or None if the PGRP chunk has not been received.
            :param Exception wrapped_exc: The underlying exception.
            :param traceback: The traceback of `wrapped_exc` (callers pass sys.exc_info()[2]).
            """
            self.address = address
            self.pid = pid
            self.pgrp = pgrp
            self.wrapped_exc = wrapped_exc
            self.traceback = traceback

            # TODO: these should be ensured to be non-None in NailgunClientSession!
            if self.pid is not None:
                pid_msg = str(self.pid)
            else:
                pid_msg = '<remote PID chunk not yet received!>'
            if self.pgrp is not None:
                pgrp_msg = str(self.pgrp)
            else:
                pgrp_msg = '<remote PGRP chunk not yet received!>'

            msg = self._MSG_FMT.format(
                description=self.DESCRIPTION,
                address=self.address,
                pid=pid_msg,
                pgrp=pgrp_msg,
                wrapped_exc=self.wrapped_exc)
            super(NailgunClient.NailgunError, self).__init__(msg, self.wrapped_exc)

    class NailgunConnectionError(NailgunError):
        """Indicates an error upon initial connect to the nailgun server."""
        DESCRIPTION = 'Problem connecting to nailgun server'

    class NailgunExecutionError(NailgunError):
        """Indicates an error upon initial command execution on the nailgun server."""
        DESCRIPTION = 'Problem executing command on nailgun server'

    # For backwards compatibility with nails expecting the ng c client special env vars.
    ENV_DEFAULTS = dict(NAILGUN_FILESEPARATOR=os.sep, NAILGUN_PATHSEPARATOR=os.pathsep)
    DEFAULT_NG_HOST = '127.0.0.1'
    DEFAULT_NG_PORT = 2113

    def __init__(self, host=DEFAULT_NG_HOST, port=DEFAULT_NG_PORT, ins=sys.stdin, out=None, err=None,
                 workdir=None, exit_on_broken_pipe=False, metadata_base_dir=None):
        """Creates a nailgun client that can be used to issue zero or more nailgun commands.

        :param string host: the nailgun server to contact (defaults to '127.0.0.1')
        :param int port: the port the nailgun server is listening on (defaults to the default nailgun
                         port: 2113)
        :param file ins: a file to read command standard input from (defaults to stdin) - can be None
                         in which case no input is read
        :param file out: a stream to write command standard output to (defaults to stdout)
        :param file err: a stream to write command standard error to (defaults to stderr)
        :param string workdir: the default working directory for all nailgun commands (defaults to CWD)
        :param bool exit_on_broken_pipe: whether or not to exit when `Broken Pipe` errors are
                                         encountered
        :param string metadata_base_dir: If a PID and PGRP are received from the server (only for
                                         pailgun connections), a file with the remote pid will be
                                         written under this directory. For non-pailgun connections this
                                         may be None, in which case no file is written.
        """
        self._host = host
        self._port = port
        self._address = (host, port)
        self._address_string = ':'.join(str(i) for i in self._address)
        self._stdin = ins
        # Session output is written as bytes, so default to the binary buffers on Python 3.
        self._stdout = out or (sys.stdout.buffer if PY3 else sys.stdout)
        self._stderr = err or (sys.stderr.buffer if PY3 else sys.stderr)
        self._workdir = workdir or os.path.abspath(os.path.curdir)
        self._exit_on_broken_pipe = exit_on_broken_pipe
        self._metadata_base_dir = metadata_base_dir

        # Mutable session state.
        self._session = None
        self._current_remote_pid = None
        self._current_remote_pgrp = None

    def _get_remote_pid_file_path(self, pid):
        """Path of the file recording the remote pid for the local process `pid`."""
        return os.path.join(
            self._metadata_base_dir,
            'nailgun-client',
            str(pid))

    # TODO(#6579): this should be done within a contextmanager for RAII!
    def _maybe_write_pid_file(self):
        """Once both PID and PGRP are known, write the remote pid to a file keyed by our own pid.

        This is a no-op when no metadata_base_dir was configured.
        """
        if self._metadata_base_dir is None:
            # Non-pailgun clients may legitimately have no metadata dir; building the path would
            # otherwise raise a TypeError from inside the session's chunk-processing callbacks.
            return
        if self._current_remote_pid and self._current_remote_pgrp:
            remote_pid_file_path = self._get_remote_pid_file_path(os.getpid())
            safe_file_dump(remote_pid_file_path, str(self._current_remote_pid), mode='w')

    def _receive_remote_pid(self, pid):
        """Session callback: record the remote pid and maybe write the pid file."""
        self._current_remote_pid = pid
        self._maybe_write_pid_file()

    def _receive_remote_pgrp(self, pgrp):
        """Session callback: record the remote pgrp and maybe write the pid file."""
        self._current_remote_pgrp = pgrp
        self._maybe_write_pid_file()

    def _maybe_last_pid(self):
        """The remote pid of the current/last session, or None if it was not received."""
        return self._current_remote_pid

    def _maybe_last_pgrp(self):
        """The remote pgrp of the current/last session, or None if it was not received."""
        return self._current_remote_pgrp

    def try_connect(self):
        """Creates a socket, connects it to the nailgun and returns the connected socket.

        :returns: a connected `socket.socket`.
        :raises: `NailgunClient.NailgunConnectionError` on failure to connect.
        """
        sock = RecvBufferedSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        try:
            sock.connect(self._address)
        except (socket.error, socket.gaierror) as e:
            logger.debug('Encountered socket exception {!r} when attempting connect to nailgun'.format(e))
            sock.close()
            raise self.NailgunConnectionError(
                address=self._address_string,
                pid=self._maybe_last_pid(),
                pgrp=self._maybe_last_pgrp(),
                wrapped_exc=e,
                traceback=sys.exc_info()[2],
            )
        else:
            return sock

    def set_exit_timeout(self, timeout, reason):
        """Expose the inner session object's exit timeout setter.

        Only valid while .execute() is running: outside of it `self._session` is None.
        """
        self._session._set_exit_timeout(timeout, reason)

    # TODO(#6579): make all invocations of this method instead require that the process is alive via
    # the result of .remote_client_status() before sending a signal! safe_kill() should probably be
    # removed as well.
    def maybe_send_signal(self, signum, include_pgrp=True):
        """Send the signal `signum` if the PID and/or PGRP chunks have been received.

        No error is raised if the pid or pgrp are None or point to an already-dead process.
        """
        remote_pid = self._maybe_last_pid()
        # NB: A truthiness check (like the pgrp check below) also skips a pid of 0. safe_kill
        # presumably wraps kill(2), where pid 0 would target this client's own process group.
        if remote_pid:
            safe_kill(remote_pid, signum)
        if include_pgrp:
            remote_pgrp = self._maybe_last_pgrp()
            if remote_pgrp:
                safe_kill(remote_pgrp, signum)

    def execute(self, main_class, cwd=None, *args, **environment):
        """Executes the given main_class with any supplied args in the given environment.

        :param string main_class: the fully qualified class name of the main entrypoint
        :param string cwd: Set the working directory for this command
        :param list args: any arguments to pass to the main entrypoint
        :param dict environment: an env mapping made available to native nails via the nail context
        :returns: the exit code of the main_class.
        :raises: :class:`NailgunClient.NailgunError` if there was an error during execution.
        """
        environment = dict(**environment)
        environment.update(self.ENV_DEFAULTS)
        cwd = cwd or self._workdir

        sock = self.try_connect()
        # The outer try guarantees the socket is closed even if constructing the session fails
        # (e.g. the stdin file has no usable fileno()).
        try:
            # TODO(#6579): NailgunClientSession currently requires callbacks because it can't depend on
            # having received these chunks, so we need to avoid clobbering these fields until we initialize
            # a new session.
            self._current_remote_pid = None
            self._current_remote_pgrp = None
            self._session = NailgunClientSession(
                sock=sock,
                in_file=self._stdin,
                out_file=self._stdout,
                err_file=self._stderr,
                exit_on_broken_pipe=self._exit_on_broken_pipe,
                remote_pid_callback=self._receive_remote_pid,
                remote_pgrp_callback=self._receive_remote_pgrp)
            try:
                return self._session.execute(cwd, main_class, *args, **environment)
            except (socket.error, NailgunProtocol.ProtocolError) as e:
                raise self.NailgunError(
                    address=self._address_string,
                    pid=self._maybe_last_pid(),
                    pgrp=self._maybe_last_pgrp(),
                    wrapped_exc=e,
                    traceback=sys.exc_info()[2]
                )
        finally:
            sock.close()
            self._session = None

    def __repr__(self):
        return 'NailgunClient(host={!r}, port={!r}, workdir={!r})'.format(self._host,
                                                                           self._port,
                                                                           self._workdir)