From 5a059c3c85b0661c26625d49e15e50a74c045fde Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Tue, 23 Jun 2020 19:21:23 -0700
Subject: [PATCH] Retry for filesystem changes more quickly, indefinitely, and with logging (#10139)

### Problem

As described in #10081, there are a few different concerns around retrying nodes when the filesystem changes:

1. whether we have retried enough times
2. whether we've made it clear to the user that we're retrying
3. whether we retry immediately upon change, or only after something has completed

### Solution

To address each of the above points, we retry:

1. indefinitely
2. with logging
3. immediately upon invalidation of nodes by:
   * removing the `Running::dirty` flag, and moving a `Running` node to `NotStarted` to trigger invalidation
   * "aborting"/dropping ongoing work for a canceled attempt using `futures::Abortable`

Additionally, improve the `Display` and `user_facing_name` implementations for `Node` to allow for better log messages from the `Graph`.

### Result

A message like:

```
12:39:52.93 [INFO] Completed: Building requirements.pex
15:21:58.95 [INFO] Filesystem changed during run: retrying `Test` in 500ms...
15:21:58.96 [INFO] Filesystem changed during run: retrying `@rule coordinator_of_tests((OptionsBootstrapper(args=['--no-v1', '--v2', '--no-process-execution-use-local-cache', 'test', 'tests/python/pants_test/util:'], env={'PANTS_DEV': '1'}, config=ChainedConfig(['/Users/stuhood/src/pants/pants.toml', '/Users/stuhood/.pants.rc'])), WrappedTestFieldSet(field_set=PythonTestFieldSet(address=Address(tests/python/pants_test/util, argutil), origin=SiblingAddresses(directory='tests/python/pants_test/util'), sources=(alias='sources', sanitized_raw_value=('test_argutil.py',), default=('test_*.py', '*_test.py', 'tests.py', 'conftest.py')), timeout=pants.backend.python.target_types.PythonTestsTimeout(alias='timeout', value=None, default=None), coverage=pants.backend.python.target_types.PythonCoverage(alias='coverage', value=('pants.util.argutil',), default=None)))))` in 500ms...
12:39:57.17 [INFO] Completed: Building requirements.pex with 1 requirement: dataclasses==0.6
```

...is rendered immediately when a file that a test depends on is invalidated.

Fixes #10081.
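For readers unfamiliar with `futures::future::Abortable`: the cancellation described above works by pairing the spawned work with an `AbortHandle`; calling `abort()` makes the wrapped future resolve to `Err(Aborted)`, which the graph maps to its "invalidated" error. A minimal, self-contained sketch of that pattern (the closure, value, and messages here are illustrative stand-ins, not the engine's actual code):

```rust
use futures::executor::block_on;
use futures::future::{AbortHandle, Abortable, Aborted};

fn main() {
    // Pair an AbortHandle with a registration for the future we want to be able to cancel.
    let (abort_handle, abort_registration) = AbortHandle::new_pair();

    // Wrap the potentially long-running computation (a stand-in for `node.run(context)`).
    let work = Abortable::new(async { 42 }, abort_registration);

    // Elsewhere (e.g. when a filesystem event invalidates the node), cancellation is requested.
    abort_handle.abort();

    match block_on(work) {
        Ok(value) => println!("completed with {}", value),
        // Cancellation surfaces as `Err(Aborted)`; the graph converts this into an
        // "invalidated" error so that the caller retries the node.
        Err(Aborted) => println!("aborted; the node will be retried"),
    }
}
```

Because `Abortable` simply stops polling and drops the inner future, any cleanup happens via destructors; the new `AbortGuard` in the graph tests relies on exactly that to observe cancellations.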
--- src/python/pants/core/goals/test.py | 2 +- src/rust/engine/graph/src/entry.rs | 93 ++++++------ src/rust/engine/graph/src/lib.rs | 29 ++-- src/rust/engine/graph/src/node.rs | 6 - src/rust/engine/graph/src/tests.rs | 133 ++++++++++++------ .../engine/process_execution/src/local.rs | 6 +- src/rust/engine/src/nodes.rs | 34 +++-- 7 files changed, 180 insertions(+), 123 deletions(-) diff --git a/src/python/pants/core/goals/test.py b/src/python/pants/core/goals/test.py index a7ef697a13b..93049f8f417 100644 --- a/src/python/pants/core/goals/test.py +++ b/src/python/pants/core/goals/test.py @@ -340,7 +340,7 @@ async def run_tests( return Test(exit_code) -@rule +@rule(desc="Run test target") async def coordinator_of_tests(wrapped_field_set: WrappedTestFieldSet) -> AddressAndTestResult: field_set = wrapped_field_set.field_set result = await Get[TestResult](TestFieldSet, field_set) diff --git a/src/rust/engine/graph/src/entry.rs b/src/rust/engine/graph/src/entry.rs index 22332697f5d..e827b26948d 100644 --- a/src/rust/engine/graph/src/entry.rs +++ b/src/rust/engine/graph/src/entry.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::node::{EntryId, Node, NodeContext, NodeError}; use futures::channel::oneshot; -use futures::future::{self, BoxFuture, FutureExt}; +use futures::future::{self, AbortHandle, Abortable, Aborted, BoxFuture, FutureExt}; use log::{self, trace}; use parking_lot::Mutex; @@ -167,10 +167,10 @@ pub enum EntryState { // The `previous_result` value for a Running node is not a valid value. See NotStarted. Running { run_token: RunToken, + abort_handle: AbortHandle, generation: Generation, waiters: Vec>, previous_result: Option>, - dirty: bool, }, // A node that has completed, and then possibly been marked dirty. Because marking a node // dirty does not eagerly re-execute any logic, it will stay this way until a caller moves it @@ -289,6 +289,7 @@ impl Entry { let run_token = run_token.next(); let context = context_factory.clone_for(entry_id); let node = node.clone(); + let (abort_handle, abort_registration) = AbortHandle::new_pair(); context_factory.spawn(async move { // If we have previous result generations, compare them to all current dependency @@ -319,8 +320,13 @@ impl Entry { .graph() .complete(&context, entry_id, run_token, None); } else { - // The Node needs to (re-)run! - let res = node.run(context.clone()).await; + // The Node needs to (re-)run! Wrap the potentially long running computation in an + // Abortable. + let res = match Abortable::new(node.run(context.clone()), abort_registration).await { + Ok(r) => r, + Err(Aborted) => Err(N::Error::invalidated()), + }; + context .graph() .complete(&context, entry_id, run_token, Some(res)); @@ -328,11 +334,11 @@ impl Entry { }); EntryState::Running { - waiters: Vec::new(), run_token, + abort_handle, + waiters: Vec::new(), generation, previous_result, - dirty: false, } } @@ -456,7 +462,6 @@ impl Entry { pub(crate) fn complete( &mut self, context: &N::Context, - entry_id: EntryId, result_run_token: RunToken, dep_generations: Vec, result: Option>, @@ -472,41 +477,23 @@ impl Entry { _ => { // We care about exactly one case: a Running state with the same run_token. All other states // represent various (legal) race conditions. 
- trace!("Not completing node {:?} because it was invalidated (different run_token) before completing.", self.node); + trace!( + "Not completing node {:?} because it was invalidated.", + self.node + ); return; } } *state = match mem::replace(&mut *state, EntryState::initial()) { EntryState::Running { - waiters, run_token, + waiters, mut generation, mut previous_result, - dirty, .. } => { match result { - _ if dirty => { - // The node was dirtied while it was running. The dep_generations and new result cannot - // be trusted and were never published. We continue to use the previous result. - trace!( - "Not completing node {:?} because it was dirtied before completing.", - self.node - ); - if let Some(previous_result) = previous_result.as_mut() { - previous_result.dirty(); - } - Self::run( - context, - &self.node, - entry_id, - run_token, - generation, - None, - previous_result, - ) - } Some(Err(e)) => { if let Some(previous_result) = previous_result.as_mut() { previous_result.dirty(); @@ -641,13 +628,17 @@ impl Entry { generation, previous_result, .. - } - | EntryState::Running { + } => (run_token, generation, previous_result), + EntryState::Running { run_token, + abort_handle, generation, previous_result, .. - } => (run_token, generation, previous_result), + } => { + abort_handle.abort(); + (run_token, generation, previous_result) + } EntryState::Completed { run_token, generation, @@ -682,12 +673,6 @@ impl Entry { let state = &mut *self.state.lock(); trace!("Dirtying node {:?}", self.node); match state { - &mut EntryState::Running { ref mut dirty, .. } => { - // An uncacheable node can never be marked dirty. - if self.node.cacheable() { - *dirty = true; - } - } &mut EntryState::Completed { ref mut result, ref mut pollers, @@ -698,8 +683,36 @@ impl Entry { let _ = poller.send(()); } result.dirty(); + return; + } + &mut EntryState::NotStarted { .. } => return, + &mut EntryState::Running { .. } if !self.node.cacheable() => { + // An uncacheable node cannot be interrupted. + return; + } + &mut EntryState::Running { .. } => { + // Handled below: we need to move back to NotStarted. + } + }; + + *state = match mem::replace(&mut *state, EntryState::initial()) { + EntryState::Running { + run_token, + abort_handle, + generation, + previous_result, + .. + } => { + // Dirtying a Running node immediately cancels it. + trace!("Node {:?} was dirtied while running.", self.node); + abort_handle.abort(); + EntryState::NotStarted { + run_token, + generation, + previous_result, + } } - &mut EntryState::NotStarted { .. 
} => {} + _ => unreachable!(), } } diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs index 4f13deba42b..fa8303950b6 100644 --- a/src/rust/engine/graph/src/lib.rs +++ b/src/rust/engine/graph/src/lib.rs @@ -40,11 +40,11 @@ use std::fs::File; use std::hash::BuildHasherDefault; use std::io::{self, BufWriter, Write}; use std::path::Path; -use std::time::{Duration, Instant}; +use std::time::Duration; use fnv::FnvHasher; use futures::future; -use log::{debug, trace, warn}; +use log::{debug, info, trace, warn}; use parking_lot::Mutex; use petgraph::graph::DiGraph; use petgraph::visit::EdgeRef; @@ -453,22 +453,22 @@ impl InnerGraph { /// pub struct Graph { inner: Mutex>, - invalidation_timeout: Duration, + invalidation_delay: Duration, } impl Graph { pub fn new() -> Graph { - Self::new_with_invalidation_timeout(Duration::from_secs(60)) + Self::new_with_invalidation_delay(Duration::from_millis(500)) } - pub fn new_with_invalidation_timeout(invalidation_timeout: Duration) -> Graph { + pub fn new_with_invalidation_delay(invalidation_delay: Duration) -> Graph { let inner = InnerGraph { nodes: HashMap::default(), pg: DiGraph::new(), }; Graph { inner: Mutex::new(inner), - invalidation_timeout, + invalidation_delay, } } @@ -531,17 +531,19 @@ impl Graph { if dst_retry { // Retry the dst a number of times to handle Node invalidation. let context = context.clone(); - let deadline = Instant::now() + self.invalidation_timeout; - let mut interval = Duration::from_millis(100); loop { match entry.get(&context, entry_id).await { Ok(r) => break Ok(r), Err(err) if err == N::Error::invalidated() => { - if deadline < Instant::now() { - break Err(N::Error::exhausted()); - } - delay_for(interval).await; - interval *= 2; + let node = { + let inner = self.inner.lock(); + inner.unsafe_entry_for_id(entry_id).node().clone() + }; + info!( + "Filesystem changed during run: retrying `{}` in {:?}...", + node, self.invalidation_delay + ); + delay_for(self.invalidation_delay).await; continue; } Err(other_err) => break Err(other_err), @@ -801,7 +803,6 @@ impl Graph { let mut inner = self.inner.lock(); entry.complete( context, - entry_id, run_token, dep_generations, result, diff --git a/src/rust/engine/graph/src/node.rs b/src/rust/engine/graph/src/node.rs index 6365630e134..4bd35f37151 100644 --- a/src/rust/engine/graph/src/node.rs +++ b/src/rust/engine/graph/src/node.rs @@ -41,12 +41,6 @@ pub trait NodeError: Clone + Debug + Eq + Send { /// Graph (generally while running). /// fn invalidated() -> Self; - /// - /// Creates an instance that represents an uncacheable node failing from - /// retrying its dependencies too many times, but never able to resolve them, - /// usually because they were invalidated too many times while running. - /// - fn exhausted() -> Self; /// /// Creates an instance that represents that a Node dependency was cyclic along the given path. 
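The control flow of the new retry loop in `graph/src/lib.rs` above amounts to: on an "invalidated" error, log and wait a fixed `invalidation_delay`, then try again with no upper bound on attempts; any other error propagates immediately. A simplified, self-contained sketch under those assumptions (the `get` function and error enum are hypothetical stand-ins, and modern tokio spells the sleep `sleep` where the engine's version uses `delay_for`):

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Error {
    Invalidated,
    Other(String),
}

// Hypothetical stand-in for `entry.get(..)`: invalidated twice, then successful.
async fn get(attempt: &mut u32) -> Result<u32, Error> {
    *attempt += 1;
    if *attempt < 3 {
        Err(Error::Invalidated)
    } else {
        Ok(42)
    }
}

#[tokio::main]
async fn main() {
    let invalidation_delay = Duration::from_millis(500);
    let mut attempt = 0;
    let result = loop {
        match get(&mut attempt).await {
            Ok(r) => break Ok(r),
            // Invalidation is retried indefinitely, after a fixed delay and an info-level log.
            Err(Error::Invalidated) => {
                println!(
                    "Filesystem changed during run: retrying `node` in {:?}...",
                    invalidation_delay
                );
                tokio::time::sleep(invalidation_delay).await;
            }
            // Any other error ends the loop immediately.
            Err(other) => break Err(other),
        }
    };
    assert_eq!(result, Ok(42));
}
```

Compared to the removed code, there is no deadline and no exponential backoff: a run now simply keeps waiting for the filesystem to stabilize, and the log line makes the waiting visible.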
diff --git a/src/rust/engine/graph/src/tests.rs b/src/rust/engine/graph/src/tests.rs index f3930c382a1..c7c6f200181 100644 --- a/src/rust/engine/graph/src/tests.rs +++ b/src/rust/engine/graph/src/tests.rs @@ -9,7 +9,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use parking_lot::Mutex; use rand::{self, Rng}; -use tokio::time::{timeout, Elapsed}; +use tokio::time::{delay_for, timeout, Elapsed}; use crate::{EntryId, Graph, InvalidationResult, Node, NodeContext, NodeError}; @@ -347,8 +347,8 @@ async fn uncachable_node_only_runs_once() { TNode::new(2), TNode::new(1), TNode::new(0), + TNode::new(2), TNode::new(0), - TNode::new(2) ] ); } @@ -356,9 +356,7 @@ async fn uncachable_node_only_runs_once() { #[tokio::test] async fn retries() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new_with_invalidation_timeout(Duration::from_secs( - 10, - ))); + let graph = Arc::new(Graph::new()); let context = { let delay_for_root = Duration::from_millis(100); @@ -388,42 +386,51 @@ async fn retries() { } #[tokio::test] -async fn exhaust_uncacheable_retries() { +async fn canceled_immediately() { let _logger = env_logger::try_init(); - let graph = Arc::new(Graph::new_with_invalidation_timeout(Duration::from_secs(2))); + let invalidation_delay = Duration::from_millis(10); + let graph = Arc::new(Graph::new_with_invalidation_delay(invalidation_delay)); + let delay_for_middle = Duration::from_millis(2000); + let start_time = Instant::now(); let context = { - let mut uncacheable = HashSet::new(); - uncacheable.insert(TNode::new(1)); - let delay_for_root = Duration::from_millis(100); let mut delays = HashMap::new(); - delays.insert(TNode::new(0), delay_for_root); - TContext::new(graph.clone()) - .with_uncacheable(uncacheable) - .with_delays(delays) + delays.insert(TNode::new(1), delay_for_middle); + TContext::new(graph.clone()).with_delays(delays) }; - let sleep_per_invalidation = Duration::from_millis(10); + // We invalidate three times: the mid should only actually run to completion once, because we + // should cancel it the other times. We wait longer than the invalidation_delay for each + // invalidation to ensure that work actually starts before being invalidated. + let iterations = 3; + let sleep_per_invalidation = invalidation_delay * 10; + assert!(delay_for_middle > sleep_per_invalidation * 3); let graph2 = graph.clone(); - let (send, recv) = mpsc::channel(); - let _join = thread::spawn(move || loop { - if let Ok(_) = recv.try_recv() { - break; - }; - thread::sleep(sleep_per_invalidation); - graph2.invalidate_from_roots(|&TNode(n, _)| n == 0); + let _join = thread::spawn(move || { + for _ in 0..iterations { + thread::sleep(sleep_per_invalidation); + graph2.invalidate_from_roots(|&TNode(n, _)| n == 1); + } }); - let (assertion, subject) = match graph.create(TNode::new(2), &context).await { - Err(TError::Exhausted) => (true, None), - Err(e) => (false, Some(Err(e))), - other => (false, Some(other)), - }; - send.send(()).unwrap(); - assert!( - assertion, - "expected {:?} found {:?}", - Err::<(), TError>(TError::Exhausted), - subject + assert_eq!( + graph.create(TNode::new(2), &context).await, + Ok(vec![T(0, 0), T(1, 0), T(2, 0)]) + ); + + // We should have waited much less than the time it would have taken to complete three times. + assert!(Instant::now() < start_time + (delay_for_middle * iterations)); + + // And the top nodes should have seen three aborts. 
+ assert_eq!( + vec![ + TNode::new(1), + TNode::new(2), + TNode::new(1), + TNode::new(2), + TNode::new(1), + TNode::new(2) + ], + context.aborts(), ); } @@ -611,16 +618,19 @@ impl Node for TNode { type Error = TError; async fn run(self, context: TContext) -> Result, TError> { + let mut abort_guard = context.abort_guard(self.clone()); context.ran(self.clone()); let token = T(self.0, context.salt()); - context.maybe_delay(&self); - if let Some(dep) = context.dependency_of(&self) { + context.maybe_delay(&self).await; + let res = if let Some(dep) = context.dependency_of(&self) { let mut v = context.get(dep).await?; v.push(token); Ok(v) } else { Ok(vec![token]) - } + }; + abort_guard.did_not_abort(); + res } fn cacheable(&self) -> bool { @@ -699,6 +709,7 @@ struct TContext { delays: Arc>, uncacheable: Arc>, graph: Arc>, + aborts: Arc>>, runs: Arc>>, entry_id: Option, } @@ -714,6 +725,7 @@ impl NodeContext for TContext { delays: self.delays.clone(), uncacheable: self.uncacheable.clone(), graph: self.graph.clone(), + aborts: self.aborts.clone(), runs: self.runs.clone(), entry_id: Some(entry_id), } @@ -745,6 +757,7 @@ impl TContext { delays: Arc::default(), uncacheable: Arc::default(), graph, + aborts: Arc::new(Mutex::new(Vec::new())), runs: Arc::new(Mutex::new(Vec::new())), entry_id: None, } @@ -787,14 +800,26 @@ impl TContext { self.graph.get(self.entry_id, self, dst).await } + fn abort_guard(&self, node: TNode) -> AbortGuard { + AbortGuard { + context: self.clone(), + node: Some(node), + } + } + + fn aborted(&self, node: TNode) { + let mut aborts = self.aborts.lock(); + aborts.push(node); + } + fn ran(&self, node: TNode) { let mut runs = self.runs.lock(); runs.push(node); } - fn maybe_delay(&self, node: &TNode) { + async fn maybe_delay(&self, node: &TNode) { if let Some(delay) = self.delays.get(node) { - thread::sleep(*delay); + delay_for(*delay).await; } } @@ -816,15 +841,41 @@ impl TContext { } } + fn aborts(&self) -> Vec { + self.aborts.lock().clone() + } + fn runs(&self) -> Vec { self.runs.lock().clone() } } +/// +/// A guard that if dropped, records that the given Node was aborted. When a future is canceled, it +/// is dropped without re-running. +/// +struct AbortGuard { + context: TContext, + node: Option, +} + +impl AbortGuard { + fn did_not_abort(&mut self) { + self.node = None; + } +} + +impl Drop for AbortGuard { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + self.context.aborted(node); + } + } +} + #[derive(Clone, Debug, Eq, PartialEq)] enum TError { Cyclic, - Exhausted, Invalidated, } impl NodeError for TError { @@ -832,10 +883,6 @@ impl NodeError for TError { TError::Invalidated } - fn exhausted() -> Self { - TError::Exhausted - } - fn cyclic(_path: Vec) -> Self { TError::Cyclic } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 5fd447c9bf7..f20e4f94933 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -126,10 +126,8 @@ impl StreamedHermeticCommand { let mut inner = Command::new(program); inner // TODO: This will not universally prevent child processes continuing to run in the - // background, for a few reasons: - // 1) the Graph memoizes runs, and generally completes them rather than cancelling them, - // 2) killing a pantsd client with Ctrl+C kills the server with a signal, which won't - // currently result in an orderly dropping of everything in the graph. See #10004. 
+ // background, because killing a pantsd client with Ctrl+C kills the server with a signal, + // which won't currently result in an orderly dropping of everything in the graph. See #10004. .kill_on_drop(true) .env_clear() // It would be really nice not to have to manually set PATH but this is sadly the only way diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index be12c6e263a..e8b68c85983 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -1040,14 +1040,16 @@ impl NodeKey { fn user_facing_name(&self) -> Option { match self { NodeKey::Task(ref task) => task.task.display_info.desc.as_ref().map(|s| s.to_owned()), - NodeKey::Snapshot(_) => Some(format!("{}", self)), + NodeKey::Snapshot(ref s) => Some(format!("Snapshotting: {}", s.0)), NodeKey::MultiPlatformExecuteProcess(mp_epr) => mp_epr.0.user_facing_name(), NodeKey::DigestFile(DigestFile(File { path, .. })) => { Some(format!("Fingerprinting: {}", path.display())) } - NodeKey::DownloadedFile(..) => None, + NodeKey::DownloadedFile(ref d) => Some(format!("Downloading: {}", d.0)), NodeKey::ReadLink(..) => None, - NodeKey::Scandir(Scandir(Dir(path))) => Some(format!("Reading {}", path.display())), + NodeKey::Scandir(Scandir(Dir(path))) => { + Some(format!("Reading directory: {}", path.display())) + } NodeKey::Select(..) => None, } } @@ -1170,16 +1172,22 @@ impl Node for NodeKey { impl Display for NodeKey { fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { - &NodeKey::DigestFile(ref s) => write!(f, "DigestFile({:?})", s.0), - &NodeKey::DownloadedFile(ref s) => write!(f, "DownloadedFile({:?})", s.0), + &NodeKey::DigestFile(ref s) => write!(f, "DigestFile({})", s.0.path.display()), + &NodeKey::DownloadedFile(ref s) => write!(f, "DownloadedFile({})", s.0), &NodeKey::MultiPlatformExecuteProcess(ref s) => { - write!(f, "MultiPlatformExecuteProcess({:?}", s.0) + if let Some(name) = s.0.user_facing_name() { + write!(f, "Process({})", name) + } else { + write!(f, "Process({:?})", s) + } } - &NodeKey::ReadLink(ref s) => write!(f, "ReadLink({:?})", s.0), - &NodeKey::Scandir(ref s) => write!(f, "Scandir({:?})", s.0), - &NodeKey::Select(ref s) => write!(f, "Select({}, {})", s.params, s.product,), - &NodeKey::Task(ref task) => write!(f, "{:?}", task), - &NodeKey::Snapshot(ref s) => write!(f, "Snapshot({})", format!("{}", &s.0)), + &NodeKey::ReadLink(ref s) => write!(f, "ReadLink({})", (s.0).0.display()), + &NodeKey::Scandir(ref s) => write!(f, "Scandir({})", (s.0).0.display()), + &NodeKey::Select(ref s) => write!(f, "{}", s.product), + &NodeKey::Task(ref task) => { + write!(f, "@rule {}({})", task.task.display_info.name, task.params) + } + &NodeKey::Snapshot(ref s) => write!(f, "Snapshot({})", s.0), } } } @@ -1189,10 +1197,6 @@ impl NodeError for Failure { Failure::Invalidated } - fn exhausted() -> Failure { - Context::mk_error("Exhausted retries while waiting for the filesystem to stabilize.") - } - fn cyclic(mut path: Vec) -> Failure { let path_len = path.len(); if path_len > 1 {
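The `Display` rework in `nodes.rs` above swaps `Debug`-style rendering (`{:?}`) for human-readable output, which is what makes the retry log line shown at the top of this PR legible. A generic illustration of the approach, using a hypothetical, much-simplified node enum rather than the engine's actual `NodeKey`:

```rust
use std::fmt;
use std::path::PathBuf;

// Hypothetical, simplified stand-in for the engine's NodeKey.
enum NodeKey {
    DigestFile(PathBuf),
    Scandir(PathBuf),
    Task { rule_name: String },
}

impl fmt::Display for NodeKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Render paths with Display rather than Debug so they are not quoted or escaped.
            NodeKey::DigestFile(path) => write!(f, "DigestFile({})", path.display()),
            NodeKey::Scandir(path) => write!(f, "Scandir({})", path.display()),
            // Tasks render as the `@rule` they execute.
            NodeKey::Task { rule_name } => write!(f, "@rule {}", rule_name),
        }
    }
}

fn main() {
    let nodes = vec![
        NodeKey::DigestFile(PathBuf::from("src/python/pants/BUILD")),
        NodeKey::Scandir(PathBuf::from("src/python/pants")),
        NodeKey::Task { rule_name: "coordinator_of_tests".to_owned() },
    ];
    for node in &nodes {
        // The graph's retry logging interpolates the node with `{}`, producing e.g.:
        // "Filesystem changed during run: retrying `@rule coordinator_of_tests` in 500ms..."
        println!("Filesystem changed during run: retrying `{}` in 500ms...", node);
    }
}
```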