From 527ad4fed8bc18ae772ed2187e677c77583660a4 Mon Sep 17 00:00:00 2001 From: Nicholas Yang Date: Thu, 18 Apr 2024 10:24:20 -0400 Subject: [PATCH] feat: Re-run dependent tasks in watch mode (#7898) Instead of passing `--filter` when re-running tasks in watch mode, we now prune the task graph so that only the tasks of the changed package and their dependent tasks are run. This has the benefit of following the task graph rather than the package graph. You can verify this works by creating a task dependency relation that is not in the package graph: make a `ui#test` task that depends on a `docs#test` task, but don't have `ui` depend on `docs`. Then verify that when `docs` is changed, the `ui#test` task is re-run too. Closes TURBO-2752 --- crates/turborepo-lib/src/engine/execute.rs | 2 +- crates/turborepo-lib/src/engine/mod.rs | 66 +++++++++++++++++++ crates/turborepo-lib/src/run/builder.rs | 31 ++++++++- crates/turborepo-lib/src/run/mod.rs | 3 +- crates/turborepo-lib/src/run/task_id.rs | 9 +++ crates/turborepo-lib/src/run/watch.rs | 22 ++++--- crates/turborepo-lib/src/turbo_json/mod.rs | 16 +++++ .../integration/tests/conflicting-flags.t | 3 +- 8 files changed, 139 insertions(+), 13 deletions(-) diff --git a/crates/turborepo-lib/src/engine/execute.rs b/crates/turborepo-lib/src/engine/execute.rs index 699319444bb2f..067c263e71130 100644 --- a/crates/turborepo-lib/src/engine/execute.rs +++ b/crates/turborepo-lib/src/engine/execute.rs @@ -55,7 +55,7 @@ pub struct StopExecution; impl Engine { /// Execute a task graph by sending task ids to the visitor /// while respecting concurrency limits. - /// The visitor is expected to handle any error handling on it's end. + /// The visitor is expected to handle any error handling on its end. /// We enforce this by only allowing the returning of a sentinel error /// type which will stop any further execution of tasks. 
/// This will not stop any task which is currently running, simply it will diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index f65f504c3b72e..92347529b805e 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -45,6 +45,7 @@ pub struct Engine { task_lookup: HashMap, petgraph::graph::NodeIndex>, task_definitions: HashMap, TaskDefinition>, task_locations: HashMap, Spanned<()>>, + package_tasks: HashMap>, } impl Engine { @@ -58,6 +59,7 @@ impl Engine { task_lookup: HashMap::default(), task_definitions: HashMap::default(), task_locations: HashMap::default(), + package_tasks: HashMap::default(), } } @@ -65,6 +67,11 @@ impl Engine { self.task_lookup.get(task_id).copied().unwrap_or_else(|| { let index = self.task_graph.add_node(TaskNode::Task(task_id.clone())); self.task_lookup.insert(task_id.clone(), index); + self.package_tasks + .entry(PackageName::from(task_id.package())) + .or_default() + .push(index); + index }) } @@ -103,6 +110,7 @@ impl Engine { root_index, task_definitions, task_locations, + package_tasks, .. } = self; Engine { @@ -112,6 +120,7 @@ impl Engine { root_index, task_definitions, task_locations, + package_tasks, } } } @@ -123,6 +132,63 @@ impl Default for Engine { } impl Engine { + /// Creates an instance of `Engine` that only contains tasks that depend on + /// tasks from a given package. This is useful for watch mode, where we + /// need to re-run only a portion of the task graph. + pub fn create_engine_for_subgraph(&self, changed_package: &PackageName) -> Engine { + let entrypoint_indices: &[petgraph::graph::NodeIndex] = self + .package_tasks + .get(changed_package) + .map_or(&[], |v| &v[..]); + + // We reverse the graph because we want the *dependents* of entrypoint tasks + let mut reversed_graph = self.task_graph.clone(); + reversed_graph.reverse(); + + // This is `O(V^3)`, so in theory a bottleneck. 
Running dijkstra's + // algorithm for each entrypoint task could potentially be faster. + let node_distances = petgraph::algo::floyd_warshall::floyd_warshall(&reversed_graph, |_| 1) + .expect("no negative cycles"); + + let new_graph = self.task_graph.filter_map( + |node_idx, node| { + // If the node is reachable from any of the entrypoint tasks, we include it + entrypoint_indices + .iter() + .any(|idx| { + node_distances + .get(&(*idx, node_idx)) + .map_or(false, |dist| *dist != i32::MAX) + }) + .then_some(node.clone()) + }, + |_, _| Some(()), + ); + + let task_lookup: HashMap<_, _> = new_graph + .node_indices() + .filter_map(|index| { + let task = new_graph + .node_weight(index) + .expect("node index should be present"); + match task { + TaskNode::Root => None, + TaskNode::Task(task) => Some((task.clone(), index)), + } + }) + .collect(); + + Engine { + marker: std::marker::PhantomData, + root_index: self.root_index, + task_graph: new_graph, + task_lookup, + task_definitions: self.task_definitions.clone(), + task_locations: self.task_locations.clone(), + package_tasks: self.package_tasks.clone(), + } + } + pub fn dependencies(&self, task_id: &TaskId) -> Option> { self.neighbors(task_id, petgraph::Direction::Outgoing) } diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index e054fd65432ab..a0a92eba97675 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -59,6 +59,11 @@ pub struct RunBuilder { experimental_ui: bool, api_client: APIClient, analytics_sender: Option, + // In watch mode, we can have a changed package that we want to serve as an entrypoint. + // We will then prune away any tasks that do not depend on tasks inside + // this package. + entrypoint_package: Option, + should_print_prelude_override: Option, } impl RunBuilder { @@ -98,6 +103,7 @@ impl RunBuilder { (!cfg!(windows) || experimental_ui), ); let CommandBase { repo_root, ui, .. 
} = base; + Ok(Self { processes, opts, @@ -108,9 +114,21 @@ impl RunBuilder { version, experimental_ui, analytics_sender: None, + entrypoint_package: None, + should_print_prelude_override: None, }) } + pub fn with_entrypoint_package(mut self, entrypoint_package: PackageName) -> Self { + self.entrypoint_package = Some(entrypoint_package); + self + } + + pub fn hide_prelude(mut self) -> Self { + self.should_print_prelude_override = Some(false); + self + } + fn connect_process_manager(&self, signal_subscriber: SignalSubscriber) { let manager = self.processes.clone(); tokio::spawn(async move { @@ -377,6 +395,10 @@ impl RunBuilder { self.opts.run_opts.env_mode = EnvMode::Strict; } + let should_print_prelude = self.should_print_prelude_override.unwrap_or_else(|| { + self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none() + }); + Ok(Run { version: self.version, ui: self.ui, @@ -398,6 +420,7 @@ impl RunBuilder { run_cache, signal_handler: signal_handler.clone(), daemon, + should_print_prelude, }) } @@ -407,7 +430,7 @@ impl RunBuilder { root_turbo_json: &TurboJson, filtered_pkgs: &HashSet, ) -> Result { - let engine = EngineBuilder::new( + let mut engine = EngineBuilder::new( &self.repo_root, pkg_dep_graph, self.opts.run_opts.single_package, @@ -426,6 +449,12 @@ impl RunBuilder { })) .build()?; + // If we have an initial task, we prune out the engine to only + // tasks that are reachable from that initial task. 
+ if let Some(entrypoint_package) = &self.entrypoint_package { + engine = engine.create_engine_for_subgraph(entrypoint_package); + } + if !self.opts.run_opts.parallel { engine .validate( diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index d2b8f3222d184..4e7e3075c1152 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -62,6 +62,7 @@ pub struct Run { engine: Arc, task_access: TaskAccess, daemon: Option>, + should_print_prelude: bool, } impl Run { @@ -97,7 +98,7 @@ impl Run { } pub async fn run(&mut self) -> Result { - if self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none() { + if self.should_print_prelude { self.print_run_prelude(); } if let Some(subscriber) = self.signal_handler.subscribe() { diff --git a/crates/turborepo-lib/src/run/task_id.rs b/crates/turborepo-lib/src/run/task_id.rs index ed18580837e33..b56e32d91b8b6 100644 --- a/crates/turborepo-lib/src/run/task_id.rs +++ b/crates/turborepo-lib/src/run/task_id.rs @@ -43,6 +43,15 @@ impl<'a> From> for String { } } +impl TaskId<'static> { + pub fn from_static(package: String, task: String) -> Self { + TaskId { + package: package.into(), + task: task.into(), + } + } +} + impl<'a> TaskId<'a> { pub fn new(package: &'a str, task: &'a str) -> Self { TaskId::try_from(task).unwrap_or_else(|_| Self { diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 9a921e3c64a49..c89f6ed0e6f8d 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -8,7 +8,7 @@ use turborepo_repository::package_graph::PackageName; use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{ - cli::{Command, ExecutionArgs, RunArgs}, + cli::{Command, RunArgs}, commands, commands::CommandBase, daemon::{proto, DaemonConnectorError, DaemonError}, @@ -79,6 +79,8 @@ impl WatchClient { .build(&handler, telemetry.clone()) .await?; + run.print_run_prelude(); 
+ let connector = DaemonConnector { can_start_server: true, can_kill_server: true, @@ -143,10 +145,7 @@ impl WatchClient { args.command = args.command.map(|c| { if let Command::Watch(execution_args) = c { Command::Run { - execution_args: Box::new(ExecutionArgs { - filter: vec![format!("...{}", package_name)], - ..*execution_args - }), + execution_args, run_args: Box::new(RunArgs { no_cache: true, daemon: true, @@ -166,14 +165,18 @@ impl WatchClient { run.abort(); } + let signal_handler = handler.clone(); let telemetry = telemetry.clone(); - let handler = handler.clone(); + current_runs.insert( - package_name, + package_name.clone(), tokio::spawn(async move { - let run = RunBuilder::new(new_base)? - .build(&handler, telemetry) + let mut run = RunBuilder::new(new_base)? + .with_entrypoint_package(package_name) + .hide_prelude() + .build(&signal_handler, telemetry) .await?; + run.run().await }), ); @@ -204,6 +207,7 @@ impl WatchClient { // rebuild run struct *run = RunBuilder::new(base.clone())? 
+ .hide_prelude() .build(handler, telemetry.clone()) .await?; diff --git a/crates/turborepo-lib/src/turbo_json/mod.rs b/crates/turborepo-lib/src/turbo_json/mod.rs index 2de4b4533a046..14d292e0b02d9 100644 --- a/crates/turborepo-lib/src/turbo_json/mod.rs +++ b/crates/turborepo-lib/src/turbo_json/mod.rs @@ -604,6 +604,22 @@ impl TurboJson { } }; + if repo_root.join_component("Cargo.toml").exists() { + turbo_json.pipeline.insert( + TaskName::from("build").into_root_task(), + Spanned::new(RawTaskDefinition { + cache: Some(Spanned::new(true)), + inputs: Some(vec![ + Spanned::new("Cargo.toml".into()), + Spanned::new("Cargo.lock".into()), + Spanned::new("**/*.rs".into()), + ]), + outputs: Some(vec![Spanned::new("target".into())]), + ..RawTaskDefinition::default() + }), + ); + } + // TODO: Add location info from package.json for script_name in root_package_json.scripts.keys() { let task_name = TaskName::from(script_name.as_str()); diff --git a/turborepo-tests/integration/tests/conflicting-flags.t b/turborepo-tests/integration/tests/conflicting-flags.t index e961d634f8a2a..872ea520d1b9e 100644 --- a/turborepo-tests/integration/tests/conflicting-flags.t +++ b/turborepo-tests/integration/tests/conflicting-flags.t @@ -3,11 +3,12 @@ Setup $ ${TURBO} run build --daemon --no-daemon ERROR the argument '--\[no-\]daemon' cannot be used with '--no-daemon' (re) - Usage: turbo(.exe)? run --\[no-\]daemon (re) + Usage: turbo(\.exe)? run --\[no-\]daemon (re) For more information, try '--help'. [1] + $ ${TURBO} run build --since main ERROR the following required arguments were not provided: --scope