diff --git a/src/python/pants/engine/internals/native.py b/src/python/pants/engine/internals/native.py
index c383478671e..aa871b479ca 100644
--- a/src/python/pants/engine/internals/native.py
+++ b/src/python/pants/engine/internals/native.py
@@ -69,14 +69,14 @@ def generator_send(
         if isinstance(res, Get):
             # Get.
             return PyGeneratorResponseGet(
-                res.product_type, res.subject_declared_type, res.subject,
+                res.product_type, res.subject_declared_type, res.subject, res.weak,
             )
         elif type(res) in (tuple, list):
             # GetMulti.
             return PyGeneratorResponseGetMulti(
                 tuple(
                     PyGeneratorResponseGet(
-                        get.product_type, get.subject_declared_type, get.subject,
+                        get.product_type, get.subject_declared_type, get.subject, get.weak,
                     )
                     for get in res
                 )
diff --git a/src/python/pants/engine/internals/scheduler_test.py b/src/python/pants/engine/internals/scheduler_test.py
index 2840adb08d6..772fd5a2bb8 100644
--- a/src/python/pants/engine/internals/scheduler_test.py
+++ b/src/python/pants/engine/internals/scheduler_test.py
@@ -5,7 +5,7 @@
 from contextlib import contextmanager
 from dataclasses import dataclass
 from textwrap import dedent
-from typing import Any
+from typing import Any, FrozenSet

 from pants.engine.internals.scheduler import ExecutionError
 from pants.engine.rules import RootRule, rule
@@ -116,6 +116,20 @@ async def error_msg_test_rule(union_wrapper: UnionWrapper) -> UnionX:
     raise AssertionError("The statement above this one should have failed!")


+class BooleanDeps(FrozenSet[bool]):
+    pass
+
+
+@rule
+async def boolean_cycle(key: bool) -> BooleanDeps:
+    """A rule with exactly two instances (bool == two keys), which depend on one another weakly."""
+    deps = {key}
+    dep = await Get(BooleanDeps, bool, not key, weak=True)
+    if dep is not None:
+        deps.update(dep)
+    return BooleanDeps(deps)
+
+
 class TypeCheckFailWrapper:
     """This object wraps another object which will be used to demonstrate a type check failure when
     the engine processes an `await Get(...)` statement."""
@@ -181,6 +195,7 @@ def rules(cls):
             RootRule(UnionB),
             select_union_b,
             a_union_test,
+            boolean_cycle,
             boolean_and_int,
             RootRule(int),
             RootRule(bool),
@@ -230,6 +245,10 @@ def test_strict_equals(self):
         # type of a value in equality.
         assert A() == self.request_single_product(A, Params(1, True))

+    def test_weak_gets(self):
+        assert {True, False} == set(self.request_single_product(BooleanDeps, True))
+        assert {True, False} == set(self.request_single_product(BooleanDeps, False))
+
     @contextmanager
     def _assert_execution_error(self, expected_msg):
         with assert_execution_error(self, expected_msg):
diff --git a/src/python/pants/engine/selectors.py b/src/python/pants/engine/selectors.py
index b889f1481a9..6f8238fbedc 100644
--- a/src/python/pants/engine/selectors.py
+++ b/src/python/pants/engine/selectors.py
@@ -41,17 +41,19 @@ class Get(GetConstraints, Generic[_ProductType, _SubjectType]):
     A Get can be constructed in 2 ways with two variants each:

     + Long form:
-        a. Get(<ProductType>, <SubjectDeclaredType>, subject)
-        b. Get[<ProductType>](<SubjectDeclaredType>, subject)
+        Get(<ProductType>, <SubjectDeclaredType>, subject)

     + Short form
-        a. Get(<ProductType>, <SubjectType>(<constructor args for subject>))
-        b. Get[<ProductType>](<SubjectType>(<constructor args for subject>))
+        Get(<ProductType>, <SubjectType>(<constructor args for subject>))

     The long form supports providing type information to the rule engine that it could not
     otherwise infer from the subject variable [1]. Likewise, the short form must use inline
     construction of the subject in order to convey the subject type to the engine.

+    The `weak` parameter is an experimental extension: a "weak" Get will return None rather than the
+    requested value iff the dependency caused by the Get would create a cycle in the dependency
+    graph.
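The `boolean_cycle` rule and `test_weak_gets` above exercise this contract end to end. The same semantics can also be modeled in isolation; the following is a minimal, illustrative Rust sketch (the `Resolver`, `get`, and `get_weak` names are hypothetical stand-ins, not engine APIs), mirroring how two keys that depend on each other weakly both complete, with whichever runs second observing `None`:

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Error {
    Cyclic,
}

struct Resolver {
    in_progress: HashSet<u32>,
}

impl Resolver {
    // A strong request fails if `key` is already being computed on this stack.
    fn get(&mut self, key: u32) -> Result<u64, Error> {
        if !self.in_progress.insert(key) {
            return Err(Error::Cyclic);
        }
        let value = self.compute(key);
        self.in_progress.remove(&key);
        value
    }

    // A weak request returns Ok(None) instead of failing when it would cycle.
    fn get_weak(&mut self, key: u32) -> Result<Option<u64>, Error> {
        if self.in_progress.contains(&key) {
            return Ok(None);
        }
        self.get(key).map(Some)
    }

    // Mirrors the `boolean_cycle` rule above: each of the two keys depends
    // weakly on the other.
    fn compute(&mut self, key: u32) -> Result<u64, Error> {
        let other = self.get_weak(1 - key)?.unwrap_or(0);
        Ok(u64::from(key) + other)
    }
}

fn main() {
    let mut r = Resolver { in_progress: HashSet::new() };
    assert_eq!(r.get(1), Ok(1)); // the weak request from key 0 back to key 1 returned None
    assert_eq!(r.get(0), Ok(1)); // and vice versa when entered from the other side
}
```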
+ [1] The engine needs to determine all rule and Get input and output types statically before executing any rules. Since Gets are declared inside function bodies, the only way to extract this information is through a parse of the rule function. The parse analysis is rudimentary and cannot @@ -61,7 +63,9 @@ class Get(GetConstraints, Generic[_ProductType, _SubjectType]): """ @overload - def __init__(self, product_type: Type[_ProductType], subject_arg0: _SubjectType) -> None: + def __init__( + self, product_type: Type[_ProductType], subject_arg0: _SubjectType, *, weak: bool = False + ) -> None: ... @overload @@ -70,6 +74,8 @@ def __init__( product_type: Type[_ProductType], subject_arg0: Type[_SubjectType], subject_arg1: _SubjectType, + *, + weak: bool = False, ) -> None: ... @@ -78,6 +84,8 @@ def __init__( product_type: Type[_ProductType], subject_arg0: Union[Type[_SubjectType], _SubjectType], subject_arg1: Optional[_SubjectType] = None, + *, + weak: bool = False, ) -> None: self.product_type = product_type self.subject_declared_type: Type[_SubjectType] = self._validate_subject_declared_type( @@ -86,6 +94,7 @@ def __init__( self.subject: _SubjectType = self._validate_subject( subject_arg1 if subject_arg1 is not None else subject_arg0 ) + self.weak = weak self._validate_product() diff --git a/src/rust/engine/graph/src/entry.rs b/src/rust/engine/graph/src/entry.rs index e827b26948d..0c94e5d1e70 100644 --- a/src/rust/engine/graph/src/entry.rs +++ b/src/rust/engine/graph/src/entry.rs @@ -25,6 +25,13 @@ impl RunToken { fn next(self) -> RunToken { RunToken(self.0 + 1) } + + /// + /// Returns true if "other" is equal to this RunToken, or this RunToken's predecessor. + /// + pub fn equals_current_or_previous(&self, other: RunToken) -> bool { + self.0 == other.0 || other.next().0 == self.0 + } } /// @@ -52,33 +59,29 @@ impl Generation { /// /// A result from running a Node. /// -/// If the value is Dirty, the consumer should check whether the dependencies of the Node have the -/// same values as they did when this Node was last run; if so, the value can be re-used -/// (and should be marked "Clean"). -/// -/// If the value is Uncacheable it may only be consumed in the same Run that produced it, and should -/// be recomputed in a new Run. -/// -/// A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as -/// equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for -/// meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty) -/// Node and a Node that would be re-cleaned once per session. -/// -/// If the value is Clean, the consumer can simply use the value as-is. -/// #[derive(Clone, Debug)] pub enum EntryResult { + // The value is Clean, and the consumer can simply use it as-is. Clean(N::Item), - UncacheableDependencies(N::Item), + // If the value is Dirty, the consumer should check whether the dependencies of the Node have the + // same values as they did when this Node was last run; if so, the value can be re-used + // (and should be marked "Clean"). Dirty(N::Item), - Uncacheable(N::Item, <::Context as NodeContext>::RunId), + // Uncacheable values may only be consumed in the same Session that produced them, and should + // be recomputed in a new Session. 
+ Uncacheable(N::Item, <::Context as NodeContext>::SessionId), + // A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as + // equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for + // meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty) + // Node and a Node that would be re-cleaned once per session. + UncacheableDependencies(N::Item), } impl EntryResult { fn is_clean(&self, context: &N::Context) -> bool { match self { EntryResult::Clean(..) => true, - EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id, + EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id, EntryResult::Dirty(..) => false, EntryResult::UncacheableDependencies(..) => false, } @@ -95,7 +98,7 @@ impl EntryResult { /// currently to clean it). fn poll_should_wait(&self, context: &N::Context) -> bool { match self { - EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id, + EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id, EntryResult::Dirty(..) => false, EntryResult::UncacheableDependencies(_) | EntryResult::Clean(..) => true, } @@ -286,25 +289,38 @@ impl Entry { previous_result: Option>, ) -> EntryState { // Increment the RunToken to uniquely identify this work. + let previous_run_token = run_token; let run_token = run_token.next(); - let context = context_factory.clone_for(entry_id); + let context = context_factory.clone_for(entry_id, run_token); let node = node.clone(); let (abort_handle, abort_registration) = AbortHandle::new_pair(); + trace!( + "Running node {:?} with {:?}. It was: previous_result={:?}", + node, + run_token, + previous_result, + ); context_factory.spawn(async move { // If we have previous result generations, compare them to all current dependency // generations (which, if they are dirty, will cause recursive cleaning). If they // match, we can consider the previous result value to be clean for reuse. let was_clean = if let Some(previous_dep_generations) = previous_dep_generations { - match context.graph().dep_generations(entry_id, &context).await { + trace!("Getting deps to attempt to clean {}", node); + match context + .graph() + .dep_generations(entry_id, previous_run_token, &context) + .await + { Ok(ref dep_generations) if dep_generations == &previous_dep_generations => { + trace!("Deps matched: {} is clean.", node); // Dependencies have not changed: Node is clean. true } _ => { - // If dependency generations mismatched or failed to fetch, clear its - // dependencies and indicate that it should re-run. - context.graph().clear_deps(entry_id, run_token); + // If dependency generations mismatched or failed to fetch, indicate that the Node + // should re-run. + trace!("Deps did not match: {} needs to re-run.", node); false } } @@ -403,11 +419,6 @@ impl Entry { result, dep_generations, } => { - trace!( - "Re-starting node {:?}. It was: previous_result={:?}", - self.node, - result, - ); assert!( !result.is_clean(context), "A clean Node should not reach this point: {:?}", @@ -453,20 +464,14 @@ impl Entry { /// result should be used. This special case exists to avoid 1) cloning the result to call this /// method, and 2) comparing the current/previous results unnecessarily. /// - /// Takes a &mut InnerGraph to ensure that completing nodes doesn't race with dirtying them. 
- /// The important relationship being guaranteed here is that if the Graph is calling - /// invalidate_from_roots, it may mark us, or our dependencies, as dirty. We don't want to - /// complete _while_ a batch of nodes are being marked as dirty, and this exclusive access ensures - /// that can't happen. - /// pub(crate) fn complete( - &mut self, + &self, context: &N::Context, result_run_token: RunToken, dep_generations: Vec, result: Option>, has_uncacheable_deps: bool, - _graph: &mut super::InnerGraph, + has_weak_deps: bool, ) { let mut state = self.state.lock(); @@ -481,7 +486,6 @@ impl Entry { "Not completing node {:?} because it was invalidated.", self.node ); - return; } } @@ -506,19 +510,22 @@ impl Entry { } } Some(Ok(result)) => { - // If the new result does not match the previous result, the generation increments. let next_result: EntryResult = if !self.node.cacheable() { - EntryResult::Uncacheable(result, context.run_id().clone()) + EntryResult::Uncacheable(result, context.session_id().clone()) + } else if has_weak_deps { + EntryResult::Dirty(result) } else if has_uncacheable_deps { EntryResult::UncacheableDependencies(result) } else { EntryResult::Clean(result) }; + // If the new result does not match the previous result, the generation increments. if Some(next_result.as_ref()) != previous_result.as_ref().map(EntryResult::as_ref) { // Node was re-executed (ie not cleaned) and had a different result value. generation = generation.next() }; self.notify_waiters(waiters, Ok((next_result.as_ref().clone(), generation))); + EntryState::Completed { result: next_result, pollers: Vec::new(), @@ -595,10 +602,7 @@ impl Entry { } /// - /// Get the current RunToken of this entry. - /// - /// TODO: Consider moving the Generation and RunToken out of the EntryState once we decide what - /// we want the per-Entry locking strategy to be. + /// Get the RunToken of this entry regardless of whether it is running. /// pub(crate) fn run_token(&self) -> RunToken { match *self.state.lock() { @@ -609,19 +613,22 @@ impl Entry { } /// - /// Clears the state of this Node, forcing it to be recomputed. + /// Get the current RunToken of this entry iff it is currently running. /// - /// # Arguments + pub(crate) fn running_run_token(&self) -> Option { + match *self.state.lock() { + EntryState::Running { run_token, .. } => Some(run_token), + _ => None, + } + } + /// - /// * `graph_still_contains_edges` - If the caller has guaranteed that all edges from this Node - /// have been removed from the graph, they should pass false here, else true. We may want to - /// remove this parameter, and force this method to remove the edges, but that would require - /// acquiring the graph lock here, which we currently don't do. + /// Clears the state of this Node, forcing it to be recomputed. /// - pub(crate) fn clear(&mut self, graph_still_contains_edges: bool) { + pub(crate) fn clear(&mut self) { let mut state = self.state.lock(); - let (run_token, generation, mut previous_result) = + let (run_token, generation, previous_result) = match mem::replace(&mut *state, EntryState::initial()) { EntryState::NotStarted { run_token, @@ -649,13 +656,8 @@ impl Entry { trace!("Clearing node {:?}", self.node); - if graph_still_contains_edges { - if let Some(previous_result) = previous_result.as_mut() { - previous_result.dirty(); - } - } - - // Swap in a state with a new RunToken value, which invalidates any outstanding work. 
+ // Swap in a state with a new RunToken value, which invalidates any outstanding work and all + // edges for the previous run. *state = EntryState::NotStarted { run_token: run_token.next(), generation, diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs index 3fd38943255..179eab16bc0 100644 --- a/src/rust/engine/graph/src/lib.rs +++ b/src/rust/engine/graph/src/lib.rs @@ -32,8 +32,8 @@ pub mod entry; mod node; -pub use crate::entry::{Entry, EntryState}; -use crate::entry::{Generation, RunToken}; +use crate::entry::Generation; +pub use crate::entry::{Entry, EntryResult, RunToken}; use std::collections::{HashMap, HashSet, VecDeque}; use std::fs::File; @@ -44,18 +44,39 @@ use std::time::Duration; use fnv::FnvHasher; use futures::future; -use log::{debug, info, trace, warn}; +use log::{debug, info, trace}; use parking_lot::Mutex; use petgraph::graph::DiGraph; use petgraph::visit::EdgeRef; use petgraph::Direction; use tokio::time::delay_for; -pub use crate::node::{EntryId, Node, NodeContext, NodeError, NodeVisualizer}; +pub use crate::node::{EdgeId, EntryId, Node, NodeContext, NodeError, NodeVisualizer}; type FNV = BuildHasherDefault; -type PGraph = DiGraph, f32, u32>; +type PGraph = DiGraph, (EdgeType, RunToken), u32>; + +/// +/// When an edge is created, it is created with one of two types. +/// +/// A "strong" edge is required, and will always either return the value of the Node it depends +/// on, or fail if the creation of the edge would result in a cycle of strong edges. +/// +/// A "weak" edge is optional, in that if adding a weak edge would create a cycle in the graph, the +/// request for the value may return None rather than failing. +/// +/// TODO: Currently we do not allow a Node with a weak dependency to participate in a cycle with +/// itself that involves a strong edge. This means that entering a `strong-weak` cycle from one +/// side rather than the other has a different result (namely, one side will fail with a cycle +/// error, while the other doesn't). This can be worked around by making both edges weak. +/// see https://github.com/pantsbuild/pants/issues/10229 +/// +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +enum EdgeType { + Weak, + Strong, +} #[derive(Debug, Eq, PartialEq)] pub struct InvalidationResult { @@ -102,29 +123,46 @@ impl InnerGraph { id } + fn add_edge( + &mut self, + src_id: EntryId, + dst_id: EntryId, + edge_type: EdgeType, + run_token: RunToken, + ) { + trace!( + "Adding dependency {:?} from {:?} to {:?}", + (edge_type, run_token), + self.entry_for_id(src_id).unwrap().node(), + self.entry_for_id(dst_id).unwrap().node(), + ); + self.pg.add_edge(src_id, dst_id, (edge_type, run_token)); + } + /// - /// Detect whether adding an edge from src to dst would create a cycle. - /// - /// Returns a path which would cause the cycle if an edge were added from src to dst, or None if - /// no cycle would be created. + /// Detect whether adding an edge from src to dst would create a cycle and returns a path which + /// would represent the cycle if an edge were added from src to dst. Returns None if no cycle + /// would be created. /// - /// This strongly optimizes for the case of no cycles. If cycles are detected, this is very - /// expensive to call. + /// This is a very expensive method relative to `detect_cycle`: if you don't need the cyclic + /// path, prefer to call `detect_cycle`. 
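A detail worth calling out for the `filter_map`-based `shortest_path` implementation below: petgraph's `Graph::filter_map` keeps the indices of surviving nodes stable, which is what allows the Bellman-Ford result over the filtered graph to be used as indices into the original graph. A small self-contained sketch of that property (assumes the petgraph crate; `EdgeKind` and the node values are illustrative, not the crate's or this PR's types):

```rust
use petgraph::graph::DiGraph;

#[derive(Copy, Clone, PartialEq)]
enum EdgeKind {
    Weak,
    Strong,
}

fn main() {
    let mut g: DiGraph<&str, EdgeKind> = DiGraph::new();
    let a = g.add_node("a");
    let b = g.add_node("b");
    let c = g.add_node("c");
    g.add_edge(a, b, EdgeKind::Strong);
    g.add_edge(b, c, EdgeKind::Weak);

    // Keep every node (so that indices stay stable), but keep only Strong
    // edges, giving them the unit float weight that bellman_ford requires.
    let strong_only = g.filter_map(
        |_id, _node| Some(()),
        |_id, kind| {
            if *kind == EdgeKind::Strong {
                Some(1.0_f32)
            } else {
                None
            }
        },
    );

    // Indices from the filtered graph are valid in the original graph.
    assert_eq!(strong_only.node_count(), g.node_count());
    assert!(strong_only.find_edge(a, b).is_some());
    assert!(strong_only.find_edge(b, c).is_none());
}
```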
/// - fn report_cycle(&self, src_id: EntryId, dst_id: EntryId) -> Option>> { - if src_id == dst_id { - let entry = self.entry_for_id(src_id).unwrap(); - return Some(vec![entry.clone(), entry.clone()]); - } - if !self.detect_cycle(src_id, dst_id) { + fn detect_and_compute_cycle( + &self, + src_id: EntryId, + dst_id: EntryId, + should_include_edge: impl Fn(EdgeId) -> bool, + ) -> Option> { + if !self.detect_cycle(src_id, dst_id, &should_include_edge) { return None; } - Self::shortest_path(&self.pg, dst_id, src_id).map(|mut path| { + + Self::shortest_path(&self.pg, dst_id, src_id, should_include_edge).map(|mut path| { path.reverse(); path.push(dst_id); path .into_iter() - .map(|index| self.entry_for_id(index).unwrap().clone()) + .map(|node_index| self.unsafe_entry_for_id(node_index).node().clone()) .collect() }) } @@ -135,7 +173,12 @@ impl InnerGraph { /// Uses Dijkstra's algorithm, which is significantly cheaper than the Bellman-Ford, but keeps /// less context around paths on the way. /// - fn detect_cycle(&self, src_id: EntryId, dst_id: EntryId) -> bool { + fn detect_cycle( + &self, + src_id: EntryId, + dst_id: EntryId, + should_include_edge: impl Fn(EdgeId) -> bool, + ) -> bool { // Search either forward from the dst, or backward from the src. let (root, needle, direction) = { let out_from_dst = self.pg.neighbors(dst_id).count(); @@ -154,7 +197,7 @@ impl InnerGraph { let mut roots = VecDeque::new(); roots.push_back(root); self - .walk(roots, direction, |_| false) + .walk(roots, direction, should_include_edge) .any(|eid| eid == needle) } @@ -164,9 +207,33 @@ impl InnerGraph { /// Uses Bellman-Ford, which is pretty expensive O(VE) as it has to traverse the whole graph and /// keeping a lot of state on the way. /// - fn shortest_path(graph: &PGraph, src: EntryId, dst: EntryId) -> Option> { - let (_path_weights, paths) = petgraph::algo::bellman_ford(graph, src) - .expect("There should not be any negative edge weights"); + fn shortest_path( + graph: &PGraph, + src: EntryId, + dst: EntryId, + should_include_edge: impl Fn(EdgeId) -> bool, + ) -> Option> { + let (_path_weights, paths) = { + // We map the graph to an empty graph with the same node structure, but with potentially + // fewer edges (based on the predicate). Edges are given equal-weighted float edges, because + // bellman_ford requires weights. + // + // Because the graph has identical nodes, it also has identical node indices (guaranteed by + // `filter_map`), and we can use the returned path structure as indices into the original + // graph. + let float_graph = graph.filter_map( + |_entry_id, _node| Some(()), + |edge_index, _edge_type| { + if should_include_edge(edge_index) { + Some(1.0_f32) + } else { + None + } + }, + ); + petgraph::algo::bellman_ford(&float_graph, src) + .expect("There should not be any negative edge weights") + }; let mut next = dst; let mut path = Vec::new(); @@ -274,25 +341,59 @@ impl InnerGraph { /// The Walk will iterate over all nodes that descend from the roots in the direction of /// traversal but won't necessarily be in topological order. /// - fn walk bool>( + fn walk bool>( &self, roots: VecDeque, direction: Direction, - stop_walking_predicate: F, + should_walk_edge: F, ) -> Walk<'_, N, F> { Walk { graph: self, direction: direction, deque: roots, walked: HashSet::default(), - stop_walking_predicate, + should_walk_edge, + } + } + + /// + /// A running edge is an edge leaving a Running node with a matching RunToken: ie, an edge that was + /// created by the active run of a node. 
Running edges are not allowed to form cycles, as that could + /// cause work to deadlock on itself. + /// + /// "Running" edges are a subset of "live" edges: see `live_edge_predicate` + /// + fn running_edge_predicate<'a>(inner: &'a InnerGraph) -> impl Fn(EdgeId) -> bool + 'a { + move |edge_id| { + let (edge_src_id, _) = inner.pg.edge_endpoints(edge_id).unwrap(); + if let Some(running_run_token) = inner.unsafe_entry_for_id(edge_src_id).running_run_token() { + // Only include the edge if the Node is running, and the edge is for this run. + inner.pg[edge_id].1 == running_run_token + } else { + // Node is not running. + false + } + } + } + + /// + /// A live edge is an edge for the current RunToken of a Node, regardless of whether the Node is + /// currently running. + /// + /// "Live" edges are a superset of "running" edges: see `running_edge_predicate` + /// + fn live_edge_predicate<'a>(inner: &'a InnerGraph) -> impl Fn(EdgeId) -> bool + 'a { + move |edge_id| { + let (edge_src_id, _) = inner.pg.edge_endpoints(edge_id).unwrap(); + // Only include the edge if it is live. + inner.pg[edge_id].1 == inner.unsafe_entry_for_id(edge_src_id).run_token() } } fn clear(&mut self) { for eid in self.nodes.values() { if let Some(entry) = self.pg.node_weight_mut(*eid) { - entry.clear(true); + entry.clear(); } } } @@ -319,12 +420,12 @@ impl InnerGraph { } }) .collect(); - // And their transitive dependencies, which will be dirtied. + // And their live transitive dependencies, which will be dirtied. let transitive_ids: Vec<_> = self .walk( root_ids.iter().cloned().collect(), Direction::Incoming, - |_| false, + Self::live_edge_predicate(&self), ) .filter(|eid| !root_ids.contains(eid)) .collect(); @@ -334,23 +435,16 @@ impl InnerGraph { dirtied: transitive_ids.len(), }; - // Clear roots and remove their outbound edges. + // Clear the roots. for id in &root_ids { if let Some(entry) = self.pg.node_weight_mut(*id) { - entry.clear(false); + entry.clear(); } } - self.pg.retain_edges(|pg, edge| { - if let Some((src, _)) = pg.edge_endpoints(edge) { - !root_ids.contains(&src) - } else { - true - } - }); // Dirty transitive entries, but do not yet clear their output edges. We wait to clear // outbound edges until we decide whether we can clean an entry: if we can, all edges are - // preserved; if we can't, they are cleared in `Graph::clear_deps`. + // preserved; if we can't, they are eventually cleaned in `Graph::garbage_collect_edges`. for id in &transitive_ids { if let Some(mut entry) = self.pg.node_weight_mut(*id).cloned() { entry.dirty(self); @@ -386,7 +480,7 @@ impl InnerGraph { .cloned() .collect(); - for eid in self.walk(root_entries, Direction::Outgoing, |_| false) { + for eid in self.walk(root_entries, Direction::Outgoing, |_| true) { let entry = self.unsafe_entry_for_id(eid); let node_str = entry.format(context); @@ -424,7 +518,11 @@ impl InnerGraph { .collect(); self.live_internal( self - .walk(root_ids, Direction::Outgoing, |_| false) + .walk( + root_ids, + Direction::Outgoing, + Self::live_edge_predicate(&self), + ) .collect(), context.clone(), ) @@ -447,7 +545,26 @@ impl InnerGraph { } /// -/// A DAG (enforced on mutation) of Entries. +/// A potentially cyclic graph of Nodes and their dependencies. +/// +/// ---- +/// +/// A note on cycles: We track the dependencies of Nodes for two primary reasons: +/// +/// 1. To allow for invalidation/dirtying/clearing when the transitive dependencies of a Node +/// have changed. See `invalidate_from_roots` for more information on this usecase. +/// 2. 
To detect situations where a running Node might depend on its own result, which would +/// cause it to deadlock. +/// +/// The first usecase (invalidation) does not care about cycles in the graph: if a Node +/// transitively depends on a previous version of itself, that's ok, as invalidation will dirty +/// the entire cycle. +/// +/// The second case does care about cycle detection though, so when new dependencies are introduced +/// in the graph we cycle detect for the case where a running Node might depend on its own result +/// (as determined by the RunToken). Notably, this does _not_ prevent a Node from depending on a +/// previous run of itself, as that cannot cause a deadlock: the two computations are independent. +/// See Graph::report_cycle for more information. /// pub struct Graph { inner: Mutex>, @@ -477,10 +594,10 @@ impl Graph { async fn get_inner( &self, - src_id: Option, context: &N::Context, dst_node: N, - ) -> Result<(N::Item, Generation), N::Error> { + edge_type: EdgeType, + ) -> Result, N::Error> { // Compute information about the dst under the Graph lock, and then release it. let (dst_retry, mut entry, entry_id) = { // Get or create the destination, and then insert the dep and return its state. @@ -489,25 +606,41 @@ impl Graph { // TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable // without a much more complicated algorithm. let dst_id = inner.ensure_entry(dst_node); - let dst_retry = if let Some(src_id) = src_id { - if let Some(cycle_path) = Self::report_cycle(src_id, dst_id, &mut inner, context) { - // Cyclic dependency: render an error. - let path_strs = cycle_path - .into_iter() - .map(|e| e.node().to_string()) - .collect(); - return Err(N::Error::cyclic(path_strs)); + let dst_retry = if let Some((src_id, run_token)) = context.entry_id_and_run_token() { + // See whether adding this edge would create a cycle. + if inner.detect_cycle(src_id, dst_id, InnerGraph::running_edge_predicate(&inner)) { + // If we have detected a cycle, the type of edge becomes relevant: a strong edge will + // report the cycle as a failure, while a weak edge will go ahead and add the dependency, + // but return None to indicate that it isn't consumable. + match edge_type { + EdgeType::Strong => { + if let Some(cycle_path) = inner.detect_and_compute_cycle( + src_id, + dst_id, + InnerGraph::running_edge_predicate(&inner), + ) { + debug!( + "Detected cycle considering adding edge from {:?} to {:?}; existing path: {:?}", + inner.entry_for_id(src_id).unwrap().node(), + inner.entry_for_id(dst_id).unwrap().node(), + cycle_path + ); + // Cyclic dependency: render an error. + let path_strs = cycle_path.into_iter().map(|n| n.to_string()).collect(); + return Err(N::Error::cyclic(path_strs)); + } + } + EdgeType::Weak => { + // NB: A weak edge is still recorded, as the result can affect the behavior of the + // node, and nodes with weak edges complete as Dirty to allow them to re-run. + inner.add_edge(src_id, dst_id, edge_type, run_token); + return Ok(None); + } + } } // Valid dependency. - trace!( - "Adding dependency from {:?} to {:?}", - inner.entry_for_id(src_id).unwrap().node(), - inner.entry_for_id(dst_id).unwrap().node() - ); - // All edges get a weight of 1.0 so that we can Bellman-Ford over the graph, treating each - // edge as having equal weight. - inner.pg.add_edge(src_id, dst_id, 1.0); + inner.add_edge(src_id, dst_id, edge_type, run_token); // We can retry the dst Node if the src Node is not cacheable. 
If the src is not cacheable,
      // it will only be allowed to run once, and so Node invalidation does not pass through it.
@@ -531,7 +664,7 @@ impl<N: Node> Graph<N> {
       let context = context.clone();
       loop {
         match entry.get(&context, entry_id).await {
-          Ok(r) => break Ok(r),
+          Ok(r) => break Ok(Some(r)),
           Err(err) if err == N::Error::invalidated() => {
             let node = {
               let inner = self.inner.lock();
@@ -549,12 +682,12 @@ impl<N: Node> Graph<N> {
       }
     } else {
       // Not retriable.
-      entry.get(context, entry_id).await
+      Ok(Some(entry.get(context, entry_id).await?))
     }
   }

   ///
-  /// Request the given dst Node, optionally in the context of the given src Node.
+  /// Request the given dst Node in the given Context (which might represent a src Node).
   ///
   /// If there is no src Node, or the src Node is not cacheable, this method will retry for
   /// invalidation until the Node completes.
@@ -562,21 +695,46 @@
   /// Invalidation events in the graph (generally, filesystem changes) will cause cacheable Nodes
   /// to be retried here for up to `invalidation_timeout`.
   ///
-  pub async fn get(
+  pub async fn get(&self, context: &N::Context, dst_node: N) -> Result<N::Item, N::Error> {
+    match self.get_inner(context, dst_node, EdgeType::Strong).await {
+      Ok(Some((res, _generation))) => Ok(res),
+      Err(e) => Err(e),
+      Ok(None) => unreachable!("A strong dependency cannot return None."),
+    }
+  }
+
+  ///
+  /// Identical to Get, but if a cycle would be created by the dependency, returns None instead.
+  ///
+  pub async fn get_weak(
     &self,
-    src_id: Option<EntryId>,
     context: &N::Context,
     dst_node: N,
-  ) -> Result<N::Item, N::Error> {
-    let (res, _generation) = self.get_inner(src_id, context, dst_node).await?;
-    Ok(res)
+  ) -> Result<Option<N::Item>, N::Error> {
+    match self.get_inner(context, dst_node, EdgeType::Weak).await {
+      Ok(Some((res, _generation))) => Ok(Some(res)),
+      Ok(None) => Ok(None),
+      Err(e) => Err(e),
+    }
   }

   ///
-  /// Return the value of the given Node. Shorthand for `self.get(None, context, node)`.
+  /// Return the value of the given Node. This is a synonym for `self.get(context, node)`, but it
+  /// is expected to be used by callers requesting node values from the graph, while `self.get` is
+  /// also used by Nodes to request dependencies.
   ///
   pub async fn create(&self, node: N, context: &N::Context) -> Result<N::Item, N::Error> {
-    self.get(None, context, node).await
+    let result = self.get(context, node).await;
+    // In the background, garbage collect edges.
+    // NB: This could safely occur at any frequency: if it ever shows up in profiles, we could make
+    // it optional based on how many edges are garbage.
+    context.spawn({
+      let context = context.clone();
+      async move {
+        context.graph().garbage_collect_edges();
+      }
+    });
+    result
   }

   ///
@@ -605,65 +763,13 @@
     };

     // Re-request the Node.
-    let (res, generation) = self.get_inner(None, context, node).await?;
+    let (res, generation) = self
+      .get_inner(context, node, EdgeType::Strong)
+      .await?
+      .expect("A strong dependency cannot return None.");
     Ok((res, LastObserved(generation)))
   }

-  fn report_cycle(
-    src_id: EntryId,
-    potential_dst_id: EntryId,
-    inner: &mut InnerGraph<N>,
-    context: &N::Context,
-  ) -> Option<Vec<Entry<N>>> {
-    let mut counter = 0;
-    loop {
-      // Find one cycle if any cycles exist.
-      if let Some(cycle_path) = inner.report_cycle(src_id, potential_dst_id) {
-        // See if the cycle contains any dirty nodes. If there are dirty nodes, we can try clearing
-        // them, and then check if there are still any cycles in the graph.
- let dirty_nodes: HashSet<_> = cycle_path - .iter() - .filter(|n| !n.is_clean(context)) - .map(|n| n.node().clone()) - .collect(); - if dirty_nodes.is_empty() { - // We detected a cycle with no dirty nodes - there's a cycle and there's nothing we can do - // to remove it. We only log at debug because the UI will render the cycle. - debug!( - "Detected cycle considering adding edge from {:?} to {:?}; existing path: {:?}", - inner.entry_for_id(src_id).unwrap(), - inner.entry_for_id(potential_dst_id).unwrap(), - cycle_path - ); - return Some(cycle_path); - } - counter += 1; - // Obsolete edges from a dirty node may cause fake cycles to be detected if there was a - // dirty dep from A to B, and we're trying to add a dep from B to A. - // If we detect a cycle that contains dirty nodes (and so potentially obsolete edges), - // we repeatedly cycle-detect, clearing (and re-running) and dirty nodes (and their edges) - // that we encounter. - // - // We do this repeatedly, because there may be multiple paths which would cause cycles, - // which contain dirty nodes. If we've cleared 10 separate paths which contain dirty nodes, - // and are still detecting cycle-causing paths containing dirty nodes, give up. 10 is a very - // arbitrary number, which we can increase if we find real graphs in the wild which hit this - // limit. - if counter > 10 { - warn!( - "Couldn't remove cycle containing dirty nodes after {} attempts; nodes in cycle: {:?}", - counter, cycle_path - ); - return Some(cycle_path); - } - // Clear the dirty nodes, removing the edges from them, and try again. - inner.invalidate_from_roots(|node| dirty_nodes.contains(node)); - } else { - return None; - } - } - } - /// /// Calculate the critical path for the subset of the graph that descends from these roots, /// assuming this mapping between entries and durations. @@ -676,93 +782,59 @@ impl Graph { } /// - /// Gets the generations of the dependencies of the given EntryId, (re)computing or cleaning - /// them first if necessary. + /// Gets the generations of the dependencies of the given EntryId at the given RunToken, + /// (re)computing or cleaning them first if necessary. /// async fn dep_generations( &self, entry_id: EntryId, + run_token: RunToken, context: &N::Context, ) -> Result, N::Error> { - let generations = { + let dep_nodes = { let inner = self.inner.lock(); - let dep_ids = inner + inner .pg - .neighbors_directed(entry_id, Direction::Outgoing) - .collect::>(); - - dep_ids - .into_iter() - .map(|dep_id| { - let mut entry = inner - .entry_for_id(dep_id) - .unwrap_or_else(|| panic!("Dependency not present in Graph.")) - .clone(); - async move { - let (_, generation) = entry.get(context, dep_id).await?; - Ok(generation) + .edges_directed(entry_id, Direction::Outgoing) + .filter_map(|edge_ref| { + if edge_ref.weight().1 == run_token { + let entry = inner + .entry_for_id(edge_ref.target()) + .unwrap_or_else(|| panic!("Dependency not present in Graph.")) + .clone(); + Some((edge_ref.weight().0, entry.node().clone())) + } else { + None } }) .collect::>() }; - future::try_join_all(generations).await - } - - /// - /// Clears the dependency edges of the given EntryId if the RunToken matches. - /// - fn clear_deps(&self, entry_id: EntryId, run_token: RunToken) { - let mut inner = self.inner.lock(); - // If the RunToken mismatches, return. - if let Some(entry) = inner.entry_for_id(entry_id) { - if entry.run_token() != run_token { - return; - } - } - - // Otherwise, clear the deps. 
- // NB: Because `remove_edge` changes EdgeIndex values, we remove edges one at a time. - while let Some(dep_edge) = inner - .pg - .edges_directed(entry_id, Direction::Outgoing) - .next() - .map(|edge| edge.id()) - { - inner.pg.remove_edge(dep_edge); - } + let generations = dep_nodes + .into_iter() + .map(|(edge_type, node)| async move { + Ok( + self + .get_inner(context, node, edge_type) + .await? + .map(|(_, generation)| generation), + ) + }) + .collect::>(); + // Weak edges might have returned None: we filter those out, and expect that it might cause the + // Node to fail to be cleaned. + Ok( + future::try_join_all(generations) + .await? + .into_iter() + .filter_map(std::convert::identity) + .collect(), + ) } /// - /// When the Executor finishes executing a Node it calls back to store the result value. We use - /// the run_token and dirty bits to determine whether the Node changed while we were busy - /// executing it, so that we can discard the work. - /// - /// We use the dirty bit in addition to the RunToken in order to avoid cases where dependencies - /// change while we're running. In order for a dependency to "change" it must have been cleared - /// or been marked dirty. But if our dependencies have been cleared or marked dirty, then we will - /// have been as well. We can thus use the dirty bit as a signal that the generation values of - /// our dependencies are still accurate. The dirty bit is safe to rely on as it is only ever - /// mutated, and dependencies' dirty bits are only read, under the InnerGraph lock - this is only - /// reliably the case because Entry happens to require a &mut InnerGraph reference; it would be - /// great not to violate that in the future. - /// - /// TODO: We don't track which generation actually added which edges, so over time nodes will end - /// up with spurious dependencies. This is mostly sound, but may lead to over-invalidation and - /// doing more work than is necessary. - /// As an example, if generation 0 of X depends on A and B, and generation 1 of X depends on C, - /// nothing will prune the dependencies from X onto A and B, so generation 1 of X will have - /// dependencies on A, B, and C in the graph, even though running it only depends on C. - /// At some point we should address this, but we must be careful with how we do so; anything which - /// ties together the generation of a node with specifics of edges would require careful - /// consideration of locking (probably it would require merging the EntryState locks and Graph - /// locks, or working out something clever). - /// - /// It would also require careful consideration of nodes in the Running EntryState - these may - /// have previous RunToken edges and next RunToken edges which collapse into the same Generation - /// edges; when working out whether a dirty node is really clean, care must be taken to avoid - /// spurious cycles. Currently we handle this as a special case by, if we detect a cycle that - /// contains dirty nodes, clearing those nodes (removing any edges from them). This is a little - /// hacky, but will tide us over until we fully solve this problem. + /// When the Executor finishes executing a Node it calls back to store the result value. + /// Entry::complete uses the run_token to determine whether the Node changed while we were busy + /// executing it, so that it can discard the work. 
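As a minimal sketch of the RunToken check described here (illustrative types only; the real Entry keeps its token inside the EntryState lock rather than in an atomic): work captures the token when it starts, and a completion whose token has become stale is discarded.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct Entry {
    run_token: AtomicUsize,
    result: Mutex<Option<String>>,
}

impl Entry {
    // Work captures the token when it starts running.
    fn start(&self) -> usize {
        self.run_token.load(Ordering::SeqCst)
    }

    // Invalidation bumps the token, making any in-flight work stale.
    fn clear(&self) {
        self.run_token.fetch_add(1, Ordering::SeqCst);
    }

    // A completion is only stored if the entry was not cleared in the interim.
    fn complete(&self, token_at_start: usize, value: String) {
        if self.run_token.load(Ordering::SeqCst) == token_at_start {
            *self.result.lock().unwrap() = Some(value);
        }
    }
}

fn main() {
    let e = Entry {
        run_token: AtomicUsize::new(0),
        result: Mutex::new(None),
    };
    let token = e.start();
    e.clear(); // a concurrent invalidation arrives while the node runs
    e.complete(token, "stale".to_string());
    assert!(e.result.lock().unwrap().is_none()); // the stale work was discarded
}
```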
/// fn complete( &self, @@ -771,43 +843,68 @@ impl Graph { run_token: RunToken, result: Option>, ) { - let (entry, has_uncacheable_deps, dep_generations) = { + let (entry, has_uncacheable_deps, has_weak_deps, dep_generations) = { let inner = self.inner.lock(); let mut has_uncacheable_deps = false; + let mut has_weak_deps = false; // Get the Generations of all dependencies of the Node. We can trust that these have not changed - // since we began executing, as long as we are not currently marked dirty (see the method doc). + // since we began executing, as long as the entry's RunToken is still valid (confirmed in + // Entry::complete). let dep_generations = inner .pg - .neighbors_directed(entry_id, Direction::Outgoing) - .filter_map(|dep_id| inner.entry_for_id(dep_id)) - .map(|entry| { - // If a dependency is itself uncacheable or has uncacheable deps, this Node should - // also complete as having uncacheable dpes, independent of matching Generation values. - // This is to allow for the behaviour that an uncacheable Node should always have "dirty" - // (marked as UncacheableDependencies) dependents, transitively. - if !entry.node().cacheable() || entry.has_uncacheable_deps() { - has_uncacheable_deps = true; + .edges_directed(entry_id, Direction::Outgoing) + .filter_map(|edge_ref| { + if edge_ref.weight().1 == run_token { + if edge_ref.weight().0 == EdgeType::Weak { + has_weak_deps = true; + } + // If a dependency is itself uncacheable or has uncacheable deps, this Node should + // also complete as having uncacheable deps, independent of matching Generation values. + // This is to allow for the behaviour that an uncacheable Node should always have "dirty" + // (marked as UncacheableDependencies) dependents, transitively. + let entry = inner.entry_for_id(edge_ref.target()).unwrap(); + if !entry.node().cacheable() || entry.has_uncacheable_deps() { + has_uncacheable_deps = true; + } + + Some(entry.generation()) + } else { + None } - entry.generation() }) .collect(); - ( - inner.entry_for_id(entry_id).cloned(), - has_uncacheable_deps, - dep_generations, - ) + + let entry = inner.entry_for_id(entry_id).unwrap().clone(); + (entry, has_uncacheable_deps, has_weak_deps, dep_generations) }; - if let Some(mut entry) = entry { - let mut inner = self.inner.lock(); - entry.complete( - context, - run_token, - dep_generations, - result, - has_uncacheable_deps, - &mut inner, - ); - } + + // Attempt to complete the Node outside the graph lock. + entry.complete( + context, + run_token, + dep_generations, + result, + has_uncacheable_deps, + has_weak_deps, + ); + } + + /// + /// Garbage collects all dependency edges that are not associated with the previous or current run + /// of a Node. Node cleaning consumes the previous edges for an operation, so we preserve those. + /// + /// This is executed as a bulk operation, because individual edge removals take O(n), and bulk + /// edge filtering is both more efficient, and possible to do asynchronously. + /// + pub fn garbage_collect_edges(&self) { + let mut inner = self.inner.lock(); + inner.pg.retain_edges(|pg, edge_index| { + let (edge_src_id, _) = pg.edge_endpoints(edge_index).unwrap(); + // Retain the edge if it is for either the current or previous run of a Node. + pg[edge_src_id] + .run_token() + .equals_current_or_previous(pg[edge_index].1) + }); } /// @@ -876,33 +973,46 @@ pub struct LastObserved(Generation); /// Represents the state of a particular walk through a Graph. Implements Iterator and has the same /// lifetime as the Graph itself. 
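The bulk `retain_edges` pattern used by `garbage_collect_edges` above can be demonstrated standalone (assumes the petgraph crate; here a node weight stands in for that node's current RunToken and an edge weight for the token under which the edge was added):

```rust
use petgraph::graph::DiGraph;

fn main() {
    let mut g: DiGraph<u64, u64> = DiGraph::new();
    let src = g.add_node(5); // the source node's current run is 5
    let dst = g.add_node(0);
    g.add_edge(src, dst, 3); // stale edge from run 3: collected
    g.add_edge(src, dst, 4); // previous run: kept, since cleaning consumes it
    g.add_edge(src, dst, 5); // current run: kept

    // Bulk filter, as in `garbage_collect_edges`: one pass over all edges.
    g.retain_edges(|g, edge| {
        let (edge_src, _) = g.edge_endpoints(edge).unwrap();
        let current = g[edge_src];
        let token = g[edge];
        // Equivalent to RunToken::equals_current_or_previous.
        token == current || token + 1 == current
    });
    assert_eq!(g.edge_count(), 2);
}
```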
///
-struct Walk<'a, N: Node, F>
-where
-  F: Fn(&EntryId) -> bool,
-{
+struct Walk<'a, N: Node, F: Fn(EdgeId) -> bool> {
   graph: &'a InnerGraph<N>,
   direction: Direction,
   deque: VecDeque<EntryId>,
   walked: HashSet<EntryId>,
-  stop_walking_predicate: F,
+  should_walk_edge: F,
 }

-impl<'a, N: Node + 'a, F: Fn(&EntryId) -> bool> Iterator for Walk<'a, N, F> {
+impl<'a, N: Node + 'a, F: Fn(EdgeId) -> bool> Iterator for Walk<'a, N, F> {
   type Item = EntryId;

   fn next(&mut self) -> Option<Self::Item> {
     while let Some(id) = self.deque.pop_front() {
-      // Visit this node and it neighbors if this node has not yet be visited and we aren't
-      // stopping our walk at this node, based on if it satisfies the stop_walking_predicate.
+      // Visit this node and its neighbors if this node has not yet been visited and we aren't
+      // stopping our walk at this node, based on whether it satisfies the should_walk_edge predicate.
       // This mechanism gives us a way to selectively dirty parts of the graph respecting node boundaries
       // like uncacheable nodes, which shouldn't be dirtied.
-      if !self.walked.insert(id) || (self.stop_walking_predicate)(&id) {
+      if !self.walked.insert(id) {
         continue;
       }

-      self
-        .deque
-        .extend(self.graph.pg.neighbors_directed(id, self.direction));
+      let should_walk_edge = &self.should_walk_edge;
+      let direction = self.direction;
+      self.deque.extend(
+        self
+          .graph
+          .pg
+          .edges_directed(id, self.direction)
+          .filter_map(|edge_ref| {
+            if should_walk_edge(edge_ref.id()) {
+              let node = match direction {
+                Direction::Outgoing => edge_ref.target(),
+                Direction::Incoming => edge_ref.source(),
+              };
+              Some(node)
+            } else {
+              None
+            }
+          }),
+      );

       return Some(id);
     }
diff --git a/src/rust/engine/graph/src/node.rs b/src/rust/engine/graph/src/node.rs
index 4bd35f37151..28cd1811afa 100644
--- a/src/rust/engine/graph/src/node.rs
+++ b/src/rust/engine/graph/src/node.rs
@@ -7,13 +7,13 @@ use std::hash::Hash;

 use async_trait::async_trait;

-use petgraph::stable_graph;
-
-use crate::entry::Entry;
+use crate::entry::{Entry, RunToken};
 use crate::Graph;

 // 2^32 Nodes ought to be more than enough for anyone!
-pub type EntryId = stable_graph::NodeIndex<u32>;
+// TODO: Consider renaming to NodeId.
+pub type EntryId = petgraph::graph::NodeIndex<u32>;
+pub type EdgeId = petgraph::graph::EdgeIndex<u32>;

 ///
 /// Defines executing a cacheable/memoizable step within the given NodeContext.
@@ -77,20 +77,28 @@ pub trait NodeContext: Clone + Send + Sync + 'static {
   /// particular: an uncacheable (Node::cacheable) Node will execute once per Run, regardless
   /// of other invalidation.
   ///
-  type RunId: Clone + Debug + Eq + Send;
+  type SessionId: Clone + Debug + Eq + Send;

   ///
-  /// Creates a clone of this NodeContext to be used for a different Node.
+  /// Creates a clone of this NodeContext to be used for a different Node, or a different run of
+  /// the same Node.
+  ///
+  /// To clone a Context for use for the same run of the same Node, `Clone` is used directly.
+  ///
+  fn clone_for(&self, entry_id: EntryId, run_token: RunToken) -> <Self::Node as Node>::Context;

+  ///
-  /// To clone a Context for use for the same Node, `Clone` is used directly.
+  /// If this Context is associated with a run of a particular Node, returns its EntryId and
+  /// RunToken. A Context used at the root of the Graph will not be associated with any particular
+  /// Node, but all other Contexts are created via `clone_for` for a particular Node's run.
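A minimal sketch of this cloning contract (simplified stand-in types, not the engine's `Context`): the Arc-backed core is shared across clones, while `clone_for` records the node run that created the child.

```rust
use std::sync::Arc;

#[derive(Clone, Debug, PartialEq)]
struct RunToken(u32);
type EntryId = u32;

// Simplified stand-in for the engine's Context: `core` models the shared,
// Arc-backed state, so cloning is shallow.
#[derive(Clone)]
struct Context {
    core: Arc<String>,
    entry_id_and_run_token: Option<(EntryId, RunToken)>,
}

impl Context {
    // Used when spawning a new run of a node: the child context records which
    // node run created it.
    fn clone_for(&self, entry_id: EntryId, run_token: RunToken) -> Context {
        Context {
            core: self.core.clone(),
            entry_id_and_run_token: Some((entry_id, run_token)),
        }
    }
}

fn main() {
    let root = Context {
        core: Arc::new("shared state".to_string()),
        entry_id_and_run_token: None,
    };
    let child = root.clone_for(7, RunToken(0));
    // Shallow clone: both contexts share the same core allocation.
    assert!(Arc::ptr_eq(&root.core, &child.core));
    assert_eq!(child.entry_id_and_run_token, Some((7, RunToken(0))));
    // A root context is not associated with any node run.
    assert!(root.entry_id_and_run_token.is_none());
}
```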
/// - fn clone_for(&self, entry_id: EntryId) -> ::Context; + fn entry_id_and_run_token(&self) -> Option<(EntryId, RunToken)>; /// - /// Returns the RunId for this Context, which should uniquely identify a caller's run for the - /// purposes of "once per Run" behaviour. + /// Returns the SessionId for this Context, which should uniquely identify a caller's session for + /// the purposes of "once per Session" behaviour. /// - fn run_id(&self) -> &Self::RunId; + fn session_id(&self) -> &Self::SessionId; /// /// Returns a reference to the Graph for this Context. diff --git a/src/rust/engine/graph/src/tests.rs b/src/rust/engine/graph/src/tests.rs index c7c6f200181..339aa22a0cf 100644 --- a/src/rust/engine/graph/src/tests.rs +++ b/src/rust/engine/graph/src/tests.rs @@ -11,7 +11,7 @@ use parking_lot::Mutex; use rand::{self, Rng}; use tokio::time::{delay_for, timeout, Elapsed}; -use crate::{EntryId, Graph, InvalidationResult, Node, NodeContext, NodeError}; +use crate::{EdgeType, EntryId, Graph, InvalidationResult, Node, NodeContext, NodeError, RunToken}; #[tokio::test] async fn create() { @@ -84,7 +84,7 @@ async fn invalidate_and_rerun() { // Request with a different salt, which will cause both the middle and upper nodes to rerun since // their input values have changed. - let context = context.new_run(1).with_salt(1); + let context = context.new_session(1).with_salt(1); assert_eq!( graph.create(TNode::new(2), &context).await, Ok(vec![T(0, 0), T(1, 1), T(2, 1)]) @@ -136,8 +136,8 @@ async fn invalidate_randomly() { let graph = Arc::new(Graph::new()); let invalidations = 10; - let sleep_per_invalidation = Duration::from_millis(100); - let range = 100; + let sleep_per_invalidation = Duration::from_millis(500); + let range = 1000; // Spawn a background thread to randomly invalidate in the relevant range. Hold its handle so // it doesn't detach. @@ -236,7 +236,6 @@ async fn poll_cacheable() { #[tokio::test] async fn poll_uncacheable() { - let _logger = env_logger::try_init(); let graph = Arc::new(Graph::new()); // Create a context where the middle node is uncacheable. let context = { @@ -290,7 +289,7 @@ async fn dirty_dependents_of_uncacheable_node() { ); // Re-request the root in a new session and confirm that only the bottom node re-runs. - let context = context.new_run(1); + let context = context.new_session(1); assert_eq!( graph.create(TNode::new(2), &context).await, Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) @@ -299,7 +298,7 @@ async fn dirty_dependents_of_uncacheable_node() { // Re-request with a new session and different salt, and confirm that everything re-runs bottom // up (the order of node cleaning). - let context = context.new_run(2).with_salt(1); + let context = context.new_session(2).with_salt(1); assert_eq!( graph.create(TNode::new(2), &context).await, Ok(vec![T(0, 1), T(1, 1), T(2, 1)]) @@ -339,17 +338,13 @@ async fn uncachable_node_only_runs_once() { graph.create(TNode::new(2), &context).await, Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) ); - // TNode(0) and TNode(2) are cleared and dirtied (respectively) before completing, and - // so run twice each. But the uncacheable node runs once. assert_eq!( - context.runs(), - vec![ - TNode::new(2), - TNode::new(1), - TNode::new(0), - TNode::new(2), - TNode::new(0), - ] + context + .runs() + .into_iter() + .filter(|n| *n == TNode::new(1)) + .count(), + 1, ); } @@ -434,22 +429,6 @@ async fn canceled_immediately() { ); } -#[tokio::test] -async fn cyclic_failure() { - // Confirms that an attempt to create a cycle fails. 
- let graph = Arc::new(Graph::new()); - let top = TNode::new(2); - let context = TContext::new(graph.clone()).with_dependencies( - // Request creation of a cycle by sending the bottom most node to the top. - vec![(TNode::new(0), Some(top))].into_iter().collect(), - ); - - assert_eq!( - graph.create(TNode::new(2), &context).await, - Err(TError::Cyclic) - ); -} - #[tokio::test] async fn cyclic_dirtying() { // Confirms that a dirtied path between two nodes is able to reverse direction while being @@ -483,6 +462,82 @@ async fn cyclic_dirtying() { assert_eq!(res, Ok(vec![T(1, 1), T(2, 1)])); } +#[tokio::test] +async fn cyclic_strong_strong() { + // A cycle between two nodes with strong edges. + let (graph, context) = cyclic_references(vec![]); + assert_eq!( + graph.create(TNode::new(1), &context).await, + Err(TError::Cyclic) + ); + assert_eq!( + graph.create(TNode::new(0), &context).await, + Err(TError::Cyclic) + ); +} + +#[tokio::test] +async fn cyclic_strong_weak_with_strong_first() { + // A cycle between two nodes with a strong dep from top to bottom and a weak dep from bottom + // to top, where we enter from the top first. + let _logger = env_logger::try_init(); + + let (graph, context) = cyclic_references(vec![TNode::new(1)]); + assert_eq!( + graph.create(TNode::new(1), &context).await, + Ok(vec![T(0, 0), T(1, 0)]) + ); + assert_eq!( + graph.create(TNode::new(0), &context).await, + Ok(vec![T(0, 0), T(1, 0), T(0, 0)]) + ); +} + +/// +/// TODO: Ignored due to https://github.com/pantsbuild/pants/issues/10229. +/// +#[tokio::test] +#[ignore] +async fn cyclic_strong_weak_with_weak_first() { + // A cycle between two nodes with a strong dep from top to bottom and a weak dep from bottom + // to top, where we enter from the bottom first. + let (graph, context) = cyclic_references(vec![TNode::new(1)]); + assert_eq!( + graph.create(TNode::new(0), &context).await, + Ok(vec![T(0, 0), T(1, 0), T(0, 0)]) + ); + assert_eq!( + graph.create(TNode::new(1), &context).await, + Ok(vec![T(0, 0), T(1, 0)]) + ); +} + +#[tokio::test] +async fn cyclic_weak_weak() { + // A cycle between two nodes, both with weak edges. + let (graph, context) = cyclic_references(vec![TNode::new(0), TNode::new(1)]); + assert_eq!( + graph.create(TNode::new(1), &context).await, + Ok(vec![T(0, 0), T(1, 0)]) + ); + assert_eq!( + graph.create(TNode::new(0), &context).await, + Ok(vec![T(1, 0), T(0, 0)]) + ); +} + +fn cyclic_references(weak: Vec) -> (Arc>, TContext) { + let graph = Arc::new(Graph::new()); + let top = TNode::new(1); + let context = TContext::new(graph.clone()) + .with_dependencies( + // Request creation of a cycle by sending the bottom most node to the top. + vec![(TNode::new(0), Some(top))].into_iter().collect(), + ) + .with_weak(weak.into_iter().collect()); + (graph, context) +} + #[tokio::test] async fn critical_path() { use super::entry::Entry; @@ -539,7 +594,9 @@ async fn critical_path() { for (src, dst) in &deps { let src = inner.nodes[&node_key(src)]; let dst = inner.nodes[&node_key(dst)]; - inner.pg.add_edge(src, dst, 1.0); + inner + .pg + .add_edge(src, dst, (EdgeType::Strong, RunToken::initial())); } } @@ -696,11 +753,13 @@ impl TNode { /// #[derive(Clone)] struct TContext { - run_id: usize, + session_id: usize, // A value that is included in every value computed by this context. Stands in for "the state of the // outside world". A test that wants to "change the outside world" and observe its effect on the // graph should change the salt to do so. 
salt: usize, + // When dependencies on these nodes are requested, those dependencies will be weak. + weak: Arc>, // A mapping from source to optional destination that drives what values each TNode depends on. // If there is no entry in this map for a node, then TNode::run will default to requesting // the next smallest node. Finally, if a None entry is present, a node will have no @@ -711,34 +770,39 @@ struct TContext { graph: Arc>, aborts: Arc>>, runs: Arc>>, - entry_id: Option, + entry_id_and_run_token: Option<(EntryId, RunToken)>, } impl NodeContext for TContext { type Node = TNode; - type RunId = usize; + type SessionId = usize; - fn clone_for(&self, entry_id: EntryId) -> TContext { + fn clone_for(&self, entry_id: EntryId, run_token: RunToken) -> TContext { TContext { - run_id: self.run_id, + session_id: self.session_id, salt: self.salt, + weak: self.weak.clone(), edges: self.edges.clone(), delays: self.delays.clone(), uncacheable: self.uncacheable.clone(), graph: self.graph.clone(), aborts: self.aborts.clone(), runs: self.runs.clone(), - entry_id: Some(entry_id), + entry_id_and_run_token: Some((entry_id, run_token)), } } - fn run_id(&self) -> &usize { - &self.run_id + fn session_id(&self) -> &usize { + &self.session_id } fn graph(&self) -> &Graph { &self.graph } + fn entry_id_and_run_token(&self) -> Option<(EntryId, RunToken)> { + self.entry_id_and_run_token + } + fn spawn(&self, future: F) where F: Future + Send + 'static, @@ -751,18 +815,24 @@ impl NodeContext for TContext { impl TContext { fn new(graph: Arc>) -> TContext { TContext { - run_id: 0, + session_id: 0, salt: 0, + weak: Arc::default(), edges: Arc::default(), delays: Arc::default(), uncacheable: Arc::default(), graph, - aborts: Arc::new(Mutex::new(Vec::new())), - runs: Arc::new(Mutex::new(Vec::new())), - entry_id: None, + aborts: Arc::default(), + runs: Arc::default(), + entry_id_and_run_token: None, } } + fn with_weak(mut self, weak: HashSet) -> TContext { + self.weak = Arc::new(weak); + self + } + fn with_dependencies(mut self, edges: HashMap>) -> TContext { self.edges = Arc::new(edges); self @@ -783,8 +853,8 @@ impl TContext { self } - fn new_run(mut self, new_run_id: usize) -> TContext { - self.run_id = new_run_id; + fn new_session(mut self, new_session_id: usize) -> TContext { + self.session_id = new_session_id; { let mut runs = self.runs.lock(); runs.clear(); @@ -797,7 +867,17 @@ impl TContext { } async fn get(&self, dst: TNode) -> Result, TError> { - self.graph.get(self.entry_id, self, dst).await + if self.weak.contains(&dst) { + Ok( + self + .graph + .get_weak(self, dst) + .await? 
+ .unwrap_or_else(Vec::new), + ) + } else { + self.graph.get(self, dst).await + } } fn abort_guard(&self, node: TNode) -> AbortGuard { diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index df4fcbcc5e5..6bf6c45a2b2 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -17,7 +17,7 @@ use crate::tasks::{Rule, Tasks}; use crate::types::Types; use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS}; -use graph::{EntryId, Graph, InvalidationResult, NodeContext}; +use graph::{EntryId, Graph, InvalidationResult, NodeContext, RunToken}; use log::info; use process_execution::{ self, speculate::SpeculatingCommandRunner, BoundedCommandRunner, CommandRunner, NamedCaches, @@ -321,20 +321,21 @@ impl Deref for InvalidatableGraph { #[derive(Clone)] pub struct Context { - entry_id: Option, + entry_id_and_run_token: Option<(EntryId, RunToken)>, pub core: Arc, pub session: Session, - run_id: Uuid, + session_run_id: Uuid, } impl Context { pub fn new(core: Arc, session: Session) -> Context { - let run_id = session.run_id(); + // NB: See `impl NodeContext for Context` for more information on this naming. + let session_run_id = session.run_id(); Context { - entry_id: None, + entry_id_and_run_token: None, core, session, - run_id, + session_run_id, } } @@ -342,38 +343,52 @@ impl Context { /// Get the future value for the given Node implementation. /// pub async fn get(&self, node: N) -> Result { - let node_result = self - .core - .graph - .get(self.entry_id, self, node.into()) - .await?; + let node_result = self.core.graph.get(self, node.into()).await?; Ok( node_result .try_into() .unwrap_or_else(|_| panic!("A Node implementation was ambiguous.")), ) } + + /// + /// Same as for Self::get, but returns None if a cycle would be created. + /// + pub async fn get_weak(&self, node: N) -> Result, Failure> { + let node_result = self.core.graph.get_weak(self, node.into()).await?; + Ok(node_result.map(|v| { + v.try_into() + .unwrap_or_else(|_| panic!("A Node implementation was ambiguous.")) + })) + } } impl NodeContext for Context { type Node = NodeKey; - type RunId = Uuid; + // NB: The name "Session" is slightly overloaded between the graph crate and the engine: what the + // graph crate calls a SessionId we refer to as the "run_id of the Session". See + // `scheduler::Session` for more information. + type SessionId = Uuid; /// - /// Clones this Context for a new EntryId. Because the Core of the context is an Arc, this + /// Clones this Context for a new run of a Node. Because the Core of the context is an Arc, this /// is a shallow clone. 
/// - fn clone_for(&self, entry_id: EntryId) -> Context { + fn clone_for(&self, entry_id: EntryId, run_token: RunToken) -> Context { Context { - entry_id: Some(entry_id), + entry_id_and_run_token: Some((entry_id, run_token)), core: self.core.clone(), session: self.session.clone(), - run_id: self.run_id, + session_run_id: self.session_run_id, } } - fn run_id(&self) -> &Self::RunId { - &self.run_id + fn entry_id_and_run_token(&self) -> Option<(EntryId, RunToken)> { + self.entry_id_and_run_token + } + + fn session_id(&self) -> &Self::SessionId { + &self.session_run_id } fn graph(&self) -> &Graph { diff --git a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index 811068984a6..359e4f41a38 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -431,8 +431,9 @@ py_class!(pub class PyGeneratorResponseGet |py| { data product: PyType; data declared_subject: PyType; data subject: PyObject; - def __new__(_cls, product: PyType, declared_subject: PyType, subject: PyObject) -> CPyResult { - Self::create_instance(py, product, declared_subject, subject) + data weak: PyBool; + def __new__(_cls, product: PyType, declared_subject: PyType, subject: PyObject, weak: PyBool) -> CPyResult { + Self::create_instance(py, product, declared_subject, subject, weak) } }); @@ -448,6 +449,7 @@ pub struct Get { pub product: TypeId, pub subject: Key, pub declared_subject: Option, + pub weak: bool, } impl Get { @@ -458,6 +460,7 @@ impl Get { .key_insert(py, get.subject(py).clone_ref(py).into()) .map_err(|e| Failure::from_py_err(py, e))?, declared_subject: Some(interns.type_insert(py, get.declared_subject(py).clone_ref(py))), + weak: get.weak(py).is_true(), }) } } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 159a5456b0e..e52248eab04 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -16,6 +16,16 @@ use std::path::PathBuf; type IntrinsicFn = Box) -> BoxFuture<'static, NodeResult> + Send + Sync>; +/// +/// Native "rule" implementations. +/// +/// On a case by case basis, intrinsics might have associated Nodes in the Graph in order to memoize +/// their results. For example: `multi_platform_process_request_to_process_result` and +/// `path_globs_to_snapshot` take a while to run because they run a process and capture filesystem +/// state (respectively) and have small in-memory outputs. On the other hand, `digest_to_snapshot` +/// runs a relatively cheap operation (loading a Snapshot from the mmap'ed database) on a small +/// input, and produces a larger output: it is thus not memoized. 
+/// pub struct Intrinsics { intrinsics: IndexMap, } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index ec94e602d88..f6a3f92affd 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -93,10 +93,16 @@ pub struct Select { pub params: Params, pub product: TypeId, entry: rule_graph::Entry, + weak: bool, } impl Select { - pub fn new(mut params: Params, product: TypeId, entry: rule_graph::Entry) -> Select { + pub fn new( + mut params: Params, + product: TypeId, + entry: rule_graph::Entry, + weak: bool, + ) -> Select { params.retain(|k| match &entry { &rule_graph::Entry::Param(ref type_id) => type_id == k.type_id(), &rule_graph::Entry::WithDeps(ref with_deps) => with_deps.params().contains(k.type_id()), @@ -105,6 +111,7 @@ impl Select { params, product, entry, + weak, } } @@ -119,7 +126,7 @@ impl Select { .entry_for(&dependency_key) .unwrap_or_else(|| panic!("{:?} did not declare a dependency on {:?}", edges, product)) .clone(); - Select::new(params, product, entry) + Select::new(params, product, entry, false) } async fn select_product( @@ -145,8 +152,9 @@ impl Select { } } -// TODO: This is a Node only because it is used as a root in the graph, but it should never be -// requested using context.get +// NB: This is a Node only to allow it to be used as a root in the graph, but it should not be +// requested using context.get, because it requests only memoizable Nodes itself (sometimes +// indirectly via intrinsics). #[async_trait] impl WrappedNode for Select { type Item = Value; @@ -155,15 +163,23 @@ impl WrappedNode for Select { match &self.entry { &rule_graph::Entry::WithDeps(rule_graph::EntryWithDeps::Inner(ref inner)) => { match inner.rule() { - &tasks::Rule::Task(ref task) => context - .get(Task { + &tasks::Rule::Task(ref task) => { + let node = Task { params: self.params.clone(), product: self.product, task: task.clone(), entry: Arc::new(self.entry.clone()), - }) - .await - .map(|output| output.value), + }; + if self.weak { + if let Some(output) = context.get_weak(node).await? { + Ok(output.value) + } else { + Ok(externs::none().into()) + } + } else { + Ok(context.get(node).await?.value) + } + } &Rule::Intrinsic(ref intrinsic) => { let intrinsic = intrinsic.clone(); let values = future::try_join_all( @@ -801,7 +817,7 @@ impl Task { params.put(get.subject); async move { let entry = entry_res?; - Select::new(params, get.product, entry) + Select::new(params, get.product, entry, get.weak) .run_wrapped_node(context.clone()) .await } diff --git a/src/rust/engine/src/scheduler.rs b/src/rust/engine/src/scheduler.rs index 5034eae227e..356cd7a16c1 100644 --- a/src/rust/engine/src/scheduler.rs +++ b/src/rust/engine/src/scheduler.rs @@ -66,6 +66,8 @@ struct InnerSession { // entire Session, but in some cases (in particular, a `--loop`) the caller wants to retain the // same Session while still observing new values for uncacheable rules like Goals. // + // See `impl NodeContext for Context` for more information. + // // TODO: Figure out how the `--loop` interplays with metrics. It's possible that for metrics // purposes, each iteration of a loop should be considered to be a new Session, but for now the // Session/build_id would be stable.
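Tying the entry.rs and scheduler.rs pieces together, here is a minimal standalone sketch of the session-scoped result semantics (simplified types; a `u64` stands in for the engine's Uuid-based SessionId, and the UncacheableDependencies variant is omitted):

```rust
enum EntryResult {
    Clean(String),
    Dirty(String),
    // The value plus the SessionId that produced it.
    Uncacheable(String, u64),
}

impl EntryResult {
    fn is_clean(&self, current_session: u64) -> bool {
        match self {
            EntryResult::Clean(_) => true,
            // Reusable only within the session that produced it.
            EntryResult::Uncacheable(_, session) => *session == current_session,
            EntryResult::Dirty(_) => false,
        }
    }
}

fn main() {
    let uncacheable = EntryResult::Uncacheable("goal output".to_string(), 1);
    assert!(uncacheable.is_clean(1)); // same session: reusable as-is
    assert!(!uncacheable.is_clean(2)); // new session: must be re-run
    // A node that completed with weak deps is stored Dirty, so it is
    // re-validated (and may re-run) on the next request.
    assert!(!EntryResult::Dirty("cyclic value".to_string()).is_clean(1));
    assert!(EntryResult::Clean("plain value".to_string()).is_clean(2));
}
```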