Skip to content

Commit

Permalink
Route through existing wait_for_filewatching call
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Soltis authored and Greg Soltis committed Feb 2, 2024
1 parent 3518041 commit df76d38
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 90 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/package_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mod test {
let (tx, rx) = broadcast::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::oneshot::channel();

let root = AbsoluteSystemPathBuf::new(tmp.path().to_string_lossy()).unwrap();
let root: AbsoluteSystemPathBuf = tmp.path().try_into().unwrap();
let manager = PackageManager::Yarn;

let package_data = vec![
Expand Down
74 changes: 30 additions & 44 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use semver::Version;
use thiserror::Error;
use tokio::{
select,
sync::{mpsc, oneshot, watch, Mutex as AsyncMutex},
sync::{mpsc, oneshot, watch},
};
use tonic::transport::{NamedService, Server};
use tower::ServiceBuilder;
Expand All @@ -39,18 +39,15 @@ use turborepo_filewatch::{
FileSystemWatcher, WatchError,
};
use turborepo_repository::discovery::{
LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
DiscoveryResponse, LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
};

use super::{
bump_timeout::BumpTimeout,
endpoint::SocketOpenError,
proto::{self},
};
use crate::{
daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket},
run::package_discovery::WatchingPackageDiscovery,
};
use crate::daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket};

#[derive(Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -123,7 +120,7 @@ async fn start_filewatching<PD: PackageDiscovery + Send + 'static>(
/// Timeout for every RPC the server handles
const REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

pub struct TurboGrpcService<S, PDA, PDB> {
pub struct TurboGrpcService<S, PDB> {
watcher_tx: watch::Sender<Option<Arc<FileWatching>>>,
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
repo_root: AbsoluteSystemPathBuf,
Expand All @@ -132,11 +129,10 @@ pub struct TurboGrpcService<S, PDA, PDB> {
timeout: Duration,
external_shutdown: S,

package_discovery: PDA,
package_discovery_backup: PDB,
}

impl<S> TurboGrpcService<S, WatchingPackageDiscovery, LocalPackageDiscoveryBuilder>
impl<S> TurboGrpcService<S, LocalPackageDiscoveryBuilder>
where
S: Future<Output = CloseReason>,
{
Expand All @@ -155,7 +151,6 @@ where
) -> Self {
let (watcher_tx, watcher_rx) = watch::channel(None);

let package_discovery = WatchingPackageDiscovery::new(watcher_rx.clone());
let package_discovery_backup =
LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None);

Expand All @@ -170,16 +165,14 @@ where
log_file,
timeout,
external_shutdown,
package_discovery,
package_discovery_backup,
}
}
}

impl<S, PDA, PDB> TurboGrpcService<S, PDA, PDB>
impl<S, PDB> TurboGrpcService<S, PDB>
where
S: Future<Output = CloseReason>,
PDA: PackageDiscovery + Send + 'static,
PDB: PackageDiscoveryBuilder,
PDB::Output: PackageDiscovery + Send + 'static,
{
Expand All @@ -188,9 +181,8 @@ where
pub fn with_package_discovery_backup<PDB2: PackageDiscoveryBuilder>(
self,
package_discovery_backup: PDB2,
) -> TurboGrpcService<S, PDA, PDB2> {
) -> TurboGrpcService<S, PDB2> {
TurboGrpcService {
package_discovery: self.package_discovery,
daemon_root: self.daemon_root,
external_shutdown: self.external_shutdown,
log_file: self.log_file,
Expand All @@ -211,7 +203,6 @@ where
log_file,
repo_root,
timeout,
package_discovery,
package_discovery_backup,
} = self;

Expand Down Expand Up @@ -270,7 +261,6 @@ where
// so we use a private struct with just the pieces of state needed to handle
// RPCs.
let service = TurboGrpcServiceInner {
package_discovery: AsyncMutex::new(package_discovery),
shutdown: trigger_shutdown,
watcher_rx,
times_saved: Arc::new(Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -313,17 +303,15 @@ where
}
}

struct TurboGrpcServiceInner<PD> {
//shutdown: Arc<Mutex<Option<oneshot::Sender<()>>>>,
struct TurboGrpcServiceInner {
shutdown: mpsc::Sender<()>,
watcher_rx: watch::Receiver<Option<Arc<FileWatching>>>,
times_saved: Arc<Mutex<HashMap<String, u64>>>,
start_time: Instant,
log_file: AbsoluteSystemPathBuf,
package_discovery: AsyncMutex<PD>,
}

impl<PD> TurboGrpcServiceInner<PD> {
impl TurboGrpcServiceInner {
async fn trigger_shutdown(&self) {
info!("triggering shutdown");
let _ = self.shutdown.send(()).await;
Expand Down Expand Up @@ -364,6 +352,14 @@ impl<PD> TurboGrpcServiceInner<PD> {
let changed_globs = fw.glob_watcher.get_changed_globs(hash, candidates).await?;
Ok((changed_globs, time_saved))
}

/// Answer a package-discovery request by consulting the file-watching
/// stack: blocks until filewatching is ready, then reads the cached
/// workspace list and package manager from the package watcher.
async fn discover_packages(&self) -> Result<DiscoveryResponse, RpcError> {
    let file_watching = self.wait_for_filewatching().await?;
    let workspaces = file_watching.package_watcher.get_package_data().await;
    let package_manager = file_watching.package_watcher.get_package_manager().await;
    Ok(DiscoveryResponse {
        workspaces,
        package_manager,
    })
}
}

async fn wait_for_filewatching(
Expand Down Expand Up @@ -422,10 +418,7 @@ async fn watch_root(
}

#[tonic::async_trait]
impl<PD> proto::turbod_server::Turbod for TurboGrpcServiceInner<PD>
where
PD: PackageDiscovery + Send + 'static,
{
impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
async fn hello(
&self,
request: tonic::Request<proto::HelloRequest>,
Expand Down Expand Up @@ -513,25 +506,18 @@ where
&self,
_request: tonic::Request<proto::DiscoverPackagesRequest>,
) -> Result<tonic::Response<proto::DiscoverPackagesResponse>, tonic::Status> {
self.package_discovery
.lock()
.await
.discover_packages()
.await
.map(|packages| {
tonic::Response::new(proto::DiscoverPackagesResponse {
package_files: packages
.workspaces
.into_iter()
.map(|d| proto::PackageFiles {
package_json: d.package_json.to_string(),
turbo_json: d.turbo_json.map(|t| t.to_string()),
})
.collect(),
package_manager: proto::PackageManager::from(packages.package_manager).into(),
let resp = self.discover_packages().await?;
Ok(tonic::Response::new(proto::DiscoverPackagesResponse {
package_files: resp
.workspaces
.into_iter()
.map(|d| proto::PackageFiles {
package_json: d.package_json.to_string(),
turbo_json: d.turbo_json.map(|t| t.to_string()),
})
})
.map_err(|e| tonic::Status::internal(format!("{}", e)))
.collect(),
package_manager: proto::PackageManager::from(resp.package_manager).into(),
}))
}
}

Expand All @@ -558,7 +544,7 @@ fn compare_versions(client: Version, server: Version, constraint: proto::Version
}
}

impl<T> NamedService for TurboGrpcServiceInner<T> {
impl NamedService for TurboGrpcServiceInner {
const NAME: &'static str = "turborepo.Daemon";
}

Expand Down
45 changes: 1 addition & 44 deletions crates/turborepo-lib/src/run/package_discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use tokio::sync::watch::Receiver;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_repository::discovery::{DiscoveryResponse, Error, PackageDiscovery, WorkspaceData};

use crate::daemon::{proto::PackageManager, DaemonClient, FileWatching};
use crate::daemon::{proto::PackageManager, DaemonClient};

#[derive(Debug)]
pub struct DaemonPackageDiscovery<'a, C: Clone> {
Expand Down Expand Up @@ -44,43 +41,3 @@ impl<'a, C: Clone + Send> PackageDiscovery for DaemonPackageDiscovery<'a, C> {
})
}
}

/// A package discovery strategy that watches the file system for changes. Basic
/// idea:
/// - Set up a watcher on file changes on the relevant workspace file for the
///   package manager
/// - When the workspace globs change, re-discover the workspace
/// - When a package.json changes, re-discover the workspace
/// - Keep an in-memory cache of the workspace
pub struct WatchingPackageDiscovery {
    /// file watching may not be ready yet so we store a watcher
    /// through which we can get the file watching stack
    // NOTE(review): `None` in the channel means filewatching has not started
    // (or was restarted); consumers wait for `Some` before reading it.
    watcher: Receiver<Option<Arc<crate::daemon::FileWatching>>>,
}

impl WatchingPackageDiscovery {
    /// Builds a discovery strategy from a channel that will eventually carry
    /// the file-watching stack once it has finished initializing.
    pub fn new(rx: Receiver<Option<Arc<FileWatching>>>) -> Self {
        Self { watcher: rx }
    }
}

impl PackageDiscovery for WatchingPackageDiscovery {
    /// Waits for the file-watching stack to become available, then serves the
    /// workspace list and package manager out of the package watcher's cache.
    async fn discover_packages(&mut self) -> Result<DiscoveryResponse, Error> {
        tracing::debug!("discovering packages using watcher implementation");

        // `wait_for` hands back a borrow guard into the channel; clone the
        // inner Arc inside this scope so the guard is dropped before the
        // subsequent `.await` points.
        let fw = {
            let guard = self
                .watcher
                .wait_for(|state| state.is_some())
                .await
                .map_err(|e| Error::Failed(Box::new(e)))?;
            guard.as_ref().expect("guaranteed some above").clone()
        };

        Ok(DiscoveryResponse {
            workspaces: fw.package_watcher.get_package_data().await,
            package_manager: fw.package_watcher.get_package_manager().await,
        })
    }
}
1 change: 0 additions & 1 deletion crates/turborepo-repository/src/package_graph/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ impl<'a, P> PackageGraphBuilder<'a, P> {
/// Set the package discovery strategy to use. Note that whatever strategy
/// selected here will be wrapped in a `CachingPackageDiscovery` to
/// prevent unnecessary work during building.
#[allow(dead_code)]
pub fn with_package_discovery<P2: PackageDiscoveryBuilder>(
self,
discovery: P2,
Expand Down

0 comments on commit df76d38

Please sign in to comment.