Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Turborepo): Route through existing wait_for_filewatching call #7237

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/package_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mod test {
let (tx, rx) = broadcast::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel();

let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap();
let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap();
let manager = PackageManager::Yarn;

let package_data = vec![
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod server;

pub use client::{DaemonClient, DaemonError};
pub use connector::{DaemonConnector, DaemonConnectorError};
pub use server::{CloseReason, FileWatching, TurboGrpcService};
pub use server::{CloseReason, TurboGrpcService};

pub(crate) mod proto {

Expand Down
74 changes: 30 additions & 44 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use semver::Version;
use thiserror::Error;
use tokio::{
select,
sync::{mpsc, oneshot, watch, Mutex as AsyncMutex},
sync::{mpsc, oneshot, watch},
};
use tonic::transport::{NamedService, Server};
use tower::ServiceBuilder;
Expand All @@ -39,18 +39,15 @@ use turborepo_filewatch::{
FileSystemWatcher, WatchError,
};
use turborepo_repository::discovery::{
LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
};

use super::{
bump_timeout::BumpTimeout,
endpoint::SocketOpenError,
proto::{self},
};
use crate::{
daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket},
run::package_discovery::WatchingPackageDiscovery,
};
use crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket};

#[derive(Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -123,7 +120,7 @@ async fn start_filewatching<PD: PackageDiscovery + Send + 'static>(
/// Timeout for every RPC the server handles
const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

pub struct TurboGrpcService<S, PDA, PDB> {
pub struct TurboGrpcService<S, PDB> {
watcher_tx: watch::Sender<Option<Arc<FileWatching>>>,
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
repo_root: AbsoluteSystemPathBuf,
Expand All @@ -132,11 +129,10 @@ pub struct TurboGrpcService<S, PDA, PDB> {
timeout: Duration,
external_shutdown: S,

package_discovery: PDA,
package_discovery_backup: PDB,
}

impl<S> TurboGrpcService<S, WatchingPackageDiscovery, LocalPackageDiscoveryBuilder>
impl<S> TurboGrpcService<S, LocalPackageDiscoveryBuilder>
where
S: Future<Output = CloseReason>,
{
Expand All @@ -155,7 +151,6 @@ where
) -> Self {
let (watcher_tx, watcher_rx) = watch::channel(None);

let package_discovery = WatchingPackageDiscovery::new(watcher_rx.clone());
let package_discovery_backup =
LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None);

Expand All @@ -170,16 +165,14 @@ where
log_file,
timeout,
external_shutdown,
package_discovery,
package_discovery_backup,
}
}
}

impl<S, PDA, PDB> TurboGrpcService<S, PDA, PDB>
impl<S, PDB> TurboGrpcService<S, PDB>
where
S: Future<Output = CloseReason>,
PDA: PackageDiscovery + Send + 'static,
PDB: PackageDiscoveryBuilder,
PDB::Output: PackageDiscovery + Send + 'static,
{
Expand All @@ -188,9 +181,8 @@ where
pub fn with_package_discovery_backup<PDB2: PackageDiscoveryBuilder>(
self,
package_discovery_backup: PDB2,
) -> TurboGrpcService<S, PDA, PDB2> {
) -> TurboGrpcService<S, PDB2> {
TurboGrpcService {
package_discovery: self.package_discovery,
daemon_root: self.daemon_root,
external_shutdown: self.external_shutdown,
log_file: self.log_file,
Expand All @@ -211,7 +203,6 @@ where
log_file,
repo_root,
timeout,
package_discovery,
package_discovery_backup,
} = self;

Expand Down Expand Up @@ -270,7 +261,6 @@ where
// so we use a private struct with just the pieces of state needed to handle
// RPCs.
let service = TurboGrpcServiceInner {
package_discovery: AsyncMutex::new(package_discovery),
shutdown: trigger_shutdown,
watcher_rx,
times_saved: Arc::new(Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -313,17 +303,15 @@ where
}
}

struct TurboGrpcServiceInner<PD> {
//shutdown: Arc<Mutex<Option<oneshot::Sender<()>>>>,
struct TurboGrpcServiceInner {
shutdown: mpsc::Sender<()>,
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
times_saved: Arc<Mutex<HashMap<String, u64>>>,
start_time: Instant,
log_file: AbsoluteSystemPathBuf,
package_discovery: AsyncMutex<PD>,
}

impl<PD> TurboGrpcServiceInner<PD> {
impl TurboGrpcServiceInner {
async fn trigger_shutdown(&self) {
info!("triggering shutdown");
let _ = self.shutdown.send(()).await;
Expand Down Expand Up @@ -364,6 +352,14 @@ impl<PD> TurboGrpcServiceInner<PD> {
let changed_globs = fw.glob_watcher.get_changed_globs(hash, candidates).await?;
Ok((changed_globs, time_saved))
}

async fn discover_packages(&self) -> Result<DiscoveryResponse, RpcError> {
let fw = self.wait_for_filewatching().await?;
Ok(DiscoveryResponse {
workspaces: fw.package_watcher.get_package_data().await,
package_manager: fw.package_watcher.get_package_manager().await,
})
}
}

async fn wait_for_filewatching(
Expand Down Expand Up @@ -422,10 +418,7 @@ async fn watch_root(
}

#[tonic::async_trait]
impl<PD> proto::turbod_server::Turbod for TurboGrpcServiceInner<PD>
where
PD: PackageDiscovery + Send + 'static,
{
impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
async fn hello(
&self,
request: tonic::Request<proto::HelloRequest>,
Expand Down Expand Up @@ -513,25 +506,18 @@ where
&self,
_request: tonic::Request<proto::DiscoverPackagesRequest>,
) -> Result<tonic::Response<proto::DiscoverPackagesResponse>, tonic::Status> {
self.package_discovery
.lock()
.await
.discover_packages()
.await
.map(|packages| {
tonic::Response::new(proto::DiscoverPackagesResponse {
package_files: packages
.workspaces
.into_iter()
.map(|d| proto::PackageFiles {
package_json: d.package_json.to_string(),
turbo_json: d.turbo_json.map(|t| t.to_string()),
})
.collect(),
package_manager: proto::PackageManager::from(packages.package_manager).into(),
let resp = self.discover_packages().await?;
Ok(tonic::Response::new(proto::DiscoverPackagesResponse {
package_files: resp
.workspaces
.into_iter()
.map(|d| proto::PackageFiles {
package_json: d.package_json.to_string(),
turbo_json: d.turbo_json.map(|t| t.to_string()),
})
})
.map_err(|e| tonic::Status::internal(format!("{}", e)))
.collect(),
package_manager: proto::PackageManager::from(resp.package_manager).into(),
}))
}
}

Expand All @@ -558,7 +544,7 @@ fn compare_versions(client: Version, server: Version, constraint: proto::Version
}
}

impl<T> NamedService for TurboGrpcServiceInner<T> {
impl NamedService for TurboGrpcServiceInner {
const NAME: &'static str = "turborepo.Daemon";
}

Expand Down
45 changes: 1 addition & 44 deletions crates/turborepo-lib/src/run/package_discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use tokio::sync::watch::Receiver;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery, WorkspaceData};

use crate::daemon::{proto::PackageManager, DaemonClient, FileWatching};
use crate::daemon::{proto::PackageManager, DaemonClient};

#[derive(Debug)]
pub struct DaemonPackageDiscovery<'a, C: Clone> {
Expand Down Expand Up @@ -44,43 +41,3 @@ impl<'a, C: Clone + Send> PackageDiscovery for DaemonPackageDiscovery<'a, C> {
})
}
}

/// A package discovery strategy that watches the file system for changes. Basic
/// idea:
/// - Set up a watcher on file changes on the relevant workspace file for the
/// package manager
/// - When the workspace globs change, re-discover the workspace
/// - When a package.json changes, re-discover the workspace
/// - Keep an in-memory cache of the workspace
pub struct WatchingPackageDiscovery {
/// file watching may not be ready yet so we store a watcher
/// through which we can get the file watching stack
watcher: Receiver<Option<Arc<crate::daemon::FileWatching>>>,
}

impl WatchingPackageDiscovery {
pub fn new(watcher: Receiver<Option<Arc<FileWatching>>>) -> Self {
Self { watcher }
}
}

impl PackageDiscovery for WatchingPackageDiscovery {
async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
tracing::debug!("discovering packages using watcher implementation");

// need to clone and drop the Ref before we can await
let watcher = {
let watcher = self
.watcher
.wait_for(|opt| opt.is_some())
.await
.map_err(|e| Error::Failed(Box::new(e)))?;
watcher.as_ref().expect("guaranteed some above").clone()
};

Ok(DiscoveryResponse {
workspaces: watcher.package_watcher.get_package_data().await,
package_manager: watcher.package_watcher.get_package_manager().await,
})
}
}
1 change: 0 additions & 1 deletion crates/turborepo-repository/src/package_graph/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl<'a, P> PackageGraphBuilder<'a, P> {
/// Set the package discovery strategy to use. Note that whatever strategy
/// selected here will be wrapped in a `CachingPackageDiscovery` to
/// prevent unnecessary work during building.
#[allow(dead_code)]
pub fn with_package_discovery<P2: PackageDiscoveryBuilder>(
self,
discovery: P2,
Expand Down
Loading