fix(turborepo): Watch mode not responding to changes #8057

Merged Apr 30, 2024 (5 commits). Showing changes from 3 commits.
49 changes: 23 additions & 26 deletions crates/turborepo-lib/src/run/watch.rs
@@ -1,13 +1,9 @@
-use std::collections::HashSet;
+use std::{cell::RefCell, collections::HashSet};
 
 use futures::StreamExt;
 use miette::{Diagnostic, SourceSpan};
 use thiserror::Error;
-use tokio::{
-    select,
-    sync::watch,
-    task::{yield_now, JoinHandle},
-};
+use tokio::{select, sync::Mutex, task::JoinHandle};
 use turborepo_repository::package_graph::PackageName;
 use turborepo_telemetry::events::command::CommandEventBuilder;

@@ -22,8 +18,7 @@ use crate::{
     DaemonConnector, DaemonPaths,
 };
 
-#[derive(Clone)]
-pub enum ChangedPackages {
+enum ChangedPackages {
     All,
     Some(HashSet<PackageName>),
 }
@@ -89,10 +84,6 @@ pub enum Error {
     SignalInterrupt,
     #[error("package change error")]
     PackageChange(#[from] tonic::Status),
-    #[error("changed packages channel closed, cannot receive new changes")]
-    ChangedPackagesRecv(#[from] watch::error::RecvError),
-    #[error("changed packages channel closed, cannot send new changes")]
-    ChangedPackagesSend(#[from] watch::error::SendError<ChangedPackages>),
 }

impl WatchClient {
@@ -140,23 +131,25 @@ impl WatchClient {
 
         let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?;
 
-        let (changed_pkgs_tx, mut changed_pkgs_rx) = watch::channel(ChangedPackages::default());
+        // We explicitly use a tokio::sync::Mutex here to avoid deadlocks.
+        // If we used a std::sync::Mutex, we could deadlock by spinning the lock
+        // and not yielding back to the tokio runtime.
+        let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default()));
 
         let event_fut = async {
             while let Some(event) = events.next().await {
                 let event = event?;
-                Self::handle_change_event(&changed_pkgs_tx, event.event.unwrap()).await?;
+                Self::handle_change_event(&changed_packages, event.event.unwrap()).await?;
             }
 
             Err(Error::ConnectionClosed)
         };
 
         let run_fut = async {
             loop {
-                changed_pkgs_rx.changed().await?;
-                let changed_pkgs = { changed_pkgs_rx.borrow_and_update().clone() };
-
-                self.execute_run(changed_pkgs).await?;
+                if !changed_packages.lock().await.borrow().is_empty() {
+                    self.execute_run(&changed_packages).await?;
Contributor: Does this spin? If changed_packages is in fact empty, won't this loop back around? It will yield the event loop, but the task will always be runnable as long as nothing else is holding the lock...

Contributor (author): If I understand correctly, if changed packages is empty, yes it'll loop around, but the next time it tries to lock, it'll yield first to let the other task run and add to the changed packages.

Could you clarify what you mean by the task being runnable?

Contributor: It will yield, but the tokio scheduler will always see this loop as ready to run, even if there's no work to be done. If instead we use a channel or some other notification mechanism, tokio can park this task until there is work to be done.

Contributor (author): Hmm okay. I'll add a Notify. I don't think we can use channels here, because the communication is not one-way: the run_fut needs to indicate to the event_fut that it has taken the ChangedPackages.
+                }
NicholasLYang marked this conversation as resolved.
             }
         };

@@ -177,7 +170,7 @@ impl WatchClient {
     }
 
     async fn handle_change_event(
-        changed_packages_tx: &watch::Sender<ChangedPackages>,
+        changed_packages: &Mutex<RefCell<ChangedPackages>>,
         event: proto::package_change_event::Event,
     ) -> Result<(), Error> {
         // Should we recover here?
// Should we recover here?
@@ -187,17 +180,17 @@
             }) => {
                 let package_name = PackageName::from(package_name);
 
-                changed_packages_tx.send_if_modified(|changed_pkgs| match changed_pkgs {
-                    ChangedPackages::All => false,
+                match changed_packages.lock().await.get_mut() {
+                    ChangedPackages::All => {
+                        // If we've already changed all packages, ignore
+                    }
                     ChangedPackages::Some(ref mut pkgs) => {
                         pkgs.insert(package_name);
-
-                        true
                     }
-                });
+                }
             }
             proto::package_change_event::Event::RediscoverPackages(_) => {
-                changed_packages_tx.send(ChangedPackages::All)?;
+                *changed_packages.lock().await.get_mut() = ChangedPackages::All;
             }
             proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => {
                 return Err(DaemonError::Unavailable(message).into());
@@ -207,7 +200,11 @@
         Ok(())
     }
 
-    async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
+    async fn execute_run(
+        &mut self,
+        changed_packages: &Mutex<RefCell<ChangedPackages>>,
+    ) -> Result<i32, Error> {
+        let changed_packages = changed_packages.lock().await.take();
         // Should we recover here?
         match changed_packages {
             ChangedPackages::Some(packages) => {
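The take() call in execute_run relies on RefCell::take, which moves the current value out and leaves Default::default() behind; this is how the accumulated change set is reset before each run. A std-only sketch of that behavior (illustrative, not PR code):

```rust
use std::cell::RefCell;
use std::collections::HashSet;

fn main() {
    let changed: RefCell<HashSet<String>> = RefCell::new(HashSet::new());
    changed.borrow_mut().insert("docs".to_string());

    // take() moves the set out and leaves an empty default in its place.
    let batch = changed.take();
    assert_eq!(batch.len(), 1);
    assert!(changed.borrow().is_empty());
    println!("processed {} package(s)", batch.len());
}
```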