Force sequential runs under pantsd (#7781)
## Problem

It's not currently safe to have multiple parallel pants runs with the daemon enabled.
However, this was not enforced (context: #7751).

## Solution

- Implement a lock in `PailgunServer`, so that each request thread waits to acquire it (in `PailgunServer.process_request_thread`).
- Implement an option, passed to each request via the environment, that communicates how long a request should wait for the lock before timing out.
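
As a rough illustration of the first bullet, here is a minimal sketch of a lock whose `acquire` takes a timeout, built on `threading.Condition`. The class name `TimedLock` is hypothetical and this is not the code in the diff; it assumes Python 3 (for `time.monotonic`), where `threading.Lock.acquire(timeout=...)` is also available directly.

```python
import threading
import time


class TimedLock:
    """Hypothetical stand-in for a lock whose acquire() accepts a timeout."""

    def __init__(self):
        self._cond = threading.Condition()
        self._available = True

    def acquire(self, timeout=0.0):
        """Return True if the lock was acquired within `timeout` seconds, else False."""
        deadline = time.monotonic() + timeout
        with self._cond:
            # Re-check in a loop: another thread may have taken the lock
            # between the notification and this thread waking up.
            while not self._available:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                self._cond.wait(remaining)
            self._available = False
            return True

    def release(self):
        """Mark the lock as free and wake one waiter."""
        with self._cond:
            self._available = True
            self._cond.notify()


lock = TimedLock()
first = lock.acquire(timeout=0)      # uncontended: succeeds immediately
second = lock.acquire(timeout=0.05)  # already held: times out
lock.release()
third = lock.acquire(timeout=0)      # free again
```

Checking the predicate in a `while` loop against a deadline means a waiter that loses the race after being notified keeps waiting for the remainder of its timeout instead of giving up early.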

## Result

When only one pants run is active at a time, the UX is unchanged.
When a request arrives while another run is in progress, there are three cases:
- If we specify a positive timeout (or leave the default), and the running request finishes before we time out, the new request waits for a bit and then runs:
```
$ ./pants --enable-pantsd --pantsd-run-start-timeout=20 list src/scala::
Another pants run was found, starting waiting for up to 20.0s for it to finish
Waiting for request to finish (1.0s total)...
Waiting for request to finish (2.0s total)...
Waiting for request to finish (3.0s total)...
Waiting for request to finish (4.0s total)...
Waiting for request to finish (5.0s total)...
Waiting for request to finish (6.0s total)...
Waiting for request to finish (7.0s total)...
Waiting for request to finish (8.0s total)...
Waiting for request to finish (9.0s total)...
src/scala/org/pantsbuild/zinc/cache:cache
src/scala/org/pantsbuild/zinc/analysis:analysis
src/scala/org/pantsbuild/zinc/options:options
src/scala/org/pantsbuild/zinc/compiler:compiler
src/scala/org/pantsbuild/zinc/extractor:extractor
src/scala/org/pantsbuild/zinc/scalautil:scalautil
src/scala/org/pantsbuild/zinc/bootstrapper:bootstrapper
src/scala/org/pantsbuild/zinc/util:util
```

- If we specify a timeout and hit it, the run will fail with a useful error:
```
$ ./pants --enable-pantsd --pantsd-run-start-timeout="0" list ::
Another pants run was found, starting waiting for up to 0.0s for it to finish
Traceback (most recent call last):
  File "/Users/bescobar/workspace/otherpants/src/python/pants/pantsd/pailgun_server.py", line 259, in process_request_thread
    with self.ensure_request_is_exclusive(environment, request):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/Users/bescobar/workspace/otherpants/src/python/pants/pantsd/pailgun_server.py", line 246, in ensure_request_is_exclusive
    raise ExclusiveRequestTimeout("Timed out while waiting for another pants run to finish")
pants.pantsd.pailgun_server.ExclusiveRequestTimeout: Timed out while waiting for another pants run to finish
```

- If we specify a **negative** timeout, the request will block _potentially_ forever:
```
$ ./pants --enable-pantsd --pantsd-run-start-timeout=-1 list src/scala::
Another pants run was found, starting waiting forever for it to finish
Waiting for request to finish (1.0s total)...
Waiting for request to finish (2.0s total)...
Waiting for request to finish (3.0s total)...
Waiting for request to finish (4.0s total)...
Waiting for request to finish (5.0s total)...
Waiting for request to finish (6.0s total)...
Waiting for request to finish (7.0s total)...
Waiting for request to finish (8.0s total)...
Waiting for request to finish (9.0s total)...
Waiting for request to finish (10.0s total)...
Waiting for request to finish (11.0s total)...
src/scala/org/pantsbuild/zinc/extractor:extractor
src/scala/org/pantsbuild/zinc/compiler:compiler
src/scala/org/pantsbuild/zinc/analysis:analysis
src/scala/org/pantsbuild/zinc/bootstrapper:bootstrapper
src/scala/org/pantsbuild/zinc/options:options
src/scala/org/pantsbuild/zinc/util:util
src/scala/org/pantsbuild/zinc/scalautil:scalautil
src/scala/org/pantsbuild/zinc/cache:cache
```
**All of the new output is to stderr**
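
The three cases above can be sketched as a small polling loop. This is only an illustration under assumptions: `wait_for_turn`, `try_acquire`, and `notify` are hypothetical names, and the message text is made up rather than taken from pants.

```python
def wait_for_turn(try_acquire, timeout, interval=1.0, notify=print):
    """Poll `try_acquire(seconds)` until it returns True.

    A negative `timeout` means wait forever. Returns the seconds spent polling,
    or raises TimeoutError once `timeout` seconds have been polled.
    """
    # Optimistic first attempt without blocking, for the uncontended case.
    if try_acquire(0):
        return 0.0
    polled = 0.0
    while not try_acquire(interval):
        polled += interval
        if timeout >= 0 and polled >= timeout:
            raise TimeoutError("Timed out while waiting for another run to finish")
        notify("Waiting ({}s so far)...".format(polled))
    return polled


# Negative timeout: keep polling until the (fake) lock becomes free.
attempts = iter([False, False, False, True])
messages = []
waited = wait_for_turn(lambda _seconds: next(attempts), timeout=-1, notify=messages.append)

# Zero timeout: give up after the first failed poll.
try:
    wait_for_turn(lambda _seconds: False, timeout=0, notify=messages.append)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Polling in short intervals, rather than blocking once for the full timeout, is what lets the waiting client receive a progress message every second.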

Fixes #7751.
Commits should _mostly_ make sense independently.
blorente authored and stuhood committed May 29, 2019
1 parent 7482c43 commit bd5e70d
Showing 5 changed files with 236 additions and 8 deletions.
1 change: 1 addition & 0 deletions src/python/pants/bin/remote_pants_runner.py
@@ -141,6 +141,7 @@ def _connect_and_execute(self, pantsd_handle):
ng_env = NailgunProtocol.isatty_to_env(self._stdin, self._stdout, self._stderr)
modified_env = combined_dict(self._env, ng_env)
modified_env['PANTSD_RUNTRACKER_CLIENT_START_TIME'] = str(self._start_time)
modified_env['PANTSD_REQUEST_TIMEOUT_LIMIT'] = str(self._bootstrap_options.for_global_scope().pantsd_timeout_when_multiple_invocations)

assert isinstance(port, int), 'port {} is not an integer!'.format(port)

10 changes: 10 additions & 0 deletions src/python/pants/option/global_options.py
@@ -273,6 +273,16 @@ def register_bootstrap_options(cls, register):
help='Create a new pantsd server, and use it, and shut it down immediately after. '
'If pantsd is already running, it will shut it down and spawn a new instance (Beta)')

# NB: We really don't want this option to invalidate the daemon, because different clients might have
# different needs. For instance, an IDE might have a very long timeout because it only wants to refresh
# a project in the background, while a user might want a shorter timeout for interactivity.
register('--pantsd-timeout-when-multiple-invocations', advanced=True, type=float, default=60.0, daemon=False,
help='The maximum amount of time to wait for the invocation to start until '
'raising a timeout exception. '
'Because pantsd currently does not support parallel runs, '
'any prior running Pants command must be finished for the current one to start. '
'To never timeout, use the value -1.')

# These facilitate configuring the native engine.
register('--native-engine-visualize-to', advanced=True, default=None, type=dir_option, daemon=False,
help='A directory to write execution and rule graphs to as `dot` files. The contents '
120 changes: 114 additions & 6 deletions src/python/pants/pantsd/pailgun_server.py
@@ -8,12 +8,14 @@
import socket
import threading
import traceback
from contextlib import contextmanager

from six.moves.socketserver import BaseRequestHandler, BaseServer, TCPServer, ThreadingMixIn

from pants.engine.native import Native
from pants.java.nailgun_protocol import NailgunProtocol
from pants.util.contextutil import maybe_profiled
from pants.util.memo import memoized
from pants.util.socket import RecvBufferedSocket, safe_select


@@ -50,18 +52,26 @@ def handle(self):
def handle_error(self, exc):
"""Main error handler entrypoint for subclasses."""

@memoized
def parsed_request(self):
return NailgunProtocol.parse_request(self.request)


class PailgunHandler(PailgunHandlerBase):
"""A nailgun protocol handler for use with forking, SocketServer-based servers."""

def _run_pants(self, sock, arguments, environment):
"""Execute a given run with a pants runner."""
# For the pants run, we want to log to stderr.
# TODO Might be worth to make contextmanagers for this?
Native().override_thread_logging_destination_to_just_stderr()
self.server.runner_factory(sock, arguments, environment).run()
Native().override_thread_logging_destination_to_just_pantsd()

def handle(self):
"""Request handler for a single Pailgun request."""
# Parse the Nailgun request portion.
_, _, arguments, environment = NailgunProtocol.parse_request(self.request)
_, _, arguments, environment = self.parsed_request()

# N.B. the first and second nailgun request arguments (working_dir and command) are currently
# ignored in favor of a get_buildroot() call within LocalPantsRunner.run() and an assumption
@@ -90,6 +100,48 @@ def handle_error(self, exc=None):
NailgunProtocol.send_exit_with_code(self.request, failure_code)


class ExclusiveRequestTimeout(Exception):
"""Represents a timeout while waiting for another request to complete."""


class PailgunHandleRequestLock(object):
"""Convenience lock to implement Lock.acquire(timeout), which is not available in Python 2."""
# TODO remove and replace for the py3 Lock() when we don't have to support py2 anymore.

def __init__(self):
self.cond = threading.Condition()
self.available = True

def acquire(self, timeout=0.0):
"""
Try to acquire the lock, blocking until the timeout is reached. Will return immediately if the lock is acquired.
:return True if the lock was acquired, False if the timeout was reached.
"""
self.cond.acquire()
if self.available:
self.available = False
self.cond.release()
return True
else:
self.cond.wait(timeout=timeout)
# We have the lock!
if not self.available:
self.cond.release()
return False
else:
self.available = False
self.cond.release()
return True

def release(self):
"""Release the lock."""
self.cond.acquire()
self.available = True
self.cond.notify()
self.cond.release()


class PailgunServer(ThreadingMixIn, TCPServer):
"""A pants nailgun server.
@@ -126,6 +178,8 @@ def __init__(self, server_address, runner_factory, lifecycle_lock, request_compl
self.allow_reuse_address = True # Allow quick reuse of TCP_WAIT sockets.
self.server_port = None # Set during server_bind() once the port is bound.
self.request_complete_callback = request_complete_callback
self.logger = logging.getLogger(__name__)
self.free_to_handle_request_lock = PailgunHandleRequestLock()

if bind_and_activate:
try:
@@ -178,19 +232,73 @@ def handle_request(self):
with self.lifecycle_lock():
self._handle_request_noblock()

def _should_poll_forever(self, timeout):
return timeout < 0

def _should_keep_polling(self, timeout, time_polled):
return self._should_poll_forever(timeout) or time_polled < timeout

@contextmanager
def ensure_request_is_exclusive(self, environment, request):
"""
Ensure that this is the only pants running.
We currently don't allow parallel pants runs, so this function blocks a request thread until
there are no more requests being handled.
"""
# TODO add `did_poll` to pantsd metrics

timeout = float(environment['PANTSD_REQUEST_TIMEOUT_LIMIT'])

@contextmanager
def yield_and_release(time_waited):
try:
self.logger.debug("request lock acquired {}.".format("on the first try" if time_waited == 0 else "in {} seconds".format(time_waited)))
yield
finally:
self.free_to_handle_request_lock.release()
self.logger.debug("released request lock.")

time_polled = 0.0
user_notification_interval = 1.0 # Stop polling to notify the user every second.
self.logger.debug("request {} is trying to acquire the request lock.".format(request))

# NB: Optimistically try to acquire the lock without blocking, in case we are the only request being handled.
# This could be merged into the `while` loop below, but separating this special case for logging helps.
if self.free_to_handle_request_lock.acquire(timeout=0):
with yield_and_release(time_polled):
yield
else:
# We have to wait for another request to finish being handled.
NailgunProtocol.send_stderr(request, "Another pants invocation is running. Will wait {} for it to finish before giving up.\n".format(
"forever" if self._should_poll_forever(timeout) else "up to {} seconds".format(timeout)
))
while not self.free_to_handle_request_lock.acquire(timeout=user_notification_interval):
time_polled += user_notification_interval
if self._should_keep_polling(timeout, time_polled):
NailgunProtocol.send_stderr(request, "Waiting for invocation to finish (waited for {}s so far)...\n".format(time_polled))
else: # We have timed out.
raise ExclusiveRequestTimeout("Timed out while waiting for another pants invocation to finish.")
with yield_and_release(time_polled):
yield

def process_request_thread(self, request, client_address):
"""Override of ThreadingMixIn.process_request_thread() that delegates to the request handler."""
# Instantiate the request handler.
Native().override_thread_logging_destination_to_just_stderr()
Native().override_thread_logging_destination_to_just_pantsd()
handler = self.RequestHandlerClass(request, client_address, self)
try:
# Attempt to handle a request with the handler.
handler.handle_request()
self.request_complete_callback()

_, _, _, environment = handler.parsed_request()

try:
with self.ensure_request_is_exclusive(environment, request):
# Attempt to handle a request with the handler.
handler.handle_request()
self.request_complete_callback()
except Exception as e:
# If that fails, (synchronously) handle the error with the error handler sans-fork.
try:
self.logger.error("Request {} errored with {}({})".format(request, type(e), e))
handler.handle_error(e)
finally:
# Shutdown the socket since we don't expect a fork() in the exception context.
2 changes: 1 addition & 1 deletion src/python/pants/pantsd/pants_daemon.py
@@ -225,7 +225,7 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_scheduler, watch
(bootstrap_options.pantsd_pailgun_host, bootstrap_options.pantsd_pailgun_port),
DaemonPantsRunner,
scheduler_service,
should_shutdown_after_run
should_shutdown_after_run,
)

store_gc_service = StoreGCService(legacy_graph_scheduler.scheduler)
111 changes: 110 additions & 1 deletion tests/python/pants_test/pantsd/test_pailgun_server.py
@@ -8,6 +8,7 @@
import threading
import unittest
from contextlib import contextmanager
from queue import Queue
from socketserver import TCPServer

import mock
@@ -22,6 +23,10 @@
class TestPailgunServer(unittest.TestCase):
def setUp(self):
self.mock_handler_inst = mock.Mock()
# Add a fake environment for this to not timeout.
self.fake_environment = {"PANTSD_REQUEST_TIMEOUT_LIMIT": "-1"}
self.mock_handler_inst.parsed_request.return_value = (None, None, [], self.fake_environment)

self.mock_runner_factory = mock.Mock(side_effect=Exception('this should never be called'))
self.mock_handler_class = mock.Mock(return_value=self.mock_handler_inst)
self.lock = threading.RLock()
@@ -43,7 +48,7 @@ def after_request_callback():
runner_factory=self.mock_runner_factory,
handler_class=self.mock_handler_class,
lifecycle_lock=lock,
request_complete_callback=after_request_callback
request_complete_callback=after_request_callback,
)

@mock.patch.object(TCPServer, 'server_bind', **PATCH_OPTS)
@@ -78,6 +83,110 @@ def test_process_request_thread_error(self, mock_shutdown_request):
self.assertIs(self.mock_handler_inst.handle_error.called, True)
mock_shutdown_request.assert_called_once_with(self.server, mock_request)

def test_ensure_request_is_exclusive(self):
"""Launch many requests, assert that every one is trying to enter the critical section, and assert that only one is doing so at a time."""
self.threads_to_start = 10

# Queues are thread safe (https://docs.python.org/2/library/queue.html)
self.thread_errors = Queue()
def threaded_assert_equal(one, other, message):
try:
self.assertEqual(one, other, message)
except AssertionError as error:
self.thread_errors.put(error)

self.threads_running_cond = threading.Condition()
self.threads_running = 0
def handle_thread_tried_to_handle_request():
"""Mark a thread as started, and block until every thread has been marked as starting."""
self.threads_running_cond.acquire()
self.threads_running += 1
if self.threads_running == self.threads_to_start:
self.threads_running_cond.notify_all()
else:
while self.threads_running != self.threads_to_start:
self.threads_running_cond.wait()


threaded_assert_equal(self.threads_running, self.threads_to_start, "This thread is unblocked before all the threads had started.")
self.threads_running_cond.release()

def handle_thread_finished():
"""Mark a thread as finished, and block until there are no more threads running."""
self.threads_running_cond.acquire()
self.threads_running -= 1
print("Handle_thread_finished, threads_running are {}".format(self.threads_running))
if self.threads_running == 0:
self.threads_running_cond.notify_all()
else:
while self.threads_running != 0:
self.threads_running_cond.wait()

threaded_assert_equal(self.threads_running, 0, "handle_thread_finished exited when there still were threads running.")
self.threads_running_cond.release()

self.threads_handling_requests = 0
self.threads_handling_requests_lock = threading.Lock()

def handle_thread_starts_handling_request():
with self.threads_handling_requests_lock:
self.threads_handling_requests += 1
threaded_assert_equal(self.threads_handling_requests, 1, "A thread is already handling a request!")

def check_only_one_thread_is_handling_a_request():
"""Assert that there's only ever one thread inside the lock."""
with self.threads_handling_requests_lock:
threaded_assert_equal(self.threads_handling_requests, 1, "A thread is already handling a request!")

def handle_thread_finishing_handling_request():
"""Assert that I was the only thread handling a request."""
with self.threads_handling_requests_lock:
self.threads_handling_requests -= 1
threaded_assert_equal(self.threads_handling_requests, 0, "There were multiple threads handling a request when a thread finished")

# Wrap ensure_request_is_exclusive to notify when we acquire and release the lock.
def mock_ensure_request_is_exclusive(request_lock_under_test):
"""Wrap the lock under test. Every thread that calls this function has reached the critical section."""
@contextmanager
def wrapper(environment, request):
# Assert that all threads are trying to handle a request.
handle_thread_tried_to_handle_request()
with request_lock_under_test(environment, request):
try:
# Assert that only one is allowed to handle a request.
print("Thread has entered the request handling code.")
handle_thread_starts_handling_request()
check_only_one_thread_is_handling_a_request()
yield
check_only_one_thread_is_handling_a_request()
print("Thread has exited the request handling code.")
finally:
# Account for a thread finishing a request.
handle_thread_finishing_handling_request()
# Notify that a thread is shutting down.
handle_thread_finished()
# At this point, we have asserted that all threads are finished.
return wrapper

self.server.ensure_request_is_exclusive = mock_ensure_request_is_exclusive(self.server.ensure_request_is_exclusive)

# Create as many mock threads as needed. Launch all of them, and wait for all of them to finish.
mock_request = mock.Mock()
def create_request_thread(port):
return threading.Thread(target = self.server.process_request_thread,
args = (mock_request, ('1.2.3.4', port)),
name="MockThread-{}".format(port))

threads = [create_request_thread(0) for _ in range(0, self.threads_to_start)]
for thread in threads:
thread.start()
self.assertTrue(self.thread_errors.empty(), "There were some errors in the threads:\n {}".format(self.thread_errors))

for thread in threads:
# If this fails because it times out, it's definitely a legitimate error.
thread.join(10)
self.assertTrue(self.thread_errors.empty(), "There were some errors in the threads:\n {}".format(self.thread_errors))


class TestPailgunHandler(unittest.TestCase):
def setUp(self):