Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::{
backing_storage::BackingStorage,
data::{
ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, DirtyState,
CachedDataItemValueRef, CellRef, CollectibleRef, CollectiblesRef, Dirtyness,
InProgressCellState, InProgressState, InProgressStateInner, OutputValue, RootType,
},
utils::{
Expand Down Expand Up @@ -557,8 +557,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

let is_dirty =
get!(task, Dirty).map_or(false, |dirty_state| dirty_state.get(self.session_id));
let is_dirty = task.is_dirty(self.session_id);

// Check the dirty count of the root node
let dirty_tasks = get!(task, AggregatedDirtyContainerCount)
Expand Down Expand Up @@ -614,8 +613,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
visited: &mut FxHashSet<TaskId>,
) -> String {
let task = ctx.task(task_id, TaskDataCategory::Data);
let is_dirty = get!(task, Dirty)
.map_or(false, |dirty_state| dirty_state.get(ctx.session_id()));
let is_dirty = task.is_dirty(ctx.session_id());
let in_progress =
get!(task, InProgress).map_or("not in progress", |p| match p {
InProgressState::InProgress(_) => "in progress",
Expand Down Expand Up @@ -2356,57 +2354,60 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
));

// Update the dirty state
let old_dirty_state = get!(task, Dirty).copied();
let old_dirtyness = task.dirtyness_and_session();

let new_dirty_state = if session_dependent {
Some(DirtyState {
clean_in_session: Some(self.session_id),
})
let new_dirtyness = if session_dependent {
Some((Dirtyness::SessionDependent, Some(self.session_id)))
} else {
None
};

let dirty_changed = old_dirty_state != new_dirty_state;
let dirty_changed = old_dirtyness != new_dirtyness;
let data_update = if dirty_changed {
if let Some(new_dirty_state) = new_dirty_state {
task.insert(CachedDataItem::Dirty {
value: new_dirty_state,
});
} else {
if let Some((value, _)) = new_dirtyness {
task.insert(CachedDataItem::Dirty { value });
} else if old_dirtyness.is_some() {
task.remove(&CachedDataItemKey::Dirty {});
}
if let Some(session_id) = new_dirtyness.and_then(|t| t.1) {
task.insert(CachedDataItem::CleanInSession { value: session_id });
} else if old_dirtyness.is_some_and(|t| t.1.is_some()) {
task.remove(&CachedDataItemKey::CleanInSession {});
}

if old_dirty_state.is_some() || new_dirty_state.is_some() {
let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
.cloned()
.unwrap_or_default();
if let Some(old_dirty_state) = old_dirty_state {
dirty_containers.update_with_dirty_state(&old_dirty_state);
let mut dirty_containers = get!(task, AggregatedDirtyContainerCount)
.cloned()
.unwrap_or_default();
if let Some((old_dirtyness, old_clean_in_session)) = old_dirtyness {
dirty_containers
.update_with_dirtyness_and_session(old_dirtyness, old_clean_in_session);
}
let aggregated_update = match (old_dirtyness, new_dirtyness) {
(None, None) => unreachable!(),
(Some(old), None) => {
dirty_containers.undo_update_with_dirtyness_and_session(old.0, old.1)
}
let aggregated_update = match (old_dirty_state, new_dirty_state) {
(None, None) => unreachable!(),
(Some(old), None) => dirty_containers.undo_update_with_dirty_state(&old),
(None, Some(new)) => dirty_containers.update_with_dirty_state(&new),
(Some(old), Some(new)) => dirty_containers.replace_dirty_state(&old, &new),
};
if !aggregated_update.is_zero() {
if aggregated_update.get(self.session_id) < 0
&& let Some(activeness_state) = get_mut!(task, Activeness)
{
activeness_state.all_clean_event.notify(usize::MAX);
activeness_state.unset_active_until_clean();
if activeness_state.is_empty() {
task.remove(&CachedDataItemKey::Activeness {});
}
(None, Some(new)) => {
dirty_containers.update_with_dirtyness_and_session(new.0, new.1)
}
(Some(old), Some(new)) => {
dirty_containers.replace_dirtyness_and_session(old.0, old.1, new.0, new.1)
}
};
if !aggregated_update.is_zero() {
if aggregated_update.get(self.session_id) < 0
&& let Some(activeness_state) = get_mut!(task, Activeness)
{
activeness_state.all_clean_event.notify(usize::MAX);
activeness_state.unset_active_until_clean();
if activeness_state.is_empty() {
task.remove(&CachedDataItemKey::Activeness {});
}
AggregationUpdateJob::data_update(
&mut task,
AggregatedDataUpdate::new()
.dirty_container_update(task_id, aggregated_update),
)
} else {
None
}
AggregationUpdateJob::data_update(
&mut task,
AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
)
} else {
None
}
Expand Down Expand Up @@ -2875,7 +2876,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::All);
let is_dirty = get!(task, Dirty).map_or(false, |dirty| dirty.get(self.session_id));
let is_dirty = task.is_dirty(self.session_id);
let has_dirty_containers = get!(task, AggregatedDirtyContainerCount)
.map_or(false, |dirty_containers| {
dirty_containers.get(self.session_id) > 0
Expand Down Expand Up @@ -2968,7 +2969,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}
}

let is_dirty = get!(task, Dirty).is_some_and(|dirty| dirty.get(self.session_id));
let is_dirty = task.is_dirty(self.session_id);
let has_dirty_container = get!(task, AggregatedDirtyContainerCount)
.is_some_and(|count| count.get(self.session_id) > 0);
let should_be_in_upper = is_dirty || has_dirty_container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ impl AggregatedDataUpdate {
collectibles_update.push((collectible, 1));
}
}
if let Some(dirty) = get!(task, Dirty) {
dirty_container_count.update_with_dirty_state(dirty);
if let Some((dirtyness, clean_in_session)) = task.dirtyness_and_session() {
dirty_container_count.update_with_dirtyness_and_session(dirtyness, clean_in_session);
}

let mut result = Self::new().collectibles_update(collectibles_update);
Expand Down Expand Up @@ -323,18 +323,18 @@ impl AggregatedDataUpdate {
);

if !aggregated_update.is_zero() {
let dirty_state = get!(task, Dirty).copied();
let dirtyness_and_session = task.dirtyness_and_session();
let task_id = task.id();
update!(task, AggregatedDirtyContainerCount, |old: Option<
DirtyContainerCount,
>| {
let mut new = old.unwrap_or_default();
if let Some(dirty_state) = dirty_state {
new.update_with_dirty_state(&dirty_state);
if let Some((dirtyness, clean_in_session)) = dirtyness_and_session {
new.update_with_dirtyness_and_session(dirtyness, clean_in_session);
}
let aggregated_update = new.update_count(&aggregated_update);
if let Some(dirty_state) = dirty_state {
new.undo_update_with_dirty_state(&dirty_state);
if let Some((dirtyness, clean_in_session)) = dirtyness_and_session {
new.undo_update_with_dirtyness_and_session(dirtyness, clean_in_session);
}
if !aggregated_update.is_zero() {
result.dirty_container_update = Some((task_id, aggregated_update));
Expand Down Expand Up @@ -1209,7 +1209,7 @@ impl AggregationUpdateQueue {
) {
let session_id = ctx.session_id();
// Task needs to be scheduled if it's dirty or doesn't have output
let dirty = get!(task, Dirty).map_or(false, |d| d.get(session_id));
let dirty = task.is_dirty(session_id);
let should_schedule = if dirty {
Some(TaskExecutionReason::ActivateDirty)
} else if !task.has_key(&CachedDataItemKey::Output {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use crate::{
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
},
},
storage::{get, get_mut},
storage::{get, get_mut, remove},
},
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemValue, DirtyState, InProgressState,
CachedDataItem, CachedDataItemKey, CachedDataItemValue, Dirtyness, InProgressState,
InProgressStateInner,
},
};
Expand Down Expand Up @@ -232,15 +232,11 @@ pub fn make_task_dirty_internal(
*stale = true;
}
let old = task.insert(CachedDataItem::Dirty {
value: DirtyState {
clean_in_session: None,
},
value: Dirtyness::Dirty,
});
let mut dirty_container = match old {
Some(CachedDataItemValue::Dirty {
value: DirtyState {
clean_in_session: None,
},
value: Dirtyness::Dirty,
}) => {
#[cfg(feature = "trace_task_dirty")]
let _span = tracing::trace_span!(
Expand All @@ -254,16 +250,30 @@ pub fn make_task_dirty_internal(
return;
}
Some(CachedDataItemValue::Dirty {
value: DirtyState {
clean_in_session: Some(session_id),
},
value: Dirtyness::SessionDependent,
}) => {
// Got dirty in that one session only
let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
.cloned()
.unwrap_or_default();
dirty_container.update_session_dependent(session_id, 1);
dirty_container
let old = remove!(task, CleanInSession);
match old {
None => {
#[cfg(feature = "trace_task_dirty")]
let _span = tracing::trace_span!(
"session-dependent task already dirty",
name = ctx.get_task_description(task_id),
cause = %TaskDirtyCauseInContext::new(&cause, ctx)
)
.entered();
// already dirty
return;
Comment on lines +259 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let _span = tracing::trace_span!(
"session-dependent task already dirty",
name = ctx.get_task_description(task_id),
cause = %TaskDirtyCauseInContext::new(&cause, ctx)
)
.entered();
// already dirty
return;
let extra_info = format!(
" Invalidation cause: {}",
TaskDirtyCauseInContext::new(&cause, ctx)
);
#[cfg(not(feature = "trace_task_dirty"))]
let extra_info = "";
panic!(
"Task {} has Dirty::SessionDependent but CleanInSession is missing. This \
violates the data structure invariant and indicates a bug in the storage \
layer or task state management.{extra_info}",
ctx.get_task_description(task_id),
);

When a SessionDependent task is being made fully dirty and the corresponding CleanInSession entry is missing, the code returns early without updating the aggregations or scheduling the task. This skips critical system state updates.

View Details

Analysis

Missing aggregation update when SessionDependent task lacks CleanInSession entry

What fails: In make_task_dirty_internal(), when a task with Dirty { SessionDependent } is being transitioned to fully dirty but the corresponding CleanInSession entry is missing, the function silently returns without updating AggregatedDirtyContainerCount or scheduling the task.

How to reproduce: This requires the data structure invariant to be violated: Dirty { SessionDependent } must exist without a corresponding CleanInSession { SessionId } entry. This can occur due to bugs in the storage layer, concurrent access issues, or incomplete state transitions.

What happens:

  • Line 234 inserts Dirty { Dirty }, replacing Dirty { SessionDependent }
  • Line 255 attempts to remove CleanInSession
  • If missing (None case), the function returns at lines 265-266 without:
    • Updating aggregations via update_with_dirtyness_and_session()
    • Scheduling the task for re-execution
  • The task state becomes inconsistent: marked as fully dirty but aggregation counts not updated

Expected behavior: Per the refactoring commit 08f394e2cf that split DirtyState into separate Dirtyness and CleanInSession items, a data structure invariant was introduced: if Dirty { SessionDependent } exists, CleanInSession must also exist. Violating this invariant should be treated as a bug and exposed with a panic rather than silently skipping critical state updates.

Fix: Convert the None case to a panic that exposes the invariant violation, matching the pattern used elsewhere in the codebase for invariant violations (e.g., immutable task invalidation at line 212).

}
Some(session_id) => {
// Got dirty in that one session only
let mut dirty_container = get!(task, AggregatedDirtyContainerCount)
.cloned()
.unwrap_or_default();
dirty_container.update_session_dependent(session_id, 1);
dirty_container
}
}
}
None => {
// Get dirty for all sessions
Expand All @@ -284,9 +294,8 @@ pub fn make_task_dirty_internal(
.entered();

let should_schedule = {
let aggregated_update = dirty_container.update_with_dirty_state(&DirtyState {
clean_in_session: None,
});
let aggregated_update =
dirty_container.update_with_dirtyness_and_session(Dirtyness::Dirty, None);
if !aggregated_update.is_zero() {
queue.extend(AggregationUpdateJob::data_update(
&mut task,
Expand Down
19 changes: 17 additions & 2 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use crate::{
backend::{
OperationGuard, TaskDataCategory, TransientTask, TurboTasksBackend, TurboTasksBackendInner,
TurboTasksBackendJob,
storage::{SpecificTaskDataCategory, StorageWriteGuard, iter_many},
storage::{SpecificTaskDataCategory, StorageWriteGuard, get, iter_many},
},
backing_storage::BackingStorage,
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemType, CachedDataItemValue,
CachedDataItemValueRef, CachedDataItemValueRefMut,
CachedDataItemValueRef, CachedDataItemValueRefMut, Dirtyness,
},
};

Expand Down Expand Up @@ -415,6 +415,21 @@ pub trait TaskGuard: Debug {
fn invalidate_serialization(&mut self);
fn prefetch(&mut self) -> Option<FxIndexMap<TaskId, bool>>;
fn is_immutable(&self) -> bool;
/// Returns whether this task counts as dirty when viewed from `session_id`.
///
/// - No `Dirty` item stored: the task is not dirty.
/// - `Dirtyness::Dirty`: the task is dirty regardless of the session.
/// - `Dirtyness::SessionDependent`: the task is dirty unless the stored
///   `CleanInSession` value equals `session_id`. A missing `CleanInSession`
///   item therefore also reports dirty.
fn is_dirty(&self, session_id: SessionId) -> bool {
    match get!(self, Dirty) {
        None => false,
        Some(Dirtyness::Dirty) => true,
        Some(Dirtyness::SessionDependent) => {
            // Only the session recorded in `CleanInSession` sees the task as clean.
            let clean_in_session = get!(self, CleanInSession).copied();
            clean_in_session != Some(session_id)
        }
    }
}
/// Returns the stored dirtyness together with the session the task is clean in.
///
/// Yields `None` when no `Dirty` item is stored. For `Dirtyness::Dirty` the
/// session component is always `None`; for `Dirtyness::SessionDependent` it is
/// a copy of the `CleanInSession` item, which may itself be absent.
fn dirtyness_and_session(&self) -> Option<(Dirtyness, Option<SessionId>)> {
    get!(self, Dirty).map(|dirtyness| match dirtyness {
        // A fully dirty task never carries a clean-in-session marker.
        Dirtyness::Dirty => (Dirtyness::Dirty, None),
        Dirtyness::SessionDependent => {
            let clean_in_session = get!(self, CleanInSession).copied();
            (Dirtyness::SessionDependent, clean_in_session)
        }
    })
}
}

pub struct TaskGuardImpl<'a, B: BackingStorage> {
Expand Down
Loading
Loading