Skip to content

Commit

Permalink
Retry for filesystem changes more quickly, indefinitely, and with logging (#10139)
Browse files Browse the repository at this point in the history

### Problem

As described in #10081, there are a few different concerns around retrying nodes when the filesystem changes:
1. whether we have retried enough times
2. whether we've made it clear to the user that we're retrying
3. whether we retry immediately upon change, or only after something has completed

### Solution

To address each of the above points, we retry:
1. indefinitely
2. with logging
3. immediately upon invalidation of nodes by:
    * removing the `Running::dirty` flag, and moving a `Running` node to `NotStarted` to trigger invalidation
    * "aborting"/dropping ongoing work for a canceled attempt using `futures::Abortable`

Additionally, improve the `Display` and `user_facing_name` implementations for `Node` to allow for better log messages from the `Graph`.

### Result

A message like:
```
12:39:52.93 [INFO] Completed: Building requirements.pex
15:21:58.95 [INFO] Filesystem changed during run: retrying `Test` in 500ms...
15:21:58.96 [INFO] Filesystem changed during run: retrying `@rule coordinator_of_tests((OptionsBootstrapper(args=['--no-v1', '--v2', '--no-process-execution-use-local-cache', 'test', 'tests/python/pants_test/util:'], env={'PANTS_DEV': '1'}, config=ChainedConfig(['/Users/stuhood/src/pants/pants.toml', '/Users/stuhood/.pants.rc'])), WrappedTestFieldSet(field_set=PythonTestFieldSet(address=Address(tests/python/pants_test/util, argutil), origin=SiblingAddresses(directory='tests/python/pants_test/util'), sources=<class 'pants.backend.python.target_types.PythonTestsSources'>(alias='sources', sanitized_raw_value=('test_argutil.py',), default=('test_*.py', '*_test.py', 'tests.py', 'conftest.py')), timeout=pants.backend.python.target_types.PythonTestsTimeout(alias='timeout', value=None, default=None), coverage=pants.backend.python.target_types.PythonCoverage(alias='coverage', value=('pants.util.argutil',), default=None)))))` in 500ms...
12:39:57.17 [INFO] Completed: Building requirements.pex with 1 requirement: dataclasses==0.6
```
...is rendered immediately when a file that a test depends on is invalidated.

Fixes #10081.
  • Loading branch information
stuhood committed Jun 24, 2020
1 parent 529bcbd commit 5a059c3
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 123 deletions.
2 changes: 1 addition & 1 deletion src/python/pants/core/goals/test.py
Expand Up @@ -340,7 +340,7 @@ async def run_tests(
return Test(exit_code)


@rule
@rule(desc="Run test target")
async def coordinator_of_tests(wrapped_field_set: WrappedTestFieldSet) -> AddressAndTestResult:
field_set = wrapped_field_set.field_set
result = await Get[TestResult](TestFieldSet, field_set)
Expand Down
93 changes: 53 additions & 40 deletions src/rust/engine/graph/src/entry.rs
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::node::{EntryId, Node, NodeContext, NodeError};

use futures::channel::oneshot;
use futures::future::{self, BoxFuture, FutureExt};
use futures::future::{self, AbortHandle, Abortable, Aborted, BoxFuture, FutureExt};
use log::{self, trace};
use parking_lot::Mutex;

Expand Down Expand Up @@ -167,10 +167,10 @@ pub enum EntryState<N: Node> {
// The `previous_result` value for a Running node is not a valid value. See NotStarted.
Running {
run_token: RunToken,
abort_handle: AbortHandle,
generation: Generation,
waiters: Vec<Waiter<N>>,
previous_result: Option<EntryResult<N>>,
dirty: bool,
},
// A node that has completed, and then possibly been marked dirty. Because marking a node
// dirty does not eagerly re-execute any logic, it will stay this way until a caller moves it
Expand Down Expand Up @@ -289,6 +289,7 @@ impl<N: Node> Entry<N> {
let run_token = run_token.next();
let context = context_factory.clone_for(entry_id);
let node = node.clone();
let (abort_handle, abort_registration) = AbortHandle::new_pair();

context_factory.spawn(async move {
// If we have previous result generations, compare them to all current dependency
Expand Down Expand Up @@ -319,20 +320,25 @@ impl<N: Node> Entry<N> {
.graph()
.complete(&context, entry_id, run_token, None);
} else {
// The Node needs to (re-)run!
let res = node.run(context.clone()).await;
// The Node needs to (re-)run! Wrap the potentially long running computation in an
// Abortable.
let res = match Abortable::new(node.run(context.clone()), abort_registration).await {
Ok(r) => r,
Err(Aborted) => Err(N::Error::invalidated()),
};

context
.graph()
.complete(&context, entry_id, run_token, Some(res));
}
});

EntryState::Running {
waiters: Vec::new(),
run_token,
abort_handle,
waiters: Vec::new(),
generation,
previous_result,
dirty: false,
}
}

Expand Down Expand Up @@ -456,7 +462,6 @@ impl<N: Node> Entry<N> {
pub(crate) fn complete(
&mut self,
context: &N::Context,
entry_id: EntryId,
result_run_token: RunToken,
dep_generations: Vec<Generation>,
result: Option<Result<N::Item, N::Error>>,
Expand All @@ -472,41 +477,23 @@ impl<N: Node> Entry<N> {
_ => {
// We care about exactly one case: a Running state with the same run_token. All other states
// represent various (legal) race conditions.
trace!("Not completing node {:?} because it was invalidated (different run_token) before completing.", self.node);
trace!(
"Not completing node {:?} because it was invalidated.",
self.node
);
return;
}
}

*state = match mem::replace(&mut *state, EntryState::initial()) {
EntryState::Running {
waiters,
run_token,
waiters,
mut generation,
mut previous_result,
dirty,
..
} => {
match result {
_ if dirty => {
// The node was dirtied while it was running. The dep_generations and new result cannot
// be trusted and were never published. We continue to use the previous result.
trace!(
"Not completing node {:?} because it was dirtied before completing.",
self.node
);
if let Some(previous_result) = previous_result.as_mut() {
previous_result.dirty();
}
Self::run(
context,
&self.node,
entry_id,
run_token,
generation,
None,
previous_result,
)
}
Some(Err(e)) => {
if let Some(previous_result) = previous_result.as_mut() {
previous_result.dirty();
Expand Down Expand Up @@ -641,13 +628,17 @@ impl<N: Node> Entry<N> {
generation,
previous_result,
..
}
| EntryState::Running {
} => (run_token, generation, previous_result),
EntryState::Running {
run_token,
abort_handle,
generation,
previous_result,
..
} => (run_token, generation, previous_result),
} => {
abort_handle.abort();
(run_token, generation, previous_result)
}
EntryState::Completed {
run_token,
generation,
Expand Down Expand Up @@ -682,12 +673,6 @@ impl<N: Node> Entry<N> {
let state = &mut *self.state.lock();
trace!("Dirtying node {:?}", self.node);
match state {
&mut EntryState::Running { ref mut dirty, .. } => {
// An uncacheable node can never be marked dirty.
if self.node.cacheable() {
*dirty = true;
}
}
&mut EntryState::Completed {
ref mut result,
ref mut pollers,
Expand All @@ -698,8 +683,36 @@ impl<N: Node> Entry<N> {
let _ = poller.send(());
}
result.dirty();
return;
}
&mut EntryState::NotStarted { .. } => return,
&mut EntryState::Running { .. } if !self.node.cacheable() => {
// An uncacheable node cannot be interrupted.
return;
}
&mut EntryState::Running { .. } => {
// Handled below: we need to move back to NotStarted.
}
};

*state = match mem::replace(&mut *state, EntryState::initial()) {
EntryState::Running {
run_token,
abort_handle,
generation,
previous_result,
..
} => {
// Dirtying a Running node immediately cancels it.
trace!("Node {:?} was dirtied while running.", self.node);
abort_handle.abort();
EntryState::NotStarted {
run_token,
generation,
previous_result,
}
}
&mut EntryState::NotStarted { .. } => {}
_ => unreachable!(),
}
}

Expand Down
29 changes: 15 additions & 14 deletions src/rust/engine/graph/src/lib.rs
Expand Up @@ -40,11 +40,11 @@ use std::fs::File;
use std::hash::BuildHasherDefault;
use std::io::{self, BufWriter, Write};
use std::path::Path;
use std::time::{Duration, Instant};
use std::time::Duration;

use fnv::FnvHasher;
use futures::future;
use log::{debug, trace, warn};
use log::{debug, info, trace, warn};
use parking_lot::Mutex;
use petgraph::graph::DiGraph;
use petgraph::visit::EdgeRef;
Expand Down Expand Up @@ -453,22 +453,22 @@ impl<N: Node> InnerGraph<N> {
///
pub struct Graph<N: Node> {
inner: Mutex<InnerGraph<N>>,
invalidation_timeout: Duration,
invalidation_delay: Duration,
}

impl<N: Node> Graph<N> {
pub fn new() -> Graph<N> {
Self::new_with_invalidation_timeout(Duration::from_secs(60))
Self::new_with_invalidation_delay(Duration::from_millis(500))
}

pub fn new_with_invalidation_timeout(invalidation_timeout: Duration) -> Graph<N> {
pub fn new_with_invalidation_delay(invalidation_delay: Duration) -> Graph<N> {
let inner = InnerGraph {
nodes: HashMap::default(),
pg: DiGraph::new(),
};
Graph {
inner: Mutex::new(inner),
invalidation_timeout,
invalidation_delay,
}
}

Expand Down Expand Up @@ -531,17 +531,19 @@ impl<N: Node> Graph<N> {
if dst_retry {
// Retry the dst a number of times to handle Node invalidation.
let context = context.clone();
let deadline = Instant::now() + self.invalidation_timeout;
let mut interval = Duration::from_millis(100);
loop {
match entry.get(&context, entry_id).await {
Ok(r) => break Ok(r),
Err(err) if err == N::Error::invalidated() => {
if deadline < Instant::now() {
break Err(N::Error::exhausted());
}
delay_for(interval).await;
interval *= 2;
let node = {
let inner = self.inner.lock();
inner.unsafe_entry_for_id(entry_id).node().clone()
};
info!(
"Filesystem changed during run: retrying `{}` in {:?}...",
node, self.invalidation_delay
);
delay_for(self.invalidation_delay).await;
continue;
}
Err(other_err) => break Err(other_err),
Expand Down Expand Up @@ -801,7 +803,6 @@ impl<N: Node> Graph<N> {
let mut inner = self.inner.lock();
entry.complete(
context,
entry_id,
run_token,
dep_generations,
result,
Expand Down
6 changes: 0 additions & 6 deletions src/rust/engine/graph/src/node.rs
Expand Up @@ -41,12 +41,6 @@ pub trait NodeError: Clone + Debug + Eq + Send {
/// Graph (generally while running).
///
fn invalidated() -> Self;
///
/// Creates an instance that represents an uncacheable node failing from
/// retrying its dependencies too many times, but never able to resolve them,
/// usually because they were invalidated too many times while running.
///
fn exhausted() -> Self;

///
/// Creates an instance that represents that a Node dependency was cyclic along the given path.
Expand Down

0 comments on commit 5a059c3

Please sign in to comment.