Skip to content

Commit

Permalink
Add support for cycle-tolerant "weak" Gets (#10230)
Browse files Browse the repository at this point in the history
### Problem

As covered in #10059: in order to support fine-grained dependency inference on real world graphs (which very frequently involve file-level dependency cycles), we'd like to very carefully introduce a form of cycle-tolerance.

### Solution

Introduce the concept of "weak" `Get`s (exposed via `await Get(.., weak=True)`) which can return `None` if a cycle would be formed that would cause a `@rule` to depend on its own result. A weak `Get` is intended to be similar in practice to a [weak reference](https://en.wikipedia.org/wiki/Weak_reference), and correspondingly non-weak edges are referred to as "strong" edges.

A few changes were made to the `Graph` to support this, but the most fundamental was that we now allow cycles in the `Graph`, as long as they don't involve any running `Node`s. As described in the docstring for `graph::Graph`, we record dependencies for two reasons: 1) invalidation, and 2) deadlock detection. Invalidation is not concerned by cycles, and a deadlock can only occur in running code and thus does not need to walk non-live edges.

### Result

Tests demonstrate that `@rules` are able to successfully consume "weak-weak" cycles, where a graph like:
```
digraph {
  A -> B [label=weak, style=dashed];
  B -> A [label=weak, style=dashed];
}
```
... produces the set `{A, B}` when entered from either direction. Followup work will additionally allow for "strong-weak" cycles, which "mostly" work right now, but which currently have behavior that depends on where the cycle is entered. See #10229 and the ignored test in this change.
  • Loading branch information
stuhood committed Jul 17, 2020
1 parent 301c9cf commit 0ad3a56
Show file tree
Hide file tree
Showing 12 changed files with 669 additions and 395 deletions.
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/native.py
Expand Up @@ -69,14 +69,14 @@ def generator_send(
if isinstance(res, Get):
# Get.
return PyGeneratorResponseGet(
res.product_type, res.subject_declared_type, res.subject,
res.product_type, res.subject_declared_type, res.subject, res.weak,
)
elif type(res) in (tuple, list):
# GetMulti.
return PyGeneratorResponseGetMulti(
tuple(
PyGeneratorResponseGet(
get.product_type, get.subject_declared_type, get.subject,
get.product_type, get.subject_declared_type, get.subject, get.weak,
)
for get in res
)
Expand Down
21 changes: 20 additions & 1 deletion src/python/pants/engine/internals/scheduler_test.py
Expand Up @@ -5,7 +5,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from textwrap import dedent
from typing import Any
from typing import Any, FrozenSet

from pants.engine.internals.scheduler import ExecutionError
from pants.engine.rules import RootRule, rule
Expand Down Expand Up @@ -116,6 +116,20 @@ async def error_msg_test_rule(union_wrapper: UnionWrapper) -> UnionX:
raise AssertionError("The statement above this one should have failed!")


class BooleanDeps(FrozenSet[bool]):
pass


@rule
async def boolean_cycle(key: bool) -> BooleanDeps:
"""A rule with exactly two instances (bool == two keys), which depend on one another weakly."""
deps = {key}
dep = await Get(BooleanDeps, bool, not key, weak=True)
if dep is not None:
deps.update(dep)
return BooleanDeps(deps)


class TypeCheckFailWrapper:
"""This object wraps another object which will be used to demonstrate a type check failure when
the engine processes an `await Get(...)` statement."""
Expand Down Expand Up @@ -181,6 +195,7 @@ def rules(cls):
RootRule(UnionB),
select_union_b,
a_union_test,
boolean_cycle,
boolean_and_int,
RootRule(int),
RootRule(bool),
Expand Down Expand Up @@ -230,6 +245,10 @@ def test_strict_equals(self):
# type of a value in equality.
assert A() == self.request_single_product(A, Params(1, True))

def test_weak_gets(self):
assert {True, False} == set(self.request_single_product(BooleanDeps, True))
assert {True, False} == set(self.request_single_product(BooleanDeps, False))

@contextmanager
def _assert_execution_error(self, expected_msg):
with assert_execution_error(self, expected_msg):
Expand Down
19 changes: 14 additions & 5 deletions src/python/pants/engine/selectors.py
Expand Up @@ -41,17 +41,19 @@ class Get(GetConstraints, Generic[_ProductType, _SubjectType]):
A Get can be constructed in 2 ways with two variants each:
+ Long form:
a. Get(<ProductType>, <SubjectType>, subject)
b. Get[<ProductType>](<SubjectType>, subject)
Get(<ProductType>, <SubjectType>, subject)
+ Short form
a. Get(<ProductType>, <SubjectType>(<constructor args for subject>))
b. Get[<ProductType>](<SubjectType>(<constructor args for subject>))
Get(<ProductType>, <SubjectType>(<constructor args for subject>))
The long form supports providing type information to the rule engine that it could not otherwise
infer from the subject variable [1]. Likewise, the short form must use inline construction of the
subject in order to convey the subject type to the engine.
The `weak` parameter is an experimental extension: a "weak" Get will return None rather than the
requested value iff the dependency caused by the Get would create a cycle in the dependency
graph.
[1] The engine needs to determine all rule and Get input and output types statically before
executing any rules. Since Gets are declared inside function bodies, the only way to extract this
information is through a parse of the rule function. The parse analysis is rudimentary and cannot
Expand All @@ -61,7 +63,9 @@ class Get(GetConstraints, Generic[_ProductType, _SubjectType]):
"""

@overload
def __init__(self, product_type: Type[_ProductType], subject_arg0: _SubjectType) -> None:
def __init__(
self, product_type: Type[_ProductType], subject_arg0: _SubjectType, *, weak: bool = False
) -> None:
...

@overload
Expand All @@ -70,6 +74,8 @@ def __init__(
product_type: Type[_ProductType],
subject_arg0: Type[_SubjectType],
subject_arg1: _SubjectType,
*,
weak: bool = False,
) -> None:
...

Expand All @@ -78,6 +84,8 @@ def __init__(
product_type: Type[_ProductType],
subject_arg0: Union[Type[_SubjectType], _SubjectType],
subject_arg1: Optional[_SubjectType] = None,
*,
weak: bool = False,
) -> None:
self.product_type = product_type
self.subject_declared_type: Type[_SubjectType] = self._validate_subject_declared_type(
Expand All @@ -86,6 +94,7 @@ def __init__(
self.subject: _SubjectType = self._validate_subject(
subject_arg1 if subject_arg1 is not None else subject_arg0
)
self.weak = weak

self._validate_product()

Expand Down
118 changes: 60 additions & 58 deletions src/rust/engine/graph/src/entry.rs
Expand Up @@ -25,6 +25,13 @@ impl RunToken {
fn next(self) -> RunToken {
RunToken(self.0 + 1)
}

///
/// Returns true if "other" is equal to this RunToken, or this RunToken's predecessor.
///
pub fn equals_current_or_previous(&self, other: RunToken) -> bool {
self.0 == other.0 || other.next().0 == self.0
}
}

///
Expand Down Expand Up @@ -52,33 +59,29 @@ impl Generation {
///
/// A result from running a Node.
///
/// If the value is Dirty, the consumer should check whether the dependencies of the Node have the
/// same values as they did when this Node was last run; if so, the value can be re-used
/// (and should be marked "Clean").
///
/// If the value is Uncacheable it may only be consumed in the same Run that produced it, and should
/// be recomputed in a new Run.
///
/// A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as
/// equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for
/// meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty)
/// Node and a Node that would be re-cleaned once per session.
///
/// If the value is Clean, the consumer can simply use the value as-is.
///
#[derive(Clone, Debug)]
pub enum EntryResult<N: Node> {
// The value is Clean, and the consumer can simply use it as-is.
Clean(N::Item),
UncacheableDependencies(N::Item),
// If the value is Dirty, the consumer should check whether the dependencies of the Node have the
// same values as they did when this Node was last run; if so, the value can be re-used
// (and should be marked "Clean").
Dirty(N::Item),
Uncacheable(N::Item, <<N as Node>::Context as NodeContext>::RunId),
// Uncacheable values may only be consumed in the same Session that produced them, and should
// be recomputed in a new Session.
Uncacheable(N::Item, <<N as Node>::Context as NodeContext>::SessionId),
// A value of type UncacheableDependencies has Uncacheable dependencies, and is treated as
// equivalent to Dirty in all cases except when `poll`d: since `poll` requests are waiting for
// meaningful work to do, they need to differentiate between a truly invalidated/changed (Dirty)
// Node and a Node that would be re-cleaned once per session.
UncacheableDependencies(N::Item),
}

impl<N: Node> EntryResult<N> {
fn is_clean(&self, context: &N::Context) -> bool {
match self {
EntryResult::Clean(..) => true,
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id,
EntryResult::Dirty(..) => false,
EntryResult::UncacheableDependencies(..) => false,
}
Expand All @@ -95,7 +98,7 @@ impl<N: Node> EntryResult<N> {
/// currently to clean it).
fn poll_should_wait(&self, context: &N::Context) -> bool {
match self {
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::Uncacheable(_, session_id) => context.session_id() == session_id,
EntryResult::Dirty(..) => false,
EntryResult::UncacheableDependencies(_) | EntryResult::Clean(..) => true,
}
Expand Down Expand Up @@ -286,25 +289,38 @@ impl<N: Node> Entry<N> {
previous_result: Option<EntryResult<N>>,
) -> EntryState<N> {
// Increment the RunToken to uniquely identify this work.
let previous_run_token = run_token;
let run_token = run_token.next();
let context = context_factory.clone_for(entry_id);
let context = context_factory.clone_for(entry_id, run_token);
let node = node.clone();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
trace!(
"Running node {:?} with {:?}. It was: previous_result={:?}",
node,
run_token,
previous_result,
);

context_factory.spawn(async move {
// If we have previous result generations, compare them to all current dependency
// generations (which, if they are dirty, will cause recursive cleaning). If they
// match, we can consider the previous result value to be clean for reuse.
let was_clean = if let Some(previous_dep_generations) = previous_dep_generations {
match context.graph().dep_generations(entry_id, &context).await {
trace!("Getting deps to attempt to clean {}", node);
match context
.graph()
.dep_generations(entry_id, previous_run_token, &context)
.await
{
Ok(ref dep_generations) if dep_generations == &previous_dep_generations => {
trace!("Deps matched: {} is clean.", node);
// Dependencies have not changed: Node is clean.
true
}
_ => {
// If dependency generations mismatched or failed to fetch, clear its
// dependencies and indicate that it should re-run.
context.graph().clear_deps(entry_id, run_token);
// If dependency generations mismatched or failed to fetch, indicate that the Node
// should re-run.
trace!("Deps did not match: {} needs to re-run.", node);
false
}
}
Expand Down Expand Up @@ -403,11 +419,6 @@ impl<N: Node> Entry<N> {
result,
dep_generations,
} => {
trace!(
"Re-starting node {:?}. It was: previous_result={:?}",
self.node,
result,
);
assert!(
!result.is_clean(context),
"A clean Node should not reach this point: {:?}",
Expand Down Expand Up @@ -453,20 +464,14 @@ impl<N: Node> Entry<N> {
/// result should be used. This special case exists to avoid 1) cloning the result to call this
/// method, and 2) comparing the current/previous results unnecessarily.
///
/// Takes a &mut InnerGraph to ensure that completing nodes doesn't race with dirtying them.
/// The important relationship being guaranteed here is that if the Graph is calling
/// invalidate_from_roots, it may mark us, or our dependencies, as dirty. We don't want to
/// complete _while_ a batch of nodes are being marked as dirty, and this exclusive access ensures
/// that can't happen.
///
pub(crate) fn complete(
&mut self,
&self,
context: &N::Context,
result_run_token: RunToken,
dep_generations: Vec<Generation>,
result: Option<Result<N::Item, N::Error>>,
has_uncacheable_deps: bool,
_graph: &mut super::InnerGraph<N>,
has_weak_deps: bool,
) {
let mut state = self.state.lock();

Expand All @@ -481,7 +486,6 @@ impl<N: Node> Entry<N> {
"Not completing node {:?} because it was invalidated.",
self.node
);
return;
}
}

Expand All @@ -506,19 +510,22 @@ impl<N: Node> Entry<N> {
}
}
Some(Ok(result)) => {
// If the new result does not match the previous result, the generation increments.
let next_result: EntryResult<N> = if !self.node.cacheable() {
EntryResult::Uncacheable(result, context.run_id().clone())
EntryResult::Uncacheable(result, context.session_id().clone())
} else if has_weak_deps {
EntryResult::Dirty(result)
} else if has_uncacheable_deps {
EntryResult::UncacheableDependencies(result)
} else {
EntryResult::Clean(result)
};
// If the new result does not match the previous result, the generation increments.
if Some(next_result.as_ref()) != previous_result.as_ref().map(EntryResult::as_ref) {
// Node was re-executed (ie not cleaned) and had a different result value.
generation = generation.next()
};
self.notify_waiters(waiters, Ok((next_result.as_ref().clone(), generation)));

EntryState::Completed {
result: next_result,
pollers: Vec::new(),
Expand Down Expand Up @@ -595,10 +602,7 @@ impl<N: Node> Entry<N> {
}

///
/// Get the current RunToken of this entry.
///
/// TODO: Consider moving the Generation and RunToken out of the EntryState once we decide what
/// we want the per-Entry locking strategy to be.
/// Get the RunToken of this entry regardless of whether it is running.
///
pub(crate) fn run_token(&self) -> RunToken {
match *self.state.lock() {
Expand All @@ -609,19 +613,22 @@ impl<N: Node> Entry<N> {
}

///
/// Clears the state of this Node, forcing it to be recomputed.
/// Get the current RunToken of this entry iff it is currently running.
///
/// # Arguments
pub(crate) fn running_run_token(&self) -> Option<RunToken> {
match *self.state.lock() {
EntryState::Running { run_token, .. } => Some(run_token),
_ => None,
}
}

///
/// * `graph_still_contains_edges` - If the caller has guaranteed that all edges from this Node
/// have been removed from the graph, they should pass false here, else true. We may want to
/// remove this parameter, and force this method to remove the edges, but that would require
/// acquiring the graph lock here, which we currently don't do.
/// Clears the state of this Node, forcing it to be recomputed.
///
pub(crate) fn clear(&mut self, graph_still_contains_edges: bool) {
pub(crate) fn clear(&mut self) {
let mut state = self.state.lock();

let (run_token, generation, mut previous_result) =
let (run_token, generation, previous_result) =
match mem::replace(&mut *state, EntryState::initial()) {
EntryState::NotStarted {
run_token,
Expand Down Expand Up @@ -649,13 +656,8 @@ impl<N: Node> Entry<N> {

trace!("Clearing node {:?}", self.node);

if graph_still_contains_edges {
if let Some(previous_result) = previous_result.as_mut() {
previous_result.dirty();
}
}

// Swap in a state with a new RunToken value, which invalidates any outstanding work.
// Swap in a state with a new RunToken value, which invalidates any outstanding work and all
// edges for the previous run.
*state = EntryState::NotStarted {
run_token: run_token.next(),
generation,
Expand Down

0 comments on commit 0ad3a56

Please sign in to comment.