Thread-local stdio handling to support concurrent pantsd clients (#11536)

### Problem

To support concurrent `pantsd` clients, work spawned by each client (even in background threads) must interact with the relevant client's `stdio` file handles. When a client disconnects, spawned work should be able to continue logging to `pantsd.log`.

### Solution

Extract the thread/task-local aspects of our `logging` crate into a `stdio` crate that provides:
1) a `Console`-aware `logging` destination
2) exclusive access to a `Console` while a UI or `InteractiveProcess` is running
3) Python-level `sys.std*` replacements

### Result

No user-visible impact, but one of the largest remaining blockers for #7654 is removed.
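
To make the thread/task-local destination idea concrete, here is a minimal Python-flavored sketch. It is an illustration only: the actual crate is implemented in Rust, and every name below except the `stdio_destination` signature (which appears in the diff) is invented for this sketch.

```python
# Illustration only: the real implementation is the new Rust `stdio` crate.
# Only the `stdio_destination` signature is taken from this diff; the rest is
# a hypothetical sketch of the thread/task-local destination idea.
import contextvars
import os
import sys
from contextlib import contextmanager

# Each thread/task sees the destination installed by the client it is serving.
_destination: contextvars.ContextVar = contextvars.ContextVar(
    "stdio_destination", default=None
)

@contextmanager
def stdio_destination(*, stdin_fileno: int, stdout_fileno: int, stderr_fileno: int):
    """Route the current thread/task's stdio to the given client file descriptors."""
    token = _destination.set((stdin_fileno, stdout_fileno, stderr_fileno))
    try:
        yield
    finally:
        _destination.reset(token)

def write_stdout(data: str) -> None:
    """Write to the active destination, falling back to the daemon's own stream."""
    dest = _destination.get()
    if dest is None:
        # No client attached (e.g. it disconnected): fall back to the pantsd log.
        sys.stderr.write(data)
    else:
        os.write(dest[1], data.encode())
```

With this shape, work that outlives its client simply finds no destination installed and keeps logging to `pantsd.log`, which is the behavior described in the Problem section.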

[ci skip-build-wheels]
stuhood committed Mar 1, 2021
1 parent b2fbce0 commit 87f2261
Showing 45 changed files with 1,434 additions and 1,015 deletions.
@@ -27,7 +27,7 @@
from pants.engine.fs import DigestContents, FileContent
from pants.engine.process import InteractiveRunner
from pants.testutil.python_interpreter_selection import skip_unless_python27_and_python3_present
from pants.testutil.rule_runner import QueryRule, RuleRunner
from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console


@pytest.fixture
@@ -170,8 +170,9 @@ def run_pytest(
test_result = rule_runner.request(TestResult, inputs)
debug_request = rule_runner.request(TestDebugRequest, inputs)
if debug_request.process is not None:
debug_result = InteractiveRunner(rule_runner.scheduler).run(debug_request.process)
assert test_result.exit_code == debug_result.exit_code
with mock_console(rule_runner.options_bootstrapper):
debug_result = InteractiveRunner(rule_runner.scheduler).run(debug_request.process)
assert test_result.exit_code == debug_result.exit_code
return test_result
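
The hunk above shows the pattern this commit repeats across the test suite: code that runs an `InteractiveProcess`, or any goal that needs the console, now acquires it explicitly via the new `mock_console` helper. A condensed sketch of the pattern (the goal and arguments are borrowed from the repl tests later in this diff; the test name is illustrative):

```python
from pants.core.goals.repl import Repl
from pants.testutil.rule_runner import RuleRunner, mock_console

def test_goal_that_needs_a_console(rule_runner: RuleRunner) -> None:
    # Hold exclusive (mock) console access for the duration of the run.
    with mock_console(rule_runner.options_bootstrapper):
        result = rule_runner.run_goal_rule(
            Repl,
            global_args=["--backend-packages=pants.backend.python"],
            args=["src/python/lib.py"],
        )
    assert result.exit_code == 0
```
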


139 changes: 56 additions & 83 deletions src/python/pants/bin/daemon_pants_runner.py
@@ -11,17 +11,12 @@

from pants.base.exiter import PANTS_FAILED_EXIT_CODE, ExitCode
from pants.bin.local_pants_runner import LocalPantsRunner
from pants.engine.internals.native import Native, RawFdRunner
from pants.engine.internals.native import RawFdRunner
from pants.engine.internals.native_engine import PySessionCancellationLatch
from pants.init.logging import (
clear_logging_handlers,
get_logging_handlers,
set_logging_handlers,
setup_logging,
)
from pants.init.logging import stdio_destination
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_core import PantsDaemonCore
from pants.util.contextutil import argv_as, hermetic_environment_as, stdio_as
from pants.util.contextutil import argv_as, hermetic_environment_as

logger = logging.getLogger(__name__)

@@ -39,22 +34,21 @@ def __init__(self, core: PantsDaemonCore) -> None:
self._run_lock = Lock()

@staticmethod
def _send_stderr(stderr_fd: int, msg: str) -> None:
def _send_stderr(stderr_fileno: int, msg: str) -> None:
"""Used to send stderr on a raw filehandle _before_ stdio replacement.
After stdio replacement has happened via `stdio_as` (which mutates sys.std*, and thus cannot
happen until the request lock has been acquired), sys.std* should be used directly.
TODO: This method will be removed as part of #7654.
"""
with os.fdopen(stderr_fd, mode="w", closefd=False) as stderr:
with os.fdopen(stderr_fileno, mode="w", closefd=False) as stderr:
print(msg, file=stderr, flush=True)

@contextmanager
def _one_run_at_a_time(
self, stderr_fd: int, cancellation_latch: PySessionCancellationLatch, timeout: float
self, stderr_fileno: int, cancellation_latch: PySessionCancellationLatch, timeout: float
):
"""Acquires exclusive access within the daemon.
Periodically prints a message on the given stderr_fd while exclusive access cannot be
Periodically prints a message on the given stderr_fileno while exclusive access cannot be
acquired.
TODO: This method will be removed as part of #7654, so it currently polls the lock and
@@ -76,7 +70,7 @@ def should_keep_polling(now):
# If we don't acquire immediately, send an explanation.
length = "forever" if should_poll_forever else "up to {} seconds".format(timeout)
self._send_stderr(
stderr_fd,
stderr_fileno,
f"Another pants invocation is running. Will wait {length} for it to finish before giving up.\n"
"If you don't want to wait for the first run to finish, please press Ctrl-C and run "
"this command with PANTS_CONCURRENT=True in the environment.\n",
@@ -92,7 +86,7 @@ def should_keep_polling(now):
elif should_keep_polling(now):
if now > render_deadline:
self._send_stderr(
stderr_fd,
stderr_fileno,
f"Waiting for invocation to finish (waited for {int(now - start)}s so far)...\n",
)
render_deadline = now + render_timeout
@@ -102,27 +96,6 @@ def should_keep_polling(now):
"Timed out while waiting for another pants invocation to finish."
)

@contextmanager
def _stderr_logging(self, global_bootstrap_options):
"""Temporarily replaces existing handlers (ie, the pantsd handler) with a stderr handler.
In the context of pantsd, there will be an existing handler for the pantsd log, which we
temporarily replace. Making them additive would cause per-run logs to go to pantsd, which
we don't want.
TODO: It would be good to handle logging destinations entirely via the threadlocal state
rather than via handler mutations.
"""
handlers = get_logging_handlers()
try:
clear_logging_handlers()
Native().override_thread_logging_destination_to_just_stderr()
setup_logging(global_bootstrap_options, stderr_logging=True)
yield
finally:
Native().override_thread_logging_destination_to_just_pantsd()
set_logging_handlers(handlers)

def single_daemonized_run(
self, working_dir: str, cancellation_latch: PySessionCancellationLatch
) -> ExitCode:
@@ -133,39 +106,38 @@ def single_daemonized_run(
environment.
"""

# Capture the client's start time, which we propagate here in order to get an accurate
# view of total time.
env_start_time = os.environ.get("PANTSD_RUNTRACKER_CLIENT_START_TIME", None)
start_time = float(env_start_time) if env_start_time else time.time()

# Clear global mutable state before entering `LocalPantsRunner`. Note that we use
# `sys.argv` and `os.environ`, since they have been mutated to maintain the illusion
# of a local run: once we allow for concurrent runs, this information should be
# propagated down from the caller.
# see https://github.com/pantsbuild/pants/issues/7654
options_bootstrapper = OptionsBootstrapper.create(
env=os.environ, args=sys.argv, allow_pantsrc=True
)
global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()

# Run using the pre-warmed Session.
with self._stderr_logging(global_bootstrap_options):
try:
scheduler, options_initializer = self._core.prepare(options_bootstrapper)
runner = LocalPantsRunner.create(
os.environ,
options_bootstrapper,
scheduler=scheduler,
options_initializer=options_initializer,
cancellation_latch=cancellation_latch,
)
return runner.run(start_time)
except Exception as e:
logger.exception(e)
return PANTS_FAILED_EXIT_CODE
except KeyboardInterrupt:
print("Interrupted by user.\n", file=sys.stderr)
return PANTS_FAILED_EXIT_CODE
try:
logger.debug("Connected to pantsd")
# Capture the client's start time, which we propagate here in order to get an accurate
# view of total time.
env_start_time = os.environ.get("PANTSD_RUNTRACKER_CLIENT_START_TIME", None)
start_time = float(env_start_time) if env_start_time else time.time()

# Clear global mutable state before entering `LocalPantsRunner`. Note that we use
# `sys.argv` and `os.environ`, since they have been mutated to maintain the illusion
# of a local run: once we allow for concurrent runs, this information should be
# propagated down from the caller.
# see https://github.com/pantsbuild/pants/issues/7654
options_bootstrapper = OptionsBootstrapper.create(
env=os.environ, args=sys.argv, allow_pantsrc=True
)

# Run using the pre-warmed Session.
scheduler, options_initializer = self._core.prepare(options_bootstrapper)
runner = LocalPantsRunner.create(
os.environ,
options_bootstrapper,
scheduler=scheduler,
options_initializer=options_initializer,
cancellation_latch=cancellation_latch,
)
return runner.run(start_time)
except Exception as e:
logger.exception(e)
return PANTS_FAILED_EXIT_CODE
except KeyboardInterrupt:
print("Interrupted by user.\n", file=sys.stderr)
return PANTS_FAILED_EXIT_CODE

def __call__(
self,
@@ -174,27 +146,28 @@
env: Dict[str, str],
working_directory: bytes,
cancellation_latch: PySessionCancellationLatch,
stdin_fd: int,
stdout_fd: int,
stderr_fd: int,
stdin_fileno: int,
stdout_fileno: int,
stderr_fileno: int,
) -> ExitCode:
request_timeout = float(env.get("PANTSD_REQUEST_TIMEOUT_LIMIT", -1))
# NB: Order matters: we acquire a lock before mutating either `sys.std*`, `os.environ`, etc.
with self._one_run_at_a_time(
stderr_fd,
stderr_fileno,
cancellation_latch=cancellation_latch,
timeout=request_timeout,
), stdio_as(
stdin_fd=stdin_fd, stdout_fd=stdout_fd, stderr_fd=stderr_fd
), hermetic_environment_as(
**env
), argv_as(
(command,) + args
):
# NB: Run implements exception handling, so only the most primitive errors will escape
# this function, where they will be logged to the pantsd.log by the server.
# NB: `single_daemonized_run` implements exception handling, so only the most primitive
# errors will escape this function, where they will be logged by the server.
logger.info(f"handling request: `{' '.join(args)}`")
try:
return self.single_daemonized_run(working_directory.decode(), cancellation_latch)
with stdio_destination(
stdin_fileno=stdin_fileno,
stdout_fileno=stdout_fileno,
stderr_fileno=stderr_fileno,
), hermetic_environment_as(**env), argv_as((command,) + args):
return self.single_daemonized_run(
working_directory.decode(), cancellation_latch
)
finally:
logger.info(f"request completed: `{' '.join(args)}`")
48 changes: 29 additions & 19 deletions src/python/pants/bin/pants_runner.py
@@ -3,14 +3,15 @@

import logging
import os
import sys
import warnings
from dataclasses import dataclass
from typing import List, Mapping

from pants.base.exception_sink import ExceptionSink
from pants.base.exiter import ExitCode
from pants.bin.remote_pants_runner import RemotePantsRunner
from pants.init.logging import setup_logging
from pants.init.logging import initialize_stdio, stdio_destination
from pants.init.util import init_workdir
from pants.option.option_value_container import OptionValueContainer
from pants.option.options_bootstrapper import OptionsBootstrapper
@@ -69,21 +70,30 @@ def run(self, start_time: float) -> ExitCode:

# We enable logging here, and everything before it will be routed through regular
# Python logging.
setup_logging(global_bootstrap_options, stderr_logging=True)

if self._should_run_with_pantsd(global_bootstrap_options):
try:
remote_runner = RemotePantsRunner(self.args, self.env, options_bootstrapper)
return remote_runner.run()
except RemotePantsRunner.Fallback as e:
logger.warning(f"Client exception: {e!r}, falling back to non-daemon mode")

# N.B. Inlining this import speeds up the python thin client run by about 100ms.
from pants.bin.local_pants_runner import LocalPantsRunner

# We only install signal handling via ExceptionSink if the run will execute in this process.
ExceptionSink.install(
log_location=init_workdir(global_bootstrap_options), pantsd_instance=False
)
runner = LocalPantsRunner.create(env=self.env, options_bootstrapper=options_bootstrapper)
return runner.run(start_time)
stdin_fileno = sys.stdin.fileno()
stdout_fileno = sys.stdout.fileno()
stderr_fileno = sys.stderr.fileno()
with initialize_stdio(global_bootstrap_options), stdio_destination(
stdin_fileno=stdin_fileno,
stdout_fileno=stdout_fileno,
stderr_fileno=stderr_fileno,
):

if self._should_run_with_pantsd(global_bootstrap_options):
try:
remote_runner = RemotePantsRunner(self.args, self.env, options_bootstrapper)
return remote_runner.run()
except RemotePantsRunner.Fallback as e:
logger.warning(f"Client exception: {e!r}, falling back to non-daemon mode")

# N.B. Inlining this import speeds up the python thin client run by about 100ms.
from pants.bin.local_pants_runner import LocalPantsRunner

# We only install signal handling via ExceptionSink if the run will execute in this process.
ExceptionSink.install(
log_location=init_workdir(global_bootstrap_options), pantsd_instance=False
)
runner = LocalPantsRunner.create(
env=self.env, options_bootstrapper=options_bootstrapper
)
return runner.run(start_time)
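
Likewise, the client-side change above amounts to wrapping the entire run in the new context managers. A condensed paraphrase (the `run_with_pantsd`/`run_locally` callables stand in for `RemotePantsRunner`/`LocalPantsRunner` and are not real APIs):

```python
import sys
from pants.init.logging import initialize_stdio, stdio_destination

def run_client(global_bootstrap_options, use_pantsd, run_with_pantsd, run_locally):
    # The plain (non-pantsd) client now routes its own stdio through the same
    # thread-local destination machinery that pantsd uses for its clients.
    with initialize_stdio(global_bootstrap_options), stdio_destination(
        stdin_fileno=sys.stdin.fileno(),
        stdout_fileno=sys.stdout.fileno(),
        stderr_fileno=sys.stderr.fileno(),
    ):
        if use_pantsd:
            return run_with_pantsd()
        return run_locally()
```
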
2 changes: 1 addition & 1 deletion src/python/pants/bin/remote_pants_runner.py
@@ -106,7 +106,7 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
executor = PyExecutor(*GlobalOptions.compute_executor_arguments(global_options))

# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout.buffer, sys.stderr.buffer)
ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout, sys.stderr)
modified_env = {
**self._env,
**ng_env,
54 changes: 28 additions & 26 deletions src/python/pants/core/goals/repl_integration_test.py
@@ -11,7 +11,7 @@
from pants.core.goals.repl import Repl
from pants.core.goals.repl import rules as repl_rules
from pants.engine.process import Process
from pants.testutil.rule_runner import QueryRule, RuleRunner
from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console


@pytest.fixture
@@ -35,41 +35,43 @@ def setup_sources(rule_runner: RuleRunner) -> None:


def test_repl_with_targets(rule_runner: RuleRunner) -> None:
# TODO(#9108): A mock InteractiveRunner that allows us to actually run code in
# the repl and verify that, e.g., the generated protobuf code is available.
# Right now this test prepares for that by including generated code, but cannot
# actually verify it.
# TODO(#9108): Expand `mock_console` to allow for providing input for the repl to verify
# that, e.g., the generated protobuf code is available. Right now this test prepares for
# that by including generated code, but cannot actually verify it.
setup_sources(rule_runner)
result = rule_runner.run_goal_rule(
Repl,
global_args=[
"--backend-packages=pants.backend.python",
"--backend-packages=pants.backend.codegen.protobuf.python",
],
args=["src/python/lib.py"],
)
with mock_console(rule_runner.options_bootstrapper):
result = rule_runner.run_goal_rule(
Repl,
global_args=[
"--backend-packages=pants.backend.python",
"--backend-packages=pants.backend.codegen.protobuf.python",
],
args=["src/python/lib.py"],
)
assert result.exit_code == 0


def test_repl_ipython(rule_runner: RuleRunner) -> None:
setup_sources(rule_runner)
result = rule_runner.run_goal_rule(
Repl,
global_args=[
"--backend-packages=pants.backend.python",
"--backend-packages=pants.backend.codegen.protobuf.python",
],
args=["--shell=ipython", "src/python/lib.py"],
)
with mock_console(rule_runner.options_bootstrapper):
result = rule_runner.run_goal_rule(
Repl,
global_args=[
"--backend-packages=pants.backend.python",
"--backend-packages=pants.backend.codegen.protobuf.python",
],
args=["--shell=ipython", "src/python/lib.py"],
)
assert result.exit_code == 0


def test_repl_bogus_repl_name(rule_runner: RuleRunner) -> None:
setup_sources(rule_runner)
result = rule_runner.run_goal_rule(
Repl,
global_args=["--backend-packages=pants.backend.python"],
args=["--shell=bogus-repl", "src/python/lib.py"],
)
with mock_console(rule_runner.options_bootstrapper):
result = rule_runner.run_goal_rule(
Repl,
global_args=["--backend-packages=pants.backend.python"],
args=["--shell=bogus-repl", "src/python/lib.py"],
)
assert result.exit_code == -1
assert "'bogus-repl' is not a registered REPL. Available REPLs" in result.stderr