Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose intrinsics as decorated Python functions instead #20874

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,28 @@ from typing import (

from typing_extensions import Self

from pants.engine.fs import (
CreateDigest,
DigestContents,
DigestEntries,
DigestSubset,
NativeDownloadFile,
PathGlobs,
Paths,
)
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
NativeParsedJavascriptDependencies,
NativeParsedPythonDependencies,
)
from pants.engine.internals.scheduler import Workunit, _PathGlobsAndRootCollection
from pants.engine.internals.session import SessionValues
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.process import (
FallibleProcessResult,
InteractiveProcess,
InteractiveProcessResult,
Process,
)

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
Expand Down Expand Up @@ -473,6 +492,45 @@ EMPTY_SNAPSHOT: Snapshot

def default_cache_path() -> str: ...

# ------------------------------------------------------------------------------
# Intrinsics
# ------------------------------------------------------------------------------

async def create_digest_to_digest(
create_digest: CreateDigest,
) -> Digest: ...
async def path_globs_to_digest(
path_globs: PathGlobs,
) -> Digest: ...
async def path_globs_to_paths(
path_globs: PathGlobs,
) -> Paths: ...
async def download_file_to_digest(
native_download_file: NativeDownloadFile,
) -> Digest: ...
async def digest_to_snapshot(digest: Digest) -> Snapshot: ...
async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents: ...
async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries: ...
async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest: ...
async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest: ...
async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest: ...
async def process_request_to_process_result(
process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult: ...
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest: ...
async def session_values() -> SessionValues: ...
async def run_id() -> RunId: ...
async def interactive_process(
process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment
) -> InteractiveProcessResult: ...
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult: ...
async def parse_python_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedPythonDependencies: ...
async def parse_javascript_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedJavascriptDependencies: ...

# ------------------------------------------------------------------------------
# `pantsd`
# ------------------------------------------------------------------------------
Expand Down
149 changes: 149 additions & 0 deletions src/python/pants/engine/intrinsics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import annotations

from pants.engine.fs import (
AddPrefix,
CreateDigest,
Digest,
DigestContents,
DigestEntries,
DigestSubset,
MergeDigests,
NativeDownloadFile,
PathGlobs,
Paths,
RemovePrefix,
Snapshot,
)
from pants.engine.internals import native_engine
from pants.engine.internals.docker import DockerResolveImageRequest, DockerResolveImageResult
from pants.engine.internals.native_dep_inference import (
NativeParsedJavascriptDependencies,
NativeParsedPythonDependencies,
)
from pants.engine.internals.native_engine import NativeDependenciesRequest
from pants.engine.internals.session import RunId, SessionValues
from pants.engine.process import (
FallibleProcessResult,
InteractiveProcess,
InteractiveProcessResult,
Process,
ProcessExecutionEnvironment,
)
from pants.engine.rules import _uncacheable_rule, collect_rules, rule


@rule
async def create_digest_to_digest(
create_digest: CreateDigest,
) -> Digest:
return await native_engine.create_digest_to_digest(create_digest)


@rule
async def path_globs_to_digest(
path_globs: PathGlobs,
) -> Digest:
return await native_engine.path_globs_to_digest(path_globs)


@rule
async def path_globs_to_paths(
path_globs: PathGlobs,
) -> Paths:
return await native_engine.path_globs_to_paths(path_globs)


@rule
async def download_file_to_digest(
native_download_file: NativeDownloadFile,
) -> Digest:
return await native_engine.download_file_to_digest(native_download_file)


@rule
async def digest_to_snapshot(digest: Digest) -> Snapshot:
return await native_engine.digest_to_snapshot(digest)


@rule
async def directory_digest_to_digest_contents(digest: Digest) -> DigestContents:
return await native_engine.directory_digest_to_digest_contents(digest)


@rule
async def directory_digest_to_digest_entries(digest: Digest) -> DigestEntries:
return await native_engine.directory_digest_to_digest_entries(digest)


@rule
async def merge_digests_request_to_digest(merge_digests: MergeDigests) -> Digest:
return await native_engine.merge_digests_request_to_digest(merge_digests)


@rule
async def remove_prefix_request_to_digest(remove_prefix: RemovePrefix) -> Digest:
return await native_engine.remove_prefix_request_to_digest(remove_prefix)


@rule
async def add_prefix_request_to_digest(add_prefix: AddPrefix) -> Digest:
return await native_engine.add_prefix_request_to_digest(add_prefix)


@rule
async def process_request_to_process_result(
process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult:
return await native_engine.process_request_to_process_result(
process, process_execution_environment
)


@rule
async def digest_subset_to_digest(digest_subset: DigestSubset) -> Digest:
return await native_engine.digest_subset_to_digest(digest_subset)


@rule
async def session_values() -> SessionValues:
return await native_engine.session_values()


@rule
async def run_id() -> RunId:
return await native_engine.run_id()


@_uncacheable_rule
async def interactive_process(
process: InteractiveProcess, process_execution_environment: ProcessExecutionEnvironment
) -> InteractiveProcessResult:
return await native_engine.interactive_process(process, process_execution_environment)


@rule
async def docker_resolve_image(request: DockerResolveImageRequest) -> DockerResolveImageResult:
return await native_engine.docker_resolve_image(request)


@rule
async def parse_python_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedPythonDependencies:
return await native_engine.parse_python_deps(deps_request)


@rule
async def parse_javascript_deps(
deps_request: NativeDependenciesRequest,
) -> NativeParsedJavascriptDependencies:
return await native_engine.parse_javascript_deps(deps_request)


def rules():
return [
*collect_rules(),
]
3 changes: 2 additions & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pants.build_graph.build_configuration import BuildConfiguration
from pants.core.util_rules import environments, system_binaries
from pants.core.util_rules.environments import determine_bootstrap_environment
from pants.engine import desktop, download_file, fs, process
from pants.engine import desktop, download_file, fs, intrinsics, process
from pants.engine.console import Console
from pants.engine.environment import EnvironmentName
from pants.engine.fs import PathGlobs, Snapshot, Workspace
Expand Down Expand Up @@ -276,6 +276,7 @@ def current_executing_goals(session_values: SessionValues) -> CurrentExecutingGo
rules = FrozenOrderedSet(
(
*collect_rules(locals()),
*intrinsics.rules(),
*build_files.rules(),
*fs.rules(),
*dep_rules.rules(),
Expand Down
4 changes: 0 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use crate::intrinsics::Intrinsics;
use crate::nodes::{ExecuteProcess, NodeKey, NodeOutput, NodeResult};
use crate::python::{throw, Failure};
use crate::session::{Session, Sessions};
Expand Down Expand Up @@ -62,7 +61,6 @@ pub struct Core {
pub tasks: Tasks,
pub rule_graph: RuleGraph<Rule>,
pub types: Types,
pub intrinsics: Intrinsics,
pub executor: Executor,
store: Store,
/// The CommandRunners to use for execution, in ascending order of reliability (for the purposes
Expand Down Expand Up @@ -519,7 +517,6 @@ impl Core {
executor: Executor,
tasks: Tasks,
types: Types,
intrinsics: Intrinsics,
build_root: PathBuf,
ignore_patterns: Vec<String>,
use_gitignore: bool,
Expand Down Expand Up @@ -683,7 +680,6 @@ impl Core {
tasks,
rule_graph,
types,
intrinsics,
executor: executor.clone(),
store,
command_runners,
Expand Down
36 changes: 12 additions & 24 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType};
use pyo3::{create_exception, IntoPy, PyAny, PyRef};
use regex::Regex;
use remote::remote_cache::RemoteCacheWarningsBehavior;
use rule_graph::{self, DependencyKey, RuleGraph, RuleId};
use rule_graph::{self, RuleGraph};
use store::RemoteProvider;
use task_executor::Executor;
use workunit_store::{
Expand All @@ -49,14 +49,16 @@ use workunit_store::{

use crate::externs::fs::{possible_store_missing_digest, PyFileDigest};
use crate::externs::process::PyProcessExecutionEnvironment;
use crate::intrinsics;
use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination,
Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions,
Rule, Scheduler, Session, SessionCore, Tasks, TypeId, Types, Value,
Failure, Function, Key, LocalStoreOptions, Params, RemotingOptions, Rule, Scheduler, Session,
SessionCore, Tasks, TypeId, Types, Value,
};

#[pymodule]
fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> {
intrinsics::register(py, m)?;
externs::register(py, m)?;
externs::address::register(py, m)?;
externs::fs::register(m)?;
Expand Down Expand Up @@ -701,9 +703,7 @@ fn scheduler_create(
.borrow_mut()
.take()
.ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?;
let intrinsics = Intrinsics::new(&types);
let mut tasks = py_tasks.0.replace(Tasks::new());
tasks.intrinsics_set(&intrinsics);
let tasks = py_tasks.0.replace(Tasks::new());

// NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to
// use `tokio::spawn` since Python does not setup Tokio for the main thread. This also
Expand All @@ -716,7 +716,6 @@ fn scheduler_create(
py_executor.0.clone(),
tasks,
types,
intrinsics,
build_root,
ignore_patterns,
use_gitignore,
Expand Down Expand Up @@ -1022,21 +1021,11 @@ fn session_run_interactive_process(
let interactive_process: Value = interactive_process.into();
let process_config = Value::new(process_config_from_environment.into_py(py));
py.allow_threads(|| {
core.executor.clone().block_on(nodes::maybe_side_effecting(
core.executor.clone().block_on(nodes::task_context(
context.clone(),
true,
&Arc::new(std::sync::atomic::AtomicBool::new(true)),
core.intrinsics.run(
&Intrinsic {
id: RuleId::new("interactive_process"),
product: core.types.interactive_process_result,
inputs: vec![
DependencyKey::new(core.types.interactive_process),
DependencyKey::new(core.types.process_config_from_environment),
],
},
context,
vec![interactive_process, process_config],
),
intrinsics::interactive_process_inner(&context, interactive_process, process_config),
))
})
.map(|v| v.into())
Expand Down Expand Up @@ -1407,8 +1396,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
core.executor.enter(|| {
let result = PyDict::new(py);
for (rule, rule_dependencies) in core.rule_graph.rule_dependencies() {
let Rule::Task(task) = rule else { continue };

let task = rule.0;
let function = &task.func;
let mut dependencies = Vec::new();
for (dependency_key, rule) in rule_dependencies {
Expand All @@ -1421,7 +1409,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
{
continue;
}
let Rule::Task(task) = rule else { continue };
let function = &rule.0.func;

let provided_params = dependency_key
.provided_params
Expand All @@ -1431,7 +1419,7 @@ fn rule_graph_rule_gets<'p>(py: Python<'p>, py_scheduler: &PyScheduler) -> PyO3R
dependencies.push((
dependency_key.product.as_py_type(py),
provided_params,
task.func.0.value.into_py(py),
function.0.value.into_py(py),
));
}
if dependencies.is_empty() {
Expand Down
Loading
Loading