Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add artifacts method to EngineAware #10344

Merged
merged 1 commit into from Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/python/pants/core/goals/test.py
Expand Up @@ -17,11 +17,12 @@
from pants.engine.addresses import Address
from pants.engine.collection import Collection
from pants.engine.console import Console
from pants.engine.engine_aware import EngineAware
from pants.engine.fs import Digest, DirectoryToMaterialize, Workspace
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.interactive_process import InteractiveProcess, InteractiveRunner
from pants.engine.process import FallibleProcessResult
from pants.engine.rules import EngineAware, goal_rule, rule
from pants.engine.rules import goal_rule, rule
from pants.engine.selectors import Get, MultiGet
from pants.engine.target import (
FieldSetWithOrigin,
Expand Down
29 changes: 29 additions & 0 deletions src/python/pants/engine/engine_aware.py
@@ -0,0 +1,29 @@
# Copyright 2020 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from abc import ABC
from typing import Dict, Optional

from pants.engine.fs import Digest
from pants.util.logging import LogLevel


class EngineAware(ABC):
"""This is a marker class used to indicate that the output of an `@rule` can send metadata about
the rule's output to the engine.

EngineAware defines abstract methods on the class, all of which return an Optional[T], and which
are expected to be overridden by concrete types implementing EngineAware.
"""

def level(self) -> Optional[LogLevel]:
"""Overrides the level of the workunit associated with this type."""
return None

def message(self) -> Optional[str]:
"""Sets an optional result message on the workunit."""
return None

def artifacts(self) -> Optional[Dict[str, Digest]]:
"""Sets a map of names to `Digest`s to appear as artifacts on the workunit."""
return None
35 changes: 34 additions & 1 deletion src/python/pants/engine/internals/engine_test.py
Expand Up @@ -8,11 +8,12 @@
from textwrap import dedent
from typing import List, Optional

from pants.engine.engine_aware import EngineAware
from pants.engine.fs import EMPTY_DIGEST
from pants.engine.internals.scheduler import ExecutionError
from pants.engine.internals.scheduler_test_base import SchedulerTestBase
from pants.engine.process import Process, ProcessResult
from pants.engine.rules import EngineAware, RootRule, rule
from pants.engine.rules import RootRule, rule
from pants.engine.selectors import Get, MultiGet
from pants.reporting.streaming_workunit_handler import (
StreamingWorkunitContext,
Expand Down Expand Up @@ -588,6 +589,38 @@ def a_rule(n: int) -> ModifiedOutput:
workunit = next(item for item in finished if item["name"] == "a_rule")
assert workunit["level"] == "DEBUG"

def test_artifacts_on_engine_aware_type(self) -> None:
@dataclass(frozen=True)
class Output(EngineAware):
val: int

def artifacts(self):
return {"some_arbitrary_key": EMPTY_DIGEST}

@rule(desc="a_rule")
def a_rule(n: int) -> Output:
return Output(val=n)

rules = [a_rule, RootRule(int)]
scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)

tracker = WorkunitTracker()
handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.DEBUG,
)
with handler.session():
scheduler.product_request(Output, subjects=[0])

finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
workunit = next(item for item in finished if item["name"] == "a_rule")
artifacts = workunit["artifacts"]
assert artifacts["some_arbitrary_key"] == EMPTY_DIGEST


class StreamingWorkunitProcessTests(TestBase):

Expand Down
3 changes: 2 additions & 1 deletion src/python/pants/engine/internals/scheduler.py
Expand Up @@ -14,6 +14,7 @@
from pants.base.project_tree import Dir, File, Link
from pants.engine.addresses import Address
from pants.engine.collection import Collection
from pants.engine.engine_aware import EngineAware
from pants.engine.fs import (
AddPrefix,
CreateDigest,
Expand All @@ -36,7 +37,7 @@
from pants.engine.internals.nodes import Return, Throw
from pants.engine.platform import Platform
from pants.engine.process import FallibleProcessResultWithPlatform, MultiPlatformProcess
from pants.engine.rules import EngineAware, Rule, RuleIndex, TaskRule
from pants.engine.rules import Rule, RuleIndex, TaskRule
from pants.engine.selectors import Params
from pants.engine.unions import union
from pants.option.global_options import ExecutionOptions
Expand Down
17 changes: 0 additions & 17 deletions src/python/pants/engine/rules.py
Expand Up @@ -20,23 +20,6 @@
from pants.util.ordered_set import FrozenOrderedSet, OrderedSet


class EngineAware(ABC):
"""This is a marker class used to indicate that the output of an `@rule` can send metadata about
the rule's output to the engine.

EngineAware defines abstract methods on the class, all of which return an Optional[T], and which
are expected to be overridden by concrete types implementing EngineAware.
"""

def level(self) -> Optional[LogLevel]:
"""Overrides the level of the workunit associated with this type."""
return None

def message(self) -> Optional[str]:
"""Sets an optional result message on the workunit."""
return None


@decorated_type_checkable
def side_effecting(cls):
"""Annotates a class to indicate that it is a side-effecting type, which needs to be handled
Expand Down
20 changes: 4 additions & 16 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -522,29 +522,17 @@ impl CommandRunner for BoundedCommandRunner {
let desc = req
.user_facing_name()
.unwrap_or_else(|| "<Unnamed process>".to_string());
let outer_metadata = WorkunitMetadata {
desc: Some(desc.clone()),
level: Level::Debug,
message: None,
blocked: true,
stdout: None,
stderr: None,
};
let mut outer_metadata = WorkunitMetadata::with_level(Level::Debug);
outer_metadata.desc = Some(desc.clone());
let bounded_fut = {
let inner = self.inner.clone();
let semaphore = self.inner.1.clone();
let context = context.clone();
let name = format!("{}-running", req.workunit_name());

semaphore.with_acquired(move |concurrency_id| {
let metadata = WorkunitMetadata {
desc: Some(desc),
message: None,
level: Level::Info,
blocked: false,
stdout: None,
stderr: None,
};
let mut metadata = WorkunitMetadata::with_level(Level::Info);
metadata.desc = Some(desc);

let metadata_updater = |result: &Result<FallibleProcessResultWithPlatform, String>,
old_metadata| match result {
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/src/externs/interface.rs
Expand Up @@ -865,6 +865,13 @@ fn workunit_to_py_value(workunit: &Workunit, core: &Arc<Core>) -> CPyResult<Valu

let mut artifact_entries = Vec::new();

for (artifact_name, digest) in workunit.metadata.artifacts.iter() {
artifact_entries.push((
externs::store_utf8(artifact_name.as_str()),
crate::nodes::Snapshot::store_directory(core, digest),
))
}

if let Some(stdout_digest) = &workunit.metadata.stdout.as_ref() {
artifact_entries.push((
externs::store_utf8("stdout_digest"),
Expand Down
10 changes: 10 additions & 0 deletions src/rust/engine/src/externs/mod.rs
Expand Up @@ -260,6 +260,16 @@ pub fn create_exception(msg: &str) -> Value {
Value::from(with_externs(|py, e| e.call_method(py, "create_exception", (msg,), None)).unwrap())
}

pub fn check_for_python_none(value: Value) -> Option<Value> {
let gil = Python::acquire_gil();
let py = gil.python();

if *value == py.None() {
return None;
}
Some(value)
}

pub fn call_method(value: &Value, method: &str, args: &[Value]) -> Result<Value, Failure> {
let arg_handles: Vec<PyObject> = args.iter().map(|v| v.clone().into()).collect();
let gil = Python::acquire_gil();
Expand Down
66 changes: 46 additions & 20 deletions src/rust/engine/src/nodes.rs
Expand Up @@ -23,7 +23,7 @@ use crate::selectors;
use crate::tasks::{self, Rule};
use boxfuture::{BoxFuture, Boxable};
use bytes::{self, BufMut};
use cpython::{Python, PythonObject};
use cpython::{PyDict, PyString, Python, PythonObject};
use fs::{
self, Dir, DirectoryListing, File, FileContent, GlobExpansionConjunction, GlobMatching, Link,
PathGlobs, PathStat, PreparedPathGlobs, StrictGlobMatching, VFS,
Expand Down Expand Up @@ -810,32 +810,50 @@ impl Task {
future::try_join_all(get_futures).await
}

fn compute_workunit_message(result_val: &Value) -> Option<String> {
let msg_val: Value = externs::call_method(&result_val, "message", &[]).ok()?;
{
let gil = Python::acquire_gil();
let py = gil.python();
fn compute_new_artifacts(result_val: &Value) -> Option<Vec<(String, hashing::Digest)>> {
let artifacts_val: Value = externs::call_method(&result_val, "artifacts", &[]).ok()?;
let artifacts_val: Value = externs::check_for_python_none(artifacts_val)?;
let gil = Python::acquire_gil();
let py = gil.python();
let artifacts_dict: &PyDict = &*artifacts_val.cast_as::<PyDict>(py).ok()?;
let mut output = Vec::new();

for (key, value) in artifacts_dict.items(py).into_iter() {
let key_name: String = match key.cast_as::<PyString>(py) {
Ok(s) => s.to_string_lossy(py).into(),
Err(e) => {
log::warn!(
"Error in EngineAware.artifacts() implementation - non-string key: {:?}",
e
);
return None;
}
};
let digest = match lift_digest(&Value::new(value)) {
Ok(digest) => digest,
Err(e) => {
log::warn!("Error in EngineAware.artifacts() implementation: {}", e);
return None;
}
};

if *msg_val == py.None() {
return None;
}
output.push((key_name, digest));
}

Some(output)
}

fn compute_workunit_message(result_val: &Value) -> Option<String> {
let msg_val: Value = externs::call_method(&result_val, "message", &[]).ok()?;
let msg_val = externs::check_for_python_none(msg_val)?;
Some(externs::val_to_str(&msg_val))
}

fn compute_new_workunit_level(result_val: &Value) -> Option<log::Level> {
use num_enum::TryFromPrimitiveError;

let new_level_val: Value = externs::call_method(&result_val, "level", &[]).ok()?;

{
let gil = Python::acquire_gil();
let py = gil.python();

if *new_level_val == py.None() {
return None;
}
}
let new_level_val = externs::check_for_python_none(new_level_val)?;

let new_py_level: PythonLogLevel = match externs::project_maybe_u64(&new_level_val, "_level")
.and_then(|n: u64| {
Expand Down Expand Up @@ -898,6 +916,7 @@ pub struct PythonRuleOutput {
value: Value,
new_level: Option<log::Level>,
message: Option<String>,
new_artifacts: Vec<(String, hashing::Digest)>,
}

#[async_trait]
Expand Down Expand Up @@ -938,18 +957,20 @@ impl WrappedNode for Task {
}

if result_type == product {
let (new_level, message) = if can_modify_workunit {
let (new_level, message, new_artifacts) = if can_modify_workunit {
(
Self::compute_new_workunit_level(&result_val),
Self::compute_workunit_message(&result_val),
Self::compute_new_artifacts(&result_val).unwrap_or_else(Vec::new),
)
} else {
(None, None)
(None, None, Vec::new())
};
Ok(PythonRuleOutput {
value: result_val,
new_level,
message,
new_artifacts,
})
} else {
Err(throw(&format!(
Expand Down Expand Up @@ -1107,6 +1128,7 @@ impl Node for NodeKey {
blocked: false,
stdout: None,
stderr: None,
artifacts: Vec::new(),
};
let metadata2 = metadata.clone();

Expand All @@ -1126,6 +1148,7 @@ impl Node for NodeKey {

let mut level = metadata.level;
let mut message = None;
let mut artifacts = Vec::new();
let mut result = match maybe_watch {
Ok(()) => match self {
NodeKey::DigestFile(n) => n.run_wrapped_node(context).map_ok(NodeOutput::Digest).await,
Expand Down Expand Up @@ -1162,6 +1185,7 @@ impl Node for NodeKey {
level = new_level;
}
message = python_rule_output.message;
artifacts = python_rule_output.new_artifacts;
NodeOutput::Value(python_rule_output.value)
})
.await
Expand All @@ -1177,6 +1201,7 @@ impl Node for NodeKey {
let final_metadata = WorkunitMetadata {
level,
message,
artifacts,
..metadata
};
(result, final_metadata)
Expand Down Expand Up @@ -1274,6 +1299,7 @@ impl TryFrom<NodeOutput> for PythonRuleOutput {
value: v,
new_level: None,
message: None,
new_artifacts: Vec::new(),
}),
_ => Err(()),
}
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/workunit_store/src/lib.rs
Expand Up @@ -112,6 +112,7 @@ pub struct WorkunitMetadata {
pub blocked: bool,
pub stdout: Option<hashing::Digest>,
pub stderr: Option<hashing::Digest>,
pub artifacts: Vec<(String, hashing::Digest)>,
}

impl WorkunitMetadata {
Expand All @@ -135,6 +136,7 @@ impl Default for WorkunitMetadata {
blocked: false,
stdout: None,
stderr: None,
artifacts: Vec::new(),
}
}
}
Expand Down