diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index 763e4311128d7..ff601888b745c 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -175,7 +175,7 @@ impl Engine { .iter() .any(|idx| { node_distances - .get(&(**idx, node_idx)) + .get(&(*idx, node_idx)) .map_or(false, |dist| *dist != i32::MAX) }) .then_some(node.clone()) diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index a609263dd8634..f4de965470ccb 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -124,11 +124,6 @@ impl RunBuilder { self } - pub fn hide_prelude(mut self) -> Self { - self.should_print_prelude_override = Some(false); - self - } - fn connect_process_manager(&self, signal_subscriber: SignalSubscriber) { let manager = self.processes.clone(); tokio::spawn(async move { diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 8739b2800e891..aa64247c9a8cf 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -72,8 +72,6 @@ impl WatchClient { execution_args: execution_args.clone(), }); - let mut main_run_handle: Option> = None; - let mut run = RunBuilder::new(new_base)? .build(&handler, telemetry.clone()) .await?; @@ -91,6 +89,8 @@ impl WatchClient { let mut events = client.package_changes().await?; let mut current_runs: HashMap>> = HashMap::new(); + let mut persistent_tasks_handle = None; + let event_fut = async { while let Some(event) = events.next().await { let event = event.unwrap(); @@ -101,6 +101,7 @@ impl WatchClient { &base, &telemetry, &handler, + &mut persistent_tasks_handle, ) .await?; } @@ -128,6 +129,7 @@ impl WatchClient { base: &CommandBase, telemetry: &CommandEventBuilder, handler: &SignalHandler, + persistent_tasks_handle: &mut Option>>, ) -> Result<(), Error> { // Should we recover here? match event { @@ -210,8 +212,24 @@ impl WatchClient { .build(handler, telemetry.clone()) .await?; - // Execute run - run.run().await?; + if run.has_persistent_tasks() { + // Abort old run + if let Some(run) = persistent_tasks_handle.take() { + run.abort(); + } + + let mut persistent_run = run.create_run_for_persistent_tasks(); + // If we have persistent tasks, we run them on a separate thread + // since persistent tasks don't finish + *persistent_tasks_handle = + Some(tokio::spawn(async move { persistent_run.run().await })); + + // But we still run the regular tasks blocking + let mut non_persistent_run = run.create_run_without_persistent_tasks(); + non_persistent_run.run().await?; + } else { + run.run().await?; + } } proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { return Err(DaemonError::Unavailable(message).into());