diff --git a/src/python/pants/core/goals/test.py b/src/python/pants/core/goals/test.py
index a7ef697a13b..93049f8f417 100644
--- a/src/python/pants/core/goals/test.py
+++ b/src/python/pants/core/goals/test.py
@@ -340,7 +340,7 @@ async def run_tests(
     return Test(exit_code)
 
 
-@rule
+@rule(desc="Run test target")
 async def coordinator_of_tests(wrapped_field_set: WrappedTestFieldSet) -> AddressAndTestResult:
     field_set = wrapped_field_set.field_set
     result = await Get[TestResult](TestFieldSet, field_set)
diff --git a/src/rust/engine/graph/src/entry.rs b/src/rust/engine/graph/src/entry.rs
index 22332697f5d..e827b26948d 100644
--- a/src/rust/engine/graph/src/entry.rs
+++ b/src/rust/engine/graph/src/entry.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 
 use crate::node::{EntryId, Node, NodeContext, NodeError};
 
 use futures::channel::oneshot;
-use futures::future::{self, BoxFuture, FutureExt};
+use futures::future::{self, AbortHandle, Abortable, Aborted, BoxFuture, FutureExt};
 use log::{self, trace};
 use parking_lot::Mutex;
@@ -167,10 +167,10 @@ pub enum EntryState<N: Node> {
   // The `previous_result` value for a Running node is not a valid value. See NotStarted.
   Running {
     run_token: RunToken,
+    abort_handle: AbortHandle,
     generation: Generation,
     waiters: Vec<Waiter<N>>,
     previous_result: Option<EntryResult<N>>,
-    dirty: bool,
   },
   // A node that has completed, and then possibly been marked dirty. Because marking a node
   // dirty does not eagerly re-execute any logic, it will stay this way until a caller moves it
@@ -289,6 +289,7 @@ impl<N: Node> Entry<N> {
     let run_token = run_token.next();
     let context = context_factory.clone_for(entry_id);
     let node = node.clone();
+    let (abort_handle, abort_registration) = AbortHandle::new_pair();
 
     context_factory.spawn(async move {
       // If we have previous result generations, compare them to all current dependency
@@ -319,8 +320,13 @@ impl<N: Node> Entry<N> {
           .graph()
           .complete(&context, entry_id, run_token, None);
       } else {
-        // The Node needs to (re-)run!
-        let res = node.run(context.clone()).await;
+        // The Node needs to (re-)run! Wrap the potentially long-running computation in an
+        // Abortable.
+        let res = match Abortable::new(node.run(context.clone()), abort_registration).await {
+          Ok(r) => r,
+          Err(Aborted) => Err(N::Error::invalidated()),
+        };
+
         context
           .graph()
           .complete(&context, entry_id, run_token, Some(res));
@@ -328,11 +334,11 @@ impl<N: Node> Entry<N> {
     });
 
     EntryState::Running {
-      waiters: Vec::new(),
       run_token,
+      abort_handle,
+      waiters: Vec::new(),
       generation,
       previous_result,
-      dirty: false,
     }
   }
 
@@ -456,7 +462,6 @@ impl<N: Node> Entry<N> {
   pub(crate) fn complete(
     &mut self,
     context: &N::Context,
-    entry_id: EntryId,
     result_run_token: RunToken,
     dep_generations: Vec<Generation>,
     result: Option<Result<N::Item, N::Error>>,
@@ -472,41 +477,23 @@ impl<N: Node> Entry<N> {
       _ => {
         // We care about exactly one case: a Running state with the same run_token. All other states
         // represent various (legal) race conditions.
-        trace!("Not completing node {:?} because it was invalidated (different run_token) before completing.", self.node);
+        trace!(
+          "Not completing node {:?} because it was invalidated.",
+          self.node
+        );
         return;
       }
     }
 
     *state = match mem::replace(&mut *state, EntryState::initial()) {
       EntryState::Running {
-        waiters,
         run_token,
+        waiters,
         mut generation,
         mut previous_result,
-        dirty,
         ..
       } => {
         match result {
-          _ if dirty => {
-            // The node was dirtied while it was running. The dep_generations and new result cannot
-            // be trusted and were never published. We continue to use the previous result.
-            trace!(
-              "Not completing node {:?} because it was dirtied before completing.",
-              self.node
-            );
-            if let Some(previous_result) = previous_result.as_mut() {
-              previous_result.dirty();
-            }
-            Self::run(
-              context,
-              &self.node,
-              entry_id,
-              run_token,
-              generation,
-              None,
-              previous_result,
-            )
-          }
           Some(Err(e)) => {
             if let Some(previous_result) = previous_result.as_mut() {
               previous_result.dirty();
@@ -641,13 +628,17 @@ impl<N: Node> Entry<N> {
         generation,
         previous_result,
         ..
-      }
-      | EntryState::Running {
+      } => (run_token, generation, previous_result),
+      EntryState::Running {
         run_token,
+        abort_handle,
         generation,
         previous_result,
         ..
-      } => (run_token, generation, previous_result),
+      } => {
+        abort_handle.abort();
+        (run_token, generation, previous_result)
+      }
       EntryState::Completed {
         run_token,
         generation,
@@ -682,12 +673,6 @@ impl<N: Node> Entry<N> {
     let state = &mut *self.state.lock();
     trace!("Dirtying node {:?}", self.node);
     match state {
-      &mut EntryState::Running { ref mut dirty, .. } => {
-        // An uncacheable node can never be marked dirty.
-        if self.node.cacheable() {
-          *dirty = true;
-        }
-      }
       &mut EntryState::Completed {
         ref mut result,
         ref mut pollers,
@@ -698,8 +683,36 @@ impl<N: Node> Entry<N> {
           let _ = poller.send(());
         }
         result.dirty();
+        return;
+      }
+      &mut EntryState::NotStarted { .. } => return,
+      &mut EntryState::Running { .. } if !self.node.cacheable() => {
+        // An uncacheable node cannot be interrupted.
+        return;
+      }
+      &mut EntryState::Running { .. } => {
+        // Handled below: we need to move back to NotStarted.
+      }
+    };
+
+    *state = match mem::replace(&mut *state, EntryState::initial()) {
+      EntryState::Running {
+        run_token,
+        abort_handle,
+        generation,
+        previous_result,
+        ..
+      } => {
+        // Dirtying a Running node immediately cancels it.
+        trace!("Node {:?} was dirtied while running.", self.node);
+        abort_handle.abort();
+        EntryState::NotStarted {
+          run_token,
+          generation,
+          previous_result,
+        }
       }
-      &mut EntryState::NotStarted { .. } => {}
+      _ => unreachable!(),
     }
   }
diff --git a/src/rust/engine/graph/src/lib.rs b/src/rust/engine/graph/src/lib.rs
index 4f13deba42b..fa8303950b6 100644
--- a/src/rust/engine/graph/src/lib.rs
+++ b/src/rust/engine/graph/src/lib.rs
@@ -40,11 +40,11 @@
 use std::fs::File;
 use std::hash::BuildHasherDefault;
 use std::io::{self, BufWriter, Write};
 use std::path::Path;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
 use fnv::FnvHasher;
 
 use futures::future;
-use log::{debug, trace, warn};
+use log::{debug, info, trace, warn};
 use parking_lot::Mutex;
 use petgraph::graph::DiGraph;
 use petgraph::visit::EdgeRef;
@@ -453,22 +453,22 @@ impl<N: Node> InnerGraph<N> {
 ///
 pub struct Graph<N: Node> {
   inner: Mutex<InnerGraph<N>>,
-  invalidation_timeout: Duration,
+  invalidation_delay: Duration,
 }
 
 impl<N: Node> Graph<N> {
   pub fn new() -> Graph<N> {
-    Self::new_with_invalidation_timeout(Duration::from_secs(60))
+    Self::new_with_invalidation_delay(Duration::from_millis(500))
   }
 
-  pub fn new_with_invalidation_timeout(invalidation_timeout: Duration) -> Graph<N> {
+  pub fn new_with_invalidation_delay(invalidation_delay: Duration) -> Graph<N> {
     let inner = InnerGraph {
       nodes: HashMap::default(),
       pg: DiGraph::new(),
     };
     Graph {
       inner: Mutex::new(inner),
-      invalidation_timeout,
+      invalidation_delay,
     }
   }
 
@@ -531,17 +531,19 @@ impl<N: Node> Graph<N> {
     if dst_retry {
       // Retry the dst a number of times to handle Node invalidation.
       let context = context.clone();
-      let deadline = Instant::now() + self.invalidation_timeout;
-      let mut interval = Duration::from_millis(100);
       loop {
         match entry.get(&context, entry_id).await {
           Ok(r) => break Ok(r),
           Err(err) if err == N::Error::invalidated() => {
-            if deadline < Instant::now() {
-              break Err(N::Error::exhausted());
-            }
-            delay_for(interval).await;
-            interval *= 2;
+            let node = {
+              let inner = self.inner.lock();
+              inner.unsafe_entry_for_id(entry_id).node().clone()
+            };
+            info!(
+              "Filesystem changed during run: retrying `{}` in {:?}...",
+              node, self.invalidation_delay
+            );
+            delay_for(self.invalidation_delay).await;
             continue;
           }
           Err(other_err) => break Err(other_err),
@@ -801,7 +803,6 @@ impl<N: Node> Graph<N> {
     let mut inner = self.inner.lock();
     entry.complete(
       context,
-      entry_id,
       run_token,
       dep_generations,
       result,
diff --git a/src/rust/engine/graph/src/node.rs b/src/rust/engine/graph/src/node.rs
index 6365630e134..4bd35f37151 100644
--- a/src/rust/engine/graph/src/node.rs
+++ b/src/rust/engine/graph/src/node.rs
@@ -41,12 +41,6 @@ pub trait NodeError: Clone + Debug + Eq + Send {
   /// Graph (generally while running).
   ///
   fn invalidated() -> Self;
-  ///
-  /// Creates an instance that represents an uncacheable node failing from
-  /// retrying its dependencies too many times, but never able to resolve them,
-  /// usually because they were invalidated too many times while running.
-  ///
-  fn exhausted() -> Self;
 
   ///
   /// Creates an instance that represents that a Node dependency was cyclic along the given path.
diff --git a/src/rust/engine/graph/src/tests.rs b/src/rust/engine/graph/src/tests.rs
index f3930c382a1..c7c6f200181 100644
--- a/src/rust/engine/graph/src/tests.rs
+++ b/src/rust/engine/graph/src/tests.rs
@@ -9,7 +9,7 @@ use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
 use parking_lot::Mutex;
 use rand::{self, Rng};
-use tokio::time::{timeout, Elapsed};
+use tokio::time::{delay_for, timeout, Elapsed};
 
 use crate::{EntryId, Graph, InvalidationResult, Node, NodeContext, NodeError};
@@ -347,8 +347,8 @@ async fn uncachable_node_only_runs_once() {
       TNode::new(2),
       TNode::new(1),
       TNode::new(0),
+      TNode::new(2),
       TNode::new(0),
-      TNode::new(2)
     ]
   );
 }
@@ -356,9 +356,7 @@ async fn uncachable_node_only_runs_once() {
 #[tokio::test]
 async fn retries() {
   let _logger = env_logger::try_init();
-  let graph = Arc::new(Graph::new_with_invalidation_timeout(Duration::from_secs(
-    10,
-  )));
+  let graph = Arc::new(Graph::new());
 
   let context = {
     let delay_for_root = Duration::from_millis(100);
@@ -388,42 +386,51 @@ async fn retries() {
 }
 
 #[tokio::test]
-async fn exhaust_uncacheable_retries() {
+async fn canceled_immediately() {
   let _logger = env_logger::try_init();
-  let graph = Arc::new(Graph::new_with_invalidation_timeout(Duration::from_secs(2)));
+  let invalidation_delay = Duration::from_millis(10);
+  let graph = Arc::new(Graph::new_with_invalidation_delay(invalidation_delay));
+  let delay_for_middle = Duration::from_millis(2000);
+  let start_time = Instant::now();
   let context = {
-    let mut uncacheable = HashSet::new();
-    uncacheable.insert(TNode::new(1));
-    let delay_for_root = Duration::from_millis(100);
     let mut delays = HashMap::new();
-    delays.insert(TNode::new(0), delay_for_root);
-    TContext::new(graph.clone())
-      .with_uncacheable(uncacheable)
-      .with_delays(delays)
+    delays.insert(TNode::new(1), delay_for_middle);
+    TContext::new(graph.clone()).with_delays(delays)
   };
 
-  let sleep_per_invalidation = Duration::from_millis(10);
+  // We invalidate three times: the middle node should only run to completion once, because we
+  // should cancel it the other times. We wait longer than the invalidation_delay for each
+  // invalidation to ensure that work actually starts before being invalidated.
+  let iterations = 3;
+  let sleep_per_invalidation = invalidation_delay * 10;
+  assert!(delay_for_middle > sleep_per_invalidation * 3);
   let graph2 = graph.clone();
-  let (send, recv) = mpsc::channel();
-  let _join = thread::spawn(move || loop {
-    if let Ok(_) = recv.try_recv() {
-      break;
-    };
-    thread::sleep(sleep_per_invalidation);
-    graph2.invalidate_from_roots(|&TNode(n, _)| n == 0);
+  let _join = thread::spawn(move || {
+    for _ in 0..iterations {
+      thread::sleep(sleep_per_invalidation);
+      graph2.invalidate_from_roots(|&TNode(n, _)| n == 1);
+    }
   });
 
-  let (assertion, subject) = match graph.create(TNode::new(2), &context).await {
-    Err(TError::Exhausted) => (true, None),
-    Err(e) => (false, Some(Err(e))),
-    other => (false, Some(other)),
-  };
-  send.send(()).unwrap();
-  assert!(
-    assertion,
-    "expected {:?} found {:?}",
-    Err::<(), TError>(TError::Exhausted),
-    subject
+  assert_eq!(
+    graph.create(TNode::new(2), &context).await,
+    Ok(vec![T(0, 0), T(1, 0), T(2, 0)])
+  );
+
+  // We should have waited much less than the time it would have taken to complete three times.
+  assert!(Instant::now() < start_time + (delay_for_middle * iterations));
+
+  // And the top nodes should have seen three aborts.
+  assert_eq!(
+    vec![
+      TNode::new(1),
+      TNode::new(2),
+      TNode::new(1),
+      TNode::new(2),
+      TNode::new(1),
+      TNode::new(2)
+    ],
+    context.aborts(),
   );
 }
 
@@ -611,16 +618,19 @@ impl Node for TNode {
   type Error = TError;
 
   async fn run(self, context: TContext) -> Result<Vec<T>, TError> {
+    let mut abort_guard = context.abort_guard(self.clone());
     context.ran(self.clone());
     let token = T(self.0, context.salt());
-    context.maybe_delay(&self);
-    if let Some(dep) = context.dependency_of(&self) {
+    context.maybe_delay(&self).await;
+    let res = if let Some(dep) = context.dependency_of(&self) {
       let mut v = context.get(dep).await?;
       v.push(token);
       Ok(v)
     } else {
       Ok(vec![token])
-    }
+    };
+    abort_guard.did_not_abort();
+    res
   }
 
   fn cacheable(&self) -> bool {
@@ -699,6 +709,7 @@ struct TContext {
   delays: Arc<HashMap<TNode, Duration>>,
   uncacheable: Arc<HashSet<TNode>>,
   graph: Arc<Graph<TNode>>,
+  aborts: Arc<Mutex<Vec<TNode>>>,
   runs: Arc<Mutex<Vec<TNode>>>,
   entry_id: Option<EntryId>,
 }
@@ -714,6 +725,7 @@ impl NodeContext for TContext {
       delays: self.delays.clone(),
       uncacheable: self.uncacheable.clone(),
       graph: self.graph.clone(),
+      aborts: self.aborts.clone(),
       runs: self.runs.clone(),
       entry_id: Some(entry_id),
     }
@@ -745,6 +757,7 @@ impl TContext {
       delays: Arc::default(),
       uncacheable: Arc::default(),
       graph,
+      aborts: Arc::new(Mutex::new(Vec::new())),
       runs: Arc::new(Mutex::new(Vec::new())),
       entry_id: None,
     }
@@ -787,14 +800,26 @@ impl TContext {
     self.graph.get(self.entry_id, self, dst).await
   }
 
+  fn abort_guard(&self, node: TNode) -> AbortGuard {
+    AbortGuard {
+      context: self.clone(),
+      node: Some(node),
+    }
+  }
+
+  fn aborted(&self, node: TNode) {
+    let mut aborts = self.aborts.lock();
+    aborts.push(node);
+  }
+
   fn ran(&self, node: TNode) {
     let mut runs = self.runs.lock();
     runs.push(node);
   }
 
-  fn maybe_delay(&self, node: &TNode) {
+  async fn maybe_delay(&self, node: &TNode) {
     if let Some(delay) = self.delays.get(node) {
-      thread::sleep(*delay);
+      delay_for(*delay).await;
     }
   }
 
@@ -816,15 +841,41 @@ impl TContext {
     }
   }
 
+  fn aborts(&self) -> Vec<TNode> {
+    self.aborts.lock().clone()
+  }
+
   fn runs(&self) -> Vec<TNode> {
     self.runs.lock().clone()
   }
 }
 
+///
+/// A guard that, if dropped, records that the given Node was aborted.
+/// When a future is canceled, it is dropped without re-running.
+///
+struct AbortGuard {
+  context: TContext,
+  node: Option<TNode>,
+}
+
+impl AbortGuard {
+  fn did_not_abort(&mut self) {
+    self.node = None;
+  }
+}
+
+impl Drop for AbortGuard {
+  fn drop(&mut self) {
+    if let Some(node) = self.node.take() {
+      self.context.aborted(node);
+    }
+  }
+}
+
 #[derive(Clone, Debug, Eq, PartialEq)]
 enum TError {
   Cyclic,
-  Exhausted,
   Invalidated,
 }
@@ -832,10 +883,6 @@ impl NodeError for TError {
     TError::Invalidated
   }
 
-  fn exhausted() -> Self {
-    TError::Exhausted
-  }
-
   fn cyclic(_path: Vec<String>) -> Self {
     TError::Cyclic
   }
diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs
index 5fd447c9bf7..f20e4f94933 100644
--- a/src/rust/engine/process_execution/src/local.rs
+++ b/src/rust/engine/process_execution/src/local.rs
@@ -126,10 +126,8 @@ impl StreamedHermeticCommand {
     let mut inner = Command::new(program);
     inner
       // TODO: This will not universally prevent child processes continuing to run in the
-      // background, for a few reasons:
-      //   1) the Graph memoizes runs, and generally completes them rather than cancelling them,
-      //   2) killing a pantsd client with Ctrl+C kills the server with a signal, which won't
-      //      currently result in an orderly dropping of everything in the graph. See #10004.
+      // background, because killing a pantsd client with Ctrl+C kills the server with a signal,
+      // which won't currently result in an orderly dropping of everything in the graph. See #10004.
       .kill_on_drop(true)
       .env_clear()
       // It would be really nice not to have to manually set PATH but this is sadly the only way
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index be12c6e263a..e8b68c85983 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -1040,14 +1040,16 @@ impl NodeKey {
   fn user_facing_name(&self) -> Option<String> {
     match self {
       NodeKey::Task(ref task) => task.task.display_info.desc.as_ref().map(|s| s.to_owned()),
-      NodeKey::Snapshot(_) => Some(format!("{}", self)),
+      NodeKey::Snapshot(ref s) => Some(format!("Snapshotting: {}", s.0)),
       NodeKey::MultiPlatformExecuteProcess(mp_epr) => mp_epr.0.user_facing_name(),
       NodeKey::DigestFile(DigestFile(File { path, .. })) => {
         Some(format!("Fingerprinting: {}", path.display()))
       }
-      NodeKey::DownloadedFile(..) => None,
+      NodeKey::DownloadedFile(ref d) => Some(format!("Downloading: {}", d.0)),
       NodeKey::ReadLink(..) => None,
-      NodeKey::Scandir(Scandir(Dir(path))) => Some(format!("Reading {}", path.display())),
+      NodeKey::Scandir(Scandir(Dir(path))) => {
+        Some(format!("Reading directory: {}", path.display()))
+      }
       NodeKey::Select(..) => None,
     }
   }
@@ -1170,16 +1172,22 @@ impl Node for NodeKey {
 impl Display for NodeKey {
   fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
     match self {
-      &NodeKey::DigestFile(ref s) => write!(f, "DigestFile({:?})", s.0),
-      &NodeKey::DownloadedFile(ref s) => write!(f, "DownloadedFile({:?})", s.0),
+      &NodeKey::DigestFile(ref s) => write!(f, "DigestFile({})", s.0.path.display()),
+      &NodeKey::DownloadedFile(ref s) => write!(f, "DownloadedFile({})", s.0),
       &NodeKey::MultiPlatformExecuteProcess(ref s) => {
-        write!(f, "MultiPlatformExecuteProcess({:?}", s.0)
+        if let Some(name) = s.0.user_facing_name() {
+          write!(f, "Process({})", name)
+        } else {
+          write!(f, "Process({:?})", s)
+        }
       }
-      &NodeKey::ReadLink(ref s) => write!(f, "ReadLink({:?})", s.0),
-      &NodeKey::Scandir(ref s) => write!(f, "Scandir({:?})", s.0),
-      &NodeKey::Select(ref s) => write!(f, "Select({}, {})", s.params, s.product,),
-      &NodeKey::Task(ref task) => write!(f, "{:?}", task),
-      &NodeKey::Snapshot(ref s) => write!(f, "Snapshot({})", format!("{}", &s.0)),
+      &NodeKey::ReadLink(ref s) => write!(f, "ReadLink({})", (s.0).0.display()),
+      &NodeKey::Scandir(ref s) => write!(f, "Scandir({})", (s.0).0.display()),
+      &NodeKey::Select(ref s) => write!(f, "{}", s.product),
+      &NodeKey::Task(ref task) => {
+        write!(f, "@rule {}({})", task.task.display_info.name, task.params)
+      }
+      &NodeKey::Snapshot(ref s) => write!(f, "Snapshot({})", s.0),
     }
   }
 }
@@ -1189,10 +1197,6 @@ impl NodeError for Failure {
   fn invalidated() -> Failure {
     Failure::Invalidated
   }
 
-  fn exhausted() -> Failure {
-    Context::mk_error("Exhausted retries while waiting for the filesystem to stabilize.")
-  }
-
   fn cyclic(mut path: Vec<String>) -> Failure {
     let path_len = path.len();
     if path_len > 1 {
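
Note on the cancellation mechanism in entry.rs above: it is the stock `futures` 0.3 abort API. `AbortHandle::new_pair()` yields a handle plus a registration, `Abortable` wraps the node's future, and calling `abort()` makes the wrapped future resolve to `Err(Aborted)`, which `Entry` maps to `N::Error::invalidated()` so that the retry loop in lib.rs re-runs the node after `invalidation_delay`. Below is a minimal, self-contained sketch of that pattern; it is not code from this change, and it uses the current `tokio` API (`tokio::time::sleep`) where this era of the codebase used `delay_for`:

    use std::time::Duration;

    use futures::future::{AbortHandle, Abortable, Aborted};

    #[tokio::main]
    async fn main() {
        // Pair an AbortHandle with an AbortRegistration, and wrap the
        // potentially long-running work in an Abortable, as Entry::run does.
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let work = Abortable::new(
            async {
                tokio::time::sleep(Duration::from_secs(60)).await;
                "completed"
            },
            abort_registration,
        );
        let task = tokio::spawn(work);

        // Cancel from outside, as Entry::dirty now does for a Running node.
        abort_handle.abort();

        // The aborted future resolves to Err(Aborted) instead of running to
        // completion; the graph maps that case to N::Error::invalidated().
        match task.await.unwrap() {
            Ok(value) => println!("ran to completion: {}", value),
            Err(Aborted) => println!("canceled before completion"),
        }
    }

Because `Abortable` checks its abort flag on every poll, `abort()` takes effect even if it is called before the spawned task has been polled for the first time, which is why dirtying a node that has only just started Running is race-free.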