From cb886503b16ff4631a0489efdf91af4d7dc36d43 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Tue, 12 Jul 2022 15:02:15 -0700
Subject: [PATCH] Add support for `InteractiveProcess` consuming inputs while
 `run_in_workspace=True` (Cherry-pick of #16093) (#16148)

As described in #16105, blocking code in `@rule` bodies can trigger a shutdown
race condition when `pantsd` is disabled and `Ctrl+C` is sent.

Longer term solutions are discussed on that issue, but in the short term, we
can avoid using blocking code for `InteractiveProcess`, such that `run` uses
the same sandbox creation, async teardown, and relativizing code as `Process`
does.

Fixes #13852, fixes #14386, fixes #16120, and fixes #15771.
---
 .../backend/docker/goals/publish_test.py      |  17 +-
 .../pants/backend/helm/goals/publish_test.py  |  12 +-
 .../backend/python/goals/publish_test.py      |  20 +--
 .../goals/pytest_runner_integration_test.py   |   2 +-
 .../goals/run_pex_binary_integration_test.py  |   6 +-
 .../run_python_source_integration_test.py     |   2 +-
 src/python/pants/backend/scala/goals/repl.py  |  15 +-
 src/python/pants/core/goals/export_test.py    |   2 +-
 src/python/pants/core/goals/repl.py           |  52 +++---
 src/python/pants/core/goals/run.py            |  54 +++---
 src/python/pants/engine/process.py            |  79 +++------
 src/python/pants/engine/process_test.py       |  44 ++---
 src/python/pants/jvm/jdk_rules.py             |  15 +-
 src/python/pants/jvm/run_deploy_jar.py        |   2 +-
 src/python/pants/testutil/process_util.py     |  16 ++
 .../process_execution/src/immutable_inputs.rs |  18 +-
 .../engine/process_execution/src/local.rs     | 137 +++++++++------
 .../process_execution/src/local_tests.rs      |   4 +-
 src/rust/engine/src/context.rs                |  44 ++---
 src/rust/engine/src/intrinsics.rs             | 164 +++++------------
 src/rust/engine/src/nodes.rs                  |  16 +-
 21 files changed, 318 insertions(+), 403 deletions(-)
 create mode 100644 src/python/pants/testutil/process_util.py

diff --git a/src/python/pants/backend/docker/goals/publish_test.py b/src/python/pants/backend/docker/goals/publish_test.py
index c84ff250452..5b9c42a2162 100644
--- a/src/python/pants/backend/docker/goals/publish_test.py
+++ b/src/python/pants/backend/docker/goals/publish_test.py
@@ -3,7 +3,7 @@

 from __future__ import annotations

-from typing import cast
+from typing import Callable, cast

 import pytest

@@ -22,7 +22,9 @@
 from pants.core.goals.publish import PublishPackages, PublishProcesses
 from pants.engine.addresses import Address
 from pants.engine.fs import EMPTY_DIGEST
+from pants.engine.process import Process
 from pants.testutil.option_util import create_subsystem
+from pants.testutil.process_util import process_assertion
 from pants.testutil.rule_runner import QueryRule, RuleRunner
 from pants.util.frozendict import FrozenDict

@@ -90,24 +92,17 @@ def assert_publish(
     publish: PublishPackages,
     expect_names: tuple[str, ...],
     expect_description: str | None,
-    expect_process,
+    expect_process: Callable[[Process], None] | None,
 ) -> None:
     assert publish.names == expect_names
     assert publish.description == expect_description
     if expect_process:
-        expect_process(publish.process)
+        assert publish.process
+        expect_process(publish.process.process)
     else:
         assert publish.process is None


-def process_assertion(**assertions):
-    def assert_process(process):
-        for attr, expected in assertions.items():
-            assert getattr(process, attr) == expected
-
-    return assert_process
-
-
 def test_docker_skip_push(rule_runner: RuleRunner) -> None:
     result, _ = run_publish(rule_runner, Address("src/skip-test"))
     assert len(result) == 1
diff --git
a/src/python/pants/backend/helm/goals/publish_test.py b/src/python/pants/backend/helm/goals/publish_test.py index 5e3a5c1e66e..5bb8719d5d6 100644 --- a/src/python/pants/backend/helm/goals/publish_test.py +++ b/src/python/pants/backend/helm/goals/publish_test.py @@ -20,6 +20,7 @@ from pants.core.util_rules import external_tool from pants.engine.addresses import Address from pants.engine.fs import EMPTY_DIGEST +from pants.testutil.process_util import process_assertion from pants.testutil.rule_runner import QueryRule, RuleRunner @@ -81,19 +82,12 @@ def assert_publish( assert publish.names == expect_names assert publish.description == expect_description if expect_process: - expect_process(publish.process) + assert publish.process + expect_process(publish.process.process) else: assert publish.process is None -def process_assertion(**assertions): - def assert_process(process): - for attr, expected in assertions.items(): - assert getattr(process, attr) == expected - - return assert_process - - def _declare_targets(rule_runner: RuleRunner) -> None: rule_runner.write_files( { diff --git a/src/python/pants/backend/python/goals/publish_test.py b/src/python/pants/backend/python/goals/publish_test.py index 73ecaa71cc3..1f0ee49b478 100644 --- a/src/python/pants/backend/python/goals/publish_test.py +++ b/src/python/pants/backend/python/goals/publish_test.py @@ -4,6 +4,7 @@ from __future__ import annotations from textwrap import dedent +from typing import Callable import pytest @@ -20,6 +21,8 @@ from pants.core.util_rules.config_files import rules as config_files_rules from pants.engine.addresses import Address from pants.engine.fs import EMPTY_DIGEST +from pants.engine.process import Process +from pants.testutil.process_util import process_assertion from pants.testutil.rule_runner import QueryRule, RuleRunner from pants.util.frozendict import FrozenDict @@ -95,24 +98,17 @@ def assert_package( package: PublishPackages, expect_names: tuple[str, ...], expect_description: str, - expect_process, + expect_process: Callable[[Process], None] | None, ) -> None: assert package.names == expect_names assert package.description == expect_description if expect_process: - expect_process(package.process) + assert package.process + expect_process(package.process.process) else: assert package.process is None -def process_assertion(**assertions): - def assert_process(process): - for attr, expected in assertions.items(): - assert getattr(process, attr) == expected - - return assert_process - - def test_twine_upload(rule_runner, packages) -> None: rule_runner.write_files(project_files(skip_twine=False)) result = request_publish_processes(rule_runner, packages) @@ -211,6 +207,6 @@ def test_twine_cert_arg(rule_runner, packages, options, cert_arg) -> None: process = result[0].process assert process if cert_arg: - assert cert_arg in process.argv + assert cert_arg in process.process.argv else: - assert not any(arg.startswith("--cert") for arg in process.argv) + assert not any(arg.startswith("--cert") for arg in process.process.argv) diff --git a/src/python/pants/backend/python/goals/pytest_runner_integration_test.py b/src/python/pants/backend/python/goals/pytest_runner_integration_test.py index 716942ff225..2599ce641e4 100644 --- a/src/python/pants/backend/python/goals/pytest_runner_integration_test.py +++ b/src/python/pants/backend/python/goals/pytest_runner_integration_test.py @@ -606,7 +606,7 @@ def test_debug_adaptor_request_argv(rule_runner: RuleRunner) -> None: inputs = [PythonTestFieldSet.create(tgt)] request = 
rule_runner.request(TestDebugAdapterRequest, inputs) assert request.process is not None - assert request.process.argv == ( + assert request.process.process.argv == ( "./pytest_runner.pex_pex_shim.sh", "--listen", "127.0.0.1:5678", diff --git a/src/python/pants/backend/python/goals/run_pex_binary_integration_test.py b/src/python/pants/backend/python/goals/run_pex_binary_integration_test.py index 08ed2b93992..ee15e6915f5 100644 --- a/src/python/pants/backend/python/goals/run_pex_binary_integration_test.py +++ b/src/python/pants/backend/python/goals/run_pex_binary_integration_test.py @@ -107,10 +107,10 @@ def run(*extra_args: str, **extra_env: str) -> PantsResult: file = result.stdout.strip() if use_new_semantics_args: assert file.endswith("utils/strutil.py") - assert ".pants.d/tmp" not in file + assert "pants-sandbox-" not in file else: assert file.endswith("src_root2/utils/strutil.py") - assert ".pants.d/tmp" in file + assert "pants-sandbox-" in file assert result.exit_code == 23 if include_tools: @@ -270,4 +270,4 @@ def test_filename_spec_ambiutity(use_new_semantics_args) -> None: result = run_pants(args) file = result.stdout.strip() assert file.endswith("src/app.py") - assert ".pants.d/tmp" in file + assert "pants-sandbox-" in file diff --git a/src/python/pants/backend/python/goals/run_python_source_integration_test.py b/src/python/pants/backend/python/goals/run_python_source_integration_test.py index 2555f8558d9..98c1f7a098f 100644 --- a/src/python/pants/backend/python/goals/run_python_source_integration_test.py +++ b/src/python/pants/backend/python/goals/run_python_source_integration_test.py @@ -84,7 +84,7 @@ def run(*extra_args: str, **extra_env: str) -> Tuple[PantsResult, str]: file = result.stdout.strip() if run_in_sandbox: assert file.endswith("src_root2/utils/strutil.py") - assert ".pants.d/tmp" in file + assert "pants-sandbox-" in file else: assert file.endswith(os.path.join(test_repo_root, "src_root2/utils/strutil.py")) assert result.exit_code == 23 diff --git a/src/python/pants/backend/scala/goals/repl.py b/src/python/pants/backend/scala/goals/repl.py index 6ffebe8d0ca..3c10604871a 100644 --- a/src/python/pants/backend/scala/goals/repl.py +++ b/src/python/pants/backend/scala/goals/repl.py @@ -1,5 +1,6 @@ # Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). # Licensed under the Apache License, Version 2.0 (see LICENSE). + from __future__ import annotations from pants.backend.scala.subsystems.scala import ScalaSubsystem @@ -66,32 +67,26 @@ async def create_scala_repl_request( Get(Digest, AddPrefix(d, user_classpath_prefix)) for d in user_classpath.digests() ) - # TODO: Manually merging the `immutable_input_digests` since InteractiveProcess doesn't - # support them yet. See https://github.com/pantsbuild/pants/issues/13852. 
- jdk_digests = await MultiGet( - Get(Digest, AddPrefix(digest, relpath)) - for relpath, digest in jdk.immutable_input_digests.items() - ) - repl_digest = await Get( Digest, - MergeDigests([*prefixed_user_classpath, tool_classpath.content.digest, *jdk_digests]), + MergeDigests([*prefixed_user_classpath, tool_classpath.content.digest]), ) return ReplRequest( digest=repl_digest, args=[ - *jdk.args(bash, tool_classpath.classpath_entries()), + *jdk.args(bash, tool_classpath.classpath_entries(), chroot="{chroot}"), "-Dscala.usejavacp=true", "scala.tools.nsc.MainGenericRunner", "-classpath", ":".join(user_classpath.args(prefix=user_classpath_prefix)), ], + run_in_workspace=False, extra_env={ **jdk.env, "PANTS_INTERNAL_ABSOLUTE_PREFIX": "", }, - run_in_workspace=False, + immutable_input_digests=jdk.immutable_input_digests, append_only_caches=jdk.append_only_caches, ) diff --git a/src/python/pants/core/goals/export_test.py b/src/python/pants/core/goals/export_test.py index 9acc8b5ccaf..66935aeb3b4 100644 --- a/src/python/pants/core/goals/export_test.py +++ b/src/python/pants/core/goals/export_test.py @@ -63,7 +63,7 @@ def mock_export( def _mock_run(rule_runner: RuleRunner, ip: InteractiveProcess) -> InteractiveProcessResult: subprocess.check_call( - ip.argv, + ip.process.argv, stderr=subprocess.STDOUT, env={ "PATH": os.environ.get("PATH", ""), diff --git a/src/python/pants/core/goals/repl.py b/src/python/pants/core/goals/repl.py index 9be5f19a63f..d722d270cf7 100644 --- a/src/python/pants/core/goals/repl.py +++ b/src/python/pants/core/goals/repl.py @@ -5,7 +5,6 @@ import os from abc import ABC from dataclasses import dataclass -from pathlib import PurePath from typing import ClassVar, Iterable, Mapping, Optional, Sequence, Tuple from pants.base.build_root import BuildRoot @@ -14,14 +13,12 @@ from pants.engine.environment import CompleteEnvironment from pants.engine.fs import Digest, Workspace from pants.engine.goal import Goal, GoalSubsystem -from pants.engine.internals.native_engine import EMPTY_DIGEST from pants.engine.process import InteractiveProcess, InteractiveProcessResult from pants.engine.rules import Effect, Get, collect_rules, goal_rule from pants.engine.target import FilteredTargets, Target from pants.engine.unions import UnionMembership, union from pants.option.global_options import GlobalOptions from pants.option.option_types import BoolOption, StrOption -from pants.util.contextutil import temporary_dir from pants.util.frozendict import FrozenDict from pants.util.memo import memoized_property from pants.util.meta import frozen_after_init @@ -38,10 +35,9 @@ class ReplImplementation(ABC): name: ClassVar[str] targets: Sequence[Target] - chroot: str # Absolute path of the chroot the sources will be materialized to. def in_chroot(self, relpath: str) -> str: - return os.path.join(self.chroot, relpath) + return os.path.join("{chroot}", relpath) @memoized_property def addresses(self) -> Addresses: @@ -78,6 +74,7 @@ class ReplRequest: digest: Digest args: Tuple[str, ...] 
extra_env: FrozenDict[str, str] + immutable_input_digests: FrozenDict[str, Digest] append_only_caches: FrozenDict[str, str] run_in_workspace: bool @@ -87,12 +84,14 @@ def __init__( digest: Digest, args: Iterable[str], extra_env: Optional[Mapping[str, str]] = None, + immutable_input_digests: Mapping[str, Digest] | None = None, append_only_caches: Mapping[str, str] | None = None, run_in_workspace: bool = True, ) -> None: self.digest = digest self.args = tuple(args) self.extra_env = FrozenDict(extra_env or {}) + self.immutable_input_digests = FrozenDict(immutable_input_digests or {}) self.append_only_caches = FrozenDict(append_only_caches or {}) self.run_in_workspace = run_in_workspace @@ -121,33 +120,22 @@ async def run_repl( ) return Repl(-1) - with temporary_dir(root_dir=global_options.pants_workdir, cleanup=False) as tmpdir: - repl_impl = repl_implementation_cls(targets=specified_targets, chroot=tmpdir) - request = await Get(ReplRequest, ReplImplementation, repl_impl) - - input_digest = request.digest - if request.run_in_workspace: - workspace.write_digest( - request.digest, - path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix(), - # We don't want to influence whether the InteractiveProcess is able to restart. Because - # we're writing into a temp directory, we can safely mark this side_effecting=False. - side_effecting=False, - ) - input_digest = EMPTY_DIGEST - - env = {**complete_env, **request.extra_env} - result = await Effect( - InteractiveProcessResult, - InteractiveProcess( - argv=request.args, - env=env, - input_digest=input_digest, - run_in_workspace=request.run_in_workspace, - restartable=repl_subsystem.restartable, - append_only_caches=request.append_only_caches, - ), - ) + repl_impl = repl_implementation_cls(targets=specified_targets) + request = await Get(ReplRequest, ReplImplementation, repl_impl) + + env = {**complete_env, **request.extra_env} + result = await Effect( + InteractiveProcessResult, + InteractiveProcess( + argv=request.args, + env=env, + input_digest=request.digest, + run_in_workspace=request.run_in_workspace, + restartable=repl_subsystem.restartable, + immutable_input_digests=request.immutable_input_digests, + append_only_caches=request.append_only_caches, + ), + ) return Repl(result.exit_code) diff --git a/src/python/pants/core/goals/run.py b/src/python/pants/core/goals/run.py index daf2ac065e3..80940fee4d7 100644 --- a/src/python/pants/core/goals/run.py +++ b/src/python/pants/core/goals/run.py @@ -3,7 +3,6 @@ import logging from abc import ABCMeta from dataclasses import dataclass -from pathlib import PurePath from typing import Iterable, Mapping, Optional, Tuple from pants.base.build_root import BuildRoot @@ -25,7 +24,6 @@ from pants.engine.unions import UnionMembership, union from pants.option.global_options import GlobalOptions from pants.option.option_types import ArgsListOption, BoolOption -from pants.util.contextutil import temporary_dir from pants.util.frozendict import FrozenDict from pants.util.meta import frozen_after_init from pants.util.strutil import softwrap @@ -166,41 +164,29 @@ async def run( # Cleanup is the default, so we want to preserve the chroot if either option is off. 
cleanup = run_subsystem.cleanup and global_options.process_cleanup - with temporary_dir(root_dir=global_options.pants_workdir, cleanup=cleanup) as tmpdir: - if not cleanup: - logger.info(f"Preserving running binary chroot {tmpdir}") - workspace.write_digest( - request.digest, - path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix(), - # We don't want to influence whether the InteractiveProcess is able to restart. Because - # we're writing into a temp directory, we can safely mark this side_effecting=False. - side_effecting=False, - ) - - args = (arg.format(chroot=tmpdir) for arg in request.args) - env = {**complete_env, **{k: v.format(chroot=tmpdir) for k, v in request.extra_env.items()}} - if run_subsystem.debug_adapter: - logger.info( - softwrap( - f""" - Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}', - which will wait for a client connection... - """ - ) + if run_subsystem.debug_adapter: + logger.info( + softwrap( + f""" + Launching debug adapter at '{debug_adapter.host}:{debug_adapter.port}', + which will wait for a client connection... + """ ) - - result = await Effect( - InteractiveProcessResult, - InteractiveProcess( - argv=(*args, *run_subsystem.args), - env=env, - run_in_workspace=True, - restartable=restartable, - ), ) - exit_code = result.exit_code - return Run(exit_code) + result = await Effect( + InteractiveProcessResult, + InteractiveProcess( + argv=(*request.args, *run_subsystem.args), + env={**complete_env, **request.extra_env}, + input_digest=request.digest, + run_in_workspace=True, + restartable=restartable, + cleanup=cleanup, + ), + ) + + return Run(result.exit_code) def rules(): diff --git a/src/python/pants/engine/process.py b/src/python/pants/engine/process.py index c006100b90e..6ba14e8e11c 100644 --- a/src/python/pants/engine/process.py +++ b/src/python/pants/engine/process.py @@ -9,12 +9,12 @@ from enum import Enum from typing import Iterable, Mapping +from pants.base.deprecated import warn_or_error from pants.engine.engine_aware import SideEffecting -from pants.engine.fs import EMPTY_DIGEST, AddPrefix, Digest, FileDigest, MergeDigests -from pants.engine.internals.selectors import MultiGet +from pants.engine.fs import EMPTY_DIGEST, Digest, FileDigest from pants.engine.internals.session import RunId from pants.engine.platform import Platform -from pants.engine.rules import Get, collect_rules, rule +from pants.engine.rules import collect_rules, rule from pants.option.global_options import ProcessCleanupOption from pants.util.frozendict import FrozenDict from pants.util.logging import LogLevel @@ -286,13 +286,13 @@ class InteractiveProcessResult: @frozen_after_init @dataclass(unsafe_hash=True) class InteractiveProcess(SideEffecting): - argv: tuple[str, ...] - env: FrozenDict[str, str] - input_digest: Digest + # NB: Although InteractiveProcess supports only some of the features of Process, we construct an + # underlying Process instance to improve code reuse. + process: Process run_in_workspace: bool forward_signals_to_process: bool restartable: bool - append_only_caches: FrozenDict[str, str] + cleanup: bool def __init__( self, @@ -303,7 +303,9 @@ def __init__( run_in_workspace: bool = False, forward_signals_to_process: bool = True, restartable: bool = False, + cleanup: bool = True, append_only_caches: Mapping[str, str] | None = None, + immutable_input_digests: Mapping[str, Digest] | None = None, ) -> None: """Request to run a subprocess in the foreground, similar to subprocess.run(). 
@@ -316,28 +318,18 @@ def __init__( sent to a process by hitting Ctrl-C in the terminal to actually reach the process, or capture that signal itself, blocking it from the process. """ - self.argv = tuple(argv) - self.env = FrozenDict(env or {}) - self.input_digest = input_digest + self.process = Process( + argv, + description="Interactive process", + env=env, + input_digest=input_digest, + append_only_caches=append_only_caches, + immutable_input_digests=immutable_input_digests, + ) self.run_in_workspace = run_in_workspace self.forward_signals_to_process = forward_signals_to_process self.restartable = restartable - self.append_only_caches = FrozenDict(append_only_caches or {}) - - self.__post_init__() - - def __post_init__(self): - if self.input_digest != EMPTY_DIGEST and self.run_in_workspace: - raise ValueError( - "InteractiveProcess should use the Workspace API to materialize any needed " - "files when it runs in the workspace" - ) - if self.append_only_caches and self.run_in_workspace: - raise ValueError( - "InteractiveProcess requested setup of append-only caches and also requested to run" - " in the workspace. These options are incompatible since setting up append-only" - " caches would modify the workspace." - ) + self.cleanup = cleanup @classmethod def from_process( @@ -347,14 +339,6 @@ def from_process( forward_signals_to_process: bool = True, restartable: bool = False, ) -> InteractiveProcess: - # TODO: Remove this check once https://github.com/pantsbuild/pants/issues/13852 is - # implemented and the immutable_input_digests are propagated into the InteractiveProcess. - if process.immutable_input_digests: - raise ValueError( - "Process has immutable_input_digests, so it cannot be converted to an " - "InteractiveProcess by calling from_process(). Use an async " - "InteractiveProcessRequest instead." - ) return InteractiveProcess( argv=process.argv, env=process.env, @@ -362,6 +346,7 @@ def from_process( forward_signals_to_process=forward_signals_to_process, restartable=restartable, append_only_caches=process.append_only_caches, + immutable_input_digests=process.immutable_input_digests, ) @@ -374,27 +359,15 @@ class InteractiveProcessRequest: @rule async def interactive_process_from_process(req: InteractiveProcessRequest) -> InteractiveProcess: - # TODO: Temporary workaround until https://github.com/pantsbuild/pants/issues/13852 - # is implemented. Once that is implemented we can get rid of this rule, and the - # InteractiveProcessRequest type, and use InteractiveProcess.from_process directly. 
- - if req.process.immutable_input_digests: - prefixed_immutable_input_digests = await MultiGet( - Get(Digest, AddPrefix(digest, prefix)) - for prefix, digest in req.process.immutable_input_digests.items() - ) - full_input_digest = await Get( - Digest, MergeDigests([req.process.input_digest, *prefixed_immutable_input_digests]) - ) - else: - full_input_digest = req.process.input_digest - return InteractiveProcess( - argv=req.process.argv, - env=req.process.env, - input_digest=full_input_digest, + warn_or_error( + removal_version="2.15.0.dev1", + entity="InteractiveProcessRequest", + hint="Instead, use `InteractiveProcess.from_process`.", + ) + return InteractiveProcess.from_process( + req.process, forward_signals_to_process=req.forward_signals_to_process, restartable=req.restartable, - append_only_caches=req.process.append_only_caches, ) diff --git a/src/python/pants/engine/process_test.py b/src/python/pants/engine/process_test.py index 42bbacdcd84..be420b2aca7 100644 --- a/src/python/pants/engine/process_test.py +++ b/src/python/pants/engine/process_test.py @@ -12,18 +12,17 @@ DigestContents, Directory, FileContent, - Snapshot, ) from pants.engine.internals.scheduler import ExecutionError from pants.engine.process import ( FallibleProcessResult, InteractiveProcess, - InteractiveProcessRequest, + InteractiveProcessResult, Process, ProcessCacheScope, ProcessResult, ) -from pants.testutil.rule_runner import QueryRule, RuleRunner +from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console from pants.util.contextutil import environment_as @@ -32,7 +31,7 @@ def new_rule_runner() -> RuleRunner: rules=[ QueryRule(ProcessResult, [Process]), QueryRule(FallibleProcessResult, [Process]), - QueryRule(InteractiveProcess, [InteractiveProcessRequest]), + QueryRule(InteractiveProcessResult, [InteractiveProcess]), ], ) @@ -253,34 +252,27 @@ def test_create_files(rule_runner: RuleRunner) -> None: assert result.stdout == b"hellogoodbye" -def test_interactive_process_cannot_have_input_files_and_workspace() -> None: - mock_digest = Digest(EMPTY_DIGEST.fingerprint, 1) - with pytest.raises(ValueError): - InteractiveProcess(argv=["/bin/echo"], input_digest=mock_digest, run_in_workspace=True) - - -def test_interactive_process_cannot_have_append_only_caches_and_workspace() -> None: - with pytest.raises(ValueError): - InteractiveProcess( - argv=["/bin/echo"], append_only_caches={"foo": "bar"}, run_in_workspace=True - ) - - -def test_interactive_process_immutable_input_digests(rule_runner: RuleRunner) -> None: +@pytest.mark.parametrize("run_in_workspace", [True, False]) +def test_interactive_process_inputs(rule_runner: RuleRunner, run_in_workspace: bool) -> None: digest0 = rule_runner.request(Digest, [CreateDigest([FileContent("file0", b"")])]) digest1 = rule_runner.request(Digest, [CreateDigest([FileContent("file1", b"")])]) digest2 = rule_runner.request( Digest, [CreateDigest([FileContent("file2", b""), FileContent("file3", b"")])] ) - process = Process( - argv=["foo", "bar"], - description="dummy", + process = InteractiveProcess( + argv=["/bin/bash", "-c", "ls -1 '{chroot}'"], env={"BAZ": "QUX"}, input_digest=digest0, immutable_input_digests={"prefix1": digest1, "prefix2": digest2}, + append_only_caches={"cache_name": "append_only0"}, + run_in_workspace=run_in_workspace, ) - iproc = rule_runner.request(InteractiveProcess, [InteractiveProcessRequest(process)]) - assert iproc.argv == process.argv - assert iproc.env == process.env - snapshot = rule_runner.request(Snapshot, [iproc.input_digest]) - 
assert snapshot.files == ("file0", "prefix1/file1", "prefix2/file2", "prefix2/file3") + with mock_console(rule_runner.options_bootstrapper) as (_, stdio_reader): + result = rule_runner.run_interactive_process(process) + assert result.exit_code == 0 + assert set(stdio_reader.get_stdout().splitlines()) == { + "append_only0", + "file0", + "prefix1", + "prefix2", + } diff --git a/src/python/pants/jvm/jdk_rules.py b/src/python/pants/jvm/jdk_rules.py index 1776e86c272..6b1bcfb0d18 100644 --- a/src/python/pants/jvm/jdk_rules.py +++ b/src/python/pants/jvm/jdk_rules.py @@ -101,13 +101,20 @@ class JdkEnvironment: jdk_preparation_script: ClassVar[str] = f"{bin_dir}/jdk.sh" java_home: ClassVar[str] = "__java_home" - def args(self, bash: BashBinary, classpath_entries: Iterable[str]) -> tuple[str, ...]: + def args( + self, bash: BashBinary, classpath_entries: Iterable[str], chroot: str | None = None + ) -> tuple[str, ...]: + def in_chroot(path: str) -> str: + if not chroot: + return path + return os.path.join(chroot, path) + return ( bash.path, - self.jdk_preparation_script, + in_chroot(self.jdk_preparation_script), f"{self.java_home}/bin/java", "-cp", - ":".join([self.nailgun_jar, *classpath_entries]), + ":".join([in_chroot(self.nailgun_jar), *classpath_entries]), ) @property @@ -203,7 +210,7 @@ async def prepare_jdk_environment( else: coursier_jdk_option = shlex.quote(f"--jvm={version}") - # TODO(#14386) This argument re-writing code should be done in a more standardised way. + # TODO(#16104) This argument re-writing code should use the native {chroot} support. # See also `run_deploy_jar` for other argument re-writing code. def prefixed(arg: str) -> str: if arg.startswith("__"): diff --git a/src/python/pants/jvm/run_deploy_jar.py b/src/python/pants/jvm/run_deploy_jar.py index 2854ae13b1d..b5323edc08f 100644 --- a/src/python/pants/jvm/run_deploy_jar.py +++ b/src/python/pants/jvm/run_deploy_jar.py @@ -68,7 +68,7 @@ async def create_deploy_jar_run_request( runtime_jvm = await Get(__RuntimeJvm, JdkEnvironment, jdk) support_digests += (runtime_jvm.digest,) - # TODO(#14386) This argument re-writing code should be done in a more standardised way. + # TODO(#16104) This argument re-writing code should use the native {chroot} support. # See also `jdk_rules.py` for other argument re-writing code. def prefixed(arg: str, prefixes: Iterable[str]) -> str: if any(arg.startswith(prefix) for prefix in prefixes): diff --git a/src/python/pants/testutil/process_util.py b/src/python/pants/testutil/process_util.py new file mode 100644 index 00000000000..a5a03b60878 --- /dev/null +++ b/src/python/pants/testutil/process_util.py @@ -0,0 +1,16 @@ +# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). +# Licensed under the Apache License, Version 2.0 (see LICENSE). 
+ +from __future__ import annotations + +from typing import Callable + +from pants.engine.process import Process + + +def process_assertion(**assertions) -> Callable[[Process], None]: + def assert_process(process: Process) -> None: + for attr, expected in assertions.items(): + assert getattr(process, attr) == expected + + return assert_process diff --git a/src/rust/engine/process_execution/src/immutable_inputs.rs b/src/rust/engine/process_execution/src/immutable_inputs.rs index e9fa81d44bb..60b8526cdf1 100644 --- a/src/rust/engine/process_execution/src/immutable_inputs.rs +++ b/src/rust/engine/process_execution/src/immutable_inputs.rs @@ -11,8 +11,7 @@ use tempfile::TempDir; use crate::WorkdirSymlink; -/// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes. -pub struct ImmutableInputs { +struct Inner { store: Store, // The TempDir that digests are materialized in. workdir: TempDir, @@ -21,6 +20,12 @@ pub struct ImmutableInputs { contents: Mutex>>>, } +/// +/// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes. +/// +#[derive(Clone)] +pub struct ImmutableInputs(Arc); + impl ImmutableInputs { pub fn new(store: Store, base: &Path) -> Result { let workdir = tempfile::Builder::new() @@ -32,17 +37,17 @@ impl ImmutableInputs { e ) })?; - Ok(Self { + Ok(Self(Arc::new(Inner { store, workdir, contents: Mutex::default(), - }) + }))) } /// Returns an absolute Path to immutably consume the given Digest from. async fn path(&self, directory_digest: DirectoryDigest) -> Result { let digest = directory_digest.as_digest(); - let cell = self.contents.lock().entry(digest).or_default().clone(); + let cell = self.0.contents.lock().entry(digest).or_default().clone(); // We (might) need to initialize the value. // @@ -75,7 +80,7 @@ impl ImmutableInputs { // of approach 2 might eventually be worthwhile. cell .get_or_try_init(async { - let chroot = TempDir::new_in(self.workdir.path()).map_err(|e| { + let chroot = TempDir::new_in(self.0.workdir.path()).map_err(|e| { format!( "Failed to create a temporary directory for materialization of immutable input \ digest {:?}: {}", @@ -85,6 +90,7 @@ impl ImmutableInputs { let dest = chroot.path().join(digest.hash.to_hex()); self + .0 .store .materialize_directory(dest.clone(), directory_digest, Permissions::ReadOnly) .await?; diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 2e94ab43ec1..93026b36fd1 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -26,6 +26,8 @@ use log::{debug, info}; use nails::execution::ExitCode; use shell_quote::bash; use store::{OneOffStoreFileByDigest, Snapshot, Store, StoreError}; +use task_executor::Executor; +use tempfile::TempDir; use tokio::process::{Child, Command}; use tokio::sync::RwLock; use tokio::time::{timeout, Duration}; @@ -42,7 +44,7 @@ pub const USER_EXECUTABLE_MODE: u32 = 0o100755; pub struct CommandRunner { pub store: Store, - executor: task_executor::Executor, + executor: Executor, work_dir_base: PathBuf, named_caches: NamedCaches, immutable_inputs: ImmutableInputs, @@ -54,7 +56,7 @@ pub struct CommandRunner { impl CommandRunner { pub fn new( store: Store, - executor: task_executor::Executor, + executor: Executor, work_dir_base: PathBuf, named_caches: NamedCaches, immutable_inputs: ImmutableInputs, @@ -269,41 +271,21 @@ impl super::CommandRunner for CommandRunner { // renders at the Process's level. 
desc = Some(req.description.clone()), |workunit| async move { - // Set up a temporary workdir, which will optionally be preserved. - let (workdir_path, maybe_workdir) = { - let workdir = tempfile::Builder::new() - .prefix("pants-sandbox-") - .tempdir_in(&self.work_dir_base) - .map_err(|err| { - format!( - "Error making tempdir for local process execution: {:?}", - err - ) - })?; - if self.cleanup_local_dirs { - // Hold on to the workdir so that we can drop it explicitly after we've finished using it. - (workdir.path().to_owned(), Some(workdir)) - } else { - // This consumes the `TempDir` without deleting directory on the filesystem, meaning - // that the temporary directory will no longer be automatically deleted when dropped. - let preserved_path = workdir.into_path(); - info!( - "Preserving local process execution dir {} for {:?}", - preserved_path.display(), - req.description - ); - (preserved_path, None) - } - }; + let workdir = create_sandbox( + self.executor.clone(), + &self.work_dir_base, + &req.description, + self.cleanup_local_dirs, + )?; // Start working on a mutable version of the process. let mut req = req; // Update env, replacing `{chroot}` placeholders with `workdir_path`. - update_env(&workdir_path, &mut req); + apply_chroot(workdir.path().to_str().unwrap(), &mut req); // Prepare the workdir. let exclusive_spawn = prepare_workdir( - workdir_path.clone(), + workdir.path().to_owned(), &req, req.input_digests.input_files.clone(), self.store.clone(), @@ -320,7 +302,7 @@ impl super::CommandRunner for CommandRunner { context, self.store.clone(), self.executor.clone(), - workdir_path.clone(), + workdir.path().to_owned(), (), exclusive_spawn, self.platform(), @@ -338,15 +320,8 @@ impl super::CommandRunner for CommandRunner { }) .await; - match maybe_workdir { - Some(workdir) => { - // Dropping the temporary directory will likely involve a lot of IO: do it in the - // background. - let _background_cleanup = self.executor.spawn_blocking(|| std::mem::drop(workdir)); - } - None => { - setup_run_sh_script(&req.env, &req.working_directory, &req.argv, &workdir_path)?; - } + if !self.cleanup_local_dirs { + setup_run_sh_script(&req.env, &req.working_directory, &req.argv, workdir.path())?; } res @@ -478,7 +453,7 @@ pub trait CapturedWorkdir { req: Process, context: Context, store: Store, - executor: task_executor::Executor, + executor: Executor, workdir_path: PathBuf, workdir_token: Self::WorkdirToken, exclusive_spawn: bool, @@ -609,22 +584,18 @@ pub trait CapturedWorkdir { ) -> Result>, String>; } -/// Updates the Process env. -/// -/// Mutates the env for the process `req`, replacing any `{chroot}` placeholders with -/// `workdir_path`. /// -/// This matches the behavior of interactive processes executed in a temporary directory and those -/// executed by the `run` goal. +/// Mutates a Process, replacing any `{chroot}` placeholders with `chroot_path`. /// -/// TODO: align this with the code path for interactive processes. Related issue #14386. 
-/// -pub fn update_env(workdir_path: &Path, req: &mut Process) { - if let Some(workdir) = workdir_path.to_str() { - for value in req.env.values_mut() { - if value.contains("{chroot}") { - *value = value.replace("{chroot}", workdir); - } +pub fn apply_chroot(chroot_path: &str, req: &mut Process) { + for value in req.env.values_mut() { + if value.contains("{chroot}") { + *value = value.replace("{chroot}", chroot_path); + } + } + for value in &mut req.argv { + if value.contains("{chroot}") { + *value = value.replace("{chroot}", chroot_path); } } } @@ -646,7 +617,7 @@ pub async fn prepare_workdir( req: &Process, materialized_input_digest: DirectoryDigest, store: Store, - executor: task_executor::Executor, + executor: Executor, named_caches: &NamedCaches, immutable_inputs: &ImmutableInputs, ) -> Result { @@ -755,6 +726,60 @@ pub async fn prepare_workdir( Ok(exclusive_spawn) } +/// Creates an optionally-cleaned-up sandbox in the given base path. +pub fn create_sandbox( + executor: Executor, + base_directory: &Path, + description: &str, + cleanup: bool, +) -> Result { + let workdir = tempfile::Builder::new() + .prefix("pants-sandbox-") + .tempdir_in(base_directory) + .map_err(|err| { + format!( + "Error making tempdir for local process execution: {:?}", + err + ) + })?; + + let (workdir_path, maybe_workdir) = if cleanup { + // Hold on to the workdir so that we can drop it explicitly after we've finished using it. + (workdir.path().to_owned(), Some(workdir)) + } else { + // This consumes the `TempDir` without deleting directory on the filesystem, meaning + // that the temporary directory will no longer be automatically deleted when dropped. + let preserved_path = workdir.into_path(); + info!( + "Preserving local process execution dir {} for {}", + preserved_path.display(), + description, + ); + (preserved_path, None) + }; + + Ok(AsyncDropSandbox(executor, workdir_path, maybe_workdir)) +} + +/// Dropping sandboxes can involve a lot of IO, so it is spawned to the background as a blocking +/// task. +#[must_use] +pub struct AsyncDropSandbox(Executor, PathBuf, Option); + +impl AsyncDropSandbox { + pub fn path(&self) -> &Path { + &self.1 + } +} + +impl Drop for AsyncDropSandbox { + fn drop(&mut self) { + if let Some(sandbox) = self.2.take() { + let _background_cleanup = self.0.spawn_blocking(|| std::mem::drop(sandbox)); + } + } +} + /// Create a file called __run.sh with the env, cwd and argv used by Pants to facilitate debugging. 
fn setup_run_sh_script( env: &BTreeMap, diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index 6637441cce7..7bc14f3d1c2 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -348,13 +348,13 @@ async fn jdk_symlink() { #[tokio::test] #[cfg(unix)] -async fn test_update_env() { +async fn test_apply_chroot() { let mut env: BTreeMap = BTreeMap::new(); env.insert("PATH".to_string(), "/usr/bin:{chroot}/bin".to_string()); let work_dir = TempDir::new().unwrap(); let mut req = Process::new(owned_string_vec(&["/usr/bin/env"])).env(env.clone()); - local::update_env(&work_dir.path(), &mut req); + local::apply_chroot(work_dir.path().to_str().unwrap(), &mut req); let path = format!("/usr/bin:{}/bin", work_dir.path().to_str().unwrap()); diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index cf169c77de2..40e41551a66 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -73,7 +73,9 @@ pub struct Core { pub local_parallelism: usize, pub graceful_shutdown_timeout: Duration, pub sessions: Sessions, - pub named_caches_dir: PathBuf, + pub named_caches: NamedCaches, + pub immutable_inputs: ImmutableInputs, + pub local_execution_root_dir: PathBuf, } #[derive(Clone, Debug)] @@ -177,9 +179,11 @@ impl Core { /// fn make_leaf_runner( full_store: &Store, + local_runner_store: &Store, executor: &Executor, local_execution_root_dir: &Path, - named_caches_dir: &Path, + immutable_inputs: &ImmutableInputs, + named_caches: &NamedCaches, process_execution_metadata: &ProcessMetadata, root_ca_certs: &Option>, exec_strategy_opts: &ExecutionStrategyOptions, @@ -206,22 +210,12 @@ impl Core { exec_strategy_opts.remote_parallelism, ) } else { - // If eager_fetch is enabled, we do not want to use any remote store with the local command - // runner. This reduces the surface area of where the remote store is - // used to only be the remote cache command runner. 
- let store_for_local_runner = if remoting_opts.cache_eager_fetch { - full_store.clone().into_local_only() - } else { - full_store.clone() - }; - let immutable_inputs = - ImmutableInputs::new(store_for_local_runner.clone(), local_execution_root_dir)?; let local_command_runner = local::CommandRunner::new( - store_for_local_runner.clone(), + local_runner_store.clone(), executor.clone(), local_execution_root_dir.to_path_buf(), - NamedCaches::new(named_caches_dir.to_path_buf()), - immutable_inputs, + named_caches.clone(), + immutable_inputs.clone(), exec_strategy_opts.local_cleanup, ); @@ -242,7 +236,7 @@ impl Core { Box::new(nailgun::CommandRunner::new( local_command_runner, local_execution_root_dir.to_path_buf(), - store_for_local_runner, + local_runner_store.clone(), executor.clone(), pool_size, )) @@ -320,10 +314,12 @@ impl Core { /// fn make_command_runners( full_store: &Store, + local_runner_store: &Store, executor: &Executor, local_cache: &PersistentCache, local_execution_root_dir: &Path, - named_caches_dir: &Path, + immutable_inputs: &ImmutableInputs, + named_caches: &NamedCaches, process_execution_metadata: &ProcessMetadata, root_ca_certs: &Option>, exec_strategy_opts: &ExecutionStrategyOptions, @@ -332,9 +328,11 @@ impl Core { ) -> Result>, String> { let leaf_runner = Self::make_leaf_runner( full_store, + local_runner_store, executor, local_execution_root_dir, - named_caches_dir, + immutable_inputs, + named_caches, process_execution_metadata, root_ca_certs, exec_strategy_opts, @@ -488,6 +486,8 @@ impl Core { full_store.clone() }; + let immutable_inputs = ImmutableInputs::new(store.clone(), &local_execution_root_dir)?; + let named_caches = NamedCaches::new(named_caches_dir); let process_execution_metadata = ProcessMetadata { instance_name: remoting_opts.instance_name.clone(), cache_key_gen_version: remoting_opts.execution_process_cache_namespace.clone(), @@ -496,10 +496,12 @@ impl Core { let command_runners = Self::make_command_runners( &full_store, + &store, &executor, &local_cache, &local_execution_root_dir, - &named_caches_dir, + &immutable_inputs, + &named_caches, &process_execution_metadata, &root_ca_certs, &exec_strategy_opts, @@ -565,7 +567,9 @@ impl Core { local_parallelism: exec_strategy_opts.local_parallelism, graceful_shutdown_timeout: exec_strategy_opts.graceful_shutdown_timeout, sessions, - named_caches_dir, + named_caches, + immutable_inputs, + local_execution_root_dir, }) } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index d62f13ed83c..ea23dced4bb 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -1,8 +1,6 @@ // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use std::collections::BTreeMap; -use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::time::Duration; @@ -22,15 +20,13 @@ use crate::Failure; use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use futures::try_join; use indexmap::IndexMap; -use pyo3::{PyRef, Python}; -use tempfile::TempDir; +use pyo3::{PyRef, Python, ToPyObject}; use tokio::process; -use fs::{ - safe_create_dir_all_ioerror, DirectoryDigest, Permissions, RelativePath, EMPTY_DIRECTORY_DIGEST, -}; +use fs::{DirectoryDigest, RelativePath}; use hashing::Digest; -use process_execution::{CacheName, ManagedChild, NamedCaches}; +use process_execution::local::{apply_chroot, create_sandbox, prepare_workdir}; +use process_execution::ManagedChild; use stdio::TryCloneAsFile; use store::{SnapshotOps, SubsetParams}; @@ -519,129 +515,63 @@ fn interactive_process( let types = &context.core.types; let interactive_process_result = types.interactive_process_result; - let (argv, run_in_workspace, restartable, input_digest, env, append_only_caches) = Python::with_gil(|py| { + let (py_interactive_process, py_process): (Value, Value) = Python::with_gil(|py| { let py_interactive_process = (*args[0]).as_ref(py); - let argv: Vec = externs::getattr(py_interactive_process, "argv").unwrap(); - if argv.is_empty() { - return Err("Empty argv list not permitted".to_owned()); - } + let py_process: Value = externs::getattr(py_interactive_process, "process").unwrap(); + (py_interactive_process.extract().unwrap(), py_process) + }); + let mut process = ExecuteProcess::lift_process(&context.core.store(), py_process).await?; + let (run_in_workspace, restartable, cleanup) = Python::with_gil(|py| { + let py_interactive_process_obj = py_interactive_process.to_object(py); + let py_interactive_process = py_interactive_process_obj.as_ref(py); let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); - let py_input_digest = externs::getattr(py_interactive_process, "input_digest").unwrap(); - let input_digest = lift_directory_digest(py_input_digest)?; - let env: BTreeMap = externs::getattr_from_str_frozendict(py_interactive_process, "env"); - - let append_only_caches = externs::getattr_from_str_frozendict::<&str>(py_interactive_process, "append_only_caches") - .into_iter() - .map(|(name, dest)| Ok((CacheName::new(name)?, RelativePath::new(dest)?))) - .collect::, String>>()?; - if !append_only_caches.is_empty() && run_in_workspace { - return Err("Local interactive process cannot use append-only caches when run in workspace.".to_owned()); - } - - Ok((argv, run_in_workspace, restartable, input_digest, env, append_only_caches)) - })?; + let cleanup: bool = externs::getattr(py_interactive_process, "cleanup").unwrap(); + (run_in_workspace, restartable, cleanup) + }); let session = context.session; - let maybe_tempdir = if run_in_workspace { - None + let tempdir = create_sandbox( + context.core.executor.clone(), + &context.core.local_execution_root_dir, + "interactive process", + cleanup, + )?; + prepare_workdir( + tempdir.path().to_owned(), + &process, + process.input_digests.input_files.clone(), + context.core.store(), + context.core.executor.clone(), + &context.core.named_caches, + &context.core.immutable_inputs, + ) + .await?; + apply_chroot(tempdir.path().to_str().unwrap(), &mut process); + + let p = Path::new(&process.argv[0]); + // TODO: Deprecate this program name calculation, and 
recommend `{chroot}` replacement in args + // instead. + let program_name = if !run_in_workspace && p.is_relative() { + let mut buf = PathBuf::new(); + buf.push(tempdir.path()); + buf.push(p); + buf } else { - Some(TempDir::new().map_err(|err| format!("Error creating tempdir: {}", err))?) - }; - - if input_digest != *EMPTY_DIRECTORY_DIGEST { - if run_in_workspace { - return Err( - "Local interactive process should not attempt to materialize files when run in workspace.".to_owned().into() - ); - } - - let destination = match maybe_tempdir { - Some(ref dir) => dir.path().to_path_buf(), - None => unreachable!(), - }; - - context - .core - .store() - .materialize_directory(destination, input_digest, Permissions::Writable) - .await?; - } - - // TODO: `immutable_input_digests` are not supported for InteractiveProcess, but they would be - // materialized here. - // see https://github.com/pantsbuild/pants/issues/13852 - if !append_only_caches.is_empty() { - let named_caches = NamedCaches::new(context.core.named_caches_dir.clone()); - let named_cache_symlinks = named_caches - .local_paths(&append_only_caches) - .collect::>(); - - let workdir = match maybe_tempdir { - Some(ref dir) => dir.path().to_path_buf(), - None => unreachable!(), - }; - - for named_cache_symlink in named_cache_symlinks { - safe_create_dir_all_ioerror(&named_cache_symlink.dst).map_err(|err| { - format!( - "Error making {} for local execution: {:?}", - named_cache_symlink.dst.display(), - err - ) - })?; - - let src = workdir.join(&named_cache_symlink.src); - if let Some(dir) = src.parent() { - safe_create_dir_all_ioerror(dir).map_err(|err| { - format!( - "Error making {} for local execution: {:?}", dir.display(), err - ) - })?; - } - symlink(&named_cache_symlink.dst, &src).map_err(|err| { - format!( - "Error linking {} -> {} for local execution: {:?}", - src.display(), - named_cache_symlink.dst.display(), - err - ) - })?; - } - } - - let p = Path::new(&argv[0]); - let program_name = match maybe_tempdir { - Some(ref tempdir) if p.is_relative() => { - let mut buf = PathBuf::new(); - buf.push(tempdir); - buf.push(p); - buf - } - _ => p.to_path_buf(), + p.to_path_buf() }; let mut command = process::Command::new(program_name); - for arg in argv[1..].iter() { - command.arg(arg); - } - - let mut env = env; - if let Some(ref tempdir) = maybe_tempdir { + if !run_in_workspace { command.current_dir(tempdir.path()); - - // Replace any references to `{chroot}` in the environment variables with the path to the - // temporary directory. This matches `engine.process_execution.local:update_env()`. 
- for value in env.values_mut() { - if value.contains("{chroot}") { - *value = value.replace("{chroot}", tempdir.path().to_str().unwrap()); - } - } + } + for arg in process.argv[1..].iter() { + command.arg(arg); } command.env_clear(); - command.envs(env); + command.envs(process.env); if !restartable { task_side_effected()?; diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 94d6079242e..17e5db6bbae 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -297,7 +297,10 @@ impl ExecuteProcess { .map_err(|e| e.enrich("Failed to merge input digests for process")) } - pub fn lift_process(value: &PyAny, input_digests: InputDigests) -> Result { + fn lift_process_fields( + value: &PyAny, + input_digests: InputDigests, + ) -> Result { let env = externs::getattr_from_str_frozendict(value, "env"); let working_directory = match externs::getattr_as_optional_string(value, "working_directory") { None => None, @@ -374,10 +377,15 @@ impl ExecuteProcess { }) } - pub async fn lift(store: &Store, value: Value) -> Result { + pub async fn lift_process(store: &Store, value: Value) -> Result { let input_digests = Self::lift_process_input_digests(store, &value).await?; - let process = Python::with_gil(|py| Self::lift_process((*value).as_ref(py), input_digests))?; - Ok(Self { process }) + Python::with_gil(|py| Self::lift_process_fields((*value).as_ref(py), input_digests)) + } + + pub async fn lift(store: &Store, value: Value) -> Result { + Ok(Self { + process: Self::lift_process(store, value).await?, + }) } async fn run_node(