From b67cde6984c3f85f2f0523250a4834b3b2fd44a3 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Mon, 1 Apr 2024 12:25:53 -0700 Subject: [PATCH] More server refactor --- crates/turborepo-lib/src/daemon/client.rs | 19 ++++- crates/turborepo-lib/src/daemon/connector.rs | 7 ++ crates/turborepo-lib/src/daemon/server.rs | 45 +++++++--- crates/turborepo-lib/src/run/builder.rs | 1 + crates/turborepo-lib/src/task_hash.rs | 89 +++++++++++++++++--- 5 files changed, 139 insertions(+), 22 deletions(-) diff --git a/crates/turborepo-lib/src/daemon/client.rs b/crates/turborepo-lib/src/daemon/client.rs index dc38bd483b36d..bdde1810dd015 100644 --- a/crates/turborepo-lib/src/daemon/client.rs +++ b/crates/turborepo-lib/src/daemon/client.rs @@ -4,12 +4,12 @@ use globwalk::ValidatedGlob; use thiserror::Error; use tonic::{Code, IntoRequest, Status}; use tracing::info; -use turbopath::AbsoluteSystemPathBuf; +use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath}; use super::{ connector::{DaemonConnector, DaemonConnectorError}, endpoint::SocketOpenError, - proto::DiscoverPackagesResponse, + proto::{DiscoverPackagesResponse, GetFileHashesResponse}, Paths, }; use crate::{ @@ -156,7 +156,22 @@ impl DaemonClient { .package_changes(proto::PackageChangesRequest {}) .await? .into_inner(); + Ok(response) + } + pub async fn get_file_hashes( + &mut self, + package_path: &AnchoredSystemPath, + inputs: &[String], + ) -> Result { + let response = self + .client + .get_file_hashes(proto::GetFileHashesRequest { + package_path: package_path.to_string(), + input_globs: inputs.to_vec(), + }) + .await? + .into_inner(); Ok(response) } } diff --git a/crates/turborepo-lib/src/daemon/connector.rs b/crates/turborepo-lib/src/daemon/connector.rs index 094b76661d992..8f0557a69a871 100644 --- a/crates/turborepo-lib/src/daemon/connector.rs +++ b/crates/turborepo-lib/src/daemon/connector.rs @@ -649,6 +649,13 @@ mod test { ) -> Result, Status> { unimplemented!() } + + async fn get_file_hashes( + &self, + _req: tonic::Request, + ) -> Result, tonic::Status> { + unimplemented!() + } } #[tokio::test] diff --git a/crates/turborepo-lib/src/daemon/server.rs b/crates/turborepo-lib/src/daemon/server.rs index 335d9e2538c46..eebc919c50b40 100644 --- a/crates/turborepo-lib/src/daemon/server.rs +++ b/crates/turborepo-lib/src/daemon/server.rs @@ -25,11 +25,11 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::{server::NamedService, transport::Server}; use tower::ServiceBuilder; use tracing::{error, info, trace, warn}; -use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; +use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf, PathError}; use turborepo_filewatch::{ cookies::CookieWriter, globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher}, - hash_watcher::HashWatcher, + hash_watcher::{Error as HashWatcherError, HashSpec, HashWatcher}, package_watcher::{PackageWatchError, PackageWatcher}, FileSystemWatcher, WatchError, }; @@ -72,12 +72,16 @@ pub struct FileWatching { enum RpcError { #[error("deadline exceeded")] DeadlineExceeded, + #[error("invalid relative system path {0}: {1}")] + InvalidAnchoredPath(String, PathError), #[error("invalid glob: {0}")] InvalidGlob(#[from] GlobError), #[error("globwatching failed: {0}")] GlobWatching(#[from] GlobWatcherError), #[error("filewatching unavailable")] NoFileWatching, + #[error("file hashing failed: {0}")] + FileHashing(#[from] HashWatcherError), } impl From for tonic::Status { @@ -89,6 +93,12 @@ impl From for tonic::Status { RpcError::InvalidGlob(e) => tonic::Status::invalid_argument(e.to_string()), RpcError::GlobWatching(e) => tonic::Status::unavailable(e.to_string()), RpcError::NoFileWatching => tonic::Status::unavailable("filewatching unavailable"), + RpcError::FileHashing(e) => { + tonic::Status::failed_precondition(format!("File hashing not available: {e}",)) + } + e @ RpcError::InvalidAnchoredPath(_, _) => { + tonic::Status::invalid_argument(e.to_string()) + } } } } @@ -350,16 +360,28 @@ impl TurboGrpcServiceInner { package_path: String, inputs: Vec, ) -> Result, RpcError> { - let hashes = self - .file_watching + let glob_set = if inputs.is_empty() { + None + } else { + Some(GlobSet::from_raw_unfiltered(inputs)?) + }; + let package_path = AnchoredSystemPathBuf::try_from(package_path.as_str()) + .map_err(|e| RpcError::InvalidAnchoredPath(package_path, e))?; + let hash_spec = HashSpec { + package_path, + inputs: glob_set, + }; + self.file_watching .hash_watcher - .get_file_hashes(package_path, inputs) + .get_file_hashes(hash_spec) .await - .map_err(|e| match e { - PackageWatchError::Unavailable => RpcError::NoFileWatching, - PackageWatchError::InvalidState(_) => RpcError::NoFileWatching, - })?; - Ok(hashes) + .map_err(RpcError::FileHashing) + .map(|hashes| { + hashes + .into_iter() + .map(|(path, hash)| (path.to_string(), hash)) + .collect() + }) } } @@ -496,6 +518,9 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner { })) } + // Note that this is implemented as a blocking call. We expect the default + // server timeout to apply, as well as whatever timeout the client may have + // set. async fn get_file_hashes( &self, request: tonic::Request, diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index 0bd17d61fd502..cf8ef8da115b1 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -357,6 +357,7 @@ impl RunBuilder { engine.task_definitions(), &self.repo_root, &run_telemetry, + &mut daemon, )?; if self.opts.run_opts.parallel { diff --git a/crates/turborepo-lib/src/task_hash.rs b/crates/turborepo-lib/src/task_hash.rs index 795a50efc05a2..6fbe80f3d3644 100644 --- a/crates/turborepo-lib/src/task_hash.rs +++ b/crates/turborepo-lib/src/task_hash.rs @@ -1,13 +1,16 @@ use std::{ collections::{HashMap, HashSet}, sync::{Arc, Mutex}, + time::Duration, }; use rayon::prelude::*; use serde::Serialize; use thiserror::Error; use tracing::{debug, Span}; -use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf}; +use turbopath::{ + AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf, RelativeUnixPathBuf, +}; use turborepo_cache::CacheHitMetadata; use turborepo_env::{BySource, DetailedMap, EnvironmentVariableMap, ResolvedEnvMode}; use turborepo_repository::package_graph::{PackageInfo, PackageName}; @@ -23,6 +26,7 @@ use crate::{ opts::RunOpts, run::task_id::TaskId, task_graph::TaskDefinition, + DaemonClient, DaemonConnector, }; #[derive(Debug, Error)] @@ -74,11 +78,12 @@ impl PackageInputsHashes { task_definitions: &HashMap, TaskDefinition>, repo_root: &AbsoluteSystemPath, telemetry: &GenericEventBuilder, + daemon: &mut Option>, ) -> Result { tracing::trace!(scm_manual=%scm.is_manual(), "scm running in {} mode", if scm.is_manual() { "manual" } else { "git" }); let span = Span::current(); - + let handle = tokio::runtime::Handle::current(); let (hashes, expanded_hashes): (HashMap<_, _>, HashMap<_, _>) = all_tasks .filter_map(|task| { let span = tracing::info_span!(parent: &span, "calculate_file_hash", ?task); @@ -87,6 +92,10 @@ impl PackageInputsHashes { return None; }; + let mut daemon = daemon + .as_ref() // Option::ref + .cloned(); + let task_definition = match task_definitions .get(task_id) .ok_or_else(|| Error::MissingPipelineEntry(task_id.clone())) @@ -115,14 +124,74 @@ impl PackageInputsHashes { .unwrap_or_else(|| AnchoredSystemPath::new("").unwrap()); let scm_telemetry = package_task_event.child(); - let mut hash_object = match scm.get_package_file_hashes( - repo_root, - package_path, - &task_definition.inputs, - Some(scm_telemetry), - ) { - Ok(hash_object) => hash_object, - Err(err) => return Some(Err(err.into())), + // Try hashing with the daemon, if we have a connection. If we don't, or if we + // timeout or get an error, fallback to local hashing + let hash_object = daemon + .as_mut() + .and_then(|daemon| { + let handle = handle.clone(); + // We need an async block here because the timeout must be created with an + // active tokio context. Constructing it directly in + // the rayon thread doesn't provide one and will crash at runtime. + handle + .block_on(async { + tokio::time::timeout( + Duration::from_millis(100), + daemon.get_file_hashes(package_path, &task_definition.inputs), + ) + .await + }) + .map_err(|e| { + tracing::debug!( + "daemon file hashing timed out for {}", + package_path + ); + e + }) + .ok() // If we timed out, we don't need to error, + // just return None so we can move on to local + }) + .and_then(|result| { + match result { + Ok(hashes_resp) => Some( + hashes_resp + .file_hashes + .into_iter() + .map(|(path, hash)| { + ( + RelativeUnixPathBuf::new(path) + .expect("daemon returns relative unix paths"), + hash, + ) + }) + .collect::>(), + ), + Err(e) => { + // Daemon could've failed for various reasons. We can still try + // local hashing. + tracing::debug!( + "daemon file hashing failed for {}: {}", + package_path, + e + ); + None + } + } + }); + let mut hash_object = match hash_object { + Some(hash_object) => hash_object, + None => { + let local_hash_result = scm.get_package_file_hashes( + repo_root, + package_path, + &task_definition.inputs, + Some(scm_telemetry), + ); + match local_hash_result { + Ok(hash_object) => hash_object, + Err(err) => return Some(Err(err.into())), + } + } }; if let Some(dot_env) = &task_definition.dot_env { if !dot_env.is_empty() {