Skip to content

Commit

Permalink
Merge 51e91a0 into 1f41051
Browse files Browse the repository at this point in the history
  • Loading branch information
gshuflin committed Sep 16, 2020
2 parents 1f41051 + 51e91a0 commit 993f416
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 13 deletions.
3 changes: 3 additions & 0 deletions src/python/pants/engine/process.py
Expand Up @@ -51,6 +51,7 @@ class Process:
jdk_home: Optional[str]
is_nailgunnable: bool
execution_slot_variable: Optional[str]
cache_failures: bool

def __init__(
self,
Expand All @@ -68,6 +69,7 @@ def __init__(
jdk_home: Optional[str] = None,
is_nailgunnable: bool = False,
execution_slot_variable: Optional[str] = None,
cache_failures: bool = False,
) -> None:
"""Request to run a subprocess, similar to subprocess.Popen.
Expand Down Expand Up @@ -106,6 +108,7 @@ def __init__(
self.jdk_home = jdk_home
self.is_nailgunnable = is_nailgunnable
self.execution_slot_variable = execution_slot_variable
self.cache_failures = cache_failures


@frozen_after_init
Expand Down
12 changes: 10 additions & 2 deletions src/rust/engine/graph/src/entry.rs
Expand Up @@ -230,6 +230,14 @@ impl<N: Node> Entry<N> {
&self.node
}

pub(crate) fn cacheable_with_output(&self, output: Option<&N::Item>) -> bool {
(if let Some(item) = output {
self.node.cacheable_item(item)
} else {
false
}) && self.node.cacheable()
}

///
/// If this Node is currently complete and clean with the given Generation, then waits for it to
/// be changed in any way. If the node is not clean, or the generation mismatches, returns
Expand Down Expand Up @@ -434,7 +442,7 @@ impl<N: Node> Entry<N> {
entry_id,
run_token,
generation,
if self.node.cacheable() {
if self.cacheable_with_output(Some(result.as_ref())) {
Some(dep_generations)
} else {
None
Expand Down Expand Up @@ -506,7 +514,7 @@ impl<N: Node> Entry<N> {
}
}
Some(Ok(result)) => {
let next_result: EntryResult<N> = if !self.node.cacheable() {
let next_result: EntryResult<N> = if !self.cacheable_with_output(Some(&result)) {
EntryResult::Uncacheable(result, context.session_id().clone())
} else if has_weak_deps {
EntryResult::Dirty(result)
Expand Down
6 changes: 5 additions & 1 deletion src/rust/engine/graph/src/lib.rs
Expand Up @@ -863,7 +863,11 @@ impl<N: Node> Graph<N> {
// This is to allow for the behaviour that an uncacheable Node should always have "dirty"
// (marked as UncacheableDependencies) dependents, transitively.
let entry = inner.entry_for_id(edge_ref.target()).unwrap();
if !entry.node().cacheable() || entry.has_uncacheable_deps() {
let result_item = match result {
Some(Ok(ref item)) => Some(item),
_ => None,
};
if !entry.cacheable_with_output(result_item) || entry.has_uncacheable_deps() {
has_uncacheable_deps = true;
}

Expand Down
9 changes: 8 additions & 1 deletion src/rust/engine/graph/src/node.rs
Expand Up @@ -30,9 +30,16 @@ pub trait Node: Clone + Debug + Display + Eq + Hash + Send + 'static {
async fn run(self, context: Self::Context) -> Result<Self::Item, Self::Error>;

///
/// If the node result is cacheable, return true.
/// If a node's output is cacheable based solely on properties of the node, and not the output,
/// return true.
///
fn cacheable(&self) -> bool;

/// A Node may want to compute cacheability differently based on properties of the Node's item.
/// The output of this method will be and'd with `cacheable` to compute overall cacheability.
fn cacheable_item(&self, _item: &Self::Item) -> bool {
self.cacheable()
}
}

pub trait NodeError: Clone + Debug + Eq + Send {
Expand Down
4 changes: 3 additions & 1 deletion src/rust/engine/process_execution/src/cache.rs
Expand Up @@ -77,8 +77,10 @@ impl crate::CommandRunner for CommandRunner {
}
}

let cache_failures = req.0.values().any(|process| process.cache_failures);

let result = command_runner.underlying.run(req, context).await?;
if result.exit_code == 0 {
if result.exit_code == 0 || cache_failures {
if let Err(err) = command_runner.store(key, &result).await {
warn!(
"Error storing process execution result to local cache: {} - ignoring and continuing",
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Expand Up @@ -221,6 +221,8 @@ pub struct Process {
pub target_platform: PlatformConstraint,

pub is_nailgunnable: bool,

pub cache_failures: bool,
}

impl Process {
Expand Down Expand Up @@ -250,6 +252,7 @@ impl Process {
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
cache_failures: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Expand Up @@ -65,6 +65,7 @@ fn construct_nailgun_server_request(
target_platform: platform_constraint,
is_nailgunnable: true,
execution_slot_variable: None,
cache_failures: false,
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/remote_tests.rs
Expand Up @@ -90,6 +90,7 @@ async fn make_execute_request() {
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
cache_failures: false,
};

let mut want_command = bazel_protos::remote_execution::Command::new();
Expand Down Expand Up @@ -176,6 +177,7 @@ async fn make_execute_request_with_instance_name() {
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
cache_failures: false,
};

let mut want_command = bazel_protos::remote_execution::Command::new();
Expand Down Expand Up @@ -275,6 +277,7 @@ async fn make_execute_request_with_cache_key_gen_version() {
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
cache_failures: false,
};

let mut want_command = bazel_protos::remote_execution::Command::new();
Expand Down Expand Up @@ -523,6 +526,7 @@ async fn make_execute_request_with_timeout() {
target_platform: PlatformConstraint::None,
is_nailgunnable: false,
execution_slot_variable: None,
cache_failures: false,
};

let mut want_command = bazel_protos::remote_execution::Command::new();
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/src/main.rs
Expand Up @@ -367,6 +367,7 @@ async fn main() {
target_platform: target_platform,
is_nailgunnable,
execution_slot_variable: None,
cache_failures: false,
};

let runner: Box<dyn process_execution::CommandRunner> = match server_arg {
Expand Down
46 changes: 38 additions & 8 deletions src/rust/engine/src/nodes.rs
Expand Up @@ -233,7 +233,10 @@ pub fn lift_digest(digest: &Value) -> Result<hashing::Digest, String> {
/// A Node that represents a set of processes to execute on specific platforms.
///
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct MultiPlatformExecuteProcess(MultiPlatformProcess);
pub struct MultiPlatformExecuteProcess {
cache_failures: bool,
process: MultiPlatformProcess,
}

impl MultiPlatformExecuteProcess {
fn lift_execute_process(
Expand Down Expand Up @@ -300,6 +303,8 @@ impl MultiPlatformExecuteProcess {
}
};

let cache_failures = externs::project_bool(&value, "cache_failures");

Ok(process_execution::Process {
argv: externs::project_multi_strs(&value, "argv"),
env,
Expand All @@ -315,6 +320,7 @@ impl MultiPlatformExecuteProcess {
target_platform,
is_nailgunnable,
execution_slot_variable,
cache_failures,
})
}

Expand All @@ -341,16 +347,22 @@ impl MultiPlatformExecuteProcess {
));
}

let mut cache_failures = true;

let mut request_by_constraint: BTreeMap<(PlatformConstraint, PlatformConstraint), Process> =
BTreeMap::new();
for (constraint_key, execute_process) in constraint_key_pairs.iter().zip(processes.iter()) {
let underlying_req =
MultiPlatformExecuteProcess::lift_execute_process(execute_process, constraint_key.1)?;
if !underlying_req.cache_failures {
cache_failures = false;
}
request_by_constraint.insert(*constraint_key, underlying_req.clone());
}
Ok(MultiPlatformExecuteProcess(MultiPlatformProcess(
request_by_constraint,
)))
Ok(MultiPlatformExecuteProcess {
cache_failures,
process: MultiPlatformProcess(request_by_constraint),
})
}
}

Expand All @@ -365,7 +377,7 @@ impl WrappedNode for MultiPlatformExecuteProcess {
type Item = ProcessResult;

async fn run_wrapped_node(self, context: Context) -> NodeResult<ProcessResult> {
let request = self.0;
let request = self.process;
let execution_context = process_execution::Context::new(
context.session.workunit_store(),
context.session.build_id().to_string(),
Expand Down Expand Up @@ -1092,7 +1104,7 @@ impl NodeKey {
fn workunit_name(&self) -> String {
match self {
NodeKey::Task(ref task) => task.task.display_info.name.clone(),
NodeKey::MultiPlatformExecuteProcess(mp_epr) => mp_epr.0.workunit_name(),
NodeKey::MultiPlatformExecuteProcess(mp_epr) => mp_epr.process.workunit_name(),
NodeKey::Snapshot(..) => "snapshot".to_string(),
NodeKey::Paths(..) => "paths".to_string(),
NodeKey::DigestFile(..) => "digest_file".to_string(),
Expand All @@ -1115,7 +1127,8 @@ impl NodeKey {
NodeKey::Task(ref task) => task.task.display_info.desc.as_ref().map(|s| s.to_owned()),
NodeKey::Snapshot(ref s) => Some(format!("Snapshotting: {}", s.0)),
NodeKey::Paths(ref s) => Some(format!("Finding files: {}", s.0)),
NodeKey::MultiPlatformExecuteProcess(mp_epr) => Some(mp_epr.0.user_facing_name()),
//NodeKey::MultiPlatformExecuteProcess(mp_epr) => Some(mp_epr.0.user_facing_name()),
NodeKey::MultiPlatformExecuteProcess(mp_epr) => Some(mp_epr.process.user_facing_name()),
NodeKey::DigestFile(DigestFile(File { path, .. })) => {
Some(format!("Fingerprinting: {}", path.display()))
}
Expand Down Expand Up @@ -1289,6 +1302,23 @@ impl Node for NodeKey {
_ => true,
}
}

fn cacheable_item(&self, output: &NodeOutput) -> bool {
match self {
NodeKey::MultiPlatformExecuteProcess(ref mp) => match output {
NodeOutput::ProcessResult(ref process_result) => {
let process_succeeded = process_result.0.exit_code == 0;
if mp.cache_failures {
true
} else {
process_succeeded
}
}
_ => true,
},
_ => true,
}
}
}

impl Display for NodeKey {
Expand All @@ -1297,7 +1327,7 @@ impl Display for NodeKey {
&NodeKey::DigestFile(ref s) => write!(f, "DigestFile({})", s.0.path.display()),
&NodeKey::DownloadedFile(ref s) => write!(f, "DownloadedFile({})", s.0),
&NodeKey::MultiPlatformExecuteProcess(ref s) => {
write!(f, "Process({})", s.0.user_facing_name())
write!(f, "Process({})", s.process.user_facing_name())
}
&NodeKey::ReadLink(ref s) => write!(f, "ReadLink({})", (s.0).0.display()),
&NodeKey::Scandir(ref s) => write!(f, "Scandir({})", (s.0).0.display()),
Expand Down

0 comments on commit 993f416

Please sign in to comment.