From 87f22614a3a97d93aaf570ba1c3eeefcbfdb8d5f Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Mon, 1 Mar 2021 15:53:15 -0800 Subject: [PATCH] Thread-local stdio handling to support concurrent pantsd clients (#11536) ### Problem In order to support concurrent `pantsd` clients, work spawned by each client (even into background threads) must interact with the relevant `stdio` file handles. And when a client disconnects, spawned work should be able to continue to log to the `pantsd.log`. ### Solution Extract the thread/task-local aspects of our `logging` crate into a `stdio` crate that provides: 1) a `Console`-aware `logging` destination 2) exclusive access to a `Console` while a UI or `InteractiveProcess` is running 3) Python-level `sys.std*` replacements ### Result No user-visible impact, but one of the largest remaining blockers for #7654 is removed. [ci skip-build-wheels] --- .../goals/pytest_runner_integration_test.py | 7 +- src/python/pants/bin/daemon_pants_runner.py | 139 +++--- src/python/pants/bin/pants_runner.py | 48 +- src/python/pants/bin/remote_pants_runner.py | 2 +- .../pants/core/goals/repl_integration_test.py | 54 +-- src/python/pants/core/goals/run_test.py | 72 ++- src/python/pants/core/goals/test_test.py | 108 ++--- src/python/pants/engine/console.py | 56 +-- src/python/pants/engine/internals/native.py | 47 +- .../pants/engine/internals/native_engine.pyi | 20 +- .../pants/engine/internals/scheduler.py | 4 + src/python/pants/help/BUILD | 2 +- src/python/pants/init/logging.py | 235 +++++----- src/python/pants/option/global_options.py | 11 +- src/python/pants/pantsd/pants_daemon.py | 43 +- src/python/pants/pantsd/pants_daemon_core.py | 6 +- .../pants/pantsd/service/pants_service.py | 8 +- .../pants/testutil/pants_integration_test.py | 4 +- src/python/pants/testutil/rule_runner.py | 68 ++- src/python/pants/util/contextutil.py | 58 --- src/python/pants/util/contextutil_test.py | 79 ---- src/rust/engine/Cargo.lock | 33 +- src/rust/engine/Cargo.toml | 6 + src/rust/engine/logging/Cargo.toml | 3 +- src/rust/engine/logging/src/lib.rs | 2 - src/rust/engine/logging/src/logger.rs | 226 +++------ src/rust/engine/nailgun/src/client.rs | 21 +- src/rust/engine/src/externs/interface.rs | 124 ++--- src/rust/engine/src/externs/mod.rs | 1 + src/rust/engine/src/externs/stdio.rs | 115 +++++ src/rust/engine/src/scheduler.rs | 26 ++ src/rust/engine/stdio/Cargo.toml | 11 + src/rust/engine/stdio/src/lib.rs | 439 ++++++++++++++++++ src/rust/engine/stdio/src/term.rs | 149 ++++++ src/rust/engine/task_executor/Cargo.toml | 2 +- src/rust/engine/task_executor/src/lib.rs | 12 +- src/rust/engine/ui/Cargo.toml | 8 +- src/rust/engine/ui/src/console_ui.rs | 36 +- src/rust/engine/watch/Cargo.toml | 1 - src/rust/engine/watch/src/lib.rs | 1 - tests/python/pants_test/init/test_logging.py | 88 ++-- .../native_engine_logging_integration_test.py | 7 +- .../pantsd/pantsd_integration_test.py | 13 +- .../pantsd/pantsd_integration_test_base.py | 8 +- .../pants_test/pantsd/test_pants_daemon.py | 46 -- 45 files changed, 1434 insertions(+), 1015 deletions(-) create mode 100644 src/rust/engine/src/externs/stdio.rs create mode 100644 src/rust/engine/stdio/Cargo.toml create mode 100644 src/rust/engine/stdio/src/lib.rs create mode 100644 src/rust/engine/stdio/src/term.rs delete mode 100644 tests/python/pants_test/pantsd/test_pants_daemon.py diff --git a/src/python/pants/backend/python/goals/pytest_runner_integration_test.py b/src/python/pants/backend/python/goals/pytest_runner_integration_test.py index cfc350a9879..fb15a6680b9 100644 --- a/src/python/pants/backend/python/goals/pytest_runner_integration_test.py +++ b/src/python/pants/backend/python/goals/pytest_runner_integration_test.py @@ -27,7 +27,7 @@ from pants.engine.fs import DigestContents, FileContent from pants.engine.process import InteractiveRunner from pants.testutil.python_interpreter_selection import skip_unless_python27_and_python3_present -from pants.testutil.rule_runner import QueryRule, RuleRunner +from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console @pytest.fixture @@ -170,8 +170,9 @@ def run_pytest( test_result = rule_runner.request(TestResult, inputs) debug_request = rule_runner.request(TestDebugRequest, inputs) if debug_request.process is not None: - debug_result = InteractiveRunner(rule_runner.scheduler).run(debug_request.process) - assert test_result.exit_code == debug_result.exit_code + with mock_console(rule_runner.options_bootstrapper): + debug_result = InteractiveRunner(rule_runner.scheduler).run(debug_request.process) + assert test_result.exit_code == debug_result.exit_code return test_result diff --git a/src/python/pants/bin/daemon_pants_runner.py b/src/python/pants/bin/daemon_pants_runner.py index be56aec0a20..dff4598676c 100644 --- a/src/python/pants/bin/daemon_pants_runner.py +++ b/src/python/pants/bin/daemon_pants_runner.py @@ -11,17 +11,12 @@ from pants.base.exiter import PANTS_FAILED_EXIT_CODE, ExitCode from pants.bin.local_pants_runner import LocalPantsRunner -from pants.engine.internals.native import Native, RawFdRunner +from pants.engine.internals.native import RawFdRunner from pants.engine.internals.native_engine import PySessionCancellationLatch -from pants.init.logging import ( - clear_logging_handlers, - get_logging_handlers, - set_logging_handlers, - setup_logging, -) +from pants.init.logging import stdio_destination from pants.option.options_bootstrapper import OptionsBootstrapper from pants.pantsd.pants_daemon_core import PantsDaemonCore -from pants.util.contextutil import argv_as, hermetic_environment_as, stdio_as +from pants.util.contextutil import argv_as, hermetic_environment_as logger = logging.getLogger(__name__) @@ -39,22 +34,21 @@ def __init__(self, core: PantsDaemonCore) -> None: self._run_lock = Lock() @staticmethod - def _send_stderr(stderr_fd: int, msg: str) -> None: + def _send_stderr(stderr_fileno: int, msg: str) -> None: """Used to send stderr on a raw filehandle _before_ stdio replacement. - After stdio replacement has happened via `stdio_as` (which mutates sys.std*, and thus cannot - happen until the request lock has been acquired), sys.std* should be used directly. + TODO: This method will be removed as part of #7654. """ - with os.fdopen(stderr_fd, mode="w", closefd=False) as stderr: + with os.fdopen(stderr_fileno, mode="w", closefd=False) as stderr: print(msg, file=stderr, flush=True) @contextmanager def _one_run_at_a_time( - self, stderr_fd: int, cancellation_latch: PySessionCancellationLatch, timeout: float + self, stderr_fileno: int, cancellation_latch: PySessionCancellationLatch, timeout: float ): """Acquires exclusive access within the daemon. - Periodically prints a message on the given stderr_fd while exclusive access cannot be + Periodically prints a message on the given stderr_fileno while exclusive access cannot be acquired. TODO: This method will be removed as part of #7654, so it currently polls the lock and @@ -76,7 +70,7 @@ def should_keep_polling(now): # If we don't acquire immediately, send an explanation. length = "forever" if should_poll_forever else "up to {} seconds".format(timeout) self._send_stderr( - stderr_fd, + stderr_fileno, f"Another pants invocation is running. Will wait {length} for it to finish before giving up.\n" "If you don't want to wait for the first run to finish, please press Ctrl-C and run " "this command with PANTS_CONCURRENT=True in the environment.\n", @@ -92,7 +86,7 @@ def should_keep_polling(now): elif should_keep_polling(now): if now > render_deadline: self._send_stderr( - stderr_fd, + stderr_fileno, f"Waiting for invocation to finish (waited for {int(now - start)}s so far)...\n", ) render_deadline = now + render_timeout @@ -102,27 +96,6 @@ def should_keep_polling(now): "Timed out while waiting for another pants invocation to finish." ) - @contextmanager - def _stderr_logging(self, global_bootstrap_options): - """Temporarily replaces existing handlers (ie, the pantsd handler) with a stderr handler. - - In the context of pantsd, there will be an existing handler for the pantsd log, which we - temporarily replace. Making them additive would cause per-run logs to go to pantsd, which - we don't want. - - TODO: It would be good to handle logging destinations entirely via the threadlocal state - rather than via handler mutations. - """ - handlers = get_logging_handlers() - try: - clear_logging_handlers() - Native().override_thread_logging_destination_to_just_stderr() - setup_logging(global_bootstrap_options, stderr_logging=True) - yield - finally: - Native().override_thread_logging_destination_to_just_pantsd() - set_logging_handlers(handlers) - def single_daemonized_run( self, working_dir: str, cancellation_latch: PySessionCancellationLatch ) -> ExitCode: @@ -133,39 +106,38 @@ def single_daemonized_run( environment. """ - # Capture the client's start time, which we propagate here in order to get an accurate - # view of total time. - env_start_time = os.environ.get("PANTSD_RUNTRACKER_CLIENT_START_TIME", None) - start_time = float(env_start_time) if env_start_time else time.time() - - # Clear global mutable state before entering `LocalPantsRunner`. Note that we use - # `sys.argv` and `os.environ`, since they have been mutated to maintain the illusion - # of a local run: once we allow for concurrent runs, this information should be - # propagated down from the caller. - # see https://github.com/pantsbuild/pants/issues/7654 - options_bootstrapper = OptionsBootstrapper.create( - env=os.environ, args=sys.argv, allow_pantsrc=True - ) - global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope() - - # Run using the pre-warmed Session. - with self._stderr_logging(global_bootstrap_options): - try: - scheduler, options_initializer = self._core.prepare(options_bootstrapper) - runner = LocalPantsRunner.create( - os.environ, - options_bootstrapper, - scheduler=scheduler, - options_initializer=options_initializer, - cancellation_latch=cancellation_latch, - ) - return runner.run(start_time) - except Exception as e: - logger.exception(e) - return PANTS_FAILED_EXIT_CODE - except KeyboardInterrupt: - print("Interrupted by user.\n", file=sys.stderr) - return PANTS_FAILED_EXIT_CODE + try: + logger.debug("Connected to pantsd") + # Capture the client's start time, which we propagate here in order to get an accurate + # view of total time. + env_start_time = os.environ.get("PANTSD_RUNTRACKER_CLIENT_START_TIME", None) + start_time = float(env_start_time) if env_start_time else time.time() + + # Clear global mutable state before entering `LocalPantsRunner`. Note that we use + # `sys.argv` and `os.environ`, since they have been mutated to maintain the illusion + # of a local run: once we allow for concurrent runs, this information should be + # propagated down from the caller. + # see https://github.com/pantsbuild/pants/issues/7654 + options_bootstrapper = OptionsBootstrapper.create( + env=os.environ, args=sys.argv, allow_pantsrc=True + ) + + # Run using the pre-warmed Session. + scheduler, options_initializer = self._core.prepare(options_bootstrapper) + runner = LocalPantsRunner.create( + os.environ, + options_bootstrapper, + scheduler=scheduler, + options_initializer=options_initializer, + cancellation_latch=cancellation_latch, + ) + return runner.run(start_time) + except Exception as e: + logger.exception(e) + return PANTS_FAILED_EXIT_CODE + except KeyboardInterrupt: + print("Interrupted by user.\n", file=sys.stderr) + return PANTS_FAILED_EXIT_CODE def __call__( self, @@ -174,27 +146,28 @@ def __call__( env: Dict[str, str], working_directory: bytes, cancellation_latch: PySessionCancellationLatch, - stdin_fd: int, - stdout_fd: int, - stderr_fd: int, + stdin_fileno: int, + stdout_fileno: int, + stderr_fileno: int, ) -> ExitCode: request_timeout = float(env.get("PANTSD_REQUEST_TIMEOUT_LIMIT", -1)) # NB: Order matters: we acquire a lock before mutating either `sys.std*`, `os.environ`, etc. with self._one_run_at_a_time( - stderr_fd, + stderr_fileno, cancellation_latch=cancellation_latch, timeout=request_timeout, - ), stdio_as( - stdin_fd=stdin_fd, stdout_fd=stdout_fd, stderr_fd=stderr_fd - ), hermetic_environment_as( - **env - ), argv_as( - (command,) + args ): - # NB: Run implements exception handling, so only the most primitive errors will escape - # this function, where they will be logged to the pantsd.log by the server. + # NB: `single_daemonized_run` implements exception handling, so only the most primitive + # errors will escape this function, where they will be logged by the server. logger.info(f"handling request: `{' '.join(args)}`") try: - return self.single_daemonized_run(working_directory.decode(), cancellation_latch) + with stdio_destination( + stdin_fileno=stdin_fileno, + stdout_fileno=stdout_fileno, + stderr_fileno=stderr_fileno, + ), hermetic_environment_as(**env), argv_as((command,) + args): + return self.single_daemonized_run( + working_directory.decode(), cancellation_latch + ) finally: logger.info(f"request completed: `{' '.join(args)}`") diff --git a/src/python/pants/bin/pants_runner.py b/src/python/pants/bin/pants_runner.py index 16363129d40..c5c800b357d 100644 --- a/src/python/pants/bin/pants_runner.py +++ b/src/python/pants/bin/pants_runner.py @@ -3,6 +3,7 @@ import logging import os +import sys import warnings from dataclasses import dataclass from typing import List, Mapping @@ -10,7 +11,7 @@ from pants.base.exception_sink import ExceptionSink from pants.base.exiter import ExitCode from pants.bin.remote_pants_runner import RemotePantsRunner -from pants.init.logging import setup_logging +from pants.init.logging import initialize_stdio, stdio_destination from pants.init.util import init_workdir from pants.option.option_value_container import OptionValueContainer from pants.option.options_bootstrapper import OptionsBootstrapper @@ -69,21 +70,30 @@ def run(self, start_time: float) -> ExitCode: # We enable logging here, and everything before it will be routed through regular # Python logging. - setup_logging(global_bootstrap_options, stderr_logging=True) - - if self._should_run_with_pantsd(global_bootstrap_options): - try: - remote_runner = RemotePantsRunner(self.args, self.env, options_bootstrapper) - return remote_runner.run() - except RemotePantsRunner.Fallback as e: - logger.warning(f"Client exception: {e!r}, falling back to non-daemon mode") - - # N.B. Inlining this import speeds up the python thin client run by about 100ms. - from pants.bin.local_pants_runner import LocalPantsRunner - - # We only install signal handling via ExceptionSink if the run will execute in this process. - ExceptionSink.install( - log_location=init_workdir(global_bootstrap_options), pantsd_instance=False - ) - runner = LocalPantsRunner.create(env=self.env, options_bootstrapper=options_bootstrapper) - return runner.run(start_time) + stdin_fileno = sys.stdin.fileno() + stdout_fileno = sys.stdout.fileno() + stderr_fileno = sys.stderr.fileno() + with initialize_stdio(global_bootstrap_options), stdio_destination( + stdin_fileno=stdin_fileno, + stdout_fileno=stdout_fileno, + stderr_fileno=stderr_fileno, + ): + + if self._should_run_with_pantsd(global_bootstrap_options): + try: + remote_runner = RemotePantsRunner(self.args, self.env, options_bootstrapper) + return remote_runner.run() + except RemotePantsRunner.Fallback as e: + logger.warning(f"Client exception: {e!r}, falling back to non-daemon mode") + + # N.B. Inlining this import speeds up the python thin client run by about 100ms. + from pants.bin.local_pants_runner import LocalPantsRunner + + # We only install signal handling via ExceptionSink if the run will execute in this process. + ExceptionSink.install( + log_location=init_workdir(global_bootstrap_options), pantsd_instance=False + ) + runner = LocalPantsRunner.create( + env=self.env, options_bootstrapper=options_bootstrapper + ) + return runner.run(start_time) diff --git a/src/python/pants/bin/remote_pants_runner.py b/src/python/pants/bin/remote_pants_runner.py index a42b660f9a5..1950b0b53f1 100644 --- a/src/python/pants/bin/remote_pants_runner.py +++ b/src/python/pants/bin/remote_pants_runner.py @@ -106,7 +106,7 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC executor = PyExecutor(*GlobalOptions.compute_executor_arguments(global_options)) # Merge the nailgun TTY capability environment variables with the passed environment dict. - ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout.buffer, sys.stderr.buffer) + ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout, sys.stderr) modified_env = { **self._env, **ng_env, diff --git a/src/python/pants/core/goals/repl_integration_test.py b/src/python/pants/core/goals/repl_integration_test.py index 2b1f6521f32..ac09c304881 100644 --- a/src/python/pants/core/goals/repl_integration_test.py +++ b/src/python/pants/core/goals/repl_integration_test.py @@ -11,7 +11,7 @@ from pants.core.goals.repl import Repl from pants.core.goals.repl import rules as repl_rules from pants.engine.process import Process -from pants.testutil.rule_runner import QueryRule, RuleRunner +from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console @pytest.fixture @@ -35,41 +35,43 @@ def setup_sources(rule_runner: RuleRunner) -> None: def test_repl_with_targets(rule_runner: RuleRunner) -> None: - # TODO(#9108): A mock InteractiveRunner that allows us to actually run code in - # the repl and verify that, e.g., the generated protobuf code is available. - # Right now this test prepares for that by including generated code, but cannot - # actually verify it. + # TODO(#9108): Expand `mock_console` to allow for providing input for the repl to verify + # that, e.g., the generated protobuf code is available. Right now this test prepares for + # that by including generated code, but cannot actually verify it. setup_sources(rule_runner) - result = rule_runner.run_goal_rule( - Repl, - global_args=[ - "--backend-packages=pants.backend.python", - "--backend-packages=pants.backend.codegen.protobuf.python", - ], - args=["src/python/lib.py"], - ) + with mock_console(rule_runner.options_bootstrapper): + result = rule_runner.run_goal_rule( + Repl, + global_args=[ + "--backend-packages=pants.backend.python", + "--backend-packages=pants.backend.codegen.protobuf.python", + ], + args=["src/python/lib.py"], + ) assert result.exit_code == 0 def test_repl_ipython(rule_runner: RuleRunner) -> None: setup_sources(rule_runner) - result = rule_runner.run_goal_rule( - Repl, - global_args=[ - "--backend-packages=pants.backend.python", - "--backend-packages=pants.backend.codegen.protobuf.python", - ], - args=["--shell=ipython", "src/python/lib.py"], - ) + with mock_console(rule_runner.options_bootstrapper): + result = rule_runner.run_goal_rule( + Repl, + global_args=[ + "--backend-packages=pants.backend.python", + "--backend-packages=pants.backend.codegen.protobuf.python", + ], + args=["--shell=ipython", "src/python/lib.py"], + ) assert result.exit_code == 0 def test_repl_bogus_repl_name(rule_runner: RuleRunner) -> None: setup_sources(rule_runner) - result = rule_runner.run_goal_rule( - Repl, - global_args=["--backend-packages=pants.backend.python"], - args=["--shell=bogus-repl", "src/python/lib.py"], - ) + with mock_console(rule_runner.options_bootstrapper): + result = rule_runner.run_goal_rule( + Repl, + global_args=["--backend-packages=pants.backend.python"], + args=["--shell=bogus-repl", "src/python/lib.py"], + ) assert result.exit_code == -1 assert "'bogus-repl' is not a registered REPL. Available REPLs" in result.stderr diff --git a/src/python/pants/core/goals/run_test.py b/src/python/pants/core/goals/run_test.py index 6612974e83c..ca218c91fa6 100644 --- a/src/python/pants/core/goals/run_test.py +++ b/src/python/pants/core/goals/run_test.py @@ -14,7 +14,7 @@ from pants.engine.target import Target, TargetRootsToFieldSets, TargetRootsToFieldSetsRequest from pants.option.global_options import GlobalOptions from pants.testutil.option_util import create_goal_subsystem, create_subsystem -from pants.testutil.rule_runner import MockConsole, MockGet, RuleRunner, run_rule_with_mocks +from pants.testutil.rule_runner import MockGet, RuleRunner, mock_console, run_rule_with_mocks @pytest.fixture @@ -33,7 +33,6 @@ def create_mock_run_request(rule_runner: RuleRunner, program_text: bytes) -> Run def single_target_run( rule_runner: RuleRunner, address: Address, - console: MockConsole, *, program_text: bytes, ) -> Run: @@ -50,39 +49,38 @@ class TestBinaryTarget(Target): target = TestBinaryTarget({}, address=address) field_set = TestRunFieldSet.create(target) - res = run_rule_with_mocks( - run, - rule_args=[ - create_goal_subsystem(RunSubsystem, args=[]), - create_subsystem(GlobalOptions, pants_workdir=rule_runner.pants_workdir), - console, - interactive_runner, - workspace, - BuildRoot(), - ], - mock_gets=[ - MockGet( - output_type=TargetRootsToFieldSets, - input_type=TargetRootsToFieldSetsRequest, - mock=lambda _: TargetRootsToFieldSets({target: [field_set]}), - ), - MockGet( - output_type=RunRequest, - input_type=TestRunFieldSet, - mock=lambda _: create_mock_run_request(rule_runner, program_text), - ), - ], - ) - return cast(Run, res) + with mock_console(rule_runner.options_bootstrapper) as (console, _): + res = run_rule_with_mocks( + run, + rule_args=[ + create_goal_subsystem(RunSubsystem, args=[]), + create_subsystem(GlobalOptions, pants_workdir=rule_runner.pants_workdir), + console, + interactive_runner, + workspace, + BuildRoot(), + ], + mock_gets=[ + MockGet( + output_type=TargetRootsToFieldSets, + input_type=TargetRootsToFieldSetsRequest, + mock=lambda _: TargetRootsToFieldSets({target: [field_set]}), + ), + MockGet( + output_type=RunRequest, + input_type=TestRunFieldSet, + mock=lambda _: create_mock_run_request(rule_runner, program_text), + ), + ], + ) + return cast(Run, res) def test_normal_run(rule_runner: RuleRunner) -> None: - console = MockConsole(use_colors=False) program_text = b'#!/usr/bin/python\nprint("hello")' res = single_target_run( rule_runner, Address("some/addr"), - console, program_text=program_text, ) assert res.exit_code == 0 @@ -91,18 +89,18 @@ def test_normal_run(rule_runner: RuleRunner) -> None: def test_materialize_input_files(rule_runner: RuleRunner) -> None: program_text = b'#!/usr/bin/python\nprint("hello")' binary = create_mock_run_request(rule_runner, program_text) - interactive_runner = InteractiveRunner(rule_runner.scheduler) - process = InteractiveProcess( - argv=("./program.py",), - run_in_workspace=False, - input_digest=binary.digest, - ) - result = interactive_runner.run(process) + with mock_console(rule_runner.options_bootstrapper): + interactive_runner = InteractiveRunner(rule_runner.scheduler) + process = InteractiveProcess( + argv=("./program.py",), + run_in_workspace=False, + input_digest=binary.digest, + ) + result = interactive_runner.run(process) assert result.exit_code == 0 def test_failed_run(rule_runner: RuleRunner) -> None: - console = MockConsole(use_colors=False) program_text = b'#!/usr/bin/python\nraise RuntimeError("foo")' - res = single_target_run(rule_runner, Address("some/addr"), console, program_text=program_text) + res = single_target_run(rule_runner, Address("some/addr"), program_text=program_text) assert res.exit_code == 1 diff --git a/src/python/pants/core/goals/test_test.py b/src/python/pants/core/goals/test_test.py index 9819df79c86..5dba64cd497 100644 --- a/src/python/pants/core/goals/test_test.py +++ b/src/python/pants/core/goals/test_test.py @@ -46,7 +46,7 @@ ) from pants.engine.unions import UnionMembership from pants.testutil.option_util import create_goal_subsystem -from pants.testutil.rule_runner import MockConsole, MockGet, RuleRunner, run_rule_with_mocks +from pants.testutil.rule_runner import MockGet, RuleRunner, mock_console, run_rule_with_mocks from pants.util.logging import LogLevel @@ -120,7 +120,6 @@ def run_test_rule( include_sources: bool = True, valid_targets: bool = True, ) -> Tuple[int, str]: - console = MockConsole(use_colors=False) test_subsystem = create_goal_subsystem( TestSubsystem, debug=debug, @@ -157,57 +156,60 @@ def mock_coverage_report_generation( console_report = ConsoleCoverageReport(f"Ran coverage on {addresses}") return CoverageReports(reports=(console_report,)) - result: Test = run_rule_with_mocks( - run_tests, - rule_args=[ - console, - test_subsystem, - interactive_runner, - workspace, - union_membership, - ], - mock_gets=[ - MockGet( - output_type=TargetRootsToFieldSets, - input_type=TargetRootsToFieldSetsRequest, - mock=mock_find_valid_field_sets, - ), - MockGet( - output_type=EnrichedTestResult, - input_type=TestFieldSet, - mock=lambda fs: fs.test_result, - ), - MockGet( - output_type=TestDebugRequest, - input_type=TestFieldSet, - mock=mock_debug_request, - ), - MockGet( - output_type=FieldSetsWithSources, - input_type=FieldSetsWithSourcesRequest, - mock=lambda field_sets: FieldSetsWithSources(field_sets if include_sources else ()), - ), - # Merge XML results. - MockGet( - output_type=Digest, - input_type=MergeDigests, - mock=lambda _: EMPTY_DIGEST, - ), - MockGet( - output_type=CoverageReports, - input_type=CoverageDataCollection, - mock=mock_coverage_report_generation, - ), - MockGet( - output_type=OpenFiles, - input_type=OpenFilesRequest, - mock=lambda _: OpenFiles(()), - ), - ], - union_membership=union_membership, - ) - assert not console.stdout.getvalue() - return result.exit_code, console.stderr.getvalue() + with mock_console(rule_runner.options_bootstrapper) as (console, stdio_reader): + result: Test = run_rule_with_mocks( + run_tests, + rule_args=[ + console, + test_subsystem, + interactive_runner, + workspace, + union_membership, + ], + mock_gets=[ + MockGet( + output_type=TargetRootsToFieldSets, + input_type=TargetRootsToFieldSetsRequest, + mock=mock_find_valid_field_sets, + ), + MockGet( + output_type=EnrichedTestResult, + input_type=TestFieldSet, + mock=lambda fs: fs.test_result, + ), + MockGet( + output_type=TestDebugRequest, + input_type=TestFieldSet, + mock=mock_debug_request, + ), + MockGet( + output_type=FieldSetsWithSources, + input_type=FieldSetsWithSourcesRequest, + mock=lambda field_sets: FieldSetsWithSources( + field_sets if include_sources else () + ), + ), + # Merge XML results. + MockGet( + output_type=Digest, + input_type=MergeDigests, + mock=lambda _: EMPTY_DIGEST, + ), + MockGet( + output_type=CoverageReports, + input_type=CoverageDataCollection, + mock=mock_coverage_report_generation, + ), + MockGet( + output_type=OpenFiles, + input_type=OpenFilesRequest, + mock=lambda _: OpenFiles(()), + ), + ], + union_membership=union_membership, + ) + assert not stdio_reader.get_stdout() + return result.exit_code, stdio_reader.get_stderr() def test_empty_target_noops(rule_runner: RuleRunner) -> None: diff --git a/src/python/pants/engine/console.py b/src/python/pants/engine/console.py index 8113357ef95..3c7acb09e2c 100644 --- a/src/python/pants/engine/console.py +++ b/src/python/pants/engine/console.py @@ -2,43 +2,14 @@ # Licensed under the Apache License, Version 2.0 (see LICENSE). import sys -from dataclasses import dataclass -from typing import Callable, Optional, cast +from typing import Callable, Optional from colors import blue, cyan, green, magenta, red, yellow -from pants.engine.internals.native import Native from pants.engine.internals.scheduler import SchedulerSession from pants.engine.rules import side_effecting -@dataclass(frozen=True) -class NativeWriter: - scheduler_session: SchedulerSession - native: Native = Native() - - def write(self, payload: str) -> None: - raise NotImplementedError - - def flush(self): - """flush() doesn't need to do anything for NativeWriter.""" - pass - - -class NativeStdOut(NativeWriter): - def write(self, payload: str) -> None: - scheduler = self.scheduler_session.scheduler._scheduler - session = self.scheduler_session.session - self.native.write_stdout(scheduler, session, payload, teardown_ui=True) - - -class NativeStdErr(NativeWriter): - def write(self, payload: str) -> None: - scheduler = self.scheduler_session.scheduler._scheduler - session = self.scheduler_session.session - self.native.write_stderr(scheduler, session, payload, teardown_ui=True) - - @side_effecting class Console: """Class responsible for writing text to the console while Pants is running.""" @@ -55,28 +26,25 @@ def __init__( """`stdout` and `stderr` may be explicitly provided when Console is constructed. We use this in tests to provide a mock we can write tests against, rather than writing to - the system stdout/stderr. If they are not defined, the effective stdout/stderr are proxied - to Rust engine intrinsic code if there is a scheduler session provided, or just written to - the standard Python-provided stdout/stderr if it is None. A scheduler session is provided if - --dynamic-ui is set. + the system stdout/stderr. If a SchedulerSession is set, any running UI will be torn down + before stdio is rendered. """ - has_scheduler = session is not None - - self._stdout = stdout or ( - NativeStdOut(cast(SchedulerSession, session)) if has_scheduler else sys.stdout - ) - self._stderr = stderr or ( - NativeStdErr(cast(SchedulerSession, session)) if has_scheduler else sys.stderr - ) + self._stdout = stdout or sys.stdout + self._stderr = stderr or sys.stderr self._use_colors = use_colors + self._session = session @property def stdout(self): + if self._session: + self._session.teardown_dynamic_ui() return self._stdout @property def stderr(self): + if self._session: + self._session.teardown_dynamic_ui() return self._stderr def write_stdout(self, payload: str) -> None: @@ -86,10 +54,10 @@ def write_stderr(self, payload: str) -> None: self.stderr.write(payload) def print_stdout(self, payload: str, end: str = "\n") -> None: - self.stdout.write(f"{payload}{end}") + self.write_stdout(f"{payload}{end}") def print_stderr(self, payload: str, end: str = "\n") -> None: - self.stderr.write(f"{payload}{end}") + self.write_stderr(f"{payload}{end}") def flush(self) -> None: self.stdout.flush() diff --git a/src/python/pants/engine/internals/native.py b/src/python/pants/engine/internals/native.py index bbde02fb15e..70577bcc781 100644 --- a/src/python/pants/engine/internals/native.py +++ b/src/python/pants/engine/internals/native.py @@ -3,7 +3,7 @@ import logging import os -from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union, cast +from typing import Dict, Iterable, List, Optional, Tuple, Union, cast from typing_extensions import Protocol @@ -30,7 +30,6 @@ from pants.engine.rules import Get from pants.engine.unions import union from pants.option.global_options import ExecutionOptions -from pants.util.logging import LogLevel from pants.util.memo import memoized_property from pants.util.meta import SingletonMetaclass @@ -129,25 +128,6 @@ def lib(self): """Load the native engine as a python module.""" return native_engine - def init_rust_logging( - self, - level: int, - log_show_rust_3rdparty: bool, - use_color: bool, - show_target: bool, - log_levels_by_target: Mapping[str, LogLevel], - message_regex_filters: Iterable[str], - ): - log_levels_as_ints = {k: v.level for k, v in log_levels_by_target.items()} - return self.lib.init_logging( - level, - log_show_rust_3rdparty, - use_color, - show_target, - log_levels_as_ints, - tuple(message_regex_filters), - ) - def set_per_run_log_path(self, path: Optional[str]) -> None: """Instructs the logging code to also write emitted logs to a run-specific log file; or disables writing to any run-specific file if `None` is passed.""" @@ -156,38 +136,13 @@ def set_per_run_log_path(self, path: Optional[str]) -> None: def default_cache_path(self) -> str: return cast(str, self.lib.default_cache_path()) - def setup_pantsd_logger(self, log_file_path): - return self.lib.setup_pantsd_logger(log_file_path) - - def setup_stderr_logger(self): - return self.lib.setup_stderr_logger() - def write_log(self, msg: str, *, level: int, target: str): """Proxy a log message to the Rust logging faculties.""" return self.lib.write_log(msg, level, target) - def write_stdout(self, scheduler, session, msg: str, teardown_ui: bool): - if teardown_ui: - self.teardown_dynamic_ui(scheduler, session) - return self.lib.write_stdout(session, msg) - - def write_stderr(self, scheduler, session, msg: str, teardown_ui: bool): - if teardown_ui: - self.teardown_dynamic_ui(scheduler, session) - return self.lib.write_stderr(session, msg) - - def teardown_dynamic_ui(self, scheduler, session): - self.lib.teardown_dynamic_ui(scheduler, session) - def flush_log(self): return self.lib.flush_log() - def override_thread_logging_destination_to_just_pantsd(self): - self.lib.override_thread_logging_destination("pantsd") - - def override_thread_logging_destination_to_just_stderr(self): - self.lib.override_thread_logging_destination("stderr") - def match_path_globs(self, path_globs: PathGlobs, paths: Iterable[str]) -> Tuple[str, ...]: """Return all paths that match the PathGlobs.""" return tuple(self.lib.match_path_globs(path_globs, tuple(paths))) diff --git a/src/python/pants/engine/internals/native_engine.pyi b/src/python/pants/engine/internals/native_engine.pyi index 382ddae11ea..965dec5d5d2 100644 --- a/src/python/pants/engine/internals/native_engine.pyi +++ b/src/python/pants/engine/internals/native_engine.pyi @@ -1,9 +1,27 @@ -from typing import Any, Callable, Dict, List, Tuple +from io import RawIOBase +from typing import Any, Callable, Dict, List, TextIO, Tuple # TODO: black and flake8 disagree about the content of this file: # see https://github.com/psf/black/issues/1548 # flake8: noqa: E302 +def write_log(msg: str, level: int, target: str) -> None: ... +def flush_log() -> None: ... +def stdio_initialize( + level: int, + show_rust_3rdparty_logs: bool, + use_color: bool, + show_target: bool, + log_levels_by_target: Dict[str, int], + message_regex_filters: Tuple[str, ...], + log_file: str, +) -> Tuple[TextIO, TextIO, TextIO]: ... +def stdio_thread_console_set(stdin_fileno: int, stdout_fileno: int, stderr_fileno: int) -> None: ... +def stdio_thread_console_clear() -> None: ... +def stdio_write_stdout(msg: str) -> None: ... +def stdio_write_stderr(msg: str) -> None: ... +def teardown_dynamic_ui(scheduler: PyScheduler, session: PySession) -> None: ... + class PyDigest: def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ... @property diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index 92c83c64fd8..f8ce6cc273a 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -31,6 +31,7 @@ RemovePrefix, Snapshot, ) +from pants.engine.internals import native_engine from pants.engine.internals.native_engine import PyExecutor, PySessionCancellationLatch, PyTypes from pants.engine.internals.nodes import Return, Throw from pants.engine.internals.selectors import Params @@ -486,6 +487,9 @@ def _maybe_visualize(self): self._run_count += 1 self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir, name)) + def teardown_dynamic_ui(self) -> None: + native_engine.teardown_dynamic_ui(self._scheduler._scheduler, self._session) + def execute(self, execution_request: ExecutionRequest): """Invoke the engine for the given ExecutionRequest, returning Return and Throw states. diff --git a/src/python/pants/help/BUILD b/src/python/pants/help/BUILD index f7604a972ce..f402feb3ff4 100644 --- a/src/python/pants/help/BUILD +++ b/src/python/pants/help/BUILD @@ -12,5 +12,5 @@ python_tests( python_integration_tests( name='integration', uses_pants_run=True, - timeout=180, + timeout=360, ) diff --git a/src/python/pants/init/logging.py b/src/python/pants/init/logging.py index 34fbe2887da..1e580f0f582 100644 --- a/src/python/pants/init/logging.py +++ b/src/python/pants/init/logging.py @@ -4,13 +4,16 @@ import http.client import logging import os -from logging import Formatter, Handler, LogRecord, StreamHandler -from typing import Dict, Iterable, Optional, Tuple +import sys +from contextlib import contextmanager +from logging import Formatter, LogRecord, StreamHandler +from typing import Dict, Iterator import pants.util.logging as pants_logging -from pants.engine.internals.native import Native +from pants.base.deprecated import deprecated_conditional +from pants.engine.internals import native_engine from pants.option.option_value_container import OptionValueContainer -from pants.util.dirutil import safe_mkdir +from pants.util.dirutil import safe_mkdir_for from pants.util.logging import LogLevel # Although logging supports the WARN level, its not documented and could conceivably be yanked. @@ -20,47 +23,18 @@ logging.addLevelName(pants_logging.TRACE, "TRACE") -def init_rust_logger( - log_level: LogLevel, - log_show_rust_3rdparty: bool, - use_color: bool, - show_target: bool, - log_levels_by_target: Dict[str, LogLevel] = {}, - message_regex_filters: Iterable[str] = (), -) -> None: - Native().init_rust_logging( - log_level.level, - log_show_rust_3rdparty, - use_color, - show_target, - log_levels_by_target, - message_regex_filters, - ) - - -class NativeHandler(StreamHandler): +class _NativeHandler(StreamHandler): """This class is installed as a Python logging module handler (using the logging.addHandler method) and proxies logs to the Rust logging infrastructure.""" - def __init__(self, log_level: LogLevel, native_filename: Optional[str] = None) -> None: - super().__init__(None) - self.native = Native() - self.native_filename = native_filename - self.setLevel(pants_logging.TRACE) - if not self.native_filename: - self.native.setup_stderr_logger() - def emit(self, record: LogRecord) -> None: - self.native.write_log(msg=self.format(record), level=record.levelno, target=record.name) + native_engine.write_log(msg=self.format(record), level=record.levelno, target=record.name) def flush(self) -> None: - self.native.flush_log() - - def __repr__(self) -> str: - return f"NativeHandler(id={id(self)}, level={self.level}, filename={self.native_filename}" + native_engine.flush_log() -class ExceptionFormatter(Formatter): +class _ExceptionFormatter(Formatter): """Uses the `--print-stacktrace` option to decide whether to render stacktraces.""" def __init__(self, print_stacktrace: bool): @@ -73,25 +47,30 @@ def formatException(self, exc_info): return "\n(Use --print-stacktrace to see more error details.)" -def clear_logging_handlers(): - logger = logging.getLogger(None) - for handler in get_logging_handlers(): - logger.removeHandler(handler) - +@contextmanager +def stdio_destination(stdin_fileno: int, stdout_fileno: int, stderr_fileno: int) -> Iterator[None]: + """Sets a destination for both logging and stdio: must be called after `initialize_stdio`. -def get_logging_handlers() -> Tuple[Handler, ...]: - logger = logging.getLogger(None) - return tuple(logger.handlers) + After `initialize_stdio` and outside of this contextmanager, the default stdio destination is + the pantsd.log. But inside of this block, all engine "tasks"/@rules that are spawned will have + thread/task-local state that directs their IO to the given destination. When the contextmanager + exits all tasks will be restored to the default destination (regardless of whether they have + completed). + """ + if not logging.getLogger(None).handlers: + raise AssertionError("stdio_destination should only be called after initialize_stdio.") + native_engine.stdio_thread_console_set(stdin_fileno, stdout_fileno, stderr_fileno) + try: + yield + finally: + native_engine.stdio_thread_console_clear() -def set_logging_handlers(handlers: Tuple[Handler, ...]): - clear_logging_handlers() - logger = logging.getLogger(None) - for handler in handlers: - logger.addHandler(handler) +@contextmanager +def _python_logging_setup(level: LogLevel, print_stacktrace: bool) -> Iterator[None]: + """Installs a root Python logger that routes all logging through a Rust logger.""" -def _common_logging_setup(level: LogLevel) -> None: def trace_fn(self, message, *args, **kwargs): if self.isEnabledFor(LogLevel.TRACE.level): self._log(LogLevel.TRACE.level, message, *args, **kwargs) @@ -99,53 +78,101 @@ def trace_fn(self, message, *args, **kwargs): logging.Logger.trace = trace_fn # type: ignore[attr-defined] logger = logging.getLogger(None) - level.set_level_for(logger) - # This routes warnings through our loggers instead of straight to raw stderr. - logging.captureWarnings(True) + def clear_logging_handlers(): + handlers = tuple(logger.handlers) + for handler in handlers: + logger.removeHandler(handler) + return handlers + + def set_logging_handlers(handlers): + for handler in handlers: + logger.addHandler(handler) + + # Remove existing handlers, and restore them afterward. + handlers = clear_logging_handlers() + try: + # This routes warnings through our loggers instead of straight to raw stderr. + logging.captureWarnings(True) + handler = _NativeHandler() + handler.setFormatter(_ExceptionFormatter(print_stacktrace)) + logger.addHandler(handler) + level.set_level_for(logger) - if logger.isEnabledFor(LogLevel.TRACE.level): - http.client.HTTPConnection.debuglevel = 1 # type: ignore[attr-defined] - requests_logger = logging.getLogger("requests.packages.urllib3") - LogLevel.TRACE.set_level_for(requests_logger) - requests_logger.propagate = True + if logger.isEnabledFor(LogLevel.TRACE.level): + http.client.HTTPConnection.debuglevel = 1 # type: ignore[attr-defined] + requests_logger = logging.getLogger("requests.packages.urllib3") + LogLevel.TRACE.set_level_for(requests_logger) + requests_logger.propagate = True + yield + finally: + clear_logging_handlers() + set_logging_handlers(handlers) -def setup_logging(global_bootstrap_options: OptionValueContainer, stderr_logging: bool) -> None: - """Sets up logging for a Pants run. - This is called in two contexts: 1) PantsRunner, 2) DaemonPantsRunner. In the latter case, the - loggers are saved and restored around this call, so in both cases it runs with no handlers - configured (and asserts so!). - """ - if get_logging_handlers(): - raise AssertionError("setup_logging should not be called while Handlers are installed.") +@contextmanager +def initialize_stdio(global_bootstrap_options: OptionValueContainer) -> Iterator[None]: + """Mutates sys.std* and logging to route stdio for a Pants process to thread local destinations. - global_level = global_bootstrap_options.level - log_dir = global_bootstrap_options.logdir + In this context, `sys.std*` and logging handlers will route through Rust code that uses + thread-local information to decide whether to write to a file, or to stdio file handles. + + To control the stdio destination set by this method, use the `stdio_destination` context manager. + This is called in two different processes: + * PantsRunner, after it has determined that LocalPantsRunner will be running in process, and + immediately before setting a `stdio_destination` for the remainder of the run. + * PantsDaemon, immediately on startup. The process will then default to sending stdio to the log + until client connections arrive, at which point `stdio_destination` is used per-connection. + """ + global_level = global_bootstrap_options.level log_show_rust_3rdparty = global_bootstrap_options.log_show_rust_3rdparty use_color = global_bootstrap_options.colors show_target = global_bootstrap_options.show_log_target - log_levels_by_target = get_log_levels_by_target(global_bootstrap_options) + log_levels_by_target = _get_log_levels_by_target(global_bootstrap_options) message_regex_filters = global_bootstrap_options.ignore_pants_warnings + print_stacktrace = global_bootstrap_options.print_stacktrace - init_rust_logger( - global_level, - log_show_rust_3rdparty, - use_color, - show_target, - log_levels_by_target, - message_regex_filters, + # Set the pantsd log destination. + deprecated_log_path = os.path.join( + global_bootstrap_options.pants_workdir, "pantsd", "pantsd.log" ) - - if stderr_logging: - setup_logging_to_stderr(global_level, global_bootstrap_options.print_stacktrace) - - if log_dir: - setup_logging_to_file(global_level, log_dir=log_dir) - - -def get_log_levels_by_target(global_bootstrap_options: OptionValueContainer) -> Dict[str, LogLevel]: + log_path = os.path.join(global_bootstrap_options.pants_workdir, "pants.log") + safe_mkdir_for(deprecated_log_path) + safe_mkdir_for(log_path) + # NB: We append to the deprecated log location with a deprecated conditional that never + # triggers, because there is nothing that the user can do about the deprecation. + deprecated_conditional( + predicate=lambda: False, + removal_version="2.5.0.dev0", + entity_description=f"Logging to {deprecated_log_path}", + hint_message=f"Refer to {log_path} instead.", + ) + with open(deprecated_log_path, "a") as a: + a.write(f"This log location is deprecated: please refer to {log_path} instead.\n") + + # Initialize thread-local stdio, and replace sys.std* with proxies. + original_stdin, original_stdout, original_stderr = sys.stdin, sys.stdout, sys.stderr + try: + sys.stdin, sys.stdout, sys.stderr = native_engine.stdio_initialize( + global_level.level, + log_show_rust_3rdparty, + use_color, + show_target, + {k: v.level for k, v in log_levels_by_target.items()}, + tuple(message_regex_filters), + log_path, + ) + # Install a Python logger that will route through the Rust logger. + with _python_logging_setup(global_level, print_stacktrace): + yield + finally: + sys.stdin, sys.stdout, sys.stderr = original_stdin, original_stdout, original_stderr + + +def _get_log_levels_by_target( + global_bootstrap_options: OptionValueContainer, +) -> Dict[str, LogLevel]: raw_levels = global_bootstrap_options.log_levels_by_target levels: Dict[str, LogLevel] = {} for key, value in raw_levels.items(): @@ -160,39 +187,3 @@ def get_log_levels_by_target(global_bootstrap_options: OptionValueContainer) -> log_level = LogLevel[value.upper()] levels[key] = log_level return levels - - -def setup_logging_to_stderr(level: LogLevel, print_stacktrace: bool) -> None: - """Sets up Python logging to stderr, proxied to Rust via a NativeHandler. - - We deliberately set the most verbose logging possible (i.e. the TRACE log level), here, and let - the Rust logging faculties take care of filtering. - """ - _common_logging_setup(level) - - python_logger = logging.getLogger(None) - handler = NativeHandler(level) - handler.setFormatter(ExceptionFormatter(print_stacktrace)) - python_logger.addHandler(handler) - LogLevel.TRACE.set_level_for(python_logger) - - -def setup_logging_to_file( - level: LogLevel, - *, - log_dir: str, - log_filename: str = "pants.log", -) -> NativeHandler: - native = Native() - logger = logging.getLogger(None) - - _common_logging_setup(level) - - safe_mkdir(log_dir) - log_path = os.path.join(log_dir, log_filename) - - native.setup_pantsd_logger(log_path) - handler = NativeHandler(level, native_filename=log_path) - - logger.addHandler(handler) - return handler diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 9076d1cac97..aac16ef2e03 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -338,12 +338,18 @@ def register_bootstrap_options(cls, register): ) register( - "-l", "--level", type=LogLevel, default=LogLevel.INFO, help="Set the logging level." + "-l", + "--level", + type=LogLevel, + default=LogLevel.INFO, + daemon=True, + help="Set the logging level.", ) register( "--show-log-target", type=bool, default=False, + daemon=True, advanced=True, help="Display the target where a log message originates in that log message's output. " "This can be helpful when paired with --log-levels-by-target.", @@ -353,6 +359,7 @@ def register_bootstrap_options(cls, register): "--log-levels-by-target", type=dict, default={}, + daemon=True, advanced=True, help="Set a more specific logging level for one or more logging targets. The names of " "logging targets are specified in log strings when the --show-log-target option is set. " @@ -366,6 +373,7 @@ def register_bootstrap_options(cls, register): "--log-show-rust-3rdparty", type=bool, default=False, + daemon=True, advanced=True, help="Whether to show/hide logging done by 3rdparty Rust crates used by the Pants " "engine.", @@ -387,6 +395,7 @@ def register_bootstrap_options(cls, register): type=list, member_type=str, default=[], + daemon=True, advanced=True, help="Regexps matching warning strings to ignore, e.g. " '["DEPRECATED: the option `--my-opt` will be removed"]. The regex patterns will be ' diff --git a/src/python/pants/pantsd/pants_daemon.py b/src/python/pants/pantsd/pants_daemon.py index 7c4e874a61e..31458de6360 100644 --- a/src/python/pants/pantsd/pants_daemon.py +++ b/src/python/pants/pantsd/pants_daemon.py @@ -8,8 +8,7 @@ import sys import time import warnings -from contextlib import contextmanager -from typing import Any, Iterator +from typing import Any from setproctitle import setproctitle as set_process_title @@ -19,7 +18,7 @@ from pants.engine.internals.native import Native from pants.engine.internals.native_engine import PyExecutor from pants.init.engine_initializer import GraphScheduler -from pants.init.logging import setup_logging, setup_logging_to_file +from pants.init.logging import initialize_stdio from pants.init.util import init_workdir from pants.option.global_options import GlobalOptions from pants.option.option_value_container import OptionValueContainer @@ -30,7 +29,6 @@ from pants.pantsd.service.pants_service import PantsServices from pants.pantsd.service.scheduler_service import SchedulerService from pants.pantsd.service.store_gc_service import StoreGCService -from pants.util.contextutil import stdio_as from pants.util.logging import LogLevel from pants.util.strutil import ensure_text @@ -56,7 +54,6 @@ def create(cls, options_bootstrapper: OptionsBootstrapper) -> PantsDaemon: bootstrap_options_values = bootstrap_options.for_global_scope() native = Native() - native.override_thread_logging_destination_to_just_pantsd() executor = PyExecutor(*GlobalOptions.compute_executor_arguments(bootstrap_options_values)) core = PantsDaemonCore(options_bootstrapper, executor, cls._setup_services) @@ -147,34 +144,6 @@ def _close_stdio(): fd.close() os.close(file_no) - @contextmanager - def _pantsd_logging(self) -> Iterator[None]: - """A context manager that runs with pantsd logging. - - Asserts that stdio (represented by file handles 0, 1, 2) is closed to ensure that we can - safely reuse those fd numbers. - """ - - # Ensure that stdio is closed so that we can safely reuse those file descriptors. - for fd in (0, 1, 2): - try: - os.fdopen(fd) - raise AssertionError(f"pantsd logging cannot initialize while stdio is open: {fd}") - except OSError: - pass - - # Redirect stdio to /dev/null for the rest of the run to reserve those file descriptors. - with stdio_as(stdin_fd=-1, stdout_fd=-1, stderr_fd=-1): - # Reinitialize logging for the daemon context. - global_options = self._bootstrap_options.for_global_scope() - setup_logging(global_options, stderr_logging=False) - - log_dir = os.path.join(self._work_dir, self.name) - setup_logging_to_file(global_options.level, log_dir=log_dir, log_filename=self.LOG_NAME) - - self._logger.debug("Logging reinitialized in pantsd context") - yield - def _initialize_metadata(self) -> None: """Writes out our pid and other metadata. @@ -194,15 +163,15 @@ def run_sync(self): """Synchronously run pantsd.""" os.environ.pop("PYTHONPATH") + global_bootstrap_options = self._bootstrap_options.for_global_scope() + # Switch log output to the daemon's log stream from here forward. self._close_stdio() - with self._pantsd_logging(): - # Install signal handling based on global bootstrap options. - global_bootstrap_options = self._bootstrap_options.for_global_scope() + with initialize_stdio(global_bootstrap_options): + # Install signal and panic handling. ExceptionSink.install( log_location=init_workdir(global_bootstrap_options), pantsd_instance=True ) - self._native.set_panic_handler() # Set the process name in ps output to 'pantsd' vs './pants compile src/etc:: -ldebug'. diff --git a/src/python/pants/pantsd/pants_daemon_core.py b/src/python/pants/pantsd/pants_daemon_core.py index c25e8d300ef..c5c39271488 100644 --- a/src/python/pants/pantsd/pants_daemon_core.py +++ b/src/python/pants/pantsd/pants_daemon_core.py @@ -76,9 +76,9 @@ def _initialize( """ try: if self._scheduler: - logger.info("initialization options changed: reinitializing pantsd...") + logger.info("initialization options changed: reinitializing scheduler...") else: - logger.info("initializing pantsd...") + logger.info("initializing scheduler...") if self._services: self._services.shutdown() build_config, options = self._options_initializer.build_config_and_options( @@ -92,7 +92,7 @@ def _initialize( self._services = self._services_constructor(bootstrap_options_values, self._scheduler) self._fingerprint = options_fingerprint - logger.info("pantsd initialized.") + logger.info("scheduler initialized.") except Exception as e: self._kill_switch.set() self._scheduler = None diff --git a/src/python/pants/pantsd/service/pants_service.py b/src/python/pants/pantsd/service/pants_service.py index 559bc9da4b4..dd93afd2fb1 100644 --- a/src/python/pants/pantsd/service/pants_service.py +++ b/src/python/pants/pantsd/service/pants_service.py @@ -8,7 +8,6 @@ from dataclasses import dataclass from typing import Dict, KeysView, Tuple -from pants.engine.internals.native import Native from pants.util.meta import frozen_after_init logger = logging.getLogger(__name__) @@ -213,12 +212,7 @@ def __init__(self, services: Tuple[PantsService, ...] = ()) -> None: @classmethod def _make_thread(cls, service): name = f"{service.__class__.__name__}Thread" - - def target(): - Native().override_thread_logging_destination_to_just_pantsd() - service.run() - - t = threading.Thread(target=target, name=name) + t = threading.Thread(target=service.run, name=name) t.daemon = True return t diff --git a/src/python/pants/testutil/pants_integration_test.py b/src/python/pants/testutil/pants_integration_test.py index b11ea66441a..34ac924d1b4 100644 --- a/src/python/pants/testutil/pants_integration_test.py +++ b/src/python/pants/testutil/pants_integration_test.py @@ -304,10 +304,10 @@ def render_logs(workdir: str) -> None: print(f"{rel_filename} --- ") -def read_pantsd_log(workdir: str) -> Iterator[str]: +def read_pants_log(workdir: str) -> Iterator[str]: """Yields all lines from the pantsd log under the given workdir.""" # Surface the pantsd log for easy viewing via pytest's `-s` (don't capture stdio) option. - for line in _read_log(f"{workdir}/pantsd/pantsd.log"): + for line in _read_log(f"{workdir}/pants.log"): yield line diff --git a/src/python/pants/testutil/rule_runner.py b/src/python/pants/testutil/rule_runner.py index ceca048d127..ccbd43829a5 100644 --- a/src/python/pants/testutil/rule_runner.py +++ b/src/python/pants/testutil/rule_runner.py @@ -5,16 +5,18 @@ import multiprocessing import os +from contextlib import contextmanager from dataclasses import dataclass from io import StringIO -from pathlib import PurePath +from pathlib import Path, PurePath from tempfile import mkdtemp from types import CoroutineType, GeneratorType -from typing import Any, Callable, Iterable, Mapping, Sequence, Type, TypeVar, cast +from typing import Any, Callable, Iterable, Iterator, Mapping, Sequence, Tuple, Type, TypeVar, cast from colors import blue, cyan, green, magenta, red, yellow from pants.base.build_root import BuildRoot +from pants.base.deprecated import deprecated from pants.base.specs_parser import SpecsParser from pants.build_graph.build_configuration import BuildConfiguration from pants.build_graph.build_file_aliases import BuildFileAliases @@ -35,12 +37,13 @@ from pants.engine.target import Target, WrappedTarget from pants.engine.unions import UnionMembership from pants.init.engine_initializer import EngineInitializer +from pants.init.logging import initialize_stdio, stdio_destination from pants.option.global_options import ExecutionOptions, GlobalOptions from pants.option.options_bootstrapper import OptionsBootstrapper from pants.source import source_root from pants.testutil.option_util import create_options_bootstrapper from pants.util.collections import assert_single_element -from pants.util.contextutil import temporary_dir +from pants.util.contextutil import temporary_dir, temporary_file from pants.util.dirutil import ( recursive_dirname, safe_file_dump, @@ -76,6 +79,7 @@ def noop() -> GoalRuleResult: @dataclass class RuleRunner: build_root: str + options_bootstrapper: OptionsBootstrapper build_config: BuildConfiguration scheduler: SchedulerSession @@ -113,9 +117,9 @@ def __init__( build_config_builder.register_target_types(target_types or ()) self.build_config = build_config_builder.create() - options_bootstrapper = create_options_bootstrapper() - options = options_bootstrapper.full_options(self.build_config) - global_options = options_bootstrapper.bootstrap_options.for_global_scope() + self.options_bootstrapper = create_options_bootstrapper() + options = self.options_bootstrapper.full_options(self.build_config) + global_options = self.options_bootstrapper.bootstrap_options.for_global_scope() local_store_dir = ( os.path.realpath(safe_mkdtemp()) if isolated_local_store @@ -142,7 +146,10 @@ def __init__( ).new_session( build_id="buildid_for_test", session_values=SessionValues( - {OptionsBootstrapper: options_bootstrapper, PantsEnvironment: PantsEnvironment()} + { + OptionsBootstrapper: self.options_bootstrapper, + PantsEnvironment: PantsEnvironment(), + } ), ) self.scheduler = graph_session.scheduler_session @@ -187,9 +194,8 @@ def run_goal_rule( ) -> GoalRuleResult: merged_args = (*(global_args or []), goal.name, *(args or [])) self.set_options(merged_args, env=env) - options_bootstrapper = create_options_bootstrapper(args=merged_args, env=env) - raw_specs = options_bootstrapper.full_options_for_scopes( + raw_specs = self.options_bootstrapper.full_options_for_scopes( [*GlobalOptions.known_scope_infos(), *goal.subsystem_cls.known_scope_infos()] ).specs specs = SpecsParser(self.build_root).parse_specs(raw_specs) @@ -220,11 +226,14 @@ def set_options(self, args: Iterable[str], *, env: Mapping[str, str] | None = No This will override any previously configured values. """ - options_bootstrapper = create_options_bootstrapper(args=args, env=env) + self.options_bootstrapper = create_options_bootstrapper(args=args, env=env) self.scheduler = self.scheduler.scheduler.new_session( build_id="buildid_for_test", session_values=SessionValues( - {OptionsBootstrapper: options_bootstrapper, PantsEnvironment: PantsEnvironment(env)} + { + OptionsBootstrapper: self.options_bootstrapper, + PantsEnvironment: PantsEnvironment(env), + } ), ) @@ -436,9 +445,46 @@ def get(product, subject): return e.value +@contextmanager +def mock_console( + options_bootstrapper: OptionsBootstrapper, +) -> Iterator[Tuple[Console, StdioReader]]: + global_bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope() + with initialize_stdio(global_bootstrap_options), open( + "/dev/null", "r" + ) as stdin, temporary_file(binary_mode=False) as stdout, temporary_file( + binary_mode=False + ) as stderr, stdio_destination( + stdin_fileno=stdin.fileno(), + stdout_fileno=stdout.fileno(), + stderr_fileno=stderr.fileno(), + ): + # NB: We yield a Console without overriding the destination argument, because we have + # already done a sys.std* level replacement. The replacement is necessary in order for + # InteractiveProcess to have native file handles to interact with. + yield Console(use_colors=global_bootstrap_options.colors), StdioReader( + _stdout=Path(stdout.name), _stderr=Path(stderr.name) + ) + + +@dataclass +class StdioReader: + _stdout: Path + _stderr: Path + + def get_stdout(self) -> str: + """Return all data that has been flushed to stdout so far.""" + return self._stdout.read_text() + + def get_stderr(self) -> str: + """Return all data that has been flushed to stderr so far.""" + return self._stderr.read_text() + + class MockConsole: """An implementation of pants.engine.console.Console which captures output.""" + @deprecated("2.5.0.dev0", hint_message="Use the mock_console contextmanager instead.") def __init__(self, use_colors=True): self.stdout = StringIO() self.stderr = StringIO() diff --git a/src/python/pants/util/contextutil.py b/src/python/pants/util/contextutil.py index 18af13f8408..2ce9b18c0f0 100644 --- a/src/python/pants/util/contextutil.py +++ b/src/python/pants/util/contextutil.py @@ -9,7 +9,6 @@ import ssl import sys import tempfile -import termios import threading import zipfile from contextlib import contextmanager @@ -95,63 +94,6 @@ def argv_as(args: Tuple[str, ...]) -> Iterator[None]: sys.argv = old_args -@contextmanager -def _stdio_stream_as(src_fd: int, dst_fd: int, dst_sys_attribute: str, mode: str) -> Iterator[None]: - """Replace the given dst_fd and attribute on `sys` with an open handle to the given src_fd.""" - src = None - if src_fd == -1: - src = open("/dev/null", mode) - src_fd = src.fileno() - - # Capture the python and os level file handles. - old_dst = getattr(sys, dst_sys_attribute) - old_dst_fd = os.dup(dst_fd) - if src_fd != dst_fd: - os.dup2(src_fd, dst_fd) - - # Open up a new file handle to temporarily replace the python-level io object, then yield. - new_dst = os.fdopen(dst_fd, mode) - is_atty = new_dst.isatty() - setattr(sys, dst_sys_attribute, new_dst) - try: - yield - finally: - try: - if src: - src.close() - if is_atty: - termios.tcdrain(dst_fd) - else: - new_dst.flush() - new_dst.close() - except BaseException: - pass - - # Restore the python and os level file handles. - os.dup2(old_dst_fd, dst_fd) - setattr(sys, dst_sys_attribute, old_dst) - - -@contextmanager -def stdio_as(stdout_fd: int, stderr_fd: int, stdin_fd: int) -> Iterator[None]: - """Redirect sys.{stdout, stderr, stdin} to alternate file descriptors. - - As a special case, if a given destination fd is `-1`, we will replace it with an open file handle - to `/dev/null`. - - NB: If the filehandles for sys.{stdout, stderr, stdin} have previously been closed, it's - possible that the OS has repurposed fds `0, 1, 2` to represent other files or sockets. It's - impossible for this method to locate all python objects which refer to those fds, so it's up - to the caller to guarantee that `0, 1, 2` are safe to replace. - - The streams expect unicode. To write and read bytes, access their buffer, e.g. `stdin.buffer.read()`. - """ - with _stdio_stream_as(stdin_fd, 0, "stdin", "r"), _stdio_stream_as( - stdout_fd, 1, "stdout", "w" - ), _stdio_stream_as(stderr_fd, 2, "stderr", "w"): - yield - - @contextmanager def temporary_dir( root_dir: str | None = None, diff --git a/src/python/pants/util/contextutil_test.py b/src/python/pants/util/contextutil_test.py index b6318994c07..605b60fdd26 100644 --- a/src/python/pants/util/contextutil_test.py +++ b/src/python/pants/util/contextutil_test.py @@ -7,7 +7,6 @@ import subprocess import sys import unittest.mock -import uuid import zipfile from contextlib import contextmanager from typing import Iterator @@ -19,7 +18,6 @@ maybe_profiled, open_zip, pushd, - stdio_as, temporary_dir, temporary_file, ) @@ -204,83 +202,6 @@ def test_open_zip_returns_realpath_on_badzipfile(self) -> None: ): pass - @contextmanager - def _stdio_as_tempfiles(self) -> Iterator[None]: - """Harness to replace `sys.std*` with tempfiles. - - Validates that all files are read/written/flushed correctly, and acts as a contextmanager to - allow for recursive tests. - """ - - # Prefix contents written within this instance with a unique string to differentiate - # them from other instances. - uuid_str = str(uuid.uuid4()) - - def u(string: str) -> str: - return f"{uuid_str}#{string}" - - stdin_data = u("stdio") - stdout_data = u("stdout") - stderr_data = u("stderr") - - with temporary_file(binary_mode=False) as tmp_stdin, temporary_file( - binary_mode=False - ) as tmp_stdout, temporary_file(binary_mode=False) as tmp_stderr: - print(stdin_data, file=tmp_stdin) - tmp_stdin.seek(0) - # Read prepared content from stdin, and write content to stdout/stderr. - with stdio_as( - stdout_fd=tmp_stdout.fileno(), - stderr_fd=tmp_stderr.fileno(), - stdin_fd=tmp_stdin.fileno(), - ): - self.assertEqual(sys.stdin.fileno(), 0) - self.assertEqual(sys.stdout.fileno(), 1) - self.assertEqual(sys.stderr.fileno(), 2) - - self.assertEqual(stdin_data, sys.stdin.read().strip()) - print(stdout_data, file=sys.stdout) - yield - print(stderr_data, file=sys.stderr) - - tmp_stdout.seek(0) - tmp_stderr.seek(0) - self.assertEqual(stdout_data, tmp_stdout.read().strip()) - self.assertEqual(stderr_data, tmp_stderr.read().strip()) - - def test_stdio_as(self) -> None: - self.assertTrue( - sys.stderr.fileno() > 2, f"Expected a pseudofile as stderr, got: {sys.stderr}" - ) - old_stdout, old_stderr, old_stdin = sys.stdout, sys.stderr, sys.stdin - - # The first level tests that when `sys.std*` are file-likes (in particular, the ones set up in - # pytest's harness) rather than actual files, we stash and restore them properly. - with self._stdio_as_tempfiles(): - # The second level stashes the first level's actual file objects and then re-opens them. - with self._stdio_as_tempfiles(): - pass - - # Validate that after the second level completes, the first level still sees valid - # fds on `sys.std*`. - self.assertEqual(sys.stdin.fileno(), 0) - self.assertEqual(sys.stdout.fileno(), 1) - self.assertEqual(sys.stderr.fileno(), 2) - - self.assertEqual(sys.stdout, old_stdout) - self.assertEqual(sys.stderr, old_stderr) - self.assertEqual(sys.stdin, old_stdin) - - def test_stdio_as_dev_null(self) -> None: - # Capture output to tempfiles. - with self._stdio_as_tempfiles(): - # Read/write from/to `/dev/null`, which will be validated by the harness as not - # affecting the tempfiles. - with stdio_as(stdout_fd=-1, stderr_fd=-1, stdin_fd=-1): - self.assertEqual("", sys.stdin.read()) - print("garbage", file=sys.stdout) - print("garbage", file=sys.stderr) - def test_permissions(self) -> None: with temporary_file(permissions=0o700) as f: self.assertEqual(0o700, os.stat(f.name)[0] & 0o777) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 799bef0860d..2768d7c2c8d 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -355,9 +355,8 @@ dependencies = [ [[package]] name = "console" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50aab2529019abfabfa93f1e6c41ef392f91fbf179b347a7e96abb524884a08" +version = "0.14.0" +source = "git+https://github.com/stuhood/console?branch=stuhood/term-target-for-arbitrary-handles#a29180f638c1aa2b44efcd725b126bc89305fff7" dependencies = [ "encode_unicode", "lazy_static", @@ -366,7 +365,6 @@ dependencies = [ "terminal_size", "unicode-width", "winapi 0.3.9", - "winapi-util", ] [[package]] @@ -655,6 +653,7 @@ dependencies = [ "indexmap", "itertools 0.8.2", "lazy_static", + "libc", "log 0.4.11", "logging", "mock", @@ -668,6 +667,7 @@ dependencies = [ "rule_graph", "sharded_lmdb", "smallvec 0.6.14", + "stdio", "store", "task_executor", "tempfile", @@ -1250,9 +1250,8 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a68371cf417889c9d7f98235b7102ea7c54fc59bcbd22f3dea785be9d27e40" +version = "0.15.0" +source = "git+https://github.com/stuhood/indicatif?branch=stuhood/properly-query-width#7345638238a37fc2fc99faf87dbbd7a8b137b948" dependencies = [ "console", "lazy_static", @@ -1465,6 +1464,7 @@ dependencies = [ "num_enum", "parking_lot", "regex", + "stdio", "tokio", "uuid", ] @@ -1811,9 +1811,9 @@ dependencies = [ [[package]] name = "number_prefix" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "once_cell" @@ -2859,6 +2859,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stdio" +version = "0.0.1" +dependencies = [ + "log 0.4.11", + "parking_lot", + "tokio", +] + [[package]] name = "store" version = "0.1.0" @@ -3004,7 +3013,7 @@ dependencies = [ "arc-swap", "futures", "lazy_static", - "logging", + "stdio", "tokio", "workunit_store", ] @@ -3487,10 +3496,11 @@ checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33" name = "ui" version = "0.0.1" dependencies = [ + "console", "futures", "indexmap", "indicatif", - "logging", + "stdio", "task_executor", "uuid", "workunit_store", @@ -3730,7 +3740,6 @@ dependencies = [ "futures", "hashing", "log 0.4.11", - "logging", "notify", "parking_lot", "task_executor", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index d3ef02361e9..568e882fab7 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -29,6 +29,7 @@ members = [ "fs/store", "graph", "hashing", + "stdio", "logging", "nailgun", "process_execution", @@ -65,6 +66,7 @@ default-members = [ "fs/store", "graph", "hashing", + "stdio", "logging", "nailgun", "process_execution", @@ -111,6 +113,7 @@ hashing = { path = "hashing" } indexmap = "1.4" itertools = "0.8.2" lazy_static = "1" +libc = "0.2.39" log = "0.4" logging = { path = "logging" } nailgun = { path = "nailgun" } @@ -123,6 +126,7 @@ reqwest = { version = "0.10", default_features = false, features = ["stream", "r rule_graph = { path = "rule_graph" } sharded_lmdb = { path = "sharded_lmdb" } smallvec = "0.6" +stdio = { path = "stdio" } store = { path = "fs/store" } task_executor = { path = "task_executor" } tempfile = "3" @@ -147,3 +151,5 @@ env_logger = "0.5.4" prost = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-build = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } prost-types = { git = "https://github.com/danburkert/prost", rev = "a1cccbcee343e2c444e1cd2738c7fba2599fc391" } +# TODO: Posted as https://github.com/mitsuhiko/console/pull/93. +console = { git = "https://github.com/stuhood/console", branch = "stuhood/term-target-for-arbitrary-handles" } diff --git a/src/rust/engine/logging/Cargo.toml b/src/rust/engine/logging/Cargo.toml index c750d7dd688..2783f8f83a2 100644 --- a/src/rust/engine/logging/Cargo.toml +++ b/src/rust/engine/logging/Cargo.toml @@ -12,8 +12,9 @@ lazy_static = "1" log = "0.4" num_enum = "0.4" parking_lot = "0.11" -tokio = { version = "0.2.23", features = ["rt-util"] } regex = "1" +stdio = { path = "../stdio" } +tokio = { version = "0.2.23", features = ["rt-util"] } uuid = { version = "0.7", features = ["v4"] } [build-dependencies] diff --git a/src/rust/engine/logging/src/lib.rs b/src/rust/engine/logging/src/lib.rs index 029d76ccf4a..2fca2014317 100644 --- a/src/rust/engine/logging/src/lib.rs +++ b/src/rust/engine/logging/src/lib.rs @@ -49,8 +49,6 @@ macro_rules! debug_log { pub mod logger; -pub use logger::{get_destination, scope_task_destination, set_thread_destination, Destination}; - pub type Logger = logger::PantsLogger; use num_enum::TryFromPrimitive; diff --git a/src/rust/engine/logging/src/logger.rs b/src/rust/engine/logging/src/logger.rs index 10349432371..7a608f45fd0 100644 --- a/src/rust/engine/logging/src/logger.rs +++ b/src/rust/engine/logging/src/logger.rs @@ -2,28 +2,23 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use crate::PythonLogLevel; -use colored::*; use std::cell::RefCell; use std::collections::HashMap; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryInto; use std::fs::File; use std::fs::OpenOptions; -use std::future::Future; use std::io::Write; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; +use colored::*; use lazy_static::lazy_static; use log::{debug, log, set_logger, set_max_level, LevelFilter, Log, Metadata, Record}; use parking_lot::Mutex; use regex::Regex; -use tokio::task_local; -use uuid::Uuid; const TIME_FORMAT_STR: &str = "%H:%M:%S"; -pub type StdioHandler = Box Result<(), ()> + Send>; - lazy_static! { pub static ref PANTS_LOGGER: PantsLogger = PantsLogger::new(); } @@ -35,7 +30,6 @@ pub struct PantsLogger { global_level: Mutex>, use_color: AtomicBool, show_rust_3rdparty_logs: AtomicBool, - stderr_handlers: Mutex>, show_target: AtomicBool, log_level_filters: Mutex>, message_regex_filters: Mutex>, @@ -49,7 +43,6 @@ impl PantsLogger { global_level: Mutex::new(RefCell::new(LevelFilter::Off)), show_rust_3rdparty_logs: AtomicBool::new(true), use_color: AtomicBool::new(false), - stderr_handlers: Mutex::new(HashMap::new()), show_target: AtomicBool::new(false), log_level_filters: Mutex::new(HashMap::new()), message_regex_filters: Mutex::new(Vec::new()), @@ -63,7 +56,8 @@ impl PantsLogger { show_target: bool, log_levels_by_target: HashMap, message_regex_filters: Vec, - ) { + log_file_path: PathBuf, + ) -> Result<(), String> { let log_levels_by_target = log_levels_by_target .iter() .map(|(k, v)| { @@ -75,30 +69,38 @@ impl PantsLogger { }) .collect::>(); - let max_python_level: Result = max_level.try_into(); - match max_python_level { - Ok(python_level) => { - let level: LevelFilter = python_level.into(); - // TODO this should be whatever the most verbose log level specified in log_domain_levels - - // but I'm not sure if it's actually much of a gain over just setting this to Trace. - set_max_level(LevelFilter::Trace); - PANTS_LOGGER.global_level.lock().replace(level); - - PANTS_LOGGER.use_color.store(use_color, Ordering::SeqCst); - PANTS_LOGGER - .show_rust_3rdparty_logs - .store(show_rust_3rdparty_logs, Ordering::SeqCst); - *PANTS_LOGGER.log_level_filters.lock() = log_levels_by_target; - *PANTS_LOGGER.message_regex_filters.lock() = message_regex_filters; - PANTS_LOGGER - .show_target - .store(show_target, Ordering::SeqCst); - if set_logger(&*PANTS_LOGGER).is_err() { - debug!("Logging already initialized."); - } - } - Err(err) => panic!("Unrecognised log level from Python: {}: {}", max_level, err), + let max_python_level: PythonLogLevel = max_level + .try_into() + .map_err(|e| format!("Unrecognised log level from Python: {}: {}", max_level, e))?; + let level: LevelFilter = max_python_level.into(); + + // TODO this should be whatever the most verbose log level specified in log_domain_levels - + // but I'm not sure if it's actually much of a gain over just setting this to Trace. + set_max_level(LevelFilter::Trace); + PANTS_LOGGER.global_level.lock().replace(level); + + PANTS_LOGGER.use_color.store(use_color, Ordering::SeqCst); + PANTS_LOGGER + .show_rust_3rdparty_logs + .store(show_rust_3rdparty_logs, Ordering::SeqCst); + *PANTS_LOGGER.log_level_filters.lock() = log_levels_by_target; + *PANTS_LOGGER.message_regex_filters.lock() = message_regex_filters; + PANTS_LOGGER + .show_target + .store(show_target, Ordering::SeqCst); + *PANTS_LOGGER.log_file.lock() = { + let log_file = OpenOptions::new() + .create(true) + .append(true) + .open(log_file_path) + .map_err(|err| format!("Error opening pantsd logfile: {}", err))?; + Some(log_file) }; + + if set_logger(&*PANTS_LOGGER).is_err() { + debug!("Logging already initialized."); + } + Ok(()) } pub fn set_per_run_logs(&self, per_run_log_path: Option) { @@ -118,31 +120,6 @@ impl PantsLogger { }; } - /// Set up a file logger to log_file_path. Returns the file descriptor of the log file. - #[cfg(unix)] - pub fn set_pantsd_logger( - &self, - log_file_path: PathBuf, - ) -> Result { - use std::os::unix::io::AsRawFd; - - { - // Maybe close open file by dropping the existing file handle - *self.log_file.lock() = None; - } - - OpenOptions::new() - .create(true) - .append(true) - .open(log_file_path) - .map(|file| { - let raw_fd = file.as_raw_fd(); - *self.log_file.lock() = Some(file); - raw_fd - }) - .map_err(|err| format!("Error opening pantsd logfile: {}", err)) - } - /// log_from_python is only used in the Python FFI, which in turn is only called within the /// Python `NativeHandler` class. Every logging call from Python should get proxied through this /// function, which translates the log message into the Rust log paradigm provided by @@ -152,18 +129,6 @@ impl PantsLogger { log!(target: target, level.into(), "{}", message); Ok(()) } - - pub fn register_stderr_handler(&self, callback: StdioHandler) -> Uuid { - let mut handlers = self.stderr_handlers.lock(); - let unique_id = Uuid::new_v4(); - handlers.insert(unique_id, callback); - unique_id - } - - pub fn deregister_stderr_handler(&self, unique_id: Uuid) { - let mut handlers = self.stderr_handlers.lock(); - handlers.remove(&unique_id); - } } impl Log for PantsLogger { @@ -204,7 +169,6 @@ impl Log for PantsLogger { return; } - let destination = get_destination(); let cur_date = chrono::Local::now(); let time_str = format!( "{}.{:02}", @@ -214,11 +178,8 @@ impl Log for PantsLogger { let show_target = self.show_target.load(Ordering::SeqCst); let level = record.level(); - let destination_is_file = match destination { - Destination::Pantsd => true, - Destination::Stderr => false, - }; - let use_color = self.use_color.load(Ordering::SeqCst) && (!destination_is_file); + // TODO: Fix application of color for log-files: see https://github.com/pantsbuild/pants/issues/11020 + let use_color = self.use_color.load(Ordering::SeqCst); let level_marker = match level { _ if !use_color => format!("[{}]", level).normal().clear(), @@ -231,14 +192,14 @@ impl Log for PantsLogger { let log_string = if show_target { format!( - "{} {} ({}) {}", + "{} {} ({}) {}\n", time_str, level_marker, record.target(), record.args(), ) } else { - format!("{} {} {}", time_str, level_marker, record.args()) + format!("{} {} {}\n", time_str, level_marker, record.args()) }; { @@ -251,38 +212,28 @@ impl Log for PantsLogger { } } + let log_bytes = log_string.as_bytes(); + { let mut maybe_per_run_file = self.per_run_logs.lock(); if let Some(ref mut file) = *maybe_per_run_file { // deliberately ignore errors writing to per-run log file - let _ = writeln!(file, "{}", log_string); + let _ = file.write_all(log_bytes); } } - match destination { - Destination::Stderr => { - // We first try to output to all registered handlers. If there are none, or any of them - // fail, then we fallback to sending directly to stderr. - let handlers_map = self.stderr_handlers.lock(); - let mut any_handler_failed = false; - for callback in handlers_map.values() { - let handler_res = callback(&log_string); - if handler_res.is_err() { - any_handler_failed = true; - } - } - if handlers_map.len() == 0 || any_handler_failed { - eprintln!("{}", log_string); - } - } - Destination::Pantsd => { - let mut maybe_file = self.log_file.lock(); - if let Some(ref mut file) = *maybe_file { - match writeln!(file, "{}", log_string) { - Ok(()) => (), - Err(e) => { - eprintln!("Error writing to log file: {}", e); - } + // Attempt to write to stdio, and write to the pantsd log if we fail (either because we don't + // have a valid stdio instance, or because of an error). + let destination = stdio::get_destination(); + if stdio::Destination::write_stderr_raw(&destination, log_bytes).is_err() { + let mut maybe_file = self.log_file.lock(); + if let Some(ref mut file) = *maybe_file { + match file.write_all(log_bytes) { + Ok(()) => (), + Err(e) => { + // If we've failed to write to stdio, but also to our log file, our only recourse is to + // try to write to a different file. + debug_log!("fatal.log", "Failed to write to log file {:?}: {}", file, e); } } } @@ -291,72 +242,3 @@ impl Log for PantsLogger { fn flush(&self) {} } - -/// -/// Thread- or task-local context for where the Logger should send log statements. -/// -/// We do this in a per-thread way because we find that Pants threads generally are either -/// daemon-specific or user-facing. We make sure that every time we spawn a thread on the Python -/// side, we set the thread-local information, and every time we submit a Future to a tokio Runtime -/// on the rust side, we set the task-local information. -/// -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -#[repr(C)] -pub enum Destination { - Pantsd, - Stderr, -} - -impl TryFrom<&str> for Destination { - type Error = String; - fn try_from(dest: &str) -> Result { - match dest { - "pantsd" => Ok(Destination::Pantsd), - "stderr" => Ok(Destination::Stderr), - other => Err(format!("Unknown log destination: {:?}", other)), - } - } -} - -thread_local! { - static THREAD_DESTINATION: RefCell = RefCell::new(Destination::Stderr) -} - -task_local! { - static TASK_DESTINATION: Destination; -} - -/// -/// Set the current log destination for a Thread, but _not_ for a Task. Tasks must always be spawned -/// by callers using the `scope_task_destination` helper (generally via task_executor::Executor.) -/// -pub fn set_thread_destination(destination: Destination) { - THREAD_DESTINATION.with(|thread_destination| { - *thread_destination.borrow_mut() = destination; - }) -} - -/// -/// Propagate the current log destination to a Future representing a newly spawned Task. Usage of -/// this method should mostly be contained to task_executor::Executor. -/// -pub async fn scope_task_destination(destination: Destination, f: F) -> F::Output -where - F: Future, -{ - TASK_DESTINATION.scope(destination, f).await -} - -/// -/// Get the current log destination, from either a Task or a Thread. -/// -/// TODO: Having this return an Option and tracking down all cases where it has defaulted would be -/// good. -/// -pub fn get_destination() -> Destination { - if let Ok(destination) = TASK_DESTINATION.try_with(|destination| *destination) { - destination - } else { - THREAD_DESTINATION.with(|destination| *destination.borrow()) - } -} diff --git a/src/rust/engine/nailgun/src/client.rs b/src/rust/engine/nailgun/src/client.rs index 738aafadeda..7304b5ab024 100644 --- a/src/rust/engine/nailgun/src/client.rs +++ b/src/rust/engine/nailgun/src/client.rs @@ -42,9 +42,20 @@ use futures::{try_join, SinkExt, Stream, StreamExt}; pub enum NailgunClientError { PreConnect(String), PostConnect(String), + BrokenPipe, KeyboardInterrupt, } +fn handle_postconnect_stdio(err: io::Error, msg: &str) -> NailgunClientError { + if err.kind() == io::ErrorKind::BrokenPipe { + // A BrokenPipe error is a semi-expected error caused when stdout/stderr closes, and which + // the Python runtime has a special error type and handling for. + NailgunClientError::BrokenPipe + } else { + NailgunClientError::PostConnect(format!("{}: {}", msg, err)) + } +} + async fn handle_client_output( mut stdio_read: impl Stream + Unpin, mut signal_stream: Signal, @@ -58,14 +69,10 @@ async fn handle_client_output( output = stdio_read.next() => { match output { Some(ChildOutput::Stdout(bytes)) => { - stdout.write_all(&bytes).await.map_err(|err| { - NailgunClientError::PostConnect(format!("Failed to write to stdout: {}", err)) - })? + stdout.write_all(&bytes).await.map_err(|err| handle_postconnect_stdio(err, "Failed to write to stdout"))? }, Some(ChildOutput::Stderr(bytes)) => { - stderr.write_all(&bytes).await.map_err(|err| { - NailgunClientError::PostConnect(format!("Failed to write to stderr: {}", err)) - })? + stderr.write_all(&bytes).await.map_err(|err| handle_postconnect_stdio(err, "Failed to write to stderr"))? }, None => break, } @@ -85,7 +92,7 @@ async fn handle_client_output( } } try_join!(stdout.flush(), stderr.flush()) - .map_err(|e| NailgunClientError::PostConnect(format!("Failed to flush stdio: {}", e)))?; + .map_err(|err| handle_postconnect_stdio(err, "Failed to flush stdio"))?; Ok(()) } diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 48c38612189..4559569d8e4 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -64,7 +64,7 @@ use futures::Future; use hashing::Digest; use log::{self, debug, error, warn, Log}; use logging::logger::PANTS_LOGGER; -use logging::{Destination, Logger, PythonLogLevel}; +use logging::{Logger, PythonLogLevel}; use regex::Regex; use rule_graph::{self, RuleGraph}; use std::collections::hash_map::HashMap; @@ -103,46 +103,57 @@ py_module_initializer!(native_engine, |py, m| { m.add( py, - "init_logging", + "stdio_initialize", py_fn!( py, - init_logging(a: u64, b: bool, c: bool, d: bool, e: PyDict, f: Vec) + stdio_initialize( + a: u64, + b: bool, + c: bool, + d: bool, + e: PyDict, + f: Vec, + g: String + ) ), )?; m.add( py, - "setup_pantsd_logger", - py_fn!(py, setup_pantsd_logger(a: String)), + "stdio_thread_console_set", + py_fn!( + py, + stdio_thread_console_set(stdin_fileno: i32, stdout_fileno: i32, stderr_fileno: i32) + ), )?; - m.add(py, "setup_stderr_logger", py_fn!(py, setup_stderr_logger()))?; - m.add(py, "flush_log", py_fn!(py, flush_log()))?; m.add( py, - "override_thread_logging_destination", - py_fn!(py, override_thread_logging_destination(a: String)), + "stdio_thread_console_clear", + py_fn!(py, stdio_thread_console_clear()), )?; m.add( py, - "write_log", - py_fn!(py, write_log(a: String, b: u64, c: String)), + "stdio_write_stdout", + py_fn!(py, stdio_write_stdout(b: &[u8])), )?; m.add( py, - "set_per_run_log_path", - py_fn!(py, set_per_run_log_path(a: Option)), + "stdio_write_stderr", + py_fn!(py, stdio_write_stderr(b: &[u8])), )?; + m.add(py, "flush_log", py_fn!(py, flush_log()))?; m.add( py, - "write_stdout", - py_fn!(py, write_stdout(a: PySession, b: String)), + "write_log", + py_fn!(py, write_log(msg: String, level: u64, target: String)), )?; m.add( py, - "write_stderr", - py_fn!(py, write_stderr(a: PySession, b: String)), + "set_per_run_log_path", + py_fn!(py, set_per_run_log_path(a: Option)), )?; + m.add( py, "teardown_dynamic_ui", @@ -653,6 +664,9 @@ py_class!(class PyNailgunClient |py| { let err_str = format!("Nailgun client error: {:?}", s); PyErr::new::(py, (err_str,)) }, + NailgunClientError::BrokenPipe => { + PyErr::new::(py, NoArgs) + } NailgunClientError::KeyboardInterrupt => { PyErr::new::(py, NoArgs) } @@ -1820,7 +1834,7 @@ fn default_cache_path(py: Python) -> CPyResult { }) } -fn init_logging( +fn stdio_initialize( py: Python, level: u64, show_rust_3rdparty_logs: bool, @@ -1828,7 +1842,8 @@ fn init_logging( show_target: bool, log_levels_by_target: PyDict, message_regex_filters: Vec, -) -> PyUnitResult { + log_file: String, +) -> CPyResult { let log_levels_by_target = log_levels_by_target .items(py) .iter() @@ -1853,52 +1868,64 @@ fn init_logging( show_target, log_levels_by_target, message_regex_filters, - ); - Ok(None) + PathBuf::from(log_file), + ) + .map_err(|s| { + PyErr::new::(py, (format!("Could not initialize logging: {}", s),)) + })?; + + Ok(PyTuple::new( + py, + &[ + externs::stdio::py_stdio_read()?.into_object(), + externs::stdio::py_stdio_write(true)?.into_object(), + externs::stdio::py_stdio_write(false)?.into_object(), + ], + )) } -fn setup_pantsd_logger(py: Python, log_file: String) -> CPyResult { - logging::set_thread_destination(Destination::Pantsd); - let path = PathBuf::from(log_file); - PANTS_LOGGER - .set_pantsd_logger(path) - .map(i64::from) - .map_err(|e| PyErr::new::(py, (e,))) +fn stdio_thread_console_set( + _: Python, + stdin_fileno: i32, + stdout_fileno: i32, + stderr_fileno: i32, +) -> PyUnitResult { + let destination = stdio::new_console_destination(stdin_fileno, stdout_fileno, stderr_fileno); + stdio::set_thread_destination(destination); + Ok(None) } -fn setup_stderr_logger(_: Python) -> PyUnitResult { - logging::set_thread_destination(Destination::Stderr); +fn stdio_thread_console_clear(_: Python) -> PyUnitResult { + stdio::get_destination().console_clear(); Ok(None) } -fn set_per_run_log_path(py: Python, log_path: Option) -> PyUnitResult { +fn stdio_write_stdout(py: Python, payload: &[u8]) -> PyUnitResult { py.allow_threads(|| { - PANTS_LOGGER.set_per_run_logs(log_path.map(PathBuf::from)); + stdio::get_destination().write_stdout(payload); Ok(None) }) } -fn write_log(py: Python, msg: String, level: u64, path: String) -> PyUnitResult { +fn stdio_write_stderr(py: Python, payload: &[u8]) -> PyUnitResult { py.allow_threads(|| { - Logger::log_from_python(&msg, level, &path).expect("Error logging message"); + stdio::get_destination().write_stderr(payload); Ok(None) }) } -fn write_stdout(py: Python, session_ptr: PySession, msg: String) -> PyUnitResult { - with_session(py, session_ptr, |session| { - block_in_place_and_wait(py, || session.write_stdout(&msg).boxed_local()) - .map_err(|e| PyErr::new::(py, (e,)))?; +// TODO: Needs to be thread-local / associated with the Console. +fn set_per_run_log_path(py: Python, log_path: Option) -> PyUnitResult { + py.allow_threads(|| { + PANTS_LOGGER.set_per_run_logs(log_path.map(PathBuf::from)); Ok(None) }) } -fn write_stderr(py: Python, session_ptr: PySession, msg: String) -> PyUnitResult { - with_session(py, session_ptr, |session| { - py.allow_threads(|| { - session.write_stderr(&msg); - Ok(None) - }) +fn write_log(py: Python, msg: String, level: u64, target: String) -> PyUnitResult { + py.allow_threads(|| { + Logger::log_from_python(&msg, level, &target).expect("Error logging message"); + Ok(None) }) } @@ -1924,15 +1951,6 @@ fn flush_log(py: Python) -> PyUnitResult { }) } -fn override_thread_logging_destination(py: Python, destination: String) -> PyUnitResult { - let destination = destination - .as_str() - .try_into() - .map_err(|e| PyErr::new::(py, (e,)))?; - logging::set_thread_destination(destination); - Ok(None) -} - fn write_to_file(path: &Path, graph: &RuleGraph) -> io::Result<()> { let file = File::create(path)?; let mut f = io::BufWriter::new(file); diff --git a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index cd9ce1a4a31..fea0c21cc49 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -13,6 +13,7 @@ pub mod fs; mod interface; #[cfg(test)] mod interface_tests; +mod stdio; use std::collections::BTreeMap; use std::convert::AsRef; diff --git a/src/rust/engine/src/externs/stdio.rs b/src/rust/engine/src/externs/stdio.rs new file mode 100644 index 00000000000..3f92b627eed --- /dev/null +++ b/src/rust/engine/src/externs/stdio.rs @@ -0,0 +1,115 @@ +// Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, +// TODO: Falsely triggers for async/await: +// see https://github.com/rust-lang/rust-clippy/issues/5360 +// clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] +// File-specific allowances to silence internal warnings of `py_class!`. +#![allow( + unused_braces, + clippy::manual_strip, + clippy::used_underscore_binding, + clippy::transmute_ptr_to_ptr, + clippy::zero_ptr +)] + +use cpython::{exc, py_class, PyErr, PyObject, PyResult, Python}; + +/// +/// Data members and `create_instance` methods are module-private by default, so we expose them +/// with public top-level functions. +/// +/// TODO: See https://github.com/dgrunwald/rust-cpython/issues/242 +/// + +/// +/// A Python file-like that proxies to the `stdio` module, which implements thread-local input. +/// +pub fn py_stdio_read() -> PyResult { + let gil = Python::acquire_gil(); + PyStdioRead::create_instance(gil.python()) +} + +py_class!(pub class PyStdioRead |py| { + def isatty(&self) -> PyResult { + if let Ok(fd) = self.fileno(py) { + Ok(unsafe { libc::isatty(fd) != 0 }) + } else { + Ok(false) + } + } + + def fileno(&self) -> PyResult { + stdio::get_destination().stdin_as_raw_fd().map_err(|e| PyErr::new::(py, (e,))) + } +}); + +/// +/// A Python file-like that proxies to the `stdio` module, which implements thread-local output. +/// +pub fn py_stdio_write(is_stdout: bool) -> PyResult { + let gil = Python::acquire_gil(); + PyStdioWrite::create_instance(gil.python(), is_stdout) +} + +py_class!(pub class PyStdioWrite |py| { + data is_stdout: bool; + + def write(&self, payload: &str) -> PyResult { + let is_stdout = *self.is_stdout(py); + py.allow_threads(|| { + let destination = stdio::get_destination(); + if is_stdout { + destination.write_stdout(payload.as_bytes()); + } else { + destination.write_stderr(payload.as_bytes()); + } + }); + Ok(Python::None(py)) + } + + def isatty(&self) -> PyResult { + if let Ok(fd) = self.fileno(py) { + Ok(unsafe { libc::isatty(fd) != 0 }) + } else { + Ok(false) + } + } + + def fileno(&self) -> PyResult { + let destination = stdio::get_destination(); + let fd = if *self.is_stdout(py) { + destination.stdout_as_raw_fd() + } else { + destination.stderr_as_raw_fd() + }; + fd.map_err(|e| PyErr::new::(py, (e,))) + } + + def flush(&self) -> PyResult { + // All of our destinations are line-buffered. + Ok(Python::None(py)) + } +}); diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index ccbce6adc6c..509261057e5 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -5,6 +5,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryInto; use std::io; use std::path::{Path, PathBuf}; +use std::process::Stdio; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -17,6 +18,7 @@ use futures::{future, FutureExt}; use graph::{InvalidationResult, LastObserved}; use hashing::{Digest, EMPTY_DIGEST}; use log::{debug, info, warn}; +use stdio::TryCloneAsFile; use tempfile::TempDir; use tokio::process; use tokio::sync::mpsc; @@ -243,6 +245,30 @@ impl Scheduler { let exit_status = session .with_console_ui_disabled(async move { + // Once any UI is torn down, grab exclusive access to the console. + let (term_stdin, term_stdout, term_stderr) = + stdio::get_destination().exclusive_start(Box::new(|_| { + // A stdio handler that will immediately trigger logging. + Err(()) + }))?; + // NB: Command's stdio methods take ownership of a file-like to use, so we use + // `TryCloneAsFile` here to `dup` our thread-local stdio. + command + .stdin(Stdio::from( + term_stdin + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdin: {}", e))?, + )) + .stdout(Stdio::from( + term_stdout + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stdout: {}", e))?, + )) + .stderr(Stdio::from( + term_stderr + .try_clone_as_file() + .map_err(|e| format!("Couldn't clone stderr: {}", e))?, + )); let mut subprocess = command .spawn() .map_err(|e| format!("Error executing interactive process: {}", e))?; diff --git a/src/rust/engine/stdio/Cargo.toml b/src/rust/engine/stdio/Cargo.toml new file mode 100644 index 00000000000..6c8c4fee59a --- /dev/null +++ b/src/rust/engine/stdio/Cargo.toml @@ -0,0 +1,11 @@ +[package] +version = "0.0.1" +edition = "2018" +name = "stdio" +authors = [ "Pants Build " ] +publish = false + +[dependencies] +log = "0.4" +parking_lot = "0.11" +tokio = { version = "0.2.23", features = ["rt-util"] } diff --git a/src/rust/engine/stdio/src/lib.rs b/src/rust/engine/stdio/src/lib.rs new file mode 100644 index 00000000000..1dbb8870808 --- /dev/null +++ b/src/rust/engine/stdio/src/lib.rs @@ -0,0 +1,439 @@ +// Copyright 2018 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, + // TODO: Falsely triggers for async/await: + // see https://github.com/rust-lang/rust-clippy/issues/5360 + // clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +mod term; + +pub use term::{TermReadDestination, TermWriteDestination, TryCloneAsFile}; + +use std::cell::RefCell; +use std::fmt; +use std::fs::File; +use std::future::Future; +use std::io::{Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::sync::Arc; + +use parking_lot::Mutex; +use tokio::task_local; + +/// +/// A Console wraps some "borrowed" file handles: when it is dropped, we forget about the file +/// handles rather than closing them. The file handles are optional only so that they may be +/// "taken" during Drop. +/// +#[derive(Debug)] +struct Console { + stdin_handle: Option, + stdout_handle: Option, + stderr_handle: Option, +} + +impl Console { + fn new(stdin_fd: RawFd, stdout_fd: RawFd, stderr_fd: RawFd) -> Console { + let (stdin, stdout, stderr) = unsafe { + ( + File::from_raw_fd(stdin_fd), + File::from_raw_fd(stdout_fd), + File::from_raw_fd(stderr_fd), + ) + }; + Console { + stdin_handle: Some(stdin), + stdout_handle: Some(stdout), + stderr_handle: Some(stderr), + } + } + + fn read_stdin(&mut self, buf: &mut [u8]) -> std::io::Result { + self.stdin_handle.as_ref().unwrap().read(buf) + } + + fn write_stdout(&mut self, content: &[u8]) -> Result<(), std::io::Error> { + let mut stdout = self.stdout_handle.as_ref().unwrap(); + stdout.write_all(content)?; + stdout.flush() + } + + fn write_stderr(&mut self, content: &[u8]) -> Result<(), std::io::Error> { + let mut stderr = self.stderr_handle.as_ref().unwrap(); + stderr.write_all(content)?; + stderr.flush() + } + + fn stdin_as_raw_fd(&self) -> RawFd { + self.stdin_handle.as_ref().unwrap().as_raw_fd() + } + + fn stdout_as_raw_fd(&self) -> RawFd { + self.stdout_handle.as_ref().unwrap().as_raw_fd() + } + + fn stderr_as_raw_fd(&self) -> RawFd { + self.stderr_handle.as_ref().unwrap().as_raw_fd() + } +} + +impl Drop for Console { + fn drop(&mut self) { + // "Forget" about our file handles without closing them. + self.stdin_handle.take().unwrap().into_raw_fd(); + self.stdout_handle.take().unwrap().into_raw_fd(); + self.stderr_handle.take().unwrap().into_raw_fd(); + } +} + +/// +/// Thread- or task-local context for where stdio should go. +/// +/// We do this in a per-thread way because we find that Pants threads generally are either: +/// 1. daemon-specific +/// 2. user-console bound +/// 3. directly/exclusively accessed +/// +/// We make sure that every time we spawn a thread on the Python side, we set the thread-local +/// information, and every time we submit a Future to a tokio Runtime on the rust side, we set +/// the task-local information. +/// +enum InnerDestination { + Logging, + Console(Console), + Exclusive { stderr_handler: StdioHandler }, +} + +impl fmt::Debug for InnerDestination { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Logging => f.debug_struct("Logging").finish(), + Self::Console(c) => f.debug_struct("Console").field("console", c).finish(), + Self::Exclusive { .. } => f + .debug_struct("Exclusive") + .field("stderr_handler", &"") + .finish(), + } + } +} + +#[derive(Debug)] +pub struct Destination(Mutex); + +impl Destination { + /// + /// Clears the Destination, setting it back to Logging. + /// + pub fn console_clear(&self) { + *self.0.lock() = InnerDestination::Logging; + } + + /// + /// Starts Exclusive access iff the Destination is currently a Console, and returns Read/Write + /// instances for stdin, stdout, stderr (respectively). + /// + /// Dropping the TermDestination will restore direct Console access. + /// + pub fn exclusive_start( + self: &Arc, + stderr_handler: StdioHandler, + ) -> Result< + ( + TermReadDestination, + TermWriteDestination, + TermWriteDestination, + ), + String, + > { + let mut destination = self.0.lock(); + if !matches!(*destination, InnerDestination::Console(..)) { + return Err(format!( + "Cannot start Exclusive access on Destination {:?}", + destination + )); + } + let console = std::mem::replace( + &mut *destination, + InnerDestination::Exclusive { stderr_handler }, + ); + match console { + InnerDestination::Console(console) => Ok(term::TermDestination::new(console, self.clone())), + _ => unreachable!(), + } + } + + /// + /// Clears Exclusive access and restores the Console. + /// + fn exclusive_clear(&self, console: Console) { + let mut destination = self.0.lock(); + if matches!(*destination, InnerDestination::Exclusive { .. }) { + *destination = InnerDestination::Console(console); + } else { + // Exclusive access was torn down independently: drop the Console. + *destination = InnerDestination::Logging; + } + } + + /// + /// Read from stdin if it is available on the current Destination. + /// + pub fn read_stdin(&self, buf: &mut [u8]) -> std::io::Result { + let mut destination = self.0.lock(); + match *destination { + InnerDestination::Console(ref mut console) => console.read_stdin(buf), + InnerDestination::Exclusive { .. } => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "stdin is currently Exclusive owned.", + )), + InnerDestination::Logging => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "No stdin attached.", + )), + } + } + + /// + /// Write the given content to the current stdout destination, falling back to logging if none is + /// available. + /// + pub fn write_stdout(&self, content: &[u8]) { + let mut destination = self.0.lock(); + let error_res = match *destination { + InnerDestination::Console(ref mut console) => { + // Write to the underlying Console. + let res = console.write_stdout(content); + if res.is_ok() { + return; + } + // If writing to the stdout handle fails, fall through to mutate self to drop it. + res.map_err(|e| e.to_string()) + } + InnerDestination::Logging | InnerDestination::Exclusive { .. } => { + // Release the lock on the Destination before logging. + std::mem::drop(destination); + log::info!("stdout: {:?}", String::from_utf8_lossy(content)); + return; + } + }; + + // Release the lock, clear the Console, log the error and retry. + let error_str = format!( + "Failed to write stdout to {:?}, falling back to Logging: {:?}", + destination, error_res + ); + std::mem::drop(destination); + self.console_clear(); + log::warn!("{}", error_str); + self.write_stdout(content); + } + + /// + /// Write the given content to the current stdout Destination, without falling back to Logging. + /// Returns an error if only Logging is available. + /// + /// NB: This method is used from the logging crate, where attempting to fall back to logging for + /// written stdio might result in infinite recursion. + /// + pub fn write_stderr_raw(&self, content: &[u8]) -> Result<(), String> { + let mut destination = self.0.lock(); + match *destination { + InnerDestination::Console(ref mut console) => { + console.write_stderr(content).map_err(|e| e.to_string()) + } + InnerDestination::Exclusive { ref stderr_handler } => { + stderr_handler(&String::from_utf8_lossy(content)) + .map_err(|()| "Exclusive handler failed.".to_owned()) + } + InnerDestination::Logging => { + Err("There is no 'real' stdio destination available.".to_owned()) + } + } + } + + /// + /// Write the given content to the current stderr destination, falling back to logging if none is + /// available. + /// + pub fn write_stderr(&self, content: &[u8]) { + let mut destination = self.0.lock(); + let error_res = match *destination { + InnerDestination::Console(ref mut console) => { + // Write to the underlying Console. + let res = console.write_stderr(content); + if res.is_ok() { + return; + } + // If writing to the stdout handle fails, fall through to mutate self to drop it. + res.map_err(|e| e.to_string()) + } + InnerDestination::Exclusive { ref stderr_handler } => { + // Write to the Exclusive handler. + let res = stderr_handler(&String::from_utf8_lossy(content)); + if res.is_ok() { + return; + } + // If writing to the stderr handler fails, fall through to clear it and try again. + res.map_err(|()| "Failed to write stderr to Exclusive".to_owned()) + } + InnerDestination::Logging => { + // Release the lock on the Destination before logging. + std::mem::drop(destination); + log::info!("stderr: {:?}", String::from_utf8_lossy(content)); + return; + } + }; + + // Release the lock, clear the Console, log the error and retry. + let error_str = format!( + "Failed to write stderr to {:?}, falling back to Logging: {:?}", + destination, error_res + ); + std::mem::drop(destination); + self.console_clear(); + log::warn!("{}", error_str); + self.write_stderr(content); + } + + /// + /// If stdin is backed by a real file, returns it as a RawFd. All usage of `RawFd` is unsafe, + /// but this method is additionally unsafe because the real file might have been closed by the + /// time the caller interacts with it. + /// + pub fn stdin_as_raw_fd(&self) -> Result { + match &*self.0.lock() { + InnerDestination::Console(console) => Ok(console.stdin_as_raw_fd()), + InnerDestination::Logging => { + Err("No associated file descriptor for the Logging destination".to_owned()) + } + InnerDestination::Exclusive { .. } => { + Err("A UI or process has exclusive access, and must be stopped before stdio is directly accessible.".to_owned()) + } + } + } + + /// + /// If stdout is backed by a real file, returns it as a RawFd. All usage of `RawFd` is unsafe, + /// but this method is additionally unsafe because the real file might have been closed by the + /// time the caller interacts with it. + /// + pub fn stdout_as_raw_fd(&self) -> Result { + match &*self.0.lock() { + InnerDestination::Console(console) => Ok(console.stdout_as_raw_fd()), + InnerDestination::Logging => { + Err("No associated file descriptor for the Logging destination".to_owned()) + } + InnerDestination::Exclusive { .. } => { + Err("A UI or process has exclusive access, and must be stopped before stdio is directly accessible.".to_owned()) + } + } + } + + /// + /// If stdout is backed by a real file, returns it as a RawFd. All usage of `RawFd` is unsafe, + /// but this method is additionally unsafe because the real file might have been closed by the + /// time the caller interacts with it. + /// + pub fn stderr_as_raw_fd(&self) -> Result { + match &*self.0.lock() { + InnerDestination::Console(console) => Ok(console.stderr_as_raw_fd()), + InnerDestination::Logging => { + Err("No associated file descriptor for the Logging destination".to_owned()) + } + InnerDestination::Exclusive { .. } => { + Err("A UI or process has exclusive access, and must be stopped before stdio is directly accessible.".to_owned()) + } + } + } +} + +thread_local! { + /// + /// See set_thread_destination. + /// + static THREAD_DESTINATION: RefCell> = RefCell::new(Arc::new(Destination(Mutex::new(InnerDestination::Logging)))) +} + +task_local! { + static TASK_DESTINATION: Arc; +} + +/// +/// Creates a Console that borrows the given file handles, and which can be set for a Thread +/// using `set_thread_destination`. +/// +pub fn new_console_destination( + stdin_fd: RawFd, + stdout_fd: RawFd, + stderr_fd: RawFd, +) -> Arc { + Arc::new(Destination(Mutex::new(InnerDestination::Console( + Console::new(stdin_fd, stdout_fd, stderr_fd), + )))) +} + +/// +/// Set the stdio Destination for the current Thread (which will propagate to spawned Tasks). +/// +/// Setting the Destination on the current Thread will cause it to be propagated to any Tasks +/// spawned by this Thread using the `scope_task_destination` helper (via task_executor::Executor). +/// +/// Note that `set_thread_destination` "replaces" the Destination for a Thread without affecting +/// work that was previously spawned by it, whereas `get_destination().console_clear()` would clear +/// the console for all previously spawned Thread/Tasks. +/// +/// See InnerDestination for more info. +/// +pub fn set_thread_destination(destination: Arc) { + THREAD_DESTINATION.with(|thread_destination| { + thread_destination.replace(destination); + }) +} + +/// +/// Propagate the current stdio Destination to a Future representing a newly spawned Task. Usage of +/// this method should mostly be contained to task_executor::Executor. +/// +/// See InnerDestination for more info. +/// +pub async fn scope_task_destination(destination: Arc, f: F) -> F::Output +where + F: Future, +{ + TASK_DESTINATION.scope(destination, f).await +} + +/// +/// Get the current stdio Destination. +/// +pub fn get_destination() -> Arc { + if let Ok(destination) = TASK_DESTINATION.try_with(|destination| destination.clone()) { + destination + } else { + THREAD_DESTINATION.with(|destination| destination.borrow().clone()) + } +} + +pub type StdioHandler = Box Result<(), ()> + Send>; diff --git a/src/rust/engine/stdio/src/term.rs b/src/rust/engine/stdio/src/term.rs new file mode 100644 index 00000000000..8cdd5939883 --- /dev/null +++ b/src/rust/engine/stdio/src/term.rs @@ -0,0 +1,149 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::sync::Arc; + +use parking_lot::Mutex; + +use crate::{Console, Destination}; + +/// +/// An implementation of Read and Write that reads from stdin and writes to stderr. +/// +/// Used to implement `console::Term` for use with the `indicatif` library. +/// +#[derive(Debug)] +pub(crate) struct TermDestination { + // Optional so that it can be restored to the destination on Drop. + console: Mutex>, + // The destination that the Console was taken from, and will be restored to. + destination: Arc, +} + +impl TermDestination { + pub(crate) fn new( + console: Console, + destination: Arc, + ) -> ( + TermReadDestination, + TermWriteDestination, + TermWriteDestination, + ) { + let term_destination = Arc::new(TermDestination { + console: Mutex::new(Some(console)), + destination, + }); + ( + TermReadDestination(term_destination.clone()), + TermWriteDestination { + destination: term_destination.clone(), + is_stderr: false, + }, + TermWriteDestination { + destination: term_destination, + is_stderr: true, + }, + ) + } +} + +#[derive(Debug)] +pub struct TermReadDestination(Arc); + +#[derive(Debug)] +pub struct TermWriteDestination { + destination: Arc, + is_stderr: bool, +} + +impl Read for TermReadDestination { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.0.console.lock().as_mut().unwrap().read_stdin(buf) + } +} + +impl AsRawFd for TermReadDestination { + fn as_raw_fd(&self) -> RawFd { + self.0.console.lock().as_ref().unwrap().stdin_as_raw_fd() + } +} + +impl Write for TermWriteDestination { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if self.is_stderr { + self + .destination + .console + .lock() + .as_mut() + .unwrap() + .write_stderr(buf)?; + } else { + self + .destination + .console + .lock() + .as_mut() + .unwrap() + .write_stdout(buf)?; + } + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl AsRawFd for TermWriteDestination { + fn as_raw_fd(&self) -> RawFd { + if self.is_stderr { + self + .destination + .console + .lock() + .as_ref() + .unwrap() + .stderr_as_raw_fd() + } else { + self + .destination + .console + .lock() + .as_ref() + .unwrap() + .stdout_as_raw_fd() + } + } +} + +impl Drop for TermDestination { + fn drop(&mut self) { + self + .destination + .exclusive_clear(self.console.lock().take().unwrap()) + } +} + +/// +/// Attempt to clone the file handle behind this destination to turn it into an owned File +/// reference which can be closed independently. +/// +/// Roughly equivalent to `File::try_clone`. +/// +pub trait TryCloneAsFile { + fn try_clone_as_file(&self) -> std::io::Result; +} + +impl TryCloneAsFile for T { + fn try_clone_as_file(&self) -> std::io::Result { + let raw_fd = self.as_raw_fd(); + unsafe { + let underlying_file = File::from_raw_fd(raw_fd); + let cloned = underlying_file.try_clone()?; + // Drop the temporarily materialized file now that we've duped it. + let _ = underlying_file.into_raw_fd(); + Ok(cloned) + } + } +} diff --git a/src/rust/engine/task_executor/Cargo.toml b/src/rust/engine/task_executor/Cargo.toml index 95e66c3193e..eda711b8010 100644 --- a/src/rust/engine/task_executor/Cargo.toml +++ b/src/rust/engine/task_executor/Cargo.toml @@ -9,6 +9,6 @@ publish = false arc-swap = "1.2" futures = "0.3" lazy_static = "1" -logging = { path = "../logging" } +stdio = { path = "../stdio" } tokio = { version = "0.2.23", features = ["blocking", "rt-threaded"] } workunit_store = { path = "../workunit_store" } diff --git a/src/rust/engine/task_executor/src/lib.rs b/src/rust/engine/task_executor/src/lib.rs index 84f05abf7e1..b6d84dfe3d2 100644 --- a/src/rust/engine/task_executor/src/lib.rs +++ b/src/rust/engine/task_executor/src/lib.rs @@ -155,12 +155,12 @@ impl Executor { &self, f: F, ) -> impl Future { - let logging_destination = logging::get_destination(); + let stdio_destination = stdio::get_destination(); let workunit_store_handle = workunit_store::get_workunit_store_handle(); // NB: We unwrap here because the only thing that should cause an error in a spawned task is a // panic, in which case we want to propagate that. tokio::task::spawn_blocking(move || { - logging::set_thread_destination(logging_destination); + stdio::set_thread_destination(stdio_destination); workunit_store::set_thread_workunit_store_handle(workunit_store_handle); f() }) @@ -168,20 +168,20 @@ impl Executor { } /// - /// Copy our (thread-local or task-local) logging destination and current workunit parent into - /// the task. The former ensures that when a pantsd thread kicks off a future, any logging done + /// Copy our (thread-local or task-local) stdio destination and current workunit parent into + /// the task. The former ensures that when a pantsd thread kicks off a future, any stdio done /// by it ends up in the pantsd log as we expect. The latter ensures that when a new workunit /// is created it has an accurate handle to its parent. /// fn future_with_correct_context(future: F) -> impl Future { - let logging_destination = logging::get_destination(); + let stdio_destination = stdio::get_destination(); let workunit_store_handle = workunit_store::get_workunit_store_handle(); // NB: It is important that the first portion of this method is synchronous (meaning that this // method cannot be `async`), because that means that it will run on the thread that calls it. // The second, async portion of the method will run in the spawned Task. - logging::scope_task_destination(logging_destination, async move { + stdio::scope_task_destination(stdio_destination, async move { workunit_store::scope_task_workunit_store_handle(workunit_store_handle, future).await }) } diff --git a/src/rust/engine/ui/Cargo.toml b/src/rust/engine/ui/Cargo.toml index 9029b126d7e..4a6579008ed 100644 --- a/src/rust/engine/ui/Cargo.toml +++ b/src/rust/engine/ui/Cargo.toml @@ -8,10 +8,12 @@ authors = [ "Pants Build " ] path = "src/console_ui.rs" [dependencies] +console = "0.14" futures = "0.3" -indicatif = "0.14.0" indexmap = "1.4" -uuid = { version = "0.7", features = ["v4"] } -logging = { path = "../logging" } +# TODO: Posted as https://github.com/mitsuhiko/indicatif/pull/241. +indicatif = { git = "https://github.com/stuhood/indicatif", branch = "stuhood/properly-query-width" } +stdio = { path = "../stdio" } task_executor = { path = "../task_executor" } +uuid = { version = "0.7", features = ["v4"] } workunit_store = { path = "../workunit_store" } diff --git a/src/rust/engine/ui/src/console_ui.rs b/src/rust/engine/ui/src/console_ui.rs index 45947f1639d..b1dbf4eeade 100644 --- a/src/rust/engine/ui/src/console_ui.rs +++ b/src/rust/engine/ui/src/console_ui.rs @@ -35,9 +35,7 @@ use std::time::Duration; use futures::future::{self, FutureExt, TryFutureExt}; use indexmap::IndexMap; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; -use uuid::Uuid; -use logging::logger::{StdioHandler, PANTS_LOGGER}; use task_executor::Executor; use workunit_store::{format_workunit_duration, WorkunitStore}; @@ -61,12 +59,6 @@ impl ConsoleUI { } } - fn default_draw_target() -> ProgressDrawTarget { - // NB: We render more frequently than we receive new data in order to minimize aliasing where a - // render might barely miss a data refresh. - ProgressDrawTarget::stderr_with_hz(Self::render_rate_hz() * 2) - } - /// /// The number of times per-second that `Self::render` should be called. /// @@ -108,8 +100,18 @@ impl ConsoleUI { } } - fn setup_bars(num_swimlanes: usize) -> (MultiProgress, Vec) { - let multi_progress_bars = MultiProgress::with_draw_target(Self::default_draw_target()); + fn setup_bars( + stderr_handler: stdio::StdioHandler, + num_swimlanes: usize, + ) -> Result<(MultiProgress, Vec), String> { + let (term_read, _, term_stderr_write) = + stdio::get_destination().exclusive_start(stderr_handler)?; + + let term = console::Term::read_write_pair(term_read, term_stderr_write); + // NB: We render more frequently than we receive new data in order to minimize aliasing where a + // render might barely miss a data refresh. + let draw_target = ProgressDrawTarget::to_term(term, Self::render_rate_hz() * 2); + let multi_progress_bars = MultiProgress::with_draw_target(draw_target); let bars = (0..num_swimlanes) .map(|_n| { @@ -118,7 +120,7 @@ impl ConsoleUI { }) .collect(); - (multi_progress_bars, bars) + Ok((multi_progress_bars, bars)) } fn get_label_from_heavy_hitters<'a>( @@ -182,14 +184,15 @@ impl ConsoleUI { pub fn initialize( &mut self, executor: Executor, - stderr_handler: StdioHandler, + stderr_handler: stdio::StdioHandler, ) -> Result<(), String> { if self.instance.is_some() { return Err("A ConsoleUI cannot render multiple UIs concurrently.".to_string()); } - // Setup bars, and then spawning rendering of the bars into a background task. - let (multi_progress, bars) = Self::setup_bars(self.local_parallelism); + // Setup bars (which will take ownership of the current Console), and then spawn rendering + // of the bars into a background task. + let (multi_progress, bars) = Self::setup_bars(stderr_handler, self.local_parallelism)?; let multi_progress_task = { executor .spawn_blocking(move || multi_progress.join()) @@ -199,7 +202,6 @@ impl ConsoleUI { self.instance = Some(Instance { tasks_to_display: IndexMap::new(), multi_progress_task, - logger_handle: PANTS_LOGGER.register_stderr_handler(stderr_handler), bars, }); Ok(()) @@ -212,7 +214,8 @@ impl ConsoleUI { if let Some(instance) = self.instance.take() { let sender = self.teardown_mpsc.0.clone(); self.teardown_in_progress = true; - PANTS_LOGGER.deregister_stderr_handler(instance.logger_handle); + // When the MultiProgress completes, the Term(Destination) is dropped, which will restore + // direct access to the Console. instance .multi_progress_task .map_err(|e| format!("Failed to render UI: {}", e)) @@ -232,5 +235,4 @@ struct Instance { tasks_to_display: IndexMap>, multi_progress_task: Pin> + Send>>, bars: Vec, - logger_handle: Uuid, } diff --git a/src/rust/engine/watch/Cargo.toml b/src/rust/engine/watch/Cargo.toml index da3962aa6d2..7c2e7deca61 100644 --- a/src/rust/engine/watch/Cargo.toml +++ b/src/rust/engine/watch/Cargo.toml @@ -11,7 +11,6 @@ fs = { path = "../fs" } futures = "0.3" hashing = { path = "../hashing" } log = "0.4" -logging = { path = "../logging" } # TODO: See https://github.com/notify-rs/notify/issues/255. notify = { git = "https://github.com/pantsbuild/notify", rev = "64880f0662db2b5ecbf25f1cccdca64bb8fac1bc" } parking_lot = "0.11" diff --git a/src/rust/engine/watch/src/lib.rs b/src/rust/engine/watch/src/lib.rs index 640591a14ed..064fd1cf2f1 100644 --- a/src/rust/engine/watch/src/lib.rs +++ b/src/rust/engine/watch/src/lib.rs @@ -149,7 +149,6 @@ impl InvalidationWatcher { watch_receiver: Receiver>, ) -> thread::JoinHandle<()> { thread::spawn(move || { - logging::set_thread_destination(logging::Destination::Pantsd); let exit_msg = loop { let event_res = watch_receiver.recv_timeout(Duration::from_millis(10)); let invalidatable = if let Some(g) = invalidatable.upgrade() { diff --git a/tests/python/pants_test/init/test_logging.py b/tests/python/pants_test/init/test_logging.py index f60f759b2bd..e0e92dcea5e 100644 --- a/tests/python/pants_test/init/test_logging.py +++ b/tests/python/pants_test/init/test_logging.py @@ -4,63 +4,61 @@ import logging from pathlib import Path -from pants.engine.internals.native import Native -from pants.init.logging import setup_logging_to_file +from pants.engine.internals import native_engine +from pants.init.logging import initialize_stdio +from pants.testutil.option_util import create_options_bootstrapper from pants.util.contextutil import temporary_dir from pants.util.logging import LogLevel def test_file_logging() -> None: - native = Native() - native.init_rust_logging( - level=LogLevel.INFO.level, # Tests assume a log level of INFO - log_show_rust_3rdparty=False, - use_color=False, - show_target=False, - log_levels_by_target={}, - message_regex_filters=(), - ) - logger = logging.getLogger("my_file_logger") with temporary_dir() as tmpdir: - setup_logging_to_file(LogLevel.INFO, log_dir=tmpdir) - log_file = Path(tmpdir, "pants.log") + ob = create_options_bootstrapper([f"--pants-workdir={tmpdir}"]) - cat = "🐈" - logger.warning("this is a warning") - logger.info("this is some info") - logger.debug("this is some debug info") - logger.info(f"unicode: {cat}") + # Do not set up a stdio destination, meaning that all messages will go to the log. + global_bootstrap_options = ob.bootstrap_options.for_global_scope() + with initialize_stdio(global_bootstrap_options): + logger = logging.getLogger(None) - loglines = log_file.read_text().splitlines() - print(loglines) - assert len(loglines) == 3 - assert "[WARN] this is a warning" in loglines[0] - assert "[INFO] this is some info" in loglines[1] - assert f"[INFO] unicode: {cat}" in loglines[2] + cat = "🐈" + logger.warning("this is a warning") + logger.info("this is some info") + logger.debug("this is some debug info") + logger.info(f"unicode: {cat}") + + loglines = ( + Path(global_bootstrap_options.pants_workdir, "pants.log").read_text().splitlines() + ) + print(loglines) + assert len(loglines) == 3 + assert "[WARN] this is a warning" in loglines[0] + assert "[INFO] this is some info" in loglines[1] + assert f"[INFO] unicode: {cat}" in loglines[2] def test_log_filtering_by_rule() -> None: - native = Native() - native.init_rust_logging( - level=LogLevel.INFO.level, - log_show_rust_3rdparty=False, - use_color=False, - show_target=True, - log_levels_by_target={ - "debug_target": LogLevel.DEBUG, - }, - message_regex_filters=(), - ) with temporary_dir() as tmpdir: - setup_logging_to_file(LogLevel.INFO, log_dir=tmpdir) - log_file = Path(tmpdir, "pants.log") + ob = create_options_bootstrapper( + [f"--pants-workdir={tmpdir}", '--log-levels-by-target={"debug_target": "debug"}'] + ) - native.write_log(msg="log msg one", level=LogLevel.INFO.level, target="some.target") - native.write_log(msg="log msg two", level=LogLevel.DEBUG.level, target="some.other.target") - native.write_log(msg="log msg three", level=LogLevel.DEBUG.level, target="debug_target") + # Do not set up a stdio destination, meaning that all messages will go to the log. + global_bootstrap_options = ob.bootstrap_options.for_global_scope() + with initialize_stdio(global_bootstrap_options): + native_engine.write_log( + msg="log msg one", level=LogLevel.INFO.level, target="some.target" + ) + native_engine.write_log( + msg="log msg two", level=LogLevel.DEBUG.level, target="some.other.target" + ) + native_engine.write_log( + msg="log msg three", level=LogLevel.DEBUG.level, target="debug_target" + ) - loglines = log_file.read_text().splitlines() + loglines = ( + Path(global_bootstrap_options.pants_workdir, "pants.log").read_text().splitlines() + ) - assert "[INFO] (some.target) log msg one" in loglines[0] - assert "[DEBUG] (debug_target) log msg three" in loglines[1] - assert len(loglines) == 2 + assert "[INFO] log msg one" in loglines[0] + assert "[DEBUG] log msg three" in loglines[1] + assert len(loglines) == 2 diff --git a/tests/python/pants_test/logging/native_engine_logging_integration_test.py b/tests/python/pants_test/logging/native_engine_logging_integration_test.py index 265517d8cd2..13fb4d78b3e 100644 --- a/tests/python/pants_test/logging/native_engine_logging_integration_test.py +++ b/tests/python/pants_test/logging/native_engine_logging_integration_test.py @@ -3,7 +3,7 @@ import re -from pants.testutil.pants_integration_test import read_pantsd_log, run_pants +from pants.testutil.pants_integration_test import read_pants_log, run_pants from pants_test.pantsd.pantsd_integration_test_base import PantsDaemonIntegrationTestBase @@ -30,6 +30,7 @@ def test_pantsd_file_logging(self) -> None: ) ctx.checker.assert_started() assert "[DEBUG] Connecting to pantsd on port" in daemon_run.stderr + assert "[DEBUG] Connected to pantsd" in daemon_run.stderr - pantsd_log = "\n".join(read_pantsd_log(ctx.workdir)) - assert "[DEBUG] Logging reinitialized in pantsd context" in pantsd_log + pants_log = "\n".join(read_pants_log(ctx.workdir)) + assert "[INFO] handling request" in pants_log diff --git a/tests/python/pants_test/pantsd/pantsd_integration_test.py b/tests/python/pants_test/pantsd/pantsd_integration_test.py index d5066766306..bf906fb0d39 100644 --- a/tests/python/pants_test/pantsd/pantsd_integration_test.py +++ b/tests/python/pants_test/pantsd/pantsd_integration_test.py @@ -15,7 +15,7 @@ from pants.testutil.pants_integration_test import ( PantsJoinHandle, - read_pantsd_log, + read_pants_log, setup_tmpdir, temporary_workdir, ) @@ -274,8 +274,8 @@ def test_pantsd_invalidation_file_tracking(self): time.sleep(5) ctx.checker.assert_running() - def full_pantsd_log(): - return "\n".join(read_pantsd_log(ctx.workdir)) + def full_pants_log(): + return "\n".join(read_pants_log(ctx.workdir)) # Create a new file in test_dir with temporary_file(suffix=".py", binary_mode=False, root_dir=test_dir) as temp_f: @@ -284,7 +284,7 @@ def full_pantsd_log(): ctx.checker.assert_stopped() - self.assertIn("saw filesystem changes covered by invalidation globs", full_pantsd_log()) + self.assertIn("saw filesystem changes covered by invalidation globs", full_pants_log()) def test_pantsd_invalidation_pants_toml_file(self): # Test tmp_pants_toml (--pants-config-files=$tmp_pants_toml)'s removal @@ -612,11 +612,10 @@ def test_concurrent_overrides_pantsd(self): config = {"GLOBAL": {"concurrent": True, "pantsd": True}} with temporary_workdir() as workdir: pants_run = self.run_pants_with_workdir( - ["help", "goals"], workdir=workdir, config=config + ["-ldebug", "help", "goals"], workdir=workdir, config=config ) pants_run.assert_success() - pantsd_log_location = os.path.join(workdir, "pantsd", "pantsd.log") - self.assertFalse(os.path.exists(pantsd_log_location)) + self.assertNotIn("Connecting to pantsd", pants_run.stderr) def test_unhandled_exceptions_only_log_exceptions_once(self): """Tests that the unhandled exceptions triggered by LocalPantsRunner instances don't diff --git a/tests/python/pants_test/pantsd/pantsd_integration_test_base.py b/tests/python/pants_test/pantsd/pantsd_integration_test_base.py index 50e36863a86..d2853678bc6 100644 --- a/tests/python/pants_test/pantsd/pantsd_integration_test_base.py +++ b/tests/python/pants_test/pantsd/pantsd_integration_test_base.py @@ -16,7 +16,7 @@ PantsJoinHandle, PantsResult, kill_daemon, - read_pantsd_log, + read_pants_log, run_pants, run_pants_with_workdir, run_pants_with_workdir_without_waiting, @@ -160,10 +160,10 @@ def pantsd_test_context( kill_daemon(pid_dir) checker.assert_stopped() finally: - banner("BEGIN pantsd.log") - for line in read_pantsd_log(workdir): + banner("BEGIN pants.log") + for line in read_pants_log(workdir): print(line) - banner("END pantsd.log") + banner("END pants.log") @contextmanager def pantsd_successful_run_context(self, *args, **kwargs) -> Iterator[PantsdRunContext]: diff --git a/tests/python/pants_test/pantsd/test_pants_daemon.py b/tests/python/pants_test/pantsd/test_pants_daemon.py deleted file mode 100644 index 1f6e69d51cb..00000000000 --- a/tests/python/pants_test/pantsd/test_pants_daemon.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md). -# Licensed under the Apache License, Version 2.0 (see LICENSE). - -import logging -import sys -import unittest.mock - -from pants.engine.internals.native import Native -from pants.engine.internals.native_engine import PyExecutor -from pants.pantsd.pants_daemon import PantsDaemon -from pants.pantsd.pants_daemon_core import PantsDaemonCore -from pants.pantsd.service.pants_service import PantsServices -from pants.testutil.option_util import create_options_bootstrapper -from pants.util.contextutil import stdio_as - -PATCH_OPTS = dict(autospec=True, spec_set=True) - - -TEST_LOG_LEVEL = logging.INFO - - -@unittest.mock.patch("os.close", **PATCH_OPTS) -def test_close_stdio(mock_close): - ob = create_options_bootstrapper([]) - mock_server = unittest.mock.Mock() - - def create_services(bootstrap_options, legacy_graph_scheduler): - return PantsServices() - - pantsd = PantsDaemon( - native=Native(), - work_dir="test_work_dir", - log_level=logging.INFO, - server=mock_server, - core=PantsDaemonCore(ob, PyExecutor(2, 4), create_services), - metadata_base_dir="/tmp/pants_test_metadata_dir", - bootstrap_options=ob.bootstrap_options, - ) - - with stdio_as(-1, -1, -1): - handles = (sys.stdin, sys.stdout, sys.stderr) - fds = [h.fileno() for h in handles] - pantsd._close_stdio() - mock_close.assert_has_calls(unittest.mock.call(x) for x in fds) - for handle in handles: - assert handle.closed is True