Skip to content

Commit

Permalink
Add support for InteractiveProcess consuming inputs while `run_in_w…
Browse files Browse the repository at this point in the history
…orkspace=True` (Cherry-pick of #16093) (#16148)

As described in #16105, blocking code in `@rule` bodies can trigger a shutdown race condition when `pantsd` is disabled and `Ctrl+C` is sent.

Longer term solutions are discussed on that issue, but in the short term, we can avoid using blocking code for `InteractiveProcess`, such that `run` uses the same sandbox creation, async teardown, and relativizing code as `Process` does.

Fixes #13852, fixes #14386, fixes #16120, and fixes #15771.
  • Loading branch information
stuhood committed Jul 12, 2022
1 parent e1eca0d commit cb88650
Show file tree
Hide file tree
Showing 21 changed files with 318 additions and 403 deletions.
17 changes: 6 additions & 11 deletions src/python/pants/backend/docker/goals/publish_test.py
Expand Up @@ -3,7 +3,7 @@

from __future__ import annotations

from typing import cast
from typing import Callable, cast

import pytest

Expand All @@ -22,7 +22,9 @@
from pants.core.goals.publish import PublishPackages, PublishProcesses
from pants.engine.addresses import Address
from pants.engine.fs import EMPTY_DIGEST
from pants.engine.process import Process
from pants.testutil.option_util import create_subsystem
from pants.testutil.process_util import process_assertion
from pants.testutil.rule_runner import QueryRule, RuleRunner
from pants.util.frozendict import FrozenDict

Expand Down Expand Up @@ -90,24 +92,17 @@ def assert_publish(
publish: PublishPackages,
expect_names: tuple[str, ...],
expect_description: str | None,
expect_process,
expect_process: Callable[[Process], None] | None,
) -> None:
assert publish.names == expect_names
assert publish.description == expect_description
if expect_process:
expect_process(publish.process)
assert publish.process
expect_process(publish.process.process)
else:
assert publish.process is None


def process_assertion(**assertions):
def assert_process(process):
for attr, expected in assertions.items():
assert getattr(process, attr) == expected

return assert_process


def test_docker_skip_push(rule_runner: RuleRunner) -> None:
result, _ = run_publish(rule_runner, Address("src/skip-test"))
assert len(result) == 1
Expand Down
12 changes: 3 additions & 9 deletions src/python/pants/backend/helm/goals/publish_test.py
Expand Up @@ -20,6 +20,7 @@
from pants.core.util_rules import external_tool
from pants.engine.addresses import Address
from pants.engine.fs import EMPTY_DIGEST
from pants.testutil.process_util import process_assertion
from pants.testutil.rule_runner import QueryRule, RuleRunner


Expand Down Expand Up @@ -81,19 +82,12 @@ def assert_publish(
assert publish.names == expect_names
assert publish.description == expect_description
if expect_process:
expect_process(publish.process)
assert publish.process
expect_process(publish.process.process)
else:
assert publish.process is None


def process_assertion(**assertions):
def assert_process(process):
for attr, expected in assertions.items():
assert getattr(process, attr) == expected

return assert_process


def _declare_targets(rule_runner: RuleRunner) -> None:
rule_runner.write_files(
{
Expand Down
20 changes: 8 additions & 12 deletions src/python/pants/backend/python/goals/publish_test.py
Expand Up @@ -4,6 +4,7 @@
from __future__ import annotations

from textwrap import dedent
from typing import Callable

import pytest

Expand All @@ -20,6 +21,8 @@
from pants.core.util_rules.config_files import rules as config_files_rules
from pants.engine.addresses import Address
from pants.engine.fs import EMPTY_DIGEST
from pants.engine.process import Process
from pants.testutil.process_util import process_assertion
from pants.testutil.rule_runner import QueryRule, RuleRunner
from pants.util.frozendict import FrozenDict

Expand Down Expand Up @@ -95,24 +98,17 @@ def assert_package(
package: PublishPackages,
expect_names: tuple[str, ...],
expect_description: str,
expect_process,
expect_process: Callable[[Process], None] | None,
) -> None:
assert package.names == expect_names
assert package.description == expect_description
if expect_process:
expect_process(package.process)
assert package.process
expect_process(package.process.process)
else:
assert package.process is None


def process_assertion(**assertions):
def assert_process(process):
for attr, expected in assertions.items():
assert getattr(process, attr) == expected

return assert_process


def test_twine_upload(rule_runner, packages) -> None:
rule_runner.write_files(project_files(skip_twine=False))
result = request_publish_processes(rule_runner, packages)
Expand Down Expand Up @@ -211,6 +207,6 @@ def test_twine_cert_arg(rule_runner, packages, options, cert_arg) -> None:
process = result[0].process
assert process
if cert_arg:
assert cert_arg in process.argv
assert cert_arg in process.process.argv
else:
assert not any(arg.startswith("--cert") for arg in process.argv)
assert not any(arg.startswith("--cert") for arg in process.process.argv)
Expand Up @@ -606,7 +606,7 @@ def test_debug_adaptor_request_argv(rule_runner: RuleRunner) -> None:
inputs = [PythonTestFieldSet.create(tgt)]
request = rule_runner.request(TestDebugAdapterRequest, inputs)
assert request.process is not None
assert request.process.argv == (
assert request.process.process.argv == (
"./pytest_runner.pex_pex_shim.sh",
"--listen",
"127.0.0.1:5678",
Expand Down
Expand Up @@ -107,10 +107,10 @@ def run(*extra_args: str, **extra_env: str) -> PantsResult:
file = result.stdout.strip()
if use_new_semantics_args:
assert file.endswith("utils/strutil.py")
assert ".pants.d/tmp" not in file
assert "pants-sandbox-" not in file
else:
assert file.endswith("src_root2/utils/strutil.py")
assert ".pants.d/tmp" in file
assert "pants-sandbox-" in file
assert result.exit_code == 23

if include_tools:
Expand Down Expand Up @@ -270,4 +270,4 @@ def test_filename_spec_ambiutity(use_new_semantics_args) -> None:
result = run_pants(args)
file = result.stdout.strip()
assert file.endswith("src/app.py")
assert ".pants.d/tmp" in file
assert "pants-sandbox-" in file
Expand Up @@ -84,7 +84,7 @@ def run(*extra_args: str, **extra_env: str) -> Tuple[PantsResult, str]:
file = result.stdout.strip()
if run_in_sandbox:
assert file.endswith("src_root2/utils/strutil.py")
assert ".pants.d/tmp" in file
assert "pants-sandbox-" in file
else:
assert file.endswith(os.path.join(test_repo_root, "src_root2/utils/strutil.py"))
assert result.exit_code == 23
Expand Down
15 changes: 5 additions & 10 deletions src/python/pants/backend/scala/goals/repl.py
@@ -1,5 +1,6 @@
# Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

from pants.backend.scala.subsystems.scala import ScalaSubsystem
Expand Down Expand Up @@ -66,32 +67,26 @@ async def create_scala_repl_request(
Get(Digest, AddPrefix(d, user_classpath_prefix)) for d in user_classpath.digests()
)

# TODO: Manually merging the `immutable_input_digests` since InteractiveProcess doesn't
# support them yet. See https://github.com/pantsbuild/pants/issues/13852.
jdk_digests = await MultiGet(
Get(Digest, AddPrefix(digest, relpath))
for relpath, digest in jdk.immutable_input_digests.items()
)

repl_digest = await Get(
Digest,
MergeDigests([*prefixed_user_classpath, tool_classpath.content.digest, *jdk_digests]),
MergeDigests([*prefixed_user_classpath, tool_classpath.content.digest]),
)

return ReplRequest(
digest=repl_digest,
args=[
*jdk.args(bash, tool_classpath.classpath_entries()),
*jdk.args(bash, tool_classpath.classpath_entries(), chroot="{chroot}"),
"-Dscala.usejavacp=true",
"scala.tools.nsc.MainGenericRunner",
"-classpath",
":".join(user_classpath.args(prefix=user_classpath_prefix)),
],
run_in_workspace=False,
extra_env={
**jdk.env,
"PANTS_INTERNAL_ABSOLUTE_PREFIX": "",
},
run_in_workspace=False,
immutable_input_digests=jdk.immutable_input_digests,
append_only_caches=jdk.append_only_caches,
)

Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/core/goals/export_test.py
Expand Up @@ -63,7 +63,7 @@ def mock_export(

def _mock_run(rule_runner: RuleRunner, ip: InteractiveProcess) -> InteractiveProcessResult:
subprocess.check_call(
ip.argv,
ip.process.argv,
stderr=subprocess.STDOUT,
env={
"PATH": os.environ.get("PATH", ""),
Expand Down
52 changes: 20 additions & 32 deletions src/python/pants/core/goals/repl.py
Expand Up @@ -5,7 +5,6 @@
import os
from abc import ABC
from dataclasses import dataclass
from pathlib import PurePath
from typing import ClassVar, Iterable, Mapping, Optional, Sequence, Tuple

from pants.base.build_root import BuildRoot
Expand All @@ -14,14 +13,12 @@
from pants.engine.environment import CompleteEnvironment
from pants.engine.fs import Digest, Workspace
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.internals.native_engine import EMPTY_DIGEST
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.rules import Effect, Get, collect_rules, goal_rule
from pants.engine.target import FilteredTargets, Target
from pants.engine.unions import UnionMembership, union
from pants.option.global_options import GlobalOptions
from pants.option.option_types import BoolOption, StrOption
from pants.util.contextutil import temporary_dir
from pants.util.frozendict import FrozenDict
from pants.util.memo import memoized_property
from pants.util.meta import frozen_after_init
Expand All @@ -38,10 +35,9 @@ class ReplImplementation(ABC):
name: ClassVar[str]

targets: Sequence[Target]
chroot: str # Absolute path of the chroot the sources will be materialized to.

def in_chroot(self, relpath: str) -> str:
return os.path.join(self.chroot, relpath)
return os.path.join("{chroot}", relpath)

@memoized_property
def addresses(self) -> Addresses:
Expand Down Expand Up @@ -78,6 +74,7 @@ class ReplRequest:
digest: Digest
args: Tuple[str, ...]
extra_env: FrozenDict[str, str]
immutable_input_digests: FrozenDict[str, Digest]
append_only_caches: FrozenDict[str, str]
run_in_workspace: bool

Expand All @@ -87,12 +84,14 @@ def __init__(
digest: Digest,
args: Iterable[str],
extra_env: Optional[Mapping[str, str]] = None,
immutable_input_digests: Mapping[str, Digest] | None = None,
append_only_caches: Mapping[str, str] | None = None,
run_in_workspace: bool = True,
) -> None:
self.digest = digest
self.args = tuple(args)
self.extra_env = FrozenDict(extra_env or {})
self.immutable_input_digests = FrozenDict(immutable_input_digests or {})
self.append_only_caches = FrozenDict(append_only_caches or {})
self.run_in_workspace = run_in_workspace

Expand Down Expand Up @@ -121,33 +120,22 @@ async def run_repl(
)
return Repl(-1)

with temporary_dir(root_dir=global_options.pants_workdir, cleanup=False) as tmpdir:
repl_impl = repl_implementation_cls(targets=specified_targets, chroot=tmpdir)
request = await Get(ReplRequest, ReplImplementation, repl_impl)

input_digest = request.digest
if request.run_in_workspace:
workspace.write_digest(
request.digest,
path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix(),
# We don't want to influence whether the InteractiveProcess is able to restart. Because
# we're writing into a temp directory, we can safely mark this side_effecting=False.
side_effecting=False,
)
input_digest = EMPTY_DIGEST

env = {**complete_env, **request.extra_env}
result = await Effect(
InteractiveProcessResult,
InteractiveProcess(
argv=request.args,
env=env,
input_digest=input_digest,
run_in_workspace=request.run_in_workspace,
restartable=repl_subsystem.restartable,
append_only_caches=request.append_only_caches,
),
)
repl_impl = repl_implementation_cls(targets=specified_targets)
request = await Get(ReplRequest, ReplImplementation, repl_impl)

env = {**complete_env, **request.extra_env}
result = await Effect(
InteractiveProcessResult,
InteractiveProcess(
argv=request.args,
env=env,
input_digest=request.digest,
run_in_workspace=request.run_in_workspace,
restartable=repl_subsystem.restartable,
immutable_input_digests=request.immutable_input_digests,
append_only_caches=request.append_only_caches,
),
)
return Repl(result.exit_code)


Expand Down

0 comments on commit cb88650

Please sign in to comment.