feat(turborepo): Persistent Tasks in Watch Mode (#7922)
### Description

Implements handling of persistent tasks in watch mode. Essentially, we strip any
persistent tasks out of the task graph when we execute a run for either
package rediscovery or package changes. Instead, we run the persistent
tasks in a separate thread, so they don't block those runs.
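
As a rough illustration of the pruning (a self-contained sketch, not the commit's code): the `Engine` methods added below use `petgraph`'s `filter_map` to drop persistent nodes, along with any edges that touch them. The `Task` struct and task names here are made up for the example.

```rust
use petgraph::graph::DiGraph;

#[derive(Clone, Debug)]
struct Task {
    name: &'static str,
    persistent: bool,
}

fn main() {
    let mut graph = DiGraph::<Task, ()>::new();
    let build = graph.add_node(Task { name: "build", persistent: false });
    let dev = graph.add_node(Task { name: "dev", persistent: true });
    // `dev` depends on `build`
    graph.add_edge(dev, build, ());

    // Keep only non-persistent nodes; edges touching a dropped node are removed too.
    let non_persistent = graph.filter_map(
        |_, task| (!task.persistent).then(|| task.clone()),
        |_, _| Some(()),
    );

    assert_eq!(non_persistent.node_count(), 1);
    let remaining = non_persistent.node_indices().next().unwrap();
    assert_eq!(non_persistent[remaining].name, "build");
}
```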

### Testing Instructions

Added tests for both pruning methods.

Closes TURBO-2775
NicholasLYang committed Apr 23, 2024
1 parent 08aa2be commit da53c14
Showing 6 changed files with 292 additions and 10 deletions.
4 changes: 3 additions & 1 deletion crates/turborepo-graph-utils/src/walker.rs
@@ -43,7 +43,9 @@ impl<N: Eq + Hash + Copy + Send + 'static> Walker<N, Start> {
}
// We will be emitting at most txs.len() nodes so emitting a node should never
// block
let (node_tx, node_rx) = mpsc::channel(txs.len());
//
// Always have at least 1 entry in buffer or this will panic
let (node_tx, node_rx) = mpsc::channel(std::cmp::max(txs.len(), 1));
let join_handles = FuturesUnordered::new();
for node in graph.node_identifiers() {
let tx = txs.remove(&node).expect("should have sender for all nodes");
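
The buffer clamp above exists because `tokio::sync::mpsc::channel` panics when asked for a zero-capacity buffer, and a graph pruned down to zero nodes would otherwise request exactly that. A minimal sketch of the guard (the `node_count` value here is illustrative):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // A fully pruned graph has zero nodes, but tokio's bounded channel
    // requires a capacity of at least 1, so clamp before constructing it.
    let node_count: usize = 0;
    let (tx, mut rx) = mpsc::channel::<u32>(std::cmp::max(node_count, 1));
    tx.send(42).await.unwrap();
    assert_eq!(rx.recv().await, Some(42));
}
```
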
235 changes: 234 additions & 1 deletion crates/turborepo-lib/src/engine/mod.rs
@@ -46,6 +46,7 @@ pub struct Engine<S = Built> {
task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
task_locations: HashMap<TaskId<'static>, Spanned<()>>,
package_tasks: HashMap<PackageName, Vec<petgraph::graph::NodeIndex>>,
pub(crate) has_persistent_tasks: bool,
}

impl Engine<Building> {
@@ -60,6 +61,7 @@ impl Engine<Building> {
task_definitions: HashMap::default(),
task_locations: HashMap::default(),
package_tasks: HashMap::default(),
has_persistent_tasks: false,
}
}

@@ -86,6 +88,9 @@ impl Engine<Building> {
task_id: TaskId<'static>,
definition: TaskDefinition,
) -> Option<TaskDefinition> {
if definition.persistent {
self.has_persistent_tasks = true;
}
self.task_definitions.insert(task_id, definition)
}

@@ -111,6 +116,7 @@ impl Engine<Building> {
task_definitions,
task_locations,
package_tasks,
has_persistent_tasks: has_persistent_task,
..
} = self;
Engine {
@@ -121,6 +127,7 @@ impl Engine<Building> {
task_definitions,
task_locations,
package_tasks,
has_persistent_tasks: has_persistent_task,
}
}
}
@@ -152,6 +159,17 @@ impl Engine<Built> {

let new_graph = self.task_graph.filter_map(
|node_idx, node| {
if let TaskNode::Task(task) = &self.task_graph[node_idx] {
// We only want to include tasks that are not persistent
let def = self
.task_definitions
.get(task)
.expect("task should have definition");

if def.persistent {
return None;
}
}
// If the node is reachable from any of the entrypoint tasks, we include it
entrypoint_indices
.iter()
@@ -186,6 +204,120 @@ impl Engine<Built> {
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
// We've filtered out persistent tasks
has_persistent_tasks: false,
}
}

/// Creates an `Engine` with persistent tasks filtered out. Used in watch
/// mode to re-run the non-persistent tasks.
pub fn create_engine_without_persistent_tasks(&self) -> Engine<Built> {
let new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
let def = self
.task_definitions
.get(task)
.expect("task should have definition");

if !def.persistent {
Some(node.clone())
} else {
None
}
}
TaskNode::Root => Some(node.clone()),
},
|_, _| Some(()),
);

let root_index = new_graph
.node_indices()
.find(|index| new_graph[*index] == TaskNode::Root)
.expect("root node should be present");

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_persistent_tasks: false,
}
}

/// Creates an `Engine` that is only the persistent tasks.
pub fn create_engine_for_persistent_tasks(&self) -> Engine<Built> {
let mut new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
let def = self
.task_definitions
.get(task)
.expect("task should have definition");

if def.persistent {
Some(node.clone())
} else {
None
}
}
TaskNode::Root => Some(node.clone()),
},
|_, _| Some(()),
);

let root_index = new_graph
.node_indices()
.find(|index| new_graph[*index] == TaskNode::Root)
.expect("root node should be present");

// Connect persistent tasks to root
for index in new_graph.node_indices() {
if new_graph[index] == TaskNode::Root {
continue;
}

new_graph.add_edge(root_index, index, ());
}

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_persistent_tasks: true,
}
}
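
Because pruning can remove a persistent task's non-persistent dependencies, `create_engine_for_persistent_tasks` re-attaches every surviving task directly to the root node so the graph stays connected and walkable. A self-contained sketch of that re-wiring (the node names are illustrative):

```rust
use petgraph::graph::DiGraph;

#[derive(Clone, Debug, PartialEq)]
enum Node {
    Root,
    Task(&'static str),
}

fn main() {
    let mut graph = DiGraph::<Node, ()>::new();
    let root = graph.add_node(Node::Root);
    graph.add_node(Node::Task("web#dev"));
    graph.add_node(Node::Task("docs#dev"));

    // Snapshot the indices, then attach every task node directly to the root.
    for index in graph.node_indices().collect::<Vec<_>>() {
        if graph[index] == Node::Root {
            continue;
        }
        graph.add_edge(root, index, ());
    }

    assert_eq!(graph.edge_count(), 2);
}
```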

Expand Down Expand Up @@ -436,6 +568,7 @@ mod test {
};

use super::*;
use crate::run::task_id::TaskName;

struct DummyDiscovery<'a>(&'a TempDir);

@@ -455,7 +588,11 @@ impl Engine<Building> {

let scripts = if had_build {
BTreeMap::from_iter(
[("build".to_string(), "echo built!".to_string())].into_iter(),
[
("build".to_string(), "echo built!".to_string()),
("dev".to_string(), "echo running dev!".to_string()),
]
.into_iter(),
)
} else {
BTreeMap::default()
@@ -541,4 +678,100 @@ mod test {
// if our limit is greater, then it should pass
engine.validate(&graph, 4, false).expect("ok");
}

#[tokio::test]
async fn test_prune_persistent_tasks() {
// Verifies that we can prune the `Engine` to include only the persistent tasks
// or only the non-persistent tasks.

let mut engine = Engine::new();

// add two packages, each with a `build` task and a persistent `dev` task that depends on it
for package in ["a", "b"] {
let build_task_id = TaskId::new(package, "build");
let dev_task_id = TaskId::new(package, "dev");

engine.get_index(&build_task_id);
engine.add_definition(build_task_id.clone(), TaskDefinition::default());

engine.get_index(&dev_task_id);
engine.add_definition(
dev_task_id,
TaskDefinition {
persistent: true,
task_dependencies: vec![Spanned::new(TaskName::from(build_task_id))],
..Default::default()
},
);
}

let engine = engine.seal();

let persistent_tasks_engine = engine.create_engine_for_persistent_tasks();
for node in persistent_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = persistent_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(def.persistent, "task should be persistent");
}
}

let non_persistent_tasks_engine = engine.create_engine_without_persistent_tasks();
for node in non_persistent_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = non_persistent_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(!def.persistent, "task should not be persistent");
}
}
}

#[tokio::test]
async fn test_get_subgraph_for_package() {
// Verifies that we can prune the `Engine` down to the subgraph of tasks
// that are affected when a single package changes.

let mut engine = Engine::new();

// Add two tasks in package `a`
let a_build_task_id = TaskId::new("a", "build");
let a_dev_task_id = TaskId::new("a", "dev");

let a_build_idx = engine.get_index(&a_build_task_id);
engine.add_definition(a_build_task_id.clone(), TaskDefinition::default());

engine.get_index(&a_dev_task_id);
engine.add_definition(a_dev_task_id.clone(), TaskDefinition::default());

// Add two tasks in package `b` where the `build` task depends
// on the `build` task from package `a`
let b_build_task_id = TaskId::new("b", "build");
let b_dev_task_id = TaskId::new("b", "dev");

let b_build_idx = engine.get_index(&b_build_task_id);
engine.add_definition(
b_build_task_id.clone(),
TaskDefinition {
task_dependencies: vec![Spanned::new(TaskName::from(a_build_task_id.clone()))],
..Default::default()
},
);

engine.get_index(&b_dev_task_id);
engine.add_definition(b_dev_task_id.clone(), TaskDefinition::default());
engine.task_graph.add_edge(b_build_idx, a_build_idx, ());

let engine = engine.seal();
let subgraph = engine.create_engine_for_subgraph(&PackageName::from("a"));

// Verify that the subgraph only contains tasks from package `a` and the `build`
// task from package `b`
let tasks: Vec<_> = subgraph.tasks().collect();
assert_eq!(tasks.len(), 3);
assert!(tasks.contains(&&TaskNode::Task(a_build_task_id)));
assert!(tasks.contains(&&TaskNode::Task(a_dev_task_id)));
assert!(tasks.contains(&&TaskNode::Task(b_build_task_id)));
}
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/run/builder.rs
@@ -408,7 +408,7 @@ impl RunBuilder {
run_telemetry,
task_access,
repo_root: self.repo_root,
opts: self.opts,
opts: Arc::new(self.opts),
api_client: self.api_client,
api_auth: self.api_auth,
env_at_execution_start,
22 changes: 21 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
@@ -41,6 +41,7 @@ use crate::{
DaemonClient, DaemonConnector,
};

#[derive(Clone)]
pub struct Run {
version: &'static str,
ui: UI,
@@ -49,7 +50,7 @@ pub struct Run {
processes: ProcessManager,
run_telemetry: GenericEventBuilder,
repo_root: AbsoluteSystemPathBuf,
opts: Opts,
opts: Arc<Opts>,
api_client: APIClient,
api_auth: Option<APIAuth>,
env_at_execution_start: EnvironmentVariableMap,
@@ -66,6 +67,9 @@ pub struct Run {
}

impl Run {
fn has_persistent_tasks(&self) -> bool {
self.engine.has_persistent_tasks
}
fn print_run_prelude(&self) {
let targets_list = self.opts.run_opts.tasks.join(", ");
if self.opts.run_opts.single_package {
@@ -97,6 +101,22 @@ impl Run {
}
}

pub fn create_run_for_persistent_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_for_persistent_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub fn create_run_without_persistent_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_without_persistent_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub async fn run(&mut self) -> Result<i32, Error> {
if self.should_print_prelude {
self.print_run_prelude();
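
The new `create_run_for_persistent_tasks` / `create_run_without_persistent_tasks` constructors give watch mode two runs to drive: a long-lived run for persistent tasks and one that can be re-executed on changes. A self-contained stand-in showing the split-and-spawn pattern described in the commit message (the `Run` struct below is a dummy, not the real turborepo type):

```rust
// Dummy stand-in with the same two constructors as the real `Run`.
struct Run {
    persistent_only: bool,
}

impl Run {
    fn create_run_for_persistent_tasks(&self) -> Self {
        Run { persistent_only: true }
    }

    fn create_run_without_persistent_tasks(&self) -> Self {
        Run { persistent_only: false }
    }

    async fn run(&mut self) -> i32 {
        let kind = if self.persistent_only { "persistent" } else { "non-persistent" };
        println!("running {kind} tasks");
        0
    }
}

#[tokio::main]
async fn main() {
    let base = Run { persistent_only: false };

    // Persistent tasks get their own spawned task so they never block the rest.
    let mut persistent = base.create_run_for_persistent_tasks();
    let persistent_handle = tokio::spawn(async move { persistent.run().await });

    // Non-persistent tasks can be re-run on every package change or rediscovery.
    let mut rest = base.create_run_without_persistent_tasks();
    rest.run().await;

    persistent_handle.await.unwrap();
}
```
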
12 changes: 11 additions & 1 deletion crates/turborepo-lib/src/run/task_id.rs
@@ -13,7 +13,7 @@ pub struct TaskId<'a> {
task: Cow<'a, str>,
}

/// A task name as it appears in in a `turbo.json` it might be for all
/// A task name as it appears in a `turbo.json` it might be for all
/// workspaces or just one.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
#[serde(try_from = "String", into = "String")]
@@ -22,6 +22,16 @@ pub struct TaskName<'a> {
task: Cow<'a, str>,
}

impl<'a> From<TaskId<'a>> for TaskName<'a> {
fn from(value: TaskId<'a>) -> Self {
let TaskId { package, task } = value;
TaskName {
package: Some(package),
task,
}
}
}

#[derive(Debug, thiserror::Error)]
#[error("No workspace found in task id '{input}'")]
pub struct TaskIdError<'a> {
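
The `From<TaskId>` impl added here is what lets the tests build `task_dependencies` via `TaskName::from(build_task_id)`. A simplified, self-contained sketch of the conversion (the structs are trimmed-down stand-ins for the real ones):

```rust
use std::borrow::Cow;

struct TaskId<'a> {
    package: Cow<'a, str>,
    task: Cow<'a, str>,
}

struct TaskName<'a> {
    package: Option<Cow<'a, str>>,
    task: Cow<'a, str>,
}

impl<'a> From<TaskId<'a>> for TaskName<'a> {
    fn from(value: TaskId<'a>) -> Self {
        let TaskId { package, task } = value;
        // A fully qualified id always names a package, so `package` is `Some`.
        TaskName { package: Some(package), task }
    }
}

fn main() {
    let id = TaskId { package: Cow::from("web"), task: Cow::from("build") };
    let name = TaskName::from(id);
    assert_eq!(name.package.as_deref(), Some("web"));
    assert_eq!(name.task, "build");
}
```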
