Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make sure that our concurrency check ignore packages without task #6790

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 132 additions & 5 deletions crates/turborepo-lib/src/engine/mod.rs
Expand Up @@ -163,10 +163,6 @@ impl Engine<Built> {
// No need to check the root node if that's where we are.
return Ok(false);
};
let is_persistent = self
.task_definitions
.get(task_id)
.map_or(false, |task_def| task_def.persistent);

for dep_index in self
.task_graph
Expand Down Expand Up @@ -203,7 +199,24 @@ impl Engine<Built> {
}
}

Ok(is_persistent)
// check if the package for the task has that task in its package.json
let info = package_graph
.workspace_info(&WorkspaceName::from(task_id.package().to_string()))
.expect("package graph should contain workspace info for task package");

let package_has_task = info
.package_json
.scripts
.get(task_id.task())
// handle legacy behaviour from go where an empty string may appear
.map_or(false, |script| !script.is_empty());

let task_is_persistent = self
.task_definitions
.get(task_id)
.map_or(false, |task_def| task_def.persistent);

Ok(task_is_persistent && package_has_task)
})
.fold((0, Vec::new()), |(mut count, mut errs), result| {
match result {
Expand All @@ -214,6 +227,8 @@ impl Engine<Built> {
(count, errs)
});

// there must always be at least one concurrency 'slot' available for
// non-persistent tasks otherwise we get race conditions
if persistent_count >= concurrency {
validation_errors.push(ValidateError::PersistentTasksExceedConcurrency {
persistent_count,
Expand Down Expand Up @@ -260,3 +275,115 @@ impl fmt::Display for TaskNode {
}
}
}

#[cfg(test)]
mod test {

use std::collections::BTreeMap;

use tempdir::TempDir;
use turbopath::AbsoluteSystemPath;
use turborepo_repository::{
discovery::{DiscoveryResponse, PackageDiscovery, PackageDiscoveryBuilder, WorkspaceData},
package_graph::PackageGraphBuilder,
package_json::PackageJson,
};

use super::*;

struct DummyDiscovery<'a>(&'a TempDir);

impl<'a> PackageDiscovery for DummyDiscovery<'a> {
async fn discover_packages(
&mut self,
) -> Result<
turborepo_repository::discovery::DiscoveryResponse,
turborepo_repository::discovery::Error,
> {
// our workspace has three packages, two of which have a build script
let workspaces = [("a", true), ("b", true), ("c", false)]
.into_iter()
.map(|(name, had_build)| {
let path = AbsoluteSystemPath::from_std_path(self.0.path()).unwrap();
let package_json = path.join_component(&format!("{}.json", name));

let scripts = if had_build {
BTreeMap::from_iter(
[("build".to_string(), "echo built!".to_string())].into_iter(),
)
} else {
BTreeMap::default()
};

let package = PackageJson {
name: Some(name.to_string()),
scripts,
..Default::default()
};

let file = std::fs::File::create(package_json.as_std_path()).unwrap();
serde_json::to_writer(file, &package).unwrap();

WorkspaceData {
package_json,
turbo_json: None,
}
})
.collect();

Ok(DiscoveryResponse {
package_manager: turborepo_repository::package_manager::PackageManager::Pnpm,
workspaces,
})
}
}

#[tokio::test]
async fn issue_4291() {
// we had an issue where our engine validation would reject running persistent
// tasks if the number of _total packages_ exceeded the concurrency limit,
// rather than the number of package that had that task. in this test, we
// set up a workspace with three packages, two of which have a persistent build
// task. we expect concurrency limit 1 to fail, but 2 and 3 to pass.

let tmp = tempdir::TempDir::new("issue_4291").unwrap();

let mut engine = Engine::new();

// add two packages with a persistent build task
for package in ["a", "b"] {
let task_id = TaskId::new(package, "build");
engine.get_index(&task_id);
engine.add_definition(
task_id,
TaskDefinition {
persistent: true,
..Default::default()
},
);
}

let engine = engine.seal();

let mut graph_builder = PackageGraph::builder(
AbsoluteSystemPath::from_std_path(&tmp.path()).unwrap(),
PackageJson::default(),
)
.with_package_discovery(DummyDiscovery(&tmp));

let graph = graph_builder.build().await.unwrap();

// if our limit is less than, it should fail
engine.validate(&graph, 1).expect_err("not enough");

// if our limit is less than, it should fail
engine.validate(&graph, 2).expect_err("not enough");

// we have two persistent tasks, and a slot for all other tasks, so this should
// pass
engine.validate(&graph, 3).expect("ok");

// if our limit is greater, then it should pass
engine.validate(&graph, 4).expect("ok");
}
}
Expand Up @@ -10,3 +10,8 @@
ERROR run failed: error preparing engine: Invalid persistent task configuration:
You have 2 persistent tasks but `turbo` is configured for concurrency of 2. Set --concurrency to at least 3
[1]

$ ${TURBO} run build --concurrency=3 > tmp.log 2>&1
$ grep -E "2 successful, 2 total" tmp.log
Tasks: 2 successful, 2 total