Commit b67cde6: More server refactor
Greg Soltis authored and committed on Apr 15, 2024
1 parent 751442f
Showing 5 changed files with 139 additions and 22 deletions.
19 changes: 17 additions & 2 deletions crates/turborepo-lib/src/daemon/client.rs
@@ -4,12 +4,12 @@ use globwalk::ValidatedGlob;
 use thiserror::Error;
 use tonic::{Code, IntoRequest, Status};
 use tracing::info;
-use turbopath::AbsoluteSystemPathBuf;
+use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath};
 
 use super::{
     connector::{DaemonConnector, DaemonConnectorError},
     endpoint::SocketOpenError,
-    proto::DiscoverPackagesResponse,
+    proto::{DiscoverPackagesResponse, GetFileHashesResponse},
     Paths,
 };
 use crate::{
@@ -156,7 +156,22 @@ impl<T> DaemonClient<T> {
             .package_changes(proto::PackageChangesRequest {})
             .await?
             .into_inner();
         Ok(response)
     }
 
+    pub async fn get_file_hashes(
+        &mut self,
+        package_path: &AnchoredSystemPath,
+        inputs: &[String],
+    ) -> Result<GetFileHashesResponse, DaemonError> {
+        let response = self
+            .client
+            .get_file_hashes(proto::GetFileHashesRequest {
+                package_path: package_path.to_string(),
+                input_globs: inputs.to_vec(),
+            })
+            .await?
+            .into_inner();
+        Ok(response)
+    }
 }
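As a usage sketch (not part of the commit): the new client method could be called like this, assuming an already-connected DaemonClient<DaemonConnector>. The package path and glob are hypothetical; file_hashes is the response map this diff consumes later in task_hash.rs.

    use turbopath::AnchoredSystemPath;

    // Sketch: ask the daemon for a package's input-file hashes.
    async fn print_package_hashes(
        client: &mut DaemonClient<DaemonConnector>,
    ) -> Result<(), DaemonError> {
        // Hypothetical repo-relative package path and input globs.
        let package_path = AnchoredSystemPath::new("packages/web").expect("relative path");
        let inputs = vec!["src/**/*.ts".to_string()];
        let response = client.get_file_hashes(package_path, &inputs).await?;
        for (path, hash) in response.file_hashes {
            println!("{path}: {hash}");
        }
        Ok(())
    }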
7 changes: 7 additions & 0 deletions crates/turborepo-lib/src/daemon/connector.rs
@@ -649,6 +649,13 @@ mod test {
         ) -> Result<Response<Self::PackageChangesStream>, Status> {
             unimplemented!()
         }
+
+        async fn get_file_hashes(
+            &self,
+            _req: tonic::Request<proto::GetFileHashesRequest>,
+        ) -> Result<tonic::Response<proto::GetFileHashesResponse>, tonic::Status> {
+            unimplemented!()
+        }
     }
 
     #[tokio::test]
45 changes: 35 additions & 10 deletions crates/turborepo-lib/src/daemon/server.rs
@@ -25,11 +25,11 @@ use tokio_stream::wrappers::ReceiverStream;
 use tonic::{server::NamedService, transport::Server};
 use tower::ServiceBuilder;
 use tracing::{error, info, trace, warn};
-use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
+use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf, PathError};
 use turborepo_filewatch::{
     cookies::CookieWriter,
     globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
-    hash_watcher::HashWatcher,
+    hash_watcher::{Error as HashWatcherError, HashSpec, HashWatcher},
     package_watcher::{PackageWatchError, PackageWatcher},
     FileSystemWatcher, WatchError,
 };
@@ -72,12 +72,16 @@ pub struct FileWatching {
 enum RpcError {
     #[error("deadline exceeded")]
     DeadlineExceeded,
+    #[error("invalid relative system path {0}: {1}")]
+    InvalidAnchoredPath(String, PathError),
     #[error("invalid glob: {0}")]
     InvalidGlob(#[from] GlobError),
     #[error("globwatching failed: {0}")]
     GlobWatching(#[from] GlobWatcherError),
     #[error("filewatching unavailable")]
     NoFileWatching,
+    #[error("file hashing failed: {0}")]
+    FileHashing(#[from] HashWatcherError),
 }

impl From<RpcError> for tonic::Status {
Expand All @@ -89,6 +93,12 @@ impl From<RpcError> for tonic::Status {
RpcError::InvalidGlob(e) => tonic::Status::invalid_argument(e.to_string()),
RpcError::GlobWatching(e) => tonic::Status::unavailable(e.to_string()),
RpcError::NoFileWatching => tonic::Status::unavailable("filewatching unavailable"),
RpcError::FileHashing(e) => {
tonic::Status::failed_precondition(format!("File hashing not available: {e}",))
}
e @ RpcError::InvalidAnchoredPath(_, _) => {
tonic::Status::invalid_argument(e.to_string())
}
}
}
}
@@ -350,16 +360,28 @@ impl TurboGrpcServiceInner {
         package_path: String,
         inputs: Vec<String>,
     ) -> Result<HashMap<String, String>, RpcError> {
-        let hashes = self
-            .file_watching
+        let glob_set = if inputs.is_empty() {
+            None
+        } else {
+            Some(GlobSet::from_raw_unfiltered(inputs)?)
+        };
+        let package_path = AnchoredSystemPathBuf::try_from(package_path.as_str())
+            .map_err(|e| RpcError::InvalidAnchoredPath(package_path, e))?;
+        let hash_spec = HashSpec {
+            package_path,
+            inputs: glob_set,
+        };
+        self.file_watching
             .hash_watcher
-            .get_file_hashes(package_path, inputs)
+            .get_file_hashes(hash_spec)
             .await
-            .map_err(|e| match e {
-                PackageWatchError::Unavailable => RpcError::NoFileWatching,
-                PackageWatchError::InvalidState(_) => RpcError::NoFileWatching,
-            })?;
-        Ok(hashes)
+            .map_err(RpcError::FileHashing)
+            .map(|hashes| {
+                hashes
+                    .into_iter()
+                    .map(|(path, hash)| (path.to_string(), hash))
+                    .collect()
+            })
     }
 }

@@ -496,6 +518,9 @@ impl proto::turbod_server::Turbod for TurboGrpcServiceInner {
         }))
     }
 
+    // Note that this is implemented as a blocking call. We expect the default
+    // server timeout to apply, as well as whatever timeout the client may have
+    // set.
     async fn get_file_hashes(
         &self,
         request: tonic::Request<proto::GetFileHashesRequest>,
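The note about client-set timeouts can be made concrete with a sketch (not part of the commit): tonic requests carry an optional per-call deadline that the server observes independently of its own timeout. The generated client path proto::turbod_client::TurbodClient is assumed from tonic's naming convention, and the five-second budget is illustrative.

    use std::time::Duration;

    use tonic::IntoRequest;

    // Sketch: attach a per-call deadline; tonic sends it to the server as a
    // grpc-timeout header alongside the request.
    async fn hashes_with_deadline(
        client: &mut proto::turbod_client::TurbodClient<tonic::transport::Channel>,
    ) -> Result<proto::GetFileHashesResponse, tonic::Status> {
        let mut request = proto::GetFileHashesRequest {
            package_path: "packages/web".to_string(),
            input_globs: vec![],
        }
        .into_request();
        request.set_timeout(Duration::from_secs(5));
        Ok(client.get_file_hashes(request).await?.into_inner())
    }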
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/run/builder.rs
@@ -357,6 +357,7 @@ impl RunBuilder {
             engine.task_definitions(),
             &self.repo_root,
             &run_telemetry,
+            &mut daemon,
         )?;
 
         if self.opts.run_opts.parallel {
89 changes: 79 additions & 10 deletions crates/turborepo-lib/src/task_hash.rs
@@ -1,13 +1,16 @@
 use std::{
     collections::{HashMap, HashSet},
     sync::{Arc, Mutex},
+    time::Duration,
 };
 
 use rayon::prelude::*;
 use serde::Serialize;
 use thiserror::Error;
 use tracing::{debug, Span};
-use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf};
+use turbopath::{
+    AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf, RelativeUnixPathBuf,
+};
 use turborepo_cache::CacheHitMetadata;
 use turborepo_env::{BySource, DetailedMap, EnvironmentVariableMap, ResolvedEnvMode};
 use turborepo_repository::package_graph::{PackageInfo, PackageName};
@@ -23,6 +26,7 @@ use crate::{
     opts::RunOpts,
     run::task_id::TaskId,
     task_graph::TaskDefinition,
+    DaemonClient, DaemonConnector,
 };
 
 #[derive(Debug, Error)]
@@ -74,11 +78,12 @@ impl PackageInputsHashes {
         task_definitions: &HashMap<TaskId<'static>, TaskDefinition>,
         repo_root: &AbsoluteSystemPath,
         telemetry: &GenericEventBuilder,
+        daemon: &mut Option<DaemonClient<DaemonConnector>>,
     ) -> Result<PackageInputsHashes, Error> {
         tracing::trace!(scm_manual=%scm.is_manual(), "scm running in {} mode", if scm.is_manual() { "manual" } else { "git" });
 
         let span = Span::current();
-
+        let handle = tokio::runtime::Handle::current();
         let (hashes, expanded_hashes): (HashMap<_, _>, HashMap<_, _>) = all_tasks
             .filter_map(|task| {
                 let span = tracing::info_span!(parent: &span, "calculate_file_hash", ?task);
@@ -87,6 +92,10 @@
                     return None;
                 };
 
+                let mut daemon = daemon
+                    .as_ref() // Option::as_ref, so the shared client isn't moved
+                    .cloned();
+
                 let task_definition = match task_definitions
                     .get(task_id)
                     .ok_or_else(|| Error::MissingPipelineEntry(task_id.clone()))
@@ -115,14 +124,74 @@ impl PackageInputsHashes {
                     .unwrap_or_else(|| AnchoredSystemPath::new("").unwrap());
 
                 let scm_telemetry = package_task_event.child();
-                let mut hash_object = match scm.get_package_file_hashes(
-                    repo_root,
-                    package_path,
-                    &task_definition.inputs,
-                    Some(scm_telemetry),
-                ) {
-                    Ok(hash_object) => hash_object,
-                    Err(err) => return Some(Err(err.into())),
+                // Try hashing with the daemon, if we have a connection. If we
+                // don't, or if we time out or hit an error, fall back to local hashing.
+                let hash_object = daemon
+                    .as_mut()
+                    .and_then(|daemon| {
+                        let handle = handle.clone();
+                        // We need an async block here because the timeout must be
+                        // created within an active tokio context; the rayon thread we
+                        // are on doesn't provide one, so building it there would panic.
+                        handle
+                            .block_on(async {
+                                tokio::time::timeout(
+                                    Duration::from_millis(100),
+                                    daemon.get_file_hashes(package_path, &task_definition.inputs),
+                                )
+                                .await
+                            })
+                            .map_err(|e| {
+                                tracing::debug!(
+                                    "daemon file hashing timed out for {}",
+                                    package_path
+                                );
+                                e
+                            })
+                            // If we timed out, return None and move on to local hashing.
+                            .ok()
+                    })
+                    .and_then(|result| {
+                        match result {
+                            Ok(hashes_resp) => Some(
+                                hashes_resp
+                                    .file_hashes
+                                    .into_iter()
+                                    .map(|(path, hash)| {
+                                        (
+                                            RelativeUnixPathBuf::new(path)
+                                                .expect("daemon returns relative unix paths"),
+                                            hash,
+                                        )
+                                    })
+                                    .collect::<HashMap<_, _>>(),
+                            ),
+                            Err(e) => {
+                                // The daemon could have failed for various reasons;
+                                // we can still try local hashing.
+                                tracing::debug!(
+                                    "daemon file hashing failed for {}: {}",
+                                    package_path,
+                                    e
+                                );
+                                None
+                            }
+                        }
+                    });
+                let mut hash_object = match hash_object {
+                    Some(hash_object) => hash_object,
+                    None => {
+                        let local_hash_result = scm.get_package_file_hashes(
+                            repo_root,
+                            package_path,
+                            &task_definition.inputs,
+                            Some(scm_telemetry),
+                        );
+                        match local_hash_result {
+                            Ok(hash_object) => hash_object,
+                            Err(err) => return Some(Err(err.into())),
+                        }
+                    }
                 };
                 if let Some(dot_env) = &task_definition.dot_env {
                     if !dot_env.is_empty() {
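The daemon-then-local shape above can be isolated into a self-contained sketch (not part of the commit): a plain OS thread stands in for the rayon worker, daemon_hashes and local_hashes are stand-ins for the daemon RPC and SCM hashing, and the 100 ms budget mirrors the one used in the diff.

    use std::{collections::HashMap, thread, time::Duration};

    // Stand-in for the daemon RPC: deliberately slower than the budget.
    async fn daemon_hashes() -> Result<HashMap<String, String>, &'static str> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Ok(HashMap::from([("src/main.rs".to_string(), "abc123".to_string())]))
    }

    // Stand-in for local (SCM-based) hashing.
    fn local_hashes() -> HashMap<String, String> {
        HashMap::from([("src/main.rs".to_string(), "abc123".to_string())])
    }

    #[tokio::main]
    async fn main() {
        let handle = tokio::runtime::Handle::current();
        // Like a rayon worker, this thread has no tokio context of its own, so
        // the timeout future must be built inside handle.block_on.
        let worker = thread::spawn(move || {
            handle
                .block_on(async {
                    tokio::time::timeout(Duration::from_millis(100), daemon_hashes()).await
                })
                .ok() // Err(Elapsed): the daemon took too long
                .and_then(|result| result.ok()) // the daemon answered with an error
                .unwrap_or_else(local_hashes) // in either case, hash locally
        });
        let hashes = worker.join().unwrap();
        println!("{hashes:?}");
    }

Because block_on drives the future on the runtime captured by the handle, the 100 ms timer works even though the calling thread itself is not async.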
