Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pantsd] Implement auto-shutdown after runs #7341

Merged
merged 4 commits into from Mar 14, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -222,6 +222,13 @@ def register_bootstrap_options(cls, register):
register('--enable-pantsd', advanced=True, type=bool, default=False,
help='Enables use of the pants daemon (and implicitly, the v2 engine). (Beta)')

# Shutdown pantsd after the current run.
# This needs to be accessed at the same time as enable_pantsd,
# so we register it at bootstrap time.
register('--shutdown-pantsd-after-run', advanced=True, type=bool, default=False,
help='Create a new pantsd server, and use it, and shut it down immediately after. '
'If pantsd is already running, it will shut it down and spawn a new instance (Beta)')

# These facilitate configuring the native engine.
register('--native-engine-visualize-to', advanced=True, default=None, type=dir_option, daemon=False,
help='A directory to write execution and rule graphs to as `dot` files. The contents '
@@ -99,7 +99,7 @@ class PailgunServer(ThreadingMixIn, TCPServer):
# Override the ThreadingMixIn default, to minimize the chances of zombie pailgun processes.
daemon_threads = True

def __init__(self, server_address, runner_factory, lifecycle_lock,
def __init__(self, server_address, runner_factory, lifecycle_lock, request_complete_callback,

This comment has been minimized.

Copy link
@stuhood

stuhood Mar 12, 2019

Member

Worth expanding the docstring for this one I think.

handler_class=None, bind_and_activate=True):
"""Override of TCPServer.__init__().
@@ -111,6 +111,7 @@ def __init__(self, server_address, runner_factory, lifecycle_lock,
execution thread during handling. All pailgun request handling
will take place under care of this lock, which would be shared with
a `PailgunServer`-external lifecycle manager to guard teardown.
:param function request_complete_callback: A callback that will be called whenever a pailgun request is completed.
:param class handler_class: The request handler class to use for each request. (Optional)
:param bool bind_and_activate: If True, binds and activates networking at __init__ time.
(Optional)
@@ -122,6 +123,7 @@ def __init__(self, server_address, runner_factory, lifecycle_lock,
self.lifecycle_lock = lifecycle_lock
self.allow_reuse_address = True # Allow quick reuse of TCP_WAIT sockets.
self.server_port = None # Set during server_bind() once the port is bound.
self.request_complete_callback = request_complete_callback

if bind_and_activate:
try:
@@ -167,6 +169,8 @@ def process_request_thread(self, request, client_address):
try:
# Attempt to handle a request with the handler.
handler.handle_request()
self.request_complete_callback()

except Exception as e:
# If that fails, (synchronously) handle the error with the error handler sans-fork.
try:
@@ -181,6 +181,7 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_scheduler, watch
:returns: A PantsServices instance.
"""
should_shutdown_after_run = bootstrap_options.shutdown_pantsd_after_run
fs_event_service = FSEventService(
watchman,
build_root,
@@ -208,7 +209,8 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_scheduler, watch
pailgun_service = PailgunService(
(bootstrap_options.pantsd_pailgun_host, bootstrap_options.pantsd_pailgun_port),
DaemonPantsRunner,
scheduler_service
scheduler_service,
should_shutdown_after_run
)

store_gc_service = StoreGCService(legacy_graph_scheduler.scheduler)
@@ -441,6 +443,17 @@ def terminate(self, include_watchman=True):
if include_watchman:
self.watchman_launcher.terminate()

def needs_restart(self, option_fingerprint):
"""
Overrides ProcessManager.needs_restart, to account for the case where pantsd is running
but we want to shutdown after this run.
:param option_fingerprint: A fingeprint of the global bootstrap options.
:return: True if the daemon needs to restart.
"""
should_shutdown_after_run = self._bootstrap_options.for_global_scope().shutdown_pantsd_after_run
return super(PantsDaemon, self).needs_restart(option_fingerprint) or \
(self.is_alive() and should_shutdown_after_run)


def launch():
"""An external entrypoint that spawns a new pantsd instance."""
@@ -15,14 +15,15 @@
class PailgunService(PantsService):
"""A service that runs the Pailgun server."""

def __init__(self, bind_addr, runner_class, scheduler_service):
def __init__(self, bind_addr, runner_class, scheduler_service, shutdown_after_run):
"""
:param tuple bind_addr: The (hostname, port) tuple to bind the Pailgun server to.
:param class runner_class: The `PantsRunner` class to be used for Pailgun runs. Generally this
will be `DaemonPantsRunner`, but this decoupling avoids a cycle between the `pants.pantsd` and
`pants.bin` packages.
:param SchedulerService scheduler_service: The SchedulerService instance for access to the
resident scheduler.
:param bool shutdown_after_run: PailgunService should shut down after running the first request.
"""
super(PailgunService, self).__init__()
self._bind_addr = bind_addr
@@ -31,6 +32,7 @@ def __init__(self, bind_addr, runner_class, scheduler_service):

self._logger = logging.getLogger(__name__)
self._pailgun = None
self._shutdown_after_run = shutdown_after_run

@property
def pailgun(self):
@@ -42,6 +44,10 @@ def pailgun(self):
def pailgun_port(self):
return self.pailgun.server_port

def _request_complete_callback(self):
if self._shutdown_after_run:
self.terminate()

def _setup_pailgun(self):
"""Sets up a PailgunServer instance."""
# Constructs and returns a runnable PantsRunner.
@@ -62,7 +68,7 @@ def lifecycle_lock():
with self.services.lifecycle_lock:
yield

return PailgunServer(self._bind_addr, runner_factory, lifecycle_lock)
return PailgunServer(self._bind_addr, runner_factory, lifecycle_lock, self._request_complete_callback)

def run(self):
"""Main service entrypoint. Called via Thread.start() via PantsDaemon.run()."""
@@ -27,11 +27,20 @@ def setUp(self):
self.mock_target_roots_calculator = mock.Mock(side_effect=Exception('should not be called'))
self.service = PailgunService(bind_addr=(None, None),
runner_class=self.mock_runner_class,
scheduler_service=self.mock_scheduler_service)
scheduler_service=self.mock_scheduler_service,
shutdown_after_run=False)

@mock.patch.object(PailgunService, '_setup_pailgun', **PATCH_OPTS)
def test_pailgun_property_values(self, mock_setup):
fake_pailgun = FakePailgun()
mock_setup.return_value = fake_pailgun
self.assertIs(self.service.pailgun, fake_pailgun)
self.assertEqual(self.service.pailgun_port, 33333)

@mock.patch.object(PailgunService, 'terminate', **PATCH_OPTS)
def test_pailgun_service_closes_when_callback_is_called(self, mock_setup):
fake_pailgun = FakePailgun()
mock_setup.return_value = fake_pailgun
self.service._shutdown_after_run = True
self.service._request_complete_callback()
self.assertIs(self.service.terminate.called, True)
@@ -31,13 +31,19 @@ def lock():
with self.lock:
yield

self.after_request_callback_calls = 0

def after_request_callback():
self.after_request_callback_calls += 1

with mock.patch.object(PailgunServer, 'server_bind'), \
mock.patch.object(PailgunServer, 'server_activate'):
self.server = PailgunServer(
server_address=('0.0.0.0', 0),
runner_factory=self.mock_runner_factory,
handler_class=self.mock_handler_class,
lifecycle_lock=lock
lifecycle_lock=lock,
request_complete_callback=after_request_callback
)

@mock.patch.object(TCPServer, 'server_bind', **PATCH_OPTS)
@@ -56,6 +62,13 @@ def test_process_request_thread(self, mock_close_request):
self.assertIs(self.mock_handler_inst.handle_request.called, True)
mock_close_request.assert_called_once_with(self.server, mock_request)

@mock.patch.object(PailgunServer, 'close_request', **PATCH_OPTS)
def test_process_request_calls_callback(self, mock_close_request):
mock_request = mock.Mock()
self.server.process_request_thread(mock_request, ('1.2.3.4', 31338))
self.assertIs(self.mock_handler_inst.handle_request.called, True)
assert(self.after_request_callback_calls == 1)

@mock.patch.object(PailgunServer, 'shutdown_request', **PATCH_OPTS)
def test_process_request_thread_error(self, mock_shutdown_request):
mock_request = mock.Mock()
@@ -515,3 +515,21 @@ def test_pantsd_unicode_environment(self):
result = pantsd_run(['help'])
checker.assert_started()
self.assert_success(result)

def test_daemon_auto_shutdown_after_first_run(self):
config = {'GLOBAL': {'shutdown_pantsd_after_run': True}}
with self.pantsd_test_context(extra_config=config) as (workdir, config, checker):
wait_handle = self.run_pants_with_workdir_without_waiting(
['list'],
workdir,
config,
)

# TODO(#6574, #7330): We might have a new default timeout after these are resolved.
checker.assert_started(timeout=16)
pantsd_runner_processes = checker.runner_process_context.current_processes()
pants_run = wait_handle.join()
self.assert_success(pants_run)

for process in pantsd_runner_processes:
self.assertFalse(process.is_running())
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.