Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Re-run dependent tasks in watch mode #7898

Merged
merged 4 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/turborepo-lib/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1346,17 +1346,17 @@ fn warn_all_deprecated_flags(args: &Args) {
warn_flag_removal("--cpuprofile");
}

if let Some(Command::Run(run_args)) = args.command.as_ref() {
if run_args.since.is_some() {
if let Some(Command::Run { execution_args, .. }) = args.command.as_ref() {
if execution_args.since.is_some() {
warn_flag_removal("--since");
}
if !run_args.scope.is_empty() {
if !execution_args.scope.is_empty() {
warn_flag_removal("--scope");
}
if run_args.include_dependencies {
if execution_args.include_dependencies {
warn_flag_removal("--include-dependencies");
}
if run_args.no_deps {
if execution_args.no_deps {
warn_flag_removal("--no-deps");
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use turborepo_telemetry::events::command::CommandEventBuilder;
use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler};

#[cfg(windows)]
pub async fn get_signal() -> Result<impl Future<Output = Option<()>>, run::Error> {
pub fn get_signal() -> Result<impl Future<Output = Option<()>>, run::Error> {
let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?;
Ok(async move { ctrl_c.recv().await })
}
Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/engine/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct StopExecution;
impl Engine {
/// Execute a task graph by sending task ids to the visitor
/// while respecting concurrency limits.
/// The visitor is expected to handle any error handling on it's end.
/// The visitor is expected to handle any error handling on its end.
/// We enforce this by only allowing the returning of a sentinel error
/// type which will stop any further execution of tasks.
/// This will not stop any task which is currently running, simply it will
Expand Down
66 changes: 66 additions & 0 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Engine<S = Built> {
task_lookup: HashMap<TaskId<'static>, petgraph::graph::NodeIndex>,
task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
task_locations: HashMap<TaskId<'static>, Spanned<()>>,
package_tasks: HashMap<PackageName, Vec<petgraph::graph::NodeIndex>>,
}

impl Engine<Building> {
Expand All @@ -58,13 +59,19 @@ impl Engine<Building> {
task_lookup: HashMap::default(),
task_definitions: HashMap::default(),
task_locations: HashMap::default(),
package_tasks: HashMap::default(),
}
}

pub fn get_index(&mut self, task_id: &TaskId<'static>) -> petgraph::graph::NodeIndex {
self.task_lookup.get(task_id).copied().unwrap_or_else(|| {
let index = self.task_graph.add_node(TaskNode::Task(task_id.clone()));
self.task_lookup.insert(task_id.clone(), index);
self.package_tasks
.entry(PackageName::from(task_id.package()))
.or_default()
.push(index);

index
})
}
Expand Down Expand Up @@ -103,6 +110,7 @@ impl Engine<Building> {
root_index,
task_definitions,
task_locations,
package_tasks,
..
} = self;
Engine {
Expand All @@ -112,6 +120,7 @@ impl Engine<Building> {
root_index,
task_definitions,
task_locations,
package_tasks,
}
}
}
Expand All @@ -123,6 +132,63 @@ impl Default for Engine<Building> {
}

impl Engine<Built> {
/// Creates an instance of `Engine` that only contains tasks that depend on
/// tasks from a given package. This is useful for watch mode, where we
/// need to re-run only a portion of the task graph.
pub fn create_engine_for_subgraph(&self, changed_package: &PackageName) -> Engine<Built> {
let entrypoint_indices: &[petgraph::graph::NodeIndex] = self
.package_tasks
.get(changed_package)
.map_or(&[], |v| &v[..]);

// We reverse the graph because we want the *dependents* of entrypoint tasks
let mut reversed_graph = self.task_graph.clone();
reversed_graph.reverse();

// This is `O(V^3)`, so in theory a bottleneck. Running dijkstra's
// algorithm for each entrypoint task could potentially be faster.
let node_distances = petgraph::algo::floyd_warshall::floyd_warshall(&reversed_graph, |_| 1)
.expect("no negative cycles");

let new_graph = self.task_graph.filter_map(
|node_idx, node| {
// If the node is reachable from any of the entrypoint tasks, we include it
entrypoint_indices
.iter()
.any(|idx| {
node_distances
.get(&(*idx, node_idx))
.map_or(false, |dist| *dist != i32::MAX)
})
.then_some(node.clone())
},
|_, _| Some(()),
);

let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index: self.root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
}
}

pub fn dependencies(&self, task_id: &TaskId) -> Option<HashSet<&TaskNode>> {
self.neighbors(task_id, petgraph::Direction::Outgoing)
}
Expand Down
31 changes: 30 additions & 1 deletion crates/turborepo-lib/src/run/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ pub struct RunBuilder {
experimental_ui: bool,
api_client: APIClient,
analytics_sender: Option<AnalyticsSender>,
// In watch mode, we can have a changed package that we want to serve as an entrypoint.
// We will then prune away any tasks that do not depend on tasks inside
// this package.
entrypoint_package: Option<PackageName>,
should_print_prelude_override: Option<bool>,
}

impl RunBuilder {
Expand Down Expand Up @@ -98,6 +103,7 @@ impl RunBuilder {
(!cfg!(windows) || experimental_ui),
);
let CommandBase { repo_root, ui, .. } = base;

Ok(Self {
processes,
opts,
Expand All @@ -108,9 +114,21 @@ impl RunBuilder {
version,
experimental_ui,
analytics_sender: None,
entrypoint_package: None,
should_print_prelude_override: None,
})
}

pub fn with_entrypoint_package(mut self, entrypoint_package: PackageName) -> Self {
self.entrypoint_package = Some(entrypoint_package);
self
}

pub fn hide_prelude(mut self) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this because the prelude makes less sense with re-run tasks. It prints the packages in scope, but the tasks in those packages are not always run, since the task graph is pruned.

self.should_print_prelude_override = Some(false);
self
}

fn connect_process_manager(&self, signal_subscriber: SignalSubscriber) {
let manager = self.processes.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -377,6 +395,10 @@ impl RunBuilder {
self.opts.run_opts.env_mode = EnvMode::Strict;
}

let should_print_prelude = self.should_print_prelude_override.unwrap_or_else(|| {
self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none()
});

Ok(Run {
version: self.version,
ui: self.ui,
Expand All @@ -397,6 +419,7 @@ impl RunBuilder {
engine: Arc::new(engine),
run_cache,
signal_handler: signal_handler.clone(),
should_print_prelude,
})
}

Expand All @@ -406,7 +429,7 @@ impl RunBuilder {
root_turbo_json: &TurboJson,
filtered_pkgs: &HashSet<PackageName>,
) -> Result<Engine, Error> {
let engine = EngineBuilder::new(
let mut engine = EngineBuilder::new(
&self.repo_root,
pkg_dep_graph,
self.opts.run_opts.single_package,
Expand All @@ -425,6 +448,12 @@ impl RunBuilder {
}))
.build()?;

// If we have an initial task, we prune out the engine to only
// tasks that are reachable from that initial task.
if let Some(entrypoint_package) = &self.entrypoint_package {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will eventually need to be a set of packages, especially with debouncing and waiting for current runs to finish being cancelled or completed

engine = engine.create_engine_for_subgraph(entrypoint_package);
}

if !self.opts.run_opts.parallel {
engine
.validate(
Expand Down
3 changes: 2 additions & 1 deletion crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub struct Run {
signal_handler: SignalHandler,
engine: Arc<Engine>,
task_access: TaskAccess,
should_print_prelude: bool,
}

impl Run {
Expand Down Expand Up @@ -95,7 +96,7 @@ impl Run {
}

pub async fn run(&self) -> Result<i32, Error> {
if self.opts.run_opts.dry_run.is_none() && self.opts.run_opts.graph.is_none() {
if self.should_print_prelude {
self.print_run_prelude();
}
if let Some(subscriber) = self.signal_handler.subscribe() {
Expand Down
9 changes: 9 additions & 0 deletions crates/turborepo-lib/src/run/task_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ impl<'a> From<TaskId<'a>> for String {
}
}

impl TaskId<'static> {
pub fn from_static(package: String, task: String) -> Self {
TaskId {
package: package.into(),
task: task.into(),
}
}
}

impl<'a> TaskId<'a> {
pub fn new(package: &'a str, task: &'a str) -> Self {
TaskId::try_from(task).unwrap_or_else(|_| Self {
Expand Down
22 changes: 13 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl WatchClient {

let mut new_base = base.clone();
new_base.args_mut().command = Some(Command::Run {
run_args: Box::new(RunArgs::default()),
run_args: Box::default(),
execution_args: execution_args.clone(),
});

Expand All @@ -79,6 +79,8 @@ impl WatchClient {
.build(&handler, telemetry.clone())
.await?;

run.print_run_prelude();

let connector = DaemonConnector {
can_start_server: true,
can_kill_server: true,
Expand Down Expand Up @@ -143,10 +145,7 @@ impl WatchClient {
args.command = args.command.map(|c| {
if let Command::Watch(execution_args) = c {
Command::Run {
execution_args: Box::new(ExecutionArgs {
filter: vec![format!("...{}", package_name)],
..*execution_args
}),
execution_args: Box::new(ExecutionArgs { ..*execution_args }),
run_args: Box::new(RunArgs {
no_cache: true,
daemon: true,
Expand All @@ -166,14 +165,18 @@ impl WatchClient {
run.abort();
}

let signal_handler = handler.clone();
let telemetry = telemetry.clone();
let handler = handler.clone();

current_runs.insert(
package_name,
package_name.clone(),
tokio::spawn(async move {
let mut run = RunBuilder::new(new_base)?
.build(&handler, telemetry)
let run = RunBuilder::new(new_base)?
.with_entrypoint_package(package_name)
.hide_prelude()
.build(&signal_handler, telemetry)
.await?;

run.run().await
}),
);
Expand Down Expand Up @@ -204,6 +207,7 @@ impl WatchClient {

// rebuild run struct
*run = RunBuilder::new(base.clone())?
.hide_prelude()
.build(handler, telemetry.clone())
.await?;

Expand Down
16 changes: 16 additions & 0 deletions crates/turborepo-lib/src/turbo_json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,22 @@ impl TurboJson {
}
};

if repo_root.join_component("Cargo.toml").exists() {
turbo_json.pipeline.insert(
TaskName::from("build").into_root_task(),
Spanned::new(RawTaskDefinition {
cache: Some(Spanned::new(true)),
inputs: Some(vec![
Spanned::new("Cargo.toml".into()),
Spanned::new("Cargo.lock".into()),
Spanned::new("**/*.rs".into()),
]),
outputs: Some(vec![Spanned::new("target".into())]),
..RawTaskDefinition::default()
}),
);
}

// TODO: Add location info from package.json
for script_name in root_package_json.scripts.keys() {
let task_name = TaskName::from(script_name.as_str());
Expand Down
4 changes: 2 additions & 2 deletions turborepo-tests/integration/tests/conflicting-flags.t
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
Setup
$ . ${TESTDIR}/../../helpers/setup_integration_test.sh
$ ${TURBO} run build --daemon --no-daemon
ERROR the argument '--daemon' cannot be used with '--no-daemon'
ERROR the argument '--[no-]daemon' cannot be used with '--no-daemon'

Usage: turbo(\.exe)? run --daemon (re)
Usage: turbo(\.exe)? run --\[no-\]daemon (re)

For more information, try '--help'.

Expand Down