Commit afa8e69

wire up file hashes

arlyon committed Feb 2, 2024
1 parent 82a2fca commit afa8e69
Showing 9 changed files with 206 additions and 66 deletions.
4 changes: 2 additions & 2 deletions crates/turborepo-lib/src/daemon/client.rs
@@ -134,14 +134,14 @@ impl<T> DaemonClient<T> {

pub async fn discover_package_hashes(
&mut self,
-    ) -> Result<Vec<proto::PackageHash>, DaemonError> {
+    ) -> Result<proto::DiscoverPackageHashesResponse, DaemonError> {
let response = self
.client
.discover_package_hashes(proto::DiscoverPackageHashesRequest {})
.await?
.into_inner();

-        Ok(response.package_hashes)
+        Ok(response)
}
}

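Note on the client API change above: callers now get the whole DiscoverPackageHashesResponse rather than a bare Vec<proto::PackageHash>, so both hash lists are visible. A minimal usage sketch, assuming an already-connected DaemonClient, the tonic-generated types from this commit, and an async context whose error type absorbs DaemonError:

    // hypothetical caller, not part of this commit
    let response = client.discover_package_hashes().await?;
    for ph in &response.package_hashes {
        println!("{}#{} => {}", ph.package, ph.task, ph.hash);
    }
    for fh in &response.file_hashes {
        println!("{} => {}", fh.relative_path, fh.hash);
    }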
113 changes: 90 additions & 23 deletions crates/turborepo-lib/src/daemon/server.rs
@@ -22,6 +22,7 @@ use std::{
};

use futures::{stream, Future, Stream};
+use itertools::Itertools;
use semver::Version;
use thiserror::Error;
use tokio::{
@@ -31,17 +32,20 @@ use tokio::{
use tonic::transport::{NamedService, Server};
use tower::ServiceBuilder;
use tracing::{error, info, trace, warn};
-use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
+use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPath};
use turborepo_filewatch::{
cookie_jar::CookieJar,
globwatcher::{Error as GlobWatcherError, GlobError, GlobSet, GlobWatcher},
+    package_hash_watcher::PackageHashWatcher,
package_watcher::PackageWatcher,
FileSystemWatcher, WatchError,
};
-use turborepo_repository::discovery::{
-    LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder,
+use turborepo_repository::{
+    discovery::{LocalPackageDiscoveryBuilder, PackageDiscovery, PackageDiscoveryBuilder},
+    package_graph::{PackageGraph, WorkspaceName},
+    package_json::PackageJson,
};
+use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;

use super::{
@@ -51,11 +55,12 @@ use super::{
};
use crate::{
daemon::{bump_timeout_layer::BumpTimeoutLayer, endpoint::listen_socket},
+    engine::EngineBuilder,
run::{
package_discovery::WatchingPackageDiscovery,
package_hashes::{watch::WatchingPackageHasher, LocalPackageHashes, PackageHasher},
task_id::TaskId,
},
+    turbo_json::TurboJson,
};

#[derive(Debug)]
@@ -275,10 +280,61 @@ where
};

let package_discovery = Arc::new(AsyncMutex::new(package_discovery));
+
+        let scm = SCM::new(&repo_root);
+
+        let package_json_path = repo_root.join_component("package.json");
+        let root_package_json = PackageJson::load(&package_json_path).unwrap();
+        let root_turbo_json = TurboJson::load(
+            &repo_root,
+            AnchoredSystemPath::empty(),
+            &root_package_json,
+            false,
+        )
+        .unwrap();
+
+        let pkg_dep_graph = {
+            let discovery = package_discovery.lock().await;
+            PackageGraph::builder(&repo_root, root_package_json.clone())
+                .with_package_discovery(discovery)
+                .build()
+                .await
+                .unwrap()
+        };
+
+        let engine = EngineBuilder::new(&repo_root, &pkg_dep_graph, false)
+            .with_root_tasks(root_turbo_json.pipeline.keys().cloned())
+            .with_tasks(root_turbo_json.pipeline.keys().cloned())
+            .with_turbo_jsons(Some(
+                [(WorkspaceName::Root, root_turbo_json)]
+                    .into_iter()
+                    .collect(),
+            ))
+            .with_workspaces(
+                pkg_dep_graph
+                    .workspaces()
+                    .map(|(name, _)| name.to_owned())
+                    .collect(),
+            )
+            .build()
+            .unwrap();
+
+        let fallback = LocalPackageHashes::new(
+            scm,
+            pkg_dep_graph
+                .workspaces()
+                .map(|(name, info)| (name.to_owned(), info.to_owned()))
+                .collect(),
+            engine.tasks().cloned(),
+            engine.task_definitions().to_owned(),
+            repo_root,
+        );
+
+        tracing::debug!("initing package hash watcher");
let package_hashes = AsyncMutex::new(
WatchingPackageHasher::new(
package_discovery.clone(),
-                None::<LocalPackageHashes>,
+                fallback,
Duration::from_secs(60 * 5),
watcher_rx.clone(),
)
@@ -560,25 +616,36 @@ where
&self,
_request: tonic::Request<proto::DiscoverPackageHashesRequest>,
) -> Result<tonic::Response<proto::DiscoverPackageHashesResponse>, tonic::Status> {
-        self.package_hashes
-            .lock()
-            .await
-            .calculate_hashes(GenericEventBuilder::new())
-            .await
-            .map(|hashes| {
-                tonic::Response::new(proto::DiscoverPackageHashesResponse {
-                    package_hashes: hashes
-                        .hashes
-                        .into_iter()
-                        .map(|(task_id, hash)| proto::PackageHash {
-                            package: task_id.package().into(),
-                            task: task_id.task().into(),
-                            hash,
-                        })
-                        .collect(),
-                })
-            })
-            .map_err(|e| tonic::Status::internal(format!("{}", e)))
+        let hashes = {
+            let mut hasher = self.package_hashes.lock().await;
+            hasher
+                .calculate_hashes(GenericEventBuilder::new())
+                .await
+                .map_err(|e| tonic::Status::internal(format!("{}", e)))?
+        };
+
+        Ok(tonic::Response::new(proto::DiscoverPackageHashesResponse {
+            package_hashes: hashes
+                .hashes
+                .into_iter()
+                .map(|(task_id, hash)| proto::PackageHash {
+                    package: task_id.package().into(),
+                    task: task_id.task().into(),
+                    hash,
+                    inputs: vec![],
+                })
+                .collect(),
+            file_hashes: hashes
+                .expanded_hashes
+                .into_iter()
+                .flat_map(|h| h.1 .0.into_iter())
+                .unique() // the same file may appear under multiple package-tasks
+                .map(|(k, v)| proto::FileHash {
+                    relative_path: k.to_string(),
+                    hash: v,
+                })
+                .collect(),
+        }))
}

type SubscribePackageHashesStream =
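Why the unique() call above is needed: expanded_hashes maps each package task to its own set of (path, hash) pairs, and tasks in the same package usually overlap on inputs, so flattening the per-task maps repeats pairs. An illustrative sketch of the dedup with made-up data (itertools::Itertools provides unique):

    use itertools::Itertools;

    let per_task_inputs = vec![
        vec![("src/lib.rs", "abc123")],                            // e.g. build inputs
        vec![("src/lib.rs", "abc123"), ("tests/it.rs", "def456")], // e.g. test inputs
    ];
    // flatten every task's pairs, keeping each distinct (path, hash) once
    let deduped: Vec<_> = per_task_inputs.into_iter().flatten().unique().collect();
    assert_eq!(deduped.len(), 2);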
10 changes: 10 additions & 0 deletions crates/turborepo-lib/src/daemon/turbod.proto
@@ -115,11 +115,21 @@ message DiscoverPackageHashesRequest {

message DiscoverPackageHashesResponse {
repeated PackageHash package_hashes = 1;
+  repeated FileHash file_hashes = 2;
}


message PackageHash {
string package = 1;
string task = 2;
string hash = 3;
+  // the relative path of file inputs for the package task
+  repeated string inputs = 4;
}

+message FileHash {
+  // the repo-root-relative unix-formatted path of the file
+  string relative_path = 1;
+  // the hash of that file
+  string hash = 2;
+}
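Design note: the response is normalized. Each file's hash appears once in file_hashes, and a PackageHash refers to its inputs by relative path, so shared files are not re-sent per task. A consumer rebuilds the per-task view by joining the two lists; a hedged sketch with made-up data (the real join lives in DaemonPackageHasher below):

    use std::collections::HashMap;

    // the flat file-hash table from the response
    let table: HashMap<&str, &str> =
        HashMap::from([("src/lib.rs", "abc123"), ("tests/it.rs", "def456")]);
    // one task's declared inputs, by relative path
    let inputs = ["src/lib.rs", "tests/it.rs"];
    // join inputs against the table, skipping paths with no recorded hash
    let per_task: HashMap<&str, &str> = inputs
        .iter()
        .filter_map(|path| table.get(path).map(|hash| (*path, *hash)))
        .collect();
    assert_eq!(per_task.len(), 2);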
10 changes: 7 additions & 3 deletions crates/turborepo-lib/src/run/mod.rs
@@ -396,9 +396,13 @@ impl Run {
(
Some(package_hashes::LocalPackageHashes::new(
scm,
-                    &pkg_dep_graph,
-                    &engine,
-                    &self.base.repo_root,
+                    pkg_dep_graph
+                        .workspaces()
+                        .map(|(name, info)| (name.to_owned(), info.to_owned()))
+                        .collect(),
+                    engine.tasks().cloned(),
+                    engine.task_definitions().to_owned(),
+                    self.base.repo_root.clone(),
)),
Duration::from_millis(10),
)
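The constructor now takes owned snapshots (workspace map, task iterator, task definitions, repo root) instead of borrows tied to the Run, which is what lets the daemon hold a LocalPackageHashes fallback that outlives any single run. A minimal sketch of the lifetime issue with stand-in types (Borrowing mirrors the old LocalPackageHashes<'a>, Owned the new one; both are hypothetical):

    struct Borrowing<'a> { repo_root: &'a str }
    struct Owned { repo_root: String }

    fn keep_in_daemon(hasher: Owned) {
        // spawn requires 'static data; owned fields satisfy that,
        // while a Borrowing<'a> value could not be moved in here
        std::thread::spawn(move || println!("{}", hasher.repo_root));
    }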
101 changes: 72 additions & 29 deletions crates/turborepo-lib/src/run/package_hashes/mod.rs
@@ -1,17 +1,18 @@
pub mod watch;

-use rayon::iter::ParallelBridge;
-use tokio::sync::Mutex;
-use turbopath::AbsoluteSystemPath;
-use turborepo_repository::{
-    discovery::{DiscoveryResponse, PackageDiscovery},
-    package_graph::PackageGraph,
-};
+use std::collections::HashMap;

+use rayon::prelude::*;
+use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPathBuf};
+use turborepo_repository::package_graph::{WorkspaceInfo, WorkspaceName};
use turborepo_scm::SCM;
use turborepo_telemetry::events::generic::GenericEventBuilder;

use super::task_id::TaskId;
-use crate::{engine::Engine, run::error::Error, task_hash::PackageInputsHashes, DaemonClient};
+use crate::{
+    engine::TaskNode, hash::FileHashes, run::error::Error, task_graph::TaskDefinition,
+    task_hash::PackageInputsHashes, DaemonClient,
+};

pub trait PackageHasher {
fn calculate_hashes(
@@ -20,41 +21,52 @@ pub trait PackageHasher {
) -> impl std::future::Future<Output = Result<PackageInputsHashes, Error>> + Send;
}

-pub struct LocalPackageHashes<'a> {
+pub struct LocalPackageHashes {
scm: SCM,
-    pkg_dep_graph: &'a PackageGraph,
-    engine: &'a Engine,
-    repo_root: &'a AbsoluteSystemPath,
+    workspaces: HashMap<WorkspaceName, WorkspaceInfo>,
+    tasks: Vec<TaskNode>,
+    task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
+    repo_root: AbsoluteSystemPathBuf,
}

-impl<'a> LocalPackageHashes<'a> {
+impl LocalPackageHashes {
pub fn new(
scm: SCM,
-        pkg_dep_graph: &'a PackageGraph,
-        engine: &'a Engine,
-        repo_root: &'a AbsoluteSystemPath,
+        workspaces: HashMap<WorkspaceName, WorkspaceInfo>,
+        tasks: impl Iterator<Item = TaskNode>,
+        task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
+        repo_root: AbsoluteSystemPathBuf,
) -> Self {
+        let tasks: Vec<_> = tasks.collect();
+        tracing::debug!(
+            "creating new local package hasher with {} tasks and {} definitions across {} \
+             workspaces",
+            tasks.len(),
+            task_definitions.len(),
+            workspaces.len()
+        );
Self {
scm,
-            pkg_dep_graph,
-            engine,
+            workspaces,
+            tasks,
+            task_definitions,
repo_root,
}
}
}

-impl<'a> PackageHasher for LocalPackageHashes<'a> {
+impl PackageHasher for LocalPackageHashes {
async fn calculate_hashes(
&mut self,
run_telemetry: GenericEventBuilder,
) -> Result<PackageInputsHashes, Error> {
-        let workspaces = self.pkg_dep_graph.workspaces().collect();
+        tracing::debug!("running local package hash discovery in {}", self.repo_root);
let package_inputs_hashes = PackageInputsHashes::calculate_file_hashes(
&self.scm,
-            self.engine.tasks().par_bridge(),
-            workspaces,
-            self.engine.task_definitions(),
-            self.repo_root,
+            self.tasks.par_iter(),
+            &self.workspaces,
+            &self.task_definitions,
+            &self.repo_root,
&run_telemetry,
)?;
Ok(package_inputs_hashes)
@@ -73,12 +85,43 @@ impl<'a, C: Clone + Send> PackageHasher for DaemonPackageHasher<'a, C> {
let package_hashes = self.daemon.discover_package_hashes().await;

package_hashes
-            .map(|resp| PackageInputsHashes {
-                hashes: resp
+            .map(|resp| {
+                let mapping: HashMap<_, _> = resp
+                    .file_hashes
+                    .into_iter()
+                    .map(|fh| (fh.relative_path, fh.hash))
+                    .collect();
+
+                let (expanded_hashes, hashes) = resp
.package_hashes
.into_iter()
-                    .map(|f| (TaskId::new(&f.package, &f.task).into_owned(), f.hash))
-                    .collect(),
-                ..Default::default()
+                    .map(|ph| {
+                        (
+                            (
+                                TaskId::new(&ph.package, &ph.task).into_owned(),
+                                FileHashes(
+                                    ph.inputs
+                                        .into_iter()
+                                        .filter_map(|f| {
+                                            mapping.get(&f).map(|hash| {
+                                                (
+                                                    RelativeUnixPathBuf::new(f).unwrap(),
+                                                    hash.to_owned(),
+                                                )
+                                            })
+                                        })
+                                        .collect(),
+                                ),
+                            ),
+                            (TaskId::from_owned(ph.package, ph.task), ph.hash),
+                        )
+                    })
+                    .unzip();
+
+                PackageInputsHashes {
+                    expanded_hashes,
+                    hashes,
+                }
})
.map_err(Error::Daemon)
}
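With both implementations owning their state, either side of the PackageHasher trait can back a run; a hedged sketch of a generic caller (assumes this module's imports, not code from the commit):

    // works for LocalPackageHashes and DaemonPackageHasher alike
    async fn hashes_from<H: PackageHasher>(mut hasher: H) -> Result<PackageInputsHashes, Error> {
        hasher.calculate_hashes(GenericEventBuilder::new()).await
    }

Note that the join above drops any declared input whose path is missing from the file-hash table (the filter_map), so an out-of-sync daemon response degrades to fewer expanded hashes rather than an error.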
