Skip to content

Commit

Permalink
feat: Re-run dependent tasks in watch mode (#7898)
Browse files Browse the repository at this point in the history
Instead of using `--filter` in our re-running of tasks for watch mode,
we instead prune the task graph to run the changed task and its
dependent tasks. This has the benefit of following the task graph and
not the package graph.

You can verify this works by creating a task dependency relation that is
not in the package graph. So, make a `ui#test` task that depends on a
`docs#test` task, but don't have `ui` depend on `docs`. Then verify that
when the `docs` are changed, we re-run the `ui#test` task too.

Closes TURBO-2752
  • Loading branch information
NicholasLYang committed Apr 19, 2024
1 parent 7f693b0 commit 527ad4f
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct StopExecution;
impl Engine {
/// Execute a task graph by sending task ids to the visitor
/// while respecting concurrency limits.
/// The visitor is expected to handle any error handling on it's end.
/// The visitor is expected to handle any error handling on its end.
/// We enforce this by only allowing the returning of a sentinel error
/// type which will stop any further execution of tasks.
/// This will not stop any task which is currently running, simply it will
Expand Down
66 changes: 66 additions & 0 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Engine<S = Built> {
task_lookup: HashMap<TaskId<'static>, petgraph::graph::NodeIndex>,
task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
task_locations: HashMap<TaskId<'static>, Spanned<()>>,
package_tasks: HashMap<PackageName, Vec<petgraph::graph::NodeIndex>>,
}

impl Engine<Building> {
Expand All @@ -58,13 +59,19 @@ impl Engine<Building> {
task_lookup: HashMap::default(),
task_definitions: HashMap::default(),
task_locations: HashMap::default(),
package_tasks: HashMap::default(),
}
}

/// Returns the graph index for `task_id`, inserting a fresh node (and
/// registering it under its package in `package_tasks`) if the task has
/// not been seen before.
pub fn get_index(&mut self, task_id: &TaskId<'static>) -> petgraph::graph::NodeIndex {
    match self.task_lookup.get(task_id) {
        Some(&index) => index,
        None => {
            let index = self.task_graph.add_node(TaskNode::Task(task_id.clone()));
            self.task_lookup.insert(task_id.clone(), index);
            // Track which node indices belong to each package so watch
            // mode can later find a changed package's tasks.
            self.package_tasks
                .entry(PackageName::from(task_id.package()))
                .or_default()
                .push(index);
            index
        }
    }
}
Expand Down Expand Up @@ -103,6 +110,7 @@ impl Engine<Building> {
root_index,
task_definitions,
task_locations,
package_tasks,
..
} = self;
Engine {
Expand All @@ -112,6 +120,7 @@ impl Engine<Building> {
root_index,
task_definitions,
task_locations,
package_tasks,
}
}
}
Expand All @@ -123,6 +132,63 @@ impl Default for Engine<Building> {
}

impl Engine<Built> {
/// Creates an instance of `Engine` that only contains tasks that depend on
/// tasks from a given package. This is useful for watch mode, where we
/// need to re-run only a portion of the task graph.
pub fn create_engine_for_subgraph(&self, changed_package: &PackageName) -> Engine<Built> {
let entrypoint_indices: &[petgraph::graph::NodeIndex] = self
.package_tasks
.get(changed_package)
.map_or(&[], |v| &v[..]);

// We reverse the graph because we want the *dependents* of entrypoint tasks
let mut reversed_graph = self.task_graph.clone();
reversed_graph.reverse();

// This is `O(V^3)`, so in theory a bottleneck. Running dijkstra's
// algorithm for each entrypoint task could potentially be faster.
let node_distances = petgraph::algo::floyd_warshall::floyd_warshall(&reversed_graph, |_| 1)
.expect("no negative cycles");

let new_graph = self.task_graph.filter_map(
|node_idx, node| {
// If the node is reachable from any of the entrypoint tasks, we include it
entrypoint_indices
.iter()
.any(|idx| {
node_distances
.get(&(*idx, node_idx))
.map_or(false, |dist| *dist != i32::MAX)
})
.then_some(node.clone())
},
|_, _| Some(()),
);

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index: self.root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
}
}

/// Returns the direct dependencies of `task_id` (its outgoing neighbors
/// in the task graph), or `None` if the task is not in the graph.
pub fn dependencies(&self, task_id: &TaskId) -> Option<HashSet<&TaskNode>> {
    let direction = petgraph::Direction::Outgoing;
    self.neighbors(task_id, direction)
}
Expand Down
31 changes: 30 additions & 1 deletion crates/turborepo-lib/src/run/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ pub struct RunBuilder {
experimental_ui: bool,
api_client: APIClient,
analytics_sender: Option<AnalyticsSender>,
// In watch mode, we can have a changed package that we want to serve as an entrypoint.
// We will then prune away any tasks that do not depend on tasks inside
// this package.
entrypoint_package: Option<PackageName>,
should_print_prelude_override: Option<bool>,
}

impl RunBuilder {
Expand Down Expand Up @@ -98,6 +103,7 @@ impl RunBuilder {
(!cfg!(windows) || experimental_ui),
);
let CommandBase { repo_root, ui, .. } = base;

Ok(Self {
processes,
opts,
Expand All @@ -108,9 +114,21 @@ impl RunBuilder {
version,
experimental_ui,
analytics_sender: None,
entrypoint_package: None,
should_print_prelude_override: None,
})
}

pub fn with_entrypoint_package(mut self, entrypoint_package: PackageName) -> Self {
self.entrypoint_package = Some(entrypoint_package);
self
}

pub fn hide_prelude(mut self) -> Self {
self.should_print_prelude_override = Some(false);
self
}

fn connect_process_manager(&self, signal_subscriber: SignalSubscriber) {
let manager = self.processes.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -377,6 +395,10 @@ impl RunBuilder {
self.opts.run_opts.env_mode = EnvMode::Strict;
}

let should_print_prelude = self.should_print_prelude_override.unwrap_or_else(|| {
self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none()
});

Ok(Run {
version: self.version,
ui: self.ui,
Expand All @@ -398,6 +420,7 @@ impl RunBuilder {
run_cache,
signal_handler: signal_handler.clone(),
daemon,
should_print_prelude,
})
}

Expand All @@ -407,7 +430,7 @@ impl RunBuilder {
root_turbo_json: &TurboJson,
filtered_pkgs: &HashSet<PackageName>,
) -> Result<Engine, Error> {
let engine = EngineBuilder::new(
let mut engine = EngineBuilder::new(
&self.repo_root,
pkg_dep_graph,
self.opts.run_opts.single_package,
Expand All @@ -426,6 +449,12 @@ impl RunBuilder {
}))
.build()?;

// If we have an entrypoint package, we prune the engine down to just
// the tasks that depend on that package's tasks.
if let Some(entrypoint_package) = &self.entrypoint_package {
engine = engine.create_engine_for_subgraph(entrypoint_package);
}

if !self.opts.run_opts.parallel {
engine
.validate(
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct Run {
engine: Arc<Engine>,
task_access: TaskAccess,
daemon: Option<DaemonClient<DaemonConnector>>,
should_print_prelude: bool,
}

impl Run {
Expand Down Expand Up @@ -97,7 +98,7 @@ impl Run {
}

pub async fn run(&mut self) -> Result<i32, Error> {
if self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none() {
if self.should_print_prelude {
self.print_run_prelude();
}
if let Some(subscriber) = self.signal_handler.subscribe() {
Expand Down
9 changes: 9 additions & 0 deletions crates/turborepo-lib/src/run/task_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ impl<'a> From<TaskId<'a>> for String {
}
}

impl TaskId<'static> {
pub fn from_static(package: String, task: String) -> Self {
TaskId {
package: package.into(),
task: task.into(),
}
}
}

impl<'a> TaskId<'a> {
pub fn new(package: &'a str, task: &'a str) -> Self {
TaskId::try_from(task).unwrap_or_else(|_| Self {
Expand Down
22 changes: 13 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use turborepo_repository::package_graph::PackageName;
use turborepo_telemetry::events::command::CommandEventBuilder;

use crate::{
cli::{Command, ExecutionArgs, RunArgs},
cli::{Command, RunArgs},
commands,
commands::CommandBase,
daemon::{proto, DaemonConnectorError, DaemonError},
Expand Down Expand Up @@ -79,6 +79,8 @@ impl WatchClient {
.build(&handler, telemetry.clone())
.await?;

run.print_run_prelude();

let connector = DaemonConnector {
can_start_server: true,
can_kill_server: true,
Expand Down Expand Up @@ -143,10 +145,7 @@ impl WatchClient {
args.command = args.command.map(|c| {
if let Command::Watch(execution_args) = c {
Command::Run {
execution_args: Box::new(ExecutionArgs {
filter: vec![format!("...{}", package_name)],
..*execution_args
}),
execution_args,
run_args: Box::new(RunArgs {
no_cache: true,
daemon: true,
Expand All @@ -166,14 +165,18 @@ impl WatchClient {
run.abort();
}

let signal_handler = handler.clone();
let telemetry = telemetry.clone();
let handler = handler.clone();

current_runs.insert(
package_name,
package_name.clone(),
tokio::spawn(async move {
let run = RunBuilder::new(new_base)?
.build(&handler, telemetry)
let mut run = RunBuilder::new(new_base)?
.with_entrypoint_package(package_name)
.hide_prelude()
.build(&signal_handler, telemetry)
.await?;

run.run().await
}),
);
Expand Down Expand Up @@ -204,6 +207,7 @@ impl WatchClient {

// rebuild run struct
*run = RunBuilder::new(base.clone())?
.hide_prelude()
.build(handler, telemetry.clone())
.await?;

Expand Down
16 changes: 16 additions & 0 deletions crates/turborepo-lib/src/turbo_json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,22 @@ impl TurboJson {
}
};

if repo_root.join_component("Cargo.toml").exists() {
turbo_json.pipeline.insert(
TaskName::from("build").into_root_task(),
Spanned::new(RawTaskDefinition {
cache: Some(Spanned::new(true)),
inputs: Some(vec![
Spanned::new("Cargo.toml".into()),
Spanned::new("Cargo.lock".into()),
Spanned::new("**/*.rs".into()),
]),
outputs: Some(vec![Spanned::new("target".into())]),
..RawTaskDefinition::default()
}),
);
}

// TODO: Add location info from package.json
for script_name in root_package_json.scripts.keys() {
let task_name = TaskName::from(script_name.as_str());
Expand Down
3 changes: 2 additions & 1 deletion turborepo-tests/integration/tests/conflicting-flags.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ Setup
$ ${TURBO} run build --daemon --no-daemon
ERROR the argument '--\[no-\]daemon' cannot be used with '--no-daemon' (re)

Usage: turbo(.exe)? run --\[no-\]daemon (re)
Usage: turbo(\.exe)? run --\[no-\]daemon (re)

For more information, try '--help'.

[1]

$ ${TURBO} run build --since main
ERROR the following required arguments were not provided:
--scope <SCOPE>
Expand Down

0 comments on commit 527ad4f

Please sign in to comment.