Skip to content

Commit

Permalink
Reduce sections under graph lock
Browse files Browse the repository at this point in the history
These are now guarded by the Entry lock, which is much more granular.
  • Loading branch information
illicitonion committed Jul 11, 2018
1 parent 5734865 commit 2a844f0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 30 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl<N: Node> EntryKey<N> {
///
/// An Entry and its adjacencies.
///
#[derive(Clone)]
pub struct Entry<N: Node> {
// TODO: This is a clone of the Node, which is also kept in the `nodes` map. It would be
// nice to avoid keeping two copies of each Node, but tracking references between the two
Expand Down
69 changes: 39 additions & 30 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,25 +474,28 @@ impl<N: Node> Graph<N> {
where
C: NodeContext<Node = N>,
{
// Get or create the destination, and then insert the dep and return its state.
let mut inner = self.inner.lock().unwrap();
let dst_id = {
// TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable
// without a much more complicated algorithm.
let potential_dst_id = inner.ensure_entry(EntryKey::Valid(dst_node.clone()));
if inner.detect_cycle(src_id, potential_dst_id) {
// Cyclic dependency: declare a dependency on a copy of the Node that is marked Cyclic.
inner.ensure_entry(EntryKey::Cyclic(dst_node))
} else {
// Valid dependency.
potential_dst_id
}
let (maybe_entry, entry_id) = {
// Get or create the destination, and then insert the dep and return its state.
let mut inner = self.inner.lock().unwrap();
let dst_id = {
// TODO: doing cycle detection under the lock... unfortunate, but probably unavoidable
// without a much more complicated algorithm.
let potential_dst_id = inner.ensure_entry(EntryKey::Valid(dst_node.clone()));
if inner.detect_cycle(src_id, potential_dst_id) {
// Cyclic dependency: declare a dependency on a copy of the Node that is marked Cyclic.
inner.ensure_entry(EntryKey::Cyclic(dst_node))
} else {
// Valid dependency.
potential_dst_id
}
};
inner.pg.add_edge(src_id, dst_id, ());
(inner.entry_for_id(dst_id).cloned(), dst_id)
};

// Declare the dep, and return the state of the destination.
inner.pg.add_edge(src_id, dst_id, ());
if let Some(entry) = inner.entry_for_id_mut(dst_id) {
entry.get(context, dst_id).map(|(res, _)| res).to_boxed()
if let Some(mut entry) = maybe_entry {
entry.get(context, entry_id).map(|(res, _)| res).to_boxed()
} else {
future::err(N::Error::invalidated()).to_boxed()
}
Expand All @@ -505,10 +508,13 @@ impl<N: Node> Graph<N> {
where
C: NodeContext<Node = N>,
{
let mut inner = self.inner.lock().unwrap();
let id = inner.ensure_entry(EntryKey::Valid(node));
if let Some(entry) = inner.entry_for_id_mut(id) {
entry.get(context, id).map(|(res, _)| res).to_boxed()
let (maybe_entry, entry_id) = {
let mut inner = self.inner.lock().unwrap();
let id = inner.ensure_entry(EntryKey::Valid(node));
(inner.entry_for_id(id).cloned(), id)
};
if let Some(mut entry) = maybe_entry {
entry.get(context, entry_id).map(|(res, _)| res).to_boxed()
} else {
future::err(N::Error::invalidated()).to_boxed()
}
Expand Down Expand Up @@ -591,16 +597,19 @@ impl<N: Node> Graph<N> {
) where
C: NodeContext<Node = N>,
{
let mut inner = self.inner.lock().unwrap();
// Get the Generations of all dependencies of the Node. We can trust that these have not changed
// since we began executing, as long as we are not currently marked dirty (see the method doc).
let dep_generations = inner
.pg
.neighbors_directed(entry_id, Direction::Outgoing)
.filter_map(|dep_id| inner.entry_for_id(dep_id))
.map(|entry| entry.generation())
.collect();
if let Some(entry) = inner.entry_for_id_mut(entry_id) {
let (entry, entry_id, dep_generations) = {
let mut inner = self.inner.lock().unwrap();
// Get the Generations of all dependencies of the Node. We can trust that these have not changed
// since we began executing, as long as we are not currently marked dirty (see the method doc).
let dep_generations = inner
.pg
.neighbors_directed(entry_id, Direction::Outgoing)
.filter_map(|dep_id| inner.entry_for_id(dep_id))
.map(|entry| entry.generation())
.collect();
(inner.entry_for_id(entry_id).cloned(), entry_id, dep_generations)
};
if let Some(mut entry) = entry {
entry.complete(context, entry_id, run_token, dep_generations, result);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/graph/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub type EntryId = stable_graph::NodeIndex<u32>;
///
/// Defines executing a cacheable/memoizable step within the given NodeContext.
///
/// Note that it is assumed that Nodes are very cheap to clone.
///
pub trait Node: Clone + Debug + Eq + Hash + Send + 'static {
type Context: NodeContext<Node = Self>;

Expand Down

0 comments on commit 2a844f0

Please sign in to comment.