From 8562f6a8d05c31efd778e2a7fbb4b555cfbe8b19 Mon Sep 17 00:00:00 2001
From: nicholaslyang
Date: Tue, 23 Apr 2024 17:54:16 -0400
Subject: [PATCH 1/6] Refactoring in preparation for deduplicating events

---
 crates/turborepo-lib/src/cli/mod.rs   |   3 +-
 crates/turborepo-lib/src/run/watch.rs | 107 +++++++++++++++-----------
 2 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/crates/turborepo-lib/src/cli/mod.rs b/crates/turborepo-lib/src/cli/mod.rs
index dbf3438c82dde..dd9dc72a7a1c7 100644
--- a/crates/turborepo-lib/src/cli/mod.rs
+++ b/crates/turborepo-lib/src/cli/mod.rs
@@ -1286,7 +1286,8 @@ pub async fn run(
             event.track_call();
             let base = CommandBase::new(cli_args, repo_root, version, ui);
 
-            WatchClient::start(base, event).await?;
+            let mut client = WatchClient::new(base, event).await?;
+            client.start().await?;
             // We only exit if we get a signal, so we return a non-zero exit code
             return Ok(1);
         }
diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs
index d172e39ff78ab..ba992e11f852c 100644
--- a/crates/turborepo-lib/src/run/watch.rs
+++ b/crates/turborepo-lib/src/run/watch.rs
@@ -18,7 +18,15 @@ use crate::{
     DaemonConnector, DaemonPaths,
 };
 
-pub struct WatchClient {}
+pub struct WatchClient {
+    run: Run,
+    persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
+    current_runs: HashMap<PackageName, JoinHandle<Result<i32, run::Error>>>,
+    connector: DaemonConnector,
+    base: CommandBase,
+    telemetry: CommandEventBuilder,
+    handler: SignalHandler,
+}
 
 #[derive(Debug, Error, Diagnostic)]
 pub enum Error {
@@ -51,6 +59,8 @@ pub enum Error {
     },
     #[error("daemon connection closed")]
     ConnectionClosed,
+    #[error("failed to subscribe to signal handler, shutting down")]
+    NoSignalHandler,
     #[error("watch interrupted due to signal")]
     SignalInterrupt,
     #[error("package change error")]
@@ -58,13 +68,9 @@ pub enum Error {
 }
 
 impl WatchClient {
-    pub async fn start(base: CommandBase, telemetry: CommandEventBuilder) -> Result<(), Error> {
+    pub async fn new(base: CommandBase, telemetry: CommandEventBuilder) -> Result<Self, Error> {
         let signal = commands::run::get_signal()?;
         let handler = SignalHandler::new(signal);
-        let Some(signal_subscriber) = handler.subscribe() else {
-            tracing::warn!("failed to subscribe to signal handler, shutting down");
-            return Ok(());
-        };
 
         let Some(Command::Watch(execution_args)) = &base.args().command else {
             unreachable!()
@@ -76,38 +82,41 @@ impl WatchClient {
             execution_args: execution_args.clone(),
         });
 
-        let mut run = RunBuilder::new(new_base)?
+        let run = RunBuilder::new(new_base)?
.build(&handler, telemetry.clone()) .await?; - run.print_run_prelude(); - let connector = DaemonConnector { can_start_server: true, can_kill_server: true, paths: DaemonPaths::from_repo_root(&base.repo_root), }; + Ok(Self { + base, + run, + connector, + handler, + telemetry, + persistent_tasks_handle: None, + current_runs: HashMap::new(), + }) + } + + pub async fn start(&mut self) -> Result<(), Error> { + let connector = self.connector.clone(); let mut client = connector.connect().await?; let mut events = client.package_changes().await?; - let mut current_runs: HashMap>> = - HashMap::new(); - let mut persistent_tasks_handle = None; + + self.run.print_run_prelude(); + + let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?; let event_fut = async { while let Some(event) = events.next().await { let event = event?; - Self::handle_change_event( - &mut run, - event.event.unwrap(), - &mut current_runs, - &base, - &telemetry, - &handler, - &mut persistent_tasks_handle, - ) - .await?; + self.handle_change_event(event.event.unwrap()).await?; } Err(Error::ConnectionClosed) @@ -127,13 +136,8 @@ impl WatchClient { } async fn handle_change_event( - run: &mut Run, + &mut self, event: proto::package_change_event::Event, - current_runs: &mut HashMap>>, - base: &CommandBase, - telemetry: &CommandEventBuilder, - handler: &SignalHandler, - persistent_tasks_handle: &mut Option>>, ) -> Result<(), Error> { // Should we recover here? match event { @@ -142,11 +146,11 @@ impl WatchClient { }) => { let package_name = PackageName::from(package_name); // If not in the filtered pkgs, ignore - if !run.filtered_pkgs.contains(&package_name) { + if !self.run.filtered_pkgs.contains(&package_name) { return Ok(()); } - let mut args = base.args().clone(); + let mut args = self.base.args().clone(); args.command = args.command.map(|c| { if let Command::Watch(execution_args) = c { Command::Run { @@ -162,18 +166,22 @@ impl WatchClient { } }); - let new_base = - CommandBase::new(args, base.repo_root.clone(), get_version(), base.ui); + let new_base = CommandBase::new( + args, + self.base.repo_root.clone(), + get_version(), + self.base.ui, + ); // TODO: Add logic on when to abort vs wait - if let Some(run) = current_runs.remove(&package_name) { + if let Some(run) = self.current_runs.remove(&package_name) { run.abort(); } - let signal_handler = handler.clone(); - let telemetry = telemetry.clone(); + let signal_handler = self.handler.clone(); + let telemetry = self.telemetry.clone(); - current_runs.insert( + self.current_runs.insert( package_name.clone(), tokio::spawn(async move { let mut run = RunBuilder::new(new_base)? @@ -187,7 +195,7 @@ impl WatchClient { ); } proto::package_change_event::Event::RediscoverPackages(_) => { - let mut args = base.args().clone(); + let mut args = self.base.args().clone(); args.command = args.command.map(|c| { if let Command::Watch(execution_args) = c { Command::Run { @@ -203,36 +211,41 @@ impl WatchClient { } }); - let base = CommandBase::new(args, base.repo_root.clone(), get_version(), base.ui); + let base = CommandBase::new( + args, + self.base.repo_root.clone(), + get_version(), + self.base.ui, + ); // When we rediscover, stop all current runs - for (_, run) in current_runs.drain() { + for (_, run) in self.current_runs.drain() { run.abort(); } // rebuild run struct - *run = RunBuilder::new(base.clone())? + self.run = RunBuilder::new(base.clone())? 
.hide_prelude() - .build(handler, telemetry.clone()) + .build(&self.handler, self.telemetry.clone()) .await?; - if run.has_persistent_tasks() { + if self.run.has_persistent_tasks() { // Abort old run - if let Some(run) = persistent_tasks_handle.take() { + if let Some(run) = self.persistent_tasks_handle.take() { run.abort(); } - let mut persistent_run = run.create_run_for_persistent_tasks(); + let mut persistent_run = self.run.create_run_for_persistent_tasks(); // If we have persistent tasks, we run them on a separate thread // since persistent tasks don't finish - *persistent_tasks_handle = + self.persistent_tasks_handle = Some(tokio::spawn(async move { persistent_run.run().await })); // But we still run the regular tasks blocking - let mut non_persistent_run = run.create_run_without_persistent_tasks(); + let mut non_persistent_run = self.run.create_run_without_persistent_tasks(); non_persistent_run.run().await?; } else { - run.run().await?; + self.run.run().await?; } } proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { From c706f99bbf3acf17b06c39844b85c1058f672a60 Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Wed, 24 Apr 2024 11:36:55 -0400 Subject: [PATCH 2/6] Split event handler into separate future from run processing --- crates/turborepo-lib/src/engine/mod.rs | 16 +-- crates/turborepo-lib/src/run/builder.rs | 12 +-- crates/turborepo-lib/src/run/watch.rs | 125 ++++++++++++++++-------- 3 files changed, 102 insertions(+), 51 deletions(-) diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index 40f6539685263..b63bc0bee5e94 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -142,11 +142,15 @@ impl Engine { /// Creates an instance of `Engine` that only contains tasks that depend on /// tasks from a given package. This is useful for watch mode, where we /// need to re-run only a portion of the task graph. - pub fn create_engine_for_subgraph(&self, changed_package: &PackageName) -> Engine { - let entrypoint_indices: &[petgraph::graph::NodeIndex] = self - .package_tasks - .get(changed_package) - .map_or(&[], |v| &v[..]); + pub fn create_engine_for_subgraph( + &self, + changed_packages: &HashSet, + ) -> Engine { + let entrypoint_indices: Vec<_> = changed_packages + .iter() + .flat_map(|pkg| self.package_tasks.get(pkg)) + .flatten() + .collect(); // We reverse the graph because we want the *dependents* of entrypoint tasks let mut reversed_graph = self.task_graph.clone(); @@ -175,7 +179,7 @@ impl Engine { .iter() .any(|idx| { node_distances - .get(&(*idx, node_idx)) + .get(&(**idx, node_idx)) .map_or(false, |dist| *dist != i32::MAX) }) .then_some(node.clone()) diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index 1caff844e8fa5..2eb49f335dde8 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -62,7 +62,7 @@ pub struct RunBuilder { // In watch mode, we can have a changed package that we want to serve as an entrypoint. // We will then prune away any tasks that do not depend on tasks inside // this package. 
-    entrypoint_package: Option<PackageName>,
+    entrypoint_packages: Option<HashSet<PackageName>>,
     should_print_prelude_override: Option<bool>,
 }
 
@@ -114,13 +114,13 @@ impl RunBuilder {
             version,
             experimental_ui,
             analytics_sender: None,
-            entrypoint_package: None,
+            entrypoint_packages: None,
             should_print_prelude_override: None,
         })
     }
 
-    pub fn with_entrypoint_package(mut self, entrypoint_package: PackageName) -> Self {
-        self.entrypoint_package = Some(entrypoint_package);
+    pub fn with_entrypoint_packages(mut self, entrypoint_packages: HashSet<PackageName>) -> Self {
+        self.entrypoint_packages = Some(entrypoint_packages);
         self
     }
 
@@ -451,8 +451,8 @@ impl RunBuilder {
 
         // If we have an initial task, we prune out the engine to only
         // tasks that are reachable from that initial task.
-        if let Some(entrypoint_package) = &self.entrypoint_package {
-            engine = engine.create_engine_for_subgraph(entrypoint_package);
+        if let Some(entrypoint_packages) = &self.entrypoint_packages {
+            engine = engine.create_engine_for_subgraph(entrypoint_packages);
         }
 
         if !self.opts.run_opts.parallel {
diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs
index ba992e11f852c..a65c605d25e93 100644
--- a/crates/turborepo-lib/src/run/watch.rs
+++ b/crates/turborepo-lib/src/run/watch.rs
@@ -1,9 +1,12 @@
-use std::collections::HashMap;
+use std::{cell::RefCell, collections::HashSet, sync::Mutex};
 
 use futures::StreamExt;
 use miette::{Diagnostic, SourceSpan};
 use thiserror::Error;
-use tokio::{select, task::JoinHandle};
+use tokio::{
+    select,
+    task::{yield_now, JoinHandle},
+};
 use turborepo_repository::package_graph::PackageName;
 use turborepo_telemetry::events::command::CommandEventBuilder;
 
@@ -18,10 +21,29 @@ use crate::{
     DaemonConnector, DaemonPaths,
 };
 
+enum ChangedPackages {
+    All,
+    Some(HashSet<PackageName>),
+}
+
+impl Default for ChangedPackages {
+    fn default() -> Self {
+        ChangedPackages::Some(HashSet::new())
+    }
+}
+
+impl ChangedPackages {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            ChangedPackages::All => false,
+            ChangedPackages::Some(pkgs) => pkgs.is_empty(),
+        }
+    }
+}
+
 pub struct WatchClient {
     run: Run,
     persistent_tasks_handle: Option<JoinHandle<Result<i32, run::Error>>>,
-    current_runs: HashMap<PackageName, JoinHandle<Result<i32, run::Error>>>,
     connector: DaemonConnector,
     base: CommandBase,
     telemetry: CommandEventBuilder,
@@ -99,7 +121,6 @@ impl WatchClient {
             handler,
             telemetry,
             persistent_tasks_handle: None,
-            current_runs: HashMap::new(),
         })
     }
 
@@ -113,15 +134,26 @@ impl WatchClient {
 
         let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;
 
+        let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default()));
+
         let event_fut = async {
             while let Some(event) = events.next().await {
                 let event = event?;
-                self.handle_change_event(event.event.unwrap()).await?;
+                Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
             }
 
             Err(Error::ConnectionClosed)
         };
 
+        let run_fut = async {
+            loop {
+                if !changed_packages.lock().unwrap().borrow().is_empty() {
+                    self.execute_run(&changed_packages).await?;
+                }
+                yield_now().await;
+            }
+        };
+
         select! {
             biased;
             _ = signal_subscriber.listen() => {
@@ -132,11 +164,14 @@ impl WatchClient {
             result = event_fut => {
                 result
            }
+            run_result = run_fut => {
+                run_result
+            }
         }
     }
 
     async fn handle_change_event(
-        &mut self,
+        changed_packages: &Mutex<RefCell<ChangedPackages>>,
         event: proto::package_change_event::Event,
     ) -> Result<(), Error> {
         // Should we recover here?
@@ -145,10 +180,42 @@ impl WatchClient { package_name, }) => { let package_name = PackageName::from(package_name); - // If not in the filtered pkgs, ignore - if !self.run.filtered_pkgs.contains(&package_name) { - return Ok(()); + + match changed_packages.lock().unwrap().get_mut() { + ChangedPackages::All => { + // If we've already changed all packages, ignore + } + ChangedPackages::Some(ref mut pkgs) => { + pkgs.insert(package_name); + } } + } + proto::package_change_event::Event::RediscoverPackages(_) => { + *changed_packages.lock().unwrap().get_mut() = ChangedPackages::All; + } + proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { + return Err(DaemonError::Unavailable(message).into()); + } + } + + Ok(()) + } + + async fn execute_run( + &mut self, + changed_packages: &Mutex>, + ) -> Result { + let changed_packages = changed_packages.lock().unwrap().take(); + // Should we recover here? + match changed_packages { + ChangedPackages::Some(packages) => { + let packages = packages + .into_iter() + .filter(|pkg| { + // If not in the filtered pkgs, ignore + self.run.filtered_pkgs.contains(&pkg) + }) + .collect(); let mut args = self.base.args().clone(); args.command = args.command.map(|c| { @@ -173,28 +240,18 @@ impl WatchClient { self.base.ui, ); - // TODO: Add logic on when to abort vs wait - if let Some(run) = self.current_runs.remove(&package_name) { - run.abort(); - } - let signal_handler = self.handler.clone(); let telemetry = self.telemetry.clone(); - self.current_runs.insert( - package_name.clone(), - tokio::spawn(async move { - let mut run = RunBuilder::new(new_base)? - .with_entrypoint_package(package_name) - .hide_prelude() - .build(&signal_handler, telemetry) - .await?; - - run.run().await - }), - ); + let mut run = RunBuilder::new(new_base)? + .with_entrypoint_packages(packages) + .hide_prelude() + .build(&signal_handler, telemetry) + .await?; + + Ok(run.run().await?) } - proto::package_change_event::Event::RediscoverPackages(_) => { + ChangedPackages::All => { let mut args = self.base.args().clone(); args.command = args.command.map(|c| { if let Command::Watch(execution_args) = c { @@ -218,11 +275,6 @@ impl WatchClient { self.base.ui, ); - // When we rediscover, stop all current runs - for (_, run) in self.current_runs.drain() { - run.abort(); - } - // rebuild run struct self.run = RunBuilder::new(base.clone())? .hide_prelude() @@ -243,16 +295,11 @@ impl WatchClient { // But we still run the regular tasks blocking let mut non_persistent_run = self.run.create_run_without_persistent_tasks(); - non_persistent_run.run().await?; + Ok(non_persistent_run.run().await?) } else { - self.run.run().await?; + Ok(self.run.run().await?) 
} } - proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { - return Err(DaemonError::Unavailable(message).into()); - } } - - Ok(()) } } From a1db1ea92db48206fc278bade16bacefebeae41c Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Wed, 24 Apr 2024 12:15:08 -0400 Subject: [PATCH 3/6] Increasing buffer for package change events and making error response a little more robust --- crates/turborepo-lib/src/daemon/server.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index 5d7cf52c9bb31..c8e6891a7192e 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -18,7 +18,7 @@ use semver::Version; use thiserror::Error; use tokio::{ select, - sync::{mpsc, oneshot}, + sync::{broadcast::error::RecvError, mpsc, oneshot}, task::JoinHandle, }; use tokio_stream::wrappers::ReceiverStream; @@ -594,7 +594,8 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner { .package_changes_watcher .package_changes() .await; - let (tx, rx) = mpsc::channel(1); + + let (tx, rx) = mpsc::channel(1024); tx.send(Ok(proto::PackageChangeEvent { event: Some(proto::package_change_event::Event::RediscoverPackages( @@ -607,6 +608,14 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner { tokio::spawn(async move { loop { let event = match package_changes_rx.recv().await { + Err(RecvError::Lagged(_)) => { + warn!("package changes stream lagged"); + proto::PackageChangeEvent { + event: Some(proto::package_change_event::Event::RediscoverPackages( + proto::RediscoverPackages {}, + )), + } + } Err(err) => proto::PackageChangeEvent { event: Some(proto::package_change_event::Event::Error( proto::PackageChangeError { From 11c7d3a8f62e57e5f817cf857f1609574ffbd177 Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Wed, 24 Apr 2024 15:11:55 -0400 Subject: [PATCH 4/6] Fix tests --- crates/turborepo-lib/src/engine/mod.rs | 3 ++- crates/turborepo-lib/src/run/watch.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index b63bc0bee5e94..d6e577d952504 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -768,7 +768,8 @@ mod test { engine.task_graph.add_edge(b_build_idx, a_build_idx, ()); let engine = engine.seal(); - let subgraph = engine.create_engine_for_subgraph(&PackageName::from("a")); + let subgraph = + engine.create_engine_for_subgraph(&[PackageName::from("a")].into_iter().collect()); // Verify that the subgraph only contains tasks from package `a` and the `build` // task from package `b` diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index a65c605d25e93..6327631bd682b 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -213,7 +213,7 @@ impl WatchClient { .into_iter() .filter(|pkg| { // If not in the filtered pkgs, ignore - self.run.filtered_pkgs.contains(&pkg) + self.run.filtered_pkgs.contains(pkg) }) .collect(); From 13d72504fe63512a207211f498b9a34f19ad28b1 Mon Sep 17 00:00:00 2001 From: nicholaslyang Date: Wed, 24 Apr 2024 17:18:19 -0400 Subject: [PATCH 5/6] Responded to feedback, switched to watch channel --- crates/turborepo-lib/src/run/watch.rs | 44 +++++++++++++++------------ 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs 
b/crates/turborepo-lib/src/run/watch.rs index 6327631bd682b..8b44972a5327f 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -1,10 +1,11 @@ -use std::{cell::RefCell, collections::HashSet, sync::Mutex}; +use std::collections::HashSet; use futures::StreamExt; use miette::{Diagnostic, SourceSpan}; use thiserror::Error; use tokio::{ select, + sync::watch, task::{yield_now, JoinHandle}, }; use turborepo_repository::package_graph::PackageName; @@ -21,7 +22,7 @@ use crate::{ DaemonConnector, DaemonPaths, }; -enum ChangedPackages { +pub enum ChangedPackages { All, Some(HashSet), } @@ -87,6 +88,10 @@ pub enum Error { SignalInterrupt, #[error("package change error")] PackageChange(#[from] tonic::Status), + #[error("changed packages channel closed, cannot receive new changes")] + ChangedPackagesRecv(#[from] watch::error::RecvError), + #[error("changed packages channel closed, cannot send new changes")] + ChangedPackagesSend(#[from] watch::error::SendError), } impl WatchClient { @@ -134,12 +139,12 @@ impl WatchClient { let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?; - let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default())); + let (changed_pkgs_tx, mut changed_pkgs_rx) = watch::channel(ChangedPackages::default()); let event_fut = async { while let Some(event) = events.next().await { let event = event?; - Self::handle_change_event(&changed_packages, event.event.unwrap()).await?; + Self::handle_change_event(&changed_pkgs_tx, event.event.unwrap()).await?; } Err(Error::ConnectionClosed) @@ -147,9 +152,11 @@ impl WatchClient { let run_fut = async { loop { - if !changed_packages.lock().unwrap().borrow().is_empty() { - self.execute_run(&changed_packages).await?; - } + changed_pkgs_rx.changed().await?; + let changed_pkgs = changed_pkgs_rx.borrow_and_update(); + + self.execute_run(&changed_pkgs).await?; + yield_now().await; } }; @@ -171,7 +178,7 @@ impl WatchClient { } async fn handle_change_event( - changed_packages: &Mutex>, + changed_packages_tx: &watch::Sender, event: proto::package_change_event::Event, ) -> Result<(), Error> { // Should we recover here? @@ -181,17 +188,17 @@ impl WatchClient { }) => { let package_name = PackageName::from(package_name); - match changed_packages.lock().unwrap().get_mut() { - ChangedPackages::All => { - // If we've already changed all packages, ignore - } + changed_packages_tx.send_if_modified(|changed_pkgs| match changed_pkgs { + ChangedPackages::All => false, ChangedPackages::Some(ref mut pkgs) => { pkgs.insert(package_name); + + true } - } + }); } proto::package_change_event::Event::RediscoverPackages(_) => { - *changed_packages.lock().unwrap().get_mut() = ChangedPackages::All; + changed_packages_tx.send(ChangedPackages::All)?; } proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { return Err(DaemonError::Unavailable(message).into()); @@ -201,20 +208,17 @@ impl WatchClient { Ok(()) } - async fn execute_run( - &mut self, - changed_packages: &Mutex>, - ) -> Result { - let changed_packages = changed_packages.lock().unwrap().take(); + async fn execute_run(&mut self, changed_packages: &ChangedPackages) -> Result { // Should we recover here? 
         match changed_packages {
             ChangedPackages::Some(packages) => {
                 let packages = packages
-                    .into_iter()
+                    .iter()
                     .filter(|pkg| {
                         // If not in the filtered pkgs, ignore
                         self.run.filtered_pkgs.contains(pkg)
                     })
+                    .cloned()
                     .collect();
 
                 let mut args = self.base.args().clone();

From 78f33a5c9cc96a35f63b5584c939f07e511b40c6 Mon Sep 17 00:00:00 2001
From: nicholaslyang
Date: Thu, 25 Apr 2024 11:07:01 -0400
Subject: [PATCH 6/6] PR feedback

---
 crates/turborepo-lib/src/run/watch.rs | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs
index 8b44972a5327f..cf024ad53c253 100644
--- a/crates/turborepo-lib/src/run/watch.rs
+++ b/crates/turborepo-lib/src/run/watch.rs
@@ -22,6 +22,7 @@ use crate::{
     DaemonConnector, DaemonPaths,
 };
 
+#[derive(Clone)]
 pub enum ChangedPackages {
     All,
     Some(HashSet<PackageName>),
 }
@@ -153,11 +154,9 @@ impl WatchClient {
         let run_fut = async {
             loop {
                 changed_pkgs_rx.changed().await?;
-                let changed_pkgs = changed_pkgs_rx.borrow_and_update();
+                let changed_pkgs = { changed_pkgs_rx.borrow_and_update().clone() };
 
-                self.execute_run(&changed_pkgs).await?;
-
-                yield_now().await;
+                self.execute_run(changed_pkgs).await?;
             }
         };
 
@@ -208,17 +207,16 @@ impl WatchClient {
         Ok(())
     }
 
-    async fn execute_run(&mut self, changed_packages: &ChangedPackages) -> Result<i32, Error> {
+    async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
         // Should we recover here?
         match changed_packages {
             ChangedPackages::Some(packages) => {
                 let packages = packages
-                    .iter()
+                    .into_iter()
                     .filter(|pkg| {
                         // If not in the filtered pkgs, ignore
                         self.run.filtered_pkgs.contains(pkg)
                     })
-                    .cloned()
                     .collect();
 
                 let mut args = self.base.args().clone();
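
Illustrative sketch (not part of the patch series above): the deduplication pattern that patches 5 and 6 settle on is folding raw package-change events into a single value behind a tokio watch channel, then draining one batch per run. The enum name Changed and the string package names below are placeholders for this standalone sketch; the real code uses ChangedPackages and PackageName, keeps an All variant for rediscover events, and drives the producer and consumer concurrently under select!.

// Minimal sketch, assuming tokio with the "sync", "macros" and "rt-multi-thread" features.
use std::collections::HashSet;

use tokio::sync::watch;

#[derive(Clone, Debug)]
enum Changed {
    // A rediscover event: everything must be rerun.
    All,
    // Only these packages changed since the last run.
    Some(HashSet<String>),
}

impl Default for Changed {
    fn default() -> Self {
        Changed::Some(HashSet::new())
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Changed::default());

    // Event side: fold every raw notification into the one watch value.
    // send_if_modified only notifies the receiver when the set actually grew,
    // so repeated events for the same package wake the run side at most once.
    let producer = tokio::spawn(async move {
        for pkg in ["web", "docs", "web", "web"] {
            tx.send_if_modified(|state| match state {
                Changed::All => false, // already rerunning everything
                Changed::Some(pkgs) => pkgs.insert(pkg.to_string()),
            });
        }
    });
    producer.await.unwrap();

    // Run side: wait for a change, clone the batch out so the borrow is not
    // held across an await (the fix in patch 6), then run once for the batch.
    rx.changed().await.expect("sender dropped before any change");
    let batch = rx.borrow_and_update().clone();
    match batch {
        Changed::All => println!("rerun all packages"),
        Changed::Some(pkgs) => println!("rerun once for {} changed packages: {pkgs:?}", pkgs.len()),
    }
}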