From 74b7de2639809b2cc8ab2b1d7371f1a45a669abd Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 11:21:40 +0100 Subject: [PATCH 1/8] Split AggregatedDirtyContainer --- .../turbo-tasks-backend/src/backend/mod.rs | 29 ++--- .../backend/operation/aggregation_update.rs | 106 ++++++++++++++---- .../src/backend/operation/mod.rs | 27 +++++ .../src/backend/storage.rs | 32 ++++++ .../crates/turbo-tasks-backend/src/data.rs | 16 ++- 5 files changed, 168 insertions(+), 42 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index e8ed5d7945721..1fec5147e0287 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -582,14 +582,7 @@ impl TurboTasksBackendInner { .set_active_until_clean(); if ctx.should_track_activeness() { // A newly added Activeness need to make sure to schedule the tasks - task_ids_to_schedule = get_many!( - task, - AggregatedDirtyContainer { - task - } count if count.get(self.session_id) > 0 => { - task - } - ); + task_ids_to_schedule = task.dirty_containers(self.session_id).collect(); task_ids_to_schedule.push(task_id); } get!(task, Activeness).unwrap() @@ -650,16 +643,8 @@ impl TurboTasksBackendInner { "{task_id} {task_description}{count} (aggr={aggregation_number}, \ {in_progress}, {activeness}{is_dirty})", ); - let children: Vec<_> = iter_many!( - task, - AggregatedDirtyContainer { - task - } count => { - (task, count.get(ctx.session_id())) - } - ) - .filter(|(_, count)| *count > 0) - .collect(); + let children: Vec<_> = + task.dirty_containers_with_count(ctx.session_id()).collect(); drop(task); if missing_upper { @@ -2973,9 +2958,9 @@ impl TurboTasksBackendInner { } } - let is_dirty = task.is_dirty(self.session_id); - let has_dirty_container = get!(task, AggregatedDirtyContainerCount) - .is_some_and(|count| count.get(self.session_id) > 0); + let is_dirty = get!(task, 
Dirty).is_some(); + let has_dirty_container = + get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0); let should_be_in_upper = is_dirty || has_dirty_container; let aggregation_number = get_aggregation_number(&task); @@ -3000,7 +2985,7 @@ impl TurboTasksBackendInner { for upper_id in uppers { let task = ctx.task(task_id, TaskDataCategory::All); let in_upper = get!(task, AggregatedDirtyContainer { task: task_id }) - .is_some_and(|dirty| dirty.get(self.session_id) > 0); + .is_some_and(|&dirty| dirty > 0); if !in_upper { panic!( "Task {} ({}) is dirty, but is not listed in the upper task {} \ diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index d483ce43c6e45..203109c74478b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -28,7 +28,9 @@ use crate::{ backend::{ TaskDataCategory, get_mut, get_mut_or_insert_with, operation::{ExecuteContext, Operation, TaskGuard, invalidate::make_task_dirty}, - storage::{count, get, get_many, iter_many, remove, update, update_count}, + storage::{ + count, get, get_many, iter_many, remove, update, update_count, update_count_and_get, + }, }, data::{ ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType, @@ -327,34 +329,100 @@ impl AggregatedDataUpdate { collectibles_update, } = self; let mut result = Self::default(); - if let &Some((dirty_container_id, count, session_dependent_clean_update)) = + if let &Some((dirty_container_id, count, current_session_clean_update)) = dirty_container_update { if should_track_activeness { // When a dirty container count is increased and the task is considered as active // we need to schedule the dirty tasks in the new dirty container - let current_session_update = count - 
*session_dependent_clean_update; + let current_session_update = count - *current_session_clean_update; if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) { queue.push_find_and_schedule_dirty(dirty_container_id) } } - let mut aggregated_update = DirtyContainerCount::default(); - update!( - task, - AggregatedDirtyContainer { - task: dirty_container_id - }, - |old: Option| { - let mut new = old.unwrap_or_default(); - aggregated_update = - new.update_count(&DirtyContainerCount::from_current_session_clean( - count, - current_session_id, - *session_dependent_clean_update, - )); - (!new.is_zero()).then_some(new) + let mut aggregated_count_update = 0; + let mut aggregated_current_session_clean_update = 0; + let old_aggregated_dirty_container_count; + let new_aggregated_dirty_container_count; + let old_current_session_clean_count; + let new_current_session_clean_count; + + if count != 0 { + new_aggregated_dirty_container_count = update_count_and_get!( + task, + AggregatedDirtyContainer { + task: dirty_container_id + }, + count + ); + old_aggregated_dirty_container_count = new_aggregated_dirty_container_count - count; + match ( + old_aggregated_dirty_container_count > 0, + new_aggregated_dirty_container_count > 0, + ) { + (true, false) => { + aggregated_count_update = -1; + } + (false, true) => { + aggregated_count_update = 1; + } + _ => {} + } + } else { + new_aggregated_dirty_container_count = get!( + task, + AggregatedDirtyContainer { + task: dirty_container_id + } + ) + .copied() + .unwrap_or_default(); + old_aggregated_dirty_container_count = new_aggregated_dirty_container_count; + } + + if *current_session_clean_update != 0 { + new_current_session_clean_count = update_count_and_get!( + task, + AggregatedSessionDependentCleanContainer { + task: dirty_container_id, + session_id: current_session_id + }, + *current_session_clean_update + ); + old_current_session_clean_count = + new_current_session_clean_count - *current_session_clean_update; + 
} else { + new_current_session_clean_count = get!( + task, + AggregatedSessionDependentCleanContainer { + task: dirty_container_id, + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); + old_current_session_clean_count = new_current_session_clean_count; + } + + let was_clean = old_aggregated_dirty_container_count > 0 + && old_aggregated_dirty_container_count - old_current_session_clean_count <= 0; + let is_clean = new_aggregated_dirty_container_count > 0 + && new_aggregated_dirty_container_count - new_current_session_clean_count <= 0; + match (was_clean, is_clean) { + (true, false) => { + aggregated_current_session_clean_update = -1; + } + (false, true) => { + aggregated_current_session_clean_update = 1; } + _ => {} + } + + let aggregated_update = DirtyContainerCount::from_current_session_clean( + aggregated_count_update, + current_session_id, + aggregated_current_session_clean_update, ); if !aggregated_update.is_zero() { @@ -1273,7 +1341,7 @@ impl AggregationUpdateQueue { // this would already be scheduled by the `Activeness` let is_active_until_clean = get!(task, Activeness).is_some_and(|a| a.active_until_clean); if !is_active_until_clean { - let mut dirty_containers = iter_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task).peekable(); + let mut dirty_containers = task.dirty_containers(session_id).peekable(); let is_empty = dirty_containers.peek().is_none(); if !is_empty || dirty { self.extend_find_and_schedule_dirty(dirty_containers); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 443ee182e5df2..3f154b0f5973b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -430,6 +430,33 @@ pub trait TaskGuard: Debug { )), } } + fn dirty_containers(&self, session_id: SessionId) -> impl Iterator { + 
self.dirty_containers_with_count(session_id) + .map(|(task_id, _)| task_id) + } + fn dirty_containers_with_count( + &self, + session_id: SessionId, + ) -> impl Iterator { + iter_many!(self, AggregatedDirtyContainer { task } count => (task, *count)).filter( + move |&(task_id, count)| { + if count > 0 { + let clean_count = get!( + self, + AggregatedSessionDependentCleanContainer { + task: task_id, + session_id + } + ) + .copied() + .unwrap_or_default(); + count > clean_count + } else { + false + } + }, + ) + } } pub struct TaskGuardImpl<'a, B: BackingStorage> { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index aeb0e076ad621..755f84cc3d9df 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -1096,6 +1096,37 @@ macro_rules! update_count { }; } +macro_rules! update_count_and_get { + ($task:ident, $key:ident $input:tt, -$update:expr) => {{ + let update = $update; + let mut new = 0; + $crate::backend::storage::update!($task, $key $input, |old: Option<_>| { + let old = old.unwrap_or(0); + new = old - update; + (new != 0).then_some(new) + }); + new + }}; + ($task:ident, $key:ident $input:tt, $update:expr) => { + match $update { + update => { + let mut new = 0; + $crate::backend::storage::update!($task, $key $input, |old: Option<_>| { + let old = old.unwrap_or(0); + new = old + update; + (new != 0).then_some(new) + }); + new + } + } + }; + ($task:ident, $key:ident, -$update:expr) => { + $crate::backend::storage::update_count_and_get!($task, $key {}, -$update) + }; ($task:ident, $key:ident, $update:expr) => { + $crate::backend::storage::update_count_and_get!($task, $key {}, $update) + }; +} + macro_rules! 
remove { ($task:ident, $key:ident $input:tt) => {{ #[allow(unused_imports)] @@ -1122,6 +1153,7 @@ pub(crate) use iter_many; pub(crate) use remove; pub(crate) use update; pub(crate) use update_count; +pub(crate) use update_count_and_get; pub struct SnapshotGuard<'l> { storage: &'l Storage, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 47db822e94bc3..02b4dbb18d677 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -653,7 +653,12 @@ pub enum CachedDataItem { // Aggregated Data AggregatedDirtyContainer { task: TaskId, - value: DirtyContainerCount, + value: i32, + }, + AggregatedSessionDependentCleanContainer { + task: TaskId, + session_id: SessionId, + value: i32, }, AggregatedCollectible { collectible: CollectibleRef, @@ -734,6 +739,9 @@ impl CachedDataItem { CachedDataItem::Follower { task, .. } => !task.is_transient(), CachedDataItem::Upper { task, .. } => !task.is_transient(), CachedDataItem::AggregatedDirtyContainer { task, .. } => !task.is_transient(), + CachedDataItem::AggregatedSessionDependentCleanContainer { task, .. } => { + !task.is_transient() + } CachedDataItem::AggregatedCollectible { collectible, .. } => { !collectible.cell.task.is_transient() } @@ -808,6 +816,7 @@ impl CachedDataItem { | Self::Child { .. } | Self::Upper { .. } | Self::AggregatedDirtyContainer { .. } + | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } | Self::Stateful { .. } @@ -852,6 +861,9 @@ impl CachedDataItemKey { CachedDataItemKey::Follower { task, .. } => !task.is_transient(), CachedDataItemKey::Upper { task, .. } => !task.is_transient(), CachedDataItemKey::AggregatedDirtyContainer { task, .. } => !task.is_transient(), + CachedDataItemKey::AggregatedSessionDependentCleanContainer { task, .. 
} => { + !task.is_transient() + } CachedDataItemKey::AggregatedCollectible { collectible, .. } => { !collectible.cell.task.is_transient() } @@ -894,6 +906,7 @@ impl CachedDataItemType { | Self::Child { .. } | Self::Upper { .. } | Self::AggregatedDirtyContainer { .. } + | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } | Self::Stateful { .. } @@ -930,6 +943,7 @@ impl CachedDataItemType { | Self::Follower | Self::Upper | Self::AggregatedDirtyContainer + | Self::AggregatedSessionDependentCleanContainer | Self::AggregatedCollectible | Self::AggregatedDirtyContainerCount | Self::Stateful From e2eb3486cea374dc1ff87d32c4cc43c08e2a5421 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 12 Nov 2025 10:16:16 +0100 Subject: [PATCH 2/8] rename and helper --- .../backend/operation/aggregation_update.rs | 75 +++++++++---------- 1 file changed, 35 insertions(+), 40 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 203109c74478b..51415644078f7 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -324,6 +324,14 @@ impl AggregatedDataUpdate { should_track_activeness: bool, queue: &mut AggregationUpdateQueue, ) -> AggregatedDataUpdate { + fn before_after_to_diff_value(before: bool, after: bool) -> i32 { + match (before, after) { + (true, false) => -1, + (false, true) => 1, + _ => 0, + } + } + let Self { dirty_container_update, collectibles_update, @@ -341,36 +349,25 @@ impl AggregatedDataUpdate { } } + // Update AggregatedDirtyContainer and compute aggregated update let mut aggregated_count_update = 0; - let mut aggregated_current_session_clean_update = 0; - let old_aggregated_dirty_container_count; - let 
new_aggregated_dirty_container_count; - let old_current_session_clean_count; - let new_current_session_clean_count; - + let old_dirty_container_count; + let new_dirty_container_count; if count != 0 { - new_aggregated_dirty_container_count = update_count_and_get!( + new_dirty_container_count = update_count_and_get!( task, AggregatedDirtyContainer { task: dirty_container_id }, count ); - old_aggregated_dirty_container_count = new_aggregated_dirty_container_count - count; - match ( - old_aggregated_dirty_container_count > 0, - new_aggregated_dirty_container_count > 0, - ) { - (true, false) => { - aggregated_count_update = -1; - } - (false, true) => { - aggregated_count_update = 1; - } - _ => {} - } + old_dirty_container_count = new_dirty_container_count - count; + aggregated_count_update = before_after_to_diff_value( + old_dirty_container_count > 0, + new_dirty_container_count > 0, + ); } else { - new_aggregated_dirty_container_count = get!( + new_dirty_container_count = get!( task, AggregatedDirtyContainer { task: dirty_container_id @@ -378,11 +375,14 @@ impl AggregatedDataUpdate { ) .copied() .unwrap_or_default(); - old_aggregated_dirty_container_count = new_aggregated_dirty_container_count; + old_dirty_container_count = new_dirty_container_count; } + // Update AggregatedSessionDependentCleanContainer + let old_container_current_session_clean_count; + let new_container_current_session_clean_count; if *current_session_clean_update != 0 { - new_current_session_clean_count = update_count_and_get!( + new_container_current_session_clean_count = update_count_and_get!( task, AggregatedSessionDependentCleanContainer { task: dirty_container_id, @@ -390,10 +390,10 @@ impl AggregatedDataUpdate { }, *current_session_clean_update ); - old_current_session_clean_count = - new_current_session_clean_count - *current_session_clean_update; + old_container_current_session_clean_count = + new_container_current_session_clean_count - *current_session_clean_update; } else { - 
new_current_session_clean_count = get!( + new_container_current_session_clean_count = get!( task, AggregatedSessionDependentCleanContainer { task: dirty_container_id, @@ -402,22 +402,17 @@ impl AggregatedDataUpdate { ) .copied() .unwrap_or_default(); - old_current_session_clean_count = new_current_session_clean_count; + old_container_current_session_clean_count = + new_container_current_session_clean_count; } - let was_clean = old_aggregated_dirty_container_count > 0 - && old_aggregated_dirty_container_count - old_current_session_clean_count <= 0; - let is_clean = new_aggregated_dirty_container_count > 0 - && new_aggregated_dirty_container_count - new_current_session_clean_count <= 0; - match (was_clean, is_clean) { - (true, false) => { - aggregated_current_session_clean_update = -1; - } - (false, true) => { - aggregated_current_session_clean_update = 1; - } - _ => {} - } + // compute aggregated update + let was_container_clean = old_dirty_container_count > 0 + && old_dirty_container_count - old_container_current_session_clean_count <= 0; + let is_container_clean = new_dirty_container_count > 0 + && new_dirty_container_count - new_container_current_session_clean_count <= 0; + let aggregated_current_session_clean_update = + before_after_to_diff_value(was_container_clean, is_container_clean); let aggregated_update = DirtyContainerCount::from_current_session_clean( aggregated_count_update, From 468278868227951f2b72d5b389dcfe4f20dc928c Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 12:59:22 +0100 Subject: [PATCH 3/8] update from_task --- .../backend/operation/aggregation_update.rs | 24 +++++++++---------- .../src/backend/operation/mod.rs | 11 +++++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index 51415644078f7..e734b21114a01 100644 --- 
a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -273,23 +273,23 @@ impl AggregatedDataUpdate { collectibles_update.push((collectible, 1)); } } - if let Some((dirtyness, clean_in_session)) = task.dirtyness_and_session() { - dirty_container_count.update_with_dirtyness_and_session(dirtyness, clean_in_session); + let mut dirty_count = dirty_container_count.count; + let mut current_session_clean_count = + dirty_container_count.current_session_clean(current_session_id); + let (dirty, current_session_clean) = task.dirty(current_session_id); + if dirty { + dirty_count += 1; + } + if current_session_clean { + current_session_clean_count += 1; } let mut result = Self::new().collectibles_update(collectibles_update); - if !dirty_container_count.is_zero() { + if dirty_count > 0 { result = result.dirty_container_update( task.id(), - if dirty_container_count.count > 0 { - 1 - } else { - 0 - }, - if dirty_container_count.count > 0 - && dirty_container_count.current_session_clean(current_session_id) - >= dirty_container_count.count - { + if dirty_count > 0 { 1 } else { 0 }, + if dirty_count > 0 && dirty_count - current_session_clean_count <= 0 { 1 } else { 0 diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 3f154b0f5973b..3a9a12e6b8b4d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -430,6 +430,17 @@ pub trait TaskGuard: Debug { )), } } + /// Returns (is_dirty, is_clean_in_current_session) + fn dirty(&self, session_id: SessionId) -> (bool, bool) { + match get!(self, Dirty) { + None => (false, false), + Some(Dirtyness::Dirty) => (true, false), + Some(Dirtyness::SessionDependent) => ( + true, + get!(self, CleanInSession).copied() == Some(session_id), + ), 
+ } + } fn dirty_containers(&self, session_id: SessionId) -> impl Iterator { self.dirty_containers_with_count(session_id) .map(|(task_id, _)| task_id) From 3b3fe14341a055c04760dbf03bcfda24706ef812 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 13:25:44 +0100 Subject: [PATCH 4/8] add dirty_container_count method --- .../turbo-tasks-backend/src/backend/operation/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 3a9a12e6b8b4d..63127a58b7939 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -25,7 +25,7 @@ use crate::{ backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue, - CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness, + CachedDataItemValueRef, CachedDataItemValueRefMut, DirtyContainerCount, Dirtyness, }, }; @@ -468,6 +468,12 @@ pub trait TaskGuard: Debug { }, ) } + + fn dirty_container_count(&self) -> DirtyContainerCount { + get!(self, AggregatedDirtyContainerCount) + .cloned() + .unwrap_or_default() + } } pub struct TaskGuardImpl<'a, B: BackingStorage> { From 1e2115f30178396cbf37bdeb9bc366c3e68561ec Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 16:57:50 +0100 Subject: [PATCH 5/8] Allow infinite test runs --- .../crates/turbo-tasks-testing/src/run.rs | 61 +++++++++++-------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/src/run.rs b/turbopack/crates/turbo-tasks-testing/src/run.rs index 565c80e31606f..9f329358920fd 100644 --- a/turbopack/crates/turbo-tasks-testing/src/run.rs +++ b/turbopack/crates/turbo-tasks-testing/src/run.rs @@ -98,38 +98,45 @@ where F: Future> + Send + 'static, T: Debug + PartialEq + Eq + TraceRawVcs + 
Send + 'static, { - let single_run = env::var("SINGLE_RUN").is_ok(); + let infinite_initial_runs = env::var("INFINITE_INITIAL_RUNS").is_ok(); + let infinite_memory_runs = !infinite_initial_runs && env::var("INFINITE_MEMORY_RUNS").is_ok(); + let single_run = infinite_initial_runs || env::var("SINGLE_RUN").is_ok(); let name = closure_to_name(&fut); - let tt = registration.create_turbo_tasks(&name, true); - println!("Run #1 (without cache)"); - let start = std::time::Instant::now(); - let first = fut(tt.clone()).await?; - println!("Run #1 took {:?}", start.elapsed()); - if !single_run { - for i in 2..10 { - println!("Run #{i} (with memory cache, same TurboTasks instance)"); - let start = std::time::Instant::now(); - let second = fut(tt.clone()).await?; - println!("Run #{i} took {:?}", start.elapsed()); - assert_eq!(first, second); - } - } - let start = std::time::Instant::now(); - tt.stop_and_wait().await; - println!("Stopping TurboTasks took {:?}", start.elapsed()); - if single_run { - return Ok(()); - } - for i in 10..20 { - let tt = registration.create_turbo_tasks(&name, false); - println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); + loop { + let tt = registration.create_turbo_tasks(&name, true); + println!("Run #1 (without cache)"); let start = std::time::Instant::now(); - let third = fut(tt.clone()).await?; - println!("Run #{i} took {:?}", start.elapsed()); + let first = fut(tt.clone()).await?; + println!("Run #1 took {:?}", start.elapsed()); + if !single_run { + let max_run = if infinite_memory_runs { usize::MAX } else { 10 }; + for i in 2..max_run { + println!("Run #{i} (with memory cache, same TurboTasks instance)"); + let start = std::time::Instant::now(); + let second = fut(tt.clone()).await?; + println!("Run #{i} took {:?}", start.elapsed()); + assert_eq!(first, second); + } + } let start = std::time::Instant::now(); tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); - assert_eq!(first, 
third); + if !single_run { + for i in 10..20 { + let tt = registration.create_turbo_tasks(&name, false); + println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); + let start = std::time::Instant::now(); + let third = fut(tt.clone()).await?; + println!("Run #{i} took {:?}", start.elapsed()); + let start = std::time::Instant::now(); + tt.stop_and_wait().await; + println!("Stopping TurboTasks took {:?}", start.elapsed()); + assert_eq!(first, third); + } + } + if !infinite_initial_runs { + break; + } } Ok(()) } From beb61ef007c33778de9ea8725fd665974ecbcd3e Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 17:22:02 +0100 Subject: [PATCH 6/8] Split AggregatedDirtyContainerCount --- .../turbo-tasks-backend/src/backend/mod.rs | 158 ++++---- .../backend/operation/aggregation_update.rs | 212 ++++++++-- .../src/backend/operation/invalidate.rs | 98 +++-- .../src/backend/operation/mod.rs | 24 +- .../crates/turbo-tasks-backend/src/data.rs | 370 +----------------- 5 files changed, 344 insertions(+), 518 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 1fec5147e0287..df1cf1ecf2d64 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -48,8 +48,8 @@ use crate::backend::operation::TaskDirtyCause; use crate::{ backend::{ operation::{ - AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, - CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, + AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation, + ComputeDirtyAndCleanUpdate, ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskGuard, connect_children, get_aggregation_number, get_uppers, is_root_node, make_task_dirty_internal, prepare_new_children, }, @@ -560,11 +560,8 @@ impl TurboTasksBackendInner { let is_dirty 
= task.is_dirty(self.session_id); // Check the dirty count of the root node - let dirty_tasks = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() - .get(self.session_id); - if dirty_tasks > 0 || is_dirty { + let has_dirty_containers = task.has_dirty_containers(self.session_id); + if has_dirty_containers || is_dirty { let activeness = get_mut!(task, Activeness); let mut task_ids_to_schedule: Vec<_> = Vec::new(); // When there are dirty task, subscribe to the all_clean_event @@ -627,10 +624,7 @@ impl TurboTasksBackendInner { }; // Check the dirty count of the root node - let dirty_tasks = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() - .get(ctx.session_id()); + let has_dirty_containers = task.has_dirty_containers(ctx.session_id()); let task_description = ctx.get_task_description(task_id); let is_dirty = if is_dirty { ", dirty" } else { "" }; @@ -651,8 +645,8 @@ impl TurboTasksBackendInner { info.push_str("\n ERROR: missing upper connection"); } - if dirty_tasks > 0 || !children.is_empty() { - writeln!(info, "\n {dirty_tasks} dirty tasks:").unwrap(); + if has_dirty_containers || !children.is_empty() { + writeln!(info, "\n dirty tasks:").unwrap(); for (child_task_id, count) in children { let task_description = ctx.get_task_description(child_task_id); @@ -2338,72 +2332,96 @@ impl TurboTasksBackendInner { }, )); - // Update the dirty state - let old_dirtyness = task.dirtyness_and_session(); - - let new_dirtyness = if session_dependent { - Some((Dirtyness::SessionDependent, Some(self.session_id))) - } else { - None + // Grab the old dirty state + let old_dirtyness = get!(task, Dirty).cloned(); + let (was_dirty, was_current_session_clean, old_clean_in_session) = match old_dirtyness { + None => (false, false, None), + Some(Dirtyness::Dirty) => (true, false, None), + Some(Dirtyness::SessionDependent) => { + let clean_in_session = get!(task, CleanInSession).copied(); + ( + true, + clean_in_session == 
Some(self.session_id), + clean_in_session, + ) + } }; + let old_dirty_value = if was_dirty { 1 } else { 0 }; + let old_current_session_clean_value = if was_current_session_clean { 1 } else { 0 }; - let dirty_changed = old_dirtyness != new_dirtyness; - let data_update = if dirty_changed { - if let Some((value, _)) = new_dirtyness { + // Compute the new dirty state + let (new_dirtyness, new_clean_in_session, new_dirty_value, new_current_session_clean_value) = + if session_dependent { + ( + Some(Dirtyness::SessionDependent), + Some(self.session_id), + 1, + 1, + ) + } else { + (None, None, 0, 0) + }; + + // Update the dirty state + if old_dirtyness != new_dirtyness { + if let Some(value) = new_dirtyness { task.insert(CachedDataItem::Dirty { value }); } else if old_dirtyness.is_some() { task.remove(&CachedDataItemKey::Dirty {}); } - if let Some(session_id) = new_dirtyness.and_then(|t| t.1) { + } + if old_clean_in_session != new_clean_in_session { + if let Some(session_id) = new_clean_in_session { task.insert(CachedDataItem::CleanInSession { value: session_id }); - } else if old_dirtyness.is_some_and(|t| t.1.is_some()) { + } else if old_clean_in_session.is_some() { task.remove(&CachedDataItemKey::CleanInSession {}); } + } - let mut dirty_containers = get!(task, AggregatedDirtyContainerCount) + // Propagate dirtyness changes + let data_update = if old_dirty_value != new_dirty_value + || old_current_session_clean_value != new_current_session_clean_value + { + let dirty_container_count = get!(task, AggregatedDirtyContainerCount) .cloned() .unwrap_or_default(); - if let Some((old_dirtyness, old_clean_in_session)) = old_dirtyness { - dirty_containers - .update_with_dirtyness_and_session(old_dirtyness, old_clean_in_session); - } - let aggregated_update = match (old_dirtyness, new_dirtyness) { - (None, None) => unreachable!(), - (Some(old), None) => { - dirty_containers.undo_update_with_dirtyness_and_session(old.0, old.1) - } - (None, Some(new)) => { - 
dirty_containers.update_with_dirtyness_and_session(new.0, new.1) - } - (Some(old), Some(new)) => { - dirty_containers.replace_dirtyness_and_session(old.0, old.1, new.0, new.1) - } - }; - if !aggregated_update.is_zero() { - if aggregated_update.get(self.session_id) < 0 - && let Some(activeness_state) = get_mut!(task, Activeness) - { - activeness_state.all_clean_event.notify(usize::MAX); - activeness_state.unset_active_until_clean(); - if activeness_state.is_empty() { - task.remove(&CachedDataItemKey::Activeness {}); - } + let current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: self.session_id } - AggregationUpdateJob::data_update( - &mut task, - AggregatedDataUpdate::new().dirty_container_update( - task_id, - aggregated_update.count, - aggregated_update.current_session_clean(ctx.session_id()), - ), - ) - } else { - None - } + ) + .copied() + .unwrap_or_default(); + let result = ComputeDirtyAndCleanUpdate { + old_dirty_container_count: dirty_container_count, + new_dirty_container_count: dirty_container_count, + old_current_session_clean_container_count: current_session_clean_container_count, + new_current_session_clean_container_count: current_session_clean_container_count, + old_dirty_value, + new_dirty_value, + old_current_session_clean_value, + new_current_session_clean_value, + } + .compute(); + result + .aggregated_update(task_id) + .and_then(|aggregated_update| { + AggregationUpdateJob::data_update(&mut task, aggregated_update) + }) } else { None }; + if let Some(activeness_state) = get_mut!(task, Activeness) { + // The task is clean now + activeness_state.all_clean_event.notify(usize::MAX); + activeness_state.unset_active_until_clean(); + if activeness_state.is_empty() { + task.remove(&CachedDataItemKey::Activeness {}); + } + } + #[cfg(feature = "verify_determinism")] let reschedule = (dirty_changed || no_output_set) && !task_id.is_transient(); #[cfg(not(feature = "verify_determinism"))] @@ 
-2866,10 +2884,7 @@ impl TurboTasksBackendInner { let mut ctx = self.execute_context(turbo_tasks); let mut task = ctx.task(task_id, TaskDataCategory::All); let is_dirty = task.is_dirty(self.session_id); - let has_dirty_containers = get!(task, AggregatedDirtyContainerCount) - .map_or(false, |dirty_containers| { - dirty_containers.get(self.session_id) > 0 - }); + let has_dirty_containers = task.has_dirty_containers(self.session_id); if is_dirty || has_dirty_containers { if let Some(activeness_state) = get_mut!(task, Activeness) { // We will finish the task, but it would be removed after the task is done @@ -2959,8 +2974,7 @@ impl TurboTasksBackendInner { } let is_dirty = get!(task, Dirty).is_some(); - let has_dirty_container = - get!(task, AggregatedDirtyContainerCount).is_some_and(|count| count.count > 0); + let has_dirty_container = task.has_dirty_containers(self.session_id); let should_be_in_upper = is_dirty || has_dirty_container; let aggregation_number = get_aggregation_number(&task); @@ -2983,17 +2997,19 @@ impl TurboTasksBackendInner { if should_be_in_upper { for upper_id in uppers { - let task = ctx.task(task_id, TaskDataCategory::All); + let task = ctx.task(upper_id, TaskDataCategory::All); let in_upper = get!(task, AggregatedDirtyContainer { task: task_id }) .is_some_and(|&dirty| dirty > 0); if !in_upper { + let containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task: task_id } value => (task_id, *value)); panic!( "Task {} ({}) is dirty, but is not listed in the upper task {} \ - ({})", + ({})\nThese dirty containers are present:\n{:#?}", task_id, ctx.get_task_description(task_id), upper_id, - ctx.get_task_description(upper_id) + ctx.get_task_description(upper_id), + containers, ); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs index e734b21114a01..d3813979072cc 100644 --- 
a/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs @@ -34,7 +34,7 @@ use crate::{ }, data::{ ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType, - CollectibleRef, DirtyContainerCount, + CollectibleRef, }, utils::swap_retain, }; @@ -93,6 +93,83 @@ pub fn get_aggregation_number(task: &impl TaskGuard) -> u32 { .unwrap_or_default() } +#[derive(Debug)] +pub struct ComputeDirtyAndCleanUpdate { + pub old_dirty_container_count: i32, + pub new_dirty_container_count: i32, + pub old_current_session_clean_container_count: i32, + pub new_current_session_clean_container_count: i32, + pub old_dirty_value: i32, + pub new_dirty_value: i32, + pub old_current_session_clean_value: i32, + pub new_current_session_clean_value: i32, +} + +pub struct ComputeDirtyAndCleanUpdateResult { + pub dirty_count_update: i32, + pub current_session_clean_update: i32, +} + +impl ComputeDirtyAndCleanUpdate { + pub fn compute(self) -> ComputeDirtyAndCleanUpdateResult { + let ComputeDirtyAndCleanUpdate { + old_dirty_container_count, + new_dirty_container_count, + old_current_session_clean_container_count, + new_current_session_clean_container_count, + old_dirty_value, + new_dirty_value, + old_current_session_clean_value, + new_current_session_clean_value, + } = self; + let was_dirty_without_clean = old_dirty_container_count + old_dirty_value > 0; + let is_dirty_without_clean = new_dirty_container_count + new_dirty_value > 0; + let was_dirty = was_dirty_without_clean + && old_dirty_container_count + old_dirty_value + - old_current_session_clean_container_count + - old_current_session_clean_value + > 0; + let is_dirty = is_dirty_without_clean + && new_dirty_container_count + new_dirty_value + - new_current_session_clean_container_count + - new_current_session_clean_value + > 0; + let was_flagged_clean = was_dirty_without_clean && !was_dirty; + let 
is_flagged_clean = is_dirty_without_clean && !is_dirty; + + fn before_after_to_diff_value(before: bool, after: bool) -> i32 { + match (before, after) { + (true, false) => -1, + (false, true) => 1, + _ => 0, + } + } + let dirty_count_update = + before_after_to_diff_value(was_dirty_without_clean, is_dirty_without_clean); + let current_session_clean_update = + before_after_to_diff_value(was_flagged_clean, is_flagged_clean); + + ComputeDirtyAndCleanUpdateResult { + dirty_count_update, + current_session_clean_update, + } + } +} + +impl ComputeDirtyAndCleanUpdateResult { + pub fn aggregated_update(&self, task_id: TaskId) -> Option<AggregatedDataUpdate> { + if self.dirty_count_update != 0 || self.current_session_clean_update != 0 { + Some(AggregatedDataUpdate::new().dirty_container_update( + task_id, + self.dirty_count_update, + self.current_session_clean_update, + )) + } else { + None + } + } +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct InnerOfUppersHasNewFollowersJob { pub upper_ids: TaskIdVec, @@ -254,13 +331,22 @@ impl AggregatedDataUpdate { /// upper task. 
fn from_task(task: &mut impl TaskGuard, current_session_id: SessionId) -> Self { let aggregation = get_aggregation_number(task); - let mut dirty_container_count = Default::default(); + let mut dirty_count = 0; + let mut current_session_clean_count = 0; let mut collectibles_update: Vec<_> = get_many!(task, Collectible { collectible } count => (collectible, *count)); if is_aggregating_node(aggregation) { - dirty_container_count = get!(task, AggregatedDirtyContainerCount) - .cloned() + dirty_count = get!(task, AggregatedDirtyContainerCount) + .copied() .unwrap_or_default(); + current_session_clean_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); let collectibles = iter_many!( task, AggregatedCollectible { @@ -273,9 +359,6 @@ impl AggregatedDataUpdate { collectibles_update.push((collectible, 1)); } } - let mut dirty_count = dirty_container_count.count; - let mut current_session_clean_count = - dirty_container_count.current_session_clean(current_session_id); let (dirty, current_session_clean) = task.dirty(current_session_id); if dirty { dirty_count += 1; @@ -414,40 +497,91 @@ impl AggregatedDataUpdate { let aggregated_current_session_clean_update = before_after_to_diff_value(was_container_clean, is_container_clean); - let aggregated_update = DirtyContainerCount::from_current_session_clean( - aggregated_count_update, - current_session_id, - aggregated_current_session_clean_update, - ); + if aggregated_count_update != 0 || aggregated_current_session_clean_update != 0 { + let (is_dirty, current_session_clean) = task.dirty(current_session_id); + let dirty_value = if is_dirty { 1 } else { 0 }; + let clean_value = if current_session_clean { 1 } else { 0 }; - if !aggregated_update.is_zero() { - let dirtyness_and_session = task.dirtyness_and_session(); let task_id = task.id(); - update!(task, AggregatedDirtyContainerCount, |old: Option< - DirtyContainerCount, - >| { - let mut new 
= old.unwrap_or_default(); - if let Some((dirtyness, clean_in_session)) = dirtyness_and_session { - new.update_with_dirtyness_and_session(dirtyness, clean_in_session); - } - let aggregated_update = new.update_count(&aggregated_update); - if let Some((dirtyness, clean_in_session)) = dirtyness_and_session { - new.undo_update_with_dirtyness_and_session(dirtyness, clean_in_session); - } - if !aggregated_update.is_zero() { - result.dirty_container_update = Some(( - task_id, - aggregated_update.count, - SessionDependent::new( - aggregated_update.current_session_clean(current_session_id), - ), - )); - } - (!new.is_zero()).then_some(new) - }); - if let Some((_, count, current_session_clean)) = result.dirty_container_update - && count - *current_session_clean < 0 - { + + // Update AggregatedDirtyContainerCount and compute aggregate value + let old_aggregated_dirty_container_count; + let new_aggregated_dirty_container_count; + if aggregated_count_update != 0 { + new_aggregated_dirty_container_count = update_count_and_get!( + task, + AggregatedDirtyContainerCount, + aggregated_count_update + ); + old_aggregated_dirty_container_count = + new_aggregated_dirty_container_count - aggregated_count_update; + } else { + new_aggregated_dirty_container_count = + get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + old_aggregated_dirty_container_count = new_aggregated_dirty_container_count; + }; + + let was_dirty_without_clean = + old_aggregated_dirty_container_count + dirty_value > 0; + let is_dirty_without_clean = new_aggregated_dirty_container_count + dirty_value > 0; + + let aggregated_count_update = + before_after_to_diff_value(was_dirty_without_clean, is_dirty_without_clean); + + // Update AggregatedSessionDependentCleanContainerCount and compute aggregate value + let new_aggregated_current_session_clean_container_count; + let old_aggregated_current_session_clean_container_count; + if aggregated_current_session_clean_update != 0 { + 
new_aggregated_current_session_clean_container_count = update_count_and_get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + }, + aggregated_current_session_clean_update + ); + old_aggregated_current_session_clean_container_count = + new_aggregated_current_session_clean_container_count + - aggregated_current_session_clean_update; + } else { + new_aggregated_current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: current_session_id + } + ) + .copied() + .unwrap_or_default(); + old_aggregated_current_session_clean_container_count = + new_aggregated_current_session_clean_container_count; + }; + + let was_dirty = was_dirty_without_clean + && old_aggregated_dirty_container_count + dirty_value + - old_aggregated_current_session_clean_container_count + - clean_value + > 0; + let is_dirty = is_dirty_without_clean + && new_aggregated_dirty_container_count + dirty_value + - new_aggregated_current_session_clean_container_count + - clean_value + > 0; + + let was_flagged_clean = was_dirty_without_clean && !was_dirty; + let is_flagged_clean = is_dirty_without_clean && !is_dirty; + let aggregated_current_session_clean_update = + before_after_to_diff_value(was_flagged_clean, is_flagged_clean); + + if aggregated_count_update != 0 || aggregated_current_session_clean_update != 0 { + result.dirty_container_update = Some(( + task_id, + aggregated_count_update, + SessionDependent::new(aggregated_current_session_clean_update), + )); + } + + if was_dirty && !is_dirty { // When the current task is no longer dirty, we need to fire the // aggregate root events and do some cleanup if let Some(activeness_state) = get_mut!(task, Activeness) { diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index da1f5f03093cd..3eba0ba06c3d7 100644 --- 
a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -8,7 +8,7 @@ use crate::{ operation::{ ExecuteContext, Operation, TaskGuard, aggregation_update::{ - AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue, + AggregationUpdateJob, AggregationUpdateQueue, ComputeDirtyAndCleanUpdate, }, }, storage::{get, get_mut, remove}, @@ -234,7 +234,7 @@ pub fn make_task_dirty_internal( let old = task.insert(CachedDataItem::Dirty { value: Dirtyness::Dirty, }); - let mut dirty_container = match old { + let (old_dirty_value, old_current_session_clean_value) = match old { Some(CachedDataItemValue::Dirty { value: Dirtyness::Dirty, }) => { @@ -252,38 +252,48 @@ pub fn make_task_dirty_internal( Some(CachedDataItemValue::Dirty { value: Dirtyness::SessionDependent, }) => { + // It was a session-dependent dirty before, so we need to remove that clean count let old = remove!(task, CleanInSession); - match old { - None => { - #[cfg(feature = "trace_task_dirty")] - let _span = tracing::trace_span!( - "session-dependent task already dirty", - name = ctx.get_task_description(task_id), - cause = %TaskDirtyCauseInContext::new(&cause, ctx) - ) - .entered(); - // already dirty - return; - } - Some(session_id) => { - // Got dirty in that one session only - let mut dirty_container = get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default(); - dirty_container.update_session_dependent(session_id, 1); - dirty_container - } + if let Some(session_id) = old + && session_id == ctx.session_id() + { + // There was a clean count for a session. If it was the current session, we need to + // propagate that change. 
+ (1, 1) + } else { + #[cfg(feature = "trace_task_dirty")] + let _span = tracing::trace_span!( + "session-dependent task already dirty", + name = ctx.get_task_description(task_id), + cause = %TaskDirtyCauseInContext::new(&cause, ctx) + ) + .entered(); + // already dirty + return; } } None => { - // Get dirty for all sessions - get!(task, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() + // It was clean before, so we need to increase the dirty count + (0, 0) } _ => unreachable!(), }; + let new_dirty_value = 1; + let new_current_session_clean_value = 0; + + let dirty_container_count = get!(task, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + let current_session_clean_container_count = get!( + task, + AggregatedSessionDependentCleanContainerCount { + session_id: ctx.session_id(), + } + ) + .copied() + .unwrap_or_default(); + #[cfg(feature = "trace_task_dirty")] let _span = tracing::trace_span!( "make task dirty", @@ -293,21 +303,27 @@ pub fn make_task_dirty_internal( ) .entered(); - let should_schedule = { - let aggregated_update = - dirty_container.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - if !aggregated_update.is_zero() { - queue.extend(AggregationUpdateJob::data_update( - &mut task, - AggregatedDataUpdate::new().dirty_container_update( - task_id, - aggregated_update.count, - aggregated_update.current_session_clean(ctx.session_id()), - ), - )); - } - !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {}) - }; + let result = ComputeDirtyAndCleanUpdate { + old_dirty_container_count: dirty_container_count, + new_dirty_container_count: dirty_container_count, + old_current_session_clean_container_count: current_session_clean_container_count, + new_current_session_clean_container_count: current_session_clean_container_count, + old_dirty_value, + new_dirty_value, + old_current_session_clean_value, + new_current_session_clean_value, + } + .compute(); + + if let Some(aggregated_update) = 
result.aggregated_update(task_id) { + queue.extend(AggregationUpdateJob::data_update( + &mut task, + aggregated_update, + )); + } + + let should_schedule = + !ctx.should_track_activeness() || task.has_key(&CachedDataItemKey::Activeness {}); if should_schedule { let description = || ctx.get_task_desc_fn(task_id); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 63127a58b7939..631fc986fa3de 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -25,7 +25,7 @@ use crate::{ backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue, - CachedDataItemValueRef, CachedDataItemValueRefMut, DirtyContainerCount, Dirtyness, + CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness, }, }; @@ -469,10 +469,20 @@ pub trait TaskGuard: Debug { ) } - fn dirty_container_count(&self) -> DirtyContainerCount { - get!(self, AggregatedDirtyContainerCount) - .cloned() - .unwrap_or_default() + fn has_dirty_containers(&self, session_id: SessionId) -> bool { + let dirty_count = get!(self, AggregatedDirtyContainerCount) + .copied() + .unwrap_or_default(); + if dirty_count <= 0 { + return false; + } + let clean_count = get!( + self, + AggregatedSessionDependentCleanContainerCount { session_id } + ) + .copied() + .unwrap_or_default(); + dirty_count > clean_count } } @@ -799,8 +809,8 @@ impl_operation!(AggregationUpdate aggregation_update::AggregationUpdateQueue); pub use self::invalidate::TaskDirtyCause; pub use self::{ aggregation_update::{ - AggregatedDataUpdate, AggregationUpdateJob, get_aggregation_number, get_uppers, - is_aggregating_node, is_root_node, + AggregatedDataUpdate, AggregationUpdateJob, ComputeDirtyAndCleanUpdate, + get_aggregation_number, get_uppers, is_aggregating_node, is_root_node, }, 
cleanup_old_edges::OutdatedEdge, connect_children::connect_children, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 02b4dbb18d677..54763bf9c70c5 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use rustc_hash::FxHashSet; use serde::{Deserialize, Serialize}; use turbo_tasks::{ @@ -149,363 +147,6 @@ pub enum Dirtyness { SessionDependent, } -fn add_with_diff(v: &mut i32, u: i32) -> i32 { - let old = *v; - *v += u; - if old <= 0 && *v > 0 { - 1 - } else if old > 0 && *v <= 0 { - -1 - } else { - 0 - } -} - -/// Represents a count of dirty containers. Since dirtiness can be session dependent, there might be -/// a different count for a specific session. It only need to store the highest session count, since -/// old sessions can't be visited again, so we can ignore their counts. -#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct DirtyContainerCount { - pub count: i32, - pub count_in_session: Option<(SessionId, i32)>, -} - -impl DirtyContainerCount { - pub fn from_current_session_clean( - count: i32, - current_session_id: SessionId, - current_session_clean: i32, - ) -> DirtyContainerCount { - DirtyContainerCount { - count, - count_in_session: Some((current_session_id, count - current_session_clean)), - } - } - - pub fn current_session_clean(&self, current_session_id: SessionId) -> i32 { - if let Some((s, c)) = self.count_in_session - && s == current_session_id - { - return self.count - c; - } - 0 - } - - /// Get the count for a specific session. It's only expected to be asked for the current - /// session, since old session counts might be dropped. - pub fn get(&self, session: SessionId) -> i32 { - if let Some((s, count)) = self.count_in_session - && s == session - { - return count; - } - self.count - } - - /// Increase/decrease the count by the given value. 
- pub fn update(&mut self, count: i32) -> DirtyContainerCount { - self.update_count(&DirtyContainerCount { - count, - count_in_session: None, - }) - } - - /// Increase/decrease the count by the given value, but does not update the count for a specific - /// session. This matches the "dirty, but clean in one session" behavior. - pub fn update_session_dependent( - &mut self, - ignore_session: SessionId, - count: i32, - ) -> DirtyContainerCount { - self.update_count(&DirtyContainerCount { - count, - count_in_session: Some((ignore_session, 0)), - }) - } - - /// Adds the `count` to the current count. This correctly handles session dependent counts. - /// Returns a new count object that represents the aggregated count. The aggregated count will - /// be +1 when the self count changes from <= 0 to > 0 and -1 when the self count changes from > - /// 0 to <= 0. The same for the session dependent count. - pub fn update_count(&mut self, count: &DirtyContainerCount) -> DirtyContainerCount { - let mut diff = DirtyContainerCount::default(); - match ( - self.count_in_session.as_mut(), - count.count_in_session.as_ref(), - ) { - (None, None) => {} - (Some((s, c)), None) => { - let d = add_with_diff(c, count.count); - diff.count_in_session = Some((*s, d)); - } - (None, Some((s, c))) => { - let mut new = self.count; - let d = add_with_diff(&mut new, *c); - self.count_in_session = Some((*s, new)); - diff.count_in_session = Some((*s, d)); - } - (Some((s1, c1)), Some((s2, c2))) => match (*s1).cmp(s2) { - Ordering::Less => { - let mut new = self.count; - let d = add_with_diff(&mut new, *c2); - self.count_in_session = Some((*s2, new)); - diff.count_in_session = Some((*s2, d)); - } - Ordering::Equal => { - let d = add_with_diff(c1, *c2); - diff.count_in_session = Some((*s1, d)); - } - Ordering::Greater => { - let d = add_with_diff(c1, count.count); - diff.count_in_session = Some((*s1, d)); - } - }, - } - let d = add_with_diff(&mut self.count, count.count); - diff.count = d; - diff - } - - 
/// Applies a dirtyness to the count. Returns an aggregated count that represents the change. - pub fn update_with_dirtyness_and_session( - &mut self, - dirtyness: Dirtyness, - clean_in_session: Option, - ) -> DirtyContainerCount { - if let (Dirtyness::SessionDependent, Some(session_id)) = (dirtyness, clean_in_session) { - self.update_session_dependent(session_id, 1) - } else { - self.update(1) - } - } - - /// Undoes the effect of a dirtyness on the count. Returns an aggregated count that represents - /// the change. - pub fn undo_update_with_dirtyness_and_session( - &mut self, - dirtyness: Dirtyness, - clean_in_session: Option, - ) -> DirtyContainerCount { - if let (Dirtyness::SessionDependent, Some(session_id)) = (dirtyness, clean_in_session) { - self.update_session_dependent(session_id, -1) - } else { - self.update(-1) - } - } - - /// Replaces the old dirtyness with the new one. Returns an aggregated count that represents - /// the change. - pub fn replace_dirtyness_and_session( - &mut self, - old_dirtyness: Dirtyness, - old_clean_in_session: Option, - new_dirtyness: Dirtyness, - new_clean_in_session: Option, - ) -> DirtyContainerCount { - let mut diff = - self.undo_update_with_dirtyness_and_session(old_dirtyness, old_clean_in_session); - diff.update_count( - &self.update_with_dirtyness_and_session(new_dirtyness, new_clean_in_session), - ); - diff - } - - /// Returns true if the count is zero and applying it would have no effect - pub fn is_zero(&self) -> bool { - self.count == 0 && self.count_in_session.map(|(_, c)| c == 0).unwrap_or(true) - } - - /// Negates the counts. 
- pub fn negate(&self) -> Self { - Self { - count: -self.count, - count_in_session: self.count_in_session.map(|(s, c)| (s, -c)), - } - } -} - -#[cfg(test)] -mod dirty_container_count_tests { - use turbo_tasks::SessionId; - - use super::*; - - const SESSION_1: SessionId = unsafe { SessionId::new_unchecked(1) }; - const SESSION_2: SessionId = unsafe { SessionId::new_unchecked(2) }; - const SESSION_3: SessionId = unsafe { SessionId::new_unchecked(3) }; - - #[test] - fn test_update() { - let mut count = DirtyContainerCount::default(); - assert!(count.is_zero()); - - let diff = count.update(1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 2); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - - let diff = count.update(-1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(-1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), -1); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), -1); - assert_eq!(diff.get(SESSION_2), 0); - - let diff = count.update(2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 
1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update(-2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), -1); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), -1); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update(1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), 0); - } - - #[test] - fn test_session_dependent() { - let mut count = DirtyContainerCount::default(); - assert!(count.is_zero()); - - let diff = count.update_session_dependent(SESSION_1, 1); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update_session_dependent(SESSION_1, -1); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let diff = count.update_session_dependent(SESSION_1, 2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.update_session_dependent(SESSION_2, -2); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 2); - assert_eq!(diff.get(SESSION_2), 0); - assert_eq!(count.get(SESSION_3), 0); - assert_eq!(diff.get(SESSION_3), -1); - } - - #[test] - fn test_update_with_dirtyness_and_session() { - let mut count = DirtyContainerCount::default(); - let diff = count.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - 
assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count.undo_update_with_dirtyness_and_session(Dirtyness::Dirty, None); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - - let mut count = DirtyContainerCount::default(); - let diff = - count.update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 1); - - let diff = count - .undo_update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - assert!(count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), 0); - assert_eq!(count.get(SESSION_2), 0); - assert_eq!(diff.get(SESSION_2), -1); - } - - #[test] - fn test_replace_dirtyness_and_session() { - let mut count = DirtyContainerCount::default(); - count.update_with_dirtyness_and_session(Dirtyness::Dirty, None); - let diff = count.replace_dirtyness_and_session( - Dirtyness::Dirty, - None, - Dirtyness::SessionDependent, - Some(SESSION_1), - ); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 0); - assert_eq!(diff.get(SESSION_1), -1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - - let mut count = DirtyContainerCount::default(); - count.update_with_dirtyness_and_session(Dirtyness::SessionDependent, Some(SESSION_1)); - let diff = count.replace_dirtyness_and_session( - Dirtyness::SessionDependent, - Some(SESSION_1), - Dirtyness::Dirty, - None, - ); - assert!(!count.is_zero()); - assert_eq!(count.get(SESSION_1), 1); - assert_eq!(diff.get(SESSION_1), 1); - assert_eq!(count.get(SESSION_2), 1); - assert_eq!(diff.get(SESSION_2), 0); - } -} - #[derive(Debug, Clone, Copy)] pub enum RootType { RootTask, @@ 
-665,7 +306,11 @@ pub enum CachedDataItem { value: i32, }, AggregatedDirtyContainerCount { - value: DirtyContainerCount, + value: i32, + }, + AggregatedSessionDependentCleanContainerCount { + session_id: SessionId, + value: i32, }, // Flags @@ -746,6 +391,7 @@ impl CachedDataItem { !collectible.cell.task.is_transient() } CachedDataItem::AggregatedDirtyContainerCount { .. } => true, + CachedDataItem::AggregatedSessionDependentCleanContainerCount { .. } => true, CachedDataItem::Stateful { .. } => true, CachedDataItem::HasInvalidator { .. } => true, CachedDataItem::Immutable { .. } => true, @@ -819,6 +465,7 @@ impl CachedDataItem { | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } + | Self::AggregatedSessionDependentCleanContainerCount { .. } | Self::Stateful { .. } | Self::HasInvalidator { .. } | Self::Immutable { .. } @@ -868,6 +515,7 @@ impl CachedDataItemKey { !collectible.cell.task.is_transient() } CachedDataItemKey::AggregatedDirtyContainerCount { .. } => true, + CachedDataItemKey::AggregatedSessionDependentCleanContainerCount { .. } => true, CachedDataItemKey::Stateful { .. } => true, CachedDataItemKey::HasInvalidator { .. } => true, CachedDataItemKey::Immutable { .. } => true, @@ -909,6 +557,7 @@ impl CachedDataItemType { | Self::AggregatedSessionDependentCleanContainer { .. } | Self::AggregatedCollectible { .. } | Self::AggregatedDirtyContainerCount { .. } + | Self::AggregatedSessionDependentCleanContainerCount { .. } | Self::Stateful { .. } | Self::HasInvalidator { .. } | Self::Immutable { .. 
} @@ -946,6 +595,7 @@ impl CachedDataItemType { | Self::AggregatedSessionDependentCleanContainer | Self::AggregatedCollectible | Self::AggregatedDirtyContainerCount + | Self::AggregatedSessionDependentCleanContainerCount | Self::Stateful | Self::HasInvalidator | Self::Immutable => true, From 976714cc4b69eca3ff6e809d8bdf2f4e961f9687 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 13 Nov 2025 17:22:23 +0100 Subject: [PATCH 7/8] Allow infinite test runs --- turbopack/crates/turbo-tasks-testing/src/run.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-tasks-testing/src/run.rs b/turbopack/crates/turbo-tasks-testing/src/run.rs index 9f329358920fd..b7e7a3b2ea095 100644 --- a/turbopack/crates/turbo-tasks-testing/src/run.rs +++ b/turbopack/crates/turbo-tasks-testing/src/run.rs @@ -102,19 +102,22 @@ where let infinite_memory_runs = !infinite_initial_runs && env::var("INFINITE_MEMORY_RUNS").is_ok(); let single_run = infinite_initial_runs || env::var("SINGLE_RUN").is_ok(); let name = closure_to_name(&fut); + let mut i = 1; loop { let tt = registration.create_turbo_tasks(&name, true); - println!("Run #1 (without cache)"); + println!("Run #{i} (without cache)"); let start = std::time::Instant::now(); let first = fut(tt.clone()).await?; - println!("Run #1 took {:?}", start.elapsed()); + println!("Run #{i} took {:?}", start.elapsed()); + i += 1; if !single_run { let max_run = if infinite_memory_runs { usize::MAX } else { 10 }; - for i in 2..max_run { + for _ in 0..max_run { println!("Run #{i} (with memory cache, same TurboTasks instance)"); let start = std::time::Instant::now(); let second = fut(tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); + i += 1; assert_eq!(first, second); } } @@ -122,12 +125,13 @@ where tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); if !single_run { - for i in 10..20 { + for _ in 10..20 { let tt = registration.create_turbo_tasks(&name, 
false); println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); let start = std::time::Instant::now(); let third = fut(tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); + i += 1; let start = std::time::Instant::now(); tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); From 089131d40261833e1244dda087e864e14cbb6c18 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 12 Nov 2025 09:58:02 +0100 Subject: [PATCH 8/8] improve test case to test session dependent and restoring --- .../tests/emptied_cells.rs | 20 +++-- .../tests/emptied_cells_session_dependent.rs | 82 +++++++++++++++++++ 2 files changed, 95 insertions(+), 7 deletions(-) create mode 100644 turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs index d10af67b3e28e..7a23a8d4bfd42 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells.rs @@ -4,19 +4,17 @@ use anyhow::Result; use turbo_tasks::{State, Vc}; -use turbo_tasks_testing::{Registration, register, run_once}; +use turbo_tasks_testing::{Registration, register, run}; static REGISTRATION: Registration = register!(); #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn recompute() { - run_once(&REGISTRATION, || async { - let input = ChangingInput { - state: State::new(1), - } - .cell(); + run(&REGISTRATION, || async { + let input = get_state().resolve().await?; + input.await?.state.set(0); let output = compute(input); - assert_eq!(*output.await?, 1); + assert_eq!(*output.strongly_consistent().await?, 0); println!("changing input"); input.await?.state.set(10); @@ -44,6 +42,14 @@ async fn recompute() { .unwrap(); } +#[turbo_tasks::function] +fn get_state() -> Vc<ChangingInput> { + ChangingInput { + state: State::new(0), + } + .cell() 
+} + 
#[turbo_tasks::value] struct ChangingInput { state: State, diff --git a/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs new file mode 100644 index 0000000000000..b2ad8ec529b13 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/tests/emptied_cells_session_dependent.rs @@ -0,0 +1,82 @@ +#![feature(arbitrary_self_types)] +#![feature(arbitrary_self_types_pointers)] +#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this + +use anyhow::Result; +use turbo_tasks::{State, Vc, mark_session_dependent}; +use turbo_tasks_testing::{Registration, register, run}; + +static REGISTRATION: Registration = register!(); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn recompute() { + run(&REGISTRATION, || async { + let input = get_state().resolve().await?; + input.await?.state.set(0); + let output = compute(input); + assert_eq!(*output.strongly_consistent().await?, 0); + + println!("changing input"); + input.await?.state.set(10); + assert_eq!(*output.strongly_consistent().await?, 10); + + println!("changing input"); + input.await?.state.set(5); + assert_eq!(*output.strongly_consistent().await?, 5); + + println!("changing input"); + input.await?.state.set(20); + assert_eq!(*output.strongly_consistent().await?, 20); + + println!("changing input"); + input.await?.state.set(15); + assert_eq!(*output.strongly_consistent().await?, 15); + + println!("changing input"); + input.await?.state.set(1); + assert_eq!(*output.strongly_consistent().await?, 1); + + anyhow::Ok(()) + }) + .await + .unwrap(); +} + +#[turbo_tasks::function] +fn get_state() -> Vc<ChangingInput> { + ChangingInput { + state: State::new(0), + } + .cell() +} + +#[turbo_tasks::value] +struct ChangingInput { + state: State, +} + +#[turbo_tasks::function] +async fn compute(input: Vc) -> Result> { + println!("compute()"); + let value = 
*inner_compute(input).await?; + 
Ok(Vc::cell(value)) +} + +#[turbo_tasks::function] +async fn inner_compute(input: Vc) -> Result> { + println!("inner_compute()"); + let state_value = *input.await?.state.get(); + let mut last = None; + for i in 0..=state_value { + last = Some(compute2(Vc::cell(i))); + } + Ok(last.unwrap()) +} + +#[turbo_tasks::function] +async fn compute2(input: Vc) -> Result> { + mark_session_dependent(); + println!("compute2()"); + let value = *input.await?; + Ok(Vc::cell(value)) +}