Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move graph cycle detection to rust. (cherrypick of #11202) #11203

Merged
merged 1 commit into from Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 10 additions & 35 deletions src/python/pants/engine/internals/graph.py
Expand Up @@ -7,7 +7,7 @@
import os.path
from dataclasses import dataclass
from pathlib import PurePath
from typing import Dict, Iterable, List, NamedTuple, Optional, Sequence, Set, Tuple, Type
from typing import Dict, Iterable, List, NamedTuple, Optional, Sequence, Tuple, Type

from pants.base.exceptions import ResolveError
from pants.base.specs import (
Expand Down Expand Up @@ -37,6 +37,7 @@
Snapshot,
SourcesSnapshot,
)
from pants.engine.internals.native_engine import cyclic_paths
from pants.engine.internals.target_adaptor import TargetAdaptor
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
Expand Down Expand Up @@ -247,21 +248,18 @@ def __init__(self, subject: Address, path: Tuple[Address, ...]) -> None:
self.path = path


def _detect_cycles(
roots: Tuple[Address, ...], dependency_mapping: Dict[Address, Tuple[Address, ...]]
) -> None:
path_stack: OrderedSet[Address] = OrderedSet()
visited: Set[Address] = set()
def _detect_cycles(dependency_mapping: Dict[Address, Tuple[Address, ...]]) -> None:
for path in cyclic_paths(dependency_mapping):
address = path[-1]

def maybe_report_cycle(address: Address) -> None:
# NB: File-level dependencies are cycle tolerant.
if not address.is_base_target or address not in path_stack:
if not address.is_base_target:
return

# The path of the cycle is shorter than the entire path to the cycle: if the suffix of
# the path representing the cycle contains a file dep, it is ignored.
in_cycle = False
for path_address in path_stack:
for path_address in path:
if in_cycle and not path_address.is_base_target:
# There is a file address inside the cycle: do not report it.
return
Expand All @@ -273,27 +271,7 @@ def maybe_report_cycle(address: Address) -> None:
# the address in question.
in_cycle = path_address == address
# If we did not break out early, it's because there were no file addresses in the cycle.
raise CycleException(address, (*path_stack, address))

def visit(address: Address):
if address in visited:
maybe_report_cycle(address)
return
path_stack.add(address)
visited.add(address)

for dep_address in dependency_mapping[address]:
visit(dep_address)

path_stack.remove(address)

for root in roots:
visit(root)
if path_stack:
raise AssertionError(
f"The stack of visited nodes should have been empty at the end of recursion, "
f"but it still contained: {path_stack}"
)
raise CycleException(address, path)


@rule(desc="Resolve transitive targets")
Expand Down Expand Up @@ -332,10 +310,7 @@ async def transitive_targets(request: TransitiveTargetsRequest) -> TransitiveTar
)
visited.update(queued)

# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
_detect_cycles(dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
transitive_excludes: FrozenOrderedSet[Target] = FrozenOrderedSet()
Expand Down Expand Up @@ -392,7 +367,7 @@ async def transitive_targets_lite(request: TransitiveTargetsRequestLite) -> Tran
# NB: We use `roots_as_targets` to get the root addresses, rather than `request.roots`. This
# is because expanding from the `Addresses` -> `Targets` may have resulted in generated
# subtargets being used, so we need to use `roots_as_targets` to have this expansion.
_detect_cycles(tuple(t.address for t in roots_as_targets), dependency_mapping)
_detect_cycles(dependency_mapping)

# Apply any transitive excludes (`!!` ignores).
wrapped_transitive_excludes = await MultiGet(
Expand Down
6 changes: 5 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
@@ -1,9 +1,13 @@
from typing import Any
from typing import Any, Dict, Tuple, TypeVar

# TODO: black and flake8 disagree about the content of this file:
# see https://github.com/psf/black/issues/1548
# flake8: noqa: E302

T = TypeVar("T")

def cyclic_paths(adjacencies: Dict[T, Tuple[T, ...]]) -> Tuple[Tuple[T]]: ...

class PyDigest:
def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ...
@property
Expand Down
54 changes: 54 additions & 0 deletions src/rust/engine/src/core.rs
Expand Up @@ -3,6 +3,7 @@

use fnv::FnvHasher;

use std::collections::HashSet;
use std::convert::AsRef;
use std::ops::Deref;
use std::sync::Arc;
Expand All @@ -11,6 +12,7 @@ use std::{fmt, hash};
use crate::externs;

use cpython::{FromPyObject, PyClone, PyDict, PyErr, PyObject, PyResult, Python, ToPyObject};
use indexmap::{IndexMap, IndexSet};
use smallvec::SmallVec;

pub type FNV = hash::BuildHasherDefault<FnvHasher>;
Expand Down Expand Up @@ -410,3 +412,55 @@ pub fn throw(msg: &str) -> Failure {
engine_traceback: Vec::new(),
}
}

///
/// Given a graph represented as an adjacency list, return a collection of cyclic paths.
///
pub fn cyclic_paths<N: hash::Hash + Eq + Copy>(adjacencies: IndexMap<N, Vec<N>>) -> Vec<Vec<N>> {
let mut cyclic_paths = Vec::new();
let mut path_stack = IndexSet::new();
let mut visited = HashSet::new();

for node in adjacencies.keys() {
cyclic_paths_visit(
*node,
&adjacencies,
&mut cyclic_paths,
&mut path_stack,
&mut visited,
);
}

cyclic_paths
}

fn cyclic_paths_visit<N: hash::Hash + Eq + Copy>(
node: N,
adjacencies: &IndexMap<N, Vec<N>>,
cyclic_paths: &mut Vec<Vec<N>>,
path_stack: &mut IndexSet<N>,
visited: &mut HashSet<N>,
) {
if visited.contains(&node) {
if path_stack.contains(&node) {
cyclic_paths.push(
path_stack
.iter()
.cloned()
.chain(std::iter::once(node))
.collect::<Vec<_>>(),
);
}
return;
}
path_stack.insert(node);
visited.insert(node);

if let Some(adjacent) = adjacencies.get(&node) {
for dep_node in adjacent {
cyclic_paths_visit(*dep_node, adjacencies, cyclic_paths, path_stack, visited);
}
}

path_stack.remove(&node);
}
39 changes: 38 additions & 1 deletion src/rust/engine/src/externs/interface.rs
Expand Up @@ -61,6 +61,7 @@ use futures::future::FutureExt;
use futures::future::{self as future03, TryFutureExt};
use futures01::Future;
use hashing::{Digest, EMPTY_DIGEST};
use indexmap::IndexMap;
use log::{self, debug, error, warn, Log};
use logging::logger::PANTS_LOGGER;
use logging::{Destination, Logger, PythonLogLevel};
Expand All @@ -72,7 +73,8 @@ use workunit_store::{Workunit, WorkunitState};

use crate::{
externs, nodes, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination, Failure,
Function, Intrinsics, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types, Value,
Function, Intrinsics, Key, Params, RemotingOptions, Rule, Scheduler, Session, Tasks, Types,
Value,
};

py_exception!(native_engine, PollTimeout);
Expand All @@ -85,6 +87,8 @@ py_module_initializer!(native_engine, |py, m| {

m.add(py, "default_config_path", py_fn!(py, default_config_path()))?;

m.add(py, "cyclic_paths", py_fn!(py, cyclic_paths(a: PyDict)))?;

m.add(
py,
"init_logging",
Expand Down Expand Up @@ -1645,6 +1649,39 @@ fn default_config_path(py: Python) -> CPyResult<String> {
})
}

fn cyclic_paths(py: Python, adjacencies: PyDict) -> CPyResult<Vec<PyTuple>> {
let adjacencies = adjacencies
.items(py)
.into_iter()
.map(|(k, v)| {
let node = externs::key_for(k.into())?;
let adjacent = v
.extract::<Vec<PyObject>>(py)?
.into_iter()
.map(|v| externs::key_for(v.into()))
.collect::<Result<Vec<Key>, _>>()?;
let res: Result<_, PyErr> = Ok((node, adjacent));
res
})
.collect::<Result<IndexMap<Key, Vec<Key>>, _>>()?;
let paths = py.allow_threads(move || crate::core::cyclic_paths(adjacencies));

Ok(
paths
.into_iter()
.map(|path| {
let gil = Python::acquire_gil();
let path_vec = path
.iter()
.map(externs::val_for)
.map(|node| node.consume_into_py_object(gil.python()))
.collect::<Vec<_>>();
PyTuple::new(gil.python(), &path_vec)
})
.collect(),
)
}

fn init_logging(
py: Python,
level: u64,
Expand Down