Skip to content

Commit

Permalink
feat(Turborepo): wire file hashing to grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
Greg Soltis authored and Greg Soltis committed Apr 15, 2024
1 parent 4413a35 commit 751442f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-filewatch/src/package_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl PackageWatcher {
})
}

pub(crate) fn watch_discovery(&self) -> watch::Receiver<Option<DiscoveryData>> {
pub fn watch_discovery(&self) -> watch::Receiver<Option<DiscoveryData>> {
self.package_discovery_lazy.watch()
}

Expand Down
13 changes: 13 additions & 0 deletions crates/turborepo-lib/src/daemon/proto/turbod.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ service Turbod {
rpc DiscoverPackagesBlocking (DiscoverPackagesRequest) returns (DiscoverPackagesResponse);

rpc PackageChanges (PackageChangesRequest) returns (stream PackageChangeEvent);

rpc GetFileHashes (GetFileHashesRequest) returns (GetFileHashesResponse);
}

message HelloRequest {
Expand Down Expand Up @@ -134,3 +136,14 @@ enum PackageManager {
Bun = 5;
Pnpm9 = 6;
}

message GetFileHashesRequest {
// AnchoredSystemPathBuf
string package_path = 1;
repeated string input_globs = 2;
}

message GetFileHashesResponse {
// RelativeUnixPathBuf -> Hash
map<string, string> file_hashes = 1;
}
46 changes: 41 additions & 5 deletions crates/turborepo-lib/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
use turborepo_filewatch::{
cookies::CookieWriter,
globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
hash_watcher::HashWatcher,
package_watcher::{PackageWatchError, PackageWatcher},
FileSystemWatcher, WatchError,
};
use turborepo_repository::package_manager;
use turborepo_scm::SCM;

use super::{bump_timeout::BumpTimeout, endpoint::SocketOpenError, proto};
use crate::{
Expand Down Expand Up @@ -63,6 +65,7 @@ pub struct FileWatching {
pub glob_watcher: Arc<GlobWatcher>,
pub package_watcher: Arc<PackageWatcher>,
pub package_changes_watcher: Arc<PackageChangesWatcher>,
pub hash_watcher: Arc<HashWatcher>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -115,6 +118,13 @@ impl FileWatching {
PackageWatcher::new(repo_root.clone(), recv.clone(), cookie_writer)
.map_err(|e| WatchError::Setup(format!("{:?}", e)))?,
);
let scm = SCM::new(&repo_root);
let hash_watcher = Arc::new(HashWatcher::new(
repo_root,
package_watcher.watch_discovery(),
recv.clone(),
scm,
));

let package_changes_watcher =
Arc::new(PackageChangesWatcher::new(repo_root.clone(), recv.clone()));
Expand All @@ -124,6 +134,7 @@ impl FileWatching {
glob_watcher,
package_watcher,
package_changes_watcher,
hash_watcher,
})
}
}
Expand Down Expand Up @@ -164,12 +175,7 @@ where
external_shutdown,
}
}
}

impl<S> TurboGrpcService<S>
where
S: Future<Output = CloseReason>,
{
pub async fn serve(self) -> Result<CloseReason, package_manager::Error> {
let Self {
external_shutdown,
Expand Down Expand Up @@ -338,6 +344,23 @@ impl TurboGrpcServiceInner {
.await?;
Ok((changed_globs, time_saved))
}

async fn get_file_hashes(
&self,
package_path: String,
inputs: Vec<String>,
) -> Result<HashMap<String, String>, RpcError> {
let hashes = self
.file_watching
.hash_watcher
.get_file_hashes(package_path, inputs)
.await
.map_err(|e| match e {
PackageWatchError::Unavailable => RpcError::NoFileWatching,
PackageWatchError::InvalidState(_) => RpcError::NoFileWatching,
})?;
Ok(hashes)
}
}

async fn watch_root(
Expand Down Expand Up @@ -473,6 +496,19 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
}))
}

async fn get_file_hashes(
&self,
request: tonic::Request<proto::GetFileHashesRequest>,
) -> Result<tonic::Response<proto::GetFileHashesResponse>, tonic::Status> {
let inner = request.into_inner();
let file_hashes = self
.get_file_hashes(inner.package_path, inner.input_globs)
.await?;
Ok(tonic::Response::new(proto::GetFileHashesResponse {
file_hashes,
}))
}

async fn discover_packages(
&self,
_request: tonic::Request<proto::DiscoverPackagesRequest>,
Expand Down

0 comments on commit 751442f

Please sign in to comment.